raft: extract 'tracker' package

Mechanically extract `progressTracker`, `Progress`, and `inflights`
to their own package named `tracker`. Add lots of comments in the
progress, and take the opportunity to rename and clarify various
fields.
release-3.4
Tobias Schottdorf 2019-06-08 19:41:24 +02:00
parent 6953ccc135
commit f9c2d00fb3
17 changed files with 1065 additions and 919 deletions

View File

@ -34,7 +34,6 @@ import (
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/wal"
"go.etcd.io/etcd/wal/walpb"
"go.uber.org/zap"
)

View File

@ -353,15 +353,15 @@ func (n *node) run(r *raft) {
}
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
}
case cc := <-n.confc:
if cc.NodeID == None {
select {
case n.confstatec <- pb.ConfState{
Nodes: r.prs.voterNodes(),
Learners: r.prs.learnerNodes()}:
Nodes: r.prs.VoterNodes(),
Learners: r.prs.LearnerNodes()}:
case <-n.done:
}
break
@ -384,8 +384,8 @@ func (n *node) run(r *raft) {
}
select {
case n.confstatec <- pb.ConfState{
Nodes: r.prs.voterNodes(),
Learners: r.prs.learnerNodes()}:
Nodes: r.prs.VoterNodes(),
Learners: r.prs.LearnerNodes()}:
case <-n.done:
}
case <-n.tickc:

View File

@ -1,457 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raft
import (
"fmt"
"sort"
"go.etcd.io/etcd/raft/quorum"
)
const (
ProgressStateProbe ProgressStateType = iota
ProgressStateReplicate
ProgressStateSnapshot
)
type ProgressStateType uint64
var prstmap = [...]string{
"ProgressStateProbe",
"ProgressStateReplicate",
"ProgressStateSnapshot",
}
func (st ProgressStateType) String() string { return prstmap[uint64(st)] }
// Progress represents a followers progress in the view of the leader. Leader maintains
// progresses of all followers, and sends entries to the follower based on its progress.
type Progress struct {
Match, Next uint64
// State defines how the leader should interact with the follower.
//
// When in ProgressStateProbe, leader sends at most one replication message
// per heartbeat interval. It also probes actual progress of the follower.
//
// When in ProgressStateReplicate, leader optimistically increases next
// to the latest entry sent after sending replication message. This is
// an optimized state for fast replicating log entries to the follower.
//
// When in ProgressStateSnapshot, leader should have sent out snapshot
// before and stops sending any replication message.
State ProgressStateType
// Paused is used in ProgressStateProbe.
// When Paused is true, raft should pause sending replication message to this peer.
Paused bool
// PendingSnapshot is used in ProgressStateSnapshot.
// If there is a pending snapshot, the pendingSnapshot will be set to the
// index of the snapshot. If pendingSnapshot is set, the replication process of
// this Progress will be paused. raft will not resend snapshot until the pending one
// is reported to be failed.
PendingSnapshot uint64
// RecentActive is true if the progress is recently active. Receiving any messages
// from the corresponding follower indicates the progress is active.
// RecentActive can be reset to false after an election timeout.
RecentActive bool
// inflights is a sliding window for the inflight messages.
// Each inflight message contains one or more log entries.
// The max number of entries per message is defined in raft config as MaxSizePerMsg.
// Thus inflight effectively limits both the number of inflight messages
// and the bandwidth each Progress can use.
// When inflights is full, no more message should be sent.
// When a leader sends out a message, the index of the last
// entry should be added to inflights. The index MUST be added
// into inflights in order.
// When a leader receives a reply, the previous inflights should
// be freed by calling inflights.freeTo with the index of the last
// received entry.
ins *inflights
// IsLearner is true if this progress is tracked for a learner.
IsLearner bool
}
func (pr *Progress) resetState(state ProgressStateType) {
pr.Paused = false
pr.PendingSnapshot = 0
pr.State = state
pr.ins.reset()
}
func (pr *Progress) becomeProbe() {
// If the original state is ProgressStateSnapshot, progress knows that
// the pending snapshot has been sent to this peer successfully, then
// probes from pendingSnapshot + 1.
if pr.State == ProgressStateSnapshot {
pendingSnapshot := pr.PendingSnapshot
pr.resetState(ProgressStateProbe)
pr.Next = max(pr.Match+1, pendingSnapshot+1)
} else {
pr.resetState(ProgressStateProbe)
pr.Next = pr.Match + 1
}
}
func (pr *Progress) becomeReplicate() {
pr.resetState(ProgressStateReplicate)
pr.Next = pr.Match + 1
}
func (pr *Progress) becomeSnapshot(snapshoti uint64) {
pr.resetState(ProgressStateSnapshot)
pr.PendingSnapshot = snapshoti
}
// maybeUpdate returns false if the given n index comes from an outdated message.
// Otherwise it updates the progress and returns true.
func (pr *Progress) maybeUpdate(n uint64) bool {
var updated bool
if pr.Match < n {
pr.Match = n
updated = true
pr.resume()
}
if pr.Next < n+1 {
pr.Next = n + 1
}
return updated
}
func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
// maybeDecrTo returns false if the given to index comes from an out of order message.
// Otherwise it decreases the progress next index to min(rejected, last) and returns true.
func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
if pr.State == ProgressStateReplicate {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= pr.Match {
return false
}
// directly decrease next to match + 1
pr.Next = pr.Match + 1
return true
}
// the rejection must be stale if "rejected" does not match next - 1
if pr.Next-1 != rejected {
return false
}
if pr.Next = min(rejected, last+1); pr.Next < 1 {
pr.Next = 1
}
pr.resume()
return true
}
func (pr *Progress) pause() { pr.Paused = true }
func (pr *Progress) resume() { pr.Paused = false }
// IsPaused returns whether sending log entries to this node has been
// paused. A node may be paused because it has rejected recent
// MsgApps, is currently waiting for a snapshot, or has reached the
// MaxInflightMsgs limit.
func (pr *Progress) IsPaused() bool {
switch pr.State {
case ProgressStateProbe:
return pr.Paused
case ProgressStateReplicate:
return pr.ins.full()
case ProgressStateSnapshot:
return true
default:
panic("unexpected state")
}
}
func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 }
// needSnapshotAbort returns true if snapshot progress's Match
// is equal or higher than the pendingSnapshot.
func (pr *Progress) needSnapshotAbort() bool {
return pr.State == ProgressStateSnapshot && pr.Match >= pr.PendingSnapshot
}
func (pr *Progress) String() string {
return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d, recentActive = %v, isLearner = %v",
pr.Next, pr.Match, pr.State, pr.IsPaused(), pr.PendingSnapshot, pr.RecentActive, pr.IsLearner)
}
type inflights struct {
// the starting index in the buffer
start int
// number of inflights in the buffer
count int
// the size of the buffer
size int
// buffer contains the index of the last entry
// inside one message.
buffer []uint64
}
func newInflights(size int) *inflights {
return &inflights{
size: size,
}
}
// add adds an inflight into inflights
func (in *inflights) add(inflight uint64) {
if in.full() {
panic("cannot add into a full inflights")
}
next := in.start + in.count
size := in.size
if next >= size {
next -= size
}
if next >= len(in.buffer) {
in.growBuf()
}
in.buffer[next] = inflight
in.count++
}
// grow the inflight buffer by doubling up to inflights.size. We grow on demand
// instead of preallocating to inflights.size to handle systems which have
// thousands of Raft groups per process.
func (in *inflights) growBuf() {
newSize := len(in.buffer) * 2
if newSize == 0 {
newSize = 1
} else if newSize > in.size {
newSize = in.size
}
newBuffer := make([]uint64, newSize)
copy(newBuffer, in.buffer)
in.buffer = newBuffer
}
// freeTo frees the inflights smaller or equal to the given `to` flight.
func (in *inflights) freeTo(to uint64) {
if in.count == 0 || to < in.buffer[in.start] {
// out of the left side of the window
return
}
idx := in.start
var i int
for i = 0; i < in.count; i++ {
if to < in.buffer[idx] { // found the first large inflight
break
}
// increase index and maybe rotate
size := in.size
if idx++; idx >= size {
idx -= size
}
}
// free i inflights and set new start index
in.count -= i
in.start = idx
if in.count == 0 {
// inflights is empty, reset the start index so that we don't grow the
// buffer unnecessarily.
in.start = 0
}
}
func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) }
// full returns true if the inflights is full.
func (in *inflights) full() bool {
return in.count == in.size
}
// resets frees all inflights.
func (in *inflights) reset() {
in.count = 0
in.start = 0
}
// progressTracker tracks the currently active configuration and the information
// known about the nodes and learners in it. In particular, it tracks the match
// index for each peer which in turn allows reasoning about the committed index.
type progressTracker struct {
voters quorum.JointConfig
learners map[uint64]struct{}
prs map[uint64]*Progress
votes map[uint64]bool
maxInflight int
}
func makeProgressTracker(maxInflight int) progressTracker {
p := progressTracker{
maxInflight: maxInflight,
voters: quorum.JointConfig{
quorum.MajorityConfig{},
quorum.MajorityConfig{},
},
learners: map[uint64]struct{}{},
votes: map[uint64]bool{},
prs: map[uint64]*Progress{},
}
return p
}
// isSingleton returns true if (and only if) there is only one voting member
// (i.e. the leader) in the current configuration.
func (p *progressTracker) isSingleton() bool {
return len(p.voters[0]) == 1 && len(p.voters[1]) == 0
}
type progressAckIndexer map[uint64]*Progress
var _ quorum.AckedIndexer = progressAckIndexer(nil)
func (l progressAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) {
pr, ok := l[id]
if !ok {
return 0, false
}
return quorum.Index(pr.Match), true
}
// committed returns the largest log index known to be committed based on what
// the voting members of the group have acknowledged.
func (p *progressTracker) committed() uint64 {
return uint64(p.voters.CommittedIndex(progressAckIndexer(p.prs)))
}
func (p *progressTracker) removeAny(id uint64) {
_, okPR := p.prs[id]
_, okV1 := p.voters[0][id]
_, okV2 := p.voters[1][id]
_, okL := p.learners[id]
okV := okV1 || okV2
if !okPR {
panic("attempting to remove unknown peer %x")
} else if !okV && !okL {
panic("attempting to remove unknown peer %x")
} else if okV && okL {
panic(fmt.Sprintf("peer %x is both voter and learner", id))
}
delete(p.voters[0], id)
delete(p.voters[1], id)
delete(p.learners, id)
delete(p.prs, id)
}
// initProgress initializes a new progress for the given node or learner. The
// node may not exist yet in either form or a panic will ensue.
func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
if pr := p.prs[id]; pr != nil {
panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
}
if !isLearner {
p.voters[0][id] = struct{}{}
} else {
p.learners[id] = struct{}{}
}
p.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: isLearner}
}
func (p *progressTracker) getProgress(id uint64) *Progress {
return p.prs[id]
}
// visit invokes the supplied closure for all tracked progresses.
func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
for id, pr := range p.prs {
f(id, pr)
}
}
// checkQuorumActive returns true if the quorum is active from
// the view of the local raft state machine. Otherwise, it returns
// false.
func (p *progressTracker) quorumActive() bool {
votes := map[uint64]bool{}
p.visit(func(id uint64, pr *Progress) {
if pr.IsLearner {
return
}
votes[id] = pr.RecentActive
})
return p.voters.VoteResult(votes) == quorum.VoteWon
}
func (p *progressTracker) voterNodes() []uint64 {
m := p.voters.IDs()
nodes := make([]uint64, 0, len(m))
for id := range m {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
return nodes
}
func (p *progressTracker) learnerNodes() []uint64 {
nodes := make([]uint64, 0, len(p.learners))
for id := range p.learners {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
return nodes
}
// resetVotes prepares for a new round of vote counting via recordVote.
func (p *progressTracker) resetVotes() {
p.votes = map[uint64]bool{}
}
// recordVote records that the node with the given id voted for this Raft
// instance if v == true (and declined it otherwise).
func (p *progressTracker) recordVote(id uint64, v bool) {
_, ok := p.votes[id]
if !ok {
p.votes[id] = v
}
}
// tallyVotes returns the number of granted and rejected votes, and whether the
// election outcome is known.
func (p *progressTracker) tallyVotes() (granted int, rejected int, _ quorum.VoteResult) {
// Make sure to populate granted/rejected correctly even if the votes slice
// contains members no longer part of the configuration. This doesn't really
// matter in the way the numbers are used (they're informational), but might
// as well get it right.
for id, pr := range p.prs {
if pr.IsLearner {
continue
}
if p.votes[id] {
granted++
} else {
rejected++
}
}
result := p.voters.VoteResult(p.votes)
return granted, rejected, result
}

View File

@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/raft/quorum"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
)
// None is a placeholder node ID used when there is no leader.
@ -261,7 +262,7 @@ type raft struct {
maxMsgSize uint64
maxUncommittedSize uint64
prs progressTracker
prs tracker.ProgressTracker
state StateType
@ -344,7 +345,7 @@ func newRaft(c *Config) *raft {
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: makeProgressTracker(c.MaxInflightMsgs),
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
@ -355,11 +356,11 @@ func newRaft(c *Config) *raft {
}
for _, p := range peers {
// Add node to active config.
r.prs.initProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */)
r.prs.InitProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */)
}
for _, p := range learners {
// Add learner to active config.
r.prs.initProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */)
r.prs.InitProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */)
if r.id == p {
r.isLearner = true
@ -375,7 +376,7 @@ func newRaft(c *Config) *raft {
r.becomeFollower(r.Term, None)
var nodesStrs []string
for _, n := range r.prs.voterNodes() {
for _, n := range r.prs.VoterNodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}
@ -442,7 +443,7 @@ func (r *raft) sendAppend(to uint64) {
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.prs.getProgress(to)
pr := r.prs.Progress[to]
if pr.IsPaused() {
return false
}
@ -477,7 +478,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.becomeSnapshot(sindex)
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
m.Type = pb.MsgApp
@ -487,13 +488,13 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
// optimistically increase the next when in ProgressStateReplicate
case ProgressStateReplicate:
// optimistically increase the next when in StateReplicate
case tracker.StateReplicate:
last := m.Entries[n-1].Index
pr.optimisticUpdate(last)
pr.ins.add(last)
case ProgressStateProbe:
pr.pause()
pr.OptimisticUpdate(last)
pr.Inflights.Add(last)
case tracker.StateProbe:
pr.ProbeSent = true
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
@ -511,7 +512,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.prs.getProgress(to).Match, r.raftLog.committed)
commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
@ -525,7 +526,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
r.prs.visit(func(id uint64, _ *Progress) {
r.prs.Visit(func(id uint64, _ *tracker.Progress) {
if id == r.id {
return
}
@ -545,7 +546,7 @@ func (r *raft) bcastHeartbeat() {
}
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
r.prs.visit(func(id uint64, _ *Progress) {
r.prs.Visit(func(id uint64, _ *tracker.Progress) {
if id == r.id {
return
}
@ -557,7 +558,7 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
// the commit index changed (in which case the caller should call
// r.bcastAppend).
func (r *raft) maybeCommit() bool {
mci := r.prs.committed()
mci := r.prs.Committed()
return r.raftLog.maybeCommit(mci, r.Term)
}
@ -574,12 +575,12 @@ func (r *raft) reset(term uint64) {
r.abortLeaderTransfer()
r.prs.resetVotes()
r.prs.visit(func(id uint64, pr *Progress) {
*pr = Progress{
r.prs.ResetVotes()
r.prs.Visit(func(id uint64, pr *tracker.Progress) {
*pr = tracker.Progress{
Match: 0,
Next: r.raftLog.lastIndex() + 1,
ins: newInflights(r.prs.maxInflight),
Inflights: tracker.NewInflights(r.prs.MaxInflight),
IsLearner: pr.IsLearner,
}
if id == r.id {
@ -609,7 +610,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
r.prs.getProgress(r.id).maybeUpdate(li)
r.prs.Progress[r.id].MaybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
return true
@ -682,7 +683,7 @@ func (r *raft) becomePreCandidate() {
// but doesn't change anything else. In particular it does not increase
// r.Term or change r.Vote.
r.step = stepCandidate
r.prs.resetVotes()
r.prs.ResetVotes()
r.tick = r.tickElection
r.lead = None
r.state = StatePreCandidate
@ -703,7 +704,7 @@ func (r *raft) becomeLeader() {
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
r.prs.getProgress(r.id).becomeReplicate()
r.prs.Progress[r.id].BecomeReplicate()
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
@ -755,7 +756,7 @@ func (r *raft) campaign(t CampaignType) {
}
return
}
for id := range r.prs.voters.IDs() {
for id := range r.prs.Voters.IDs() {
if id == r.id {
continue
}
@ -776,8 +777,8 @@ func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected
} else {
r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
}
r.prs.recordVote(id, v)
return r.prs.tallyVotes()
r.prs.RecordVote(id, v)
return r.prs.TallyVotes()
}
func (r *raft) Step(m pb.Message) error {
@ -943,16 +944,16 @@ func stepLeader(r *raft, m pb.Message) error {
//
// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
// leader steps down when removing itself. I might be missing something.
if pr := r.prs.getProgress(r.id); pr != nil {
if pr := r.prs.Progress[r.id]; pr != nil {
pr.RecentActive = true
}
if !r.prs.quorumActive() {
if !r.prs.QuorumActive() {
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
r.becomeFollower(r.Term, None)
}
// Mark everyone (but ourselves) as inactive in preparation for the next
// CheckQuorum.
r.prs.visit(func(id uint64, pr *Progress) {
r.prs.Visit(func(id uint64, pr *tracker.Progress) {
if id != r.id {
pr.RecentActive = false
}
@ -962,7 +963,7 @@ func stepLeader(r *raft, m pb.Message) error {
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
if r.prs.getProgress(r.id) == nil {
if r.prs.Progress[r.id] == nil {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
@ -994,7 +995,7 @@ func stepLeader(r *raft, m pb.Message) error {
case pb.MsgReadIndex:
// If more than the local vote is needed, go through a full broadcast,
// otherwise optimize.
if !r.prs.isSingleton() {
if !r.prs.IsSingleton() {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return nil
@ -1029,7 +1030,7 @@ func stepLeader(r *raft, m pb.Message) error {
}
// All other message types require a progress for m.From (pr).
pr := r.prs.getProgress(m.From)
pr := r.prs.Progress[m.From]
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
@ -1041,30 +1042,30 @@ func stepLeader(r *raft, m pb.Message) error {
if m.Reject {
r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
if pr.maybeDecrTo(m.Index, m.RejectHint) {
if pr.MaybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
if pr.maybeUpdate(m.Index) {
if pr.MaybeUpdate(m.Index) {
switch {
case pr.State == ProgressStateProbe:
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
// move to replicating state, that would only happen with
// the next round of appends (but there may not be a next
// round for a while, exposing an inconsistent RaftStatus).
pr.becomeProbe()
pr.becomeReplicate()
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index)
pr.BecomeProbe()
pr.BecomeReplicate()
case pr.State == tracker.StateReplicate:
pr.Inflights.FreeLE(m.Index)
}
if r.maybeCommit() {
@ -1091,11 +1092,11 @@ func stepLeader(r *raft, m pb.Message) error {
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.resume()
pr.ProbeSent = false
// free one slot for the full inflights window to allow progress.
if pr.State == ProgressStateReplicate && pr.ins.full() {
pr.ins.freeFirstOne()
if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
pr.Inflights.FreeFirstOne()
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
@ -1105,7 +1106,7 @@ func stepLeader(r *raft, m pb.Message) error {
return nil
}
if r.prs.voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}
@ -1119,26 +1120,32 @@ func stepLeader(r *raft, m pb.Message) error {
}
}
case pb.MsgSnapStatus:
if pr.State != ProgressStateSnapshot {
if pr.State != tracker.StateSnapshot {
return nil
}
// TODO(tbg): this code is very similar to the snapshot handling in
// MsgAppResp above. In fact, the code there is more correct than the
// code here and should likely be updated to match (or even better, the
// logic pulled into a newly created Progress state machine handler).
if !m.Reject {
pr.becomeProbe()
pr.BecomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
pr.snapshotFailure()
pr.becomeProbe()
// NB: the order here matters or we'll be probing erroneously from
// the snapshot index, but the snapshot never applied.
pr.PendingSnapshot = 0
pr.BecomeProbe()
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
// If snapshot finish, wait for the msgAppResp from the remote node before sending
// out the next msgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr.pause()
pr.ProbeSent = true
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
case pb.MsgTransferLeader:
@ -1341,7 +1348,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
r.raftLog.restore(s)
r.prs = makeProgressTracker(r.prs.maxInflight)
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
r.restoreNode(s.Metadata.ConfState.Nodes, false)
r.restoreNode(s.Metadata.ConfState.Learners, true)
return true
@ -1354,15 +1361,15 @@ func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
match = next - 1
r.isLearner = isLearner
}
r.prs.initProgress(n, match, next, isLearner)
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.getProgress(n))
r.prs.InitProgress(n, match, next, isLearner)
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.Progress[n])
}
}
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
pr := r.prs.getProgress(r.id)
pr := r.prs.Progress[r.id]
return pr != nil && !pr.IsLearner
}
@ -1375,9 +1382,9 @@ func (r *raft) addLearner(id uint64) {
}
func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
pr := r.prs.getProgress(id)
pr := r.prs.Progress[id]
if pr == nil {
r.prs.initProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
} else {
if isLearner && !pr.IsLearner {
// Can only change Learner to Voter.
@ -1392,10 +1399,10 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
}
// Change Learner to Voter, use origin Learner progress.
r.prs.removeAny(id)
r.prs.initProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
r.prs.RemoveAny(id)
r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
pr.IsLearner = false
*r.prs.getProgress(id) = *pr
*r.prs.Progress[id] = *pr
}
if r.id == id {
@ -1405,14 +1412,14 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has a chance to communicate with us.
r.prs.getProgress(id).RecentActive = true
r.prs.Progress[id].RecentActive = true
}
func (r *raft) removeNode(id uint64) {
r.prs.removeAny(id)
r.prs.RemoveAny(id)
// Do not try to commit or abort transferring if the cluster is now empty.
if len(r.prs.voters[0]) == 0 && len(r.prs.learners) == 0 {
if len(r.prs.Voters[0]) == 0 && len(r.prs.Learners) == 0 {
return
}

View File

@ -29,11 +29,11 @@ func TestMsgAppFlowControlFull(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
pr2 := r.prs.prs[2]
pr2 := r.prs.Progress[2]
// force the progress to be in replicate state
pr2.becomeReplicate()
pr2.BecomeReplicate()
// fill in the inflights window
for i := 0; i < r.prs.maxInflight; i++ {
for i := 0; i < r.prs.MaxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
ms := r.readMessages()
if len(ms) != 1 {
@ -42,8 +42,8 @@ func TestMsgAppFlowControlFull(t *testing.T) {
}
// ensure 1
if !pr2.ins.full() {
t.Fatalf("inflights.full = %t, want %t", pr2.ins.full(), true)
if !pr2.Inflights.Full() {
t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true)
}
// ensure 2
@ -65,18 +65,18 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
pr2 := r.prs.prs[2]
pr2 := r.prs.Progress[2]
// force the progress to be in replicate state
pr2.becomeReplicate()
pr2.BecomeReplicate()
// fill in the inflights window
for i := 0; i < r.prs.maxInflight; i++ {
for i := 0; i < r.prs.MaxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
r.readMessages()
}
// 1 is noop, 2 is the first proposal we just sent.
// so we start with 2.
for tt := 2; tt < r.prs.maxInflight; tt++ {
for tt := 2; tt < r.prs.MaxInflight; tt++ {
// move forward the window
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(tt)})
r.readMessages()
@ -89,15 +89,15 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
}
// ensure 1
if !pr2.ins.full() {
t.Fatalf("inflights.full = %t, want %t", pr2.ins.full(), true)
if !pr2.Inflights.Full() {
t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true)
}
// ensure 2
for i := 0; i < tt; i++ {
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(i)})
if !pr2.ins.full() {
t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.ins.full(), true)
if !pr2.Inflights.Full() {
t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true)
}
}
}
@ -110,26 +110,26 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
pr2 := r.prs.prs[2]
pr2 := r.prs.Progress[2]
// force the progress to be in replicate state
pr2.becomeReplicate()
pr2.BecomeReplicate()
// fill in the inflights window
for i := 0; i < r.prs.maxInflight; i++ {
for i := 0; i < r.prs.MaxInflight; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
r.readMessages()
}
for tt := 1; tt < 5; tt++ {
if !pr2.ins.full() {
t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.ins.full(), true)
if !pr2.Inflights.Full() {
t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true)
}
// recv tt msgHeartbeatResp and expect one free slot
for i := 0; i < tt; i++ {
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
r.readMessages()
if pr2.ins.full() {
t.Fatalf("#%d.%d: inflights.full = %t, want %t", tt, i, pr2.ins.full(), false)
if pr2.Inflights.Full() {
t.Fatalf("#%d.%d: inflights.full = %t, want %t", tt, i, pr2.Inflights.Full(), false)
}
}

View File

@ -169,7 +169,7 @@ func testNonleaderStartElection(t *testing.T, state StateType) {
if r.state != StateCandidate {
t.Errorf("state = %s, want %s", r.state, StateCandidate)
}
if !r.prs.votes[r.id] {
if !r.prs.Votes[r.id] {
t.Errorf("vote for self = false, want true")
}
msgs := r.readMessages()

View File

@ -18,6 +18,7 @@ import (
"testing"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
)
var (
@ -40,11 +41,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
// force set the next of node 2, so that
// node 2 needs a snapshot
sm.prs.prs[2].Next = sm.raftLog.firstIndex()
sm.prs.Progress[2].Next = sm.raftLog.firstIndex()
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true})
if sm.prs.prs[2].PendingSnapshot != 11 {
t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.prs[2].PendingSnapshot)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.Progress[2].Next - 1, Reject: true})
if sm.prs.Progress[2].PendingSnapshot != 11 {
t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.Progress[2].PendingSnapshot)
}
}
@ -56,7 +57,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()
sm.prs.prs[2].becomeSnapshot(11)
sm.prs.Progress[2].BecomeSnapshot(11)
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
msgs := sm.readMessages()
@ -73,18 +74,18 @@ func TestSnapshotFailure(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()
sm.prs.prs[2].Next = 1
sm.prs.prs[2].becomeSnapshot(11)
sm.prs.Progress[2].Next = 1
sm.prs.Progress[2].BecomeSnapshot(11)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
if sm.prs.prs[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
if sm.prs.Progress[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
}
if sm.prs.prs[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.prs.prs[2].Next)
if sm.prs.Progress[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.prs.Progress[2].Next)
}
if !sm.prs.prs[2].Paused {
t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused)
if !sm.prs.Progress[2].ProbeSent {
t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
}
}
@ -96,18 +97,18 @@ func TestSnapshotSucceed(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()
sm.prs.prs[2].Next = 1
sm.prs.prs[2].becomeSnapshot(11)
sm.prs.Progress[2].Next = 1
sm.prs.Progress[2].BecomeSnapshot(11)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
if sm.prs.prs[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
if sm.prs.Progress[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
}
if sm.prs.prs[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.prs.prs[2].Next)
if sm.prs.Progress[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.prs.Progress[2].Next)
}
if !sm.prs.prs[2].Paused {
t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused)
if !sm.prs.Progress[2].ProbeSent {
t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
}
}
@ -206,8 +207,8 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) {
mustSend(n2, n1, pb.MsgAppResp)
// Leader has correct state for follower.
pr := n1.prs.prs[2]
if pr.State != ProgressStateReplicate {
pr := n1.prs.Progress[2]
if pr.State != tracker.StateReplicate {
t.Fatalf("unexpected state %v", pr)
}
if pr.Match != expIdx || pr.Next != expIdx+1 {
@ -227,23 +228,23 @@ func TestSnapshotAbort(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()
sm.prs.prs[2].Next = 1
sm.prs.prs[2].becomeSnapshot(11)
sm.prs.Progress[2].Next = 1
sm.prs.Progress[2].BecomeSnapshot(11)
// A successful msgAppResp that has a higher/equal index than the
// pending snapshot should abort the pending snapshot.
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
if sm.prs.prs[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
if sm.prs.Progress[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
}
// The follower entered ProgressStateReplicate and the leader send an append
// The follower entered StateReplicate and the leader send an append
// and optimistically updated the progress (so we see 13 instead of 12).
// There is something to append because the leader appended an empty entry
// to the log at index 12 when it assumed leadership.
if sm.prs.prs[2].Next != 13 {
t.Fatalf("Next = %d, want 13", sm.prs.prs[2].Next)
if sm.prs.Progress[2].Next != 13 {
t.Fatalf("Next = %d, want 13", sm.prs.Progress[2].Next)
}
if n := sm.prs.prs[2].ins.count; n != 1 {
if n := sm.prs.Progress[2].Inflights.Count(); n != 1 {
t.Fatalf("expected an inflight message, got %d", n)
}
}

View File

@ -24,6 +24,7 @@ import (
"testing"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
)
// nextEnts returns the appliable entries and updates the applied index
@ -55,228 +56,16 @@ func (r *raft) readMessages() []pb.Message {
return msgs
}
func TestProgressBecomeProbe(t *testing.T) {
match := uint64(1)
tests := []struct {
p *Progress
wnext uint64
}{
{
&Progress{State: ProgressStateReplicate, Match: match, Next: 5, ins: newInflights(256)},
2,
},
{
// snapshot finish
&Progress{State: ProgressStateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, ins: newInflights(256)},
11,
},
{
// snapshot failure
&Progress{State: ProgressStateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, ins: newInflights(256)},
2,
},
}
for i, tt := range tests {
tt.p.becomeProbe()
if tt.p.State != ProgressStateProbe {
t.Errorf("#%d: state = %s, want %s", i, tt.p.State, ProgressStateProbe)
}
if tt.p.Match != match {
t.Errorf("#%d: match = %d, want %d", i, tt.p.Match, match)
}
if tt.p.Next != tt.wnext {
t.Errorf("#%d: next = %d, want %d", i, tt.p.Next, tt.wnext)
}
}
}
func TestProgressBecomeReplicate(t *testing.T) {
p := &Progress{State: ProgressStateProbe, Match: 1, Next: 5, ins: newInflights(256)}
p.becomeReplicate()
if p.State != ProgressStateReplicate {
t.Errorf("state = %s, want %s", p.State, ProgressStateReplicate)
}
if p.Match != 1 {
t.Errorf("match = %d, want 1", p.Match)
}
if w := p.Match + 1; p.Next != w {
t.Errorf("next = %d, want %d", p.Next, w)
}
}
func TestProgressBecomeSnapshot(t *testing.T) {
p := &Progress{State: ProgressStateProbe, Match: 1, Next: 5, ins: newInflights(256)}
p.becomeSnapshot(10)
if p.State != ProgressStateSnapshot {
t.Errorf("state = %s, want %s", p.State, ProgressStateSnapshot)
}
if p.Match != 1 {
t.Errorf("match = %d, want 1", p.Match)
}
if p.PendingSnapshot != 10 {
t.Errorf("pendingSnapshot = %d, want 10", p.PendingSnapshot)
}
}
func TestProgressUpdate(t *testing.T) {
prevM, prevN := uint64(3), uint64(5)
tests := []struct {
update uint64
wm uint64
wn uint64
wok bool
}{
{prevM - 1, prevM, prevN, false}, // do not decrease match, next
{prevM, prevM, prevN, false}, // do not decrease next
{prevM + 1, prevM + 1, prevN, true}, // increase match, do not decrease next
{prevM + 2, prevM + 2, prevN + 1, true}, // increase match, next
}
for i, tt := range tests {
p := &Progress{
Match: prevM,
Next: prevN,
}
ok := p.maybeUpdate(tt.update)
if ok != tt.wok {
t.Errorf("#%d: ok= %v, want %v", i, ok, tt.wok)
}
if p.Match != tt.wm {
t.Errorf("#%d: match= %d, want %d", i, p.Match, tt.wm)
}
if p.Next != tt.wn {
t.Errorf("#%d: next= %d, want %d", i, p.Next, tt.wn)
}
}
}
func TestProgressMaybeDecr(t *testing.T) {
tests := []struct {
state ProgressStateType
m uint64
n uint64
rejected uint64
last uint64
w bool
wn uint64
}{
{
// state replicate and rejected is not greater than match
ProgressStateReplicate, 5, 10, 5, 5, false, 10,
},
{
// state replicate and rejected is not greater than match
ProgressStateReplicate, 5, 10, 4, 4, false, 10,
},
{
// state replicate and rejected is greater than match
// directly decrease to match+1
ProgressStateReplicate, 5, 10, 9, 9, true, 6,
},
{
// next-1 != rejected is always false
ProgressStateProbe, 0, 0, 0, 0, false, 0,
},
{
// next-1 != rejected is always false
ProgressStateProbe, 0, 10, 5, 5, false, 10,
},
{
// next>1 = decremented by 1
ProgressStateProbe, 0, 10, 9, 9, true, 9,
},
{
// next>1 = decremented by 1
ProgressStateProbe, 0, 2, 1, 1, true, 1,
},
{
// next<=1 = reset to 1
ProgressStateProbe, 0, 1, 0, 0, true, 1,
},
{
// decrease to min(rejected, last+1)
ProgressStateProbe, 0, 10, 9, 2, true, 3,
},
{
// rejected < 1, reset to 1
ProgressStateProbe, 0, 10, 9, 0, true, 1,
},
}
for i, tt := range tests {
p := &Progress{
State: tt.state,
Match: tt.m,
Next: tt.n,
}
if g := p.maybeDecrTo(tt.rejected, tt.last); g != tt.w {
t.Errorf("#%d: maybeDecrTo= %t, want %t", i, g, tt.w)
}
if gm := p.Match; gm != tt.m {
t.Errorf("#%d: match= %d, want %d", i, gm, tt.m)
}
if gn := p.Next; gn != tt.wn {
t.Errorf("#%d: next= %d, want %d", i, gn, tt.wn)
}
}
}
func TestProgressIsPaused(t *testing.T) {
tests := []struct {
state ProgressStateType
paused bool
w bool
}{
{ProgressStateProbe, false, false},
{ProgressStateProbe, true, true},
{ProgressStateReplicate, false, false},
{ProgressStateReplicate, true, false},
{ProgressStateSnapshot, false, true},
{ProgressStateSnapshot, true, true},
}
for i, tt := range tests {
p := &Progress{
State: tt.state,
Paused: tt.paused,
ins: newInflights(256),
}
if g := p.IsPaused(); g != tt.w {
t.Errorf("#%d: paused= %t, want %t", i, g, tt.w)
}
}
}
// TestProgressResume ensures that progress.maybeUpdate and progress.maybeDecrTo
// will reset progress.paused.
func TestProgressResume(t *testing.T) {
p := &Progress{
Next: 2,
Paused: true,
}
p.maybeDecrTo(1, 1)
if p.Paused {
t.Errorf("paused= %v, want false", p.Paused)
}
p.Paused = true
p.maybeUpdate(2)
if p.Paused {
t.Errorf("paused= %v, want false", p.Paused)
}
}
func TestProgressLeader(t *testing.T) {
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
r.becomeCandidate()
r.becomeLeader()
r.prs.prs[2].becomeReplicate()
r.prs.Progress[2].BecomeReplicate()
// Send proposals to r1. The first 5 entries should be appended to the log.
propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
for i := 0; i < 5; i++ {
if pr := r.prs.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
if pr := r.prs.Progress[r.id]; pr.State != tracker.StateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
t.Errorf("unexpected progress %v", pr)
}
if err := r.Step(propMsg); err != nil {
@ -291,17 +80,17 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.prs.prs[2].Paused = true
r.prs.Progress[2].ProbeSent = true
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
if !r.prs.prs[2].Paused {
t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
if !r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
}
r.prs.prs[2].becomeReplicate()
r.prs.Progress[2].BecomeReplicate()
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
if r.prs.prs[2].Paused {
t.Errorf("paused = %v, want false", r.prs.prs[2].Paused)
if r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent)
}
}
@ -331,7 +120,7 @@ func TestProgressFlowControl(t *testing.T) {
r.readMessages()
// While node 2 is in probe state, propose a bunch of entries.
r.prs.prs[2].becomeProbe()
r.prs.Progress[2].BecomeProbe()
blob := []byte(strings.Repeat("a", 1000))
for i := 0; i < 10; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
@ -409,8 +198,8 @@ func TestUncommittedEntryLimit(t *testing.T) {
// Set the two followers to the replicate state. Commit to tail of log.
const numFollowers = 2
r.prs.prs[2].becomeReplicate()
r.prs.prs[3].becomeReplicate()
r.prs.Progress[2].BecomeReplicate()
r.prs.Progress[3].BecomeReplicate()
r.uncommittedSize = 0
// Send proposals to r1. The first 5 entries should be appended to the log.
@ -889,7 +678,7 @@ func TestLearnerLogReplication(t *testing.T) {
t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed)
}
match := n1.prs.getProgress(2).Match
match := n1.prs.Progress[2].Match
if match != n2.raftLog.committed {
t.Errorf("progress 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match)
}
@ -1351,9 +1140,9 @@ func TestCommit(t *testing.T) {
storage.hardState = pb.HardState{Term: tt.smTerm}
sm := newTestRaft(1, []uint64{1}, 10, 2, storage)
sm.prs.removeAny(1)
sm.prs.RemoveAny(1)
for j := 0; j < len(tt.matches); j++ {
sm.prs.initProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false)
sm.prs.InitProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false)
}
sm.maybeCommit()
if g := sm.raftLog.committed; g != tt.w {
@ -2138,7 +1927,7 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) {
nt := newNetwork(a, b)
setRandomizedElectionTimeout(b, b.electionTimeout+1)
// Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states
b.prs.removeAny(2)
b.prs.RemoveAny(2)
if b.promotable() {
t.Fatalf("promotable = %v, want false", b.promotable())
@ -2632,7 +2421,7 @@ func TestLeaderAppResp(t *testing.T) {
sm.readMessages()
sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
p := sm.prs.prs[2]
p := sm.prs.Progress[2]
if p.Match != tt.wmatch {
t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
}
@ -2679,9 +2468,9 @@ func TestBcastBeat(t *testing.T) {
mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
}
// slow follower
sm.prs.prs[2].Match, sm.prs.prs[2].Next = 5, 6
sm.prs.Progress[2].Match, sm.prs.Progress[2].Next = 5, 6
// normal follower
sm.prs.prs[3].Match, sm.prs.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
sm.prs.Progress[3].Match, sm.prs.Progress[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
sm.Step(pb.Message{Type: pb.MsgBeat})
msgs := sm.readMessages()
@ -2689,8 +2478,8 @@ func TestBcastBeat(t *testing.T) {
t.Fatalf("len(msgs) = %v, want 2", len(msgs))
}
wantCommitMap := map[uint64]uint64{
2: min(sm.raftLog.committed, sm.prs.prs[2].Match),
3: min(sm.raftLog.committed, sm.prs.prs[3].Match),
2: min(sm.raftLog.committed, sm.prs.Progress[2].Match),
3: min(sm.raftLog.committed, sm.prs.Progress[3].Match),
}
for i, m := range msgs {
if m.Type != pb.MsgHeartbeat {
@ -2759,16 +2548,16 @@ func TestLeaderIncreaseNext(t *testing.T) {
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
tests := []struct {
// progress
state ProgressStateType
state tracker.StateType
next uint64
wnext uint64
}{
// state replicate, optimistically increase next
// previous entries + noop entry + propose + 1
{ProgressStateReplicate, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
{tracker.StateReplicate, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
// state probe, not optimistically increase next
{ProgressStateProbe, 2, 2},
{tracker.StateProbe, 2, 2},
}
for i, tt := range tests {
@ -2776,11 +2565,11 @@ func TestLeaderIncreaseNext(t *testing.T) {
sm.raftLog.append(previousEnts...)
sm.becomeCandidate()
sm.becomeLeader()
sm.prs.prs[2].State = tt.state
sm.prs.prs[2].Next = tt.next
sm.prs.Progress[2].State = tt.state
sm.prs.Progress[2].Next = tt.next
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
p := sm.prs.prs[2]
p := sm.prs.Progress[2]
if p.Next != tt.wnext {
t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
}
@ -2792,7 +2581,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.prs.prs[2].becomeProbe()
r.prs.Progress[2].BecomeProbe()
// each round is a heartbeat
for i := 0; i < 3; i++ {
@ -2811,8 +2600,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
}
}
if !r.prs.prs[2].Paused {
t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
if !r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
}
for j := 0; j < 10; j++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
@ -2826,8 +2615,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
for j := 0; j < r.heartbeatTimeout; j++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
}
if !r.prs.prs[2].Paused {
t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
if !r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
}
// consume the heartbeat
@ -2849,8 +2638,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
if msg[0].Index != 0 {
t.Errorf("index = %d, want %d", msg[0].Index, 0)
}
if !r.prs.prs[2].Paused {
t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
if !r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
}
}
@ -2859,7 +2648,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.prs.prs[2].becomeReplicate()
r.prs.Progress[2].BecomeReplicate()
for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
@ -2876,7 +2665,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.prs.prs[2].becomeSnapshot(10)
r.prs.Progress[2].BecomeSnapshot(10)
for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
@ -2897,17 +2686,17 @@ func TestRecvMsgUnreachable(t *testing.T) {
r.becomeLeader()
r.readMessages()
// set node 2 to state replicate
r.prs.prs[2].Match = 3
r.prs.prs[2].becomeReplicate()
r.prs.prs[2].optimisticUpdate(5)
r.prs.Progress[2].Match = 3
r.prs.Progress[2].BecomeReplicate()
r.prs.Progress[2].OptimisticUpdate(5)
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
if r.prs.prs[2].State != ProgressStateProbe {
t.Errorf("state = %s, want %s", r.prs.prs[2].State, ProgressStateProbe)
if r.prs.Progress[2].State != tracker.StateProbe {
t.Errorf("state = %s, want %s", r.prs.Progress[2].State, tracker.StateProbe)
}
if wnext := r.prs.prs[2].Match + 1; r.prs.prs[2].Next != wnext {
t.Errorf("next = %d, want %d", r.prs.prs[2].Next, wnext)
if wnext := r.prs.Progress[2].Match + 1; r.prs.Progress[2].Next != wnext {
t.Errorf("next = %d, want %d", r.prs.Progress[2].Next, wnext)
}
}
@ -2932,7 +2721,7 @@ func TestRestore(t *testing.T) {
if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
}
sg := sm.prs.voterNodes()
sg := sm.prs.VoterNodes()
if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) {
t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes)
}
@ -2964,22 +2753,22 @@ func TestRestoreWithLearner(t *testing.T) {
if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
}
sg := sm.prs.voterNodes()
sg := sm.prs.VoterNodes()
if len(sg) != len(s.Metadata.ConfState.Nodes) {
t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Nodes)
}
lns := sm.prs.learnerNodes()
lns := sm.prs.LearnerNodes()
if len(lns) != len(s.Metadata.ConfState.Learners) {
t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners)
}
for _, n := range s.Metadata.ConfState.Nodes {
if sm.prs.prs[n].IsLearner {
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], false)
if sm.prs.Progress[n].IsLearner {
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.Progress[n], false)
}
}
for _, n := range s.Metadata.ConfState.Learners {
if !sm.prs.prs[n].IsLearner {
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], true)
if !sm.prs.Progress[n].IsLearner {
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.Progress[n], true)
}
}
@ -3121,8 +2910,8 @@ func TestProvideSnap(t *testing.T) {
sm.becomeLeader()
// force set the next of node 2, so that node 2 needs a snapshot
sm.prs.prs[2].Next = sm.raftLog.firstIndex()
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true})
sm.prs.Progress[2].Next = sm.raftLog.firstIndex()
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.Progress[2].Next - 1, Reject: true})
msgs := sm.readMessages()
if len(msgs) != 1 {
@ -3152,8 +2941,8 @@ func TestIgnoreProvidingSnap(t *testing.T) {
// force set the next of node 2, so that node 2 needs a snapshot
// change node 2 to be inactive, expect node 1 ignore sending snapshot to 2
sm.prs.prs[2].Next = sm.raftLog.firstIndex() - 1
sm.prs.prs[2].RecentActive = false
sm.prs.Progress[2].Next = sm.raftLog.firstIndex() - 1
sm.prs.Progress[2].RecentActive = false
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
@ -3193,7 +2982,7 @@ func TestSlowNodeRestore(t *testing.T) {
}
lead := nt.peers[1].(*raft)
nextEnts(lead, nt.storage[1])
nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.voterNodes()}, nil)
nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.VoterNodes()}, nil)
nt.storage[1].Compact(lead.raftLog.applied)
nt.recover()
@ -3201,7 +2990,7 @@ func TestSlowNodeRestore(t *testing.T) {
// node 3 will only be considered as active when node 1 receives a reply from it.
for {
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
if lead.prs.prs[3].RecentActive {
if lead.prs.Progress[3].RecentActive {
break
}
}
@ -3288,7 +3077,7 @@ func TestNewLeaderPendingConfig(t *testing.T) {
func TestAddNode(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.addNode(2)
nodes := r.prs.voterNodes()
nodes := r.prs.VoterNodes()
wnodes := []uint64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) {
t.Errorf("nodes = %v, want %v", nodes, wnodes)
@ -3299,13 +3088,13 @@ func TestAddNode(t *testing.T) {
func TestAddLearner(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.addLearner(2)
nodes := r.prs.learnerNodes()
nodes := r.prs.LearnerNodes()
wnodes := []uint64{2}
if !reflect.DeepEqual(nodes, wnodes) {
t.Errorf("nodes = %v, want %v", nodes, wnodes)
}
if !r.prs.prs[2].IsLearner {
t.Errorf("node 2 is learner %t, want %t", r.prs.prs[2].IsLearner, true)
if !r.prs.Progress[2].IsLearner {
t.Errorf("node 2 is learner %t, want %t", r.prs.Progress[2].IsLearner, true)
}
}
@ -3349,14 +3138,14 @@ func TestRemoveNode(t *testing.T) {
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.removeNode(2)
w := []uint64{1}
if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) {
if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
// remove all nodes from cluster
r.removeNode(1)
w = []uint64{}
if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) {
if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
}
@ -3367,18 +3156,18 @@ func TestRemoveLearner(t *testing.T) {
r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
r.removeNode(2)
w := []uint64{1}
if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) {
if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
w = []uint64{}
if g := r.prs.learnerNodes(); !reflect.DeepEqual(g, w) {
if g := r.prs.LearnerNodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
// remove all nodes from cluster
r.removeNode(1)
if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) {
if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
}
@ -3417,8 +3206,8 @@ func TestRaftNodes(t *testing.T) {
}
for i, tt := range tests {
r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage())
if !reflect.DeepEqual(r.prs.voterNodes(), tt.wids) {
t.Errorf("#%d: nodes = %+v, want %+v", i, r.prs.voterNodes(), tt.wids)
if !reflect.DeepEqual(r.prs.VoterNodes(), tt.wids) {
t.Errorf("#%d: nodes = %+v, want %+v", i, r.prs.VoterNodes(), tt.wids)
}
}
}
@ -3619,8 +3408,8 @@ func TestLeaderTransferToSlowFollower(t *testing.T) {
nt.recover()
lead := nt.peers[1].(*raft)
if lead.prs.prs[3].Match != 1 {
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1)
if lead.prs.Progress[3].Match != 1 {
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1)
}
// Transfer leadership to 3 when node 3 is lack of log.
@ -3638,12 +3427,12 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) {
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
lead := nt.peers[1].(*raft)
nextEnts(lead, nt.storage[1])
nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.voterNodes()}, nil)
nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.VoterNodes()}, nil)
nt.storage[1].Compact(lead.raftLog.applied)
nt.recover()
if lead.prs.prs[3].Match != 1 {
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1)
if lead.prs.Progress[3].Match != 1 {
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1)
}
// Transfer leadership to 3 when node 3 is lack of snapshot.
@ -3722,8 +3511,8 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) {
t.Fatalf("should return drop proposal error while transferring")
}
if lead.prs.prs[1].Match != 1 {
t.Fatalf("node 1 has match %x, want %x", lead.prs.prs[1].Match, 1)
if lead.prs.Progress[1].Match != 1 {
t.Fatalf("node 1 has match %x, want %x", lead.prs.Progress[1].Match, 1)
}
}
@ -4329,24 +4118,21 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
sm := newRaft(cfg)
npeers[id] = sm
case *raft:
learners := make(map[uint64]bool, len(v.prs.learners))
for i := range v.prs.learners {
learners := make(map[uint64]bool, len(v.prs.Learners))
for i := range v.prs.Learners {
learners[i] = true
}
v.id = id
v.prs.voters[0] = make(map[uint64]struct{})
v.prs.voters[1] = make(map[uint64]struct{})
v.prs.learners = make(map[uint64]struct{})
v.prs.prs = make(map[uint64]*Progress)
v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight)
for i := 0; i < size; i++ {
pr := &Progress{}
pr := &tracker.Progress{}
if _, ok := learners[peerAddrs[i]]; ok {
pr.IsLearner = true
v.prs.learners[peerAddrs[i]] = struct{}{}
v.prs.Learners[peerAddrs[i]] = struct{}{}
} else {
v.prs.voters[0][peerAddrs[i]] = struct{}{}
v.prs.Voters[0][peerAddrs[i]] = struct{}{}
}
v.prs.prs[peerAddrs[i]] = pr
v.prs.Progress[peerAddrs[i]] = pr
}
v.reset(v.Term)
npeers[id] = v

View File

@ -18,6 +18,7 @@ import (
"errors"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
)
// ErrStepLocalMsg is returned when try to step a local raft message
@ -166,7 +167,7 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
// ApplyConfChange applies a config change to the local node.
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
if cc.NodeID == None {
return &pb.ConfState{Nodes: rn.raft.prs.voterNodes(), Learners: rn.raft.prs.learnerNodes()}
return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()}
}
switch cc.Type {
case pb.ConfChangeAddNode:
@ -179,7 +180,7 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
default:
panic("unexpected conf type")
}
return &pb.ConfState{Nodes: rn.raft.prs.voterNodes(), Learners: rn.raft.prs.learnerNodes()}
return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()}
}
// Step advances the state machine using the given message.
@ -188,7 +189,7 @@ func (rn *RawNode) Step(m pb.Message) error {
if IsLocalMsg(m.Type) {
return ErrStepLocalMsg
}
if pr := rn.raft.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
if pr := rn.raft.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
return rn.raft.Step(m)
}
return ErrStepPeerNotFound
@ -256,14 +257,14 @@ const (
// WithProgress is a helper to introspect the Progress for this node and its
// peers.
func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) {
rn.raft.prs.visit(func(id uint64, pr *Progress) {
func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr tracker.Progress)) {
rn.raft.prs.Visit(func(id uint64, pr *tracker.Progress) {
typ := ProgressTypePeer
if pr.IsLearner {
typ = ProgressTypeLearner
}
p := *pr
p.ins = nil
p.Inflights = nil
visitor(id, typ, p)
})
}

View File

@ -22,6 +22,7 @@ import (
"testing"
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
)
// rawNodeAdapter is essentially a lint that makes sure that RawNode implements
@ -638,7 +639,7 @@ func BenchmarkStatusProgress(b *testing.B) {
b.Run("WithProgress", func(b *testing.B) {
b.ReportAllocs()
visit := func(uint64, ProgressType, Progress) {}
visit := func(uint64, ProgressType, tracker.Progress) {}
for i := 0; i < b.N; i++ {
rn.WithProgress(visit)
@ -648,7 +649,7 @@ func BenchmarkStatusProgress(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
var n uint64
visit := func(_ uint64, _ ProgressType, pr Progress) {
visit := func(_ uint64, _ ProgressType, pr tracker.Progress) {
n += pr.Match
}
rn.WithProgress(visit)

View File

@ -18,6 +18,7 @@ import (
"fmt"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
)
type Status struct {
@ -27,20 +28,20 @@ type Status struct {
SoftState
Applied uint64
Progress map[uint64]Progress
Progress map[uint64]tracker.Progress
LeadTransferee uint64
}
func getProgressCopy(r *raft) map[uint64]Progress {
m := make(map[uint64]Progress)
r.prs.visit(func(id uint64, pr *Progress) {
var p Progress
func getProgressCopy(r *raft) map[uint64]tracker.Progress {
m := make(map[uint64]tracker.Progress)
r.prs.Visit(func(id uint64, pr *tracker.Progress) {
var p tracker.Progress
p, pr = *pr, nil /* avoid accidental reuse below */
// The inflight buffer is tricky to copy and besides, it isn't exposed
// to the client, so pretend it's nil.
p.ins = nil
p.Inflights = nil
m[id] = p
})

124
raft/tracker/inflights.go Normal file
View File

@ -0,0 +1,124 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracker
// Inflights limits the number of MsgApp (represented by the largest index
// contained within) sent to followers but not yet acknowledged by them. Callers
// use Full() to check whether more messages can be sent, call Add() whenever
// they are sending a new append, and release "quota" via FreeLE() whenever an
// ack is received.
type Inflights struct {
// the starting index in the buffer
start int
// number of inflights in the buffer
count int
// the size of the buffer
size int
// buffer contains the index of the last entry
// inside one message.
buffer []uint64
}
// NewInflights sets up an Inflights that allows up to 'size' inflight messages.
func NewInflights(size int) *Inflights {
return &Inflights{
size: size,
}
}
// Add notifies the Inflights that a new message with the given index is being
// dispatched. Full() must be called prior to Add() to verify that there is room
// for one more message, and consecutive calls to add Add() must provide a
// monotonic sequence of indexes.
func (in *Inflights) Add(inflight uint64) {
if in.Full() {
panic("cannot add into a Full inflights")
}
next := in.start + in.count
size := in.size
if next >= size {
next -= size
}
if next >= len(in.buffer) {
in.grow()
}
in.buffer[next] = inflight
in.count++
}
// grow the inflight buffer by doubling up to inflights.size. We grow on demand
// instead of preallocating to inflights.size to handle systems which have
// thousands of Raft groups per process.
func (in *Inflights) grow() {
newSize := len(in.buffer) * 2
if newSize == 0 {
newSize = 1
} else if newSize > in.size {
newSize = in.size
}
newBuffer := make([]uint64, newSize)
copy(newBuffer, in.buffer)
in.buffer = newBuffer
}
// FreeLE frees the inflights smaller or equal to the given `to` flight.
func (in *Inflights) FreeLE(to uint64) {
if in.count == 0 || to < in.buffer[in.start] {
// out of the left side of the window
return
}
idx := in.start
var i int
for i = 0; i < in.count; i++ {
if to < in.buffer[idx] { // found the first large inflight
break
}
// increase index and maybe rotate
size := in.size
if idx++; idx >= size {
idx -= size
}
}
// free i inflights and set new start index
in.count -= i
in.start = idx
if in.count == 0 {
// inflights is empty, reset the start index so that we don't grow the
// buffer unnecessarily.
in.start = 0
}
}
// FreeFirstOne releases the first inflight. This is a no-op if nothing is
// inflight.
func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) }
// Full returns true if no more messages can be sent at the moment.
func (in *Inflights) Full() bool {
return in.count == in.size
}
// Count returns the number of inflight messages.
func (in *Inflights) Count() int { return in.count }
// reset frees all inflights.
func (in *Inflights) reset() {
in.count = 0
in.start = 0
}

View File

@ -1,4 +1,4 @@
// Copyright 2015 The etcd Authors
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package raft
package tracker
import (
"reflect"
@ -21,16 +21,16 @@ import (
func TestInflightsAdd(t *testing.T) {
// no rotating case
in := &inflights{
in := &Inflights{
size: 10,
buffer: make([]uint64, 10),
}
for i := 0; i < 5; i++ {
in.add(uint64(i))
in.Add(uint64(i))
}
wantIn := &inflights{
wantIn := &Inflights{
start: 0,
count: 5,
size: 10,
@ -43,10 +43,10 @@ func TestInflightsAdd(t *testing.T) {
}
for i := 5; i < 10; i++ {
in.add(uint64(i))
in.Add(uint64(i))
}
wantIn2 := &inflights{
wantIn2 := &Inflights{
start: 0,
count: 10,
size: 10,
@ -59,17 +59,17 @@ func TestInflightsAdd(t *testing.T) {
}
// rotating case
in2 := &inflights{
in2 := &Inflights{
start: 5,
size: 10,
buffer: make([]uint64, 10),
}
for i := 0; i < 5; i++ {
in2.add(uint64(i))
in2.Add(uint64(i))
}
wantIn21 := &inflights{
wantIn21 := &Inflights{
start: 5,
count: 5,
size: 10,
@ -82,10 +82,10 @@ func TestInflightsAdd(t *testing.T) {
}
for i := 5; i < 10; i++ {
in2.add(uint64(i))
in2.Add(uint64(i))
}
wantIn22 := &inflights{
wantIn22 := &Inflights{
start: 5,
count: 10,
size: 10,
@ -100,14 +100,14 @@ func TestInflightsAdd(t *testing.T) {
func TestInflightFreeTo(t *testing.T) {
// no rotating case
in := newInflights(10)
in := NewInflights(10)
for i := 0; i < 10; i++ {
in.add(uint64(i))
in.Add(uint64(i))
}
in.freeTo(4)
in.FreeLE(4)
wantIn := &inflights{
wantIn := &Inflights{
start: 5,
count: 5,
size: 10,
@ -119,9 +119,9 @@ func TestInflightFreeTo(t *testing.T) {
t.Fatalf("in = %+v, want %+v", in, wantIn)
}
in.freeTo(8)
in.FreeLE(8)
wantIn2 := &inflights{
wantIn2 := &Inflights{
start: 9,
count: 1,
size: 10,
@ -135,12 +135,12 @@ func TestInflightFreeTo(t *testing.T) {
// rotating case
for i := 10; i < 15; i++ {
in.add(uint64(i))
in.Add(uint64(i))
}
in.freeTo(12)
in.FreeLE(12)
wantIn3 := &inflights{
wantIn3 := &Inflights{
start: 3,
count: 2,
size: 10,
@ -152,9 +152,9 @@ func TestInflightFreeTo(t *testing.T) {
t.Fatalf("in = %+v, want %+v", in, wantIn3)
}
in.freeTo(14)
in.FreeLE(14)
wantIn4 := &inflights{
wantIn4 := &Inflights{
start: 0,
count: 0,
size: 10,
@ -168,14 +168,14 @@ func TestInflightFreeTo(t *testing.T) {
}
func TestInflightFreeFirstOne(t *testing.T) {
in := newInflights(10)
in := NewInflights(10)
for i := 0; i < 10; i++ {
in.add(uint64(i))
in.Add(uint64(i))
}
in.freeFirstOne()
in.FreeFirstOne()
wantIn := &inflights{
wantIn := &Inflights{
start: 1,
count: 9,
size: 10,

215
raft/tracker/progress.go Normal file
View File

@ -0,0 +1,215 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracker
import "fmt"
// Progress represents a followers progress in the view of the leader. Leader
// maintains progresses of all followers, and sends entries to the follower
// based on its progress.
//
// NB(tbg): Progress is basically a state machine whose transitions are mostly
// strewn around `*raft.raft`. Additionally, some fields are only used when in a
// certain State. All of this isn't ideal.
type Progress struct {
Match, Next uint64
// State defines how the leader should interact with the follower.
//
// When in StateProbe, leader sends at most one replication message
// per heartbeat interval. It also probes actual progress of the follower.
//
// When in StateReplicate, leader optimistically increases next
// to the latest entry sent after sending replication message. This is
// an optimized state for fast replicating log entries to the follower.
//
// When in StateSnapshot, leader should have sent out snapshot
// before and stops sending any replication message.
State StateType
// PendingSnapshot is used in StateSnapshot.
// If there is a pending snapshot, the pendingSnapshot will be set to the
// index of the snapshot. If pendingSnapshot is set, the replication process of
// this Progress will be paused. raft will not resend snapshot until the pending one
// is reported to be failed.
PendingSnapshot uint64
// RecentActive is true if the progress is recently active. Receiving any messages
// from the corresponding follower indicates the progress is active.
// RecentActive can be reset to false after an election timeout.
RecentActive bool
// ProbeSent is used while this follow is in StateProbe. When ProbeSent is
// true, raft should pause sending replication message to this peer until
// ProbeSent is reset. See ProbeAcked() and IsPaused().
ProbeSent bool
// Inflights is a sliding window for the inflight messages.
// Each inflight message contains one or more log entries.
// The max number of entries per message is defined in raft config as MaxSizePerMsg.
// Thus inflight effectively limits both the number of inflight messages
// and the bandwidth each Progress can use.
// When inflights is Full, no more message should be sent.
// When a leader sends out a message, the index of the last
// entry should be added to inflights. The index MUST be added
// into inflights in order.
// When a leader receives a reply, the previous inflights should
// be freed by calling inflights.FreeLE with the index of the last
// received entry.
Inflights *Inflights
// IsLearner is true if this progress is tracked for a learner.
IsLearner bool
}
// ResetState moves the Progress into the specified State, resetting ProbeSent,
// PendingSnapshot, and Inflights.
func (pr *Progress) ResetState(state StateType) {
pr.ProbeSent = false
pr.PendingSnapshot = 0
pr.State = state
pr.Inflights.reset()
}
func max(a, b uint64) uint64 {
if a > b {
return a
}
return b
}
func min(a, b uint64) uint64 {
if a > b {
return b
}
return a
}
// ProbeAcked is called when this peer has accepted an append. It resets
// ProbeSent to signal that additional append messages should be sent without
// further delay.
func (pr *Progress) ProbeAcked() {
pr.ProbeSent = false
}
// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
// optionally and if larger, the index of the pending snapshot.
func (pr *Progress) BecomeProbe() {
// If the original state is StateSnapshot, progress knows that
// the pending snapshot has been sent to this peer successfully, then
// probes from pendingSnapshot + 1.
if pr.State == StateSnapshot {
pendingSnapshot := pr.PendingSnapshot
pr.ResetState(StateProbe)
pr.Next = max(pr.Match+1, pendingSnapshot+1)
} else {
pr.ResetState(StateProbe)
pr.Next = pr.Match + 1
}
}
// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
func (pr *Progress) BecomeReplicate() {
pr.ResetState(StateReplicate)
pr.Next = pr.Match + 1
}
// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
// snapshot index.
func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
pr.ResetState(StateSnapshot)
pr.PendingSnapshot = snapshoti
}
// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
// index acked by it. The method returns false if the given n index comes from
// an outdated message. Otherwise it updates the progress and returns true.
func (pr *Progress) MaybeUpdate(n uint64) bool {
var updated bool
if pr.Match < n {
pr.Match = n
updated = true
pr.ProbeAcked()
}
if pr.Next < n+1 {
pr.Next = n + 1
}
return updated
}
// OptimisticUpdate signals that appends all the way up to and including index n
// are in-flight. As a result, Next is increased to n+1.
func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
// arguments are the index the follower rejected to append to its log, and its
// last index.
//
// Rejections can happen spuriously as messages are sent out of order or
// duplicated. In such cases, the rejection pertains to an index that the
// Progress already knows were previously acknowledged, and false is returned
// without changing the Progress.
//
// If the rejection is genuine, Next is lowered sensibly, and the Progress is
// cleared for sending log entries.
func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
if pr.State == StateReplicate {
// The rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= pr.Match {
return false
}
// Directly decrease next to match + 1.
//
// TODO(tbg): why not use last if it's larger?
pr.Next = pr.Match + 1
return true
}
// The rejection must be stale if "rejected" does not match next - 1. This
// is because non-replicating followers are probed one entry at a time.
if pr.Next-1 != rejected {
return false
}
if pr.Next = min(rejected, last+1); pr.Next < 1 {
pr.Next = 1
}
pr.ProbeSent = false
return true
}
// IsPaused returns whether sending log entries to this node has been throttled.
// This is done when a node has rejected recent MsgApps, is currently waiting
// for a snapshot, or has reached the MaxInflightMsgs limit. In normal
// operation, this is false. A throttled node will be contacted less frequently
// until it has reached a state in which it's able to accept a steady stream of
// log entries again.
func (pr *Progress) IsPaused() bool {
switch pr.State {
case StateProbe:
return pr.ProbeSent
case StateReplicate:
return pr.Inflights.Full()
case StateSnapshot:
return true
default:
panic("unexpected state")
}
}
func (pr *Progress) String() string {
return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d, recentActive = %v, isLearner = %v",
pr.Next, pr.Match, pr.State, pr.IsPaused(), pr.PendingSnapshot, pr.RecentActive, pr.IsLearner)
}

View File

@ -0,0 +1,231 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracker
import (
"testing"
)
func TestProgressIsPaused(t *testing.T) {
tests := []struct {
state StateType
paused bool
w bool
}{
{StateProbe, false, false},
{StateProbe, true, true},
{StateReplicate, false, false},
{StateReplicate, true, false},
{StateSnapshot, false, true},
{StateSnapshot, true, true},
}
for i, tt := range tests {
p := &Progress{
State: tt.state,
ProbeSent: tt.paused,
Inflights: NewInflights(256),
}
if g := p.IsPaused(); g != tt.w {
t.Errorf("#%d: paused= %t, want %t", i, g, tt.w)
}
}
}
// TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset
// ProbeSent.
func TestProgressResume(t *testing.T) {
p := &Progress{
Next: 2,
ProbeSent: true,
}
p.MaybeDecrTo(1, 1)
if p.ProbeSent {
t.Errorf("paused= %v, want false", p.ProbeSent)
}
p.ProbeSent = true
p.MaybeUpdate(2)
if p.ProbeSent {
t.Errorf("paused= %v, want false", p.ProbeSent)
}
}
func TestProgressBecomeProbe(t *testing.T) {
match := uint64(1)
tests := []struct {
p *Progress
wnext uint64
}{
{
&Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256)},
2,
},
{
// snapshot finish
&Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256)},
11,
},
{
// snapshot failure
&Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256)},
2,
},
}
for i, tt := range tests {
tt.p.BecomeProbe()
if tt.p.State != StateProbe {
t.Errorf("#%d: state = %s, want %s", i, tt.p.State, StateProbe)
}
if tt.p.Match != match {
t.Errorf("#%d: match = %d, want %d", i, tt.p.Match, match)
}
if tt.p.Next != tt.wnext {
t.Errorf("#%d: next = %d, want %d", i, tt.p.Next, tt.wnext)
}
}
}
func TestProgressBecomeReplicate(t *testing.T) {
p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)}
p.BecomeReplicate()
if p.State != StateReplicate {
t.Errorf("state = %s, want %s", p.State, StateReplicate)
}
if p.Match != 1 {
t.Errorf("match = %d, want 1", p.Match)
}
if w := p.Match + 1; p.Next != w {
t.Errorf("next = %d, want %d", p.Next, w)
}
}
func TestProgressBecomeSnapshot(t *testing.T) {
p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)}
p.BecomeSnapshot(10)
if p.State != StateSnapshot {
t.Errorf("state = %s, want %s", p.State, StateSnapshot)
}
if p.Match != 1 {
t.Errorf("match = %d, want 1", p.Match)
}
if p.PendingSnapshot != 10 {
t.Errorf("pendingSnapshot = %d, want 10", p.PendingSnapshot)
}
}
func TestProgressUpdate(t *testing.T) {
prevM, prevN := uint64(3), uint64(5)
tests := []struct {
update uint64
wm uint64
wn uint64
wok bool
}{
{prevM - 1, prevM, prevN, false}, // do not decrease match, next
{prevM, prevM, prevN, false}, // do not decrease next
{prevM + 1, prevM + 1, prevN, true}, // increase match, do not decrease next
{prevM + 2, prevM + 2, prevN + 1, true}, // increase match, next
}
for i, tt := range tests {
p := &Progress{
Match: prevM,
Next: prevN,
}
ok := p.MaybeUpdate(tt.update)
if ok != tt.wok {
t.Errorf("#%d: ok= %v, want %v", i, ok, tt.wok)
}
if p.Match != tt.wm {
t.Errorf("#%d: match= %d, want %d", i, p.Match, tt.wm)
}
if p.Next != tt.wn {
t.Errorf("#%d: next= %d, want %d", i, p.Next, tt.wn)
}
}
}
func TestProgressMaybeDecr(t *testing.T) {
tests := []struct {
state StateType
m uint64
n uint64
rejected uint64
last uint64
w bool
wn uint64
}{
{
// state replicate and rejected is not greater than match
StateReplicate, 5, 10, 5, 5, false, 10,
},
{
// state replicate and rejected is not greater than match
StateReplicate, 5, 10, 4, 4, false, 10,
},
{
// state replicate and rejected is greater than match
// directly decrease to match+1
StateReplicate, 5, 10, 9, 9, true, 6,
},
{
// next-1 != rejected is always false
StateProbe, 0, 0, 0, 0, false, 0,
},
{
// next-1 != rejected is always false
StateProbe, 0, 10, 5, 5, false, 10,
},
{
// next>1 = decremented by 1
StateProbe, 0, 10, 9, 9, true, 9,
},
{
// next>1 = decremented by 1
StateProbe, 0, 2, 1, 1, true, 1,
},
{
// next<=1 = reset to 1
StateProbe, 0, 1, 0, 0, true, 1,
},
{
// decrease to min(rejected, last+1)
StateProbe, 0, 10, 9, 2, true, 3,
},
{
// rejected < 1, reset to 1
StateProbe, 0, 10, 9, 0, true, 1,
},
}
for i, tt := range tests {
p := &Progress{
State: tt.state,
Match: tt.m,
Next: tt.n,
}
if g := p.MaybeDecrTo(tt.rejected, tt.last); g != tt.w {
t.Errorf("#%d: maybeDecrTo= %t, want %t", i, g, tt.w)
}
if gm := p.Match; gm != tt.m {
t.Errorf("#%d: match= %d, want %d", i, gm, tt.m)
}
if gn := p.Next; gn != tt.wn {
t.Errorf("#%d: next= %d, want %d", i, gn, tt.wn)
}
}
}

42
raft/tracker/state.go Normal file
View File

@ -0,0 +1,42 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracker
// StateType is the state of a tracked follower.
type StateType uint64
const (
// StateProbe indicates a follower whose last index isn't known. Such a
// follower is "probed" (i.e. an append sent periodically) to narrow down
// its last index. In the ideal (and common) case, only one round of probing
// is necessary as the follower will react with a hint. Followers that are
// probed over extended periods of time are often offline.
StateProbe StateType = iota
// StateReplicate is the state steady in which a follower eagerly receives
// log entries to append to its log.
StateReplicate
// StateSnapshot indicates a follower that needs log entries not available
// from the leader's Raft log. Such a follower needs a full snapshot to
// return to StateReplicate.
StateSnapshot
)
var prstmap = [...]string{
"StateProbe",
"StateReplicate",
"StateSnapshot",
}
func (st StateType) String() string { return prstmap[uint64(st)] }

195
raft/tracker/tracker.go Normal file
View File

@ -0,0 +1,195 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracker
import (
"fmt"
"sort"
"go.etcd.io/etcd/raft/quorum"
)
// ProgressTracker tracks the currently active configuration and the information
// known about the nodes and learners in it. In particular, it tracks the match
// index for each peer which in turn allows reasoning about the committed index.
type ProgressTracker struct {
Voters quorum.JointConfig
Learners map[uint64]struct{}
Progress map[uint64]*Progress
Votes map[uint64]bool
MaxInflight int
}
// MakeProgressTracker initializes a ProgressTracker.
func MakeProgressTracker(maxInflight int) ProgressTracker {
p := ProgressTracker{
MaxInflight: maxInflight,
Voters: quorum.JointConfig{
quorum.MajorityConfig{},
quorum.MajorityConfig{},
},
Learners: map[uint64]struct{}{},
Votes: map[uint64]bool{},
Progress: map[uint64]*Progress{},
}
return p
}
// IsSingleton returns true if (and only if) there is only one voting member
// (i.e. the leader) in the current configuration.
func (p *ProgressTracker) IsSingleton() bool {
return len(p.Voters[0]) == 1 && len(p.Voters[1]) == 0
}
type matchAckIndexer map[uint64]*Progress
var _ quorum.AckedIndexer = matchAckIndexer(nil)
// AckedIndex implements IndexLookuper.
func (l matchAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) {
pr, ok := l[id]
if !ok {
return 0, false
}
return quorum.Index(pr.Match), true
}
// Committed returns the largest log index known to be committed based on what
// the voting members of the group have acknowledged.
func (p *ProgressTracker) Committed() uint64 {
return uint64(p.Voters.CommittedIndex(matchAckIndexer(p.Progress)))
}
// RemoveAny removes this peer, which *must* be tracked as a voter or learner,
// from the tracker.
func (p *ProgressTracker) RemoveAny(id uint64) {
_, okPR := p.Progress[id]
_, okV1 := p.Voters[0][id]
_, okV2 := p.Voters[1][id]
_, okL := p.Learners[id]
okV := okV1 || okV2
if !okPR {
panic("attempting to remove unknown peer %x")
} else if !okV && !okL {
panic("attempting to remove unknown peer %x")
} else if okV && okL {
panic(fmt.Sprintf("peer %x is both voter and learner", id))
}
delete(p.Voters[0], id)
delete(p.Voters[1], id)
delete(p.Learners, id)
delete(p.Progress, id)
}
// InitProgress initializes a new progress for the given node or learner. The
// node may not exist yet in either form or a panic will ensue.
func (p *ProgressTracker) InitProgress(id, match, next uint64, isLearner bool) {
if pr := p.Progress[id]; pr != nil {
panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
}
if !isLearner {
p.Voters[0][id] = struct{}{}
} else {
p.Learners[id] = struct{}{}
}
p.Progress[id] = &Progress{Next: next, Match: match, Inflights: NewInflights(p.MaxInflight), IsLearner: isLearner}
}
// Visit invokes the supplied closure for all tracked progresses.
func (p *ProgressTracker) Visit(f func(id uint64, pr *Progress)) {
for id, pr := range p.Progress {
f(id, pr)
}
}
// QuorumActive returns true if the quorum is active from the view of the local
// raft state machine. Otherwise, it returns false.
func (p *ProgressTracker) QuorumActive() bool {
votes := map[uint64]bool{}
p.Visit(func(id uint64, pr *Progress) {
if pr.IsLearner {
return
}
votes[id] = pr.RecentActive
})
return p.Voters.VoteResult(votes) == quorum.VoteWon
}
// VoterNodes returns a sorted slice of voters.
func (p *ProgressTracker) VoterNodes() []uint64 {
m := p.Voters.IDs()
nodes := make([]uint64, 0, len(m))
for id := range m {
nodes = append(nodes, id)
}
sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })
return nodes
}
// LearnerNodes returns a sorted slice of learners.
func (p *ProgressTracker) LearnerNodes() []uint64 {
nodes := make([]uint64, 0, len(p.Learners))
for id := range p.Learners {
nodes = append(nodes, id)
}
sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })
return nodes
}
// ResetVotes prepares for a new round of vote counting via recordVote.
func (p *ProgressTracker) ResetVotes() {
p.Votes = map[uint64]bool{}
}
// RecordVote records that the node with the given id voted for this Raft
// instance if v == true (and declined it otherwise).
func (p *ProgressTracker) RecordVote(id uint64, v bool) {
_, ok := p.Votes[id]
if !ok {
p.Votes[id] = v
}
}
// TallyVotes returns the number of granted and rejected Votes, and whether the
// election outcome is known.
func (p *ProgressTracker) TallyVotes() (granted int, rejected int, _ quorum.VoteResult) {
// Make sure to populate granted/rejected correctly even if the Votes slice
// contains members no longer part of the configuration. This doesn't really
// matter in the way the numbers are used (they're informational), but might
// as well get it right.
for id, pr := range p.Progress {
if pr.IsLearner {
continue
}
v, voted := p.Votes[id]
if !voted {
continue
}
if v {
granted++
} else {
rejected++
}
}
result := p.Voters.VoteResult(p.Votes)
return granted, rejected, result
}