2016-12-07 18:43:12 +03:00
|
|
|
package replication
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"net"
|
|
|
|
"net/http"
|
|
|
|
"os"
|
|
|
|
"path"
|
2017-02-13 12:37:16 +03:00
|
|
|
"path/filepath"
|
2016-12-07 18:43:12 +03:00
|
|
|
"time"
|
|
|
|
|
2017-02-10 13:23:20 +03:00
|
|
|
"gopkg.in/fsnotify.v1"
|
|
|
|
|
2016-12-07 18:43:12 +03:00
|
|
|
"github.com/omniscale/imposm3"
|
|
|
|
"github.com/omniscale/imposm3/logging"
|
|
|
|
)
|
|
|
|
|
|
|
|
// log is the package-level logger, created under the name "replication".
var log = logging.NewLogger("replication")
|
|
|
|
|
2018-01-30 16:00:43 +03:00
|
|
|
// NotAvailable is an error value that carries the URL of a file that could
// not be found at the remote side.
type NotAvailable struct {
	url string
}

// Error implements the error interface and reports the unavailable URL.
func (e *NotAvailable) Error() string {
	// Both operands are strings, so Sprint joins them without a separator.
	return fmt.Sprint("File not available: ", e.url)
}
|
2016-12-07 18:43:12 +03:00
|
|
|
|
|
|
|
// Sequence describes one replication step that is ready on disk.
type Sequence struct {
	// Filename is the path of the diff file for this sequence.
	Filename string
	// StateFilename is the path of the state file that belongs to the diff.
	StateFilename string
	// Time is the timestamp obtained from the state file.
	Time time.Time
	// Sequence is the sequence number of this step.
	Sequence int
}
|
|
|
|
|
|
|
|
type Source interface {
|
|
|
|
Sequences() <-chan Sequence
|
|
|
|
}
|
|
|
|
|
|
|
|
// seqPath converts a sequence number into its three-level directory form
// "AAA/BBB/CCC", where N = AAA*1000000 + BBB*1000 + CCC. Every component is
// zero-padded to at least three digits.
func seqPath(seq int) string {
	millions := seq / 1000000
	thousands := (seq / 1000) % 1000
	units := seq % 1000
	return fmt.Sprintf("%03d/%03d/%03d", millions, thousands, units)
}
|
|
|
|
|
|
|
|
var _ Source = &downloader{}
|
|
|
|
|
|
|
|
type downloader struct {
|
|
|
|
baseUrl string
|
|
|
|
dest string
|
|
|
|
fileExt string
|
|
|
|
stateExt string
|
|
|
|
lastSequence int
|
|
|
|
stateTime func(string) (time.Time, error)
|
|
|
|
interval time.Duration
|
|
|
|
errWaittime time.Duration
|
|
|
|
naWaittime time.Duration
|
|
|
|
sequences chan Sequence
|
|
|
|
client *http.Client
|
|
|
|
}
|
|
|
|
|
|
|
|
func newDownloader(dest, url string, seq int, interval time.Duration) *downloader {
|
|
|
|
client := &http.Client{
|
|
|
|
Transport: &http.Transport{
|
|
|
|
Proxy: http.ProxyFromEnvironment,
|
|
|
|
Dial: (&net.Dialer{
|
|
|
|
Timeout: 30 * time.Second,
|
|
|
|
KeepAlive: 1 * time.Second, // do not keep alive till next interval
|
|
|
|
}).Dial,
|
|
|
|
TLSHandshakeTimeout: 10 * time.Second,
|
|
|
|
ResponseHeaderTimeout: 10 * time.Second,
|
|
|
|
ExpectContinueTimeout: 1 * time.Second,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2018-01-26 11:35:31 +03:00
|
|
|
var naWaittime time.Duration
|
|
|
|
switch interval {
|
|
|
|
case 24 * time.Hour:
|
|
|
|
naWaittime = 5 * time.Minute
|
|
|
|
case time.Hour:
|
|
|
|
naWaittime = 60 * time.Second
|
|
|
|
default:
|
|
|
|
naWaittime = 10 * time.Second
|
|
|
|
}
|
|
|
|
|
2016-12-07 18:43:12 +03:00
|
|
|
dl := &downloader{
|
|
|
|
baseUrl: url,
|
|
|
|
dest: dest,
|
|
|
|
lastSequence: seq,
|
|
|
|
interval: interval,
|
|
|
|
errWaittime: 60 * time.Second,
|
2018-01-26 11:35:31 +03:00
|
|
|
naWaittime: naWaittime,
|
2016-12-07 18:43:12 +03:00
|
|
|
sequences: make(chan Sequence, 1),
|
|
|
|
client: client,
|
|
|
|
}
|
|
|
|
|
|
|
|
return dl
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *downloader) Sequences() <-chan Sequence {
|
|
|
|
return d.sequences
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *downloader) download(seq int, ext string) error {
|
|
|
|
dest := path.Join(d.dest, seqPath(seq)+ext)
|
|
|
|
url := d.baseUrl + seqPath(seq) + ext
|
2018-01-22 21:44:56 +03:00
|
|
|
log.Print("Downloading diff file from ", url)
|
2016-12-07 18:43:12 +03:00
|
|
|
|
|
|
|
if _, err := os.Stat(dest); err == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := os.MkdirAll(path.Dir(dest), 0755); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
tmpDest := fmt.Sprintf("%s~%d", dest, os.Getpid())
|
|
|
|
out, err := os.Create(tmpDest)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer out.Close()
|
|
|
|
|
|
|
|
req, err := http.NewRequest("GET", url, nil)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-04-17 22:06:55 +03:00
|
|
|
req.Header.Set("User-Agent", "Imposm "+imposm3.Version)
|
2016-12-07 18:43:12 +03:00
|
|
|
resp, err := d.client.Do(req)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
|
|
|
if resp.StatusCode == 404 {
|
2018-01-30 16:00:43 +03:00
|
|
|
return &NotAvailable{url}
|
2016-12-07 18:43:12 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
if resp.StatusCode != 200 {
|
2018-01-22 21:44:56 +03:00
|
|
|
return errors.New(fmt.Sprintf("invalid response: %v", resp))
|
2016-12-07 18:43:12 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
_, err = io.Copy(out, resp.Body)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
out.Close()
|
|
|
|
|
|
|
|
err = os.Rename(tmpDest, dest)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *downloader) downloadTillSuccess(seq int, ext string) {
|
|
|
|
for {
|
|
|
|
err := d.download(seq, ext)
|
|
|
|
if err == nil {
|
|
|
|
break
|
|
|
|
}
|
2018-01-30 16:00:43 +03:00
|
|
|
if err, ok := err.(*NotAvailable); ok {
|
|
|
|
log.Print(err)
|
2016-12-07 18:43:12 +03:00
|
|
|
time.Sleep(d.naWaittime)
|
|
|
|
} else {
|
|
|
|
log.Warn(err)
|
|
|
|
time.Sleep(d.errWaittime)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *downloader) fetchNextLoop() {
|
|
|
|
stateFile := path.Join(d.dest, seqPath(d.lastSequence)+d.stateExt)
|
|
|
|
lastTime, err := d.stateTime(stateFile)
|
|
|
|
for {
|
2018-01-22 21:44:56 +03:00
|
|
|
nextSeq := d.lastSequence + 1
|
2018-01-26 13:30:28 +03:00
|
|
|
log.Print("Processing sequence ", nextSeq)
|
2016-12-07 18:43:12 +03:00
|
|
|
if err == nil {
|
|
|
|
nextDiffTime := lastTime.Add(d.interval)
|
|
|
|
if nextDiffTime.After(time.Now()) {
|
|
|
|
// we catched up and the next diff file is in the future.
|
|
|
|
// wait till last diff time + interval, before fetching next
|
2018-01-22 21:44:56 +03:00
|
|
|
nextDiffTime = lastTime.Add(d.interval + 2*time.Second /* allow small time diff between servers */)
|
2016-12-07 18:43:12 +03:00
|
|
|
waitFor := nextDiffTime.Sub(time.Now())
|
2018-01-22 21:44:56 +03:00
|
|
|
log.Print("Next process in ", waitFor)
|
2016-12-07 18:43:12 +03:00
|
|
|
time.Sleep(waitFor)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// download will retry until they succeed
|
|
|
|
d.downloadTillSuccess(nextSeq, d.stateExt)
|
|
|
|
d.downloadTillSuccess(nextSeq, d.fileExt)
|
|
|
|
d.lastSequence = nextSeq
|
|
|
|
base := path.Join(d.dest, seqPath(d.lastSequence))
|
|
|
|
lastTime, _ = d.stateTime(base + d.stateExt)
|
|
|
|
d.sequences <- Sequence{
|
|
|
|
Sequence: d.lastSequence,
|
|
|
|
Filename: base + d.fileExt,
|
|
|
|
StateFilename: base + d.stateExt,
|
|
|
|
Time: lastTime,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2016-12-08 13:51:31 +03:00
|
|
|
|
|
|
|
var _ Source = &reader{}
|
|
|
|
|
|
|
|
type reader struct {
|
|
|
|
dest string
|
|
|
|
fileExt string
|
|
|
|
stateExt string
|
|
|
|
lastSequence int
|
|
|
|
stateTime func(string) (time.Time, error)
|
|
|
|
sequences chan Sequence
|
|
|
|
}
|
|
|
|
|
|
|
|
func newReader(dest string, seq int) *reader {
|
|
|
|
r := &reader{
|
|
|
|
dest: dest,
|
|
|
|
lastSequence: seq,
|
|
|
|
sequences: make(chan Sequence, 1),
|
|
|
|
}
|
|
|
|
|
|
|
|
return r
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *reader) Sequences() <-chan Sequence {
|
|
|
|
return d.sequences
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *reader) waitTillPresent(seq int, ext string) error {
|
|
|
|
filename := path.Join(d.dest, seqPath(seq)+ext)
|
2017-02-13 12:37:16 +03:00
|
|
|
return waitTillPresent(filename)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (d *reader) fetchNextLoop() {
|
|
|
|
for {
|
|
|
|
nextSeq := d.lastSequence + 1
|
|
|
|
if err := d.waitTillPresent(nextSeq, d.stateExt); err != nil {
|
|
|
|
log.Error(err)
|
|
|
|
}
|
|
|
|
if err := d.waitTillPresent(nextSeq, d.fileExt); err != nil {
|
|
|
|
log.Error(err)
|
|
|
|
}
|
|
|
|
d.lastSequence = nextSeq
|
|
|
|
base := path.Join(d.dest, seqPath(d.lastSequence))
|
|
|
|
lastTime, _ := d.stateTime(base + d.stateExt)
|
|
|
|
d.sequences <- Sequence{
|
|
|
|
Sequence: d.lastSequence,
|
|
|
|
Filename: base + d.fileExt,
|
|
|
|
StateFilename: base + d.stateExt,
|
|
|
|
Time: lastTime,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// waitTillPresent blocks till file is present.
|
|
|
|
func waitTillPresent(filename string) error {
|
2016-12-08 13:51:31 +03:00
|
|
|
if _, err := os.Stat(filename); err == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-02-13 12:37:16 +03:00
|
|
|
// fsnotify does not work recursive. wait for parent dirs first (e.g. 002/134)
|
|
|
|
parent := filepath.Dir(filename)
|
|
|
|
if err := waitTillPresent(parent); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-12-08 13:51:31 +03:00
|
|
|
w, err := fsnotify.NewWatcher()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer w.Close()
|
2017-02-13 12:37:16 +03:00
|
|
|
// need to watch on parent if we want to get events for new file
|
|
|
|
w.Add(parent)
|
2016-12-08 13:51:31 +03:00
|
|
|
|
|
|
|
// check again, in case file was created before we added the file
|
|
|
|
if _, err := os.Stat(filename); err == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case evt := <-w.Events:
|
2017-02-10 13:23:20 +03:00
|
|
|
if evt.Op&fsnotify.Create == fsnotify.Create && evt.Name == filename {
|
2016-12-08 13:51:31 +03:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|