From 9b181804da2f08d5c117696cf9484b9b32405a4c Mon Sep 17 00:00:00 2001 From: Oliver Tonnhofer Date: Wed, 7 Dec 2016 16:43:12 +0100 Subject: [PATCH] refactor diff downloader into replication package; add chagneset downloader --- replication/changes.go | 42 +++++ replication/diff.go | 24 +++ replication/source.go | 185 ++++++++++++++++++++ replication/source_test.go | 15 ++ update/download/download.go | 288 ------------------------------- update/download/download_test.go | 15 -- update/run.go | 34 +++- update/state/state.go | 12 +- 8 files changed, 298 insertions(+), 317 deletions(-) create mode 100644 replication/changes.go create mode 100644 replication/diff.go create mode 100644 replication/source.go create mode 100644 replication/source_test.go delete mode 100644 update/download/download.go delete mode 100644 update/download/download_test.go diff --git a/replication/changes.go b/replication/changes.go new file mode 100644 index 0000000..32c3131 --- /dev/null +++ b/replication/changes.go @@ -0,0 +1,42 @@ +package replication + +import ( + "io/ioutil" + "time" + + "gopkg.in/yaml.v2" +) + +func NewChangesetDownloader(dest, url string, seq int, interval time.Duration) *downloader { + dl := newDownloader(dest, url, seq, interval) + dl.fileExt = ".osm.gz" + dl.stateExt = ".state.txt" + dl.stateTime = parseYamlTime + go dl.fetchNextLoop() + return dl +} + +type changesetState struct { + Time time.Time `yaml:"last_run"` + Sequence int `yaml:"sequence"` +} + +func parseYamlState(filename string) (changesetState, error) { + b, err := ioutil.ReadFile(filename) + if err != nil { + return changesetState{}, err + } + state := changesetState{} + if err := yaml.Unmarshal(b, &state); err != nil { + return changesetState{}, err + } + return state, nil +} + +func parseYamlTime(filename string) (time.Time, error) { + state, err := parseYamlState(filename) + if err != nil { + return time.Time{}, err + } + return state.Time, nil +} diff --git a/replication/diff.go b/replication/diff.go new file mode 100644 index 0000000..92ed971 --- /dev/null +++ b/replication/diff.go @@ -0,0 +1,24 @@ +package replication + +import ( + "time" + + "github.com/omniscale/imposm3/update/state" +) + +func NewDiffDownloader(dest, url string, seq int, interval time.Duration) *downloader { + dl := newDownloader(dest, url, seq, interval) + dl.fileExt = ".osc.gz" + dl.stateExt = ".state.txt" + dl.stateTime = parseTxtTime + go dl.fetchNextLoop() + return dl +} + +func parseTxtTime(filename string) (time.Time, error) { + ds, err := state.ParseFile(filename) + if err != nil { + return time.Time{}, err + } + return ds.Time, nil +} diff --git a/replication/source.go b/replication/source.go new file mode 100644 index 0000000..35e40d9 --- /dev/null +++ b/replication/source.go @@ -0,0 +1,185 @@ +package replication + +import ( + "errors" + "fmt" + "io" + "net" + "net/http" + "os" + "path" + "time" + + "github.com/omniscale/imposm3" + "github.com/omniscale/imposm3/logging" +) + +var log = logging.NewLogger("replication") + +var NotAvailable = errors.New("file not available") + +type Sequence struct { + Filename string + StateFilename string + Time time.Time + Sequence int +} + +type Source interface { + Sequences() <-chan Sequence +} + +// N = AAA*1000000 + BBB*1000 + CCC +func seqPath(seq int) string { + c := seq % 1000 + b := seq / 1000 % 1000 + a := seq / 1000000 + + return fmt.Sprintf("%03d/%03d/%03d", a, b, c) +} + +var _ Source = &downloader{} + +type downloader struct { + baseUrl string + dest string + fileExt string + stateExt string + lastSequence int + stateTime func(string) (time.Time, error) + interval time.Duration + errWaittime time.Duration + naWaittime time.Duration + sequences chan Sequence + client *http.Client +} + +func newDownloader(dest, url string, seq int, interval time.Duration) *downloader { + client := &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 1 * time.Second, // do not keep alive till next interval + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + ResponseHeaderTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + } + + dl := &downloader{ + baseUrl: url, + dest: dest, + lastSequence: seq, + interval: interval, + errWaittime: 60 * time.Second, + naWaittime: 10 * time.Second, + sequences: make(chan Sequence, 1), + client: client, + } + + return dl +} + +func (d *downloader) Sequences() <-chan Sequence { + return d.sequences +} + +func (d *downloader) download(seq int, ext string) error { + dest := path.Join(d.dest, seqPath(seq)+ext) + url := d.baseUrl + seqPath(seq) + ext + + if _, err := os.Stat(dest); err == nil { + return nil + } + + if err := os.MkdirAll(path.Dir(dest), 0755); err != nil { + return err + } + + tmpDest := fmt.Sprintf("%s~%d", dest, os.Getpid()) + out, err := os.Create(tmpDest) + if err != nil { + return err + } + defer out.Close() + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return err + } + req.Header.Set("User-Agent", "Imposm3 "+imposm3.Version) + resp, err := d.client.Do(req) + if err != nil { + return err + } + + defer resp.Body.Close() + + if resp.StatusCode == 404 { + return NotAvailable + } + + if resp.StatusCode != 200 { + return errors.New(fmt.Sprintf("invalid repsonse: %v", resp)) + } + + _, err = io.Copy(out, resp.Body) + if err != nil { + return err + } + out.Close() + + err = os.Rename(tmpDest, dest) + if err != nil { + return err + } + + return nil +} + +func (d *downloader) downloadTillSuccess(seq int, ext string) { + for { + err := d.download(seq, ext) + if err == nil { + break + } + if err == NotAvailable { + time.Sleep(d.naWaittime) + } else { + log.Warn(err) + time.Sleep(d.errWaittime) + } + } +} + +func (d *downloader) fetchNextLoop() { + stateFile := path.Join(d.dest, seqPath(d.lastSequence)+d.stateExt) + lastTime, err := d.stateTime(stateFile) + for { + if err == nil { + nextDiffTime := lastTime.Add(d.interval) + if nextDiffTime.After(time.Now()) { + // we catched up and the next diff file is in the future. + // wait till last diff time + interval, before fetching next + nextDiffTime = lastTime.Add(d.interval + 2*time.Second /* allow small time diff between server*/) + waitFor := nextDiffTime.Sub(time.Now()) + time.Sleep(waitFor) + } + } + nextSeq := d.lastSequence + 1 + // download will retry until they succeed + d.downloadTillSuccess(nextSeq, d.stateExt) + d.downloadTillSuccess(nextSeq, d.fileExt) + d.lastSequence = nextSeq + base := path.Join(d.dest, seqPath(d.lastSequence)) + lastTime, _ = d.stateTime(base + d.stateExt) + d.sequences <- Sequence{ + Sequence: d.lastSequence, + Filename: base + d.fileExt, + StateFilename: base + d.stateExt, + Time: lastTime, + } + } +} diff --git a/replication/source_test.go b/replication/source_test.go new file mode 100644 index 0000000..344a57d --- /dev/null +++ b/replication/source_test.go @@ -0,0 +1,15 @@ +package replication + +import "testing" + +func TestSeqPath(t *testing.T) { + if path := seqPath(0); path != "000/000/000" { + t.Fatal(path) + } + if path := seqPath(3069); path != "000/003/069" { + t.Fatal(path) + } + if path := seqPath(123456789); path != "123/456/789" { + t.Fatal(path) + } +} diff --git a/update/download/download.go b/update/download/download.go deleted file mode 100644 index fc58e48..0000000 --- a/update/download/download.go +++ /dev/null @@ -1,288 +0,0 @@ -package download - -import ( - "bytes" - "errors" - "fmt" - "io" - "io/ioutil" - "net" - "net/http" - "os" - "path" - "time" - - "github.com/omniscale/imposm3" - "github.com/omniscale/imposm3/logging" - "github.com/omniscale/imposm3/update/state" -) - -var log = logging.NewLogger("downloader") - -// N = AAA*1000000 + BBB*1000 + CCC -func diffPath(sequenceNumber seqId) string { - c := sequenceNumber % 1000 - b := sequenceNumber / 1000 % 1000 - a := sequenceNumber / 1000000 - - return fmt.Sprintf("%03d/%03d/%03d", a, b, c) -} - -type seqId int32 - -type Diff struct { - FileName string - State *state.DiffState -} - -type diffDownload struct { - url string - dest string - lastSequence seqId - diffInterval time.Duration - errWaittime time.Duration - naWaittime time.Duration - NextDiff chan Diff - client *http.Client -} - -type NotAvailable struct { - Url string - Sequence seqId -} - -func (na NotAvailable) Error() string { - return fmt.Sprintf("OSC #%d not available at %s", na.Sequence, na.Url) -} - -func NewDiffDownload(dest, url string, interval time.Duration) (*diffDownload, error) { - s, err := state.ParseLastState(dest) - if err != nil { - return nil, err - } - if url == "" { - url = s.Url - } - if url == "" { - return nil, errors.New("no replicationUrl in last.state.txt " + - "or replication_url in -config file") - } - client := &http.Client{ - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 1 * time.Second, // do not keep alive till next interval - }).Dial, - TLSHandshakeTimeout: 10 * time.Second, - ResponseHeaderTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - }, - } - - downloader := &diffDownload{ - url: url, - dest: dest, - lastSequence: seqId(s.Sequence), - diffInterval: interval, - errWaittime: 60 * time.Second, - naWaittime: 10 * time.Second, - NextDiff: make(chan Diff, 1), - client: client, - } - - go downloader.fetchNextLoop() - return downloader, nil -} - -func (d *diffDownload) oscFileName(sequence seqId) string { - return path.Join(d.dest, diffPath(sequence)) + ".osc.gz" -} - -func (d *diffDownload) oscStateFileName(sequence seqId) string { - return path.Join(d.dest, diffPath(sequence)) + ".state.txt" -} - -func (d *diffDownload) downloadDiff(sequence seqId) error { - dest := d.oscFileName(sequence) - - if _, err := os.Stat(dest); err == nil { - return nil - } - - err := os.MkdirAll(path.Dir(dest), 0755) - - if err != nil { - return err - } - - tmpDest := fmt.Sprintf("%s~%d", dest, os.Getpid()) - out, err := os.Create(tmpDest) - if err != nil { - return err - } - defer out.Close() - - req, err := http.NewRequest("GET", d.url+diffPath(sequence)+".osc.gz", nil) - if err != nil { - return err - } - req.Header.Set("User-Agent", "Imposm3 "+imposm3.Version) - resp, err := d.client.Do(req) - if err != nil { - return err - } - - defer resp.Body.Close() - - if resp.StatusCode == 404 { - return NotAvailable{d.url, sequence} - } - - if resp.StatusCode != 200 { - return errors.New(fmt.Sprintf("invalid repsonse: %v", resp)) - } - - _, err = io.Copy(out, resp.Body) - if err != nil { - return err - } - out.Close() - - err = os.Rename(tmpDest, dest) - if err != nil { - return err - } - - return nil -} - -func (d *diffDownload) downloadState(sequence seqId) (*state.DiffState, error) { - dest := path.Join(d.dest, diffPath(sequence)) + ".state.txt" - - if _, err := os.Stat(dest); err == nil { - return state.ParseFile(dest) - } - - err := os.MkdirAll(path.Dir(dest), 0755) - if err != nil { - return nil, err - } - - url := d.url + diffPath(sequence) + ".state.txt" - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, err - } - req.Header.Set("User-Agent", "Imposm3 "+imposm3.Version) - resp, err := d.client.Do(req) - - if err != nil { - return nil, err - } - defer resp.Body.Close() - - if resp.StatusCode == 404 { - return nil, NotAvailable{d.url, sequence} - } - - if resp.StatusCode != 200 { - return nil, errors.New(fmt.Sprintf("invalid repsonse from %s: %v", url, resp)) - } - - buf := &bytes.Buffer{} - _, err = io.Copy(buf, resp.Body) - if err != nil { - return nil, err - } - - err = ioutil.WriteFile(dest, buf.Bytes(), 0644) - if err != nil { - return nil, err - } - - reader := bytes.NewReader(buf.Bytes()) - return state.Parse(reader) -} - -func (d *diffDownload) fetchNextLoop() { - for { - stateFile := path.Join(d.dest, diffPath(d.lastSequence)) + ".state.txt" - s, err := state.ParseFile(stateFile) - if err == nil { - nextDiffTime := s.Time.Add(d.diffInterval) - if nextDiffTime.After(time.Now()) { - // we catched up and the next diff file is in the future. - // wait till last diff time + interval, before fetching next - nextDiffTime = s.Time.Add(d.diffInterval + 2*time.Second /* allow small time diff between server*/) - waitFor := nextDiffTime.Sub(time.Now()) - time.Sleep(waitFor) - } - } - nextSeq := d.lastSequence + 1 - // downloadXxxTillSuccess will retry until they succeed - d.downloadStateTillSuccess(nextSeq) - d.downloadDiffTillSuccess(nextSeq) - d.lastSequence = nextSeq - state, _ := state.ParseFile(d.oscStateFileName(nextSeq)) - d.NextDiff <- Diff{FileName: d.oscFileName(nextSeq), State: state} - } -} - -func (d *diffDownload) downloadStateTillSuccess(seq seqId) { - for { - _, err := d.downloadState(seq) - if err == nil { - break - } - if _, ok := err.(NotAvailable); ok { - time.Sleep(d.naWaittime) - } else { - log.Warn(err) - time.Sleep(d.errWaittime) - } - } -} - -func (d *diffDownload) downloadDiffTillSuccess(seq seqId) { - for { - err := d.downloadDiff(seq) - if err == nil { - break - } - if _, ok := err.(NotAvailable); ok { - time.Sleep(d.naWaittime) - } else { - log.Warn(err) - time.Sleep(d.errWaittime) - } - } -} - -func (d *diffDownload) currentState() (*state.DiffState, error) { - resp, err := http.Get(d.url + "state.txt") - if err != nil { - return nil, err - } - if resp.StatusCode != 200 { - return nil, errors.New(fmt.Sprintf("invalid repsonse: %v", resp)) - } - defer resp.Body.Close() - return state.Parse(resp.Body) -} - -func (d *diffDownload) DownloadSince(since time.Time) error { - state, err := d.currentState() - if err != nil { - return err - } - - for since.Before(state.Time) { - state, err = d.downloadState(seqId(state.Sequence - 1)) - fmt.Println(state) - if err != nil { - return err - } - } - return nil -} diff --git a/update/download/download_test.go b/update/download/download_test.go deleted file mode 100644 index 9d51903..0000000 --- a/update/download/download_test.go +++ /dev/null @@ -1,15 +0,0 @@ -package download - -import "testing" - -func TestDiffPath(t *testing.T) { - if path := diffPath(0); path != "000/000/000" { - t.Fatal(path) - } - if path := diffPath(3069); path != "000/003/069" { - t.Fatal(path) - } - if path := diffPath(123456789); path != "123/456/789" { - t.Fatal(path) - } -} diff --git a/update/run.go b/update/run.go index 4dcc0fa..4484e74 100644 --- a/update/run.go +++ b/update/run.go @@ -12,7 +12,8 @@ import ( "github.com/omniscale/imposm3/expire" "github.com/omniscale/imposm3/geom/limit" "github.com/omniscale/imposm3/logging" - "github.com/omniscale/imposm3/update/download" + "github.com/omniscale/imposm3/replication" + "github.com/omniscale/imposm3/update/state" ) var logger = logging.NewLogger("") @@ -36,11 +37,27 @@ func Run() { } logger.StopStep(step) } - downloader, err := download.NewDiffDownload(config.BaseOptions.DiffDir, - config.BaseOptions.ReplicationUrl, config.BaseOptions.ReplicationInterval) + + s, err := state.ParseLastState(config.BaseOptions.DiffDir) if err != nil { - logger.Fatal("unable to start diff downloader", err) + log.Fatal("unable to read last.state.txt", err) } + replicationUrl := config.BaseOptions.ReplicationUrl + if replicationUrl == "" { + replicationUrl = s.Url + } + if replicationUrl == "" { + log.Fatal("no replicationUrl in last.state.txt " + + "or replication_url in -config file") + } + + downloader := replication.NewDiffDownloader( + config.BaseOptions.DiffDir, + replicationUrl, + s.Sequence, + config.BaseOptions.ReplicationInterval, + ) + nextSeq := downloader.Sequences() osmCache := cache.NewOSMCache(config.BaseOptions.CacheDir) err = osmCache.Open() @@ -87,11 +104,12 @@ func Run() { select { case <-sigc: shutdown() - case nextDiff := <-downloader.NextDiff: - fname := nextDiff.FileName - state := nextDiff.State + case seq := <-nextSeq: + fname := seq.Filename + seqId := seq.Sequence + seqTime := seq.Time for { - p := logger.StartStep(fmt.Sprintf("importing #%d till %s", state.Sequence, state.Time)) + p := logger.StartStep(fmt.Sprintf("importing #%d till %s", seqId, seqTime)) err := Update(fname, geometryLimiter, tileExpireor, osmCache, diffCache, false) diff --git a/update/state/state.go b/update/state/state.go index 0630e3a..9d62b90 100644 --- a/update/state/state.go +++ b/update/state/state.go @@ -20,7 +20,7 @@ var log = logging.NewLogger("diff") type DiffState struct { Time time.Time - Sequence int32 + Sequence int Url string } @@ -100,7 +100,7 @@ func FromPbf(filename string, before time.Duration) (*DiffState, error) { } // start earlier - seq -= int32(before.Minutes()) + seq -= int(before.Minutes()) return &DiffState{Time: timestamp, Url: replicationUrl, Sequence: seq}, nil } @@ -171,13 +171,13 @@ func parseTimeStamp(value string) (time.Time, error) { return time.Parse(timestampFormat, value) } -func parseSequence(value string) (int32, error) { +func parseSequence(value string) (int, error) { if value == "" { log.Warn("missing sequenceNumber in state file") return 0, nil } val, err := strconv.ParseInt(value, 10, 32) - return int32(val), err + return int(val), err } func currentState(url string) (*DiffState, error) { @@ -192,7 +192,7 @@ func currentState(url string) (*DiffState, error) { return Parse(resp.Body) } -func estimateSequence(url string, timestamp time.Time) int32 { +func estimateSequence(url string, timestamp time.Time) int { state, err := currentState(url) if err != nil { // try a second time befor failing @@ -206,5 +206,5 @@ func estimateSequence(url string, timestamp time.Time) int32 { } behind := state.Time.Sub(timestamp) - return state.Sequence - int32(behind.Minutes()) + return state.Sequence - int(behind.Minutes()) }