From 3901c5d8c408ce094f44be629928179197c678e8 Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Mon, 2 Dec 2013 13:10:42 +0100 Subject: [PATCH] make db schema configurable from cmd-line and config.json --- config/config.go | 26 +++++++- database/database.go | 3 + database/postgis/postgis.go | 19 +++--- database/postgis/rotate.go | 10 ++- database/postgis/spec.go | 4 +- database/postgis/tx.go | 2 +- database/postgis/util.go | 19 ------ diff/process.go | 4 ++ import_/import.go | 3 + test/imposm_system_test.py | 130 ++++++++++++++++++++++++++++++++---- 10 files changed, 171 insertions(+), 49 deletions(-) diff --git a/config/config.go b/config/config.go index 4206418..df138bb 100644 --- a/config/config.go +++ b/config/config.go @@ -16,10 +16,20 @@ type Config struct { LimitTo string `json:"limitto"` LimitToCacheBuffer float64 `json:"limitto_cache_buffer"` Srid int `json:"srid"` + Schemas Schemas `json:"schemas"` +} + +type Schemas struct { + Import string `json:"import"` + Production string `json:"production"` + Backup string `json:"backup"` } const defaultSrid = 3857 const defaultCacheDir = "/tmp/imposm3" +const defaultSchemaImport = "import" +const defaultSchemaProduction = "production" +const defaultSchemaBackup = "backup" var ImportFlags = flag.NewFlagSet("import", flag.ExitOnError) var DiffFlags = flag.NewFlagSet("diff", flag.ExitOnError) @@ -34,6 +44,7 @@ type _BaseOptions struct { ConfigFile string Httpprofile string Quiet bool + Schemas Schemas } func (o *_BaseOptions) updateFromConfig() error { @@ -54,6 +65,17 @@ func (o *_BaseOptions) updateFromConfig() error { return err } } + + if o.Schemas.Import == defaultSchemaImport { + o.Schemas.Import = conf.Schemas.Import + } + if o.Schemas.Production == defaultSchemaProduction { + o.Schemas.Production = conf.Schemas.Production + } + if o.Schemas.Backup == defaultSchemaBackup { + o.Schemas.Backup = conf.Schemas.Backup + } + if o.Connection == "" { o.Connection = conf.Connection } @@ -114,7 +136,9 @@ func addBaseFlags(flags *flag.FlagSet) { flags.StringVar(&BaseOptions.ConfigFile, "config", "", "config (json)") flags.StringVar(&BaseOptions.Httpprofile, "httpprofile", "", "bind address for profile server") flags.BoolVar(&BaseOptions.Quiet, "quiet", false, "quiet log output") - + flags.StringVar(&BaseOptions.Schemas.Import, "dbschema-import", defaultSchemaImport, "db schema for imports") + flags.StringVar(&BaseOptions.Schemas.Production, "dbschema-production", defaultSchemaProduction, "db schema for production") + flags.StringVar(&BaseOptions.Schemas.Backup, "dbschema-backup", defaultSchemaBackup, "db schema for backups") } func UsageImport() { diff --git a/database/database.go b/database/database.go index 1f37148..e3ed0ba 100644 --- a/database/database.go +++ b/database/database.go @@ -10,6 +10,9 @@ import ( type Config struct { ConnectionParams string Srid int + ImportSchema string + ProductionSchema string + BackupSchema string } type DB interface { diff --git a/database/postgis/postgis.go b/database/postgis/postgis.go index 453528f..ea17544 100644 --- a/database/postgis/postgis.go +++ b/database/postgis/postgis.go @@ -113,7 +113,7 @@ func (pg *PostGIS) createSchema(schema string) error { // Init creates schema and tables, drops existing data. func (pg *PostGIS) Init() error { - if err := pg.createSchema(pg.Schema); err != nil { + if err := pg.createSchema(pg.Config.ImportSchema); err != nil { return err } @@ -173,7 +173,7 @@ func createIndex(pg *PostGIS, tableName string, columns []ColumnSpec) error { for _, col := range columns { if col.Type.Name() == "GEOMETRY" { sql := fmt.Sprintf(`CREATE INDEX "%s_geom" ON "%s"."%s" USING GIST ("%s")`, - tableName, pg.Schema, tableName, col.Name) + tableName, pg.Config.ImportSchema, tableName, col.Name) step := log.StartStep(fmt.Sprintf("Creating geometry index on %s", tableName)) _, err := pg.Db.Exec(sql) log.StopStep(step) @@ -183,7 +183,7 @@ func createIndex(pg *PostGIS, tableName string, columns []ColumnSpec) error { } if col.FieldType.Name == "id" { sql := fmt.Sprintf(`CREATE INDEX "%s_osm_id_idx" ON "%s"."%s" USING BTREE ("%s")`, - tableName, pg.Schema, tableName, col.Name) + tableName, pg.Config.ImportSchema, tableName, col.Name) step := log.StartStep(fmt.Sprintf("Creating OSM id index on %s", tableName)) _, err := pg.Db.Exec(sql) log.StopStep(step) @@ -282,7 +282,7 @@ func (pg *PostGIS) generalizeTable(table *GeneralizedTableSpec) error { cols = append(cols, col.Type.GeneralizeSql(&col, table)) } - if err := dropTableIfExists(tx, pg.Schema, table.FullName); err != nil { + if err := dropTableIfExists(tx, pg.Config.ImportSchema, table.FullName); err != nil { return err } @@ -295,7 +295,7 @@ func (pg *PostGIS) generalizeTable(table *GeneralizedTableSpec) error { sourceTable = table.Source.FullName } sql := fmt.Sprintf(`CREATE TABLE "%s"."%s" AS (SELECT %s FROM "%s"."%s"%s)`, - pg.Schema, table.FullName, columnSQL, pg.Schema, + pg.Config.ImportSchema, table.FullName, columnSQL, pg.Config.ImportSchema, sourceTable, where) _, err = tx.Exec(sql) @@ -355,7 +355,7 @@ func clusterTable(pg *PostGIS, tableName string, srid int, columns []ColumnSpec) if col.Type.Name() == "GEOMETRY" { step := log.StartStep(fmt.Sprintf("Indexing %s on geohash", tableName)) sql := fmt.Sprintf(`CREATE INDEX "%s_geom_geohash" ON "%s"."%s" (ST_GeoHash(ST_Transform(ST_SetSRID(Box2D(%s), %d), 4326)))`, - tableName, pg.Schema, tableName, col.Name, srid) + tableName, pg.Config.ImportSchema, tableName, col.Name, srid) _, err := pg.Db.Exec(sql) log.StopStep(step) if err != nil { @@ -364,7 +364,7 @@ func clusterTable(pg *PostGIS, tableName string, srid int, columns []ColumnSpec) step = log.StartStep(fmt.Sprintf("Clustering %s on geohash", tableName)) sql = fmt.Sprintf(`CLUSTER "%s_geom_geohash" ON "%s"."%s"`, - tableName, pg.Schema, tableName) + tableName, pg.Config.ImportSchema, tableName) _, err = pg.Db.Exec(sql) log.StopStep(step) if err != nil { @@ -376,7 +376,7 @@ func clusterTable(pg *PostGIS, tableName string, srid int, columns []ColumnSpec) step := log.StartStep(fmt.Sprintf("Analysing %s", tableName)) sql := fmt.Sprintf(`ANALYSE "%s"."%s"`, - pg.Schema, tableName) + pg.Config.ImportSchema, tableName) _, err := pg.Db.Exec(sql) log.StopStep(step) if err != nil { @@ -389,8 +389,6 @@ func clusterTable(pg *PostGIS, tableName string, srid int, columns []ColumnSpec) type PostGIS struct { Db *sql.DB Params string - Schema string - BackupSchema string Config database.Config Tables map[string]*TableSpec GeneralizedTables map[string]*GeneralizedTableSpec @@ -586,7 +584,6 @@ func New(conf database.Config, m *mapping.Mapping) (database.DB, error) { return nil, err } params = disableDefaultSslOnLocalhost(params) - db.Schema, db.BackupSchema = schemasFromConnectionParams(params) db.Prefix = prefixFromConnectionParams(params) for name, table := range m.Tables { diff --git a/database/postgis/rotate.go b/database/postgis/rotate.go index 5149ac6..5abf9e4 100644 --- a/database/postgis/rotate.go +++ b/database/postgis/rotate.go @@ -7,6 +7,10 @@ import ( func (pg *PostGIS) rotate(source, dest, backup string) error { defer log.StopStep(log.StartStep(fmt.Sprintf("Rotating tables"))) + if err := pg.createSchema(dest); err != nil { + return err + } + if err := pg.createSchema(backup); err != nil { return err } @@ -71,11 +75,11 @@ func (pg *PostGIS) rotate(source, dest, backup string) error { } func (pg *PostGIS) Deploy() error { - return pg.rotate(pg.Schema, "public", pg.BackupSchema) + return pg.rotate(pg.Config.ImportSchema, pg.Config.ProductionSchema, pg.Config.BackupSchema) } func (pg *PostGIS) RevertDeploy() error { - return pg.rotate(pg.BackupSchema, "public", pg.Schema) + return pg.rotate(pg.Config.BackupSchema, pg.Config.ProductionSchema, pg.Config.ImportSchema) } func (pg *PostGIS) RemoveBackup() error { @@ -85,7 +89,7 @@ func (pg *PostGIS) RemoveBackup() error { } defer rollbackIfTx(&tx) - backup := pg.BackupSchema + backup := pg.Config.BackupSchema for _, tableName := range pg.tableNames() { tableName = pg.Prefix + tableName diff --git a/database/postgis/spec.go b/database/postgis/spec.go index 7bea30c..2be0dd4 100644 --- a/database/postgis/spec.go +++ b/database/postgis/spec.go @@ -116,7 +116,7 @@ func NewTableSpec(pg *PostGIS, t *mapping.Table) *TableSpec { spec := TableSpec{ Name: t.Name, FullName: pg.Prefix + t.Name, - Schema: pg.Schema, + Schema: pg.Config.ImportSchema, GeometryType: string(t.Type), Srid: pg.Config.Srid, } @@ -140,7 +140,7 @@ func NewGeneralizedTableSpec(pg *PostGIS, t *mapping.GeneralizedTable) *Generali spec := GeneralizedTableSpec{ Name: t.Name, FullName: pg.Prefix + t.Name, - Schema: pg.Schema, + Schema: pg.Config.ImportSchema, Tolerance: t.Tolerance, Where: t.SqlFilter, SourceName: t.SourceTableName, diff --git a/database/postgis/tx.go b/database/postgis/tx.go index d699cbe..086e825 100644 --- a/database/postgis/tx.go +++ b/database/postgis/tx.go @@ -49,7 +49,7 @@ func (tt *bulkTableTx) Begin(tx *sql.Tx) error { } tt.Tx = tx - _, err = tx.Exec(fmt.Sprintf(`TRUNCATE TABLE "%s"."%s" RESTART IDENTITY`, tt.Pg.Schema, tt.Table)) + _, err = tx.Exec(fmt.Sprintf(`TRUNCATE TABLE "%s"."%s" RESTART IDENTITY`, tt.Pg.Config.ImportSchema, tt.Table)) if err != nil { return err } diff --git a/database/postgis/util.go b/database/postgis/util.go index a8c9650..c728476 100644 --- a/database/postgis/util.go +++ b/database/postgis/util.go @@ -8,25 +8,6 @@ import ( "sync" ) -func schemasFromConnectionParams(params string) (string, string) { - parts := strings.Fields(params) - var schema, backupSchema string - for _, p := range parts { - if strings.HasPrefix(p, "schema=") { - schema = strings.Replace(p, "schema=", "", 1) - } else if strings.HasPrefix(p, "backupschema=") { - backupSchema = strings.Replace(p, "backupschema=", "", 1) - } - } - if schema == "" { - schema = "import" - } - if backupSchema == "" { - backupSchema = "backup" - } - return schema, backupSchema -} - // disableDefaultSslOnLocalhost adds sslmode=disable to params // when host is localhost/127.0.0.1 and the sslmode param and // PGSSLMODE environment are both not set. diff --git a/diff/process.go b/diff/process.go index 60be0de..a7a4eaf 100644 --- a/diff/process.go +++ b/diff/process.go @@ -53,6 +53,10 @@ func Update(oscFile string, geometryLimiter *limit.Limiter, expireor expire.Expi dbConf := database.Config{ ConnectionParams: config.BaseOptions.Connection, Srid: config.BaseOptions.Srid, + // we apply diff imports on the Production schema + ImportSchema: config.BaseOptions.Schemas.Production, + ProductionSchema: config.BaseOptions.Schemas.Production, + BackupSchema: config.BaseOptions.Schemas.Backup, } db, err := database.Open(dbConf, tagmapping) if err != nil { diff --git a/import_/import.go b/import_/import.go index e239aec..1f334e4 100644 --- a/import_/import.go +++ b/import_/import.go @@ -63,6 +63,9 @@ func Import() { conf := database.Config{ ConnectionParams: config.BaseOptions.Connection, Srid: config.BaseOptions.Srid, + ImportSchema: config.BaseOptions.Schemas.Import, + ProductionSchema: config.BaseOptions.Schemas.Production, + BackupSchema: config.BaseOptions.Schemas.Backup, } db, err = database.Open(conf, tagmapping) if err != nil { diff --git a/test/imposm_system_test.py b/test/imposm_system_test.py index 2d1aed1..fa777b2 100644 --- a/test/imposm_system_test.py +++ b/test/imposm_system_test.py @@ -25,12 +25,17 @@ def setup(): def teardown(): shutil.rmtree(tmpdir) + drop_test_schemas() db_conf = { 'host': 'localhost', } +TEST_SCHEMA_IMPORT = "imposm3testimport" +TEST_SCHEMA_PRODUCTION = "imposm3testpublic" +TEST_SCHEMA_BACKUP = "imposm3testbackup" + def merc_point(lon, lat): pole = 6378137 * math.pi # 20037508.342789244 @@ -50,7 +55,7 @@ def create_geom_in_row(rowdict): def query_row(db_conf, table, osmid): conn = psycopg2.connect(**db_conf) cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) - cur.execute('select * from import.%s where osm_id = %%s' % table, [osmid]) + cur.execute('select * from %s.%s where osm_id = %%s' % (TEST_SCHEMA_PRODUCTION, table), [osmid]) results = [] for row in cur.fetchall(): create_geom_in_row(row) @@ -66,29 +71,80 @@ def imposm3_import(db_conf, pbf): conn = pg_db_url(db_conf) try: - print subprocess.check_output( + print subprocess.check_output(( "../imposm3 import -connection %s -read %s" " -write" " -cachedir %s" " -diff" " -overwritecache" + " -dbschema-import " + TEST_SCHEMA_IMPORT + " -optimize" - " -mapping test_mapping.json " % ( + " -mapping test_mapping.json ") % ( conn, pbf, tmpdir, ), shell=True) except subprocess.CalledProcessError, ex: print ex.output raise +def imposm3_deploy(db_conf): + conn = pg_db_url(db_conf) + + try: + print subprocess.check_output(( + "../imposm3 import -connection %s" + " -dbschema-import " + TEST_SCHEMA_IMPORT + + " -dbschema-production " + TEST_SCHEMA_PRODUCTION + + " -dbschema-backup " + TEST_SCHEMA_BACKUP + + " -deployproduction" + " -mapping test_mapping.json ") % ( + conn, + ), shell=True) + except subprocess.CalledProcessError, ex: + print ex.output + raise + +def imposm3_revert_deploy(db_conf): + conn = pg_db_url(db_conf) + + try: + print subprocess.check_output(( + "../imposm3 import -connection %s" + " -dbschema-import " + TEST_SCHEMA_IMPORT + + " -dbschema-production " + TEST_SCHEMA_PRODUCTION + + " -dbschema-backup " + TEST_SCHEMA_BACKUP + + " -revertdeploy" + " -mapping test_mapping.json ") % ( + conn, + ), shell=True) + except subprocess.CalledProcessError, ex: + print ex.output + raise + +def imposm3_remove_backups(db_conf): + conn = pg_db_url(db_conf) + + try: + print subprocess.check_output(( + "../imposm3 import -connection %s" + " -dbschema-backup " + TEST_SCHEMA_BACKUP + + " -removebackup" + " -mapping test_mapping.json ") % ( + conn, + ), shell=True) + except subprocess.CalledProcessError, ex: + print ex.output + raise + def imposm3_update(db_conf, osc): conn = pg_db_url(db_conf) try: - print subprocess.check_output( + print subprocess.check_output(( "../imposm3 diff -connection %s" " -cachedir %s" " -limitto clipping-3857.geojson" - " -mapping test_mapping.json %s" % ( + " -dbschema-production " + TEST_SCHEMA_PRODUCTION + + " -mapping test_mapping.json %s") % ( conn, tmpdir, osc, ), shell=True) except subprocess.CalledProcessError, ex: @@ -113,11 +169,11 @@ def cache_query(nodes='', ways='', relations='', deps='', full=''): print out return json.loads(out) -def table_exists(table): +def table_exists(table, schema=TEST_SCHEMA_IMPORT): conn = psycopg2.connect(**db_conf) cur = conn.cursor() cur.execute("SELECT EXISTS(SELECT * FROM information_schema.tables WHERE table_name='%s' AND table_schema='%s')" - % (table, 'import')) + % (table, schema)) return cur.fetchone()[0] def assert_missing_node(id): @@ -140,19 +196,29 @@ def assert_cached_way(id): if not data['ways'][str(id)]: raise AssertionError('way %d not found' % id) -def drop_import_schema(): +def drop_test_schemas(): conn = psycopg2.connect(**db_conf) cur = conn.cursor() - cur.execute("DROP SCHEMA IF EXISTS import CASCADE") + cur.execute("DROP SCHEMA IF EXISTS %s CASCADE" % TEST_SCHEMA_IMPORT) + cur.execute("DROP SCHEMA IF EXISTS %s CASCADE" % TEST_SCHEMA_PRODUCTION) + cur.execute("DROP SCHEMA IF EXISTS %s CASCADE" % TEST_SCHEMA_BACKUP) conn.commit() ####################################################################### def test_import(): """Import succeeds""" - drop_import_schema() - assert not table_exists('osm_roads') + drop_test_schemas() + assert not table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT) imposm3_import(db_conf, './build/test.pbf') - assert table_exists('osm_roads') + assert table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT) + +def test_deploy(): + """Deploy succeeds""" + assert not table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION) + imposm3_deploy(db_conf) + assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION) + assert not table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT) + ####################################################################### def test_imported_landusage(): @@ -436,3 +502,43 @@ def test_duplicate_ids2(): assert query_row(db_conf, 'osm_buildings', -51001) == None assert query_row(db_conf, 'osm_buildings', -51011)['type'] == 'mp' assert query_row(db_conf, 'osm_buildings', 51011) == None + +####################################################################### +def test_deploy_and_revert_deploy(): + """Revert deploy succeeds""" + assert not table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT) + assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION) + assert not table_exists('osm_roads', schema=TEST_SCHEMA_BACKUP) + + # import again to have a new import schema + imposm3_import(db_conf, './build/test.pbf') + assert table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT) + + imposm3_deploy(db_conf) + assert not table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT) + assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION) + assert table_exists('osm_roads', schema=TEST_SCHEMA_BACKUP) + + imposm3_revert_deploy(db_conf) + assert table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT) + assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION) + assert not table_exists('osm_roads', schema=TEST_SCHEMA_BACKUP) + +def test_remove_backup(): + """Remove backup succeeds""" + assert table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT) + assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION) + assert not table_exists('osm_roads', schema=TEST_SCHEMA_BACKUP) + + imposm3_deploy(db_conf) + + assert not table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT) + assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION) + assert table_exists('osm_roads', schema=TEST_SCHEMA_BACKUP) + + imposm3_remove_backups(db_conf) + + assert not table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT) + assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION) + assert not table_exists('osm_roads', schema=TEST_SCHEMA_BACKUP) +