X-Git-Url: http://git.treefish.org/~alex/photosort.git/blobdiff_plain/224556236ecd0f48f461738bc8157a82f5009618..ef745f5c8e8191f8ff341cb6fcf39179e34eb2bd:/src/bunch.py?ds=sidebyside diff --git a/src/bunch.py b/src/bunch.py new file mode 100644 index 0000000..713a618 --- /dev/null +++ b/src/bunch.py @@ -0,0 +1,57 @@ +import logging +import time +import threading + +from dirtrigger import DirTrigger +from migrator import Migrator + +class Bunch: + class TriggeredSource: + def __init__(self, trigger, migrator, cleanup): + self.trigger = trigger + self.migrator = migrator + self.cleanup = cleanup + + def __init__(self, cfg): + self._id = cfg['id'] + self._sources = [] + for src_dir_cfg in cfg['src_dirs']: + self._sources.append( + Bunch.TriggeredSource( + DirTrigger(src_dir_cfg['path']), + Migrator(src_dir_cfg['path'], cfg['dst_dir']['path']), + src_dir_cfg['cleanup'] + ) + ) + logging.info("Created bunch %s with %d sources.", self._id, len(self._sources)) + + def start(self): + logging.info("Starting bunch %s...", self._id) + self._stop = False + self._worker_thread = threading.Thread(target=self._worker) + self._worker_thread.start() + + def stop(self): + logging.info("Stopping bunch %s...", self._id) + self._stop = True + self._worker_thread.join() + logging.info("Stopped bunch %s.", self._id) + + def _worker(self): + # start triggers + + while not self._stop: + for source in self._sources: + try: + if source.trigger.is_triggering(): + logging.info("Got source trigger for bunch %s.", self._id) + source.trigger.reset() + source.migrator.migrate() + if source.cleanup: + source.migrator.cleanup() + except Exception as e: + logging.error("Error migrating source for bunch %s: %s", + self._id, str(e)) + time.sleep(10.0) + + # stop triggers