5 from dirtrigger import DirTrigger
6 from migrator import Migrator
# Constructor taking an index, a trigger, a migrator and a cleanup setting.
# The visible lines store the trigger, the migrator and the cleanup setting on
# the instance under the same names.
# NOTE(review): the enclosing class header is not part of this excerpt;
# presumably this is the `Bunch.TriggeredSource` type instantiated elsewhere in
# the file -- confirm.
10 def __init__(self, idx, trigger, migrator, cleanup):
# NOTE(review): the embedded numbering skips 11 here. No visible line stores
# `idx`, yet `src.idx` is read elsewhere, so it is presumably assigned on the
# hidden line -- verify against the full file.
12 self.trigger = trigger
13 self.migrator = migrator
14 self.cleanup = cleanup
# Constructor taking this bunch's index and its configuration mapping.
# For each entry of cfg['src_dirs'] it builds a Bunch.TriggeredSource out of a
# DirTrigger, a Migrator and the entry's 'cleanup' value, and finally logs the
# bunch index together with the number of sources.
# Keys read from cfg: 'src_dirs', 'dst_dir' -> 'path', 'cache_dir'.
# Keys read from each src_dirs entry: 'path', 'cool_time', 'max_time', 'cleanup'.
# NOTE(review): several lines of this constructor are not part of the excerpt
# (the embedded numbering has gaps), including wherever self._idx and
# self._sources are assigned and where the TriggeredSource call is closed.
16 def __init__(self, idx, cfg):
20 for src_dir_cfg in cfg['src_dirs']:
22 Bunch.TriggeredSource(
# Trigger for the source path, configured with the entry's 'cool_time' and
# 'max_time' values (their units are not visible here).
24 DirTrigger(src_dir_cfg['path'], src_dir_cfg['cool_time'],
25 src_dir_cfg['max_time']),
# Migrator from the source path to the destination path; its third argument is
# a "<idx>.db" path inside cfg['cache_dir'].
# NOTE(review): the file name is derived from the bunch-level `idx` only, so in
# the visible code every source of one bunch receives the same .db path --
# confirm that sharing it is intended.
26 Migrator(src_dir_cfg['path'], cfg['dst_dir']['path'],
27 os.path.join(cfg['cache_dir'], "%d.db" % idx)),
28 src_dir_cfg['cleanup']
32 logging.info("Created bunch #%d with %d sources.", self._idx, len(self._sources))
# Logs that the bunch is starting, then creates a threading.Thread whose target
# is self._worker, keeps it in self._worker_thread and starts it.
# NOTE(review): the `def` line of this routine and one line between the log
# call and the thread creation are not part of this excerpt.
35 logging.info("Starting bunch #%d...", self._idx)
37 self._worker_thread = threading.Thread(target=self._worker)
38 self._worker_thread.start()
# Logs that the bunch is stopping, blocks until the worker thread has finished
# (join), walks over all sources, and logs that the bunch has stopped.
# NOTE(review): the `def` line, the line before the join (presumably whatever
# signals the worker to exit -- confirm) and the body of the loop over
# self._sources are not part of this excerpt.
41 logging.info("Stopping bunch #%d...", self._idx)
43 self._worker_thread.join()
44 for src in self._sources:
46 logging.info("Stopped bunch #%d.", self._idx)
# Returns whether the worker thread is currently alive (Thread.is_alive()).
# NOTE(review): the `def` line of this routine is not part of this excerpt.
49 return self._worker_thread.is_alive()
# NOTE(review): the `def` line of this routine is not part of this excerpt;
# presumably it is the `_worker` thread target -- confirm. The body of the
# first loop below is hidden as well.
52 for src in self._sources:
# For each source whose trigger reports is_triggering(): log the trigger, run
# the migration, and log how long it took.
56 for src in self._sources:
58 if src.trigger.is_triggering():
59 logging.info("Got trigger for source #%d.%d.", self._idx, src.idx)
# The source's cleanup setting is passed straight to migrate().
62 src.migrator.migrate(src.cleanup)
# NOTE(review): a line is hidden between migrate() and cleanup(), so whether
# cleanup() runs unconditionally cannot be told from this excerpt.
64 src.migrator.cleanup()
# `before` is assigned on a line that is not shown; presumably a time.time()
# stamp taken before the migration -- confirm.
65 logging.info("Migration took %.1fs for source #%d.%d.",
66 time.time() - before, self._idx, src.idx)
# Any Exception raised in the (hidden) try block is reported through
# logging.error; no re-raise is visible.
# NOTE(review): str(e) drops the traceback; logging.exception(...) would keep
# it in the log.
67 except Exception as e:
68 logging.error("Error migrating source #%d.%d: %s",
69 self._idx, src.idx, str(e))
# Final pass over all sources; the loop body is not part of this excerpt.
72 for src in self._sources: