From a7cb307a1849806bb082d6824b4724127bcb6b5c Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 8 Nov 2017 11:01:05 -0800 Subject: [PATCH] clientv3/integration: add more tests on balancer switch, inflight range Test all possible cases of server shutdown with inflight range requests. Removed redundant tests in kv_test.go. Signed-off-by: Gyu-Ho Lee --- clientv3/integration/kv_test.go | 47 -------- clientv3/integration/server_shutdown_test.go | 114 +++++++++++++++++++ 2 files changed, 114 insertions(+), 47 deletions(-) diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 33ed7ecba..f41b95297 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -825,53 +825,6 @@ func TestKVPutStoppedServerAndClose(t *testing.T) { } } -// TestKVGetResetLoneEndpoint ensures that if an endpoint resets and all other -// endpoints are down, then it will reconnect. -func TestKVGetResetLoneEndpoint(t *testing.T) { - defer testutil.AfterTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, SkipCreatingClient: true}) - defer clus.Terminate(t) - - // get endpoint list - eps := make([]string, 2) - for i := range eps { - eps[i] = clus.Members[i].GRPCAddr() - } - - cfg := clientv3.Config{Endpoints: eps, DialTimeout: 500 * time.Millisecond} - cli, err := clientv3.New(cfg) - if err != nil { - t.Fatal(err) - } - defer cli.Close() - - // disconnect everything - clus.Members[0].Stop(t) - clus.Members[1].Stop(t) - - // have Get try to reconnect - donec := make(chan struct{}) - go func() { - // 3-second is the minimum interval between endpoint being marked - // as unhealthy and being removed from unhealthy, so possibly - // takes >5-second to unpin and repin an endpoint - // TODO: decrease timeout when balancer switch rewrite - ctx, cancel := context.WithTimeout(context.TODO(), 7*time.Second) - if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil { - t.Fatal(err) - } - cancel() - close(donec) - }() - time.Sleep(500 * time.Millisecond) - clus.Members[0].Restart(t) - select { - case <-time.After(10 * time.Second): - t.Fatalf("timed out waiting for Get") - case <-donec: - } -} - // TestKVPutAtMostOnce ensures that a Put will only occur at most once // in the presence of network errors. func TestKVPutAtMostOnce(t *testing.T) { diff --git a/clientv3/integration/server_shutdown_test.go b/clientv3/integration/server_shutdown_test.go index ffb46a929..333f1e89b 100644 --- a/clientv3/integration/server_shutdown_test.go +++ b/clientv3/integration/server_shutdown_test.go @@ -240,6 +240,120 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl } } +func TestBalancerUnderServerStopInflightLinearizableGetOnRestart(t *testing.T) { + tt := []pinTestOpt{ + {pinLeader: true, stopPinFirst: true}, + {pinLeader: true, stopPinFirst: false}, + {pinLeader: false, stopPinFirst: true}, + {pinLeader: false, stopPinFirst: false}, + } + for i := range tt { + testBalancerUnderServerStopInflightRangeOnRestart(t, true, tt[i]) + } +} + +func TestBalancerUnderServerStopInflightSerializableGetOnRestart(t *testing.T) { + tt := []pinTestOpt{ + {pinLeader: true, stopPinFirst: true}, + {pinLeader: true, stopPinFirst: false}, + {pinLeader: false, stopPinFirst: true}, + {pinLeader: false, stopPinFirst: false}, + } + for i := range tt { + testBalancerUnderServerStopInflightRangeOnRestart(t, false, tt[i]) + } +} + +type pinTestOpt struct { + pinLeader bool + stopPinFirst bool +} + +// testBalancerUnderServerStopInflightRangeOnRestart expects +// inflight range request reconnects on server restart. +func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizable bool, opt pinTestOpt) { + defer testutil.AfterTest(t) + + cfg := &integration.ClusterConfig{ + Size: 2, + SkipCreatingClient: true, + } + if linearizable { + cfg.Size = 3 + } + + clus := integration.NewClusterV3(t, cfg) + defer clus.Terminate(t) + eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()} + if linearizable { + eps = append(eps, clus.Members[2].GRPCAddr()) + } + + lead := clus.WaitLeader(t) + + target := lead + if !opt.pinLeader { + target = (target + 1) % 2 + } + + // pin eps[target] + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}}) + if err != nil { + t.Errorf("failed to create client: %v", err) + } + defer cli.Close() + + // wait for eps[target] to be pinned + mustWaitPinReady(t, cli) + + // add all eps to list, so that when the original pined one fails + // the client can switch to other available eps + cli.SetEndpoints(eps...) + + if opt.stopPinFirst { + clus.Members[target].Stop(t) + // give some time for balancer switch before stopping the other + time.Sleep(time.Second) + clus.Members[(target+1)%2].Stop(t) + } else { + clus.Members[(target+1)%2].Stop(t) + // balancer cannot pin other member since it's already stopped + clus.Members[target].Stop(t) + } + + // 3-second is the minimum interval between endpoint being marked + // as unhealthy and being removed from unhealthy, so possibly + // takes >5-second to unpin and repin an endpoint + // TODO: decrease timeout when balancer switch rewrite + clientTimeout := 7 * time.Second + + var gops []clientv3.OpOption + if !linearizable { + gops = append(gops, clientv3.WithSerializable()) + } + + donec, readyc := make(chan struct{}), make(chan struct{}, 1) + go func() { + defer close(donec) + ctx, cancel := context.WithTimeout(context.TODO(), clientTimeout) + readyc <- struct{}{} + _, err := cli.Get(ctx, "abc", gops...) + cancel() + if err != nil { + t.Fatal(err) + } + }() + + <-readyc + clus.Members[target].Restart(t) + + select { + case <-time.After(clientTimeout + 3*time.Second): + t.Fatalf("timed out waiting for Get [linearizable: %v, opt: %+v]", linearizable, opt) + case <-donec: + } +} + // e.g. due to clock drifts in server-side, // client context times out first in server-side // while original client-side context is not timed out yet