From 4de27039cbec29be4f1f0cb4c6a611797f0e0ea5 Mon Sep 17 00:00:00 2001 From: nolouch Date: Fri, 14 Sep 2018 13:33:57 +0800 Subject: [PATCH] server: drop read request if found leader changed --- clientv3/integration/leasing_test.go | 1 - .../integration/network_partition_test.go | 52 +++++++++++++++++++ etcdserver/api/v3rpc/rpctypes/error.go | 3 ++ etcdserver/api/v3rpc/util.go | 1 + etcdserver/errors.go | 1 + etcdserver/metrics.go | 7 +++ etcdserver/server.go | 7 ++- etcdserver/v3_server.go | 8 +++ 8 files changed, 78 insertions(+), 2 deletions(-) diff --git a/clientv3/integration/leasing_test.go b/clientv3/integration/leasing_test.go index a824726f0..78826b189 100644 --- a/clientv3/integration/leasing_test.go +++ b/clientv3/integration/leasing_test.go @@ -1790,7 +1790,6 @@ func TestLeasingTxnOwnerPutBranch(t *testing.T) { defer testutil.AfterTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) - lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") testutil.AssertNil(t, err) defer closeLKV() diff --git a/clientv3/integration/network_partition_test.go b/clientv3/integration/network_partition_test.go index 637d94d22..388eb4e07 100644 --- a/clientv3/integration/network_partition_test.go +++ b/clientv3/integration/network_partition_test.go @@ -24,6 +24,7 @@ import ( "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/integration" "go.etcd.io/etcd/pkg/testutil" "google.golang.org/grpc" @@ -265,3 +266,54 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) { t.Fatal("took too long to detect leader lost") } } + +func TestDropReadUnderNetworkPartition(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{ + Size: 3, + SkipCreatingClient: true, + }) + defer clus.Terminate(t) + leaderIndex := clus.WaitLeader(t) + // get a follower endpoint + eps := []string{clus.Members[(leaderIndex+1)%3].GRPCAddr()} + ccfg := clientv3.Config{ + Endpoints: eps, + DialTimeout: 10 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + } + cli, err := clientv3.New(ccfg) + if err != nil { + t.Fatal(err) + } + defer cli.Close() + + // wait for eps[0] to be pinned + mustWaitPinReady(t, cli) + + // add other endpoints for later endpoint switch + cli.SetEndpoints(eps...) + time.Sleep(time.Second * 2) + conn, err := cli.Dial(clus.Members[(leaderIndex+1)%3].GRPCAddr()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + clus.Members[leaderIndex].InjectPartition(t, clus.Members[(leaderIndex+1)%3], clus.Members[(leaderIndex+2)%3]) + kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), nil) + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + _, err = kvc.Get(ctx, "a") + cancel() + if err != rpctypes.ErrLeaderChanged { + t.Fatalf("expected %v, got %v", rpctypes.ErrLeaderChanged, err) + } + + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + _, err = kvc.Get(ctx, "a") + cancel() + if err != nil { + t.Fatalf("expected nil, got %v", err) + } +} diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 55eab38ef..9e45cea5b 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -61,6 +61,7 @@ var ( ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err() ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err() + ErrGRPCLeaderChanged = status.New(codes.Unavailable, "etcdserver: leader changed").Err() ErrGRPCNotCapable = status.New(codes.Unavailable, "etcdserver: not capable").Err() ErrGRPCStopped = status.New(codes.Unavailable, "etcdserver: server stopped").Err() ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err() @@ -111,6 +112,7 @@ var ( ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader, ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader, + ErrorDesc(ErrGRPCLeaderChanged): ErrGRPCLeaderChanged, ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable, ErrorDesc(ErrGRPCStopped): ErrGRPCStopped, ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout, @@ -163,6 +165,7 @@ var ( ErrNoLeader = Error(ErrGRPCNoLeader) ErrNotLeader = Error(ErrGRPCNotLeader) + ErrLeaderChanged = Error(ErrGRPCLeaderChanged) ErrNotCapable = Error(ErrGRPCNotCapable) ErrStopped = Error(ErrGRPCStopped) ErrTimeout = Error(ErrGRPCTimeout) diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index 62c6c12bb..5887dfeba 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -44,6 +44,7 @@ var toGRPCErrorMap = map[error]error{ etcdserver.ErrNoLeader: rpctypes.ErrGRPCNoLeader, etcdserver.ErrNotLeader: rpctypes.ErrGRPCNotLeader, + etcdserver.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged, etcdserver.ErrStopped: rpctypes.ErrGRPCStopped, etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout, etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail, diff --git a/etcdserver/errors.go b/etcdserver/errors.go index fb93c4b2a..8cec52a17 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -27,6 +27,7 @@ var ( ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost") ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long") + ErrLeaderChanged = errors.New("etcdserver: leader changed") ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") ErrNoLeader = errors.New("etcdserver: no leader") ErrNotLeader = errors.New("etcdserver: not leader") diff --git a/etcdserver/metrics.go b/etcdserver/metrics.go index 99dbea96f..748e7edb5 100644 --- a/etcdserver/metrics.go +++ b/etcdserver/metrics.go @@ -86,6 +86,12 @@ var ( Name: "slow_read_indexes_total", Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.", }) + readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "read_indexes_failed_total", + Help: "The total number of failed read indexes seen.", + }) leaseExpired = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "etcd_debugging", Subsystem: "server", @@ -132,6 +138,7 @@ func init() { prometheus.MustRegister(proposalsPending) prometheus.MustRegister(proposalsFailed) prometheus.MustRegister(slowReadIndex) + prometheus.MustRegister(readIndexFailed) prometheus.MustRegister(leaseExpired) prometheus.MustRegister(quotaBackendBytes) prometheus.MustRegister(currentVersion) diff --git a/etcdserver/server.go b/etcdserver/server.go index 50e845923..178861745 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -212,6 +212,8 @@ type EtcdServer struct { stopping chan struct{} // done is closed when all goroutines from start() complete. done chan struct{} + // leaderChanged is used to notify the linearizable read loop to drop the old read requests. + leaderChanged chan struct{} errorc chan error id types.ID @@ -752,6 +754,7 @@ func (s *EtcdServer) start() { s.ctx, s.cancel = context.WithCancel(context.Background()) s.readwaitc = make(chan struct{}, 1) s.readNotifier = newNotifier() + s.leaderChanged = make(chan struct{}, 1) if s.ClusterVersion() != nil { if lg != nil { lg.Info( @@ -938,7 +941,9 @@ func (s *EtcdServer) run() { s.compactor.Resume() } } - + if newLeader { + s.leaderChanged <- struct{}{} + } // TODO: remove the nil checking // current test utility does not provide the stats if s.stats != nil { diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 6fa89969c..5bcb7fc32 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -636,6 +636,8 @@ func (s *EtcdServer) linearizableReadLoop() { binary.BigEndian.PutUint64(ctxToSend, id1) select { + case <-s.leaderChanged: + continue case <-s.readwaitc: case <-s.stopping: return @@ -660,6 +662,7 @@ func (s *EtcdServer) linearizableReadLoop() { } else { plog.Errorf("failed to get read index from raft: %v", err) } + readIndexFailed.Inc() nr.notify(err) continue } @@ -691,6 +694,11 @@ func (s *EtcdServer) linearizableReadLoop() { } slowReadIndex.Inc() } + case <-s.leaderChanged: + timeout = true + readIndexFailed.Inc() + // return a retryable error. + nr.notify(ErrLeaderChanged) case <-time.After(s.Cfg.ReqTimeout()): if lg != nil { lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))