diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 8f313919b..df70f556a 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -146,9 +146,10 @@ func (sws *serverWatchStream) sendLoop() { } err := sws.gRPCStream.Send(&pb.WatchResponse{ - Header: sws.newResponseHeader(wresp.Revision), - WatchId: int64(wresp.WatchID), - Events: events, + Header: sws.newResponseHeader(wresp.Revision), + WatchId: int64(wresp.WatchID), + Events: events, + Compacted: wresp.Compacted, }) storage.ReportEventReceived() if err != nil { diff --git a/storage/watchable_store.go b/storage/watchable_store.go index fef796008..19235e102 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -131,7 +131,7 @@ func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) Type: storagepb.PUT, Kv: &changes[0], } - s.handle(rev, []storagepb.Event{ev}) + s.notify(rev, []storagepb.Event{ev}) return rev } @@ -156,7 +156,7 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { Type: storagepb.DELETE, Kv: &change} } - s.handle(rev, evs) + s.notify(rev, evs) return n, rev } @@ -191,7 +191,7 @@ func (s *watchableStore) TxnEnd(txnID int64) error { } } - s.handle(s.store.Rev(), evs) + s.notify(s.store.Rev(), evs) s.mu.Unlock() return nil @@ -299,9 +299,12 @@ func (s *watchableStore) syncWatchers() { } if w.cur < compactionRev { - // TODO: return error compacted to that watcher instead of - // just removing it silently from unsynced. - s.unsynced.delete(w) + select { + case w.ch <- WatchResponse{WatchID: w.id, Compacted: true}: + s.unsynced.delete(w) + default: + // retry next time + } continue } @@ -324,7 +327,6 @@ func (s *watchableStore) syncWatchers() { tx := s.store.b.BatchTx() tx.Lock() ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) - tx.Unlock() evs := []storagepb.Event{} @@ -351,6 +353,7 @@ func (s *watchableStore) syncWatchers() { evs = append(evs, ev) } + tx.Unlock() for w, es := range newWatcherToEventMap(s.unsynced, evs) { select { @@ -370,11 +373,6 @@ func (s *watchableStore) syncWatchers() { slowWatcherGauge.Set(float64(len(s.unsynced))) } -// handle handles the change of the happening event on all watchers. -func (s *watchableStore) handle(rev int64, evs []storagepb.Event) { - s.notify(rev, evs) -} - // notify notifies the fact that given event at the given rev just happened to // watchers that watch on the key of the event. func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 6136d2448..0a154ac72 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -19,6 +19,7 @@ import ( "os" "reflect" "testing" + "time" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/storage/backend" @@ -215,6 +216,44 @@ func TestSyncWatchers(t *testing.T) { } } +// TestWatchCompacted tests a watcher that watches on a compacted revision. +func TestWatchCompacted(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(b, &lease.FakeLessor{}) + + defer func() { + s.store.Close() + os.Remove(tmpPath) + }() + testKey := []byte("foo") + testValue := []byte("bar") + + maxRev := 10 + compactRev := int64(5) + for i := 0; i < maxRev; i++ { + s.Put(testKey, testValue, lease.NoLease) + } + err := s.Compact(compactRev) + if err != nil { + t.Fatalf("failed to compact kv (%v)", err) + } + + w := s.NewWatchStream() + wt := w.Watch(testKey, true, compactRev-1) + + select { + case resp := <-w.Chan(): + if resp.WatchID != wt { + t.Errorf("resp.WatchID = %x, want %x", resp.WatchID, wt) + } + if resp.Compacted != true { + t.Errorf("resp.Compacted = %v, want %v", resp.Compacted, true) + } + case <-time.After(1 * time.Second): + t.Fatalf("failed to receive response (timeout)") + } +} + func TestNewMapwatcherToEventMap(t *testing.T) { k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2") v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2") diff --git a/storage/watcher.go b/storage/watcher.go index 3eec7ce63..0f099a84a 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -67,6 +67,9 @@ type WatchResponse struct { // watcher, the revision is greater than the last modified revision // inside Events. Revision int64 + + // Compacted is set when the watcher is cancelled due to compaction. + Compacted bool } // watchStream contains a collection of watchers that share