self._worker_thread.join()
         logging.info("Stopped bunch %s.", self._id)
 
+    def is_running(self):
+        """Return whether the worker thread (``self._worker_thread``) is alive."""
+        return self._worker_thread.is_alive()
+
     def _worker(self):
+        """Run the polling loop for this bunch.
+
+        Starts the trigger of every source, then loops until ``self._stop``
+        is set.  On each pass every source whose trigger reports
+        ``is_triggering()`` has its trigger reset and its migrator run; the
+        migrator's ``cleanup()`` is additionally called when
+        ``source.cleanup`` is truthy.  The loop sleeps 10 seconds between
+        passes, and all triggers are stopped once the loop exits.
+        """
-        # start triggers
+        # Start every source's trigger before polling begins.
+        for source in self._sources:
+            source.trigger.start()
 
         while not self._stop:
             for source in self._sources:
+                    # NOTE(review): the ``try:`` matching the ``except`` below is
+                    # not visible in this view -- presumably it wraps this check.
                     if source.trigger.is_triggering():
                         logging.info("Got source trigger for bunch %s.", self._id)
+                        # Reset before migrating, so the trigger is re-armed
+                        # before the migration work starts.
                         source.trigger.reset()
-                        source.migrator.migrate()
+                        source.migrator.migrate(source.cleanup)
                         if source.cleanup:
                             source.migrator.cleanup()
+                # NOTE(review): the head of the call below is not visible here;
+                # presumably the failure is logged with the bunch id -- confirm.
                 except Exception as e:
                                   self._id, str(e))
+            # Pause between polling passes; a stop request is only noticed
+            # after this sleep returns.
             time.sleep(10.0)
 
-        # stop triggers
+        # Loop finished: stop every source's trigger.
+        for source in self._sources:
+            source.trigger.stop()
 
-class DirTrigger:
+import threading
+import time
+from watchdog.events import FileSystemEventHandler
+from watchdog.observers import Observer
+
+class DirTrigger(FileSystemEventHandler):
+    """Directory-change trigger backed by a watchdog ``Observer``.
+
+    The instance registers itself as the event handler for ``dir_path``
+    (recursively) and records when file-system events arrive.  It reports
+    "triggering" once an event has been seen and no further event arrived
+    for more than ``COOL_TIME`` seconds, or once more than ``MAX_TIME``
+    seconds have passed since the last ``reset()``.
+
+    ``is_triggering()``, ``reset()`` and ``on_any_event()`` all take
+    ``self._lock`` so the event flag and its timestamps are always read and
+    written together.
+    """
+
+    # Quiet period (seconds) required after the most recent event.
+    COOL_TIME = 10
+    # Maximum time (seconds) since the last reset before firing regardless
+    # of whether any event was seen.
+    MAX_TIME = 86400
+
     def __init__(self, dir_path):
-        pass
+        """Initialise the state and schedule a recursive watch on ``dir_path``.
+
+        The observer is only created and configured here; call ``start()``
+        to begin receiving events.
+        """
+        self._got_event = False
+        self._last_event_time = 0.0
+        # Starting at 0.0 means the MAX_TIME condition in is_triggering()
+        # holds on any realistically set clock (time.time() counts from the
+        # epoch) until the first reset().
+        self._last_reset_time = 0.0
+        self._lock = threading.Lock()
+        self._observer = Observer()
+        self._observer.schedule(self, path=dir_path, recursive=True)
+
+    def start(self):
+        """Start the underlying observer."""
+        self._observer.start()
+
+    def stop(self):
+        """Ask the underlying observer to stop; this does not join it."""
+        self._observer.stop()
 
     def is_triggering(self):
-        return False
+        """Return True when a migration should be started now.
+
+        True when an event was recorded and more than ``COOL_TIME`` seconds
+        have passed since the latest one, or when more than ``MAX_TIME``
+        seconds have passed since the last ``reset()``.
+        """
+        with self._lock:
+            # Sample the clock once so both conditions use the same instant.
+            now = time.time()
+            settled = (self._got_event and
+                       now - self._last_event_time > DirTrigger.COOL_TIME)
+            overdue = now - self._last_reset_time > DirTrigger.MAX_TIME
+            return settled or overdue
+
+    def reset(self):
+        """Clear the pending-event flag and restart the ``MAX_TIME`` window."""
+        # Take the same lock as is_triggering()/on_any_event() so the flag
+        # and the reset timestamp are updated as one unit.
+        with self._lock:
+            self._got_event = False
+            self._last_reset_time = time.time()
+
+    def on_any_event(self, event):
+        """Record that an event occurred and when.
+
+        ``event`` is accepted to satisfy the handler interface; its contents
+        are not inspected.
+        """
+        with self._lock:
+            self._got_event = True
+            self._last_event_time = time.time()