From f9c2d00fb3d0a63cf3a7d8c124ad87cd5d826253 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Sat, 8 Jun 2019 19:41:24 +0200 Subject: [PATCH] raft: extract 'tracker' package Mechanically extract `progressTracker`, `Progress`, and `inflights` to their own package named `tracker`. Add lots of comments in the progress, and take the opportunity to rename and clarify various fields. --- etcdserver/raft.go | 1 - raft/node.go | 10 +- raft/progress.go | 457 ------------------ raft/raft.go | 143 +++--- raft/raft_flow_control_test.go | 40 +- raft/raft_paper_test.go | 2 +- raft/raft_snap_test.go | 63 +-- raft/raft_test.go | 376 ++++---------- raft/rawnode.go | 13 +- raft/rawnode_test.go | 5 +- raft/status.go | 13 +- raft/tracker/inflights.go | 124 +++++ .../inflights_test.go} | 54 +-- raft/tracker/progress.go | 215 ++++++++ raft/tracker/progress_test.go | 231 +++++++++ raft/tracker/state.go | 42 ++ raft/tracker/tracker.go | 195 ++++++++ 17 files changed, 1065 insertions(+), 919 deletions(-) delete mode 100644 raft/progress.go create mode 100644 raft/tracker/inflights.go rename raft/{progress_test.go => tracker/inflights_test.go} (84%) create mode 100644 raft/tracker/progress.go create mode 100644 raft/tracker/progress_test.go create mode 100644 raft/tracker/state.go create mode 100644 raft/tracker/tracker.go diff --git a/etcdserver/raft.go b/etcdserver/raft.go index c03d2a7b8..6f42c3504 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -34,7 +34,6 @@ import ( "go.etcd.io/etcd/raft/raftpb" "go.etcd.io/etcd/wal" "go.etcd.io/etcd/wal/walpb" - "go.uber.org/zap" ) diff --git a/raft/node.go b/raft/node.go index 396007df9..745553c4f 100644 --- a/raft/node.go +++ b/raft/node.go @@ -353,15 +353,15 @@ func (n *node) run(r *raft) { } case m := <-n.recvc: // filter out response message from unknown From. 
- if pr := r.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { + if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) { r.Step(m) } case cc := <-n.confc: if cc.NodeID == None { select { case n.confstatec <- pb.ConfState{ - Nodes: r.prs.voterNodes(), - Learners: r.prs.learnerNodes()}: + Nodes: r.prs.VoterNodes(), + Learners: r.prs.LearnerNodes()}: case <-n.done: } break @@ -384,8 +384,8 @@ func (n *node) run(r *raft) { } select { case n.confstatec <- pb.ConfState{ - Nodes: r.prs.voterNodes(), - Learners: r.prs.learnerNodes()}: + Nodes: r.prs.VoterNodes(), + Learners: r.prs.LearnerNodes()}: case <-n.done: } case <-n.tickc: diff --git a/raft/progress.go b/raft/progress.go deleted file mode 100644 index 376635624..000000000 --- a/raft/progress.go +++ /dev/null @@ -1,457 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package raft - -import ( - "fmt" - "sort" - - "go.etcd.io/etcd/raft/quorum" -) - -const ( - ProgressStateProbe ProgressStateType = iota - ProgressStateReplicate - ProgressStateSnapshot -) - -type ProgressStateType uint64 - -var prstmap = [...]string{ - "ProgressStateProbe", - "ProgressStateReplicate", - "ProgressStateSnapshot", -} - -func (st ProgressStateType) String() string { return prstmap[uint64(st)] } - -// Progress represents a follower’s progress in the view of the leader. 
Leader maintains -// progresses of all followers, and sends entries to the follower based on its progress. -type Progress struct { - Match, Next uint64 - // State defines how the leader should interact with the follower. - // - // When in ProgressStateProbe, leader sends at most one replication message - // per heartbeat interval. It also probes actual progress of the follower. - // - // When in ProgressStateReplicate, leader optimistically increases next - // to the latest entry sent after sending replication message. This is - // an optimized state for fast replicating log entries to the follower. - // - // When in ProgressStateSnapshot, leader should have sent out snapshot - // before and stops sending any replication message. - State ProgressStateType - - // Paused is used in ProgressStateProbe. - // When Paused is true, raft should pause sending replication message to this peer. - Paused bool - // PendingSnapshot is used in ProgressStateSnapshot. - // If there is a pending snapshot, the pendingSnapshot will be set to the - // index of the snapshot. If pendingSnapshot is set, the replication process of - // this Progress will be paused. raft will not resend snapshot until the pending one - // is reported to be failed. - PendingSnapshot uint64 - - // RecentActive is true if the progress is recently active. Receiving any messages - // from the corresponding follower indicates the progress is active. - // RecentActive can be reset to false after an election timeout. - RecentActive bool - - // inflights is a sliding window for the inflight messages. - // Each inflight message contains one or more log entries. - // The max number of entries per message is defined in raft config as MaxSizePerMsg. - // Thus inflight effectively limits both the number of inflight messages - // and the bandwidth each Progress can use. - // When inflights is full, no more message should be sent. 
- // When a leader sends out a message, the index of the last - // entry should be added to inflights. The index MUST be added - // into inflights in order. - // When a leader receives a reply, the previous inflights should - // be freed by calling inflights.freeTo with the index of the last - // received entry. - ins *inflights - - // IsLearner is true if this progress is tracked for a learner. - IsLearner bool -} - -func (pr *Progress) resetState(state ProgressStateType) { - pr.Paused = false - pr.PendingSnapshot = 0 - pr.State = state - pr.ins.reset() -} - -func (pr *Progress) becomeProbe() { - // If the original state is ProgressStateSnapshot, progress knows that - // the pending snapshot has been sent to this peer successfully, then - // probes from pendingSnapshot + 1. - if pr.State == ProgressStateSnapshot { - pendingSnapshot := pr.PendingSnapshot - pr.resetState(ProgressStateProbe) - pr.Next = max(pr.Match+1, pendingSnapshot+1) - } else { - pr.resetState(ProgressStateProbe) - pr.Next = pr.Match + 1 - } -} - -func (pr *Progress) becomeReplicate() { - pr.resetState(ProgressStateReplicate) - pr.Next = pr.Match + 1 -} - -func (pr *Progress) becomeSnapshot(snapshoti uint64) { - pr.resetState(ProgressStateSnapshot) - pr.PendingSnapshot = snapshoti -} - -// maybeUpdate returns false if the given n index comes from an outdated message. -// Otherwise it updates the progress and returns true. -func (pr *Progress) maybeUpdate(n uint64) bool { - var updated bool - if pr.Match < n { - pr.Match = n - updated = true - pr.resume() - } - if pr.Next < n+1 { - pr.Next = n + 1 - } - return updated -} - -func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 } - -// maybeDecrTo returns false if the given to index comes from an out of order message. -// Otherwise it decreases the progress next index to min(rejected, last) and returns true. 
-func (pr *Progress) maybeDecrTo(rejected, last uint64) bool { - if pr.State == ProgressStateReplicate { - // the rejection must be stale if the progress has matched and "rejected" - // is smaller than "match". - if rejected <= pr.Match { - return false - } - // directly decrease next to match + 1 - pr.Next = pr.Match + 1 - return true - } - - // the rejection must be stale if "rejected" does not match next - 1 - if pr.Next-1 != rejected { - return false - } - - if pr.Next = min(rejected, last+1); pr.Next < 1 { - pr.Next = 1 - } - pr.resume() - return true -} - -func (pr *Progress) pause() { pr.Paused = true } -func (pr *Progress) resume() { pr.Paused = false } - -// IsPaused returns whether sending log entries to this node has been -// paused. A node may be paused because it has rejected recent -// MsgApps, is currently waiting for a snapshot, or has reached the -// MaxInflightMsgs limit. -func (pr *Progress) IsPaused() bool { - switch pr.State { - case ProgressStateProbe: - return pr.Paused - case ProgressStateReplicate: - return pr.ins.full() - case ProgressStateSnapshot: - return true - default: - panic("unexpected state") - } -} - -func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 } - -// needSnapshotAbort returns true if snapshot progress's Match -// is equal or higher than the pendingSnapshot. -func (pr *Progress) needSnapshotAbort() bool { - return pr.State == ProgressStateSnapshot && pr.Match >= pr.PendingSnapshot -} - -func (pr *Progress) String() string { - return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d, recentActive = %v, isLearner = %v", - pr.Next, pr.Match, pr.State, pr.IsPaused(), pr.PendingSnapshot, pr.RecentActive, pr.IsLearner) -} - -type inflights struct { - // the starting index in the buffer - start int - // number of inflights in the buffer - count int - - // the size of the buffer - size int - - // buffer contains the index of the last entry - // inside one message. 
- buffer []uint64 -} - -func newInflights(size int) *inflights { - return &inflights{ - size: size, - } -} - -// add adds an inflight into inflights -func (in *inflights) add(inflight uint64) { - if in.full() { - panic("cannot add into a full inflights") - } - next := in.start + in.count - size := in.size - if next >= size { - next -= size - } - if next >= len(in.buffer) { - in.growBuf() - } - in.buffer[next] = inflight - in.count++ -} - -// grow the inflight buffer by doubling up to inflights.size. We grow on demand -// instead of preallocating to inflights.size to handle systems which have -// thousands of Raft groups per process. -func (in *inflights) growBuf() { - newSize := len(in.buffer) * 2 - if newSize == 0 { - newSize = 1 - } else if newSize > in.size { - newSize = in.size - } - newBuffer := make([]uint64, newSize) - copy(newBuffer, in.buffer) - in.buffer = newBuffer -} - -// freeTo frees the inflights smaller or equal to the given `to` flight. -func (in *inflights) freeTo(to uint64) { - if in.count == 0 || to < in.buffer[in.start] { - // out of the left side of the window - return - } - - idx := in.start - var i int - for i = 0; i < in.count; i++ { - if to < in.buffer[idx] { // found the first large inflight - break - } - - // increase index and maybe rotate - size := in.size - if idx++; idx >= size { - idx -= size - } - } - // free i inflights and set new start index - in.count -= i - in.start = idx - if in.count == 0 { - // inflights is empty, reset the start index so that we don't grow the - // buffer unnecessarily. - in.start = 0 - } -} - -func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) } - -// full returns true if the inflights is full. -func (in *inflights) full() bool { - return in.count == in.size -} - -// resets frees all inflights. -func (in *inflights) reset() { - in.count = 0 - in.start = 0 -} - -// progressTracker tracks the currently active configuration and the information -// known about the nodes and learners in it. 
In particular, it tracks the match -// index for each peer which in turn allows reasoning about the committed index. -type progressTracker struct { - voters quorum.JointConfig - learners map[uint64]struct{} - prs map[uint64]*Progress - - votes map[uint64]bool - - maxInflight int -} - -func makeProgressTracker(maxInflight int) progressTracker { - p := progressTracker{ - maxInflight: maxInflight, - voters: quorum.JointConfig{ - quorum.MajorityConfig{}, - quorum.MajorityConfig{}, - }, - learners: map[uint64]struct{}{}, - votes: map[uint64]bool{}, - prs: map[uint64]*Progress{}, - } - return p -} - -// isSingleton returns true if (and only if) there is only one voting member -// (i.e. the leader) in the current configuration. -func (p *progressTracker) isSingleton() bool { - return len(p.voters[0]) == 1 && len(p.voters[1]) == 0 -} - -type progressAckIndexer map[uint64]*Progress - -var _ quorum.AckedIndexer = progressAckIndexer(nil) - -func (l progressAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) { - pr, ok := l[id] - if !ok { - return 0, false - } - return quorum.Index(pr.Match), true -} - -// committed returns the largest log index known to be committed based on what -// the voting members of the group have acknowledged. -func (p *progressTracker) committed() uint64 { - return uint64(p.voters.CommittedIndex(progressAckIndexer(p.prs))) -} - -func (p *progressTracker) removeAny(id uint64) { - _, okPR := p.prs[id] - _, okV1 := p.voters[0][id] - _, okV2 := p.voters[1][id] - _, okL := p.learners[id] - - okV := okV1 || okV2 - - if !okPR { - panic("attempting to remove unknown peer %x") - } else if !okV && !okL { - panic("attempting to remove unknown peer %x") - } else if okV && okL { - panic(fmt.Sprintf("peer %x is both voter and learner", id)) - } - - delete(p.voters[0], id) - delete(p.voters[1], id) - delete(p.learners, id) - delete(p.prs, id) -} - -// initProgress initializes a new progress for the given node or learner. 
The -// node may not exist yet in either form or a panic will ensue. -func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) { - if pr := p.prs[id]; pr != nil { - panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr)) - } - if !isLearner { - p.voters[0][id] = struct{}{} - } else { - p.learners[id] = struct{}{} - } - p.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: isLearner} -} - -func (p *progressTracker) getProgress(id uint64) *Progress { - return p.prs[id] -} - -// visit invokes the supplied closure for all tracked progresses. -func (p *progressTracker) visit(f func(id uint64, pr *Progress)) { - for id, pr := range p.prs { - f(id, pr) - } -} - -// checkQuorumActive returns true if the quorum is active from -// the view of the local raft state machine. Otherwise, it returns -// false. -func (p *progressTracker) quorumActive() bool { - votes := map[uint64]bool{} - p.visit(func(id uint64, pr *Progress) { - if pr.IsLearner { - return - } - votes[id] = pr.RecentActive - }) - - return p.voters.VoteResult(votes) == quorum.VoteWon -} - -func (p *progressTracker) voterNodes() []uint64 { - m := p.voters.IDs() - nodes := make([]uint64, 0, len(m)) - for id := range m { - nodes = append(nodes, id) - } - sort.Sort(uint64Slice(nodes)) - return nodes -} - -func (p *progressTracker) learnerNodes() []uint64 { - nodes := make([]uint64, 0, len(p.learners)) - for id := range p.learners { - nodes = append(nodes, id) - } - sort.Sort(uint64Slice(nodes)) - return nodes -} - -// resetVotes prepares for a new round of vote counting via recordVote. -func (p *progressTracker) resetVotes() { - p.votes = map[uint64]bool{} -} - -// recordVote records that the node with the given id voted for this Raft -// instance if v == true (and declined it otherwise). 
-func (p *progressTracker) recordVote(id uint64, v bool) { - _, ok := p.votes[id] - if !ok { - p.votes[id] = v - } -} - -// tallyVotes returns the number of granted and rejected votes, and whether the -// election outcome is known. -func (p *progressTracker) tallyVotes() (granted int, rejected int, _ quorum.VoteResult) { - // Make sure to populate granted/rejected correctly even if the votes slice - // contains members no longer part of the configuration. This doesn't really - // matter in the way the numbers are used (they're informational), but might - // as well get it right. - for id, pr := range p.prs { - if pr.IsLearner { - continue - } - if p.votes[id] { - granted++ - } else { - rejected++ - } - } - result := p.voters.VoteResult(p.votes) - return granted, rejected, result -} diff --git a/raft/raft.go b/raft/raft.go index 06ba6bf12..3cdc1f0ac 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/raft/quorum" pb "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" ) // None is a placeholder node ID used when there is no leader. @@ -261,7 +262,7 @@ type raft struct { maxMsgSize uint64 maxUncommittedSize uint64 - prs progressTracker + prs tracker.ProgressTracker state StateType @@ -344,7 +345,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxUncommittedSize: c.MaxUncommittedEntriesSize, - prs: makeProgressTracker(c.MaxInflightMsgs), + prs: tracker.MakeProgressTracker(c.MaxInflightMsgs), electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -355,11 +356,11 @@ func newRaft(c *Config) *raft { } for _, p := range peers { // Add node to active config. - r.prs.initProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */) + r.prs.InitProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */) } for _, p := range learners { // Add learner to active config. 
- r.prs.initProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */) + r.prs.InitProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */) if r.id == p { r.isLearner = true @@ -375,7 +376,7 @@ func newRaft(c *Config) *raft { r.becomeFollower(r.Term, None) var nodesStrs []string - for _, n := range r.prs.voterNodes() { + for _, n := range r.prs.VoterNodes() { nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n)) } @@ -442,7 +443,7 @@ func (r *raft) sendAppend(to uint64) { // ("empty" messages are useful to convey updated Commit indexes, but // are undesirable when we're sending multiple messages in a batch). func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { - pr := r.prs.getProgress(to) + pr := r.prs.Progress[to] if pr.IsPaused() { return false } @@ -477,7 +478,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) - pr.becomeSnapshot(sindex) + pr.BecomeSnapshot(sindex) r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) } else { m.Type = pb.MsgApp @@ -487,13 +488,13 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { m.Commit = r.raftLog.committed if n := len(m.Entries); n != 0 { switch pr.State { - // optimistically increase the next when in ProgressStateReplicate - case ProgressStateReplicate: + // optimistically increase the next when in StateReplicate + case tracker.StateReplicate: last := m.Entries[n-1].Index - pr.optimisticUpdate(last) - pr.ins.add(last) - case ProgressStateProbe: - pr.pause() + pr.OptimisticUpdate(last) + pr.Inflights.Add(last) + case tracker.StateProbe: + pr.ProbeSent = true default: r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State) } @@ -511,7 +512,7 @@ func (r *raft) sendHeartbeat(to 
uint64, ctx []byte) { // or it might not have all the committed entries. // The leader MUST NOT forward the follower's commit to // an unmatched index. - commit := min(r.prs.getProgress(to).Match, r.raftLog.committed) + commit := min(r.prs.Progress[to].Match, r.raftLog.committed) m := pb.Message{ To: to, Type: pb.MsgHeartbeat, @@ -525,7 +526,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // bcastAppend sends RPC, with entries to all peers that are not up-to-date // according to the progress recorded in r.prs. func (r *raft) bcastAppend() { - r.prs.visit(func(id uint64, _ *Progress) { + r.prs.Visit(func(id uint64, _ *tracker.Progress) { if id == r.id { return } @@ -545,7 +546,7 @@ func (r *raft) bcastHeartbeat() { } func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { - r.prs.visit(func(id uint64, _ *Progress) { + r.prs.Visit(func(id uint64, _ *tracker.Progress) { if id == r.id { return } @@ -557,7 +558,7 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { // the commit index changed (in which case the caller should call // r.bcastAppend). func (r *raft) maybeCommit() bool { - mci := r.prs.committed() + mci := r.prs.Committed() return r.raftLog.maybeCommit(mci, r.Term) } @@ -574,12 +575,12 @@ func (r *raft) reset(term uint64) { r.abortLeaderTransfer() - r.prs.resetVotes() - r.prs.visit(func(id uint64, pr *Progress) { - *pr = Progress{ + r.prs.ResetVotes() + r.prs.Visit(func(id uint64, pr *tracker.Progress) { + *pr = tracker.Progress{ Match: 0, Next: r.raftLog.lastIndex() + 1, - ins: newInflights(r.prs.maxInflight), + Inflights: tracker.NewInflights(r.prs.MaxInflight), IsLearner: pr.IsLearner, } if id == r.id { @@ -609,7 +610,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { } // use latest "last" index after truncate/append li = r.raftLog.append(es...) - r.prs.getProgress(r.id).maybeUpdate(li) + r.prs.Progress[r.id].MaybeUpdate(li) // Regardless of maybeCommit's return, our caller will call bcastAppend. 
r.maybeCommit() return true @@ -682,7 +683,7 @@ func (r *raft) becomePreCandidate() { // but doesn't change anything else. In particular it does not increase // r.Term or change r.Vote. r.step = stepCandidate - r.prs.resetVotes() + r.prs.ResetVotes() r.tick = r.tickElection r.lead = None r.state = StatePreCandidate @@ -703,7 +704,7 @@ func (r *raft) becomeLeader() { // (perhaps after having received a snapshot as a result). The leader is // trivially in this state. Note that r.reset() has initialized this // progress with the last index already. - r.prs.getProgress(r.id).becomeReplicate() + r.prs.Progress[r.id].BecomeReplicate() // Conservatively set the pendingConfIndex to the last index in the // log. There may or may not be a pending config change, but it's @@ -755,7 +756,7 @@ func (r *raft) campaign(t CampaignType) { } return } - for id := range r.prs.voters.IDs() { + for id := range r.prs.Voters.IDs() { if id == r.id { continue } @@ -776,8 +777,8 @@ func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected } else { r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term) } - r.prs.recordVote(id, v) - return r.prs.tallyVotes() + r.prs.RecordVote(id, v) + return r.prs.TallyVotes() } func (r *raft) Step(m pb.Message) error { @@ -943,16 +944,16 @@ func stepLeader(r *raft, m pb.Message) error { // // TODO(tbg): I added a TODO in removeNode, it doesn't seem that the // leader steps down when removing itself. I might be missing something. - if pr := r.prs.getProgress(r.id); pr != nil { + if pr := r.prs.Progress[r.id]; pr != nil { pr.RecentActive = true } - if !r.prs.quorumActive() { + if !r.prs.QuorumActive() { r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id) r.becomeFollower(r.Term, None) } // Mark everyone (but ourselves) as inactive in preparation for the next // CheckQuorum. 
- r.prs.visit(func(id uint64, pr *Progress) { + r.prs.Visit(func(id uint64, pr *tracker.Progress) { if id != r.id { pr.RecentActive = false } @@ -962,7 +963,7 @@ func stepLeader(r *raft, m pb.Message) error { if len(m.Entries) == 0 { r.logger.Panicf("%x stepped empty MsgProp", r.id) } - if r.prs.getProgress(r.id) == nil { + if r.prs.Progress[r.id] == nil { // If we are not currently a member of the range (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. @@ -994,7 +995,7 @@ func stepLeader(r *raft, m pb.Message) error { case pb.MsgReadIndex: // If more than the local vote is needed, go through a full broadcast, // otherwise optimize. - if !r.prs.isSingleton() { + if !r.prs.IsSingleton() { if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // Reject read only request when this leader has not committed any log entry at its term. return nil @@ -1029,7 +1030,7 @@ func stepLeader(r *raft, m pb.Message) error { } // All other message types require a progress for m.From (pr). 
- pr := r.prs.getProgress(m.From) + pr := r.prs.Progress[m.From] if pr == nil { r.logger.Debugf("%x no progress available for %x", r.id, m.From) return nil @@ -1041,30 +1042,30 @@ func stepLeader(r *raft, m pb.Message) error { if m.Reject { r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d", r.id, m.RejectHint, m.From, m.Index) - if pr.maybeDecrTo(m.Index, m.RejectHint) { + if pr.MaybeDecrTo(m.Index, m.RejectHint) { r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) - if pr.State == ProgressStateReplicate { - pr.becomeProbe() + if pr.State == tracker.StateReplicate { + pr.BecomeProbe() } r.sendAppend(m.From) } } else { oldPaused := pr.IsPaused() - if pr.maybeUpdate(m.Index) { + if pr.MaybeUpdate(m.Index) { switch { - case pr.State == ProgressStateProbe: - pr.becomeReplicate() - case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort(): - r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + case pr.State == tracker.StateProbe: + pr.BecomeReplicate() + case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot: + r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr) // Transition back to replicating state via probing state // (which takes the snapshot into account). If we didn't // move to replicating state, that would only happen with // the next round of appends (but there may not be a next // round for a while, exposing an inconsistent RaftStatus). 
- pr.becomeProbe() - pr.becomeReplicate() - case pr.State == ProgressStateReplicate: - pr.ins.freeTo(m.Index) + pr.BecomeProbe() + pr.BecomeReplicate() + case pr.State == tracker.StateReplicate: + pr.Inflights.FreeLE(m.Index) } if r.maybeCommit() { @@ -1091,11 +1092,11 @@ func stepLeader(r *raft, m pb.Message) error { } case pb.MsgHeartbeatResp: pr.RecentActive = true - pr.resume() + pr.ProbeSent = false // free one slot for the full inflights window to allow progress. - if pr.State == ProgressStateReplicate && pr.ins.full() { - pr.ins.freeFirstOne() + if pr.State == tracker.StateReplicate && pr.Inflights.Full() { + pr.Inflights.FreeFirstOne() } if pr.Match < r.raftLog.lastIndex() { r.sendAppend(m.From) @@ -1105,7 +1106,7 @@ func stepLeader(r *raft, m pb.Message) error { return nil } - if r.prs.voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon { + if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon { return nil } @@ -1119,26 +1120,32 @@ func stepLeader(r *raft, m pb.Message) error { } } case pb.MsgSnapStatus: - if pr.State != ProgressStateSnapshot { + if pr.State != tracker.StateSnapshot { return nil } + // TODO(tbg): this code is very similar to the snapshot handling in + // MsgAppResp above. In fact, the code there is more correct than the + // code here and should likely be updated to match (or even better, the + // logic pulled into a newly created Progress state machine handler). if !m.Reject { - pr.becomeProbe() + pr.BecomeProbe() r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) } else { - pr.snapshotFailure() - pr.becomeProbe() + // NB: the order here matters or we'll be probing erroneously from + // the snapshot index, but the snapshot never applied. 
+ pr.PendingSnapshot = 0 + pr.BecomeProbe() r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr) } // If snapshot finish, wait for the msgAppResp from the remote node before sending // out the next msgApp. // If snapshot failure, wait for a heartbeat interval before next try - pr.pause() + pr.ProbeSent = true case pb.MsgUnreachable: // During optimistic replication, if the remote becomes unreachable, // there is huge probability that a MsgApp is lost. - if pr.State == ProgressStateReplicate { - pr.becomeProbe() + if pr.State == tracker.StateReplicate { + pr.BecomeProbe() } r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr) case pb.MsgTransferLeader: @@ -1341,7 +1348,7 @@ func (r *raft) restore(s pb.Snapshot) bool { r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.restore(s) - r.prs = makeProgressTracker(r.prs.maxInflight) + r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) r.restoreNode(s.Metadata.ConfState.Nodes, false) r.restoreNode(s.Metadata.ConfState.Learners, true) return true @@ -1354,15 +1361,15 @@ func (r *raft) restoreNode(nodes []uint64, isLearner bool) { match = next - 1 r.isLearner = isLearner } - r.prs.initProgress(n, match, next, isLearner) - r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.getProgress(n)) + r.prs.InitProgress(n, match, next, isLearner) + r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.Progress[n]) } } // promotable indicates whether state machine can be promoted to leader, // which is true when its own id is in progress list. 
func (r *raft) promotable() bool { - pr := r.prs.getProgress(r.id) + pr := r.prs.Progress[r.id] return pr != nil && !pr.IsLearner } @@ -1375,9 +1382,9 @@ func (r *raft) addLearner(id uint64) { } func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { - pr := r.prs.getProgress(id) + pr := r.prs.Progress[id] if pr == nil { - r.prs.initProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) + r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) } else { if isLearner && !pr.IsLearner { // Can only change Learner to Voter. @@ -1392,10 +1399,10 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { } // Change Learner to Voter, use origin Learner progress. - r.prs.removeAny(id) - r.prs.initProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */) + r.prs.RemoveAny(id) + r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */) pr.IsLearner = false - *r.prs.getProgress(id) = *pr + *r.prs.Progress[id] = *pr } if r.id == id { @@ -1405,14 +1412,14 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked // before the added node has a chance to communicate with us. - r.prs.getProgress(id).RecentActive = true + r.prs.Progress[id].RecentActive = true } func (r *raft) removeNode(id uint64) { - r.prs.removeAny(id) + r.prs.RemoveAny(id) // Do not try to commit or abort transferring if the cluster is now empty. 
- if len(r.prs.voters[0]) == 0 && len(r.prs.learners) == 0 { + if len(r.prs.Voters[0]) == 0 && len(r.prs.Learners) == 0 { return } diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index 033e33692..dbfbac7db 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -29,11 +29,11 @@ func TestMsgAppFlowControlFull(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs.prs[2] + pr2 := r.prs.Progress[2] // force the progress to be in replicate state - pr2.becomeReplicate() + pr2.BecomeReplicate() // fill in the inflights window - for i := 0; i < r.prs.maxInflight; i++ { + for i := 0; i < r.prs.MaxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() if len(ms) != 1 { @@ -42,8 +42,8 @@ func TestMsgAppFlowControlFull(t *testing.T) { } // ensure 1 - if !pr2.ins.full() { - t.Fatalf("inflights.full = %t, want %t", pr2.ins.full(), true) + if !pr2.Inflights.Full() { + t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true) } // ensure 2 @@ -65,18 +65,18 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs.prs[2] + pr2 := r.prs.Progress[2] // force the progress to be in replicate state - pr2.becomeReplicate() + pr2.BecomeReplicate() // fill in the inflights window - for i := 0; i < r.prs.maxInflight; i++ { + for i := 0; i < r.prs.MaxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) r.readMessages() } // 1 is noop, 2 is the first proposal we just sent. // so we start with 2. 
- for tt := 2; tt < r.prs.maxInflight; tt++ { + for tt := 2; tt < r.prs.MaxInflight; tt++ { // move forward the window r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(tt)}) r.readMessages() @@ -89,15 +89,15 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { } // ensure 1 - if !pr2.ins.full() { - t.Fatalf("inflights.full = %t, want %t", pr2.ins.full(), true) + if !pr2.Inflights.Full() { + t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true) } // ensure 2 for i := 0; i < tt; i++ { r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(i)}) - if !pr2.ins.full() { - t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.ins.full(), true) + if !pr2.Inflights.Full() { + t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true) } } } @@ -110,26 +110,26 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs.prs[2] + pr2 := r.prs.Progress[2] // force the progress to be in replicate state - pr2.becomeReplicate() + pr2.BecomeReplicate() // fill in the inflights window - for i := 0; i < r.prs.maxInflight; i++ { + for i := 0; i < r.prs.MaxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) r.readMessages() } for tt := 1; tt < 5; tt++ { - if !pr2.ins.full() { - t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.ins.full(), true) + if !pr2.Inflights.Full() { + t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true) } // recv tt msgHeartbeatResp and expect one free slot for i := 0; i < tt; i++ { r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) r.readMessages() - if pr2.ins.full() { - t.Fatalf("#%d.%d: inflights.full = %t, want %t", tt, i, pr2.ins.full(), false) + if pr2.Inflights.Full() { + t.Fatalf("#%d.%d: inflights.full = %t, want %t", tt, i, pr2.Inflights.Full(), false) } } diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 
cf7b192f4..0fa8de1ed 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -169,7 +169,7 @@ func testNonleaderStartElection(t *testing.T, state StateType) { if r.state != StateCandidate { t.Errorf("state = %s, want %s", r.state, StateCandidate) } - if !r.prs.votes[r.id] { + if !r.prs.Votes[r.id] { t.Errorf("vote for self = false, want true") } msgs := r.readMessages() diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index 246ed07e2..de72c4c4d 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -18,6 +18,7 @@ import ( "testing" pb "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" ) var ( @@ -40,11 +41,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { // force set the next of node 2, so that // node 2 needs a snapshot - sm.prs.prs[2].Next = sm.raftLog.firstIndex() + sm.prs.Progress[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true}) - if sm.prs.prs[2].PendingSnapshot != 11 { - t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.prs[2].PendingSnapshot) + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.Progress[2].Next - 1, Reject: true}) + if sm.prs.Progress[2].PendingSnapshot != 11 { + t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.Progress[2].PendingSnapshot) } } @@ -56,7 +57,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.prs[2].becomeSnapshot(11) + sm.prs.Progress[2].BecomeSnapshot(11) sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) msgs := sm.readMessages() @@ -73,18 +74,18 @@ func TestSnapshotFailure(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.prs[2].Next = 1 - sm.prs.prs[2].becomeSnapshot(11) + sm.prs.Progress[2].Next = 1 + sm.prs.Progress[2].BecomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true}) - if 
sm.prs.prs[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot) + if sm.prs.Progress[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot) } - if sm.prs.prs[2].Next != 1 { - t.Fatalf("Next = %d, want 1", sm.prs.prs[2].Next) + if sm.prs.Progress[2].Next != 1 { + t.Fatalf("Next = %d, want 1", sm.prs.Progress[2].Next) } - if !sm.prs.prs[2].Paused { - t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused) + if !sm.prs.Progress[2].ProbeSent { + t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent) } } @@ -96,18 +97,18 @@ func TestSnapshotSucceed(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.prs[2].Next = 1 - sm.prs.prs[2].becomeSnapshot(11) + sm.prs.Progress[2].Next = 1 + sm.prs.Progress[2].BecomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false}) - if sm.prs.prs[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot) + if sm.prs.Progress[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot) } - if sm.prs.prs[2].Next != 12 { - t.Fatalf("Next = %d, want 12", sm.prs.prs[2].Next) + if sm.prs.Progress[2].Next != 12 { + t.Fatalf("Next = %d, want 12", sm.prs.Progress[2].Next) } - if !sm.prs.prs[2].Paused { - t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused) + if !sm.prs.Progress[2].ProbeSent { + t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent) } } @@ -206,8 +207,8 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { mustSend(n2, n1, pb.MsgAppResp) // Leader has correct state for follower. 
- pr := n1.prs.prs[2] - if pr.State != ProgressStateReplicate { + pr := n1.prs.Progress[2] + if pr.State != tracker.StateReplicate { t.Fatalf("unexpected state %v", pr) } if pr.Match != expIdx || pr.Next != expIdx+1 { @@ -227,23 +228,23 @@ func TestSnapshotAbort(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs.prs[2].Next = 1 - sm.prs.prs[2].becomeSnapshot(11) + sm.prs.Progress[2].Next = 1 + sm.prs.Progress[2].BecomeSnapshot(11) // A successful msgAppResp that has a higher/equal index than the // pending snapshot should abort the pending snapshot. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11}) - if sm.prs.prs[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot) + if sm.prs.Progress[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot) } - // The follower entered ProgressStateReplicate and the leader send an append + // The follower entered StateReplicate and the leader send an append // and optimistically updated the progress (so we see 13 instead of 12). // There is something to append because the leader appended an empty entry // to the log at index 12 when it assumed leadership. 
- if sm.prs.prs[2].Next != 13 { - t.Fatalf("Next = %d, want 13", sm.prs.prs[2].Next) + if sm.prs.Progress[2].Next != 13 { + t.Fatalf("Next = %d, want 13", sm.prs.Progress[2].Next) } - if n := sm.prs.prs[2].ins.count; n != 1 { + if n := sm.prs.Progress[2].Inflights.Count(); n != 1 { t.Fatalf("expected an inflight message, got %d", n) } } diff --git a/raft/raft_test.go b/raft/raft_test.go index 40be17cfc..805e6071f 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -24,6 +24,7 @@ import ( "testing" pb "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" ) // nextEnts returns the appliable entries and updates the applied index @@ -55,228 +56,16 @@ func (r *raft) readMessages() []pb.Message { return msgs } -func TestProgressBecomeProbe(t *testing.T) { - match := uint64(1) - tests := []struct { - p *Progress - wnext uint64 - }{ - { - &Progress{State: ProgressStateReplicate, Match: match, Next: 5, ins: newInflights(256)}, - 2, - }, - { - // snapshot finish - &Progress{State: ProgressStateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, ins: newInflights(256)}, - 11, - }, - { - // snapshot failure - &Progress{State: ProgressStateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, ins: newInflights(256)}, - 2, - }, - } - for i, tt := range tests { - tt.p.becomeProbe() - if tt.p.State != ProgressStateProbe { - t.Errorf("#%d: state = %s, want %s", i, tt.p.State, ProgressStateProbe) - } - if tt.p.Match != match { - t.Errorf("#%d: match = %d, want %d", i, tt.p.Match, match) - } - if tt.p.Next != tt.wnext { - t.Errorf("#%d: next = %d, want %d", i, tt.p.Next, tt.wnext) - } - } -} - -func TestProgressBecomeReplicate(t *testing.T) { - p := &Progress{State: ProgressStateProbe, Match: 1, Next: 5, ins: newInflights(256)} - p.becomeReplicate() - - if p.State != ProgressStateReplicate { - t.Errorf("state = %s, want %s", p.State, ProgressStateReplicate) - } - if p.Match != 1 { - t.Errorf("match = %d, want 1", p.Match) - } - if w := p.Match + 1; p.Next != w { - 
t.Errorf("next = %d, want %d", p.Next, w) - } -} - -func TestProgressBecomeSnapshot(t *testing.T) { - p := &Progress{State: ProgressStateProbe, Match: 1, Next: 5, ins: newInflights(256)} - p.becomeSnapshot(10) - - if p.State != ProgressStateSnapshot { - t.Errorf("state = %s, want %s", p.State, ProgressStateSnapshot) - } - if p.Match != 1 { - t.Errorf("match = %d, want 1", p.Match) - } - if p.PendingSnapshot != 10 { - t.Errorf("pendingSnapshot = %d, want 10", p.PendingSnapshot) - } -} - -func TestProgressUpdate(t *testing.T) { - prevM, prevN := uint64(3), uint64(5) - tests := []struct { - update uint64 - - wm uint64 - wn uint64 - wok bool - }{ - {prevM - 1, prevM, prevN, false}, // do not decrease match, next - {prevM, prevM, prevN, false}, // do not decrease next - {prevM + 1, prevM + 1, prevN, true}, // increase match, do not decrease next - {prevM + 2, prevM + 2, prevN + 1, true}, // increase match, next - } - for i, tt := range tests { - p := &Progress{ - Match: prevM, - Next: prevN, - } - ok := p.maybeUpdate(tt.update) - if ok != tt.wok { - t.Errorf("#%d: ok= %v, want %v", i, ok, tt.wok) - } - if p.Match != tt.wm { - t.Errorf("#%d: match= %d, want %d", i, p.Match, tt.wm) - } - if p.Next != tt.wn { - t.Errorf("#%d: next= %d, want %d", i, p.Next, tt.wn) - } - } -} - -func TestProgressMaybeDecr(t *testing.T) { - tests := []struct { - state ProgressStateType - m uint64 - n uint64 - rejected uint64 - last uint64 - - w bool - wn uint64 - }{ - { - // state replicate and rejected is not greater than match - ProgressStateReplicate, 5, 10, 5, 5, false, 10, - }, - { - // state replicate and rejected is not greater than match - ProgressStateReplicate, 5, 10, 4, 4, false, 10, - }, - { - // state replicate and rejected is greater than match - // directly decrease to match+1 - ProgressStateReplicate, 5, 10, 9, 9, true, 6, - }, - { - // next-1 != rejected is always false - ProgressStateProbe, 0, 0, 0, 0, false, 0, - }, - { - // next-1 != rejected is always false - 
ProgressStateProbe, 0, 10, 5, 5, false, 10, - }, - { - // next>1 = decremented by 1 - ProgressStateProbe, 0, 10, 9, 9, true, 9, - }, - { - // next>1 = decremented by 1 - ProgressStateProbe, 0, 2, 1, 1, true, 1, - }, - { - // next<=1 = reset to 1 - ProgressStateProbe, 0, 1, 0, 0, true, 1, - }, - { - // decrease to min(rejected, last+1) - ProgressStateProbe, 0, 10, 9, 2, true, 3, - }, - { - // rejected < 1, reset to 1 - ProgressStateProbe, 0, 10, 9, 0, true, 1, - }, - } - for i, tt := range tests { - p := &Progress{ - State: tt.state, - Match: tt.m, - Next: tt.n, - } - if g := p.maybeDecrTo(tt.rejected, tt.last); g != tt.w { - t.Errorf("#%d: maybeDecrTo= %t, want %t", i, g, tt.w) - } - if gm := p.Match; gm != tt.m { - t.Errorf("#%d: match= %d, want %d", i, gm, tt.m) - } - if gn := p.Next; gn != tt.wn { - t.Errorf("#%d: next= %d, want %d", i, gn, tt.wn) - } - } -} - -func TestProgressIsPaused(t *testing.T) { - tests := []struct { - state ProgressStateType - paused bool - - w bool - }{ - {ProgressStateProbe, false, false}, - {ProgressStateProbe, true, true}, - {ProgressStateReplicate, false, false}, - {ProgressStateReplicate, true, false}, - {ProgressStateSnapshot, false, true}, - {ProgressStateSnapshot, true, true}, - } - for i, tt := range tests { - p := &Progress{ - State: tt.state, - Paused: tt.paused, - ins: newInflights(256), - } - if g := p.IsPaused(); g != tt.w { - t.Errorf("#%d: paused= %t, want %t", i, g, tt.w) - } - } -} - -// TestProgressResume ensures that progress.maybeUpdate and progress.maybeDecrTo -// will reset progress.paused. 
-func TestProgressResume(t *testing.T) { - p := &Progress{ - Next: 2, - Paused: true, - } - p.maybeDecrTo(1, 1) - if p.Paused { - t.Errorf("paused= %v, want false", p.Paused) - } - p.Paused = true - p.maybeUpdate(2) - if p.Paused { - t.Errorf("paused= %v, want false", p.Paused) - } -} - func TestProgressLeader(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() - r.prs.prs[2].becomeReplicate() + r.prs.Progress[2].BecomeReplicate() // Send proposals to r1. The first 5 entries should be appended to the log. propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}} for i := 0; i < 5; i++ { - if pr := r.prs.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { + if pr := r.prs.Progress[r.id]; pr.State != tracker.StateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { t.Errorf("unexpected progress %v", pr) } if err := r.Step(propMsg); err != nil { @@ -291,17 +80,17 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.prs.prs[2].Paused = true + r.prs.Progress[2].ProbeSent = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if !r.prs.prs[2].Paused { - t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) + if !r.prs.Progress[2].ProbeSent { + t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) } - r.prs.prs[2].becomeReplicate() + r.prs.Progress[2].BecomeReplicate() r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if r.prs.prs[2].Paused { - t.Errorf("paused = %v, want false", r.prs.prs[2].Paused) + if r.prs.Progress[2].ProbeSent { + t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent) } } @@ -331,7 +120,7 @@ func TestProgressFlowControl(t *testing.T) { r.readMessages() // While node 2 is in probe state, propose a bunch of entries. 
- r.prs.prs[2].becomeProbe() + r.prs.Progress[2].BecomeProbe() blob := []byte(strings.Repeat("a", 1000)) for i := 0; i < 10; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}}) @@ -409,8 +198,8 @@ func TestUncommittedEntryLimit(t *testing.T) { // Set the two followers to the replicate state. Commit to tail of log. const numFollowers = 2 - r.prs.prs[2].becomeReplicate() - r.prs.prs[3].becomeReplicate() + r.prs.Progress[2].BecomeReplicate() + r.prs.Progress[3].BecomeReplicate() r.uncommittedSize = 0 // Send proposals to r1. The first 5 entries should be appended to the log. @@ -889,7 +678,7 @@ func TestLearnerLogReplication(t *testing.T) { t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed) } - match := n1.prs.getProgress(2).Match + match := n1.prs.Progress[2].Match if match != n2.raftLog.committed { t.Errorf("progress 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match) } @@ -1351,9 +1140,9 @@ func TestCommit(t *testing.T) { storage.hardState = pb.HardState{Term: tt.smTerm} sm := newTestRaft(1, []uint64{1}, 10, 2, storage) - sm.prs.removeAny(1) + sm.prs.RemoveAny(1) for j := 0; j < len(tt.matches); j++ { - sm.prs.initProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false) + sm.prs.InitProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false) } sm.maybeCommit() if g := sm.raftLog.committed; g != tt.w { @@ -2138,7 +1927,7 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) { nt := newNetwork(a, b) setRandomizedElectionTimeout(b, b.electionTimeout+1) // Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states - b.prs.removeAny(2) + b.prs.RemoveAny(2) if b.promotable() { t.Fatalf("promotable = %v, want false", b.promotable()) @@ -2632,7 +2421,7 @@ func TestLeaderAppResp(t *testing.T) { sm.readMessages() sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: 
tt.reject, RejectHint: tt.index}) - p := sm.prs.prs[2] + p := sm.prs.Progress[2] if p.Match != tt.wmatch { t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch) } @@ -2679,9 +2468,9 @@ func TestBcastBeat(t *testing.T) { mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1}) } // slow follower - sm.prs.prs[2].Match, sm.prs.prs[2].Next = 5, 6 + sm.prs.Progress[2].Match, sm.prs.Progress[2].Next = 5, 6 // normal follower - sm.prs.prs[3].Match, sm.prs.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 + sm.prs.Progress[3].Match, sm.prs.Progress[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 sm.Step(pb.Message{Type: pb.MsgBeat}) msgs := sm.readMessages() @@ -2689,8 +2478,8 @@ func TestBcastBeat(t *testing.T) { t.Fatalf("len(msgs) = %v, want 2", len(msgs)) } wantCommitMap := map[uint64]uint64{ - 2: min(sm.raftLog.committed, sm.prs.prs[2].Match), - 3: min(sm.raftLog.committed, sm.prs.prs[3].Match), + 2: min(sm.raftLog.committed, sm.prs.Progress[2].Match), + 3: min(sm.raftLog.committed, sm.prs.Progress[3].Match), } for i, m := range msgs { if m.Type != pb.MsgHeartbeat { @@ -2759,16 +2548,16 @@ func TestLeaderIncreaseNext(t *testing.T) { previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} tests := []struct { // progress - state ProgressStateType + state tracker.StateType next uint64 wnext uint64 }{ // state replicate, optimistically increase next // previous entries + noop entry + propose + 1 - {ProgressStateReplicate, 2, uint64(len(previousEnts) + 1 + 1 + 1)}, + {tracker.StateReplicate, 2, uint64(len(previousEnts) + 1 + 1 + 1)}, // state probe, not optimistically increase next - {ProgressStateProbe, 2, 2}, + {tracker.StateProbe, 2, 2}, } for i, tt := range tests { @@ -2776,11 +2565,11 @@ func TestLeaderIncreaseNext(t *testing.T) { sm.raftLog.append(previousEnts...) 
sm.becomeCandidate() sm.becomeLeader() - sm.prs.prs[2].State = tt.state - sm.prs.prs[2].Next = tt.next + sm.prs.Progress[2].State = tt.state + sm.prs.Progress[2].Next = tt.next sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - p := sm.prs.prs[2] + p := sm.prs.Progress[2] if p.Next != tt.wnext { t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext) } @@ -2792,7 +2581,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs.prs[2].becomeProbe() + r.prs.Progress[2].BecomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2811,8 +2600,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { } } - if !r.prs.prs[2].Paused { - t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) + if !r.prs.Progress[2].ProbeSent { + t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2826,8 +2615,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - if !r.prs.prs[2].Paused { - t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) + if !r.prs.Progress[2].ProbeSent { + t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) } // consume the heartbeat @@ -2849,8 +2638,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { if msg[0].Index != 0 { t.Errorf("index = %d, want %d", msg[0].Index, 0) } - if !r.prs.prs[2].Paused { - t.Errorf("paused = %v, want true", r.prs.prs[2].Paused) + if !r.prs.Progress[2].ProbeSent { + t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) } } @@ -2859,7 +2648,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs.prs[2].becomeReplicate() + r.prs.Progress[2].BecomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: 
[]byte("somedata")}) @@ -2876,7 +2665,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs.prs[2].becomeSnapshot(10) + r.prs.Progress[2].BecomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2897,17 +2686,17 @@ func TestRecvMsgUnreachable(t *testing.T) { r.becomeLeader() r.readMessages() // set node 2 to state replicate - r.prs.prs[2].Match = 3 - r.prs.prs[2].becomeReplicate() - r.prs.prs[2].optimisticUpdate(5) + r.prs.Progress[2].Match = 3 + r.prs.Progress[2].BecomeReplicate() + r.prs.Progress[2].OptimisticUpdate(5) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) - if r.prs.prs[2].State != ProgressStateProbe { - t.Errorf("state = %s, want %s", r.prs.prs[2].State, ProgressStateProbe) + if r.prs.Progress[2].State != tracker.StateProbe { + t.Errorf("state = %s, want %s", r.prs.Progress[2].State, tracker.StateProbe) } - if wnext := r.prs.prs[2].Match + 1; r.prs.prs[2].Next != wnext { - t.Errorf("next = %d, want %d", r.prs.prs[2].Next, wnext) + if wnext := r.prs.Progress[2].Match + 1; r.prs.Progress[2].Next != wnext { + t.Errorf("next = %d, want %d", r.prs.Progress[2].Next, wnext) } } @@ -2932,7 +2721,7 @@ func TestRestore(t *testing.T) { if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term { t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) } - sg := sm.prs.voterNodes() + sg := sm.prs.VoterNodes() if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) { t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes) } @@ -2964,22 +2753,22 @@ func TestRestoreWithLearner(t *testing.T) { if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term { t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) } - sg := sm.prs.voterNodes() + sg := sm.prs.VoterNodes() if len(sg) != len(s.Metadata.ConfState.Nodes) { t.Errorf("sm.Nodes 
= %+v, length not equal with %+v", sg, s.Metadata.ConfState.Nodes) } - lns := sm.prs.learnerNodes() + lns := sm.prs.LearnerNodes() if len(lns) != len(s.Metadata.ConfState.Learners) { t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners) } for _, n := range s.Metadata.ConfState.Nodes { - if sm.prs.prs[n].IsLearner { - t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], false) + if sm.prs.Progress[n].IsLearner { + t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.Progress[n], false) } } for _, n := range s.Metadata.ConfState.Learners { - if !sm.prs.prs[n].IsLearner { - t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], true) + if !sm.prs.Progress[n].IsLearner { + t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.Progress[n], true) } } @@ -3121,8 +2910,8 @@ func TestProvideSnap(t *testing.T) { sm.becomeLeader() // force set the next of node 2, so that node 2 needs a snapshot - sm.prs.prs[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true}) + sm.prs.Progress[2].Next = sm.raftLog.firstIndex() + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.Progress[2].Next - 1, Reject: true}) msgs := sm.readMessages() if len(msgs) != 1 { @@ -3152,8 +2941,8 @@ func TestIgnoreProvidingSnap(t *testing.T) { // force set the next of node 2, so that node 2 needs a snapshot // change node 2 to be inactive, expect node 1 ignore sending snapshot to 2 - sm.prs.prs[2].Next = sm.raftLog.firstIndex() - 1 - sm.prs.prs[2].RecentActive = false + sm.prs.Progress[2].Next = sm.raftLog.firstIndex() - 1 + sm.prs.Progress[2].RecentActive = false sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) @@ -3193,7 +2982,7 @@ func TestSlowNodeRestore(t *testing.T) { } lead := nt.peers[1].(*raft) nextEnts(lead, nt.storage[1]) - nt.storage[1].CreateSnapshot(lead.raftLog.applied, 
&pb.ConfState{Nodes: lead.prs.voterNodes()}, nil) + nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.VoterNodes()}, nil) nt.storage[1].Compact(lead.raftLog.applied) nt.recover() @@ -3201,7 +2990,7 @@ func TestSlowNodeRestore(t *testing.T) { // node 3 will only be considered as active when node 1 receives a reply from it. for { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if lead.prs.prs[3].RecentActive { + if lead.prs.Progress[3].RecentActive { break } } @@ -3288,7 +3077,7 @@ func TestNewLeaderPendingConfig(t *testing.T) { func TestAddNode(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) r.addNode(2) - nodes := r.prs.voterNodes() + nodes := r.prs.VoterNodes() wnodes := []uint64{1, 2} if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) @@ -3299,13 +3088,13 @@ func TestAddNode(t *testing.T) { func TestAddLearner(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) r.addLearner(2) - nodes := r.prs.learnerNodes() + nodes := r.prs.LearnerNodes() wnodes := []uint64{2} if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) } - if !r.prs.prs[2].IsLearner { - t.Errorf("node 2 is learner %t, want %t", r.prs.prs[2].IsLearner, true) + if !r.prs.Progress[2].IsLearner { + t.Errorf("node 2 is learner %t, want %t", r.prs.Progress[2].IsLearner, true) } } @@ -3349,14 +3138,14 @@ func TestRemoveNode(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r.removeNode(2) w := []uint64{1} - if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } // remove all nodes from cluster r.removeNode(1) w = []uint64{} - if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } } @@ -3367,18 +3156,18 @@ func TestRemoveLearner(t 
*testing.T) { r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) r.removeNode(2) w := []uint64{1} - if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } w = []uint64{} - if g := r.prs.learnerNodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.LearnerNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } // remove all nodes from cluster r.removeNode(1) - if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } } @@ -3417,8 +3206,8 @@ func TestRaftNodes(t *testing.T) { } for i, tt := range tests { r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage()) - if !reflect.DeepEqual(r.prs.voterNodes(), tt.wids) { - t.Errorf("#%d: nodes = %+v, want %+v", i, r.prs.voterNodes(), tt.wids) + if !reflect.DeepEqual(r.prs.VoterNodes(), tt.wids) { + t.Errorf("#%d: nodes = %+v, want %+v", i, r.prs.VoterNodes(), tt.wids) } } } @@ -3619,8 +3408,8 @@ func TestLeaderTransferToSlowFollower(t *testing.T) { nt.recover() lead := nt.peers[1].(*raft) - if lead.prs.prs[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1) + if lead.prs.Progress[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1) } // Transfer leadership to 3 when node 3 is lack of log. 
@@ -3638,12 +3427,12 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) lead := nt.peers[1].(*raft) nextEnts(lead, nt.storage[1]) - nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.voterNodes()}, nil) + nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.VoterNodes()}, nil) nt.storage[1].Compact(lead.raftLog.applied) nt.recover() - if lead.prs.prs[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1) + if lead.prs.Progress[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1) } // Transfer leadership to 3 when node 3 is lack of snapshot. @@ -3722,8 +3511,8 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) { t.Fatalf("should return drop proposal error while transferring") } - if lead.prs.prs[1].Match != 1 { - t.Fatalf("node 1 has match %x, want %x", lead.prs.prs[1].Match, 1) + if lead.prs.Progress[1].Match != 1 { + t.Fatalf("node 1 has match %x, want %x", lead.prs.Progress[1].Match, 1) } } @@ -4329,24 +4118,21 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw sm := newRaft(cfg) npeers[id] = sm case *raft: - learners := make(map[uint64]bool, len(v.prs.learners)) - for i := range v.prs.learners { + learners := make(map[uint64]bool, len(v.prs.Learners)) + for i := range v.prs.Learners { learners[i] = true } v.id = id - v.prs.voters[0] = make(map[uint64]struct{}) - v.prs.voters[1] = make(map[uint64]struct{}) - v.prs.learners = make(map[uint64]struct{}) - v.prs.prs = make(map[uint64]*Progress) + v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight) for i := 0; i < size; i++ { - pr := &Progress{} + pr := &tracker.Progress{} if _, ok := learners[peerAddrs[i]]; ok { pr.IsLearner = true - v.prs.learners[peerAddrs[i]] = struct{}{} + v.prs.Learners[peerAddrs[i]] = struct{}{} } else { - 
v.prs.voters[0][peerAddrs[i]] = struct{}{} + v.prs.Voters[0][peerAddrs[i]] = struct{}{} } - v.prs.prs[peerAddrs[i]] = pr + v.prs.Progress[peerAddrs[i]] = pr } v.reset(v.Term) npeers[id] = v diff --git a/raft/rawnode.go b/raft/rawnode.go index d31de6e7d..d7e54eeea 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -18,6 +18,7 @@ import ( "errors" pb "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" ) // ErrStepLocalMsg is returned when try to step a local raft message @@ -166,7 +167,7 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error { // ApplyConfChange applies a config change to the local node. func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { if cc.NodeID == None { - return &pb.ConfState{Nodes: rn.raft.prs.voterNodes(), Learners: rn.raft.prs.learnerNodes()} + return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()} } switch cc.Type { case pb.ConfChangeAddNode: @@ -179,7 +180,7 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { default: panic("unexpected conf type") } - return &pb.ConfState{Nodes: rn.raft.prs.voterNodes(), Learners: rn.raft.prs.learnerNodes()} + return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()} } // Step advances the state machine using the given message. @@ -188,7 +189,7 @@ func (rn *RawNode) Step(m pb.Message) error { if IsLocalMsg(m.Type) { return ErrStepLocalMsg } - if pr := rn.raft.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { + if pr := rn.raft.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) { return rn.raft.Step(m) } return ErrStepPeerNotFound @@ -256,14 +257,14 @@ const ( // WithProgress is a helper to introspect the Progress for this node and its // peers. 
-func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) { - rn.raft.prs.visit(func(id uint64, pr *Progress) { +func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr tracker.Progress)) { + rn.raft.prs.Visit(func(id uint64, pr *tracker.Progress) { typ := ProgressTypePeer if pr.IsLearner { typ = ProgressTypeLearner } p := *pr - p.ins = nil + p.Inflights = nil visitor(id, typ, p) }) } diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 2ad3662e9..425324f46 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -22,6 +22,7 @@ import ( "testing" "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" ) // rawNodeAdapter is essentially a lint that makes sure that RawNode implements @@ -638,7 +639,7 @@ func BenchmarkStatusProgress(b *testing.B) { b.Run("WithProgress", func(b *testing.B) { b.ReportAllocs() - visit := func(uint64, ProgressType, Progress) {} + visit := func(uint64, ProgressType, tracker.Progress) {} for i := 0; i < b.N; i++ { rn.WithProgress(visit) @@ -648,7 +649,7 @@ func BenchmarkStatusProgress(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { var n uint64 - visit := func(_ uint64, _ ProgressType, pr Progress) { + visit := func(_ uint64, _ ProgressType, pr tracker.Progress) { n += pr.Match } rn.WithProgress(visit) diff --git a/raft/status.go b/raft/status.go index 197a6820d..bf4898c9e 100644 --- a/raft/status.go +++ b/raft/status.go @@ -18,6 +18,7 @@ import ( "fmt" pb "go.etcd.io/etcd/raft/raftpb" + "go.etcd.io/etcd/raft/tracker" ) type Status struct { @@ -27,20 +28,20 @@ type Status struct { SoftState Applied uint64 - Progress map[uint64]Progress + Progress map[uint64]tracker.Progress LeadTransferee uint64 } -func getProgressCopy(r *raft) map[uint64]Progress { - m := make(map[uint64]Progress) - r.prs.visit(func(id uint64, pr *Progress) { - var p Progress +func getProgressCopy(r *raft) map[uint64]tracker.Progress { + m := make(map[uint64]tracker.Progress) + 
r.prs.Visit(func(id uint64, pr *tracker.Progress) { + var p tracker.Progress p, pr = *pr, nil /* avoid accidental reuse below */ // The inflight buffer is tricky to copy and besides, it isn't exposed // to the client, so pretend it's nil. - p.ins = nil + p.Inflights = nil m[id] = p }) diff --git a/raft/tracker/inflights.go b/raft/tracker/inflights.go new file mode 100644 index 000000000..9e209e21e --- /dev/null +++ b/raft/tracker/inflights.go @@ -0,0 +1,124 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracker + +// Inflights limits the number of MsgApp (represented by the largest index +// contained within) sent to followers but not yet acknowledged by them. Callers +// use Full() to check whether more messages can be sent, call Add() whenever +// they are sending a new append, and release "quota" via FreeLE() whenever an +// ack is received. +type Inflights struct { + // the starting index in the buffer + start int + // number of inflights in the buffer + count int + + // the size of the buffer + size int + + // buffer contains the index of the last entry + // inside one message. + buffer []uint64 +} + +// NewInflights sets up an Inflights that allows up to 'size' inflight messages. +func NewInflights(size int) *Inflights { + return &Inflights{ + size: size, + } +} + +// Add notifies the Inflights that a new message with the given index is being +// dispatched. 
Full() must be called prior to Add() to verify that there is room +// for one more message, and consecutive calls to Add() must provide a +// monotonic sequence of indexes. +func (in *Inflights) Add(inflight uint64) { + if in.Full() { + panic("cannot add into a Full inflights") + } + next := in.start + in.count + size := in.size + if next >= size { + next -= size + } + if next >= len(in.buffer) { + in.grow() + } + in.buffer[next] = inflight + in.count++ +} + +// grow the inflight buffer by doubling up to inflights.size. We grow on demand +// instead of preallocating to inflights.size to handle systems which have +// thousands of Raft groups per process. +func (in *Inflights) grow() { + newSize := len(in.buffer) * 2 + if newSize == 0 { + newSize = 1 + } else if newSize > in.size { + newSize = in.size + } + newBuffer := make([]uint64, newSize) + copy(newBuffer, in.buffer) + in.buffer = newBuffer +} + +// FreeLE frees the inflights smaller or equal to the given `to` flight. +func (in *Inflights) FreeLE(to uint64) { + if in.count == 0 || to < in.buffer[in.start] { + // out of the left side of the window + return + } + + idx := in.start + var i int + for i = 0; i < in.count; i++ { + if to < in.buffer[idx] { // found the first large inflight + break + } + + // increase index and maybe rotate + size := in.size + if idx++; idx >= size { + idx -= size + } + } + // free i inflights and set new start index + in.count -= i + in.start = idx + if in.count == 0 { + // inflights is empty, reset the start index so that we don't grow the + // buffer unnecessarily. + in.start = 0 + } +} + +// FreeFirstOne releases the first inflight. This is a no-op if nothing is +// inflight. +func (in *Inflights) FreeFirstOne() { + if in.count == 0 { + // Nothing is inflight; the buffer may not even be allocated yet, so + // indexing it below would panic. + return + } + in.FreeLE(in.buffer[in.start]) +} + +// Full returns true if no more messages can be sent at the moment. +func (in *Inflights) Full() bool { + return in.count == in.size +} + +// Count returns the number of inflight messages.
+func (in *Inflights) Count() int { return in.count } + +// reset frees all inflights. +func (in *Inflights) reset() { + in.count = 0 + in.start = 0 +} diff --git a/raft/progress_test.go b/raft/tracker/inflights_test.go similarity index 84% rename from raft/progress_test.go rename to raft/tracker/inflights_test.go index cf92eb8ec..582a373ba 100644 --- a/raft/progress_test.go +++ b/raft/tracker/inflights_test.go @@ -1,4 +1,4 @@ -// Copyright 2015 The etcd Authors +// Copyright 2019 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package raft +package tracker import ( "reflect" @@ -21,16 +21,16 @@ import ( func TestInflightsAdd(t *testing.T) { // no rotating case - in := &inflights{ + in := &Inflights{ size: 10, buffer: make([]uint64, 10), } for i := 0; i < 5; i++ { - in.add(uint64(i)) + in.Add(uint64(i)) } - wantIn := &inflights{ + wantIn := &Inflights{ start: 0, count: 5, size: 10, @@ -43,10 +43,10 @@ func TestInflightsAdd(t *testing.T) { } for i := 5; i < 10; i++ { - in.add(uint64(i)) + in.Add(uint64(i)) } - wantIn2 := &inflights{ + wantIn2 := &Inflights{ start: 0, count: 10, size: 10, @@ -59,17 +59,17 @@ func TestInflightsAdd(t *testing.T) { } // rotating case - in2 := &inflights{ + in2 := &Inflights{ start: 5, size: 10, buffer: make([]uint64, 10), } for i := 0; i < 5; i++ { - in2.add(uint64(i)) + in2.Add(uint64(i)) } - wantIn21 := &inflights{ + wantIn21 := &Inflights{ start: 5, count: 5, size: 10, @@ -82,10 +82,10 @@ func TestInflightsAdd(t *testing.T) { } for i := 5; i < 10; i++ { - in2.add(uint64(i)) + in2.Add(uint64(i)) } - wantIn22 := &inflights{ + wantIn22 := &Inflights{ start: 5, count: 10, size: 10, @@ -100,14 +100,14 @@ func TestInflightsAdd(t *testing.T) { func TestInflightFreeTo(t *testing.T) { // no rotating case - in := 
newInflights(10) + in := NewInflights(10) for i := 0; i < 10; i++ { - in.add(uint64(i)) + in.Add(uint64(i)) } - in.freeTo(4) + in.FreeLE(4) - wantIn := &inflights{ + wantIn := &Inflights{ start: 5, count: 5, size: 10, @@ -119,9 +119,9 @@ func TestInflightFreeTo(t *testing.T) { t.Fatalf("in = %+v, want %+v", in, wantIn) } - in.freeTo(8) + in.FreeLE(8) - wantIn2 := &inflights{ + wantIn2 := &Inflights{ start: 9, count: 1, size: 10, @@ -135,12 +135,12 @@ func TestInflightFreeTo(t *testing.T) { // rotating case for i := 10; i < 15; i++ { - in.add(uint64(i)) + in.Add(uint64(i)) } - in.freeTo(12) + in.FreeLE(12) - wantIn3 := &inflights{ + wantIn3 := &Inflights{ start: 3, count: 2, size: 10, @@ -152,9 +152,9 @@ func TestInflightFreeTo(t *testing.T) { t.Fatalf("in = %+v, want %+v", in, wantIn3) } - in.freeTo(14) + in.FreeLE(14) - wantIn4 := &inflights{ + wantIn4 := &Inflights{ start: 0, count: 0, size: 10, @@ -168,14 +168,14 @@ func TestInflightFreeTo(t *testing.T) { } func TestInflightFreeFirstOne(t *testing.T) { - in := newInflights(10) + in := NewInflights(10) for i := 0; i < 10; i++ { - in.add(uint64(i)) + in.Add(uint64(i)) } - in.freeFirstOne() + in.FreeFirstOne() - wantIn := &inflights{ + wantIn := &Inflights{ start: 1, count: 9, size: 10, diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go new file mode 100644 index 000000000..d580332f8 --- /dev/null +++ b/raft/tracker/progress.go @@ -0,0 +1,215 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package tracker + +import "fmt" + +// Progress represents a follower’s progress in the view of the leader. Leader +// maintains progresses of all followers, and sends entries to the follower +// based on its progress. +// +// NB(tbg): Progress is basically a state machine whose transitions are mostly +// strewn around `*raft.raft`. Additionally, some fields are only used when in a +// certain State. All of this isn't ideal. +type Progress struct { + Match, Next uint64 + // State defines how the leader should interact with the follower. + // + // When in StateProbe, leader sends at most one replication message + // per heartbeat interval. It also probes actual progress of the follower. + // + // When in StateReplicate, leader optimistically increases next + // to the latest entry sent after sending replication message. This is + // an optimized state for fast replicating log entries to the follower. + // + // When in StateSnapshot, leader should have sent out snapshot + // before and stops sending any replication message. + State StateType + + // PendingSnapshot is used in StateSnapshot. + // If there is a pending snapshot, PendingSnapshot will be set to the + // index of the snapshot. If PendingSnapshot is set, the replication process of + // this Progress will be paused. raft will not resend snapshot until the pending one + // is reported to be failed. + PendingSnapshot uint64 + + // RecentActive is true if the progress is recently active. Receiving any messages + // from the corresponding follower indicates the progress is active. + // RecentActive can be reset to false after an election timeout. + RecentActive bool + + // ProbeSent is used while this follower is in StateProbe. When ProbeSent is + // true, raft should pause sending replication message to this peer until + // ProbeSent is reset. See ProbeAcked() and IsPaused().
+ ProbeSent bool + + // Inflights is a sliding window for the inflight messages. + // Each inflight message contains one or more log entries. + // The max number of entries per message is defined in raft config as MaxSizePerMsg. + // Thus inflight effectively limits both the number of inflight messages + // and the bandwidth each Progress can use. + // When inflights is Full, no more message should be sent. + // When a leader sends out a message, the index of the last + // entry should be added to inflights. The index MUST be added + // into inflights in order. + // When a leader receives a reply, the previous inflights should + // be freed by calling inflights.FreeLE with the index of the last + // received entry. + Inflights *Inflights + + // IsLearner is true if this progress is tracked for a learner. + IsLearner bool +} + +// ResetState moves the Progress into the specified State, resetting ProbeSent, +// PendingSnapshot, and Inflights. +func (pr *Progress) ResetState(state StateType) { + pr.ProbeSent = false + pr.PendingSnapshot = 0 + pr.State = state + pr.Inflights.reset() +} + +func max(a, b uint64) uint64 { + if a > b { + return a + } + return b +} + +func min(a, b uint64) uint64 { + if a > b { + return b + } + return a +} + +// ProbeAcked is called when this peer has accepted an append. It resets +// ProbeSent to signal that additional append messages should be sent without +// further delay. +func (pr *Progress) ProbeAcked() { + pr.ProbeSent = false +} + +// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, +// optionally and if larger, the index of the pending snapshot. +func (pr *Progress) BecomeProbe() { + // If the original state is StateSnapshot, progress knows that + // the pending snapshot has been sent to this peer successfully, then + // probes from pendingSnapshot + 1. 
+ if pr.State == StateSnapshot { + pendingSnapshot := pr.PendingSnapshot + pr.ResetState(StateProbe) + pr.Next = max(pr.Match+1, pendingSnapshot+1) + } else { + pr.ResetState(StateProbe) + pr.Next = pr.Match + 1 + } +} + +// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1. +func (pr *Progress) BecomeReplicate() { + pr.ResetState(StateReplicate) + pr.Next = pr.Match + 1 +} + +// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending +// snapshot index. +func (pr *Progress) BecomeSnapshot(snapshoti uint64) { + pr.ResetState(StateSnapshot) + pr.PendingSnapshot = snapshoti +} + +// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the +// index acked by it. The method returns false if the given n index comes from +// an outdated message. Otherwise it updates the progress and returns true. +func (pr *Progress) MaybeUpdate(n uint64) bool { + var updated bool + if pr.Match < n { + pr.Match = n + updated = true + pr.ProbeAcked() + } + if pr.Next < n+1 { + pr.Next = n + 1 + } + return updated +} + +// OptimisticUpdate signals that appends all the way up to and including index n +// are in-flight. As a result, Next is increased to n+1. +func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 } + +// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The +// arguments are the index the follower rejected to append to its log, and its +// last index. +// +// Rejections can happen spuriously as messages are sent out of order or +// duplicated. In such cases, the rejection pertains to an index that the +// Progress already knows were previously acknowledged, and false is returned +// without changing the Progress. +// +// If the rejection is genuine, Next is lowered sensibly, and the Progress is +// cleared for sending log entries. 
+func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool { + if pr.State == StateReplicate { + // The rejection must be stale if the progress has matched and "rejected" + // is smaller than "match". + if rejected <= pr.Match { + return false + } + // Directly decrease next to match + 1. + // + // TODO(tbg): why not use last if it's larger? + pr.Next = pr.Match + 1 + return true + } + + // The rejection must be stale if "rejected" does not match next - 1. This + // is because non-replicating followers are probed one entry at a time. + if pr.Next-1 != rejected { + return false + } + + if pr.Next = min(rejected, last+1); pr.Next < 1 { + pr.Next = 1 + } + pr.ProbeSent = false + return true +} + +// IsPaused returns whether sending log entries to this node has been throttled. +// This is done when a node has rejected recent MsgApps, is currently waiting +// for a snapshot, or has reached the MaxInflightMsgs limit. In normal +// operation, this is false. A throttled node will be contacted less frequently +// until it has reached a state in which it's able to accept a steady stream of +// log entries again. +func (pr *Progress) IsPaused() bool { + switch pr.State { + case StateProbe: + return pr.ProbeSent + case StateReplicate: + return pr.Inflights.Full() + case StateSnapshot: + return true + default: + panic("unexpected state") + } +} + +func (pr *Progress) String() string { + return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d, recentActive = %v, isLearner = %v", + pr.Next, pr.Match, pr.State, pr.IsPaused(), pr.PendingSnapshot, pr.RecentActive, pr.IsLearner) +} diff --git a/raft/tracker/progress_test.go b/raft/tracker/progress_test.go new file mode 100644 index 000000000..2e657f72e --- /dev/null +++ b/raft/tracker/progress_test.go @@ -0,0 +1,231 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracker + +import ( + "testing" +) + +func TestProgressIsPaused(t *testing.T) { + tests := []struct { + state StateType + paused bool + + w bool + }{ + {StateProbe, false, false}, + {StateProbe, true, true}, + {StateReplicate, false, false}, + {StateReplicate, true, false}, + {StateSnapshot, false, true}, + {StateSnapshot, true, true}, + } + for i, tt := range tests { + p := &Progress{ + State: tt.state, + ProbeSent: tt.paused, + Inflights: NewInflights(256), + } + if g := p.IsPaused(); g != tt.w { + t.Errorf("#%d: paused= %t, want %t", i, g, tt.w) + } + } +} + +// TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset +// ProbeSent. 
+func TestProgressResume(t *testing.T) { + p := &Progress{ + Next: 2, + ProbeSent: true, + } + p.MaybeDecrTo(1, 1) + if p.ProbeSent { + t.Errorf("paused= %v, want false", p.ProbeSent) + } + p.ProbeSent = true + p.MaybeUpdate(2) + if p.ProbeSent { + t.Errorf("paused= %v, want false", p.ProbeSent) + } +} + +func TestProgressBecomeProbe(t *testing.T) { + match := uint64(1) + tests := []struct { + p *Progress + wnext uint64 + }{ + { + &Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256)}, + 2, + }, + { + // snapshot finish + &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256)}, + 11, + }, + { + // snapshot failure + &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256)}, + 2, + }, + } + for i, tt := range tests { + tt.p.BecomeProbe() + if tt.p.State != StateProbe { + t.Errorf("#%d: state = %s, want %s", i, tt.p.State, StateProbe) + } + if tt.p.Match != match { + t.Errorf("#%d: match = %d, want %d", i, tt.p.Match, match) + } + if tt.p.Next != tt.wnext { + t.Errorf("#%d: next = %d, want %d", i, tt.p.Next, tt.wnext) + } + } +} + +func TestProgressBecomeReplicate(t *testing.T) { + p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)} + p.BecomeReplicate() + + if p.State != StateReplicate { + t.Errorf("state = %s, want %s", p.State, StateReplicate) + } + if p.Match != 1 { + t.Errorf("match = %d, want 1", p.Match) + } + if w := p.Match + 1; p.Next != w { + t.Errorf("next = %d, want %d", p.Next, w) + } +} + +func TestProgressBecomeSnapshot(t *testing.T) { + p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)} + p.BecomeSnapshot(10) + + if p.State != StateSnapshot { + t.Errorf("state = %s, want %s", p.State, StateSnapshot) + } + if p.Match != 1 { + t.Errorf("match = %d, want 1", p.Match) + } + if p.PendingSnapshot != 10 { + t.Errorf("pendingSnapshot = %d, want 10", p.PendingSnapshot) + 
} +} + +func TestProgressUpdate(t *testing.T) { + prevM, prevN := uint64(3), uint64(5) + tests := []struct { + update uint64 + + wm uint64 + wn uint64 + wok bool + }{ + {prevM - 1, prevM, prevN, false}, // do not decrease match, next + {prevM, prevM, prevN, false}, // do not decrease next + {prevM + 1, prevM + 1, prevN, true}, // increase match, do not decrease next + {prevM + 2, prevM + 2, prevN + 1, true}, // increase match, next + } + for i, tt := range tests { + p := &Progress{ + Match: prevM, + Next: prevN, + } + ok := p.MaybeUpdate(tt.update) + if ok != tt.wok { + t.Errorf("#%d: ok= %v, want %v", i, ok, tt.wok) + } + if p.Match != tt.wm { + t.Errorf("#%d: match= %d, want %d", i, p.Match, tt.wm) + } + if p.Next != tt.wn { + t.Errorf("#%d: next= %d, want %d", i, p.Next, tt.wn) + } + } +} + +func TestProgressMaybeDecr(t *testing.T) { + tests := []struct { + state StateType + m uint64 + n uint64 + rejected uint64 + last uint64 + + w bool + wn uint64 + }{ + { + // state replicate and rejected is not greater than match + StateReplicate, 5, 10, 5, 5, false, 10, + }, + { + // state replicate and rejected is not greater than match + StateReplicate, 5, 10, 4, 4, false, 10, + }, + { + // state replicate and rejected is greater than match + // directly decrease to match+1 + StateReplicate, 5, 10, 9, 9, true, 6, + }, + { + // next-1 != rejected is always false + StateProbe, 0, 0, 0, 0, false, 0, + }, + { + // next-1 != rejected is always false + StateProbe, 0, 10, 5, 5, false, 10, + }, + { + // next>1 = decremented by 1 + StateProbe, 0, 10, 9, 9, true, 9, + }, + { + // next>1 = decremented by 1 + StateProbe, 0, 2, 1, 1, true, 1, + }, + { + // next<=1 = reset to 1 + StateProbe, 0, 1, 0, 0, true, 1, + }, + { + // decrease to min(rejected, last+1) + StateProbe, 0, 10, 9, 2, true, 3, + }, + { + // rejected < 1, reset to 1 + StateProbe, 0, 10, 9, 0, true, 1, + }, + } + for i, tt := range tests { + p := &Progress{ + State: tt.state, + Match: tt.m, + Next: tt.n, + } + if g := 
p.MaybeDecrTo(tt.rejected, tt.last); g != tt.w { + t.Errorf("#%d: maybeDecrTo= %t, want %t", i, g, tt.w) + } + if gm := p.Match; gm != tt.m { + t.Errorf("#%d: match= %d, want %d", i, gm, tt.m) + } + if gn := p.Next; gn != tt.wn { + t.Errorf("#%d: next= %d, want %d", i, gn, tt.wn) + } + } +} diff --git a/raft/tracker/state.go b/raft/tracker/state.go new file mode 100644 index 000000000..285b4b8f5 --- /dev/null +++ b/raft/tracker/state.go @@ -0,0 +1,42 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracker + +// StateType is the state of a tracked follower. +type StateType uint64 + +const ( + // StateProbe indicates a follower whose last index isn't known. Such a + // follower is "probed" (i.e. an append sent periodically) to narrow down + // its last index. In the ideal (and common) case, only one round of probing + // is necessary as the follower will react with a hint. Followers that are + // probed over extended periods of time are often offline. + StateProbe StateType = iota + // StateReplicate is the steady state in which a follower eagerly receives + // log entries to append to its log. + StateReplicate + // StateSnapshot indicates a follower that needs log entries not available + // from the leader's Raft log. Such a follower needs a full snapshot to + // return to StateReplicate.
+ StateSnapshot +) + +var prstmap = [...]string{ + "StateProbe", + "StateReplicate", + "StateSnapshot", +} + +func (st StateType) String() string { return prstmap[uint64(st)] } diff --git a/raft/tracker/tracker.go b/raft/tracker/tracker.go new file mode 100644 index 000000000..2d162c6de --- /dev/null +++ b/raft/tracker/tracker.go @@ -0,0 +1,195 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracker + +import ( + "fmt" + "sort" + + "go.etcd.io/etcd/raft/quorum" +) + +// ProgressTracker tracks the currently active configuration and the information +// known about the nodes and learners in it. In particular, it tracks the match +// index for each peer which in turn allows reasoning about the committed index. +type ProgressTracker struct { + Voters quorum.JointConfig + Learners map[uint64]struct{} + + Progress map[uint64]*Progress + + Votes map[uint64]bool + + MaxInflight int +} + +// MakeProgressTracker initializes a ProgressTracker. +func MakeProgressTracker(maxInflight int) ProgressTracker { + p := ProgressTracker{ + MaxInflight: maxInflight, + Voters: quorum.JointConfig{ + quorum.MajorityConfig{}, + quorum.MajorityConfig{}, + }, + Learners: map[uint64]struct{}{}, + Votes: map[uint64]bool{}, + Progress: map[uint64]*Progress{}, + } + return p +} + +// IsSingleton returns true if (and only if) there is only one voting member +// (i.e. the leader) in the current configuration. 
+func (p *ProgressTracker) IsSingleton() bool { + return len(p.Voters[0]) == 1 && len(p.Voters[1]) == 0 +} + +type matchAckIndexer map[uint64]*Progress + +var _ quorum.AckedIndexer = matchAckIndexer(nil) + +// AckedIndex implements quorum.AckedIndexer. +func (l matchAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) { + pr, ok := l[id] + if !ok { + return 0, false + } + return quorum.Index(pr.Match), true +} + +// Committed returns the largest log index known to be committed based on what +// the voting members of the group have acknowledged. +func (p *ProgressTracker) Committed() uint64 { + return uint64(p.Voters.CommittedIndex(matchAckIndexer(p.Progress))) +} + +// RemoveAny removes this peer, which *must* be tracked as a voter or learner, +// from the tracker. +func (p *ProgressTracker) RemoveAny(id uint64) { + _, okPR := p.Progress[id] + _, okV1 := p.Voters[0][id] + _, okV2 := p.Voters[1][id] + _, okL := p.Learners[id] + + okV := okV1 || okV2 + + if !okPR { + panic(fmt.Sprintf("attempting to remove unknown peer %x", id)) + } else if !okV && !okL { + panic(fmt.Sprintf("attempting to remove unknown peer %x", id)) + } else if okV && okL { + panic(fmt.Sprintf("peer %x is both voter and learner", id)) + } + + delete(p.Voters[0], id) + delete(p.Voters[1], id) + delete(p.Learners, id) + delete(p.Progress, id) +} + +// InitProgress initializes a new progress for the given node or learner. The +// node may not exist yet in either form or a panic will ensue. +func (p *ProgressTracker) InitProgress(id, match, next uint64, isLearner bool) { + if pr := p.Progress[id]; pr != nil { + panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr)) + } + if !isLearner { + p.Voters[0][id] = struct{}{} + } else { + p.Learners[id] = struct{}{} + } + p.Progress[id] = &Progress{Next: next, Match: match, Inflights: NewInflights(p.MaxInflight), IsLearner: isLearner} +} + +// Visit invokes the supplied closure for all tracked progresses.
+func (p *ProgressTracker) Visit(f func(id uint64, pr *Progress)) {
+	for peerID, progress := range p.Progress {
+		f(peerID, progress)
+	}
+}
+
+// QuorumActive reports whether, from the view of the local raft state machine,
+// the recently active voters form a quorum. Learners are not counted.
+func (p *ProgressTracker) QuorumActive() bool {
+	active := make(map[uint64]bool)
+	for peerID, progress := range p.Progress {
+		if !progress.IsLearner {
+			active[peerID] = progress.RecentActive
+		}
+	}
+	return p.Voters.VoteResult(active) == quorum.VoteWon
+}
+
+// VoterNodes returns the IDs of all voters, sorted in ascending order.
+func (p *ProgressTracker) VoterNodes() []uint64 {
+	voterSet := p.Voters.IDs()
+	ids := make([]uint64, 0, len(voterSet))
+	for voterID := range voterSet {
+		ids = append(ids, voterID)
+	}
+	sort.Slice(ids, func(a, b int) bool { return ids[a] < ids[b] })
+	return ids
+}
+
+// LearnerNodes returns the IDs of all learners, sorted in ascending order.
+func (p *ProgressTracker) LearnerNodes() []uint64 {
+	ids := make([]uint64, 0, len(p.Learners))
+	for learnerID := range p.Learners {
+		ids = append(ids, learnerID)
+	}
+	sort.Slice(ids, func(a, b int) bool { return ids[a] < ids[b] })
+	return ids
+}
+
+// ResetVotes prepares for a new round of vote counting via RecordVote by
+// replacing Votes with an empty map.
+func (p *ProgressTracker) ResetVotes() {
+	p.Votes = make(map[uint64]bool)
+}
+
+// RecordVote records that the node with the given id voted for this Raft
+// instance if v == true (and declined it otherwise). Only the first vote
+// recorded for an id is kept; later calls for the same id are ignored.
+func (p *ProgressTracker) RecordVote(id uint64, v bool) {
+	if _, seen := p.Votes[id]; seen {
+		return
+	}
+	p.Votes[id] = v
+}
+
+// TallyVotes returns the number of granted and rejected Votes, and whether the
+// election outcome is known.
+func (p *ProgressTracker) TallyVotes() (granted int, rejected int, _ quorum.VoteResult) {
+	// Count only votes from tracked non-learners, so that granted/rejected
+	// stay correct even if the Votes map contains members that are no longer
+	// part of the configuration. The numbers are informational only, but
+	// might as well be right.
+	for peerID, progress := range p.Progress {
+		if progress.IsLearner {
+			continue
+		}
+		switch vote, voted := p.Votes[peerID]; {
+		case !voted:
+			// No vote recorded for this peer yet.
+		case vote:
+			granted++
+		default:
+			rejected++
+		}
+	}
+	return granted, rejected, p.Voters.VoteResult(p.Votes)
+}