fix diff reader for missing subdirectories

master
Oliver Tonnhofer 2017-02-13 10:37:16 +01:00
parent 4ec0b2c57d
commit dbc9deb913
2 changed files with 92 additions and 26 deletions

View File

@ -8,6 +8,7 @@ import (
"net/http"
"os"
"path"
"path/filepath"
"time"
"gopkg.in/fsnotify.v1"
@ -213,31 +214,7 @@ func (d *reader) Sequences() <-chan Sequence {
func (d *reader) waitTillPresent(seq int, ext string) error {
filename := path.Join(d.dest, seqPath(seq)+ext)
if _, err := os.Stat(filename); err == nil {
return nil
}
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer w.Close()
w.Add(path.Dir(filename))
// check again, in case file was created before we added the file
if _, err := os.Stat(filename); err == nil {
return nil
}
for {
select {
case evt := <-w.Events:
if evt.Op&fsnotify.Create == fsnotify.Create && evt.Name == filename {
return nil
}
}
}
return nil
return waitTillPresent(filename)
}
func (d *reader) fetchNextLoop() {
@ -260,3 +237,39 @@ func (d *reader) fetchNextLoop() {
}
}
}
// waitTillPresent blocks till file is present.
func waitTillPresent(filename string) error {
if _, err := os.Stat(filename); err == nil {
return nil
}
// fsnotify does not work recursive. wait for parent dirs first (e.g. 002/134)
parent := filepath.Dir(filename)
if err := waitTillPresent(parent); err != nil {
return err
}
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer w.Close()
// need to watch on parent if we want to get events for new file
w.Add(parent)
// check again, in case file was created before we added the file
if _, err := os.Stat(filename); err == nil {
return nil
}
for {
select {
case evt := <-w.Events:
if evt.Op&fsnotify.Create == fsnotify.Create && evt.Name == filename {
return nil
}
}
}
return nil
}

View File

@ -1,6 +1,13 @@
package replication
import "testing"
import (
"io/ioutil"
"os"
"path/filepath"
"time"
"testing"
)
func TestSeqPath(t *testing.T) {
if path := seqPath(0); path != "000/000/000" {
@ -13,3 +20,49 @@ func TestSeqPath(t *testing.T) {
t.Fatal(path)
}
}
func TestWaitTillPresent(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "imposm3tests")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpdir)
exists := filepath.Join(tmpdir, "exists")
f, err := os.Create(exists)
if err != nil {
t.Fatal(err)
}
f.Close()
waitTillPresent(exists)
create := filepath.Join(tmpdir, "create")
go func() {
time.Sleep(200 * time.Millisecond)
f, err := os.Create(create)
if err != nil {
t.Fatal(err)
}
f.Close()
}()
waitTillPresent(create)
sub := filepath.Join(tmpdir, "sub", "dir", "create")
go func() {
time.Sleep(200 * time.Millisecond)
if err := os.Mkdir(filepath.Join(tmpdir, "sub"), 0755); err != nil {
t.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
if err := os.Mkdir(filepath.Join(tmpdir, "sub", "dir"), 0755); err != nil {
t.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
f, err := os.Create(sub)
if err != nil {
t.Fatal(err)
}
f.Close()
}()
waitTillPresent(sub)
}