force synchronous inserts during diff import

this makes sure that the source geometries for generalizes geometries
are inserted
master
Oliver Tonnhofer 2013-11-01 11:25:41 +01:00
parent 5203225bcd
commit 35e66529a7
2 changed files with 60 additions and 78 deletions

View File

@ -15,34 +15,39 @@ func newTxRouter(pg *PostGIS, bulkImport bool) *TxRouter {
Tables: make(map[string]TableTx), Tables: make(map[string]TableTx),
} }
var tx *sql.Tx if bulkImport {
var err error for tableName, table := range pg.Tables {
if !bulkImport { tt := NewBulkTableTx(pg, table)
tx, err = pg.Db.Begin() err := tt.Begin(nil)
if err != nil {
panic(err) // TODO
}
txr.Tables[tableName] = tt
}
} else {
tx, err := pg.Db.Begin()
if err != nil { if err != nil {
panic(err) // TODO panic(err) // TODO
} }
txr.tx = tx txr.tx = tx
} for tableName, table := range pg.Tables {
for tableName, table := range pg.Tables { tt := NewSynchronousTableTx(pg, table.FullName, table)
tt := NewTableTx(pg, table, bulkImport) err := tt.Begin(tx)
err := tt.Begin(tx) if err != nil {
if err != nil { panic(err) // TODO
panic(err) // TODO }
} txr.Tables[tableName] = tt
txr.Tables[tableName] = tt }
} for tableName, table := range pg.GeneralizedTables {
if !bulkImport { tt := NewSynchronousTableTx(pg, table.FullName, table)
for tableName, table := range pg.GeneralizedTables {
tt := NewGeneralizedTableTx(pg, table)
err := tt.Begin(tx) err := tt.Begin(tx)
if err != nil { if err != nil {
panic(err) // TODO panic(err) // TODO
} }
txr.Tables[tableName] = tt txr.Tables[tableName] = tt
} }
} }
return &txr return &txr
} }

View File

@ -15,35 +15,31 @@ type TableTx interface {
Rollback() Rollback()
} }
type tableTx struct { type bulkTableTx struct {
Pg *PostGIS Pg *PostGIS
Tx *sql.Tx Tx *sql.Tx
Table string Table string
Spec *TableSpec Spec *TableSpec
InsertStmt *sql.Stmt InsertStmt *sql.Stmt
DeleteStmt *sql.Stmt
InsertSql string InsertSql string
DeleteSql string
bulkImport bool
wg *sync.WaitGroup wg *sync.WaitGroup
rows chan []interface{} rows chan []interface{}
} }
func NewTableTx(pg *PostGIS, spec *TableSpec, bulkImport bool) TableTx { func NewBulkTableTx(pg *PostGIS, spec *TableSpec) TableTx {
tt := &tableTx{ tt := &bulkTableTx{
Pg: pg, Pg: pg,
Table: spec.FullName, Table: spec.FullName,
Spec: spec, Spec: spec,
wg: &sync.WaitGroup{}, wg: &sync.WaitGroup{},
rows: make(chan []interface{}, 64), rows: make(chan []interface{}, 64),
bulkImport: bulkImport,
} }
tt.wg.Add(1) tt.wg.Add(1)
go tt.loop() go tt.loop()
return tt return tt
} }
func (tt *tableTx) Begin(tx *sql.Tx) error { func (tt *bulkTableTx) Begin(tx *sql.Tx) error {
var err error var err error
if tx == nil { if tx == nil {
tx, err = tt.Pg.Db.Begin() tx, err = tt.Pg.Db.Begin()
@ -53,18 +49,12 @@ func (tt *tableTx) Begin(tx *sql.Tx) error {
} }
tt.Tx = tx tt.Tx = tx
if tt.bulkImport { _, err = tx.Exec(fmt.Sprintf(`TRUNCATE TABLE "%s"."%s" RESTART IDENTITY`, tt.Pg.Schema, tt.Table))
_, err = tx.Exec(fmt.Sprintf(`TRUNCATE TABLE "%s"."%s" RESTART IDENTITY`, tt.Pg.Schema, tt.Table)) if err != nil {
if err != nil { return err
return err
}
} }
if tt.bulkImport { tt.InsertSql = tt.Spec.CopySQL()
tt.InsertSql = tt.Spec.CopySQL()
} else {
tt.InsertSql = tt.Spec.InsertSQL()
}
stmt, err := tt.Tx.Prepare(tt.InsertSql) stmt, err := tt.Tx.Prepare(tt.InsertSql)
if err != nil { if err != nil {
@ -72,26 +62,15 @@ func (tt *tableTx) Begin(tx *sql.Tx) error {
} }
tt.InsertStmt = stmt tt.InsertStmt = stmt
if !tt.bulkImport {
// bulkImport creates COPY FROM STDIN stmt that doesn't
// permit other stmt
tt.DeleteSql = tt.Spec.DeleteSQL()
stmt, err = tt.Tx.Prepare(tt.DeleteSql)
if err != nil {
return &SQLError{tt.DeleteSql, err}
}
tt.DeleteStmt = stmt
}
return nil return nil
} }
func (tt *tableTx) Insert(row []interface{}) error { func (tt *bulkTableTx) Insert(row []interface{}) error {
tt.rows <- row tt.rows <- row
return nil return nil
} }
func (tt *tableTx) loop() { func (tt *bulkTableTx) loop() {
for row := range tt.rows { for row := range tt.rows {
_, err := tt.InsertStmt.Exec(row...) _, err := tt.InsertStmt.Exec(row...)
if err != nil { if err != nil {
@ -102,25 +81,18 @@ func (tt *tableTx) loop() {
tt.wg.Done() tt.wg.Done()
} }
func (tt *tableTx) Delete(id int64) error { func (tt *bulkTableTx) Delete(id int64) error {
if tt.bulkImport { panic("unable to delete in bulkImport mode")
panic("unable to delete in bulkImport mode")
}
_, err := tt.DeleteStmt.Exec(id)
if err != nil {
return &SQLInsertError{SQLError{tt.DeleteSql, err}, id}
}
return nil
} }
func (tt *tableTx) End() { func (tt *bulkTableTx) End() {
close(tt.rows) close(tt.rows)
tt.wg.Wait() tt.wg.Wait()
} }
func (tt *tableTx) Commit() error { func (tt *bulkTableTx) Commit() error {
tt.End() tt.End()
if tt.bulkImport && tt.InsertStmt != nil { if tt.InsertStmt != nil {
_, err := tt.InsertStmt.Exec() _, err := tt.InsertStmt.Exec()
if err != nil { if err != nil {
return err return err
@ -134,31 +106,36 @@ func (tt *tableTx) Commit() error {
return nil return nil
} }
func (tt *tableTx) Rollback() { func (tt *bulkTableTx) Rollback() {
rollbackIfTx(&tt.Tx) rollbackIfTx(&tt.Tx)
} }
type generalizedTableTx struct { type syncTableTx struct {
Pg *PostGIS Pg *PostGIS
Tx *sql.Tx Tx *sql.Tx
Table string Table string
Spec *GeneralizedTableSpec Spec tableSpec
InsertStmt *sql.Stmt InsertStmt *sql.Stmt
DeleteStmt *sql.Stmt DeleteStmt *sql.Stmt
InsertSql string InsertSql string
DeleteSql string DeleteSql string
} }
func NewGeneralizedTableTx(pg *PostGIS, spec *GeneralizedTableSpec) TableTx { type tableSpec interface {
tt := &generalizedTableTx{ InsertSQL() string
DeleteSQL() string
}
func NewSynchronousTableTx(pg *PostGIS, tableName string, spec tableSpec) TableTx {
tt := &syncTableTx{
Pg: pg, Pg: pg,
Table: spec.FullName, Table: tableName,
Spec: spec, Spec: spec,
} }
return tt return tt
} }
func (tt *generalizedTableTx) Begin(tx *sql.Tx) error { func (tt *syncTableTx) Begin(tx *sql.Tx) error {
var err error var err error
if tx == nil { if tx == nil {
tx, err = tt.Pg.Db.Begin() tx, err = tt.Pg.Db.Begin()
@ -186,15 +163,15 @@ func (tt *generalizedTableTx) Begin(tx *sql.Tx) error {
return nil return nil
} }
func (tt *generalizedTableTx) Insert(row []interface{}) error { func (tt *syncTableTx) Insert(row []interface{}) error {
_, err := tt.InsertStmt.Exec(row[0]) _, err := tt.InsertStmt.Exec(row...)
if err != nil { if err != nil {
return &SQLInsertError{SQLError{tt.InsertSql, err}, row} return &SQLInsertError{SQLError{tt.InsertSql, err}, row}
} }
return nil return nil
} }
func (tt *generalizedTableTx) Delete(id int64) error { func (tt *syncTableTx) Delete(id int64) error {
_, err := tt.DeleteStmt.Exec(id) _, err := tt.DeleteStmt.Exec(id)
if err != nil { if err != nil {
return &SQLInsertError{SQLError{tt.DeleteSql, err}, id} return &SQLInsertError{SQLError{tt.DeleteSql, err}, id}
@ -202,10 +179,10 @@ func (tt *generalizedTableTx) Delete(id int64) error {
return nil return nil
} }
func (tt *generalizedTableTx) End() { func (tt *syncTableTx) End() {
} }
func (tt *generalizedTableTx) Commit() error { func (tt *syncTableTx) Commit() error {
err := tt.Tx.Commit() err := tt.Tx.Commit()
if err != nil { if err != nil {
return err return err
@ -214,6 +191,6 @@ func (tt *generalizedTableTx) Commit() error {
return nil return nil
} }
func (tt *generalizedTableTx) Rollback() { func (tt *syncTableTx) Rollback() {
rollbackIfTx(&tt.Tx) rollbackIfTx(&tt.Tx)
} }