cache test db connection

Oliver Tonnhofer 2014-09-04 16:24:32 +02:00
parent 9b39791076
commit 1e4a6063f2
1 changed files with 25 additions and 4 deletions

View File

@ -47,6 +47,7 @@ def setup():
def teardown():
shutil.rmtree(tmpdir)
drop_schemas()
_close_test_connection(db_conf)
db_conf = {
@ -74,13 +75,14 @@ def create_geom_in_row(rowdict):
return rowdict
def query_row(db_conf, table, osmid):
conn = psycopg2.connect(**db_conf)
conn = _test_connection(db_conf)
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
cur.execute('select * from %s.%s where osm_id = %%s' % (TEST_SCHEMA_PRODUCTION, table), [osmid])
results = []
for row in cur.fetchall():
create_geom_in_row(row)
results.append(row)
cur.close()
if not results:
return None
@ -89,6 +91,7 @@ def query_row(db_conf, table, osmid):
return results
def imposm3_import(db_conf, pbf, mapping_file):
_close_test_connection(db_conf)
conn = pg_db_url(db_conf)
try:
@ -108,6 +111,7 @@ def imposm3_import(db_conf, pbf, mapping_file):
raise
def imposm3_deploy(db_conf, mapping_file):
_close_test_connection(db_conf)
conn = pg_db_url(db_conf)
try:
@ -125,6 +129,7 @@ def imposm3_deploy(db_conf, mapping_file):
raise
def imposm3_revert_deploy(db_conf, mapping_file):
_close_test_connection(db_conf)
conn = pg_db_url(db_conf)
try:
@ -142,6 +147,7 @@ def imposm3_revert_deploy(db_conf, mapping_file):
raise
def imposm3_remove_backups(db_conf, mapping_file):
_close_test_connection(db_conf)
conn = pg_db_url(db_conf)
try:
@ -157,6 +163,7 @@ def imposm3_remove_backups(db_conf, mapping_file):
raise
def imposm3_update(db_conf, osc, mapping_file):
_close_test_connection(db_conf)
conn = pg_db_url(db_conf)
try:
@ -190,12 +197,26 @@ def cache_query(nodes='', ways='', relations='', deps='', full=''):
print out
return json.loads(out)
def _test_connection(db_conf):
if '_connection' in db_conf:
return db_conf['_connection']
db_conf['_connection'] = psycopg2.connect(**db_conf)
return db_conf['_connection']
def _close_test_connection(db_conf):
if '_connection' in db_conf:
db_conf['_connection'].close()
del db_conf['_connection']
def table_exists(table, schema=TEST_SCHEMA_IMPORT):
conn = psycopg2.connect(**db_conf)
conn = _test_connection(db_conf)
cur = conn.cursor()
cur.execute("SELECT EXISTS(SELECT * FROM information_schema.tables WHERE table_name='%s' AND table_schema='%s')"
% (table, schema))
return cur.fetchone()[0]
exists = cur.fetchone()[0]
cur.close()
return exists
def assert_missing_node(id):
data = cache_query(nodes=[id])
@ -218,7 +239,7 @@ def assert_cached_way(id):
raise AssertionError('way %d not found' % id)
def drop_schemas():
conn = psycopg2.connect(**db_conf)
conn = _test_connection(db_conf)
cur = conn.cursor()
cur.execute("DROP SCHEMA IF EXISTS %s CASCADE" % TEST_SCHEMA_IMPORT)
cur.execute("DROP SCHEMA IF EXISTS %s CASCADE" % TEST_SCHEMA_PRODUCTION)