refactored writer package

master
Oliver Tonnhofer 2013-05-28 12:54:19 +02:00
parent 2a699a09c8
commit ac3810e17c
4 changed files with 116 additions and 179 deletions

View File

@ -4,53 +4,34 @@ import (
"goposm/cache"
"goposm/element"
"goposm/geom"
"goposm/geom/clipper"
"goposm/geom/geos"
"goposm/mapping"
"goposm/proj"
"goposm/stats"
"log"
"runtime"
"sync"
)
type NodeWriter struct {
osmCache *cache.OSMCache
nodes chan *element.Node
tagMatcher *mapping.TagMatcher
progress *stats.Statistics
insertBuffer *InsertBuffer
wg *sync.WaitGroup
clipper *clipper.Clipper
OsmElemWriter
nodes chan *element.Node
tagMatcher *mapping.TagMatcher
}
func NewNodeWriter(osmCache *cache.OSMCache, nodes chan *element.Node,
insertBuffer *InsertBuffer, tagMatcher *mapping.TagMatcher, progress *stats.Statistics) *NodeWriter {
insertBuffer *InsertBuffer, tagMatcher *mapping.TagMatcher, progress *stats.Statistics) *OsmElemWriter {
nw := NodeWriter{
osmCache: osmCache,
nodes: nodes,
insertBuffer: insertBuffer,
tagMatcher: tagMatcher,
progress: progress,
wg: &sync.WaitGroup{},
OsmElemWriter: OsmElemWriter{
osmCache: osmCache,
progress: progress,
wg: &sync.WaitGroup{},
insertBuffer: insertBuffer,
},
nodes: nodes,
tagMatcher: tagMatcher,
}
return &nw
}
func (nw *NodeWriter) SetClipper(clipper *clipper.Clipper) {
nw.clipper = clipper
}
func (nw *NodeWriter) Start() {
for i := 0; i < runtime.NumCPU(); i++ {
nw.wg.Add(1)
go nw.loop()
}
}
func (nw *NodeWriter) Close() {
nw.wg.Wait()
nw.OsmElemWriter.writer = &nw
return &nw.OsmElemWriter
}
func (nw *NodeWriter) loop() {
@ -79,20 +60,13 @@ func (nw *NodeWriter) loop() {
continue
}
if len(parts) >= 1 {
for _, match := range matches {
row := match.Row(&n.OSMElem)
nw.insertBuffer.Insert(match.Table.Name, row)
}
nw.insertMatches(&n.OSMElem, matches)
}
} else {
for _, match := range matches {
row := match.Row(&n.OSMElem)
nw.insertBuffer.Insert(match.Table.Name, row)
}
nw.insertMatches(&n.OSMElem, matches)
}
}
// fmt.Println(r)
}
nw.wg.Done()
}

View File

@ -5,53 +5,34 @@ import (
"goposm/cache"
"goposm/element"
"goposm/geom"
"goposm/geom/clipper"
"goposm/geom/geos"
"goposm/mapping"
"goposm/proj"
"goposm/stats"
"log"
"runtime"
"sync"
)
type RelationWriter struct {
osmCache *cache.OSMCache
rel chan *element.Relation
tagMatcher *mapping.TagMatcher
progress *stats.Statistics
insertBuffer *InsertBuffer
wg *sync.WaitGroup
clipper *clipper.Clipper
OsmElemWriter
rel chan *element.Relation
tagMatcher *mapping.TagMatcher
}
func NewRelationWriter(osmCache *cache.OSMCache, rel chan *element.Relation,
insertBuffer *InsertBuffer, tagMatcher *mapping.TagMatcher, progress *stats.Statistics) *RelationWriter {
insertBuffer *InsertBuffer, tagMatcher *mapping.TagMatcher, progress *stats.Statistics) *OsmElemWriter {
rw := RelationWriter{
osmCache: osmCache,
rel: rel,
insertBuffer: insertBuffer,
tagMatcher: tagMatcher,
progress: progress,
wg: &sync.WaitGroup{},
OsmElemWriter: OsmElemWriter{
osmCache: osmCache,
progress: progress,
wg: &sync.WaitGroup{},
insertBuffer: insertBuffer,
},
rel: rel,
tagMatcher: tagMatcher,
}
return &rw
}
func (rw *RelationWriter) SetClipper(clipper *clipper.Clipper) {
rw.clipper = clipper
}
func (rw *RelationWriter) Start() {
for i := 0; i < runtime.NumCPU(); i++ {
rw.wg.Add(1)
go rw.loop()
}
}
func (rw *RelationWriter) Close() {
rw.wg.Wait()
rw.OsmElemWriter.writer = &rw
return &rw.OsmElemWriter
}
func (rw *RelationWriter) loop() {
@ -94,9 +75,6 @@ func (rw *RelationWriter) loop() {
}
if matches := rw.tagMatcher.Match(&r.Tags); len(matches) > 0 {
if rw.clipper != nil {
if r.Geom.Geom == nil {
panic("foo")
}
parts, err := rw.clipper.Clip(r.Geom.Geom)
if err != nil {
log.Println(err)
@ -105,16 +83,10 @@ func (rw *RelationWriter) loop() {
for _, g := range parts {
rel := element.Relation(*r)
rel.Geom = &element.Geometry{g, geos.AsWkb(g)}
for _, match := range matches {
row := match.Row(&rel.OSMElem)
rw.insertBuffer.Insert(match.Table.Name, row)
}
rw.insertMatches(&r.OSMElem, matches)
}
} else {
for _, match := range matches {
row := match.Row(&r.OSMElem)
rw.insertBuffer.Insert(match.Table.Name, row)
}
rw.insertMatches(&r.OSMElem, matches)
}
err := rw.osmCache.InsertedWays.PutMembers(r.Members)
if err != nil {

View File

@ -4,56 +4,37 @@ import (
"goposm/cache"
"goposm/element"
"goposm/geom"
"goposm/geom/clipper"
"goposm/geom/geos"
"goposm/mapping"
"goposm/proj"
"goposm/stats"
"log"
"runtime"
"sync"
)
type WayWriter struct {
osmCache *cache.OSMCache
OsmElemWriter
ways chan *element.Way
lineStringTagMatcher *mapping.TagMatcher
polygonTagMatcher *mapping.TagMatcher
progress *stats.Statistics
insertBuffer *InsertBuffer
wg *sync.WaitGroup
clipper *clipper.Clipper
}
func NewWayWriter(osmCache *cache.OSMCache, ways chan *element.Way,
insertBuffer *InsertBuffer, lineStringTagMatcher *mapping.TagMatcher,
polygonTagMatcher *mapping.TagMatcher, progress *stats.Statistics) *WayWriter {
polygonTagMatcher *mapping.TagMatcher, progress *stats.Statistics) *OsmElemWriter {
ww := WayWriter{
osmCache: osmCache,
OsmElemWriter: OsmElemWriter{
osmCache: osmCache,
progress: progress,
wg: &sync.WaitGroup{},
insertBuffer: insertBuffer,
},
ways: ways,
insertBuffer: insertBuffer,
lineStringTagMatcher: lineStringTagMatcher,
polygonTagMatcher: polygonTagMatcher,
progress: progress,
wg: &sync.WaitGroup{},
}
return &ww
}
func (ww *WayWriter) SetClipper(clipper *clipper.Clipper) {
ww.clipper = clipper
}
func (ww *WayWriter) Start() {
for i := 0; i < runtime.NumCPU(); i++ {
ww.wg.Add(1)
go ww.loop()
}
}
func (ww *WayWriter) Close() {
ww.wg.Wait()
ww.OsmElemWriter.writer = &ww
return &ww.OsmElemWriter
}
func (ww *WayWriter) loop() {
@ -76,73 +57,11 @@ func (ww *WayWriter) loop() {
}
proj.NodesToMerc(w.Nodes)
if matches := ww.lineStringTagMatcher.Match(&w.Tags); len(matches) > 0 {
// make copy to avoid interference with polygon matches
way := element.Way(*w)
way.Geom, err = geom.LineStringWkb(geos, way.Nodes)
if err != nil {
if err, ok := err.(ErrorLevel); ok {
if err.Level() <= 0 {
continue
}
}
log.Println(err)
continue
}
if ww.clipper != nil {
parts, err := ww.clipper.Clip(way.Geom.Geom)
if err != nil {
log.Println(err)
continue
}
for _, g := range parts {
way := element.Way(*w)
way.Geom = &element.Geometry{g, geos.AsWkb(g)}
for _, match := range matches {
row := match.Row(&way.OSMElem)
ww.insertBuffer.Insert(match.Table.Name, row)
}
}
} else {
for _, match := range matches {
row := match.Row(&way.OSMElem)
ww.insertBuffer.Insert(match.Table.Name, row)
}
}
ww.buildAndInsert(geos, w, matches, geom.LineStringWkb)
}
if w.IsClosed() {
if matches := ww.polygonTagMatcher.Match(&w.Tags); len(matches) > 0 {
way := element.Way(*w)
way.Geom, err = geom.PolygonWkb(geos, way.Nodes)
if err != nil {
if err, ok := err.(ErrorLevel); ok {
if err.Level() <= 0 {
continue
}
}
log.Println(err)
continue
}
if ww.clipper != nil {
parts, err := ww.clipper.Clip(way.Geom.Geom)
if err != nil {
log.Println(err)
continue
}
for _, g := range parts {
way := element.Way(*w)
way.Geom = &element.Geometry{g, geos.AsWkb(g)}
for _, match := range matches {
row := match.Row(&way.OSMElem)
ww.insertBuffer.Insert(match.Table.Name, row)
}
}
} else {
for _, match := range matches {
row := match.Row(&way.OSMElem)
ww.insertBuffer.Insert(match.Table.Name, row)
}
}
ww.buildAndInsert(geos, w, matches, geom.PolygonWkb)
}
}
@ -152,3 +71,35 @@ func (ww *WayWriter) loop() {
}
ww.wg.Done()
}
type geomBuilder func(*geos.Geos, []element.Node) (*element.Geometry, error)
func (ww *WayWriter) buildAndInsert(geos *geos.Geos, w *element.Way, matches []mapping.Match, builder geomBuilder) {
var err error
// make copy to avoid interference with polygon/linestring matches
way := element.Way(*w)
way.Geom, err = builder(geos, way.Nodes)
if err != nil {
if err, ok := err.(ErrorLevel); ok {
if err.Level() <= 0 {
return
}
}
log.Println(err)
return
}
if ww.clipper != nil {
parts, err := ww.clipper.Clip(way.Geom.Geom)
if err != nil {
log.Println(err)
return
}
for _, g := range parts {
way := element.Way(*w)
way.Geom = &element.Geometry{g, geos.AsWkb(g)}
ww.insertMatches(&w.OSMElem, matches)
}
} else {
ww.insertMatches(&w.OSMElem, matches)
}
}

View File

@ -1,7 +1,12 @@
package writer
import (
"goposm/cache"
"goposm/database"
"goposm/element"
"goposm/geom/clipper"
"goposm/mapping"
"goposm/stats"
"log"
"runtime"
"sync"
@ -43,3 +48,38 @@ func (dw *DbWriter) loop() {
}
dw.wg.Done()
}
type looper interface {
loop()
}
type OsmElemWriter struct {
osmCache *cache.OSMCache
progress *stats.Statistics
insertBuffer *InsertBuffer
wg *sync.WaitGroup
clipper *clipper.Clipper
writer looper
}
func (writer *OsmElemWriter) SetClipper(clipper *clipper.Clipper) {
writer.clipper = clipper
}
func (writer *OsmElemWriter) Start() {
for i := 0; i < runtime.NumCPU(); i++ {
writer.wg.Add(1)
go writer.writer.loop()
}
}
func (writer *OsmElemWriter) Close() {
writer.wg.Wait()
}
func (writer *OsmElemWriter) insertMatches(elem *element.OSMElem, matches []mapping.Match) {
for _, match := range matches {
row := match.Row(elem)
writer.insertBuffer.Insert(match.Table.Name, row)
}
}