refactored parser/cache code into reader package

master
Oliver Tonnhofer 2013-05-21 09:07:37 +02:00
parent f00e2fa383
commit ff5e3e96aa
3 changed files with 126 additions and 111 deletions

112
goposm.go
View File

@ -10,8 +10,8 @@ import (
"goposm/geom"
"goposm/geom/geos"
"goposm/mapping"
"goposm/parser"
"goposm/proj"
"goposm/reader"
"goposm/stats"
"goposm/writer"
"log"
@ -24,20 +24,9 @@ import (
"time"
)
var skipCoords, skipNodes, skipWays bool
var dbImportBatchSize int64
func init() {
if os.Getenv("GOPOSM_SKIP_COORDS") != "" {
skipCoords = true
}
if os.Getenv("GOPOSM_SKIP_NODES") != "" {
skipNodes = true
}
if os.Getenv("GOPOSM_SKIP_WAYS") != "" {
skipWays = true
}
dbImportBatchSize, _ = strconv.ParseInt(
os.Getenv("GOPOSM_DBIMPORT_BATCHSIZE"), 10, 32)
@ -50,103 +39,6 @@ type ErrorLevel interface {
Level() int
}
func parse(cache *cache.OSMCache, progress *stats.Statistics, tagmapping *mapping.Mapping, filename string) {
nodes := make(chan []element.Node, 16)
coords := make(chan []element.Node, 16)
ways := make(chan []element.Way, 16)
relations := make(chan []element.Relation, 16)
positions := parser.PBFBlockPositions(filename)
waitParser := sync.WaitGroup{}
for i := 0; i < runtime.NumCPU(); i++ {
waitParser.Add(1)
go func() {
for pos := range positions {
parser.ParseBlock(
pos,
coords,
nodes,
ways,
relations,
)
}
waitParser.Done()
}()
}
waitCounter := sync.WaitGroup{}
for i := 0; i < runtime.NumCPU(); i++ {
waitCounter.Add(1)
go func() {
m := tagmapping.WayTagFilter()
for ws := range ways {
if skipWays {
continue
}
for i, _ := range ws {
m.Filter(&ws[i].Tags)
}
cache.Ways.PutWays(ws)
progress.AddWays(len(ws))
}
waitCounter.Done()
}()
}
for i := 0; i < runtime.NumCPU(); i++ {
waitCounter.Add(1)
go func() {
m := tagmapping.RelationTagFilter()
for rels := range relations {
for i, _ := range rels {
m.Filter(&rels[i].Tags)
}
cache.Relations.PutRelations(rels)
progress.AddRelations(len(rels))
}
waitCounter.Done()
}()
}
for i := 0; i < runtime.NumCPU(); i++ {
waitCounter.Add(1)
go func() {
for nds := range coords {
if skipCoords {
continue
}
cache.Coords.PutCoords(nds)
progress.AddCoords(len(nds))
}
waitCounter.Done()
}()
}
for i := 0; i < 2; i++ {
waitCounter.Add(1)
go func() {
m := tagmapping.NodeTagFilter()
for nds := range nodes {
if skipNodes {
continue
}
for i, _ := range nds {
m.Filter(&nds[i].Tags)
}
n, _ := cache.Nodes.PutNodes(nds)
progress.AddNodes(n)
}
waitCounter.Done()
}()
}
waitParser.Wait()
close(coords)
close(nodes)
close(ways)
close(relations)
waitCounter.Wait()
}
var (
cpuprofile = flag.String("cpuprofile", "", "filename of cpu profile output")
memprofile = flag.String("memprofile", "", "dir name of mem profile output and interval (fname:interval)")
@ -220,7 +112,7 @@ func main() {
if *read != "" {
osmCache.Coords.SetLinearImport(true)
parse(osmCache, progress, tagmapping, *read)
reader.ReadPbf(osmCache, progress, tagmapping, *read)
osmCache.Coords.SetLinearImport(false)
progress.Reset()
osmCache.Coords.Flush()

View File

@ -46,7 +46,7 @@ func ReadPrimitiveBlock(pos BlockPosition) *osmpbf.PrimitiveBlock {
}
func (pbf *PBF) BlockPositions() (positions chan BlockPosition) {
positions = make(chan BlockPosition, 16)
positions = make(chan BlockPosition, 8)
go func() {
for {
offset, size := pbf.NextDataPosition()

123
reader/reader.go Normal file
View File

@ -0,0 +1,123 @@
package reader
import (
"goposm/cache"
"goposm/element"
"goposm/mapping"
"goposm/parser"
"goposm/stats"
"os"
"runtime"
"sync"
)
var skipCoords, skipNodes, skipWays bool
func init() {
if os.Getenv("GOPOSM_SKIP_COORDS") != "" {
skipCoords = true
}
if os.Getenv("GOPOSM_SKIP_NODES") != "" {
skipNodes = true
}
if os.Getenv("GOPOSM_SKIP_WAYS") != "" {
skipWays = true
}
}
func ReadPbf(cache *cache.OSMCache, progress *stats.Statistics, tagmapping *mapping.Mapping, filename string) {
nodes := make(chan []element.Node, 4)
coords := make(chan []element.Node, 4)
ways := make(chan []element.Way, 4)
relations := make(chan []element.Relation, 4)
positions := parser.PBFBlockPositions(filename)
waitParser := sync.WaitGroup{}
for i := 0; i < runtime.NumCPU(); i++ {
waitParser.Add(1)
go func() {
for pos := range positions {
parser.ParseBlock(
pos,
coords,
nodes,
ways,
relations,
)
}
waitParser.Done()
}()
}
waitWriter := sync.WaitGroup{}
for i := 0; i < runtime.NumCPU(); i++ {
waitWriter.Add(1)
go func() {
m := tagmapping.WayTagFilter()
for ws := range ways {
if skipWays {
continue
}
for i, _ := range ws {
m.Filter(&ws[i].Tags)
}
cache.Ways.PutWays(ws)
progress.AddWays(len(ws))
}
waitWriter.Done()
}()
}
for i := 0; i < runtime.NumCPU(); i++ {
waitWriter.Add(1)
go func() {
m := tagmapping.RelationTagFilter()
for rels := range relations {
for i, _ := range rels {
m.Filter(&rels[i].Tags)
}
cache.Relations.PutRelations(rels)
progress.AddRelations(len(rels))
}
waitWriter.Done()
}()
}
for i := 0; i < runtime.NumCPU(); i++ {
waitWriter.Add(1)
go func() {
for nds := range coords {
if skipCoords {
continue
}
cache.Coords.PutCoords(nds)
progress.AddCoords(len(nds))
}
waitWriter.Done()
}()
}
for i := 0; i < runtime.NumCPU(); i++ {
waitWriter.Add(1)
go func() {
m := tagmapping.NodeTagFilter()
for nds := range nodes {
for i, _ := range nds {
m.Filter(&nds[i].Tags)
}
cache.Nodes.PutNodes(nds)
progress.AddNodes(len(nds))
}
waitWriter.Done()
}()
}
waitParser.Wait()
close(coords)
close(nodes)
close(ways)
close(relations)
waitWriter.Wait()
}