import logging
+import os
import time
import threading
class Bunch:
class TriggeredSource:
- def __init__(self, trigger, migrator, cleanup):
+ def __init__(self, idx, trigger, migrator, cleanup):
+ self.idx = idx
self.trigger = trigger
self.migrator = migrator
self.cleanup = cleanup
- def __init__(self, cfg):
- self._id = cfg['id']
+ def __init__(self, idx, cache_dir, cfg):
+ self._idx = idx
+ source_idx = 1
self._sources = []
for src_dir_cfg in cfg['src_dirs']:
self._sources.append(
Bunch.TriggeredSource(
- DirTrigger(src_dir_cfg['path']),
- Migrator(src_dir_cfg['path'], cfg['dst_dir']['path']),
+ source_idx,
+ DirTrigger(src_dir_cfg['path'], src_dir_cfg['cool_time'],
+ src_dir_cfg['max_time']),
+ Migrator(src_dir_cfg['path'], cfg['dst_dir']['path'],
+ os.path.join(cache_dir, "%d.db" % idx)),
src_dir_cfg['cleanup']
)
)
- logging.info("Created bunch %s with %d sources.", self._id, len(self._sources))
+ source_idx += 1
+ logging.info("Created bunch #%d with %d sources.", self._idx, len(self._sources))
def start(self):
- logging.info("Starting bunch %s...", self._id)
+ logging.info("Starting bunch #%d...", self._idx)
self._stop = False
self._worker_thread = threading.Thread(target=self._worker)
self._worker_thread.start()
def stop(self):
- logging.info("Stopping bunch %s...", self._id)
+ logging.info("Stopping bunch #%d...", self._idx)
self._stop = True
self._worker_thread.join()
- logging.info("Stopped bunch %s.", self._id)
+ for src in self._sources:
+ src.migrator.close()
+ logging.info("Stopped bunch #%d.", self._idx)
+
+ def is_running(self):
+ return self._worker_thread.is_alive()
def _worker(self):
- # start triggers
+ for src in self._sources:
+ src.trigger.start()
while not self._stop:
- for source in self._sources:
+ for src in self._sources:
try:
- if source.trigger.is_triggering():
- logging.info("Got source trigger for bunch %s.", self._id)
- source.trigger.reset()
- source.migrator.migrate()
- if source.cleanup:
- source.migrator.cleanup()
+ if src.trigger.is_triggering():
+ logging.info("Got trigger for source #%d.%d.", self._idx, src.idx)
+ before = time.time()
+ src.trigger.reset()
+ src.migrator.migrate(src.cleanup)
+ if src.cleanup:
+ src.migrator.cleanup()
+ logging.info("Migration took %.1fs for source #%d.%d.",
+ time.time() - before, self._idx, src.idx)
except Exception as e:
- logging.error("Error migrating source for bunch %s: %s",
- self._id, str(e))
+ logging.error("Error migrating source #%d.%d: %s",
+ self._idx, src.idx, str(e))
time.sleep(10.0)
- # stop triggers
+ for src in self._sources:
+ src.trigger.stop()