diff --git a/replication/source.go b/replication/source.go index 5ee3410..1e76563 100644 --- a/replication/source.go +++ b/replication/source.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "path" + "path/filepath" "time" "gopkg.in/fsnotify.v1" @@ -213,31 +214,7 @@ func (d *reader) Sequences() <-chan Sequence { func (d *reader) waitTillPresent(seq int, ext string) error { filename := path.Join(d.dest, seqPath(seq)+ext) - if _, err := os.Stat(filename); err == nil { - return nil - } - - w, err := fsnotify.NewWatcher() - if err != nil { - return err - } - defer w.Close() - w.Add(path.Dir(filename)) - - // check again, in case file was created before we added the file - if _, err := os.Stat(filename); err == nil { - return nil - } - - for { - select { - case evt := <-w.Events: - if evt.Op&fsnotify.Create == fsnotify.Create && evt.Name == filename { - return nil - } - } - } - return nil + return waitTillPresent(filename) } func (d *reader) fetchNextLoop() { @@ -260,3 +237,39 @@ func (d *reader) fetchNextLoop() { } } } + +// waitTillPresent blocks till file is present. +func waitTillPresent(filename string) error { + if _, err := os.Stat(filename); err == nil { + return nil + } + + // fsnotify does not work recursive. wait for parent dirs first (e.g. 002/134) + parent := filepath.Dir(filename) + if err := waitTillPresent(parent); err != nil { + return err + } + + w, err := fsnotify.NewWatcher() + if err != nil { + return err + } + defer w.Close() + // need to watch on parent if we want to get events for new file + w.Add(parent) + + // check again, in case file was created before we added the file + if _, err := os.Stat(filename); err == nil { + return nil + } + + for { + select { + case evt := <-w.Events: + if evt.Op&fsnotify.Create == fsnotify.Create && evt.Name == filename { + return nil + } + } + } + return nil +} diff --git a/replication/source_test.go b/replication/source_test.go index 344a57d..65b9afd 100644 --- a/replication/source_test.go +++ b/replication/source_test.go @@ -1,6 +1,13 @@ package replication -import "testing" +import ( + "io/ioutil" + "os" + "path/filepath" + "time" + + "testing" +) func TestSeqPath(t *testing.T) { if path := seqPath(0); path != "000/000/000" { @@ -13,3 +20,49 @@ func TestSeqPath(t *testing.T) { t.Fatal(path) } } + +func TestWaitTillPresent(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "imposm3tests") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpdir) + + exists := filepath.Join(tmpdir, "exists") + f, err := os.Create(exists) + if err != nil { + t.Fatal(err) + } + f.Close() + waitTillPresent(exists) + + create := filepath.Join(tmpdir, "create") + go func() { + time.Sleep(200 * time.Millisecond) + f, err := os.Create(create) + if err != nil { + t.Fatal(err) + } + f.Close() + }() + waitTillPresent(create) + + sub := filepath.Join(tmpdir, "sub", "dir", "create") + go func() { + time.Sleep(200 * time.Millisecond) + if err := os.Mkdir(filepath.Join(tmpdir, "sub"), 0755); err != nil { + t.Fatal(err) + } + time.Sleep(200 * time.Millisecond) + if err := os.Mkdir(filepath.Join(tmpdir, "sub", "dir"), 0755); err != nil { + t.Fatal(err) + } + time.Sleep(200 * time.Millisecond) + f, err := os.Create(sub) + if err != nil { + t.Fatal(err) + } + f.Close() + }() + waitTillPresent(sub) +}