refactored nodes/ways/relation writing in to wtire package

master
Oliver Tonnhofer 2013-05-21 09:50:10 +02:00
parent ff5e3e96aa
commit 18eadb0f51
5 changed files with 303 additions and 157 deletions

174
goposm.go
View File

@ -2,15 +2,10 @@ package main
import (
"flag"
"fmt"
"goposm/cache"
"goposm/database"
_ "goposm/database/postgis"
"goposm/element"
"goposm/geom"
"goposm/geom/geos"
"goposm/mapping"
"goposm/proj"
"goposm/reader"
"goposm/stats"
"goposm/writer"
@ -20,7 +15,6 @@ import (
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"
)
@ -35,10 +29,6 @@ func init() {
}
}
type ErrorLevel interface {
Level() int
}
var (
cpuprofile = flag.String("cpuprofile", "", "filename of cpu profile output")
memprofile = flag.String("memprofile", "", "dir name of mem profile output and interval (fname:interval)")
@ -129,8 +119,6 @@ func main() {
log.Fatal(err)
}
waitFill := sync.WaitGroup{}
wayChan := make(chan []element.Way)
conf := database.Config{
Type: "postgis",
ConnectionParams: *connection,
@ -149,156 +137,28 @@ func main() {
insertBuffer := writer.NewInsertBuffer()
dbWriter := writer.NewDbWriter(pg, insertBuffer.Out)
rel := osmCache.Relations.Iter()
polygons := tagmapping.PolygonMatcher()
pointsTagMatcher := tagmapping.PointMatcher()
lineStringsTagMatcher := tagmapping.LineStringMatcher()
polygonsTagMatcher := tagmapping.PolygonMatcher()
for r := range rel {
progress.AddRelations(1)
err := osmCache.Ways.FillMembers(r.Members)
if err == cache.NotFound {
// fmt.Println("missing ways for relation", r.Id)
} else if err != nil {
fmt.Println(err)
continue
}
for _, m := range r.Members {
if m.Way == nil {
continue
}
err := osmCache.Coords.FillWay(m.Way)
if err == cache.NotFound {
// fmt.Println("missing nodes for way", m.Way.Id, "in relation", r.Id)
} else if err != nil {
fmt.Println(err)
continue
}
proj.NodesToMerc(m.Way.Nodes)
}
relations := osmCache.Relations.Iter()
relWriter := writer.NewRelationWriter(osmCache, relations,
insertBuffer, polygonsTagMatcher, progress)
// blocks till the Relations.Iter() finishes
relWriter.Close()
err = geom.BuildRelation(r)
if err != nil {
if err, ok := err.(ErrorLevel); ok {
if err.Level() <= 0 {
continue
}
}
log.Println(err)
continue
}
if matches := polygons.Match(&r.OSMElem); len(matches) > 0 {
for _, match := range matches {
row := match.Row(&r.OSMElem)
insertBuffer.Insert(match.Table, row)
}
err := osmCache.InsertedWays.PutMembers(r.Members)
if err != nil {
fmt.Println(err)
}
}
}
way := osmCache.Ways.Iter()
for i := 0; i < runtime.NumCPU(); i++ {
waitFill.Add(1)
go func() {
lineStrings := tagmapping.LineStringMatcher()
polygons := tagmapping.PolygonMatcher()
geos := geos.NewGEOS()
defer geos.Finish()
for w := range way {
progress.AddWays(1)
inserted, err := osmCache.InsertedWays.IsInserted(w.Id)
if err != nil {
log.Println(err)
continue
}
if inserted {
continue
}
err = osmCache.Coords.FillWay(w)
if err != nil {
continue
}
proj.NodesToMerc(w.Nodes)
if matches := lineStrings.Match(&w.OSMElem); len(matches) > 0 {
// make copy to avoid interference with polygon matches
way := element.Way(*w)
way.Geom, err = geom.LineStringWKB(geos, way.Nodes)
if err != nil {
if err, ok := err.(ErrorLevel); ok {
if err.Level() <= 0 {
continue
}
}
log.Println(err)
continue
}
for _, match := range matches {
row := match.Row(&way.OSMElem)
insertBuffer.Insert(match.Table, row)
}
}
if w.IsClosed() {
if matches := polygons.Match(&w.OSMElem); len(matches) > 0 {
way := element.Way(*w)
way.Geom, err = geom.PolygonWKB(geos, way.Nodes)
if err != nil {
if err, ok := err.(ErrorLevel); ok {
if err.Level() <= 0 {
continue
}
}
log.Println(err)
continue
}
for _, match := range matches {
row := match.Row(&way.OSMElem)
insertBuffer.Insert(match.Table, row)
}
}
}
if *diff {
diffCache.Coords.AddFromWay(w)
}
}
waitFill.Done()
}()
}
waitFill.Wait()
close(wayChan)
diffCache.Coords.Close()
ways := osmCache.Ways.Iter()
wayWriter := writer.NewWayWriter(osmCache, ways, insertBuffer,
lineStringsTagMatcher, polygonsTagMatcher, progress)
nodes := osmCache.Nodes.Iter()
points := tagmapping.PointMatcher()
geos := geos.NewGEOS()
defer geos.Finish()
for n := range nodes {
progress.AddNodes(1)
if matches := points.Match(&n.OSMElem); len(matches) > 0 {
proj.NodeToMerc(n)
n.Geom, err = geom.PointWKB(geos, *n)
if err != nil {
if err, ok := err.(ErrorLevel); ok {
if err.Level() <= 0 {
continue
}
}
log.Println(err)
continue
}
for _, match := range matches {
row := match.Row(&n.OSMElem)
insertBuffer.Insert(match.Table, row)
}
nodeWriter := writer.NewNodeWriter(osmCache, nodes, insertBuffer,
pointsTagMatcher, progress)
}
// fmt.Println(r)
}
diffCache.Coords.Close()
wayWriter.Close()
nodeWriter.Close()
insertBuffer.Close()
dbWriter.Close()

75
writer/nodes.go Normal file
View File

@ -0,0 +1,75 @@
package writer
import (
"goposm/cache"
"goposm/element"
"goposm/geom"
"goposm/geom/geos"
"goposm/mapping"
"goposm/proj"
"goposm/stats"
"log"
"runtime"
"sync"
)
type NodeWriter struct {
osmCache *cache.OSMCache
nodes chan *element.Node
tagMatcher *mapping.TagMatcher
progress *stats.Statistics
insertBuffer *InsertBuffer
wg *sync.WaitGroup
}
func NewNodeWriter(osmCache *cache.OSMCache, nodes chan *element.Node,
insertBuffer *InsertBuffer, tagMatcher *mapping.TagMatcher, progress *stats.Statistics) *NodeWriter {
nw := NodeWriter{
osmCache: osmCache,
nodes: nodes,
insertBuffer: insertBuffer,
tagMatcher: tagMatcher,
progress: progress,
wg: &sync.WaitGroup{},
}
for i := 0; i < runtime.NumCPU(); i++ {
nw.wg.Add(1)
go nw.loop()
}
return &nw
}
func (nw *NodeWriter) Close() {
nw.wg.Wait()
}
func (nw *NodeWriter) loop() {
geos := geos.NewGEOS()
defer geos.Finish()
var err error
for n := range nw.nodes {
nw.progress.AddNodes(1)
if matches := nw.tagMatcher.Match(&n.OSMElem); len(matches) > 0 {
proj.NodeToMerc(n)
n.Geom, err = geom.PointWKB(geos, *n)
if err != nil {
if err, ok := err.(ErrorLevel); ok {
if err.Level() <= 0 {
continue
}
}
log.Println(err)
continue
}
for _, match := range matches {
row := match.Row(&n.OSMElem)
nw.insertBuffer.Insert(match.Table, row)
}
}
// fmt.Println(r)
}
nw.wg.Done()
}

94
writer/relations.go Normal file
View File

@ -0,0 +1,94 @@
package writer
import (
"fmt"
"goposm/cache"
"goposm/element"
"goposm/geom"
"goposm/mapping"
"goposm/proj"
"goposm/stats"
"log"
"runtime"
"sync"
)
type RelationWriter struct {
osmCache *cache.OSMCache
rel chan *element.Relation
tagMatcher *mapping.TagMatcher
progress *stats.Statistics
insertBuffer *InsertBuffer
wg *sync.WaitGroup
}
func NewRelationWriter(osmCache *cache.OSMCache, rel chan *element.Relation,
insertBuffer *InsertBuffer, tagMatcher *mapping.TagMatcher, progress *stats.Statistics) *RelationWriter {
rw := RelationWriter{
osmCache: osmCache,
rel: rel,
insertBuffer: insertBuffer,
tagMatcher: tagMatcher,
progress: progress,
wg: &sync.WaitGroup{},
}
for i := 0; i < runtime.NumCPU(); i++ {
rw.wg.Add(1)
go rw.loop()
}
return &rw
}
func (rw *RelationWriter) Close() {
rw.wg.Wait()
}
func (rw *RelationWriter) loop() {
for r := range rw.rel {
rw.progress.AddRelations(1)
err := rw.osmCache.Ways.FillMembers(r.Members)
if err == cache.NotFound {
// fmt.Println("missing ways for relation", r.Id)
} else if err != nil {
fmt.Println(err)
continue
}
for _, m := range r.Members {
if m.Way == nil {
continue
}
err := rw.osmCache.Coords.FillWay(m.Way)
if err == cache.NotFound {
// fmt.Println("missing nodes for way", m.Way.Id, "in relation", r.Id)
} else if err != nil {
fmt.Println(err)
continue
}
proj.NodesToMerc(m.Way.Nodes)
}
err = geom.BuildRelation(r)
if err != nil {
if err, ok := err.(ErrorLevel); ok {
if err.Level() <= 0 {
continue
}
}
log.Println(err)
continue
}
if matches := rw.tagMatcher.Match(&r.OSMElem); len(matches) > 0 {
for _, match := range matches {
row := match.Row(&r.OSMElem)
rw.insertBuffer.Insert(match.Table, row)
}
err := rw.osmCache.InsertedWays.PutMembers(r.Members)
if err != nil {
fmt.Println(err)
}
}
}
rw.wg.Done()
}

113
writer/ways.go Normal file
View File

@ -0,0 +1,113 @@
package writer
import (
"goposm/cache"
"goposm/element"
"goposm/geom"
"goposm/geom/geos"
"goposm/mapping"
"goposm/proj"
"goposm/stats"
"log"
"runtime"
"sync"
)
type WayWriter struct {
osmCache *cache.OSMCache
ways chan *element.Way
lineStringTagMatcher *mapping.TagMatcher
polygonTagMatcher *mapping.TagMatcher
progress *stats.Statistics
insertBuffer *InsertBuffer
wg *sync.WaitGroup
}
func NewWayWriter(osmCache *cache.OSMCache, ways chan *element.Way,
insertBuffer *InsertBuffer, lineStringTagMatcher *mapping.TagMatcher,
polygonTagMatcher *mapping.TagMatcher, progress *stats.Statistics) *WayWriter {
ww := WayWriter{
osmCache: osmCache,
ways: ways,
insertBuffer: insertBuffer,
lineStringTagMatcher: lineStringTagMatcher,
polygonTagMatcher: polygonTagMatcher,
progress: progress,
wg: &sync.WaitGroup{},
}
for i := 0; i < runtime.NumCPU(); i++ {
ww.wg.Add(1)
go ww.loop()
}
return &ww
}
func (ww *WayWriter) Close() {
ww.wg.Wait()
}
func (ww *WayWriter) loop() {
geos := geos.NewGEOS()
defer geos.Finish()
for w := range ww.ways {
ww.progress.AddWays(1)
inserted, err := ww.osmCache.InsertedWays.IsInserted(w.Id)
if err != nil {
log.Println(err)
continue
}
if inserted {
continue
}
err = ww.osmCache.Coords.FillWay(w)
if err != nil {
continue
}
proj.NodesToMerc(w.Nodes)
if matches := ww.lineStringTagMatcher.Match(&w.OSMElem); len(matches) > 0 {
// make copy to avoid interference with polygon matches
way := element.Way(*w)
way.Geom, err = geom.LineStringWKB(geos, way.Nodes)
if err != nil {
if err, ok := err.(ErrorLevel); ok {
if err.Level() <= 0 {
continue
}
}
log.Println(err)
continue
}
for _, match := range matches {
row := match.Row(&way.OSMElem)
ww.insertBuffer.Insert(match.Table, row)
}
}
if w.IsClosed() {
if matches := ww.polygonTagMatcher.Match(&w.OSMElem); len(matches) > 0 {
way := element.Way(*w)
way.Geom, err = geom.PolygonWKB(geos, way.Nodes)
if err != nil {
if err, ok := err.(ErrorLevel); ok {
if err.Level() <= 0 {
continue
}
}
log.Println(err)
continue
}
for _, match := range matches {
row := match.Row(&way.OSMElem)
ww.insertBuffer.Insert(match.Table, row)
}
}
}
// if *diff {
// ww.diffCache.Coords.AddFromWay(w)
// }
}
ww.wg.Done()
}

View File

@ -7,6 +7,10 @@ import (
"sync"
)
type ErrorLevel interface {
Level() int
}
type DbWriter struct {
Db database.DB
In chan InsertBatch