From 91bfead9e922cb51efee7d3cea27fc838545b194 Mon Sep 17 00:00:00 2001
From: Xiang Li
Date: Fri, 21 Nov 2014 16:42:12 -0800
Subject: [PATCH] rafthttp: add batcher

After we enable streaming, there will be a large number of msgAppResp
messages due to the high rate of msgApp. We should batch msgAppResp in a
meaningful way.
---
 rafthttp/batcher.go      | 41 ++++++++++++++++++++++++++++
 rafthttp/batcher_test.go | 61 ++++++++++++++++++++++++++++++++++++++++
 rafthttp/sender.go       | 12 +++++++-
 3 files changed, 113 insertions(+), 1 deletion(-)
 create mode 100644 rafthttp/batcher.go
 create mode 100644 rafthttp/batcher_test.go

diff --git a/rafthttp/batcher.go b/rafthttp/batcher.go
new file mode 100644
index 000000000..4f2334341
--- /dev/null
+++ b/rafthttp/batcher.go
@@ -0,0 +1,41 @@
+package rafthttp
+
+import (
+	"time"
+
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+type Batcher struct {
+	batchedN int
+	batchedT time.Time
+	batchN   int
+	batchD   time.Duration
+}
+
+func NewBatcher(n int, d time.Duration) *Batcher {
+	return &Batcher{
+		batchN:   n,
+		batchD:   d,
+		batchedT: time.Now(),
+	}
+}
+
+func (b *Batcher) ShouldBatch(now time.Time) bool {
+	b.batchedN++
+	batchedD := now.Sub(b.batchedT)
+	if b.batchedN >= b.batchN || batchedD >= b.batchD {
+		b.Reset(now)
+		return false
+	}
+	return true
+}
+
+func (b *Batcher) Reset(t time.Time) {
+	b.batchedN = 0
+	b.batchedT = t
+}
+
+func canBatch(m raftpb.Message) bool {
+	return m.Type == raftpb.MsgAppResp
+}
diff --git a/rafthttp/batcher_test.go b/rafthttp/batcher_test.go
new file mode 100644
index 000000000..90635d599
--- /dev/null
+++ b/rafthttp/batcher_test.go
@@ -0,0 +1,61 @@
+package rafthttp
+
+import (
+	"testing"
+	"time"
+)
+
+func TestBatcherNum(t *testing.T) {
+	n := 100
+	largeD := time.Minute
+	tests := []struct {
+		n         int
+		wnotbatch int
+	}{
+		{n - 1, 0},
+		{n, 1},
+		{n + 1, 1},
+		{2*n + 1, 2},
+		{3*n + 1, 3},
+	}
+
+	for i, tt := range tests {
+		b := NewBatcher(n, largeD)
+		notbatched := 0
+		for j := 0; j < tt.n; j++ {
+			if !b.ShouldBatch(time.Now()) {
+				notbatched++
+			}
+		}
+		if notbatched != tt.wnotbatch {
+			t.Errorf("#%d: notbatched = %d, want %d", i, notbatched, tt.wnotbatch)
+		}
+	}
+}
+
+func TestBatcherTime(t *testing.T) {
+	largeN := 10000
+	tests := []struct {
+		nms       int
+		wnotbatch int
+	}{
+		{0, 0},
+		{1, 1},
+		{2, 2},
+		{3, 3},
+	}
+
+	for i, tt := range tests {
+		b := NewBatcher(largeN, time.Millisecond)
+		baseT := b.batchedT
+		notbatched := 0
+		for j := 0; j < tt.nms+1; j++ {
+			if !b.ShouldBatch(baseT.Add(time.Duration(j) * time.Millisecond)) {
+				notbatched++
+			}
+		}
+		if notbatched != tt.wnotbatch {
+			t.Errorf("#%d: notbatched = %d, want %d", i, notbatched, tt.wnotbatch)
+		}
+	}
+}
diff --git a/rafthttp/sender.go b/rafthttp/sender.go
index 995abf78c..6ed909338 100644
--- a/rafthttp/sender.go
+++ b/rafthttp/sender.go
@@ -33,6 +33,8 @@ import (
 const (
 	connPerSender = 4
 	senderBufSize = connPerSender * 4
+
+	appRespBatchMs = 50
 )
 
 type Sender interface {
@@ -56,6 +58,7 @@ func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *st
 		p:          p,
 		fs:         fs,
 		shouldstop: shouldstop,
+		batcher:    NewBatcher(100, appRespBatchMs*time.Millisecond),
 		q:          make(chan []byte, senderBufSize),
 	}
 	s.wg.Add(connPerSender)
@@ -74,6 +77,7 @@ type sender struct {
 	shouldstop chan struct{}
 
 	strmCln   *streamClient
+	batcher   *Batcher
 	strmSrv   *streamServer
 	strmSrvMu sync.Mutex
 	q         chan []byte
@@ -106,8 +110,14 @@ func (s *sender) Update(u string) {
 // TODO (xiangli): reasonable retry logic
 func (s *sender) Send(m raftpb.Message) error {
 	s.maybeStopStream(m.Term)
-	if !s.hasStreamClient() && shouldInitStream(m) {
+	if shouldInitStream(m) && !s.hasStreamClient() {
 		s.initStream(types.ID(m.From), types.ID(m.To), m.Term)
+		s.batcher.Reset(time.Now())
+	}
+	if canBatch(m) && s.hasStreamClient() {
+		if s.batcher.ShouldBatch(time.Now()) {
+			return nil
+		}
 	}
 	if canUseStream(m) {
 		if ok := s.tryStream(m); ok {
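
Not part of the patch: a minimal, self-contained sketch of how the Batcher behaves, with
the Batcher type copied from rafthttp/batcher.go above so it builds as a standalone
program. The main loop and the sent/dropped counters are illustrative assumptions, not
etcd code. With the n=100, 50ms parameters chosen in NewSender, roughly one in a hundred
back-to-back msgAppResp messages gets through; dropping the rest is acceptable because a
later msgAppResp supersedes earlier ones.

package main

import (
	"fmt"
	"time"
)

// Batcher is copied from rafthttp/batcher.go in the patch above: it suppresses
// messages until batchN messages have accumulated or batchD has elapsed since
// the last flush.
type Batcher struct {
	batchedN int
	batchedT time.Time
	batchN   int
	batchD   time.Duration
}

func NewBatcher(n int, d time.Duration) *Batcher {
	return &Batcher{batchN: n, batchD: d, batchedT: time.Now()}
}

// ShouldBatch reports whether the caller should suppress the current message.
// It returns false ("send now") on the n-th message or once d has elapsed,
// resetting its counters at that flush point.
func (b *Batcher) ShouldBatch(now time.Time) bool {
	b.batchedN++
	if b.batchedN >= b.batchN || now.Sub(b.batchedT) >= b.batchD {
		b.Reset(now)
		return false
	}
	return true
}

func (b *Batcher) Reset(t time.Time) {
	b.batchedN = 0
	b.batchedT = t
}

func main() {
	// Same parameters NewSender uses in the patch: flush every 100 messages
	// or every 50ms, whichever comes first.
	b := NewBatcher(100, 50*time.Millisecond)

	sent, dropped := 0, 0
	for i := 0; i < 1000; i++ {
		if b.ShouldBatch(time.Now()) {
			dropped++ // suppressed: a later msgAppResp carries newer state anyway
		} else {
			sent++ // flush point: this msgAppResp would be forwarded over the stream
		}
	}
	// With a fast loop (well under 50ms total), this typically prints sent=10 dropped=990.
	fmt.Printf("sent=%d dropped=%d\n", sent, dropped)
}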