Merge pull request #14624 from pavelkalinnikov/limit_inflight_bytes

dependabot/go_modules/go.uber.org/atomic-1.10.0
Tobias Grieger 2022-11-14 12:10:13 +01:00 committed by GitHub
commit e64d644989
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 237 additions and 113 deletions

View File

@ -46,6 +46,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
- Package `wal` was moved to `storage/wal`
- Package `datadir` was moved to `storage/datadir`
### Package `raft`
- Send empty `MsgApp` when entry in-flight limits are exceeded. See [pull/14633](https://github.com/etcd-io/etcd/pull/14633).
- Add [MaxInflightBytes](https://github.com/etcd-io/etcd/pull/14624) setting in `raft.Config` for better flow control of entries.
### etcd server
- Add [`etcd --log-format`](https://github.com/etcd-io/etcd/pull/13339) flag to support log format.

View File

@ -265,7 +265,7 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u
// making the first index the better choice).
Next: c.LastIndex,
Match: 0,
Inflights: tracker.NewInflights(c.Tracker.MaxInflight),
Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes),
IsLearner: isLearner,
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked

View File

@ -28,7 +28,7 @@ import (
func TestConfChangeDataDriven(t *testing.T) {
datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
tr := tracker.MakeProgressTracker(10)
tr := tracker.MakeProgressTracker(10, 0)
c := Changer{
Tracker: tr,
LastIndex: 0, // incremented in this test with each cmd

View File

@ -89,7 +89,7 @@ func TestConfChangeQuick(t *testing.T) {
wrapper := func(invoke testFunc) func(setup initialChanges, ccs confChanges) (*Changer, error) {
return func(setup initialChanges, ccs confChanges) (*Changer, error) {
tr := tracker.MakeProgressTracker(10)
tr := tracker.MakeProgressTracker(10, 0)
c := &Changer{
Tracker: tr,
LastIndex: 10,

View File

@ -86,7 +86,7 @@ func TestRestore(t *testing.T) {
f := func(cs pb.ConfState) bool {
chg := Changer{
Tracker: tracker.MakeProgressTracker(20),
Tracker: tracker.MakeProgressTracker(20, 0),
LastIndex: 10,
}
cfg, prs, err := Restore(chg, cs)

View File

@ -160,6 +160,16 @@ type Config struct {
// overflowing that sending buffer. TODO (xiangli): feedback to application to
// limit the proposal rate?
MaxInflightMsgs int
// MaxInflightBytes limits the number of in-flight bytes in append messages.
// Complements MaxInflightMsgs. Ignored if zero.
//
// This effectively bounds the bandwidth-delay product. Note that especially
// in high-latency deployments setting this too low can lead to a dramatic
// reduction in throughput. For example, with a peer that has a round-trip
// latency of 100ms to the leader and this setting is set to 1 MB, there is a
// throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops
// to 2.5 MB/s. See Little's law to understand the maths behind.
MaxInflightBytes uint64
// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
@ -228,6 +238,11 @@ func (c *Config) validate() error {
if c.MaxInflightMsgs <= 0 {
return errors.New("max inflight messages must be greater than 0")
}
if c.MaxInflightBytes == 0 {
c.MaxInflightBytes = noLimit
} else if c.MaxInflightBytes < c.MaxSizePerMsg {
return errors.New("max inflight bytes must be >= max message size")
}
if c.Logger == nil {
c.Logger = getLogger()
@ -332,7 +347,7 @@ func newRaft(c *Config) *raft {
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs),
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
@ -484,7 +499,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
// Send the actual MsgApp otherwise, and update the progress accordingly.
next := pr.Next // save Next for later, as the progress update can change it
if err := pr.UpdateOnEntriesSend(len(ents), next); err != nil {
if err := pr.UpdateOnEntriesSend(len(ents), payloadsSize(ents), next); err != nil {
r.logger.Panicf("%x: %v", r.id, err)
}
r.send(pb.Message{
@ -629,7 +644,7 @@ func (r *raft) reset(term uint64) {
*pr = tracker.Progress{
Match: 0,
Next: r.raftLog.lastIndex() + 1,
Inflights: tracker.NewInflights(r.prs.MaxInflight),
Inflights: tracker.NewInflights(r.prs.MaxInflight, r.prs.MaxInflightBytes),
IsLearner: pr.IsLearner,
}
if id == r.id {
@ -1618,7 +1633,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
r.raftLog.restore(s)
// Reset the configuration and add the (potentially updated) peers in anew.
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight, r.prs.MaxInflightBytes)
cfg, prs, err := confchange.Restore(confchange.Changer{
Tracker: r.prs,
LastIndex: r.raftLog.lastIndex(),
@ -1789,11 +1804,7 @@ func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Messa
// Empty payloads are never refused. This is used both for appending an empty
// entry at a new leader's term, as well as leaving a joint configuration.
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
var s uint64
for _, e := range ents {
s += uint64(PayloadSize(e))
}
s := payloadsSize(ents)
if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
// If the uncommitted tail of the Raft log is empty, allow any size
// proposal. Otherwise, limit the size of the uncommitted tail of the
@ -1815,12 +1826,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
// Fast-path for followers, who do not track or enforce the limit.
return
}
var s uint64
for _, e := range ents {
s += uint64(PayloadSize(e))
}
if s > r.uncommittedSize {
if s := payloadsSize(ents); s > r.uncommittedSize {
// uncommittedSize may underestimate the size of the uncommitted Raft
// log tail but will never overestimate it. Saturate at 0 instead of
// allowing overflow.
@ -1830,6 +1836,14 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
}
}
func payloadsSize(ents []pb.Entry) uint64 {
var s uint64
for _, e := range ents {
s += uint64(PayloadSize(e))
}
return s
}
func numOfPendingConf(ents []pb.Entry) int {
n := 0
for i := range ents {

View File

@ -130,6 +130,7 @@ func TestProgressFlowControl(t *testing.T) {
cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
cfg.MaxInflightMsgs = 3
cfg.MaxSizePerMsg = 2048
cfg.MaxInflightBytes = 9000 // A little over MaxInflightMsgs * MaxSizePerMsg.
r := newRaft(cfg)
r.becomeCandidate()
r.becomeLeader()
@ -140,7 +141,12 @@ func TestProgressFlowControl(t *testing.T) {
// While node 2 is in probe state, propose a bunch of entries.
r.prs.Progress[2].BecomeProbe()
blob := []byte(strings.Repeat("a", 1000))
for i := 0; i < 10; i++ {
large := []byte(strings.Repeat("b", 5000))
for i := 0; i < 22; i++ {
blob := blob
if i >= 10 && i < 16 { // Temporarily send large messages.
blob = large
}
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
}
@ -158,40 +164,40 @@ func TestProgressFlowControl(t *testing.T) {
t.Fatalf("unexpected entry sizes: %v", ms[0].Entries)
}
// When this append is acked, we change to replicate state and can
// send multiple messages at once.
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index})
ms = r.readMessages()
if len(ms) != 3 {
t.Fatalf("expected 3 messages, got %d", len(ms))
}
for i, m := range ms {
if m.Type != pb.MsgApp {
t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
ackAndVerify := func(index uint64, expEntries ...int) uint64 {
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: index})
ms := r.readMessages()
if got, want := len(ms), len(expEntries); got != want {
t.Fatalf("expected %d messages, got %d", want, got)
}
if len(m.Entries) != 2 {
t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries))
for i, m := range ms {
if got, want := m.Type, pb.MsgApp; got != want {
t.Errorf("%d: expected MsgApp, got %s", i, got)
}
if got, want := len(m.Entries), expEntries[i]; got != want {
t.Errorf("%d: expected %d entries, got %d", i, want, got)
}
}
last := ms[len(ms)-1].Entries
if len(last) == 0 {
return index
}
return last[len(last)-1].Index
}
// Ack all three of those messages together and get the last two
// messages (containing three entries).
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index})
ms = r.readMessages()
if len(ms) != 2 {
t.Fatalf("expected 2 messages, got %d", len(ms))
}
for i, m := range ms {
if m.Type != pb.MsgApp {
t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
}
}
if len(ms[0].Entries) != 2 {
t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries))
}
if len(ms[1].Entries) != 1 {
t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries))
}
// When this append is acked, we change to replicate state and can
// send multiple messages at once.
index := ackAndVerify(ms[0].Entries[1].Index, 2, 2, 2)
// Ack all three of those messages together and get another 3 messages. The
// third message contains a single large entry, in contrast to 2 before.
index = ackAndVerify(index, 2, 1, 1)
// All subsequent messages contain one large entry, and we cap at 2 messages
// because it overflows MaxInflightBytes.
index = ackAndVerify(index, 1, 1)
index = ackAndVerify(index, 1, 1)
// Start getting small messages again.
index = ackAndVerify(index, 1, 2, 2)
ackAndVerify(index, 2)
}
func TestUncommittedEntryLimit(t *testing.T) {
@ -4706,7 +4712,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
learners[i] = true
}
v.id = id
v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight)
v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight, v.prs.MaxInflightBytes)
if len(learners) > 0 {
v.prs.Learners = map[uint64]struct{}{}
}

View File

@ -14,6 +14,12 @@
package tracker
// inflight describes an in-flight MsgApp message.
type inflight struct {
index uint64 // the index of the last entry inside the message
bytes uint64 // the total byte size of the entries in the message
}
// Inflights limits the number of MsgApp (represented by the largest index
// contained within) sent to followers but not yet acknowledged by them. Callers
// use Full() to check whether more messages can be sent, call Add() whenever
@ -22,21 +28,25 @@ package tracker
type Inflights struct {
// the starting index in the buffer
start int
// number of inflights in the buffer
count int
// the size of the buffer
size int
count int // number of inflight messages in the buffer
bytes uint64 // number of inflight bytes
// buffer contains the index of the last entry
// inside one message.
buffer []uint64
size int // the max number of inflight messages
maxBytes uint64 // the max total byte size of inflight messages
// buffer is a ring buffer containing info about all in-flight messages.
buffer []inflight
}
// NewInflights sets up an Inflights that allows up to 'size' inflight messages.
func NewInflights(size int) *Inflights {
// NewInflights sets up an Inflights that allows up to size inflight messages,
// with the total byte size up to maxBytes. If maxBytes is 0 then there is no
// byte size limit. The maxBytes limit is soft, i.e. we accept a single message
// that brings it from size < maxBytes to size >= maxBytes.
func NewInflights(size int, maxBytes uint64) *Inflights {
return &Inflights{
size: size,
size: size,
maxBytes: maxBytes,
}
}
@ -44,15 +54,15 @@ func NewInflights(size int) *Inflights {
// the receiver.
func (in *Inflights) Clone() *Inflights {
ins := *in
ins.buffer = append([]uint64(nil), in.buffer...)
ins.buffer = append([]inflight(nil), in.buffer...)
return &ins
}
// Add notifies the Inflights that a new message with the given index is being
// dispatched. Full() must be called prior to Add() to verify that there is room
// for one more message, and consecutive calls to add Add() must provide a
// monotonic sequence of indexes.
func (in *Inflights) Add(inflight uint64) {
// Add notifies the Inflights that a new message with the given index and byte
// size is being dispatched. Full() must be called prior to Add() to verify that
// there is room for one more message, and consecutive calls to Add() must
// provide a monotonic sequence of indexes.
func (in *Inflights) Add(index, bytes uint64) {
if in.Full() {
panic("cannot add into a Full inflights")
}
@ -64,8 +74,9 @@ func (in *Inflights) Add(inflight uint64) {
if next >= len(in.buffer) {
in.grow()
}
in.buffer[next] = inflight
in.buffer[next] = inflight{index: index, bytes: bytes}
in.count++
in.bytes += bytes
}
// grow the inflight buffer by doubling up to inflights.size. We grow on demand
@ -78,24 +89,26 @@ func (in *Inflights) grow() {
} else if newSize > in.size {
newSize = in.size
}
newBuffer := make([]uint64, newSize)
newBuffer := make([]inflight, newSize)
copy(newBuffer, in.buffer)
in.buffer = newBuffer
}
// FreeLE frees the inflights smaller or equal to the given `to` flight.
func (in *Inflights) FreeLE(to uint64) {
if in.count == 0 || to < in.buffer[in.start] {
if in.count == 0 || to < in.buffer[in.start].index {
// out of the left side of the window
return
}
idx := in.start
var i int
var bytes uint64
for i = 0; i < in.count; i++ {
if to < in.buffer[idx] { // found the first large inflight
if to < in.buffer[idx].index { // found the first large inflight
break
}
bytes += in.buffer[idx].bytes
// increase index and maybe rotate
size := in.size
@ -105,6 +118,7 @@ func (in *Inflights) FreeLE(to uint64) {
}
// free i inflights and set new start index
in.count -= i
in.bytes -= bytes
in.start = idx
if in.count == 0 {
// inflights is empty, reset the start index so that we don't grow the
@ -115,7 +129,7 @@ func (in *Inflights) FreeLE(to uint64) {
// Full returns true if no more messages can be sent at the moment.
func (in *Inflights) Full() bool {
return in.count == in.size
return in.count == in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes)
}
// Count returns the number of inflight messages.

View File

@ -24,32 +24,38 @@ func TestInflightsAdd(t *testing.T) {
// no rotating case
in := &Inflights{
size: 10,
buffer: make([]uint64, 10),
buffer: make([]inflight, 10),
}
for i := 0; i < 5; i++ {
in.Add(uint64(i))
in.Add(uint64(i), uint64(100+i))
}
wantIn := &Inflights{
start: 0,
count: 5,
bytes: 510,
size: 10,
// ↓------------
buffer: []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0},
buffer: inflightsBuffer(
// ↓------------
[]uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0},
[]uint64{100, 101, 102, 103, 104, 0, 0, 0, 0, 0}),
}
require.Equal(t, wantIn, in)
for i := 5; i < 10; i++ {
in.Add(uint64(i))
in.Add(uint64(i), uint64(100+i))
}
wantIn2 := &Inflights{
start: 0,
count: 10,
bytes: 1045,
size: 10,
// ↓---------------------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
buffer: inflightsBuffer(
// ↓---------------------------
[]uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
[]uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}),
}
require.Equal(t, wantIn2, in)
@ -57,41 +63,47 @@ func TestInflightsAdd(t *testing.T) {
in2 := &Inflights{
start: 5,
size: 10,
buffer: make([]uint64, 10),
buffer: make([]inflight, 10),
}
for i := 0; i < 5; i++ {
in2.Add(uint64(i))
in2.Add(uint64(i), uint64(100+i))
}
wantIn21 := &Inflights{
start: 5,
count: 5,
bytes: 510,
size: 10,
// ↓------------
buffer: []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4},
buffer: inflightsBuffer(
// ↓------------
[]uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4},
[]uint64{0, 0, 0, 0, 0, 100, 101, 102, 103, 104}),
}
require.Equal(t, wantIn21, in2)
for i := 5; i < 10; i++ {
in2.Add(uint64(i))
in2.Add(uint64(i), uint64(100+i))
}
wantIn22 := &Inflights{
start: 5,
count: 10,
bytes: 1045,
size: 10,
// -------------- ↓------------
buffer: []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4},
buffer: inflightsBuffer(
// -------------- ↓------------
[]uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4},
[]uint64{105, 106, 107, 108, 109, 100, 101, 102, 103, 104}),
}
require.Equal(t, wantIn22, in2)
}
func TestInflightFreeTo(t *testing.T) {
// no rotating case
in := NewInflights(10)
in := NewInflights(10, 0)
for i := 0; i < 10; i++ {
in.Add(uint64(i))
in.Add(uint64(i), uint64(100+i))
}
in.FreeLE(0)
@ -99,9 +111,12 @@ func TestInflightFreeTo(t *testing.T) {
wantIn0 := &Inflights{
start: 1,
count: 9,
bytes: 945,
size: 10,
// ↓------------------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
buffer: inflightsBuffer(
// ↓------------------------
[]uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
[]uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}),
}
require.Equal(t, wantIn0, in)
@ -110,9 +125,12 @@ func TestInflightFreeTo(t *testing.T) {
wantIn := &Inflights{
start: 5,
count: 5,
bytes: 535,
size: 10,
// ↓------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
buffer: inflightsBuffer(
// ↓------------
[]uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
[]uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}),
}
require.Equal(t, wantIn, in)
@ -121,15 +139,18 @@ func TestInflightFreeTo(t *testing.T) {
wantIn2 := &Inflights{
start: 9,
count: 1,
bytes: 109,
size: 10,
// ↓
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
buffer: inflightsBuffer(
// ↓
[]uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
[]uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}),
}
require.Equal(t, wantIn2, in)
// rotating case
for i := 10; i < 15; i++ {
in.Add(uint64(i))
in.Add(uint64(i), uint64(100+i))
}
in.FreeLE(12)
@ -137,9 +158,12 @@ func TestInflightFreeTo(t *testing.T) {
wantIn3 := &Inflights{
start: 3,
count: 2,
bytes: 227,
size: 10,
// ↓-----
buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
buffer: inflightsBuffer(
// ↓-----
[]uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
[]uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}),
}
require.Equal(t, wantIn3, in)
@ -149,8 +173,67 @@ func TestInflightFreeTo(t *testing.T) {
start: 0,
count: 0,
size: 10,
// ↓
buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
buffer: inflightsBuffer(
// ↓
[]uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
[]uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}),
}
require.Equal(t, wantIn4, in)
}
func TestInflightsFull(t *testing.T) {
for _, tc := range []struct {
name string
size int
maxBytes uint64
fullAt int
freeLE uint64
againAt int
}{
{name: "always-full", size: 0, fullAt: 0},
{name: "single-entry", size: 1, fullAt: 1, freeLE: 1, againAt: 2},
{name: "single-entry-overflow", size: 1, maxBytes: 10, fullAt: 1, freeLE: 1, againAt: 2},
{name: "multi-entry", size: 15, fullAt: 15, freeLE: 6, againAt: 22},
{name: "slight-overflow", size: 8, maxBytes: 400, fullAt: 4, freeLE: 2, againAt: 7},
{name: "exact-max-bytes", size: 8, maxBytes: 406, fullAt: 4, freeLE: 3, againAt: 8},
{name: "larger-overflow", size: 15, maxBytes: 408, fullAt: 5, freeLE: 1, againAt: 6},
} {
t.Run(tc.name, func(t *testing.T) {
in := NewInflights(tc.size, tc.maxBytes)
addUntilFull := func(begin, end int) {
for i := begin; i < end; i++ {
if in.Full() {
t.Fatalf("full at %d, want %d", i, end)
}
in.Add(uint64(i), uint64(100+i))
}
if !in.Full() {
t.Fatalf("not full at %d", end)
}
}
addUntilFull(0, tc.fullAt)
in.FreeLE(tc.freeLE)
addUntilFull(tc.fullAt, tc.againAt)
defer func() {
if r := recover(); r == nil {
t.Errorf("Add() did not panic")
}
}()
in.Add(100, 1024)
})
}
}
func inflightsBuffer(indices []uint64, sizes []uint64) []inflight {
if len(indices) != len(sizes) {
panic("len(indices) != len(sizes)")
}
buffer := make([]inflight, 0, len(indices))
for i, idx := range indices {
buffer = append(buffer, inflight{index: idx, bytes: sizes[i]})
}
return buffer
}

View File

@ -134,14 +134,15 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
}
// UpdateOnEntriesSend updates the progress on the given number of consecutive
// entries being sent in a MsgApp, appended at and after the given log index.
func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error {
// entries being sent in a MsgApp, with the given total bytes size, appended at
// and after the given log index.
func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error {
switch pr.State {
case StateReplicate:
if entries > 0 {
last := nextIndex + uint64(entries) - 1
pr.OptimisticUpdate(last)
pr.Inflights.Add(last)
pr.Inflights.Add(last, bytes)
}
// If this message overflows the in-flights tracker, or it was already full,
// consider this message being a probe, so that the flow is paused.

View File

@ -21,8 +21,8 @@ import (
)
func TestProgressString(t *testing.T) {
ins := NewInflights(1)
ins.Add(123)
ins := NewInflights(1, 0)
ins.Add(123, 1)
pr := &Progress{
Match: 1,
Next: 2,
@ -55,7 +55,7 @@ func TestProgressIsPaused(t *testing.T) {
p := &Progress{
State: tt.state,
MsgAppFlowPaused: tt.paused,
Inflights: NewInflights(256),
Inflights: NewInflights(256, 0),
}
assert.Equal(t, tt.w, p.IsPaused(), i)
}
@ -82,17 +82,17 @@ func TestProgressBecomeProbe(t *testing.T) {
wnext uint64
}{
{
&Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256)},
&Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256, 0)},
2,
},
{
// snapshot finish
&Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256)},
&Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256, 0)},
11,
},
{
// snapshot failure
&Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256)},
&Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256, 0)},
2,
},
}
@ -105,7 +105,7 @@ func TestProgressBecomeProbe(t *testing.T) {
}
func TestProgressBecomeReplicate(t *testing.T) {
p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)}
p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256, 0)}
p.BecomeReplicate()
assert.Equal(t, StateReplicate, p.State)
assert.Equal(t, uint64(1), p.Match)
@ -113,7 +113,7 @@ func TestProgressBecomeReplicate(t *testing.T) {
}
func TestProgressBecomeSnapshot(t *testing.T) {
p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)}
p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256, 0)}
p.BecomeSnapshot(10)
assert.Equal(t, StateSnapshot, p.State)
assert.Equal(t, uint64(1), p.Match)

View File

@ -121,13 +121,15 @@ type ProgressTracker struct {
Votes map[uint64]bool
MaxInflight int
MaxInflight int
MaxInflightBytes uint64
}
// MakeProgressTracker initializes a ProgressTracker.
func MakeProgressTracker(maxInflight int) ProgressTracker {
func MakeProgressTracker(maxInflight int, maxBytes uint64) ProgressTracker {
p := ProgressTracker{
MaxInflight: maxInflight,
MaxInflight: maxInflight,
MaxInflightBytes: maxBytes,
Config: Config{
Voters: quorum.JointConfig{
quorum.MajorityConfig{},