first postgis writer

master
Oliver Tonnhofer 2013-05-08 16:45:14 +02:00
parent 2f8a72777c
commit e517f5f3fc
5 changed files with 349 additions and 41 deletions

View File

@ -86,6 +86,7 @@ func UnmarshalNode(data []byte) (node *element.Node, err error) {
}
func MarshalWay(way *element.Way) ([]byte, error) {
// TODO reuse Way to avoid make(Tags) for each way in TagsAsArray
pbfWay := &Way{}
pbfWay.Id = &way.Id
pbfWay.Refs = way.Refs

View File

@ -1,49 +1,294 @@
package main
package db
import (
"database/sql"
"fmt"
_ "github.com/bmizerany/pq"
"goposm/element"
"goposm/geom"
"log"
"strings"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
db, err := sql.Open("postgres", "user=olt host=/var/run/postgresql dbname=osm sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
_, err = db.Exec("DROP TABLE IF EXISTS test")
if err != nil {
log.Fatal(err)
}
_, err = db.Exec("CREATE TABLE IF NOT EXISTS test (val VARCHAR);")
if err != nil {
log.Fatal(err)
}
_, err = db.Query("SELECT AddGeometryColumn('test', 'geom', 3857, 'LINESTRING', 2);")
if err != nil {
log.Fatal(err)
}
size := 16
nodes := make([]element.Node, size)
for i := 0; i < size; i++ {
nodes[i] = element.Node{Lat: 0, Long: float64(i)}
}
wkb := geom.LineString(nodes)
stmt, err := db.Prepare("INSERT INTO test (val, geom) VALUES ($1, ST_SetSRID(ST_GeomFromWKB($2), 3857));")
if err != nil {
log.Fatal(err)
}
_, err = stmt.Exec("test", wkb)
if err != nil {
log.Fatal(err)
}
type Config struct {
Type string
ConnectionParams string
Srid int
Schema string
}
type DB interface {
InsertWays([]element.Way, TableSpec) error
Init(specs []TableSpec) error
}
type ColumnSpec struct {
Name string
Type string
}
type TableSpec struct {
Name string
Schema string
Columns []ColumnSpec
GeometryType string
Srid int
}
func (col *ColumnSpec) AsSQL() string {
return fmt.Sprintf("\"%s\" %s", col.Name, col.Type)
}
func (spec *TableSpec) CreateTableSQL() string {
cols := []string{
"id SERIAL PRIMARY KEY",
"osm_id BIGINT",
}
for _, col := range spec.Columns {
cols = append(cols, col.AsSQL())
}
columnSQL := strings.Join(cols, ",\n")
return fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS "%s"."%s" (
%s
);`,
spec.Schema,
spec.Name,
columnSQL,
)
}
func (spec *TableSpec) WayValues(way element.Way) []interface{} {
values := make([]interface{}, 0, len(spec.Columns)+2)
values = append(values, way.Id)
values = append(values, way.Wkb)
for _, col := range spec.Columns {
v, ok := way.Tags[col.Name]
if !ok {
values = append(values, nil)
} else {
values = append(values, v)
}
}
return values
}
func (spec *TableSpec) InsertSQL() string {
cols := []string{"osm_id", "geometry"}
vars := []string{
"$1",
fmt.Sprintf("ST_GeomFromWKB($2, %d)", spec.Srid),
}
for i, col := range spec.Columns {
cols = append(cols, col.Name)
vars = append(vars, fmt.Sprintf("$%d", i+3))
}
columns := strings.Join(cols, ", ")
placeholders := strings.Join(vars, ", ")
return fmt.Sprintf(`INSERT INTO "%s"."%s" (%s) VALUES (%s)`,
spec.Schema,
spec.Name,
columns,
placeholders,
)
}
type SQLError struct {
query string
originalError error
}
func (e *SQLError) Error() string {
return fmt.Sprintf("SQL Error: %s in query %s", e.originalError.Error(), e.query)
}
func (pg *PostGIS) createTable(spec TableSpec) error {
var sql string
var err error
sql = fmt.Sprintf(`DROP TABLE IF EXISTS "%s"."%s"`, spec.Schema, spec.Name)
_, err = pg.Db.Exec(sql)
if err != nil {
return &SQLError{sql, err}
}
sql = spec.CreateTableSQL()
log.Println(sql)
_, err = pg.Db.Exec(sql)
if err != nil {
return &SQLError{sql, err}
}
sql = fmt.Sprintf("SELECT AddGeometryColumn('%s', '%s', 'geometry', %d, '%s', 2);",
spec.Schema, spec.Name, spec.Srid, spec.GeometryType)
_, err = pg.Db.Query(sql)
if err != nil {
return &SQLError{sql, err}
}
return nil
}
func (pg *PostGIS) createSchema() error {
var sql string
var err error
sql = fmt.Sprintf("CREATE SCHEMA \"%s\"", pg.Config.Schema)
_, err = pg.Db.Exec(sql)
if err != nil {
return &SQLError{sql, err}
}
return nil
}
type PostGIS struct {
Db *sql.DB
Config Config
}
func (pg *PostGIS) Open() error {
var err error
pg.Db, err = sql.Open("postgres", "user=olt host=localhost dbname=olt sslmode=disable")
return err
}
func (pg *PostGIS) WayInserter(spec TableSpec, ways chan []element.Way) error {
for ws := range ways {
err := pg.InsertWays(ws, spec)
if err != nil {
return err
}
}
return nil
}
func (pg *PostGIS) InsertWays(ways []element.Way, spec TableSpec) error {
tx, err := pg.Db.Begin()
if err != nil {
return err
}
defer func() {
if tx != nil {
if err := tx.Rollback(); err != nil {
log.Println("rollback failed", err)
}
}
}()
sql := spec.InsertSQL()
stmt, err := tx.Prepare(sql)
if err != nil {
return &SQLError{sql, err}
}
for _, w := range ways {
_, err := stmt.Exec(spec.WayValues(w)...)
if err != nil {
return &SQLError{sql, err}
}
}
err = tx.Commit()
if err != nil {
return err
}
tx = nil
return nil
}
func (pg *PostGIS) Init(specs []TableSpec) error {
if err := pg.createSchema(); err != nil {
return err
}
for _, spec := range specs {
if err := pg.createTable(spec); err != nil {
return err
}
}
return nil
}
func Open(conf Config) (DB, error) {
if conf.Type != "postgres" {
panic("unsupported database type: " + conf.Type)
}
db := &PostGIS{}
db.Config = conf
err := db.Open()
if err != nil {
return nil, err
}
return db, nil
}
// func InitDb() {
// rawDb, err := sql.Open("postgres", "user=olt host=localhost dbname=olt sslmode=disable")
// if err != nil {
// log.Fatal(err)
// }
// defer rawDb.Close()
// pg := PostGIS{rawDb, "public"}
// pg.createSchema()
// spec := TableSpec{
// "goposm_test",
// pg.Schema,
// []ColumnSpec{
// {"name", "VARCHAR"},
// {"highway", "VARCHAR"},
// },
// "LINESTRING",
// 3857,
// }
// err = pg.createTable(spec)
// if err != nil {
// log.Fatal(err)
// }
// }
// func InsertWays(ways chan []element.Way, wg *sync.WaitGroup) {
// wg.Add(1)
// defer wg.Done()
// rawDb, err := sql.Open("postgres", "user=olt host=localhost dbname=olt sslmode=disable")
// if err != nil {
// log.Fatal(err)
// }
// defer rawDb.Close()
// pg := PostGIS{rawDb, "public"}
// spec := TableSpec{
// "goposm_test",
// pg.Schema,
// []ColumnSpec{
// {"name", "VARCHAR"},
// {"highway", "VARCHAR"},
// },
// "LINESTRING",
// 3857,
// }
// for ws := range ways {
// err = pg.insertWays(ws, spec)
// if err != nil {
// log.Fatal(err)
// }
// }
// }
// func main() {
// wayChan := make(chan element.Way)
// wg := &sync.WaitGroup{}
// go InsertWays(wayChan, wg)
// ways := []element.Way{
// {OSMElem: element.OSMElem{1234, element.Tags{"name": "Foo"}}, Wkb: []byte{0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0}},
// // {OSMElem: element.OSMElem{6666, element.Tags{"name": "Baz", "type": "motorway"}}},
// // {OSMElem: element.OSMElem{9999, element.Tags{"name": "Bar", "type": "bar"}}},
// }
// for _, w := range ways {
// wayChan <- w
// }
// close(wayChan)
// wg.Wait()
// }

View File

@ -17,6 +17,7 @@ type Way struct {
OSMElem
Refs []int64
Nodes []Node
Wkb []byte
}
type MemberType int

View File

@ -4,9 +4,13 @@ import (
"flag"
"fmt"
"goposm/cache"
"goposm/db"
"goposm/element"
"goposm/geom"
"goposm/geom/geos"
"goposm/mapping"
"goposm/parser"
"goposm/proj"
"goposm/stats"
"log"
"os"
@ -197,26 +201,76 @@ func main() {
}
waitFill := sync.WaitGroup{}
wayChan := make(chan []element.Way)
waitDb := &sync.WaitGroup{}
config := db.Config{"postgres", "user=olt host=localhost dbname=olt sslmode=disable", 3857, "public"}
pg, err := db.Open(config)
if err != nil {
log.Fatal(err)
}
specs := []db.TableSpec{
{
"goposm_test",
config.Schema,
[]db.ColumnSpec{
{"name", "VARCHAR"},
{"highway", "VARCHAR"},
},
"LINESTRING",
config.Srid,
},
}
pg.Init(specs)
for i := 0; i < runtime.NumCPU(); i++ {
waitDb.Add(1)
go func() {
for ways := range wayChan {
pg.InsertWays(ways, specs[0])
}
waitDb.Done()
}()
}
for i := 0; i < runtime.NumCPU(); i++ {
waitFill.Add(1)
go func() {
geos := geos.NewGEOS()
defer geos.Finish()
batch := make([]element.Way, 0, 10*1024)
for w := range way {
progress.AddWays(1)
ok := osmCache.Coords.FillWay(w)
if !ok {
continue
}
proj.NodesToMerc(w.Nodes)
w.Wkb, err = geom.LineStringWKB(geos, w.Nodes)
if err != nil {
log.Println(err)
continue
}
batch = append(batch, *w)
if len(batch) >= 10*1024 {
wayChan <- batch
batch = make([]element.Way, 0, 10*1024)
}
if true {
for _, node := range w.Nodes {
diffCache.Coords.Add(node.Id, w.Id)
}
}
}
wayChan <- batch
waitFill.Done()
}()
}
waitFill.Wait()
close(wayChan)
waitDb.Wait()
}
//parser.PBFStats(os.Args[1])

View File

@ -1,6 +1,7 @@
package proj
import (
"goposm/element"
"math"
)
@ -17,3 +18,9 @@ func mercToWgs(x, y float64) (long, lat float64) {
lat = 180.0 / math.Pi * (2*math.Atan(math.Exp((y/pole)*math.Pi)) - math.Pi/2)
return long, lat
}
func NodesToMerc(nodes []element.Node) {
for _, nd := range nodes {
nd.Long, nd.Lat = wgsToMerc(nd.Long, nd.Lat)
}
}