Make ShouldApplyV3 an enum - not bool
parent
b1c04ce043
commit
d69e46ea47
|
@ -72,6 +72,13 @@ type ConfigChangeContext struct {
|
||||||
IsPromote bool `json:"isPromote"`
|
IsPromote bool `json:"isPromote"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ShouldApplyV3 bool
|
||||||
|
|
||||||
|
const (
|
||||||
|
ApplyBoth = ShouldApplyV3(true)
|
||||||
|
ApplyV2storeOnly = ShouldApplyV3(false)
|
||||||
|
)
|
||||||
|
|
||||||
// NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating
|
// NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating
|
||||||
// cluster with raft learner member.
|
// cluster with raft learner member.
|
||||||
func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
|
func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
|
||||||
|
@ -371,7 +378,7 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
|
||||||
// AddMember adds a new Member into the cluster, and saves the given member's
|
// AddMember adds a new Member into the cluster, and saves the given member's
|
||||||
// raftAttributes into the store. The given member should have empty attributes.
|
// raftAttributes into the store. The given member should have empty attributes.
|
||||||
// A Member with a matching id must not exist.
|
// A Member with a matching id must not exist.
|
||||||
func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 bool) {
|
func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
if c.v2store != nil {
|
if c.v2store != nil {
|
||||||
|
@ -394,7 +401,7 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 bool) {
|
||||||
|
|
||||||
// RemoveMember removes a member from the store.
|
// RemoveMember removes a member from the store.
|
||||||
// The given id MUST exist, or the function panics.
|
// The given id MUST exist, or the function panics.
|
||||||
func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 bool) {
|
func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
if c.v2store != nil {
|
if c.v2store != nil {
|
||||||
|
@ -426,7 +433,7 @@ func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApplyV3 bool) {
|
func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApplyV3 ShouldApplyV3) {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
|
||||||
|
@ -460,7 +467,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply
|
||||||
}
|
}
|
||||||
|
|
||||||
// PromoteMember marks the member's IsLearner RaftAttributes to false.
|
// PromoteMember marks the member's IsLearner RaftAttributes to false.
|
||||||
func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 bool) {
|
func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
|
||||||
|
@ -479,7 +486,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 bool) {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, shouldApplyV3 bool) {
|
func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, shouldApplyV3 ShouldApplyV3) {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
|
||||||
|
@ -509,7 +516,7 @@ func (c *RaftCluster) Version() *semver.Version {
|
||||||
return semver.Must(semver.NewVersion(c.version.String()))
|
return semver.Must(semver.NewVersion(c.version.String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version), shouldApplyV3 bool) {
|
func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version), shouldApplyV3 ShouldApplyV3) {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
if c.version != nil {
|
if c.version != nil {
|
||||||
|
@ -810,7 +817,7 @@ func (c *RaftCluster) DowngradeInfo() *DowngradeInfo {
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 bool) {
|
func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 ShouldApplyV3) {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -54,14 +54,14 @@ type applyResult struct {
|
||||||
|
|
||||||
// applierV3Internal is the interface for processing internal V3 raft request
|
// applierV3Internal is the interface for processing internal V3 raft request
|
||||||
type applierV3Internal interface {
|
type applierV3Internal interface {
|
||||||
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 bool)
|
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3)
|
||||||
ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 bool)
|
ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3)
|
||||||
DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 bool)
|
DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3)
|
||||||
}
|
}
|
||||||
|
|
||||||
// applierV3 is the interface for processing V3 raft messages
|
// applierV3 is the interface for processing V3 raft messages
|
||||||
type applierV3 interface {
|
type applierV3 interface {
|
||||||
Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult
|
Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult
|
||||||
|
|
||||||
Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
|
Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
|
||||||
Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
||||||
|
@ -130,7 +130,7 @@ func (s *EtcdServer) newApplierV3() applierV3 {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult {
|
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
|
||||||
op := "unknown"
|
op := "unknown"
|
||||||
ar := &applyResult{}
|
ar := &applyResult{}
|
||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
|
@ -913,11 +913,11 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 bool) {
|
func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
|
||||||
a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability, shouldApplyV3)
|
a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability, shouldApplyV3)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 bool) {
|
func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
|
||||||
a.s.cluster.UpdateAttributes(
|
a.s.cluster.UpdateAttributes(
|
||||||
types.ID(r.Member_ID),
|
types.ID(r.Member_ID),
|
||||||
membership.Attributes{
|
membership.Attributes{
|
||||||
|
@ -928,7 +928,7 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 bool) {
|
func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
|
||||||
d := membership.DowngradeInfo{Enabled: false}
|
d := membership.DowngradeInfo{Enabled: false}
|
||||||
if r.Enabled {
|
if r.Enabled {
|
||||||
d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
|
d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
"go.etcd.io/etcd/pkg/v3/traceutil"
|
||||||
"go.etcd.io/etcd/server/v3/auth"
|
"go.etcd.io/etcd/server/v3/auth"
|
||||||
|
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||||
"go.etcd.io/etcd/server/v3/lease"
|
"go.etcd.io/etcd/server/v3/lease"
|
||||||
"go.etcd.io/etcd/server/v3/mvcc"
|
"go.etcd.io/etcd/server/v3/mvcc"
|
||||||
)
|
)
|
||||||
|
@ -41,7 +42,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3, lessor lease.Lessor) *a
|
||||||
return &authApplierV3{applierV3: base, as: as, lessor: lessor}
|
return &authApplierV3{applierV3: base, as: as, lessor: lessor}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 bool) *applyResult {
|
func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
|
||||||
aa.mu.Lock()
|
aa.mu.Lock()
|
||||||
defer aa.mu.Unlock()
|
defer aa.mu.Unlock()
|
||||||
if r.Header != nil {
|
if r.Header != nil {
|
||||||
|
|
|
@ -2031,12 +2031,16 @@ func (s *EtcdServer) apply(
|
||||||
s.setTerm(e.Term)
|
s.setTerm(e.Term)
|
||||||
|
|
||||||
case raftpb.EntryConfChange:
|
case raftpb.EntryConfChange:
|
||||||
|
// We need to apply all WAL entries on top of v2store
|
||||||
|
// and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
|
||||||
|
shouldApplyV3 := membership.ApplyV2storeOnly
|
||||||
|
|
||||||
// set the consistent index of current executing entry
|
// set the consistent index of current executing entry
|
||||||
shouldApplyV3 := false
|
|
||||||
if e.Index > s.consistIndex.ConsistentIndex() {
|
if e.Index > s.consistIndex.ConsistentIndex() {
|
||||||
s.consistIndex.SetConsistentIndex(e.Index)
|
s.consistIndex.SetConsistentIndex(e.Index)
|
||||||
shouldApplyV3 = true
|
shouldApplyV3 = membership.ApplyBoth
|
||||||
}
|
}
|
||||||
|
|
||||||
var cc raftpb.ConfChange
|
var cc raftpb.ConfChange
|
||||||
pbutil.MustUnmarshal(&cc, e.Data)
|
pbutil.MustUnmarshal(&cc, e.Data)
|
||||||
removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
|
removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
|
||||||
|
@ -2059,17 +2063,17 @@ func (s *EtcdServer) apply(
|
||||||
|
|
||||||
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
|
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
|
||||||
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
||||||
shouldApplyV3 := false
|
shouldApplyV3 := membership.ApplyV2storeOnly
|
||||||
index := s.consistIndex.ConsistentIndex()
|
index := s.consistIndex.ConsistentIndex()
|
||||||
if e.Index > index {
|
if e.Index > index {
|
||||||
// set the consistent index of current executing entry
|
// set the consistent index of current executing entry
|
||||||
s.consistIndex.SetConsistentIndex(e.Index)
|
s.consistIndex.SetConsistentIndex(e.Index)
|
||||||
shouldApplyV3 = true
|
shouldApplyV3 = membership.ApplyBoth
|
||||||
}
|
}
|
||||||
s.lg.Debug("apply entry normal",
|
s.lg.Debug("apply entry normal",
|
||||||
zap.Uint64("consistent-index", index),
|
zap.Uint64("consistent-index", index),
|
||||||
zap.Uint64("entry-index", e.Index),
|
zap.Uint64("entry-index", e.Index),
|
||||||
zap.Bool("should-applyV3", shouldApplyV3))
|
zap.Bool("should-applyV3", bool(shouldApplyV3)))
|
||||||
|
|
||||||
// raft state machine may generate noop entry when leader confirmation.
|
// raft state machine may generate noop entry when leader confirmation.
|
||||||
// skip it in advance to avoid some potential bug in the future
|
// skip it in advance to avoid some potential bug in the future
|
||||||
|
@ -2151,7 +2155,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
||||||
|
|
||||||
// applyConfChange applies a ConfChange to the server. It is only
|
// applyConfChange applies a ConfChange to the server. It is only
|
||||||
// invoked with a ConfChange that has already passed through Raft
|
// invoked with a ConfChange that has already passed through Raft
|
||||||
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 bool) (bool, error) {
|
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
|
||||||
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
|
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
|
||||||
cc.NodeID = raft.None
|
cc.NodeID = raft.None
|
||||||
s.r.ApplyConfChange(cc)
|
s.r.ApplyConfChange(cc)
|
||||||
|
|
Loading…
Reference in New Issue