Merge pull request #8841 from gyuho/test-test

clientv3/integration: add more tests on balancer switch, inflight range
release-3.3
Gyuho Lee 2017-11-30 09:38:53 -08:00 committed by GitHub
commit 56a012f2ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 114 additions and 47 deletions

View File

@ -825,53 +825,6 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
}
}
// TestKVGetResetLoneEndpoint ensures that if an endpoint resets and all other
// endpoints are down, then it will reconnect.
func TestKVGetResetLoneEndpoint(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, SkipCreatingClient: true})
defer clus.Terminate(t)
// get endpoint list
eps := make([]string, 2)
for i := range eps {
eps[i] = clus.Members[i].GRPCAddr()
}
cfg := clientv3.Config{Endpoints: eps, DialTimeout: 500 * time.Millisecond}
cli, err := clientv3.New(cfg)
if err != nil {
t.Fatal(err)
}
defer cli.Close()
// disconnect everything
clus.Members[0].Stop(t)
clus.Members[1].Stop(t)
// have Get try to reconnect
donec := make(chan struct{})
go func() {
// 3-second is the minimum interval between endpoint being marked
// as unhealthy and being removed from unhealthy, so possibly
// takes >5-second to unpin and repin an endpoint
// TODO: decrease timeout when balancer switch rewrite
ctx, cancel := context.WithTimeout(context.TODO(), 7*time.Second)
if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil {
t.Fatal(err)
}
cancel()
close(donec)
}()
time.Sleep(500 * time.Millisecond)
clus.Members[0].Restart(t)
select {
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for Get")
case <-donec:
}
}
// TestKVPutAtMostOnce ensures that a Put will only occur at most once
// in the presence of network errors.
func TestKVPutAtMostOnce(t *testing.T) {

View File

@ -240,6 +240,120 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl
}
}
func TestBalancerUnderServerStopInflightLinearizableGetOnRestart(t *testing.T) {
tt := []pinTestOpt{
{pinLeader: true, stopPinFirst: true},
{pinLeader: true, stopPinFirst: false},
{pinLeader: false, stopPinFirst: true},
{pinLeader: false, stopPinFirst: false},
}
for i := range tt {
testBalancerUnderServerStopInflightRangeOnRestart(t, true, tt[i])
}
}
func TestBalancerUnderServerStopInflightSerializableGetOnRestart(t *testing.T) {
tt := []pinTestOpt{
{pinLeader: true, stopPinFirst: true},
{pinLeader: true, stopPinFirst: false},
{pinLeader: false, stopPinFirst: true},
{pinLeader: false, stopPinFirst: false},
}
for i := range tt {
testBalancerUnderServerStopInflightRangeOnRestart(t, false, tt[i])
}
}
type pinTestOpt struct {
pinLeader bool
stopPinFirst bool
}
// testBalancerUnderServerStopInflightRangeOnRestart expects
// inflight range request reconnects on server restart.
func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizable bool, opt pinTestOpt) {
defer testutil.AfterTest(t)
cfg := &integration.ClusterConfig{
Size: 2,
SkipCreatingClient: true,
}
if linearizable {
cfg.Size = 3
}
clus := integration.NewClusterV3(t, cfg)
defer clus.Terminate(t)
eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}
if linearizable {
eps = append(eps, clus.Members[2].GRPCAddr())
}
lead := clus.WaitLeader(t)
target := lead
if !opt.pinLeader {
target = (target + 1) % 2
}
// pin eps[target]
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}})
if err != nil {
t.Errorf("failed to create client: %v", err)
}
defer cli.Close()
// wait for eps[target] to be pinned
mustWaitPinReady(t, cli)
// add all eps to list, so that when the original pined one fails
// the client can switch to other available eps
cli.SetEndpoints(eps...)
if opt.stopPinFirst {
clus.Members[target].Stop(t)
// give some time for balancer switch before stopping the other
time.Sleep(time.Second)
clus.Members[(target+1)%2].Stop(t)
} else {
clus.Members[(target+1)%2].Stop(t)
// balancer cannot pin other member since it's already stopped
clus.Members[target].Stop(t)
}
// 3-second is the minimum interval between endpoint being marked
// as unhealthy and being removed from unhealthy, so possibly
// takes >5-second to unpin and repin an endpoint
// TODO: decrease timeout when balancer switch rewrite
clientTimeout := 7 * time.Second
var gops []clientv3.OpOption
if !linearizable {
gops = append(gops, clientv3.WithSerializable())
}
donec, readyc := make(chan struct{}), make(chan struct{}, 1)
go func() {
defer close(donec)
ctx, cancel := context.WithTimeout(context.TODO(), clientTimeout)
readyc <- struct{}{}
_, err := cli.Get(ctx, "abc", gops...)
cancel()
if err != nil {
t.Fatal(err)
}
}()
<-readyc
clus.Members[target].Restart(t)
select {
case <-time.After(clientTimeout + 3*time.Second):
t.Fatalf("timed out waiting for Get [linearizable: %v, opt: %+v]", linearizable, opt)
case <-donec:
}
}
// e.g. due to clock drifts in server-side,
// client context times out first in server-side
// while original client-side context is not timed out yet