X-Git-Url: http://git.treefish.org/~alex/photosort.git/blobdiff_plain/9abf01c0adb9b5e7b0d74da76bf7f86b49e30cb6..4ff31898a4b6230a0d68f98a11e8cd19e361c162:/src/bunch.py diff --git a/src/bunch.py b/src/bunch.py index 2ac33ca..5bc3edd 100644 --- a/src/bunch.py +++ b/src/bunch.py @@ -1,4 +1,5 @@ import logging +import os import time import threading @@ -7,56 +8,67 @@ from migrator import Migrator class Bunch: class TriggeredSource: - def __init__(self, trigger, migrator, cleanup): + def __init__(self, idx, trigger, migrator, cleanup): + self.idx = idx self.trigger = trigger self.migrator = migrator self.cleanup = cleanup - def __init__(self, cfg): - self._id = cfg['id'] + def __init__(self, idx, cache_dir, cfg): + self._idx = idx + source_idx = 1 self._sources = [] for src_dir_cfg in cfg['src_dirs']: self._sources.append( Bunch.TriggeredSource( - DirTrigger(src_dir_cfg['path'], src_dir_cfg['cool_time'], src_dir_cfg['max_time']), - Migrator(src_dir_cfg['path'], cfg['dst_dir']['path']), + source_idx, + DirTrigger(src_dir_cfg['path'], src_dir_cfg['cool_time'], + src_dir_cfg['max_time']), + Migrator(src_dir_cfg['path'], cfg['dst_dir']['path'], + os.path.join(cache_dir, "%d.db" % idx)), src_dir_cfg['cleanup'] ) ) - logging.info("Created bunch %s with %d sources.", self._id, len(self._sources)) + source_idx += 1 + logging.info("Created bunch #%d with %d sources.", self._idx, len(self._sources)) def start(self): - logging.info("Starting bunch %s...", self._id) + logging.info("Starting bunch #%d...", self._idx) self._stop = False self._worker_thread = threading.Thread(target=self._worker) self._worker_thread.start() def stop(self): - logging.info("Stopping bunch %s...", self._id) + logging.info("Stopping bunch #%d...", self._idx) self._stop = True self._worker_thread.join() - logging.info("Stopped bunch %s.", self._id) + for src in self._sources: + src.migrator.close() + logging.info("Stopped bunch #%d.", self._idx) def is_running(self): return self._worker_thread.is_alive() def _worker(self): - for source in self._sources: - source.trigger.start() + for src in self._sources: + src.trigger.start() while not self._stop: - for source in self._sources: + for src in self._sources: try: - if source.trigger.is_triggering(): - logging.info("Got source trigger for bunch %s.", self._id) - source.trigger.reset() - source.migrator.migrate(source.cleanup) - if source.cleanup: - source.migrator.cleanup() + if src.trigger.is_triggering(): + logging.info("Got trigger for source #%d.%d.", self._idx, src.idx) + before = time.time() + src.trigger.reset() + src.migrator.migrate(src.cleanup) + if src.cleanup: + src.migrator.cleanup() + logging.info("Migration took %.1fs for source #%d.%d.", + time.time() - before, self._idx, src.idx) except Exception as e: - logging.error("Error migrating source for bunch %s: %s", - self._id, str(e)) + logging.error("Error migrating source #%d.%d: %s", + self._idx, src.idx, str(e)) time.sleep(10.0) - for source in self._sources: - source.trigger.stop() + for src in self._sources: + src.trigger.stop()