From e517f5f3fc9be5d81720926e91b2664473f4f4a1 Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Wed, 8 May 2013 16:45:14 +0200 Subject: [PATCH] first postgis writer --- binary/serialize.go | 1 + db/postgis.go | 325 ++++++++++++++++++++++++++++++++++++++------ element/element.go | 1 + goposm.go | 56 +++++++- proj/proj.go | 7 + 5 files changed, 349 insertions(+), 41 deletions(-) diff --git a/binary/serialize.go b/binary/serialize.go index a6cdcaa..e53abb9 100644 --- a/binary/serialize.go +++ b/binary/serialize.go @@ -86,6 +86,7 @@ func UnmarshalNode(data []byte) (node *element.Node, err error) { } func MarshalWay(way *element.Way) ([]byte, error) { + // TODO reuse Way to avoid make(Tags) for each way in TagsAsArray pbfWay := &Way{} pbfWay.Id = &way.Id pbfWay.Refs = way.Refs diff --git a/db/postgis.go b/db/postgis.go index 28c0b4e..4692219 100644 --- a/db/postgis.go +++ b/db/postgis.go @@ -1,49 +1,294 @@ -package main +package db import ( "database/sql" + "fmt" _ "github.com/bmizerany/pq" "goposm/element" - "goposm/geom" "log" + "strings" ) -func main() { - log.SetFlags(log.LstdFlags | log.Lshortfile) - db, err := sql.Open("postgres", "user=olt host=/var/run/postgresql dbname=osm sslmode=disable") - if err != nil { - log.Fatal(err) - } - defer db.Close() - - _, err = db.Exec("DROP TABLE IF EXISTS test") - if err != nil { - log.Fatal(err) - } - _, err = db.Exec("CREATE TABLE IF NOT EXISTS test (val VARCHAR);") - if err != nil { - log.Fatal(err) - } - _, err = db.Query("SELECT AddGeometryColumn('test', 'geom', 3857, 'LINESTRING', 2);") - if err != nil { - log.Fatal(err) - } - - size := 16 - nodes := make([]element.Node, size) - for i := 0; i < size; i++ { - nodes[i] = element.Node{Lat: 0, Long: float64(i)} - } - wkb := geom.LineString(nodes) - - stmt, err := db.Prepare("INSERT INTO test (val, geom) VALUES ($1, ST_SetSRID(ST_GeomFromWKB($2), 3857));") - if err != nil { - log.Fatal(err) - } - - _, err = stmt.Exec("test", wkb) - if err != nil { - log.Fatal(err) - } - +type Config struct { + Type string + ConnectionParams string + Srid int + Schema string } + +type DB interface { + InsertWays([]element.Way, TableSpec) error + Init(specs []TableSpec) error +} + +type ColumnSpec struct { + Name string + Type string +} +type TableSpec struct { + Name string + Schema string + Columns []ColumnSpec + GeometryType string + Srid int +} + +func (col *ColumnSpec) AsSQL() string { + return fmt.Sprintf("\"%s\" %s", col.Name, col.Type) +} + +func (spec *TableSpec) CreateTableSQL() string { + cols := []string{ + "id SERIAL PRIMARY KEY", + "osm_id BIGINT", + } + for _, col := range spec.Columns { + cols = append(cols, col.AsSQL()) + } + columnSQL := strings.Join(cols, ",\n") + return fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS "%s"."%s" ( + %s + );`, + spec.Schema, + spec.Name, + columnSQL, + ) +} + +func (spec *TableSpec) WayValues(way element.Way) []interface{} { + values := make([]interface{}, 0, len(spec.Columns)+2) + values = append(values, way.Id) + values = append(values, way.Wkb) + for _, col := range spec.Columns { + v, ok := way.Tags[col.Name] + if !ok { + values = append(values, nil) + } else { + values = append(values, v) + } + } + return values +} + +func (spec *TableSpec) InsertSQL() string { + cols := []string{"osm_id", "geometry"} + vars := []string{ + "$1", + fmt.Sprintf("ST_GeomFromWKB($2, %d)", spec.Srid), + } + for i, col := range spec.Columns { + cols = append(cols, col.Name) + vars = append(vars, fmt.Sprintf("$%d", i+3)) + } + columns := strings.Join(cols, ", ") + placeholders := strings.Join(vars, ", ") + + return fmt.Sprintf(`INSERT INTO "%s"."%s" (%s) VALUES (%s)`, + spec.Schema, + spec.Name, + columns, + placeholders, + ) +} + +type SQLError struct { + query string + originalError error +} + +func (e *SQLError) Error() string { + return fmt.Sprintf("SQL Error: %s in query %s", e.originalError.Error(), e.query) +} + +func (pg *PostGIS) createTable(spec TableSpec) error { + var sql string + var err error + + sql = fmt.Sprintf(`DROP TABLE IF EXISTS "%s"."%s"`, spec.Schema, spec.Name) + _, err = pg.Db.Exec(sql) + if err != nil { + return &SQLError{sql, err} + } + + sql = spec.CreateTableSQL() + log.Println(sql) + _, err = pg.Db.Exec(sql) + if err != nil { + return &SQLError{sql, err} + } + sql = fmt.Sprintf("SELECT AddGeometryColumn('%s', '%s', 'geometry', %d, '%s', 2);", + spec.Schema, spec.Name, spec.Srid, spec.GeometryType) + _, err = pg.Db.Query(sql) + if err != nil { + return &SQLError{sql, err} + } + return nil +} + +func (pg *PostGIS) createSchema() error { + var sql string + var err error + + sql = fmt.Sprintf("CREATE SCHEMA \"%s\"", pg.Config.Schema) + _, err = pg.Db.Exec(sql) + if err != nil { + return &SQLError{sql, err} + } + return nil +} + +type PostGIS struct { + Db *sql.DB + Config Config +} + +func (pg *PostGIS) Open() error { + var err error + pg.Db, err = sql.Open("postgres", "user=olt host=localhost dbname=olt sslmode=disable") + return err +} + +func (pg *PostGIS) WayInserter(spec TableSpec, ways chan []element.Way) error { + for ws := range ways { + err := pg.InsertWays(ws, spec) + if err != nil { + return err + } + } + return nil +} + +func (pg *PostGIS) InsertWays(ways []element.Way, spec TableSpec) error { + tx, err := pg.Db.Begin() + if err != nil { + return err + } + defer func() { + if tx != nil { + if err := tx.Rollback(); err != nil { + log.Println("rollback failed", err) + } + } + }() + + sql := spec.InsertSQL() + stmt, err := tx.Prepare(sql) + if err != nil { + return &SQLError{sql, err} + } + + for _, w := range ways { + _, err := stmt.Exec(spec.WayValues(w)...) + if err != nil { + return &SQLError{sql, err} + } + } + + err = tx.Commit() + if err != nil { + return err + } + tx = nil + return nil +} + +func (pg *PostGIS) Init(specs []TableSpec) error { + if err := pg.createSchema(); err != nil { + return err + } + for _, spec := range specs { + if err := pg.createTable(spec); err != nil { + return err + } + } + return nil +} + +func Open(conf Config) (DB, error) { + if conf.Type != "postgres" { + panic("unsupported database type: " + conf.Type) + } + db := &PostGIS{} + db.Config = conf + err := db.Open() + if err != nil { + return nil, err + } + return db, nil +} + +// func InitDb() { +// rawDb, err := sql.Open("postgres", "user=olt host=localhost dbname=olt sslmode=disable") +// if err != nil { +// log.Fatal(err) +// } +// defer rawDb.Close() + +// pg := PostGIS{rawDb, "public"} +// pg.createSchema() + +// spec := TableSpec{ +// "goposm_test", +// pg.Schema, +// []ColumnSpec{ +// {"name", "VARCHAR"}, +// {"highway", "VARCHAR"}, +// }, +// "LINESTRING", +// 3857, +// } +// err = pg.createTable(spec) +// if err != nil { +// log.Fatal(err) +// } +// } + +// func InsertWays(ways chan []element.Way, wg *sync.WaitGroup) { +// wg.Add(1) +// defer wg.Done() + +// rawDb, err := sql.Open("postgres", "user=olt host=localhost dbname=olt sslmode=disable") +// if err != nil { +// log.Fatal(err) +// } +// defer rawDb.Close() + +// pg := PostGIS{rawDb, "public"} + +// spec := TableSpec{ +// "goposm_test", +// pg.Schema, +// []ColumnSpec{ +// {"name", "VARCHAR"}, +// {"highway", "VARCHAR"}, +// }, +// "LINESTRING", +// 3857, +// } + +// for ws := range ways { +// err = pg.insertWays(ws, spec) +// if err != nil { +// log.Fatal(err) +// } +// } +// } + +// func main() { +// wayChan := make(chan element.Way) +// wg := &sync.WaitGroup{} + +// go InsertWays(wayChan, wg) + +// ways := []element.Way{ +// {OSMElem: element.OSMElem{1234, element.Tags{"name": "Foo"}}, Wkb: []byte{0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0}}, +// // {OSMElem: element.OSMElem{6666, element.Tags{"name": "Baz", "type": "motorway"}}}, +// // {OSMElem: element.OSMElem{9999, element.Tags{"name": "Bar", "type": "bar"}}}, +// } +// for _, w := range ways { +// wayChan <- w +// } +// close(wayChan) +// wg.Wait() +// } diff --git a/element/element.go b/element/element.go index b50bd50..9281631 100644 --- a/element/element.go +++ b/element/element.go @@ -17,6 +17,7 @@ type Way struct { OSMElem Refs []int64 Nodes []Node + Wkb []byte } type MemberType int diff --git a/goposm.go b/goposm.go index 2c4d3e8..9866642 100644 --- a/goposm.go +++ b/goposm.go @@ -4,9 +4,13 @@ import ( "flag" "fmt" "goposm/cache" + "goposm/db" "goposm/element" + "goposm/geom" + "goposm/geom/geos" "goposm/mapping" "goposm/parser" + "goposm/proj" "goposm/stats" "log" "os" @@ -197,26 +201,76 @@ func main() { } waitFill := sync.WaitGroup{} + wayChan := make(chan []element.Way) + waitDb := &sync.WaitGroup{} + config := db.Config{"postgres", "user=olt host=localhost dbname=olt sslmode=disable", 3857, "public"} + pg, err := db.Open(config) + if err != nil { + log.Fatal(err) + } + specs := []db.TableSpec{ + { + "goposm_test", + config.Schema, + []db.ColumnSpec{ + {"name", "VARCHAR"}, + {"highway", "VARCHAR"}, + }, + "LINESTRING", + config.Srid, + }, + } + pg.Init(specs) + for i := 0; i < runtime.NumCPU(); i++ { + waitDb.Add(1) + go func() { + for ways := range wayChan { + pg.InsertWays(ways, specs[0]) + } + waitDb.Done() + }() + } + for i := 0; i < runtime.NumCPU(); i++ { waitFill.Add(1) - go func() { + geos := geos.NewGEOS() + defer geos.Finish() + + batch := make([]element.Way, 0, 10*1024) for w := range way { progress.AddWays(1) ok := osmCache.Coords.FillWay(w) if !ok { continue } + + proj.NodesToMerc(w.Nodes) + w.Wkb, err = geom.LineStringWKB(geos, w.Nodes) + if err != nil { + log.Println(err) + continue + } + batch = append(batch, *w) + + if len(batch) >= 10*1024 { + wayChan <- batch + batch = make([]element.Way, 0, 10*1024) + } + if true { for _, node := range w.Nodes { diffCache.Coords.Add(node.Id, w.Id) } } } + wayChan <- batch waitFill.Done() }() } waitFill.Wait() + close(wayChan) + waitDb.Wait() } //parser.PBFStats(os.Args[1]) diff --git a/proj/proj.go b/proj/proj.go index 96c9f80..d53a9e6 100644 --- a/proj/proj.go +++ b/proj/proj.go @@ -1,6 +1,7 @@ package proj import ( + "goposm/element" "math" ) @@ -17,3 +18,9 @@ func mercToWgs(x, y float64) (long, lat float64) { lat = 180.0 / math.Pi * (2*math.Atan(math.Exp((y/pole)*math.Pi)) - math.Pi/2) return long, lat } + +func NodesToMerc(nodes []element.Node) { + for _, nd := range nodes { + nd.Long, nd.Lat = wgsToMerc(nd.Long, nd.Lat) + } +}