update diff parser API

master
Oliver Tonnhofer 2016-12-06 12:46:55 +01:00
parent f8d128f966
commit ba70ab3393
2 changed files with 161 additions and 124 deletions

View File

@ -3,6 +3,7 @@ package diff
import (
"compress/gzip"
"encoding/xml"
"io"
"os"
"strconv"
"time"
@ -22,37 +23,77 @@ type DiffElem struct {
Rel *element.Relation
}
func Parse(diff string) (chan DiffElem, chan error) {
elems := make(chan DiffElem)
errc := make(chan error)
go parse(diff, elems, errc, false)
return elems, errc
type Parser struct {
reader io.Reader
elems chan DiffElem
errc chan error
metadata bool
running bool
onClose func() error
}
func ParseFull(diff string) (chan DiffElem, chan error) {
elems := make(chan DiffElem)
errc := make(chan error)
go parse(diff, elems, errc, true)
return elems, errc
func (p *Parser) SetWithMetadata(metadata bool) {
p.metadata = metadata
}
func parse(diff string, elems chan DiffElem, errc chan error, metadata bool) {
defer close(elems)
defer close(errc)
file, err := os.Open(diff)
if err != nil {
errc <- err
return
func (p *Parser) Next() (DiffElem, error) {
if !p.running {
p.running = true
go parse(p.reader, p.elems, p.errc, p.metadata)
}
select {
case elem, ok := <-p.elems:
if !ok {
p.elems = nil
} else {
return elem, nil
}
case err, ok := <-p.errc:
if !ok {
p.errc = nil
} else {
if p.onClose != nil {
p.onClose()
p.onClose = nil
}
return DiffElem{}, err
}
}
if p.onClose != nil {
err := p.onClose()
p.onClose = nil
return DiffElem{}, err
}
return DiffElem{}, nil
}
func NewDecoder(r io.Reader) *Parser {
elems := make(chan DiffElem)
errc := make(chan error)
return &Parser{reader: r, elems: elems, errc: errc}
}
func NewOscGzDecoder(fname string) (*Parser, error) {
file, err := os.Open(fname)
if err != nil {
return nil, err
}
defer file.Close()
reader, err := gzip.NewReader(file)
if err != nil {
errc <- err
return
file.Close()
return nil, err
}
elems := make(chan DiffElem)
errc := make(chan error)
return &Parser{reader: reader, elems: elems, errc: errc, onClose: file.Close}, nil
}
func parse(reader io.Reader, elems chan DiffElem, errc chan error, metadata bool) {
defer close(elems)
defer close(errc)
decoder := xml.NewDecoder(reader)
add := false
@ -190,6 +231,7 @@ NextToken:
rel = &element.Relation{}
newElem = true
case "osmChange":
errc <- io.EOF
return
}

View File

@ -3,6 +3,7 @@ package update
import (
"errors"
"fmt"
"io"
"path/filepath"
"runtime"
@ -100,7 +101,10 @@ func Update(oscFile string, geometryLimiter *limit.Limiter, expireor expire.Expi
defer log.StopStep(log.StartStep(fmt.Sprintf("Processing %s", oscFile)))
elems, errc := diff.Parse(oscFile)
parser, err := diff.NewOscGzDecoder(oscFile)
if err != nil {
return err
}
tagmapping, err := mapping.NewMapping(config.BaseOptions.MappingFile)
if err != nil {
@ -197,117 +201,108 @@ func Update(oscFile string, geometryLimiter *limit.Limiter, expireor expire.Expi
g := geos.NewGeos()
for {
select {
case elem, ok := <-elems:
if !ok {
elems = nil
break
}
if elem.Rel != nil {
relTagFilter.Filter(&elem.Rel.Tags)
progress.AddRelations(1)
} else if elem.Way != nil {
wayTagFilter.Filter(&elem.Way.Tags)
progress.AddWays(1)
} else if elem.Node != nil {
nodeTagFilter.Filter(&elem.Node.Tags)
if len(elem.Node.Tags) > 0 {
progress.AddNodes(1)
}
progress.AddCoords(1)
elem, err := parser.Next()
if err == io.EOF {
break // finished
}
if err != nil {
return diffError(err, "")
}
if elem.Rel != nil {
relTagFilter.Filter(&elem.Rel.Tags)
progress.AddRelations(1)
} else if elem.Way != nil {
wayTagFilter.Filter(&elem.Way.Tags)
progress.AddWays(1)
} else if elem.Node != nil {
nodeTagFilter.Filter(&elem.Node.Tags)
if len(elem.Node.Tags) > 0 {
progress.AddNodes(1)
}
progress.AddCoords(1)
}
// always delete, to prevent duplicate elements from overlap of initial
// import and diff import
if err := deleter.Delete(elem); err != nil && err != cache.NotFound {
return diffError(err, "delete element %#v", elem)
}
if elem.Del {
// no new or modified elem -> remove from cache
if elem.Rel != nil {
if err := osmCache.Relations.DeleteRelation(elem.Rel.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete relation %v", elem.Rel)
}
} else if elem.Way != nil {
if err := osmCache.Ways.DeleteWay(elem.Way.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete way %v", elem.Way)
}
if err := diffCache.Ways.Delete(elem.Way.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete way references %v", elem.Way)
}
} else if elem.Node != nil {
if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete node %v", elem.Node)
}
if err := osmCache.Coords.DeleteCoord(elem.Node.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete coord %v", elem.Node)
}
// always delete, to prevent duplicate elements from overlap of initial
// import and diff import
if err := deleter.Delete(elem); err != nil && err != cache.NotFound {
return diffError(err, "delete element %#v", elem)
}
if elem.Del {
// no new or modified elem -> remove from cache
if elem.Rel != nil {
if err := osmCache.Relations.DeleteRelation(elem.Rel.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete relation %v", elem.Rel)
}
}
if elem.Mod && elem.Node != nil && elem.Node.Tags == nil {
// handle modifies where a node drops all tags
} else if elem.Way != nil {
if err := osmCache.Ways.DeleteWay(elem.Way.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete way %v", elem.Way)
}
if err := diffCache.Ways.Delete(elem.Way.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete way references %v", elem.Way)
}
} else if elem.Node != nil {
if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete node %v", elem.Node)
}
}
if elem.Add || elem.Mod {
if elem.Rel != nil {
// check if first member is cached to avoid caching
// unneeded relations (typical outside of our coverage)
cached, err := osmCache.FirstMemberIsCached(elem.Rel.Members)
if err != nil {
return diffError(err, "query first member %v", elem.Rel)
}
if cached {
err := osmCache.Relations.PutRelation(elem.Rel)
if err != nil {
return diffError(err, "put relation %v", elem.Rel)
}
relIds[elem.Rel.Id] = struct{}{}
}
} else if elem.Way != nil {
// check if first coord is cached to avoid caching
// unneeded ways (typical outside of our coverage)
cached, err := osmCache.Coords.FirstRefIsCached(elem.Way.Refs)
if err != nil {
return diffError(err, "query first ref %v", elem.Way)
}
if cached {
err := osmCache.Ways.PutWay(elem.Way)
if err != nil {
return diffError(err, "put way %v", elem.Way)
}
wayIds[elem.Way.Id] = struct{}{}
}
} else if elem.Node != nil {
addNode := true
if geometryLimiter != nil {
if !geometryLimiter.IntersectsBuffer(g, elem.Node.Long, elem.Node.Lat) {
addNode = false
}
}
if addNode {
err := osmCache.Nodes.PutNode(elem.Node)
if err != nil {
return diffError(err, "put node %v", elem.Node)
}
err = osmCache.Coords.PutCoords([]element.Node{*elem.Node})
if err != nil {
return diffError(err, "put coord %v", elem.Node)
}
nodeIds[elem.Node.Id] = struct{}{}
}
if err := osmCache.Coords.DeleteCoord(elem.Node.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete coord %v", elem.Node)
}
}
case err, ok := <-errc:
if !ok {
errc = nil
break
}
return diffError(err, "")
}
if errc == nil && elems == nil {
break
if elem.Mod && elem.Node != nil && elem.Node.Tags == nil {
// handle modifies where a node drops all tags
if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete node %v", elem.Node)
}
}
if elem.Add || elem.Mod {
if elem.Rel != nil {
// check if first member is cached to avoid caching
// unneeded relations (typical outside of our coverage)
cached, err := osmCache.FirstMemberIsCached(elem.Rel.Members)
if err != nil {
return diffError(err, "query first member %v", elem.Rel)
}
if cached {
err := osmCache.Relations.PutRelation(elem.Rel)
if err != nil {
return diffError(err, "put relation %v", elem.Rel)
}
relIds[elem.Rel.Id] = struct{}{}
}
} else if elem.Way != nil {
// check if first coord is cached to avoid caching
// unneeded ways (typical outside of our coverage)
cached, err := osmCache.Coords.FirstRefIsCached(elem.Way.Refs)
if err != nil {
return diffError(err, "query first ref %v", elem.Way)
}
if cached {
err := osmCache.Ways.PutWay(elem.Way)
if err != nil {
return diffError(err, "put way %v", elem.Way)
}
wayIds[elem.Way.Id] = struct{}{}
}
} else if elem.Node != nil {
addNode := true
if geometryLimiter != nil {
if !geometryLimiter.IntersectsBuffer(g, elem.Node.Long, elem.Node.Lat) {
addNode = false
}
}
if addNode {
err := osmCache.Nodes.PutNode(elem.Node)
if err != nil {
return diffError(err, "put node %v", elem.Node)
}
err = osmCache.Coords.PutCoords([]element.Node{*elem.Node})
if err != nil {
return diffError(err, "put coord %v", elem.Node)
}
nodeIds[elem.Node.Id] = struct{}{}
}
}
}
}