diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 1d6949d59..a8a27984f 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -72,6 +72,13 @@ type ConfigChangeContext struct { IsPromote bool `json:"isPromote"` } +type ShouldApplyV3 bool + +const ( + ApplyBoth = ShouldApplyV3(true) + ApplyV2storeOnly = ShouldApplyV3(false) +) + // NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating // cluster with raft learner member. func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) { @@ -371,7 +378,7 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { // AddMember adds a new Member into the cluster, and saves the given member's // raftAttributes into the store. The given member should have empty attributes. // A Member with a matching id must not exist. -func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 bool) { +func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) { c.Lock() defer c.Unlock() if c.v2store != nil { @@ -394,7 +401,7 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 bool) { // RemoveMember removes a member from the store. // The given id MUST exist, or the function panics. -func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 bool) { +func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) { c.Lock() defer c.Unlock() if c.v2store != nil { @@ -426,7 +433,7 @@ func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 bool) { } } -func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApplyV3 bool) { +func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApplyV3 ShouldApplyV3) { c.Lock() defer c.Unlock() @@ -460,7 +467,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply } // PromoteMember marks the member's IsLearner RaftAttributes to false. -func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 bool) { +func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) { c.Lock() defer c.Unlock() @@ -479,7 +486,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 bool) { ) } -func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, shouldApplyV3 bool) { +func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, shouldApplyV3 ShouldApplyV3) { c.Lock() defer c.Unlock() @@ -509,7 +516,7 @@ func (c *RaftCluster) Version() *semver.Version { return semver.Must(semver.NewVersion(c.version.String())) } -func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version), shouldApplyV3 bool) { +func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version), shouldApplyV3 ShouldApplyV3) { c.Lock() defer c.Unlock() if c.version != nil { @@ -810,7 +817,7 @@ func (c *RaftCluster) DowngradeInfo() *DowngradeInfo { return d } -func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 bool) { +func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 ShouldApplyV3) { c.Lock() defer c.Unlock() diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index 2096ecbb4..9776e7b40 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -54,14 +54,14 @@ type applyResult struct { // applierV3Internal is the interface for processing internal V3 raft request type applierV3Internal interface { - ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 bool) - ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 bool) - DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 bool) + ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) + ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) + DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) } // applierV3 is the interface for processing V3 raft messages type applierV3 interface { - Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult + Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) @@ -130,7 +130,7 @@ func (s *EtcdServer) newApplierV3() applierV3 { ) } -func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult { +func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult { op := "unknown" ar := &applyResult{} defer func(start time.Time) { @@ -913,11 +913,11 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList return resp, err } -func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 bool) { +func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) { a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability, shouldApplyV3) } -func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 bool) { +func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) { a.s.cluster.UpdateAttributes( types.ID(r.Member_ID), membership.Attributes{ @@ -928,7 +928,7 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt ) } -func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 bool) { +func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) { d := membership.DowngradeInfo{Enabled: false} if r.Enabled { d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver} diff --git a/server/etcdserver/apply_auth.go b/server/etcdserver/apply_auth.go index 140ec847d..74fd2b4fc 100644 --- a/server/etcdserver/apply_auth.go +++ b/server/etcdserver/apply_auth.go @@ -21,6 +21,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/auth" + "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/mvcc" ) @@ -41,7 +42,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3, lessor lease.Lessor) *a return &authApplierV3{applierV3: base, as: as, lessor: lessor} } -func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult { +func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult { aa.mu.Lock() defer aa.mu.Unlock() if r.Header != nil { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 8797ca6b2..56b293106 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2031,12 +2031,16 @@ func (s *EtcdServer) apply( s.setTerm(e.Term) case raftpb.EntryConfChange: + // We need to apply all WAL entries on top of v2store + // and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend. + shouldApplyV3 := membership.ApplyV2storeOnly + // set the consistent index of current executing entry - shouldApplyV3 := false if e.Index > s.consistIndex.ConsistentIndex() { s.consistIndex.SetConsistentIndex(e.Index) - shouldApplyV3 = true + shouldApplyV3 = membership.ApplyBoth } + var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3) @@ -2059,17 +2063,17 @@ func (s *EtcdServer) apply( // applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { - shouldApplyV3 := false + shouldApplyV3 := membership.ApplyV2storeOnly index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry s.consistIndex.SetConsistentIndex(e.Index) - shouldApplyV3 = true + shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", zap.Uint64("consistent-index", index), zap.Uint64("entry-index", e.Index), - zap.Bool("should-applyV3", shouldApplyV3)) + zap.Bool("should-applyV3", bool(shouldApplyV3))) // raft state machine may generate noop entry when leader confirmation. // skip it in advance to avoid some potential bug in the future @@ -2151,7 +2155,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { // applyConfChange applies a ConfChange to the server. It is only // invoked with a ConfChange that has already passed through Raft -func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 bool) (bool, error) { +func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) { if err := s.cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.r.ApplyConfChange(cc)