diff --git a/etcdctl/ctlv3/command/migrate_command.go b/etcdctl/ctlv3/command/migrate_command.go index c287f0329..74245483c 100644 --- a/etcdctl/ctlv3/command/migrate_command.go +++ b/etcdctl/ctlv3/command/migrate_command.go @@ -208,15 +208,15 @@ func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) { if err := json.Unmarshal(cc.Context, m); err != nil { panic(err) } - cl.AddMember(m) + cl.AddMember(m, true) case raftpb.ConfChangeRemoveNode: - cl.RemoveMember(types.ID(cc.NodeID)) + cl.RemoveMember(types.ID(cc.NodeID), true) case raftpb.ConfChangeUpdateNode: m := new(membership.Member) if err := json.Unmarshal(cc.Context, m); err != nil { panic(err) } - cl.UpdateRaftAttributes(m.ID, m.RaftAttributes) + cl.UpdateRaftAttributes(m.ID, m.RaftAttributes, true) } } diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go index 48ea0597a..7c8b159f8 100644 --- a/etcdctl/snapshot/v3_snapshot.go +++ b/etcdctl/snapshot/v3_snapshot.go @@ -398,7 +398,7 @@ func (s *v3Manager) saveWALAndSnap() error { st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix) s.cl.SetStore(st) for _, m := range s.cl.Members() { - s.cl.AddMember(m) + s.cl.AddMember(m, true) } m := s.cl.MemberByName(s.name) diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index ea9d58aef..1d6949d59 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -285,6 +285,7 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { // ValidateConfigurationChange takes a proposed ConfChange and // ensures that it is still valid. func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { + // TODO: this must be switched to backend as well. members, removed := membersFromStore(c.lg, c.v2store) id := types.ID(cc.NodeID) if removed[id] { @@ -370,13 +371,13 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { // AddMember adds a new Member into the cluster, and saves the given member's // raftAttributes into the store. The given member should have empty attributes. // A Member with a matching id must not exist. -func (c *RaftCluster) AddMember(m *Member) { +func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 bool) { c.Lock() defer c.Unlock() if c.v2store != nil { mustSaveMemberToStore(c.lg, c.v2store, m) } - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustSaveMemberToBackend(c.lg, c.be, m) } @@ -393,13 +394,13 @@ func (c *RaftCluster) AddMember(m *Member) { // RemoveMember removes a member from the store. // The given id MUST exist, or the function panics. -func (c *RaftCluster) RemoveMember(id types.ID) { +func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 bool) { c.Lock() defer c.Unlock() if c.v2store != nil { mustDeleteMemberFromStore(c.lg, c.v2store, id) } - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustDeleteMemberFromBackend(c.be, id) } @@ -425,7 +426,7 @@ func (c *RaftCluster) RemoveMember(id types.ID) { } } -func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) { +func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApplyV3 bool) { c.Lock() defer c.Unlock() @@ -434,7 +435,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) { if c.v2store != nil { mustUpdateMemberAttrInStore(c.lg, c.v2store, m) } - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustSaveMemberToBackend(c.lg, c.be, m) } return @@ -459,7 +460,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) { } // PromoteMember marks the member's IsLearner RaftAttributes to false. -func (c *RaftCluster) PromoteMember(id types.ID) { +func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 bool) { c.Lock() defer c.Unlock() @@ -467,7 +468,7 @@ func (c *RaftCluster) PromoteMember(id types.ID) { if c.v2store != nil { mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) } - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustSaveMemberToBackend(c.lg, c.be, c.members[id]) } @@ -478,7 +479,7 @@ func (c *RaftCluster) PromoteMember(id types.ID) { ) } -func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { +func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, shouldApplyV3 bool) { c.Lock() defer c.Unlock() @@ -486,7 +487,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) if c.v2store != nil { mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) } - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustSaveMemberToBackend(c.lg, c.be, c.members[id]) } @@ -508,7 +509,7 @@ func (c *RaftCluster) Version() *semver.Version { return semver.Must(semver.NewVersion(c.version.String())) } -func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version)) { +func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version), shouldApplyV3 bool) { c.Lock() defer c.Unlock() if c.version != nil { @@ -533,7 +534,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s if c.v2store != nil { mustSaveClusterVersionToStore(c.lg, c.v2store, ver) } - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustSaveClusterVersionToBackend(c.be, ver) } if oldVer != nil { @@ -809,11 +810,11 @@ func (c *RaftCluster) DowngradeInfo() *DowngradeInfo { return d } -func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo) { +func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 bool) { c.Lock() defer c.Unlock() - if c.be != nil { + if c.be != nil && shouldApplyV3 { mustSaveDowngradeToBackend(c.lg, c.be, d) } diff --git a/server/etcdserver/api/membership/cluster_test.go b/server/etcdserver/api/membership/cluster_test.go index a2fff321f..c3975df25 100644 --- a/server/etcdserver/api/membership/cluster_test.go +++ b/server/etcdserver/api/membership/cluster_test.go @@ -283,9 +283,9 @@ func TestClusterValidateConfigurationChange(t *testing.T) { cl.SetStore(v2store.New()) for i := 1; i <= 4; i++ { attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}} - cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}) + cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}, true) } - cl.RemoveMember(4) + cl.RemoveMember(4, true) attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}} ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) @@ -446,7 +446,7 @@ func TestClusterGenID(t *testing.T) { previd := cs.ID() cs.SetStore(mockstore.NewNop()) - cs.AddMember(newTestMember(3, nil, "", nil)) + cs.AddMember(newTestMember(3, nil, "", nil), true) cs.genID() if cs.ID() == previd { t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd) @@ -489,7 +489,7 @@ func TestClusterAddMember(t *testing.T) { st := mockstore.NewRecorder() c := newTestCluster(t, nil) c.SetStore(st) - c.AddMember(newTestMember(1, nil, "node1", nil)) + c.AddMember(newTestMember(1, nil, "node1", nil), true) wactions := []testutil.Action{ { @@ -512,7 +512,7 @@ func TestClusterAddMemberAsLearner(t *testing.T) { st := mockstore.NewRecorder() c := newTestCluster(t, nil) c.SetStore(st) - c.AddMember(newTestMemberAsLearner(1, nil, "node1", nil)) + c.AddMember(newTestMemberAsLearner(1, nil, "node1", nil), true) wactions := []testutil.Action{ { @@ -555,7 +555,7 @@ func TestClusterRemoveMember(t *testing.T) { st := mockstore.NewRecorder() c := newTestCluster(t, nil) c.SetStore(st) - c.RemoveMember(1) + c.RemoveMember(1, true) wactions := []testutil.Action{ {Name: "Delete", Params: []interface{}{MemberStoreKey(1), true, true}}, @@ -595,7 +595,7 @@ func TestClusterUpdateAttributes(t *testing.T) { c := newTestCluster(t, tt.mems) c.removed = tt.removed - c.UpdateAttributes(types.ID(1), Attributes{Name: name, ClientURLs: clientURLs}) + c.UpdateAttributes(types.ID(1), Attributes{Name: name, ClientURLs: clientURLs}, true) if g := c.Members(); !reflect.DeepEqual(g, tt.wmems) { t.Errorf("#%d: members = %+v, want %+v", i, g, tt.wmems) } diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index f80f89ca3..2096ecbb4 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -54,14 +54,14 @@ type applyResult struct { // applierV3Internal is the interface for processing internal V3 raft request type applierV3Internal interface { - ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) - ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) - DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) + ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 bool) + ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 bool) + DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 bool) } // applierV3 is the interface for processing V3 raft messages type applierV3 interface { - Apply(r *pb.InternalRaftRequest) *applyResult + Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) @@ -130,7 +130,7 @@ func (s *EtcdServer) newApplierV3() applierV3 { ) } -func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { +func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult { op := "unknown" ar := &applyResult{} defer func(start time.Time) { @@ -142,6 +142,25 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { } }(time.Now()) + switch { + case r.ClusterVersionSet != nil: // Implemented in 3.5.x + op = "ClusterVersionSet" + a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3) + return nil + case r.ClusterMemberAttrSet != nil: + op = "ClusterMemberAttrSet" // Implemented in 3.5.x + a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3) + return nil + case r.DowngradeInfoSet != nil: + op = "DowngradeInfoSet" // Implemented in 3.5.x + a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3) + return nil + } + + if !shouldApplyV3 { + return nil + } + // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls switch { case r.Range != nil: @@ -221,15 +240,6 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { case r.AuthRoleList != nil: op = "AuthRoleList" ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList) - case r.ClusterVersionSet != nil: // Implemented in 3.5.x - op = "ClusterVersionSet" - a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet) - case r.ClusterMemberAttrSet != nil: - op = "ClusterMemberAttrSet" // Implemented in 3.5.x - a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet) - case r.DowngradeInfoSet != nil: - op = "DowngradeInfoSet" // Implemented in 3.5.x - a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet) default: a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r)) } @@ -903,26 +913,27 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList return resp, err } -func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) { - a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability) +func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 bool) { + a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability, shouldApplyV3) } -func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) { +func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 bool) { a.s.cluster.UpdateAttributes( types.ID(r.Member_ID), membership.Attributes{ Name: r.MemberAttributes.Name, ClientURLs: r.MemberAttributes.ClientUrls, }, + shouldApplyV3, ) } -func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) { +func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 bool) { d := membership.DowngradeInfo{Enabled: false} if r.Enabled { d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver} } - a.s.cluster.SetDowngradeInfo(&d) + a.s.cluster.SetDowngradeInfo(&d, shouldApplyV3) } type quotaApplierV3 struct { diff --git a/server/etcdserver/apply_auth.go b/server/etcdserver/apply_auth.go index c1de09f11..140ec847d 100644 --- a/server/etcdserver/apply_auth.go +++ b/server/etcdserver/apply_auth.go @@ -41,7 +41,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3, lessor lease.Lessor) *a return &authApplierV3{applierV3: base, as: as, lessor: lessor} } -func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult { +func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult { aa.mu.Lock() defer aa.mu.Unlock() if r.Header != nil { @@ -57,7 +57,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult { return &applyResult{err: err} } } - ret := aa.applierV3.Apply(r) + ret := aa.applierV3.Apply(r, shouldApplyV3) aa.authInfo.Username = "" aa.authInfo.Revision = 0 return ret diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go index e322c29a6..f2e3d89da 100644 --- a/server/etcdserver/apply_v2.go +++ b/server/etcdserver/apply_v2.go @@ -87,7 +87,7 @@ func (a *applierV2store) Put(r *RequestV2) Response { a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) } if a.cluster != nil { - a.cluster.UpdateAttributes(id, attr) + a.cluster.UpdateAttributes(id, attr, true) } // return an empty response since there is no consumer. return Response{} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 7aee4d0b9..8797ca6b2 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -561,8 +561,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) kvindex := srv.consistIndex.ConsistentIndex() - srv.lg.Debug("restore consistentIndex", - zap.Uint64("index", kvindex)) + srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) if beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. @@ -2018,8 +2017,13 @@ func (s *EtcdServer) apply( es []raftpb.Entry, confState *raftpb.ConfState, ) (appliedt uint64, appliedi uint64, shouldStop bool) { + s.lg.Debug("Applying entries", zap.Int("num-entries", len(es))) for i := range es { e := es[i] + s.lg.Debug("Applying entry", + zap.Uint64("index", e.Index), + zap.Uint64("term", e.Term), + zap.Stringer("type", e.Type)) switch e.Type { case raftpb.EntryNormal: s.applyEntryNormal(&e) @@ -2028,12 +2032,14 @@ func (s *EtcdServer) apply( case raftpb.EntryConfChange: // set the consistent index of current executing entry + shouldApplyV3 := false if e.Index > s.consistIndex.ConsistentIndex() { s.consistIndex.SetConsistentIndex(e.Index) + shouldApplyV3 = true } var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) - removedSelf, err := s.applyConfChange(cc, confState) + removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3) s.setAppliedIndex(e.Index) s.setTerm(e.Term) shouldStop = shouldStop || removedSelf @@ -2085,18 +2091,16 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { var r pb.Request rp := &r pbutil.MustUnmarshal(rp, e.Data) + s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp)) s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp))) return } + s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq)) if raftReq.V2 != nil { req := (*RequestV2)(raftReq.V2) s.w.Trigger(req.ID, s.applyV2Request(req)) return } - // do not re-apply applied entries. - if !shouldApplyV3 { - return - } id := raftReq.ID if id == 0 { @@ -2109,7 +2113,12 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if !needResult && raftReq.Txn != nil { removeNeedlessRangeReqs(raftReq.Txn) } - ar = s.applyV3.Apply(&raftReq) + ar = s.applyV3.Apply(&raftReq, shouldApplyV3) + } + + // do not re-apply applied entries. + if !shouldApplyV3 { + return } if ar == nil { @@ -2142,7 +2151,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { // applyConfChange applies a ConfChange to the server. It is only // invoked with a ConfChange that has already passed through Raft -func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) { +func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 bool) (bool, error) { if err := s.cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.r.ApplyConfChange(cc) @@ -2165,9 +2174,9 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con ) } if confChangeContext.IsPromote { - s.cluster.PromoteMember(confChangeContext.Member.ID) + s.cluster.PromoteMember(confChangeContext.Member.ID, shouldApplyV3) } else { - s.cluster.AddMember(&confChangeContext.Member) + s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3) if confChangeContext.Member.ID != s.id { s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs) @@ -2185,7 +2194,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) - s.cluster.RemoveMember(id) + s.cluster.RemoveMember(id, shouldApplyV3) if id == s.id { return true, nil } @@ -2203,7 +2212,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con zap.String("member-id-from-message", m.ID.String()), ) } - s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) + s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3) if m.ID != s.id { s.r.transport.UpdatePeer(m.ID, m.PeerURLs) } diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 38d6ef2cf..1f1cd1bd9 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -181,7 +181,7 @@ func TestApplyRepeat(t *testing.T) { cl := newTestCluster(nil) st := v2store.New() cl.SetStore(v2store.New()) - cl.AddMember(&membership.Member{ID: 1234}) + cl.AddMember(&membership.Member{ID: 1234}, true) r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: n, @@ -509,9 +509,9 @@ func TestApplyConfChangeError(t *testing.T) { cl := membership.NewCluster(zap.NewExample(), "") cl.SetStore(v2store.New()) for i := 1; i <= 4; i++ { - cl.AddMember(&membership.Member{ID: types.ID(i)}) + cl.AddMember(&membership.Member{ID: types.ID(i)}, true) } - cl.RemoveMember(4) + cl.RemoveMember(4, true) attr := membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}} ctx, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr}) @@ -576,7 +576,7 @@ func TestApplyConfChangeError(t *testing.T) { r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}), cluster: cl, } - _, err := srv.applyConfChange(tt.cc, nil) + _, err := srv.applyConfChange(tt.cc, nil, true) if err != tt.werr { t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr) } @@ -597,7 +597,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { cl := membership.NewCluster(zap.NewExample(), "") cl.SetStore(v2store.New()) for i := 1; i <= 3; i++ { - cl.AddMember(&membership.Member{ID: types.ID(i)}) + cl.AddMember(&membership.Member{ID: types.ID(i)}, true) } r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), @@ -616,7 +616,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { NodeID: 2, } // remove non-local member - shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{}) + shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{}, true) if err != nil { t.Fatalf("unexpected error %v", err) } @@ -626,7 +626,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { // remove local member cc.NodeID = 1 - shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{}) + shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{}, true) if err != nil { t.Fatalf("unexpected error %v", err) } @@ -640,7 +640,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { cl := membership.NewCluster(zap.NewExample(), "") cl.SetStore(v2store.New()) - cl.AddMember(&membership.Member{ID: types.ID(1)}) + cl.AddMember(&membership.Member{ID: types.ID(1)}, true) r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: newNodeNop(), @@ -688,7 +688,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { cl := membership.NewCluster(zap.NewExample(), "") cl.SetStore(v2store.New()) for i := 1; i <= 5; i++ { - cl.AddMember(&membership.Member{ID: types.ID(i)}) + cl.AddMember(&membership.Member{ID: types.ID(i)}, true) } r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), @@ -1342,7 +1342,7 @@ func TestRemoveMember(t *testing.T) { cl := newTestCluster(nil) st := v2store.New() cl.SetStore(v2store.New()) - cl.AddMember(&membership.Member{ID: 1234}) + cl.AddMember(&membership.Member{ID: 1234}, true) r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: n, @@ -1386,7 +1386,7 @@ func TestUpdateMember(t *testing.T) { cl := newTestCluster(nil) st := v2store.New() cl.SetStore(st) - cl.AddMember(&membership.Member{ID: 1234}) + cl.AddMember(&membership.Member{ID: 1234}, true) r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: n, @@ -1874,7 +1874,7 @@ func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error { func newTestCluster(membs []*membership.Member) *membership.RaftCluster { c := membership.NewCluster(zap.NewExample(), "") for _, m := range membs { - c.AddMember(m) + c.AddMember(m, true) } return c } diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index 41303ee3a..2256902fe 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -43,7 +43,6 @@ var ( ErrCompacted = errors.New("mvcc: required revision has been compacted") ErrFutureRev = errors.New("mvcc: required revision is a future revision") - ErrCanceled = errors.New("mvcc: watcher is canceled") ) const ( @@ -438,6 +437,10 @@ func (s *store) restore() error { tx.Unlock() + s.lg.Info("kvstore restored", + zap.Uint64("consistent-index", s.ConsistentIndex()), + zap.Int64("current-rev", s.currentRev)) + if scheduledCompact != 0 { if _, err := s.compactLockfree(scheduledCompact); err != nil { s.lg.Warn("compaction encountered error", zap.Error(err)) diff --git a/tests/go.mod b/tests/go.mod index 4a43b433c..e1ced4037 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -26,6 +26,7 @@ require ( github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.1.1 github.com/spf13/pflag v1.0.5 + github.com/stretchr/testify v1.5.1 go.etcd.io/bbolt v1.3.5 go.etcd.io/etcd/api/v3 v3.5.0-alpha.0 go.etcd.io/etcd/client/pkg/v3 v3.5.0-alpha.0 diff --git a/tests/integration/clientv3/ordering_kv_test.go b/tests/integration/clientv3/ordering_kv_test.go index 184c09bc1..f7d984be0 100644 --- a/tests/integration/clientv3/ordering_kv_test.go +++ b/tests/integration/clientv3/ordering_kv_test.go @@ -20,13 +20,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/ordering" "go.etcd.io/etcd/tests/v3/integration" ) func TestDetectKvOrderViolation(t *testing.T) { - var errOrderViolation = errors.New("Detected Order Violation") + var errOrderViolation = errors.New("DetectedOrderViolation") integration.BeforeTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) @@ -43,7 +44,7 @@ func TestDetectKvOrderViolation(t *testing.T) { if err != nil { t.Fatal(err) } - defer cli.Close() + defer func() { assert.NoError(t, cli.Close()) }() ctx := context.TODO() if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil { @@ -69,27 +70,31 @@ func TestDetectKvOrderViolation(t *testing.T) { func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error { return errOrderViolation }) - _, err = orderingKv.Get(ctx, "foo") + v, err := orderingKv.Get(ctx, "foo") if err != nil { t.Fatal(err) } + t.Logf("Read from the first member: v:%v err:%v", v, err) + assert.Equal(t, []byte("buzz"), v.Kvs[0].Value) // ensure that only the third member is queried during requests clus.Members[0].Stop(t) clus.Members[1].Stop(t) - clus.Members[2].Restart(t) + assert.NoError(t, clus.Members[2].Restart(t)) // force OrderingKv to query the third member cli.SetEndpoints(clus.Members[2].GRPCAddr()) time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed - _, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable()) + t.Logf("Quering m2 after restart") + v, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable()) + t.Logf("Quering m2 returned: v:%v erro:%v ", v, err) if err != errOrderViolation { - t.Fatalf("expected %v, got %v", errOrderViolation, err) + t.Fatalf("expected %v, got err:%v v:%v", errOrderViolation, err, v) } } func TestDetectTxnOrderViolation(t *testing.T) { - var errOrderViolation = errors.New("Detected Order Violation") + var errOrderViolation = errors.New("DetectedOrderViolation") integration.BeforeTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) @@ -106,7 +111,7 @@ func TestDetectTxnOrderViolation(t *testing.T) { if err != nil { t.Fatal(err) } - defer cli.Close() + defer func() { assert.NoError(t, cli.Close()) }() ctx := context.TODO() if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil { @@ -144,7 +149,7 @@ func TestDetectTxnOrderViolation(t *testing.T) { // ensure that only the third member is queried during requests clus.Members[0].Stop(t) clus.Members[1].Stop(t) - clus.Members[2].Restart(t) + assert.NoError(t, clus.Members[2].Restart(t)) // force OrderingKv to query the third member cli.SetEndpoints(clus.Members[2].GRPCAddr()) time.Sleep(2 * time.Second) // FIXME: Figure out how pause SetEndpoints sufficiently that this is not needed