From e41cbeda5dd0082559af4d001b0b666316b4430f Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 10 Mar 2015 23:55:37 -0700 Subject: [PATCH] rafthttp: drop messages in channel when disconnection The messages in channel are outdated, and there is no need to send them in the future. It also reports unreachable if there are messages in the channel. --- rafthttp/functional_test.go | 4 ++-- rafthttp/peer.go | 22 ++++++++++------------ rafthttp/stream.go | 17 ++++++++++++----- rafthttp/stream_test.go | 37 +++++++++++++++++++++++-------------- 4 files changed, 47 insertions(+), 33 deletions(-) diff --git a/rafthttp/functional_test.go b/rafthttp/functional_test.go index 2ea6bda14..9b939dabf 100644 --- a/rafthttp/functional_test.go +++ b/rafthttp/functional_test.go @@ -122,10 +122,10 @@ func newServerStats() *stats.ServerStats { func waitStreamWorking(p *peer) bool { for i := 0; i < 1000; i++ { time.Sleep(time.Millisecond) - if !p.msgAppWriter.isWorking() { + if _, ok := p.msgAppWriter.writec(); !ok { continue } - if !p.writer.isWorking() { + if _, ok := p.writer.writec(); !ok { continue } return true diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 832f11e71..f940ae3a2 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -115,8 +115,8 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r p := &peer{ id: to, r: r, - msgAppWriter: startStreamWriter(fs, r), - writer: startStreamWriter(fs, r), + msgAppWriter: startStreamWriter(to, fs, r), + writer: startStreamWriter(to, fs, r), pipeline: newPipeline(tr, picker, to, cid, fs, r, errorc), sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), @@ -244,20 +244,18 @@ func (p *peer) Stop() { // pick picks a chan for sending the given message. The picked chan and the picked chan // string name are returned. -func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, picked string) { - switch { +func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) { + var ok bool // Considering MsgSnap may have a big size, e.g., 1G, and will block // stream for a long time, only use one of the N pipelines to send MsgSnap. - case isMsgSnap(m): - return p.pipeline.msgc, pipelineMsg - case p.msgAppWriter.isWorking() && canUseMsgAppStream(m): - return p.msgAppWriter.msgc, streamApp - case p.writer.isWorking(): - return p.writer.msgc, streamMsg - default: + if isMsgSnap(m) { return p.pipeline.msgc, pipelineMsg + } else if writec, ok = p.msgAppWriter.writec(); ok && canUseMsgAppStream(m) { + return writec, streamApp + } else if writec, ok = p.writer.writec(); ok { + return writec, streamMsg } - return + return p.pipeline.msgc, pipelineMsg } func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 2f2a03b26..a8e213c84 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -61,6 +61,7 @@ type outgoingConn struct { // streamWriter is a long-running go-routine that writes messages into the // attached outgoingConn. type streamWriter struct { + id types.ID fs *stats.FollowerStats r Raft @@ -74,8 +75,9 @@ type streamWriter struct { done chan struct{} } -func startStreamWriter(fs *stats.FollowerStats, r Raft) *streamWriter { +func startStreamWriter(id types.ID, fs *stats.FollowerStats, r Raft) *streamWriter { w := &streamWriter{ + id: id, fs: fs, r: r, msgc: make(chan raftpb.Message, streamBufSize), @@ -163,18 +165,23 @@ func (cw *streamWriter) run() { } } -func (cw *streamWriter) isWorking() bool { +func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) { cw.mu.Lock() defer cw.mu.Unlock() - return cw.working + return cw.msgc, cw.working } func (cw *streamWriter) resetCloser() { cw.mu.Lock() defer cw.mu.Unlock() - if cw.working { - cw.closer.Close() + if !cw.working { + return } + cw.closer.Close() + if len(cw.msgc) > 0 { + cw.r.ReportUnreachable(uint64(cw.id)) + } + cw.msgc = make(chan raftpb.Message, streamBufSize) cw.working = false } diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index 4f82b767c..dba5ab8b8 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -18,10 +18,10 @@ import ( // to streamWriter. After that, streamWriter can use it to send messages // continuously, and closes it when stopped. func TestStreamWriterAttachOutgoingConn(t *testing.T) { - sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{}) // the expected initial state of streamWrite is not working - if g := sw.isWorking(); g != false { - t.Errorf("initial working status = %v, want false", g) + if _, ok := sw.writec(); ok != false { + t.Errorf("initial working status = %v, want false", ok) } // repeatitive tests to ensure it can use latest connection @@ -36,15 +36,15 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { t.Errorf("#%d: close of previous connection = %v, want true", i, prevwfc.closed) } // starts working - if g := sw.isWorking(); g != true { - t.Errorf("#%d: working status = %v, want true", i, g) + if _, ok := sw.writec(); ok != true { + t.Errorf("#%d: working status = %v, want true", i, ok) } sw.msgc <- raftpb.Message{} testutil.ForceGosched() // still working - if g := sw.isWorking(); g != true { - t.Errorf("#%d: working status = %v, want true", i, g) + if _, ok := sw.writec(); ok != true { + t.Errorf("#%d: working status = %v, want true", i, ok) } if wfc.written == 0 { t.Errorf("#%d: failed to write to the underlying connection", i) @@ -53,8 +53,8 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { sw.stop() // no longer in working status now - if g := sw.isWorking(); g != false { - t.Errorf("working status after stop = %v, want false", g) + if _, ok := sw.writec(); ok != false { + t.Errorf("working status after stop = %v, want false", ok) } if wfc.closed != true { t.Errorf("failed to close the underlying connection") @@ -64,7 +64,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad // outgoingConn will close the outgoingConn and fall back to non-working status. func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { - sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{}) defer sw.stop() wfc := &fakeWriteFlushCloser{err: errors.New("blah")} sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) @@ -72,8 +72,8 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { sw.msgc <- raftpb.Message{} testutil.ForceGosched() // no longer working - if g := sw.isWorking(); g != false { - t.Errorf("working = %v, want false", g) + if _, ok := sw.writec(); ok != false { + t.Errorf("working = %v, want false", ok) } if wfc.closed != true { t.Errorf("failed to close the underlying connection") @@ -197,7 +197,7 @@ func TestStream(t *testing.T) { srv := httptest.NewServer(h) defer srv.Close() - sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{}) defer sw.stop() h.sw = sw @@ -207,8 +207,17 @@ func TestStream(t *testing.T) { if tt.t == streamTypeMsgApp { sr.updateMsgAppTerm(tt.term) } + // wait for stream to work + var writec chan<- raftpb.Message + for { + var ok bool + if writec, ok = sw.writec(); ok { + break + } + time.Sleep(time.Millisecond) + } - sw.msgc <- tt.m + writec <- tt.m var m raftpb.Message select { case m = <-tt.wc: