From e8f46ce341768c6cf929f4defbeefb5bcbaec79e Mon Sep 17 00:00:00 2001 From: johncming Date: Tue, 8 Jan 2019 09:56:39 +0800 Subject: [PATCH] etcdserver: add a test to verify not to send duplicated append responses --- etcdserver/raft_test.go | 41 +++++++++++++++++++++++++++++++++++++++ etcdserver/server_test.go | 21 ++++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index 0ed7ad00b..9d731c688 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -227,3 +227,44 @@ func TestConfgChangeBlocksApply(t *testing.T) { t.Fatalf("unexpected blocking on execution") } } + +func TestProcessDuplicatedAppRespMessage(t *testing.T) { + n := newNopReadyNode() + cl := membership.NewCluster(zap.NewExample(), "abc") + + rs := raft.NewMemoryStorage() + p := mockstorage.NewStorageRecorder("") + tr, sendc := newSendMsgAppRespTransporter() + r := newRaftNode(raftNodeConfig{ + lg: zap.NewExample(), + isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, + Node: n, + transport: tr, + storage: p, + raftStorage: rs, + }) + + s := &EtcdServer{ + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + r: *r, + cluster: cl, + SyncTicker: &time.Ticker{}, + } + + s.start() + defer s.Stop() + + lead := uint64(1) + + n.readyc <- raft.Ready{Messages: []raftpb.Message{ + {Type: raftpb.MsgAppResp, From: 2, To: lead, Term: 1, Index: 1}, + {Type: raftpb.MsgAppResp, From: 2, To: lead, Term: 1, Index: 2}, + {Type: raftpb.MsgAppResp, From: 2, To: lead, Term: 1, Index: 3}, + }} + + got, want := <-sendc, 1 + if got != want { + t.Errorf("count = %d, want %d", got, want) + } +} diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 3daad4896..46a5363f8 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1768,3 +1768,24 @@ func (s *snapTransporter) SendSnapshot(m snap.Message) { m.CloseWithError(nil) s.snapDoneC <- m } + +type sendMsgAppRespTransporter struct { + nopTransporter + sendC chan int +} + +func newSendMsgAppRespTransporter() (rafthttp.Transporter, <-chan int) { + ch := make(chan int, 1) + tr := &sendMsgAppRespTransporter{sendC: ch} + return tr, ch +} + +func (s *sendMsgAppRespTransporter) Send(m []raftpb.Message) { + var send int + for _, msg := range m { + if msg.To != 0 { + send++ + } + } + s.sendC <- send +}