diff --git a/rafthttp/peer.go b/rafthttp/peer.go index a78503d9c..39c134851 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -31,6 +31,18 @@ const ( ConnWriteTimeout = 5 * time.Second recvBufSize = 4096 + + streamApp = "streamMsgApp" + streamMsg = "streamMsg" + pipelineMsg = "pipeline" +) + +var ( + bufSizeMap = map[string]int{ + streamApp: streamBufSize, + streamMsg: streamBufSize, + pipelineMsg: pipelineBufSize, + } ) type Peer interface { @@ -104,12 +116,12 @@ func startPeer(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft, if paused { continue } - writec, name, size := p.pick(m) + writec, name := p.pick(m) select { case writec <- m: default: log.Printf("peer: dropping %s to %s since %s with %d-size buffer is blocked", - m.Type, p.id, name, size) + m.Type, p.id, name, bufSizeMap[name]) } case mm := <-p.recvc: if mm.Type == raftpb.MsgApp { @@ -194,22 +206,20 @@ func (p *peer) Stop() { <-p.done } -func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, name string, size int) { +// pick picks a chan for sending the given message. The picked chan and the picked chan +// string name are returned. +func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, picked string) { switch { // Considering MsgSnap may have a big size, e.g., 1G, and will block // stream for a long time, only use one of the N pipelines to send MsgSnap. case isMsgSnap(m): - writec = p.pipeline.msgc - name, size = "pipeline", pipelineBufSize + return p.pipeline.msgc, pipelineMsg case p.msgAppWriter.isWorking() && canUseMsgAppStream(m): - writec = p.msgAppWriter.msgc - name, size = "msgapp stream", streamBufSize + return p.msgAppWriter.msgc, streamApp case p.writer.isWorking(): - writec = p.writer.msgc - name, size = "general stream", streamBufSize + return p.writer.msgc, streamMsg default: - writec = p.pipeline.msgc - name, size = "pipeline", pipelineBufSize + return p.pipeline.msgc, pipelineMsg } return } diff --git a/rafthttp/peer_test.go b/rafthttp/peer_test.go index 13ec2156a..b35d50c8e 100644 --- a/rafthttp/peer_test.go +++ b/rafthttp/peer_test.go @@ -30,47 +30,47 @@ func TestPeerPick(t *testing.T) { { true, true, raftpb.Message{Type: raftpb.MsgSnap}, - "pipeline", + pipelineMsg, }, { true, true, raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1}, - "msgapp stream", + streamApp, }, { true, true, raftpb.Message{Type: raftpb.MsgProp}, - "general stream", + streamMsg, }, { true, true, raftpb.Message{Type: raftpb.MsgHeartbeat}, - "general stream", + streamMsg, }, { false, true, raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1}, - "general stream", + streamMsg, }, { false, false, raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1}, - "pipeline", + pipelineMsg, }, { false, false, raftpb.Message{Type: raftpb.MsgProp}, - "pipeline", + pipelineMsg, }, { false, false, raftpb.Message{Type: raftpb.MsgSnap}, - "pipeline", + pipelineMsg, }, { false, false, raftpb.Message{Type: raftpb.MsgHeartbeat}, - "pipeline", + pipelineMsg, }, } for i, tt := range tests { @@ -79,7 +79,7 @@ func TestPeerPick(t *testing.T) { writer: &streamWriter{working: tt.messageWorking}, pipeline: &pipeline{}, } - _, picked, _ := peer.pick(tt.m) + _, picked := peer.pick(tt.m) if picked != tt.wpicked { t.Errorf("#%d: picked = %v, want %v", i, picked, tt.wpicked) }