diff --git a/CHANGELOG/CHANGELOG-3.6.md b/CHANGELOG/CHANGELOG-3.6.md index 8d53c51cb..e51c81985 100644 --- a/CHANGELOG/CHANGELOG-3.6.md +++ b/CHANGELOG/CHANGELOG-3.6.md @@ -46,6 +46,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0). - Package `wal` was moved to `storage/wal` - Package `datadir` was moved to `storage/datadir` +### Package `raft` +- Send empty `MsgApp` when entry in-flight limits are exceeded. See [pull/14633](https://github.com/etcd-io/etcd/pull/14633). +- Add [MaxInflightBytes](https://github.com/etcd-io/etcd/pull/14624) setting in `raft.Config` for better flow control of entries. + ### etcd server - Add [`etcd --log-format`](https://github.com/etcd-io/etcd/pull/13339) flag to support log format. diff --git a/raft/confchange/confchange.go b/raft/confchange/confchange.go index dddbcc9d9..bc60abf7f 100644 --- a/raft/confchange/confchange.go +++ b/raft/confchange/confchange.go @@ -265,7 +265,7 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u // making the first index the better choice). Next: c.LastIndex, Match: 0, - Inflights: tracker.NewInflights(c.Tracker.MaxInflight), + Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes), IsLearner: isLearner, // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked diff --git a/raft/confchange/datadriven_test.go b/raft/confchange/datadriven_test.go index ab1524091..f179f1f43 100644 --- a/raft/confchange/datadriven_test.go +++ b/raft/confchange/datadriven_test.go @@ -28,7 +28,7 @@ import ( func TestConfChangeDataDriven(t *testing.T) { datadriven.Walk(t, "testdata", func(t *testing.T, path string) { - tr := tracker.MakeProgressTracker(10) + tr := tracker.MakeProgressTracker(10, 0) c := Changer{ Tracker: tr, LastIndex: 0, // incremented in this test with each cmd diff --git a/raft/confchange/quick_test.go b/raft/confchange/quick_test.go index 16d72c199..76018f634 100644 --- a/raft/confchange/quick_test.go +++ b/raft/confchange/quick_test.go @@ -89,7 +89,7 @@ func TestConfChangeQuick(t *testing.T) { wrapper := func(invoke testFunc) func(setup initialChanges, ccs confChanges) (*Changer, error) { return func(setup initialChanges, ccs confChanges) (*Changer, error) { - tr := tracker.MakeProgressTracker(10) + tr := tracker.MakeProgressTracker(10, 0) c := &Changer{ Tracker: tr, LastIndex: 10, diff --git a/raft/confchange/restore_test.go b/raft/confchange/restore_test.go index 50712c794..ec45e5144 100644 --- a/raft/confchange/restore_test.go +++ b/raft/confchange/restore_test.go @@ -86,7 +86,7 @@ func TestRestore(t *testing.T) { f := func(cs pb.ConfState) bool { chg := Changer{ - Tracker: tracker.MakeProgressTracker(20), + Tracker: tracker.MakeProgressTracker(20, 0), LastIndex: 10, } cfg, prs, err := Restore(chg, cs) diff --git a/raft/raft.go b/raft/raft.go index 5b3139196..180a96e93 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -160,6 +160,16 @@ type Config struct { // overflowing that sending buffer. TODO (xiangli): feedback to application to // limit the proposal rate? MaxInflightMsgs int + // MaxInflightBytes limits the number of in-flight bytes in append messages. + // Complements MaxInflightMsgs. Ignored if zero. + // + // This effectively bounds the bandwidth-delay product. Note that especially + // in high-latency deployments setting this too low can lead to a dramatic + // reduction in throughput. For example, with a peer that has a round-trip + // latency of 100ms to the leader and this setting is set to 1 MB, there is a + // throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops + // to 2.5 MB/s. See Little's law to understand the maths behind. + MaxInflightBytes uint64 // CheckQuorum specifies if the leader should check quorum activity. Leader // steps down when quorum is not active for an electionTimeout. @@ -228,6 +238,11 @@ func (c *Config) validate() error { if c.MaxInflightMsgs <= 0 { return errors.New("max inflight messages must be greater than 0") } + if c.MaxInflightBytes == 0 { + c.MaxInflightBytes = noLimit + } else if c.MaxInflightBytes < c.MaxSizePerMsg { + return errors.New("max inflight bytes must be >= max message size") + } if c.Logger == nil { c.Logger = getLogger() @@ -332,7 +347,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxUncommittedSize: c.MaxUncommittedEntriesSize, - prs: tracker.MakeProgressTracker(c.MaxInflightMsgs), + prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes), electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -484,7 +499,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // Send the actual MsgApp otherwise, and update the progress accordingly. next := pr.Next // save Next for later, as the progress update can change it - if err := pr.UpdateOnEntriesSend(len(ents), next); err != nil { + if err := pr.UpdateOnEntriesSend(len(ents), payloadsSize(ents), next); err != nil { r.logger.Panicf("%x: %v", r.id, err) } r.send(pb.Message{ @@ -629,7 +644,7 @@ func (r *raft) reset(term uint64) { *pr = tracker.Progress{ Match: 0, Next: r.raftLog.lastIndex() + 1, - Inflights: tracker.NewInflights(r.prs.MaxInflight), + Inflights: tracker.NewInflights(r.prs.MaxInflight, r.prs.MaxInflightBytes), IsLearner: pr.IsLearner, } if id == r.id { @@ -1618,7 +1633,7 @@ func (r *raft) restore(s pb.Snapshot) bool { r.raftLog.restore(s) // Reset the configuration and add the (potentially updated) peers in anew. - r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) + r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight, r.prs.MaxInflightBytes) cfg, prs, err := confchange.Restore(confchange.Changer{ Tracker: r.prs, LastIndex: r.raftLog.lastIndex(), @@ -1789,11 +1804,7 @@ func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Messa // Empty payloads are never refused. This is used both for appending an empty // entry at a new leader's term, as well as leaving a joint configuration. func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { - var s uint64 - for _, e := range ents { - s += uint64(PayloadSize(e)) - } - + s := payloadsSize(ents) if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize { // If the uncommitted tail of the Raft log is empty, allow any size // proposal. Otherwise, limit the size of the uncommitted tail of the @@ -1815,12 +1826,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) { // Fast-path for followers, who do not track or enforce the limit. return } - - var s uint64 - for _, e := range ents { - s += uint64(PayloadSize(e)) - } - if s > r.uncommittedSize { + if s := payloadsSize(ents); s > r.uncommittedSize { // uncommittedSize may underestimate the size of the uncommitted Raft // log tail but will never overestimate it. Saturate at 0 instead of // allowing overflow. @@ -1830,6 +1836,14 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) { } } +func payloadsSize(ents []pb.Entry) uint64 { + var s uint64 + for _, e := range ents { + s += uint64(PayloadSize(e)) + } + return s +} + func numOfPendingConf(ents []pb.Entry) int { n := 0 for i := range ents { diff --git a/raft/raft_test.go b/raft/raft_test.go index 95408976b..6563b1748 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -130,6 +130,7 @@ func TestProgressFlowControl(t *testing.T) { cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1, 2))) cfg.MaxInflightMsgs = 3 cfg.MaxSizePerMsg = 2048 + cfg.MaxInflightBytes = 9000 // A little over MaxInflightMsgs * MaxSizePerMsg. r := newRaft(cfg) r.becomeCandidate() r.becomeLeader() @@ -140,7 +141,12 @@ func TestProgressFlowControl(t *testing.T) { // While node 2 is in probe state, propose a bunch of entries. r.prs.Progress[2].BecomeProbe() blob := []byte(strings.Repeat("a", 1000)) - for i := 0; i < 10; i++ { + large := []byte(strings.Repeat("b", 5000)) + for i := 0; i < 22; i++ { + blob := blob + if i >= 10 && i < 16 { // Temporarily send large messages. + blob = large + } r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}}) } @@ -158,40 +164,40 @@ func TestProgressFlowControl(t *testing.T) { t.Fatalf("unexpected entry sizes: %v", ms[0].Entries) } - // When this append is acked, we change to replicate state and can - // send multiple messages at once. - r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index}) - ms = r.readMessages() - if len(ms) != 3 { - t.Fatalf("expected 3 messages, got %d", len(ms)) - } - for i, m := range ms { - if m.Type != pb.MsgApp { - t.Errorf("%d: expected MsgApp, got %s", i, m.Type) + ackAndVerify := func(index uint64, expEntries ...int) uint64 { + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: index}) + ms := r.readMessages() + if got, want := len(ms), len(expEntries); got != want { + t.Fatalf("expected %d messages, got %d", want, got) } - if len(m.Entries) != 2 { - t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries)) + for i, m := range ms { + if got, want := m.Type, pb.MsgApp; got != want { + t.Errorf("%d: expected MsgApp, got %s", i, got) + } + if got, want := len(m.Entries), expEntries[i]; got != want { + t.Errorf("%d: expected %d entries, got %d", i, want, got) + } } + last := ms[len(ms)-1].Entries + if len(last) == 0 { + return index + } + return last[len(last)-1].Index } - // Ack all three of those messages together and get the last two - // messages (containing three entries). - r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index}) - ms = r.readMessages() - if len(ms) != 2 { - t.Fatalf("expected 2 messages, got %d", len(ms)) - } - for i, m := range ms { - if m.Type != pb.MsgApp { - t.Errorf("%d: expected MsgApp, got %s", i, m.Type) - } - } - if len(ms[0].Entries) != 2 { - t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries)) - } - if len(ms[1].Entries) != 1 { - t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries)) - } + // When this append is acked, we change to replicate state and can + // send multiple messages at once. + index := ackAndVerify(ms[0].Entries[1].Index, 2, 2, 2) + // Ack all three of those messages together and get another 3 messages. The + // third message contains a single large entry, in contrast to 2 before. + index = ackAndVerify(index, 2, 1, 1) + // All subsequent messages contain one large entry, and we cap at 2 messages + // because it overflows MaxInflightBytes. + index = ackAndVerify(index, 1, 1) + index = ackAndVerify(index, 1, 1) + // Start getting small messages again. + index = ackAndVerify(index, 1, 2, 2) + ackAndVerify(index, 2) } func TestUncommittedEntryLimit(t *testing.T) { @@ -4706,7 +4712,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw learners[i] = true } v.id = id - v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight) + v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight, v.prs.MaxInflightBytes) if len(learners) > 0 { v.prs.Learners = map[uint64]struct{}{} } diff --git a/raft/tracker/inflights.go b/raft/tracker/inflights.go index 242d1cab1..350728aec 100644 --- a/raft/tracker/inflights.go +++ b/raft/tracker/inflights.go @@ -14,6 +14,12 @@ package tracker +// inflight describes an in-flight MsgApp message. +type inflight struct { + index uint64 // the index of the last entry inside the message + bytes uint64 // the total byte size of the entries in the message +} + // Inflights limits the number of MsgApp (represented by the largest index // contained within) sent to followers but not yet acknowledged by them. Callers // use Full() to check whether more messages can be sent, call Add() whenever @@ -22,21 +28,25 @@ package tracker type Inflights struct { // the starting index in the buffer start int - // number of inflights in the buffer - count int - // the size of the buffer - size int + count int // number of inflight messages in the buffer + bytes uint64 // number of inflight bytes - // buffer contains the index of the last entry - // inside one message. - buffer []uint64 + size int // the max number of inflight messages + maxBytes uint64 // the max total byte size of inflight messages + + // buffer is a ring buffer containing info about all in-flight messages. + buffer []inflight } -// NewInflights sets up an Inflights that allows up to 'size' inflight messages. -func NewInflights(size int) *Inflights { +// NewInflights sets up an Inflights that allows up to size inflight messages, +// with the total byte size up to maxBytes. If maxBytes is 0 then there is no +// byte size limit. The maxBytes limit is soft, i.e. we accept a single message +// that brings it from size < maxBytes to size >= maxBytes. +func NewInflights(size int, maxBytes uint64) *Inflights { return &Inflights{ - size: size, + size: size, + maxBytes: maxBytes, } } @@ -44,15 +54,15 @@ func NewInflights(size int) *Inflights { // the receiver. func (in *Inflights) Clone() *Inflights { ins := *in - ins.buffer = append([]uint64(nil), in.buffer...) + ins.buffer = append([]inflight(nil), in.buffer...) return &ins } -// Add notifies the Inflights that a new message with the given index is being -// dispatched. Full() must be called prior to Add() to verify that there is room -// for one more message, and consecutive calls to add Add() must provide a -// monotonic sequence of indexes. -func (in *Inflights) Add(inflight uint64) { +// Add notifies the Inflights that a new message with the given index and byte +// size is being dispatched. Full() must be called prior to Add() to verify that +// there is room for one more message, and consecutive calls to Add() must +// provide a monotonic sequence of indexes. +func (in *Inflights) Add(index, bytes uint64) { if in.Full() { panic("cannot add into a Full inflights") } @@ -64,8 +74,9 @@ func (in *Inflights) Add(inflight uint64) { if next >= len(in.buffer) { in.grow() } - in.buffer[next] = inflight + in.buffer[next] = inflight{index: index, bytes: bytes} in.count++ + in.bytes += bytes } // grow the inflight buffer by doubling up to inflights.size. We grow on demand @@ -78,24 +89,26 @@ func (in *Inflights) grow() { } else if newSize > in.size { newSize = in.size } - newBuffer := make([]uint64, newSize) + newBuffer := make([]inflight, newSize) copy(newBuffer, in.buffer) in.buffer = newBuffer } // FreeLE frees the inflights smaller or equal to the given `to` flight. func (in *Inflights) FreeLE(to uint64) { - if in.count == 0 || to < in.buffer[in.start] { + if in.count == 0 || to < in.buffer[in.start].index { // out of the left side of the window return } idx := in.start var i int + var bytes uint64 for i = 0; i < in.count; i++ { - if to < in.buffer[idx] { // found the first large inflight + if to < in.buffer[idx].index { // found the first large inflight break } + bytes += in.buffer[idx].bytes // increase index and maybe rotate size := in.size @@ -105,6 +118,7 @@ func (in *Inflights) FreeLE(to uint64) { } // free i inflights and set new start index in.count -= i + in.bytes -= bytes in.start = idx if in.count == 0 { // inflights is empty, reset the start index so that we don't grow the @@ -115,7 +129,7 @@ func (in *Inflights) FreeLE(to uint64) { // Full returns true if no more messages can be sent at the moment. func (in *Inflights) Full() bool { - return in.count == in.size + return in.count == in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes) } // Count returns the number of inflight messages. diff --git a/raft/tracker/inflights_test.go b/raft/tracker/inflights_test.go index fe2a1b564..3514220df 100644 --- a/raft/tracker/inflights_test.go +++ b/raft/tracker/inflights_test.go @@ -24,32 +24,38 @@ func TestInflightsAdd(t *testing.T) { // no rotating case in := &Inflights{ size: 10, - buffer: make([]uint64, 10), + buffer: make([]inflight, 10), } for i := 0; i < 5; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } wantIn := &Inflights{ start: 0, count: 5, + bytes: 510, size: 10, - // ↓------------ - buffer: []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0}, + buffer: inflightsBuffer( + // ↓------------ + []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0}, + []uint64{100, 101, 102, 103, 104, 0, 0, 0, 0, 0}), } require.Equal(t, wantIn, in) for i := 5; i < 10; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } wantIn2 := &Inflights{ start: 0, count: 10, + bytes: 1045, size: 10, - // ↓--------------------------- - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓--------------------------- + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn2, in) @@ -57,41 +63,47 @@ func TestInflightsAdd(t *testing.T) { in2 := &Inflights{ start: 5, size: 10, - buffer: make([]uint64, 10), + buffer: make([]inflight, 10), } for i := 0; i < 5; i++ { - in2.Add(uint64(i)) + in2.Add(uint64(i), uint64(100+i)) } wantIn21 := &Inflights{ start: 5, count: 5, + bytes: 510, size: 10, - // ↓------------ - buffer: []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4}, + buffer: inflightsBuffer( + // ↓------------ + []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4}, + []uint64{0, 0, 0, 0, 0, 100, 101, 102, 103, 104}), } require.Equal(t, wantIn21, in2) for i := 5; i < 10; i++ { - in2.Add(uint64(i)) + in2.Add(uint64(i), uint64(100+i)) } wantIn22 := &Inflights{ start: 5, count: 10, + bytes: 1045, size: 10, - // -------------- ↓------------ - buffer: []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4}, + buffer: inflightsBuffer( + // -------------- ↓------------ + []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4}, + []uint64{105, 106, 107, 108, 109, 100, 101, 102, 103, 104}), } require.Equal(t, wantIn22, in2) } func TestInflightFreeTo(t *testing.T) { // no rotating case - in := NewInflights(10) + in := NewInflights(10, 0) for i := 0; i < 10; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } in.FreeLE(0) @@ -99,9 +111,12 @@ func TestInflightFreeTo(t *testing.T) { wantIn0 := &Inflights{ start: 1, count: 9, + bytes: 945, size: 10, - // ↓------------------------ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓------------------------ + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn0, in) @@ -110,9 +125,12 @@ func TestInflightFreeTo(t *testing.T) { wantIn := &Inflights{ start: 5, count: 5, + bytes: 535, size: 10, - // ↓------------ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓------------ + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn, in) @@ -121,15 +139,18 @@ func TestInflightFreeTo(t *testing.T) { wantIn2 := &Inflights{ start: 9, count: 1, + bytes: 109, size: 10, - // ↓ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓ + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn2, in) // rotating case for i := 10; i < 15; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } in.FreeLE(12) @@ -137,9 +158,12 @@ func TestInflightFreeTo(t *testing.T) { wantIn3 := &Inflights{ start: 3, count: 2, + bytes: 227, size: 10, - // ↓----- - buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓----- + []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + []uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn3, in) @@ -149,8 +173,67 @@ func TestInflightFreeTo(t *testing.T) { start: 0, count: 0, size: 10, - // ↓ - buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓ + []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + []uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn4, in) } + +func TestInflightsFull(t *testing.T) { + for _, tc := range []struct { + name string + size int + maxBytes uint64 + fullAt int + freeLE uint64 + againAt int + }{ + {name: "always-full", size: 0, fullAt: 0}, + {name: "single-entry", size: 1, fullAt: 1, freeLE: 1, againAt: 2}, + {name: "single-entry-overflow", size: 1, maxBytes: 10, fullAt: 1, freeLE: 1, againAt: 2}, + {name: "multi-entry", size: 15, fullAt: 15, freeLE: 6, againAt: 22}, + {name: "slight-overflow", size: 8, maxBytes: 400, fullAt: 4, freeLE: 2, againAt: 7}, + {name: "exact-max-bytes", size: 8, maxBytes: 406, fullAt: 4, freeLE: 3, againAt: 8}, + {name: "larger-overflow", size: 15, maxBytes: 408, fullAt: 5, freeLE: 1, againAt: 6}, + } { + t.Run(tc.name, func(t *testing.T) { + in := NewInflights(tc.size, tc.maxBytes) + + addUntilFull := func(begin, end int) { + for i := begin; i < end; i++ { + if in.Full() { + t.Fatalf("full at %d, want %d", i, end) + } + in.Add(uint64(i), uint64(100+i)) + } + if !in.Full() { + t.Fatalf("not full at %d", end) + } + } + + addUntilFull(0, tc.fullAt) + in.FreeLE(tc.freeLE) + addUntilFull(tc.fullAt, tc.againAt) + + defer func() { + if r := recover(); r == nil { + t.Errorf("Add() did not panic") + } + }() + in.Add(100, 1024) + }) + } +} + +func inflightsBuffer(indices []uint64, sizes []uint64) []inflight { + if len(indices) != len(sizes) { + panic("len(indices) != len(sizes)") + } + buffer := make([]inflight, 0, len(indices)) + for i, idx := range indices { + buffer = append(buffer, inflight{index: idx, bytes: sizes[i]}) + } + return buffer +} diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index c6272d22d..f4e1e07d8 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -134,14 +134,15 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { } // UpdateOnEntriesSend updates the progress on the given number of consecutive -// entries being sent in a MsgApp, appended at and after the given log index. -func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { +// entries being sent in a MsgApp, with the given total bytes size, appended at +// and after the given log index. +func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error { switch pr.State { case StateReplicate: if entries > 0 { last := nextIndex + uint64(entries) - 1 pr.OptimisticUpdate(last) - pr.Inflights.Add(last) + pr.Inflights.Add(last, bytes) } // If this message overflows the in-flights tracker, or it was already full, // consider this message being a probe, so that the flow is paused. diff --git a/raft/tracker/progress_test.go b/raft/tracker/progress_test.go index 974c383f0..49dedb536 100644 --- a/raft/tracker/progress_test.go +++ b/raft/tracker/progress_test.go @@ -21,8 +21,8 @@ import ( ) func TestProgressString(t *testing.T) { - ins := NewInflights(1) - ins.Add(123) + ins := NewInflights(1, 0) + ins.Add(123, 1) pr := &Progress{ Match: 1, Next: 2, @@ -55,7 +55,7 @@ func TestProgressIsPaused(t *testing.T) { p := &Progress{ State: tt.state, MsgAppFlowPaused: tt.paused, - Inflights: NewInflights(256), + Inflights: NewInflights(256, 0), } assert.Equal(t, tt.w, p.IsPaused(), i) } @@ -82,17 +82,17 @@ func TestProgressBecomeProbe(t *testing.T) { wnext uint64 }{ { - &Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256)}, + &Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256, 0)}, 2, }, { // snapshot finish - &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256)}, + &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256, 0)}, 11, }, { // snapshot failure - &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256)}, + &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256, 0)}, 2, }, } @@ -105,7 +105,7 @@ func TestProgressBecomeProbe(t *testing.T) { } func TestProgressBecomeReplicate(t *testing.T) { - p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)} + p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256, 0)} p.BecomeReplicate() assert.Equal(t, StateReplicate, p.State) assert.Equal(t, uint64(1), p.Match) @@ -113,7 +113,7 @@ func TestProgressBecomeReplicate(t *testing.T) { } func TestProgressBecomeSnapshot(t *testing.T) { - p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)} + p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256, 0)} p.BecomeSnapshot(10) assert.Equal(t, StateSnapshot, p.State) assert.Equal(t, uint64(1), p.Match) diff --git a/raft/tracker/tracker.go b/raft/tracker/tracker.go index 72dcc73b8..938b7878c 100644 --- a/raft/tracker/tracker.go +++ b/raft/tracker/tracker.go @@ -121,13 +121,15 @@ type ProgressTracker struct { Votes map[uint64]bool - MaxInflight int + MaxInflight int + MaxInflightBytes uint64 } // MakeProgressTracker initializes a ProgressTracker. -func MakeProgressTracker(maxInflight int) ProgressTracker { +func MakeProgressTracker(maxInflight int, maxBytes uint64) ProgressTracker { p := ProgressTracker{ - MaxInflight: maxInflight, + MaxInflight: maxInflight, + MaxInflightBytes: maxBytes, Config: Config{ Voters: quorum.JointConfig{ quorum.MajorityConfig{},