diff --git a/raft/log.go b/raft/log.go index 50f28f87b..03f83e61c 100644 --- a/raft/log.go +++ b/raft/log.go @@ -39,7 +39,9 @@ type raftLog struct { logger Logger - maxMsgSize uint64 + // maxNextEntsSize is the maximum number aggregate byte size of the messages + // returned from calls to nextEnts. + maxNextEntsSize uint64 } // newLog returns log using the given storage and default options. It @@ -51,14 +53,14 @@ func newLog(storage Storage, logger Logger) *raftLog { // newLogWithSize returns a log using the given storage and max // message size. -func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog { +func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog { if storage == nil { log.Panic("storage must not be nil") } log := &raftLog{ - storage: storage, - logger: logger, - maxMsgSize: maxMsgSize, + storage: storage, + logger: logger, + maxNextEntsSize: maxNextEntsSize, } firstIndex, err := storage.FirstIndex() if err != nil { @@ -149,7 +151,7 @@ func (l *raftLog) unstableEntries() []pb.Entry { func (l *raftLog) nextEnts() (ents []pb.Entry) { off := max(l.applied+1, l.firstIndex()) if l.committed+1 > off { - ents, err := l.slice(off, l.committed+1, l.maxMsgSize) + ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize) if err != nil { l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) } diff --git a/raft/node_test.go b/raft/node_test.go index e977da6d6..641a5ca2f 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -894,7 +894,7 @@ func TestAppendPagination(t *testing.T) { func TestCommitPagination(t *testing.T) { s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) - cfg.MaxSizePerMsg = 2048 + cfg.MaxCommittedSizePerReady = 2048 r := newRaft(cfg) n := newNode() go n.run(r) diff --git a/raft/raft.go b/raft/raft.go index b76b9a942..865b36836 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -154,6 +154,9 @@ type Config struct { // throughput during normal replication. Note: math.MaxUint64 for unlimited, // 0 for at most one entry per message. MaxSizePerMsg uint64 + // MaxCommittedSizePerReady limits the size of the committed entries which + // can be applied. + MaxCommittedSizePerReady uint64 // MaxUncommittedEntriesSize limits the aggregate byte size of the // uncommitted entries that may be appended to a leader's log. Once this // limit is exceeded, proposals will begin to return ErrProposalDropped @@ -224,6 +227,12 @@ func (c *Config) validate() error { c.MaxUncommittedEntriesSize = noLimit } + // default MaxCommittedSizePerReady to MaxSizePerMsg because they were + // previously the same parameter. + if c.MaxCommittedSizePerReady == 0 { + c.MaxCommittedSizePerReady = c.MaxSizePerMsg + } + if c.MaxInflightMsgs <= 0 { return errors.New("max inflight messages must be greater than 0") } @@ -316,7 +325,7 @@ func newRaft(c *Config) *raft { if err := c.validate(); err != nil { panic(err.Error()) } - raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg) + raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady) hs, cs, err := c.Storage.InitialState() if err != nil { panic(err) // TODO(bdarnell)