From ba70ab33934a4110b1e332fb9c332344dcc09aa8 Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Tue, 6 Dec 2016 12:46:55 +0100 Subject: [PATCH] update diff parser API --- update/diff/parser.go | 84 +++++++++++++----- update/process.go | 201 ++++++++++++++++++++---------------------- 2 files changed, 161 insertions(+), 124 deletions(-) diff --git a/update/diff/parser.go b/update/diff/parser.go index 3816f92..26a72d5 100644 --- a/update/diff/parser.go +++ b/update/diff/parser.go @@ -3,6 +3,7 @@ package diff import ( "compress/gzip" "encoding/xml" + "io" "os" "strconv" "time" @@ -22,37 +23,77 @@ type DiffElem struct { Rel *element.Relation } -func Parse(diff string) (chan DiffElem, chan error) { - elems := make(chan DiffElem) - errc := make(chan error) - go parse(diff, elems, errc, false) - return elems, errc +type Parser struct { + reader io.Reader + elems chan DiffElem + errc chan error + metadata bool + running bool + onClose func() error } -func ParseFull(diff string) (chan DiffElem, chan error) { - elems := make(chan DiffElem) - errc := make(chan error) - go parse(diff, elems, errc, true) - return elems, errc +func (p *Parser) SetWithMetadata(metadata bool) { + p.metadata = metadata } -func parse(diff string, elems chan DiffElem, errc chan error, metadata bool) { - defer close(elems) - defer close(errc) - - file, err := os.Open(diff) - if err != nil { - errc <- err - return +func (p *Parser) Next() (DiffElem, error) { + if !p.running { + p.running = true + go parse(p.reader, p.elems, p.errc, p.metadata) + } + select { + case elem, ok := <-p.elems: + if !ok { + p.elems = nil + } else { + return elem, nil + } + case err, ok := <-p.errc: + if !ok { + p.errc = nil + } else { + if p.onClose != nil { + p.onClose() + p.onClose = nil + } + return DiffElem{}, err + } + } + if p.onClose != nil { + err := p.onClose() + p.onClose = nil + return DiffElem{}, err + } + return DiffElem{}, nil +} + +func NewDecoder(r io.Reader) *Parser { + elems := make(chan DiffElem) + errc := make(chan error) + return &Parser{reader: r, elems: elems, errc: errc} +} + +func NewOscGzDecoder(fname string) (*Parser, error) { + file, err := os.Open(fname) + if err != nil { + return nil, err } - defer file.Close() reader, err := gzip.NewReader(file) if err != nil { - errc <- err - return + file.Close() + return nil, err } + elems := make(chan DiffElem) + errc := make(chan error) + return &Parser{reader: reader, elems: elems, errc: errc, onClose: file.Close}, nil +} + +func parse(reader io.Reader, elems chan DiffElem, errc chan error, metadata bool) { + defer close(elems) + defer close(errc) + decoder := xml.NewDecoder(reader) add := false @@ -190,6 +231,7 @@ NextToken: rel = &element.Relation{} newElem = true case "osmChange": + errc <- io.EOF return } diff --git a/update/process.go b/update/process.go index 1c9f6fe..e0a79d4 100644 --- a/update/process.go +++ b/update/process.go @@ -3,6 +3,7 @@ package update import ( "errors" "fmt" + "io" "path/filepath" "runtime" @@ -100,7 +101,10 @@ func Update(oscFile string, geometryLimiter *limit.Limiter, expireor expire.Expi defer log.StopStep(log.StartStep(fmt.Sprintf("Processing %s", oscFile))) - elems, errc := diff.Parse(oscFile) + parser, err := diff.NewOscGzDecoder(oscFile) + if err != nil { + return err + } tagmapping, err := mapping.NewMapping(config.BaseOptions.MappingFile) if err != nil { @@ -197,117 +201,108 @@ func Update(oscFile string, geometryLimiter *limit.Limiter, expireor expire.Expi g := geos.NewGeos() for { - select { - case elem, ok := <-elems: - if !ok { - elems = nil - break - } - if elem.Rel != nil { - relTagFilter.Filter(&elem.Rel.Tags) - progress.AddRelations(1) - } else if elem.Way != nil { - wayTagFilter.Filter(&elem.Way.Tags) - progress.AddWays(1) - } else if elem.Node != nil { - nodeTagFilter.Filter(&elem.Node.Tags) - if len(elem.Node.Tags) > 0 { - progress.AddNodes(1) - } - progress.AddCoords(1) + elem, err := parser.Next() + if err == io.EOF { + break // finished + } + if err != nil { + return diffError(err, "") + } + if elem.Rel != nil { + relTagFilter.Filter(&elem.Rel.Tags) + progress.AddRelations(1) + } else if elem.Way != nil { + wayTagFilter.Filter(&elem.Way.Tags) + progress.AddWays(1) + } else if elem.Node != nil { + nodeTagFilter.Filter(&elem.Node.Tags) + if len(elem.Node.Tags) > 0 { + progress.AddNodes(1) } + progress.AddCoords(1) + } - // always delete, to prevent duplicate elements from overlap of initial - // import and diff import - if err := deleter.Delete(elem); err != nil && err != cache.NotFound { - return diffError(err, "delete element %#v", elem) - } - if elem.Del { - // no new or modified elem -> remove from cache - if elem.Rel != nil { - if err := osmCache.Relations.DeleteRelation(elem.Rel.Id); err != nil && err != cache.NotFound { - return diffError(err, "delete relation %v", elem.Rel) - } - } else if elem.Way != nil { - if err := osmCache.Ways.DeleteWay(elem.Way.Id); err != nil && err != cache.NotFound { - return diffError(err, "delete way %v", elem.Way) - } - if err := diffCache.Ways.Delete(elem.Way.Id); err != nil && err != cache.NotFound { - return diffError(err, "delete way references %v", elem.Way) - } - } else if elem.Node != nil { - if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil && err != cache.NotFound { - return diffError(err, "delete node %v", elem.Node) - } - if err := osmCache.Coords.DeleteCoord(elem.Node.Id); err != nil && err != cache.NotFound { - return diffError(err, "delete coord %v", elem.Node) - } + // always delete, to prevent duplicate elements from overlap of initial + // import and diff import + if err := deleter.Delete(elem); err != nil && err != cache.NotFound { + return diffError(err, "delete element %#v", elem) + } + if elem.Del { + // no new or modified elem -> remove from cache + if elem.Rel != nil { + if err := osmCache.Relations.DeleteRelation(elem.Rel.Id); err != nil && err != cache.NotFound { + return diffError(err, "delete relation %v", elem.Rel) } - } - if elem.Mod && elem.Node != nil && elem.Node.Tags == nil { - // handle modifies where a node drops all tags + } else if elem.Way != nil { + if err := osmCache.Ways.DeleteWay(elem.Way.Id); err != nil && err != cache.NotFound { + return diffError(err, "delete way %v", elem.Way) + } + if err := diffCache.Ways.Delete(elem.Way.Id); err != nil && err != cache.NotFound { + return diffError(err, "delete way references %v", elem.Way) + } + } else if elem.Node != nil { if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil && err != cache.NotFound { return diffError(err, "delete node %v", elem.Node) } - } - if elem.Add || elem.Mod { - if elem.Rel != nil { - // check if first member is cached to avoid caching - // unneeded relations (typical outside of our coverage) - cached, err := osmCache.FirstMemberIsCached(elem.Rel.Members) - if err != nil { - return diffError(err, "query first member %v", elem.Rel) - } - if cached { - err := osmCache.Relations.PutRelation(elem.Rel) - if err != nil { - return diffError(err, "put relation %v", elem.Rel) - } - relIds[elem.Rel.Id] = struct{}{} - } - } else if elem.Way != nil { - // check if first coord is cached to avoid caching - // unneeded ways (typical outside of our coverage) - cached, err := osmCache.Coords.FirstRefIsCached(elem.Way.Refs) - if err != nil { - return diffError(err, "query first ref %v", elem.Way) - } - if cached { - err := osmCache.Ways.PutWay(elem.Way) - if err != nil { - return diffError(err, "put way %v", elem.Way) - } - wayIds[elem.Way.Id] = struct{}{} - } - } else if elem.Node != nil { - addNode := true - if geometryLimiter != nil { - if !geometryLimiter.IntersectsBuffer(g, elem.Node.Long, elem.Node.Lat) { - addNode = false - } - } - if addNode { - err := osmCache.Nodes.PutNode(elem.Node) - if err != nil { - return diffError(err, "put node %v", elem.Node) - } - err = osmCache.Coords.PutCoords([]element.Node{*elem.Node}) - if err != nil { - return diffError(err, "put coord %v", elem.Node) - } - nodeIds[elem.Node.Id] = struct{}{} - } + if err := osmCache.Coords.DeleteCoord(elem.Node.Id); err != nil && err != cache.NotFound { + return diffError(err, "delete coord %v", elem.Node) } } - case err, ok := <-errc: - if !ok { - errc = nil - break - } - return diffError(err, "") } - if errc == nil && elems == nil { - break + if elem.Mod && elem.Node != nil && elem.Node.Tags == nil { + // handle modifies where a node drops all tags + if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil && err != cache.NotFound { + return diffError(err, "delete node %v", elem.Node) + } + } + if elem.Add || elem.Mod { + if elem.Rel != nil { + // check if first member is cached to avoid caching + // unneeded relations (typical outside of our coverage) + cached, err := osmCache.FirstMemberIsCached(elem.Rel.Members) + if err != nil { + return diffError(err, "query first member %v", elem.Rel) + } + if cached { + err := osmCache.Relations.PutRelation(elem.Rel) + if err != nil { + return diffError(err, "put relation %v", elem.Rel) + } + relIds[elem.Rel.Id] = struct{}{} + } + } else if elem.Way != nil { + // check if first coord is cached to avoid caching + // unneeded ways (typical outside of our coverage) + cached, err := osmCache.Coords.FirstRefIsCached(elem.Way.Refs) + if err != nil { + return diffError(err, "query first ref %v", elem.Way) + } + if cached { + err := osmCache.Ways.PutWay(elem.Way) + if err != nil { + return diffError(err, "put way %v", elem.Way) + } + wayIds[elem.Way.Id] = struct{}{} + } + } else if elem.Node != nil { + addNode := true + if geometryLimiter != nil { + if !geometryLimiter.IntersectsBuffer(g, elem.Node.Long, elem.Node.Lat) { + addNode = false + } + } + if addNode { + err := osmCache.Nodes.PutNode(elem.Node) + if err != nil { + return diffError(err, "put node %v", elem.Node) + } + err = osmCache.Coords.PutCoords([]element.Node{*elem.Node}) + if err != nil { + return diffError(err, "put coord %v", elem.Node) + } + nodeIds[elem.Node.Id] = struct{}{} + } + } } }