imposm3/update/process.go

437 lines
11 KiB
Go
Raw Normal View History

2016-12-06 13:00:52 +03:00
package update
2013-06-11 12:33:10 +04:00
import (
"errors"
2013-06-11 12:33:10 +04:00
"fmt"
2016-12-06 14:46:55 +03:00
"io"
2015-01-05 11:56:39 +03:00
"path/filepath"
"runtime"
2014-08-04 17:19:35 +04:00
"github.com/omniscale/imposm3/cache"
"github.com/omniscale/imposm3/config"
"github.com/omniscale/imposm3/database"
_ "github.com/omniscale/imposm3/database/postgis"
"github.com/omniscale/imposm3/element"
"github.com/omniscale/imposm3/expire"
"github.com/omniscale/imposm3/geom/geos"
"github.com/omniscale/imposm3/geom/limit"
"github.com/omniscale/imposm3/logging"
"github.com/omniscale/imposm3/mapping"
2016-12-06 17:34:01 +03:00
"github.com/omniscale/imposm3/parser/diff"
2014-08-04 17:19:35 +04:00
"github.com/omniscale/imposm3/stats"
2016-12-06 13:00:52 +03:00
diffstate "github.com/omniscale/imposm3/update/state"
2014-08-04 17:19:35 +04:00
"github.com/omniscale/imposm3/writer"
2013-06-11 12:33:10 +04:00
)
2013-07-29 10:18:08 +04:00
var log = logging.NewLogger("diff")
2013-06-11 12:33:10 +04:00
2018-06-07 21:23:06 +03:00
func Diff(baseOpts config.Base, files []string) {
if baseOpts.Quiet {
2015-11-22 14:25:35 +03:00
logging.SetQuiet(true)
}
var geometryLimiter *limit.Limiter
2018-06-07 21:23:06 +03:00
if baseOpts.LimitTo != "" {
2015-11-22 14:25:35 +03:00
var err error
step := log.StartStep("Reading limitto geometries")
geometryLimiter, err = limit.NewFromGeoJSON(
2018-06-07 21:23:06 +03:00
baseOpts.LimitTo,
baseOpts.LimitToCacheBuffer,
baseOpts.Srid,
2015-11-22 14:25:35 +03:00
)
if err != nil {
log.Fatal(err)
}
log.StopStep(step)
}
2018-06-07 21:23:06 +03:00
osmCache := cache.NewOSMCache(baseOpts.CacheDir)
2015-11-22 14:25:35 +03:00
err := osmCache.Open()
if err != nil {
log.Fatal("osm cache: ", err)
}
defer osmCache.Close()
2018-06-07 21:23:06 +03:00
diffCache := cache.NewDiffCache(baseOpts.CacheDir)
2015-11-22 14:25:35 +03:00
err = diffCache.Open()
if err != nil {
log.Fatal("diff cache: ", err)
}
2016-11-23 16:43:04 +03:00
var exp expire.Expireor
2018-06-07 21:23:06 +03:00
if baseOpts.ExpireTilesDir != "" {
tileexpire := expire.NewTileList(baseOpts.ExpireTilesZoom, baseOpts.ExpireTilesDir)
2016-11-23 16:43:04 +03:00
exp = tileexpire
defer func() {
if err := tileexpire.Flush(); err != nil {
log.Error("error while writing tile expire file:", err)
}
}()
}
2018-06-07 21:23:06 +03:00
for _, oscFile := range files {
err := Update(baseOpts, oscFile, geometryLimiter, exp, osmCache, diffCache, false)
2015-11-22 14:25:35 +03:00
if err != nil {
osmCache.Close()
diffCache.Close()
log.Fatalf("unable to process %s: %v", oscFile, err)
}
}
// explicitly Close since os.Exit prevents defers
osmCache.Close()
diffCache.Close()
}
2018-06-07 21:23:06 +03:00
func Update(
baseOpts config.Base,
oscFile string,
geometryLimiter *limit.Limiter,
expireor expire.Expireor,
osmCache *cache.OSMCache,
diffCache *cache.DiffCache,
force bool,
) error {
2016-12-06 16:47:34 +03:00
state, err := diffstate.FromOscGz(oscFile)
2013-07-16 16:03:42 +04:00
if err != nil {
return err
2013-07-16 16:03:42 +04:00
}
2018-06-07 21:23:06 +03:00
lastState, err := diffstate.ParseLastState(baseOpts.DiffDir)
2013-07-16 16:03:42 +04:00
if err != nil {
2013-11-08 19:03:58 +04:00
log.Warn(err)
2013-07-16 16:03:42 +04:00
}
2013-07-26 17:59:03 +04:00
if lastState != nil && lastState.Sequence != 0 && state != nil && state.Sequence <= lastState.Sequence {
2013-07-16 16:03:42 +04:00
if !force {
log.Warn(state, " already imported")
return nil
2013-07-16 16:03:42 +04:00
}
}
2013-07-10 11:50:50 +04:00
defer log.StopStep(log.StartStep(fmt.Sprintf("Processing %s", oscFile)))
2016-12-06 17:28:41 +03:00
parser, err := diff.NewOscGzParser(oscFile)
2016-12-06 14:46:55 +03:00
if err != nil {
return err
}
2013-06-11 12:33:10 +04:00
2018-06-07 21:23:06 +03:00
tagmapping, err := mapping.FromFile(baseOpts.MappingFile)
2013-06-11 12:33:10 +04:00
if err != nil {
return err
2013-06-11 12:33:10 +04:00
}
2013-07-10 11:50:50 +04:00
dbConf := database.Config{
2018-06-07 21:23:06 +03:00
ConnectionParams: baseOpts.Connection,
Srid: baseOpts.Srid,
// we apply diff imports on the Production schema
2018-06-07 21:23:06 +03:00
ImportSchema: baseOpts.Schemas.Production,
ProductionSchema: baseOpts.Schemas.Production,
BackupSchema: baseOpts.Schemas.Backup,
2013-06-11 12:33:10 +04:00
}
db, err := database.Open(dbConf, &tagmapping.Conf)
2013-06-11 12:33:10 +04:00
if err != nil {
return errors.New("database open: " + err.Error())
2013-06-11 12:33:10 +04:00
}
2013-11-04 17:32:54 +04:00
defer db.Close()
2013-06-11 12:33:10 +04:00
err = db.Begin()
if err != nil {
return err
}
2013-06-11 12:33:10 +04:00
delDb, ok := db.(database.Deleter)
if !ok {
return errors.New("database not deletable")
2013-06-11 12:33:10 +04:00
}
genDb, ok := db.(database.Generalizer)
if ok {
genDb.EnableGeneralizeUpdates()
}
2013-07-29 10:18:08 +04:00
deleter := NewDeleter(
2013-06-11 12:33:10 +04:00
delDb,
osmCache,
diffCache,
tagmapping.Conf.SingleIdSpace,
tagmapping.PointMatcher,
tagmapping.LineStringMatcher,
tagmapping.PolygonMatcher,
2017-05-16 17:30:00 +03:00
tagmapping.RelationMatcher,
tagmapping.RelationMemberMatcher,
2013-06-11 12:33:10 +04:00
)
2016-11-22 18:44:27 +03:00
deleter.SetExpireor(expireor)
2013-06-11 12:33:10 +04:00
progress := stats.NewStatsReporter()
2013-06-11 12:33:10 +04:00
relTagFilter := tagmapping.RelationTagFilter()
wayTagFilter := tagmapping.WayTagFilter()
nodeTagFilter := tagmapping.NodeTagFilter()
relations := make(chan *element.Relation)
ways := make(chan *element.Way)
nodes := make(chan *element.Node)
relWriter := writer.NewRelationWriter(osmCache, diffCache,
tagmapping.Conf.SingleIdSpace,
relations,
db, progress,
tagmapping.PolygonMatcher,
tagmapping.RelationMatcher,
tagmapping.RelationMemberMatcher,
2018-06-07 21:23:06 +03:00
baseOpts.Srid)
2013-07-30 10:17:47 +04:00
relWriter.SetLimiter(geometryLimiter)
2013-11-08 19:03:58 +04:00
relWriter.SetExpireor(expireor)
2013-06-11 12:33:10 +04:00
relWriter.Start()
wayWriter := writer.NewWayWriter(osmCache, diffCache,
tagmapping.Conf.SingleIdSpace,
ways, db,
progress,
tagmapping.PolygonMatcher,
tagmapping.LineStringMatcher,
2018-06-07 21:23:06 +03:00
baseOpts.Srid)
2013-07-30 10:17:47 +04:00
wayWriter.SetLimiter(geometryLimiter)
2013-11-08 19:03:58 +04:00
wayWriter.SetExpireor(expireor)
2013-06-11 12:33:10 +04:00
wayWriter.Start()
nodeWriter := writer.NewNodeWriter(osmCache, nodes, db,
progress,
tagmapping.PointMatcher,
2018-06-07 21:23:06 +03:00
baseOpts.Srid)
2013-07-30 10:17:47 +04:00
nodeWriter.SetLimiter(geometryLimiter)
2013-11-08 19:03:58 +04:00
nodeWriter.SetExpireor(expireor)
2013-06-11 12:33:10 +04:00
nodeWriter.Start()
nodeIds := make(map[int64]struct{})
wayIds := make(map[int64]struct{})
relIds := make(map[int64]struct{})
2013-06-11 12:33:10 +04:00
2013-07-10 11:50:50 +04:00
step := log.StartStep("Parsing changes, updating cache and removing elements")
g := geos.NewGeos()
2013-06-11 12:33:10 +04:00
for {
2016-12-06 14:46:55 +03:00
elem, err := parser.Next()
if err == io.EOF {
break // finished
}
if err != nil {
return diffError(err, "")
}
if elem.Rel != nil {
relTagFilter.Filter(&elem.Rel.Tags)
progress.AddRelations(1)
} else if elem.Way != nil {
wayTagFilter.Filter(&elem.Way.Tags)
progress.AddWays(1)
} else if elem.Node != nil {
nodeTagFilter.Filter(&elem.Node.Tags)
if len(elem.Node.Tags) > 0 {
progress.AddNodes(1)
}
2016-12-06 14:46:55 +03:00
progress.AddCoords(1)
}
// always delete, to prevent duplicate elements from overlap of initial
// import and diff import
if err := deleter.Delete(elem); err != nil && err != cache.NotFound {
return diffError(err, "delete element %#v", elem)
}
if elem.Del {
// no new or modified elem -> remove from cache
2013-07-10 11:50:50 +04:00
if elem.Rel != nil {
2016-12-06 14:46:55 +03:00
if err := osmCache.Relations.DeleteRelation(elem.Rel.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete relation %v", elem.Rel)
}
2013-07-10 11:50:50 +04:00
} else if elem.Way != nil {
2016-12-06 14:46:55 +03:00
if err := osmCache.Ways.DeleteWay(elem.Way.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete way %v", elem.Way)
2013-07-10 11:50:50 +04:00
}
2016-12-06 14:46:55 +03:00
if err := diffCache.Ways.Delete(elem.Way.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete way references %v", elem.Way)
}
2016-12-06 14:46:55 +03:00
} else if elem.Node != nil {
if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete node %v", elem.Node)
2013-06-11 12:33:10 +04:00
}
2016-12-06 14:46:55 +03:00
if err := osmCache.Coords.DeleteCoord(elem.Node.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete coord %v", elem.Node)
}
}
}
if elem.Mod && elem.Node != nil && elem.Node.Tags == nil {
// handle modifies where a node drops all tags
if err := osmCache.Nodes.DeleteNode(elem.Node.Id); err != nil && err != cache.NotFound {
return diffError(err, "delete node %v", elem.Node)
2013-06-11 12:33:10 +04:00
}
2016-12-06 14:46:55 +03:00
}
if elem.Add || elem.Mod {
if elem.Rel != nil {
// check if first member is cached to avoid caching
// unneeded relations (typical outside of our coverage)
cached, err := osmCache.FirstMemberIsCached(elem.Rel.Members)
if err != nil {
return diffError(err, "query first member %v", elem.Rel)
}
if cached {
err := osmCache.Relations.PutRelation(elem.Rel)
2015-01-05 11:56:39 +03:00
if err != nil {
2016-12-06 14:46:55 +03:00
return diffError(err, "put relation %v", elem.Rel)
2015-01-05 11:56:39 +03:00
}
2016-12-06 14:46:55 +03:00
relIds[elem.Rel.Id] = struct{}{}
}
} else if elem.Way != nil {
// check if first coord is cached to avoid caching
// unneeded ways (typical outside of our coverage)
cached, err := osmCache.Coords.FirstRefIsCached(elem.Way.Refs)
if err != nil {
return diffError(err, "query first ref %v", elem.Way)
}
if cached {
err := osmCache.Ways.PutWay(elem.Way)
2015-01-05 11:56:39 +03:00
if err != nil {
2016-12-06 14:46:55 +03:00
return diffError(err, "put way %v", elem.Way)
2015-01-05 11:56:39 +03:00
}
2016-12-06 14:46:55 +03:00
wayIds[elem.Way.Id] = struct{}{}
}
} else if elem.Node != nil {
addNode := true
if geometryLimiter != nil {
if !geometryLimiter.IntersectsBuffer(g, elem.Node.Long, elem.Node.Lat) {
addNode = false
}
2016-12-06 14:46:55 +03:00
}
if addNode {
err := osmCache.Nodes.PutNode(elem.Node)
if err != nil {
return diffError(err, "put node %v", elem.Node)
}
2016-12-06 14:46:55 +03:00
err = osmCache.Coords.PutCoords([]element.Node{*elem.Node})
if err != nil {
return diffError(err, "put coord %v", elem.Node)
}
2016-12-06 14:46:55 +03:00
nodeIds[elem.Node.Id] = struct{}{}
2013-06-11 12:33:10 +04:00
}
}
}
}
// mark member ways from deleted relations for re-insert
for id, _ := range deleter.DeletedMemberWays() {
wayIds[id] = struct{}{}
}
2013-07-10 11:50:50 +04:00
progress.Stop()
log.StopStep(step)
step = log.StartStep("Writing added/modified elements")
progress = stats.NewStatsReporter()
2013-06-11 12:33:10 +04:00
// mark depending ways for (re)insert
2013-06-11 12:33:10 +04:00
for nodeId, _ := range nodeIds {
dependers := diffCache.Coords.Get(nodeId)
for _, way := range dependers {
wayIds[way] = struct{}{}
2013-06-11 12:33:10 +04:00
}
}
// mark depending relations for (re)insert
for nodeId, _ := range nodeIds {
dependers := diffCache.CoordsRel.Get(nodeId)
for _, rel := range dependers {
relIds[rel] = struct{}{}
}
}
2013-06-11 12:33:10 +04:00
for wayId, _ := range wayIds {
dependers := diffCache.Ways.Get(wayId)
2013-07-02 10:49:44 +04:00
// mark depending relations for (re)insert
2013-06-11 12:33:10 +04:00
for _, rel := range dependers {
relIds[rel] = struct{}{}
2013-06-11 12:33:10 +04:00
}
}
for relId, _ := range relIds {
rel, err := osmCache.Relations.GetRelation(relId)
if err != nil {
if err != cache.NotFound {
2015-01-05 11:56:39 +03:00
return diffError(err, "could not get relation %v", relId)
}
2013-06-11 12:33:10 +04:00
continue
}
2013-07-02 10:49:44 +04:00
// insert new relation
progress.AddRelations(1)
relations <- rel
2013-06-11 12:33:10 +04:00
}
for wayId, _ := range wayIds {
way, err := osmCache.Ways.GetWay(wayId)
if err != nil {
if err != cache.NotFound {
2015-01-05 11:56:39 +03:00
return diffError(err, "could not get way %v", wayId)
}
continue
}
// insert new way
progress.AddWays(1)
ways <- way
}
for nodeId, _ := range nodeIds {
node, err := osmCache.Nodes.GetNode(nodeId)
if err != nil {
if err != cache.NotFound {
2015-01-05 11:56:39 +03:00
return diffError(err, "could not get node %v", nodeId)
}
// missing nodes can still be Coords
// no `continue` here
}
if node != nil {
// insert new node
progress.AddNodes(1)
nodes <- node
}
}
2013-06-11 12:33:10 +04:00
close(relations)
close(ways)
close(nodes)
2013-10-29 19:32:16 +04:00
nodeWriter.Wait()
relWriter.Wait()
wayWriter.Wait()
2013-06-11 12:33:10 +04:00
if genDb != nil {
genDb.GeneralizeUpdates()
}
err = db.End()
if err != nil {
return err
}
err = db.Close()
if err != nil {
return err
}
2013-07-10 11:50:50 +04:00
log.StopStep(step)
2013-07-12 16:57:06 +04:00
progress.Stop()
2013-07-16 16:03:42 +04:00
if state != nil {
if lastState != nil {
state.Url = lastState.Url
}
2018-06-07 21:23:06 +03:00
err = diffstate.WriteLastState(baseOpts.DiffDir, state)
if err != nil {
log.Warn(err) // warn only
}
}
return nil
}
2015-01-05 11:56:39 +03:00
func diffError(err error, msg string, args ...interface{}) error {
_, file, line, _ := runtime.Caller(1)
return fmt.Errorf("diff process error (%s:%d): %s %v",
filepath.Base(file), line, fmt.Sprintf(msg, args...), err)
}