import MySQLdb
import time
+from tasks.core import Logger
from tasks.api import IDataMigrator, IDataInitializer
import tasks
task = t
if task:
- sys.stdout.write("Running %s %s" % (task.get_name(), type))
- start = time.time()
- task.setup(self.cfg, self.src_db, self.target_db)
+ task.setup(self.cfg, self.src_db, self.target_db, self.logger)
task.process()
self.target_db.commit()
- sys.stdout.write("\t(%.2fs)\n" % (time.time() - start))
done.append(task)
return done
passwd = self.cfg.get('target', 'pass'),
charset = 'utf8')
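+ # A single shared Logger, handed to every task through setup().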
+ self.logger = Logger()
+
def run(self, only_task = None):
"""Run all tasks or a single one identified by its name with only_task"""
self.done = []
print "Usage: %s <config_file> [task_name]" % sys.argv[0]
print "Tasks:"
for task in manager.list_tasks():
- print " %-24s%s" % (task.get_name(), task.__doc__)
+ print " %-28s%s" % (task.get_name(), task.__doc__)
sys.exit(1)
count = self.src_cursor.fetchone()[0]
self.stats = { 'total': count, 'imported': 0, 'ignored': 0}
+ self.start(count)
for offset in range(0, count):
- if offset % 400 == 0:
- self.step()
+ self.step(offset)
query = "INSERT INTO %s.media_collections (\n %s\n)\n" \
"SELECT \n %s\n FROM %s.Support AS s LIMIT %d, 1" % (
if errno == DUP_ENTRY:
self.src_cursor.execute("SELECT Cote FROM %s.Support LIMIT %d, 1" % (self.src_db_name, offset))
id = self.src_cursor.fetchone()[0]
- print "Collection %s not imported: %s" % (id, errmsg)
+ self.warn("Collection %s not imported: %s" % (id, errmsg))
self.stats['ignored'] += 1
else:
raise e
+ self.end()
+
class CollectionsEnumMapper(EnumMapper):
"""Map simple enumerations into the collections table"""
read_cursor = self.target_db.cursor()
+ self.start(self.stats['total'])
for pattern in self.patterns:
read_cursor.execute("SELECT old_code FROM media_collections WHERE old_code REGEXP %s",
(pattern['regex'],))
break
self.stats['matched'] += 1
- if self.stats['matched'] % 500 == 0:
- self.step()
+ self.step(self.stats['matched'])
old_code = row[0]
match = re.match(pattern['regex'], old_code)
if (year == -1 or serial == -1):
self.target_cursor.execute("DELETE FROM media_collections WHERE old_code = %s", (old_code,));
- print 'Removed record, old code is missing year or serial: %s' % old_code
+ self.warn('Removed record, old code is missing year or serial: %s' % old_code)
self.stats['removed'] += 1
continue
(errno, errstr) = e
if errno == DUP_ENTRY:
self.target_cursor.execute("DELETE FROM media_collections WHERE old_code = %s", (old_code,));
- print 'Removed record, code conversion caused a duplicate entry: %s -> %s' % (old_code, new_code)
+ self.warn('Removed record, code conversion caused a duplicate entry: %s -> %s' % (old_code, new_code))
self.stats['removed'] += 1
else:
raise e
self.stats['remaining'] = self.stats['total'] - self.stats['removed']
self.stats['unmatched'] = self.stats['total'] - self.stats['matched']
+ self.end()
class CollectionsPublishersMapper(DataMigrator):
"""Map publishers and publisher collections into the collections table"""
self.src_cursor.execute("SELECT Cote, TRIM(Editeur), TRIM(Collect_Serie) FROM Support");
i = 0
+ self.start(self.src_cursor.rowcount)
while True:
row = self.src_cursor.fetchone()
if not row:
if coll_name and len(coll_name):
coll_id = publishers[name]['sub'][coll_name]
except KeyError:
- print "Unknown publisher/collection: '%s' / '%s'" % name, coll_name
+ self.warn("Unknown publisher/collection: '%s' / '%s'" % name, coll_name)
except KeyError:
- print "Unknown publisher: '%s'" % name
+ self.warn("Unknown publisher: '%s'" % name)
elif coll_name and len(coll_name):
raise Exception("Row %s has publisher collection '%s' but no publisher" % (old_code, coll_name))
self.target_cursor.execute("UPDATE media_collections SET publisher_id = %s, publisher_collection_id = %s "
"WHERE old_code = %s", (id, coll_id, old_code));
- if i % 500 == 0:
- self.step()
+ self.step(i)
i += 1
self.stats['with_publisher'] = self.target_cursor.fetchone()[0]
self.target_cursor.execute("SELECT COUNT(*) FROM media_collections WHERE publisher_collection_id IS NOT NULL")
self.stats['with_collection'] = self.target_cursor.fetchone()[0]
+ self.end()
class CollectionsCollectorNormalizer(DataMigrator):
"""Migrate collector_is_creator flag"""
import sys
from telemeta.core import *
from datetime import date
+import time
class DataMigrationTask(Component):
- def setup(self, cfg, src_db, target_db):
+ def setup(self, cfg, src_db, target_db, logger):
self.target_db = target_db
self.target_cursor = target_db.cursor()
self.src_db = src_db
self.src_db_name = cfg.get('src', 'name')
self.target_db_name = cfg.get('target', 'name')
self.stats = {}
-
- def step(self):
- sys.stdout.write('.')
- sys.stdout.flush()
+ self.logger = logger
def src(self, query, args = None):
self.src_cursor.execute(query, args)
def target(self, query, args = None):
self.target_cursor.execute(query, args)
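+ # Progress/warning helpers: thin wrappers around the shared Logger
+ # set in setup(), so tasks never have to touch stdout themselves.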
+ def step(self, position = -1):
+ self.logger.progress(position)
+
+ def start(self, count = 1):
+ self.logger.start(self, count)
+
+ def end(self):
+ self.logger.end()
+
+ def warn(self, msg):
+ self.logger.warn(msg)
+
class DataMigrator(DataMigrationTask):
def build_assignments(self, map):
return 0
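+ # Type label consumed by Logger.print_progress() for the
+ # "Running <name> <type>..." status line.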
+ def get_type(self):
+ return 'migrator'
class DataInitializer(DataMigrationTask):
- pass
+
+ def get_type(self):
+ return 'initializer'
class GroupedItemsManager(object):
enum_tables.append(target_base)
else:
enum_tables.append(target_base + 's')
-
+
+ self.src_cursor.execute("SELECT COUNT(*) FROM %s" % src_table)
+ total = self.src_cursor.fetchone()[0]
+ self.start(total)
while not offset or self.src_cursor.rowcount:
self.src_cursor.execute("SELECT %s, %s FROM %s LIMIT %d, %d" % (
src_id_field, ", ".join(src_fields), src_table, offset, buffer_size))
if self.target_cursor.rowcount > 1:
raise Exception("Updated more than one row, this shouldn't happen..")
elif not self.target_cursor.rowcount:
- print "Can't find migrated entry: %s" % code
+ self.warn("Can't find migrated entry: %s" % code)
else:
- print "Can't find value '%s' in %s" % (value, enum_tables[i])
+ self.warn("Can't find value '%s' in %s" % (value, enum_tables[i]))
+
offset += self.src_cursor.rowcount
- self.step()
+ self.step(offset)
+
+ self.end()
+
+class Logger(object):
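+ """Render an in-place, ANSI-colored progress line per task and collect warnings."""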
+ CESC = "\x1B["
+ CNOCURSOR = CESC + "?25l"
+ CCURSOR = CESC + "?25h"
+ CBOLD = CESC + "1m"
+ CGREEN = CESC + "32m"
+ CRED = CESC + "31m"
+ CBLUE = CESC + "34m"
+ CCYAN = CESC + "36m"
+ CRESET = CESC + "0m" + CCURSOR
+
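+ def __init__(self):
+ # Defensive defaults, so warn() stays safe even before start() runs.
+ self.warnings = []
+ self.need_newline = False
+ self.task = None
+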
+ def warn(self, message):
+ self.warnings.append(message)
+ if self.need_newline:
+ sys.stdout.write("\n")
+ sys.stdout.write("%s\n" % message)
+ self.need_newline = False
+
+ def color1(self, msg):
+ sys.stdout.write(self.CBOLD + self.CGREEN + msg + self.CRESET)
+
+ def color2(self, msg):
+ sys.stdout.write(self.CBOLD + self.CCYAN + msg + self.CRESET)
+
+ def color3(self, msg):
+ sys.stdout.write(self.CBOLD + self.CRED + msg + self.CRESET)
+
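+ # Redraw the status line in place: "\r" returns to column 0, and the
+ # fixed-width label keeps the bar, percentage and warning count aligned.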
+ def print_progress(self, ratio):
+ msg = "Running %s %s..." % (self.task.get_name(), self.task.get_type())
+ msg = "%-46s" % msg
+
+ p = ""
+ r = int(round(ratio * 10))
+ for i in range(0, r):
+ p += '='
+ for i in range(r, 10):
+ p += ' '
+
+ self.color1("\r" + msg)
+ self.color2(" [%s] %d%%" % (p, ratio * 100))
+ self.color3(" warnings: %d" % len(self.warnings))
+
+ sys.stdout.flush()
+ self.need_newline = True
+
+ def start(self, task, count = 1):
+ self.start_time = time.time()
+ self.task = task
+ self.count = count
+ self.position = 0
+ self.warnings = []
+ self.print_progress(0)
+
+ def end(self):
+ self.print_progress(1)
+ self.color2(" (%.2fs)\n" % (time.time() - self.start_time))
+ self.need_newline = False
+ self.task = None
+
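+ # position == -1 (the default) advances one step; any other value
+ # jumps straight to that absolute position.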
+ def progress(self, position = -1):
+ if position == -1:
+ self.position += 1
+ else:
+ self.position = position
+ # Clamp so an empty task (count == 0) or overshoot can't break the bar.
+ ratio = float(self.position) / max(self.count, 1)
+ self.print_progress(min(ratio, 1.0))
+
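+ # Typical lifecycle, as driven by DataMigrationTask:
+ # logger.start(task, total) # reset state, draw the bar at 0%
+ # logger.progress(n) # redraw at n/total
+ # logger.warn("...") # break onto a fresh line, keep counting
+ # logger.end() # draw 100% and the elapsed time
+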
self.src_cursor.execute("SELECT Cote_Phono, Annee_Enreg, Dates_Enregistr FROM Phono")
i = 0
+ self.start(self.src_cursor.rowcount)
while True:
row = self.src_cursor.fetchone()
if not row:
else:
self.stats['nosuchitem'] += 1
else:
- print "incoherent date (%s): %s -- %s" % (old_code, str(data), str(row))
+ self.warn("incoherent date (%s): %s -- %s" % (old_code, str(data), str(row)))
self.stats['incoherent'] += 1
else:
- #print '|%s|' % row[0]
self.stats['unparsed'] += 1
i += 1
- if i % 1000 == 0:
- self.step()
+ self.step(i)
+
+ self.end()
class DateRangeSynchronizer(DataMigrator):
"Ensures collections recording years contain items recording dates"
items_cursor = self.target_db.cursor()
update_cursor = self.target_db.cursor()
i = 0
+ self.start(self.target_cursor.rowcount)
while True:
row = self.target_cursor.fetchone()
if not row:
if synced:
self.stats['synced'] += 1
- if i % 400 == 0:
- self.step()
+ self.step(i)
i += 1
+
+ self.end()
return "enumerations"
def process(self):
+ self.start(len(self.map))
for src in self.map:
- self.step()
dest = self.map[src]
if src == 'Form':
src_field = 'Form'
self.target_cursor.execute("INSERT INTO `" + dest + "` (value) " +
"SELECT " + src_field +" FROM " + self.src_db_name + ".`" + src + "`")
+ self.step()
+
+ self.end()
+
implements(IDataMigrator)
- def setup(self, cfg, src_db, target_db):
- super(EthnicGroupsMigrator, self).setup(cfg, src_db, target_db)
+ def setup(self, cfg, src_db, target_db, logger):
+ super(EthnicGroupsMigrator, self).setup(cfg, src_db, target_db, logger)
self.data = GroupedItemsManager()
def get_name(self):
"VALUES(%s, %s)", (id, alias))
def process(self):
+ self.start(2)
self.extract()
+ self.step()
self.insert()
+ self.step()
self.stats = {
'groups': len(self.data.groups),
'aliases': self.data.count_items()
}
+ self.end()
naliases = 0
nhistoric_names = 0
- def setup(self, cfg, src_db, target_db):
- super(GeoEthnoImporter, self).setup(cfg, src_db, target_db)
+ def setup(self, cfg, src_db, target_db, logger):
+ super(GeoEthnoImporter, self).setup(cfg, src_db, target_db, logger)
self.cursor = self.target_cursor
self.dom = dom.parse(cfg.get('geoethno', 'xml_file'))
self.known_types = []
raise "Can't find tag 'TERMES-SPECIFIQUES' under tag '%s'" % node.nodeName
terms = terms[0]
+ self.step()
for n in terms.childNodes:
if (n.nodeType == dom.Node.ELEMENT_NODE):
self.process_children(n, name)
self.path.pop()
- if len(self.path) == 1:
- self.step()
def process(self):
self.path = []
+ self.start(len(self.dom.getElementsByTagName('TERMES-SPECIFIQUES')))
self.process_children(self.dom.getElementsByTagName('GEOETHNO')[0], '')
self.stats = {
'types': len(self.known_types),
'aliases': self.naliases,
'historical names': self.nhistoric_names
}
+ self.end()
def warn(self, msg):
- print u"\nWarning: %s: %s\n" % ("/".join(self.path), msg)
+ # Keep the thesaurus path as context, then delegate to the shared logger.
+ super(GeoEthnoImporter, self).warn(u"%s: %s" % ("/".join(self.path), msg))

class Error(Exception):
def __init__(self, importer, msg):
'unparsed' : 0,
'duplicate' : 0
}
+
+ self.start(self.stats['total'])
unknown_families = []
while True:
self.stats['duplicate'] += 1
else:
if not prefix in unknown_families:
- print "Unknown family prefix: " + prefix
+ self.warn("Unknown family prefix: " + prefix)
unknown_families.append(prefix)
self.stats['badprefix'] += 1
else:
- print "Can't parse instrument code: %s" % code
+ self.warn("Can't parse instrument code: %s" % code)
self.stats['unparsed'] += 1
+ self.step()
+
+ self.end()
class InstrumentAliasesMigrator(DataMigrator):
"""Migrate the instrument aliases"""
#self.src("SELECT Vernaculaire FROM Vernaculaire")
instr_cursor = self.src_db.cursor()
- i = 0
unknown_instruments = []
+ self.start(self.stats['total'])
while True:
- if i % 200 == 0:
- self.step()
- i += 1
row = self.src_cursor.fetchone()
if not row:
break
(alias_id, instrument_id))
self.stats['relations'] += 1
elif not row[0] in unknown_instruments:
- print "No such instrument: %s" % row[0]
+ self.warn("No such instrument: %s" % row[0])
unknown_instruments.append(row[0])
+
+ self.step()
+
self.stats['badinstruments'] = len(unknown_instruments)
+ self.end()
class ItemPerformancesMigrator(DataMigrator):
"""Migrate items performances"""
'migrated' : 0,
'nosuchitem' : 0
}
+ self.start(self.stats['total'])
while True:
row = self.src_cursor.fetchone()
if not row:
self.stats['migrated'] += 1
else:
- print "No such item: %s" % oldcode
+ self.warn("No such item: %s" % oldcode)
self.stats['nosuchitem'] += 1
+ self.step()
+
+ self.end()
+
return "items:copy"
def process(self):
+ self.start()
assign = self.build_assignments(self.map)
target_fields = [str(a[0]) for a in assign]
src_fields = [str(a[1]) for a in assign]
self.stats['imported'] = self.target_cursor.fetchone()[0]
self.stats['ignored'] = self.stats['total'] - self.stats['imported']
- print "Couldn't import the following items, no such (or un-migrated) collection:"
+ self.warn("Couldn't import the following items, no such (or un-migrated) collection:")
query = "SELECT p.Cote_Phono, p.Cote_Support FROM %s.Phono AS p " \
"LEFT JOIN media_collections AS c ON p.Cote_Support = c.old_code " \
row = self.target_cursor.fetchone()
if not row:
break
- print " %s (collection: %s)" % (row[0], row[1])
+ self.warn(" %s (collection: %s)" % (row[0], row[1]))
+
+ self.end()
class ItemsEnumMapper(EnumMapper):
"""Map simple enumerations into the items table"""
return "items:keywords"
def process(self):
+ self.start(7)
self.step()
self.target_cursor.execute("DELETE FROM context_keywords")
self.target_cursor.execute("INSERT INTO context_keywords (value) "
self.target_cursor.execute(query % self.src_db_name)
self.stats['nosuchkeyword'] = self.target_cursor.rowcount
if self.target_cursor.rowcount:
- print "Unknown keywords:"
+ self.warn("Unknown keywords:")
query = "SELECT f.Mot_Clef, COUNT(*) FROM %s.Fonction_Usage AS f " \
"LEFT JOIN context_keywords AS k ON f.Mot_Clef = k.value " \
"WHERE k.value IS NULL AND f.Mot_Clef <> '' GROUP BY f.Mot_Clef"
row = self.target_cursor.fetchone()
if not row:
break
- print " %s: count=%d" % row
+ self.warn(" %s: count=%d" % row)
+
+ self.end()
class ItemsLocationsMapper(DataMigrator):
"""Migrate items locations trying to map them to the Geo Ethno thesaurus"""
'nomap' : 0
}
- i = 0
self.target("SET foreign_key_checks = 0")
+ self.start(self.stats['total'])
while True:
- if i % 1000 == 0:
- self.step()
- i += 1
row = self.src_cursor.fetchone()
if not row:
break
else:
self.stats['nomap'] += 1
+ self.step()
+
self.target("SET foreign_key_checks = 1")
+ self.end()
implements(IDataMigrator)
- def setup(self, cfg, src_db, target_db):
- super(PublishersMigrator, self).setup(cfg, src_db, target_db)
+ def setup(self, cfg, src_db, target_db, logger):
+ super(PublishersMigrator, self).setup(cfg, src_db, target_db, logger)
self.data = GroupedItemsManager()
def get_name(self):
def process(self):
+ self.start(2)
self.extract()
+ self.step()
self.insert()
+ self.step()
self.stats = {
'publishers': len(self.data.groups),
'collections': self.data.count_items()
}
+ self.end()
def process(self):
#self.target_cursor.execute("SHOW TABLES")
#tables = self.target_cursor.fetchall()
+ self.start(len(self.tables))
tables = self.tables
for t in tables:
#table = t[0]
for f in fields:
if f[5] == 'auto_increment':
self.target_cursor.execute("ALTER TABLE " + table + " AUTO_INCREMENT = 1")
+ self.step()
+
+ self.end()