refactored pbf parser

renamed/document read barrier
Oliver Tonnhofer 2015-04-28 10:33:13 +02:00
parent c2bb7e7d2e
commit 132d938bb4
5 changed files with 278 additions and 101 deletions

View File

@ -5,7 +5,6 @@ import (
"sync"
"github.com/omniscale/imposm3/element"
"github.com/omniscale/imposm3/util"
)
type parser struct {
@ -16,8 +15,8 @@ type parser struct {
relations chan []element.Relation
nParser int
wg sync.WaitGroup
waySync *util.SyncPoint
relSync *util.SyncPoint
waySync *barrier
relSync *barrier
}
func NewParser(pbf *Pbf, coords chan []element.Node, nodes chan []element.Node, ways chan []element.Way, relations chan []element.Relation) *parser {
@ -32,7 +31,7 @@ func NewParser(pbf *Pbf, coords chan []element.Node, nodes chan []element.Node,
}
}
func (p *parser) Start() {
func (p *parser) Parse() {
blocks := p.pbf.BlockPositions()
for i := 0; i < p.nParser; i++ {
p.wg.Add(1)
@ -41,26 +40,41 @@ func (p *parser) Start() {
p.parseBlock(block)
}
if p.waySync != nil {
p.waySync.Sync()
p.waySync.doneWait()
}
if p.relSync != nil {
p.relSync.Sync()
p.relSync.doneWait()
}
p.wg.Done()
}()
}
p.wg.Wait()
}
func (p *parser) Wait() {
p.wg.Wait()
}
func (p *parser) Close() {
p.wg.Wait()
}
func (p *parser) NotifyWays(cb func()) {
p.waySync = util.NewSyncPoint(p.nParser, cb)
// FinishedCoords registers a single function that gets called when all
// nodes and coords are parsed. The callback should block until it is
// safe to continue with parsing of all ways.
// This only works when the PBF file is ordered by type (nodes before ways before relations).
func (p *parser) FinishedCoords(cb func()) {
p.waySync = newBarrier(cb)
p.waySync.add(p.nParser)
}
func (p *parser) NotifyRelations(cb func()) {
p.relSync = util.NewSyncPoint(p.nParser, cb)
// FinishedWays registers a single function that gets called when all
// nodes and coords are parsed. The callback should block until it is
// safe to continue with parsing of all ways.
// This only works when the PBF file is ordered by type (nodes before ways before relations).
func (p *parser) FinishedWays(cb func()) {
p.relSync = newBarrier(cb)
p.relSync.add(p.nParser)
}
func (p *parser) parseBlock(pos block) {
@ -88,20 +102,58 @@ func (p *parser) parseBlock(pos block) {
parsedWays := readWays(group.Ways, block, stringtable)
if len(parsedWays) > 0 && p.ways != nil {
if p.waySync != nil {
p.waySync.Sync()
p.waySync.doneWait()
}
p.ways <- parsedWays
}
parsedRelations := readRelations(group.Relations, block, stringtable)
if len(parsedRelations) > 0 && p.relations != nil {
if p.waySync != nil {
p.waySync.Sync()
p.waySync.doneWait()
}
if p.relSync != nil {
p.relSync.Sync()
p.relSync.doneWait()
}
p.relations <- parsedRelations
}
}
}
// barrier is a struct to synchronize multiple goroutines.
// Works similar to a WaitGroup. Except:
// Calls callback function once all goroutines called doneWait().
// doneWait() blocks until the callback returns. doneWait() does not
// block after all goroutines were blocked once.
type barrier struct {
synced bool
wg sync.WaitGroup
once sync.Once
callbackWg sync.WaitGroup
callback func()
}
func newBarrier(callback func()) *barrier {
s := &barrier{callback: callback}
s.callbackWg.Add(1)
return s
}
func (s *barrier) add(delta int) {
s.wg.Add(delta)
}
func (s *barrier) doneWait() {
if s.synced {
return
}
s.wg.Done()
s.wg.Wait()
s.once.Do(s.call)
s.callbackWg.Wait()
}
func (s *barrier) call() {
s.callback()
s.synced = true
s.callbackWg.Done()
}

192
parser/pbf/process_test.go Normal file
View File

@ -0,0 +1,192 @@
package pbf
import (
"sync"
"testing"
"github.com/omniscale/imposm3/element"
)
func TestParser(t *testing.T) {
nodes := make(chan []element.Node)
coords := make(chan []element.Node)
ways := make(chan []element.Way)
relations := make(chan []element.Relation)
pbf, err := Open("monaco-20150428.osm.pbf")
if err != nil {
t.Fatal(err)
}
p := NewParser(pbf, coords, nodes, ways, relations)
wg := sync.WaitGroup{}
var numNodes, numCoords, numWays, numRelations int64
go func() {
wg.Add(1)
for nd := range nodes {
numNodes += int64(len(nd))
}
wg.Done()
}()
go func() {
wg.Add(1)
for nd := range coords {
numCoords += int64(len(nd))
}
wg.Done()
}()
go func() {
wg.Add(1)
for ways := range ways {
numWays += int64(len(ways))
}
wg.Done()
}()
go func() {
wg.Add(1)
for rels := range relations {
numRelations += int64(len(rels))
}
wg.Done()
}()
p.Parse()
close(nodes)
close(coords)
close(ways)
close(relations)
wg.Wait()
if numCoords != 17233 {
t.Error("parsed an unexpected number of coords:", numCoords)
}
if numNodes != 978 {
t.Error("parsed an unexpected number of nodes:", numNodes)
}
if numWays != 2398 {
t.Error("parsed an unexpected number of ways:", numWays)
}
if numRelations != 108 {
t.Error("parsed an unexpected number of relations:", numRelations)
}
}
func TestParserNotify(t *testing.T) {
nodes := make(chan []element.Node)
coords := make(chan []element.Node)
ways := make(chan []element.Way)
relations := make(chan []element.Relation)
pbf, err := Open("monaco-20150428.osm.pbf")
if err != nil {
t.Fatal(err)
}
p := NewParser(pbf, coords, nodes, ways, relations)
waysWg := sync.WaitGroup{}
p.FinishedCoords(func() {
waysWg.Add(1)
coords <- nil
nodes <- nil
waysWg.Done()
waysWg.Wait()
})
wg := sync.WaitGroup{}
var numNodes, numCoords, numWays, numRelations int64
waysWg.Add(1)
go func() {
wg.Add(1)
for nd := range nodes {
if nd == nil {
waysWg.Done()
waysWg.Wait()
continue
}
numNodes += int64(len(nd))
}
wg.Done()
}()
waysWg.Add(1)
go func() {
wg.Add(1)
for nd := range coords {
if nd == nil {
waysWg.Done()
waysWg.Wait()
continue
}
numCoords += int64(len(nd))
}
wg.Done()
}()
go func() {
wg.Add(1)
for ways := range ways {
numWays += int64(len(ways))
}
wg.Done()
}()
go func() {
wg.Add(1)
for rels := range relations {
numRelations += int64(len(rels))
}
wg.Done()
}()
p.Parse()
close(nodes)
close(coords)
close(ways)
close(relations)
wg.Wait()
if numCoords != 17233 {
t.Error("parsed an unexpected number of coords:", numCoords)
}
if numNodes != 978 {
t.Error("parsed an unexpected number of nodes:", numNodes)
}
if numWays != 2398 {
t.Error("parsed an unexpected number of ways:", numWays)
}
if numRelations != 108 {
t.Error("parsed an unexpected number of relations:", numRelations)
}
}
func TestBarrier(t *testing.T) {
done := make(chan bool)
check := int32(0)
bar := newBarrier(func() {
done <- true
check = 1
})
bar.add(2)
wait := func() {
if check != 0 {
panic("check set")
}
bar.doneWait()
if check != 1 {
panic("check not set")
}
}
go wait()
go wait()
<-done
// does not wait/block
bar.doneWait()
}

View File

@ -16,7 +16,6 @@ import (
"github.com/omniscale/imposm3/mapping"
"github.com/omniscale/imposm3/parser/pbf"
"github.com/omniscale/imposm3/stats"
"github.com/omniscale/imposm3/util"
)
var log = logging.NewLogger("reader")
@ -71,36 +70,33 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
parser := pbf.NewParser(pbfFile, coords, nodes, ways, relations)
coordsSynced := make(chan bool)
coordsSync := util.NewSyncPoint(int(nCoords+nNodes), func() {
coordsSynced <- true
})
parser.NotifyWays(func() {
// wait for all coords/nodes to be processed before continuing with
// ways. required for -limitto checks
coordsSync := sync.WaitGroup{}
parser.FinishedCoords(func() {
for i := 0; int64(i) < nCoords; i++ {
coords <- nil
}
for i := 0; int64(i) < nNodes; i++ {
nodes <- nil
}
<-coordsSynced
coordsSync.Wait()
})
waysSynced := make(chan bool)
waysSync := util.NewSyncPoint(int(nWays), func() {
waysSynced <- true
})
parser.NotifyRelations(func() {
// wait for all ways to be processed before continuing with
// relations. required for -limitto checks
waysSync := sync.WaitGroup{}
parser.FinishedWays(func() {
for i := 0; int64(i) < nWays; i++ {
ways <- nil
}
<-waysSynced
waysSync.Wait()
})
parser.Start()
waitWriter := sync.WaitGroup{}
for i := 0; int64(i) < nWays; i++ {
waysSync.Add(1)
waitWriter.Add(1)
go func() {
var skip, hit int
@ -108,7 +104,8 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
m := tagmapping.WayTagFilter()
for ws := range ways {
if ws == nil {
waysSync.Sync()
waysSync.Done()
waysSync.Wait()
continue
}
if skipWays {
@ -180,6 +177,7 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
}
for i := 0; int64(i) < nCoords; i++ {
coordsSync.Add(1)
waitWriter.Add(1)
go func() {
var skip, hit int
@ -187,7 +185,8 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
defer g.Finish()
for nds := range coords {
if nds == nil {
coordsSync.Sync()
coordsSync.Done()
coordsSync.Wait()
continue
}
if withLimiter {
@ -208,6 +207,7 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
}
for i := 0; int64(i) < nNodes; i++ {
coordsSync.Add(1)
waitWriter.Add(1)
go func() {
g := geos.NewGeos()
@ -215,7 +215,8 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
m := tagmapping.NodeTagFilter()
for nds := range nodes {
if nds == nil {
coordsSync.Sync()
coordsSync.Done()
coordsSync.Wait()
continue
}
numWithTags := 0
@ -237,7 +238,7 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
}()
}
parser.Close()
parser.Parse()
close(relations)
close(ways)
close(nodes)

View File

@ -1,36 +0,0 @@
package util
import (
"sync"
)
type SyncPoint struct {
synced bool
wg sync.WaitGroup
once sync.Once
callbackWg sync.WaitGroup
callback func()
}
func NewSyncPoint(n int, callback func()) *SyncPoint {
s := &SyncPoint{callback: callback}
s.wg.Add(n)
s.callbackWg.Add(1)
return s
}
func (s *SyncPoint) Sync() {
if s.synced {
return
}
s.wg.Done()
s.wg.Wait()
s.once.Do(s.Call)
s.callbackWg.Wait()
}
func (s *SyncPoint) Call() {
s.callback()
s.synced = true
s.callbackWg.Done()
}

View File

@ -1,32 +0,0 @@
package util
import (
"testing"
)
func TestSyncPoint(t *testing.T) {
done := make(chan bool)
check := int32(0)
sp := NewSyncPoint(2, func() {
done <- true
check = 1
})
wait := func() {
if check != 0 {
panic("check set")
}
sp.Sync()
if check != 1 {
panic("check not set")
}
}
go wait()
go wait()
<-done
// does not wait/block
sp.Sync()
}