add imposm3 run command

master
Oliver Tonnhofer 2016-11-25 13:53:58 +01:00
parent c90badce51
commit 2abbaa3248
8 changed files with 561 additions and 38 deletions

View File

@ -18,14 +18,14 @@ BUILD_VERSION=dev-$(BUILD_DATE)-$(BUILD_REV)
all: build test
update_version:
@perl -p -i -e 's/buildVersion = ".*"/buildVersion = "$(BUILD_VERSION)"/' cmd/version.go
@perl -p -i -e 's/buildVersion = ".*"/buildVersion = "$(BUILD_VERSION)"/' version.go
revert_version:
@perl -p -i -e 's/buildVersion = ".*"/buildVersion = ""/' cmd/version.go
@perl -p -i -e 's/buildVersion = ".*"/buildVersion = ""/' version.go
imposm3: $(PBGOFILES) $(GOFILES)
$(MAKE) update_version
$(GO) build $(GOLDFLAGS)
$(GO) build $(GOLDFLAGS) ./cmd/imposm3
$(MAKE) revert_version
build: imposm3

View File

@ -1,4 +1,4 @@
package cmd
package main
import (
"fmt"
@ -6,6 +6,7 @@ import (
"os"
"runtime"
"github.com/omniscale/imposm3"
"github.com/omniscale/imposm3/cache/query"
"github.com/omniscale/imposm3/config"
"github.com/omniscale/imposm3/diff"
@ -21,6 +22,7 @@ func PrintCmds() {
fmt.Println("Available commands:")
fmt.Println("\timport")
fmt.Println("\tdiff")
fmt.Println("\trun")
fmt.Println("\tquery-cache")
fmt.Println("\tversion")
}
@ -51,11 +53,17 @@ func Main(usage func()) {
stats.StartHttpPProf(config.BaseOptions.Httpprofile)
}
diff.Diff()
case "run":
config.ParseRunImport(os.Args[2:])
if config.BaseOptions.Httpprofile != "" {
stats.StartHttpPProf(config.BaseOptions.Httpprofile)
}
diff.Run()
case "query-cache":
query.Query(os.Args[2:])
case "version":
fmt.Println(Version)
fmt.Println(imposm3.Version)
os.Exit(0)
default:
usage()
@ -65,3 +73,7 @@ func Main(usage func()) {
os.Exit(0)
}
func main() {
Main(PrintCmds)
}

View File

@ -21,6 +21,8 @@ type Config struct {
Schemas Schemas `json:"schemas"`
ExpireTilesDir string `json:"expiretiles_dir"`
ExpireTilesZoom int `json:"expiretiles_zoom"`
ReplicationUrl string `json:"replication_url"`
ReplicationInterval MinutesInterval `json:"replication_interval"`
}
type Schemas struct {
@ -37,6 +39,7 @@ const defaultSchemaBackup = "backup"
var ImportFlags = flag.NewFlagSet("import", flag.ExitOnError)
var DiffFlags = flag.NewFlagSet("diff", flag.ExitOnError)
var RunFlags = flag.NewFlagSet("run", flag.ExitOnError)
type _BaseOptions struct {
Connection string
@ -52,6 +55,8 @@ type _BaseOptions struct {
Schemas Schemas
ExpireTilesDir string
ExpireTilesZoom int
ReplicationUrl string
ReplicationInterval time.Duration
}
func (o *_BaseOptions) updateFromConfig() error {
@ -108,12 +113,24 @@ func (o *_BaseOptions) updateFromConfig() error {
if o.CacheDir == defaultCacheDir {
o.CacheDir = conf.CacheDir
}
if o.ExpireTilesDir == "" {
o.ExpireTilesDir = conf.ExpireTilesDir
}
if o.ExpireTilesZoom == 0 {
o.ExpireTilesZoom = conf.ExpireTilesZoom
}
if o.ExpireTilesZoom < 6 || o.ExpireTilesZoom > 18 {
o.ExpireTilesZoom = 14
}
if conf.ReplicationInterval.Duration != 0 && o.ReplicationInterval != time.Minute {
o.ReplicationInterval = conf.ReplicationInterval.Duration
}
if o.ReplicationInterval < time.Minute {
o.ReplicationInterval = time.Minute
}
o.ReplicationUrl = conf.ReplicationUrl
if o.DiffDir == "" {
if conf.DiffDir == "" {
@ -181,12 +198,20 @@ func UsageDiff() {
os.Exit(2)
}
func UsageRun() {
fmt.Fprintf(os.Stderr, "Usage: %s %s [args] [.osc.gz, ...]\n\n", os.Args[0], os.Args[1])
DiffFlags.PrintDefaults()
os.Exit(2)
}
func init() {
ImportFlags.Usage = UsageImport
DiffFlags.Usage = UsageDiff
RunFlags.Usage = UsageRun
addBaseFlags(DiffFlags)
addBaseFlags(ImportFlags)
addBaseFlags(RunFlags)
ImportFlags.BoolVar(&ImportOptions.Overwritecache, "overwritecache", false, "overwritecache")
ImportFlags.BoolVar(&ImportOptions.Appendcache, "appendcache", false, "append cache")
ImportFlags.StringVar(&ImportOptions.Read, "read", "", "read")
@ -200,6 +225,10 @@ func init() {
DiffFlags.StringVar(&BaseOptions.ExpireTilesDir, "expiretiles-dir", "", "write expire tiles into dir")
DiffFlags.IntVar(&BaseOptions.ExpireTilesZoom, "expiretiles-zoom", 14, "write expire tiles in this zoom level")
RunFlags.StringVar(&BaseOptions.ExpireTilesDir, "expiretiles-dir", "", "write expire tiles into dir")
RunFlags.IntVar(&BaseOptions.ExpireTilesZoom, "expiretiles-zoom", 14, "write expire tiles in this zoom level")
RunFlags.DurationVar(&BaseOptions.ReplicationInterval, "replication-interval", time.Minute, "replication interval as duration (1m, 1h, 24h)")
}
func ParseImport(args []string) {
@ -242,6 +271,27 @@ func ParseDiffImport(args []string) {
}
}
func ParseRunImport(args []string) {
if len(args) == 0 {
UsageRun()
}
err := RunFlags.Parse(args)
if err != nil {
log.Fatal(err)
}
err = BaseOptions.updateFromConfig()
if err != nil {
log.Fatal(err)
}
errs := BaseOptions.check()
if len(errs) != 0 {
reportErrors(errs)
UsageRun()
}
}
func reportErrors(errs []error) {
fmt.Println("errors in config/options:")
for _, err := range errs {
@ -249,3 +299,21 @@ func reportErrors(errs []error) {
}
os.Exit(1)
}
type MinutesInterval struct {
time.Duration
}
func (d *MinutesInterval) UnmarshalJSON(b []byte) (err error) {
if b[0] == '"' {
sd := string(b[1 : len(b)-1])
d.Duration, err = time.ParseDuration(sd)
return
}
var id int64
id, err = json.Number(string(b)).Int64()
d.Duration = time.Duration(id) * time.Minute
return
}

288
diff/download/download.go Normal file
View File

@ -0,0 +1,288 @@
package download
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"path"
"time"
"github.com/omniscale/imposm3"
"github.com/omniscale/imposm3/diff/state"
"github.com/omniscale/imposm3/logging"
)
var log = logging.NewLogger("downloader")
// N = AAA*1000000 + BBB*1000 + CCC
func diffPath(sequenceNumber seqId) string {
c := sequenceNumber % 1000
b := sequenceNumber / 1000 % 1000
a := sequenceNumber / 1000000
return fmt.Sprintf("%03d/%03d/%03d", a, b, c)
}
type seqId int32
type Diff struct {
FileName string
State *state.DiffState
}
type diffDownload struct {
url string
dest string
lastSequence seqId
diffInterval time.Duration
errWaittime time.Duration
naWaittime time.Duration
NextDiff chan Diff
client *http.Client
}
type NotAvailable struct {
Url string
Sequence seqId
}
func (na NotAvailable) Error() string {
return fmt.Sprintf("OSC #%d not available at %s", na.Sequence, na.Url)
}
func NewDiffDownload(dest, url string, interval time.Duration) (*diffDownload, error) {
s, err := state.ParseLastState(dest)
if err != nil {
return nil, err
}
if url == "" {
url = s.Url
}
if url == "" {
return nil, errors.New("no replicationUrl in last.state.txt " +
"or replication_url in -config file")
}
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 1 * time.Second, // do not keep alive till next interval
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
downloader := &diffDownload{
url: url,
dest: dest,
lastSequence: seqId(s.Sequence),
diffInterval: interval,
errWaittime: 60 * time.Second,
naWaittime: 10 * time.Second,
NextDiff: make(chan Diff, 1),
client: client,
}
go downloader.fetchNextLoop()
return downloader, nil
}
func (d *diffDownload) oscFileName(sequence seqId) string {
return path.Join(d.dest, diffPath(sequence)) + ".osc.gz"
}
func (d *diffDownload) oscStateFileName(sequence seqId) string {
return path.Join(d.dest, diffPath(sequence)) + ".state.txt"
}
func (d *diffDownload) downloadDiff(sequence seqId) error {
dest := d.oscFileName(sequence)
if _, err := os.Stat(dest); err == nil {
return nil
}
err := os.MkdirAll(path.Dir(dest), 0755)
if err != nil {
return err
}
tmpDest := fmt.Sprintf("%s~%d", dest, os.Getpid())
out, err := os.Create(tmpDest)
if err != nil {
return err
}
defer out.Close()
req, err := http.NewRequest("GET", d.url+diffPath(sequence)+".osc.gz", nil)
if err != nil {
return err
}
req.Header.Set("User-Agent", "Imposm3 "+imposm3.Version)
resp, err := d.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return NotAvailable{d.url, sequence}
}
if resp.StatusCode != 200 {
return errors.New(fmt.Sprintf("invalid repsonse: %v", resp))
}
_, err = io.Copy(out, resp.Body)
if err != nil {
return err
}
out.Close()
err = os.Rename(tmpDest, dest)
if err != nil {
return err
}
return nil
}
func (d *diffDownload) downloadState(sequence seqId) (*state.DiffState, error) {
dest := path.Join(d.dest, diffPath(sequence)) + ".state.txt"
if _, err := os.Stat(dest); err == nil {
return state.ParseFile(dest)
}
err := os.MkdirAll(path.Dir(dest), 0755)
if err != nil {
return nil, err
}
url := d.url + diffPath(sequence) + ".state.txt"
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", "Imposm3 "+imposm3.Version)
resp, err := d.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return nil, NotAvailable{d.url, sequence}
}
if resp.StatusCode != 200 {
return nil, errors.New(fmt.Sprintf("invalid repsonse from %s: %v", url, resp))
}
buf := &bytes.Buffer{}
_, err = io.Copy(buf, resp.Body)
if err != nil {
return nil, err
}
err = ioutil.WriteFile(dest, buf.Bytes(), 0644)
if err != nil {
return nil, err
}
reader := bytes.NewReader(buf.Bytes())
return state.Parse(reader)
}
func (d *diffDownload) fetchNextLoop() {
for {
stateFile := path.Join(d.dest, diffPath(d.lastSequence)) + ".state.txt"
s, err := state.ParseFile(stateFile)
if err == nil {
nextDiffTime := s.Time.Add(d.diffInterval)
if nextDiffTime.After(time.Now()) {
// we catched up and the next diff file is in the future.
// wait till last diff time + interval, before fetching next
nextDiffTime = s.Time.Add(d.diffInterval + 2*time.Second /* allow small time diff between server*/)
waitFor := nextDiffTime.Sub(time.Now())
time.Sleep(waitFor)
}
}
nextSeq := d.lastSequence + 1
// downloadXxxTillSuccess will retry until they succeed
d.downloadStateTillSuccess(nextSeq)
d.downloadDiffTillSuccess(nextSeq)
d.lastSequence = nextSeq
state, _ := state.ParseFile(d.oscStateFileName(nextSeq))
d.NextDiff <- Diff{FileName: d.oscFileName(nextSeq), State: state}
}
}
func (d *diffDownload) downloadStateTillSuccess(seq seqId) {
for {
_, err := d.downloadState(seq)
if err == nil {
break
}
if _, ok := err.(NotAvailable); ok {
time.Sleep(d.naWaittime)
} else {
log.Warn(err)
time.Sleep(d.errWaittime)
}
}
}
func (d *diffDownload) downloadDiffTillSuccess(seq seqId) {
for {
err := d.downloadDiff(seq)
if err == nil {
break
}
if _, ok := err.(NotAvailable); ok {
time.Sleep(d.naWaittime)
} else {
log.Warn(err)
time.Sleep(d.errWaittime)
}
}
}
func (d *diffDownload) currentState() (*state.DiffState, error) {
resp, err := http.Get(d.url + "state.txt")
if err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, errors.New(fmt.Sprintf("invalid repsonse: %v", resp))
}
defer resp.Body.Close()
return state.Parse(resp.Body)
}
func (d *diffDownload) DownloadSince(since time.Time) error {
state, err := d.currentState()
if err != nil {
return err
}
for since.Before(state.Time) {
state, err = d.downloadState(seqId(state.Sequence - 1))
fmt.Println(state)
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,15 @@
package download
import "testing"
func TestDiffPath(t *testing.T) {
if path := diffPath(0); path != "000/000/000" {
t.Fatal(path)
}
if path := diffPath(3069); path != "000/003/069" {
t.Fatal(path)
}
if path := diffPath(123456789); path != "123/456/789" {
t.Fatal(path)
}
}

149
diff/run.go Normal file
View File

@ -0,0 +1,149 @@
package diff
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/omniscale/imposm3/cache"
"github.com/omniscale/imposm3/config"
"github.com/omniscale/imposm3/diff/download"
"github.com/omniscale/imposm3/expire"
"github.com/omniscale/imposm3/geom/limit"
"github.com/omniscale/imposm3/logging"
)
var logger = logging.NewLogger("")
func Run() {
if config.BaseOptions.Quiet {
logging.SetQuiet(true)
}
var geometryLimiter *limit.Limiter
if config.BaseOptions.LimitTo != "" {
var err error
step := logger.StartStep("Reading limitto geometries")
geometryLimiter, err = limit.NewFromGeoJSON(
config.BaseOptions.LimitTo,
config.BaseOptions.LimitToCacheBuffer,
config.BaseOptions.Srid,
)
if err != nil {
logger.Fatal(err)
}
logger.StopStep(step)
}
downloader, err := download.NewDiffDownload(config.BaseOptions.DiffDir,
config.BaseOptions.ReplicationUrl, config.BaseOptions.ReplicationInterval)
if err != nil {
logger.Fatal("unable to start diff downloader", err)
}
osmCache := cache.NewOSMCache(config.BaseOptions.CacheDir)
err = osmCache.Open()
if err != nil {
logger.Fatal("osm cache: ", err)
}
defer osmCache.Close()
diffCache := cache.NewDiffCache(config.BaseOptions.CacheDir)
err = diffCache.Open()
if err != nil {
logger.Fatal("diff cache: ", err)
}
defer diffCache.Close()
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGTERM, syscall.SIGHUP)
shutdown := func() {
logger.Print("Exiting. (SIGTERM/SIGHUB)")
logging.Shutdown()
osmCache.Close()
diffCache.Close()
os.Exit(0)
}
var tiles *expire.TileList
var tileExpireor expire.Expireor
if config.BaseOptions.ExpireTilesDir != "" {
tiles = expire.NewTileList(config.BaseOptions.ExpireTilesZoom, config.BaseOptions.ExpireTilesDir)
tileExpireor = tiles
}
exp := newExpBackoff(2*time.Second, 5*time.Minute)
for {
select {
case <-sigc:
shutdown()
case nextDiff := <-downloader.NextDiff:
fname := nextDiff.FileName
state := nextDiff.State
for {
p := logger.StartStep(fmt.Sprintf("importing #%d till %s", state.Sequence, state.Time))
err := Update(fname, geometryLimiter, tileExpireor, osmCache, diffCache, false)
osmCache.Coords.Flush()
diffCache.Flush()
if err == nil && tiles != nil {
err := tiles.Flush()
if err != nil {
logger.Print("error writing tile expire list", err)
}
}
logger.StopStep(p)
select {
case <-sigc:
shutdown()
default:
}
if err != nil {
logger.Error(err)
logger.Print("retrying in ", exp.Duration())
exp.Wait()
} else {
exp.Reset()
break
}
}
if os.Getenv("IMPOSM3_SINGLE_DIFF") != "" {
return
}
}
}
}
type expBackoff struct {
current time.Duration
min time.Duration
max time.Duration
}
func newExpBackoff(min, max time.Duration) *expBackoff {
return &expBackoff{min, min, max}
}
func (eb *expBackoff) Duration() time.Duration {
return eb.current
}
func (eb *expBackoff) Wait() {
time.Sleep(eb.current)
eb.current = eb.current * 2
if eb.current > eb.max {
eb.current = eb.max
}
}
func (eb *expBackoff) Reset() {
eb.current = eb.min
}

View File

@ -1,9 +0,0 @@
package main
import (
"github.com/omniscale/imposm3/cmd"
)
func main() {
cmd.Main(cmd.PrintCmds)
}

View File

@ -1,4 +1,4 @@
package cmd
package imposm3
var Version string