imposm3/update/run.go

180 lines
3.9 KiB
Go
Raw Permalink Normal View History

2016-12-06 13:00:52 +03:00
package update
2016-11-25 15:53:58 +03:00
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/omniscale/imposm3/cache"
"github.com/omniscale/imposm3/config"
"github.com/omniscale/imposm3/expire"
"github.com/omniscale/imposm3/geom/limit"
"github.com/omniscale/imposm3/logging"
"github.com/omniscale/imposm3/replication"
"github.com/omniscale/imposm3/update/state"
2016-11-25 15:53:58 +03:00
)
// logger is the package-level logger that Run uses for progress steps,
// status messages and errors.
// NOTE(review): the empty string passed to NewLogger is presumably the
// logger's name/prefix — confirm against the logging package.
var logger = logging.NewLogger("")
2018-06-07 21:23:06 +03:00
func Run(baseOpts config.Base) {
if baseOpts.Quiet {
2016-11-25 15:53:58 +03:00
logging.SetQuiet(true)
}
var geometryLimiter *limit.Limiter
2018-06-07 21:23:06 +03:00
if baseOpts.LimitTo != "" {
2016-11-25 15:53:58 +03:00
var err error
step := logger.StartStep("Reading limitto geometries")
geometryLimiter, err = limit.NewFromGeoJSON(
2018-06-07 21:23:06 +03:00
baseOpts.LimitTo,
baseOpts.LimitToCacheBuffer,
baseOpts.Srid,
2016-11-25 15:53:58 +03:00
)
if err != nil {
logger.Fatal(err)
}
logger.StopStep(step)
}
2018-06-07 21:23:06 +03:00
s, err := state.ParseLastState(baseOpts.DiffDir)
2016-11-25 15:53:58 +03:00
if err != nil {
log.Fatal("unable to read last.state.txt", err)
}
2018-06-07 21:23:06 +03:00
replicationUrl := baseOpts.ReplicationUrl
if replicationUrl == "" {
replicationUrl = s.Url
2016-11-25 15:53:58 +03:00
}
if replicationUrl == "" {
log.Fatal("no replicationUrl in last.state.txt " +
"or replication_url in -config file")
}
2018-01-22 21:44:56 +03:00
logger.Print("Replication URL: " + replicationUrl)
2018-06-07 21:23:06 +03:00
logger.Print("Replication interval: ", baseOpts.ReplicationInterval)
downloader := replication.NewDiffDownloader(
2018-06-07 21:23:06 +03:00
baseOpts.DiffDir,
replicationUrl,
s.Sequence,
2018-06-07 21:23:06 +03:00
baseOpts.ReplicationInterval,
)
nextSeq := downloader.Sequences()
2016-11-25 15:53:58 +03:00
2018-06-07 21:23:06 +03:00
osmCache := cache.NewOSMCache(baseOpts.CacheDir)
2016-11-25 15:53:58 +03:00
err = osmCache.Open()
if err != nil {
logger.Fatal("osm cache: ", err)
}
defer osmCache.Close()
2018-06-07 21:23:06 +03:00
diffCache := cache.NewDiffCache(baseOpts.CacheDir)
2016-11-25 15:53:58 +03:00
err = diffCache.Open()
if err != nil {
logger.Fatal("diff cache: ", err)
}
defer diffCache.Close()
sigc := make(chan os.Signal, 1)
2016-11-28 13:57:05 +03:00
signal.Notify(sigc, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
2016-11-25 15:53:58 +03:00
var tilelist *expire.TileList
var lastTlFlush = time.Now()
var tileExpireor expire.Expireor
2018-06-07 21:23:06 +03:00
if baseOpts.ExpireTilesDir != "" {
tilelist = expire.NewTileList(baseOpts.ExpireTilesZoom, baseOpts.ExpireTilesDir)
tileExpireor = tilelist
}
2016-11-25 15:53:58 +03:00
shutdown := func() {
2016-11-28 13:57:05 +03:00
logger.Print("Exiting. (SIGTERM/SIGINT/SIGHUB)")
2016-11-25 15:53:58 +03:00
logging.Shutdown()
osmCache.Close()
diffCache.Close()
if tilelist != nil {
err := tilelist.Flush()
if err != nil {
logger.Print("error writing tile expire list", err)
}
}
2016-11-25 15:53:58 +03:00
os.Exit(0)
}
exp := newExpBackoff(2*time.Second, 5*time.Minute)
for {
select {
case <-sigc:
shutdown()
case seq := <-nextSeq:
fname := seq.Filename
seqId := seq.Sequence
seqTime := seq.Time
2016-11-25 15:53:58 +03:00
for {
p := logger.StartStep(fmt.Sprintf("importing #%d till %s", seqId, seqTime))
2016-11-25 15:53:58 +03:00
2018-06-07 21:23:06 +03:00
err := Update(baseOpts, fname, geometryLimiter, tileExpireor, osmCache, diffCache, false)
2016-11-25 15:53:58 +03:00
osmCache.Coords.Flush()
diffCache.Flush()
if err == nil && tilelist != nil && time.Since(lastTlFlush) > time.Second*30 {
// call at most once every 30 seconds to reduce files during the
// catch-up phase after the initial import
lastTlFlush = time.Now()
err := tilelist.Flush()
2016-11-25 15:53:58 +03:00
if err != nil {
logger.Print("error writing tile expire list", err)
}
}
logger.StopStep(p)
select {
case <-sigc:
shutdown()
default:
}
if err != nil {
logger.Error(err)
logger.Print("retrying in ", exp.Duration())
exp.Wait()
} else {
exp.Reset()
break
}
}
if os.Getenv("IMPOSM3_SINGLE_DIFF") != "" {
return
}
}
}
}
// expBackoff holds the state of an exponential backoff: a delay that doubles
// after every Wait until it reaches an upper bound, and that can be reset to
// its initial value.
type expBackoff struct {
	current time.Duration // delay the next Wait will sleep for
	min     time.Duration // initial delay, restored by Reset
	max     time.Duration // upper bound applied after doubling
}

// newExpBackoff returns a backoff whose delay starts at min and is capped at
// max once it has been doubled past it.
func newExpBackoff(min, max time.Duration) *expBackoff {
	return &expBackoff{
		current: min,
		min:     min,
		max:     max,
	}
}

// Duration reports how long the next call to Wait will sleep.
func (b *expBackoff) Duration() time.Duration { return b.current }

// Wait sleeps for the current delay and then doubles it for the next call,
// limiting the result to the configured maximum.
func (b *expBackoff) Wait() {
	time.Sleep(b.current)

	next := 2 * b.current
	if next > b.max {
		next = b.max
	}
	b.current = next
}

// Reset sets the delay back to its initial value.
func (b *expBackoff) Reset() { b.current = b.min }