2016-12-06 13:00:52 +03:00
|
|
|
package update
|
2016-11-25 15:53:58 +03:00
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"os"
|
|
|
|
"os/signal"
|
|
|
|
"syscall"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/omniscale/imposm3/cache"
|
|
|
|
"github.com/omniscale/imposm3/config"
|
|
|
|
"github.com/omniscale/imposm3/expire"
|
|
|
|
"github.com/omniscale/imposm3/geom/limit"
|
|
|
|
"github.com/omniscale/imposm3/logging"
|
2016-12-07 18:43:12 +03:00
|
|
|
"github.com/omniscale/imposm3/replication"
|
|
|
|
"github.com/omniscale/imposm3/update/state"
|
2016-11-25 15:53:58 +03:00
|
|
|
)
|
|
|
|
|
|
|
|
var logger = logging.NewLogger("")
|
|
|
|
|
2018-06-07 21:23:06 +03:00
|
|
|
func Run(baseOpts config.Base) {
|
|
|
|
if baseOpts.Quiet {
|
2016-11-25 15:53:58 +03:00
|
|
|
logging.SetQuiet(true)
|
|
|
|
}
|
|
|
|
|
|
|
|
var geometryLimiter *limit.Limiter
|
2018-06-07 21:23:06 +03:00
|
|
|
if baseOpts.LimitTo != "" {
|
2016-11-25 15:53:58 +03:00
|
|
|
var err error
|
|
|
|
step := logger.StartStep("Reading limitto geometries")
|
|
|
|
geometryLimiter, err = limit.NewFromGeoJSON(
|
2018-06-07 21:23:06 +03:00
|
|
|
baseOpts.LimitTo,
|
|
|
|
baseOpts.LimitToCacheBuffer,
|
|
|
|
baseOpts.Srid,
|
2016-11-25 15:53:58 +03:00
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
logger.Fatal(err)
|
|
|
|
}
|
|
|
|
logger.StopStep(step)
|
|
|
|
}
|
2016-12-07 18:43:12 +03:00
|
|
|
|
2018-06-07 21:23:06 +03:00
|
|
|
s, err := state.ParseLastState(baseOpts.DiffDir)
|
2016-11-25 15:53:58 +03:00
|
|
|
if err != nil {
|
2016-12-07 18:43:12 +03:00
|
|
|
log.Fatal("unable to read last.state.txt", err)
|
|
|
|
}
|
2018-06-07 21:23:06 +03:00
|
|
|
replicationUrl := baseOpts.ReplicationUrl
|
2016-12-07 18:43:12 +03:00
|
|
|
if replicationUrl == "" {
|
|
|
|
replicationUrl = s.Url
|
2016-11-25 15:53:58 +03:00
|
|
|
}
|
2016-12-07 18:43:12 +03:00
|
|
|
if replicationUrl == "" {
|
|
|
|
log.Fatal("no replicationUrl in last.state.txt " +
|
|
|
|
"or replication_url in -config file")
|
|
|
|
}
|
2018-01-22 21:44:56 +03:00
|
|
|
logger.Print("Replication URL: " + replicationUrl)
|
2018-06-07 21:23:06 +03:00
|
|
|
logger.Print("Replication interval: ", baseOpts.ReplicationInterval)
|
2016-12-07 18:43:12 +03:00
|
|
|
|
|
|
|
downloader := replication.NewDiffDownloader(
|
2018-06-07 21:23:06 +03:00
|
|
|
baseOpts.DiffDir,
|
2016-12-07 18:43:12 +03:00
|
|
|
replicationUrl,
|
|
|
|
s.Sequence,
|
2018-06-07 21:23:06 +03:00
|
|
|
baseOpts.ReplicationInterval,
|
2016-12-07 18:43:12 +03:00
|
|
|
)
|
|
|
|
nextSeq := downloader.Sequences()
|
2016-11-25 15:53:58 +03:00
|
|
|
|
2018-06-07 21:23:06 +03:00
|
|
|
osmCache := cache.NewOSMCache(baseOpts.CacheDir)
|
2016-11-25 15:53:58 +03:00
|
|
|
err = osmCache.Open()
|
|
|
|
if err != nil {
|
|
|
|
logger.Fatal("osm cache: ", err)
|
|
|
|
}
|
|
|
|
defer osmCache.Close()
|
|
|
|
|
2018-06-07 21:23:06 +03:00
|
|
|
diffCache := cache.NewDiffCache(baseOpts.CacheDir)
|
2016-11-25 15:53:58 +03:00
|
|
|
err = diffCache.Open()
|
|
|
|
if err != nil {
|
|
|
|
logger.Fatal("diff cache: ", err)
|
|
|
|
}
|
|
|
|
defer diffCache.Close()
|
|
|
|
|
|
|
|
sigc := make(chan os.Signal, 1)
|
2016-11-28 13:57:05 +03:00
|
|
|
signal.Notify(sigc, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
|
2016-11-25 15:53:58 +03:00
|
|
|
|
2016-11-28 14:11:15 +03:00
|
|
|
var tilelist *expire.TileList
|
|
|
|
var lastTlFlush = time.Now()
|
|
|
|
var tileExpireor expire.Expireor
|
2018-06-07 21:23:06 +03:00
|
|
|
if baseOpts.ExpireTilesDir != "" {
|
|
|
|
tilelist = expire.NewTileList(baseOpts.ExpireTilesZoom, baseOpts.ExpireTilesDir)
|
2016-11-28 14:11:15 +03:00
|
|
|
tileExpireor = tilelist
|
|
|
|
}
|
|
|
|
|
2016-11-25 15:53:58 +03:00
|
|
|
shutdown := func() {
|
2016-11-28 13:57:05 +03:00
|
|
|
logger.Print("Exiting. (SIGTERM/SIGINT/SIGHUB)")
|
2016-11-25 15:53:58 +03:00
|
|
|
logging.Shutdown()
|
|
|
|
osmCache.Close()
|
|
|
|
diffCache.Close()
|
2016-11-28 14:11:15 +03:00
|
|
|
if tilelist != nil {
|
|
|
|
err := tilelist.Flush()
|
|
|
|
if err != nil {
|
|
|
|
logger.Print("error writing tile expire list", err)
|
|
|
|
}
|
|
|
|
}
|
2016-11-25 15:53:58 +03:00
|
|
|
os.Exit(0)
|
|
|
|
}
|
|
|
|
|
|
|
|
exp := newExpBackoff(2*time.Second, 5*time.Minute)
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-sigc:
|
|
|
|
shutdown()
|
2016-12-07 18:43:12 +03:00
|
|
|
case seq := <-nextSeq:
|
|
|
|
fname := seq.Filename
|
|
|
|
seqId := seq.Sequence
|
|
|
|
seqTime := seq.Time
|
2016-11-25 15:53:58 +03:00
|
|
|
for {
|
2016-12-07 18:43:12 +03:00
|
|
|
p := logger.StartStep(fmt.Sprintf("importing #%d till %s", seqId, seqTime))
|
2016-11-25 15:53:58 +03:00
|
|
|
|
2018-06-07 21:23:06 +03:00
|
|
|
err := Update(baseOpts, fname, geometryLimiter, tileExpireor, osmCache, diffCache, false)
|
2016-11-25 15:53:58 +03:00
|
|
|
|
|
|
|
osmCache.Coords.Flush()
|
|
|
|
diffCache.Flush()
|
|
|
|
|
2016-11-28 14:11:15 +03:00
|
|
|
if err == nil && tilelist != nil && time.Since(lastTlFlush) > time.Second*30 {
|
|
|
|
// call at most once every 30 seconds to reduce files during the
|
|
|
|
// catch-up phase after the initial import
|
|
|
|
lastTlFlush = time.Now()
|
|
|
|
err := tilelist.Flush()
|
2016-11-25 15:53:58 +03:00
|
|
|
if err != nil {
|
|
|
|
logger.Print("error writing tile expire list", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
logger.StopStep(p)
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-sigc:
|
|
|
|
shutdown()
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
logger.Error(err)
|
|
|
|
logger.Print("retrying in ", exp.Duration())
|
|
|
|
exp.Wait()
|
|
|
|
} else {
|
|
|
|
exp.Reset()
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if os.Getenv("IMPOSM3_SINGLE_DIFF") != "" {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type expBackoff struct {
|
|
|
|
current time.Duration
|
|
|
|
min time.Duration
|
|
|
|
max time.Duration
|
|
|
|
}
|
|
|
|
|
|
|
|
func newExpBackoff(min, max time.Duration) *expBackoff {
|
|
|
|
return &expBackoff{min, min, max}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (eb *expBackoff) Duration() time.Duration {
|
|
|
|
return eb.current
|
|
|
|
}
|
|
|
|
|
|
|
|
func (eb *expBackoff) Wait() {
|
|
|
|
time.Sleep(eb.current)
|
|
|
|
eb.current = eb.current * 2
|
|
|
|
if eb.current > eb.max {
|
|
|
|
eb.current = eb.max
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (eb *expBackoff) Reset() {
|
|
|
|
eb.current = eb.min
|
|
|
|
}
|