etcd/raft/raft.go

573 lines
13 KiB
Go

/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package raft
import (
"errors"
"fmt"
"math/rand"
"sort"
pb "github.com/coreos/etcd/raft/raftpb"
)
// None is a placeholder node ID used when there is no leader.
const None uint64 = 0
var errNoLeader = errors.New("no leader")
// Possible values for StateType.
const (
StateFollower StateType = iota
StateCandidate
StateLeader
)
// StateType represents the role of a node in a cluster.
type StateType uint64
var stmap = [...]string{
"StateFollower",
"StateCandidate",
"StateLeader",
}
func (st StateType) String() string {
return stmap[uint64(st)]
}
func (st StateType) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf("%q", st.String())), nil
}
type progress struct {
match, next uint64
}
func (pr *progress) update(n uint64) {
pr.match = n
pr.next = n + 1
}
// maybeDecrTo returns false if the given to index comes from an out of order message.
// Otherwise it decreases the progress next index and returns true.
func (pr *progress) maybeDecrTo(to uint64) bool {
// the rejection must be stale if the progress has matched with
// follower or "to" does not match next - 1
if pr.match != 0 || pr.next-1 != to {
return false
}
if pr.next--; pr.next < 1 {
pr.next = 1
}
return true
}
func (pr *progress) String() string {
return fmt.Sprintf("n=%d m=%d", pr.next, pr.match)
}
// uint64Slice implements sort interface
type uint64Slice []uint64
func (p uint64Slice) Len() int { return len(p) }
func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
type raft struct {
pb.HardState
id uint64
// the log
raftLog *raftLog
prs map[uint64]*progress
state StateType
votes map[uint64]bool
msgs []pb.Message
// the leader id
lead uint64
// New configuration is ignored if there exists unapplied configuration.
pendingConf bool
elapsed int // number of ticks since the last msg
heartbeatTimeout int
electionTimeout int
rand *rand.Rand
tick func()
step stepFunc
}
func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
if id == None {
panic("cannot use none id")
}
r := &raft{
id: id,
lead: None,
raftLog: newLog(),
prs: make(map[uint64]*progress),
electionTimeout: election,
heartbeatTimeout: heartbeat,
}
r.rand = rand.New(rand.NewSource(int64(id)))
for _, p := range peers {
r.prs[p] = &progress{}
}
r.becomeFollower(0, None)
return r
}
func (r *raft) hasLeader() bool { return r.lead != None }
func (r *raft) softState() *SoftState {
return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
}
func (r *raft) String() string {
s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term)
switch r.state {
case StateFollower:
s += fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead)
case StateCandidate:
s += fmt.Sprintf(` votes="%v"`, r.votes)
case StateLeader:
s += fmt.Sprintf(` prs="%v"`, r.prs)
}
return s
}
func (r *raft) poll(id uint64, v bool) (granted int) {
if _, ok := r.votes[id]; !ok {
r.votes[id] = v
}
for _, vv := range r.votes {
if vv {
granted++
}
}
return granted
}
// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
m.From = r.id
// do not attach term to MsgProp
// proposals are a way to forward to the leader and
// should be treated as local message.
if m.Type != pb.MsgProp {
m.Term = r.Term
}
r.msgs = append(r.msgs, m)
}
// sendAppend sends RRPC, with entries to the given peer.
func (r *raft) sendAppend(to uint64) {
pr := r.prs[to]
m := pb.Message{}
m.To = to
m.Index = pr.next - 1
if r.needSnapshot(m.Index) {
m.Type = pb.MsgSnap
m.Snapshot = r.raftLog.snapshot
} else {
m.Type = pb.MsgApp
m.LogTerm = r.raftLog.term(pr.next - 1)
m.Entries = r.raftLog.entries(pr.next)
m.Commit = r.raftLog.committed
}
r.send(m)
}
// sendHeartbeat sends an empty MsgApp
func (r *raft) sendHeartbeat(to uint64) {
m := pb.Message{
To: to,
Type: pb.MsgApp,
}
r.send(m)
}
// bcastAppend sends RRPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
for i := range r.prs {
if i == r.id {
continue
}
r.sendAppend(i)
}
}
// bcastHeartbeat sends RRPC, without entries to all the peers.
func (r *raft) bcastHeartbeat() {
for i := range r.prs {
if i == r.id {
continue
}
r.sendHeartbeat(i)
}
}
func (r *raft) maybeCommit() bool {
// TODO(bmizerany): optimize.. Currently naive
mis := make(uint64Slice, 0, len(r.prs))
for i := range r.prs {
mis = append(mis, r.prs[i].match)
}
sort.Sort(sort.Reverse(mis))
mci := mis[r.q()-1]
return r.raftLog.maybeCommit(mci, r.Term)
}
func (r *raft) reset(term uint64) {
r.Term = term
r.lead = None
r.Vote = None
r.elapsed = 0
r.votes = make(map[uint64]bool)
for i := range r.prs {
r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
if i == r.id {
r.prs[i].match = r.raftLog.lastIndex()
}
}
r.pendingConf = false
}
func (r *raft) q() int {
return len(r.prs)/2 + 1
}
func (r *raft) appendEntry(e pb.Entry) {
e.Term = r.Term
e.Index = r.raftLog.lastIndex() + 1
r.raftLog.append(r.raftLog.lastIndex(), e)
r.prs[r.id].update(r.raftLog.lastIndex())
r.maybeCommit()
}
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
if !r.promotable() {
r.elapsed = 0
return
}
r.elapsed++
if r.isElectionTimeout() {
r.elapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func (r *raft) tickHeartbeat() {
r.elapsed++
if r.elapsed > r.heartbeatTimeout {
r.elapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
}
}
func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.state = StateFollower
}
func (r *raft) becomeCandidate() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateLeader {
panic("invalid transition [leader -> candidate]")
}
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.state = StateCandidate
}
func (r *raft) becomeLeader() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateFollower {
panic("invalid transition [follower -> leader]")
}
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
if e.Type != pb.EntryConfChange {
continue
}
if r.pendingConf {
panic("unexpected double uncommitted config entry")
}
r.pendingConf = true
}
r.appendEntry(pb.Entry{Data: nil})
}
func (r *raft) campaign() {
r.becomeCandidate()
if r.q() == r.poll(r.id, true) {
r.becomeLeader()
}
for i := range r.prs {
if i == r.id {
continue
}
r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
}
}
func (r *raft) Step(m pb.Message) error {
// TODO(bmizerany): this likely allocs - prevent that.
defer func() { r.Commit = r.raftLog.committed }()
if m.Type == pb.MsgHup {
r.campaign()
}
switch {
case m.Term == 0:
// local message
case m.Term > r.Term:
lead := m.From
if m.Type == pb.MsgVote {
lead = None
}
r.becomeFollower(m.Term, lead)
case m.Term < r.Term:
// ignore
return nil
}
r.step(r, m)
return nil
}
func (r *raft) handleAppendEntries(m pb.Message) {
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
}
}
func (r *raft) handleSnapshot(m pb.Message) {
if r.restore(m.Snapshot) {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
}
}
func (r *raft) resetPendingConf() {
r.pendingConf = false
}
func (r *raft) addNode(id uint64) {
r.setProgress(id, 0, r.raftLog.lastIndex()+1)
r.pendingConf = false
}
func (r *raft) removeNode(id uint64) {
r.delProgress(id)
r.pendingConf = false
}
type stepFunc func(r *raft, m pb.Message)
func stepLeader(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat()
case pb.MsgProp:
if len(m.Entries) != 1 {
panic("unexpected length(entries) of a MsgProp")
}
e := m.Entries[0]
if e.Type == pb.EntryConfChange {
if r.pendingConf {
return
}
r.pendingConf = true
}
r.appendEntry(e)
r.bcastAppend()
case pb.MsgAppResp:
if m.Index == 0 {
return
}
if m.Reject {
if r.prs[m.From].maybeDecrTo(m.Index) {
r.sendAppend(m.From)
}
} else {
r.prs[m.From].update(m.Index)
if r.maybeCommit() {
r.bcastAppend()
}
}
case pb.MsgVote:
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
}
}
func stepCandidate(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgProp:
panic("no leader")
case pb.MsgApp:
r.becomeFollower(r.Term, m.From)
r.handleAppendEntries(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From)
r.handleSnapshot(m)
case pb.MsgVote:
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
case pb.MsgVoteResp:
gr := r.poll(m.From, !m.Reject)
switch r.q() {
case gr:
r.becomeLeader()
r.bcastAppend()
case len(r.votes) - gr:
r.becomeFollower(r.Term, None)
}
}
}
func stepFollower(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgProp:
if r.lead == None {
panic("no leader")
}
m.To = r.lead
r.send(m)
case pb.MsgApp:
r.elapsed = 0
r.lead = m.From
r.handleAppendEntries(m)
case pb.MsgSnap:
r.elapsed = 0
r.handleSnapshot(m)
case pb.MsgVote:
if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.elapsed = 0
r.Vote = m.From
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
}
}
}
func (r *raft) compact(index uint64, nodes []uint64, d []byte) {
if index > r.raftLog.applied {
panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied))
}
r.raftLog.snap(d, index, r.raftLog.term(index), nodes)
r.raftLog.compact(index)
}
// restore recovers the statemachine from a snapshot. It restores the log and the
// configuration of statemachine.
func (r *raft) restore(s pb.Snapshot) bool {
if s.Index <= r.raftLog.committed {
return false
}
r.raftLog.restore(s)
r.prs = make(map[uint64]*progress)
for _, n := range s.Nodes {
if n == r.id {
r.setProgress(n, r.raftLog.lastIndex(), r.raftLog.lastIndex()+1)
} else {
r.setProgress(n, 0, r.raftLog.lastIndex()+1)
}
}
return true
}
func (r *raft) needSnapshot(i uint64) bool {
if i < r.raftLog.offset {
if r.raftLog.snapshot.Term == 0 {
panic("need non-empty snapshot")
}
return true
}
return false
}
func (r *raft) nodes() []uint64 {
nodes := make([]uint64, 0, len(r.prs))
for k := range r.prs {
nodes = append(nodes, k)
}
return nodes
}
func (r *raft) setProgress(id, match, next uint64) {
r.prs[id] = &progress{next: next, match: match}
}
func (r *raft) delProgress(id uint64) {
delete(r.prs, id)
}
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
_, ok := r.prs[r.id]
return ok
}
func (r *raft) loadEnts(ents []pb.Entry) {
r.raftLog.load(ents)
}
func (r *raft) loadState(state pb.HardState) {
r.raftLog.committed = state.Commit
r.Term = state.Term
r.Vote = state.Vote
r.Commit = state.Commit
}
// isElectionTimeout returns true if r.elapsed is greater than the
// randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
// Otherwise, it returns false.
func (r *raft) isElectionTimeout() bool {
d := r.elapsed - r.electionTimeout
if d < 0 {
return false
}
return d > r.rand.Int()%r.electionTimeout
}