rafhttp: refactor func peer.pick in peer.go

release-2.1
Xiang Li 2015-03-02 14:34:02 -08:00
parent 4dd3be0f05
commit 88bde91716
2 changed files with 31 additions and 21 deletions

View File

@ -31,6 +31,18 @@ const (
ConnWriteTimeout = 5 * time.Second ConnWriteTimeout = 5 * time.Second
recvBufSize = 4096 recvBufSize = 4096
streamApp = "streamMsgApp"
streamMsg = "streamMsg"
pipelineMsg = "pipeline"
)
var (
bufSizeMap = map[string]int{
streamApp: streamBufSize,
streamMsg: streamBufSize,
pipelineMsg: pipelineBufSize,
}
) )
type Peer interface { type Peer interface {
@ -104,12 +116,12 @@ func startPeer(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft,
if paused { if paused {
continue continue
} }
writec, name, size := p.pick(m) writec, name := p.pick(m)
select { select {
case writec <- m: case writec <- m:
default: default:
log.Printf("peer: dropping %s to %s since %s with %d-size buffer is blocked", log.Printf("peer: dropping %s to %s since %s with %d-size buffer is blocked",
m.Type, p.id, name, size) m.Type, p.id, name, bufSizeMap[name])
} }
case mm := <-p.recvc: case mm := <-p.recvc:
if mm.Type == raftpb.MsgApp { if mm.Type == raftpb.MsgApp {
@ -194,22 +206,20 @@ func (p *peer) Stop() {
<-p.done <-p.done
} }
func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, name string, size int) { // pick picks a chan for sending the given message. The picked chan and the picked chan
// string name are returned.
func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, picked string) {
switch { switch {
// Considering MsgSnap may have a big size, e.g., 1G, and will block // Considering MsgSnap may have a big size, e.g., 1G, and will block
// stream for a long time, only use one of the N pipelines to send MsgSnap. // stream for a long time, only use one of the N pipelines to send MsgSnap.
case isMsgSnap(m): case isMsgSnap(m):
writec = p.pipeline.msgc return p.pipeline.msgc, pipelineMsg
name, size = "pipeline", pipelineBufSize
case p.msgAppWriter.isWorking() && canUseMsgAppStream(m): case p.msgAppWriter.isWorking() && canUseMsgAppStream(m):
writec = p.msgAppWriter.msgc return p.msgAppWriter.msgc, streamApp
name, size = "msgapp stream", streamBufSize
case p.writer.isWorking(): case p.writer.isWorking():
writec = p.writer.msgc return p.writer.msgc, streamMsg
name, size = "general stream", streamBufSize
default: default:
writec = p.pipeline.msgc return p.pipeline.msgc, pipelineMsg
name, size = "pipeline", pipelineBufSize
} }
return return
} }

View File

@ -30,47 +30,47 @@ func TestPeerPick(t *testing.T) {
{ {
true, true, true, true,
raftpb.Message{Type: raftpb.MsgSnap}, raftpb.Message{Type: raftpb.MsgSnap},
"pipeline", pipelineMsg,
}, },
{ {
true, true, true, true,
raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1}, raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1},
"msgapp stream", streamApp,
}, },
{ {
true, true, true, true,
raftpb.Message{Type: raftpb.MsgProp}, raftpb.Message{Type: raftpb.MsgProp},
"general stream", streamMsg,
}, },
{ {
true, true, true, true,
raftpb.Message{Type: raftpb.MsgHeartbeat}, raftpb.Message{Type: raftpb.MsgHeartbeat},
"general stream", streamMsg,
}, },
{ {
false, true, false, true,
raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1}, raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1},
"general stream", streamMsg,
}, },
{ {
false, false, false, false,
raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1}, raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1},
"pipeline", pipelineMsg,
}, },
{ {
false, false, false, false,
raftpb.Message{Type: raftpb.MsgProp}, raftpb.Message{Type: raftpb.MsgProp},
"pipeline", pipelineMsg,
}, },
{ {
false, false, false, false,
raftpb.Message{Type: raftpb.MsgSnap}, raftpb.Message{Type: raftpb.MsgSnap},
"pipeline", pipelineMsg,
}, },
{ {
false, false, false, false,
raftpb.Message{Type: raftpb.MsgHeartbeat}, raftpb.Message{Type: raftpb.MsgHeartbeat},
"pipeline", pipelineMsg,
}, },
} }
for i, tt := range tests { for i, tt := range tests {
@ -79,7 +79,7 @@ func TestPeerPick(t *testing.T) {
writer: &streamWriter{working: tt.messageWorking}, writer: &streamWriter{working: tt.messageWorking},
pipeline: &pipeline{}, pipeline: &pipeline{},
} }
_, picked, _ := peer.pick(tt.m) _, picked := peer.pick(tt.m)
if picked != tt.wpicked { if picked != tt.wpicked {
t.Errorf("#%d: picked = %v, want %v", i, picked, tt.wpicked) t.Errorf("#%d: picked = %v, want %v", i, picked, tt.wpicked)
} }