2016-05-13 06:49:15 +03:00
// Copyright 2015 The etcd Authors
2015-01-25 06:19:16 +03:00
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2014-10-18 02:41:22 +04:00
2014-08-20 05:47:10 +04:00
package raft
2014-08-23 00:24:33 +04:00
import (
2016-06-29 13:24:58 +03:00
"bytes"
2014-08-23 00:24:33 +04:00
"errors"
"fmt"
2015-03-18 22:36:29 +03:00
"math"
2014-10-07 16:12:49 +04:00
"math/rand"
2014-08-23 00:24:33 +04:00
"sort"
2014-12-09 01:37:39 +03:00
"strings"
2016-09-05 16:03:18 +03:00
"sync"
"time"
2014-08-28 05:53:18 +04:00
pb "github.com/coreos/etcd/raft/raftpb"
2014-08-23 00:24:33 +04:00
)
2014-10-08 02:22:35 +04:00
// Sentinel values shared across the package.
const (
	// None is a placeholder node ID used when there is no leader.
	None uint64 = 0

	// noLimit is the untyped constant math.MaxUint64. Code in this file
	// passes it as the size bound when reading entries from the raft log.
	noLimit = math.MaxUint64
)
2014-08-23 00:24:33 +04:00
2014-10-08 02:22:35 +04:00
// Possible values for StateType.
2014-08-23 00:24:33 +04:00
const (
2014-09-16 04:35:02 +04:00
StateFollower StateType = iota
StateCandidate
StateLeader
2014-08-23 00:24:33 +04:00
)
2016-08-27 02:03:06 +03:00
// ReadOnlyOption selects how read only requests are processed.
type ReadOnlyOption int

// Possible values for ReadOnlyOption.
const (
	// ReadOnlySafe guarantees the linearizability of the read only request
	// by communicating with the quorum. It is the default and suggested
	// option.
	ReadOnlySafe ReadOnlyOption = iota

	// ReadOnlyLeaseBased ensures linearizability of the read only request
	// by relying on the leader lease. It can be affected by clock drift.
	// If the clock drift is unbounded, the leader might keep the lease
	// longer than it should (the clock can move backward/pause without any
	// bound). ReadIndex is not safe in that case.
	ReadOnlyLeaseBased
)
2016-06-29 13:24:58 +03:00
// Possible values for CampaignType
const (
2016-07-11 09:59:53 +03:00
// campaignElection represents the type of normal election
campaignElection CampaignType = "CampaignElection"
// campaignTransfer represents the type of leader transfer
campaignTransfer CampaignType = "CampaignTransfer"
2016-06-29 13:24:58 +03:00
)
2016-09-05 16:03:18 +03:00
// lockedRand is a small wrapper around rand.Rand to provide
// synchronization. Only the methods needed by the code are exposed
// (e.g. Intn).
type lockedRand struct {
	mu   sync.Mutex
	rand *rand.Rand
}

// Intn returns the wrapped generator's Intn(n) while holding r.mu.
//
// The unlock is deferred so the mutex is released even if the underlying
// Intn panics (math/rand panics when n <= 0). Without the defer, a recovered
// panic would leave the mutex locked and every later call would block
// forever.
func (r *lockedRand) Intn(n int) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.rand.Intn(n)
}

// globalRand is the package-level locked generator, seeded from the wall
// clock at package initialization.
var globalRand = &lockedRand{
	rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
2016-06-29 13:24:58 +03:00
// CampaignType represents the type of campaigning.
// It is a string rather than a uint64 because a string is simpler to
// compare and to fill into raft entries.
type CampaignType string
2014-10-08 02:22:35 +04:00
// StateType represents the role of a node in a cluster.
type StateType uint64

// stmap holds the display name of each defined StateType, indexed by the
// state's numeric value.
var stmap = [...]string{
	"StateFollower",
	"StateCandidate",
	"StateLeader",
}

// String returns the name of the state as stored in stmap. A value that
// has no entry in stmap is rendered as "StateType(N)" instead of panicking
// with an index-out-of-range error, so formatting or logging an unexpected
// state never aborts the caller.
func (st StateType) String() string {
	if i := uint64(st); i < uint64(len(stmap)) {
		return stmap[i]
	}
	return fmt.Sprintf("StateType(%d)", uint64(st))
}
2015-03-22 04:15:58 +03:00
// Config contains the parameters to start a raft.
type Config struct {
// ID is the identity of the local raft. ID cannot be 0.
ID uint64
2015-03-24 21:10:07 +03:00
2016-03-01 11:54:58 +03:00
// peers contains the IDs of all nodes (including self) in the raft cluster. It
// should only be set when starting a new raft cluster. Restarting raft from
// previous configuration will panic if peers is set. peer is private and only
// used for testing right now.
2015-03-24 21:10:07 +03:00
peers [ ] uint64
2015-03-22 04:15:58 +03:00
2016-03-01 11:52:14 +03:00
// ElectionTick is the number of Node.Tick invocations that must pass between
// elections. That is, if a follower does not receive any message from the
// leader of current term before ElectionTick has elapsed, it will become
// candidate and start an election. ElectionTick must be greater than
// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
// unnecessary leader switching.
2015-03-22 04:15:58 +03:00
ElectionTick int
2016-03-01 11:52:14 +03:00
// HeartbeatTick is the number of Node.Tick invocations that must pass between
// heartbeats. That is, a leader sends heartbeat messages to maintain its
// leadership every HeartbeatTick ticks.
2015-03-22 04:15:58 +03:00
HeartbeatTick int
2016-03-01 11:54:58 +03:00
// Storage is the storage for raft. raft generates entries and states to be
// stored in storage. raft reads the persisted entries and states out of
// Storage when it needs. raft reads out the previous state and configuration
// out of storage when restarting.
2015-03-22 04:15:58 +03:00
Storage Storage
// Applied is the last applied index. It should only be set when restarting
2016-03-01 11:54:58 +03:00
// raft. raft will not return entries to the application smaller or equal to
// Applied. If Applied is unset when restarting, raft might return previous
// applied entries. This is a very application dependent configuration.
2015-03-22 04:15:58 +03:00
Applied uint64
2016-03-01 11:54:58 +03:00
// MaxSizePerMsg limits the max size of each append message. Smaller value
// lowers the raft recovery cost(initial probing and message lost during normal
// operation). On the other side, it might affect the throughput during normal
// replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per
// message.
2015-03-22 04:15:58 +03:00
MaxSizePerMsg uint64
2016-03-01 11:54:58 +03:00
// MaxInflightMsgs limits the max number of in-flight append messages during
// optimistic replication phase. The application transportation layer usually
// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
// overflowing that sending buffer. TODO (xiangli): feedback to application to
// limit the proposal rate?
2015-03-22 04:15:58 +03:00
MaxInflightMsgs int
2015-08-10 18:04:38 +03:00
2016-03-01 11:54:58 +03:00
// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
2015-11-24 08:59:25 +03:00
CheckQuorum bool
2016-08-27 02:03:06 +03:00
// ReadOnlyOption specifies how the read only request is processed.
//
// ReadOnlySafe guarantees the linearizability of the read only request by
// communicating with the quorum. It is the default and suggested option.
//
// ReadOnlyLeaseBased ensures linearizability of the read only request by
// relying on the leader lease. It can be affected by clock drift.
// If the clock drift is unbounded, leader might keep the lease longer than it
// should (clock can move backward/pause without any bound). ReadIndex is not safe
// in that case.
ReadOnlyOption ReadOnlyOption
2016-03-01 11:54:58 +03:00
// Logger is the logger used for raft log. For multinode which can host
// multiple raft group, each raft group can have its own logger
2015-08-10 18:04:38 +03:00
Logger Logger
2015-03-22 04:15:58 +03:00
}
func ( c * Config ) validate ( ) error {
if c . ID == None {
return errors . New ( "cannot use none as id" )
}
if c . HeartbeatTick <= 0 {
return errors . New ( "heartbeat tick must be greater than 0" )
}
if c . ElectionTick <= c . HeartbeatTick {
return errors . New ( "election tick must be greater than heartbeat tick" )
}
if c . Storage == nil {
return errors . New ( "storage cannot be nil" )
}
if c . MaxInflightMsgs <= 0 {
return errors . New ( "max inflight messages must be greater than 0" )
}
2015-08-10 18:04:38 +03:00
if c . Logger == nil {
c . Logger = raftLogger
}
2015-03-22 04:15:58 +03:00
return nil
}
2014-08-23 00:24:33 +04:00
type raft struct {
2014-10-08 14:29:53 +04:00
id uint64
2014-08-23 00:24:33 +04:00
2016-01-26 23:18:55 +03:00
Term uint64
Vote uint64
2016-08-27 02:03:06 +03:00
readStates [ ] ReadState
2016-06-03 18:20:10 +03:00
2014-08-23 00:24:33 +04:00
// the log
raftLog * raftLog
2015-03-20 04:06:51 +03:00
maxInflight int
maxMsgSize uint64
prs map [ uint64 ] * Progress
2014-08-23 00:24:33 +04:00
2014-09-16 04:35:02 +04:00
state StateType
2014-08-23 00:24:33 +04:00
2014-10-08 14:29:53 +04:00
votes map [ uint64 ] bool
2014-08-20 05:47:10 +04:00
2014-08-28 05:53:18 +04:00
msgs [ ] pb . Message
2014-08-23 00:24:33 +04:00
// the leader id
2014-10-08 14:29:53 +04:00
lead uint64
2016-03-24 14:59:41 +03:00
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
2014-09-21 06:32:21 +04:00
// New configuration is ignored if there exists unapplied configuration.
2014-09-19 23:35:56 +04:00
pendingConf bool
2016-08-27 02:03:06 +03:00
readOnly * readOnly
2015-11-24 08:59:25 +03:00
// number of ticks since it reached last electionTimeout when it is leader
// or candidate.
// number of ticks since it reached last electionTimeout or received a
// valid message from current leader when it is a follower.
electionElapsed int
// number of ticks since it reached last heartbeatTimeout.
// only leader keeps heartbeatElapsed.
heartbeatElapsed int
checkQuorum bool
2014-09-03 03:59:29 +04:00
heartbeatTimeout int
electionTimeout int
2016-04-01 20:09:09 +03:00
// randomizedElectionTimeout is a random number between
// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
// when raft changes its state to follower or candidate.
randomizedElectionTimeout int
tick func ( )
step stepFunc
2015-08-10 18:04:38 +03:00
logger Logger
2014-08-23 00:24:33 +04:00
}
2015-03-22 04:15:58 +03:00
func newRaft ( c * Config ) * raft {
if err := c . validate ( ) ; err != nil {
panic ( err . Error ( ) )
2014-08-23 00:24:33 +04:00
}
2015-08-10 18:04:38 +03:00
raftlog := newLog ( c . Storage , c . Logger )
2015-03-22 04:15:58 +03:00
hs , cs , err := c . Storage . InitialState ( )
2014-11-20 00:17:50 +03:00
if err != nil {
panic ( err ) // TODO(bdarnell)
}
2015-03-24 21:10:07 +03:00
peers := c . peers
2014-11-22 01:22:20 +03:00
if len ( cs . Nodes ) > 0 {
if len ( peers ) > 0 {
// TODO(bdarnell): the peers argument is always nil except in
// tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot.
panic ( "cannot specify both newRaft(peers) and ConfState.Nodes)" )
}
peers = cs . Nodes
}
2014-09-03 03:59:29 +04:00
r := & raft {
2015-09-29 07:31:12 +03:00
id : c . ID ,
lead : None ,
raftLog : raftlog ,
2015-03-22 04:15:58 +03:00
maxMsgSize : c . MaxSizePerMsg ,
maxInflight : c . MaxInflightMsgs ,
2015-01-20 21:26:22 +03:00
prs : make ( map [ uint64 ] * Progress ) ,
2015-03-22 04:15:58 +03:00
electionTimeout : c . ElectionTick ,
heartbeatTimeout : c . HeartbeatTick ,
2015-08-10 18:04:38 +03:00
logger : c . Logger ,
2015-11-24 08:59:25 +03:00
checkQuorum : c . CheckQuorum ,
2016-08-27 02:03:06 +03:00
readOnly : newReadOnly ( c . ReadOnlyOption ) ,
2014-09-03 03:59:29 +04:00
}
2014-08-23 00:24:33 +04:00
for _ , p := range peers {
2015-03-20 04:06:51 +03:00
r . prs [ p ] = & Progress { Next : 1 , ins : newInflights ( r . maxInflight ) }
2014-08-23 00:24:33 +04:00
}
2014-11-22 01:22:20 +03:00
if ! isHardStateEqual ( hs , emptyState ) {
r . loadState ( hs )
2014-08-23 00:24:33 +04:00
}
2015-03-22 04:15:58 +03:00
if c . Applied > 0 {
raftlog . appliedTo ( c . Applied )
2015-01-22 19:37:02 +03:00
}
2014-11-27 08:21:13 +03:00
r . becomeFollower ( r . Term , None )
2014-12-09 01:37:39 +03:00
2016-04-08 08:14:56 +03:00
var nodesStrs [ ] string
2014-12-09 01:37:39 +03:00
for _ , n := range r . nodes ( ) {
nodesStrs = append ( nodesStrs , fmt . Sprintf ( "%x" , n ) )
}
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]" ,
2015-01-22 20:03:35 +03:00
r . id , strings . Join ( nodesStrs , "," ) , r . Term , r . raftLog . committed , r . raftLog . applied , r . raftLog . lastIndex ( ) , r . raftLog . lastTerm ( ) )
2014-08-24 04:03:53 +04:00
return r
2014-08-23 00:24:33 +04:00
}
2014-09-15 09:44:59 +04:00
// hasLeader reports whether r.lead is set to something other than None.
func (r *raft) hasLeader() bool {
	return r.lead != None
}
2014-08-23 00:24:33 +04:00
2014-12-09 08:17:04 +03:00
// softState returns a newly allocated SoftState carrying the current leader
// ID and role of this node.
func (r *raft) softState() *SoftState {
	ss := SoftState{
		Lead:      r.lead,
		RaftState: r.state,
	}
	return &ss
}
2014-09-16 04:35:02 +04:00
2016-01-26 23:18:55 +03:00
// hardState returns a pb.HardState built from the current term, the current
// vote and the log's committed index.
func (r *raft) hardState() pb.HardState {
	var hs pb.HardState
	hs.Term = r.Term
	hs.Vote = r.Vote
	hs.Commit = r.raftLog.committed
	return hs
}
2016-01-06 10:23:35 +03:00
// quorum returns the size of a majority of the peers tracked in r.prs,
// i.e. len(r.prs)/2 + 1.
func (r *raft) quorum() int {
	half := len(r.prs) / 2
	return half + 1
}
2014-12-09 02:24:34 +03:00
func ( r * raft ) nodes ( ) [ ] uint64 {
nodes := make ( [ ] uint64 , 0 , len ( r . prs ) )
2016-01-04 16:20:54 +03:00
for id := range r . prs {
nodes = append ( nodes , id )
2014-12-09 02:24:34 +03:00
}
sort . Sort ( uint64Slice ( nodes ) )
return nodes
}
2014-08-23 00:24:33 +04:00
// send persists state to stable storage and then sends to its mailbox.
2014-08-28 05:53:18 +04:00
func ( r * raft ) send ( m pb . Message ) {
2014-08-24 04:03:53 +04:00
m . From = r . id
2014-10-30 09:21:38 +03:00
// do not attach term to MsgProp
2014-10-04 05:45:15 +04:00
// proposals are a way to forward to the leader and
// should be treated as local message.
2014-10-12 11:34:22 +04:00
if m . Type != pb . MsgProp {
2014-10-04 05:45:15 +04:00
m . Term = r . Term
}
2014-08-24 04:03:53 +04:00
r . msgs = append ( r . msgs , m )
2014-08-23 00:24:33 +04:00
}
2016-01-04 06:51:51 +03:00
// sendAppend sends RPC, with entries to the given peer.
2014-10-08 14:29:53 +04:00
func ( r * raft ) sendAppend ( to uint64 ) {
2014-08-24 04:03:53 +04:00
pr := r . prs [ to ]
2015-03-16 10:46:16 +03:00
if pr . isPaused ( ) {
2014-12-18 23:02:15 +03:00
return
}
2014-08-28 05:53:18 +04:00
m := pb . Message { }
2014-08-23 00:24:33 +04:00
m . To = to
2015-06-13 08:24:40 +03:00
term , errt := r . raftLog . term ( pr . Next - 1 )
ents , erre := r . raftLog . entries ( pr . Next , r . maxMsgSize )
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
2015-12-10 23:17:30 +03:00
if ! pr . RecentActive {
r . logger . Debugf ( "ignore sending snapshot to %x since it is not recently active" , to )
return
}
2014-10-12 11:34:22 +04:00
m . Type = pb . MsgSnap
2014-11-26 21:59:13 +03:00
snapshot , err := r . raftLog . snapshot ( )
2014-11-20 00:17:50 +03:00
if err != nil {
2015-09-29 10:04:12 +03:00
if err == ErrSnapshotTemporarilyUnavailable {
r . logger . Debugf ( "%x failed to send snapshot to %x because snapshot is temporarily unavailable" , r . id , to )
return
}
2014-11-20 00:17:50 +03:00
panic ( err ) // TODO(bdarnell)
}
2014-11-25 11:37:21 +03:00
if IsEmptySnap ( snapshot ) {
2014-11-20 00:17:50 +03:00
panic ( "need non-empty snapshot" )
}
m . Snapshot = snapshot
2014-12-02 22:45:41 +03:00
sindex , sterm := snapshot . Metadata . Index , snapshot . Metadata . Term
2015-08-19 01:43:49 +03:00
r . logger . Debugf ( "%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]" ,
2016-01-26 23:18:55 +03:00
r . id , r . raftLog . firstIndex ( ) , r . raftLog . committed , sindex , sterm , to , pr )
2015-03-16 10:46:16 +03:00
pr . becomeSnapshot ( sindex )
2015-08-19 01:43:49 +03:00
r . logger . Debugf ( "%x paused sending replication messages to %x [%s]" , r . id , to , pr )
2014-08-23 00:24:33 +04:00
} else {
2014-10-12 11:34:22 +04:00
m . Type = pb . MsgApp
2015-01-20 21:26:22 +03:00
m . Index = pr . Next - 1
2015-06-13 08:24:40 +03:00
m . LogTerm = term
m . Entries = ents
2014-08-24 04:03:53 +04:00
m . Commit = r . raftLog . committed
2015-03-16 10:46:16 +03:00
if n := len ( m . Entries ) ; n != 0 {
switch pr . State {
// optimistically increase the next when in ProgressStateReplicate
case ProgressStateReplicate :
2015-03-20 04:06:51 +03:00
last := m . Entries [ n - 1 ] . Index
pr . optimisticUpdate ( last )
pr . ins . add ( last )
2015-03-16 10:46:16 +03:00
case ProgressStateProbe :
pr . pause ( )
default :
2015-08-10 18:04:38 +03:00
r . logger . Panicf ( "%x is sending append in unhandled state %s" , r . id , pr . State )
2015-03-16 10:46:16 +03:00
}
2014-11-18 22:42:08 +03:00
}
2014-08-23 00:24:33 +04:00
}
2014-08-24 04:03:53 +04:00
r . send ( m )
2014-08-23 00:24:33 +04:00
}
2014-10-30 09:21:38 +03:00
// sendHeartbeat sends an empty MsgApp
2016-08-27 02:03:06 +03:00
func ( r * raft ) sendHeartbeat ( to uint64 , ctx [ ] byte ) {
2014-11-18 02:44:57 +03:00
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
2015-01-20 21:26:22 +03:00
commit := min ( r . prs [ to ] . Match , r . raftLog . committed )
2014-08-28 05:53:18 +04:00
m := pb . Message {
2016-08-27 02:03:06 +03:00
To : to ,
Type : pb . MsgHeartbeat ,
Commit : commit ,
Context : ctx ,
2014-08-23 00:24:33 +04:00
}
2016-08-27 02:03:06 +03:00
2014-08-24 04:03:53 +04:00
r . send ( m )
2014-08-23 00:24:33 +04:00
}
2016-01-04 06:51:51 +03:00
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
2014-10-30 09:21:38 +03:00
// according to the progress recorded in r.prs.
2014-08-24 04:03:53 +04:00
func ( r * raft ) bcastAppend ( ) {
2016-01-04 16:20:54 +03:00
for id := range r . prs {
if id == r . id {
2014-08-23 00:24:33 +04:00
continue
}
2016-01-04 16:20:54 +03:00
r . sendAppend ( id )
2014-08-23 00:24:33 +04:00
}
}
2016-01-04 06:51:51 +03:00
// bcastHeartbeat sends RPC, without entries to all the peers.
2014-08-24 04:03:53 +04:00
func ( r * raft ) bcastHeartbeat ( ) {
2016-08-27 02:03:06 +03:00
lastCtx := r . readOnly . lastPendingRequestCtx ( )
if len ( lastCtx ) == 0 {
r . bcastHeartbeatWithCtx ( nil )
} else {
r . bcastHeartbeatWithCtx ( [ ] byte ( lastCtx ) )
}
}
func ( r * raft ) bcastHeartbeatWithCtx ( ctx [ ] byte ) {
2016-01-04 16:20:54 +03:00
for id := range r . prs {
if id == r . id {
2014-08-23 00:24:33 +04:00
continue
}
2016-08-27 02:03:06 +03:00
r . sendHeartbeat ( id , ctx )
2016-01-04 16:20:54 +03:00
r . prs [ id ] . resume ( )
2014-08-23 00:24:33 +04:00
}
}
2016-01-27 00:55:47 +03:00
// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
2014-08-24 04:03:53 +04:00
func ( r * raft ) maybeCommit ( ) bool {
2014-08-23 00:24:33 +04:00
// TODO(bmizerany): optimize.. Currently naive
2014-10-09 10:42:29 +04:00
mis := make ( uint64Slice , 0 , len ( r . prs ) )
2016-01-04 16:20:54 +03:00
for id := range r . prs {
mis = append ( mis , r . prs [ id ] . Match )
2014-08-23 00:24:33 +04:00
}
sort . Sort ( sort . Reverse ( mis ) )
2016-01-06 10:23:35 +03:00
mci := mis [ r . quorum ( ) - 1 ]
2014-08-24 04:03:53 +04:00
return r . raftLog . maybeCommit ( mci , r . Term )
2014-08-20 05:47:10 +04:00
}
2014-10-08 14:29:53 +04:00
func ( r * raft ) reset ( term uint64 ) {
2015-03-08 09:31:09 +03:00
if r . Term != term {
r . Term = term
r . Vote = None
}
2014-09-15 09:44:59 +04:00
r . lead = None
2015-11-24 08:59:25 +03:00
r . electionElapsed = 0
r . heartbeatElapsed = 0
2016-04-01 20:09:09 +03:00
r . resetRandomizedElectionTimeout ( )
2015-11-24 08:59:25 +03:00
2016-05-11 06:03:42 +03:00
r . abortLeaderTransfer ( )
2016-03-24 14:59:41 +03:00
2014-10-08 14:29:53 +04:00
r . votes = make ( map [ uint64 ] bool )
2016-01-04 16:20:54 +03:00
for id := range r . prs {
r . prs [ id ] = & Progress { Next : r . raftLog . lastIndex ( ) + 1 , ins : newInflights ( r . maxInflight ) }
if id == r . id {
r . prs [ id ] . Match = r . raftLog . lastIndex ( )
2014-08-23 00:24:33 +04:00
}
}
2014-09-19 23:35:56 +04:00
r . pendingConf = false
2016-08-27 02:03:06 +03:00
r . readOnly = newReadOnly ( r . readOnly . option )
2014-08-23 00:24:33 +04:00
}
2014-12-10 09:13:42 +03:00
func ( r * raft ) appendEntry ( es ... pb . Entry ) {
li := r . raftLog . lastIndex ( )
for i := range es {
es [ i ] . Term = r . Term
es [ i ] . Index = li + 1 + uint64 ( i )
}
r . raftLog . append ( es ... )
2015-03-16 10:46:16 +03:00
r . prs [ r . id ] . maybeUpdate ( r . raftLog . lastIndex ( ) )
2016-01-27 00:55:47 +03:00
// Regardless of maybeCommit's return, our caller will call bcastAppend.
2014-08-24 04:03:53 +04:00
r . maybeCommit ( )
2014-08-23 00:24:33 +04:00
}
2014-10-30 09:21:38 +03:00
// tickElection is run by followers and candidates after r.electionTimeout.
2014-09-03 03:59:29 +04:00
func ( r * raft ) tickElection ( ) {
2015-11-24 08:59:25 +03:00
r . electionElapsed ++
2016-05-27 12:23:18 +03:00
if r . promotable ( ) && r . pastElectionTimeout ( ) {
2015-11-24 08:59:25 +03:00
r . electionElapsed = 0
2014-10-12 11:34:22 +04:00
r . Step ( pb . Message { From : r . id , Type : pb . MsgHup } )
2014-09-03 03:59:29 +04:00
}
}
2014-10-30 09:21:38 +03:00
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
2014-09-03 03:59:29 +04:00
func ( r * raft ) tickHeartbeat ( ) {
2015-11-24 08:59:25 +03:00
r . heartbeatElapsed ++
r . electionElapsed ++
if r . electionElapsed >= r . electionTimeout {
r . electionElapsed = 0
if r . checkQuorum {
r . Step ( pb . Message { From : r . id , Type : pb . MsgCheckQuorum } )
}
2016-03-24 14:59:41 +03:00
// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
if r . state == StateLeader && r . leadTransferee != None {
r . abortLeaderTransfer ( )
}
2015-11-24 08:59:25 +03:00
}
if r . state != StateLeader {
return
}
if r . heartbeatElapsed >= r . heartbeatTimeout {
r . heartbeatElapsed = 0
2014-10-12 11:34:22 +04:00
r . Step ( pb . Message { From : r . id , Type : pb . MsgBeat } )
2014-09-03 03:59:29 +04:00
}
}
2014-10-08 14:29:53 +04:00
func ( r * raft ) becomeFollower ( term uint64 , lead uint64 ) {
2014-09-03 21:16:33 +04:00
r . step = stepFollower
2014-08-24 04:03:53 +04:00
r . reset ( term )
2014-09-03 21:16:33 +04:00
r . tick = r . tickElection
2014-08-24 04:40:25 +04:00
r . lead = lead
2014-09-16 04:35:02 +04:00
r . state = StateFollower
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x became follower at term %d" , r . id , r . Term )
2014-08-23 00:24:33 +04:00
}
2014-08-24 04:03:53 +04:00
func ( r * raft ) becomeCandidate ( ) {
2014-08-23 00:24:33 +04:00
// TODO(xiangli) remove the panic when the raft implementation is stable
2014-09-16 04:35:02 +04:00
if r . state == StateLeader {
2014-08-23 00:24:33 +04:00
panic ( "invalid transition [leader -> candidate]" )
}
2014-09-03 21:16:33 +04:00
r . step = stepCandidate
2014-08-24 04:03:53 +04:00
r . reset ( r . Term + 1 )
2014-09-03 21:16:33 +04:00
r . tick = r . tickElection
2014-08-24 04:30:14 +04:00
r . Vote = r . id
2014-09-16 04:35:02 +04:00
r . state = StateCandidate
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x became candidate at term %d" , r . id , r . Term )
2014-08-23 00:24:33 +04:00
}
2014-08-24 04:03:53 +04:00
func ( r * raft ) becomeLeader ( ) {
2014-08-23 00:24:33 +04:00
// TODO(xiangli) remove the panic when the raft implementation is stable
2014-09-16 04:35:02 +04:00
if r . state == StateFollower {
2014-08-23 00:24:33 +04:00
panic ( "invalid transition [follower -> leader]" )
}
2014-09-03 21:16:33 +04:00
r . step = stepLeader
2014-08-24 04:03:53 +04:00
r . reset ( r . Term )
2014-09-04 09:00:31 +04:00
r . tick = r . tickHeartbeat
2014-08-24 04:40:25 +04:00
r . lead = r . id
2014-09-16 04:35:02 +04:00
r . state = StateLeader
2015-06-13 08:24:40 +03:00
ents , err := r . raftLog . entries ( r . raftLog . committed + 1 , noLimit )
if err != nil {
2015-08-10 18:04:38 +03:00
r . logger . Panicf ( "unexpected error getting uncommitted entries (%v)" , err )
2015-06-13 08:24:40 +03:00
}
2016-07-26 01:06:57 +03:00
nconf := numOfPendingConf ( ents )
if nconf > 1 {
panic ( "unexpected multiple uncommitted config entry" )
}
if nconf == 1 {
2014-09-19 23:35:56 +04:00
r . pendingConf = true
}
2016-07-26 01:06:57 +03:00
2014-09-04 02:16:31 +04:00
r . appendEntry ( pb . Entry { Data : nil } )
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x became leader at term %d" , r . id , r . Term )
2014-08-23 00:24:33 +04:00
}
2016-06-29 13:24:58 +03:00
func ( r * raft ) campaign ( t CampaignType ) {
2014-09-03 03:59:29 +04:00
r . becomeCandidate ( )
2016-01-06 10:23:35 +03:00
if r . quorum ( ) == r . poll ( r . id , true ) {
2014-09-03 03:59:29 +04:00
r . becomeLeader ( )
2014-12-04 01:46:24 +03:00
return
2014-09-03 03:59:29 +04:00
}
2016-01-04 16:20:54 +03:00
for id := range r . prs {
if id == r . id {
2014-09-03 03:59:29 +04:00
continue
}
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x [logterm: %d, index: %d] sent vote request to %x at term %d" ,
2016-01-04 16:20:54 +03:00
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , id , r . Term )
2016-07-11 09:59:53 +03:00
2016-07-12 11:14:06 +03:00
var ctx [ ] byte
2016-07-11 09:59:53 +03:00
if t == campaignTransfer {
2016-07-12 11:14:06 +03:00
ctx = [ ] byte ( t )
2016-07-11 09:59:53 +03:00
}
2016-07-12 11:14:06 +03:00
r . send ( pb . Message { To : id , Type : pb . MsgVote , Index : r . raftLog . lastIndex ( ) , LogTerm : r . raftLog . lastTerm ( ) , Context : ctx } )
2014-09-03 03:59:29 +04:00
}
}
2014-12-09 02:21:57 +03:00
func ( r * raft ) poll ( id uint64 , v bool ) ( granted int ) {
if v {
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x received vote from %x at term %d" , r . id , id , r . Term )
2014-12-09 02:21:57 +03:00
} else {
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x received vote rejection from %x at term %d" , r . id , id , r . Term )
2014-12-09 02:21:57 +03:00
}
if _ , ok := r . votes [ id ] ; ! ok {
r . votes [ id ] = v
}
for _ , vv := range r . votes {
if vv {
granted ++
}
}
return granted
}
2014-08-28 05:53:18 +04:00
func ( r * raft ) Step ( m pb . Message ) error {
2014-10-12 11:34:22 +04:00
if m . Type == pb . MsgHup {
2015-11-17 03:32:12 +03:00
if r . state != StateLeader {
2016-08-08 20:44:02 +03:00
ents , err := r . raftLog . slice ( r . raftLog . applied + 1 , r . raftLog . committed + 1 , noLimit )
2016-07-26 01:06:57 +03:00
if err != nil {
2016-08-08 20:44:02 +03:00
r . logger . Panicf ( "unexpected error getting unapplied entries (%v)" , err )
2016-07-26 01:06:57 +03:00
}
if n := numOfPendingConf ( ents ) ; n != 0 && r . raftLog . committed > r . raftLog . applied {
r . logger . Warningf ( "%x cannot campaign at term %d since there are still %d pending configuration changes to apply" , r . id , r . Term , n )
return nil
}
2015-11-17 03:32:12 +03:00
r . logger . Infof ( "%x is starting a new election at term %d" , r . id , r . Term )
2016-07-11 09:59:53 +03:00
r . campaign ( campaignElection )
2015-11-17 03:32:12 +03:00
} else {
r . logger . Debugf ( "%x ignoring MsgHup because already leader" , r . id )
}
2014-12-04 01:46:24 +03:00
return nil
2014-08-23 00:24:33 +04:00
}
switch {
case m . Term == 0 :
// local message
2014-08-24 04:03:53 +04:00
case m . Term > r . Term :
2014-08-23 00:24:33 +04:00
lead := m . From
2014-10-12 11:34:22 +04:00
if m . Type == pb . MsgVote {
2016-07-12 11:14:06 +03:00
force := bytes . Equal ( m . Context , [ ] byte ( campaignTransfer ) )
2016-08-29 00:35:19 +03:00
inLease := r . checkQuorum && r . lead != None && r . electionElapsed < r . electionTimeout
2016-06-29 13:24:58 +03:00
if ! force && inLease {
2016-05-27 12:23:18 +03:00
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] ignored vote from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)" ,
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . From , m . LogTerm , m . Index , r . Term , r . electionTimeout - r . electionElapsed )
return nil
}
2014-09-15 09:44:59 +04:00
lead = None
2014-08-23 00:24:33 +04:00
}
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x [term: %d] received a %s message with higher term from %x [term: %d]" ,
2014-12-04 01:46:24 +03:00
r . id , r . Term , m . Type , m . From , m . Term )
2014-08-24 04:03:53 +04:00
r . becomeFollower ( m . Term , lead )
case m . Term < r . Term :
2016-05-27 12:23:18 +03:00
if r . checkQuorum && ( m . Type == pb . MsgHeartbeat || m . Type == pb . MsgApp ) {
// We have received messages from a leader at a lower term. It is possible that these messages were
// simply delayed in the network, but this could also mean that this node has advanced its term number
// during a network partition, and it is now unable to either win an election or to rejoin the majority
// on the old term. If checkQuorum is false, this will be handled by incrementing term numbers in response
// to MsgVote with a higher term, but if checkQuorum is true we may not advance the term on MsgVote and
// must generate other messages to advance the term. The net result of these two features is to minimize
// the disruption caused by nodes that have been removed from the cluster's configuration: a removed node
// will send MsgVotes which will be ignored, but it will not receive MsgApp or MsgHeartbeat, so it will not
// create disruptive term increases
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp } )
} else {
// ignore other cases
r . logger . Infof ( "%x [term: %d] ignored a %s message with lower term from %x [term: %d]" ,
r . id , r . Term , m . Type , m . From , m . Term )
}
2014-09-04 21:58:22 +04:00
return nil
2014-08-23 00:24:33 +04:00
}
2014-09-03 21:16:33 +04:00
r . step ( r , m )
2014-08-23 00:24:33 +04:00
return nil
}
2014-08-28 05:53:18 +04:00
// stepFunc is the signature of the role-specific message handlers
// (stepFollower, stepCandidate, stepLeader) that are stored in raft.step
// and invoked by Step.
type stepFunc func(r *raft, m pb.Message)
2014-08-23 00:24:33 +04:00
2014-08-28 05:53:18 +04:00
func stepLeader ( r * raft , m pb . Message ) {
2015-12-04 01:49:46 +03:00
// These message types do not require any progress for m.From.
2014-08-23 00:24:33 +04:00
switch m . Type {
2014-10-12 11:34:22 +04:00
case pb . MsgBeat :
2014-08-24 04:03:53 +04:00
r . bcastHeartbeat ( )
2015-12-04 01:49:46 +03:00
return
2015-11-24 08:59:25 +03:00
case pb . MsgCheckQuorum :
if ! r . checkQuorumActive ( ) {
r . logger . Warningf ( "%x stepped down to follower since quorum is not active" , r . id )
r . becomeFollower ( r . Term , None )
}
2015-12-04 01:49:46 +03:00
return
2014-10-12 11:34:22 +04:00
case pb . MsgProp :
2014-12-10 09:13:42 +03:00
if len ( m . Entries ) == 0 {
2015-08-10 18:04:38 +03:00
r . logger . Panicf ( "%x stepped empty MsgProp" , r . id )
2014-08-23 00:24:33 +04:00
}
2015-08-29 12:09:56 +03:00
if _ , ok := r . prs [ r . id ] ; ! ok {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
return
}
2016-03-24 14:59:41 +03:00
if r . leadTransferee != None {
r . logger . Debugf ( "%x [term %d] transfer leadership to %x is in progress; dropping proposal" , r . id , r . Term , r . leadTransferee )
return
}
2014-12-10 09:13:42 +03:00
for i , e := range m . Entries {
if e . Type == pb . EntryConfChange {
if r . pendingConf {
m . Entries [ i ] = pb . Entry { Type : pb . EntryNormal }
}
r . pendingConf = true
2014-09-19 23:35:56 +04:00
}
}
2014-12-10 09:13:42 +03:00
r . appendEntry ( m . Entries ... )
2014-08-24 04:03:53 +04:00
r . bcastAppend ( )
2015-12-04 01:49:46 +03:00
return
case pb . MsgVote :
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d" ,
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . From , m . LogTerm , m . Index , r . Term )
r . send ( pb . Message { To : m . From , Type : pb . MsgVoteResp , Reject : true } )
return
2016-06-03 18:20:10 +03:00
case pb . MsgReadIndex :
2016-08-27 02:03:06 +03:00
if r . quorum ( ) > 1 {
// thinking: use an internally defined context instead of the user given context.
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.
switch r . readOnly . option {
case ReadOnlySafe :
r . readOnly . addRequest ( r . raftLog . committed , m )
r . bcastHeartbeatWithCtx ( m . Entries [ 0 ] . Data )
case ReadOnlyLeaseBased :
var ri uint64
if r . checkQuorum {
ri = r . raftLog . committed
}
if m . From == None || m . From == r . id { // from local member
r . readStates = append ( r . readStates , ReadState { Index : r . raftLog . committed , RequestCtx : m . Entries [ 0 ] . Data } )
} else {
r . send ( pb . Message { To : m . From , Type : pb . MsgReadIndexResp , Index : ri , Entries : m . Entries } )
}
}
2016-07-20 00:19:13 +03:00
} else {
2016-08-27 02:03:06 +03:00
r . readStates = append ( r . readStates , ReadState { Index : r . raftLog . committed , RequestCtx : m . Entries [ 0 ] . Data } )
2016-07-20 00:19:13 +03:00
}
2016-08-27 02:03:06 +03:00
2016-06-03 18:20:10 +03:00
return
2015-12-04 01:49:46 +03:00
}
// All other message types require a progress for m.From (pr).
pr , prOk := r . prs [ m . From ]
if ! prOk {
2016-03-24 14:59:41 +03:00
r . logger . Debugf ( "%x no progress available for %x" , r . id , m . From )
2015-12-04 01:49:46 +03:00
return
}
switch m . Type {
2014-10-12 11:34:22 +04:00
case pb . MsgAppResp :
2015-12-10 23:17:18 +03:00
pr . RecentActive = true
2015-11-24 08:59:25 +03:00
2014-10-01 23:59:30 +04:00
if m . Reject {
2015-08-10 18:04:38 +03:00
r . logger . Debugf ( "%x received msgApp rejection(lastindex: %d) from %x for index %d" ,
2014-12-31 06:42:59 +03:00
r . id , m . RejectHint , m . From , m . Index )
2015-02-26 02:07:26 +03:00
if pr . maybeDecrTo ( m . Index , m . RejectHint ) {
2015-08-10 18:04:38 +03:00
r . logger . Debugf ( "%x decreased progress of %x to [%s]" , r . id , m . From , pr )
2015-03-16 10:46:16 +03:00
if pr . State == ProgressStateReplicate {
pr . becomeProbe ( )
}
2014-09-30 06:48:49 +04:00
r . sendAppend ( m . From )
}
2014-08-23 00:24:33 +04:00
} else {
2015-03-16 10:46:16 +03:00
oldPaused := pr . isPaused ( )
if pr . maybeUpdate ( m . Index ) {
switch {
case pr . State == ProgressStateProbe :
pr . becomeReplicate ( )
2016-06-24 17:54:10 +03:00
case pr . State == ProgressStateSnapshot && pr . needSnapshotAbort ( ) :
2015-08-19 01:43:49 +03:00
r . logger . Debugf ( "%x snapshot aborted, resumed sending replication messages to %x [%s]" , r . id , m . From , pr )
2015-03-16 10:46:16 +03:00
pr . becomeProbe ( )
2015-03-20 04:06:51 +03:00
case pr . State == ProgressStateReplicate :
pr . ins . freeTo ( m . Index )
2015-03-16 10:46:16 +03:00
}
if r . maybeCommit ( ) {
r . bcastAppend ( )
} else if oldPaused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
r . sendAppend ( m . From )
}
2016-03-24 14:59:41 +03:00
// Transfer leadership is in progress.
if m . From == r . leadTransferee && pr . Match == r . raftLog . lastIndex ( ) {
r . logger . Infof ( "%x sent MsgTimeoutNow to %x after received MsgAppResp" , r . id , m . From )
r . sendTimeoutNow ( m . From )
}
2014-08-23 00:24:33 +04:00
}
}
2015-01-14 22:42:30 +03:00
case pb . MsgHeartbeatResp :
2015-12-10 23:17:18 +03:00
pr . RecentActive = true
2015-11-24 08:59:25 +03:00
2015-03-20 04:06:51 +03:00
// free one slot for the full inflights window to allow progress.
if pr . State == ProgressStateReplicate && pr . ins . full ( ) {
pr . ins . freeFirstOne ( )
}
2015-02-26 02:07:26 +03:00
if pr . Match < r . raftLog . lastIndex ( ) {
2015-01-14 22:42:30 +03:00
r . sendAppend ( m . From )
}
2016-08-27 02:03:06 +03:00
if r . readOnly . option != ReadOnlySafe || len ( m . Context ) == 0 {
return
}
ackCount := r . readOnly . recvAck ( m )
if ackCount < r . quorum ( ) {
return
}
rss := r . readOnly . advance ( m )
for _ , rs := range rss {
req := rs . req
if req . From == None || req . From == r . id { // from local member
r . readStates = append ( r . readStates , ReadState { Index : rs . index , RequestCtx : req . Entries [ 0 ] . Data } )
} else {
r . send ( pb . Message { To : req . From , Type : pb . MsgReadIndexResp , Index : rs . index , Entries : req . Entries } )
}
}
2015-02-25 08:38:18 +03:00
case pb . MsgSnapStatus :
2015-03-16 10:46:16 +03:00
if pr . State != ProgressStateSnapshot {
2015-02-25 08:38:18 +03:00
return
}
2015-03-16 10:46:16 +03:00
if ! m . Reject {
pr . becomeProbe ( )
2015-08-19 01:43:49 +03:00
r . logger . Debugf ( "%x snapshot succeeded, resumed sending replication messages to %x [%s]" , r . id , m . From , pr )
2015-03-16 10:46:16 +03:00
} else {
pr . snapshotFailure ( )
pr . becomeProbe ( )
2015-08-19 01:43:49 +03:00
r . logger . Debugf ( "%x snapshot failed, resumed sending replication messages to %x [%s]" , r . id , m . From , pr )
2015-02-25 08:38:18 +03:00
}
2015-03-16 10:46:16 +03:00
// If snapshot finish, wait for the msgAppResp from the remote node before sending
// out the next msgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr . pause ( )
2015-02-05 01:41:14 +03:00
case pb . MsgUnreachable :
2015-03-16 10:46:16 +03:00
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
if pr . State == ProgressStateReplicate {
pr . becomeProbe ( )
2015-03-01 18:01:30 +03:00
}
2015-08-10 18:04:38 +03:00
r . logger . Debugf ( "%x failed to send message to %x because it is unreachable [%s]" , r . id , m . From , pr )
2016-03-24 14:59:41 +03:00
case pb . MsgTransferLeader :
leadTransferee := m . From
lastLeadTransferee := r . leadTransferee
if lastLeadTransferee != None {
if lastLeadTransferee == leadTransferee {
r . logger . Infof ( "%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x" ,
r . id , r . Term , leadTransferee , leadTransferee )
return
}
r . abortLeaderTransfer ( )
2016-05-11 06:03:42 +03:00
r . logger . Infof ( "%x [term %d] abort previous transferring leadership to %x" , r . id , r . Term , lastLeadTransferee )
2016-03-24 14:59:41 +03:00
}
if leadTransferee == r . id {
2016-05-11 06:03:42 +03:00
r . logger . Debugf ( "%x is already leader. Ignored transferring leadership to self" , r . id )
2016-03-24 14:59:41 +03:00
return
}
// Transfer leadership to third party.
r . logger . Infof ( "%x [term %d] starts to transfer leadership to %x" , r . id , r . Term , leadTransferee )
// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
r . electionElapsed = 0
r . leadTransferee = leadTransferee
if pr . Match == r . raftLog . lastIndex ( ) {
r . sendTimeoutNow ( leadTransferee )
r . logger . Infof ( "%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log" , r . id , leadTransferee , leadTransferee )
} else {
r . sendAppend ( leadTransferee )
}
2014-08-23 00:24:33 +04:00
}
}
2014-08-28 05:53:18 +04:00
func stepCandidate ( r * raft , m pb . Message ) {
2014-08-23 00:24:33 +04:00
switch m . Type {
2014-10-12 11:34:22 +04:00
case pb . MsgProp :
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x no leader at term %d; dropping proposal" , r . id , r . Term )
2014-12-11 01:34:40 +03:00
return
2014-10-12 11:34:22 +04:00
case pb . MsgApp :
2014-08-24 04:03:53 +04:00
r . becomeFollower ( r . Term , m . From )
r . handleAppendEntries ( m )
2014-12-04 08:32:12 +03:00
case pb . MsgHeartbeat :
r . becomeFollower ( r . Term , m . From )
r . handleHeartbeat ( m )
2014-10-12 11:34:22 +04:00
case pb . MsgSnap :
2014-08-24 04:03:53 +04:00
r . becomeFollower ( m . Term , m . From )
r . handleSnapshot ( m )
2014-10-12 11:34:22 +04:00
case pb . MsgVote :
2015-11-16 07:26:16 +03:00
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d" ,
2014-12-02 10:50:10 +03:00
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . From , m . LogTerm , m . Index , r . Term )
2014-10-12 11:34:22 +04:00
r . send ( pb . Message { To : m . From , Type : pb . MsgVoteResp , Reject : true } )
case pb . MsgVoteResp :
2014-10-01 23:59:30 +04:00
gr := r . poll ( m . From , ! m . Reject )
2016-01-06 10:23:35 +03:00
r . logger . Infof ( "%x [quorum:%d] has received %d votes and %d vote rejections" , r . id , r . quorum ( ) , gr , len ( r . votes ) - gr )
switch r . quorum ( ) {
2014-08-23 00:24:33 +04:00
case gr :
2014-08-24 04:03:53 +04:00
r . becomeLeader ( )
r . bcastAppend ( )
case len ( r . votes ) - gr :
2014-09-15 09:44:59 +04:00
r . becomeFollower ( r . Term , None )
2014-08-23 00:24:33 +04:00
}
2016-03-24 14:59:41 +03:00
case pb . MsgTimeoutNow :
r . logger . Debugf ( "%x [term %d state %v] ignored MsgTimeoutNow from %x" , r . id , r . Term , r . state , m . From )
2014-08-23 00:24:33 +04:00
}
}
2014-08-28 05:53:18 +04:00
func stepFollower ( r * raft , m pb . Message ) {
2014-08-23 00:24:33 +04:00
switch m . Type {
2014-10-12 11:34:22 +04:00
case pb . MsgProp :
2014-09-15 09:44:59 +04:00
if r . lead == None {
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x no leader at term %d; dropping proposal" , r . id , r . Term )
2014-12-11 01:34:40 +03:00
return
2014-08-23 00:24:33 +04:00
}
2014-08-24 04:40:25 +04:00
m . To = r . lead
2014-08-24 04:03:53 +04:00
r . send ( m )
2014-10-12 11:34:22 +04:00
case pb . MsgApp :
2015-11-24 08:59:25 +03:00
r . electionElapsed = 0
2014-08-24 04:40:25 +04:00
r . lead = m . From
2014-12-04 08:32:12 +03:00
r . handleAppendEntries ( m )
case pb . MsgHeartbeat :
2015-11-24 08:59:25 +03:00
r . electionElapsed = 0
2014-12-04 08:32:12 +03:00
r . lead = m . From
r . handleHeartbeat ( m )
2014-10-12 11:34:22 +04:00
case pb . MsgSnap :
2015-11-24 08:59:25 +03:00
r . electionElapsed = 0
2016-07-12 02:37:31 +03:00
r . lead = m . From
2014-08-24 04:03:53 +04:00
r . handleSnapshot ( m )
2014-10-12 11:34:22 +04:00
case pb . MsgVote :
2014-09-15 09:44:59 +04:00
if ( r . Vote == None || r . Vote == m . From ) && r . raftLog . isUpToDate ( m . Index , m . LogTerm ) {
2015-11-24 08:59:25 +03:00
r . electionElapsed = 0
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d" ,
2014-12-02 10:50:10 +03:00
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . From , m . LogTerm , m . Index , r . Term )
2014-08-24 04:30:14 +04:00
r . Vote = m . From
2014-10-12 11:34:22 +04:00
r . send ( pb . Message { To : m . From , Type : pb . MsgVoteResp } )
2014-08-23 00:24:33 +04:00
} else {
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d" ,
2014-12-02 10:50:10 +03:00
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . From , m . LogTerm , m . Index , r . Term )
2014-10-12 11:34:22 +04:00
r . send ( pb . Message { To : m . From , Type : pb . MsgVoteResp , Reject : true } )
2014-08-23 00:24:33 +04:00
}
2016-08-11 02:24:29 +03:00
case pb . MsgTransferLeader :
if r . lead == None {
r . logger . Infof ( "%x no leader at term %d; dropping leader transfer msg" , r . id , r . Term )
return
}
m . To = r . lead
r . send ( m )
2016-03-24 14:59:41 +03:00
case pb . MsgTimeoutNow :
r . logger . Infof ( "%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership." , r . id , r . Term , m . From )
2016-07-11 09:59:53 +03:00
r . campaign ( campaignTransfer )
2016-06-03 18:20:10 +03:00
case pb . MsgReadIndex :
if r . lead == None {
r . logger . Infof ( "%x no leader at term %d; dropping index reading msg" , r . id , r . Term )
return
}
m . To = r . lead
r . send ( m )
case pb . MsgReadIndexResp :
if len ( m . Entries ) != 1 {
r . logger . Errorf ( "%x invalid format of MsgReadIndexResp from %x, entries count: %d" , r . id , m . From , len ( m . Entries ) )
return
}
2016-08-27 02:03:06 +03:00
r . readStates = append ( r . readStates , ReadState { Index : m . Index , RequestCtx : m . Entries [ 0 ] . Data } )
2014-08-23 00:24:33 +04:00
}
}
2014-12-09 02:29:54 +03:00
func ( r * raft ) handleAppendEntries ( m pb . Message ) {
2016-01-26 23:18:55 +03:00
if m . Index < r . raftLog . committed {
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : r . raftLog . committed } )
2015-03-11 04:33:58 +03:00
return
}
2014-12-09 02:29:54 +03:00
if mlastIndex , ok := r . raftLog . maybeAppend ( m . Index , m . LogTerm , m . Commit , m . Entries ... ) ; ok {
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : mlastIndex } )
} else {
2015-08-10 18:04:38 +03:00
r . logger . Debugf ( "%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x" ,
r . id , r . raftLog . zeroTermOnErrCompacted ( r . raftLog . term ( m . Index ) ) , m . Index , m . LogTerm , m . Index , m . From )
2014-12-31 06:42:59 +03:00
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : m . Index , Reject : true , RejectHint : r . raftLog . lastIndex ( ) } )
2014-12-09 02:29:54 +03:00
}
}
func ( r * raft ) handleHeartbeat ( m pb . Message ) {
r . raftLog . commitTo ( m . Commit )
2016-08-27 02:03:06 +03:00
r . send ( pb . Message { To : m . From , Type : pb . MsgHeartbeatResp , Context : m . Context } )
2014-12-09 02:29:54 +03:00
}
func ( r * raft ) handleSnapshot ( m pb . Message ) {
sindex , sterm := m . Snapshot . Metadata . Index , m . Snapshot . Metadata . Term
if r . restore ( m . Snapshot ) {
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x [commit: %d] restored snapshot [index: %d, term: %d]" ,
2016-01-26 23:18:55 +03:00
r . id , r . raftLog . committed , sindex , sterm )
2014-12-09 02:29:54 +03:00
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : r . raftLog . lastIndex ( ) } )
} else {
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x [commit: %d] ignored snapshot [index: %d, term: %d]" ,
2016-01-26 23:18:55 +03:00
r . id , r . raftLog . committed , sindex , sterm )
2014-12-09 02:29:54 +03:00
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : r . raftLog . committed } )
}
}
2014-12-31 06:42:59 +03:00
// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of state machine.
2014-08-28 05:53:18 +04:00
func ( r * raft ) restore ( s pb . Snapshot ) bool {
2014-11-20 00:17:50 +03:00
if s . Metadata . Index <= r . raftLog . committed {
2014-08-23 00:24:33 +04:00
return false
}
2014-12-03 08:34:14 +03:00
if r . raftLog . matchTerm ( s . Metadata . Index , s . Metadata . Term ) {
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]" ,
2016-01-26 23:18:55 +03:00
r . id , r . raftLog . committed , r . raftLog . lastIndex ( ) , r . raftLog . lastTerm ( ) , s . Metadata . Index , s . Metadata . Term )
2014-12-03 08:34:14 +03:00
r . raftLog . commitTo ( s . Metadata . Index )
return false
}
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]" ,
2016-01-26 23:18:55 +03:00
r . id , r . raftLog . committed , r . raftLog . lastIndex ( ) , r . raftLog . lastTerm ( ) , s . Metadata . Index , s . Metadata . Term )
2014-08-23 00:24:33 +04:00
2014-08-24 04:03:53 +04:00
r . raftLog . restore ( s )
2015-01-20 21:26:22 +03:00
r . prs = make ( map [ uint64 ] * Progress )
2014-11-20 00:17:50 +03:00
for _ , n := range s . Metadata . ConfState . Nodes {
2016-07-10 08:01:19 +03:00
match , next := uint64 ( 0 ) , r . raftLog . lastIndex ( ) + 1
2014-08-24 04:03:53 +04:00
if n == r . id {
2014-12-02 22:45:41 +03:00
match = next - 1
2014-08-23 00:24:33 +04:00
}
2014-12-02 22:45:41 +03:00
r . setProgress ( n , match , next )
2015-08-10 18:04:38 +03:00
r . logger . Infof ( "%x restored progress of %x [%s]" , r . id , n , r . prs [ n ] )
2014-08-23 00:24:33 +04:00
}
return true
}
2014-12-09 02:23:21 +03:00
// promotable reports whether this node may be promoted to leader, i.e.
// whether its own id has an entry in the progress map.
func (r *raft) promotable() bool {
	_, isMember := r.prs[r.id]
	return isMember
}
// addNode records id as a member by creating a fresh progress entry for it
// (match 0, next just past the last log index) and clears the pending
// configuration-change flag.
//
// The flag is cleared even when id is already known. While pendingConf is
// true the leader replaces every proposed EntryConfChange with an empty
// normal entry, so leaving it set after a redundant add would silently
// discard all later configuration changes. This mirrors removeNode, which
// also resets the flag unconditionally.
func (r *raft) addNode(id uint64) {
	r.pendingConf = false
	if _, ok := r.prs[id]; ok {
		// Ignore any redundant addNode calls (which can happen because the
		// initial bootstrapping entries are applied twice).
		return
	}

	r.setProgress(id, 0, r.raftLog.lastIndex()+1)
}
func ( r * raft ) removeNode ( id uint64 ) {
r . delProgress ( id )
r . pendingConf = false
2016-05-16 19:58:57 +03:00
// do not try to commit or abort transferring if there is no nodes in the cluster.
if len ( r . prs ) == 0 {
return
}
2016-01-20 18:03:12 +03:00
// The quorum size is now smaller, so see if any pending entries can
// be committed.
2016-01-27 00:55:47 +03:00
if r . maybeCommit ( ) {
r . bcastAppend ( )
}
2016-04-11 09:16:56 +03:00
// If the removed node is the leadTransferee, then abort the leadership transferring.
2016-03-24 14:59:41 +03:00
if r . state == StateLeader && r . leadTransferee == id {
r . abortLeaderTransfer ( )
}
2014-12-09 02:23:21 +03:00
}
// resetPendingConf clears the pending configuration-change flag.
func (r *raft) resetPendingConf() {
	r.pendingConf = false
}
2014-10-08 14:29:53 +04:00
func ( r * raft ) setProgress ( id , match , next uint64 ) {
2015-03-20 04:06:51 +03:00
r . prs [ id ] = & Progress { Next : next , Match : match , ins : newInflights ( r . maxInflight ) }
2014-08-23 00:24:33 +04:00
}
2014-10-08 14:29:53 +04:00
func ( r * raft ) delProgress ( id uint64 ) {
2014-08-24 04:03:53 +04:00
delete ( r . prs , id )
2014-08-23 00:24:33 +04:00
}
2014-09-16 04:35:02 +04:00
func ( r * raft ) loadState ( state pb . HardState ) {
2014-12-03 08:58:48 +03:00
if state . Commit < r . raftLog . committed || state . Commit > r . raftLog . lastIndex ( ) {
2015-08-10 18:04:38 +03:00
r . logger . Panicf ( "%x state.commit %d is out of range [%d, %d]" , r . id , state . Commit , r . raftLog . committed , r . raftLog . lastIndex ( ) )
2014-12-03 08:58:48 +03:00
}
2014-08-24 04:03:53 +04:00
r . raftLog . committed = state . Commit
2014-08-24 04:30:14 +04:00
r . Term = state . Term
r . Vote = state . Vote
2014-08-23 00:24:33 +04:00
}
2014-10-07 16:12:49 +04:00
2016-04-06 15:41:46 +03:00
// pastElectionTimeout returns true iff r.electionElapsed is greater
// than or equal to the randomized election timeout in
// [electiontimeout, 2 * electiontimeout - 1].
2016-04-01 20:09:09 +03:00
func ( r * raft ) pastElectionTimeout ( ) bool {
return r . electionElapsed >= r . randomizedElectionTimeout
}
func ( r * raft ) resetRandomizedElectionTimeout ( ) {
2016-09-05 16:03:18 +03:00
r . randomizedElectionTimeout = r . electionTimeout + globalRand . Intn ( r . electionTimeout )
2014-10-07 16:12:49 +04:00
}
2015-11-24 08:59:25 +03:00
// checkQuorumActive returns true if the quorum is active from
// the view of the local raft state machine. Otherwise, it returns
// false.
2015-12-12 02:06:11 +03:00
// checkQuorumActive also resets all RecentActive to false.
2015-11-24 08:59:25 +03:00
func ( r * raft ) checkQuorumActive ( ) bool {
var act int
for id := range r . prs {
if id == r . id { // self is always active
2016-01-13 04:39:00 +03:00
act ++
2015-11-24 08:59:25 +03:00
continue
}
2015-12-10 23:17:18 +03:00
if r . prs [ id ] . RecentActive {
2016-01-13 04:39:00 +03:00
act ++
2015-11-24 08:59:25 +03:00
}
2015-12-10 23:17:18 +03:00
r . prs [ id ] . RecentActive = false
2015-11-24 08:59:25 +03:00
}
2016-01-06 10:23:35 +03:00
return act >= r . quorum ( )
2015-11-24 08:59:25 +03:00
}
2016-03-24 14:59:41 +03:00
// sendTimeoutNow sends a MsgTimeoutNow message to the node with id to.
func (r *raft) sendTimeoutNow(to uint64) {
	msg := pb.Message{Type: pb.MsgTimeoutNow, To: to}
	r.send(msg)
}
// abortLeaderTransfer cancels any in-progress leadership transfer by
// clearing the recorded transfer target.
func (r *raft) abortLeaderTransfer() {
	r.leadTransferee = None
}
2016-07-26 01:06:57 +03:00
// numOfPendingConf returns how many entries in ents are of type
// pb.EntryConfChange.
func numOfPendingConf(ents []pb.Entry) int {
	var count int
	for i := 0; i < len(ents); i++ {
		if ents[i].Type != pb.EntryConfChange {
			continue
		}
		count++
	}
	return count
}