imposm3/diff/process.go

336 lines
7.4 KiB
Go
Raw Normal View History

2013-07-29 10:18:08 +04:00
package diff
2013-06-11 12:33:10 +04:00
import (
"fmt"
2013-08-29 17:44:15 +04:00
"imposm3/cache"
"imposm3/config"
"imposm3/database"
_ "imposm3/database/postgis"
"imposm3/diff/parser"
diffstate "imposm3/diff/state"
"imposm3/element"
"imposm3/expire"
"imposm3/geom/geos"
"imposm3/geom/limit"
"imposm3/logging"
"imposm3/mapping"
"imposm3/stats"
"imposm3/writer"
2013-06-11 12:33:10 +04:00
"io"
)
2013-07-29 10:18:08 +04:00
// log is the package-level logger used throughout this package; it is
// created by logging.NewLogger with the name "diff".
var log = logging.NewLogger("diff")
2013-06-11 12:33:10 +04:00
2013-07-30 10:17:47 +04:00
func Update(oscFile string, geometryLimiter *limit.Limiter, force bool) {
state, err := diffstate.ParseFromOsc(oscFile)
2013-07-16 16:03:42 +04:00
if err != nil {
log.Fatal(err)
}
lastState, err := diffstate.ParseLastState(config.BaseOptions.CacheDir)
2013-07-16 16:03:42 +04:00
if err != nil {
log.Fatal(err)
}
2013-07-26 17:59:03 +04:00
if lastState != nil && lastState.Sequence != 0 && state != nil && state.Sequence <= lastState.Sequence {
2013-07-16 16:03:42 +04:00
if !force {
log.Warn(state, " already imported")
return
}
}
2013-07-10 11:50:50 +04:00
defer log.StopStep(log.StartStep(fmt.Sprintf("Processing %s", oscFile)))
elems, errc := parser.Parse(oscFile)
2013-06-11 12:33:10 +04:00
osmCache := cache.NewOSMCache(config.BaseOptions.CacheDir)
2013-07-16 16:03:42 +04:00
err = osmCache.Open()
2013-06-11 12:33:10 +04:00
if err != nil {
2013-07-10 11:50:50 +04:00
log.Fatal("osm cache: ", err)
2013-06-11 12:33:10 +04:00
}
diffCache := cache.NewDiffCache(config.BaseOptions.CacheDir)
2013-06-11 12:33:10 +04:00
err = diffCache.Open()
if err != nil {
2013-07-10 11:50:50 +04:00
log.Fatal("diff cache: ", err)
2013-06-11 12:33:10 +04:00
}
tagmapping, err := mapping.NewMapping(config.BaseOptions.MappingFile)
2013-06-11 12:33:10 +04:00
if err != nil {
log.Fatal(err)
}
2013-07-10 11:50:50 +04:00
dbConf := database.Config{
ConnectionParams: config.BaseOptions.Connection,
Srid: config.BaseOptions.Srid,
2013-06-11 12:33:10 +04:00
}
2013-07-10 11:50:50 +04:00
db, err := database.Open(dbConf, tagmapping)
2013-06-11 12:33:10 +04:00
if err != nil {
2013-07-10 11:50:50 +04:00
log.Fatal("database open: ", err)
2013-06-11 12:33:10 +04:00
}
2013-11-04 17:32:54 +04:00
defer db.Close()
2013-06-11 12:33:10 +04:00
err = db.Begin()
if err != nil {
log.Fatal(err)
}
2013-06-11 12:33:10 +04:00
delDb, ok := db.(database.Deleter)
if !ok {
log.Fatal("database not deletable")
}
genDb, ok := db.(database.Generalizer)
if ok {
genDb.EnableGeneralizeUpdates()
}
2013-07-29 10:18:08 +04:00
deleter := NewDeleter(
2013-06-11 12:33:10 +04:00
delDb,
osmCache,
diffCache,
tagmapping.PointMatcher(),
tagmapping.LineStringMatcher(),
tagmapping.PolygonMatcher(),
)
progress := stats.NewStatsReporter()
2013-06-11 12:33:10 +04:00
2013-07-12 16:57:06 +04:00
expiredTiles := expire.NewTiles(14)
2013-06-11 12:33:10 +04:00
relTagFilter := tagmapping.RelationTagFilter()
wayTagFilter := tagmapping.WayTagFilter()
nodeTagFilter := tagmapping.NodeTagFilter()
relations := make(chan *element.Relation)
ways := make(chan *element.Way)
nodes := make(chan *element.Node)
relWriter := writer.NewRelationWriter(osmCache, diffCache, relations,
2013-10-28 14:37:58 +04:00
db, progress, config.BaseOptions.Srid)
2013-07-30 10:17:47 +04:00
relWriter.SetLimiter(geometryLimiter)
2013-07-12 16:57:06 +04:00
relWriter.SetExpireTiles(expiredTiles)
2013-06-11 12:33:10 +04:00
relWriter.Start()
wayWriter := writer.NewWayWriter(osmCache, diffCache, ways, db,
2013-10-28 14:37:58 +04:00
progress, config.BaseOptions.Srid)
2013-07-30 10:17:47 +04:00
wayWriter.SetLimiter(geometryLimiter)
2013-07-12 16:57:06 +04:00
wayWriter.SetExpireTiles(expiredTiles)
2013-06-11 12:33:10 +04:00
wayWriter.Start()
nodeWriter := writer.NewNodeWriter(osmCache, nodes, db,
2013-10-28 14:37:58 +04:00
progress, config.BaseOptions.Srid)
2013-07-30 10:17:47 +04:00
nodeWriter.SetLimiter(geometryLimiter)
2013-06-11 12:33:10 +04:00
nodeWriter.Start()
nodeIds := make(map[int64]bool)
wayIds := make(map[int64]bool)
relIds := make(map[int64]bool)
2013-07-10 11:50:50 +04:00
step := log.StartStep("Parsing changes, updating cache and removing elements")
g := geos.NewGeos()
2013-06-11 12:33:10 +04:00
For:
for {
select {
case elem := <-elems:
2013-07-10 11:50:50 +04:00
if elem.Rel != nil {
relTagFilter.Filter(&elem.Rel.Tags)
progress.AddRelations(1)
} else if elem.Way != nil {
wayTagFilter.Filter(&elem.Way.Tags)
progress.AddWays(1)
} else if elem.Node != nil {
nodeTagFilter.Filter(&elem.Node.Tags)
if len(elem.Node.Tags) > 0 {
progress.AddNodes(1)
}
progress.AddCoords(1)
}
2013-06-11 12:33:10 +04:00
if elem.Del {
deleter.Delete(elem)
if !elem.Add {
if elem.Rel != nil {
if err := osmCache.Relations.DeleteRelation(elem.Rel.Id); err != nil {
log.Fatal(err)
}
} else if elem.Way != nil {
if err := osmCache.Ways.DeleteWay(elem.Way.Id); err != nil {
log.Fatal(err)
}
diffCache.Ways.Delete(elem.Way.Id)
} else if elem.Node != nil {
if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil {
log.Fatal(err)
}
if err := osmCache.Coords.DeleteCoord(elem.Node.Id); err != nil {
log.Fatal(err)
}
}
2013-06-11 12:33:10 +04:00
}
}
if elem.Add {
if elem.Rel != nil {
// check if first member is cached to avoid caching
// unneeded relations (typical outside of our coverage)
if memberIsCached(elem.Rel.Members, osmCache.Ways) {
osmCache.Relations.PutRelation(elem.Rel)
relIds[elem.Rel.Id] = true
}
2013-06-11 12:33:10 +04:00
} else if elem.Way != nil {
// check if first coord is cached to avoid caching
// unneeded ways (typical outside of our coverage)
if coordIsCached(elem.Way.Refs, osmCache.Coords) {
osmCache.Ways.PutWay(elem.Way)
wayIds[elem.Way.Id] = true
}
2013-06-11 12:33:10 +04:00
} else if elem.Node != nil {
if geometryLimiter == nil || geometryLimiter.IntersectsBuffer(g, elem.Node.Long, elem.Node.Lat) {
osmCache.Nodes.PutNode(elem.Node)
osmCache.Coords.PutCoords([]element.Node{*elem.Node})
nodeIds[elem.Node.Id] = true
}
2013-06-11 12:33:10 +04:00
}
}
case err := <-errc:
2013-07-10 11:50:50 +04:00
if err != io.EOF {
log.Fatal(err)
2013-06-11 12:33:10 +04:00
}
break For
}
}
// mark member ways from deleted relations for re-insert
for id, _ := range deleter.DeletedMemberWays() {
wayIds[id] = true
}
2013-07-10 11:50:50 +04:00
progress.Stop()
log.StopStep(step)
step = log.StartStep("Writing added/modified elements")
progress = stats.NewStatsReporter()
2013-06-11 12:33:10 +04:00
// mark depending ways for (re)insert
2013-06-11 12:33:10 +04:00
for nodeId, _ := range nodeIds {
dependers := diffCache.Coords.Get(nodeId)
for _, way := range dependers {
wayIds[way] = true
}
}
// mark depending relations for (re)insert
2013-06-11 12:33:10 +04:00
for wayId, _ := range wayIds {
dependers := diffCache.Ways.Get(wayId)
2013-07-02 10:49:44 +04:00
// mark depending relations for (re)insert
2013-06-11 12:33:10 +04:00
for _, rel := range dependers {
relIds[rel] = true
}
}
for relId, _ := range relIds {
rel, err := osmCache.Relations.GetRelation(relId)
if err != nil {
if err != cache.NotFound {
2013-07-10 11:50:50 +04:00
log.Print(rel, err)
}
2013-06-11 12:33:10 +04:00
continue
}
2013-07-02 10:49:44 +04:00
// insert new relation
progress.AddRelations(1)
2013-06-11 12:33:10 +04:00
relations <- rel
}
for wayId, _ := range wayIds {
way, err := osmCache.Ways.GetWay(wayId)
if err != nil {
if err != cache.NotFound {
log.Print(way, err)
}
continue
}
// insert new way
progress.AddWays(1)
ways <- way
}
for nodeId, _ := range nodeIds {
node, err := osmCache.Nodes.GetNode(nodeId)
if err != nil {
if err != cache.NotFound {
log.Print(node, err)
}
// missing nodes can still be Coords
// no `continue` here
}
if node != nil {
// insert new node
progress.AddNodes(1)
nodes <- node
}
}
2013-06-11 12:33:10 +04:00
close(relations)
close(ways)
close(nodes)
2013-10-29 19:32:16 +04:00
nodeWriter.Wait()
relWriter.Wait()
wayWriter.Wait()
2013-06-11 12:33:10 +04:00
if genDb != nil {
genDb.GeneralizeUpdates()
}
err = db.End()
if err != nil {
log.Fatal(err)
}
err = db.Close()
if err != nil {
log.Fatal(err)
}
2013-06-11 12:33:10 +04:00
osmCache.Close()
diffCache.Close()
2013-07-10 11:50:50 +04:00
log.StopStep(step)
2013-07-12 16:57:06 +04:00
step = log.StartStep("Updating expired tiles db")
expire.WriteTileExpireDb(
expiredTiles.SortedTiles(),
"/tmp/expire_tiles.db",
)
log.StopStep(step)
progress.Stop()
2013-07-16 16:03:42 +04:00
if state != nil {
err = diffstate.WriteLastState(config.BaseOptions.CacheDir, state)
if err != nil {
log.Warn(err) // warn only
}
}
}
// memberIsCached reports whether the first member of type element.WAY in
// members can be fetched from wayCache. Only that first way member is
// looked up, and any error returned by the lookup counts as "not cached".
// It returns false when members contains no way member.
func memberIsCached(members []element.Member, wayCache *cache.WaysCache) bool {
	for i := range members {
		if members[i].Type != element.WAY {
			continue
		}
		// The first way member alone decides the result.
		_, lookupErr := wayCache.GetWay(members[i].Id)
		return lookupErr == nil
	}
	return false
}
func coordIsCached(refs []int64, coordCache *cache.DeltaCoordsCache) bool {
if len(refs) <= 0 {
return false
}
_, err := coordCache.GetCoord(refs[0])
2013-07-16 16:03:42 +04:00
if err != nil {
return false
2013-07-16 16:03:42 +04:00
}
return true
2013-06-11 12:33:10 +04:00
}