// Copyright 2015 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package raft import "fmt" const ( ProgressStateProbe ProgressStateType = iota ProgressStateReplicate ProgressStateSnapshot ) type ProgressStateType uint64 var prstmap = [...]string{ "ProgressStateProbe", "ProgressStateReplicate", "ProgressStateSnapshot", } func (st ProgressStateType) String() string { return prstmap[uint64(st)] } // Progress represents a follower’s progress in the view of the leader. Leader maintains // progresses of all followers, and sends entries to the follower based on its progress. type Progress struct { Match, Next uint64 // State defines how the leader should interact with the follower. // // When in ProgressStateProbe, leader sends at most one replication message // per heartbeat interval. It also probes actual progress of the follower. // // When in ProgressStateReplicate, leader optimistically increases next // to the latest entry sent after sending replication message. This is // an optimized state for fast replicating log entries to the follower. // // When in ProgressStateSnapshot, leader should have sent out snapshot // before and stops sending any replication message. State ProgressStateType // Paused is used in ProgressStateProbe. // When Paused is true, raft should pause sending replication message to this peer. Paused bool // PendingSnapshot is used in ProgressStateSnapshot. // If there is a pending snapshot, the pendingSnapshot will be set to the // index of the snapshot. If pendingSnapshot is set, the replication process of // this Progress will be paused. raft will not resend snapshot until the pending one // is reported to be failed. PendingSnapshot uint64 // RecentActive is true if the progress is recently active. Receiving any messages // from the corresponding follower indicates the progress is active. // RecentActive can be reset to false after an election timeout. RecentActive bool // inflights is a sliding window for the inflight messages. // Each inflight message contains one or more log entries. // The max number of entries per message is defined in raft config as MaxSizePerMsg. // Thus inflight effectively limits both the number of inflight messages // and the bandwidth each Progress can use. // When inflights is full, no more message should be sent. // When a leader sends out a message, the index of the last // entry should be added to inflights. The index MUST be added // into inflights in order. // When a leader receives a reply, the previous inflights should // be freed by calling inflights.freeTo with the index of the last // received entry. ins *inflights // IsLearner is true if this progress is tracked for a learner. IsLearner bool } func (pr *Progress) resetState(state ProgressStateType) { pr.Paused = false pr.PendingSnapshot = 0 pr.State = state pr.ins.reset() } func (pr *Progress) becomeProbe() { // If the original state is ProgressStateSnapshot, progress knows that // the pending snapshot has been sent to this peer successfully, then // probes from pendingSnapshot + 1. if pr.State == ProgressStateSnapshot { pendingSnapshot := pr.PendingSnapshot pr.resetState(ProgressStateProbe) pr.Next = max(pr.Match+1, pendingSnapshot+1) } else { pr.resetState(ProgressStateProbe) pr.Next = pr.Match + 1 } } func (pr *Progress) becomeReplicate() { pr.resetState(ProgressStateReplicate) pr.Next = pr.Match + 1 } func (pr *Progress) becomeSnapshot(snapshoti uint64) { pr.resetState(ProgressStateSnapshot) pr.PendingSnapshot = snapshoti } // maybeUpdate returns false if the given n index comes from an outdated message. // Otherwise it updates the progress and returns true. func (pr *Progress) maybeUpdate(n uint64) bool { var updated bool if pr.Match < n { pr.Match = n updated = true pr.resume() } if pr.Next < n+1 { pr.Next = n + 1 } return updated } func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 } // maybeDecrTo returns false if the given to index comes from an out of order message. // Otherwise it decreases the progress next index to min(rejected, last) and returns true. func (pr *Progress) maybeDecrTo(rejected, last uint64) bool { if pr.State == ProgressStateReplicate { // the rejection must be stale if the progress has matched and "rejected" // is smaller than "match". if rejected <= pr.Match { return false } // directly decrease next to match + 1 pr.Next = pr.Match + 1 return true } // the rejection must be stale if "rejected" does not match next - 1 if pr.Next-1 != rejected { return false } if pr.Next = min(rejected, last+1); pr.Next < 1 { pr.Next = 1 } pr.resume() return true } func (pr *Progress) pause() { pr.Paused = true } func (pr *Progress) resume() { pr.Paused = false } // IsPaused returns whether sending log entries to this node has been // paused. A node may be paused because it has rejected recent // MsgApps, is currently waiting for a snapshot, or has reached the // MaxInflightMsgs limit. func (pr *Progress) IsPaused() bool { switch pr.State { case ProgressStateProbe: return pr.Paused case ProgressStateReplicate: return pr.ins.full() case ProgressStateSnapshot: return true default: panic("unexpected state") } } func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 } // needSnapshotAbort returns true if snapshot progress's Match // is equal or higher than the pendingSnapshot. func (pr *Progress) needSnapshotAbort() bool { return pr.State == ProgressStateSnapshot && pr.Match >= pr.PendingSnapshot } func (pr *Progress) String() string { return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d", pr.Next, pr.Match, pr.State, pr.IsPaused(), pr.PendingSnapshot) } type inflights struct { // the starting index in the buffer start int // number of inflights in the buffer count int // the size of the buffer size int // buffer contains the index of the last entry // inside one message. buffer []uint64 } func newInflights(size int) *inflights { return &inflights{ size: size, } } // add adds an inflight into inflights func (in *inflights) add(inflight uint64) { if in.full() { panic("cannot add into a full inflights") } next := in.start + in.count size := in.size if next >= size { next -= size } if next >= len(in.buffer) { in.growBuf() } in.buffer[next] = inflight in.count++ } // grow the inflight buffer by doubling up to inflights.size. We grow on demand // instead of preallocating to inflights.size to handle systems which have // thousands of Raft groups per process. func (in *inflights) growBuf() { newSize := len(in.buffer) * 2 if newSize == 0 { newSize = 1 } else if newSize > in.size { newSize = in.size } newBuffer := make([]uint64, newSize) copy(newBuffer, in.buffer) in.buffer = newBuffer } // freeTo frees the inflights smaller or equal to the given `to` flight. func (in *inflights) freeTo(to uint64) { if in.count == 0 || to < in.buffer[in.start] { // out of the left side of the window return } idx := in.start var i int for i = 0; i < in.count; i++ { if to < in.buffer[idx] { // found the first large inflight break } // increase index and maybe rotate size := in.size if idx++; idx >= size { idx -= size } } // free i inflights and set new start index in.count -= i in.start = idx if in.count == 0 { // inflights is empty, reset the start index so that we don't grow the // buffer unnecessarily. in.start = 0 } } func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) } // full returns true if the inflights is full. func (in *inflights) full() bool { return in.count == in.size } // resets frees all inflights. func (in *inflights) reset() { in.count = 0 in.start = 0 }