e2e: add cluster version test for rolling start servers

release-3.5
yoyinzyc 2019-10-22 12:39:00 -07:00
parent 92f313811e
commit 4243ebd3ef
2 changed files with 89 additions and 30 deletions

View File

@ -20,6 +20,7 @@ import (
"net/url" "net/url"
"os" "os"
"strings" "strings"
"time"
"go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/etcdserver"
) )
@ -134,6 +135,8 @@ type etcdProcessClusterConfig struct {
enableV2 bool enableV2 bool
initialCorruptCheck bool initialCorruptCheck bool
authTokenOpts string authTokenOpts string
rollingStart bool
} }
// newEtcdProcessCluster launches a new cluster from etcd processes, returning // newEtcdProcessCluster launches a new cluster from etcd processes, returning
@ -155,8 +158,14 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
epc.procs[i] = proc epc.procs[i] = proc
} }
if err := epc.Start(); err != nil { if cfg.rollingStart {
return nil, err if err := epc.RollingStart(); err != nil {
return nil, err
}
} else {
if err := epc.Start(); err != nil {
return nil, err
}
} }
return epc, nil return epc, nil
} }
@ -347,6 +356,10 @@ func (epc *etcdProcessCluster) Start() error {
return epc.start(func(ep etcdProcess) error { return ep.Start() }) return epc.start(func(ep etcdProcess) error { return ep.Start() })
} }
func (epc *etcdProcessCluster) RollingStart() error {
return epc.rollingStart(func(ep etcdProcess) error { return ep.Start() })
}
func (epc *etcdProcessCluster) Restart() error { func (epc *etcdProcessCluster) Restart() error {
return epc.start(func(ep etcdProcess) error { return ep.Restart() }) return epc.start(func(ep etcdProcess) error { return ep.Restart() })
} }
@ -365,6 +378,22 @@ func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
return nil return nil
} }
func (epc *etcdProcessCluster) rollingStart(f func(ep etcdProcess) error) error {
readyC := make(chan error, len(epc.procs))
for i := range epc.procs {
go func(n int) { readyC <- f(epc.procs[n]) }(i)
// make sure the servers do not start at the same time
time.Sleep(time.Second)
}
for range epc.procs {
if err := <-readyC; err != nil {
epc.Close()
return err
}
}
return nil
}
func (epc *etcdProcessCluster) Stop() (err error) { func (epc *etcdProcessCluster) Stop() (err error) {
for _, p := range epc.procs { for _, p := range epc.procs {
if p == nil { if p == nil {

View File

@ -30,36 +30,51 @@ import (
func TestCtlV3Version(t *testing.T) { testCtl(t, versionTest) } func TestCtlV3Version(t *testing.T) { testCtl(t, versionTest) }
func TestClusterVersion(t *testing.T) { func TestClusterVersion(t *testing.T) {
binary := binDir + "/etcd" tests := []struct {
if !fileutil.Exist(binary) { name string
t.Skipf("%q does not exist", binary) rollingStart bool
}{
{
name: "When start servers at the same time",
rollingStart: false,
},
{
name: "When start servers one by one",
rollingStart: true,
},
} }
defer testutil.AfterTest(t)
cfg := configNoTLS
cfg.execPath = binary
cfg.snapshotCount = 3
cfg.baseScheme = "unix" // to avoid port conflict
epc, err := newEtcdProcessCluster(&cfg) for _, tt := range tests {
if err != nil { t.Run(tt.name, func(t *testing.T) {
t.Fatalf("could not start etcd process cluster (%v)", err) binary := binDir + "/etcd"
} if !fileutil.Exist(binary) {
defer func() { t.Skipf("%q does not exist", binary)
if errC := epc.Close(); errC != nil { }
t.Fatalf("error closing etcd processes (%v)", errC) defer testutil.AfterTest(t)
} cfg := configNoTLS
}() cfg.execPath = binary
cv := version.Cluster(version.Version) cfg.snapshotCount = 3
for i := 0; i < 7; i++ { cfg.baseScheme = "unix" // to avoid port conflict
if err = cURLGet(epc, cURLReq{endpoint: "/version", expected: `"etcdcluster":"` + cv}); err != nil { cfg.rollingStart = tt.rollingStart
t.Logf("#%d: v3 is not ready yet (%v)", i, err)
time.Sleep(time.Second) epc, err := newEtcdProcessCluster(&cfg)
continue if err != nil {
} t.Fatalf("could not start etcd process cluster (%v)", err)
break }
} defer func() {
if err != nil { if errC := epc.Close(); errC != nil {
t.Fatalf("failed cluster version test expected %v got (%v)", cv, err) t.Fatalf("error closing etcd processes (%v)", errC)
}
}()
ctx := ctlCtx{
t: t,
cfg: cfg,
epc: epc,
}
cv := version.Cluster(version.Version)
clusterVersionTest(ctx, `"etcdcluster":"`+cv)
})
} }
} }
@ -69,6 +84,21 @@ func versionTest(cx ctlCtx) {
} }
} }
func clusterVersionTest(cx ctlCtx, expected string) {
var err error
for i := 0; i < 7; i++ {
if err = cURLGet(cx.epc, cURLReq{endpoint: "/version", expected: expected}); err != nil {
cx.t.Logf("#%d: v3 is not ready yet (%v)", i, err)
time.Sleep(time.Second)
continue
}
break
}
if err != nil {
cx.t.Fatalf("failed cluster version test expected %v got (%v)", expected, err)
}
}
func ctlV3Version(cx ctlCtx) error { func ctlV3Version(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "version") cmdArgs := append(cx.PrefixArgs(), "version")
return spawnWithExpect(cmdArgs, version.Version) return spawnWithExpect(cmdArgs, version.Version)