improve diff reader from directory
watching files does not work reliable on all platforms, watch on directory and check filenamemaster
parent
0a2184b445
commit
4ec0b2c57d
|
@ -3,7 +3,6 @@ package replication
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"gopkg.in/fsnotify.v1"
|
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -11,6 +10,8 @@ import (
|
||||||
"path"
|
"path"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gopkg.in/fsnotify.v1"
|
||||||
|
|
||||||
"github.com/omniscale/imposm3"
|
"github.com/omniscale/imposm3"
|
||||||
"github.com/omniscale/imposm3/logging"
|
"github.com/omniscale/imposm3/logging"
|
||||||
)
|
)
|
||||||
|
@ -221,7 +222,7 @@ func (d *reader) waitTillPresent(seq int, ext string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
w.Add(filename)
|
w.Add(path.Dir(filename))
|
||||||
|
|
||||||
// check again, in case file was created before we added the file
|
// check again, in case file was created before we added the file
|
||||||
if _, err := os.Stat(filename); err == nil {
|
if _, err := os.Stat(filename); err == nil {
|
||||||
|
@ -231,7 +232,7 @@ func (d *reader) waitTillPresent(seq int, ext string) error {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case evt := <-w.Events:
|
case evt := <-w.Events:
|
||||||
if evt.Op == fsnotify.Create {
|
if evt.Op&fsnotify.Create == fsnotify.Create && evt.Name == filename {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -242,8 +243,12 @@ func (d *reader) waitTillPresent(seq int, ext string) error {
|
||||||
func (d *reader) fetchNextLoop() {
|
func (d *reader) fetchNextLoop() {
|
||||||
for {
|
for {
|
||||||
nextSeq := d.lastSequence + 1
|
nextSeq := d.lastSequence + 1
|
||||||
d.waitTillPresent(nextSeq, d.stateExt)
|
if err := d.waitTillPresent(nextSeq, d.stateExt); err != nil {
|
||||||
d.waitTillPresent(nextSeq, d.fileExt)
|
log.Error(err)
|
||||||
|
}
|
||||||
|
if err := d.waitTillPresent(nextSeq, d.fileExt); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
d.lastSequence = nextSeq
|
d.lastSequence = nextSeq
|
||||||
base := path.Join(d.dest, seqPath(d.lastSequence))
|
base := path.Join(d.dest, seqPath(d.lastSequence))
|
||||||
lastTime, _ := d.stateTime(base + d.stateExt)
|
lastTime, _ := d.stateTime(base + d.stateExt)
|
||||||
|
|
Loading…
Reference in New Issue