diff --git a/cache/delta.go b/cache/delta.go index 35a6f4f..871593e 100644 --- a/cache/delta.go +++ b/cache/delta.go @@ -2,10 +2,11 @@ package cache import ( "container/list" - "github.com/omniscale/imposm3/cache/binary" - "github.com/omniscale/imposm3/element" "sort" "sync" + + "github.com/omniscale/imposm3/cache/binary" + "github.com/omniscale/imposm3/element" ) type byId []element.Node @@ -152,12 +153,15 @@ func (self *DeltaCoordsCache) SetLinearImport(v bool) { self.linearImport = v } -func (self *DeltaCoordsCache) Flush() { +func (self *DeltaCoordsCache) Flush() error { self.mu.Lock() defer self.mu.Unlock() for bunchId, bunch := range self.table { if bunch.needsWrite { - self.putCoordsPacked(bunchId, bunch.coords) + err := self.putCoordsPacked(bunchId, bunch.coords) + if err != nil { + return err + } } } @@ -165,10 +169,15 @@ func (self *DeltaCoordsCache) Flush() { for k, _ := range self.table { delete(self.table, k) } + return nil } -func (self *DeltaCoordsCache) Close() { - self.Flush() +func (self *DeltaCoordsCache) Close() error { + err := self.Flush() + if err != nil { + return err + } self.cache.Close() + return nil } func (self *DeltaCoordsCache) SetReadOnly(val bool) { @@ -270,7 +279,10 @@ func (self *DeltaCoordsCache) PutCoords(nodes []element.Node) error { if self.linearImport && int64(i) > self.bunchSize && int64(i) < int64(totalNodes)-self.bunchSize { // no need to handle concurrent updates to the same // bunch if we are not at the boundary of a self.bunchSize - self.putCoordsPacked(currentBunchId, nodes[start:i]) + err := self.putCoordsPacked(currentBunchId, nodes[start:i]) + if err != nil { + return err + } } else { bunch, err := self.getBunch(currentBunchId) if err != nil { @@ -389,8 +401,11 @@ func (self *DeltaCoordsCache) getBunch(bunchId int64) (*coordsBunch, error) { self.lruList.MoveToFront(bunch.elem) } bunch.Lock() - self.CheckCapacity() + err := self.CheckCapacity() self.mu.Unlock() + if err != nil { + return nil, err + } if needsGet { nodes, err := self.getCoordsPacked(bunchId, nodes) @@ -403,14 +418,16 @@ func (self *DeltaCoordsCache) getBunch(bunchId int64) (*coordsBunch, error) { return bunch, nil } -func (self *DeltaCoordsCache) CheckCapacity() { +func (self *DeltaCoordsCache) CheckCapacity() error { for int64(len(self.table)) > self.capacity { elem := self.lruList.Back() bunchId := self.lruList.Remove(elem).(int64) bunch := self.table[bunchId] bunch.elem = nil if bunch.needsWrite { - self.putCoordsPacked(bunchId, bunch.coords) + if err := self.putCoordsPacked(bunchId, bunch.coords); err != nil { + return err + } } select { case freeNodes <- bunch.coords: @@ -418,15 +435,19 @@ func (self *DeltaCoordsCache) CheckCapacity() { } delete(self.table, bunchId) } + return nil } -func (self *DeltaCoordsCache) FirstRefIsCached(refs []int64) bool { +func (self *DeltaCoordsCache) FirstRefIsCached(refs []int64) (bool, error) { if len(refs) <= 0 { - return false + return false, nil } _, err := self.GetCoord(refs[0]) - if err != nil { - return false + if err == NotFound { + return false, nil } - return true + if err != nil { + return false, err + } + return true, nil } diff --git a/cache/ways.go b/cache/ways.go index aba0447..b6cc385 100644 --- a/cache/ways.go +++ b/cache/ways.go @@ -113,17 +113,20 @@ func (self *WaysCache) FillMembers(members []element.Member) error { return nil } -func (self *WaysCache) FirstMemberIsCached(members []element.Member) bool { +func (self *WaysCache) FirstMemberIsCached(members []element.Member) (bool, error) { for _, m := range members { if m.Type == element.WAY { _, err := self.GetWay(m.Id) - if err != nil { - return false + if err == NotFound { + return false, nil } - return true + if err != nil { + return false, err + } + return true, nil } } - return false + return false, nil } type InsertedWaysCache struct { diff --git a/cmd/main.go b/cmd/main.go index 6cf61d4..3576a65 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,11 +2,12 @@ package cmd import ( "fmt" - "github.com/omniscale/imposm3/cache" golog "log" "os" "runtime" + "github.com/omniscale/imposm3/cache" + "github.com/omniscale/imposm3/cache/query" "github.com/omniscale/imposm3/config" "github.com/omniscale/imposm3/diff" @@ -88,7 +89,7 @@ func Main(usage func()) { if err != nil { osmCache.Close() diffCache.Close() - log.Fatal(err) + log.Fatalf("unable to process %s: %v", oscFile, err) } } // explicitly Close since os.Exit prevents defers diff --git a/diff/process.go b/diff/process.go index 3028538..b516ec6 100644 --- a/diff/process.go +++ b/diff/process.go @@ -4,6 +4,8 @@ import ( "errors" "fmt" "io" + "path/filepath" + "runtime" "github.com/omniscale/imposm3/cache" "github.com/omniscale/imposm3/config" @@ -154,31 +156,33 @@ For: } if elem.Del { if err := deleter.Delete(elem); err != nil { - return err + return diffError(err, "delete element", elem) } if !elem.Add { // no new or modified elem -> remove from cache if elem.Rel != nil { - if err := osmCache.Relations.DeleteRelation(elem.Rel.Id); err != nil { - return err + if err := osmCache.Relations.DeleteRelation(elem.Rel.Id); err != nil && err != cache.NotFound { + return diffError(err, "delete relation %v", elem.Rel) } } else if elem.Way != nil { - if err := osmCache.Ways.DeleteWay(elem.Way.Id); err != nil { - return err + if err := osmCache.Ways.DeleteWay(elem.Way.Id); err != nil && err != cache.NotFound { + return diffError(err, "delete way %v", elem.Way) + } + if err := diffCache.Ways.Delete(elem.Way.Id); err != nil && err != cache.NotFound { + return diffError(err, "delete way references %v", elem.Way) } - diffCache.Ways.Delete(elem.Way.Id) } else if elem.Node != nil { - if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil { - return err + if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil && err != cache.NotFound { + return diffError(err, "delete node %v", elem.Node) } - if err := osmCache.Coords.DeleteCoord(elem.Node.Id); err != nil { - return err + if err := osmCache.Coords.DeleteCoord(elem.Node.Id); err != nil && err != cache.NotFound { + return diffError(err, "delete coord %v", elem.Node) } } } else if elem.Node != nil && elem.Node.Tags == nil { // handle modifies where a node drops all tags - if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil { - return err + if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil && err != cache.NotFound { + return diffError(err, "delete node %v", elem.Node) } } } @@ -186,15 +190,29 @@ For: if elem.Rel != nil { // check if first member is cached to avoid caching // unneeded relations (typical outside of our coverage) - if osmCache.Ways.FirstMemberIsCached(elem.Rel.Members) { - osmCache.Relations.PutRelation(elem.Rel) + cached, err := osmCache.Ways.FirstMemberIsCached(elem.Rel.Members) + if err != nil { + return diffError(err, "query first member %v", elem.Rel) + } + if cached { + err := osmCache.Relations.PutRelation(elem.Rel) + if err != nil { + return diffError(err, "put relation %v", elem.Rel) + } relIds[elem.Rel.Id] = true } } else if elem.Way != nil { // check if first coord is cached to avoid caching // unneeded ways (typical outside of our coverage) - if osmCache.Coords.FirstRefIsCached(elem.Way.Refs) { - osmCache.Ways.PutWay(elem.Way) + cached, err := osmCache.Coords.FirstRefIsCached(elem.Way.Refs) + if err != nil { + return diffError(err, "query first ref %v", elem.Way) + } + if cached { + err := osmCache.Ways.PutWay(elem.Way) + if err != nil { + return diffError(err, "put way %v", elem.Way) + } wayIds[elem.Way.Id] = true } } else if elem.Node != nil { @@ -207,15 +225,21 @@ For: } } if addNode { - osmCache.Nodes.PutNode(elem.Node) - osmCache.Coords.PutCoords([]element.Node{*elem.Node}) + err := osmCache.Nodes.PutNode(elem.Node) + if err != nil { + return diffError(err, "put node %v", elem.Node) + } + err = osmCache.Coords.PutCoords([]element.Node{*elem.Node}) + if err != nil { + return diffError(err, "put coord %v", elem.Node) + } nodeIds[elem.Node.Id] = true } } } case err := <-errc: if err != io.EOF { - return err + return diffError(err, "") } break For } @@ -253,7 +277,7 @@ For: rel, err := osmCache.Relations.GetRelation(relId) if err != nil { if err != cache.NotFound { - log.Print(rel, err) + return diffError(err, "could not get relation %v", relId) } continue } @@ -266,7 +290,7 @@ For: way, err := osmCache.Ways.GetWay(wayId) if err != nil { if err != cache.NotFound { - log.Print(way, err) + return diffError(err, "could not get way %v", wayId) } continue } @@ -279,7 +303,7 @@ For: node, err := osmCache.Nodes.GetNode(nodeId) if err != nil { if err != cache.NotFound { - log.Print(node, err) + return diffError(err, "could not get node %v", nodeId) } // missing nodes can still be Coords // no `continue` here @@ -327,3 +351,9 @@ For: } return nil } + +func diffError(err error, msg string, args ...interface{}) error { + _, file, line, _ := runtime.Caller(1) + return fmt.Errorf("diff process error (%s:%d): %s %v", + filepath.Base(file), line, fmt.Sprintf(msg, args...), err) +} diff --git a/reader/reader.go b/reader/reader.go index 87fc381..7537db8 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -118,16 +118,23 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, for i, _ := range ws { m.Filter(&ws[i].Tags) if withLimiter { - if !cache.Coords.FirstRefIsCached(ws[i].Refs) { + cached, err := cache.Coords.FirstRefIsCached(ws[i].Refs) + if err != nil { + log.Errorf("error while checking for cached refs of way %d: %v", ws[i].Id, err) + cached = true // don't skip in case of error + } + if cached { + hit += 1 + } else { ws[i].Id = osmcache.SKIP skip += 1 - - } else { - hit += 1 } } } - cache.Ways.PutWays(ws) + err := cache.Ways.PutWays(ws) + if err != nil { + log.Errorf("error while caching ways: %v", err) + } progress.AddWays(len(ws)) } @@ -149,17 +156,23 @@ func ReadPbf(cache *osmcache.OSMCache, progress *stats.Statistics, numWithTags += 1 } if withLimiter { - if !cache.Ways.FirstMemberIsCached(rels[i].Members) { - skip += 1 - - rels[i].Id = osmcache.SKIP - } else { + cached, err := cache.Ways.FirstMemberIsCached(rels[i].Members) + if err != nil { + log.Errorf("error while checking for cached members of relation %d: %v", rels[i].Id, err) + cached = true // don't skip in case of error + } + if cached { hit += 1 - + } else { + skip += 1 + rels[i].Id = osmcache.SKIP } } } - cache.Relations.PutRelations(rels) + err := cache.Relations.PutRelations(rels) + if err != nil { + log.Errorf("error while caching relation: %v", err) + } progress.AddRelations(numWithTags) }