2016-05-13 06:49:15 +03:00
// Copyright 2015 The etcd Authors
2015-01-25 06:19:16 +03:00
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2014-10-18 02:41:22 +04:00
2014-08-20 05:47:10 +04:00
package raft
2014-08-23 00:24:33 +04:00
import (
2016-06-29 13:24:58 +03:00
"bytes"
2014-08-23 00:24:33 +04:00
"errors"
"fmt"
2015-03-18 22:36:29 +03:00
"math"
2014-10-07 16:12:49 +04:00
"math/rand"
2014-08-23 00:24:33 +04:00
"sort"
2014-12-09 01:37:39 +03:00
"strings"
2016-09-05 16:03:18 +03:00
"sync"
"time"
2014-08-28 05:53:18 +04:00
pb "github.com/coreos/etcd/raft/raftpb"
2014-08-23 00:24:33 +04:00
)
2014-10-08 02:22:35 +04:00
const (
	// None is a placeholder node ID used when there is no leader.
	None uint64 = 0

	// noLimit is the largest uint64 value. It is passed as a size limit
	// where no truncation is wanted.
	noLimit = math.MaxUint64
)
2014-08-23 00:24:33 +04:00
2014-10-08 02:22:35 +04:00
// Possible values for StateType.
2014-08-23 00:24:33 +04:00
const (
2014-09-16 04:35:02 +04:00
StateFollower StateType = iota
StateCandidate
StateLeader
2016-10-10 09:32:40 +03:00
StatePreCandidate
2016-10-19 06:07:43 +03:00
numStates
2014-08-23 00:24:33 +04:00
)
2016-08-27 02:03:06 +03:00
// ReadOnlyOption selects how read only requests are processed.
type ReadOnlyOption int

const (
	// ReadOnlySafe guarantees the linearizability of the read only request
	// by communicating with the quorum. It is the default (zero value) and
	// the suggested option.
	ReadOnlySafe ReadOnlyOption = iota

	// ReadOnlyLeaseBased ensures linearizability of the read only request
	// by relying on the leader lease, so it can be affected by clock drift.
	// If the clock drift is unbounded, the leader might keep the lease
	// longer than it should (the clock can move backward or pause without
	// any bound); ReadIndex is not safe in that case.
	ReadOnlyLeaseBased
)
2016-06-29 13:24:58 +03:00
// Possible values for CampaignType
const (
2016-10-10 09:32:40 +03:00
// campaignPreElection represents the first phase of a normal election when
// Config.PreVote is true.
campaignPreElection CampaignType = "CampaignPreElection"
// campaignElection represents a normal (time-based) election (the second phase
// of the election when Config.PreVote is true).
2016-07-11 09:59:53 +03:00
campaignElection CampaignType = "CampaignElection"
// campaignTransfer represents the type of leader transfer
campaignTransfer CampaignType = "CampaignTransfer"
2016-06-29 13:24:58 +03:00
)
2018-01-11 07:43:55 +03:00
// ErrProposalDropped is returned when a proposal is ignored, so that the
// proposer is notified and can fail fast.
var ErrProposalDropped = errors.New("raft proposal dropped")
2016-09-05 16:03:18 +03:00
// lockedRand is a small wrapper around rand.Rand to provide
// synchronization. Only the methods needed by the code are exposed
// (e.g. Intn).
type lockedRand struct {
	mu   sync.Mutex
	rand *rand.Rand
}

// Intn returns a pseudo-random number in [0, n) from the wrapped source,
// holding the mutex for the duration of the call.
//
// The underlying rand.Rand.Intn panics when n <= 0. The mutex is released
// with defer so that such a panic does not leave it locked and deadlock
// every later caller.
func (r *lockedRand) Intn(n int) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.rand.Intn(n)
}

// globalRand is the package-wide random source, seeded from the wall clock
// at program start.
var globalRand = &lockedRand{
	rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
2016-06-29 13:24:58 +03:00
// CampaignType represents the type of campaigning.
//
// It is a string rather than a uint64 because strings are simpler to
// compare and to fill in raft entries.
type CampaignType string
2014-10-08 02:22:35 +04:00
// StateType represents the role of a node in a cluster.
type StateType uint64

// stmap holds the printable name of each StateType, indexed by the
// numeric value of the state.
var stmap = [...]string{
	"StateFollower",
	"StateCandidate",
	"StateLeader",
	"StatePreCandidate",
}

// String returns the name of st. A value that has no entry in stmap is
// rendered as "StateType(<n>)" rather than panicking on an out-of-range
// index, so formatting an unexpected state in a log line is always safe.
func (st StateType) String() string {
	if uint64(st) >= uint64(len(stmap)) {
		return fmt.Sprintf("StateType(%d)", uint64(st))
	}
	return stmap[uint64(st)]
}
2015-03-22 04:15:58 +03:00
// Config contains the parameters to start a raft.
type Config struct {
// ID is the identity of the local raft. ID cannot be 0.
ID uint64
2015-03-24 21:10:07 +03:00
2016-03-01 11:54:58 +03:00
// peers contains the IDs of all nodes (including self) in the raft cluster. It
// should only be set when starting a new raft cluster. Restarting raft from
// previous configuration will panic if peers is set. peer is private and only
// used for testing right now.
2015-03-24 21:10:07 +03:00
peers [ ] uint64
2015-03-22 04:15:58 +03:00
2018-01-03 04:34:12 +03:00
// learners contains the IDs of all learner nodes (including self if the
// local node is a learner) in the raft cluster. learners only receives
// entries from the leader node. It does not vote or promote itself.
2017-11-11 05:38:21 +03:00
learners [ ] uint64
2016-03-01 11:52:14 +03:00
// ElectionTick is the number of Node.Tick invocations that must pass between
// elections. That is, if a follower does not receive any message from the
// leader of current term before ElectionTick has elapsed, it will become
// candidate and start an election. ElectionTick must be greater than
// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
// unnecessary leader switching.
2015-03-22 04:15:58 +03:00
ElectionTick int
2016-03-01 11:52:14 +03:00
// HeartbeatTick is the number of Node.Tick invocations that must pass between
// heartbeats. That is, a leader sends heartbeat messages to maintain its
// leadership every HeartbeatTick ticks.
2015-03-22 04:15:58 +03:00
HeartbeatTick int
2016-03-01 11:54:58 +03:00
// Storage is the storage for raft. raft generates entries and states to be
// stored in storage. raft reads the persisted entries and states out of
// Storage when it needs. raft reads out the previous state and configuration
// out of storage when restarting.
2015-03-22 04:15:58 +03:00
Storage Storage
// Applied is the last applied index. It should only be set when restarting
2016-03-01 11:54:58 +03:00
// raft. raft will not return entries to the application smaller or equal to
// Applied. If Applied is unset when restarting, raft might return previous
// applied entries. This is a very application dependent configuration.
2015-03-22 04:15:58 +03:00
Applied uint64
2016-03-01 11:54:58 +03:00
// MaxSizePerMsg limits the max size of each append message. Smaller value
// lowers the raft recovery cost(initial probing and message lost during normal
// operation). On the other side, it might affect the throughput during normal
// replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per
// message.
2015-03-22 04:15:58 +03:00
MaxSizePerMsg uint64
2016-03-01 11:54:58 +03:00
// MaxInflightMsgs limits the max number of in-flight append messages during
// optimistic replication phase. The application transportation layer usually
// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
// overflowing that sending buffer. TODO (xiangli): feedback to application to
// limit the proposal rate?
2015-03-22 04:15:58 +03:00
MaxInflightMsgs int
2015-08-10 18:04:38 +03:00
2016-03-01 11:54:58 +03:00
// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
2015-11-24 08:59:25 +03:00
CheckQuorum bool
2016-10-10 09:32:40 +03:00
// PreVote enables the Pre-Vote algorithm described in raft thesis section
// 9.6. This prevents disruption when a node that has been partitioned away
// rejoins the cluster.
PreVote bool
2016-08-27 02:03:06 +03:00
// ReadOnlyOption specifies how the read only request is processed.
//
// ReadOnlySafe guarantees the linearizability of the read only request by
// communicating with the quorum. It is the default and suggested option.
//
// ReadOnlyLeaseBased ensures linearizability of the read only request by
// relying on the leader lease. It can be affected by clock drift.
// If the clock drift is unbounded, leader might keep the lease longer than it
// should (clock can move backward/pause without any bound). ReadIndex is not safe
// in that case.
2017-09-17 20:46:12 +03:00
// CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
2016-08-27 02:03:06 +03:00
ReadOnlyOption ReadOnlyOption
2016-03-01 11:54:58 +03:00
// Logger is the logger used for raft log. For multinode which can host
// multiple raft group, each raft group can have its own logger
2015-08-10 18:04:38 +03:00
Logger Logger
2017-05-25 07:42:18 +03:00
// DisableProposalForwarding set to true means that followers will drop
// proposals, rather than forwarding them to the leader. One use case for
// this feature would be in a situation where the Raft leader is used to
// compute the data of a proposal, for example, adding a timestamp from a
// hybrid logical clock to data in a monotonically increasing way. Forwarding
// should be disabled to prevent a follower with an innaccurate hybrid
// logical clock from assigning the timestamp and then forwarding the data
// to the leader.
DisableProposalForwarding bool
2015-03-22 04:15:58 +03:00
}
func ( c * Config ) validate ( ) error {
if c . ID == None {
return errors . New ( "cannot use none as id" )
}
if c . HeartbeatTick <= 0 {
return errors . New ( "heartbeat tick must be greater than 0" )
}
if c . ElectionTick <= c . HeartbeatTick {
return errors . New ( "election tick must be greater than heartbeat tick" )
}
if c . Storage == nil {
return errors . New ( "storage cannot be nil" )
}
if c . MaxInflightMsgs <= 0 {
return errors . New ( "max inflight messages must be greater than 0" )
}
2015-08-10 18:04:38 +03:00
if c . Logger == nil {
c . Logger = raftLogger
}
2017-09-17 20:46:12 +03:00
if c . ReadOnlyOption == ReadOnlyLeaseBased && ! c . CheckQuorum {
return errors . New ( "CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased" )
}
2015-03-22 04:15:58 +03:00
return nil
}
2014-08-23 00:24:33 +04:00
type raft struct {
2014-10-08 14:29:53 +04:00
id uint64
2014-08-23 00:24:33 +04:00
2016-01-26 23:18:55 +03:00
Term uint64
Vote uint64
2016-08-27 02:03:06 +03:00
readStates [ ] ReadState
2016-06-03 18:20:10 +03:00
2014-08-23 00:24:33 +04:00
// the log
raftLog * raftLog
2015-03-20 04:06:51 +03:00
maxInflight int
maxMsgSize uint64
prs map [ uint64 ] * Progress
2017-11-11 05:38:21 +03:00
learnerPrs map [ uint64 ] * Progress
2014-08-23 00:24:33 +04:00
2014-09-16 04:35:02 +04:00
state StateType
2014-08-23 00:24:33 +04:00
2017-11-11 05:38:21 +03:00
// isLearner is true if the local raft node is a learner.
isLearner bool
2014-10-08 14:29:53 +04:00
votes map [ uint64 ] bool
2014-08-20 05:47:10 +04:00
2014-08-28 05:53:18 +04:00
msgs [ ] pb . Message
2014-08-23 00:24:33 +04:00
// the leader id
2014-10-08 14:29:53 +04:00
lead uint64
2016-03-24 14:59:41 +03:00
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
2017-12-30 07:11:22 +03:00
// Only one conf change may be pending (in the log, but not yet
// applied) at a time. This is enforced via pendingConfIndex, which
// is set to a value >= the log index of the latest pending
// configuration change (if any). Config changes are only allowed to
// be proposed if the leader's applied index is greater than this
// value.
pendingConfIndex uint64
2014-09-19 23:35:56 +04:00
2016-08-27 02:03:06 +03:00
readOnly * readOnly
2015-11-24 08:59:25 +03:00
// number of ticks since it reached last electionTimeout when it is leader
// or candidate.
// number of ticks since it reached last electionTimeout or received a
// valid message from current leader when it is a follower.
electionElapsed int
// number of ticks since it reached last heartbeatTimeout.
// only leader keeps heartbeatElapsed.
heartbeatElapsed int
checkQuorum bool
2016-10-10 09:32:40 +03:00
preVote bool
2015-11-24 08:59:25 +03:00
2014-09-03 03:59:29 +04:00
heartbeatTimeout int
electionTimeout int
2016-04-01 20:09:09 +03:00
// randomizedElectionTimeout is a random number between
// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
// when raft changes its state to follower or candidate.
randomizedElectionTimeout int
2017-05-25 07:42:18 +03:00
disableProposalForwarding bool
2016-04-01 20:09:09 +03:00
tick func ( )
step stepFunc
2015-08-10 18:04:38 +03:00
logger Logger
2014-08-23 00:24:33 +04:00
}
2015-03-22 04:15:58 +03:00
func newRaft ( c * Config ) * raft {
if err := c . validate ( ) ; err != nil {
panic ( err . Error ( ) )
2014-08-23 00:24:33 +04:00
}
2015-08-10 18:04:38 +03:00
raftlog := newLog ( c . Storage , c . Logger )
2015-03-22 04:15:58 +03:00
hs , cs , err := c . Storage . InitialState ( )
2014-11-20 00:17:50 +03:00
if err != nil {
panic ( err ) // TODO(bdarnell)
}
2015-03-24 21:10:07 +03:00
peers := c . peers
2017-11-11 05:38:21 +03:00
learners := c . learners
if len ( cs . Nodes ) > 0 || len ( cs . Learners ) > 0 {
if len ( peers ) > 0 || len ( learners ) > 0 {
2014-11-22 01:22:20 +03:00
// TODO(bdarnell): the peers argument is always nil except in
// tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot.
2017-11-11 05:38:21 +03:00
panic ( "cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)" )
2014-11-22 01:22:20 +03:00
}
peers = cs . Nodes
2017-11-11 05:38:21 +03:00
learners = cs . Learners
2014-11-22 01:22:20 +03:00
}
2014-09-03 03:59:29 +04:00
r := & raft {
2017-05-25 07:42:18 +03:00
id : c . ID ,
lead : None ,
2017-11-11 05:38:21 +03:00
isLearner : false ,
2017-05-25 07:42:18 +03:00
raftLog : raftlog ,
maxMsgSize : c . MaxSizePerMsg ,
maxInflight : c . MaxInflightMsgs ,
prs : make ( map [ uint64 ] * Progress ) ,
2017-11-11 05:38:21 +03:00
learnerPrs : make ( map [ uint64 ] * Progress ) ,
2017-05-25 07:42:18 +03:00
electionTimeout : c . ElectionTick ,
heartbeatTimeout : c . HeartbeatTick ,
logger : c . Logger ,
checkQuorum : c . CheckQuorum ,
preVote : c . PreVote ,
readOnly : newReadOnly ( c . ReadOnlyOption ) ,
disableProposalForwarding : c . DisableProposalForwarding ,
2014-09-03 03:59:29 +04:00
}
2014-08-23 00:24:33 +04:00
for _ , p := range peers {
2015-03-20 04:06:51 +03:00
r . prs [ p ] = & Progress { Next : 1 , ins : newInflights ( r . maxInflight ) }
2014-08-23 00:24:33 +04:00
}
2017-11-11 05:38:21 +03:00
for _ , p := range learners {
if _ , ok := r . prs [ p ] ; ok {
panic ( fmt . Sprintf ( "node %x is in both learner and peer list" , p ) )
}
r . learnerPrs [ p ] = & Progress { Next : 1 , ins : newInflights ( r . maxInflight ) , IsLearner : true }
if r . id == p {
r . isLearner = true
}
}
2014-11-22 01:22:20 +03:00
if ! isHardStateEqual ( hs , emptyState ) {
r . loadState ( hs )
2014-08-23 00:24:33 +04:00
}
2015-03-22 04:15:58 +03:00
if c . Applied > 0 {
raftlog . appliedTo ( c . Applied )
2015-01-22 19:37:02 +03:00
}
2014-11-27 08:21:13 +03:00
r . becomeFollower ( r . Term , None )
2014-12-09 01:37:39 +03:00
2016-04-08 08:14:56 +03:00
var nodesStrs [ ] string
2014-12-09 01:37:39 +03:00
for _ , n := range r . nodes ( ) {
nodesStrs = append ( nodesStrs , fmt . Sprintf ( "%x" , n ) )
}
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]" ,
2015-01-22 20:03:35 +03:00
r . id , strings . Join ( nodesStrs , "," ) , r . Term , r . raftLog . committed , r . raftLog . applied , r . raftLog . lastIndex ( ) , r . raftLog . lastTerm ( ) )
2014-08-24 04:03:53 +04:00
return r
2014-08-23 00:24:33 +04:00
}
2014-09-15 09:44:59 +04:00
// hasLeader reports whether a leader is currently known.
func (r *raft) hasLeader() bool {
	return r.lead != None
}
2014-08-23 00:24:33 +04:00
2014-12-09 08:17:04 +03:00
// softState returns a new SoftState holding the current leader and the
// current state of this node.
func (r *raft) softState() *SoftState {
	ss := SoftState{Lead: r.lead, RaftState: r.state}
	return &ss
}
2014-09-16 04:35:02 +04:00
2016-01-26 23:18:55 +03:00
// hardState returns the current term, vote and commit index packed into a
// pb.HardState.
func (r *raft) hardState() pb.HardState {
	var hs pb.HardState
	hs.Term = r.Term
	hs.Vote = r.Vote
	hs.Commit = r.raftLog.committed
	return hs
}
2016-01-06 10:23:35 +03:00
// quorum returns the size of a majority of the voting peers in r.prs.
// Learners are not counted.
func (r *raft) quorum() int {
	voters := len(r.prs)
	return voters/2 + 1
}
2014-12-09 02:24:34 +03:00
func ( r * raft ) nodes ( ) [ ] uint64 {
2018-01-08 18:43:04 +03:00
nodes := make ( [ ] uint64 , 0 , len ( r . prs ) )
2016-01-04 16:20:54 +03:00
for id := range r . prs {
nodes = append ( nodes , id )
2014-12-09 02:24:34 +03:00
}
2018-01-08 18:43:04 +03:00
sort . Sort ( uint64Slice ( nodes ) )
return nodes
}
func ( r * raft ) learnerNodes ( ) [ ] uint64 {
nodes := make ( [ ] uint64 , 0 , len ( r . learnerPrs ) )
2017-11-11 05:38:21 +03:00
for id := range r . learnerPrs {
nodes = append ( nodes , id )
}
2014-12-09 02:24:34 +03:00
sort . Sort ( uint64Slice ( nodes ) )
return nodes
}
2014-08-23 00:24:33 +04:00
// send persists state to stable storage and then sends to its mailbox.
2014-08-28 05:53:18 +04:00
func ( r * raft ) send ( m pb . Message ) {
2014-08-24 04:03:53 +04:00
m . From = r . id
2017-07-20 23:46:05 +03:00
if m . Type == pb . MsgVote || m . Type == pb . MsgVoteResp || m . Type == pb . MsgPreVote || m . Type == pb . MsgPreVoteResp {
2016-10-10 09:32:40 +03:00
if m . Term == 0 {
2017-07-20 23:46:05 +03:00
// All {pre-,}campaign messages need to have the term set when
// sending.
// - MsgVote: m.Term is the term the node is campaigning for,
// non-zero as we increment the term when campaigning.
// - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
// granted, non-zero for the same reason MsgVote is
// - MsgPreVote: m.Term is the term the node will campaign,
// non-zero as we use m.Term to indicate the next term we'll be
// campaigning for
// - MsgPreVoteResp: m.Term is the term received in the original
// MsgPreVote if the pre-vote was granted, non-zero for the
// same reasons MsgPreVote is
2016-10-10 09:32:40 +03:00
panic ( fmt . Sprintf ( "term should be set when sending %s" , m . Type ) )
}
} else {
if m . Term != 0 {
panic ( fmt . Sprintf ( "term should not be set when sending %s (was %d)" , m . Type , m . Term ) )
}
2016-10-29 08:06:58 +03:00
// do not attach term to MsgProp, MsgReadIndex
2016-10-10 09:32:40 +03:00
// proposals are a way to forward to the leader and
// should be treated as local message.
2016-10-29 08:06:58 +03:00
// MsgReadIndex is also forwarded to leader.
if m . Type != pb . MsgProp && m . Type != pb . MsgReadIndex {
2016-10-10 09:32:40 +03:00
m . Term = r . Term
}
2014-10-04 05:45:15 +04:00
}
2014-08-24 04:03:53 +04:00
r . msgs = append ( r . msgs , m )
2014-08-23 00:24:33 +04:00
}
2017-11-11 05:38:21 +03:00
// getProgress returns the Progress tracked for id, looking among the
// voting peers first and the learners second. It returns nil when id is
// in neither map.
func (r *raft) getProgress(id uint64) *Progress {
	pr, isPeer := r.prs[id]
	if !isPeer {
		pr = r.learnerPrs[id]
	}
	return pr
}
2016-01-04 06:51:51 +03:00
// sendAppend sends RPC, with entries to the given peer.
2014-10-08 14:29:53 +04:00
func ( r * raft ) sendAppend ( to uint64 ) {
2017-11-11 05:38:21 +03:00
pr := r . getProgress ( to )
2016-12-04 08:14:08 +03:00
if pr . IsPaused ( ) {
2014-12-18 23:02:15 +03:00
return
}
2014-08-28 05:53:18 +04:00
m := pb . Message { }
2014-08-23 00:24:33 +04:00
m . To = to
2015-06-13 08:24:40 +03:00
term , errt := r . raftLog . term ( pr . Next - 1 )
ents , erre := r . raftLog . entries ( pr . Next , r . maxMsgSize )
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
2015-12-10 23:17:30 +03:00
if ! pr . RecentActive {
r . logger . Debugf ( "ignore sending snapshot to %x since it is not recently active" , to )
return
}
2014-10-12 11:34:22 +04:00
m . Type = pb . MsgSnap
2014-11-26 21:59:13 +03:00
snapshot , err := r . raftLog . snapshot ( )
2014-11-20 00:17:50 +03:00
if err != nil {
2015-09-29 10:04:12 +03:00
if err == ErrSnapshotTemporarilyUnavailable {
r . logger . Debugf ( "%x failed to send snapshot to %x because snapshot is temporarily unavailable" , r . id , to )
return
}
2014-11-20 00:17:50 +03:00
panic ( err ) // TODO(bdarnell)
}
2014-11-25 11:37:21 +03:00
if IsEmptySnap ( snapshot ) {
2014-11-20 00:17:50 +03:00
panic ( "need non-empty snapshot" )
}
m . Snapshot = snapshot
2014-12-02 22:45:41 +03:00
sindex , sterm := snapshot . Metadata . Index , snapshot . Metadata . Term
2015-08-19 01:43:49 +03:00
r . logger . Debugf ( "%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]" ,
2016-01-26 23:18:55 +03:00
r . id , r . raftLog . firstIndex ( ) , r . raftLog . committed , sindex , sterm , to , pr )
2015-03-16 10:46:16 +03:00
pr . becomeSnapshot ( sindex )
2015-08-19 01:43:49 +03:00
r . logger . Debugf ( "%x paused sending replication messages to %x [%s]" , r . id , to , pr )
2014-08-23 00:24:33 +04:00
} else {
2014-10-12 11:34:22 +04:00
m . Type = pb . MsgApp
2015-01-20 21:26:22 +03:00
m . Index = pr . Next - 1
2015-06-13 08:24:40 +03:00
m . LogTerm = term
m . Entries = ents
2014-08-24 04:03:53 +04:00
m . Commit = r . raftLog . committed
2015-03-16 10:46:16 +03:00
if n := len ( m . Entries ) ; n != 0 {
switch pr . State {
// optimistically increase the next when in ProgressStateReplicate
case ProgressStateReplicate :
2015-03-20 04:06:51 +03:00
last := m . Entries [ n - 1 ] . Index
pr . optimisticUpdate ( last )
pr . ins . add ( last )
2015-03-16 10:46:16 +03:00
case ProgressStateProbe :
pr . pause ( )
default :
2015-08-10 18:04:38 +03:00
r . logger . Panicf ( "%x is sending append in unhandled state %s" , r . id , pr . State )
2015-03-16 10:46:16 +03:00
}
2014-11-18 22:42:08 +03:00
}
2014-08-23 00:24:33 +04:00
}
2014-08-24 04:03:53 +04:00
r . send ( m )
2014-08-23 00:24:33 +04:00
}
2014-10-30 09:21:38 +03:00
// sendHeartbeat sends an empty MsgApp
2016-08-27 02:03:06 +03:00
func ( r * raft ) sendHeartbeat ( to uint64 , ctx [ ] byte ) {
2014-11-18 02:44:57 +03:00
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
2017-11-11 05:38:21 +03:00
commit := min ( r . getProgress ( to ) . Match , r . raftLog . committed )
2014-08-28 05:53:18 +04:00
m := pb . Message {
2016-08-27 02:03:06 +03:00
To : to ,
Type : pb . MsgHeartbeat ,
Commit : commit ,
Context : ctx ,
2014-08-23 00:24:33 +04:00
}
2016-08-27 02:03:06 +03:00
2014-08-24 04:03:53 +04:00
r . send ( m )
2014-08-23 00:24:33 +04:00
}
2017-11-11 05:38:21 +03:00
// forEachProgress calls f once for every tracked Progress: first for all
// voting peers, then for all learners. Order within each group follows map
// iteration and is therefore unspecified.
func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) {
	for peerID, peerPr := range r.prs {
		f(peerID, peerPr)
	}
	for learnerID, learnerPr := range r.learnerPrs {
		f(learnerID, learnerPr)
	}
}
2016-01-04 06:51:51 +03:00
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
2014-10-30 09:21:38 +03:00
// according to the progress recorded in r.prs.
2014-08-24 04:03:53 +04:00
func ( r * raft ) bcastAppend ( ) {
2017-11-11 05:38:21 +03:00
r . forEachProgress ( func ( id uint64 , _ * Progress ) {
2016-01-04 16:20:54 +03:00
if id == r . id {
2017-11-11 05:38:21 +03:00
return
2014-08-23 00:24:33 +04:00
}
2017-11-11 05:38:21 +03:00
2016-01-04 16:20:54 +03:00
r . sendAppend ( id )
2017-11-11 05:38:21 +03:00
} )
2014-08-23 00:24:33 +04:00
}
2016-01-04 06:51:51 +03:00
// bcastHeartbeat sends RPC, without entries to all the peers.
2014-08-24 04:03:53 +04:00
func ( r * raft ) bcastHeartbeat ( ) {
2016-08-27 02:03:06 +03:00
lastCtx := r . readOnly . lastPendingRequestCtx ( )
if len ( lastCtx ) == 0 {
r . bcastHeartbeatWithCtx ( nil )
} else {
r . bcastHeartbeatWithCtx ( [ ] byte ( lastCtx ) )
}
}
func ( r * raft ) bcastHeartbeatWithCtx ( ctx [ ] byte ) {
2017-11-11 05:38:21 +03:00
r . forEachProgress ( func ( id uint64 , _ * Progress ) {
2016-01-04 16:20:54 +03:00
if id == r . id {
2017-11-11 05:38:21 +03:00
return
2014-08-23 00:24:33 +04:00
}
2016-08-27 02:03:06 +03:00
r . sendHeartbeat ( id , ctx )
2017-11-11 05:38:21 +03:00
} )
2014-08-23 00:24:33 +04:00
}
2016-01-27 00:55:47 +03:00
// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
2014-08-24 04:03:53 +04:00
func ( r * raft ) maybeCommit ( ) bool {
2014-08-23 00:24:33 +04:00
// TODO(bmizerany): optimize.. Currently naive
2014-10-09 10:42:29 +04:00
mis := make ( uint64Slice , 0 , len ( r . prs ) )
2017-11-11 05:38:21 +03:00
for _ , p := range r . prs {
mis = append ( mis , p . Match )
2014-08-23 00:24:33 +04:00
}
sort . Sort ( sort . Reverse ( mis ) )
2016-01-06 10:23:35 +03:00
mci := mis [ r . quorum ( ) - 1 ]
2014-08-24 04:03:53 +04:00
return r . raftLog . maybeCommit ( mci , r . Term )
2014-08-20 05:47:10 +04:00
}
2014-10-08 14:29:53 +04:00
func ( r * raft ) reset ( term uint64 ) {
2015-03-08 09:31:09 +03:00
if r . Term != term {
r . Term = term
r . Vote = None
}
2014-09-15 09:44:59 +04:00
r . lead = None
2015-11-24 08:59:25 +03:00
r . electionElapsed = 0
r . heartbeatElapsed = 0
2016-04-01 20:09:09 +03:00
r . resetRandomizedElectionTimeout ( )
2015-11-24 08:59:25 +03:00
2016-05-11 06:03:42 +03:00
r . abortLeaderTransfer ( )
2016-03-24 14:59:41 +03:00
2014-10-08 14:29:53 +04:00
r . votes = make ( map [ uint64 ] bool )
2017-11-11 05:38:21 +03:00
r . forEachProgress ( func ( id uint64 , pr * Progress ) {
* pr = Progress { Next : r . raftLog . lastIndex ( ) + 1 , ins : newInflights ( r . maxInflight ) , IsLearner : pr . IsLearner }
2016-01-04 16:20:54 +03:00
if id == r . id {
2017-11-11 05:38:21 +03:00
pr . Match = r . raftLog . lastIndex ( )
2014-08-23 00:24:33 +04:00
}
2017-11-11 05:38:21 +03:00
} )
2017-12-30 07:11:22 +03:00
r . pendingConfIndex = 0
2016-08-27 02:03:06 +03:00
r . readOnly = newReadOnly ( r . readOnly . option )
2014-08-23 00:24:33 +04:00
}
2014-12-10 09:13:42 +03:00
func ( r * raft ) appendEntry ( es ... pb . Entry ) {
li := r . raftLog . lastIndex ( )
for i := range es {
es [ i ] . Term = r . Term
es [ i ] . Index = li + 1 + uint64 ( i )
}
r . raftLog . append ( es ... )
2017-11-11 05:38:21 +03:00
r . getProgress ( r . id ) . maybeUpdate ( r . raftLog . lastIndex ( ) )
2016-01-27 00:55:47 +03:00
// Regardless of maybeCommit's return, our caller will call bcastAppend.
2014-08-24 04:03:53 +04:00
r . maybeCommit ( )
2014-08-23 00:24:33 +04:00
}
2014-10-30 09:21:38 +03:00
// tickElection is run by followers and candidates after r.electionTimeout.
2014-09-03 03:59:29 +04:00
func ( r * raft ) tickElection ( ) {
2015-11-24 08:59:25 +03:00
r . electionElapsed ++
2016-05-27 12:23:18 +03:00
if r . promotable ( ) && r . pastElectionTimeout ( ) {
2015-11-24 08:59:25 +03:00
r . electionElapsed = 0
2014-10-12 11:34:22 +04:00
r . Step ( pb . Message { From : r . id , Type : pb . MsgHup } )
2014-09-03 03:59:29 +04:00
}
}
2014-10-30 09:21:38 +03:00
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
2014-09-03 03:59:29 +04:00
func ( r * raft ) tickHeartbeat ( ) {
2015-11-24 08:59:25 +03:00
r . heartbeatElapsed ++
r . electionElapsed ++
if r . electionElapsed >= r . electionTimeout {
r . electionElapsed = 0
if r . checkQuorum {
r . Step ( pb . Message { From : r . id , Type : pb . MsgCheckQuorum } )
}
2016-03-24 14:59:41 +03:00
// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
if r . state == StateLeader && r . leadTransferee != None {
r . abortLeaderTransfer ( )
}
2015-11-24 08:59:25 +03:00
}
if r . state != StateLeader {
return
}
if r . heartbeatElapsed >= r . heartbeatTimeout {
r . heartbeatElapsed = 0
2014-10-12 11:34:22 +04:00
r . Step ( pb . Message { From : r . id , Type : pb . MsgBeat } )
2014-09-03 03:59:29 +04:00
}
}
2014-10-08 14:29:53 +04:00
func ( r * raft ) becomeFollower ( term uint64 , lead uint64 ) {
2014-09-03 21:16:33 +04:00
r . step = stepFollower
2014-08-24 04:03:53 +04:00
r . reset ( term )
2014-09-03 21:16:33 +04:00
r . tick = r . tickElection
2014-08-24 04:40:25 +04:00
r . lead = lead
2014-09-16 04:35:02 +04:00
r . state = StateFollower
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x became follower at term %d" , r . id , r . Term )
2014-08-23 00:24:33 +04:00
}
2014-08-24 04:03:53 +04:00
func ( r * raft ) becomeCandidate ( ) {
2014-08-23 00:24:33 +04:00
// TODO(xiangli) remove the panic when the raft implementation is stable
2014-09-16 04:35:02 +04:00
if r . state == StateLeader {
2014-08-23 00:24:33 +04:00
panic ( "invalid transition [leader -> candidate]" )
}
2014-09-03 21:16:33 +04:00
r . step = stepCandidate
2014-08-24 04:03:53 +04:00
r . reset ( r . Term + 1 )
2014-09-03 21:16:33 +04:00
r . tick = r . tickElection
2014-08-24 04:30:14 +04:00
r . Vote = r . id
2014-09-16 04:35:02 +04:00
r . state = StateCandidate
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x became candidate at term %d" , r . id , r . Term )
2014-08-23 00:24:33 +04:00
}
2016-10-10 09:32:40 +03:00
func ( r * raft ) becomePreCandidate ( ) {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r . state == StateLeader {
panic ( "invalid transition [leader -> pre-candidate]" )
}
// Becoming a pre-candidate changes our step functions and state,
// but doesn't change anything else. In particular it does not increase
// r.Term or change r.Vote.
r . step = stepCandidate
2017-08-01 17:42:09 +03:00
r . votes = make ( map [ uint64 ] bool )
2016-10-10 09:32:40 +03:00
r . tick = r . tickElection
r . state = StatePreCandidate
r . logger . Infof ( "%x became pre-candidate at term %d" , r . id , r . Term )
}
2014-08-24 04:03:53 +04:00
func ( r * raft ) becomeLeader ( ) {
2014-08-23 00:24:33 +04:00
// TODO(xiangli) remove the panic when the raft implementation is stable
2014-09-16 04:35:02 +04:00
if r . state == StateFollower {
2014-08-23 00:24:33 +04:00
panic ( "invalid transition [follower -> leader]" )
}
2014-09-03 21:16:33 +04:00
r . step = stepLeader
2014-08-24 04:03:53 +04:00
r . reset ( r . Term )
2014-09-04 09:00:31 +04:00
r . tick = r . tickHeartbeat
2014-08-24 04:40:25 +04:00
r . lead = r . id
2014-09-16 04:35:02 +04:00
r . state = StateLeader
2015-06-13 08:24:40 +03:00
ents , err := r . raftLog . entries ( r . raftLog . committed + 1 , noLimit )
if err != nil {
2015-08-10 18:04:38 +03:00
r . logger . Panicf ( "unexpected error getting uncommitted entries (%v)" , err )
2015-06-13 08:24:40 +03:00
}
2017-12-30 07:11:22 +03:00
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
if len ( ents ) > 0 {
r . pendingConfIndex = ents [ len ( ents ) - 1 ] . Index
2014-09-19 23:35:56 +04:00
}
2016-07-26 01:06:57 +03:00
2014-09-04 02:16:31 +04:00
r . appendEntry ( pb . Entry { Data : nil } )
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x became leader at term %d" , r . id , r . Term )
2014-08-23 00:24:33 +04:00
}
2016-06-29 13:24:58 +03:00
func ( r * raft ) campaign ( t CampaignType ) {
2016-10-10 09:32:40 +03:00
var term uint64
var voteMsg pb . MessageType
if t == campaignPreElection {
r . becomePreCandidate ( )
voteMsg = pb . MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r . Term + 1
} else {
r . becomeCandidate ( )
voteMsg = pb . MsgVote
term = r . Term
}
if r . quorum ( ) == r . poll ( r . id , voteRespMsgType ( voteMsg ) , true ) {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
r . campaign ( campaignElection )
} else {
r . becomeLeader ( )
}
2014-12-04 01:46:24 +03:00
return
2014-09-03 03:59:29 +04:00
}
2016-01-04 16:20:54 +03:00
for id := range r . prs {
if id == r . id {
2014-09-03 03:59:29 +04:00
continue
}
2016-10-10 09:32:40 +03:00
r . logger . Infof ( "%x [logterm: %d, index: %d] sent %s request to %x at term %d" ,
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , voteMsg , id , r . Term )
2016-07-11 09:59:53 +03:00
2016-07-12 11:14:06 +03:00
var ctx [ ] byte
2016-07-11 09:59:53 +03:00
if t == campaignTransfer {
2016-07-12 11:14:06 +03:00
ctx = [ ] byte ( t )
2016-07-11 09:59:53 +03:00
}
2016-10-10 09:32:40 +03:00
r . send ( pb . Message { Term : term , To : id , Type : voteMsg , Index : r . raftLog . lastIndex ( ) , LogTerm : r . raftLog . lastTerm ( ) , Context : ctx } )
2014-09-03 03:59:29 +04:00
}
}
2016-10-10 09:32:40 +03:00
func ( r * raft ) poll ( id uint64 , t pb . MessageType , v bool ) ( granted int ) {
2014-12-09 02:21:57 +03:00
if v {
2016-10-10 09:32:40 +03:00
r . logger . Infof ( "%x received %s from %x at term %d" , r . id , t , id , r . Term )
2014-12-09 02:21:57 +03:00
} else {
2016-10-10 09:32:40 +03:00
r . logger . Infof ( "%x received %s rejection from %x at term %d" , r . id , t , id , r . Term )
2014-12-09 02:21:57 +03:00
}
if _ , ok := r . votes [ id ] ; ! ok {
r . votes [ id ] = v
}
for _ , vv := range r . votes {
if vv {
granted ++
}
}
return granted
}
2014-08-28 05:53:18 +04:00
// Step feeds a single message into this raft instance.
//
// It works in two phases. First it compares m.Term with r.Term: a message
// with Term 0 is treated as local, a higher term may turn this node into a
// follower at that term (or cause the message to be ignored while the leader
// lease is in force), and a lower term causes the message to be dropped
// (optionally answering the sender with a MsgAppResp). Second, it dispatches
// on m.Type: MsgHup starts a campaign, MsgVote/MsgPreVote are answered here,
// and every other type is passed to r.step.
//
// The returned error is whatever r.step returns; all paths handled directly
// in this function return nil.
func ( r * raft ) Step ( m pb . Message ) error {
2016-10-19 06:07:43 +03:00
// Handle the message term, which may result in our stepping down to a follower.
2014-08-23 00:24:33 +04:00
switch {
case m . Term == 0 :
// local message
2014-08-24 04:03:53 +04:00
case m . Term > r . Term :
2016-10-10 09:32:40 +03:00
if m . Type == pb . MsgVote || m . Type == pb . MsgPreVote {
2016-07-12 11:14:06 +03:00
// A request whose context equals campaignTransfer bypasses the lease
// check below.
force := bytes . Equal ( m . Context , [ ] byte ( campaignTransfer ) )
2016-08-29 00:35:19 +03:00
// The lease is considered in force only when checkQuorum is enabled, a
// leader is known, and fewer than electionTimeout ticks have elapsed.
inLease := r . checkQuorum && r . lead != None && r . electionElapsed < r . electionTimeout
2016-06-29 13:24:58 +03:00
if ! force && inLease {
2016-05-27 12:23:18 +03:00
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
2016-10-10 09:32:40 +03:00
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)" ,
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . Type , m . From , m . LogTerm , m . Index , r . Term , r . electionTimeout - r . electionElapsed )
2016-05-27 12:23:18 +03:00
return nil
}
2014-08-23 00:24:33 +04:00
}
2016-10-10 09:32:40 +03:00
switch {
case m . Type == pb . MsgPreVote :
2016-10-24 16:29:33 +03:00
// Never change our term in response to a PreVote
2016-10-10 09:32:40 +03:00
case m . Type == pb . MsgPreVoteResp && ! m . Reject :
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
default :
r . logger . Infof ( "%x [term: %d] received a %s message with higher term from %x [term: %d]" ,
r . id , r . Term , m . Type , m . From , m . Term )
2017-09-15 02:38:55 +03:00
// Only MsgApp, MsgHeartbeat and MsgSnap pass m.From as the second
// argument of becomeFollower; for any other type None is passed.
if m . Type == pb . MsgApp || m . Type == pb . MsgHeartbeat || m . Type == pb . MsgSnap {
r . becomeFollower ( m . Term , m . From )
} else {
r . becomeFollower ( m . Term , None )
}
2016-10-10 09:32:40 +03:00
}
2016-10-19 06:07:43 +03:00
2014-08-24 04:03:53 +04:00
case m . Term < r . Term :
2016-05-27 12:23:18 +03:00
if r . checkQuorum && ( m . Type == pb . MsgHeartbeat || m . Type == pb . MsgApp ) {
2016-10-10 09:32:40 +03:00
// We have received messages from a leader at a lower term. It is possible
// that these messages were simply delayed in the network, but this could
// also mean that this node has advanced its term number during a network
// partition, and it is now unable to either win an election or to rejoin
// the majority on the old term. If checkQuorum is false, this will be
// handled by incrementing term numbers in response to MsgVote with a
// higher term, but if checkQuorum is true we may not advance the term on
// MsgVote and must generate other messages to advance the term. The net
// result of these two features is to minimize the disruption caused by
// nodes that have been removed from the cluster's configuration: a
// removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
// but it will not receive MsgApp or MsgHeartbeat, so it will not create
// disruptive term increases.
2016-05-27 12:23:18 +03:00
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp } )
} else {
// ignore other cases
r . logger . Infof ( "%x [term: %d] ignored a %s message with lower term from %x [term: %d]" ,
r . id , r . Term , m . Type , m . From , m . Term )
}
2014-09-04 21:58:22 +04:00
// Messages from a lower term are never processed any further.
return nil
2014-08-23 00:24:33 +04:00
}
2016-10-19 06:07:43 +03:00
switch m . Type {
case pb . MsgHup :
if r . state != StateLeader {
// Inspect the committed-but-unapplied entries: campaigning is refused
// while any of them is a configuration change.
ents , err := r . raftLog . slice ( r . raftLog . applied + 1 , r . raftLog . committed + 1 , noLimit )
if err != nil {
r . logger . Panicf ( "unexpected error getting unapplied entries (%v)" , err )
}
if n := numOfPendingConf ( ents ) ; n != 0 && r . raftLog . committed > r . raftLog . applied {
r . logger . Warningf ( "%x cannot campaign at term %d since there are still %d pending configuration changes to apply" , r . id , r . Term , n )
return nil
}
r . logger . Infof ( "%x is starting a new election at term %d" , r . id , r . Term )
if r . preVote {
r . campaign ( campaignPreElection )
} else {
r . campaign ( campaignElection )
}
} else {
r . logger . Debugf ( "%x ignoring MsgHup because already leader" , r . id )
}
case pb . MsgVote , pb . MsgPreVote :
2017-11-11 05:38:21 +03:00
if r . isLearner {
// TODO: learner may need to vote, in case of node down when confchange.
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote" ,
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . Type , m . From , m . LogTerm , m . Index , r . Term )
return nil
}
2016-10-24 16:29:33 +03:00
// The m.Term > r.Term clause is for MsgPreVote. For MsgVote m.Term should
// always equal r.Term.
2016-10-19 06:07:43 +03:00
if ( r . Vote == None || m . Term > r . Term || r . Vote == m . From ) && r . raftLog . isUpToDate ( m . Index , m . LogTerm ) {
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d" ,
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . Type , m . From , m . LogTerm , m . Index , r . Term )
2017-07-20 23:46:05 +03:00
// When responding to Msg{Pre,}Vote messages we include the term
// from the message, not the local term. To see why, consider the
// case where a single node was previously partitioned away and
// its local term is now out of date. If we include the local term
// (recall that for pre-votes we don't update the local term), the
// (pre-)campaigning node on the other end will proceed to ignore
// the message (it ignores all out-of-date messages).
// The term in the original message and current local term are the
// same in the case of regular votes, but different for pre-votes.
r . send ( pb . Message { To : m . From , Term : m . Term , Type : voteRespMsgType ( m . Type ) } )
2016-10-19 06:07:43 +03:00
if m . Type == pb . MsgVote {
// Only record real votes.
r . electionElapsed = 0
r . Vote = m . From
}
} else {
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d" ,
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . Type , m . From , m . LogTerm , m . Index , r . Term )
2017-07-20 23:46:05 +03:00
// Rejections carry the local term rather than the term of the request.
r . send ( pb . Message { To : m . From , Term : r . Term , Type : voteRespMsgType ( m . Type ) , Reject : true } )
2016-10-19 06:07:43 +03:00
}
default :
2018-01-11 07:43:55 +03:00
// Everything else is handled by the step function currently installed
// in r.step.
err := r . step ( r , m )
if err != nil {
return err
}
2016-10-19 06:07:43 +03:00
}
2014-08-23 00:24:33 +04:00
return nil
}
2018-01-11 07:43:55 +03:00
// stepFunc is the signature shared by the state-specific message handlers
// stepLeader, stepCandidate and stepFollower: each receives the raft
// instance and the message to process and returns an error.
type stepFunc func ( r * raft , m pb . Message ) error
2014-08-23 00:24:33 +04:00
2018-01-11 07:43:55 +03:00
// stepLeader is the leader-side message handler.
//
// MsgBeat, MsgCheckQuorum, MsgProp and MsgReadIndex are handled first because
// they need no progress entry for m.From. Every other message type is handled
// only if r.getProgress(m.From) returns a non-nil Progress; otherwise the
// message is logged and dropped.
//
// It returns ErrProposalDropped when a MsgProp is refused (this node is not in
// r.prs, or a leadership transfer is in progress) and nil in all other cases.
func stepLeader ( r * raft , m pb . Message ) error {
2015-12-04 01:49:46 +03:00
// These message types do not require any progress for m.From.
2014-08-23 00:24:33 +04:00
switch m . Type {
2014-10-12 11:34:22 +04:00
case pb . MsgBeat :
2014-08-24 04:03:53 +04:00
r . bcastHeartbeat ( )
2018-01-11 07:43:55 +03:00
return nil
2015-11-24 08:59:25 +03:00
case pb . MsgCheckQuorum :
if ! r . checkQuorumActive ( ) {
r . logger . Warningf ( "%x stepped down to follower since quorum is not active" , r . id )
r . becomeFollower ( r . Term , None )
}
2018-01-11 07:43:55 +03:00
return nil
2014-10-12 11:34:22 +04:00
case pb . MsgProp :
2014-12-10 09:13:42 +03:00
if len ( m . Entries ) == 0 {
2015-08-10 18:04:38 +03:00
r . logger . Panicf ( "%x stepped empty MsgProp" , r . id )
2014-08-23 00:24:33 +04:00
}
2015-08-29 12:09:56 +03:00
if _ , ok := r . prs [ r . id ] ; ! ok {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
2018-01-11 07:43:55 +03:00
return ErrProposalDropped
2015-08-29 12:09:56 +03:00
}
2016-03-24 14:59:41 +03:00
if r . leadTransferee != None {
r . logger . Debugf ( "%x [term %d] transfer leadership to %x is in progress; dropping proposal" , r . id , r . Term , r . leadTransferee )
2018-01-11 07:43:55 +03:00
return ErrProposalDropped
2016-03-24 14:59:41 +03:00
}
2014-12-10 09:13:42 +03:00
for i , e := range m . Entries {
if e . Type == pb . EntryConfChange {
2017-12-30 07:11:22 +03:00
// A configuration change proposed while pendingConfIndex is still beyond
// the applied index is replaced by an empty normal entry; otherwise
// pendingConfIndex is set to the index this entry will occupy once the
// batch is appended.
if r . pendingConfIndex > r . raftLog . applied {
r . logger . Infof ( "propose conf %s ignored since pending unapplied configuration [index %d, applied %d]" ,
e . String ( ) , r . pendingConfIndex , r . raftLog . applied )
2014-12-10 09:13:42 +03:00
m . Entries [ i ] = pb . Entry { Type : pb . EntryNormal }
2017-12-30 07:11:22 +03:00
} else {
r . pendingConfIndex = r . raftLog . lastIndex ( ) + uint64 ( i ) + 1
2014-12-10 09:13:42 +03:00
}
2014-09-19 23:35:56 +04:00
}
}
2014-12-10 09:13:42 +03:00
r . appendEntry ( m . Entries ... )
2014-08-24 04:03:53 +04:00
r . bcastAppend ( )
2018-01-11 07:43:55 +03:00
return nil
2016-06-03 18:20:10 +03:00
case pb . MsgReadIndex :
2016-08-27 02:03:06 +03:00
// NOTE(review): m.Entries[0] is read below without a length check, whereas
// stepFollower validates len(m.Entries) for MsgReadIndexResp — confirm that
// callers always supply at least one entry.
if r . quorum ( ) > 1 {
2017-02-15 11:45:54 +03:00
if r . raftLog . zeroTermOnErrCompacted ( r . raftLog . term ( r . raftLog . committed ) ) != r . Term {
// Reject read only request when this leader has not committed any log entry at its term.
2018-01-11 07:43:55 +03:00
return nil
2017-02-15 11:45:54 +03:00
}
2016-08-27 02:03:06 +03:00
// TODO: use an internally defined context instead of the user-given context.
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.
switch r . readOnly . option {
case ReadOnlySafe :
r . readOnly . addRequest ( r . raftLog . committed , m )
r . bcastHeartbeatWithCtx ( m . Entries [ 0 ] . Data )
case ReadOnlyLeaseBased :
2017-09-17 20:46:12 +03:00
ri := r . raftLog . committed
2016-08-27 02:03:06 +03:00
if m . From == None || m . From == r . id { // from local member
r . readStates = append ( r . readStates , ReadState { Index : r . raftLog . committed , RequestCtx : m . Entries [ 0 ] . Data } )
} else {
r . send ( pb . Message { To : m . From , Type : pb . MsgReadIndexResp , Index : ri , Entries : m . Entries } )
}
}
2016-07-20 00:19:13 +03:00
} else {
2016-08-27 02:03:06 +03:00
// With a quorum of one the read state is recorded immediately at the
// committed index.
r . readStates = append ( r . readStates , ReadState { Index : r . raftLog . committed , RequestCtx : m . Entries [ 0 ] . Data } )
2016-07-20 00:19:13 +03:00
}
2016-08-27 02:03:06 +03:00
2018-01-11 07:43:55 +03:00
return nil
2015-12-04 01:49:46 +03:00
}
// All other message types require a progress for m.From (pr).
2017-11-11 05:38:21 +03:00
pr := r . getProgress ( m . From )
if pr == nil {
2016-03-24 14:59:41 +03:00
r . logger . Debugf ( "%x no progress available for %x" , r . id , m . From )
2018-01-11 07:43:55 +03:00
return nil
2015-12-04 01:49:46 +03:00
}
switch m . Type {
2014-10-12 11:34:22 +04:00
case pb . MsgAppResp :
2015-12-10 23:17:18 +03:00
pr . RecentActive = true
2015-11-24 08:59:25 +03:00
2014-10-01 23:59:30 +04:00
if m . Reject {
2015-08-10 18:04:38 +03:00
r . logger . Debugf ( "%x received msgApp rejection(lastindex: %d) from %x for index %d" ,
2014-12-31 06:42:59 +03:00
r . id , m . RejectHint , m . From , m . Index )
2015-02-26 02:07:26 +03:00
if pr . maybeDecrTo ( m . Index , m . RejectHint ) {
2015-08-10 18:04:38 +03:00
r . logger . Debugf ( "%x decreased progress of %x to [%s]" , r . id , m . From , pr )
2015-03-16 10:46:16 +03:00
if pr . State == ProgressStateReplicate {
pr . becomeProbe ( )
}
2014-09-30 06:48:49 +04:00
r . sendAppend ( m . From )
}
2014-08-23 00:24:33 +04:00
} else {
2016-12-04 08:14:08 +03:00
// Remember whether the peer was paused before maybeUpdate runs, so a
// delayed append can be sent afterwards.
oldPaused := pr . IsPaused ( )
2015-03-16 10:46:16 +03:00
if pr . maybeUpdate ( m . Index ) {
switch {
case pr . State == ProgressStateProbe :
pr . becomeReplicate ( )
2016-06-24 17:54:10 +03:00
case pr . State == ProgressStateSnapshot && pr . needSnapshotAbort ( ) :
2015-08-19 01:43:49 +03:00
r . logger . Debugf ( "%x snapshot aborted, resumed sending replication messages to %x [%s]" , r . id , m . From , pr )
2015-03-16 10:46:16 +03:00
pr . becomeProbe ( )
2015-03-20 04:06:51 +03:00
case pr . State == ProgressStateReplicate :
pr . ins . freeTo ( m . Index )
2015-03-16 10:46:16 +03:00
}
if r . maybeCommit ( ) {
r . bcastAppend ( )
} else if oldPaused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
r . sendAppend ( m . From )
}
2016-03-24 14:59:41 +03:00
// Transfer leadership is in progress.
if m . From == r . leadTransferee && pr . Match == r . raftLog . lastIndex ( ) {
r . logger . Infof ( "%x sent MsgTimeoutNow to %x after received MsgAppResp" , r . id , m . From )
r . sendTimeoutNow ( m . From )
}
2014-08-23 00:24:33 +04:00
}
}
2015-01-14 22:42:30 +03:00
case pb . MsgHeartbeatResp :
2015-12-10 23:17:18 +03:00
pr . RecentActive = true
2016-12-19 23:24:21 +03:00
pr . resume ( )
2015-11-24 08:59:25 +03:00
2015-03-20 04:06:51 +03:00
// free one slot for the full inflights window to allow progress.
if pr . State == ProgressStateReplicate && pr . ins . full ( ) {
pr . ins . freeFirstOne ( )
}
2015-02-26 02:07:26 +03:00
if pr . Match < r . raftLog . lastIndex ( ) {
2015-01-14 22:42:30 +03:00
r . sendAppend ( m . From )
}
2016-08-27 02:03:06 +03:00
// The remainder only applies to ReadOnlySafe requests, which are
// identified by a non-empty heartbeat context.
if r . readOnly . option != ReadOnlySafe || len ( m . Context ) == 0 {
2018-01-11 07:43:55 +03:00
return nil
2016-08-27 02:03:06 +03:00
}
ackCount := r . readOnly . recvAck ( m )
if ackCount < r . quorum ( ) {
2018-01-11 07:43:55 +03:00
return nil
2016-08-27 02:03:06 +03:00
}
rss := r . readOnly . advance ( m )
for _ , rs := range rss {
req := rs . req
if req . From == None || req . From == r . id { // from local member
r . readStates = append ( r . readStates , ReadState { Index : rs . index , RequestCtx : req . Entries [ 0 ] . Data } )
} else {
r . send ( pb . Message { To : req . From , Type : pb . MsgReadIndexResp , Index : rs . index , Entries : req . Entries } )
}
}
2015-02-25 08:38:18 +03:00
case pb . MsgSnapStatus :
2015-03-16 10:46:16 +03:00
if pr . State != ProgressStateSnapshot {
2018-01-11 07:43:55 +03:00
return nil
2015-02-25 08:38:18 +03:00
}
2015-03-16 10:46:16 +03:00
if ! m . Reject {
pr . becomeProbe ( )
2015-08-19 01:43:49 +03:00
r . logger . Debugf ( "%x snapshot succeeded, resumed sending replication messages to %x [%s]" , r . id , m . From , pr )
2015-03-16 10:46:16 +03:00
} else {
pr . snapshotFailure ( )
pr . becomeProbe ( )
2015-08-19 01:43:49 +03:00
r . logger . Debugf ( "%x snapshot failed, resumed sending replication messages to %x [%s]" , r . id , m . From , pr )
2015-02-25 08:38:18 +03:00
}
2015-03-16 10:46:16 +03:00
// If the snapshot finished, wait for the msgAppResp from the remote node before
// sending out the next msgApp.
// If the snapshot failed, wait for a heartbeat interval before the next try.
pr . pause ( )
2015-02-05 01:41:14 +03:00
case pb . MsgUnreachable :
2015-03-16 10:46:16 +03:00
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
if pr . State == ProgressStateReplicate {
pr . becomeProbe ( )
2015-03-01 18:01:30 +03:00
}
2015-08-10 18:04:38 +03:00
r . logger . Debugf ( "%x failed to send message to %x because it is unreachable [%s]" , r . id , m . From , pr )
2016-03-24 14:59:41 +03:00
case pb . MsgTransferLeader :
2017-11-11 05:38:21 +03:00
if pr . IsLearner {
r . logger . Debugf ( "%x is learner. Ignored transferring leadership" , r . id )
2018-01-11 07:43:55 +03:00
return nil
2017-11-11 05:38:21 +03:00
}
2016-03-24 14:59:41 +03:00
// m.From names the node that should receive leadership.
leadTransferee := m . From
lastLeadTransferee := r . leadTransferee
if lastLeadTransferee != None {
if lastLeadTransferee == leadTransferee {
r . logger . Infof ( "%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x" ,
r . id , r . Term , leadTransferee , leadTransferee )
2018-01-11 07:43:55 +03:00
return nil
2016-03-24 14:59:41 +03:00
}
r . abortLeaderTransfer ( )
2016-05-11 06:03:42 +03:00
r . logger . Infof ( "%x [term %d] abort previous transferring leadership to %x" , r . id , r . Term , lastLeadTransferee )
2016-03-24 14:59:41 +03:00
}
if leadTransferee == r . id {
2016-05-11 06:03:42 +03:00
r . logger . Debugf ( "%x is already leader. Ignored transferring leadership to self" , r . id )
2018-01-11 07:43:55 +03:00
return nil
2016-03-24 14:59:41 +03:00
}
// Transfer leadership to third party.
r . logger . Infof ( "%x [term %d] starts to transfer leadership to %x" , r . id , r . Term , leadTransferee )
// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
r . electionElapsed = 0
r . leadTransferee = leadTransferee
// A transferee whose log already matches ours is told to campaign right
// away; otherwise it is first sent an append.
if pr . Match == r . raftLog . lastIndex ( ) {
r . sendTimeoutNow ( leadTransferee )
r . logger . Infof ( "%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log" , r . id , leadTransferee , leadTransferee )
} else {
r . sendAppend ( leadTransferee )
}
2014-08-23 00:24:33 +04:00
}
2018-01-11 07:43:55 +03:00
return nil
2014-08-23 00:24:33 +04:00
}
2016-10-10 09:32:40 +03:00
// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
2018-01-11 07:43:55 +03:00
func stepCandidate ( r * raft , m pb . Message ) error {
2016-10-10 09:32:40 +03:00
// Only handle vote responses corresponding to our candidacy (while in
// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
// our pre-candidate state).
var myVoteRespType pb . MessageType
if r . state == StatePreCandidate {
myVoteRespType = pb . MsgPreVoteResp
} else {
myVoteRespType = pb . MsgVoteResp
}
2014-08-23 00:24:33 +04:00
switch m . Type {
2014-10-12 11:34:22 +04:00
case pb . MsgProp :
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x no leader at term %d; dropping proposal" , r . id , r . Term )
2018-01-11 07:43:55 +03:00
return ErrProposalDropped
2014-10-12 11:34:22 +04:00
case pb . MsgApp :
2014-08-24 04:03:53 +04:00
r . becomeFollower ( r . Term , m . From )
r . handleAppendEntries ( m )
2014-12-04 08:32:12 +03:00
case pb . MsgHeartbeat :
r . becomeFollower ( r . Term , m . From )
r . handleHeartbeat ( m )
2014-10-12 11:34:22 +04:00
case pb . MsgSnap :
2014-08-24 04:03:53 +04:00
r . becomeFollower ( m . Term , m . From )
r . handleSnapshot ( m )
2016-10-10 09:32:40 +03:00
case myVoteRespType :
gr := r . poll ( m . From , m . Type , ! m . Reject )
r . logger . Infof ( "%x [quorum:%d] has received %d %s votes and %d vote rejections" , r . id , r . quorum ( ) , gr , m . Type , len ( r . votes ) - gr )
2016-01-06 10:23:35 +03:00
switch r . quorum ( ) {
2014-08-23 00:24:33 +04:00
case gr :
2016-10-10 09:32:40 +03:00
if r . state == StatePreCandidate {
r . campaign ( campaignElection )
} else {
r . becomeLeader ( )
r . bcastAppend ( )
}
2014-08-24 04:03:53 +04:00
case len ( r . votes ) - gr :
2014-09-15 09:44:59 +04:00
r . becomeFollower ( r . Term , None )
2014-08-23 00:24:33 +04:00
}
2016-03-24 14:59:41 +03:00
case pb . MsgTimeoutNow :
r . logger . Debugf ( "%x [term %d state %v] ignored MsgTimeoutNow from %x" , r . id , r . Term , r . state , m . From )
2014-08-23 00:24:33 +04:00
}
2018-01-11 07:43:55 +03:00
return nil
2014-08-23 00:24:33 +04:00
}
2018-01-11 07:43:55 +03:00
func stepFollower ( r * raft , m pb . Message ) error {
2014-08-23 00:24:33 +04:00
switch m . Type {
2014-10-12 11:34:22 +04:00
case pb . MsgProp :
2014-09-15 09:44:59 +04:00
if r . lead == None {
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x no leader at term %d; dropping proposal" , r . id , r . Term )
2018-01-11 07:43:55 +03:00
return ErrProposalDropped
2017-05-25 07:42:18 +03:00
} else if r . disableProposalForwarding {
r . logger . Infof ( "%x not forwarding to leader %x at term %d; dropping proposal" , r . id , r . lead , r . Term )
2018-01-11 07:43:55 +03:00
return ErrProposalDropped
2014-08-23 00:24:33 +04:00
}
2014-08-24 04:40:25 +04:00
m . To = r . lead
2014-08-24 04:03:53 +04:00
r . send ( m )
2014-10-12 11:34:22 +04:00
case pb . MsgApp :
2015-11-24 08:59:25 +03:00
r . electionElapsed = 0
2014-08-24 04:40:25 +04:00
r . lead = m . From
2014-12-04 08:32:12 +03:00
r . handleAppendEntries ( m )
case pb . MsgHeartbeat :
2015-11-24 08:59:25 +03:00
r . electionElapsed = 0
2014-12-04 08:32:12 +03:00
r . lead = m . From
r . handleHeartbeat ( m )
2014-10-12 11:34:22 +04:00
case pb . MsgSnap :
2015-11-24 08:59:25 +03:00
r . electionElapsed = 0
2016-07-12 02:37:31 +03:00
r . lead = m . From
2014-08-24 04:03:53 +04:00
r . handleSnapshot ( m )
2016-08-11 02:24:29 +03:00
case pb . MsgTransferLeader :
if r . lead == None {
r . logger . Infof ( "%x no leader at term %d; dropping leader transfer msg" , r . id , r . Term )
2018-01-11 07:43:55 +03:00
return nil
2016-08-11 02:24:29 +03:00
}
m . To = r . lead
r . send ( m )
2016-03-24 14:59:41 +03:00
case pb . MsgTimeoutNow :
2016-11-07 15:02:21 +03:00
if r . promotable ( ) {
r . logger . Infof ( "%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership." , r . id , r . Term , m . From )
// Leadership transfers never use pre-vote even if r.preVote is true; we
// know we are not recovering from a partition so there is no need for the
// extra round trip.
r . campaign ( campaignTransfer )
} else {
r . logger . Infof ( "%x received MsgTimeoutNow from %x but is not promotable" , r . id , m . From )
}
2016-06-03 18:20:10 +03:00
case pb . MsgReadIndex :
if r . lead == None {
r . logger . Infof ( "%x no leader at term %d; dropping index reading msg" , r . id , r . Term )
2018-01-11 07:43:55 +03:00
return nil
2016-06-03 18:20:10 +03:00
}
m . To = r . lead
r . send ( m )
case pb . MsgReadIndexResp :
if len ( m . Entries ) != 1 {
r . logger . Errorf ( "%x invalid format of MsgReadIndexResp from %x, entries count: %d" , r . id , m . From , len ( m . Entries ) )
2018-01-11 07:43:55 +03:00
return nil
2016-06-03 18:20:10 +03:00
}
2016-08-27 02:03:06 +03:00
r . readStates = append ( r . readStates , ReadState { Index : m . Index , RequestCtx : m . Entries [ 0 ] . Data } )
2014-08-23 00:24:33 +04:00
}
2018-01-11 07:43:55 +03:00
return nil
2014-08-23 00:24:33 +04:00
}
2014-12-09 02:29:54 +03:00
func ( r * raft ) handleAppendEntries ( m pb . Message ) {
2016-01-26 23:18:55 +03:00
if m . Index < r . raftLog . committed {
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : r . raftLog . committed } )
2015-03-11 04:33:58 +03:00
return
}
2014-12-09 02:29:54 +03:00
if mlastIndex , ok := r . raftLog . maybeAppend ( m . Index , m . LogTerm , m . Commit , m . Entries ... ) ; ok {
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : mlastIndex } )
} else {
2015-08-10 18:04:38 +03:00
r . logger . Debugf ( "%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x" ,
r . id , r . raftLog . zeroTermOnErrCompacted ( r . raftLog . term ( m . Index ) ) , m . Index , m . LogTerm , m . Index , m . From )
2014-12-31 06:42:59 +03:00
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : m . Index , Reject : true , RejectHint : r . raftLog . lastIndex ( ) } )
2014-12-09 02:29:54 +03:00
}
}
func ( r * raft ) handleHeartbeat ( m pb . Message ) {
r . raftLog . commitTo ( m . Commit )
2016-08-27 02:03:06 +03:00
r . send ( pb . Message { To : m . From , Type : pb . MsgHeartbeatResp , Context : m . Context } )
2014-12-09 02:29:54 +03:00
}
func ( r * raft ) handleSnapshot ( m pb . Message ) {
sindex , sterm := m . Snapshot . Metadata . Index , m . Snapshot . Metadata . Term
if r . restore ( m . Snapshot ) {
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x [commit: %d] restored snapshot [index: %d, term: %d]" ,
2016-01-26 23:18:55 +03:00
r . id , r . raftLog . committed , sindex , sterm )
2014-12-09 02:29:54 +03:00
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : r . raftLog . lastIndex ( ) } )
} else {
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x [commit: %d] ignored snapshot [index: %d, term: %d]" ,
2016-01-26 23:18:55 +03:00
r . id , r . raftLog . committed , sindex , sterm )
2014-12-09 02:29:54 +03:00
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : r . raftLog . committed } )
}
}
2014-12-31 06:42:59 +03:00
// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of state machine.
2014-08-28 05:53:18 +04:00
func ( r * raft ) restore ( s pb . Snapshot ) bool {
2014-11-20 00:17:50 +03:00
if s . Metadata . Index <= r . raftLog . committed {
2014-08-23 00:24:33 +04:00
return false
}
2014-12-03 08:34:14 +03:00
if r . raftLog . matchTerm ( s . Metadata . Index , s . Metadata . Term ) {
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]" ,
2016-01-26 23:18:55 +03:00
r . id , r . raftLog . committed , r . raftLog . lastIndex ( ) , r . raftLog . lastTerm ( ) , s . Metadata . Index , s . Metadata . Term )
2014-12-03 08:34:14 +03:00
r . raftLog . commitTo ( s . Metadata . Index )
return false
}
2017-11-11 05:38:21 +03:00
// The normal peer can't become learner.
if ! r . isLearner {
for _ , id := range s . Metadata . ConfState . Learners {
if id == r . id {
r . logger . Errorf ( "%x can't become learner when restores snapshot [index: %d, term: %d]" , r . id , s . Metadata . Index , s . Metadata . Term )
return false
}
}
}
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]" ,
2016-01-26 23:18:55 +03:00
r . id , r . raftLog . committed , r . raftLog . lastIndex ( ) , r . raftLog . lastTerm ( ) , s . Metadata . Index , s . Metadata . Term )
2014-08-23 00:24:33 +04:00
2014-08-24 04:03:53 +04:00
r . raftLog . restore ( s )
2015-01-20 21:26:22 +03:00
r . prs = make ( map [ uint64 ] * Progress )
2017-11-11 05:38:21 +03:00
r . learnerPrs = make ( map [ uint64 ] * Progress )
r . restoreNode ( s . Metadata . ConfState . Nodes , false )
r . restoreNode ( s . Metadata . ConfState . Learners , true )
return true
}
func ( r * raft ) restoreNode ( nodes [ ] uint64 , isLearner bool ) {
for _ , n := range nodes {
2016-07-10 08:01:19 +03:00
match , next := uint64 ( 0 ) , r . raftLog . lastIndex ( ) + 1
2014-08-24 04:03:53 +04:00
if n == r . id {
2014-12-02 22:45:41 +03:00
match = next - 1
2017-11-11 05:38:21 +03:00
r . isLearner = isLearner
2014-08-23 00:24:33 +04:00
}
2017-11-11 05:38:21 +03:00
r . setProgress ( n , match , next , isLearner )
r . logger . Infof ( "%x restored progress of %x [%s]" , r . id , n , r . getProgress ( n ) )
2014-08-23 00:24:33 +04:00
}
}
2014-12-09 02:23:21 +03:00
// promotable reports whether this node may be promoted to leader, which is
// the case exactly when its own id has an entry in r.prs.
func (r *raft) promotable() bool {
	_, isVoter := r.prs[r.id]
	return isVoter
}
func ( r * raft ) addNode ( id uint64 ) {
2017-11-11 05:38:21 +03:00
r . addNodeOrLearnerNode ( id , false )
}
// addLearner adds id as a learner node by delegating to
// addNodeOrLearnerNode.
func (r *raft) addLearner(id uint64) {
	const asLearner = true
	r.addNodeOrLearnerNode(id, asLearner)
}
func ( r * raft ) addNodeOrLearnerNode ( id uint64 , isLearner bool ) {
pr := r . getProgress ( id )
if pr == nil {
r . setProgress ( id , 0 , r . raftLog . lastIndex ( ) + 1 , isLearner )
} else {
if isLearner && ! pr . IsLearner {
// can only change Learner to Voter
2018-01-03 04:34:12 +03:00
r . logger . Infof ( "%x ignored addLearner: do not support changing %x from raft peer to learner." , r . id , id )
2017-11-11 05:38:21 +03:00
return
}
if isLearner == pr . IsLearner {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
}
// change Learner to Voter, use origin Learner progress
delete ( r . learnerPrs , id )
pr . IsLearner = false
r . prs [ id ] = pr
}
if r . id == id {
r . isLearner = isLearner
2014-12-09 02:23:21 +03:00
}
2017-04-27 21:19:29 +03:00
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has a chance to communicate with us.
2017-11-11 05:38:21 +03:00
pr = r . getProgress ( id )
pr . RecentActive = true
2014-12-09 02:23:21 +03:00
}
func ( r * raft ) removeNode ( id uint64 ) {
r . delProgress ( id )
2016-05-16 19:58:57 +03:00
// do not try to commit or abort transferring if there is no nodes in the cluster.
2017-11-11 05:38:21 +03:00
if len ( r . prs ) == 0 && len ( r . learnerPrs ) == 0 {
2016-05-16 19:58:57 +03:00
return
}
2016-01-20 18:03:12 +03:00
// The quorum size is now smaller, so see if any pending entries can
// be committed.
2016-01-27 00:55:47 +03:00
if r . maybeCommit ( ) {
r . bcastAppend ( )
}
2016-04-11 09:16:56 +03:00
// If the removed node is the leadTransferee, then abort the leadership transferring.
2016-03-24 14:59:41 +03:00
if r . state == StateLeader && r . leadTransferee == id {
r . abortLeaderTransfer ( )
}
2014-12-09 02:23:21 +03:00
}
2017-11-11 05:38:21 +03:00
func ( r * raft ) setProgress ( id , match , next uint64 , isLearner bool ) {
if ! isLearner {
delete ( r . learnerPrs , id )
r . prs [ id ] = & Progress { Next : next , Match : match , ins : newInflights ( r . maxInflight ) }
return
}
if _ , ok := r . prs [ id ] ; ok {
panic ( fmt . Sprintf ( "%x unexpected changing from voter to learner for %x" , r . id , id ) )
}
r . learnerPrs [ id ] = & Progress { Next : next , Match : match , ins : newInflights ( r . maxInflight ) , IsLearner : true }
2014-08-23 00:24:33 +04:00
}
2014-10-08 14:29:53 +04:00
func ( r * raft ) delProgress ( id uint64 ) {
2014-08-24 04:03:53 +04:00
delete ( r . prs , id )
2017-11-11 05:38:21 +03:00
delete ( r . learnerPrs , id )
2014-08-23 00:24:33 +04:00
}
2014-09-16 04:35:02 +04:00
func ( r * raft ) loadState ( state pb . HardState ) {
2014-12-03 08:58:48 +03:00
if state . Commit < r . raftLog . committed || state . Commit > r . raftLog . lastIndex ( ) {
2015-08-10 18:04:38 +03:00
r . logger . Panicf ( "%x state.commit %d is out of range [%d, %d]" , r . id , state . Commit , r . raftLog . committed , r . raftLog . lastIndex ( ) )
2014-12-03 08:58:48 +03:00
}
2014-08-24 04:03:53 +04:00
r . raftLog . committed = state . Commit
2014-08-24 04:30:14 +04:00
r . Term = state . Term
r . Vote = state . Vote
2014-08-23 00:24:33 +04:00
}
2014-10-07 16:12:49 +04:00
2016-04-06 15:41:46 +03:00
// pastElectionTimeout returns true iff r.electionElapsed is greater
// than or equal to the randomized election timeout in
// [electiontimeout, 2 * electiontimeout - 1].
2016-04-01 20:09:09 +03:00
func ( r * raft ) pastElectionTimeout ( ) bool {
return r . electionElapsed >= r . randomizedElectionTimeout
}
func ( r * raft ) resetRandomizedElectionTimeout ( ) {
2016-09-05 16:03:18 +03:00
r . randomizedElectionTimeout = r . electionTimeout + globalRand . Intn ( r . electionTimeout )
2014-10-07 16:12:49 +04:00
}
2015-11-24 08:59:25 +03:00
// checkQuorumActive returns true if the quorum is active from
// the view of the local raft state machine. Otherwise, it returns
// false.
2015-12-12 02:06:11 +03:00
// checkQuorumActive also resets all RecentActive to false.
2015-11-24 08:59:25 +03:00
func ( r * raft ) checkQuorumActive ( ) bool {
var act int
2017-11-11 05:38:21 +03:00
r . forEachProgress ( func ( id uint64 , pr * Progress ) {
2015-11-24 08:59:25 +03:00
if id == r . id { // self is always active
2016-01-13 04:39:00 +03:00
act ++
2017-11-11 05:38:21 +03:00
return
2015-11-24 08:59:25 +03:00
}
2017-11-11 05:38:21 +03:00
if pr . RecentActive && ! pr . IsLearner {
2016-01-13 04:39:00 +03:00
act ++
2015-11-24 08:59:25 +03:00
}
2017-11-11 05:38:21 +03:00
pr . RecentActive = false
} )
2015-11-24 08:59:25 +03:00
2016-01-06 10:23:35 +03:00
return act >= r . quorum ( )
2015-11-24 08:59:25 +03:00
}
2016-03-24 14:59:41 +03:00
// sendTimeoutNow sends a MsgTimeoutNow message to the node identified by to.
func (r *raft) sendTimeoutNow(to uint64) {
	msg := pb.Message{To: to, Type: pb.MsgTimeoutNow}
	r.send(msg)
}
// abortLeaderTransfer cancels any leadership transfer in progress by
// clearing r.leadTransferee.
func (r *raft) abortLeaderTransfer() {
	r.leadTransferee = None
}
2016-07-26 01:06:57 +03:00
// numOfPendingConf returns how many of the given entries are configuration
// changes (Type == pb.EntryConfChange).
func numOfPendingConf(ents []pb.Entry) int {
	count := 0
	for idx := 0; idx < len(ents); idx++ {
		if ents[idx].Type != pb.EntryConfChange {
			continue
		}
		count++
	}
	return count
}