diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index 93c7cc954..52e1b90c0 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -539,3 +539,49 @@ func TestWatchVictims(t *testing.T) { default: } } + +// TestStressWatchCancelClose tests closing a watch stream while +// canceling its watches. +func TestStressWatchCancelClose(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(b, &lease.FakeLessor{}, nil) + + defer func() { + s.store.Close() + os.Remove(tmpPath) + }() + + testKey, testValue := []byte("foo"), []byte("bar") + var wg sync.WaitGroup + readyc := make(chan struct{}) + wg.Add(100) + for i := 0; i < 100; i++ { + go func() { + defer wg.Done() + w := s.NewWatchStream() + ids := make([]WatchID, 10) + for i := range ids { + ids[i] = w.Watch(testKey, nil, 0) + } + <-readyc + wg.Add(1 + len(ids)/2) + for i := range ids[:len(ids)/2] { + go func(n int) { + defer wg.Done() + w.Cancel(ids[n]) + }(i) + } + go func() { + defer wg.Done() + w.Close() + }() + }() + } + + close(readyc) + for i := 0; i < 100; i++ { + s.Put(testKey, testValue, lease.NoLease) + } + + wg.Wait() +}