diff --git a/rafthttp/functional_test.go b/rafthttp/functional_test.go index 2ea6bda14..9b939dabf 100644 --- a/rafthttp/functional_test.go +++ b/rafthttp/functional_test.go @@ -122,10 +122,10 @@ func newServerStats() *stats.ServerStats { func waitStreamWorking(p *peer) bool { for i := 0; i < 1000; i++ { time.Sleep(time.Millisecond) - if !p.msgAppWriter.isWorking() { + if _, ok := p.msgAppWriter.writec(); !ok { continue } - if !p.writer.isWorking() { + if _, ok := p.writer.writec(); !ok { continue } return true diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 832f11e71..f940ae3a2 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -115,8 +115,8 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r p := &peer{ id: to, r: r, - msgAppWriter: startStreamWriter(fs, r), - writer: startStreamWriter(fs, r), + msgAppWriter: startStreamWriter(to, fs, r), + writer: startStreamWriter(to, fs, r), pipeline: newPipeline(tr, picker, to, cid, fs, r, errorc), sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), @@ -244,20 +244,18 @@ func (p *peer) Stop() { // pick picks a chan for sending the given message. The picked chan and the picked chan // string name are returned. -func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, picked string) { - switch { +func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) { + var ok bool // Considering MsgSnap may have a big size, e.g., 1G, and will block // stream for a long time, only use one of the N pipelines to send MsgSnap. - case isMsgSnap(m): - return p.pipeline.msgc, pipelineMsg - case p.msgAppWriter.isWorking() && canUseMsgAppStream(m): - return p.msgAppWriter.msgc, streamApp - case p.writer.isWorking(): - return p.writer.msgc, streamMsg - default: + if isMsgSnap(m) { return p.pipeline.msgc, pipelineMsg + } else if writec, ok = p.msgAppWriter.writec(); ok && canUseMsgAppStream(m) { + return writec, streamApp + } else if writec, ok = p.writer.writec(); ok { + return writec, streamMsg } - return + return p.pipeline.msgc, pipelineMsg } func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 2f2a03b26..a8e213c84 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -61,6 +61,7 @@ type outgoingConn struct { // streamWriter is a long-running go-routine that writes messages into the // attached outgoingConn. type streamWriter struct { + id types.ID fs *stats.FollowerStats r Raft @@ -74,8 +75,9 @@ type streamWriter struct { done chan struct{} } -func startStreamWriter(fs *stats.FollowerStats, r Raft) *streamWriter { +func startStreamWriter(id types.ID, fs *stats.FollowerStats, r Raft) *streamWriter { w := &streamWriter{ + id: id, fs: fs, r: r, msgc: make(chan raftpb.Message, streamBufSize), @@ -163,18 +165,23 @@ func (cw *streamWriter) run() { } } -func (cw *streamWriter) isWorking() bool { +func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) { cw.mu.Lock() defer cw.mu.Unlock() - return cw.working + return cw.msgc, cw.working } func (cw *streamWriter) resetCloser() { cw.mu.Lock() defer cw.mu.Unlock() - if cw.working { - cw.closer.Close() + if !cw.working { + return } + cw.closer.Close() + if len(cw.msgc) > 0 { + cw.r.ReportUnreachable(uint64(cw.id)) + } + cw.msgc = make(chan raftpb.Message, streamBufSize) cw.working = false } diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index 4f82b767c..dba5ab8b8 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -18,10 +18,10 @@ import ( // to streamWriter. After that, streamWriter can use it to send messages // continuously, and closes it when stopped. func TestStreamWriterAttachOutgoingConn(t *testing.T) { - sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{}) // the expected initial state of streamWrite is not working - if g := sw.isWorking(); g != false { - t.Errorf("initial working status = %v, want false", g) + if _, ok := sw.writec(); ok != false { + t.Errorf("initial working status = %v, want false", ok) } // repeatitive tests to ensure it can use latest connection @@ -36,15 +36,15 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { t.Errorf("#%d: close of previous connection = %v, want true", i, prevwfc.closed) } // starts working - if g := sw.isWorking(); g != true { - t.Errorf("#%d: working status = %v, want true", i, g) + if _, ok := sw.writec(); ok != true { + t.Errorf("#%d: working status = %v, want true", i, ok) } sw.msgc <- raftpb.Message{} testutil.ForceGosched() // still working - if g := sw.isWorking(); g != true { - t.Errorf("#%d: working status = %v, want true", i, g) + if _, ok := sw.writec(); ok != true { + t.Errorf("#%d: working status = %v, want true", i, ok) } if wfc.written == 0 { t.Errorf("#%d: failed to write to the underlying connection", i) @@ -53,8 +53,8 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { sw.stop() // no longer in working status now - if g := sw.isWorking(); g != false { - t.Errorf("working status after stop = %v, want false", g) + if _, ok := sw.writec(); ok != false { + t.Errorf("working status after stop = %v, want false", ok) } if wfc.closed != true { t.Errorf("failed to close the underlying connection") @@ -64,7 +64,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad // outgoingConn will close the outgoingConn and fall back to non-working status. func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { - sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{}) defer sw.stop() wfc := &fakeWriteFlushCloser{err: errors.New("blah")} sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) @@ -72,8 +72,8 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { sw.msgc <- raftpb.Message{} testutil.ForceGosched() // no longer working - if g := sw.isWorking(); g != false { - t.Errorf("working = %v, want false", g) + if _, ok := sw.writec(); ok != false { + t.Errorf("working = %v, want false", ok) } if wfc.closed != true { t.Errorf("failed to close the underlying connection") @@ -197,7 +197,7 @@ func TestStream(t *testing.T) { srv := httptest.NewServer(h) defer srv.Close() - sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{}) defer sw.stop() h.sw = sw @@ -207,8 +207,17 @@ func TestStream(t *testing.T) { if tt.t == streamTypeMsgApp { sr.updateMsgAppTerm(tt.term) } + // wait for stream to work + var writec chan<- raftpb.Message + for { + var ok bool + if writec, ok = sw.writec(); ok { + break + } + time.Sleep(time.Millisecond) + } - sw.msgc <- tt.m + writec <- tt.m var m raftpb.Message select { case m = <-tt.wc: