refactor parser.pbf API

master
Oliver Tonnhofer 2016-12-06 14:47:34 +01:00
parent 7d4bec6909
commit e3593761b5
8 changed files with 121 additions and 91 deletions

View File

@ -14,7 +14,6 @@ import (
"github.com/omniscale/imposm3/geom/limit"
"github.com/omniscale/imposm3/logging"
"github.com/omniscale/imposm3/mapping"
"github.com/omniscale/imposm3/parser/pbf"
"github.com/omniscale/imposm3/reader"
"github.com/omniscale/imposm3/stats"
"github.com/omniscale/imposm3/update/state"
@ -102,11 +101,6 @@ func Import() {
}
progress := stats.NewStatsReporter()
pbfFile, err := pbf.Open(config.ImportOptions.Read)
if err != nil {
log.Fatal(err)
}
if !config.ImportOptions.Appendcache {
// enable optimization if we don't append to existing cache
osmCache.Coords.SetLinearImport(true)
@ -116,16 +110,26 @@ func Import() {
if config.BaseOptions.LimitToCacheBuffer == 0.0 {
readLimiter = nil
}
reader.ReadPbf(osmCache, progress, tagmapping,
pbfFile, readLimiter)
err := reader.ReadPbf(config.ImportOptions.Read,
osmCache,
progress,
tagmapping,
readLimiter,
)
if err != nil {
log.Fatal(err)
}
osmCache.Coords.SetLinearImport(false)
elementCounts = progress.Stop()
osmCache.Close()
log.StopStep(step)
if config.ImportOptions.Diff {
diffstate := state.FromPbf(pbfFile, config.ImportOptions.DiffStateBefore)
if diffstate != nil {
diffstate, err := state.FromPbf(config.ImportOptions.Read, config.ImportOptions.DiffStateBefore)
if err != nil {
log.Print("error parsing diff state form PBF", err)
} else if diffstate != nil {
os.MkdirAll(config.BaseOptions.DiffDir, 0755)
err := diffstate.WriteToFile(path.Join(config.BaseOptions.DiffDir, "last.state.txt"))
if err != nil {

View File

@ -84,7 +84,7 @@ func readPrimitiveBlock(pos block) *osmpbf.PrimitiveBlock {
return block
}
func readAndParseHeaderBlock(pos block) (*pbfHeader, error) {
func readAndParseHeaderBlock(pos block) (*Header, error) {
raw, err := readBlobData(pos)
if err != nil {
return nil, err
@ -102,7 +102,7 @@ func readAndParseHeaderBlock(pos block) (*pbfHeader, error) {
}
}
result := &pbfHeader{}
result := &Header{}
timestamp := header.GetOsmosisReplicationTimestamp()
result.Time = time.Unix(timestamp, 0 /* nanoseconds */)
result.Sequence = header.GetOsmosisReplicationSequenceNumber()
@ -111,27 +111,28 @@ func readAndParseHeaderBlock(pos block) (*pbfHeader, error) {
return result, nil
}
type Pbf struct {
type pbf struct {
file *os.File
Filename string
filename string
offset int64
Header *pbfHeader
header *Header
}
type pbfHeader struct {
type Header struct {
Time time.Time
Sequence int64
Filename string
RequiredFeatures []string
OptionalFeatures []string
}
func Open(filename string) (f *Pbf, err error) {
func open(filename string) (f *pbf, err error) {
file, err := os.Open(filename)
if err != nil {
return nil, err
}
f = &Pbf{Filename: filename, file: file}
f = &pbf{filename: filename, file: file}
err = f.parseHeader()
if err != nil {
file.Close()
@ -140,21 +141,22 @@ func Open(filename string) (f *Pbf, err error) {
return f, nil
}
func (pbf *Pbf) Close() error {
func (pbf *pbf) close() error {
return pbf.file.Close()
}
func (pbf *Pbf) parseHeader() error {
func (pbf *pbf) parseHeader() error {
offset, size, header := pbf.nextBlock()
if header.GetType() != "OSMHeader" {
panic("invalid block type, expected OSMHeader, got " + header.GetType())
}
var err error
pbf.Header, err = readAndParseHeaderBlock(block{pbf.Filename, offset, size})
pbf.header, err = readAndParseHeaderBlock(block{pbf.filename, offset, size})
pbf.header.Filename = pbf.filename
return err
}
func (pbf *Pbf) nextBlock() (offset int64, size int32, header *osmpbf.BlobHeader) {
func (pbf *pbf) nextBlock() (offset int64, size int32, header *osmpbf.BlobHeader) {
header = pbf.nextBlobHeader()
size = header.GetDatasize()
offset = pbf.offset
@ -164,32 +166,32 @@ func (pbf *Pbf) nextBlock() (offset int64, size int32, header *osmpbf.BlobHeader
return offset, size, header
}
func (pbf *Pbf) BlockPositions() (positions chan block) {
func (pbf *pbf) BlockPositions() (positions chan block) {
positions = make(chan block, 8)
go func() {
for {
offset, size, header := pbf.nextBlock()
if size == 0 {
close(positions)
pbf.Close()
pbf.close()
return
}
if header.GetType() != "OSMData" {
panic("invalid block type, expected OSMData, got " + header.GetType())
}
positions <- block{pbf.Filename, offset, size}
positions <- block{pbf.filename, offset, size}
}
}()
return
}
func (pbf *Pbf) nextBlobHeaderSize() (size int32) {
func (pbf *pbf) nextBlobHeaderSize() (size int32) {
pbf.offset += 4
structs.Read(pbf.file, structs.BigEndian, &size)
return
}
func (pbf *Pbf) nextBlobHeader() *osmpbf.BlobHeader {
func (pbf *pbf) nextBlobHeader() *osmpbf.BlobHeader {
var blobHeader = &osmpbf.BlobHeader{}
size := pbf.nextBlobHeaderSize()

View File

@ -14,7 +14,7 @@ import (
func BenchmarkHello(b *testing.B) {
b.StopTimer()
pbf, err := Open("./monaco-20150428.osm.pbf")
pbf, err := open("./monaco-20150428.osm.pbf")
if err != nil {
panic(err)
}

View File

@ -7,8 +7,8 @@ import (
"github.com/omniscale/imposm3/element"
)
type parser struct {
pbf *Pbf
type Parser struct {
pbf *pbf
coords chan []element.Node
nodes chan []element.Node
ways chan []element.Way
@ -19,19 +19,34 @@ type parser struct {
relSync *barrier
}
func NewParser(pbf *Pbf, coords chan []element.Node, nodes chan []element.Node, ways chan []element.Way, relations chan []element.Relation) *parser {
return &parser{
pbf: pbf,
coords: coords,
nodes: nodes,
ways: ways,
relations: relations,
nParser: runtime.NumCPU(),
wg: sync.WaitGroup{},
func NewParser(
filename string,
) (*Parser, error) {
pbf, err := open(filename)
if err != nil {
return nil, err
}
return &Parser{
pbf: pbf,
nParser: runtime.NumCPU(),
wg: sync.WaitGroup{},
}, nil
}
func (p *parser) Parse() {
func (p *Parser) Header() Header {
return *p.pbf.header
}
func (p *Parser) Parse(
coords chan []element.Node,
nodes chan []element.Node,
ways chan []element.Way,
relations chan []element.Relation,
) {
p.coords = coords
p.nodes = nodes
p.ways = ways
p.relations = relations
blocks := p.pbf.BlockPositions()
for i := 0; i < p.nParser; i++ {
p.wg.Add(1)
@ -49,40 +64,27 @@ func (p *parser) Parse() {
}()
}
p.wg.Wait()
if p.nodes != nil {
close(p.nodes)
}
if p.coords != nil {
close(p.coords)
}
if p.ways != nil {
close(p.ways)
}
if p.relations != nil {
close(p.relations)
}
}
// FinishedCoords registers a single function that gets called when all
// nodes and coords are parsed. The callback should block until it is
// safe to continue with parsing of all ways.
// RegisterFirstWayCallback registers a callback that gets called when the
// the first way is parsed. The callback should block until it is
// safe to send ways to the way channel.
// This only works when the PBF file is ordered by type (nodes before ways before relations).
func (p *parser) FinishedCoords(cb func()) {
func (p *Parser) RegisterFirstWayCallback(cb func()) {
p.waySync = newBarrier(cb)
p.waySync.add(p.nParser)
}
// FinishedWays registers a single function that gets called when all
// nodes and coords are parsed. The callback should block until it is
// safe to continue with parsing of all ways.
// RegisterFirstRelationCallback registers a callback that gets called when the
// the first relation is parsed. The callback should block until it is
// safe to send relations to the relation channel.
// This only works when the PBF file is ordered by type (nodes before ways before relations).
func (p *parser) FinishedWays(cb func()) {
func (p *Parser) RegisterFirstRelationCallback(cb func()) {
p.relSync = newBarrier(cb)
p.relSync.add(p.nParser)
}
func (p *parser) parseBlock(pos block) {
func (p *Parser) parseBlock(pos block) {
block := readPrimitiveBlock(pos)
stringtable := newStringTable(block.GetStringtable())

View File

@ -12,11 +12,10 @@ func TestParser(t *testing.T) {
coords := make(chan []element.Node)
ways := make(chan []element.Way)
relations := make(chan []element.Relation)
pbf, err := Open("monaco-20150428.osm.pbf")
p, err := NewParser("monaco-20150428.osm.pbf")
if err != nil {
t.Fatal(err)
}
p := NewParser(pbf, coords, nodes, ways, relations)
wg := sync.WaitGroup{}
@ -54,7 +53,11 @@ func TestParser(t *testing.T) {
wg.Done()
}()
p.Parse()
p.Parse(coords, nodes, ways, relations)
close(coords)
close(nodes)
close(ways)
close(relations)
wg.Wait()
if numCoords != 17233 {
@ -74,11 +77,10 @@ func TestParser(t *testing.T) {
func TestParseCoords(t *testing.T) {
coords := make(chan []element.Node)
pbf, err := Open("monaco-20150428.osm.pbf")
p, err := NewParser("monaco-20150428.osm.pbf")
if err != nil {
t.Fatal(err)
}
p := NewParser(pbf, coords, nil, nil, nil)
wg := sync.WaitGroup{}
@ -92,7 +94,8 @@ func TestParseCoords(t *testing.T) {
wg.Done()
}()
p.Parse()
p.Parse(coords, nil, nil, nil)
close(coords)
wg.Wait()
if numCoords != 17233 {
@ -105,13 +108,13 @@ func TestParserNotify(t *testing.T) {
coords := make(chan []element.Node)
ways := make(chan []element.Way)
relations := make(chan []element.Relation)
pbf, err := Open("monaco-20150428.osm.pbf")
p, err := NewParser("monaco-20150428.osm.pbf")
if err != nil {
t.Fatal(err)
}
p := NewParser(pbf, coords, nodes, ways, relations)
waysWg := sync.WaitGroup{}
p.FinishedCoords(func() {
p.RegisterFirstWayCallback(func() {
waysWg.Add(1)
coords <- nil
nodes <- nil
@ -167,7 +170,11 @@ func TestParserNotify(t *testing.T) {
wg.Done()
}()
p.Parse()
p.Parse(coords, nodes, ways, relations)
close(coords)
close(nodes)
close(ways)
close(relations)
wg.Wait()
if numCoords != 17233 {

View File

@ -50,10 +50,13 @@ func readersForCpus(cpus int) (int64, int64, int64, int64, int64) {
return int64(math.Ceil(cpuf * 0.75)), int64(math.Ceil(cpuf * 0.25)), int64(math.Ceil(cpuf * 0.25)), int64(math.Ceil(cpuf * 0.25)), int64(math.Ceil(cpuf * 0.25))
}
func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
tagmapping *mapping.Mapping, pbfFile *pbf.Pbf,
func ReadPbf(
filename string,
cache *osmcache.OSMCache,
progress *stats.Statistics,
tagmapping *mapping.Mapping,
limiter *limit.Limiter,
) {
) error {
nodes := make(chan []element.Node, 4)
coords := make(chan []element.Node, 4)
ways := make(chan []element.Way, 4)
@ -64,16 +67,19 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
withLimiter = true
}
if pbfFile.Header.Time.Unix() != 0 {
log.Printf("reading %s with data till %v", pbfFile.Filename, pbfFile.Header.Time.Local())
parser, err := pbf.NewParser(filename)
if err != nil {
return err
}
parser := pbf.NewParser(pbfFile, coords, nodes, ways, relations)
if header := parser.Header(); header.Time.Unix() != 0 {
log.Printf("reading %s with data till %v", filename, header.Time.Local())
}
// wait for all coords/nodes to be processed before continuing with
// ways. required for -limitto checks
coordsSync := sync.WaitGroup{}
parser.FinishedCoords(func() {
parser.RegisterFirstWayCallback(func() {
for i := 0; int64(i) < nCoords; i++ {
coords <- nil
}
@ -86,7 +92,7 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
// wait for all ways to be processed before continuing with
// relations. required for -limitto checks
waysSync := sync.WaitGroup{}
parser.FinishedWays(func() {
parser.RegisterFirstRelationCallback(func() {
for i := 0; int64(i) < nWays; i++ {
ways <- nil
}
@ -238,6 +244,12 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics,
}()
}
parser.Parse()
parser.Parse(coords, nodes, ways, relations)
close(nodes)
close(coords)
close(ways)
close(relations)
waitWriter.Wait()
return nil
}

View File

@ -83,7 +83,7 @@ func Diff() {
}
func Update(oscFile string, geometryLimiter *limit.Limiter, expireor expire.Expireor, osmCache *cache.OSMCache, diffCache *cache.DiffCache, force bool) error {
state, err := diffstate.ParseFromOsc(oscFile)
state, err := diffstate.FromOscGz(oscFile)
if err != nil {
return err
}

View File

@ -58,7 +58,7 @@ func WriteLastState(cacheDir string, state *DiffState) error {
return state.WriteToFile(stateFile)
}
func ParseFromOsc(oscFile string) (*DiffState, error) {
func FromOscGz(oscFile string) (*DiffState, error) {
var stateFile string
if !strings.HasSuffix(oscFile, ".osc.gz") {
log.Warn("cannot read state file for non .osc.gz files")
@ -79,15 +79,18 @@ func ParseFromOsc(oscFile string) (*DiffState, error) {
return ParseFile(stateFile)
}
func FromPbf(pbfFile *pbf.Pbf, before time.Duration) *DiffState {
func FromPbf(filename string, before time.Duration) (*DiffState, error) {
pbfFile, err := pbf.NewParser(filename)
if err != nil {
return nil, err
}
var timestamp time.Time
if pbfFile.Header.Time.Unix() != 0 {
timestamp = pbfFile.Header.Time
if pbfFile.Header().Time.Unix() != 0 {
timestamp = pbfFile.Header().Time
} else {
fstat, err := os.Stat(pbfFile.Filename)
fstat, err := os.Stat(filename)
if err != nil {
log.Warn("unable to stat pbffile: ", err)
return nil
return nil, err
}
timestamp = fstat.ModTime()
}
@ -96,12 +99,12 @@ func FromPbf(pbfFile *pbf.Pbf, before time.Duration) *DiffState {
seq := estimateSequence(replicationUrl, timestamp)
if seq == 0 {
return nil
return nil, nil
}
// start earlier
seq -= int32(before.Minutes())
return &DiffState{Time: timestamp, Url: replicationUrl, Sequence: seq}
return &DiffState{Time: timestamp, Url: replicationUrl, Sequence: seq}, nil
}
func ParseFile(stateFile string) (*DiffState, error) {