]> git.treefish.org Git - photosort.git/blob - src/bunch.py
added missing import
[photosort.git] / src / bunch.py
1 import logging
2 import os
3 import time
4 import threading
5
6 from dirtrigger import DirTrigger
7 from migrator import Migrator
8
9 class Bunch:
10     class TriggeredSource:
11         def __init__(self, idx, trigger, migrator, cleanup):
12             self.idx = idx
13             self.trigger = trigger
14             self.migrator = migrator
15             self.cleanup = cleanup
16
17     def __init__(self, idx, cache_dir, cfg):
18         self._idx = idx
19         source_idx = 1
20         self._sources = []
21         for src_dir_cfg in cfg['src_dirs']:
22             self._sources.append(
23                 Bunch.TriggeredSource(
24                     source_idx,
25                     DirTrigger(src_dir_cfg['path'], src_dir_cfg['cool_time'],
26                                src_dir_cfg['max_time']),
27                     Migrator(src_dir_cfg['path'], cfg['dst_dir']['path'],
28                              os.path.join(cache_dir, "%d.db" % idx)),
29                     src_dir_cfg['cleanup']
30                 )
31             )
32             source_idx += 1
33         logging.info("Created bunch #%d with %d sources.", self._idx, len(self._sources))
34
35     def start(self):
36         logging.info("Starting bunch #%d...", self._idx)
37         self._stop = False
38         self._worker_thread = threading.Thread(target=self._worker)
39         self._worker_thread.start()
40
41     def stop(self):
42         logging.info("Stopping bunch #%d...", self._idx)
43         self._stop = True
44         self._worker_thread.join()
45         for src in self._sources:
46             src.migrator.close()
47         logging.info("Stopped bunch #%d.", self._idx)
48
49     def is_running(self):
50         return self._worker_thread.is_alive()
51
52     def _worker(self):
53         for src in self._sources:
54             src.trigger.start()
55
56         while not self._stop:
57             for src in self._sources:
58                 try:
59                     if src.trigger.is_triggering():
60                         logging.info("Got trigger for source #%d.%d.", self._idx, src.idx)
61                         before = time.time()
62                         src.trigger.reset()
63                         src.migrator.migrate(src.cleanup)
64                         if src.cleanup:
65                             src.migrator.cleanup()
66                         logging.info("Migration took %.1fs for source #%d.%d.",
67                                      time.time() - before, self._idx, src.idx)
68                 except Exception as e:
69                     logging.error("Error migrating source #%d.%d: %s",
70                                   self._idx, src.idx, str(e))
71             time.sleep(10.0)
72
73         for src in self._sources:
74             src.trigger.stop()