insert into multiple tables

master
Oliver Tonnhofer 2013-05-14 16:15:35 +02:00
parent 8ea0572410
commit bcb5c9e9f3
5 changed files with 372 additions and 200 deletions

View File

@ -2,9 +2,10 @@ package db
import (
"database/sql"
"errors"
"fmt"
_ "github.com/bmizerany/pq"
"goposm/element"
"goposm/mapping"
"log"
"strings"
)
@ -17,14 +18,13 @@ type Config struct {
}
type DB interface {
InsertWays([]element.Way, TableSpec) error
Init(specs []TableSpec) error
Init(*mapping.Mapping) error
InsertBatch(string, [][]interface{}) error
}
type ColumnSpec struct {
Name string
Type string
Value func(string, map[string]string, interface{}) interface{}
Name string
Type string
}
type TableSpec struct {
Name string
@ -41,7 +41,7 @@ func (col *ColumnSpec) AsSQL() string {
func (spec *TableSpec) CreateTableSQL() string {
cols := []string{
"id SERIAL PRIMARY KEY",
"osm_id BIGINT",
// "osm_id BIGINT",
}
for _, col := range spec.Columns {
cols = append(cols, col.AsSQL())
@ -57,34 +57,18 @@ func (spec *TableSpec) CreateTableSQL() string {
)
}
// WayValues returns the positional values for inserting way into the table
// described by spec: the way's id, its WKB geometry, and then one value per
// entry of spec.Columns, in column order.
//
// A column whose name is not among the way's tags yields nil. Otherwise the
// tag value is passed through the column's Value function when one is set,
// and used as-is when it is not.
func (spec *TableSpec) WayValues(way element.Way) []interface{} {
	row := make([]interface{}, 0, len(spec.Columns)+2)
	row = append(row, way.Id, way.Wkb)
	for _, column := range spec.Columns {
		tagValue, present := way.Tags[column.Name]
		switch {
		case !present:
			row = append(row, nil)
		case column.Value != nil:
			row = append(row, column.Value(tagValue, way.Tags, way))
		default:
			row = append(row, tagValue)
		}
	}
	return row
}
func (spec *TableSpec) InsertSQL() string {
cols := []string{"osm_id", "geometry"}
vars := []string{
"$1",
fmt.Sprintf("ST_GeomFromWKB($2, %d)", spec.Srid),
cols := []string{
// "osm_id",
// "geometry",
}
for i, col := range spec.Columns {
vars := []string{
// "$1",
// fmt.Sprintf("ST_GeomFromWKB($2, %d)", spec.Srid),
}
for _, col := range spec.Columns {
cols = append(cols, col.Name)
vars = append(vars, fmt.Sprintf("$%d", i+3))
vars = append(vars, fmt.Sprintf("$%d", len(vars)+1))
}
columns := strings.Join(cols, ", ")
placeholders := strings.Join(vars, ", ")
@ -97,6 +81,20 @@ func (spec *TableSpec) InsertSQL() string {
)
}
// NewTableSpec builds the TableSpec for the mapping table t. The table name
// and geometry type come from t, the schema and SRID come from conf, and every
// field of t becomes a VARCHAR column named after the field's Key.
//
// NOTE(review): t.Fields is a map, so the resulting column order follows Go's
// unspecified map iteration order. Confirm that it lines up with the order in
// which row values are produced for this table.
func NewTableSpec(conf *Config, t *mapping.Table) *TableSpec {
	tableSpec := &TableSpec{
		Name:         t.Name,
		Schema:       conf.Schema,
		GeometryType: t.Type,
		Srid:         conf.Srid,
	}
	for _, field := range t.Fields {
		column := ColumnSpec{Name: field.Key, Type: "VARCHAR"}
		tableSpec.Columns = append(tableSpec.Columns, column)
	}
	return tableSpec
}
type SQLError struct {
query string
originalError error
@ -132,7 +130,9 @@ func (pg *PostGIS) createTable(spec TableSpec) error {
}
sql = fmt.Sprintf("SELECT AddGeometryColumn('%s', '%s', 'geometry', %d, '%s', 2);",
spec.Schema, spec.Name, spec.Srid, spec.GeometryType)
_, err = pg.Db.Query(sql)
row := pg.Db.QueryRow(sql)
var void interface{}
err = row.Scan(&void)
if err != nil {
return &SQLError{sql, err}
}
@ -170,6 +170,7 @@ func (pg *PostGIS) createSchema() error {
// PostGIS is the database backend returned by Open as a DB.
type PostGIS struct {
// Db is the database/sql handle used to run queries and open transactions.
Db *sql.DB
// Config is the configuration this backend was created with.
Config Config
// Tables maps a table name to its TableSpec. Init fills it from the mapping
// and InsertBatch looks the target table up in it by name.
Tables map[string]*TableSpec
}
func (pg *PostGIS) Open() error {
@ -189,17 +190,12 @@ func (pg *PostGIS) Open() error {
return nil
}
func (pg *PostGIS) WayInserter(spec TableSpec, ways chan []element.Way) error {
for ws := range ways {
err := pg.InsertWays(ws, spec)
if err != nil {
return err
}
func (pg *PostGIS) InsertBatch(table string, rows [][]interface{}) error {
spec, ok := pg.Tables[table]
if !ok {
return errors.New("unkown table: " + table)
}
return nil
}
func (pg *PostGIS) InsertWays(ways []element.Way, spec TableSpec) error {
tx, err := pg.Db.Begin()
if err != nil {
return err
@ -217,11 +213,12 @@ func (pg *PostGIS) InsertWays(ways []element.Way, spec TableSpec) error {
if err != nil {
return &SQLError{sql, err}
}
defer stmt.Close()
for _, w := range ways {
_, err := stmt.Exec(spec.WayValues(w)...)
for _, row := range rows {
_, err := stmt.Exec(row...)
if err != nil {
return &SQLInsertError{SQLError{sql, err}, spec.WayValues(w)}
return &SQLInsertError{SQLError{sql, err}, row}
}
}
@ -231,14 +228,19 @@ func (pg *PostGIS) InsertWays(ways []element.Way, spec TableSpec) error {
}
tx = nil
return nil
}
func (pg *PostGIS) Init(specs []TableSpec) error {
func (pg *PostGIS) Init(m *mapping.Mapping) error {
if err := pg.createSchema(); err != nil {
return err
}
for _, spec := range specs {
if err := pg.createTable(spec); err != nil {
for name, table := range m.Tables {
pg.Tables[name] = NewTableSpec(&pg.Config, table)
}
for _, spec := range pg.Tables {
if err := pg.createTable(*spec); err != nil {
return err
}
}
@ -250,6 +252,7 @@ func Open(conf Config) (DB, error) {
panic("unsupported database type: " + conf.Type)
}
db := &PostGIS{}
db.Tables = make(map[string]*TableSpec)
db.Config = conf
err := db.Open()
if err != nil {
@ -257,78 +260,3 @@ func Open(conf Config) (DB, error) {
}
return db, nil
}
// func InitDb() {
// rawDb, err := sql.Open("postgres", "user=olt host=localhost dbname=olt sslmode=disable")
// if err != nil {
// log.Fatal(err)
// }
// defer rawDb.Close()
// pg := PostGIS{rawDb, "public"}
// pg.createSchema()
// spec := TableSpec{
// "goposm_test",
// pg.Schema,
// []ColumnSpec{
// {"name", "VARCHAR"},
// {"highway", "VARCHAR"},
// },
// "LINESTRING",
// 3857,
// }
// err = pg.createTable(spec)
// if err != nil {
// log.Fatal(err)
// }
// }
// func InsertWays(ways chan []element.Way, wg *sync.WaitGroup) {
// wg.Add(1)
// defer wg.Done()
// rawDb, err := sql.Open("postgres", "user=olt host=localhost dbname=olt sslmode=disable")
// if err != nil {
// log.Fatal(err)
// }
// defer rawDb.Close()
// pg := PostGIS{rawDb, "public"}
// spec := TableSpec{
// "goposm_test",
// pg.Schema,
// []ColumnSpec{
// {"name", "VARCHAR"},
// {"highway", "VARCHAR"},
// },
// "LINESTRING",
// 3857,
// }
// for ws := range ways {
// err = pg.insertWays(ws, spec)
// if err != nil {
// log.Fatal(err)
// }
// }
// }
// func main() {
// wayChan := make(chan element.Way)
// wg := &sync.WaitGroup{}
// go InsertWays(wayChan, wg)
// ways := []element.Way{
// {OSMElem: element.OSMElem{1234, element.Tags{"name": "Foo"}}, Wkb: []byte{0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0}},
// // {OSMElem: element.OSMElem{6666, element.Tags{"name": "Baz", "type": "motorway"}}},
// // {OSMElem: element.OSMElem{9999, element.Tags{"name": "Bar", "type": "bar"}}},
// }
// for _, w := range ways {
// wayChan <- w
// }
// close(wayChan)
// wg.Wait()
// }

View File

@ -11,6 +11,7 @@ import (
"goposm/parser"
"goposm/proj"
"goposm/stats"
"goposm/writer"
"log"
"os"
"runtime"
@ -258,46 +259,38 @@ func main() {
if err != nil {
log.Fatal(err)
}
specs := []db.TableSpec{
{
"goposm_test",
conf.Schema,
[]db.ColumnSpec{
{"name", "VARCHAR", nil},
{"highway", "VARCHAR", nil},
{"oneway", "SMALLINT", mapping.Direction},
},
"GEOMETRY",
conf.Srid,
},
}
err = pg.Init(specs)
err = pg.Init(tagmapping)
if err != nil {
log.Fatal(err)
}
writeDBChan := make(chan writer.InsertBatch)
writeChan := make(chan writer.InsertElement)
waitBuffer := sync.WaitGroup{}
for i := 0; i < runtime.NumCPU(); i++ {
waitDb.Add(1)
go func() {
for ways := range wayChan {
err := pg.InsertWays(ways, specs[0])
if err != nil {
log.Fatal(err)
}
}
writer.DBWriter(pg, writeDBChan)
waitDb.Done()
}()
}
waitBuffer.Add(1)
go func() {
writer.BufferInsertElements(writeChan, writeDBChan)
waitBuffer.Done()
}()
for i := 0; i < runtime.NumCPU(); i++ {
waitFill.Add(1)
go func() {
lineStringTables := tagmapping.LineStringTables()
polygonTables := tagmapping.PolygonTables()
lineStrings := tagmapping.LineStringMatcher()
polygons := tagmapping.PolygonMatcher()
var err error
geos := geos.NewGEOS()
defer geos.Finish()
batch := make([]element.Way, 0, dbImportBatchSize)
for w := range way {
progress.AddWays(1)
ok := osmCache.Coords.FillWay(w)
@ -305,7 +298,7 @@ func main() {
continue
}
proj.NodesToMerc(w.Nodes)
if tables := lineStringTables.Tables(w.Tags); len(tables) > 0 {
if matches := lineStrings.Match(w.OSMElem); len(matches) > 0 {
way := element.Way{}
way.Id = w.Id
way.Tags = w.Tags
@ -319,10 +312,14 @@ func main() {
log.Println(err)
continue
}
batch = append(batch, way)
for _, match := range matches {
row := match.Row(&way.OSMElem)
writeChan <- writer.InsertElement{match.Table, row}
}
}
if w.IsClosed() {
if tables := polygonTables.Tables(w.Tags); len(tables) > 0 {
if matches := polygons.Match(w.OSMElem); len(matches) > 0 {
way := element.Way{}
way.Id = w.Id
way.Tags = w.Tags
@ -336,26 +333,25 @@ func main() {
log.Println(err)
continue
}
batch = append(batch, way)
for _, match := range matches {
row := match.Row(&way.OSMElem)
writeChan <- writer.InsertElement{match.Table, row}
}
}
}
// log.Println(w.Id, w.Tags, m.Tables(w.Tags))
if len(batch) >= int(dbImportBatchSize) {
wayChan <- batch
batch = make([]element.Way, 0, dbImportBatchSize)
}
if *diff {
diffCache.Coords.AddFromWay(w)
}
}
wayChan <- batch
waitFill.Done()
}()
}
waitFill.Wait()
close(wayChan)
close(writeChan)
waitBuffer.Wait()
close(writeDBChan)
waitDb.Wait()
diffCache.Coords.Close()
}

View File

@ -3,6 +3,7 @@ package mapping
import (
"encoding/json"
"flag"
"goposm/element"
"log"
"os"
)
@ -13,12 +14,13 @@ type Field struct {
}
// Table is one table definition of the mapping file.
type Table struct {
// Name is the table's key in Mapping.Tables; it is assigned by
// (*Mapping).prepare rather than read from a dedicated JSON tag.
Name string
// Type is the geometry kind of the table; it is compared against
// "point", "linestring" and "polygon" when tables are selected.
Type string `json:"type"`
// Mapping is presumably tag key -> tag values that select this table.
// TODO confirm against (*Mapping).mappings.
Mapping map[string][]string `json:"mapping"`
// Fields holds the field definitions of the table; the Key and Type of each
// entry are read when columns and value functions are built.
Fields map[string]*Field `json:"fields"`
}
type Tables map[string]Table
type Tables map[string]*Table
type Mapping struct {
Tables Tables `json:"tables"`
@ -44,9 +46,10 @@ func (t *Table) ExtraTags() map[string]bool {
return tags
}
func (m *Mapping) FillFieldKeys() {
for _, t := range m.Tables {
func (m *Mapping) prepare() {
for name, t := range m.Tables {
t.FillFieldKeys()
t.Name = name
}
}
@ -69,6 +72,16 @@ func (m *Mapping) mappings(tableType string, mappings map[string]map[string][]st
}
}
// tables returns the TableFields of every mapped table whose Type equals
// tableType, keyed by table name. The result is an empty, non-nil map when no
// table has that type.
func (m *Mapping) tables(tableType string) map[string]*TableFields {
	fieldsByTable := make(map[string]*TableFields)
	for tableName, table := range m.Tables {
		if table.Type != tableType {
			continue
		}
		fieldsByTable[tableName] = table.TableFields()
	}
	return fieldsByTable
}
func (m *Mapping) extraTags(tableType string, tags map[string]bool) {
for _, t := range m.Tables {
if t.Type != tableType {
@ -108,28 +121,22 @@ func (m *Mapping) RelationTagFilter() *TagFilter {
return &TagFilter{mappings, tags}
}
func (m *Mapping) PointTables() *TagFilter {
func (m *Mapping) PointMatcher() *TagMatcher {
mappings := make(map[string]map[string][]string)
m.mappings("point", mappings)
tags := make(map[string]bool)
m.extraTags("point", tags)
return &TagFilter{mappings, tags}
return &TagMatcher{mappings, m.tables("point")}
}
func (m *Mapping) LineStringTables() *TagFilter {
func (m *Mapping) LineStringMatcher() *TagMatcher {
mappings := make(map[string]map[string][]string)
m.mappings("linestring", mappings)
tags := make(map[string]bool)
m.extraTags("linestring", tags)
return &TagFilter{mappings, tags}
return &TagMatcher{mappings, m.tables("linestring")}
}
func (m *Mapping) PolygonTables() *TagFilter {
func (m *Mapping) PolygonMatcher() *TagMatcher {
mappings := make(map[string]map[string][]string)
m.mappings("polygon", mappings)
tags := make(map[string]bool)
m.extraTags("polygon", tags)
return &TagFilter{mappings, tags}
return &TagMatcher{mappings, m.tables("polygon")}
}
type TagFilter struct {
@ -177,30 +184,46 @@ func (f *RelationTagFilter) Filter(tags map[string]string) bool {
return f.TagFilter.Filter(tags)
}
func (f *TagFilter) Tables(tags map[string]string) []string {
tables := make(map[string]bool)
// TagMatcher finds the tables an OSM element belongs to based on its tags.
type TagMatcher struct {
// mappings is indexed by tag key, then by tag value (or "__any__"), and
// yields the names of the tables selected by that key/value.
mappings map[string]map[string][]string
// tables maps a table name to the TableFields used to build its rows.
tables map[string]*TableFields
}
for k, v := range tags {
values, ok := f.mappings[k]
// Match records that an element's tag selected a table.
type Match struct {
// Key is the tag key that produced the match.
Key string
// Value is the element's value for Key.
Value string
// Table is the name of the matched table.
Table string
// tableFields is the field set of the matched table; Row uses it to build
// the row values.
tableFields *TableFields
}
// Row returns the column values of elem for the matched table. It delegates to
// the table's TableFields.MakeRow and passes a copy of this match along.
func (m *Match) Row(elem *element.OSMElem) []interface{} {
return m.tableFields.MakeRow(elem, *m)
}
func (tagMatcher *TagMatcher) Match(elem element.OSMElem) []Match {
tables := make(map[string]Match)
for k, v := range elem.Tags {
values, ok := tagMatcher.mappings[k]
if ok {
if tbls, ok := values["__any__"]; ok {
for _, t := range tbls {
tables[t] = true
tables[t] = Match{k, v, t, tagMatcher.tables[t]}
}
continue
} else if tbls, ok := values[v]; ok {
for _, t := range tbls {
tables[t] = true
tables[t] = Match{k, v, t, tagMatcher.tables[t]}
}
continue
}
}
}
var tableNames []string
for name, _ := range tables {
tableNames = append(tableNames, name)
var matches []Match
for _, match := range tables {
matches = append(matches, match)
}
return tableNames
return matches
}
func NewMapping(filename string) (*Mapping, error) {
@ -216,27 +239,10 @@ func NewMapping(filename string) (*Mapping, error) {
return nil, err
}
mapping.FillFieldKeys()
mapping.prepare()
return &mapping, nil
}
// Bool converts a tag value to a boolean column value. The empty string, "0",
// "false" and "no" yield false; every other value yields true. tags and elem
// are not consulted.
func Bool(val string, tags map[string]string, elem interface{}) interface{} {
	switch val {
	case "", "0", "false", "no":
		return false
	}
	return true
}
// Direction converts a tag value to a direction flag: 1 for "1", "yes" or
// "true", -1 for "-1", and 0 for anything else. tags and elem are not
// consulted.
func Direction(val string, tags map[string]string, elem interface{}) interface{} {
	switch val {
	case "1", "yes", "true":
		return 1
	case "-1":
		return -1
	default:
		return 0
	}
}
func main() {
// data := `
// {

194
mapping/fields.go Normal file
View File

@ -0,0 +1,194 @@
package mapping
import (
"goposm/element"
"log"
"strconv"
)
// FieldSpec describes how the value of one table field is derived from an OSM
// element.
type FieldSpec struct {
// Name is the tag key whose value is handed to ValueFunc.
Name string
// NOTE(review): Type is not assigned by TableFields in this file; its
// intended use is not visible here.
Type string
// ValueFunc receives the element's tag value for Name, the element itself and
// the match, and returns the field value. When it is nil, Value returns nil.
ValueFunc func(string, *element.OSMElem, Match) interface{}
}
// Value computes this field's value for elem. It looks up the element's tag
// named f.Name and hands it, together with elem and match, to f.ValueFunc.
// A field without a ValueFunc always yields nil.
func (f *FieldSpec) Value(elem *element.OSMElem, match Match) interface{} {
	if f.ValueFunc == nil {
		return nil
	}
	tagValue := elem.Tags[f.Name]
	return f.ValueFunc(tagValue, elem, match)
}
// TableFields is the list of field specs of one table. MakeRow emits one value
// per entry, in slice order.
type TableFields struct {
fields []FieldSpec
}
// MakeRow returns the values of elem for every field of the table, in the
// order of t.fields. match is forwarded to each field's value function.
//
// The result is allocated once with room for all fields, because this runs for
// every matched element. Fields are accessed by index so that no FieldSpec is
// copied per iteration. For a table without fields the result is an empty
// slice.
func (t *TableFields) MakeRow(elem *element.OSMElem, match Match) []interface{} {
	row := make([]interface{}, 0, len(t.fields))
	for i := range t.fields {
		row = append(row, t.fields[i].Value(elem, match))
	}
	return row
}
// TableFields builds the field specs for this table. Each entry of t.Fields
// becomes a FieldSpec named after the field's Key, with the value function
// selected by the field's Type.
//
// A field with an unrecognised Type is logged and still added, but without a
// value function, so it always produces nil.
//
// NOTE(review): t.Fields is a map, so the order of the resulting fields
// follows Go's unspecified map iteration order. Confirm that it lines up with
// the column order used for the INSERT statement.
func (t *Table) TableFields() *TableFields {
	tableFields := &TableFields{}
	for _, mappingField := range t.Fields {
		var valueFunc func(string, *element.OSMElem, Match) interface{}
		switch mappingField.Type {
		case "id":
			valueFunc = Id
		case "string":
			valueFunc = String
		case "direction":
			valueFunc = Direction
		case "bool":
			valueFunc = Bool
		case "integer":
			valueFunc = Integer
		case "wayzorder":
			valueFunc = WayZOrder
		case "mapping_key":
			valueFunc = Key
		case "mapping_value":
			valueFunc = Value
		default:
			log.Println("unhandled type:", mappingField.Type)
		}
		tableFields.fields = append(tableFields.fields, FieldSpec{
			Name:      mappingField.Key,
			ValueFunc: valueFunc,
		})
	}
	return tableFields
}
// type RowBuilder struct {
// tables map[string]*TableFields
// }
// func NewRowBuilder(m *Mapping) *RowBuilder {
// rb := RowBuilder{make(map[string]*TableFields)}
// for name, t := range m.Tables {
// rb.tables[name] = t.TableFields()
// }
// return &rb
// }
// Bool is a field value function that turns a tag value into a boolean. The
// empty string, "0", "false" and "no" yield false; every other value yields
// true. elem and match are not consulted.
func Bool(val string, elem *element.OSMElem, match Match) interface{} {
	switch val {
	case "", "0", "false", "no":
		return false
	}
	return true
}
// String is a field value function that returns the tag value unchanged. elem
// and match are not consulted.
func String(val string, elem *element.OSMElem, match Match) interface{} {
return val
}
// Integer is a field value function that parses the tag value as a base-10
// signed 64-bit integer. It returns the parsed int64, or nil when the value
// cannot be parsed. elem and match are not consulted.
func Integer(val string, elem *element.OSMElem, match Match) interface{} {
	if parsed, err := strconv.ParseInt(val, 10, 64); err == nil {
		return parsed
	}
	return nil
}
// Id is a field value function that returns the element's Id. val and match
// are not consulted.
func Id(val string, elem *element.OSMElem, match Match) interface{} {
return elem.Id
}
// Key is a field value function that returns the Key of the match, i.e. the
// tag key recorded in it. val and elem are not consulted.
func Key(val string, elem *element.OSMElem, match Match) interface{} {
return match.Key
}
// Value is a field value function that returns the Value of the match, i.e.
// the tag value recorded in it. val and elem are not consulted.
func Value(val string, elem *element.OSMElem, match Match) interface{} {
return match.Value
}
// Direction is a field value function that turns a tag value into a direction
// flag: 1 for "1", "yes" or "true", -1 for "-1", and 0 for anything else. elem
// and match are not consulted.
func Direction(val string, elem *element.OSMElem, match Match) interface{} {
	switch val {
	case "1", "yes", "true":
		return 1
	case "-1":
		return -1
	default:
		return 0
	}
}
// wayRanks assigns a rank to the way values listed below. WayZOrder adds the
// rank of the matched value to the z-order; values missing from the table
// have rank 0.
var wayRanks = map[string]int{
	"minor":          3,
	"road":           3,
	"unclassified":   3,
	"residential":    3,
	"tertiary_link":  3,
	"tertiary":       4,
	"secondary_link": 3,
	"secondary":      5,
	"primary_link":   3,
	"primary":        6,
	"trunk_link":     3,
	"trunk":          8,
	"motorway_link":  3,
	"motorway":       9,
}
// WayZOrder is a field value function that computes a drawing order for a way
// and returns it as an int32. The result is the sum of:
//
//   - ten times the element's "layer" tag,
//   - the wayRanks rank of the matched value, or 7 when that rank is 0 and the
//     element carries a "railway" tag,
//   - minus 10 when the "tunnel" tag is "true", "yes" or "1",
//   - plus 10 when the "bridge" tag is "true", "yes" or "1".
//
// val is not consulted.
func WayZOrder(val string, elem *element.OSMElem, match Match) interface{} {
	// The parse error is deliberately ignored: a missing or non-numeric layer
	// tag makes ParseInt return 0, which contributes nothing.
	layer, _ := strconv.ParseInt(elem.Tags["layer"], 10, 64)

	rank := wayRanks[match.Value]
	if _, isRailway := elem.Tags["railway"]; rank == 0 && isRailway {
		rank = 7
	}

	zOrder := int32(layer)*10 + int32(rank)

	switch elem.Tags["tunnel"] {
	case "true", "yes", "1":
		zOrder -= 10
	}
	switch elem.Tags["bridge"] {
	case "true", "yes", "1":
		zOrder += 10
	}
	return zOrder
}
// brunnel_bool = Bool()
// def extra_fields(self):
// return []
// def value(self, val, osm_elem):
// tags = osm_elem.tags
// z_order = 0
// l = self.layer(tags)
// z_order += l * 10
// r = self.rank.get(osm_elem.type, 0)
// if not r:
// r = 7 if 'railway' in tags else 0
// z_order += r
// if self.brunnel_bool.value(tags.get('tunnel'), {}):
// z_order -= 10
// if self.brunnel_bool.value(tags.get('bridge'), {}):
// z_order += 10
// return z_order
// def layer(self, tags):
// l = tags.get('layer', 0)
// try:
// return int(l)
// except ValueError:
// return 0

48
writer/writer.go Normal file
View File

@ -0,0 +1,48 @@
package writer
import (
"goposm/db"
"log"
)
// batchSize is the per-table row count that BufferInsertElements compares its
// buffer against to decide when to send the buffered rows downstream.
const batchSize = 1024
// DBWriter drains in and writes every received batch to the database with
// InsertBatch. A failed insert is logged and the batch is dropped; the loop
// continues with the next batch. DBWriter returns once in is closed and empty.
func DBWriter(db db.DB, in chan InsertBatch) {
	for batch := range in {
		if err := db.InsertBatch(batch.Table, batch.Rows); err != nil {
			log.Println(err)
		}
	}
}
// InsertBatch is a group of rows destined for a single table.
type InsertBatch struct {
// Table is the name of the table the rows are inserted into.
Table string
// Rows holds one value slice per row.
Rows [][]interface{}
}
// BufferInsertElements collects the single rows received on in into per-table
// batches and forwards them on out.
//
// Rows are buffered separately for each table, in arrival order. As soon as a
// table's buffer holds batchSize rows it is sent on out and a fresh buffer is
// started for that table, so no batch exceeds batchSize rows. When in is
// closed, the remaining partially filled buffers are sent in unspecified table
// order. out is not closed; that is left to the caller.
func BufferInsertElements(in chan InsertElement, out chan InsertBatch) {
	buffer := make(map[string]*InsertBatch)

	for elem := range in {
		batch, ok := buffer[elem.Table]
		if !ok {
			batch = &InsertBatch{Table: elem.Table}
			buffer[elem.Table] = batch
		}
		batch.Rows = append(batch.Rows, elem.Row)

		// Flush at exactly batchSize rows, not one row later.
		if len(batch.Rows) >= batchSize {
			delete(buffer, elem.Table)
			out <- *batch
		}
	}

	// Input is exhausted: hand over whatever is still buffered.
	for table, batch := range buffer {
		delete(buffer, table)
		out <- *batch
	}
}
// InsertElement is a single row destined for a table.
type InsertElement struct {
// Table is the name of the table the row is inserted into.
Table string
// Row holds the values of the row.
Row []interface{}
}