mvcc: test watch victim/delay path
Current tests don't normally trigger the watch victim path because the constants are too large; set the constants to small values and hammer the store to cause watch delivery delays.release-3.3
parent
29911195de
commit
83b2ea2f60
|
@ -23,7 +23,8 @@ import (
|
|||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
)
|
||||
|
||||
const (
|
||||
// non-const so modifiable by tests
|
||||
var (
|
||||
// chanBufLen is the length of the buffered chan
|
||||
// for sending out watched events.
|
||||
// TODO: find a good buf value. 1024 is just a random one that
|
||||
|
|
|
@ -16,8 +16,10 @@ package mvcc
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -424,3 +426,83 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatchVictims tests that watchable store delivers watch events
|
||||
// when the watch channel is temporarily clogged with too many events.
|
||||
func TestWatchVictims(t *testing.T) {
|
||||
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
|
||||
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
|
||||
|
||||
defer func() {
|
||||
s.store.Close()
|
||||
os.Remove(tmpPath)
|
||||
chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
|
||||
}()
|
||||
|
||||
chanBufLen, maxWatchersPerSync = 1, 2
|
||||
numPuts := chanBufLen * 64
|
||||
testKey, testValue := []byte("foo"), []byte("bar")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
numWatches := maxWatchersPerSync * 128
|
||||
errc := make(chan error, numWatches)
|
||||
wg.Add(numWatches)
|
||||
for i := 0; i < numWatches; i++ {
|
||||
go func() {
|
||||
w := s.NewWatchStream()
|
||||
w.Watch(testKey, nil, 1)
|
||||
defer func() {
|
||||
w.Close()
|
||||
wg.Done()
|
||||
}()
|
||||
tc := time.After(10 * time.Second)
|
||||
evs, nextRev := 0, int64(2)
|
||||
for evs < numPuts {
|
||||
select {
|
||||
case <-tc:
|
||||
errc <- fmt.Errorf("time out")
|
||||
return
|
||||
case wr := <-w.Chan():
|
||||
evs += len(wr.Events)
|
||||
for _, ev := range wr.Events {
|
||||
if ev.Kv.ModRevision != nextRev {
|
||||
errc <- fmt.Errorf("expected rev=%d, got %d", nextRev, ev.Kv.ModRevision)
|
||||
return
|
||||
}
|
||||
nextRev++
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
}
|
||||
if evs != numPuts {
|
||||
errc <- fmt.Errorf("expected %d events, got %d", numPuts, evs)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-w.Chan():
|
||||
errc <- fmt.Errorf("unexpected response")
|
||||
default:
|
||||
}
|
||||
}()
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
|
||||
var wgPut sync.WaitGroup
|
||||
wgPut.Add(numPuts)
|
||||
for i := 0; i < numPuts; i++ {
|
||||
go func() {
|
||||
defer wgPut.Done()
|
||||
s.Put(testKey, testValue, lease.NoLease)
|
||||
}()
|
||||
}
|
||||
wgPut.Wait()
|
||||
|
||||
wg.Wait()
|
||||
select {
|
||||
case err := <-errc:
|
||||
t.Fatal(err)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue