self._worker_thread.join()
logging.info("Stopped bunch %s.", self._id)
# Report whether this bunch's worker is still running by delegating to
# self._worker_thread.is_alive().
# presumably _worker_thread is a threading.Thread (it is also join()ed
# elsewhere in this class) -- TODO confirm.
+ def is_running(self):
+ return self._worker_thread.is_alive()
+
# Worker loop for this bunch: start the trigger of every source, poll the
# sources while self._stop is falsy, then stop every trigger again.
def _worker(self):
- # start triggers
# Start each source's trigger before polling begins.
+ for source in self._sources:
+ source.trigger.start()
# Keep polling until self._stop becomes truthy.
while not self._stop:
for source in self._sources:
# Only sources whose trigger currently reports is_triggering() are handled.
if source.trigger.is_triggering():
logging.info("Got source trigger for bunch %s.", self._id)
# The trigger is reset before the migration is started.
source.trigger.reset()
- source.migrator.migrate()
# NOTE(review): migrate() now receives source.cleanup, yet cleanup() is still
# called right below when source.cleanup is truthy -- confirm that cleanup
# cannot run twice for the same trigger.
+ source.migrator.migrate(source.cleanup)
if source.cleanup:
source.migrator.cleanup()
# NOTE(review): the `try:` this handler belongs to and the beginning of the
# call that the next line completes are not visible in this excerpt; the
# handler catches Exception and passes self._id and str(e) to that call.
except Exception as e:
self._id, str(e))
# Pause for 10 seconds. NOTE(review): nesting cannot be recovered from this
# excerpt; if this runs once per polling pass, a stop request can take up to
# 10 seconds to be noticed -- TODO confirm.
time.sleep(10.0)
- # stop triggers
# Stop each source's trigger once polling has ended.
# NOTE(review): this does not appear to be guarded by `finally`, so an
# exception escaping the loop would presumably leave the triggers started --
# confirm whether that matters.
+ for source in self._sources:
+ source.trigger.stop()