From c5be424c8cedb79869fda6127f9a56dd97f7c3d4 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 22 Jan 2018 19:41:56 +0100 Subject: [PATCH 1/8] Replace default ReplicationInterval if it still equals default --- config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index e499bdb..0b589a1 100644 --- a/config/config.go +++ b/config/config.go @@ -126,7 +126,7 @@ func (o *_BaseOptions) updateFromConfig() error { o.ExpireTilesZoom = 14 } - if conf.ReplicationInterval.Duration != 0 && o.ReplicationInterval != time.Minute { + if conf.ReplicationInterval.Duration != 0 && o.ReplicationInterval == time.Minute { o.ReplicationInterval = conf.ReplicationInterval.Duration } if o.ReplicationInterval < time.Minute { From cfb1be89d1e87876ba01b58224db159cdc93cbe5 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 22 Jan 2018 19:44:56 +0100 Subject: [PATCH 2/8] Make run command more verbose --- replication/source.go | 10 +++++++--- update/run.go | 2 ++ update/state/state.go | 4 ++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/replication/source.go b/replication/source.go index 1e76563..188d5c8 100644 --- a/replication/source.go +++ b/replication/source.go @@ -92,6 +92,7 @@ func (d *downloader) Sequences() <-chan Sequence { func (d *downloader) download(seq int, ext string) error { dest := path.Join(d.dest, seqPath(seq)+ext) url := d.baseUrl + seqPath(seq) + ext + log.Print("Downloading diff file from ", url) if _, err := os.Stat(dest); err == nil { return nil @@ -121,11 +122,12 @@ func (d *downloader) download(seq int, ext string) error { defer resp.Body.Close() if resp.StatusCode == 404 { + log.Print("Remote file does not exist ", url) return NotAvailable } if resp.StatusCode != 200 { - return errors.New(fmt.Sprintf("invalid repsonse: %v", resp)) + return errors.New(fmt.Sprintf("invalid response: %v", resp)) } _, err = io.Copy(out, resp.Body) @@ -161,17 +163,19 @@ func (d *downloader) fetchNextLoop() { stateFile := path.Join(d.dest, seqPath(d.lastSequence)+d.stateExt) lastTime, err := d.stateTime(stateFile) for { + nextSeq := d.lastSequence + 1 + log.Print("Processing sequence ", nextSeq, err) if err == nil { nextDiffTime := lastTime.Add(d.interval) if nextDiffTime.After(time.Now()) { // we catched up and the next diff file is in the future. // wait till last diff time + interval, before fetching next - nextDiffTime = lastTime.Add(d.interval + 2*time.Second /* allow small time diff between server*/) + nextDiffTime = lastTime.Add(d.interval + 2*time.Second /* allow small time diff between servers */) waitFor := nextDiffTime.Sub(time.Now()) + log.Print("Next process in ", waitFor) time.Sleep(waitFor) } } - nextSeq := d.lastSequence + 1 // download will retry until they succeed d.downloadTillSuccess(nextSeq, d.stateExt) d.downloadTillSuccess(nextSeq, d.fileExt) diff --git a/update/run.go b/update/run.go index 4484e74..8055406 100644 --- a/update/run.go +++ b/update/run.go @@ -50,6 +50,8 @@ func Run() { log.Fatal("no replicationUrl in last.state.txt " + "or replication_url in -config file") } + logger.Print("Replication URL: " + replicationUrl) + logger.Print("Replication interval: ", config.BaseOptions.ReplicationInterval) downloader := replication.NewDiffDownloader( config.BaseOptions.DiffDir, diff --git a/update/state/state.go b/update/state/state.go index 615a9a8..b0ec965 100644 --- a/update/state/state.go +++ b/update/state/state.go @@ -186,7 +186,7 @@ func currentState(url string) (*DiffState, error) { return nil, err } if resp.StatusCode != 200 { - return nil, errors.New(fmt.Sprintf("invalid repsonse: %v", resp)) + return nil, errors.New(fmt.Sprintf("invalid response: %v", resp)) } defer resp.Body.Close() return Parse(resp.Body) @@ -195,7 +195,7 @@ func currentState(url string) (*DiffState, error) { func estimateSequence(url string, timestamp time.Time) int { state, err := currentState(url) if err != nil { - // try a second time befor failing + // try a second time before failing log.Warn("unable to fetch current state from ", url, ":", err, ", retry in 30s") time.Sleep(time.Second * 30) state, err = currentState(url) From f2381554f92fa6976f13edf0d6c6a6644682a12b Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 22 Jan 2018 19:45:33 +0100 Subject: [PATCH 3/8] Use URL defined in config to fetch replication state --- update/state/state.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/update/state/state.go b/update/state/state.go index b0ec965..d6dd9f8 100644 --- a/update/state/state.go +++ b/update/state/state.go @@ -12,6 +12,7 @@ import ( "strings" "time" + "github.com/omniscale/imposm3/config" "github.com/omniscale/imposm3/logging" "github.com/omniscale/imposm3/parser/pbf" ) @@ -92,7 +93,10 @@ func FromPbf(filename string, before time.Duration) (*DiffState, error) { timestamp = fstat.ModTime() } - replicationUrl := "https://planet.openstreetmap.org/replication/minute/" + replicationUrl := config.BaseOptions.ReplicationUrl + if replicationUrl == "" { + replicationUrl = "https://planet.openstreetmap.org/replication/minute/" + } seq := estimateSequence(replicationUrl, timestamp) if seq == 0 { From e277d8da245f04aa3f2d37bc9def91094a852fb4 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Mon, 22 Jan 2018 19:46:27 +0100 Subject: [PATCH 4/8] Adapt estimateSequence to replication interval --- update/state/state.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/update/state/state.go b/update/state/state.go index d6dd9f8..f5bfcbc 100644 --- a/update/state/state.go +++ b/update/state/state.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math" "net/http" "os" "path" @@ -209,6 +210,7 @@ func estimateSequence(url string, timestamp time.Time) int { } } - behind := state.Time.Sub(timestamp) - return state.Sequence - int(behind.Minutes()) + behind := state.Time.Sub(timestamp) + // Sequence unit depends on replication interval (minute, hour, day). + return state.Sequence - int(math.Ceil(behind.Minutes() / config.BaseOptions.ReplicationInterval.Minutes())) } From 6f99b41d1392982f868b5f2ea08c44f26d8f6038 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Fri, 26 Jan 2018 09:35:31 +0100 Subject: [PATCH 5/8] Diff: adapt wait time according to replication interval --- replication/source.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/replication/source.go b/replication/source.go index 188d5c8..3bd9042 100644 --- a/replication/source.go +++ b/replication/source.go @@ -71,13 +71,23 @@ func newDownloader(dest, url string, seq int, interval time.Duration) *downloade }, } + var naWaittime time.Duration + switch interval { + case 24 * time.Hour: + naWaittime = 5 * time.Minute + case time.Hour: + naWaittime = 60 * time.Second + default: + naWaittime = 10 * time.Second + } + dl := &downloader{ baseUrl: url, dest: dest, lastSequence: seq, interval: interval, errWaittime: 60 * time.Second, - naWaittime: 10 * time.Second, + naWaittime: naWaittime, sequences: make(chan Sequence, 1), client: client, } From a8ae4dcbdcb1c86e2053a559abdbb61e47293322 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Fri, 26 Jan 2018 11:30:28 +0100 Subject: [PATCH 6/8] Make diff_state_before default to 0 --- config/config.go | 4 ++-- docs/tutorial.rst | 4 +++- replication/source.go | 2 +- update/state/state.go | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/config/config.go b/config/config.go index 0b589a1..7ef26e4 100644 --- a/config/config.go +++ b/config/config.go @@ -143,7 +143,7 @@ func (o *_BaseOptions) updateFromConfig() error { } } - if conf.DiffStateBefore.Duration != 0 && o.DiffStateBefore == 2*time.Hour { + if conf.DiffStateBefore.Duration != 0 && o.DiffStateBefore == 0 { o.DiffStateBefore = conf.DiffStateBefore.Duration } return nil @@ -226,7 +226,7 @@ func init() { ImportFlags.BoolVar(&ImportOptions.DeployProduction, "deployproduction", false, "deploy production") ImportFlags.BoolVar(&ImportOptions.RevertDeploy, "revertdeploy", false, "revert deploy to production") ImportFlags.BoolVar(&ImportOptions.RemoveBackup, "removebackup", false, "remove backups from deploy") - ImportFlags.DurationVar(&BaseOptions.DiffStateBefore, "diff-state-before", 2*time.Hour, "set initial diff sequence before") + ImportFlags.DurationVar(&BaseOptions.DiffStateBefore, "diff-state-before", 0, "set initial diff sequence before") DiffFlags.StringVar(&BaseOptions.ExpireTilesDir, "expiretiles-dir", "", "write expire tiles into dir") DiffFlags.IntVar(&BaseOptions.ExpireTilesZoom, "expiretiles-zoom", 14, "write expire tiles in this zoom level") diff --git a/docs/tutorial.rst b/docs/tutorial.rst index 81531de..b6d60d4 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -210,7 +210,9 @@ To start the update process:: You can stop processing new diff files SIGTERM (``crtl-c``), SIGKILL or SIGHUP. You should create systemd/upstart/init.d service for ``imposm3 run`` to always run in background. -You can change to hourly updates by adding `replication_url: "https://planet.openstreetmap.org/replication/hour/"` and `replication_interval: "1h"` to the Imposm configuration. +You can change to hourly updates by adding `replication_url: "https://planet.openstreetmap.org/replication/hour/"` and `replication_interval: "1h"` to the Imposm configuration. Same for daily updates (works also for Geofabrik updates): `replication_url: "https://planet.openstreetmap.org/replication/day/"` and `replication_interval: "24h"`. + +At import time, Imposm compute the first diff sequence number by comparing the PBF input file timestamp and the latest state available in the remote server. Depending on the PBF generation process, this sequence number may not be correct, you can force Imposm to start with an earlier sequence number by adding a `diff_state_before` duration in your conf file. For example, `diff_state_before: 4h` will start with an initial sequence number generated 4 hours before the PBF generation time. One-time update diff --git a/replication/source.go b/replication/source.go index 3bd9042..1d39caa 100644 --- a/replication/source.go +++ b/replication/source.go @@ -174,7 +174,7 @@ func (d *downloader) fetchNextLoop() { lastTime, err := d.stateTime(stateFile) for { nextSeq := d.lastSequence + 1 - log.Print("Processing sequence ", nextSeq, err) + log.Print("Processing sequence ", nextSeq) if err == nil { nextDiffTime := lastTime.Add(d.interval) if nextDiffTime.After(time.Now()) { diff --git a/update/state/state.go b/update/state/state.go index f5bfcbc..548709c 100644 --- a/update/state/state.go +++ b/update/state/state.go @@ -105,7 +105,7 @@ func FromPbf(filename string, before time.Duration) (*DiffState, error) { } // start earlier - seq -= int(before.Minutes()) + seq -= int(before.Minutes() / config.BaseOptions.ReplicationInterval.Minutes()) return &DiffState{Time: timestamp, Url: replicationUrl, Sequence: seq}, nil } From 25f165d74ee27b1038dc6f9d5053dc4f1868e183 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Tue, 30 Jan 2018 14:00:43 +0100 Subject: [PATCH 7/8] Log when remote file is not available --- replication/source.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/replication/source.go b/replication/source.go index 1d39caa..2a91ec1 100644 --- a/replication/source.go +++ b/replication/source.go @@ -19,7 +19,13 @@ import ( var log = logging.NewLogger("replication") -var NotAvailable = errors.New("file not available") +type NotAvailable struct { + url string +} + +func (e *NotAvailable) Error() string { + return fmt.Sprintf("File not available: %s", e.url) +} type Sequence struct { Filename string @@ -132,8 +138,7 @@ func (d *downloader) download(seq int, ext string) error { defer resp.Body.Close() if resp.StatusCode == 404 { - log.Print("Remote file does not exist ", url) - return NotAvailable + return &NotAvailable{url} } if resp.StatusCode != 200 { @@ -160,7 +165,8 @@ func (d *downloader) downloadTillSuccess(seq int, ext string) { if err == nil { break } - if err == NotAvailable { + if err, ok := err.(*NotAvailable); ok { + log.Print(err) time.Sleep(d.naWaittime) } else { log.Warn(err) From 9e131ca1166d16e19c1086e6b72bda754bd27242 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Sat, 7 Apr 2018 21:21:56 +0200 Subject: [PATCH 8/8] Pass replicationUrl and replicationInterval as parameters Instead of reading them from the config, per consistency. --- import_/import.go | 2 +- update/state/state.go | 12 +++++------- update/state/state_test.go | 2 +- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/import_/import.go b/import_/import.go index 00ee923..bf3d945 100644 --- a/import_/import.go +++ b/import_/import.go @@ -125,7 +125,7 @@ func Import() { osmCache.Close() log.StopStep(step) if config.ImportOptions.Diff { - diffstate, err := state.FromPbf(config.ImportOptions.Read, config.BaseOptions.DiffStateBefore) + diffstate, err := state.FromPbf(config.ImportOptions.Read, config.BaseOptions.DiffStateBefore, config.BaseOptions.ReplicationUrl, config.BaseOptions.ReplicationInterval) if err != nil { log.Print("error parsing diff state form PBF", err) } else if diffstate != nil { diff --git a/update/state/state.go b/update/state/state.go index 548709c..1da40e9 100644 --- a/update/state/state.go +++ b/update/state/state.go @@ -13,7 +13,6 @@ import ( "strings" "time" - "github.com/omniscale/imposm3/config" "github.com/omniscale/imposm3/logging" "github.com/omniscale/imposm3/parser/pbf" ) @@ -78,7 +77,7 @@ func FromOscGz(oscFile string) (*DiffState, error) { return ParseFile(stateFile) } -func FromPbf(filename string, before time.Duration) (*DiffState, error) { +func FromPbf(filename string, before time.Duration, replicationUrl string, replicationInterval time.Duration) (*DiffState, error) { pbfFile, err := pbf.NewParser(filename) if err != nil { return nil, err @@ -94,18 +93,17 @@ func FromPbf(filename string, before time.Duration) (*DiffState, error) { timestamp = fstat.ModTime() } - replicationUrl := config.BaseOptions.ReplicationUrl if replicationUrl == "" { replicationUrl = "https://planet.openstreetmap.org/replication/minute/" } - seq := estimateSequence(replicationUrl, timestamp) + seq := estimateSequence(replicationUrl, replicationInterval, timestamp) if seq == 0 { return nil, nil } // start earlier - seq -= int(before.Minutes() / config.BaseOptions.ReplicationInterval.Minutes()) + seq -= int(math.Ceil(before.Minutes() / replicationInterval.Minutes())) return &DiffState{Time: timestamp, Url: replicationUrl, Sequence: seq}, nil } @@ -197,7 +195,7 @@ func currentState(url string) (*DiffState, error) { return Parse(resp.Body) } -func estimateSequence(url string, timestamp time.Time) int { +func estimateSequence(url string, interval time.Duration, timestamp time.Time) int { state, err := currentState(url) if err != nil { // try a second time before failing @@ -212,5 +210,5 @@ func estimateSequence(url string, timestamp time.Time) int { behind := state.Time.Sub(timestamp) // Sequence unit depends on replication interval (minute, hour, day). - return state.Sequence - int(math.Ceil(behind.Minutes() / config.BaseOptions.ReplicationInterval.Minutes())) + return state.Sequence - int(math.Ceil(behind.Minutes() / interval.Minutes())) } diff --git a/update/state/state_test.go b/update/state/state_test.go index 7f5ca8b..bac76c3 100644 --- a/update/state/state_test.go +++ b/update/state/state_test.go @@ -6,7 +6,7 @@ import ( ) func TestFromPBF(t *testing.T) { - state, err := FromPbf("../../parser/pbf/monaco-20150428.osm.pbf", time.Hour*1) + state, err := FromPbf("../../parser/pbf/monaco-20150428.osm.pbf", time.Hour*1, "", time.Minute*1) if err != nil { t.Fatal(err) }