Merge pull request #10258 from ajwerner/ajwerner/raft_committed_entries_size
raft: separate MaxCommittedSizePerReady config from MaxSizePerMsgrelease-3.4
commit
fa92397e18
14
raft/log.go
14
raft/log.go
|
@ -39,7 +39,9 @@ type raftLog struct {
|
||||||
|
|
||||||
logger Logger
|
logger Logger
|
||||||
|
|
||||||
maxMsgSize uint64
|
// maxNextEntsSize is the maximum number aggregate byte size of the messages
|
||||||
|
// returned from calls to nextEnts.
|
||||||
|
maxNextEntsSize uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// newLog returns log using the given storage and default options. It
|
// newLog returns log using the given storage and default options. It
|
||||||
|
@ -51,14 +53,14 @@ func newLog(storage Storage, logger Logger) *raftLog {
|
||||||
|
|
||||||
// newLogWithSize returns a log using the given storage and max
|
// newLogWithSize returns a log using the given storage and max
|
||||||
// message size.
|
// message size.
|
||||||
func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog {
|
func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog {
|
||||||
if storage == nil {
|
if storage == nil {
|
||||||
log.Panic("storage must not be nil")
|
log.Panic("storage must not be nil")
|
||||||
}
|
}
|
||||||
log := &raftLog{
|
log := &raftLog{
|
||||||
storage: storage,
|
storage: storage,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
maxMsgSize: maxMsgSize,
|
maxNextEntsSize: maxNextEntsSize,
|
||||||
}
|
}
|
||||||
firstIndex, err := storage.FirstIndex()
|
firstIndex, err := storage.FirstIndex()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -149,7 +151,7 @@ func (l *raftLog) unstableEntries() []pb.Entry {
|
||||||
func (l *raftLog) nextEnts() (ents []pb.Entry) {
|
func (l *raftLog) nextEnts() (ents []pb.Entry) {
|
||||||
off := max(l.applied+1, l.firstIndex())
|
off := max(l.applied+1, l.firstIndex())
|
||||||
if l.committed+1 > off {
|
if l.committed+1 > off {
|
||||||
ents, err := l.slice(off, l.committed+1, l.maxMsgSize)
|
ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
|
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -894,7 +894,7 @@ func TestAppendPagination(t *testing.T) {
|
||||||
func TestCommitPagination(t *testing.T) {
|
func TestCommitPagination(t *testing.T) {
|
||||||
s := NewMemoryStorage()
|
s := NewMemoryStorage()
|
||||||
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
|
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
|
||||||
cfg.MaxSizePerMsg = 2048
|
cfg.MaxCommittedSizePerReady = 2048
|
||||||
r := newRaft(cfg)
|
r := newRaft(cfg)
|
||||||
n := newNode()
|
n := newNode()
|
||||||
go n.run(r)
|
go n.run(r)
|
||||||
|
|
11
raft/raft.go
11
raft/raft.go
|
@ -154,6 +154,9 @@ type Config struct {
|
||||||
// throughput during normal replication. Note: math.MaxUint64 for unlimited,
|
// throughput during normal replication. Note: math.MaxUint64 for unlimited,
|
||||||
// 0 for at most one entry per message.
|
// 0 for at most one entry per message.
|
||||||
MaxSizePerMsg uint64
|
MaxSizePerMsg uint64
|
||||||
|
// MaxCommittedSizePerReady limits the size of the committed entries which
|
||||||
|
// can be applied.
|
||||||
|
MaxCommittedSizePerReady uint64
|
||||||
// MaxUncommittedEntriesSize limits the aggregate byte size of the
|
// MaxUncommittedEntriesSize limits the aggregate byte size of the
|
||||||
// uncommitted entries that may be appended to a leader's log. Once this
|
// uncommitted entries that may be appended to a leader's log. Once this
|
||||||
// limit is exceeded, proposals will begin to return ErrProposalDropped
|
// limit is exceeded, proposals will begin to return ErrProposalDropped
|
||||||
|
@ -224,6 +227,12 @@ func (c *Config) validate() error {
|
||||||
c.MaxUncommittedEntriesSize = noLimit
|
c.MaxUncommittedEntriesSize = noLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// default MaxCommittedSizePerReady to MaxSizePerMsg because they were
|
||||||
|
// previously the same parameter.
|
||||||
|
if c.MaxCommittedSizePerReady == 0 {
|
||||||
|
c.MaxCommittedSizePerReady = c.MaxSizePerMsg
|
||||||
|
}
|
||||||
|
|
||||||
if c.MaxInflightMsgs <= 0 {
|
if c.MaxInflightMsgs <= 0 {
|
||||||
return errors.New("max inflight messages must be greater than 0")
|
return errors.New("max inflight messages must be greater than 0")
|
||||||
}
|
}
|
||||||
|
@ -316,7 +325,7 @@ func newRaft(c *Config) *raft {
|
||||||
if err := c.validate(); err != nil {
|
if err := c.validate(); err != nil {
|
||||||
panic(err.Error())
|
panic(err.Error())
|
||||||
}
|
}
|
||||||
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg)
|
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
|
||||||
hs, cs, err := c.Storage.InitialState()
|
hs, cs, err := c.Storage.InitialState()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err) // TODO(bdarnell)
|
panic(err) // TODO(bdarnell)
|
||||||
|
|
Loading…
Reference in New Issue