diff options
-rw-r--r-- | move-postjournal-elasticsearch | 105 |
1 file changed, 105 insertions(+), 0 deletions(-)
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copy public journal ("postjournal") entries from per-scraper SQLite
# dumps (data/<scraper>.sqlite, table 'swdata') into a local
# Elasticsearch instance, one index per agency.

import string  # NOTE(review): unused here; kept in case external tooling relies on it
import sys
import time
import sqlalchemy
from elasticsearch import Elasticsearch
from elasticsearch import helpers

# Flush accumulated index actions to Elasticsearch in batches of this size.
BULK_BATCH_SIZE = 10000

def populate_from_scraper_real(scraper):
    """Index every row of data/<scraper>.sqlite into Elasticsearch.

    Each row becomes a document in index 'index-<agency>' (lowercased,
    spaces -> underscores, commas removed) with _id "year/seqnr-docseq".

    Returns the number of documents submitted for indexing; entries with
    no usable case identifier are skipped and not counted.
    """
    sys.stdout.write(scraper + ": ")
    sys.stdout.flush()

    es = Elasticsearch()

    sql = "select * from swdata"
    filename = "sqlite:///data/%s.sqlite" % scraper
    # Long timeout: a scraper may hold the SQLite write lock for a while.
    engine = sqlalchemy.create_engine(filename, echo=False,
                                      connect_args={'timeout': 300})
    connection = engine.connect()
    result = connection.execute(sql)

    skipped = 0
    indexed = 0  # renamed from 'id', which shadowed the builtin
    actions = []
    for row in result:
        entry = dict(row.items())

        # Handle OEP scraper 2012-06-16: derive caseyear/caseseqnr from
        # the combined "year/seqnr" caseid when they are absent.
        if entry.get('caseyear') is None or entry.get('caseseqnr') is None:
            if entry['caseid'] is None:
                # No way to build a document id for this entry; drop it.
                skipped += 1
                continue
            entry['caseyear'], entry['caseseqnr'] = entry['caseid'].split("/")

        entry['scraper'] = scraper

        # Handle missing scrapestamputc field, some OEP entries are
        # missing them.  Tag present timestamps as UTC.
        if entry.get('scrapestamputc') is not None:
            entry['scrapestamputc'] = entry['scrapestamputc'] + '+0000'

        # Workaround for postliste-stortinget failing on some PDFs.
        # Bug fix: use .get() so sources without a doctype column do not
        # raise KeyError (every other optional field is guarded).
        if entry.get('doctype') == u'Avs./mot:':
            continue

        # Normalize the numeric id components (they may arrive as strings).
        for field in ['caseyear', 'caseseqnr', 'casedocseq']:
            if field in entry:
                entry[field] = int(entry[field])

        indexname = 'index-' + entry['agency']
        indexname = indexname.replace(' ', '_').replace(',', '').lower()
        actions.append({
            "_type": "publicjournal",
            "_index": indexname,
            "_id": "%d/%d-%d" % (entry['caseyear'],
                                 entry['caseseqnr'],
                                 entry['casedocseq']),
            "_source": entry,
        })
        indexed += 1

        # Flush a full batch.  Bug fix: the original 'while len(actions) >
        # 10000 and len(actions) % 10000 == 0: ... break' only fired at
        # 20000 actions; a plain threshold test is the intended behavior.
        if len(actions) >= BULK_BATCH_SIZE:
            sys.stdout.write(".")
            sys.stdout.flush()
            helpers.bulk(es, actions)
            del actions[:]
    connection.close()

    # Flush the final partial batch.
    if actions:
        helpers.bulk(es, actions)
        del actions[:]

    print("done")
    # Bug fix: the original returned len(data) - skipped, but 'data' was
    # never populated, so the result was always -skipped.
    return indexed

def populate_from_scraper(scraper):
    """Index one scraper's data, retrying once after 10s on a None result.

    NOTE(review): populate_from_scraper_real() never actually returns
    None, so the retry branch is currently dead code; kept so the
    calling convention is unchanged.
    """
    ret = populate_from_scraper_real(scraper)
    if ret is None:
        time.sleep(10)
        ret = populate_from_scraper_real(scraper)
    return ret

def main():
    """Move each configured scraper's data set into Elasticsearch."""
    scrapers = [
        'postliste-oep',
    ]

    for scraper in scrapers:
        print("")
        populate_from_scraper(scraper)

# Guarded entry point so the module can be imported without side effects.
if __name__ == "__main__":
    main()