etcd/etcdserver/raft.go

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import (
"encoding/json"
"expvar"
"sort"
"sync"
"sync/atomic"
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/contention"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/coreos/pkg/capnslog"
)
const (
// Number of entries for a slow follower to catch up with after
// compacting the raft storage entries.
// We expect the follower to have millisecond-level latency to the leader.
// The max throughput is around 10K entries/s, so keeping 5K entries is
// enough to help a follower catch up.
numberOfCatchUpEntries = 5000
// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
// Assuming the RTT is around 10ms, a 1MB max message size is large enough.
maxSizePerMsg = 1 * 1024 * 1024
// Never overflow the rafthttp buffer, which is 4096.
// TODO: a better const?
maxInflightMsgs = 4096 / 8
)
var (
// protects raftStatus
raftStatusMu sync.Mutex
// indirection for the expvar func interface;
// expvar panics when publishing a duplicate name and does not
// support removing a registered name, so only register a func that
// calls raftStatus, and change raftStatus as needed.
raftStatus func() raft.Status
)
func init() {
raft.SetLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "raft"))
expvar.Publish("raft.status", expvar.Func(func() interface{} {
raftStatusMu.Lock()
defer raftStatusMu.Unlock()
return raftStatus()
}))
}
type RaftTimer interface {
Index() uint64
AppliedIndex() uint64
Term() uint64
}
// apply contains entries, snapshot to be applied. Once
// an apply is consumed, the entries will be persisted to
// raft storage concurrently; the application must read
// notifyc before assuming the raft messages are stable.
type apply struct {
entries []raftpb.Entry
snapshot raftpb.Snapshot
// notifyc synchronizes etcd server applies with the raft node
notifyc chan struct{}
}
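// A rough sketch of the consumer side (the real loop lives in
// etcdserver/server.go):
//
//	ap := <-r.apply()
//	// ... apply ap.entries and ap.snapshot ...
//	<-ap.notifyc // raft finished its disk writes; safe to trigger a snapshot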
type raftNode struct {
// Cache of the latest raft index, applied index, raft term, and leader
// the server has seen. These four uint64 fields must be the first
// elements of the struct to keep 64-bit alignment for atomic access.
index uint64
appliedindex uint64
term uint64
lead uint64
tickMu *sync.Mutex
raftNodeConfig
// a chan to send/receive snapshot
msgSnapC chan raftpb.Message
// a chan to send out apply
applyc chan apply
// a chan to send out readState
readStateC chan raft.ReadState
// utility
ticker *time.Ticker
// contention detector for raft heartbeat messages
td *contention.TimeoutDetector
stopped chan struct{}
done chan struct{}
}
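// raftNodeConfig bundles the dependencies of a raftNode: the embedded
// raft.Node, the in-memory raft log (raftStorage), durable storage for
// WAL and snapshots, and the peer transport.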
type raftNodeConfig struct {
// to check if msg receiver is removed from cluster
isIDRemoved func(id uint64) bool
raft.Node
raftStorage *raft.MemoryStorage
storage Storage
heartbeat time.Duration // for logging
// transport specifies the transport to send and receive msgs to members.
// Sending messages MUST NOT block. It is okay to drop messages, since
// clients should timeout and reissue their messages.
// If transport is nil, server will panic.
transport rafthttp.Transporter
}
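// newRaftNode wraps the given config in a raftNode, allocating the
// readState/snapshot/apply channels and a contention detector that
// expects a heartbeat to be sent within two heartbeat intervals.
// With a zero heartbeat it installs a ticker that never fires (e.g. for
// tests that drive ticks manually).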
func newRaftNode(cfg raftNodeConfig) *raftNode {
r := &raftNode{
tickMu: new(sync.Mutex),
raftNodeConfig: cfg,
// set up contention detectors for raft heartbeat message.
// expect to send a heartbeat within 2 heartbeat intervals.
td: contention.NewTimeoutDetector(2 * cfg.heartbeat),
readStateC: make(chan raft.ReadState, 1),
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
applyc: make(chan apply),
stopped: make(chan struct{}),
done: make(chan struct{}),
}
if r.heartbeat == 0 {
r.ticker = &time.Ticker{}
} else {
r.ticker = time.NewTicker(r.heartbeat)
}
return r
}
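// A rough construction sketch (hypothetical wiring; the real call site
// lives in etcdserver/server.go, and cl, n, s, w, ss, tr come from
// cluster bootstrap, e.g. startNode/restartNode):
//
//	r := newRaftNode(raftNodeConfig{
//		isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
//		Node:        n,
//		heartbeat:   100 * time.Millisecond,
//		raftStorage: s,
//		storage:     NewStorage(w, ss),
//		transport:   tr,
//	})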
// raft.Node does not have locks in the raft package, so tick()
// serializes calls to Tick with tickMu.
func (r *raftNode) tick() {
r.tickMu.Lock()
r.Tick()
r.tickMu.Unlock()
}
// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
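// The spawned goroutine consumes raft.Ready: it forwards committed
// entries and snapshots to applyc, persists hard state and entries via
// storage, and hands messages to the transport, ordering the sends
// differently for leaders and followers (see raft thesis 10.2.1).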
func (r *raftNode) start(rh *raftReadyHandler) {
internalTimeout := time.Second
go func() {
defer r.onStop()
islead := false
for {
select {
case <-r.ticker.C:
r.tick()
case rd := <-r.Ready():
if rd.SoftState != nil {
newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
if newLeader {
leaderChanges.Inc()
}
if rd.SoftState.Lead == raft.None {
hasLeader.Set(0)
} else {
hasLeader.Set(1)
}
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
islead = rd.RaftState == raft.StateLeader
rh.updateLeadership(newLeader)
r.td.Reset()
}
if len(rd.ReadStates) != 0 {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-time.After(internalTimeout):
plog.Warningf("timed out sending read state")
case <-r.stopped:
return
}
}
notifyc := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
}
updateCommittedIndex(&ap, rh)
select {
case r.applyc <- ap:
case <-r.stopped:
return
}
// the leader can write to its disk in parallel with replicating to the
// followers and with them writing to their disks.
// For more details, check raft thesis section 10.2.1.
if islead {
// gofail: var raftBeforeLeaderSend struct{}
r.transport.Send(r.processMessages(rd.Messages))
}
// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
plog.Fatalf("raft save state and entries error: %v", err)
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
// gofail: var raftAfterSave struct{}
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
plog.Fatalf("raft save snapshot error: %v", err)
}
// etcdserver now claims the snapshot has been persisted onto the disk
notifyc <- struct{}{}
// gofail: var raftAfterSaveSnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
// gofail: var raftAfterApplySnap struct{}
}
r.raftStorage.Append(rd.Entries)
if !islead {
// finish processing the outgoing messages before we signal the notifyc chan
msgs := r.processMessages(rd.Messages)
// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
notifyc <- struct{}{}
// Candidate or follower needs to wait for all pending configuration
// changes to be applied before sending messages.
// Otherwise we might incorrectly count votes (e.g. votes from removed members).
// Also, a slow machine's follower could proceed, at the raft layer, to
// become the leader of its own single-node cluster before the apply
// layer applies the config change.
// We simply wait for ALL pending entries to be applied for now.
// We might improve this later if it causes unnecessarily long blocking.
waitApply := false
for _, ent := range rd.CommittedEntries {
if ent.Type == raftpb.EntryConfChange {
waitApply = true
break
}
}
if waitApply {
etcdserver: ensure waitForApply sync with applyAll Problem is: `Step1`: `etcdserver/raft.go`'s `Ready` process routine sends config-change entries via `r.applyc <- ap` (https://github.com/coreos/etcd/blob/master/etcdserver/raft.go#L193-L203) `Step2`: `etcdserver/server.go`'s `*EtcdServer.run` routine receives this via `ap := <-s.r.apply()` (https://github.com/coreos/etcd/blob/master/etcdserver/server.go#L735-L738) `StepA`: `Step1` proceeds without sync, right after sending `r.applyc <- ap`. `StepB`: `Step2` proceeds without sync, right after `sched.Schedule(s.applyAll(&ep,&ap))`. `StepC`: `etcdserver` tries to sync with `s.applyAll(&ep,&ap)` by calling `rh.waitForApply()`. `rh.waitForApply()` waits for all pending jobs to finish in `pkg/schedule` side. However, the order of `StepA`,`StepB`,`StepC` is not guaranteed. It is possible that `StepC` happens first, and proceeds without waiting on apply. And the restarting member comes back as a leader in single-node cluster, when there is no synchronization between apply-layer and config-change Raft entry apply. Confirmed with more debugging lines below, only reproducible with slow CPU VM (~2 vCPU). ``` ~:24.005397 I | etcdserver: starting server... [version: 3.2.0+git, cluster version: to_be_decided] ~:24.011136 I | etcdserver: [DEBUG] 29b2d24047a277df waitForApply before ~:24.011194 I | etcdserver: [DEBUG] 29b2d24047a277df starts wait for 0 pending jobs ~:24.011234 I | etcdserver: [DEBUG] 29b2d24047a277df finished wait for 0 pending jobs (current pending 0) ~:24.011268 I | etcdserver: [DEBUG] 29b2d24047a277df waitForApply after ~:24.011348 I | etcdserver: [DEBUG] [0] 29b2d24047a277df is scheduling conf change on 29b2d24047a277df ~:24.011396 I | etcdserver: [DEBUG] [1] 29b2d24047a277df is scheduling conf change on 5edf80e32a334cf0 ~:24.011437 I | etcdserver: [DEBUG] [2] 29b2d24047a277df is scheduling conf change on e32e31e76c8d2678 ~:24.011477 I | etcdserver: [DEBUG] 29b2d24047a277df scheduled conf change on 29b2d24047a277df ~:24.011509 I | etcdserver: [DEBUG] 29b2d24047a277df scheduled conf change on 5edf80e32a334cf0 ~:24.011545 I | etcdserver: [DEBUG] 29b2d24047a277df scheduled conf change on e32e31e76c8d2678 ~:24.012500 I | etcdserver: [DEBUG] 29b2d24047a277df applyConfChange on 29b2d24047a277df before ~:24.013014 I | etcdserver/membership: added member 29b2d24047a277df [unix://127.0.0.1:2100515039] to cluster 9250d4ae34216949 ~:24.013066 I | etcdserver: [DEBUG] 29b2d24047a277df applyConfChange on 29b2d24047a277df after ~:24.013113 I | etcdserver: [DEBUG] 29b2d24047a277df applyConfChange on 29b2d24047a277df after trigger ~:24.013158 I | etcdserver: [DEBUG] 29b2d24047a277df applyConfChange on 5edf80e32a334cf0 before ~:24.013666 W | etcdserver: failed to send out heartbeat on time (exceeded the 10ms timeout for 11.964739ms) ~:24.013709 W | etcdserver: server is likely overloaded ~:24.013750 W | etcdserver: failed to send out heartbeat on time (exceeded the 10ms timeout for 12.057265ms) ~:24.013775 W | etcdserver: server is likely overloaded ~:24.013950 I | raft: 29b2d24047a277df is starting a new election at term 4 ~:24.014012 I | raft: 29b2d24047a277df became candidate at term 5 ~:24.014051 I | raft: 29b2d24047a277df received MsgVoteResp from 29b2d24047a277df at term 5 ~:24.014107 I | raft: 29b2d24047a277df became leader at term 5 ~:24.014146 I | raft: raft.node: 29b2d24047a277df elected leader 29b2d24047a277df at term 5 ``` I am printing out the number of pending jobs before we call `sched.WaitFinish(0)`, and there was no pending jobs, so it 
returned immediately (before we schedule `applyAll`). This is the root cause to: - https://github.com/coreos/etcd/issues/7595 - https://github.com/coreos/etcd/issues/7739 - https://github.com/coreos/etcd/issues/7802 `sched.WaitFinish(0)` doesn't work when `len(f.pendings)==0` and `f.finished==0`. Config-change is the first job to apply, so `f.finished` is 0 in this case. `f.finished` monotonically increases, so we need `WaitFinish(finished+1)`. And `finished` must be the one before calling `Schedule`. This is safe because `Schedule(applyAll)` is the only place adding jobs to `sched`. Then scheduler waits on the single job of `applyAll`, by getting the current number of finished jobs before sending `Schedule`. Or just make it be blocked until `applyAll` routine triggers on the config-change job. This patch just removes `waitForApply`, and signal `raftDone` to wait until `applyAll` finishes applying entries. Confirmed that it fixes the issue, as below: ``` ~:43.198354 I | rafthttp: started streaming with peer 36cda5222aba364b (stream MsgApp v2 reader) ~:43.198740 I | etcdserver: [DEBUG] 3988bc20c2b2e40c waitForApply before ~:43.198836 I | etcdserver: [DEBUG] 3988bc20c2b2e40c starts wait for 0 pending jobs, 1 finished jobs ~:43.200696 I | integration: launched 3169361310155633349 () ~:43.201784 I | etcdserver: [DEBUG] [0] 3988bc20c2b2e40c is scheduling conf change on 36cda5222aba364b ~:43.201884 I | etcdserver: [DEBUG] [1] 3988bc20c2b2e40c is scheduling conf change on 3988bc20c2b2e40c ~:43.201965 I | etcdserver: [DEBUG] [2] 3988bc20c2b2e40c is scheduling conf change on cf5d6cbc2a121727 ~:43.202070 I | etcdserver: [DEBUG] 3988bc20c2b2e40c scheduled conf change on 36cda5222aba364b ~:43.202139 I | etcdserver: [DEBUG] 3988bc20c2b2e40c scheduled conf change on 3988bc20c2b2e40c ~:43.202204 I | etcdserver: [DEBUG] 3988bc20c2b2e40c scheduled conf change on cf5d6cbc2a121727 ~:43.202444 I | etcdserver: [DEBUG] 3988bc20c2b2e40c applyConfChange on 36cda5222aba364b (request ID: 0) before ~:43.204486 I | etcdserver/membership: added member 36cda5222aba364b [unix://127.0.0.1:2100913646] to cluster 425d73f1b7b01674 ~:43.204588 I | etcdserver: [DEBUG] 3988bc20c2b2e40c applyConfChange on 36cda5222aba364b (request ID: 0) after ~:43.204703 I | etcdserver: [DEBUG] 3988bc20c2b2e40c applyConfChange on 36cda5222aba364b (request ID: 0) after trigger ~:43.204791 I | etcdserver: [DEBUG] 3988bc20c2b2e40c applyConfChange on 3988bc20c2b2e40c (request ID: 0) before ~:43.205689 I | etcdserver/membership: added member 3988bc20c2b2e40c [unix://127.0.0.1:2101113646] to cluster 425d73f1b7b01674 ~:43.205783 I | etcdserver: [DEBUG] 3988bc20c2b2e40c applyConfChange on 3988bc20c2b2e40c (request ID: 0) after ~:43.205929 I | etcdserver: [DEBUG] 3988bc20c2b2e40c applyConfChange on 3988bc20c2b2e40c (request ID: 0) after trigger ~:43.206056 I | etcdserver: [DEBUG] 3988bc20c2b2e40c applyConfChange on cf5d6cbc2a121727 (request ID: 0) before ~:43.207353 I | etcdserver/membership: added member cf5d6cbc2a121727 [unix://127.0.0.1:2100713646] to cluster 425d73f1b7b01674 ~:43.207516 I | etcdserver: [DEBUG] 3988bc20c2b2e40c applyConfChange on cf5d6cbc2a121727 (request ID: 0) after ~:43.207619 I | etcdserver: [DEBUG] 3988bc20c2b2e40c applyConfChange on cf5d6cbc2a121727 (request ID: 0) after trigger ~:43.207710 I | etcdserver: [DEBUG] 3988bc20c2b2e40c finished scheduled conf change on 36cda5222aba364b ~:43.207781 I | etcdserver: [DEBUG] 3988bc20c2b2e40c finished scheduled conf change on 3988bc20c2b2e40c ~:43.207843 I | etcdserver: [DEBUG] 
3988bc20c2b2e40c finished scheduled conf change on cf5d6cbc2a121727 ~:43.207951 I | etcdserver: [DEBUG] 3988bc20c2b2e40c finished wait for 0 pending jobs (current pending 0, finished 1) ~:43.208029 I | rafthttp: started HTTP pipelining with peer cf5d6cbc2a121727 ~:43.210339 I | rafthttp: peer 3988bc20c2b2e40c became active ~:43.210435 I | rafthttp: established a TCP streaming connection with peer 3988bc20c2b2e40c (stream MsgApp v2 reader) ~:43.210861 I | rafthttp: started streaming with peer 3988bc20c2b2e40c (writer) ~:43.211732 I | etcdserver: [DEBUG] 3988bc20c2b2e40c waitForApply after ``` Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-25 16:19:52 +03:00
// blocks until 'applyAll' calls 'applyWait.Trigger'
// to be in sync with scheduled config-change job
// (assume notifyc has cap of 1)
select {
case notifyc <- struct{}{}:
case <-r.stopped:
return
}
}
// gofail: var raftBeforeFollowerSend struct{}
r.transport.Send(msgs)
} else {
// leader already processed 'MsgSnap' and signaled
notifyc <- struct{}{}
}
r.Advance()
case <-r.stopped:
return
}
}
}()
}
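// updateCommittedIndex reports the largest committed index carried by
// the apply (the last entry's index or the snapshot's index, whichever
// is greater) to the raftReadyHandler.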
func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
var ci uint64
if len(ap.entries) != 0 {
ci = ap.entries[len(ap.entries)-1].Index
}
if ap.snapshot.Metadata.Index > ci {
ci = ap.snapshot.Metadata.Index
}
if ci != 0 {
rh.updateCommittedIndex(ci)
}
}
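// processMessages sanitizes outgoing messages before they reach the
// transport: messages to removed members and all but the latest
// MsgAppResp are marked droppable (To = 0), MsgSnap is redirected to
// msgSnapC so the server can merge in the v3 KV snapshot, and
// MsgHeartbeat is observed by the contention detector.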
func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
sentAppResp := false
for i := len(ms) - 1; i >= 0; i-- {
if r.isIDRemoved(ms[i].To) {
ms[i].To = 0
}
if ms[i].Type == raftpb.MsgAppResp {
if sentAppResp {
ms[i].To = 0
} else {
sentAppResp = true
}
}
if ms[i].Type == raftpb.MsgSnap {
// There are two separate data stores: the store for v2, and the KV for v3.
// The msgSnap only contains the most recent snapshot of the store without KV.
// So we need to redirect the msgSnap to the etcd server main loop for
// merging in the current store snapshot and KV snapshot.
select {
case r.msgSnapC <- ms[i]:
default:
// drop msgSnap if the inflight chan is full.
}
ms[i].To = 0
}
if ms[i].Type == raftpb.MsgHeartbeat {
ok, exceed := r.td.Observe(ms[i].To)
if !ok {
// TODO: limit request rate.
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
plog.Warningf("server is likely overloaded")
}
}
}
return ms
}
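// apply returns the channel on which the raft goroutine publishes the
// entries and snapshots for the server to apply.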
func (r *raftNode) apply() chan apply {
return r.applyc
}
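// stop signals the raft goroutine to exit and blocks until the node,
// ticker, transport, and storage have been shut down.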
func (r *raftNode) stop() {
r.stopped <- struct{}{}
<-r.done
}
func (r *raftNode) onStop() {
r.Stop()
r.ticker.Stop()
r.transport.Stop()
if err := r.storage.Close(); err != nil {
plog.Panicf("raft close storage error: %v", err)
}
close(r.done)
}
// for testing
func (r *raftNode) pauseSending() {
p := r.transport.(rafthttp.Pausable)
p.Pause()
}
func (r *raftNode) resumeSending() {
p := r.transport.(rafthttp.Pausable)
p.Resume()
}
// advanceTicks advances ticks of Raft node.
// This can be used for fast-forwarding election
// ticks in multi data-center deployments, thus
// speeding up election process.
func (r *raftNode) advanceTicks(ticks int) {
for i := 0; i < ticks; i++ {
r.tick()
}
}
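// startNode bootstraps a brand-new member: it creates a WAL stamped with
// the member and cluster IDs, builds the initial peer list from ids, and
// starts a raft.Node backed by a fresh MemoryStorage.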
func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
var err error
member := cl.MemberByName(cfg.Name)
metadata := pbutil.MustMarshal(
&pb.Metadata{
NodeID: uint64(member.ID),
ClusterID: uint64(cl.ID()),
},
)
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
plog.Fatalf("create wal error: %v", err)
}
peers := make([]raft.Peer, len(ids))
for i, id := range ids {
ctx, err := json.Marshal((*cl).Member(id))
if err != nil {
plog.Panicf("marshal member should never fail: %v", err)
}
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
}
id = member.ID
plog.Infof("starting member %s in cluster %s", id, cl.ID())
s = raft.NewMemoryStorage()
c := &raft.Config{
ID: uint64(id),
ElectionTick: cfg.ElectionTicks,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: maxSizePerMsg,
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
PreVote: cfg.PreVote,
}
n = raft.StartNode(c, peers)
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
return id, n, s, w
}
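// restartNode recovers a member from its WAL and the optional snapshot,
// replaying the hard state and entries into a fresh MemoryStorage before
// restarting the raft.Node.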
func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := membership.NewCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
if snapshot != nil {
s.ApplySnapshot(*snapshot)
}
s.SetHardState(st)
s.Append(ents)
c := &raft.Config{
ID: uint64(id),
ElectionTick: cfg.ElectionTicks,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: maxSizePerMsg,
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
PreVote: cfg.PreVote,
}
n := raft.RestartNode(c)
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
return id, cl, n, s, w
}
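// restartAsStandaloneNode is like restartNode, but rewrites history so
// the member can run alone: uncommitted WAL entries are discarded, and
// config-change entries removing every other member are force-appended
// and force-committed.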
func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
// discard the previously uncommitted entries
for i, ent := range ents {
if ent.Index > st.Commit {
plog.Infof("discarding %d uncommitted WAL entries ", len(ents)-i)
ents = ents[:i]
break
}
}
// force append the configuration change entries
toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(id), st.Term, st.Commit)
ents = append(ents, toAppEnts...)
// force commit newly appended entries
err := w.Save(raftpb.HardState{}, toAppEnts)
if err != nil {
plog.Fatalf("%v", err)
}
if len(ents) != 0 {
st.Commit = ents[len(ents)-1].Index
}
plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
cl := membership.NewCluster("")
cl.SetID(cid)
s := raft.NewMemoryStorage()
if snapshot != nil {
s.ApplySnapshot(*snapshot)
}
s.SetHardState(st)
s.Append(ents)
c := &raft.Config{
ID: uint64(id),
ElectionTick: cfg.ElectionTicks,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: maxSizePerMsg,
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
PreVote: cfg.PreVote,
}
n := raft.RestartNode(c)
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
return id, cl, n, s, w
}
// getIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain two kinds of
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
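// For example (sketch): a snapshot whose ConfState.Nodes is {2, 3},
// followed by entries removing 3 and adding 4, yields [2, 4].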
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
ids := make(map[uint64]bool)
if snap != nil {
for _, id := range snap.Metadata.ConfState.Nodes {
ids[id] = true
}
}
for _, e := range ents {
if e.Type != raftpb.EntryConfChange {
continue
}
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
switch cc.Type {
case raftpb.ConfChangeAddNode:
ids[cc.NodeID] = true
case raftpb.ConfChangeRemoveNode:
delete(ids, cc.NodeID)
case raftpb.ConfChangeUpdateNode:
// do nothing
default:
plog.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
}
}
sids := make(types.Uint64Slice, 0, len(ids))
for id := range ids {
sids = append(sids, id)
}
sort.Sort(sids)
return []uint64(sids)
}
// createConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
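// For example (sketch): ids = [1, 2, 3], self = 1, term = 5, index = 10
// yields two ConfChangeRemoveNode entries (for 2 and 3) at indexes 11
// and 12.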
func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
ents := make([]raftpb.Entry, 0)
next := index + 1
found := false
for _, id := range ids {
if id == self {
found = true
continue
}
cc := &raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
}
e := raftpb.Entry{
Type: raftpb.EntryConfChange,
Data: pbutil.MustMarshal(cc),
Term: term,
Index: next,
}
ents = append(ents, e)
next++
}
if !found {
m := membership.Member{
ID: types.ID(self),
RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
}
ctx, err := json.Marshal(m)
if err != nil {
plog.Panicf("marshal member should never fail: %v", err)
}
cc := &raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: self,
Context: ctx,
}
e := raftpb.Entry{
Type: raftpb.EntryConfChange,
Data: pbutil.MustMarshal(cc),
Term: term,
Index: next,
}
ents = append(ents, e)
}
return ents
}