refactor diff downloader into replication package; add chagneset downloader

master
Oliver Tonnhofer 2016-12-07 16:43:12 +01:00
parent c84d5aacb2
commit 9b181804da
8 changed files with 298 additions and 317 deletions

42
replication/changes.go Normal file
View File

@ -0,0 +1,42 @@
package replication
import (
"io/ioutil"
"time"
"gopkg.in/yaml.v2"
)
func NewChangesetDownloader(dest, url string, seq int, interval time.Duration) *downloader {
dl := newDownloader(dest, url, seq, interval)
dl.fileExt = ".osm.gz"
dl.stateExt = ".state.txt"
dl.stateTime = parseYamlTime
go dl.fetchNextLoop()
return dl
}
type changesetState struct {
Time time.Time `yaml:"last_run"`
Sequence int `yaml:"sequence"`
}
func parseYamlState(filename string) (changesetState, error) {
b, err := ioutil.ReadFile(filename)
if err != nil {
return changesetState{}, err
}
state := changesetState{}
if err := yaml.Unmarshal(b, &state); err != nil {
return changesetState{}, err
}
return state, nil
}
func parseYamlTime(filename string) (time.Time, error) {
state, err := parseYamlState(filename)
if err != nil {
return time.Time{}, err
}
return state.Time, nil
}

24
replication/diff.go Normal file
View File

@ -0,0 +1,24 @@
package replication
import (
"time"
"github.com/omniscale/imposm3/update/state"
)
func NewDiffDownloader(dest, url string, seq int, interval time.Duration) *downloader {
dl := newDownloader(dest, url, seq, interval)
dl.fileExt = ".osc.gz"
dl.stateExt = ".state.txt"
dl.stateTime = parseTxtTime
go dl.fetchNextLoop()
return dl
}
func parseTxtTime(filename string) (time.Time, error) {
ds, err := state.ParseFile(filename)
if err != nil {
return time.Time{}, err
}
return ds.Time, nil
}

185
replication/source.go Normal file
View File

@ -0,0 +1,185 @@
package replication
import (
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"path"
"time"
"github.com/omniscale/imposm3"
"github.com/omniscale/imposm3/logging"
)
var log = logging.NewLogger("replication")
var NotAvailable = errors.New("file not available")
type Sequence struct {
Filename string
StateFilename string
Time time.Time
Sequence int
}
type Source interface {
Sequences() <-chan Sequence
}
// N = AAA*1000000 + BBB*1000 + CCC
func seqPath(seq int) string {
c := seq % 1000
b := seq / 1000 % 1000
a := seq / 1000000
return fmt.Sprintf("%03d/%03d/%03d", a, b, c)
}
var _ Source = &downloader{}
type downloader struct {
baseUrl string
dest string
fileExt string
stateExt string
lastSequence int
stateTime func(string) (time.Time, error)
interval time.Duration
errWaittime time.Duration
naWaittime time.Duration
sequences chan Sequence
client *http.Client
}
func newDownloader(dest, url string, seq int, interval time.Duration) *downloader {
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 1 * time.Second, // do not keep alive till next interval
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
dl := &downloader{
baseUrl: url,
dest: dest,
lastSequence: seq,
interval: interval,
errWaittime: 60 * time.Second,
naWaittime: 10 * time.Second,
sequences: make(chan Sequence, 1),
client: client,
}
return dl
}
func (d *downloader) Sequences() <-chan Sequence {
return d.sequences
}
func (d *downloader) download(seq int, ext string) error {
dest := path.Join(d.dest, seqPath(seq)+ext)
url := d.baseUrl + seqPath(seq) + ext
if _, err := os.Stat(dest); err == nil {
return nil
}
if err := os.MkdirAll(path.Dir(dest), 0755); err != nil {
return err
}
tmpDest := fmt.Sprintf("%s~%d", dest, os.Getpid())
out, err := os.Create(tmpDest)
if err != nil {
return err
}
defer out.Close()
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return err
}
req.Header.Set("User-Agent", "Imposm3 "+imposm3.Version)
resp, err := d.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return NotAvailable
}
if resp.StatusCode != 200 {
return errors.New(fmt.Sprintf("invalid repsonse: %v", resp))
}
_, err = io.Copy(out, resp.Body)
if err != nil {
return err
}
out.Close()
err = os.Rename(tmpDest, dest)
if err != nil {
return err
}
return nil
}
func (d *downloader) downloadTillSuccess(seq int, ext string) {
for {
err := d.download(seq, ext)
if err == nil {
break
}
if err == NotAvailable {
time.Sleep(d.naWaittime)
} else {
log.Warn(err)
time.Sleep(d.errWaittime)
}
}
}
func (d *downloader) fetchNextLoop() {
stateFile := path.Join(d.dest, seqPath(d.lastSequence)+d.stateExt)
lastTime, err := d.stateTime(stateFile)
for {
if err == nil {
nextDiffTime := lastTime.Add(d.interval)
if nextDiffTime.After(time.Now()) {
// we catched up and the next diff file is in the future.
// wait till last diff time + interval, before fetching next
nextDiffTime = lastTime.Add(d.interval + 2*time.Second /* allow small time diff between server*/)
waitFor := nextDiffTime.Sub(time.Now())
time.Sleep(waitFor)
}
}
nextSeq := d.lastSequence + 1
// download will retry until they succeed
d.downloadTillSuccess(nextSeq, d.stateExt)
d.downloadTillSuccess(nextSeq, d.fileExt)
d.lastSequence = nextSeq
base := path.Join(d.dest, seqPath(d.lastSequence))
lastTime, _ = d.stateTime(base + d.stateExt)
d.sequences <- Sequence{
Sequence: d.lastSequence,
Filename: base + d.fileExt,
StateFilename: base + d.stateExt,
Time: lastTime,
}
}
}

View File

@ -0,0 +1,15 @@
package replication
import "testing"
func TestSeqPath(t *testing.T) {
if path := seqPath(0); path != "000/000/000" {
t.Fatal(path)
}
if path := seqPath(3069); path != "000/003/069" {
t.Fatal(path)
}
if path := seqPath(123456789); path != "123/456/789" {
t.Fatal(path)
}
}

View File

@ -1,288 +0,0 @@
package download
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"path"
"time"
"github.com/omniscale/imposm3"
"github.com/omniscale/imposm3/logging"
"github.com/omniscale/imposm3/update/state"
)
var log = logging.NewLogger("downloader")
// N = AAA*1000000 + BBB*1000 + CCC
func diffPath(sequenceNumber seqId) string {
c := sequenceNumber % 1000
b := sequenceNumber / 1000 % 1000
a := sequenceNumber / 1000000
return fmt.Sprintf("%03d/%03d/%03d", a, b, c)
}
type seqId int32
type Diff struct {
FileName string
State *state.DiffState
}
type diffDownload struct {
url string
dest string
lastSequence seqId
diffInterval time.Duration
errWaittime time.Duration
naWaittime time.Duration
NextDiff chan Diff
client *http.Client
}
type NotAvailable struct {
Url string
Sequence seqId
}
func (na NotAvailable) Error() string {
return fmt.Sprintf("OSC #%d not available at %s", na.Sequence, na.Url)
}
func NewDiffDownload(dest, url string, interval time.Duration) (*diffDownload, error) {
s, err := state.ParseLastState(dest)
if err != nil {
return nil, err
}
if url == "" {
url = s.Url
}
if url == "" {
return nil, errors.New("no replicationUrl in last.state.txt " +
"or replication_url in -config file")
}
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 1 * time.Second, // do not keep alive till next interval
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
downloader := &diffDownload{
url: url,
dest: dest,
lastSequence: seqId(s.Sequence),
diffInterval: interval,
errWaittime: 60 * time.Second,
naWaittime: 10 * time.Second,
NextDiff: make(chan Diff, 1),
client: client,
}
go downloader.fetchNextLoop()
return downloader, nil
}
func (d *diffDownload) oscFileName(sequence seqId) string {
return path.Join(d.dest, diffPath(sequence)) + ".osc.gz"
}
func (d *diffDownload) oscStateFileName(sequence seqId) string {
return path.Join(d.dest, diffPath(sequence)) + ".state.txt"
}
func (d *diffDownload) downloadDiff(sequence seqId) error {
dest := d.oscFileName(sequence)
if _, err := os.Stat(dest); err == nil {
return nil
}
err := os.MkdirAll(path.Dir(dest), 0755)
if err != nil {
return err
}
tmpDest := fmt.Sprintf("%s~%d", dest, os.Getpid())
out, err := os.Create(tmpDest)
if err != nil {
return err
}
defer out.Close()
req, err := http.NewRequest("GET", d.url+diffPath(sequence)+".osc.gz", nil)
if err != nil {
return err
}
req.Header.Set("User-Agent", "Imposm3 "+imposm3.Version)
resp, err := d.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return NotAvailable{d.url, sequence}
}
if resp.StatusCode != 200 {
return errors.New(fmt.Sprintf("invalid repsonse: %v", resp))
}
_, err = io.Copy(out, resp.Body)
if err != nil {
return err
}
out.Close()
err = os.Rename(tmpDest, dest)
if err != nil {
return err
}
return nil
}
func (d *diffDownload) downloadState(sequence seqId) (*state.DiffState, error) {
dest := path.Join(d.dest, diffPath(sequence)) + ".state.txt"
if _, err := os.Stat(dest); err == nil {
return state.ParseFile(dest)
}
err := os.MkdirAll(path.Dir(dest), 0755)
if err != nil {
return nil, err
}
url := d.url + diffPath(sequence) + ".state.txt"
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", "Imposm3 "+imposm3.Version)
resp, err := d.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return nil, NotAvailable{d.url, sequence}
}
if resp.StatusCode != 200 {
return nil, errors.New(fmt.Sprintf("invalid repsonse from %s: %v", url, resp))
}
buf := &bytes.Buffer{}
_, err = io.Copy(buf, resp.Body)
if err != nil {
return nil, err
}
err = ioutil.WriteFile(dest, buf.Bytes(), 0644)
if err != nil {
return nil, err
}
reader := bytes.NewReader(buf.Bytes())
return state.Parse(reader)
}
func (d *diffDownload) fetchNextLoop() {
for {
stateFile := path.Join(d.dest, diffPath(d.lastSequence)) + ".state.txt"
s, err := state.ParseFile(stateFile)
if err == nil {
nextDiffTime := s.Time.Add(d.diffInterval)
if nextDiffTime.After(time.Now()) {
// we catched up and the next diff file is in the future.
// wait till last diff time + interval, before fetching next
nextDiffTime = s.Time.Add(d.diffInterval + 2*time.Second /* allow small time diff between server*/)
waitFor := nextDiffTime.Sub(time.Now())
time.Sleep(waitFor)
}
}
nextSeq := d.lastSequence + 1
// downloadXxxTillSuccess will retry until they succeed
d.downloadStateTillSuccess(nextSeq)
d.downloadDiffTillSuccess(nextSeq)
d.lastSequence = nextSeq
state, _ := state.ParseFile(d.oscStateFileName(nextSeq))
d.NextDiff <- Diff{FileName: d.oscFileName(nextSeq), State: state}
}
}
func (d *diffDownload) downloadStateTillSuccess(seq seqId) {
for {
_, err := d.downloadState(seq)
if err == nil {
break
}
if _, ok := err.(NotAvailable); ok {
time.Sleep(d.naWaittime)
} else {
log.Warn(err)
time.Sleep(d.errWaittime)
}
}
}
func (d *diffDownload) downloadDiffTillSuccess(seq seqId) {
for {
err := d.downloadDiff(seq)
if err == nil {
break
}
if _, ok := err.(NotAvailable); ok {
time.Sleep(d.naWaittime)
} else {
log.Warn(err)
time.Sleep(d.errWaittime)
}
}
}
func (d *diffDownload) currentState() (*state.DiffState, error) {
resp, err := http.Get(d.url + "state.txt")
if err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, errors.New(fmt.Sprintf("invalid repsonse: %v", resp))
}
defer resp.Body.Close()
return state.Parse(resp.Body)
}
func (d *diffDownload) DownloadSince(since time.Time) error {
state, err := d.currentState()
if err != nil {
return err
}
for since.Before(state.Time) {
state, err = d.downloadState(seqId(state.Sequence - 1))
fmt.Println(state)
if err != nil {
return err
}
}
return nil
}

View File

@ -1,15 +0,0 @@
package download
import "testing"
func TestDiffPath(t *testing.T) {
if path := diffPath(0); path != "000/000/000" {
t.Fatal(path)
}
if path := diffPath(3069); path != "000/003/069" {
t.Fatal(path)
}
if path := diffPath(123456789); path != "123/456/789" {
t.Fatal(path)
}
}

View File

@ -12,7 +12,8 @@ import (
"github.com/omniscale/imposm3/expire"
"github.com/omniscale/imposm3/geom/limit"
"github.com/omniscale/imposm3/logging"
"github.com/omniscale/imposm3/update/download"
"github.com/omniscale/imposm3/replication"
"github.com/omniscale/imposm3/update/state"
)
var logger = logging.NewLogger("")
@ -36,11 +37,27 @@ func Run() {
}
logger.StopStep(step)
}
downloader, err := download.NewDiffDownload(config.BaseOptions.DiffDir,
config.BaseOptions.ReplicationUrl, config.BaseOptions.ReplicationInterval)
s, err := state.ParseLastState(config.BaseOptions.DiffDir)
if err != nil {
logger.Fatal("unable to start diff downloader", err)
log.Fatal("unable to read last.state.txt", err)
}
replicationUrl := config.BaseOptions.ReplicationUrl
if replicationUrl == "" {
replicationUrl = s.Url
}
if replicationUrl == "" {
log.Fatal("no replicationUrl in last.state.txt " +
"or replication_url in -config file")
}
downloader := replication.NewDiffDownloader(
config.BaseOptions.DiffDir,
replicationUrl,
s.Sequence,
config.BaseOptions.ReplicationInterval,
)
nextSeq := downloader.Sequences()
osmCache := cache.NewOSMCache(config.BaseOptions.CacheDir)
err = osmCache.Open()
@ -87,11 +104,12 @@ func Run() {
select {
case <-sigc:
shutdown()
case nextDiff := <-downloader.NextDiff:
fname := nextDiff.FileName
state := nextDiff.State
case seq := <-nextSeq:
fname := seq.Filename
seqId := seq.Sequence
seqTime := seq.Time
for {
p := logger.StartStep(fmt.Sprintf("importing #%d till %s", state.Sequence, state.Time))
p := logger.StartStep(fmt.Sprintf("importing #%d till %s", seqId, seqTime))
err := Update(fname, geometryLimiter, tileExpireor, osmCache, diffCache, false)

View File

@ -20,7 +20,7 @@ var log = logging.NewLogger("diff")
type DiffState struct {
Time time.Time
Sequence int32
Sequence int
Url string
}
@ -100,7 +100,7 @@ func FromPbf(filename string, before time.Duration) (*DiffState, error) {
}
// start earlier
seq -= int32(before.Minutes())
seq -= int(before.Minutes())
return &DiffState{Time: timestamp, Url: replicationUrl, Sequence: seq}, nil
}
@ -171,13 +171,13 @@ func parseTimeStamp(value string) (time.Time, error) {
return time.Parse(timestampFormat, value)
}
func parseSequence(value string) (int32, error) {
func parseSequence(value string) (int, error) {
if value == "" {
log.Warn("missing sequenceNumber in state file")
return 0, nil
}
val, err := strconv.ParseInt(value, 10, 32)
return int32(val), err
return int(val), err
}
func currentState(url string) (*DiffState, error) {
@ -192,7 +192,7 @@ func currentState(url string) (*DiffState, error) {
return Parse(resp.Body)
}
func estimateSequence(url string, timestamp time.Time) int32 {
func estimateSequence(url string, timestamp time.Time) int {
state, err := currentState(url)
if err != nil {
// try a second time befor failing
@ -206,5 +206,5 @@ func estimateSequence(url string, timestamp time.Time) int32 {
}
behind := state.Time.Sub(timestamp)
return state.Sequence - int32(behind.Minutes())
return state.Sequence - int(behind.Minutes())
}