From ff5e3e96aaf239bb7a3ebe3ea5c97d6fea6be720 Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Tue, 21 May 2013 09:07:37 +0200 Subject: [PATCH] refactored parser/cache code into reader package --- goposm.go | 112 +---------------------------------------- parser/lowlevel.go | 2 +- reader/reader.go | 123 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 111 deletions(-) create mode 100644 reader/reader.go diff --git a/goposm.go b/goposm.go index 17d7bb1..262a55f 100644 --- a/goposm.go +++ b/goposm.go @@ -10,8 +10,8 @@ import ( "goposm/geom" "goposm/geom/geos" "goposm/mapping" - "goposm/parser" "goposm/proj" + "goposm/reader" "goposm/stats" "goposm/writer" "log" @@ -24,20 +24,9 @@ import ( "time" ) -var skipCoords, skipNodes, skipWays bool var dbImportBatchSize int64 func init() { - if os.Getenv("GOPOSM_SKIP_COORDS") != "" { - skipCoords = true - } - if os.Getenv("GOPOSM_SKIP_NODES") != "" { - skipNodes = true - } - if os.Getenv("GOPOSM_SKIP_WAYS") != "" { - skipWays = true - } - dbImportBatchSize, _ = strconv.ParseInt( os.Getenv("GOPOSM_DBIMPORT_BATCHSIZE"), 10, 32) @@ -50,103 +39,6 @@ type ErrorLevel interface { Level() int } -func parse(cache *cache.OSMCache, progress *stats.Statistics, tagmapping *mapping.Mapping, filename string) { - nodes := make(chan []element.Node, 16) - coords := make(chan []element.Node, 16) - ways := make(chan []element.Way, 16) - relations := make(chan []element.Relation, 16) - - positions := parser.PBFBlockPositions(filename) - - waitParser := sync.WaitGroup{} - for i := 0; i < runtime.NumCPU(); i++ { - waitParser.Add(1) - go func() { - for pos := range positions { - parser.ParseBlock( - pos, - coords, - nodes, - ways, - relations, - ) - } - waitParser.Done() - }() - } - - waitCounter := sync.WaitGroup{} - - for i := 0; i < runtime.NumCPU(); i++ { - waitCounter.Add(1) - go func() { - m := tagmapping.WayTagFilter() - for ws := range ways { - if skipWays { - continue - } - for i, _ := range ws { - m.Filter(&ws[i].Tags) - } - cache.Ways.PutWays(ws) - progress.AddWays(len(ws)) - } - waitCounter.Done() - }() - } - for i := 0; i < runtime.NumCPU(); i++ { - waitCounter.Add(1) - go func() { - m := tagmapping.RelationTagFilter() - for rels := range relations { - for i, _ := range rels { - m.Filter(&rels[i].Tags) - } - cache.Relations.PutRelations(rels) - progress.AddRelations(len(rels)) - } - waitCounter.Done() - }() - } - for i := 0; i < runtime.NumCPU(); i++ { - waitCounter.Add(1) - go func() { - for nds := range coords { - if skipCoords { - continue - } - cache.Coords.PutCoords(nds) - progress.AddCoords(len(nds)) - } - waitCounter.Done() - }() - } - for i := 0; i < 2; i++ { - waitCounter.Add(1) - go func() { - m := tagmapping.NodeTagFilter() - for nds := range nodes { - if skipNodes { - continue - } - for i, _ := range nds { - m.Filter(&nds[i].Tags) - } - n, _ := cache.Nodes.PutNodes(nds) - progress.AddNodes(n) - } - waitCounter.Done() - }() - } - - waitParser.Wait() - close(coords) - close(nodes) - close(ways) - close(relations) - waitCounter.Wait() -} - var ( cpuprofile = flag.String("cpuprofile", "", "filename of cpu profile output") memprofile = flag.String("memprofile", "", "dir name of mem profile output and interval (fname:interval)") @@ -220,7 +112,7 @@ func main() { if *read != "" { osmCache.Coords.SetLinearImport(true) - parse(osmCache, progress, tagmapping, *read) + reader.ReadPbf(osmCache, progress, tagmapping, *read) osmCache.Coords.SetLinearImport(false) progress.Reset() osmCache.Coords.Flush() diff --git a/parser/lowlevel.go b/parser/lowlevel.go index 782ca78..8b2fb9f 100644 --- a/parser/lowlevel.go +++ b/parser/lowlevel.go @@ -46,7 +46,7 @@ func ReadPrimitiveBlock(pos BlockPosition) *osmpbf.PrimitiveBlock { } func (pbf *PBF) BlockPositions() (positions chan BlockPosition) { - positions = make(chan BlockPosition, 16) + positions = make(chan BlockPosition, 8) go func() { for { offset, size := pbf.NextDataPosition() diff --git a/reader/reader.go b/reader/reader.go new file mode 100644 index 0000000..c9f6b2a --- /dev/null +++ b/reader/reader.go @@ -0,0 +1,123 @@ +package reader + +import ( + "goposm/cache" + "goposm/element" + "goposm/mapping" + "goposm/parser" + "goposm/stats" + "os" + "runtime" + "sync" +) + +var skipCoords, skipNodes, skipWays bool + +func init() { + if os.Getenv("GOPOSM_SKIP_COORDS") != "" { + skipCoords = true + } + if os.Getenv("GOPOSM_SKIP_NODES") != "" { + skipNodes = true + } + if os.Getenv("GOPOSM_SKIP_WAYS") != "" { + skipWays = true + } +} + +func ReadPbf(cache *cache.OSMCache, progress *stats.Statistics, tagmapping *mapping.Mapping, filename string) { + nodes := make(chan []element.Node, 4) + coords := make(chan []element.Node, 4) + ways := make(chan []element.Way, 4) + relations := make(chan []element.Relation, 4) + + positions := parser.PBFBlockPositions(filename) + + waitParser := sync.WaitGroup{} + for i := 0; i < runtime.NumCPU(); i++ { + waitParser.Add(1) + go func() { + for pos := range positions { + parser.ParseBlock( + pos, + coords, + nodes, + ways, + relations, + ) + } + waitParser.Done() + }() + } + + waitWriter := sync.WaitGroup{} + + for i := 0; i < runtime.NumCPU(); i++ { + waitWriter.Add(1) + go func() { + m := tagmapping.WayTagFilter() + for ws := range ways { + if skipWays { + continue + } + for i, _ := range ws { + m.Filter(&ws[i].Tags) + } + cache.Ways.PutWays(ws) + progress.AddWays(len(ws)) + } + waitWriter.Done() + }() + } + + for i := 0; i < runtime.NumCPU(); i++ { + waitWriter.Add(1) + go func() { + m := tagmapping.RelationTagFilter() + for rels := range relations { + for i, _ := range rels { + m.Filter(&rels[i].Tags) + } + cache.Relations.PutRelations(rels) + progress.AddRelations(len(rels)) + } + waitWriter.Done() + }() + } + + for i := 0; i < runtime.NumCPU(); i++ { + waitWriter.Add(1) + go func() { + for nds := range coords { + if skipCoords { + continue + } + cache.Coords.PutCoords(nds) + progress.AddCoords(len(nds)) + } + waitWriter.Done() + }() + } + + for i := 0; i < runtime.NumCPU(); i++ { + waitWriter.Add(1) + go func() { + m := tagmapping.NodeTagFilter() + for nds := range nodes { + for i, _ := range nds { + m.Filter(&nds[i].Tags) + } + cache.Nodes.PutNodes(nds) + progress.AddNodes(len(nds)) + } + waitWriter.Done() + }() + } + + waitParser.Wait() + close(coords) + close(nodes) + close(ways) + close(relations) + waitWriter.Wait() +}