diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index ce852fdde..3205cf895 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -23,7 +23,8 @@ import ( "github.com/coreos/etcd/mvcc/mvccpb" ) -const ( +// non-const so modifiable by tests +var ( // chanBufLen is the length of the buffered chan // for sending out watched events. // TODO: find a good buf value. 1024 is just a random one that diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index 37bd01d88..a72be9cd9 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -16,8 +16,10 @@ package mvcc import ( "bytes" + "fmt" "os" "reflect" + "sync" "testing" "time" @@ -424,3 +426,83 @@ func TestNewMapwatcherToEventMap(t *testing.T) { } } } + +// TestWatchVictims tests that watchable store delivers watch events +// when the watch channel is temporarily clogged with too many events. +func TestWatchVictims(t *testing.T) { + oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync + + b, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(b, &lease.FakeLessor{}, nil) + + defer func() { + s.store.Close() + os.Remove(tmpPath) + chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync + }() + + chanBufLen, maxWatchersPerSync = 1, 2 + numPuts := chanBufLen * 64 + testKey, testValue := []byte("foo"), []byte("bar") + + var wg sync.WaitGroup + numWatches := maxWatchersPerSync * 128 + errc := make(chan error, numWatches) + wg.Add(numWatches) + for i := 0; i < numWatches; i++ { + go func() { + w := s.NewWatchStream() + w.Watch(testKey, nil, 1) + defer func() { + w.Close() + wg.Done() + }() + tc := time.After(10 * time.Second) + evs, nextRev := 0, int64(2) + for evs < numPuts { + select { + case <-tc: + errc <- fmt.Errorf("time out") + return + case wr := <-w.Chan(): + evs += len(wr.Events) + for _, ev := range wr.Events { + if ev.Kv.ModRevision != nextRev { + errc <- fmt.Errorf("expected rev=%d, got %d", nextRev, ev.Kv.ModRevision) + return + } + nextRev++ + } + time.Sleep(time.Millisecond) + } + } + if evs != numPuts { + errc <- fmt.Errorf("expected %d events, got %d", numPuts, evs) + return + } + select { + case <-w.Chan(): + errc <- fmt.Errorf("unexpected response") + default: + } + }() + time.Sleep(time.Millisecond) + } + + var wgPut sync.WaitGroup + wgPut.Add(numPuts) + for i := 0; i < numPuts; i++ { + go func() { + defer wgPut.Done() + s.Put(testKey, testValue, lease.NoLease) + }() + } + wgPut.Wait() + + wg.Wait() + select { + case err := <-errc: + t.Fatal(err) + default: + } +}