reorganized postgis package

master
Oliver Tonnhofer 2013-06-11 10:42:32 +02:00
parent bad86777af
commit aa2c24b8d2
4 changed files with 339 additions and 320 deletions

View File

@ -13,126 +13,6 @@ import (
var log = logging.NewLogger("PostGIS")
type ColumnSpec struct {
Name string
FieldType mapping.FieldType
Type ColumnType
}
type TableSpec struct {
Name string
Schema string
Columns []ColumnSpec
GeometryType string
Srid int
}
type GeneralizedTableSpec struct {
Name string
SourceName string
Source *TableSpec
SourceGeneralized *GeneralizedTableSpec
Tolerance float64
Where string
created bool
}
func (col *ColumnSpec) AsSQL() string {
return fmt.Sprintf("\"%s\" %s", col.Name, col.Type.Name())
}
func (spec *TableSpec) CreateTableSQL() string {
cols := []string{
"id SERIAL PRIMARY KEY",
}
for _, col := range spec.Columns {
if col.Type.Name() == "GEOMETRY" {
continue
}
cols = append(cols, col.AsSQL())
}
columnSQL := strings.Join(cols, ",\n")
return fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS "%s"."%s" (
%s
);`,
spec.Schema,
spec.Name,
columnSQL,
)
}
func (spec *TableSpec) InsertSQL() string {
var cols []string
var vars []string
for _, col := range spec.Columns {
cols = append(cols, "\""+col.Name+"\"")
vars = append(vars,
col.Type.PrepareInsertSql(len(vars)+1, spec))
}
columns := strings.Join(cols, ", ")
placeholders := strings.Join(vars, ", ")
return fmt.Sprintf(`INSERT INTO "%s"."%s" (%s) VALUES (%s)`,
spec.Schema,
spec.Name,
columns,
placeholders,
)
}
func (spec *TableSpec) DeleteSQL() string {
var idColumName string
for _, col := range spec.Columns {
if col.FieldType.Name == "id" {
idColumName = col.Name
break
}
}
if idColumName == "" {
panic("missing id column")
}
return fmt.Sprintf(`DELETE FROM "%s"."%s" WHERE "%s" = $1`,
spec.Schema,
spec.Name,
idColumName,
)
}
func NewTableSpec(pg *PostGIS, t *mapping.Table) *TableSpec {
spec := TableSpec{
Name: pg.Prefix + t.Name,
Schema: pg.Schema,
GeometryType: t.Type,
Srid: pg.Config.Srid,
}
for _, field := range t.Fields {
fieldType := field.FieldType()
if fieldType == nil {
continue
}
pgType, ok := pgTypes[fieldType.GoType]
if !ok {
log.Errorf("unhandled field type %v, using string type", fieldType)
pgType = pgTypes["string"]
}
col := ColumnSpec{field.Name, *fieldType, pgType}
spec.Columns = append(spec.Columns, col)
}
return &spec
}
func NewGeneralizedTableSpec(pg *PostGIS, t *mapping.GeneralizedTable) *GeneralizedTableSpec {
spec := GeneralizedTableSpec{
Name: pg.Prefix + t.Name,
Tolerance: t.Tolerance,
Where: t.SqlFilter,
SourceName: t.SourceTableName,
}
return &spec
}
type SQLError struct {
query string
originalError error
@ -208,72 +88,6 @@ func (pg *PostGIS) createSchema(schema string) error {
return nil
}
type PostGIS struct {
Db *sql.DB
Schema string
BackupSchema string
Config database.Config
Tables map[string]*TableSpec
GeneralizedTables map[string]*GeneralizedTableSpec
Prefix string
}
func schemasFromConnectionParams(params string) (string, string) {
parts := strings.Fields(params)
var schema, backupSchema string
for _, p := range parts {
if strings.HasPrefix(p, "schema=") {
schema = strings.Replace(p, "schema=", "", 1)
} else if strings.HasPrefix(p, "backupschema=") {
backupSchema = strings.Replace(p, "backupschema=", "", 1)
}
}
if schema == "" {
schema = "import"
}
if backupSchema == "" {
backupSchema = "backup"
}
return schema, backupSchema
}
func prefixFromConnectionParams(params string) string {
parts := strings.Fields(params)
var prefix string
for _, p := range parts {
if strings.HasPrefix(p, "prefix=") {
prefix = strings.Replace(p, "prefix=", "", 1)
break
}
}
if prefix == "" {
prefix = "osm_"
}
if prefix[len(prefix)-1] != '_' {
prefix = prefix + "_"
}
return prefix
}
func (pg *PostGIS) Open() error {
var err error
params, err := pq.ParseURL(pg.Config.ConnectionParams)
if err != nil {
return err
}
pg.Db, err = sql.Open("postgres", params)
if err != nil {
return err
}
// check that the connection actually works
err = pg.Db.Ping()
if err != nil {
return err
}
return nil
}
func (pg *PostGIS) InsertBatch(table string, rows [][]interface{}) error {
spec, ok := pg.Tables[table]
if !ok {
@ -364,140 +178,6 @@ func (pg *PostGIS) TableNames() []string {
return names
}
func tableExists(tx *sql.Tx, schema, table string) (bool, error) {
var exists bool
sql := fmt.Sprintf(`SELECT EXISTS(SELECT * FROM information_schema.tables WHERE table_name='%s' AND table_schema='%s')`,
table, schema)
row := tx.QueryRow(sql)
err := row.Scan(&exists)
if err != nil {
return false, err
}
return exists, nil
}
func dropTableIfExists(tx *sql.Tx, schema, table string) error {
sql := fmt.Sprintf(`DROP TABLE IF EXISTS "%s"."%s"`, schema, table)
_, err := tx.Exec(sql)
return err
}
func (pg *PostGIS) rotate(source, dest, backup string) error {
defer log.StopStep(log.StartStep(fmt.Sprintf("Rotating tables")))
if err := pg.createSchema(backup); err != nil {
return err
}
tx, err := pg.Db.Begin()
if err != nil {
return err
}
defer rollbackIfTx(&tx)
for _, tableName := range pg.TableNames() {
tableName = pg.Prefix + tableName
log.Printf("Rotating %s from %s -> %s -> %s", tableName, source, dest, backup)
backupExists, err := tableExists(tx, backup, tableName)
if err != nil {
return err
}
sourceExists, err := tableExists(tx, source, tableName)
if err != nil {
return err
}
destExists, err := tableExists(tx, dest, tableName)
if err != nil {
return err
}
if !sourceExists {
log.Warnf("skipping rotate of %s, table does not exists in %s", tableName, source)
continue
}
if destExists {
log.Printf("backup of %s, to %s", tableName, backup)
if backupExists {
err = dropTableIfExists(tx, backup, tableName)
if err != nil {
return err
}
}
sql := fmt.Sprintf(`ALTER TABLE "%s"."%s" SET SCHEMA "%s"`, dest, tableName, backup)
_, err = tx.Exec(sql)
if err != nil {
return err
}
}
sql := fmt.Sprintf(`ALTER TABLE "%s"."%s" SET SCHEMA "%s"`, source, tableName, dest)
_, err = tx.Exec(sql)
if err != nil {
return err
}
}
err = tx.Commit()
if err != nil {
return err
}
tx = nil // set nil to prevent rollback
return nil
}
func (pg *PostGIS) Deploy() error {
return pg.rotate(pg.Schema, "public", pg.BackupSchema)
}
func (pg *PostGIS) RevertDeploy() error {
return pg.rotate(pg.BackupSchema, "public", pg.Schema)
}
func rollbackIfTx(tx **sql.Tx) {
if *tx != nil {
if err := tx.Rollback(); err != nil {
log.Fatal("rollback failed", err)
}
}
}
func (pg *PostGIS) RemoveBackup() error {
tx, err := pg.Db.Begin()
if err != nil {
return err
}
defer rollbackIfTx(&tx)
backup := pg.BackupSchema
for _, tableName := range pg.TableNames() {
tableName = pg.Prefix + tableName
backupExists, err := tableExists(tx, backup, tableName)
if err != nil {
return err
}
if backupExists {
log.Printf("removing backup of %s from %s", tableName, backup)
err = dropTableIfExists(tx, backup, tableName)
if err != nil {
return err
}
}
}
err = tx.Commit()
if err != nil {
return err
}
tx = nil // set nil to prevent rollback
return nil
}
// Finish creates spatial indices on all tables.
func (pg *PostGIS) Finish() error {
defer log.StopStep(log.StartStep(fmt.Sprintf("Creating geometry indices")))
@ -655,6 +335,35 @@ func (pg *PostGIS) generalizeTable(table *GeneralizedTableSpec) error {
return nil
}
type PostGIS struct {
Db *sql.DB
Schema string
BackupSchema string
Config database.Config
Tables map[string]*TableSpec
GeneralizedTables map[string]*GeneralizedTableSpec
Prefix string
}
func (pg *PostGIS) Open() error {
var err error
params, err := pq.ParseURL(pg.Config.ConnectionParams)
if err != nil {
return err
}
pg.Db, err = sql.Open("postgres", params)
if err != nil {
return err
}
// check that the connection actually works
err = pg.Db.Ping()
if err != nil {
return err
}
return nil
}
func New(conf database.Config, m *mapping.Mapping) (database.DB, error) {
db := &PostGIS{}
db.Tables = make(map[string]*TableSpec)

113
database/postgis/rotate.go Normal file
View File

@ -0,0 +1,113 @@
package postgis
import (
"fmt"
)
func (pg *PostGIS) rotate(source, dest, backup string) error {
defer log.StopStep(log.StartStep(fmt.Sprintf("Rotating tables")))
if err := pg.createSchema(backup); err != nil {
return err
}
tx, err := pg.Db.Begin()
if err != nil {
return err
}
defer rollbackIfTx(&tx)
for _, tableName := range pg.TableNames() {
tableName = pg.Prefix + tableName
log.Printf("Rotating %s from %s -> %s -> %s", tableName, source, dest, backup)
backupExists, err := tableExists(tx, backup, tableName)
if err != nil {
return err
}
sourceExists, err := tableExists(tx, source, tableName)
if err != nil {
return err
}
destExists, err := tableExists(tx, dest, tableName)
if err != nil {
return err
}
if !sourceExists {
log.Warnf("skipping rotate of %s, table does not exists in %s", tableName, source)
continue
}
if destExists {
log.Printf("backup of %s, to %s", tableName, backup)
if backupExists {
err = dropTableIfExists(tx, backup, tableName)
if err != nil {
return err
}
}
sql := fmt.Sprintf(`ALTER TABLE "%s"."%s" SET SCHEMA "%s"`, dest, tableName, backup)
_, err = tx.Exec(sql)
if err != nil {
return err
}
}
sql := fmt.Sprintf(`ALTER TABLE "%s"."%s" SET SCHEMA "%s"`, source, tableName, dest)
_, err = tx.Exec(sql)
if err != nil {
return err
}
}
err = tx.Commit()
if err != nil {
return err
}
tx = nil // set nil to prevent rollback
return nil
}
func (pg *PostGIS) Deploy() error {
return pg.rotate(pg.Schema, "public", pg.BackupSchema)
}
func (pg *PostGIS) RevertDeploy() error {
return pg.rotate(pg.BackupSchema, "public", pg.Schema)
}
func (pg *PostGIS) RemoveBackup() error {
tx, err := pg.Db.Begin()
if err != nil {
return err
}
defer rollbackIfTx(&tx)
backup := pg.BackupSchema
for _, tableName := range pg.TableNames() {
tableName = pg.Prefix + tableName
backupExists, err := tableExists(tx, backup, tableName)
if err != nil {
return err
}
if backupExists {
log.Printf("removing backup of %s from %s", tableName, backup)
err = dropTableIfExists(tx, backup, tableName)
if err != nil {
return err
}
}
}
err = tx.Commit()
if err != nil {
return err
}
tx = nil // set nil to prevent rollback
return nil
}

127
database/postgis/spec.go Normal file
View File

@ -0,0 +1,127 @@
package postgis
import (
"fmt"
"goposm/mapping"
"strings"
)
type ColumnSpec struct {
Name string
FieldType mapping.FieldType
Type ColumnType
}
type TableSpec struct {
Name string
Schema string
Columns []ColumnSpec
GeometryType string
Srid int
}
type GeneralizedTableSpec struct {
Name string
SourceName string
Source *TableSpec
SourceGeneralized *GeneralizedTableSpec
Tolerance float64
Where string
created bool
}
func (col *ColumnSpec) AsSQL() string {
return fmt.Sprintf("\"%s\" %s", col.Name, col.Type.Name())
}
func (spec *TableSpec) CreateTableSQL() string {
cols := []string{
"id SERIAL PRIMARY KEY",
}
for _, col := range spec.Columns {
if col.Type.Name() == "GEOMETRY" {
continue
}
cols = append(cols, col.AsSQL())
}
columnSQL := strings.Join(cols, ",\n")
return fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS "%s"."%s" (
%s
);`,
spec.Schema,
spec.Name,
columnSQL,
)
}
func (spec *TableSpec) InsertSQL() string {
var cols []string
var vars []string
for _, col := range spec.Columns {
cols = append(cols, "\""+col.Name+"\"")
vars = append(vars,
col.Type.PrepareInsertSql(len(vars)+1, spec))
}
columns := strings.Join(cols, ", ")
placeholders := strings.Join(vars, ", ")
return fmt.Sprintf(`INSERT INTO "%s"."%s" (%s) VALUES (%s)`,
spec.Schema,
spec.Name,
columns,
placeholders,
)
}
func (spec *TableSpec) DeleteSQL() string {
var idColumName string
for _, col := range spec.Columns {
if col.FieldType.Name == "id" {
idColumName = col.Name
break
}
}
if idColumName == "" {
panic("missing id column")
}
return fmt.Sprintf(`DELETE FROM "%s"."%s" WHERE "%s" = $1`,
spec.Schema,
spec.Name,
idColumName,
)
}
func NewTableSpec(pg *PostGIS, t *mapping.Table) *TableSpec {
spec := TableSpec{
Name: pg.Prefix + t.Name,
Schema: pg.Schema,
GeometryType: t.Type,
Srid: pg.Config.Srid,
}
for _, field := range t.Fields {
fieldType := field.FieldType()
if fieldType == nil {
continue
}
pgType, ok := pgTypes[fieldType.GoType]
if !ok {
log.Errorf("unhandled field type %v, using string type", fieldType)
pgType = pgTypes["string"]
}
col := ColumnSpec{field.Name, *fieldType, pgType}
spec.Columns = append(spec.Columns, col)
}
return &spec
}
func NewGeneralizedTableSpec(pg *PostGIS, t *mapping.GeneralizedTable) *GeneralizedTableSpec {
spec := GeneralizedTableSpec{
Name: pg.Prefix + t.Name,
Tolerance: t.Tolerance,
Where: t.SqlFilter,
SourceName: t.SourceTableName,
}
return &spec
}

70
database/postgis/util.go Normal file
View File

@ -0,0 +1,70 @@
package postgis
import (
"database/sql"
"fmt"
"strings"
)
func schemasFromConnectionParams(params string) (string, string) {
parts := strings.Fields(params)
var schema, backupSchema string
for _, p := range parts {
if strings.HasPrefix(p, "schema=") {
schema = strings.Replace(p, "schema=", "", 1)
} else if strings.HasPrefix(p, "backupschema=") {
backupSchema = strings.Replace(p, "backupschema=", "", 1)
}
}
if schema == "" {
schema = "import"
}
if backupSchema == "" {
backupSchema = "backup"
}
return schema, backupSchema
}
func prefixFromConnectionParams(params string) string {
parts := strings.Fields(params)
var prefix string
for _, p := range parts {
if strings.HasPrefix(p, "prefix=") {
prefix = strings.Replace(p, "prefix=", "", 1)
break
}
}
if prefix == "" {
prefix = "osm_"
}
if prefix[len(prefix)-1] != '_' {
prefix = prefix + "_"
}
return prefix
}
func tableExists(tx *sql.Tx, schema, table string) (bool, error) {
var exists bool
sql := fmt.Sprintf(`SELECT EXISTS(SELECT * FROM information_schema.tables WHERE table_name='%s' AND table_schema='%s')`,
table, schema)
row := tx.QueryRow(sql)
err := row.Scan(&exists)
if err != nil {
return false, err
}
return exists, nil
}
func dropTableIfExists(tx *sql.Tx, schema, table string) error {
sql := fmt.Sprintf(`DROP TABLE IF EXISTS "%s"."%s"`, schema, table)
_, err := tx.Exec(sql)
return err
}
func rollbackIfTx(tx **sql.Tx) {
if *tx != nil {
if err := tx.Rollback(); err != nil {
log.Fatal("rollback failed", err)
}
}
}