rafthttp: make TestPipelineKeepSendingWhenPostError reliable
parent
32a486b462
commit
fb137f11c5
|
@ -57,23 +57,20 @@ func TestPipelineSend(t *testing.T) {
|
||||||
// TestPipelineKeepSendingWhenPostError tests that pipeline can keep
|
// TestPipelineKeepSendingWhenPostError tests that pipeline can keep
|
||||||
// sending messages if previous messages meet post error.
|
// sending messages if previous messages meet post error.
|
||||||
func TestPipelineKeepSendingWhenPostError(t *testing.T) {
|
func TestPipelineKeepSendingWhenPostError(t *testing.T) {
|
||||||
tr := &respRoundTripper{err: fmt.Errorf("roundtrip error")}
|
tr := &respRoundTripper{rec: testutil.NewRecorderStream(), err: fmt.Errorf("roundtrip error")}
|
||||||
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
||||||
fs := &stats.FollowerStats{}
|
fs := &stats.FollowerStats{}
|
||||||
tp := &Transport{pipelineRt: tr}
|
tp := &Transport{pipelineRt: tr}
|
||||||
p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
|
p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
|
||||||
|
defer p.stop()
|
||||||
|
|
||||||
for i := 0; i < 50; i++ {
|
for i := 0; i < 50; i++ {
|
||||||
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
|
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
|
||||||
}
|
}
|
||||||
testutil.WaitSchedule()
|
|
||||||
p.stop()
|
|
||||||
|
|
||||||
// check it send out 50 requests
|
_, err := tr.rec.Wait(50)
|
||||||
tr.mu.Lock()
|
if err != nil {
|
||||||
defer tr.mu.Unlock()
|
t.Errorf("unexpected wait error %v", err)
|
||||||
if tr.reqCount != 50 {
|
|
||||||
t.Errorf("request count = %d, want 50", tr.reqCount)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,8 +266,8 @@ func (t *roundTripperBlocker) CancelRequest(req *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type respRoundTripper struct {
|
type respRoundTripper struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
reqCount int
|
rec testutil.Recorder
|
||||||
|
|
||||||
code int
|
code int
|
||||||
header http.Header
|
header http.Header
|
||||||
|
@ -283,7 +280,9 @@ func newRespRoundTripper(code int, err error) *respRoundTripper {
|
||||||
func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
t.reqCount++
|
if t.rec != nil {
|
||||||
|
t.rec.Record(testutil.Action{Name: "req", Params: []interface{}{req}})
|
||||||
|
}
|
||||||
return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err
|
return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue