imposm3/update/state/state.go

211 lines
4.6 KiB
Go
Raw Normal View History

2013-07-19 11:35:21 +04:00
package state
2013-07-16 16:03:42 +04:00
import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"path"
	"strconv"
	"strings"
	"time"

	"github.com/omniscale/imposm3/logging"
	"github.com/omniscale/imposm3/parser/pbf"
)
// log is the package-level logger, created with the name "diff".
var log = logging.NewLogger("diff")
// DiffState describes one replication state: the point in time it refers to,
// its sequence number and the replication URL it belongs to.
//
// The field order is significant: code in this package builds DiffState values
// with positional (unkeyed) literals.
type DiffState struct {
	// Time is the value of the "timestamp" entry of a state file.
	Time time.Time
	// Sequence is the value of the "sequenceNumber" entry. Zero means that no
	// sequence number is known; Write omits the entry in that case.
	Sequence int
	// Url is the value of the "replicationUrl" entry.
	Url string
}
// String renders the state as "Diff #<sequence> from <time>", with the time
// converted to the local time zone.
func (d DiffState) String() string {
	localTime := d.Time.Local()
	return fmt.Sprintf("Diff #%d from %s", d.Sequence, localTime)
}
2016-12-07 16:20:21 +03:00
func (d DiffState) Write(w io.Writer) error {
2013-07-16 16:03:42 +04:00
lines := []string{}
lines = append(lines, "timestamp="+d.Time.Format(timestampFormat))
if d.Sequence != 0 {
lines = append(lines, "sequenceNumber="+fmt.Sprintf("%d", d.Sequence))
}
2013-07-22 18:04:32 +04:00
lines = append(lines, "replicationUrl="+d.Url)
2013-07-16 16:03:42 +04:00
for _, line := range lines {
2016-12-07 16:20:21 +03:00
_, err := w.Write([]byte(line + "\n"))
2013-07-16 16:03:42 +04:00
if err != nil {
return err
}
}
2016-12-07 16:20:21 +03:00
return nil
2013-07-16 16:03:42 +04:00
}
func WriteLastState(cacheDir string, state *DiffState) error {
stateFile := path.Join(cacheDir, "last.state.txt")
2016-12-07 16:20:21 +03:00
f, err := os.Create(stateFile)
if err != nil {
return err
}
defer f.Close()
return state.Write(f)
2013-07-16 16:03:42 +04:00
}
2016-12-06 16:47:34 +03:00
func FromOscGz(oscFile string) (*DiffState, error) {
2013-07-16 16:03:42 +04:00
var stateFile string
if !strings.HasSuffix(oscFile, ".osc.gz") {
log.Warn("cannot read state file for non .osc.gz files")
return nil, nil
}
stateFile = oscFile[:len(oscFile)-len(".osc.gz")] + ".state.txt"
if _, err := os.Stat(stateFile); os.IsNotExist(err) {
log.Warn("cannot find state file ", stateFile)
return nil, nil
}
2013-07-17 11:10:43 +04:00
f, err := os.Open(stateFile)
if err != nil {
return nil, err
}
defer f.Close()
2013-07-19 11:35:21 +04:00
return ParseFile(stateFile)
2013-07-16 16:03:42 +04:00
}
2016-12-06 16:47:34 +03:00
func FromPbf(filename string, before time.Duration) (*DiffState, error) {
pbfFile, err := pbf.NewParser(filename)
if err != nil {
return nil, err
}
var timestamp time.Time
2016-12-06 16:47:34 +03:00
if pbfFile.Header().Time.Unix() != 0 {
timestamp = pbfFile.Header().Time
} else {
2016-12-06 16:47:34 +03:00
fstat, err := os.Stat(filename)
if err != nil {
2016-12-06 16:47:34 +03:00
return nil, err
}
timestamp = fstat.ModTime()
}
replicationUrl := "https://planet.openstreetmap.org/replication/minute/"
seq := estimateSequence(replicationUrl, timestamp)
if seq == 0 {
2016-12-06 16:47:34 +03:00
return nil, nil
}
// start earlier
seq -= int(before.Minutes())
2016-12-06 16:47:34 +03:00
return &DiffState{Time: timestamp, Url: replicationUrl, Sequence: seq}, nil
}
2013-07-19 11:35:21 +04:00
func ParseFile(stateFile string) (*DiffState, error) {
2013-07-17 11:10:43 +04:00
f, err := os.Open(stateFile)
if err != nil {
return nil, err
}
defer f.Close()
2013-07-19 11:35:21 +04:00
return Parse(f)
2013-07-17 11:10:43 +04:00
}
2013-07-19 11:35:21 +04:00
func Parse(f io.Reader) (*DiffState, error) {
2013-07-17 11:10:43 +04:00
values, err := parseSimpleIni(f)
2013-07-16 16:03:42 +04:00
if err != nil {
return nil, err
}
timestamp, err := parseTimeStamp(values["timestamp"])
if err != nil {
return nil, err
}
sequence, err := parseSequence(values["sequenceNumber"])
if err != nil {
return nil, err
}
2013-07-22 18:04:32 +04:00
url := values["replicationUrl"]
return &DiffState{timestamp, sequence, url}, nil
2013-07-16 16:03:42 +04:00
}
func ParseLastState(cacheDir string) (*DiffState, error) {
stateFile := path.Join(cacheDir, "last.state.txt")
if _, err := os.Stat(stateFile); os.IsNotExist(err) {
return nil, err
2013-07-16 16:03:42 +04:00
}
2013-07-19 11:35:21 +04:00
return ParseFile(stateFile)
2013-07-16 16:03:42 +04:00
}
2013-07-17 11:10:43 +04:00
// parseSimpleIni reads f line by line and collects "key=value" pairs.
//
// Lines whose first character is '#' are skipped, as are lines without '='.
// Each remaining line is split at its first '='; surrounding white space is
// trimmed from both key and value. When a key occurs more than once the last
// value wins. An error is returned only when reading from f fails.
func parseSimpleIni(f io.Reader) (map[string]string, error) {
	result := make(map[string]string)

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "#") {
			continue
		}
		sep := strings.Index(line, "=")
		if sep < 0 {
			continue
		}
		key := strings.TrimSpace(line[:sep])
		result[key] = strings.TrimSpace(line[sep+1:])
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return result, nil
}
// timestampFormat is the time layout of the "timestamp" entry in state files.
// The backslashes are part of the layout: colons are stored escaped ("\:"),
// and the trailing "Z" is a literal character, not a zone placeholder.
const timestampFormat = "2006-01-02T15\\:04\\:05Z"

// parseTimeStamp parses value using timestampFormat. It returns an error when
// value is empty or does not match the layout.
func parseTimeStamp(value string) (time.Time, error) {
	if value == "" {
		return time.Time{}, errors.New("missing timestamp in state")
	}
	return time.Parse(timestampFormat, value)
}
func parseSequence(value string) (int, error) {
2013-07-16 16:03:42 +04:00
if value == "" {
log.Warn("missing sequenceNumber in state file")
return 0, nil
2013-07-16 16:03:42 +04:00
}
val, err := strconv.ParseInt(value, 10, 32)
return int(val), err
2013-07-16 16:03:42 +04:00
}
// currentState fetches url + "state.txt" over HTTP and parses the response
// body with Parse.
//
// It returns an error when the request fails, when the response status is not
// 200, or when the body cannot be parsed.
func currentState(url string) (*DiffState, error) {
	resp, err := http.Get(url + "state.txt")
	if err != nil {
		return nil, err
	}
	// Close the body on every path, including the non-200 one below;
	// otherwise the connection is leaked.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("invalid repsonse: %v", resp)
	}
	return Parse(resp.Body)
}
// estimateSequence guesses the replication sequence number that was current
// at timestamp. It fetches the current state from url and subtracts one
// sequence per whole minute between that state's time and timestamp.
//
// If fetching the current state fails, it waits 30 seconds and tries once
// more. It returns 0 when the second attempt fails as well.
func estimateSequence(url string, timestamp time.Time) int {
	current, err := currentState(url)
	if err != nil {
		// Try a second time before failing.
		log.Warn("unable to fetch current state from ", url, ":", err, ", retry in 30s")
		time.Sleep(time.Second * 30)
		if current, err = currentState(url); err != nil {
			log.Warn("unable to fetch current state from ", url, ":", err, ", giving up")
			return 0
		}
	}

	minutesBehind := int(current.Time.Sub(timestamp).Minutes())
	return current.Sequence - minutesBehind
}