support non-bulk imports for diff mode

master
Oliver Tonnhofer 2013-06-21 10:33:49 +02:00
parent cf984157dd
commit c8492b06f7
5 changed files with 70 additions and 38 deletions

View File

@ -19,6 +19,10 @@ type DB interface {
RowInserter
}
type BulkBeginner interface {
BeginBulk() error
}
type RowInserter interface {
Insert(string, []interface{})
}

View File

@ -15,7 +15,7 @@ type InsertBuffer struct {
wg *sync.WaitGroup
}
func NewInsertBuffer(pg *PostGIS) *InsertBuffer {
func NewInsertBuffer(pg *PostGIS, bulkImport bool) *InsertBuffer {
ib := InsertBuffer{
In: make(chan InsertElement),
Tables: make(map[string]*TableTx),
@ -23,7 +23,7 @@ func NewInsertBuffer(pg *PostGIS) *InsertBuffer {
}
ib.wg.Add(1)
for tableName, table := range pg.Tables {
tt := pg.NewTableTx(table)
tt := pg.NewTableTx(table, bulkImport)
err := tt.Begin()
if err != nil {
panic(err) // TODO

View File

@ -36,8 +36,8 @@ func (t *geometryType) Name() string {
}
func (t *geometryType) PrepareInsertSql(i int, spec *TableSpec) string {
return fmt.Sprintf("ST_GeomFromWKB($%d, %d)",
i, spec.Srid,
return fmt.Sprintf("$%d::Geometry",
i,
)
}

View File

@ -440,7 +440,12 @@ func (pg *PostGIS) Insert(table string, row []interface{}) {
}
func (pg *PostGIS) Begin() error {
pg.InputBuffer = NewInsertBuffer(pg)
pg.InputBuffer = NewInsertBuffer(pg, false)
return nil
}
func (pg *PostGIS) BeginBulk() error {
pg.InputBuffer = NewInsertBuffer(pg, true)
return nil
}
@ -453,14 +458,17 @@ func (pg *PostGIS) End() error {
}
type TableTx struct {
Pg *PostGIS
Tx *sql.Tx
Table string
Spec *TableSpec
Stmt *sql.Stmt
Sql string
wg *sync.WaitGroup
rows chan []interface{}
Pg *PostGIS
Tx *sql.Tx
Table string
Spec *TableSpec
InsertStmt *sql.Stmt
DeleteStmt *sql.Stmt
InsertSql string
DeleteSql string
bulkImport bool
wg *sync.WaitGroup
rows chan []interface{}
}
func (tt *TableTx) Begin() error {
@ -473,12 +481,30 @@ func (tt *TableTx) Begin() error {
if err != nil {
return err
}
tt.Sql = tt.Spec.CopySQL()
stmt, err := tt.Tx.Prepare(tt.Sql)
if err != nil {
return &SQLError{tt.Sql, err}
if tt.bulkImport {
tt.InsertSql = tt.Spec.CopySQL()
} else {
tt.InsertSql = tt.Spec.InsertSQL()
}
tt.Stmt = stmt
stmt, err := tt.Tx.Prepare(tt.InsertSql)
if err != nil {
return &SQLError{tt.InsertSql, err}
}
tt.InsertStmt = stmt
if !tt.bulkImport {
// bulkImport creates COPY FROM STDIN stmt that doesn't
// permit other stmt
tt.DeleteSql = tt.Spec.DeleteSQL()
stmt, err = tt.Tx.Prepare(tt.DeleteSql)
if err != nil {
return &SQLError{tt.DeleteSql, err}
}
tt.DeleteStmt = stmt
}
return nil
}
@ -489,26 +515,22 @@ func (tt *TableTx) Insert(row []interface{}) error {
func (tt *TableTx) loop() {
for row := range tt.rows {
_, err := tt.Stmt.Exec(row...)
_, err := tt.InsertStmt.Exec(row...)
if err != nil {
// TODO
log.Fatal(&SQLInsertError{SQLError{tt.Sql, err}, row})
log.Fatal(&SQLInsertError{SQLError{tt.InsertSql, err}, row})
}
}
tt.wg.Done()
}
func (tt *TableTx) Delete(id int64) error {
sql := tt.Spec.DeleteSQL()
stmt, err := tt.Tx.Prepare(sql)
if err != nil {
return &SQLError{sql, err}
if tt.bulkImport {
panic("unable to delete in bulkImport mode")
}
defer stmt.Close()
_, err = stmt.Exec(id)
_, err := tt.DeleteStmt.Exec(id)
if err != nil {
return &SQLInsertError{SQLError{sql, err}, id}
return &SQLInsertError{SQLError{tt.DeleteSql, err}, id}
}
return nil
}
@ -516,8 +538,8 @@ func (tt *TableTx) Delete(id int64) error {
func (tt *TableTx) Commit() error {
close(tt.rows)
tt.wg.Wait()
if tt.Stmt != nil {
_, err := tt.Stmt.Exec()
if tt.bulkImport && tt.InsertStmt != nil {
_, err := tt.InsertStmt.Exec()
if err != nil {
return err
}
@ -534,13 +556,14 @@ func (tt *TableTx) Rollback() {
rollbackIfTx(&tt.Tx)
}
func (pg *PostGIS) NewTableTx(spec *TableSpec) *TableTx {
func (pg *PostGIS) NewTableTx(spec *TableSpec, bulkImport bool) *TableTx {
tt := &TableTx{
Pg: pg,
Table: spec.Name,
Spec: spec,
wg: &sync.WaitGroup{},
rows: make(chan []interface{}, 64),
Pg: pg,
Table: spec.Name,
Spec: spec,
wg: &sync.WaitGroup{},
rows: make(chan []interface{}, 64),
bulkImport: bulkImport,
}
tt.wg.Add(1)
go tt.loop()

View File

@ -176,12 +176,17 @@ func main() {
die(err)
}
err = db.Begin()
bulkDb, ok := db.(database.BulkBeginner)
if ok {
err = bulkDb.BeginBulk()
} else {
err = db.Begin()
}
if err != nil {
die(err)
}
var diffCache *cache.DiffCache
var diffCache *cache.DiffCache
if *diff {
diffCache = cache.NewDiffCache(*cachedir)
if err = diffCache.Remove(); err != nil {