diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 4ab7a7c6e..2e4b85b6f 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1313,7 +1313,7 @@ func TestGetOtherPeerURLs(t *testing.T) { type nodeRecorder struct{ testutil.Recorder } -func newNodeRecorder() *nodeRecorder { return &nodeRecorder{} } +func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} } func newNodeNop() raft.Node { return newNodeRecorder() } func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) } diff --git a/etcdserver/storage.go b/etcdserver/storage.go index b1ed47b0e..b4cff7bfb 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -149,7 +149,7 @@ func makeMemberDir(dir string) error { } type storageRecorder struct { - testutil.Recorder + testutil.RecorderBuffered dbPath string // must have '/' suffix if set } diff --git a/pkg/testutil/recorder.go b/pkg/testutil/recorder.go index 0a49788a0..04b5357f2 100644 --- a/pkg/testutil/recorder.go +++ b/pkg/testutil/recorder.go @@ -14,27 +14,120 @@ package testutil -import "sync" +import ( + "errors" + "fmt" + "sync" + "time" +) type Action struct { Name string Params []interface{} } -type Recorder struct { +type Recorder interface { + // Record publishes an Action (e.g., function call) which will + // be reflected by Wait() or Chan() + Record(a Action) + // Wait waits until at least n Actions are availble or returns with error + Wait(n int) ([]Action, error) + // Action returns immediately available Actions + Action() []Action + // Chan returns the channel for actions published by Record + Chan() <-chan Action +} + +// RecorderBuffered appends all Actions to a slice +type RecorderBuffered struct { sync.Mutex actions []Action } -func (r *Recorder) Record(a Action) { +func (r *RecorderBuffered) Record(a Action) { r.Lock() r.actions = append(r.actions, a) r.Unlock() } -func (r *Recorder) Action() []Action { +func (r *RecorderBuffered) Action() []Action { r.Lock() cpy := make([]Action, len(r.actions)) copy(cpy, r.actions) r.Unlock() return cpy } +func (r *RecorderBuffered) Wait(n int) (acts []Action, err error) { + // legacy racey behavior + WaitSchedule() + acts = r.Action() + if len(acts) < n { + err = newLenErr(n, len(r.actions)) + } + return acts, err +} + +func (r *RecorderBuffered) Chan() <-chan Action { + ch := make(chan Action) + go func() { + acts := r.Action() + for i := range acts { + ch <- acts[i] + } + close(ch) + }() + return ch +} + +// RecorderStream writes all Actions to an unbuffered channel +type recorderStream struct { + ch chan Action +} + +func NewRecorderStream() Recorder { + return &recorderStream{ch: make(chan Action)} +} + +func (r *recorderStream) Record(a Action) { + r.ch <- a +} + +func (r *recorderStream) Action() (acts []Action) { + for { + select { + case act := <-r.ch: + acts = append(acts, act) + default: + return acts + } + } + return acts +} + +func (r *recorderStream) Chan() <-chan Action { + return r.ch +} + +func (r *recorderStream) Wait(n int) ([]Action, error) { + acts := make([]Action, n) + timeoutC := time.After(5 * time.Second) + for i := 0; i < n; i++ { + select { + case acts[i] = <-r.ch: + case <-timeoutC: + acts = acts[:i] + return acts, newLenErr(n, i) + } + } + // extra wait to catch any Action spew + select { + case act := <-r.ch: + acts = append(acts, act) + case <-time.After(10 * time.Millisecond): + } + return acts, nil +} + +func newLenErr(expected int, actual int) error { + s := fmt.Sprintf("len(actions) = %d, expected >= %d", actual, expected) + return errors.New(s) +} diff --git a/pkg/wait/wait.go b/pkg/wait/wait.go index 8957d3a6a..fc343af3e 100644 --- a/pkg/wait/wait.go +++ b/pkg/wait/wait.go @@ -60,16 +60,16 @@ func (w *List) Trigger(id uint64, x interface{}) { type WaitRecorder struct { Wait - *testutil.Recorder + testutil.Recorder } type waitRecorder struct { - testutil.Recorder + testutil.RecorderBuffered } func NewRecorder() *WaitRecorder { wr := &waitRecorder{} - return &WaitRecorder{Wait: wr, Recorder: &wr.Recorder} + return &WaitRecorder{Wait: wr, Recorder: wr} } func NewNop() Wait { return NewRecorder() } diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 4700d2707..ab7ef9f10 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -473,8 +473,11 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte { } func newFakeStore() *store { - b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}} + b := &fakeBackend{&fakeBatchTx{ + Recorder: &testutil.RecorderBuffered{}, + rangeRespc: make(chan rangeResp, 5)}} fi := &fakeIndex{ + Recorder: &testutil.RecorderBuffered{}, indexGetRespc: make(chan indexGetResp, 1), indexRangeRespc: make(chan indexRangeResp, 1), indexRangeEventsRespc: make(chan indexRangeEventsResp, 1), diff --git a/store/store.go b/store/store.go index 15307b38d..2d4b6c40a 100644 --- a/store/store.go +++ b/store/store.go @@ -748,7 +748,7 @@ func (s *store) JsonStats() []byte { // StoreRecorder provides a Store interface with a testutil.Recorder type StoreRecorder struct { Store - *testutil.Recorder + testutil.Recorder } // storeRecorder records all the methods it receives. @@ -756,13 +756,13 @@ type StoreRecorder struct { // It always returns invalid empty response and no error. type storeRecorder struct { Store - testutil.Recorder + testutil.RecorderBuffered } func NewNop() Store { return &storeRecorder{} } func NewRecorder() *StoreRecorder { sr := &storeRecorder{} - return &StoreRecorder{Store: sr, Recorder: &sr.Recorder} + return &StoreRecorder{Store: sr, Recorder: sr} } func (s *storeRecorder) Version() int { return 0 } @@ -856,7 +856,7 @@ type errStoreRecorder struct { func NewErrRecorder(err error) *StoreRecorder { sr := &errStoreRecorder{err: err} - return &StoreRecorder{Store: sr, Recorder: &sr.Recorder} + return &StoreRecorder{Store: sr, Recorder: sr} } func (s *errStoreRecorder) Get(path string, recursive, sorted bool) (*Event, error) {