From c26de6626285f033b1fbb91787e7f689eeff28de Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 15 Nov 2014 21:28:03 -0800 Subject: [PATCH] integration: add integration test for remove member --- etcdserver/server.go | 8 +++++ integration/cluster_test.go | 67 +++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/etcdserver/server.go b/etcdserver/server.go index 48c8c480f..1b688a741 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -186,6 +186,8 @@ type EtcdServer struct { // Cache of the latest raft index and raft term the server has seen raftIndex uint64 raftTerm uint64 + + raftLead uint64 } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -342,6 +344,7 @@ func (s *EtcdServer) run() { s.node.Tick() case rd := <-s.node.Ready(): if rd.SoftState != nil { + atomic.StoreUint64(&s.raftLead, rd.SoftState.Lead) nodes = rd.SoftState.Nodes if rd.RaftState == raft.StateLeader { syncC = s.SyncTicker @@ -532,6 +535,11 @@ func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.raftIndex) } func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.raftTerm) } +// Only for testing purpose +// TODO: add Raft server interface to expose raft related info: +// Index, Term, Lead, Committed, Applied, LastIndex, etc. +func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.raftLead) } + // configure sends a configuration change through consensus and // then waits for it to be applied to the server. It // will block until the change is performed or there is an error. diff --git a/integration/cluster_test.go b/integration/cluster_test.go index f304005e2..e86be63db 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -99,6 +99,27 @@ func testDoubleClusterSize(t *testing.T, size int) { clusterMustProgress(t, c) } +func TestDecreaseClusterSizeOf3(t *testing.T) { testDecreaseClusterSize(t, 3) } +func TestDecreaseClusterSizeOf5(t *testing.T) { + t.Skip("enable after reducing the election collision rate") + // election collision rate is too high when enabling --race + testDecreaseClusterSize(t, 5) +} + +func testDecreaseClusterSize(t *testing.T, size int) { + defer afterTest(t) + c := NewCluster(t, size) + c.Launch(t) + defer c.Terminate(t) + + for i := 0; i < size-1; i++ { + id := c.Members[len(c.Members)-1].s.ID() + c.RemoveMember(t, uint64(id)) + c.waitLeader(t) + } + clusterMustProgress(t, c) +} + // clusterMustProgress ensures that cluster can make progress. It creates // a key first, and check the new key could be got from all client urls of // the cluster. @@ -251,6 +272,32 @@ func (c *cluster) AddMember(t *testing.T) { c.waitMembersMatch(t, c.HTTPMembers()) } +func (c *cluster) RemoveMember(t *testing.T, id uint64) { + // send remove request to the cluster + cc := mustNewHTTPClient(t, []string{c.URL(0)}) + ma := client.NewMembersAPI(cc) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + if err := ma.Remove(ctx, types.ID(id).String()); err != nil { + t.Fatalf("unexpected remove error %v", err) + } + cancel() + newMembers := make([]*member, 0) + for _, m := range c.Members { + if uint64(m.s.ID()) != id { + newMembers = append(newMembers, m) + } else { + select { + case <-m.s.StopNotify(): + m.Terminate(t) + case <-time.After(time.Second): + t.Fatalf("failed to remove member %s in one second", m.s.ID()) + } + } + } + c.Members = newMembers + c.waitMembersMatch(t, c.HTTPMembers()) +} + func (c *cluster) Terminate(t *testing.T) { for _, m := range c.Members { m.Terminate(t) @@ -274,6 +321,26 @@ func (c *cluster) waitMembersMatch(t *testing.T, membs []httptypes.Member) { return } +func (c *cluster) waitLeader(t *testing.T) { + possibleLead := make(map[uint64]bool) + var lead uint64 + for _, m := range c.Members { + possibleLead[uint64(m.s.ID())] = true + } + + for lead == 0 || !possibleLead[lead] { + lead = 0 + for _, m := range c.Members { + if lead != 0 && lead != m.s.Lead() { + lead = 0 + break + } + lead = m.s.Lead() + } + time.Sleep(10 * tickDuration) + } +} + func (c *cluster) name(i int) string { return fmt.Sprint("node", i) }