wip: update diff importer for single transaction imports

master
Oliver Tonnhofer 2013-06-21 11:04:53 +02:00
parent c8492b06f7
commit 509ee23db9
3 changed files with 57 additions and 65 deletions

View File

@ -1,27 +1,28 @@
package postgis
import (
"sync"
)
type InsertElement struct {
Table string
Row []interface{}
}
type DeleteElement struct {
Table string
Id int64
}
type InsertBuffer struct {
In chan InsertElement
Tables map[string]*TableTx
wg *sync.WaitGroup
insertc chan InsertElement
deletec chan DeleteElement
done chan bool
Tables map[string]*TableTx
}
func NewInsertBuffer(pg *PostGIS, bulkImport bool) *InsertBuffer {
ib := InsertBuffer{
In: make(chan InsertElement),
Tables: make(map[string]*TableTx),
wg: &sync.WaitGroup{},
insertc: make(chan InsertElement),
deletec: make(chan DeleteElement),
done: make(chan bool),
Tables: make(map[string]*TableTx),
}
ib.wg.Add(1)
for tableName, table := range pg.Tables {
tt := pg.NewTableTx(table, bulkImport)
err := tt.Begin()
@ -54,21 +55,34 @@ func (ib *InsertBuffer) Abort() error {
}
func (ib *InsertBuffer) Close() {
close(ib.In)
ib.wg.Wait()
ib.done <- true
}
func (ib *InsertBuffer) Insert(table string, row []interface{}) {
ib.In <- InsertElement{table, row}
ib.insertc <- InsertElement{table, row}
}
func (ib *InsertBuffer) Delete(table string, id int64) {
ib.deletec <- DeleteElement{table, id}
}
func (ib *InsertBuffer) loop() {
for elem := range ib.In {
tt, ok := ib.Tables[elem.Table]
if !ok {
panic("unknown table " + elem.Table)
for {
select {
case elem := <-ib.insertc:
tt, ok := ib.Tables[elem.Table]
if !ok {
panic("unknown table " + elem.Table)
}
tt.Insert(elem.Row)
case elem := <-ib.deletec:
tt, ok := ib.Tables[elem.Table]
if !ok {
panic("unknown table " + elem.Table)
}
tt.Delete(elem.Id)
case <-ib.done:
return
}
tt.Insert(elem.Row)
}
ib.wg.Done()
}

View File

@ -124,38 +124,6 @@ func (pg *PostGIS) InsertBatch(table string, rows [][]interface{}) error {
return nil
}
func (pg *PostGIS) Delete(table string, id int64) error {
spec, ok := pg.Tables[table]
if !ok {
return errors.New("unkown table: " + table)
}
tx, err := pg.Db.Begin()
if err != nil {
return err
}
defer rollbackIfTx(&tx)
sql := spec.DeleteSQL()
stmt, err := tx.Prepare(sql)
if err != nil {
return &SQLError{sql, err}
}
defer stmt.Close()
_, err = stmt.Exec(id)
if err != nil {
return &SQLInsertError{SQLError{sql, err}, id}
}
err = tx.Commit()
if err != nil {
return err
}
tx = nil // set nil to prevent rollback
return nil
}
func (pg *PostGIS) Init() error {
if err := pg.createSchema(pg.Schema); err != nil {
return err
@ -439,6 +407,11 @@ func (pg *PostGIS) Insert(table string, row []interface{}) {
pg.InputBuffer.Insert(table, row)
}
func (pg *PostGIS) Delete(table string, id int64) error {
pg.InputBuffer.Delete(table, id)
return nil
}
func (pg *PostGIS) Begin() error {
pg.InputBuffer = NewInsertBuffer(pg, false)
return nil
@ -477,9 +450,12 @@ func (tt *TableTx) Begin() error {
return err
}
tt.Tx = tx
_, err = tx.Exec(fmt.Sprintf(`TRUNCATE TABLE "%s"."%s" RESTART IDENTITY`, tt.Pg.Schema, tt.Table))
if err != nil {
return err
if tt.bulkImport {
_, err = tx.Exec(fmt.Sprintf(`TRUNCATE TABLE "%s"."%s" RESTART IDENTITY`, tt.Pg.Schema, tt.Table))
if err != nil {
return err
}
}
if tt.bulkImport {

View File

@ -54,6 +54,11 @@ func main() {
log.Fatal(err)
}
err = db.Begin()
if err != nil {
log.Fatal(err)
}
delDb, ok := db.(database.Deleter)
if !ok {
log.Fatal("database not deletable")
@ -75,9 +80,6 @@ func main() {
wayTagFilter := tagmapping.WayTagFilter()
nodeTagFilter := tagmapping.NodeTagFilter()
insertBuffer := writer.NewInsertBuffer()
dbWriter := writer.NewDbWriter(db, insertBuffer.Out)
pointsTagMatcher := tagmapping.PointMatcher()
lineStringsTagMatcher := tagmapping.LineStringMatcher()
polygonsTagMatcher := tagmapping.PolygonMatcher()
@ -86,18 +88,20 @@ func main() {
ways := make(chan *element.Way)
nodes := make(chan *element.Node)
srid := 3857 // TODO
relWriter := writer.NewRelationWriter(osmCache, diffCache, relations,
insertBuffer, polygonsTagMatcher, progress)
db, polygonsTagMatcher, progress, srid)
relWriter.SetClipper(geometryClipper)
relWriter.Start()
wayWriter := writer.NewWayWriter(osmCache, diffCache, ways, insertBuffer,
lineStringsTagMatcher, polygonsTagMatcher, progress)
wayWriter := writer.NewWayWriter(osmCache, diffCache, ways, db,
lineStringsTagMatcher, polygonsTagMatcher, progress, srid)
wayWriter.SetClipper(geometryClipper)
wayWriter.Start()
nodeWriter := writer.NewNodeWriter(osmCache, nodes, insertBuffer,
pointsTagMatcher, progress)
nodeWriter := writer.NewNodeWriter(osmCache, nodes, db,
pointsTagMatcher, progress, srid)
nodeWriter.SetClipper(geometryClipper)
nodeWriter.Start()
@ -183,8 +187,6 @@ For:
relWriter.Close()
wayWriter.Close()
insertBuffer.Close()
dbWriter.Close()
progress.Stop()
osmCache.Coords.Flush()
osmCache.Close()