From 35e66529a7789bc5de3f3fff7b3354617a6ecb6b Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Fri, 1 Nov 2013 11:25:41 +0100 Subject: [PATCH] force synchronous inserts during diff import this makes sure that the source geometries for generalizes geometries are inserted --- database/postgis/router.go | 39 ++++++++------- database/postgis/tx.go | 99 +++++++++++++++----------------------- 2 files changed, 60 insertions(+), 78 deletions(-) diff --git a/database/postgis/router.go b/database/postgis/router.go index 86c5919..2f27adc 100644 --- a/database/postgis/router.go +++ b/database/postgis/router.go @@ -15,34 +15,39 @@ func newTxRouter(pg *PostGIS, bulkImport bool) *TxRouter { Tables: make(map[string]TableTx), } - var tx *sql.Tx - var err error - if !bulkImport { - tx, err = pg.Db.Begin() + if bulkImport { + for tableName, table := range pg.Tables { + tt := NewBulkTableTx(pg, table) + err := tt.Begin(nil) + if err != nil { + panic(err) // TODO + } + txr.Tables[tableName] = tt + } + } else { + tx, err := pg.Db.Begin() if err != nil { panic(err) // TODO } txr.tx = tx - } - for tableName, table := range pg.Tables { - tt := NewTableTx(pg, table, bulkImport) - err := tt.Begin(tx) - if err != nil { - panic(err) // TODO - } - txr.Tables[tableName] = tt - } - if !bulkImport { - for tableName, table := range pg.GeneralizedTables { - tt := NewGeneralizedTableTx(pg, table) + for tableName, table := range pg.Tables { + tt := NewSynchronousTableTx(pg, table.FullName, table) + err := tt.Begin(tx) + if err != nil { + panic(err) // TODO + } + txr.Tables[tableName] = tt + } + for tableName, table := range pg.GeneralizedTables { + tt := NewSynchronousTableTx(pg, table.FullName, table) err := tt.Begin(tx) if err != nil { panic(err) // TODO } txr.Tables[tableName] = tt } - } + return &txr } diff --git a/database/postgis/tx.go b/database/postgis/tx.go index ac94410..d699cbe 100644 --- a/database/postgis/tx.go +++ b/database/postgis/tx.go @@ -15,35 +15,31 @@ type TableTx interface { Rollback() } -type tableTx struct { +type bulkTableTx struct { Pg *PostGIS Tx *sql.Tx Table string Spec *TableSpec InsertStmt *sql.Stmt - DeleteStmt *sql.Stmt InsertSql string - DeleteSql string - bulkImport bool wg *sync.WaitGroup rows chan []interface{} } -func NewTableTx(pg *PostGIS, spec *TableSpec, bulkImport bool) TableTx { - tt := &tableTx{ - Pg: pg, - Table: spec.FullName, - Spec: spec, - wg: &sync.WaitGroup{}, - rows: make(chan []interface{}, 64), - bulkImport: bulkImport, +func NewBulkTableTx(pg *PostGIS, spec *TableSpec) TableTx { + tt := &bulkTableTx{ + Pg: pg, + Table: spec.FullName, + Spec: spec, + wg: &sync.WaitGroup{}, + rows: make(chan []interface{}, 64), } tt.wg.Add(1) go tt.loop() return tt } -func (tt *tableTx) Begin(tx *sql.Tx) error { +func (tt *bulkTableTx) Begin(tx *sql.Tx) error { var err error if tx == nil { tx, err = tt.Pg.Db.Begin() @@ -53,18 +49,12 @@ func (tt *tableTx) Begin(tx *sql.Tx) error { } tt.Tx = tx - if tt.bulkImport { - _, err = tx.Exec(fmt.Sprintf(`TRUNCATE TABLE "%s"."%s" RESTART IDENTITY`, tt.Pg.Schema, tt.Table)) - if err != nil { - return err - } + _, err = tx.Exec(fmt.Sprintf(`TRUNCATE TABLE "%s"."%s" RESTART IDENTITY`, tt.Pg.Schema, tt.Table)) + if err != nil { + return err } - if tt.bulkImport { - tt.InsertSql = tt.Spec.CopySQL() - } else { - tt.InsertSql = tt.Spec.InsertSQL() - } + tt.InsertSql = tt.Spec.CopySQL() stmt, err := tt.Tx.Prepare(tt.InsertSql) if err != nil { @@ -72,26 +62,15 @@ func (tt *tableTx) Begin(tx *sql.Tx) error { } tt.InsertStmt = stmt - if !tt.bulkImport { - // bulkImport creates COPY FROM STDIN stmt that doesn't - // permit other stmt - tt.DeleteSql = tt.Spec.DeleteSQL() - stmt, err = tt.Tx.Prepare(tt.DeleteSql) - if err != nil { - return &SQLError{tt.DeleteSql, err} - } - tt.DeleteStmt = stmt - } - return nil } -func (tt *tableTx) Insert(row []interface{}) error { +func (tt *bulkTableTx) Insert(row []interface{}) error { tt.rows <- row return nil } -func (tt *tableTx) loop() { +func (tt *bulkTableTx) loop() { for row := range tt.rows { _, err := tt.InsertStmt.Exec(row...) if err != nil { @@ -102,25 +81,18 @@ func (tt *tableTx) loop() { tt.wg.Done() } -func (tt *tableTx) Delete(id int64) error { - if tt.bulkImport { - panic("unable to delete in bulkImport mode") - } - _, err := tt.DeleteStmt.Exec(id) - if err != nil { - return &SQLInsertError{SQLError{tt.DeleteSql, err}, id} - } - return nil +func (tt *bulkTableTx) Delete(id int64) error { + panic("unable to delete in bulkImport mode") } -func (tt *tableTx) End() { +func (tt *bulkTableTx) End() { close(tt.rows) tt.wg.Wait() } -func (tt *tableTx) Commit() error { +func (tt *bulkTableTx) Commit() error { tt.End() - if tt.bulkImport && tt.InsertStmt != nil { + if tt.InsertStmt != nil { _, err := tt.InsertStmt.Exec() if err != nil { return err @@ -134,31 +106,36 @@ func (tt *tableTx) Commit() error { return nil } -func (tt *tableTx) Rollback() { +func (tt *bulkTableTx) Rollback() { rollbackIfTx(&tt.Tx) } -type generalizedTableTx struct { +type syncTableTx struct { Pg *PostGIS Tx *sql.Tx Table string - Spec *GeneralizedTableSpec + Spec tableSpec InsertStmt *sql.Stmt DeleteStmt *sql.Stmt InsertSql string DeleteSql string } -func NewGeneralizedTableTx(pg *PostGIS, spec *GeneralizedTableSpec) TableTx { - tt := &generalizedTableTx{ +type tableSpec interface { + InsertSQL() string + DeleteSQL() string +} + +func NewSynchronousTableTx(pg *PostGIS, tableName string, spec tableSpec) TableTx { + tt := &syncTableTx{ Pg: pg, - Table: spec.FullName, + Table: tableName, Spec: spec, } return tt } -func (tt *generalizedTableTx) Begin(tx *sql.Tx) error { +func (tt *syncTableTx) Begin(tx *sql.Tx) error { var err error if tx == nil { tx, err = tt.Pg.Db.Begin() @@ -186,15 +163,15 @@ func (tt *generalizedTableTx) Begin(tx *sql.Tx) error { return nil } -func (tt *generalizedTableTx) Insert(row []interface{}) error { - _, err := tt.InsertStmt.Exec(row[0]) +func (tt *syncTableTx) Insert(row []interface{}) error { + _, err := tt.InsertStmt.Exec(row...) if err != nil { return &SQLInsertError{SQLError{tt.InsertSql, err}, row} } return nil } -func (tt *generalizedTableTx) Delete(id int64) error { +func (tt *syncTableTx) Delete(id int64) error { _, err := tt.DeleteStmt.Exec(id) if err != nil { return &SQLInsertError{SQLError{tt.DeleteSql, err}, id} @@ -202,10 +179,10 @@ func (tt *generalizedTableTx) Delete(id int64) error { return nil } -func (tt *generalizedTableTx) End() { +func (tt *syncTableTx) End() { } -func (tt *generalizedTableTx) Commit() error { +func (tt *syncTableTx) Commit() error { err := tt.Tx.Commit() if err != nil { return err @@ -214,6 +191,6 @@ func (tt *generalizedTableTx) Commit() error { return nil } -func (tt *generalizedTableTx) Rollback() { +func (tt *syncTableTx) Rollback() { rollbackIfTx(&tt.Tx) }