From e3593761b5bfa4876359c7a184c55b5a77d3e02d Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Tue, 6 Dec 2016 14:47:34 +0100 Subject: [PATCH] refactor parser.pbf API --- import_/import.go | 24 +++++++------ parser/pbf/lowlevel.go | 36 +++++++++++--------- parser/pbf/pbf_test.go | 2 +- parser/pbf/process.go | 70 ++++++++++++++++++++------------------ parser/pbf/process_test.go | 27 +++++++++------ reader/reader.go | 30 +++++++++++----- update/process.go | 2 +- update/state/state.go | 21 +++++++----- 8 files changed, 121 insertions(+), 91 deletions(-) diff --git a/import_/import.go b/import_/import.go index e866cce..1478f36 100644 --- a/import_/import.go +++ b/import_/import.go @@ -14,7 +14,6 @@ import ( "github.com/omniscale/imposm3/geom/limit" "github.com/omniscale/imposm3/logging" "github.com/omniscale/imposm3/mapping" - "github.com/omniscale/imposm3/parser/pbf" "github.com/omniscale/imposm3/reader" "github.com/omniscale/imposm3/stats" "github.com/omniscale/imposm3/update/state" @@ -102,11 +101,6 @@ func Import() { } progress := stats.NewStatsReporter() - pbfFile, err := pbf.Open(config.ImportOptions.Read) - if err != nil { - log.Fatal(err) - } - if !config.ImportOptions.Appendcache { // enable optimization if we don't append to existing cache osmCache.Coords.SetLinearImport(true) @@ -116,16 +110,26 @@ func Import() { if config.BaseOptions.LimitToCacheBuffer == 0.0 { readLimiter = nil } - reader.ReadPbf(osmCache, progress, tagmapping, - pbfFile, readLimiter) + + err := reader.ReadPbf(config.ImportOptions.Read, + osmCache, + progress, + tagmapping, + readLimiter, + ) + if err != nil { + log.Fatal(err) + } osmCache.Coords.SetLinearImport(false) elementCounts = progress.Stop() osmCache.Close() log.StopStep(step) if config.ImportOptions.Diff { - diffstate := state.FromPbf(pbfFile, config.ImportOptions.DiffStateBefore) - if diffstate != nil { + diffstate, err := state.FromPbf(config.ImportOptions.Read, config.ImportOptions.DiffStateBefore) + if err != nil { + 
log.Print("error parsing diff state form PBF", err) + } else if diffstate != nil { os.MkdirAll(config.BaseOptions.DiffDir, 0755) err := diffstate.WriteToFile(path.Join(config.BaseOptions.DiffDir, "last.state.txt")) if err != nil { diff --git a/parser/pbf/lowlevel.go b/parser/pbf/lowlevel.go index 9edd5d4..33d986b 100644 --- a/parser/pbf/lowlevel.go +++ b/parser/pbf/lowlevel.go @@ -84,7 +84,7 @@ func readPrimitiveBlock(pos block) *osmpbf.PrimitiveBlock { return block } -func readAndParseHeaderBlock(pos block) (*pbfHeader, error) { +func readAndParseHeaderBlock(pos block) (*Header, error) { raw, err := readBlobData(pos) if err != nil { return nil, err @@ -102,7 +102,7 @@ func readAndParseHeaderBlock(pos block) (*pbfHeader, error) { } } - result := &pbfHeader{} + result := &Header{} timestamp := header.GetOsmosisReplicationTimestamp() result.Time = time.Unix(timestamp, 0 /* nanoseconds */) result.Sequence = header.GetOsmosisReplicationSequenceNumber() @@ -111,27 +111,28 @@ func readAndParseHeaderBlock(pos block) (*pbfHeader, error) { return result, nil } -type Pbf struct { +type pbf struct { file *os.File - Filename string + filename string offset int64 - Header *pbfHeader + header *Header } -type pbfHeader struct { +type Header struct { Time time.Time Sequence int64 + Filename string RequiredFeatures []string OptionalFeatures []string } -func Open(filename string) (f *Pbf, err error) { +func open(filename string) (f *pbf, err error) { file, err := os.Open(filename) if err != nil { return nil, err } - f = &Pbf{Filename: filename, file: file} + f = &pbf{filename: filename, file: file} err = f.parseHeader() if err != nil { file.Close() @@ -140,21 +141,22 @@ func Open(filename string) (f *Pbf, err error) { return f, nil } -func (pbf *Pbf) Close() error { +func (pbf *pbf) close() error { return pbf.file.Close() } -func (pbf *Pbf) parseHeader() error { +func (pbf *pbf) parseHeader() error { offset, size, header := pbf.nextBlock() if header.GetType() != "OSMHeader" { 
panic("invalid block type, expected OSMHeader, got " + header.GetType()) } var err error - pbf.Header, err = readAndParseHeaderBlock(block{pbf.Filename, offset, size}) + pbf.header, err = readAndParseHeaderBlock(block{pbf.filename, offset, size}) + pbf.header.Filename = pbf.filename return err } -func (pbf *Pbf) nextBlock() (offset int64, size int32, header *osmpbf.BlobHeader) { +func (pbf *pbf) nextBlock() (offset int64, size int32, header *osmpbf.BlobHeader) { header = pbf.nextBlobHeader() size = header.GetDatasize() offset = pbf.offset @@ -164,32 +166,32 @@ func (pbf *Pbf) nextBlock() (offset int64, size int32, header *osmpbf.BlobHeader return offset, size, header } -func (pbf *Pbf) BlockPositions() (positions chan block) { +func (pbf *pbf) BlockPositions() (positions chan block) { positions = make(chan block, 8) go func() { for { offset, size, header := pbf.nextBlock() if size == 0 { close(positions) - pbf.Close() + pbf.close() return } if header.GetType() != "OSMData" { panic("invalid block type, expected OSMData, got " + header.GetType()) } - positions <- block{pbf.Filename, offset, size} + positions <- block{pbf.filename, offset, size} } }() return } -func (pbf *Pbf) nextBlobHeaderSize() (size int32) { +func (pbf *pbf) nextBlobHeaderSize() (size int32) { pbf.offset += 4 structs.Read(pbf.file, structs.BigEndian, &size) return } -func (pbf *Pbf) nextBlobHeader() *osmpbf.BlobHeader { +func (pbf *pbf) nextBlobHeader() *osmpbf.BlobHeader { var blobHeader = &osmpbf.BlobHeader{} size := pbf.nextBlobHeaderSize() diff --git a/parser/pbf/pbf_test.go b/parser/pbf/pbf_test.go index a4d1bd4..b86ea7e 100644 --- a/parser/pbf/pbf_test.go +++ b/parser/pbf/pbf_test.go @@ -14,7 +14,7 @@ import ( func BenchmarkHello(b *testing.B) { b.StopTimer() - pbf, err := Open("./monaco-20150428.osm.pbf") + pbf, err := open("./monaco-20150428.osm.pbf") if err != nil { panic(err) } diff --git a/parser/pbf/process.go b/parser/pbf/process.go index 8616f7d..0569b1d 100644 --- 
a/parser/pbf/process.go +++ b/parser/pbf/process.go @@ -7,8 +7,8 @@ import ( "github.com/omniscale/imposm3/element" ) -type parser struct { - pbf *Pbf +type Parser struct { + pbf *pbf coords chan []element.Node nodes chan []element.Node ways chan []element.Way @@ -19,19 +19,34 @@ type parser struct { relSync *barrier } -func NewParser(pbf *Pbf, coords chan []element.Node, nodes chan []element.Node, ways chan []element.Way, relations chan []element.Relation) *parser { - return &parser{ - pbf: pbf, - coords: coords, - nodes: nodes, - ways: ways, - relations: relations, - nParser: runtime.NumCPU(), - wg: sync.WaitGroup{}, +func NewParser( + filename string, +) (*Parser, error) { + pbf, err := open(filename) + if err != nil { + return nil, err } + return &Parser{ + pbf: pbf, + nParser: runtime.NumCPU(), + wg: sync.WaitGroup{}, + }, nil } -func (p *parser) Parse() { +func (p *Parser) Header() Header { + return *p.pbf.header +} + +func (p *Parser) Parse( + coords chan []element.Node, + nodes chan []element.Node, + ways chan []element.Way, + relations chan []element.Relation, +) { + p.coords = coords + p.nodes = nodes + p.ways = ways + p.relations = relations blocks := p.pbf.BlockPositions() for i := 0; i < p.nParser; i++ { p.wg.Add(1) @@ -49,40 +64,27 @@ func (p *parser) Parse() { }() } p.wg.Wait() - - if p.nodes != nil { - close(p.nodes) - } - if p.coords != nil { - close(p.coords) - } - if p.ways != nil { - close(p.ways) - } - if p.relations != nil { - close(p.relations) - } } -// FinishedCoords registers a single function that gets called when all -// nodes and coords are parsed. The callback should block until it is -// safe to continue with parsing of all ways. +// RegisterFirstWayCallback registers a callback that gets called when the +// first way is parsed. The callback should block until it is +// safe to send ways to the way channel. // This only works when the PBF file is ordered by type (nodes before ways before relations). 
-func (p *parser) FinishedCoords(cb func()) { +func (p *Parser) RegisterFirstWayCallback(cb func()) { p.waySync = newBarrier(cb) p.waySync.add(p.nParser) } -// FinishedWays registers a single function that gets called when all -// nodes and coords are parsed. The callback should block until it is -// safe to continue with parsing of all ways. +// RegisterFirstRelationCallback registers a callback that gets called when the +// first relation is parsed. The callback should block until it is +// safe to send relations to the relation channel. // This only works when the PBF file is ordered by type (nodes before ways before relations). -func (p *parser) FinishedWays(cb func()) { +func (p *Parser) RegisterFirstRelationCallback(cb func()) { p.relSync = newBarrier(cb) p.relSync.add(p.nParser) } -func (p *parser) parseBlock(pos block) { +func (p *Parser) parseBlock(pos block) { block := readPrimitiveBlock(pos) stringtable := newStringTable(block.GetStringtable()) diff --git a/parser/pbf/process_test.go b/parser/pbf/process_test.go index 7e4e737..8ab157a 100644 --- a/parser/pbf/process_test.go +++ b/parser/pbf/process_test.go @@ -12,11 +12,10 @@ func TestParser(t *testing.T) { coords := make(chan []element.Node) ways := make(chan []element.Way) relations := make(chan []element.Relation) - pbf, err := Open("monaco-20150428.osm.pbf") + p, err := NewParser("monaco-20150428.osm.pbf") if err != nil { t.Fatal(err) } - p := NewParser(pbf, coords, nodes, ways, relations) wg := sync.WaitGroup{} @@ -54,7 +53,11 @@ func TestParser(t *testing.T) { wg.Done() }() - p.Parse() + p.Parse(coords, nodes, ways, relations) + close(coords) + close(nodes) + close(ways) + close(relations) wg.Wait() if numCoords != 17233 { @@ -74,11 +77,10 @@ func TestParser(t *testing.T) { func TestParseCoords(t *testing.T) { coords := make(chan []element.Node) - pbf, err := Open("monaco-20150428.osm.pbf") + p, err := NewParser("monaco-20150428.osm.pbf") if err != nil { t.Fatal(err) } - p := NewParser(pbf, 
coords, nil, nil, nil) wg := sync.WaitGroup{} @@ -92,7 +94,8 @@ func TestParseCoords(t *testing.T) { wg.Done() }() - p.Parse() + p.Parse(coords, nil, nil, nil) + close(coords) wg.Wait() if numCoords != 17233 { @@ -105,13 +108,13 @@ func TestParserNotify(t *testing.T) { coords := make(chan []element.Node) ways := make(chan []element.Way) relations := make(chan []element.Relation) - pbf, err := Open("monaco-20150428.osm.pbf") + + p, err := NewParser("monaco-20150428.osm.pbf") if err != nil { t.Fatal(err) } - p := NewParser(pbf, coords, nodes, ways, relations) waysWg := sync.WaitGroup{} - p.FinishedCoords(func() { + p.RegisterFirstWayCallback(func() { waysWg.Add(1) coords <- nil nodes <- nil @@ -167,7 +170,11 @@ func TestParserNotify(t *testing.T) { wg.Done() }() - p.Parse() + p.Parse(coords, nodes, ways, relations) + close(coords) + close(nodes) + close(ways) + close(relations) wg.Wait() if numCoords != 17233 { diff --git a/reader/reader.go b/reader/reader.go index c348072..726f58c 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -50,10 +50,13 @@ func readersForCpus(cpus int) (int64, int64, int64, int64, int64) { return int64(math.Ceil(cpuf * 0.75)), int64(math.Ceil(cpuf * 0.25)), int64(math.Ceil(cpuf * 0.25)), int64(math.Ceil(cpuf * 0.25)), int64(math.Ceil(cpuf * 0.25)) } -func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, - tagmapping *mapping.Mapping, pbfFile *pbf.Pbf, +func ReadPbf( + filename string, + cache *osmcache.OSMCache, + progress *stats.Statistics, + tagmapping *mapping.Mapping, limiter *limit.Limiter, -) { +) error { nodes := make(chan []element.Node, 4) coords := make(chan []element.Node, 4) ways := make(chan []element.Way, 4) @@ -64,16 +67,19 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, withLimiter = true } - if pbfFile.Header.Time.Unix() != 0 { - log.Printf("reading %s with data till %v", pbfFile.Filename, pbfFile.Header.Time.Local()) + parser, err := pbf.NewParser(filename) + if err != nil { + return 
err } - parser := pbf.NewParser(pbfFile, coords, nodes, ways, relations) + if header := parser.Header(); header.Time.Unix() != 0 { + log.Printf("reading %s with data till %v", filename, header.Time.Local()) + } // wait for all coords/nodes to be processed before continuing with // ways. required for -limitto checks coordsSync := sync.WaitGroup{} - parser.FinishedCoords(func() { + parser.RegisterFirstWayCallback(func() { for i := 0; int64(i) < nCoords; i++ { coords <- nil } @@ -86,7 +92,7 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, // wait for all ways to be processed before continuing with // relations. required for -limitto checks waysSync := sync.WaitGroup{} - parser.FinishedWays(func() { + parser.RegisterFirstRelationCallback(func() { for i := 0; int64(i) < nWays; i++ { ways <- nil } @@ -238,6 +244,12 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, }() } - parser.Parse() + parser.Parse(coords, nodes, ways, relations) + close(nodes) + close(coords) + close(ways) + close(relations) waitWriter.Wait() + + return nil } diff --git a/update/process.go b/update/process.go index e0a79d4..d0fc693 100644 --- a/update/process.go +++ b/update/process.go @@ -83,7 +83,7 @@ func Diff() { } func Update(oscFile string, geometryLimiter *limit.Limiter, expireor expire.Expireor, osmCache *cache.OSMCache, diffCache *cache.DiffCache, force bool) error { - state, err := diffstate.ParseFromOsc(oscFile) + state, err := diffstate.FromOscGz(oscFile) if err != nil { return err } diff --git a/update/state/state.go b/update/state/state.go index a982d7c..e2debbb 100644 --- a/update/state/state.go +++ b/update/state/state.go @@ -58,7 +58,7 @@ func WriteLastState(cacheDir string, state *DiffState) error { return state.WriteToFile(stateFile) } -func ParseFromOsc(oscFile string) (*DiffState, error) { +func FromOscGz(oscFile string) (*DiffState, error) { var stateFile string if !strings.HasSuffix(oscFile, ".osc.gz") { log.Warn("cannot read state file 
for non .osc.gz files") @@ -79,15 +79,18 @@ func ParseFromOsc(oscFile string) (*DiffState, error) { return ParseFile(stateFile) } -func FromPbf(pbfFile *pbf.Pbf, before time.Duration) *DiffState { +func FromPbf(filename string, before time.Duration) (*DiffState, error) { + pbfFile, err := pbf.NewParser(filename) + if err != nil { + return nil, err + } var timestamp time.Time - if pbfFile.Header.Time.Unix() != 0 { - timestamp = pbfFile.Header.Time + if pbfFile.Header().Time.Unix() != 0 { + timestamp = pbfFile.Header().Time } else { - fstat, err := os.Stat(pbfFile.Filename) + fstat, err := os.Stat(filename) if err != nil { - log.Warn("unable to stat pbffile: ", err) - return nil + return nil, err } timestamp = fstat.ModTime() } @@ -96,12 +99,12 @@ func FromPbf(pbfFile *pbf.Pbf, before time.Duration) *DiffState { seq := estimateSequence(replicationUrl, timestamp) if seq == 0 { - return nil + return nil, nil } // start earlier seq -= int32(before.Minutes()) - return &DiffState{Time: timestamp, Url: replicationUrl, Sequence: seq} + return &DiffState{Time: timestamp, Url: replicationUrl, Sequence: seq}, nil } func ParseFile(stateFile string) (*DiffState, error) {