]> git.treefish.org Git - photosort.git/blob - src/bunch.py
689d9b0d53fd7ea696063cdc1fe6dec3989f592c
[photosort.git] / src / bunch.py
1 import logging
2 import time
3 import threading
4
5 from dirtrigger import DirTrigger
6 from migrator import Migrator
7
8 class Bunch:
9     class TriggeredSource:
10         def __init__(self, idx, trigger, migrator, cleanup):
11             self.idx = idx
12             self.trigger = trigger
13             self.migrator = migrator
14             self.cleanup = cleanup
15
16     def __init__(self, idx, cfg):
17         self._idx = idx
18         source_idx = 1
19         self._sources = []
20         for src_dir_cfg in cfg['src_dirs']:
21             self._sources.append(
22                 Bunch.TriggeredSource(
23                     source_idx,
24                     DirTrigger(src_dir_cfg['path'], src_dir_cfg['cool_time'],
25                                src_dir_cfg['max_time']),
26                     Migrator(src_dir_cfg['path'], cfg['dst_dir']['path'],
27                              os.path.join(cfg['cache_dir'], "%d.db" % idx)),
28                     src_dir_cfg['cleanup']
29                 )
30             )
31             source_idx += 1
32         logging.info("Created bunch #%d with %d sources.", self._idx, len(self._sources))
33
34     def start(self):
35         logging.info("Starting bunch #%d...", self._idx)
36         self._stop = False
37         self._worker_thread = threading.Thread(target=self._worker)
38         self._worker_thread.start()
39
40     def stop(self):
41         logging.info("Stopping bunch #%d...", self._idx)
42         self._stop = True
43         self._worker_thread.join()
44         for src in self._sources:
45             src.migrator.close()
46         logging.info("Stopped bunch #%d.", self._idx)
47
48     def is_running(self):
49         return self._worker_thread.is_alive()
50
51     def _worker(self):
52         for src in self._sources:
53             src.trigger.start()
54
55         while not self._stop:
56             for src in self._sources:
57                 try:
58                     if src.trigger.is_triggering():
59                         logging.info("Got trigger for source #%d.%d.", self._idx, src.idx)
60                         before = time.time()
61                         src.trigger.reset()
62                         src.migrator.migrate(src.cleanup)
63                         if src.cleanup:
64                             src.migrator.cleanup()
65                         logging.info("Migration took %.1fs for source #%d.%d.",
66                                      time.time() - before, self._idx, src.idx)
67                 except Exception as e:
68                     logging.error("Error migrating source #%d.%d: %s",
69                                   self._idx, src.idx, str(e))
70             time.sleep(10.0)
71
72         for src in self._sources:
73             src.trigger.stop()