From 4076dda101c0265cea01cc07f9d1b6382f7c2da0 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 22 Oct 2015 12:06:47 -0700 Subject: [PATCH] rafthttp: fix wrong return in pipeline.handle pipeline.handle is a long-living one, and should continue to receive next message to send out when current message fails to send. So it should `continue` instead of `return` here. --- rafthttp/pipeline.go | 2 +- rafthttp/pipeline_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index 90b747cdd..305225fd2 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -106,7 +106,7 @@ func (p *pipeline) handle() { if isMsgSnap(m) { p.r.ReportSnapshot(m.To, raft.SnapshotFailure) } - return + continue } p.status.activate() diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index 86e5bf9f7..1d26fae44 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -16,6 +16,7 @@ package rafthttp import ( "errors" + "fmt" "io" "io/ioutil" "net/http" @@ -52,6 +53,28 @@ func TestPipelineSend(t *testing.T) { } } +// TestPipelineKeepSendingWhenPostError tests that pipeline can keep +// sending messages if previous messages meet post error. +func TestPipelineKeepSendingWhenPostError(t *testing.T) { + tr := &respRoundTripper{err: fmt.Errorf("roundtrip error")} + picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) + fs := &stats.FollowerStats{} + p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil) + + for i := 0; i < 50; i++ { + p.msgc <- raftpb.Message{Type: raftpb.MsgApp} + } + testutil.WaitSchedule() + p.stop() + + // check it send out 50 requests + tr.mu.Lock() + defer tr.mu.Unlock() + if tr.reqCount != 50 { + t.Errorf("request count = %d, want 50", tr.reqCount) + } +} + func TestPipelineExceedMaximumServing(t *testing.T) { tr := newRoundTripperBlocker() picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) @@ -236,6 +259,9 @@ func (t *roundTripperBlocker) CancelRequest(req *http.Request) { } type respRoundTripper struct { + mu sync.Mutex + reqCount int + code int header http.Header err error @@ -245,6 +271,9 @@ func newRespRoundTripper(code int, err error) *respRoundTripper { return &respRoundTripper{code: code, err: err} } func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + t.mu.Lock() + defer t.mu.Unlock() + t.reqCount++ return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err }