Merge pull request #2412 from xiang90/refactor-peer

rafhttp: refactor peer.go
release-2.1
Xiang Li 2015-03-02 16:24:35 -08:00
commit 24fbad7bd8
2 changed files with 31 additions and 21 deletions

View File

@ -31,6 +31,18 @@ const (
ConnWriteTimeout = 5 * time.Second
recvBufSize = 4096
streamApp = "streamMsgApp"
streamMsg = "streamMsg"
pipelineMsg = "pipeline"
)
var (
bufSizeMap = map[string]int{
streamApp: streamBufSize,
streamMsg: streamBufSize,
pipelineMsg: pipelineBufSize,
}
)
type Peer interface {
@ -104,12 +116,12 @@ func startPeer(tr http.RoundTripper, u string, local, to, cid types.ID, r Raft,
if paused {
continue
}
writec, name, size := p.pick(m)
writec, name := p.pick(m)
select {
case writec <- m:
default:
log.Printf("peer: dropping %s to %s since %s with %d-size buffer is blocked",
m.Type, p.id, name, size)
m.Type, p.id, name, bufSizeMap[name])
}
case mm := <-p.recvc:
if mm.Type == raftpb.MsgApp {
@ -194,22 +206,20 @@ func (p *peer) Stop() {
<-p.done
}
func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, name string, size int) {
// pick picks a chan for sending the given message. The picked chan and the picked chan
// string name are returned.
func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, picked string) {
switch {
// Considering MsgSnap may have a big size, e.g., 1G, and will block
// stream for a long time, only use one of the N pipelines to send MsgSnap.
case isMsgSnap(m):
writec = p.pipeline.msgc
name, size = "pipeline", pipelineBufSize
return p.pipeline.msgc, pipelineMsg
case p.msgAppWriter.isWorking() && canUseMsgAppStream(m):
writec = p.msgAppWriter.msgc
name, size = "msgapp stream", streamBufSize
return p.msgAppWriter.msgc, streamApp
case p.writer.isWorking():
writec = p.writer.msgc
name, size = "general stream", streamBufSize
return p.writer.msgc, streamMsg
default:
writec = p.pipeline.msgc
name, size = "pipeline", pipelineBufSize
return p.pipeline.msgc, pipelineMsg
}
return
}

View File

@ -30,47 +30,47 @@ func TestPeerPick(t *testing.T) {
{
true, true,
raftpb.Message{Type: raftpb.MsgSnap},
"pipeline",
pipelineMsg,
},
{
true, true,
raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1},
"msgapp stream",
streamApp,
},
{
true, true,
raftpb.Message{Type: raftpb.MsgProp},
"general stream",
streamMsg,
},
{
true, true,
raftpb.Message{Type: raftpb.MsgHeartbeat},
"general stream",
streamMsg,
},
{
false, true,
raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1},
"general stream",
streamMsg,
},
{
false, false,
raftpb.Message{Type: raftpb.MsgApp, Term: 1, LogTerm: 1},
"pipeline",
pipelineMsg,
},
{
false, false,
raftpb.Message{Type: raftpb.MsgProp},
"pipeline",
pipelineMsg,
},
{
false, false,
raftpb.Message{Type: raftpb.MsgSnap},
"pipeline",
pipelineMsg,
},
{
false, false,
raftpb.Message{Type: raftpb.MsgHeartbeat},
"pipeline",
pipelineMsg,
},
}
for i, tt := range tests {
@ -79,7 +79,7 @@ func TestPeerPick(t *testing.T) {
writer: &streamWriter{working: tt.messageWorking},
pipeline: &pipeline{},
}
_, picked, _ := peer.pick(tt.m)
_, picked := peer.pick(tt.m)
if picked != tt.wpicked {
t.Errorf("#%d: picked = %v, want %v", i, picked, tt.wpicked)
}