// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"encoding/json"
	"expvar"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/etcdserver/membership"
	"github.com/coreos/etcd/pkg/contention"
	"github.com/coreos/etcd/pkg/pbutil"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/rafthttp"
	"github.com/coreos/etcd/wal"
	"github.com/coreos/etcd/wal/walpb"
	"github.com/coreos/pkg/capnslog"
)

const (
	// Number of entries for a slow follower to catch up after compacting
	// the raft storage entries.
	// We expect the follower has millisecond-level latency with the leader.
	// The max throughput is around 10K. Keeping 5K entries is enough to help
	// a slow follower catch up.
	numberOfCatchUpEntries = 5000

	// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
	// Assuming the RTT is around 10ms, a 1MB max message size is large enough.
	maxSizePerMsg = 1 * 1024 * 1024

	// Never overflow the rafthttp buffer, which is 4096.
	// TODO: a better const?
	maxInflightMsgs = 4096 / 8
)

var (
	// protects raftStatus
	raftStatusMu sync.Mutex
	// indirection for the expvar func interface:
	// expvar panics when publishing a duplicate name and does not support
	// removing a registered name, so we register a single func that calls
	// raftStatus and swap raftStatus out as needed.
	raftStatus func() raft.Status
)

func init() {
	raft.SetLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "raft"))
	expvar.Publish("raft.status", expvar.Func(func() interface{} {
		raftStatusMu.Lock()
		defer raftStatusMu.Unlock()
		return raftStatus()
	}))
}

type RaftTimer interface {
	Index() uint64
	AppliedIndex() uint64
	Term() uint64
}

// apply contains entries, snapshot to be applied. Once
// an apply is consumed, the entries will be persisted to
// raft storage concurrently; the application must read
// notifyc before assuming the raft messages are stable.
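//
// A minimal sketch of the consumer side (the real consumer is the server's
// run loop; 'process' here stands in for a hypothetical handler):
//
//	ap := <-r.apply()
//	process(ap.entries, ap.snapshot) // apply to the state machine
//	<-ap.notifyc                     // raft log disk writes are now stable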
type apply struct {
	entries  []raftpb.Entry
	snapshot raftpb.Snapshot
	// notifyc synchronizes etcd server applies with the raft node
	notifyc chan struct{}
}

type raftNode struct {
	// Cache of the latest raft index and raft term the server has seen.
	// These four uint64 fields must be the first elements to keep 64-bit
	// alignment for atomic access to the fields.
	index        uint64
	appliedindex uint64
	term         uint64
	lead         uint64

	raftNodeConfig

	// a chan to send/receive snapshot
	msgSnapC chan raftpb.Message

	// a chan to send out apply
	applyc chan apply

	// a chan to send out readState
	readStateC chan raft.ReadState

	// utility
	ticker *time.Ticker
	// contention detector for raft heartbeat messages
	td *contention.TimeoutDetector

	stopped chan struct{}
	done    chan struct{}
}

type raftNodeConfig struct {
	// to check if the message receiver is removed from the cluster
	isIDRemoved func(id uint64) bool
	raft.Node
	raftStorage *raft.MemoryStorage
	storage     Storage
	heartbeat   time.Duration // for logging
	// transport specifies the transport to send and receive msgs to members.
	// Sending messages MUST NOT block. It is okay to drop messages, since
	// clients should timeout and reissue their messages.
	// If transport is nil, server will panic.
	transport rafthttp.Transporter
}

func newRaftNode(cfg raftNodeConfig) *raftNode {
	r := &raftNode{
		raftNodeConfig: cfg,
		// set up contention detectors for raft heartbeat message.
		// expect to send a heartbeat within 2 heartbeat intervals.
		td:         contention.NewTimeoutDetector(2 * cfg.heartbeat),
		readStateC: make(chan raft.ReadState, 1),
		msgSnapC:   make(chan raftpb.Message, maxInFlightMsgSnap),
		applyc:     make(chan apply),
		stopped:    make(chan struct{}),
		done:       make(chan struct{}),
	}
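	// A zero heartbeat disables periodic ticks: an unstarted time.Ticker has
	// a nil C channel, so <-r.ticker.C below blocks forever (presumably only
	// used by tests).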
	if r.heartbeat == 0 {
		r.ticker = &time.Ticker{}
	} else {
		r.ticker = time.NewTicker(r.heartbeat)
	}
	return r
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
	internalTimeout := time.Second
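
	// The goroutine below drives the raft state machine: for each Ready it
	// sends messages (in parallel with disk writes when leader), persists
	// HardState and Entries, persists and applies any snapshot, appends
	// entries to the in-memory raft storage, and finally calls Advance.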
	go func() {
		defer r.onStop()
		islead := false

		for {
			select {
			case <-r.ticker.C:
				r.Tick()
			case rd := <-r.Ready():
				if rd.SoftState != nil {
					newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
					if newLeader {
						leaderChanges.Inc()
					}

					if rd.SoftState.Lead == raft.None {
						hasLeader.Set(0)
					} else {
						hasLeader.Set(1)
					}

					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
					islead = rd.RaftState == raft.StateLeader
					rh.updateLeadership(newLeader)
					r.td.Reset()
				}
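
				// forward only the most recent ReadState in the batch;
				// readStateC has capacity 1, and a send that cannot complete
				// within internalTimeout is dropped with a warning.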
				if len(rd.ReadStates) != 0 {
					select {
					case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
					case <-time.After(internalTimeout):
						plog.Warningf("timed out sending read state")
					case <-r.stopped:
						return
					}
				}

				notifyc := make(chan struct{}, 1)
				ap := apply{
					entries:  rd.CommittedEntries,
					snapshot: rd.Snapshot,
					notifyc:  notifyc,
				}

				updateCommittedIndex(&ap, rh)

				select {
				case r.applyc <- ap:
				case <-r.stopped:
					return
				}
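
				// from here on, the disk writes below proceed in parallel
				// with the server goroutine applying ap; notifyc (cap 1) is
				// the synchronization point between the two.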

				// the leader can write to its disk in parallel with replicating
				// to the followers and with them writing to their disks.
				// For more details, see raft thesis section 10.2.1.
				if islead {
					// gofail: var raftBeforeLeaderSend struct{}
					r.transport.Send(r.processMessages(rd.Messages))
				}

				// gofail: var raftBeforeSave struct{}
				if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
					plog.Fatalf("raft save state and entries error: %v", err)
				}
				if !raft.IsEmptyHardState(rd.HardState) {
					proposalsCommitted.Set(float64(rd.HardState.Commit))
				}
				// gofail: var raftAfterSave struct{}

				if !raft.IsEmptySnap(rd.Snapshot) {
					// gofail: var raftBeforeSaveSnap struct{}
					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
						plog.Fatalf("raft save snapshot error: %v", err)
					}
					// etcdserver now claims the snapshot has been persisted onto the disk
					notifyc <- struct{}{}

					// gofail: var raftAfterSaveSnap struct{}
					r.raftStorage.ApplySnapshot(rd.Snapshot)
					plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
					// gofail: var raftAfterApplySnap struct{}
				}

				r.raftStorage.Append(rd.Entries)

				if !islead {
					// finish processing incoming messages before we signal notifyc chan
					msgs := r.processMessages(rd.Messages)

					// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
					notifyc <- struct{}{}

					// Candidate or follower needs to wait for all pending configuration
					// changes to be applied before sending messages.
					// Otherwise we might incorrectly count votes (e.g. votes from removed members).
					// Also, a slow machine's follower could proceed, in the raft layer, to
					// become the leader of its own single-node cluster before the apply
					// layer applies the config change.
					// We simply wait for ALL pending entries to be applied for now.
					// We might improve this later on if it causes unnecessary long blocking issues.
					waitApply := false
					for _, ent := range rd.CommittedEntries {
						if ent.Type == raftpb.EntryConfChange {
							waitApply = true
							break
						}
					}
					if waitApply {
						// blocks until 'applyAll' calls 'applyWait.Trigger'
						// to be in sync with scheduled config-change job
						// (assume notifyc has cap of 1)
						select {
						case notifyc <- struct{}{}:
						case <-r.stopped:
							return
						}
					}

					// gofail: var raftBeforeFollowerSend struct{}
					r.transport.Send(msgs)
				} else {
					// leader already processed 'MsgSnap' and signaled notifyc
					notifyc <- struct{}{}
				}

				r.Advance()
			case <-r.stopped:
				return
			}
		}
	}()
}

func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
	var ci uint64
	if len(ap.entries) != 0 {
		ci = ap.entries[len(ap.entries)-1].Index
	}
	if ap.snapshot.Metadata.Index > ci {
		ci = ap.snapshot.Metadata.Index
	}
	if ci != 0 {
		rh.updateCommittedIndex(ci)
	}
}

func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
	sentAppResp := false
	for i := len(ms) - 1; i >= 0; i-- {
		if r.isIDRemoved(ms[i].To) {
			ms[i].To = 0
		}
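
		// ms is scanned newest-to-oldest, so the first MsgAppResp seen is
		// the most recent one; only that one is forwarded and earlier ones
		// are dropped, since a newer append response supersedes them.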
		if ms[i].Type == raftpb.MsgAppResp {
			if sentAppResp {
				ms[i].To = 0
			} else {
				sentAppResp = true
			}
		}

		if ms[i].Type == raftpb.MsgSnap {
			// There are two separate data stores: the store for v2, and the KV for v3.
			// The msgSnap only contains the most recent snapshot of store without KV.
			// So we need to redirect the msgSnap to etcd server main loop for merging in the
			// current store snapshot and KV snapshot.
			select {
			case r.msgSnapC <- ms[i]:
			default:
				// drop msgSnap if the inflight chan is full.
			}
			ms[i].To = 0
		}
		if ms[i].Type == raftpb.MsgHeartbeat {
			ok, exceed := r.td.Observe(ms[i].To)
			if !ok {
				// TODO: limit request rate.
				plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
				plog.Warningf("server is likely overloaded")
			}
		}
	}
	return ms
}

func (r *raftNode) apply() chan apply {
	return r.applyc
}

func (r *raftNode) stop() {
	r.stopped <- struct{}{}
	<-r.done
}

func (r *raftNode) onStop() {
	r.Stop()
	r.ticker.Stop()
	r.transport.Stop()
	if err := r.storage.Close(); err != nil {
		plog.Panicf("raft close storage error: %v", err)
	}
	close(r.done)
}

// for testing
func (r *raftNode) pauseSending() {
	p := r.transport.(rafthttp.Pausable)
	p.Pause()
}

func (r *raftNode) resumeSending() {
	p := r.transport.(rafthttp.Pausable)
	p.Resume()
}

// advanceTicksForElection advances ticks to the node for fast election.
// This reduces the time to wait for the first leader election when
// bootstrapping the whole cluster, while leaving at least one heartbeat
// interval for a possible existing leader to contact it.
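// For example, with electionTicks = 10 a fresh node is advanced nine ticks,
// so it can start its first election after as little as one more tick (raft
// randomizes the actual election timeout), while an existing leader still
// has a window in which to reach it first.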
func advanceTicksForElection(n raft.Node, electionTicks int) {
	for i := 0; i < electionTicks-1; i++ {
		n.Tick()
	}
}

func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
	var err error
	member := cl.MemberByName(cfg.Name)
	metadata := pbutil.MustMarshal(
		&pb.Metadata{
			NodeID:    uint64(member.ID),
			ClusterID: uint64(cl.ID()),
		},
	)
	if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
		plog.Fatalf("create wal error: %v", err)
	}
	peers := make([]raft.Peer, len(ids))
	for i, id := range ids {
		ctx, err := json.Marshal((*cl).Member(id))
		if err != nil {
			plog.Panicf("marshal member should never fail: %v", err)
		}
		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
	}
	id = member.ID
	plog.Infof("starting member %s in cluster %s", id, cl.ID())
	s = raft.NewMemoryStorage()
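	// CheckQuorum makes the leader step down if it has not heard from a
	// quorum of the cluster within an election timeout (see the raft.Config
	// documentation).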
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
	}

	n = raft.StartNode(c, peers)
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	advanceTicksForElection(n, c.ElectionTick)
	return id, n, s, w
}

func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
	var walsnap walpb.Snapshot
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}
	w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)

	plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
	cl := membership.NewCluster("")
	cl.SetID(cid)
	s := raft.NewMemoryStorage()
	if snapshot != nil {
		s.ApplySnapshot(*snapshot)
	}
	s.SetHardState(st)
	s.Append(ents)
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
	}

	n := raft.RestartNode(c)
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	advanceTicksForElection(n, c.ElectionTick)
	return id, cl, n, s, w
}

func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
	var walsnap walpb.Snapshot
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}
	w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)

	// discard the previously uncommitted entries
	for i, ent := range ents {
		if ent.Index > st.Commit {
			plog.Infof("discarding %d uncommitted WAL entries", len(ents)-i)
			ents = ents[:i]
			break
		}
	}

	// force append the configuration change entries
	toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(id), st.Term, st.Commit)
	ents = append(ents, toAppEnts...)

	// force commit newly appended entries
	err := w.Save(raftpb.HardState{}, toAppEnts)
	if err != nil {
		plog.Fatalf("%v", err)
	}
	if len(ents) != 0 {
		st.Commit = ents[len(ents)-1].Index
	}

	plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
	cl := membership.NewCluster("")
	cl.SetID(cid)
	s := raft.NewMemoryStorage()
	if snapshot != nil {
		s.ApplySnapshot(*snapshot)
	}
	s.SetHardState(st)
	s.Append(ents)
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
	}
	n := raft.RestartNode(c)
	raftStatus = n.Status
	return id, cl, n, s, w
}

// getIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain two kinds of
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
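// For example, a snapshot whose ConfState.Nodes is {1, 2} followed by entries
// that add node 3 and then remove node 1 yields [2, 3].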
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
	ids := make(map[uint64]bool)
	if snap != nil {
		for _, id := range snap.Metadata.ConfState.Nodes {
			ids[id] = true
		}
	}
	for _, e := range ents {
		if e.Type != raftpb.EntryConfChange {
			continue
		}
		var cc raftpb.ConfChange
		pbutil.MustUnmarshal(&cc, e.Data)
		switch cc.Type {
		case raftpb.ConfChangeAddNode:
			ids[cc.NodeID] = true
		case raftpb.ConfChangeRemoveNode:
			delete(ids, cc.NodeID)
		case raftpb.ConfChangeUpdateNode:
			// do nothing
		default:
			plog.Panicf("ConfChange Type should be ConfChangeAddNode, ConfChangeRemoveNode or ConfChangeUpdateNode!")
		}
	}
	sids := make(types.Uint64Slice, 0, len(ids))
	for id := range ids {
		sids = append(sids, id)
	}
	sort.Sort(sids)
	return []uint64(sids)
}

// createConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
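// For example, ids = [1, 2, 3] with self = 1 produces two ConfChangeRemoveNode
// entries for nodes 2 and 3, at indexes index+1 and index+2.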
func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
	ents := make([]raftpb.Entry, 0)
	next := index + 1
	found := false
	for _, id := range ids {
		if id == self {
			found = true
			continue
		}
		cc := &raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: id,
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  pbutil.MustMarshal(cc),
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
		next++
	}
	if !found {
		m := membership.Member{
			ID:             types.ID(self),
			RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
		}
		ctx, err := json.Marshal(m)
		if err != nil {
			plog.Panicf("marshal member should never fail: %v", err)
		}
		cc := &raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  self,
			Context: ctx,
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  pbutil.MustMarshal(cc),
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
	}
	return ents
}