X-Git-Url: http://git.treefish.org/~alex/photosort.git/blobdiff_plain/ef745f5c8e8191f8ff341cb6fcf39179e34eb2bd..5b9ec26e3f502c56d73d11cafb8b411541db89d5:/src/bunch.py?ds=sidebyside diff --git a/src/bunch.py b/src/bunch.py index 713a618..689d9b0 100644 --- a/src/bunch.py +++ b/src/bunch.py @@ -7,51 +7,67 @@ from migrator import Migrator class Bunch: class TriggeredSource: - def __init__(self, trigger, migrator, cleanup): + def __init__(self, idx, trigger, migrator, cleanup): + self.idx = idx self.trigger = trigger self.migrator = migrator self.cleanup = cleanup - def __init__(self, cfg): - self._id = cfg['id'] + def __init__(self, idx, cfg): + self._idx = idx + source_idx = 1 self._sources = [] for src_dir_cfg in cfg['src_dirs']: self._sources.append( Bunch.TriggeredSource( - DirTrigger(src_dir_cfg['path']), - Migrator(src_dir_cfg['path'], cfg['dst_dir']['path']), + source_idx, + DirTrigger(src_dir_cfg['path'], src_dir_cfg['cool_time'], + src_dir_cfg['max_time']), + Migrator(src_dir_cfg['path'], cfg['dst_dir']['path'], + os.path.join(cfg['cache_dir'], "%d.db" % idx)), src_dir_cfg['cleanup'] ) ) - logging.info("Created bunch %s with %d sources.", self._id, len(self._sources)) + source_idx += 1 + logging.info("Created bunch #%d with %d sources.", self._idx, len(self._sources)) def start(self): - logging.info("Starting bunch %s...", self._id) + logging.info("Starting bunch #%d...", self._idx) self._stop = False self._worker_thread = threading.Thread(target=self._worker) self._worker_thread.start() def stop(self): - logging.info("Stopping bunch %s...", self._id) + logging.info("Stopping bunch #%d...", self._idx) self._stop = True self._worker_thread.join() - logging.info("Stopped bunch %s.", self._id) + for src in self._sources: + src.migrator.close() + logging.info("Stopped bunch #%d.", self._idx) + + def is_running(self): + return self._worker_thread.is_alive() def _worker(self): - # start triggers + for src in self._sources: + src.trigger.start() while not self._stop: - for source in self._sources: + for src in self._sources: try: - if source.trigger.is_triggering(): - logging.info("Got source trigger for bunch %s.", self._id) - source.trigger.reset() - source.migrator.migrate() - if source.cleanup: - source.migrator.cleanup() + if src.trigger.is_triggering(): + logging.info("Got trigger for source #%d.%d.", self._idx, src.idx) + before = time.time() + src.trigger.reset() + src.migrator.migrate(src.cleanup) + if src.cleanup: + src.migrator.cleanup() + logging.info("Migration took %.1fs for source #%d.%d.", + time.time() - before, self._idx, src.idx) except Exception as e: - logging.error("Error migrating source for bunch %s: %s", - self._id, str(e)) + logging.error("Error migrating source #%d.%d: %s", + self._idx, src.idx, str(e)) time.sleep(10.0) - # stop triggers + for src in self._sources: + src.trigger.stop()