etcd/etcdserver/raft_test.go

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"encoding/json"
	"reflect"
	"sync"
	"testing"
	"time"

	"go.etcd.io/etcd/etcdserver/api/membership"
	"go.etcd.io/etcd/pkg/mock/mockstorage"
	"go.etcd.io/etcd/pkg/pbutil"
	"go.etcd.io/etcd/pkg/types"
	"go.etcd.io/etcd/raft"
	"go.etcd.io/etcd/raft/raftpb"
	"go.uber.org/zap"
)
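
// TestGetIDs verifies that getIDs reconstructs the set of member IDs from a
// snapshot's ConfState plus the config-change entries committed after it;
// e.g. a snapshot containing member 1 followed by an add-node entry for
// member 2 yields {1, 2}.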
func TestGetIDs(t *testing.T) {
	addcc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
	addEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(addcc)}
	removecc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
	removeEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc)}
	normalEntry := raftpb.Entry{Type: raftpb.EntryNormal}
	updatecc := &raftpb.ConfChange{Type: raftpb.ConfChangeUpdateNode, NodeID: 2}
	updateEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(updatecc)}

	tests := []struct {
		confState *raftpb.ConfState
		ents      []raftpb.Entry
		widSet    []uint64
	}{
		{nil, []raftpb.Entry{}, []uint64{}},
		{&raftpb.ConfState{Nodes: []uint64{1}},
			[]raftpb.Entry{}, []uint64{1}},
		{&raftpb.ConfState{Nodes: []uint64{1}},
			[]raftpb.Entry{addEntry}, []uint64{1, 2}},
		{&raftpb.ConfState{Nodes: []uint64{1}},
			[]raftpb.Entry{addEntry, removeEntry}, []uint64{1}},
		{&raftpb.ConfState{Nodes: []uint64{1}},
			[]raftpb.Entry{addEntry, normalEntry}, []uint64{1, 2}},
		{&raftpb.ConfState{Nodes: []uint64{1}},
			[]raftpb.Entry{addEntry, normalEntry, updateEntry}, []uint64{1, 2}},
		{&raftpb.ConfState{Nodes: []uint64{1}},
			[]raftpb.Entry{addEntry, removeEntry, normalEntry}, []uint64{1}},
	}

	for i, tt := range tests {
		var snap raftpb.Snapshot
		if tt.confState != nil {
			snap.Metadata.ConfState = *tt.confState
		}
		idSet := getIDs(testLogger, &snap, tt.ents)
		if !reflect.DeepEqual(idSet, tt.widSet) {
			t.Errorf("#%d: idset = %#v, want %#v", i, idSet, tt.widSet)
		}
	}
}
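
// TestCreateConfigChangeEnts verifies that createConfigChangeEnts produces the
// entries needed to shrink the cluster to the given member (self): one
// remove-node entry for every other member, plus an add-node entry for self
// when self is not already in the ID set; e.g. ids {2, 3} with self 1 yields
// [remove 2, remove 3, add 1].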
func TestCreateConfigChangeEnts(t *testing.T) {
	m := membership.Member{
		ID:             types.ID(1),
		RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
	}
	ctx, err := json.Marshal(m)
	if err != nil {
		t.Fatal(err)
	}
	addcc1 := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1, Context: ctx}
	removecc2 := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
	removecc3 := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 3}

	tests := []struct {
		ids         []uint64
		self        uint64
		term, index uint64
		wents       []raftpb.Entry
	}{
		{
			[]uint64{1},
			1,
			1, 1,
			[]raftpb.Entry{},
		},
		{
			[]uint64{1, 2},
			1,
			1, 1,
			[]raftpb.Entry{{Term: 1, Index: 2, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)}},
		},
		{
			[]uint64{1, 2},
			1,
			2, 2,
			[]raftpb.Entry{{Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)}},
		},
		{
			[]uint64{1, 2, 3},
			1,
			2, 2,
			[]raftpb.Entry{
				{Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)},
				{Term: 2, Index: 4, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc3)},
			},
		},
		{
			[]uint64{2, 3},
			2,
			2, 2,
			[]raftpb.Entry{
				{Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc3)},
			},
		},
		{
			[]uint64{2, 3},
			1,
			2, 2,
			[]raftpb.Entry{
				{Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)},
				{Term: 2, Index: 4, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc3)},
				{Term: 2, Index: 5, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(addcc1)},
			},
		},
	}

	for i, tt := range tests {
		gents := createConfigChangeEnts(testLogger, tt.ids, tt.self, tt.term, tt.index)
		if !reflect.DeepEqual(gents, tt.wents) {
			t.Errorf("#%d: ents = %v, want %v", i, gents, tt.wents)
		}
	}
}
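
// TestStopRaftWhenWaitingForApplyDone ensures the raft node loop still honors
// a stop signal while the apply it handed to applyc has not yet completed.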
func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
	n := newNopReadyNode()
	r := newRaftNode(raftNodeConfig{
		lg:          zap.NewExample(),
		Node:        n,
		storage:     mockstorage.NewStorageRecorder(""),
		raftStorage: raft.NewMemoryStorage(),
		transport:   newNopTransporter(),
	})
	srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *r}
	srv.r.start(nil)
	n.readyc <- raft.Ready{}

	select {
	case <-srv.r.applyc:
	case <-time.After(time.Second):
		t.Fatalf("failed to receive apply struct")
	}

	srv.r.stopped <- struct{}{}
	select {
	case <-srv.r.done:
	case <-time.After(time.Second):
		t.Fatalf("failed to stop raft loop")
	}
}

// TestConfigChangeBlocksApply ensures the raft loop blocks when committed
// entries contain a config change, and stays blocked until the apply
// routine signals that the config change has been applied.
func TestConfigChangeBlocksApply(t *testing.T) {
	n := newNopReadyNode()

	r := newRaftNode(raftNodeConfig{
		lg:          zap.NewExample(),
		Node:        n,
		storage:     mockstorage.NewStorageRecorder(""),
		raftStorage: raft.NewMemoryStorage(),
		transport:   newNopTransporter(),
	})
	srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *r}

	srv.r.start(&raftReadyHandler{
		getLead:          func() uint64 { return 0 },
		updateLead:       func(uint64) {},
		updateLeadership: func(bool) {},
	})
	defer srv.r.Stop()

	n.readyc <- raft.Ready{
		SoftState:        &raft.SoftState{RaftState: raft.StateFollower},
		CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
	}
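
	// The raft loop should now block: with a config-change entry in flight,
	// it must not pick up the next Ready until the apply side signals
	// completion via ap.notifyc (drained near the end of this test).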
	ap := <-srv.r.applyc

	continueC := make(chan struct{})
	go func() {
		n.readyc <- raft.Ready{}
		<-srv.r.applyc
		close(continueC)
	}()

	select {
	case <-continueC:
		t.Fatalf("unexpected execution: raft routine should block waiting for apply")
	case <-time.After(time.Second):
	}

	// finish apply, unblock raft routine
	<-ap.notifyc

	select {
	case <-continueC:
	case <-time.After(time.Second):
		t.Fatalf("unexpected blocking on execution")
	}
}
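
// TestProcessDuplicatedAppRespMessage verifies that multiple MsgAppResp
// messages to the same leader in a single Ready are collapsed so that only
// one response is handed to the transport.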
func TestProcessDuplicatedAppRespMessage(t *testing.T) {
	n := newNopReadyNode()
	cl := membership.NewCluster(zap.NewExample(), "abc")

	rs := raft.NewMemoryStorage()
	p := mockstorage.NewStorageRecorder("")
	tr, sendc := newSendMsgAppRespTransporter()
	r := newRaftNode(raftNodeConfig{
		lg:          zap.NewExample(),
		isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
		Node:        n,
		transport:   tr,
		storage:     p,
		raftStorage: rs,
	})

	s := &EtcdServer{
		lgMu:       new(sync.RWMutex),
		lg:         zap.NewExample(),
		r:          *r,
		cluster:    cl,
		SyncTicker: &time.Ticker{},
	}

	s.start()
	defer s.Stop()

	lead := uint64(1)
	n.readyc <- raft.Ready{Messages: []raftpb.Message{
		{Type: raftpb.MsgAppResp, From: 2, To: lead, Term: 1, Index: 1},
		{Type: raftpb.MsgAppResp, From: 2, To: lead, Term: 1, Index: 2},
		{Type: raftpb.MsgAppResp, From: 2, To: lead, Term: 1, Index: 3},
	}}

	got, want := <-sendc, 1
	if got != want {
		t.Errorf("count = %d, want %d", got, want)
	}
}