{
+ "cache_dir": "/some/path",
"bunches": [
{
"dst_dir": {
import logging
+import os
import time
import threading
self.migrator = migrator
self.cleanup = cleanup
- def __init__(self, idx, cfg):
+ def __init__(self, idx, cache_dir, cfg):
self._idx = idx
source_idx = 1
self._sources = []
for src_dir_cfg in cfg['src_dirs']:
self._sources.append(
+ reg_db = os.path.join(cache_dir, "reg_%d_%d.db" % (idx, source_idx))
Bunch.TriggeredSource(
source_idx,
DirTrigger(src_dir_cfg['path'], src_dir_cfg['cool_time'],
src_dir_cfg['max_time']),
- Migrator(src_dir_cfg['path'], cfg['dst_dir']['path']),
+ Migrator(src_dir_cfg['path'], cfg['dst_dir']['path'], reg_db),
src_dir_cfg['cleanup']
)
)
import misc
class Migrator:
- def __init__(self, src_dir, dst_dir):
+ def __init__(self, src_dir, dst_dir, reg_db=None):
self._base_src_dir = src_dir
self._base_dst_dir = dst_dir
+ self._reg = Registry(reg_db)
def migrate(self, remove):
for src_file_name, src_file_path in misc.walk_media_files(self._base_src_dir):
logging.debug('Migrating %s...', src_file_name)
try:
- self._migrate_single(src_file_name, src_file_path, remove)
+ if not self._reg.is_registered(src_file_path):
+ self._migrate_single(src_file_name, src_file_path, remove)
+ self._reg.register(src_file_path)
+ else:
+ self._reg.refresh(src_file_path)
except Exception as e:
logging.error('Error migrating %s: %s', src_file_path, str(e))
bunch_idx = 1
bunches = []
for bunch_cfg in cfg['bunches']:
- bunches.append( Bunch(bunch_idx, bunch_cfg) )
+ bunches.append( Bunch(bunch_idx, cfg['cache_dir'], bunch_cfg) )
bunch_idx += 1
for bunch in bunches:
--- /dev/null
+import logging
+import os
+import sqlite3
+import time
+
+import misc
+
class Registry:
    """SQLite-backed record of names that have already been handled.

    Each entry is a ``name`` (the primary key of the ``cache`` table)
    together with the Unix timestamp of its last registration or refresh.

    When constructed with a falsy ``db_file`` the registry is disabled:
    ``is_registered`` always answers ``False`` and the mutating methods
    do nothing.
    """

    # Default retention used by clean(): 7 days, expressed in seconds.
    _MAX_AGE_SECONDS = 7 * 24 * 60 * 60

    def __init__(self, db_file):
        """Open the registry stored in ``db_file``, creating it if needed.

        Args:
            db_file: Path of the SQLite database file. A falsy value
                (``None``, ``""``) produces a disabled registry.
        """
        # Always define the attribute so the other methods can test it,
        # instead of referring to the constructor's local ``db_file``.
        self._conn = None
        if not db_file:
            return
        # check_same_thread=False disables sqlite3's same-thread check so
        # the connection may be used from other threads; no locking is
        # added here.
        self._conn = sqlite3.connect(db_file, check_same_thread=False)
        # Safe on an existing database: the table is only created when
        # it is missing.
        self._create_db()

    def is_registered(self, name):
        """Return True if ``name`` has an entry in the registry."""
        if self._conn is None:
            return False
        # Select the row itself rather than COUNT(*): an aggregate always
        # yields exactly one row, which would make every name look
        # registered.
        row = self._conn.execute(
            "SELECT 1 FROM cache WHERE name=? LIMIT 1", (name,)
        ).fetchone()
        return row is not None

    def clean(self, max_age=None):
        """Delete entries whose access time is older than ``max_age``.

        Args:
            max_age: Maximum age in seconds. Defaults to 7 days.
        """
        if self._conn is None:
            return
        if max_age is None:
            max_age = self._MAX_AGE_SECONDS
        threshold = int(time.time()) - max_age
        # The connection context manager commits on success and rolls
        # back if the statement raises.
        with self._conn:
            self._conn.execute(
                "DELETE FROM cache WHERE access_time<?", (threshold,)
            )

    def register(self, name):
        """Record ``name`` with the current time as its access time.

        Registering a name that is already present overwrites its access
        time instead of violating the primary key.
        """
        if self._conn is None:
            return
        with self._conn:
            self._conn.execute(
                "INSERT OR REPLACE INTO cache (name, access_time) VALUES (?, ?)",
                (name, int(time.time())),
            )

    def refresh(self, name):
        """Set the access time of ``name`` to now; no-op if it is absent."""
        if self._conn is None:
            return
        with self._conn:
            self._conn.execute(
                "UPDATE cache SET access_time=? WHERE name=?",
                (int(time.time()), name),
            )

    def _create_db(self):
        """Create the ``cache`` table if the database does not have it yet."""
        with self._conn:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS cache "
                "(name TEXT PRIMARY KEY, access_time INTEGER)"
            )