functional: wait election timeout after member add

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
release-3.4
Gyuho Lee 2018-04-11 17:13:18 -07:00
parent bd235ab8f9
commit 448e0fc481
2 changed files with 32 additions and 5 deletions

View File

@ -33,6 +33,11 @@ import (
"google.golang.org/grpc/credentials"
)
// ElectionTimeout reports the member's election timeout as a time.Duration,
// converted from the millisecond count stored in m.Etcd.ElectionTimeoutMs.
func (m *Member) ElectionTimeout() time.Duration {
	ms := time.Duration(m.Etcd.ElectionTimeoutMs)
	return ms * time.Millisecond
}
// DialEtcdGRPCServer creates a raw gRPC connection to an etcd member.
func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn, error) {
dialOpts := []grpc.DialOption{

View File

@ -158,11 +158,13 @@ func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error {
clus.lg.Info(
"restore snapshot and restart from snapshot request START",
zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint),
zap.Strings("initial-cluster", initClus),
)
err := clus.sendOp(oldlead, rpcpb.Operation_RESTORE_RESTART_FROM_SNAPSHOT)
clus.lg.Info(
"restore snapshot and restart from snapshot request END",
zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint),
zap.Strings("initial-cluster", initClus),
zap.Error(err),
)
if err != nil {
@ -178,7 +180,11 @@ func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error {
// 7. Add another member to establish 2-node cluster.
// 8. Add another member to establish 3-node cluster.
// 9. Add more if any.
idxs := make([]int, 0, len(c.injected))
for idx := range c.injected {
idxs = append(idxs, idx)
}
for i, idx := range idxs {
clus.lg.Info(
"member add request START",
zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
@ -197,10 +203,6 @@ func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error {
return err
}
// wait until membership reconfiguration entry gets applied
// TODO: test concurrent member add
time.Sleep(3 * time.Second)
// start the added(new) member with fresh data
clus.Members[idx].EtcdOnSnapshotRestore = clus.Members[idx].Etcd
clus.Members[idx].EtcdOnSnapshotRestore.InitialClusterState = "existing"
@ -212,18 +214,38 @@ func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error {
clus.lg.Info(
"restart from snapshot request START",
zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
zap.Strings("initial-cluster", initClus),
)
err = clus.sendOp(idx, rpcpb.Operation_RESTART_FROM_SNAPSHOT)
clus.lg.Info(
"restart from snapshot request END",
zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
zap.Strings("initial-cluster", initClus),
zap.Error(err),
)
if err != nil {
return err
}
}
if i != len(c.injected)-1 {
// wait until membership reconfiguration entry gets applied
// TODO: test concurrent member add
dur := 5 * clus.Members[idx].ElectionTimeout()
clus.lg.Info(
"waiting after restart from snapshot request",
zap.Int("i", i),
zap.Int("idx", idx),
zap.Duration("sleep", dur),
)
time.Sleep(dur)
} else {
clus.lg.Info(
"restart from snapshot request ALL END",
zap.Int("i", i),
zap.Int("idx", idx),
)
}
}
return nil
}