From efe0ee7e59e290b7452c898d1605cfccf2c79924 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 30 May 2016 16:17:13 -0700 Subject: [PATCH] rafthttp: remove the newPipeline func Using struct to initialize pipeline is better when we have many fields to file in. --- rafthttp/peer.go | 13 +++++++++- rafthttp/pipeline.go | 42 +++++++++++------------------- rafthttp/pipeline_test.go | 54 ++++++++++++++++++++++----------------- rafthttp/remote.go | 12 ++++++++- 4 files changed, 68 insertions(+), 53 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 0ef79c69c..9dd0207d0 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -120,6 +120,17 @@ type peer struct { func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { status := newPeerStatus(to) picker := newURLPicker(urls) + pipeline := &pipeline{ + to: to, + tr: transport, + picker: picker, + status: status, + followerStats: fs, + raft: r, + errorc: errorc, + } + pipeline.start() + p := &peer{ id: to, r: r, @@ -127,7 +138,7 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r picker: picker, msgAppV2Writer: startStreamWriter(to, status, fs, r), writer: startStreamWriter(to, status, fs, r), - pipeline: newPipeline(transport, picker, local, to, cid, status, fs, r, errorc), + pipeline: pipeline, snapSender: newSnapshotSender(transport, picker, local, to, cid, status, r, errorc), sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index f746c855e..081ca2b7d 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -41,15 +41,15 @@ const ( var errStopped = errors.New("stopped") type pipeline struct { - from, to types.ID - cid types.ID + to types.ID tr *Transport picker *urlPicker status *peerStatus - fs *stats.FollowerStats - r Raft + raft Raft errorc chan error + // deprecate when we depercate v2 API + followerStats *stats.FollowerStats msgc chan raftpb.Message // wait for the handling routines @@ -57,25 +57,13 @@ type pipeline struct { stopc chan struct{} } -func newPipeline(tr *Transport, picker *urlPicker, from, to, cid types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline { - p := &pipeline{ - from: from, - to: to, - cid: cid, - tr: tr, - picker: picker, - status: status, - fs: fs, - r: r, - errorc: errorc, - stopc: make(chan struct{}), - msgc: make(chan raftpb.Message, pipelineBufSize), - } +func (p *pipeline) start() { + p.stopc = make(chan struct{}) + p.msgc = make(chan raftpb.Message, pipelineBufSize) p.wg.Add(connPerPipeline) for i := 0; i < connPerPipeline; i++ { go p.handle() } - return p } func (p *pipeline) stop() { @@ -96,22 +84,22 @@ func (p *pipeline) handle() { if err != nil { p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error()) - if m.Type == raftpb.MsgApp && p.fs != nil { - p.fs.Fail() + if m.Type == raftpb.MsgApp && p.followerStats != nil { + p.followerStats.Fail() } - p.r.ReportUnreachable(m.To) + p.raft.ReportUnreachable(m.To) if isMsgSnap(m) { - p.r.ReportSnapshot(m.To, raft.SnapshotFailure) + p.raft.ReportSnapshot(m.To, raft.SnapshotFailure) } continue } p.status.activate() - if m.Type == raftpb.MsgApp && p.fs != nil { - p.fs.Succ(end.Sub(start)) + if m.Type == raftpb.MsgApp && p.followerStats != nil { + p.followerStats.Succ(end.Sub(start)) } if isMsgSnap(m) { - p.r.ReportSnapshot(m.To, raft.SnapshotFinish) + p.raft.ReportSnapshot(m.To, raft.SnapshotFinish) } sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size())) case <-p.stopc: @@ -124,7 +112,7 @@ func (p *pipeline) handle() { // error on any failure. func (p *pipeline) post(data []byte) (err error) { u := p.picker.pick() - req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.from, p.cid) + req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID) done := make(chan struct{}, 1) cancel := httputil.RequestCanceler(p.tr.pipelineRt, req) diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index 37c77750f..bf83372a7 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -36,9 +36,8 @@ import ( func TestPipelineSend(t *testing.T) { tr := &roundTripperRecorder{} picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) - fs := &stats.FollowerStats{} tp := &Transport{pipelineRt: tr} - p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil) + p := startTestPipeline(tp, picker) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} testutil.WaitSchedule() @@ -47,10 +46,8 @@ func TestPipelineSend(t *testing.T) { if tr.Request() == nil { t.Errorf("sender fails to post the data") } - fs.Lock() - defer fs.Unlock() - if fs.Counts.Success != 1 { - t.Errorf("success = %d, want 1", fs.Counts.Success) + if p.followerStats.Counts.Success != 1 { + t.Errorf("success = %d, want 1", p.followerStats.Counts.Success) } } @@ -59,9 +56,8 @@ func TestPipelineSend(t *testing.T) { func TestPipelineKeepSendingWhenPostError(t *testing.T) { tr := &respRoundTripper{rec: testutil.NewRecorderStream(), err: fmt.Errorf("roundtrip error")} picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) - fs := &stats.FollowerStats{} tp := &Transport{pipelineRt: tr} - p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil) + p := startTestPipeline(tp, picker) defer p.stop() for i := 0; i < 50; i++ { @@ -77,9 +73,9 @@ func TestPipelineKeepSendingWhenPostError(t *testing.T) { func TestPipelineExceedMaximumServing(t *testing.T) { tr := newRoundTripperBlocker() picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) - fs := &stats.FollowerStats{} tp := &Transport{pipelineRt: tr} - p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil) + p := startTestPipeline(tp, picker) + defer p.stop() // keep the sender busy and make the buffer full // nothing can go out as we block the sender @@ -111,33 +107,29 @@ func TestPipelineExceedMaximumServing(t *testing.T) { default: t.Errorf("failed to send out message") } - p.stop() } // TestPipelineSendFailed tests that when send func meets the post error, // it increases fail count in stats. func TestPipelineSendFailed(t *testing.T) { picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) - fs := &stats.FollowerStats{} tp := &Transport{pipelineRt: newRespRoundTripper(0, errors.New("blah"))} - p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil) + p := startTestPipeline(tp, picker) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} testutil.WaitSchedule() p.stop() - fs.Lock() - defer fs.Unlock() - if fs.Counts.Fail != 1 { - t.Errorf("fail = %d, want 1", fs.Counts.Fail) + if p.followerStats.Counts.Fail != 1 { + t.Errorf("fail = %d, want 1", p.followerStats.Counts.Fail) } } func TestPipelinePost(t *testing.T) { tr := &roundTripperRecorder{} picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) - tp := &Transport{pipelineRt: tr} - p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, nil) + tp := &Transport{ClusterID: types.ID(1), pipelineRt: tr} + p := startTestPipeline(tp, picker) if err := p.post([]byte("some data")); err != nil { t.Fatalf("unexpected post error: %v", err) } @@ -185,7 +177,7 @@ func TestPipelinePostBad(t *testing.T) { for i, tt := range tests { picker := mustNewURLPicker(t, []string{tt.u}) tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)} - p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, make(chan error)) + p := startTestPipeline(tp, picker) err := p.post([]byte("some data")) p.stop() @@ -205,13 +197,12 @@ func TestPipelinePostErrorc(t *testing.T) { } for i, tt := range tests { picker := mustNewURLPicker(t, []string{tt.u}) - errorc := make(chan error, 1) tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)} - p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, errorc) + p := startTestPipeline(tp, picker) p.post([]byte("some data")) p.stop() select { - case <-errorc: + case <-p.errorc: default: t.Fatalf("#%d: cannot receive from errorc", i) } @@ -221,7 +212,7 @@ func TestPipelinePostErrorc(t *testing.T) { func TestStopBlockedPipeline(t *testing.T) { picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) tp := &Transport{pipelineRt: newRoundTripperBlocker()} - p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, nil) + p := startTestPipeline(tp, picker) // send many messages that most of them will be blocked in buffer for i := 0; i < connPerPipeline*10; i++ { p.msgc <- raftpb.Message{} @@ -307,3 +298,18 @@ type nopReadCloser struct{} func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, io.EOF } func (n *nopReadCloser) Close() error { return nil } + +func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline { + p := &pipeline{ + tr: tr, + picker: picker, + + to: types.ID(1), + status: newPeerStatus(types.ID(1)), + raft: &fakeRaft{}, + followerStats: &stats.FollowerStats{}, + errorc: make(chan error, 1), + } + p.start() + return p +} diff --git a/rafthttp/remote.go b/rafthttp/remote.go index f06cb7a81..df9d2b2c8 100644 --- a/rafthttp/remote.go +++ b/rafthttp/remote.go @@ -28,10 +28,20 @@ type remote struct { func startRemote(tr *Transport, urls types.URLs, local, to, cid types.ID, r Raft, errorc chan error) *remote { picker := newURLPicker(urls) status := newPeerStatus(to) + pipeline := &pipeline{ + to: to, + tr: tr, + picker: picker, + status: status, + raft: r, + errorc: errorc, + } + pipeline.start() + return &remote{ id: to, status: status, - pipeline: newPipeline(tr, picker, local, to, cid, status, nil, r, errorc), + pipeline: pipeline, } }