make db schema configurable from cmd-line and config.json

master
Oliver Tonnhofer 2013-12-02 13:10:42 +01:00
parent 122bfc4f31
commit 3901c5d8c4
10 changed files with 171 additions and 49 deletions
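With this change the import, production, and backup schema names become configurable, instead of being parsed out of the connection parameters (see the removed schemasFromConnectionParams further down). A minimal config.json sketch, derived from the struct tags added in this commit; the values shown are just the built-in defaults:

    {
        "srid": 3857,
        "schemas": {
            "import": "import",
            "production": "production",
            "backup": "backup"
        }
    }

The same names can be set per run with the new -dbschema-import, -dbschema-production, and -dbschema-backup flags added in addBaseFlags below.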

View File

@@ -16,10 +16,20 @@ type Config struct {
 	LimitTo            string  `json:"limitto"`
 	LimitToCacheBuffer float64 `json:"limitto_cache_buffer"`
 	Srid               int     `json:"srid"`
+	Schemas            Schemas `json:"schemas"`
 }
+
+type Schemas struct {
+	Import     string `json:"import"`
+	Production string `json:"production"`
+	Backup     string `json:"backup"`
+}
 
 const defaultSrid = 3857
 const defaultCacheDir = "/tmp/imposm3"
+const defaultSchemaImport = "import"
+const defaultSchemaProduction = "production"
+const defaultSchemaBackup = "backup"
 
 var ImportFlags = flag.NewFlagSet("import", flag.ExitOnError)
 var DiffFlags = flag.NewFlagSet("diff", flag.ExitOnError)
@@ -34,6 +44,7 @@ type _BaseOptions struct {
 	ConfigFile  string
 	Httpprofile string
 	Quiet       bool
+	Schemas     Schemas
 }
 
 func (o *_BaseOptions) updateFromConfig() error {
@@ -54,6 +65,17 @@ func (o *_BaseOptions) updateFromConfig() error {
 			return err
 		}
 	}
+
+	if o.Schemas.Import == defaultSchemaImport {
+		o.Schemas.Import = conf.Schemas.Import
+	}
+	if o.Schemas.Production == defaultSchemaProduction {
+		o.Schemas.Production = conf.Schemas.Production
+	}
+	if o.Schemas.Backup == defaultSchemaBackup {
+		o.Schemas.Backup = conf.Schemas.Backup
+	}
+
 	if o.Connection == "" {
 		o.Connection = conf.Connection
 	}
@@ -114,7 +136,9 @@ func addBaseFlags(flags *flag.FlagSet) {
 	flags.StringVar(&BaseOptions.ConfigFile, "config", "", "config (json)")
 	flags.StringVar(&BaseOptions.Httpprofile, "httpprofile", "", "bind address for profile server")
 	flags.BoolVar(&BaseOptions.Quiet, "quiet", false, "quiet log output")
+	flags.StringVar(&BaseOptions.Schemas.Import, "dbschema-import", defaultSchemaImport, "db schema for imports")
+	flags.StringVar(&BaseOptions.Schemas.Production, "dbschema-production", defaultSchemaProduction, "db schema for production")
+	flags.StringVar(&BaseOptions.Schemas.Backup, "dbschema-backup", defaultSchemaBackup, "db schema for backups")
 }
 
 func UsageImport() {
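The checks in updateFromConfig above give the command line precedence: a -dbschema-* flag changed from its default wins, while a flag left at its default is replaced by the config.json value. A hypothetical pair of runs to illustrate (the schema name osm_new and the $CONN variable are made up):

    # config.json contains "schemas": {"import": "osm_new"}
    ../imposm3 import -config config.json -connection $CONN -mapping mapping.json
    # -> imports into osm_new (flag left at default, config.json applies)
    ../imposm3 import -config config.json -connection $CONN -mapping mapping.json -dbschema-import staging
    # -> imports into staging (explicit flag wins over config.json)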

View File

@@ -10,6 +10,9 @@ import (
 type Config struct {
 	ConnectionParams string
 	Srid             int
+	ImportSchema     string
+	ProductionSchema string
+	BackupSchema     string
 }
 
 type DB interface {

View File

@@ -113,7 +113,7 @@ func (pg *PostGIS) createSchema(schema string) error {
 // Init creates schema and tables, drops existing data.
 func (pg *PostGIS) Init() error {
-	if err := pg.createSchema(pg.Schema); err != nil {
+	if err := pg.createSchema(pg.Config.ImportSchema); err != nil {
 		return err
 	}
@@ -173,7 +173,7 @@ func createIndex(pg *PostGIS, tableName string, columns []ColumnSpec) error {
 	for _, col := range columns {
 		if col.Type.Name() == "GEOMETRY" {
 			sql := fmt.Sprintf(`CREATE INDEX "%s_geom" ON "%s"."%s" USING GIST ("%s")`,
-				tableName, pg.Schema, tableName, col.Name)
+				tableName, pg.Config.ImportSchema, tableName, col.Name)
 			step := log.StartStep(fmt.Sprintf("Creating geometry index on %s", tableName))
 			_, err := pg.Db.Exec(sql)
 			log.StopStep(step)
@@ -183,7 +183,7 @@ func createIndex(pg *PostGIS, tableName string, columns []ColumnSpec) error {
 		}
 		if col.FieldType.Name == "id" {
 			sql := fmt.Sprintf(`CREATE INDEX "%s_osm_id_idx" ON "%s"."%s" USING BTREE ("%s")`,
-				tableName, pg.Schema, tableName, col.Name)
+				tableName, pg.Config.ImportSchema, tableName, col.Name)
 			step := log.StartStep(fmt.Sprintf("Creating OSM id index on %s", tableName))
 			_, err := pg.Db.Exec(sql)
 			log.StopStep(step)
@@ -282,7 +282,7 @@ func (pg *PostGIS) generalizeTable(table *GeneralizedTableSpec) error {
 		cols = append(cols, col.Type.GeneralizeSql(&col, table))
 	}
-	if err := dropTableIfExists(tx, pg.Schema, table.FullName); err != nil {
+	if err := dropTableIfExists(tx, pg.Config.ImportSchema, table.FullName); err != nil {
 		return err
 	}
@@ -295,7 +295,7 @@ func (pg *PostGIS) generalizeTable(table *GeneralizedTableSpec) error {
 		sourceTable = table.Source.FullName
 	}
 	sql := fmt.Sprintf(`CREATE TABLE "%s"."%s" AS (SELECT %s FROM "%s"."%s"%s)`,
-		pg.Schema, table.FullName, columnSQL, pg.Schema,
+		pg.Config.ImportSchema, table.FullName, columnSQL, pg.Config.ImportSchema,
 		sourceTable, where)
 
 	_, err = tx.Exec(sql)
@@ -355,7 +355,7 @@ func clusterTable(pg *PostGIS, tableName string, srid int, columns []ColumnSpec)
 		if col.Type.Name() == "GEOMETRY" {
 			step := log.StartStep(fmt.Sprintf("Indexing %s on geohash", tableName))
 			sql := fmt.Sprintf(`CREATE INDEX "%s_geom_geohash" ON "%s"."%s" (ST_GeoHash(ST_Transform(ST_SetSRID(Box2D(%s), %d), 4326)))`,
-				tableName, pg.Schema, tableName, col.Name, srid)
+				tableName, pg.Config.ImportSchema, tableName, col.Name, srid)
 			_, err := pg.Db.Exec(sql)
 			log.StopStep(step)
 			if err != nil {
@@ -364,7 +364,7 @@ func clusterTable(pg *PostGIS, tableName string, srid int, columns []ColumnSpec)
 			step = log.StartStep(fmt.Sprintf("Clustering %s on geohash", tableName))
 			sql = fmt.Sprintf(`CLUSTER "%s_geom_geohash" ON "%s"."%s"`,
-				tableName, pg.Schema, tableName)
+				tableName, pg.Config.ImportSchema, tableName)
 			_, err = pg.Db.Exec(sql)
 			log.StopStep(step)
 			if err != nil {
@@ -376,7 +376,7 @@ func clusterTable(pg *PostGIS, tableName string, srid int, columns []ColumnSpec)
 		step := log.StartStep(fmt.Sprintf("Analysing %s", tableName))
 		sql := fmt.Sprintf(`ANALYSE "%s"."%s"`,
-			pg.Schema, tableName)
+			pg.Config.ImportSchema, tableName)
 		_, err := pg.Db.Exec(sql)
 		log.StopStep(step)
 		if err != nil {
@@ -389,8 +389,6 @@ func clusterTable(pg *PostGIS, tableName string, srid int, columns []ColumnSpec)
 type PostGIS struct {
 	Db                *sql.DB
 	Params            string
-	Schema            string
-	BackupSchema      string
 	Config            database.Config
 	Tables            map[string]*TableSpec
 	GeneralizedTables map[string]*GeneralizedTableSpec
@@ -586,7 +584,6 @@ func New(conf database.Config, m *mapping.Mapping) (database.DB, error) {
 		return nil, err
 	}
 	params = disableDefaultSslOnLocalhost(params)
-	db.Schema, db.BackupSchema = schemasFromConnectionParams(params)
 	db.Prefix = prefixFromConnectionParams(params)
 
 	for name, table := range m.Tables {

View File

@@ -7,6 +7,10 @@ import (
 func (pg *PostGIS) rotate(source, dest, backup string) error {
 	defer log.StopStep(log.StartStep(fmt.Sprintf("Rotating tables")))
 
+	if err := pg.createSchema(dest); err != nil {
+		return err
+	}
+
 	if err := pg.createSchema(backup); err != nil {
 		return err
 	}
@@ -71,11 +75,11 @@ func (pg *PostGIS) rotate(source, dest, backup string) error {
 }
 
 func (pg *PostGIS) Deploy() error {
-	return pg.rotate(pg.Schema, "public", pg.BackupSchema)
+	return pg.rotate(pg.Config.ImportSchema, pg.Config.ProductionSchema, pg.Config.BackupSchema)
 }
 
 func (pg *PostGIS) RevertDeploy() error {
-	return pg.rotate(pg.BackupSchema, "public", pg.Schema)
+	return pg.rotate(pg.Config.BackupSchema, pg.Config.ProductionSchema, pg.Config.ImportSchema)
 }
 
 func (pg *PostGIS) RemoveBackup() error {
@@ -85,7 +89,7 @@ func (pg *PostGIS) RemoveBackup() error {
 	}
 	defer rollbackIfTx(&tx)
 
-	backup := pg.BackupSchema
+	backup := pg.Config.BackupSchema
 
 	for _, tableName := range pg.tableNames() {
 		tableName = pg.Prefix + tableName
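Deploy and RevertDeploy are now the same rotation with the schema roles swapped, and the configurable production schema replaces the previously hard-coded "public". For orientation, here is a rough sketch of the rotation pattern these functions rely on, assuming PostgreSQL's ALTER TABLE ... SET SCHEMA. This is an illustration, not the actual imposm3 implementation, which also backs up existing tables, handles missing ones, and manages the surrounding transaction:

package example

import (
	"database/sql"
	"fmt"
)

// rotateSketch moves each table out of the source schema into dest,
// first setting aside whatever currently sits in dest into backup.
func rotateSketch(tx *sql.Tx, tables []string, source, dest, backup string) error {
	for _, table := range tables {
		// step aside: the current dest table becomes the backup copy
		if _, err := tx.Exec(fmt.Sprintf(`ALTER TABLE "%s"."%s" SET SCHEMA "%s"`, dest, table, backup)); err != nil {
			return err
		}
		// promote: the freshly imported table takes its place
		if _, err := tx.Exec(fmt.Sprintf(`ALTER TABLE "%s"."%s" SET SCHEMA "%s"`, source, table, dest)); err != nil {
			return err
		}
	}
	return nil
}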

View File

@@ -116,7 +116,7 @@ func NewTableSpec(pg *PostGIS, t *mapping.Table) *TableSpec {
 	spec := TableSpec{
 		Name:         t.Name,
 		FullName:     pg.Prefix + t.Name,
-		Schema:       pg.Schema,
+		Schema:       pg.Config.ImportSchema,
 		GeometryType: string(t.Type),
 		Srid:         pg.Config.Srid,
 	}
@@ -140,7 +140,7 @@ func NewGeneralizedTableSpec(pg *PostGIS, t *mapping.GeneralizedTable) *GeneralizedTableSpec {
 	spec := GeneralizedTableSpec{
 		Name:       t.Name,
 		FullName:   pg.Prefix + t.Name,
-		Schema:     pg.Schema,
+		Schema:     pg.Config.ImportSchema,
 		Tolerance:  t.Tolerance,
 		Where:      t.SqlFilter,
 		SourceName: t.SourceTableName,

View File

@@ -49,7 +49,7 @@ func (tt *bulkTableTx) Begin(tx *sql.Tx) error {
 	}
 	tt.Tx = tx
 
-	_, err = tx.Exec(fmt.Sprintf(`TRUNCATE TABLE "%s"."%s" RESTART IDENTITY`, tt.Pg.Schema, tt.Table))
+	_, err = tx.Exec(fmt.Sprintf(`TRUNCATE TABLE "%s"."%s" RESTART IDENTITY`, tt.Pg.Config.ImportSchema, tt.Table))
 	if err != nil {
 		return err
 	}

View File

@@ -8,25 +8,6 @@ import (
 	"sync"
 )
 
-func schemasFromConnectionParams(params string) (string, string) {
-	parts := strings.Fields(params)
-	var schema, backupSchema string
-	for _, p := range parts {
-		if strings.HasPrefix(p, "schema=") {
-			schema = strings.Replace(p, "schema=", "", 1)
-		} else if strings.HasPrefix(p, "backupschema=") {
-			backupSchema = strings.Replace(p, "backupschema=", "", 1)
-		}
-	}
-
-	if schema == "" {
-		schema = "import"
-	}
-	if backupSchema == "" {
-		backupSchema = "backup"
-	}
-	return schema, backupSchema
-}
-
 // disableDefaultSslOnLocalhost adds sslmode=disable to params
 // when host is localhost/127.0.0.1 and the sslmode param and
 // PGSSLMODE environment are both not set.

View File

@@ -53,6 +53,10 @@ func Update(oscFile string, geometryLimiter *limit.Limiter, expireor expire.Expi
 	dbConf := database.Config{
 		ConnectionParams: config.BaseOptions.Connection,
 		Srid:             config.BaseOptions.Srid,
+		// we apply diff imports on the Production schema
+		ImportSchema:     config.BaseOptions.Schemas.Production,
+		ProductionSchema: config.BaseOptions.Schemas.Production,
+		BackupSchema:     config.BaseOptions.Schemas.Backup,
 	}
 	db, err := database.Open(dbConf, tagmapping)
 	if err != nil {

View File

@@ -63,6 +63,9 @@ func Import() {
 	conf := database.Config{
 		ConnectionParams: config.BaseOptions.Connection,
 		Srid:             config.BaseOptions.Srid,
+		ImportSchema:     config.BaseOptions.Schemas.Import,
+		ProductionSchema: config.BaseOptions.Schemas.Production,
+		BackupSchema:     config.BaseOptions.Schemas.Backup,
 	}
 	db, err = database.Open(conf, tagmapping)
 	if err != nil {

View File

@@ -25,12 +25,17 @@ def setup():
 def teardown():
     shutil.rmtree(tmpdir)
+    drop_test_schemas()
 
 db_conf = {
     'host': 'localhost',
 }
 
+TEST_SCHEMA_IMPORT = "imposm3testimport"
+TEST_SCHEMA_PRODUCTION = "imposm3testpublic"
+TEST_SCHEMA_BACKUP = "imposm3testbackup"
 
 def merc_point(lon, lat):
     pole = 6378137 * math.pi # 20037508.342789244
@@ -50,7 +55,7 @@ def create_geom_in_row(rowdict):
 def query_row(db_conf, table, osmid):
     conn = psycopg2.connect(**db_conf)
     cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
-    cur.execute('select * from import.%s where osm_id = %%s' % table, [osmid])
+    cur.execute('select * from %s.%s where osm_id = %%s' % (TEST_SCHEMA_PRODUCTION, table), [osmid])
     results = []
     for row in cur.fetchall():
         create_geom_in_row(row)
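An aside on the %%s in the query above: the string goes through two substitution stages. Python's % operator first fills in the schema and table names and collapses %%s to %s; psycopg2 then binds osmid to the remaining %s placeholder. Illustrated (not part of the diff; the values are examples):

    # stage 1: %-formatting inserts the identifiers and unescapes %%s
    sql = 'select * from %s.%s where osm_id = %%s' % ('imposm3testpublic', 'osm_roads')
    assert sql == 'select * from imposm3testpublic.osm_roads where osm_id = %s'
    # stage 2: psycopg2 binds the value as a query parameter
    # cur.execute(sql, [1234])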
@@ -66,29 +71,80 @@ def imposm3_import(db_conf, pbf):
     conn = pg_db_url(db_conf)
 
     try:
-        print subprocess.check_output(
+        print subprocess.check_output((
             "../imposm3 import -connection %s -read %s"
             " -write"
             " -cachedir %s"
             " -diff"
             " -overwritecache"
+            " -dbschema-import " + TEST_SCHEMA_IMPORT +
             " -optimize"
-            " -mapping test_mapping.json " % (
+            " -mapping test_mapping.json ") % (
             conn, pbf, tmpdir,
         ), shell=True)
     except subprocess.CalledProcessError, ex:
         print ex.output
         raise
 
+def imposm3_deploy(db_conf):
+    conn = pg_db_url(db_conf)
+
+    try:
+        print subprocess.check_output((
+            "../imposm3 import -connection %s"
+            " -dbschema-import " + TEST_SCHEMA_IMPORT +
+            " -dbschema-production " + TEST_SCHEMA_PRODUCTION +
+            " -dbschema-backup " + TEST_SCHEMA_BACKUP +
+            " -deployproduction"
+            " -mapping test_mapping.json ") % (
+            conn,
+        ), shell=True)
+    except subprocess.CalledProcessError, ex:
+        print ex.output
+        raise
+
+def imposm3_revert_deploy(db_conf):
+    conn = pg_db_url(db_conf)
+
+    try:
+        print subprocess.check_output((
+            "../imposm3 import -connection %s"
+            " -dbschema-import " + TEST_SCHEMA_IMPORT +
+            " -dbschema-production " + TEST_SCHEMA_PRODUCTION +
+            " -dbschema-backup " + TEST_SCHEMA_BACKUP +
+            " -revertdeploy"
+            " -mapping test_mapping.json ") % (
+            conn,
+        ), shell=True)
+    except subprocess.CalledProcessError, ex:
+        print ex.output
+        raise
+
+def imposm3_remove_backups(db_conf):
+    conn = pg_db_url(db_conf)
+
+    try:
+        print subprocess.check_output((
+            "../imposm3 import -connection %s"
+            " -dbschema-backup " + TEST_SCHEMA_BACKUP +
+            " -removebackup"
+            " -mapping test_mapping.json ") % (
+            conn,
+        ), shell=True)
+    except subprocess.CalledProcessError, ex:
+        print ex.output
+        raise
+
 def imposm3_update(db_conf, osc):
     conn = pg_db_url(db_conf)
 
     try:
-        print subprocess.check_output(
+        print subprocess.check_output((
             "../imposm3 diff -connection %s"
             " -cachedir %s"
             " -limitto clipping-3857.geojson"
-            " -mapping test_mapping.json %s" % (
+            " -dbschema-production " + TEST_SCHEMA_PRODUCTION +
+            " -mapping test_mapping.json %s") % (
             conn, tmpdir, osc,
         ), shell=True)
     except subprocess.CalledProcessError, ex:
@@ -113,11 +169,11 @@ def cache_query(nodes='', ways='', relations='', deps='', full=''):
     print out
     return json.loads(out)
 
-def table_exists(table):
+def table_exists(table, schema=TEST_SCHEMA_IMPORT):
     conn = psycopg2.connect(**db_conf)
     cur = conn.cursor()
     cur.execute("SELECT EXISTS(SELECT * FROM information_schema.tables WHERE table_name='%s' AND table_schema='%s')"
-        % (table, 'import'))
+        % (table, schema))
     return cur.fetchone()[0]
 
 def assert_missing_node(id):
@@ -140,19 +196,29 @@ def assert_cached_way(id):
     if not data['ways'][str(id)]:
         raise AssertionError('way %d not found' % id)
 
-def drop_import_schema():
+def drop_test_schemas():
     conn = psycopg2.connect(**db_conf)
     cur = conn.cursor()
-    cur.execute("DROP SCHEMA IF EXISTS import CASCADE")
+    cur.execute("DROP SCHEMA IF EXISTS %s CASCADE" % TEST_SCHEMA_IMPORT)
+    cur.execute("DROP SCHEMA IF EXISTS %s CASCADE" % TEST_SCHEMA_PRODUCTION)
+    cur.execute("DROP SCHEMA IF EXISTS %s CASCADE" % TEST_SCHEMA_BACKUP)
     conn.commit()
 
 #######################################################################
 
 def test_import():
     """Import succeeds"""
-    drop_import_schema()
-    assert not table_exists('osm_roads')
+    drop_test_schemas()
+    assert not table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT)
     imposm3_import(db_conf, './build/test.pbf')
-    assert table_exists('osm_roads')
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT)
+
+def test_deploy():
+    """Deploy succeeds"""
+    assert not table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION)
+    imposm3_deploy(db_conf)
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION)
+    assert not table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT)
 
 #######################################################################
 
 def test_imported_landusage():
@@ -436,3 +502,43 @@ def test_duplicate_ids2():
     assert query_row(db_conf, 'osm_buildings', -51001) == None
     assert query_row(db_conf, 'osm_buildings', -51011)['type'] == 'mp'
     assert query_row(db_conf, 'osm_buildings', 51011) == None
+
+#######################################################################
+
+def test_deploy_and_revert_deploy():
+    """Revert deploy succeeds"""
+    assert not table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT)
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION)
+    assert not table_exists('osm_roads', schema=TEST_SCHEMA_BACKUP)
+
+    # import again to have a new import schema
+    imposm3_import(db_conf, './build/test.pbf')
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT)
+
+    imposm3_deploy(db_conf)
+    assert not table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT)
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION)
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_BACKUP)
+
+    imposm3_revert_deploy(db_conf)
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT)
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION)
+    assert not table_exists('osm_roads', schema=TEST_SCHEMA_BACKUP)
+
+def test_remove_backup():
+    """Remove backup succeeds"""
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT)
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION)
+    assert not table_exists('osm_roads', schema=TEST_SCHEMA_BACKUP)
+
+    imposm3_deploy(db_conf)
+    assert not table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT)
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION)
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_BACKUP)
+
+    imposm3_remove_backups(db_conf)
+    assert not table_exists('osm_roads', schema=TEST_SCHEMA_IMPORT)
+    assert table_exists('osm_roads', schema=TEST_SCHEMA_PRODUCTION)
+    assert not table_exists('osm_roads', schema=TEST_SCHEMA_BACKUP)