From b1c04ce04338f6ec483db1d2919adc7893b2f323 Mon Sep 17 00:00:00 2001
From: Piotr Tabor
Date: Sat, 10 Apr 2021 11:42:15 +0200
Subject: [PATCH] Applying consistency fix: ClusterVersionSet (and co) might not get applied on v2store

The ClusterVersionSet, ClusterMemberAttrSet, and DowngradeInfoSet functions
write to both the v2store and the backend. Prior to this CL, they were in a
branch that was not executed when shouldApplyV3 was false, e.g. during restore
when the backend is up-to-date (has a high consistent index) while the v2store
still requires replay from the WAL log.

The most serious consequence of this bug was that, after restore, the v2store
could have a different index (revision) than the exact same store before
restore, so potentially different content between replicas.

This change also suppresses double-applying of membership (ClusterConfig)
changes on the backend (v3 store) - which, luckily, are not part of the
MVCC/KeyValue store, so they did not cause revisions to be bumped.

Inspired by jingyih@'s comment:
https://github.com/etcd-io/etcd/pull/12820#issuecomment-815299406
---
 etcdctl/ctlv3/command/migrate_command.go      |  6 +--
 etcdctl/snapshot/v3_snapshot.go               |  2 +-
 server/etcdserver/api/membership/cluster.go   | 29 +++++------
 .../etcdserver/api/membership/cluster_test.go | 14 +++---
 server/etcdserver/apply.go                    | 49 ++++++++++++-------
 server/etcdserver/apply_auth.go               |  4 +-
 server/etcdserver/apply_v2.go                 |  2 +-
 server/etcdserver/server.go                   | 35 ++++++++-----
 server/etcdserver/server_test.go              | 24 ++++-----
 server/mvcc/kvstore.go                        |  5 +-
 tests/go.mod                                  |  1 +
 .../integration/clientv3/ordering_kv_test.go  | 23 +++++----
 12 files changed, 112 insertions(+), 82 deletions(-)

diff --git a/etcdctl/ctlv3/command/migrate_command.go b/etcdctl/ctlv3/command/migrate_command.go
index c287f0329..74245483c 100644
--- a/etcdctl/ctlv3/command/migrate_command.go
+++ b/etcdctl/ctlv3/command/migrate_command.go
@@ -208,15 +208,15 @@ func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
         if err := json.Unmarshal(cc.Context, m); err != nil {
             panic(err)
         }
-        cl.AddMember(m)
+        cl.AddMember(m, true)
     case raftpb.ConfChangeRemoveNode:
-        cl.RemoveMember(types.ID(cc.NodeID))
+        cl.RemoveMember(types.ID(cc.NodeID), true)
     case raftpb.ConfChangeUpdateNode:
         m := new(membership.Member)
         if err := json.Unmarshal(cc.Context, m); err != nil {
             panic(err)
         }
-        cl.UpdateRaftAttributes(m.ID, m.RaftAttributes)
+        cl.UpdateRaftAttributes(m.ID, m.RaftAttributes, true)
     }
 }
 
diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go
index 48ea0597a..7c8b159f8 100644
--- a/etcdctl/snapshot/v3_snapshot.go
+++ b/etcdctl/snapshot/v3_snapshot.go
@@ -398,7 +398,7 @@ func (s *v3Manager) saveWALAndSnap() error {
     st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
     s.cl.SetStore(st)
     for _, m := range s.cl.Members() {
-        s.cl.AddMember(m)
+        s.cl.AddMember(m, true)
     }
 
     m := s.cl.MemberByName(s.name)
diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go
index ea9d58aef..1d6949d59 100644
--- a/server/etcdserver/api/membership/cluster.go
+++ b/server/etcdserver/api/membership/cluster.go
@@ -285,6 +285,7 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
 // ValidateConfigurationChange takes a proposed ConfChange and
 // ensures that it is still valid.
 func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
+    // TODO: this must be switched to backend as well.
     members, removed := membersFromStore(c.lg, c.v2store)
     id := types.ID(cc.NodeID)
     if removed[id] {
@@ -370,13 +371,13 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
 // AddMember adds a new Member into the cluster, and saves the given member's
 // raftAttributes into the store. The given member should have empty attributes.
 // A Member with a matching id must not exist.
-func (c *RaftCluster) AddMember(m *Member) {
+func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 bool) {
     c.Lock()
     defer c.Unlock()
     if c.v2store != nil {
         mustSaveMemberToStore(c.lg, c.v2store, m)
     }
-    if c.be != nil {
+    if c.be != nil && shouldApplyV3 {
         mustSaveMemberToBackend(c.lg, c.be, m)
     }
 
@@ -393,13 +394,13 @@
 // RemoveMember removes a member from the store.
 // The given id MUST exist, or the function panics.
-func (c *RaftCluster) RemoveMember(id types.ID) {
+func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 bool) {
     c.Lock()
     defer c.Unlock()
     if c.v2store != nil {
         mustDeleteMemberFromStore(c.lg, c.v2store, id)
     }
-    if c.be != nil {
+    if c.be != nil && shouldApplyV3 {
         mustDeleteMemberFromBackend(c.be, id)
     }
 
@@ -425,7 +426,7 @@ func (c *RaftCluster) RemoveMember(id types.ID) {
     }
 }
 
-func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
+func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApplyV3 bool) {
     c.Lock()
     defer c.Unlock()
 
@@ -434,7 +435,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
         if c.v2store != nil {
             mustUpdateMemberAttrInStore(c.lg, c.v2store, m)
         }
-        if c.be != nil {
+        if c.be != nil && shouldApplyV3 {
             mustSaveMemberToBackend(c.lg, c.be, m)
         }
         return
@@ -459,7 +460,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
 }
 
 // PromoteMember marks the member's IsLearner RaftAttributes to false.
-func (c *RaftCluster) PromoteMember(id types.ID) {
+func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 bool) {
     c.Lock()
     defer c.Unlock()
 
@@ -467,7 +468,7 @@ func (c *RaftCluster) PromoteMember(id types.ID) {
     if c.v2store != nil {
         mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
     }
-    if c.be != nil {
+    if c.be != nil && shouldApplyV3 {
         mustSaveMemberToBackend(c.lg, c.be, c.members[id])
     }
 
@@ -478,7 +479,7 @@ func (c *RaftCluster) PromoteMember(id types.ID) {
     )
 }
 
-func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
+func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, shouldApplyV3 bool) {
     c.Lock()
     defer c.Unlock()
 
@@ -486,7 +487,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes)
     if c.v2store != nil {
         mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
     }
-    if c.be != nil {
+    if c.be != nil && shouldApplyV3 {
         mustSaveMemberToBackend(c.lg, c.be, c.members[id])
     }
 
@@ -508,7 +509,7 @@ func (c *RaftCluster) Version() *semver.Version {
     return semver.Must(semver.NewVersion(c.version.String()))
 }
 
-func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version)) {
+func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version), shouldApplyV3 bool) {
     c.Lock()
     defer c.Unlock()
     if c.version != nil {
@@ -533,7 +534,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
     if c.v2store != nil {
         mustSaveClusterVersionToStore(c.lg, c.v2store, ver)
     }
-    if c.be != nil {
+    if c.be != nil && shouldApplyV3 {
         mustSaveClusterVersionToBackend(c.be, ver)
     }
     if oldVer != nil {
@@ -809,11 +810,11 @@ func (c *RaftCluster) DowngradeInfo() *DowngradeInfo {
     return d
 }
 
-func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo) {
+func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 bool) {
     c.Lock()
     defer c.Unlock()
 
-    if c.be != nil {
+    if c.be != nil && shouldApplyV3 {
         mustSaveDowngradeToBackend(c.lg, c.be, d)
     }
 
diff --git a/server/etcdserver/api/membership/cluster_test.go b/server/etcdserver/api/membership/cluster_test.go
index a2fff321f..c3975df25 100644
--- a/server/etcdserver/api/membership/cluster_test.go
+++ b/server/etcdserver/api/membership/cluster_test.go
@@ -283,9 +283,9 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
     cl.SetStore(v2store.New())
     for i := 1; i <= 4; i++ {
         attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}}
-        cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr})
+        cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}, true)
     }
-    cl.RemoveMember(4)
+    cl.RemoveMember(4, true)
 
     attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
     ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
@@ -446,7 +446,7 @@ func TestClusterGenID(t *testing.T) {
     previd := cs.ID()
 
     cs.SetStore(mockstore.NewNop())
-    cs.AddMember(newTestMember(3, nil, "", nil))
+    cs.AddMember(newTestMember(3, nil, "", nil), true)
     cs.genID()
     if cs.ID() == previd {
         t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
@@ -489,7 +489,7 @@ func TestClusterAddMember(t *testing.T) {
     st := mockstore.NewRecorder()
     c := newTestCluster(t, nil)
     c.SetStore(st)
-    c.AddMember(newTestMember(1, nil, "node1", nil))
+    c.AddMember(newTestMember(1, nil, "node1", nil), true)
 
     wactions := []testutil.Action{
         {
@@ -512,7 +512,7 @@ func TestClusterAddMemberAsLearner(t *testing.T) {
     st := mockstore.NewRecorder()
     c := newTestCluster(t, nil)
     c.SetStore(st)
-    c.AddMember(newTestMemberAsLearner(1, nil, "node1", nil))
+    c.AddMember(newTestMemberAsLearner(1, nil, "node1", nil), true)
 
     wactions := []testutil.Action{
         {
@@ -555,7 +555,7 @@ func TestClusterRemoveMember(t *testing.T) {
     st := mockstore.NewRecorder()
     c := newTestCluster(t, nil)
     c.SetStore(st)
-    c.RemoveMember(1)
+    c.RemoveMember(1, true)
 
     wactions := []testutil.Action{
         {Name: "Delete", Params: []interface{}{MemberStoreKey(1), true, true}},
@@ -595,7 +595,7 @@ func TestClusterUpdateAttributes(t *testing.T) {
         c := newTestCluster(t, tt.mems)
         c.removed = tt.removed
 
-        c.UpdateAttributes(types.ID(1), Attributes{Name: name, ClientURLs: clientURLs})
+        c.UpdateAttributes(types.ID(1), Attributes{Name: name, ClientURLs: clientURLs}, true)
         if g := c.Members(); !reflect.DeepEqual(g, tt.wmems) {
             t.Errorf("#%d: members = %+v, want %+v", i, g, tt.wmems)
         }
diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go
index f80f89ca3..2096ecbb4 100644
--- a/server/etcdserver/apply.go
+++ b/server/etcdserver/apply.go
@@ -54,14 +54,14 @@ type applyResult struct {
 
 // applierV3Internal is the interface for processing internal V3 raft request
 type applierV3Internal interface {
-    ClusterVersionSet(r *membershippb.ClusterVersionSetRequest)
-    ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest)
-    DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest)
+    ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 bool)
+    ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 bool)
+    DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 bool)
 }
 
 // applierV3 is the interface for processing V3 raft messages
 type applierV3 interface {
-    Apply(r *pb.InternalRaftRequest) *applyResult
+    Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult
 
     Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
     Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
@@ -130,7 +130,7 @@ func (s *EtcdServer) newApplierV3() applierV3 {
     )
 }
 
-func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
+func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult {
     op := "unknown"
     ar := &applyResult{}
     defer func(start time.Time) {
@@ -142,6 +142,25 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
         }
     }(time.Now())
 
+    switch {
+    case r.ClusterVersionSet != nil: // Implemented in 3.5.x
+        op = "ClusterVersionSet"
+        a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
+        return nil
+    case r.ClusterMemberAttrSet != nil:
+        op = "ClusterMemberAttrSet" // Implemented in 3.5.x
+        a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
+        return nil
+    case r.DowngradeInfoSet != nil:
+        op = "DowngradeInfoSet" // Implemented in 3.5.x
+        a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
+        return nil
+    }
+
+    if !shouldApplyV3 {
+        return nil
+    }
+
     // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
     switch {
     case r.Range != nil:
@@ -221,15 +240,6 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
     case r.AuthRoleList != nil:
         op = "AuthRoleList"
         ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
-    case r.ClusterVersionSet != nil: // Implemented in 3.5.x
-        op = "ClusterVersionSet"
-        a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
-    case r.ClusterMemberAttrSet != nil:
-        op = "ClusterMemberAttrSet" // Implemented in 3.5.x
-        a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
-    case r.DowngradeInfoSet != nil:
-        op = "DowngradeInfoSet" // Implemented in 3.5.x
-        a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet)
     default:
         a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
     }
@@ -903,26 +913,27 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList
     return resp, err
 }
 
-func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) {
-    a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability)
+func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 bool) {
+    a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability, shouldApplyV3)
 }
 
-func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) {
+func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 bool) {
     a.s.cluster.UpdateAttributes(
         types.ID(r.Member_ID),
         membership.Attributes{
             Name:       r.MemberAttributes.Name,
             ClientURLs: r.MemberAttributes.ClientUrls,
         },
+        shouldApplyV3,
     )
 }
 
-func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) {
+func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 bool) {
     d := membership.DowngradeInfo{Enabled: false}
     if r.Enabled {
         d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
     }
-    a.s.cluster.SetDowngradeInfo(&d)
+    a.s.cluster.SetDowngradeInfo(&d, shouldApplyV3)
 }
 
 type quotaApplierV3 struct {
diff --git a/server/etcdserver/apply_auth.go b/server/etcdserver/apply_auth.go
index c1de09f11..140ec847d 100644
--- a/server/etcdserver/apply_auth.go
+++ b/server/etcdserver/apply_auth.go
@@ -41,7 +41,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3, lessor lease.Lessor) *a
     return &authApplierV3{applierV3: base, as: as, lessor: lessor}
 }
 
-func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
+func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult {
     aa.mu.Lock()
     defer aa.mu.Unlock()
     if r.Header != nil {
@@ -57,7 +57,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
             return &applyResult{err: err}
         }
     }
-    ret := aa.applierV3.Apply(r)
+    ret := aa.applierV3.Apply(r, shouldApplyV3)
     aa.authInfo.Username = ""
     aa.authInfo.Revision = 0
     return ret
diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go
index e322c29a6..f2e3d89da 100644
--- a/server/etcdserver/apply_v2.go
+++ b/server/etcdserver/apply_v2.go
@@ -87,7 +87,7 @@ func (a *applierV2store) Put(r *RequestV2) Response {
         a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
     }
     if a.cluster != nil {
-        a.cluster.UpdateAttributes(id, attr)
+        a.cluster.UpdateAttributes(id, attr, true)
     }
     // return an empty response since there is no consumer.
     return Response{}
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 7aee4d0b9..8797ca6b2 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -561,8 +561,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
     }
     srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
     kvindex := srv.consistIndex.ConsistentIndex()
-    srv.lg.Debug("restore consistentIndex",
-        zap.Uint64("index", kvindex))
+    srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
     if beExist {
         // TODO: remove kvindex != 0 checking when we do not expect users to upgrade
         // etcd from pre-3.0 release.
@@ -2018,8 +2017,13 @@ func (s *EtcdServer) apply(
     es []raftpb.Entry,
     confState *raftpb.ConfState,
 ) (appliedt uint64, appliedi uint64, shouldStop bool) {
+    s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
     for i := range es {
         e := es[i]
+        s.lg.Debug("Applying entry",
+            zap.Uint64("index", e.Index),
+            zap.Uint64("term", e.Term),
+            zap.Stringer("type", e.Type))
         switch e.Type {
         case raftpb.EntryNormal:
             s.applyEntryNormal(&e)
@@ -2028,12 +2032,14 @@ func (s *EtcdServer) apply(
         case raftpb.EntryConfChange:
             // set the consistent index of current executing entry
+            shouldApplyV3 := false
             if e.Index > s.consistIndex.ConsistentIndex() {
                 s.consistIndex.SetConsistentIndex(e.Index)
+                shouldApplyV3 = true
             }
             var cc raftpb.ConfChange
             pbutil.MustUnmarshal(&cc, e.Data)
-            removedSelf, err := s.applyConfChange(cc, confState)
+            removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
             s.setAppliedIndex(e.Index)
             s.setTerm(e.Term)
             shouldStop = shouldStop || removedSelf
@@ -2085,18 +2091,16 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
         var r pb.Request
         rp := &r
         pbutil.MustUnmarshal(rp, e.Data)
+        s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
         s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
         return
     }
+    s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
     if raftReq.V2 != nil {
         req := (*RequestV2)(raftReq.V2)
         s.w.Trigger(req.ID, s.applyV2Request(req))
         return
     }
-    // do not re-apply applied entries.
-    if !shouldApplyV3 {
-        return
-    }
 
     id := raftReq.ID
     if id == 0 {
@@ -2109,7 +2113,12 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
         if !needResult && raftReq.Txn != nil {
             removeNeedlessRangeReqs(raftReq.Txn)
         }
-        ar = s.applyV3.Apply(&raftReq)
+        ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
+    }
+
+    // do not re-apply applied entries.
+    if !shouldApplyV3 {
+        return
     }
 
     if ar == nil {
@@ -2142,7 +2151,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 // applyConfChange applies a ConfChange to the server. It is only
 // invoked with a ConfChange that has already passed through Raft
-func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
+func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 bool) (bool, error) {
     if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
         cc.NodeID = raft.None
         s.r.ApplyConfChange(cc)
@@ -2165,9 +2174,9 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
             )
         }
         if confChangeContext.IsPromote {
-            s.cluster.PromoteMember(confChangeContext.Member.ID)
+            s.cluster.PromoteMember(confChangeContext.Member.ID, shouldApplyV3)
         } else {
-            s.cluster.AddMember(&confChangeContext.Member)
+            s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3)
 
             if confChangeContext.Member.ID != s.id {
                 s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
@@ -2185,7 +2194,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 
     case raftpb.ConfChangeRemoveNode:
         id := types.ID(cc.NodeID)
-        s.cluster.RemoveMember(id)
+        s.cluster.RemoveMember(id, shouldApplyV3)
         if id == s.id {
             return true, nil
         }
@@ -2203,7 +2212,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
                 zap.String("member-id-from-message", m.ID.String()),
             )
         }
-        s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
+        s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3)
         if m.ID != s.id {
             s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
         }
diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go
index 38d6ef2cf..1f1cd1bd9 100644
--- a/server/etcdserver/server_test.go
+++ b/server/etcdserver/server_test.go
@@ -181,7 +181,7 @@ func TestApplyRepeat(t *testing.T) {
     cl := newTestCluster(nil)
     st := v2store.New()
     cl.SetStore(v2store.New())
-    cl.AddMember(&membership.Member{ID: 1234})
+    cl.AddMember(&membership.Member{ID: 1234}, true)
     r := newRaftNode(raftNodeConfig{
         lg: zap.NewExample(),
         Node: n,
@@ -509,9 +509,9 @@ func TestApplyConfChangeError(t *testing.T) {
     cl := membership.NewCluster(zap.NewExample(), "")
     cl.SetStore(v2store.New())
     for i := 1; i <= 4; i++ {
-        cl.AddMember(&membership.Member{ID: types.ID(i)})
+        cl.AddMember(&membership.Member{ID: types.ID(i)}, true)
     }
-    cl.RemoveMember(4)
+    cl.RemoveMember(4, true)
 
     attr := membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
     ctx, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
@@ -576,7 +576,7 @@ func TestApplyConfChangeError(t *testing.T) {
             r:       *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
             cluster: cl,
         }
-        _, err := srv.applyConfChange(tt.cc, nil)
+        _, err := srv.applyConfChange(tt.cc, nil, true)
         if err != tt.werr {
             t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
         }
@@ -597,7 +597,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
     cl := membership.NewCluster(zap.NewExample(), "")
     cl.SetStore(v2store.New())
     for i := 1; i <= 3; i++ {
-        cl.AddMember(&membership.Member{ID: types.ID(i)})
+        cl.AddMember(&membership.Member{ID: types.ID(i)}, true)
     }
     r := newRaftNode(raftNodeConfig{
         lg: zap.NewExample(),
@@ -616,7 +616,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
         NodeID: 2,
     }
     // remove non-local member
-    shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{})
+    shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{}, true)
     if err != nil {
         t.Fatalf("unexpected error %v", err)
     }
@@ -626,7 +626,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 
     // remove local member
     cc.NodeID = 1
-    shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{})
+    shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{}, true)
     if err != nil {
         t.Fatalf("unexpected error %v", err)
     }
@@ -640,7 +640,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
     cl := membership.NewCluster(zap.NewExample(), "")
     cl.SetStore(v2store.New())
-    cl.AddMember(&membership.Member{ID: types.ID(1)})
+    cl.AddMember(&membership.Member{ID: types.ID(1)}, true)
     r := newRaftNode(raftNodeConfig{
         lg: zap.NewExample(),
         Node: newNodeNop(),
@@ -688,7 +688,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
     cl := membership.NewCluster(zap.NewExample(), "")
     cl.SetStore(v2store.New())
     for i := 1; i <= 5; i++ {
-        cl.AddMember(&membership.Member{ID: types.ID(i)})
+        cl.AddMember(&membership.Member{ID: types.ID(i)}, true)
     }
     r := newRaftNode(raftNodeConfig{
         lg: zap.NewExample(),
@@ -1342,7 +1342,7 @@ func TestRemoveMember(t *testing.T) {
     cl := newTestCluster(nil)
     st := v2store.New()
     cl.SetStore(v2store.New())
-    cl.AddMember(&membership.Member{ID: 1234})
+    cl.AddMember(&membership.Member{ID: 1234}, true)
     r := newRaftNode(raftNodeConfig{
         lg: zap.NewExample(),
         Node: n,
@@ -1386,7 +1386,7 @@ func TestUpdateMember(t *testing.T) {
     cl := newTestCluster(nil)
     st := v2store.New()
     cl.SetStore(st)
-    cl.AddMember(&membership.Member{ID: 1234})
+    cl.AddMember(&membership.Member{ID: 1234}, true)
     r := newRaftNode(raftNodeConfig{
         lg: zap.NewExample(),
         Node: n,
@@ -1874,7 +1874,7 @@ func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
 func newTestCluster(membs []*membership.Member) *membership.RaftCluster {
     c := membership.NewCluster(zap.NewExample(), "")
     for _, m := range membs {
-        c.AddMember(m)
+        c.AddMember(m, true)
     }
     return c
 }
diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go
index 41303ee3a..2256902fe 100644
--- a/server/mvcc/kvstore.go
+++ b/server/mvcc/kvstore.go
@@ -43,7 +43,6 @@ var (
     ErrCompacted = errors.New("mvcc: required revision has been compacted")
     ErrFutureRev = errors.New("mvcc: required revision is a future revision")
-    ErrCanceled  = errors.New("mvcc: watcher is canceled")
 )
 
 const (
@@ -438,6 +437,10 @@ func (s *store) restore() error {
 
     tx.Unlock()
 
+    s.lg.Info("kvstore restored",
+        zap.Uint64("consistent-index", s.ConsistentIndex()),
+        zap.Int64("current-rev", s.currentRev))
+
     if scheduledCompact != 0 {
         if _, err := s.compactLockfree(scheduledCompact); err != nil {
             s.lg.Warn("compaction encountered error", zap.Error(err))
diff --git a/tests/go.mod b/tests/go.mod
index 4a43b433c..e1ced4037 100644
--- a/tests/go.mod
+++ b/tests/go.mod
@@ -26,6 +26,7 @@ require (
     github.com/soheilhy/cmux v0.1.5
     github.com/spf13/cobra v1.1.1
     github.com/spf13/pflag v1.0.5
+    github.com/stretchr/testify v1.5.1
     go.etcd.io/bbolt v1.3.5
     go.etcd.io/etcd/api/v3 v3.5.0-alpha.0
     go.etcd.io/etcd/client/pkg/v3 v3.5.0-alpha.0
diff --git a/tests/integration/clientv3/ordering_kv_test.go b/tests/integration/clientv3/ordering_kv_test.go
index 184c09bc1..f7d984be0 100644
--- a/tests/integration/clientv3/ordering_kv_test.go
+++ b/tests/integration/clientv3/ordering_kv_test.go
@@ -20,13 +20,14 @@ import (
     "testing"
     "time"
 
+    "github.com/stretchr/testify/assert"
     "go.etcd.io/etcd/client/v3"
     "go.etcd.io/etcd/client/v3/ordering"
     "go.etcd.io/etcd/tests/v3/integration"
 )
 
 func TestDetectKvOrderViolation(t *testing.T) {
-    var errOrderViolation = errors.New("Detected Order Violation")
+    var errOrderViolation = errors.New("DetectedOrderViolation")
     integration.BeforeTest(t)
 
     clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
@@ -43,7 +44,7 @@ func TestDetectKvOrderViolation(t *testing.T) {
     if err != nil {
         t.Fatal(err)
     }
-    defer cli.Close()
+    defer func() { assert.NoError(t, cli.Close()) }()
 
     ctx := context.TODO()
     if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil {
@@ -69,27 +70,31 @@ func TestDetectKvOrderViolation(t *testing.T) {
         func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error {
             return errOrderViolation
         })
-    _, err = orderingKv.Get(ctx, "foo")
+    v, err := orderingKv.Get(ctx, "foo")
     if err != nil {
         t.Fatal(err)
     }
+    t.Logf("Read from the first member: v:%v err:%v", v, err)
+    assert.Equal(t, []byte("buzz"), v.Kvs[0].Value)
 
     // ensure that only the third member is queried during requests
     clus.Members[0].Stop(t)
     clus.Members[1].Stop(t)
-    clus.Members[2].Restart(t)
+    assert.NoError(t, clus.Members[2].Restart(t))
     // force OrderingKv to query the third member
     cli.SetEndpoints(clus.Members[2].GRPCAddr())
     time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed
 
-    _, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
+    t.Logf("Querying m2 after restart")
+    v, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
+    t.Logf("Querying m2 returned: v:%v err:%v", v, err)
     if err != errOrderViolation {
-        t.Fatalf("expected %v, got %v", errOrderViolation, err)
+        t.Fatalf("expected %v, got err:%v v:%v", errOrderViolation, err, v)
     }
 }
 
 func TestDetectTxnOrderViolation(t *testing.T) {
-    var errOrderViolation = errors.New("Detected Order Violation")
+    var errOrderViolation = errors.New("DetectedOrderViolation")
     integration.BeforeTest(t)
 
     clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
@@ -106,7 +111,7 @@ func TestDetectTxnOrderViolation(t *testing.T) {
     if err != nil {
         t.Fatal(err)
     }
-    defer cli.Close()
+    defer func() { assert.NoError(t, cli.Close()) }()
 
     ctx := context.TODO()
     if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil {
@@ -144,7 +149,7 @@ func TestDetectTxnOrderViolation(t *testing.T) {
     // ensure that only the third member is queried during requests
     clus.Members[0].Stop(t)
     clus.Members[1].Stop(t)
-    clus.Members[2].Restart(t)
+    assert.NoError(t, clus.Members[2].Restart(t))
    // force OrderingKv to query the third member
     cli.SetEndpoints(clus.Members[2].GRPCAddr())
     time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed
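
Because the change is spread across many files, here is a minimal, self-contained Go sketch of the gating idea described in the commit message: entries are always replayed into the v2store, while backend (v3) writes happen only when the entry index is ahead of the backend's consistent index (shouldApplyV3). All names below (miniV2Store, miniBackend, applyEntry) are hypothetical illustrations and are not etcd APIs.

// Sketch only (not part of the patch): simplified model of the shouldApplyV3
// gating pattern. None of these types exist in etcd.
package main

import "fmt"

// miniV2Store stands in for the v2store, which is always rebuilt by replaying
// entries from the WAL.
type miniV2Store struct {
	index uint64
	data  map[string]string
}

// miniBackend stands in for the v3 backend, which persists a consistent index
// and therefore must not re-apply entries it already contains.
type miniBackend struct {
	consistentIndex uint64
	data            map[string]string
}

type entry struct {
	index uint64
	key   string
	value string
}

// applyEntry mirrors the fixed control flow: the v2store write is
// unconditional, while the backend write happens only when shouldApplyV3 is
// true, i.e. when the entry is ahead of the backend's consistent index.
func applyEntry(st *miniV2Store, be *miniBackend, e entry) {
	shouldApplyV3 := false
	if e.index > be.consistentIndex {
		be.consistentIndex = e.index
		shouldApplyV3 = true
	}

	// v2store: always applied (this part was skipped for ClusterVersionSet
	// and friends before the fix).
	st.data[e.key] = e.value
	st.index = e.index

	// backend: skipped for entries already reflected in the backend.
	if shouldApplyV3 {
		be.data[e.key] = e.value
	}
}

func main() {
	st := &miniV2Store{data: map[string]string{}}
	// Simulate a restart: the backend is already at index 2, but the v2store
	// is empty and must be rebuilt by replaying entries 1..3.
	be := &miniBackend{consistentIndex: 2, data: map[string]string{"cluster_version": "3.4.0"}}

	for _, e := range []entry{
		{index: 1, key: "member/1", value: "infra1"},
		{index: 2, key: "cluster_version", value: "3.4.0"},
		{index: 3, key: "cluster_version", value: "3.5.0"},
	} {
		applyEntry(st, be, e)
	}

	fmt.Printf("v2store: index=%d data=%v\n", st.index, st.data)
	fmt.Printf("backend: consistent-index=%d data=%v\n", be.consistentIndex, be.data)
}

Running the sketch with a backend already at consistent index 2 shows the v2store being rebuilt up to index 3 while the backend applies only entry 3 - mirroring how, after this fix, a restore replays the WAL into the v2store without double-applying entries the backend already contains.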