From ae5161382bbf99097eaadd15017d1aaaf4ac79b3 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 2 Feb 2016 22:48:50 -0800 Subject: [PATCH 1/4] storage: release tx lock until finish using the readonly bytes The backend will return read only bytes that are only vaild while the tx is open. We should hold the lock until we get a full copy by unmarshal. --- storage/watchable_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index fef796008..2f0a0578e 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -324,7 +324,6 @@ func (s *watchableStore) syncWatchers() { tx := s.store.b.BatchTx() tx.Lock() ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) - tx.Unlock() evs := []storagepb.Event{} @@ -351,6 +350,7 @@ func (s *watchableStore) syncWatchers() { evs = append(evs, ev) } + tx.Unlock() for w, es := range newWatcherToEventMap(s.unsynced, evs) { select { From 5780497e1812c1afe09f3bb72d70f11d492f5de5 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 2 Feb 2016 23:02:15 -0800 Subject: [PATCH 2/4] storage: remove unncessary handle func --- storage/watchable_store.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 2f0a0578e..708c76279 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -131,7 +131,7 @@ func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) Type: storagepb.PUT, Kv: &changes[0], } - s.handle(rev, []storagepb.Event{ev}) + s.notify(rev, []storagepb.Event{ev}) return rev } @@ -156,7 +156,7 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { Type: storagepb.DELETE, Kv: &change} } - s.handle(rev, evs) + s.notify(rev, evs) return n, rev } @@ -191,7 +191,7 @@ func (s *watchableStore) TxnEnd(txnID int64) error { } } - s.handle(s.store.Rev(), evs) + s.notify(s.store.Rev(), evs) s.mu.Unlock() return nil @@ -370,11 +370,6 @@ func (s *watchableStore) syncWatchers() { slowWatcherGauge.Set(float64(len(s.unsynced))) } -// handle handles the change of the happening event on all watchers. -func (s *watchableStore) handle(rev int64, evs []storagepb.Event) { - s.notify(rev, evs) -} - // notify notifies the fact that given event at the given rev just happened to // watchers that watch on the key of the event. func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { From 52416fafb0523babd009ecd550659b49016dfe68 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 2 Feb 2016 23:17:27 -0800 Subject: [PATCH 3/4] storage: send compaction --- storage/watchable_store.go | 9 +++++--- storage/watchable_store_test.go | 39 +++++++++++++++++++++++++++++++++ storage/watcher.go | 3 +++ 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 708c76279..19235e102 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -299,9 +299,12 @@ func (s *watchableStore) syncWatchers() { } if w.cur < compactionRev { - // TODO: return error compacted to that watcher instead of - // just removing it silently from unsynced. - s.unsynced.delete(w) + select { + case w.ch <- WatchResponse{WatchID: w.id, Compacted: true}: + s.unsynced.delete(w) + default: + // retry next time + } continue } diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 6136d2448..0a154ac72 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -19,6 +19,7 @@ import ( "os" "reflect" "testing" + "time" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/storage/backend" @@ -215,6 +216,44 @@ func TestSyncWatchers(t *testing.T) { } } +// TestWatchCompacted tests a watcher that watches on a compacted revision. +func TestWatchCompacted(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(b, &lease.FakeLessor{}) + + defer func() { + s.store.Close() + os.Remove(tmpPath) + }() + testKey := []byte("foo") + testValue := []byte("bar") + + maxRev := 10 + compactRev := int64(5) + for i := 0; i < maxRev; i++ { + s.Put(testKey, testValue, lease.NoLease) + } + err := s.Compact(compactRev) + if err != nil { + t.Fatalf("failed to compact kv (%v)", err) + } + + w := s.NewWatchStream() + wt := w.Watch(testKey, true, compactRev-1) + + select { + case resp := <-w.Chan(): + if resp.WatchID != wt { + t.Errorf("resp.WatchID = %x, want %x", resp.WatchID, wt) + } + if resp.Compacted != true { + t.Errorf("resp.Compacted = %v, want %v", resp.Compacted, true) + } + case <-time.After(1 * time.Second): + t.Fatalf("failed to receive response (timeout)") + } +} + func TestNewMapwatcherToEventMap(t *testing.T) { k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2") v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2") diff --git a/storage/watcher.go b/storage/watcher.go index 3eec7ce63..0f099a84a 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -67,6 +67,9 @@ type WatchResponse struct { // watcher, the revision is greater than the last modified revision // inside Events. Revision int64 + + // Compacted is set when the watcher is cancelled due to compaction. + Compacted bool } // watchStream contains a collection of watchers that share From 3ed404633a7d8bd4869d81fe8b804b69ba36c308 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 2 Feb 2016 23:24:15 -0800 Subject: [PATCH 4/4] v3rpc: add compacted field from wresp --- etcdserver/api/v3rpc/watch.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 8f313919b..df70f556a 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -146,9 +146,10 @@ func (sws *serverWatchStream) sendLoop() { } err := sws.gRPCStream.Send(&pb.WatchResponse{ - Header: sws.newResponseHeader(wresp.Revision), - WatchId: int64(wresp.WatchID), - Events: events, + Header: sws.newResponseHeader(wresp.Revision), + WatchId: int64(wresp.WatchID), + Events: events, + Compacted: wresp.Compacted, }) storage.ReportEventReceived() if err != nil {