for src_dir_cfg in cfg['src_dirs']:
self._sources.append(
Bunch.TriggeredSource(
- DirTrigger(src_dir_cfg['path']),
+ DirTrigger(src_dir_cfg['path'], src_dir_cfg['cool_time'], src_dir_cfg['max_time']),
Migrator(src_dir_cfg['path'], cfg['dst_dir']['path']),
src_dir_cfg['cleanup']
)
self._worker_thread.join()
logging.info("Stopped bunch %s.", self._id)
+ def is_running(self):
+ return self._worker_thread.is_alive()
+
def _worker(self):
- # start triggers
+ for source in self._sources:
+ source.trigger.start()
while not self._stop:
for source in self._sources:
if source.trigger.is_triggering():
logging.info("Got source trigger for bunch %s.", self._id)
source.trigger.reset()
- source.migrator.migrate()
+ source.migrator.migrate(source.cleanup)
if source.cleanup:
source.migrator.cleanup()
except Exception as e:
self._id, str(e))
time.sleep(10.0)
- # stop triggers
+ for source in self._sources:
+ source.trigger.stop()