From 132d938bb4b29021d61ba3af5941952751452ea1 Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Tue, 28 Apr 2015 10:33:13 +0200 Subject: [PATCH] refactored pbf parser renamed/document read barrier --- parser/pbf/process.go | 80 +++++++++++++--- parser/pbf/process_test.go | 192 +++++++++++++++++++++++++++++++++++++ reader/reader.go | 39 ++++---- util/sync.go | 36 ------- util/sync_test.go | 32 ------- 5 files changed, 278 insertions(+), 101 deletions(-) create mode 100644 parser/pbf/process_test.go delete mode 100644 util/sync.go delete mode 100644 util/sync_test.go diff --git a/parser/pbf/process.go b/parser/pbf/process.go index 1498ad7..ae36762 100644 --- a/parser/pbf/process.go +++ b/parser/pbf/process.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/omniscale/imposm3/element" - "github.com/omniscale/imposm3/util" ) type parser struct { @@ -16,8 +15,8 @@ type parser struct { relations chan []element.Relation nParser int wg sync.WaitGroup - waySync *util.SyncPoint - relSync *util.SyncPoint + waySync *barrier + relSync *barrier } func NewParser(pbf *Pbf, coords chan []element.Node, nodes chan []element.Node, ways chan []element.Way, relations chan []element.Relation) *parser { @@ -32,7 +31,7 @@ func NewParser(pbf *Pbf, coords chan []element.Node, nodes chan []element.Node, } } -func (p *parser) Start() { +func (p *parser) Parse() { blocks := p.pbf.BlockPositions() for i := 0; i < p.nParser; i++ { p.wg.Add(1) @@ -41,26 +40,41 @@ func (p *parser) Start() { p.parseBlock(block) } if p.waySync != nil { - p.waySync.Sync() + p.waySync.doneWait() } if p.relSync != nil { - p.relSync.Sync() + p.relSync.doneWait() } p.wg.Done() }() } + p.wg.Wait() +} + +func (p *parser) Wait() { + p.wg.Wait() } func (p *parser) Close() { p.wg.Wait() } -func (p *parser) NotifyWays(cb func()) { - p.waySync = util.NewSyncPoint(p.nParser, cb) +// FinishedCoords registers a single function that gets called when all +// nodes and coords are parsed. The callback should block until it is +// safe to continue with parsing of all ways. +// This only works when the PBF file is ordered by type (nodes before ways before relations). +func (p *parser) FinishedCoords(cb func()) { + p.waySync = newBarrier(cb) + p.waySync.add(p.nParser) } -func (p *parser) NotifyRelations(cb func()) { - p.relSync = util.NewSyncPoint(p.nParser, cb) +// FinishedWays registers a single function that gets called when all +// nodes and coords are parsed. The callback should block until it is +// safe to continue with parsing of all ways. +// This only works when the PBF file is ordered by type (nodes before ways before relations). +func (p *parser) FinishedWays(cb func()) { + p.relSync = newBarrier(cb) + p.relSync.add(p.nParser) } func (p *parser) parseBlock(pos block) { @@ -88,20 +102,58 @@ func (p *parser) parseBlock(pos block) { parsedWays := readWays(group.Ways, block, stringtable) if len(parsedWays) > 0 && p.ways != nil { if p.waySync != nil { - p.waySync.Sync() + p.waySync.doneWait() } p.ways <- parsedWays } parsedRelations := readRelations(group.Relations, block, stringtable) if len(parsedRelations) > 0 && p.relations != nil { if p.waySync != nil { - p.waySync.Sync() + p.waySync.doneWait() } if p.relSync != nil { - p.relSync.Sync() + p.relSync.doneWait() } p.relations <- parsedRelations } } - +} + +// barrier is a struct to synchronize multiple goroutines. +// Works similar to a WaitGroup. Except: +// Calls callback function once all goroutines called doneWait(). +// doneWait() blocks until the callback returns. doneWait() does not +// block after all goroutines were blocked once. +type barrier struct { + synced bool + wg sync.WaitGroup + once sync.Once + callbackWg sync.WaitGroup + callback func() +} + +func newBarrier(callback func()) *barrier { + s := &barrier{callback: callback} + s.callbackWg.Add(1) + return s +} + +func (s *barrier) add(delta int) { + s.wg.Add(delta) +} + +func (s *barrier) doneWait() { + if s.synced { + return + } + s.wg.Done() + s.wg.Wait() + s.once.Do(s.call) + s.callbackWg.Wait() +} + +func (s *barrier) call() { + s.callback() + s.synced = true + s.callbackWg.Done() } diff --git a/parser/pbf/process_test.go b/parser/pbf/process_test.go new file mode 100644 index 0000000..cf5a6ad --- /dev/null +++ b/parser/pbf/process_test.go @@ -0,0 +1,192 @@ +package pbf + +import ( + "sync" + "testing" + + "github.com/omniscale/imposm3/element" +) + +func TestParser(t *testing.T) { + nodes := make(chan []element.Node) + coords := make(chan []element.Node) + ways := make(chan []element.Way) + relations := make(chan []element.Relation) + pbf, err := Open("monaco-20150428.osm.pbf") + if err != nil { + t.Fatal(err) + } + p := NewParser(pbf, coords, nodes, ways, relations) + + wg := sync.WaitGroup{} + + var numNodes, numCoords, numWays, numRelations int64 + + go func() { + wg.Add(1) + for nd := range nodes { + numNodes += int64(len(nd)) + } + wg.Done() + }() + + go func() { + wg.Add(1) + for nd := range coords { + numCoords += int64(len(nd)) + } + wg.Done() + }() + + go func() { + wg.Add(1) + for ways := range ways { + numWays += int64(len(ways)) + } + wg.Done() + }() + + go func() { + wg.Add(1) + for rels := range relations { + numRelations += int64(len(rels)) + } + wg.Done() + }() + + p.Parse() + close(nodes) + close(coords) + close(ways) + close(relations) + wg.Wait() + + if numCoords != 17233 { + t.Error("parsed an unexpected number of coords:", numCoords) + } + if numNodes != 978 { + t.Error("parsed an unexpected number of nodes:", numNodes) + } + if numWays != 2398 { + t.Error("parsed an unexpected number of ways:", numWays) + } + if numRelations != 108 { + t.Error("parsed an unexpected number of relations:", numRelations) + } +} + +func TestParserNotify(t *testing.T) { + nodes := make(chan []element.Node) + coords := make(chan []element.Node) + ways := make(chan []element.Way) + relations := make(chan []element.Relation) + pbf, err := Open("monaco-20150428.osm.pbf") + if err != nil { + t.Fatal(err) + } + p := NewParser(pbf, coords, nodes, ways, relations) + waysWg := sync.WaitGroup{} + p.FinishedCoords(func() { + waysWg.Add(1) + coords <- nil + nodes <- nil + waysWg.Done() + waysWg.Wait() + }) + + wg := sync.WaitGroup{} + + var numNodes, numCoords, numWays, numRelations int64 + + waysWg.Add(1) + go func() { + wg.Add(1) + for nd := range nodes { + if nd == nil { + waysWg.Done() + waysWg.Wait() + continue + } + numNodes += int64(len(nd)) + } + wg.Done() + }() + + waysWg.Add(1) + go func() { + wg.Add(1) + for nd := range coords { + if nd == nil { + waysWg.Done() + waysWg.Wait() + continue + } + numCoords += int64(len(nd)) + } + wg.Done() + }() + + go func() { + wg.Add(1) + for ways := range ways { + numWays += int64(len(ways)) + } + wg.Done() + }() + + go func() { + wg.Add(1) + for rels := range relations { + numRelations += int64(len(rels)) + } + wg.Done() + }() + + p.Parse() + close(nodes) + close(coords) + close(ways) + close(relations) + wg.Wait() + + if numCoords != 17233 { + t.Error("parsed an unexpected number of coords:", numCoords) + } + if numNodes != 978 { + t.Error("parsed an unexpected number of nodes:", numNodes) + } + if numWays != 2398 { + t.Error("parsed an unexpected number of ways:", numWays) + } + if numRelations != 108 { + t.Error("parsed an unexpected number of relations:", numRelations) + } +} + +func TestBarrier(t *testing.T) { + done := make(chan bool) + check := int32(0) + bar := newBarrier(func() { + done <- true + check = 1 + }) + bar.add(2) + + wait := func() { + if check != 0 { + panic("check set") + } + bar.doneWait() + if check != 1 { + panic("check not set") + } + } + go wait() + go wait() + + <-done + + // does not wait/block + bar.doneWait() + +} diff --git a/reader/reader.go b/reader/reader.go index c863af6..4115b74 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -16,7 +16,6 @@ import ( "github.com/omniscale/imposm3/mapping" "github.com/omniscale/imposm3/parser/pbf" "github.com/omniscale/imposm3/stats" - "github.com/omniscale/imposm3/util" ) var log = logging.NewLogger("reader") @@ -71,36 +70,33 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, parser := pbf.NewParser(pbfFile, coords, nodes, ways, relations) - coordsSynced := make(chan bool) - coordsSync := util.NewSyncPoint(int(nCoords+nNodes), func() { - coordsSynced <- true - }) - parser.NotifyWays(func() { + // wait for all coords/nodes to be processed before continuing with + // ways. required for -limitto checks + coordsSync := sync.WaitGroup{} + parser.FinishedCoords(func() { for i := 0; int64(i) < nCoords; i++ { coords <- nil } for i := 0; int64(i) < nNodes; i++ { nodes <- nil } - <-coordsSynced + coordsSync.Wait() }) - waysSynced := make(chan bool) - waysSync := util.NewSyncPoint(int(nWays), func() { - waysSynced <- true - }) - parser.NotifyRelations(func() { + // wait for all ways to be processed before continuing with + // relations. required for -limitto checks + waysSync := sync.WaitGroup{} + parser.FinishedWays(func() { for i := 0; int64(i) < nWays; i++ { ways <- nil } - <-waysSynced + waysSync.Wait() }) - parser.Start() - waitWriter := sync.WaitGroup{} for i := 0; int64(i) < nWays; i++ { + waysSync.Add(1) waitWriter.Add(1) go func() { var skip, hit int @@ -108,7 +104,8 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, m := tagmapping.WayTagFilter() for ws := range ways { if ws == nil { - waysSync.Sync() + waysSync.Done() + waysSync.Wait() continue } if skipWays { @@ -180,6 +177,7 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, } for i := 0; int64(i) < nCoords; i++ { + coordsSync.Add(1) waitWriter.Add(1) go func() { var skip, hit int @@ -187,7 +185,8 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, defer g.Finish() for nds := range coords { if nds == nil { - coordsSync.Sync() + coordsSync.Done() + coordsSync.Wait() continue } if withLimiter { @@ -208,6 +207,7 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, } for i := 0; int64(i) < nNodes; i++ { + coordsSync.Add(1) waitWriter.Add(1) go func() { g := geos.NewGeos() @@ -215,7 +215,8 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, m := tagmapping.NodeTagFilter() for nds := range nodes { if nds == nil { - coordsSync.Sync() + coordsSync.Done() + coordsSync.Wait() continue } numWithTags := 0 @@ -237,7 +238,7 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, }() } - parser.Close() + parser.Parse() close(relations) close(ways) close(nodes) diff --git a/util/sync.go b/util/sync.go deleted file mode 100644 index 75d1bb2..0000000 --- a/util/sync.go +++ /dev/null @@ -1,36 +0,0 @@ -package util - -import ( - "sync" -) - -type SyncPoint struct { - synced bool - wg sync.WaitGroup - once sync.Once - callbackWg sync.WaitGroup - callback func() -} - -func NewSyncPoint(n int, callback func()) *SyncPoint { - s := &SyncPoint{callback: callback} - s.wg.Add(n) - s.callbackWg.Add(1) - return s -} - -func (s *SyncPoint) Sync() { - if s.synced { - return - } - s.wg.Done() - s.wg.Wait() - s.once.Do(s.Call) - s.callbackWg.Wait() -} - -func (s *SyncPoint) Call() { - s.callback() - s.synced = true - s.callbackWg.Done() -} diff --git a/util/sync_test.go b/util/sync_test.go deleted file mode 100644 index 5f356ff..0000000 --- a/util/sync_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package util - -import ( - "testing" -) - -func TestSyncPoint(t *testing.T) { - done := make(chan bool) - check := int32(0) - sp := NewSyncPoint(2, func() { - done <- true - check = 1 - }) - - wait := func() { - if check != 0 { - panic("check set") - } - sp.Sync() - if check != 1 { - panic("check not set") - } - } - go wait() - go wait() - - <-done - - // does not wait/block - sp.Sync() - -}