rafthttp: add batcher

After we enable streaming, there will be a large amount of msgAppResp due to high
rate msgApp. We should batch msgAppResp in a meanful way.
release-2.0
Xiang Li 2014-11-21 16:42:12 -08:00
parent 66c30f28d6
commit 91bfead9e9
3 changed files with 113 additions and 1 deletions

41
rafthttp/batcher.go Normal file
View File

@ -0,0 +1,41 @@
package rafthttp
import (
"time"
"github.com/coreos/etcd/raft/raftpb"
)
type Batcher struct {
batchedN int
batchedT time.Time
batchN int
batchD time.Duration
}
func NewBatcher(n int, d time.Duration) *Batcher {
return &Batcher{
batchN: n,
batchD: d,
batchedT: time.Now(),
}
}
func (b *Batcher) ShouldBatch(now time.Time) bool {
b.batchedN++
batchedD := now.Sub(b.batchedT)
if b.batchedN >= b.batchN || batchedD >= b.batchD {
b.Reset(now)
return false
}
return true
}
func (b *Batcher) Reset(t time.Time) {
b.batchedN = 0
b.batchedT = t
}
func canBatch(m raftpb.Message) bool {
return m.Type == raftpb.MsgAppResp
}

61
rafthttp/batcher_test.go Normal file
View File

@ -0,0 +1,61 @@
package rafthttp
import (
"testing"
"time"
)
func TestBatcherNum(t *testing.T) {
n := 100
largeD := time.Minute
tests := []struct {
n int
wnotbatch int
}{
{n - 1, 0},
{n, 1},
{n + 1, 1},
{2*n + 1, 2},
{3*n + 1, 3},
}
for i, tt := range tests {
b := NewBatcher(n, largeD)
notbatched := 0
for j := 0; j < tt.n; j++ {
if !b.ShouldBatch(time.Now()) {
notbatched++
}
}
if notbatched != tt.wnotbatch {
t.Errorf("#%d: notbatched = %d, want %d", i, notbatched, tt.wnotbatch)
}
}
}
func TestBatcherTime(t *testing.T) {
largeN := 10000
tests := []struct {
nms int
wnotbatch int
}{
{0, 0},
{1, 1},
{2, 2},
{3, 3},
}
for i, tt := range tests {
b := NewBatcher(largeN, time.Millisecond)
baseT := b.batchedT
notbatched := 0
for j := 0; j < tt.nms+1; j++ {
if !b.ShouldBatch(baseT.Add(time.Duration(j) * time.Millisecond)) {
notbatched++
}
}
if notbatched != tt.wnotbatch {
t.Errorf("#%d: notbatched = %d, want %d", i, notbatched, tt.wnotbatch)
}
}
}

View File

@ -33,6 +33,8 @@ import (
const (
connPerSender = 4
senderBufSize = connPerSender * 4
appRespBatchMs = 50
)
type Sender interface {
@ -56,6 +58,7 @@ func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *st
p: p,
fs: fs,
shouldstop: shouldstop,
batcher: NewBatcher(100, appRespBatchMs*time.Millisecond),
q: make(chan []byte, senderBufSize),
}
s.wg.Add(connPerSender)
@ -74,6 +77,7 @@ type sender struct {
shouldstop chan struct{}
strmCln *streamClient
batcher *Batcher
strmSrv *streamServer
strmSrvMu sync.Mutex
q chan []byte
@ -106,8 +110,14 @@ func (s *sender) Update(u string) {
// TODO (xiangli): reasonable retry logic
func (s *sender) Send(m raftpb.Message) error {
s.maybeStopStream(m.Term)
if !s.hasStreamClient() && shouldInitStream(m) {
if shouldInitStream(m) && !s.hasStreamClient() {
s.initStream(types.ID(m.From), types.ID(m.To), m.Term)
s.batcher.Reset(time.Now())
}
if canBatch(m) && s.hasStreamClient() {
if s.batcher.ShouldBatch(time.Now()) {
return nil
}
}
if canUseStream(m) {
if ok := s.tryStream(m); ok {