From ba9fd620e89254f60c18b8fd3383fafb65ee183d Mon Sep 17 00:00:00 2001 From: WizardCXY Date: Thu, 21 Mar 2019 18:06:26 +0800 Subject: [PATCH] etcdserver: support MemberPromote for learner --- etcdserver/api/membership/cluster.go | 30 ++++++++++--- etcdserver/api/membership/errors.go | 9 ++-- etcdserver/api/v2v3/server.go | 8 ++++ etcdserver/api/v3rpc/member.go | 8 ++-- etcdserver/api/v3rpc/rpctypes/error.go | 1 + etcdserver/api/v3rpc/util.go | 1 + etcdserver/server.go | 61 +++++++++++++++++++++++++- etcdserver/server_test.go | 48 ++++++++++++++++++++ 8 files changed, 151 insertions(+), 15 deletions(-) diff --git a/etcdserver/api/membership/cluster.go b/etcdserver/api/membership/cluster.go index bfe250cb5..a8e55aacc 100644 --- a/etcdserver/api/membership/cluster.go +++ b/etcdserver/api/membership/cluster.go @@ -252,6 +252,16 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { } } +// IsPromoteChange checks if m is a promoteChange +func (c *RaftCluster) IsPromoteChange(m *Member) bool { + members, _ := membersFromStore(c.lg, c.v2store) + + if members[m.ID] != nil && members[m.ID].IsLearner && !m.IsLearner { + return true + } + return false +} + // ValidateConfigurationChange takes a proposed ConfChange and // ensures that it is still valid. func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { @@ -262,9 +272,6 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { } switch cc.Type { case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode: - if members[id] != nil { - return ErrIDExists - } urls := make(map[string]bool) for _, m := range members { for _, u := range m.PeerURLs { @@ -279,12 +286,21 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { plog.Panicf("unmarshal member should never fail: %v", err) } } - for _, u := range m.PeerURLs { - if urls[u] { - return ErrPeerURLexists + + if members[id] != nil && members[id].IsLearner && cc.Type == raftpb.ConfChangeAddNode { + // TODO promote a learner node case check + } else { + // add a member leanrner or a follower case + if members[id] != nil { + return ErrIDExists + } + + for _, u := range m.PeerURLs { + if urls[u] { + return ErrPeerURLexists + } } } - case raftpb.ConfChangeRemoveNode: if members[id] == nil { return ErrIDNotFound diff --git a/etcdserver/api/membership/errors.go b/etcdserver/api/membership/errors.go index a92f9582b..f34e8849a 100644 --- a/etcdserver/api/membership/errors.go +++ b/etcdserver/api/membership/errors.go @@ -21,10 +21,11 @@ import ( ) var ( - ErrIDRemoved = errors.New("membership: ID removed") - ErrIDExists = errors.New("membership: ID exists") - ErrIDNotFound = errors.New("membership: ID not found") - ErrPeerURLexists = errors.New("membership: peerURL exists") + ErrIDRemoved = errors.New("membership: ID removed") + ErrIDExists = errors.New("membership: ID exists") + ErrIDNotFound = errors.New("membership: ID not found") + ErrPeerURLexists = errors.New("membership: peerURL exists") + ErrPromotionFailed = errors.New("membership: promotion failed") ) func isKeyNotFound(err error) bool { diff --git a/etcdserver/api/v2v3/server.go b/etcdserver/api/v2v3/server.go index fa9d66d75..0093f6e0a 100644 --- a/etcdserver/api/v2v3/server.go +++ b/etcdserver/api/v2v3/server.go @@ -79,6 +79,14 @@ func (s *v2v3Server) RemoveMember(ctx context.Context, id uint64) ([]*membership return v3MembersToMembership(resp.Members), nil } +func (s *v2v3Server) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) { + resp, err := s.c.MemberPromote(ctx, id) + if err != nil { + return nil, err + } + return v3MembersToMembership(resp.Members), nil +} + func (s *v2v3Server) UpdateMember(ctx context.Context, m membership.Member) ([]*membership.Member, error) { resp, err := s.c.MemberUpdate(ctx, uint64(m.ID), m.PeerURLs) if err != nil { diff --git a/etcdserver/api/v3rpc/member.go b/etcdserver/api/v3rpc/member.go index 16813836d..d749f47db 100644 --- a/etcdserver/api/v3rpc/member.go +++ b/etcdserver/api/v3rpc/member.go @@ -16,7 +16,6 @@ package v3rpc import ( "context" - "errors" "time" "go.etcd.io/etcd/v3/etcdserver" @@ -94,8 +93,11 @@ func (cs *ClusterServer) MemberList(ctx context.Context, r *pb.MemberListRequest } func (cs *ClusterServer) MemberPromote(ctx context.Context, r *pb.MemberPromoteRequest) (*pb.MemberPromoteResponse, error) { - // TODO: implement - return nil, errors.New("not implemented") + membs, err := cs.server.PromoteMember(ctx, r.ID) + if err != nil { + return nil, togRPCError(err) + } + return &pb.MemberPromoteResponse{Header: cs.header(), Members: membersToProtoMembers(membs)}, nil } func (cs *ClusterServer) header() *pb.ResponseHeader { diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 3d1ee11b0..e2431318c 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -40,6 +40,7 @@ var ( ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err() ErrGRPCMemberBadURLs = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err() ErrGRPCMemberNotFound = status.New(codes.NotFound, "etcdserver: member not found").Err() + ErrGRPCMemberPromtotionFailed = status.New(codes.FailedPrecondition, "etcdserver: learner member promotion failed").Err() ErrGRPCRequestTooLarge = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err() ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err() diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index 37443406e..6bbed1c45 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -35,6 +35,7 @@ var toGRPCErrorMap = map[error]error{ membership.ErrIDNotFound: rpctypes.ErrGRPCMemberNotFound, membership.ErrIDExists: rpctypes.ErrGRPCMemberExist, membership.ErrPeerURLexists: rpctypes.ErrGRPCPeerURLExist, + membership.ErrPromotionFailed: rpctypes.ErrGRPCMemberPromtotionFailed, etcdserver.ErrNotEnoughStartedMembers: rpctypes.ErrMemberNotEnoughStarted, mvcc.ErrCompacted: rpctypes.ErrGRPCCompacted, diff --git a/etcdserver/server.go b/etcdserver/server.go index 423a6e96c..3b4e5f886 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -156,6 +156,10 @@ type Server interface { // UpdateMember attempts to update an existing member in the cluster. It will // return ErrIDNotFound if the member ID does not exist. UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error) + // PromoteMember attempts to promote a non-voting node to a voting node. It will + // return ErrIDNotFound if the member ID does not exist. + // return ErrPromotionFailed if the member can't be promoted. + PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) // ClusterVersion is the cluster-wide minimum major.minor version. // Cluster version is set to the min version that an etcd member is @@ -1611,6 +1615,56 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership return s.configure(ctx, cc) } +// PromoteMember promotes a learner node to a voting node. +func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) { + if err := s.checkMembershipOperationPermission(ctx); err != nil { + return nil, err + } + + if err := s.mayPromoteMember(types.ID(id)); err != nil { + return nil, err + } + + var memb membership.Member + members := s.cluster.Members() + isExist := false + for _, member := range members { + if uint64(member.ID) == id { + memb = *member + isExist = true + break + } + } + + if !isExist { + return nil, membership.ErrIDNotFound + } + memb.IsLearner = false + + b, err := json.Marshal(memb) + if err != nil { + return nil, err + } + + cc := raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: id, + Context: b, + } + + return s.configure(ctx, cc) +} + +func (s *EtcdServer) mayPromoteMember(id types.ID) error { + if !s.Cfg.StrictReconfigCheck { + return nil + } + // TODO add more checks whether the member can be promoted. + // like learner progress check or if cluster is ready to promote a learner + + return nil +} + func (s *EtcdServer) mayRemoveMember(id types.ID) error { if !s.Cfg.StrictReconfigCheck { return nil @@ -2080,7 +2134,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con plog.Panicf("nodeID should always be equal to member ID") } } - s.cluster.AddMember(m) + if s.cluster.IsPromoteChange(m) { + s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) + } else { + s.cluster.AddMember(m) + } + if m.ID != s.id { s.r.transport.AddPeer(m.ID, m.PeerURLs) } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index a8dbbb333..dbdeb1dbf 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1318,6 +1318,54 @@ func TestRemoveMember(t *testing.T) { } } +// TestPromoteMember tests PromoteMember can propose and perform learner node promotion. +func TestPromoteMember(t *testing.T) { + n := newNodeConfChangeCommitterRecorder() + n.readyc <- raft.Ready{ + SoftState: &raft.SoftState{RaftState: raft.StateLeader}, + } + cl := newTestCluster(nil) + st := v2store.New() + cl.SetStore(v2store.New()) + cl.AddMember(&membership.Member{ + ID: 1234, + RaftAttributes: membership.RaftAttributes{ + IsLearner: true, + }, + }) + r := newRaftNode(raftNodeConfig{ + lg: zap.NewExample(), + Node: n, + raftStorage: raft.NewMemoryStorage(), + storage: mockstorage.NewStorageRecorder(""), + transport: newNopTransporter(), + }) + s := &EtcdServer{ + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + r: *r, + v2store: st, + cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + } + s.start() + _, err := s.PromoteMember(context.TODO(), 1234) + gaction := n.Action() + s.Stop() + + if err != nil { + t.Fatalf("PromoteMember error: %v", err) + } + wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeAddNode"}, {Name: "ApplyConfChange:ConfChangeAddNode"}} + if !reflect.DeepEqual(gaction, wactions) { + t.Errorf("action = %v, want %v", gaction, wactions) + } + if cl.Member(1234).IsLearner == true { + t.Errorf("member with id 1234 is not promoted") + } +} + // TestUpdateMember tests RemoveMember can propose and perform node update. func TestUpdateMember(t *testing.T) { n := newNodeConfChangeCommitterRecorder()