Merge pull request #158 from yohanboniface/replication
Improvements for hourly/daily replication
master
commit
7b725dff6c
|
@ -126,7 +126,7 @@ func (o *_BaseOptions) updateFromConfig() error {
|
||||||
o.ExpireTilesZoom = 14
|
o.ExpireTilesZoom = 14
|
||||||
}
|
}
|
||||||
|
|
||||||
if conf.ReplicationInterval.Duration != 0 && o.ReplicationInterval != time.Minute {
|
if conf.ReplicationInterval.Duration != 0 && o.ReplicationInterval == time.Minute {
|
||||||
o.ReplicationInterval = conf.ReplicationInterval.Duration
|
o.ReplicationInterval = conf.ReplicationInterval.Duration
|
||||||
}
|
}
|
||||||
if o.ReplicationInterval < time.Minute {
|
if o.ReplicationInterval < time.Minute {
|
||||||
|
@ -143,7 +143,7 @@ func (o *_BaseOptions) updateFromConfig() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if conf.DiffStateBefore.Duration != 0 && o.DiffStateBefore == 2*time.Hour {
|
if conf.DiffStateBefore.Duration != 0 && o.DiffStateBefore == 0 {
|
||||||
o.DiffStateBefore = conf.DiffStateBefore.Duration
|
o.DiffStateBefore = conf.DiffStateBefore.Duration
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -226,7 +226,7 @@ func init() {
|
||||||
ImportFlags.BoolVar(&ImportOptions.DeployProduction, "deployproduction", false, "deploy production")
|
ImportFlags.BoolVar(&ImportOptions.DeployProduction, "deployproduction", false, "deploy production")
|
||||||
ImportFlags.BoolVar(&ImportOptions.RevertDeploy, "revertdeploy", false, "revert deploy to production")
|
ImportFlags.BoolVar(&ImportOptions.RevertDeploy, "revertdeploy", false, "revert deploy to production")
|
||||||
ImportFlags.BoolVar(&ImportOptions.RemoveBackup, "removebackup", false, "remove backups from deploy")
|
ImportFlags.BoolVar(&ImportOptions.RemoveBackup, "removebackup", false, "remove backups from deploy")
|
||||||
ImportFlags.DurationVar(&BaseOptions.DiffStateBefore, "diff-state-before", 2*time.Hour, "set initial diff sequence before")
|
ImportFlags.DurationVar(&BaseOptions.DiffStateBefore, "diff-state-before", 0, "set initial diff sequence before")
|
||||||
|
|
||||||
DiffFlags.StringVar(&BaseOptions.ExpireTilesDir, "expiretiles-dir", "", "write expire tiles into dir")
|
DiffFlags.StringVar(&BaseOptions.ExpireTilesDir, "expiretiles-dir", "", "write expire tiles into dir")
|
||||||
DiffFlags.IntVar(&BaseOptions.ExpireTilesZoom, "expiretiles-zoom", 14, "write expire tiles in this zoom level")
|
DiffFlags.IntVar(&BaseOptions.ExpireTilesZoom, "expiretiles-zoom", 14, "write expire tiles in this zoom level")
|
||||||
|
|
|
@ -210,7 +210,9 @@ To start the update process::
|
||||||
|
|
||||||
You can stop processing new diff files with SIGTERM (``ctrl-c``), SIGKILL or SIGHUP. You should create a systemd/upstart/init.d service for ``imposm3 run`` so that it always runs in the background.
|
You can stop processing new diff files with SIGTERM (``ctrl-c``), SIGKILL or SIGHUP. You should create a systemd/upstart/init.d service for ``imposm3 run`` so that it always runs in the background.
|
||||||
|
|
||||||
You can change to hourly updates by adding `replication_url: "https://planet.openstreetmap.org/replication/hour/"` and `replication_interval: "1h"` to the Imposm configuration.
|
You can change to hourly updates by adding `replication_url: "https://planet.openstreetmap.org/replication/hour/"` and `replication_interval: "1h"` to the Imposm configuration. The same applies to daily updates (this also works for Geofabrik updates): use `replication_url: "https://planet.openstreetmap.org/replication/day/"` and `replication_interval: "24h"`.
|
||||||
|
|
||||||
|
At import time, Imposm computes the first diff sequence number by comparing the PBF input file timestamp with the latest state available on the remote server. Depending on the PBF generation process, this sequence number may not be correct. You can force Imposm to start with an earlier sequence number by adding a `diff_state_before` duration to your configuration file. For example, `diff_state_before: 4h` will start with an initial sequence number generated 4 hours before the PBF generation time.
|
||||||
|
|
||||||
|
|
||||||
One-time update
|
One-time update
|
||||||
|
|
|
@ -125,7 +125,7 @@ func Import() {
|
||||||
osmCache.Close()
|
osmCache.Close()
|
||||||
log.StopStep(step)
|
log.StopStep(step)
|
||||||
if config.ImportOptions.Diff {
|
if config.ImportOptions.Diff {
|
||||||
diffstate, err := state.FromPbf(config.ImportOptions.Read, config.BaseOptions.DiffStateBefore)
|
diffstate, err := state.FromPbf(config.ImportOptions.Read, config.BaseOptions.DiffStateBefore, config.BaseOptions.ReplicationUrl, config.BaseOptions.ReplicationInterval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("error parsing diff state form PBF", err)
|
log.Print("error parsing diff state form PBF", err)
|
||||||
} else if diffstate != nil {
|
} else if diffstate != nil {
|
||||||
|
|
|
@ -19,7 +19,13 @@ import (
|
||||||
|
|
||||||
var log = logging.NewLogger("replication")
|
var log = logging.NewLogger("replication")
|
||||||
|
|
||||||
var NotAvailable = errors.New("file not available")
|
type NotAvailable struct {
|
||||||
|
url string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *NotAvailable) Error() string {
|
||||||
|
return fmt.Sprintf("File not available: %s", e.url)
|
||||||
|
}
|
||||||
|
|
||||||
type Sequence struct {
|
type Sequence struct {
|
||||||
Filename string
|
Filename string
|
||||||
|
@ -71,13 +77,23 @@ func newDownloader(dest, url string, seq int, interval time.Duration) *downloade
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var naWaittime time.Duration
|
||||||
|
switch interval {
|
||||||
|
case 24 * time.Hour:
|
||||||
|
naWaittime = 5 * time.Minute
|
||||||
|
case time.Hour:
|
||||||
|
naWaittime = 60 * time.Second
|
||||||
|
default:
|
||||||
|
naWaittime = 10 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
dl := &downloader{
|
dl := &downloader{
|
||||||
baseUrl: url,
|
baseUrl: url,
|
||||||
dest: dest,
|
dest: dest,
|
||||||
lastSequence: seq,
|
lastSequence: seq,
|
||||||
interval: interval,
|
interval: interval,
|
||||||
errWaittime: 60 * time.Second,
|
errWaittime: 60 * time.Second,
|
||||||
naWaittime: 10 * time.Second,
|
naWaittime: naWaittime,
|
||||||
sequences: make(chan Sequence, 1),
|
sequences: make(chan Sequence, 1),
|
||||||
client: client,
|
client: client,
|
||||||
}
|
}
|
||||||
|
@ -92,6 +108,7 @@ func (d *downloader) Sequences() <-chan Sequence {
|
||||||
func (d *downloader) download(seq int, ext string) error {
|
func (d *downloader) download(seq int, ext string) error {
|
||||||
dest := path.Join(d.dest, seqPath(seq)+ext)
|
dest := path.Join(d.dest, seqPath(seq)+ext)
|
||||||
url := d.baseUrl + seqPath(seq) + ext
|
url := d.baseUrl + seqPath(seq) + ext
|
||||||
|
log.Print("Downloading diff file from ", url)
|
||||||
|
|
||||||
if _, err := os.Stat(dest); err == nil {
|
if _, err := os.Stat(dest); err == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -121,11 +138,11 @@ func (d *downloader) download(seq int, ext string) error {
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode == 404 {
|
if resp.StatusCode == 404 {
|
||||||
return NotAvailable
|
return &NotAvailable{url}
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.StatusCode != 200 {
|
if resp.StatusCode != 200 {
|
||||||
return errors.New(fmt.Sprintf("invalid repsonse: %v", resp))
|
return errors.New(fmt.Sprintf("invalid response: %v", resp))
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = io.Copy(out, resp.Body)
|
_, err = io.Copy(out, resp.Body)
|
||||||
|
@ -148,7 +165,8 @@ func (d *downloader) downloadTillSuccess(seq int, ext string) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err == NotAvailable {
|
if err, ok := err.(*NotAvailable); ok {
|
||||||
|
log.Print(err)
|
||||||
time.Sleep(d.naWaittime)
|
time.Sleep(d.naWaittime)
|
||||||
} else {
|
} else {
|
||||||
log.Warn(err)
|
log.Warn(err)
|
||||||
|
@ -161,17 +179,19 @@ func (d *downloader) fetchNextLoop() {
|
||||||
stateFile := path.Join(d.dest, seqPath(d.lastSequence)+d.stateExt)
|
stateFile := path.Join(d.dest, seqPath(d.lastSequence)+d.stateExt)
|
||||||
lastTime, err := d.stateTime(stateFile)
|
lastTime, err := d.stateTime(stateFile)
|
||||||
for {
|
for {
|
||||||
|
nextSeq := d.lastSequence + 1
|
||||||
|
log.Print("Processing sequence ", nextSeq)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
nextDiffTime := lastTime.Add(d.interval)
|
nextDiffTime := lastTime.Add(d.interval)
|
||||||
if nextDiffTime.After(time.Now()) {
|
if nextDiffTime.After(time.Now()) {
|
||||||
// we catched up and the next diff file is in the future.
|
// we catched up and the next diff file is in the future.
|
||||||
// wait till last diff time + interval, before fetching next
|
// wait till last diff time + interval, before fetching next
|
||||||
nextDiffTime = lastTime.Add(d.interval + 2*time.Second /* allow small time diff between server*/)
|
nextDiffTime = lastTime.Add(d.interval + 2*time.Second /* allow small time diff between servers */)
|
||||||
waitFor := nextDiffTime.Sub(time.Now())
|
waitFor := nextDiffTime.Sub(time.Now())
|
||||||
|
log.Print("Next process in ", waitFor)
|
||||||
time.Sleep(waitFor)
|
time.Sleep(waitFor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
nextSeq := d.lastSequence + 1
|
|
||||||
// download will retry until they succeed
|
// download will retry until they succeed
|
||||||
d.downloadTillSuccess(nextSeq, d.stateExt)
|
d.downloadTillSuccess(nextSeq, d.stateExt)
|
||||||
d.downloadTillSuccess(nextSeq, d.fileExt)
|
d.downloadTillSuccess(nextSeq, d.fileExt)
|
||||||
|
|
|
@ -50,6 +50,8 @@ func Run() {
|
||||||
log.Fatal("no replicationUrl in last.state.txt " +
|
log.Fatal("no replicationUrl in last.state.txt " +
|
||||||
"or replication_url in -config file")
|
"or replication_url in -config file")
|
||||||
}
|
}
|
||||||
|
logger.Print("Replication URL: " + replicationUrl)
|
||||||
|
logger.Print("Replication interval: ", config.BaseOptions.ReplicationInterval)
|
||||||
|
|
||||||
downloader := replication.NewDiffDownloader(
|
downloader := replication.NewDiffDownloader(
|
||||||
config.BaseOptions.DiffDir,
|
config.BaseOptions.DiffDir,
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
@ -76,7 +77,7 @@ func FromOscGz(oscFile string) (*DiffState, error) {
|
||||||
return ParseFile(stateFile)
|
return ParseFile(stateFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
func FromPbf(filename string, before time.Duration) (*DiffState, error) {
|
func FromPbf(filename string, before time.Duration, replicationUrl string, replicationInterval time.Duration) (*DiffState, error) {
|
||||||
pbfFile, err := pbf.NewParser(filename)
|
pbfFile, err := pbf.NewParser(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -92,15 +93,17 @@ func FromPbf(filename string, before time.Duration) (*DiffState, error) {
|
||||||
timestamp = fstat.ModTime()
|
timestamp = fstat.ModTime()
|
||||||
}
|
}
|
||||||
|
|
||||||
replicationUrl := "https://planet.openstreetmap.org/replication/minute/"
|
if replicationUrl == "" {
|
||||||
|
replicationUrl = "https://planet.openstreetmap.org/replication/minute/"
|
||||||
|
}
|
||||||
|
|
||||||
seq := estimateSequence(replicationUrl, timestamp)
|
seq := estimateSequence(replicationUrl, replicationInterval, timestamp)
|
||||||
if seq == 0 {
|
if seq == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// start earlier
|
// start earlier
|
||||||
seq -= int(before.Minutes())
|
seq -= int(math.Ceil(before.Minutes() / replicationInterval.Minutes()))
|
||||||
return &DiffState{Time: timestamp, Url: replicationUrl, Sequence: seq}, nil
|
return &DiffState{Time: timestamp, Url: replicationUrl, Sequence: seq}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -186,16 +189,16 @@ func currentState(url string) (*DiffState, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if resp.StatusCode != 200 {
|
if resp.StatusCode != 200 {
|
||||||
return nil, errors.New(fmt.Sprintf("invalid repsonse: %v", resp))
|
return nil, errors.New(fmt.Sprintf("invalid response: %v", resp))
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
return Parse(resp.Body)
|
return Parse(resp.Body)
|
||||||
}
|
}
|
||||||
|
|
||||||
func estimateSequence(url string, timestamp time.Time) int {
|
func estimateSequence(url string, interval time.Duration, timestamp time.Time) int {
|
||||||
state, err := currentState(url)
|
state, err := currentState(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// try a second time befor failing
|
// try a second time before failing
|
||||||
log.Warn("unable to fetch current state from ", url, ":", err, ", retry in 30s")
|
log.Warn("unable to fetch current state from ", url, ":", err, ", retry in 30s")
|
||||||
time.Sleep(time.Second * 30)
|
time.Sleep(time.Second * 30)
|
||||||
state, err = currentState(url)
|
state, err = currentState(url)
|
||||||
|
@ -205,6 +208,7 @@ func estimateSequence(url string, timestamp time.Time) int {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
behind := state.Time.Sub(timestamp)
|
behind := state.Time.Sub(timestamp)
|
||||||
return state.Sequence - int(behind.Minutes())
|
// Sequence unit depends on replication interval (minute, hour, day).
|
||||||
|
return state.Sequence - int(math.Ceil(behind.Minutes() / interval.Minutes()))
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFromPBF(t *testing.T) {
|
func TestFromPBF(t *testing.T) {
|
||||||
state, err := FromPbf("../../parser/pbf/monaco-20150428.osm.pbf", time.Hour*1)
|
state, err := FromPbf("../../parser/pbf/monaco-20150428.osm.pbf", time.Hour*1, "", time.Minute*1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue