diff --git a/replication/source.go b/replication/source.go index e821642..5ee3410 100644 --- a/replication/source.go +++ b/replication/source.go @@ -3,7 +3,6 @@ package replication import ( "errors" "fmt" - "gopkg.in/fsnotify.v1" "io" "net" "net/http" @@ -11,6 +10,8 @@ import ( "path" "time" + "gopkg.in/fsnotify.v1" + "github.com/omniscale/imposm3" "github.com/omniscale/imposm3/logging" ) @@ -221,7 +222,7 @@ func (d *reader) waitTillPresent(seq int, ext string) error { return err } defer w.Close() - w.Add(filename) + w.Add(path.Dir(filename)) // check again, in case file was created before we added the file if _, err := os.Stat(filename); err == nil { @@ -231,7 +232,7 @@ func (d *reader) waitTillPresent(seq int, ext string) error { for { select { case evt := <-w.Events: - if evt.Op == fsnotify.Create { + if evt.Op&fsnotify.Create == fsnotify.Create && evt.Name == filename { return nil } } @@ -242,8 +243,12 @@ func (d *reader) waitTillPresent(seq int, ext string) error { func (d *reader) fetchNextLoop() { for { nextSeq := d.lastSequence + 1 - d.waitTillPresent(nextSeq, d.stateExt) - d.waitTillPresent(nextSeq, d.fileExt) + if err := d.waitTillPresent(nextSeq, d.stateExt); err != nil { + log.Error(err) + } + if err := d.waitTillPresent(nextSeq, d.fileExt); err != nil { + log.Error(err) + } d.lastSequence = nextSeq base := path.Join(d.dest, seqPath(d.lastSequence)) lastTime, _ := d.stateTime(base + d.stateExt)