5 from dirtrigger import DirTrigger
6 from migrator import Migrator
def __init__(self, trigger, migrator, cleanup):
    """Bundle one source's trigger, migrator and cleanup setting.

    The three arguments are stored unchanged as same-named attributes:

    trigger  -- object polled for pending work for this source.
    migrator -- object that performs the migration for this source.
    cleanup  -- value handed to the migrator when a migration is run.
    """
    # Single tuple assignment; targets are bound left to right, so the
    # attributes are set in the order trigger, migrator, cleanup.
    self.trigger, self.migrator, self.cleanup = trigger, migrator, cleanup
15 def __init__(self, cfg):
18 for src_dir_cfg in cfg['src_dirs']:
20 Bunch.TriggeredSource(
21 DirTrigger(src_dir_cfg['path'], src_dir_cfg['cool_time'], src_dir_cfg['max_time']),
22 Migrator(src_dir_cfg['path'], cfg['dst_dir']['path']),
23 src_dir_cfg['cleanup']
26 logging.info("Created bunch %s with %d sources.", self._id, len(self._sources))
29 logging.info("Starting bunch %s...", self._id)
31 self._worker_thread = threading.Thread(target=self._worker)
32 self._worker_thread.start()
35 logging.info("Stopping bunch %s...", self._id)
37 self._worker_thread.join()
38 logging.info("Stopped bunch %s.", self._id)
41 return self._worker_thread.is_alive()
44 for source in self._sources:
45 source.trigger.start()
48 for source in self._sources:
50 if source.trigger.is_triggering():
51 logging.info("Got source trigger for bunch %s.", self._id)
52 source.trigger.reset()
53 source.migrator.migrate(source.cleanup)
55 source.migrator.cleanup()
56 except Exception as e:
57 logging.error("Error migrating source for bunch %s: %s",
61 for source in self._sources: