4 from watchdog.events import DirDeletedEvent, FileDeletedEvent, FileSystemEventHandler
5 from watchdog.observers import Observer
class DirTrigger(FileSystemEventHandler):
    """Watch a directory tree and report when follow-up work is due.

    An observer is scheduled recursively on ``dir_path``.  Each relevant
    file-system event records a timestamp; ``is_triggering`` then reports
    True either once events have stopped arriving for longer than
    ``cool_time``, or once more than ``max_time`` has passed since the last
    reset, whichever happens first.

    NOTE(review): this class was reviewed from an excerpt with several lines
    missing.  Reconstructed parts are flagged with NOTE(review) comments
    below and should be confirmed against the full file.
    """

    def __init__(self, dir_path, cool_time, max_time):
        """Create the trigger and start watching ``dir_path`` recursively.

        Args:
            dir_path: Directory to watch, including its subdirectories.
            cool_time: Quiet period after the most recent event before the
                trigger fires, compared against ``time.time()`` differences
                (seconds).
            max_time: Longest allowed interval since the last reset before
                the trigger fires regardless of events (seconds).
        """
        self._dir_path = dir_path
        self._cool_time = cool_time
        self._max_time = max_time
        self._got_event = False
        self._last_event_time = 0.0
        # Starting at 0.0 makes the max_time condition hold on the first
        # check whenever the current time.time() value exceeds max_time.
        self._last_reset_time = 0.0
        self._lock = threading.Lock()
        self._observer = Observer()
        self._observer.schedule(self, path=dir_path, recursive=True)
        # Best effort: a failure to start the observer is logged, not raised,
        # so the max_time condition can still fire without file events.
        try:
            self._observer.start()
        except Exception as e:
            logging.warning("Error starting file observer for %s: %s", self._dir_path, e)

    def is_triggering(self):
        """Return whether the trigger condition currently holds.

        Returns:
            True if an event was recorded and the last one is older than
            ``cool_time``, or if more than ``max_time`` has elapsed since the
            last reset; False otherwise.
        """
        # NOTE(review): the lines defining `now` were not visible in the
        # excerpt; time.time() is assumed because every stored timestamp in
        # this class comes from time.time().  Locking is likewise assumed.
        with self._lock:
            now = time.time()
            cooled_down = (
                self._got_event
                and now - self._last_event_time > self._cool_time
            )
            overdue = now - self._last_reset_time > self._max_time
            return cooled_down or overdue

    # NOTE(review): the header of this method was not visible in the excerpt;
    # the name `reset` is assumed from the `_last_reset_time` attribute it
    # updates.  Confirm against callers before merging.
    def reset(self):
        """Clear the pending-event flag and restart the ``max_time`` interval."""
        with self._lock:
            self._got_event = False
            self._last_reset_time = time.time()

    def on_any_event(self, event):
        """Record a file-system event delivered by the observer.

        Args:
            event: The watchdog event object passed to the handler.
        """
        # The previous check compared event.event_type (a string) with the
        # event classes, which can never be equal, so this branch was dead.
        # isinstance() performs the intended class test.
        if isinstance(event, (FileDeletedEvent, DirDeletedEvent)):
            # NOTE(review): the body of this branch was not visible in the
            # excerpt; ignoring deletions via an early return is assumed.
            return
        with self._lock:
            self._got_event = True
            self._last_event_time = time.time()