first import with single transaction per table

master
Oliver Tonnhofer 2013-06-11 14:12:45 +02:00
parent aa2c24b8d2
commit 8a9fe9bf2f
8 changed files with 239 additions and 50 deletions

View File

@ -12,8 +12,15 @@ type Config struct {
}
type DB interface {
Begin() error
End() error
Abort() error
Init() error
InsertBatch(string, [][]interface{}) error
RowInserter
}
type RowInserter interface {
Insert(string, []interface{})
}
type Deployer interface {
@ -64,8 +71,11 @@ func ConnectionType(param string) string {
type NullDb struct{}
func (n *NullDb) Init() error { return nil }
func (n *NullDb) InsertBatch(string, [][]interface{}) error { return nil }
func (n *NullDb) Init() error { return nil }
func (n *NullDb) Begin() error { return nil }
func (n *NullDb) End() error { return nil }
func (n *NullDb) Abort() error { return nil }
func (n *NullDb) Insert(string, []interface{}) {}
func NewNullDb(conf Config, m *mapping.Mapping) (DB, error) {
return &NullDb{}, nil

View File

@ -0,0 +1,74 @@
package postgis
import (
"sync"
)
type InsertElement struct {
Table string
Row []interface{}
}
type InsertBuffer struct {
In chan InsertElement
Tables map[string]*TableTx
wg *sync.WaitGroup
}
func NewInsertBuffer(pg *PostGIS) *InsertBuffer {
ib := InsertBuffer{
In: make(chan InsertElement, 64),
Tables: make(map[string]*TableTx),
wg: &sync.WaitGroup{},
}
ib.wg.Add(1)
for tableName, table := range pg.Tables {
tt := pg.NewTableTx(table)
err := tt.Begin()
if err != nil {
panic(err) // TODO
}
ib.Tables[tableName] = tt
}
go ib.loop()
return &ib
}
func (ib *InsertBuffer) End() error {
ib.Close()
for _, tt := range ib.Tables {
if err := tt.Commit(); err != nil {
return err
}
}
return nil
}
func (ib *InsertBuffer) Abort() error {
ib.Close()
for _, tt := range ib.Tables {
tt.Rollback()
}
return nil
}
func (ib *InsertBuffer) Close() {
close(ib.In)
ib.wg.Wait()
}
func (ib *InsertBuffer) Insert(table string, row []interface{}) {
ib.In <- InsertElement{table, row}
}
func (ib *InsertBuffer) loop() {
for elem := range ib.In {
tt, ok := ib.Tables[elem.Table]
if !ok {
panic("unknown table " + elem.Table)
}
tt.Insert(elem.Row)
}
ib.wg.Done()
}

View File

@ -31,17 +31,17 @@ func (e *SQLInsertError) Error() string {
return fmt.Sprintf("SQL Error: %s in query %s (%+v)", e.originalError.Error(), e.query, e.data)
}
func (pg *PostGIS) createTable(spec TableSpec) error {
func createTable(tx *sql.Tx, spec TableSpec) error {
var sql string
var err error
sql = fmt.Sprintf(`DROP TABLE IF EXISTS "%s"."%s"`, spec.Schema, spec.Name)
_, err = pg.Db.Exec(sql)
_, err = tx.Exec(sql)
if err != nil {
return &SQLError{sql, err}
}
sql = spec.CreateTableSQL()
_, err = pg.Db.Exec(sql)
_, err = tx.Exec(sql)
if err != nil {
return &SQLError{sql, err}
}
@ -51,7 +51,7 @@ func (pg *PostGIS) createTable(spec TableSpec) error {
}
sql = fmt.Sprintf("SELECT AddGeometryColumn('%s', '%s', 'geometry', %d, '%s', 2);",
spec.Schema, spec.Name, spec.Srid, geomType)
row := pg.Db.QueryRow(sql)
row := tx.QueryRow(sql)
var void interface{}
err = row.Scan(&void)
if err != nil {
@ -159,11 +159,21 @@ func (pg *PostGIS) Init() error {
return err
}
tx, err := pg.Db.Begin()
if err != nil {
return err
}
defer rollbackIfTx(&tx)
for _, spec := range pg.Tables {
if err := pg.createTable(*spec); err != nil {
if err := createTable(tx, *spec); err != nil {
return err
}
}
err = tx.Commit()
if err != nil {
return err
}
tx = nil
return nil
}
@ -343,6 +353,7 @@ type PostGIS struct {
Tables map[string]*TableSpec
GeneralizedTables map[string]*GeneralizedTableSpec
Prefix string
InputBuffer *InsertBuffer
}
func (pg *PostGIS) Open() error {
@ -364,6 +375,91 @@ func (pg *PostGIS) Open() error {
return nil
}
func (pg *PostGIS) Insert(table string, row []interface{}) {
pg.InputBuffer.Insert(table, row)
}
func (pg *PostGIS) Begin() error {
pg.InputBuffer = NewInsertBuffer(pg)
return nil
}
func (pg *PostGIS) Abort() error {
return pg.InputBuffer.Abort()
}
func (pg *PostGIS) End() error {
return pg.InputBuffer.End()
}
type TableTx struct {
Pg *PostGIS
Tx *sql.Tx
Table string
Spec *TableSpec
Stmt *sql.Stmt
Sql string
}
func (tt *TableTx) Begin() error {
tx, err := tt.Pg.Db.Begin()
if err != nil {
return err
}
tt.Tx = tx
tt.Sql = tt.Spec.InsertSQL()
stmt, err := tt.Tx.Prepare(tt.Sql)
if err != nil {
return &SQLError{tt.Sql, err}
}
tt.Stmt = stmt
return nil
}
func (tt *TableTx) Insert(row []interface{}) error {
_, err := tt.Stmt.Exec(row...)
if err != nil {
return &SQLInsertError{SQLError{tt.Sql, err}, row}
}
return nil
}
func (tt *TableTx) Delete(id int64) error {
sql := tt.Spec.DeleteSQL()
stmt, err := tt.Tx.Prepare(sql)
if err != nil {
return &SQLError{sql, err}
}
defer stmt.Close()
_, err = stmt.Exec(id)
if err != nil {
return &SQLInsertError{SQLError{sql, err}, id}
}
return nil
}
func (tt *TableTx) Commit() error {
err := tt.Tx.Commit()
if err != nil {
return err
}
tt.Tx = nil
return nil
}
func (tt *TableTx) Rollback() {
rollbackIfTx(&tt.Tx)
}
func (pg *PostGIS) NewTableTx(spec *TableSpec) *TableTx {
return &TableTx{
Pg: pg,
Table: spec.Name,
Spec: spec,
}
}
func New(conf database.Config, m *mapping.Mapping) (database.DB, error) {
db := &PostGIS{}
db.Tables = make(map[string]*TableSpec)

View File

@ -185,6 +185,11 @@ func main() {
if err != nil {
die(err)
}
err = db.Begin()
if err != nil {
die(err)
}
var diffCache *cache.DiffCache
if *diff {
@ -197,16 +202,13 @@ func main() {
}
}
insertBuffer := writer.NewInsertBuffer()
dbWriter := writer.NewDbWriter(db, insertBuffer.Out)
pointsTagMatcher := tagmapping.PointMatcher()
lineStringsTagMatcher := tagmapping.LineStringMatcher()
polygonsTagMatcher := tagmapping.PolygonMatcher()
relations := osmCache.Relations.Iter()
relWriter := writer.NewRelationWriter(osmCache, diffCache, relations,
insertBuffer, polygonsTagMatcher, progress)
db, polygonsTagMatcher, progress)
relWriter.SetClipper(geometryClipper)
relWriter.Start()
@ -214,7 +216,7 @@ func main() {
relWriter.Close()
ways := osmCache.Ways.Iter()
wayWriter := writer.NewWayWriter(osmCache, diffCache, ways, insertBuffer,
wayWriter := writer.NewWayWriter(osmCache, diffCache, ways, db,
lineStringsTagMatcher, polygonsTagMatcher, progress)
wayWriter.SetClipper(geometryClipper)
wayWriter.Start()
@ -223,7 +225,7 @@ func main() {
wayWriter.Close()
nodes := osmCache.Nodes.Iter()
nodeWriter := writer.NewNodeWriter(osmCache, nodes, insertBuffer,
nodeWriter := writer.NewNodeWriter(osmCache, nodes, db,
pointsTagMatcher, progress)
nodeWriter.SetClipper(geometryClipper)
nodeWriter.Start()
@ -231,8 +233,13 @@ func main() {
// blocks till the Nodes.Iter() finishes
nodeWriter.Close()
insertBuffer.Close()
dbWriter.Close()
err = db.End()
if err != nil {
die(err)
}
// insertBuffer.Close()
// dbWriter.Close()
progress.Stop()
if *diff {

View File

@ -2,6 +2,7 @@ package writer
import (
"goposm/cache"
"goposm/database"
"goposm/element"
"goposm/geom"
"goposm/geom/geos"
@ -19,7 +20,7 @@ type NodeWriter struct {
}
func NewNodeWriter(osmCache *cache.OSMCache, nodes chan *element.Node,
insertBuffer *InsertBuffer, tagMatcher *mapping.TagMatcher, progress *stats.Statistics) *OsmElemWriter {
insertBuffer database.RowInserter, tagMatcher *mapping.TagMatcher, progress *stats.Statistics) *OsmElemWriter {
nw := NodeWriter{
OsmElemWriter: OsmElemWriter{
osmCache: osmCache,

View File

@ -3,6 +3,7 @@ package writer
import (
"fmt"
"goposm/cache"
"goposm/database"
"goposm/element"
"goposm/geom"
"goposm/geom/geos"
@ -20,7 +21,7 @@ type RelationWriter struct {
}
func NewRelationWriter(osmCache *cache.OSMCache, diffCache *cache.DiffCache, rel chan *element.Relation,
insertBuffer *InsertBuffer, tagMatcher *mapping.TagMatcher, progress *stats.Statistics) *OsmElemWriter {
insertBuffer database.RowInserter, tagMatcher *mapping.TagMatcher, progress *stats.Statistics) *OsmElemWriter {
rw := RelationWriter{
OsmElemWriter: OsmElemWriter{
osmCache: osmCache,

View File

@ -2,6 +2,7 @@ package writer
import (
"goposm/cache"
"goposm/database"
"goposm/element"
"goposm/geom"
"goposm/geom/geos"
@ -20,7 +21,7 @@ type WayWriter struct {
}
func NewWayWriter(osmCache *cache.OSMCache, diffCache *cache.DiffCache, ways chan *element.Way,
insertBuffer *InsertBuffer, lineStringTagMatcher *mapping.TagMatcher,
insertBuffer database.RowInserter, lineStringTagMatcher *mapping.TagMatcher,
polygonTagMatcher *mapping.TagMatcher, progress *stats.Statistics) *OsmElemWriter {
ww := WayWriter{
OsmElemWriter: OsmElemWriter{

View File

@ -7,7 +7,6 @@ import (
"goposm/geom/clipper"
"goposm/mapping"
"goposm/stats"
"log"
"runtime"
"sync"
)
@ -16,38 +15,38 @@ type ErrorLevel interface {
Level() int
}
type DbWriter struct {
Db database.DB
In chan InsertBatch
wg *sync.WaitGroup
}
// type DbWriter struct {
// Db database.DB
// In chan InsertBatch
// wg *sync.WaitGroup
// }
func NewDbWriter(db database.DB, in chan InsertBatch) *DbWriter {
dw := DbWriter{
Db: db,
In: in,
wg: &sync.WaitGroup{},
}
for i := 0; i < runtime.NumCPU(); i++ {
dw.wg.Add(1)
go dw.loop()
}
return &dw
}
// func NewDbWriter(db database.DB, in chan InsertBatch) *DbWriter {
// dw := DbWriter{
// Db: db,
// In: in,
// wg: &sync.WaitGroup{},
// }
// for i := 0; i < runtime.NumCPU(); i++ {
// dw.wg.Add(1)
// go dw.loop()
// }
// return &dw
// }
func (dw *DbWriter) Close() {
dw.wg.Wait()
}
// func (dw *DbWriter) Close() {
// dw.wg.Wait()
// }
func (dw *DbWriter) loop() {
for batch := range dw.In {
err := dw.Db.InsertBatch(batch.Table, batch.Rows)
if err != nil {
log.Println(err)
}
}
dw.wg.Done()
}
// func (dw *DbWriter) loop() {
// for batch := range dw.In {
// err := dw.Db.InsertBatch(batch.Table, batch.Rows)
// if err != nil {
// log.Println(err)
// }
// }
// dw.wg.Done()
// }
type looper interface {
loop()
@ -57,7 +56,7 @@ type OsmElemWriter struct {
osmCache *cache.OSMCache
diffCache *cache.DiffCache
progress *stats.Statistics
insertBuffer *InsertBuffer
insertBuffer database.RowInserter
wg *sync.WaitGroup
clipper *clipper.Clipper
writer looper