+import logging
+import time
+import threading
+
+from dirtrigger import DirTrigger
+from migrator import Migrator
+
+class Bunch:
+ class TriggeredSource:
+ def __init__(self, trigger, migrator, cleanup):
+ self.trigger = trigger
+ self.migrator = migrator
+ self.cleanup = cleanup
+
+ def __init__(self, cfg):
+ self._id = cfg['id']
+ self._sources = []
+ for src_dir_cfg in cfg['src_dirs']:
+ self._sources.append(
+ Bunch.TriggeredSource(
+ DirTrigger(src_dir_cfg['path']),
+ Migrator(src_dir_cfg['path'], cfg['dst_dir']['path']),
+ src_dir_cfg['cleanup']
+ )
+ )
+ logging.info("Created bunch %s with %d sources.", self._id, len(self._sources))
+
+ def start(self):
+ logging.info("Starting bunch %s...", self._id)
+ self._stop = False
+ self._worker_thread = threading.Thread(target=self._worker)
+ self._worker_thread.start()
+
+ def stop(self):
+ logging.info("Stopping bunch %s...", self._id)
+ self._stop = True
+ self._worker_thread.join()
+ logging.info("Stopped bunch %s.", self._id)
+
+ def _worker(self):
+ # start triggers
+
+ while not self._stop:
+ for source in self._sources:
+ try:
+ if source.trigger.is_triggering():
+ logging.info("Got source trigger for bunch %s.", self._id)
+ source.trigger.reset()
+ source.migrator.migrate()
+ if source.cleanup:
+ source.migrator.cleanup()
+ except Exception as e:
+ logging.error("Error migrating source for bunch %s: %s",
+ self._id, str(e))
+ time.sleep(10.0)
+
+ # stop triggers