6 from dirtrigger import DirTrigger
7 from migrator import Migrator
class TriggeredSource:
    """Record tying one source's index to its trigger, migrator and cleanup setting.

    NOTE(review): this class is constructed elsewhere in the file as
    ``Bunch.TriggeredSource(...)``, so it is presumably nested inside a
    ``Bunch`` class whose header is not visible here -- confirm the
    indentation level when applying this block.
    """

    def __init__(self, idx, trigger, migrator, cleanup):
        """Store the constructor arguments as attributes of the same names.

        Args:
            idx: Index of this source; read back as ``src.idx`` by the
                logging calls in this file.
            trigger: Object whose ``is_triggering()`` is polled by the
                owner of this record.
            migrator: Object whose ``migrate(cleanup)`` and ``cleanup()``
                are called by the owner of this record.
            cleanup: Value handed to ``migrator.migrate``.
        """
        # The listing showed no assignment for `idx`, although `src.idx` is
        # read when logging trigger, timing and error messages.
        self.idx = idx
        self.trigger = trigger
        self.migrator = migrator
        self.cleanup = cleanup
# NOTE(review): every line of this listing starts with its original-file line
# number and has lost its indentation. Gaps in those numbers (18-20, 22, 25,
# 30-32) mean statements of this constructor are missing from this view.
#
# Constructor taking a bunch index, a cache directory and a config mapping.
# From the visible lines it reads cfg['src_dirs'] (iterated) and
# cfg['dst_dir']['path'], and it logs self._idx and len(self._sources) at the end.
# NOTE(review): the assignments to self._idx and self._sources are not visible
# here -- presumably on the missing lines; confirm.
17 def __init__(self, idx, cache_dir, cfg):
# One iteration per entry of cfg['src_dirs']; each entry is indexed with the
# keys 'path', 'cool_time', 'max_time' and 'cleanup'.
21 for src_dir_cfg in cfg['src_dirs']:
# Per-source file path under cache_dir, named "reg_<idx>_<source_idx>.db".
# NOTE(review): source_idx is not assigned on any visible line -- confirm where
# it is set.
23 reg_db = os.path.join(cache_dir, "reg_%d_%d.db" % (idx, source_idx))
# Build the per-source record from a DirTrigger (path, cool_time, max_time),
# a Migrator (source path, destination path, reg_db) and the cleanup setting.
# NOTE(review): the first argument (original line 25) and the statement that
# stores the result are not visible -- presumably the source index and an
# append to self._sources; confirm.
24 Bunch.TriggeredSource(
26 DirTrigger(src_dir_cfg['path'], src_dir_cfg['cool_time'],
27 src_dir_cfg['max_time']),
28 Migrator(src_dir_cfg['path'], cfg['dst_dir']['path'], reg_db),
29 src_dir_cfg['cleanup']
# Summary log: bunch index and number of sources.
33 logging.info("Created bunch #%d with %d sources.", self._idx, len(self._sources))
# NOTE(review): the `def` lines enclosing the statements below are not visible
# in this listing (the embedded original line numbers show gaps), so method
# names and signatures cannot be confirmed from here.
#
# Start-up fragment: logs, then creates a thread targeting self._worker, keeps
# it in self._worker_thread and starts it.
36 logging.info("Starting bunch #%d...", self._idx)
38 self._worker_thread = threading.Thread(target=self._worker)
39 self._worker_thread.start()
# Shutdown fragment: logs, blocks until the worker thread finishes, logs again.
# NOTE(review): one line is missing between the log call and join(); whatever
# tells the worker to exit is presumably there -- confirm.
42 logging.info("Stopping bunch #%d...", self._idx)
44 self._worker_thread.join()
45 logging.info("Stopped bunch #%d.", self._idx)
# Liveness fragment: returns whether the worker thread is still running.
48 return self._worker_thread.is_alive()
# NOTE(review): fragments, presumably of the `_worker` method that the file
# passes as a thread target. Its `def` line, the `try:` matching the `except`
# below and several other statements are missing from this listing (gaps in
# the embedded original line numbers), so nesting cannot be confirmed here.
#
# First pass over the sources; its body is not visible.
51 for src in self._sources:
# Polling pass: check every source's trigger.
55 for src in self._sources:
# When the trigger reports it is firing, log it and run the migration.
57 if src.trigger.is_triggering():
58 logging.info("Got trigger for source #%d.%d.", self._idx, src.idx)
# The source's cleanup setting is passed through to the migrator.
61 src.migrator.migrate(src.cleanup)
# NOTE(review): a line is missing just before this call -- it may be guarded
# by a condition; confirm.
63 src.migrator.cleanup()
# Logs the elapsed time since `before`.
# NOTE(review): `before` is not assigned on any visible line -- presumably a
# time.time() value taken before migrating; confirm.
64 logging.info("Migration took %.1fs for source #%d.%d.",
65 time.time() - before, self._idx, src.idx)
# Any Exception from the guarded statements is logged with the source's indices.
# NOTE(review): the lines after this handler are not visible, so whether the
# error is re-raised cannot be told from here.
66 except Exception as e:
67 logging.error("Error migrating source #%d.%d: %s",
68 self._idx, src.idx, str(e))
# Final pass over the sources; its body lies beyond this view.
71 for src in self._sources: