self._worker_thread.join()
logging.info("Stopped bunch %s.", self._id)
+ def is_running(self):
+ return self._worker_thread.is_alive()
+
def _worker(self):
- # start triggers
+ for source in self._sources:
+ source.trigger.start()
while not self._stop:
for source in self._sources:
if source.trigger.is_triggering():
logging.info("Got source trigger for bunch %s.", self._id)
source.trigger.reset()
- source.migrator.migrate()
+ source.migrator.migrate(source.cleanup)
if source.cleanup:
source.migrator.cleanup()
except Exception as e:
self._id, str(e))
time.sleep(10.0)
- # stop triggers
+ for source in self._sources:
+ source.trigger.stop()
-class DirTrigger:
+import threading
+import time
+from watchdog.events import FileSystemEventHandler
+from watchdog.observers import Observer
+
+class DirTrigger(FileSystemEventHandler):
+ COOL_TIME = 10
+ MAX_TIME = 86400
+
def __init__(self, dir_path):
- pass
+ self._got_event = False
+ self._last_event_time = 0.0
+ self._last_reset_time = 0.0
+ self._lock = threading.Lock()
+ self._observer = Observer()
+ self._observer.schedule(self, path=dir_path, recursive=True)
+
+ def start(self):
+ self._observer.start()
+
+ def stop(self):
+ self._observer.stop()
def is_triggering(self):
- return False
+ self._lock.acquire()
+ try:
+ return ( self._got_event and
+ (time.time() - self._last_event_time > DirTrigger.COOL_TIME) ) or \
+ time.time() - self._last_reset_time > DirTrigger.MAX_TIME
+ finally:
+ self._lock.release()
+
+ def reset(self):
+ self._got_event = False
+ self._last_reset_time = time.time()
+
+ def on_any_event(self, event):
+ self._lock.acquire()
+ try:
+ self._got_event = True
+ self._last_event_time = time.time()
+ finally:
+ self._lock.release()