From 448e0fc4819a7aed1e0d541184c435b02b9f4b3d Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 11 Apr 2018 17:13:18 -0700 Subject: [PATCH] functional: wait election timeout after member add Signed-off-by: Gyuho Lee --- functional/rpcpb/member.go | 5 +++ .../tester/case_sigquit_remove_quorum.go | 32 ++++++++++++++++--- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/functional/rpcpb/member.go b/functional/rpcpb/member.go index e1f6ab69d..2016783b1 100644 --- a/functional/rpcpb/member.go +++ b/functional/rpcpb/member.go @@ -33,6 +33,11 @@ import ( "google.golang.org/grpc/credentials" ) +// ElectionTimeout returns an election timeout duration. +func (m *Member) ElectionTimeout() time.Duration { + return time.Duration(m.Etcd.ElectionTimeoutMs) * time.Millisecond +} + // DialEtcdGRPCServer creates a raw gRPC connection to an etcd member. func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn, error) { dialOpts := []grpc.DialOption{ diff --git a/functional/tester/case_sigquit_remove_quorum.go b/functional/tester/case_sigquit_remove_quorum.go index a2811ddc2..b1658a8f4 100644 --- a/functional/tester/case_sigquit_remove_quorum.go +++ b/functional/tester/case_sigquit_remove_quorum.go @@ -158,11 +158,13 @@ func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error { clus.lg.Info( "restore snapshot and restart from snapshot request START", zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint), + zap.Strings("initial-cluster", initClus), ) err := clus.sendOp(oldlead, rpcpb.Operation_RESTORE_RESTART_FROM_SNAPSHOT) clus.lg.Info( "restore snapshot and restart from snapshot request END", zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint), + zap.Strings("initial-cluster", initClus), zap.Error(err), ) if err != nil { @@ -178,7 +180,11 @@ func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error { // 7. Add another member to establish 2-node cluster. // 8. Add another member to establish 3-node cluster. // 9. Add more if any. + idxs := make([]int, 0, len(c.injected)) for idx := range c.injected { + idxs = append(idxs, idx) + } + for i, idx := range idxs { clus.lg.Info( "member add request START", zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint), @@ -197,10 +203,6 @@ func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error { return err } - // wait until membership reconfiguration entry gets applied - // TODO: test concurrent member add - time.Sleep(3 * time.Second) - // start the added(new) member with fresh data clus.Members[idx].EtcdOnSnapshotRestore = clus.Members[idx].Etcd clus.Members[idx].EtcdOnSnapshotRestore.InitialClusterState = "existing" @@ -212,18 +214,38 @@ func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error { clus.lg.Info( "restart from snapshot request START", zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint), + zap.Strings("initial-cluster", initClus), ) err = clus.sendOp(idx, rpcpb.Operation_RESTART_FROM_SNAPSHOT) clus.lg.Info( "restart from snapshot request END", zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint), + zap.Strings("initial-cluster", initClus), zap.Error(err), ) if err != nil { return err } - } + if i != len(c.injected)-1 { + // wait until membership reconfiguration entry gets applied + // TODO: test concurrent member add + dur := 5 * clus.Members[idx].ElectionTimeout() + clus.lg.Info( + "waiting after restart from snapshot request", + zap.Int("i", i), + zap.Int("idx", idx), + zap.Duration("sleep", dur), + ) + time.Sleep(dur) + } else { + clus.lg.Info( + "restart from snapshot request ALL END", + zap.Int("i", i), + zap.Int("idx", idx), + ) + } + } return nil }