From 596779400900221aedac7e6d3610fb03b9274578 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 11 Nov 2014 11:46:49 -0800 Subject: [PATCH] *: support updating advertise-peer-url Users might want to update the peerurl of the etcd member in several cases. For example, if the IP address of the physical machine etcd running on is changed, user need to update the adversite-pee-rurl accordingly. This commit makes etcd support updating the advertise-peer-url of its members. --- etcdserver/cluster.go | 45 +++++- etcdserver/cluster_test.go | 55 ++++++- etcdserver/etcdhttp/client.go | 91 +++++++---- etcdserver/etcdhttp/client_test.go | 193 +++++++++++++++++++++++- etcdserver/etcdhttp/http_test.go | 3 + etcdserver/etcdhttp/httptypes/member.go | 4 + etcdserver/sender.go | 23 +++ etcdserver/server.go | 32 +++- etcdserver/server_test.go | 39 ++++- raft/node.go | 2 + raft/raftpb/raft.pb.go | 3 + raft/raftpb/raft.proto | 1 + 12 files changed, 452 insertions(+), 39 deletions(-) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 461488f51..362b91569 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -263,12 +263,13 @@ func (c *Cluster) SetStore(st store.Store) { c.store = st } // ensures that it is still valid. func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { members, removed := membersFromStore(c.store) - if removed[types.ID(cc.NodeID)] { + id := types.ID(cc.NodeID) + if removed[id] { return ErrIDRemoved } switch cc.Type { case raftpb.ConfChangeAddNode: - if members[types.ID(cc.NodeID)] != nil { + if members[id] != nil { return ErrIDExists } urls := make(map[string]bool) @@ -287,11 +288,33 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { } } case raftpb.ConfChangeRemoveNode: - if members[types.ID(cc.NodeID)] == nil { + if members[id] == nil { return ErrIDNotFound } + case raftpb.ConfChangeUpdateNode: + if members[id] == nil { + return ErrIDNotFound + } + urls := make(map[string]bool) + for _, m := range members { + if m.ID == id { + continue + } + for _, u := range m.PeerURLs { + urls[u] = true + } + } + m := new(Member) + if err := json.Unmarshal(cc.Context, m); err != nil { + log.Panicf("unmarshal member should never fail: %v", err) + } + for _, u := range m.PeerURLs { + if urls[u] { + return ErrPeerURLexists + } + } default: - log.Panicf("ConfChange type should be either AddNode or RemoveNode") + log.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode") } return nil } @@ -341,6 +364,20 @@ func (c *Cluster) UpdateMemberAttributes(id types.ID, attr Attributes) { c.members[id].Attributes = attr } +func (c *Cluster) UpdateMember(nm *Member) { + c.Lock() + defer c.Unlock() + b, err := json.Marshal(nm.RaftAttributes) + if err != nil { + log.Panicf("marshal raftAttributes should never fail: %v", err) + } + p := path.Join(memberStoreKey(nm.ID), raftAttributesSuffix) + if _, err := c.store.Update(p, string(b), store.Permanent); err != nil { + log.Panicf("update raftAttributes should never fail: %v", err) + } + c.members[nm.ID].RaftAttributes = nm.RaftAttributes +} + // nodeToMember builds member through a store node. // the child nodes of the given node should be sorted by key. func nodeToMember(n *store.NodeExtern) (*Member, error) { diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 101d84e8d..08c34a9b5 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -362,7 +362,25 @@ func TestClusterValidateConfigurationChange(t *testing.T) { cl.RemoveMember(4) attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}} - cxt, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) + ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}} + ctx5, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 3)}} + ctx2to3, err := json.Marshal(&Member{ID: types.ID(2), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}} + ctx2to5, err := json.Marshal(&Member{ID: types.ID(2), RaftAttributes: attr}) if err != nil { t.Fatal(err) } @@ -403,7 +421,7 @@ func TestClusterValidateConfigurationChange(t *testing.T) { raftpb.ConfChange{ Type: raftpb.ConfChangeAddNode, NodeID: 5, - Context: cxt, + Context: ctx, }, ErrPeerURLexists, }, @@ -414,6 +432,39 @@ func TestClusterValidateConfigurationChange(t *testing.T) { }, ErrIDNotFound, }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: 5, + Context: ctx5, + }, + nil, + }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: 5, + Context: ctx, + }, + ErrIDNotFound, + }, + // try to change the peer url of 2 to the peer url of 3 + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: 2, + Context: ctx2to3, + }, + ErrPeerURLexists, + }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: 2, + Context: ctx2to5, + }, + nil, + }, } for i, tt := range tests { err := cl.ValidateConfigurationChange(tt.cc) diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go index 576c61431..e9e47ed7a 100644 --- a/etcdserver/etcdhttp/client.go +++ b/etcdserver/etcdhttp/client.go @@ -148,7 +148,7 @@ type membersHandler struct { } func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET", "POST", "DELETE") { + if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") { return } w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String()) @@ -168,25 +168,13 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Printf("etcdhttp: %v", err) } case "POST": - ctype := r.Header.Get("Content-Type") - if ctype != "application/json" { - writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype))) - return - } - b, err := ioutil.ReadAll(r.Body) - if err != nil { - writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) - return - } req := httptypes.MemberCreateRequest{} - if err := json.Unmarshal(b, &req); err != nil { - writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) + if ok := unmarshalRequest(r, &req, w); !ok { return } - now := h.clock.Now() m := etcdserver.NewMember("", req.PeerURLs, "", &now) - err = h.server.AddMember(ctx, *m) + err := h.server.AddMember(ctx, *m) switch { case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists: writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error())) @@ -203,28 +191,47 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Printf("etcdhttp: %v", err) } case "DELETE": - idStr := trimPrefix(r.URL.Path, membersPrefix) - if idStr == "" { - http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + id, ok := getID(r.URL.Path, w) + if !ok { return } - id, err := types.IDFromString(idStr) - if err != nil { - writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) - return - } - err = h.server.RemoveMember(ctx, uint64(id)) + err := h.server.RemoveMember(ctx, uint64(id)) switch { case err == etcdserver.ErrIDRemoved: - writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", idStr))) + writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id))) case err == etcdserver.ErrIDNotFound: - writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) + writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) case err != nil: log.Printf("etcdhttp: error removing node %s: %v", id, err) writeError(w, err) default: w.WriteHeader(http.StatusNoContent) } + case "PUT": + id, ok := getID(r.URL.Path, w) + if !ok { + return + } + req := httptypes.MemberUpdateRequest{} + if ok := unmarshalRequest(r, &req, w); !ok { + return + } + m := etcdserver.Member{ + ID: id, + RaftAttributes: etcdserver.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()}, + } + err := h.server.UpdateMember(ctx, m) + switch { + case err == etcdserver.ErrPeerURLexists: + writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error())) + case err == etcdserver.ErrIDNotFound: + writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) + case err != nil: + log.Printf("etcdhttp: error updating node %s: %v", m.ID, err) + writeError(w, err) + default: + w.WriteHeader(http.StatusNoContent) + } } } @@ -506,6 +513,38 @@ func trimErrorPrefix(err error, prefix string) error { return err } +func unmarshalRequest(r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool { + ctype := r.Header.Get("Content-Type") + if ctype != "application/json" { + writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype))) + return false + } + b, err := ioutil.ReadAll(r.Body) + if err != nil { + writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) + return false + } + if err := req.UnmarshalJSON(b); err != nil { + writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) + return false + } + return true +} + +func getID(p string, w http.ResponseWriter) (types.ID, bool) { + idStr := trimPrefix(p, membersPrefix) + if idStr == "" { + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return 0, false + } + id, err := types.IDFromString(idStr) + if err != nil { + writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) + return 0, false + } + return id, true +} + // getUint64 extracts a uint64 by the given key from a Form. If the key does // not exist in the form, 0 is returned. If the key exists but the value is // badly formed, an error is returned. If multiple values are present only the diff --git a/etcdserver/etcdhttp/client_test.go b/etcdserver/etcdhttp/client_test.go index 78e0a4e0a..bf4f21e6c 100644 --- a/etcdserver/etcdhttp/client_test.go +++ b/etcdserver/etcdhttp/client_test.go @@ -111,6 +111,11 @@ func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error { return nil } +func (s *serverRecorder) UpdateMember(_ context.Context, m etcdserver.Member) error { + s.actions = append(s.actions, action{name: "UpdateMember", params: []interface{}{m}}) + return nil +} + type action struct { name string params []interface{} @@ -136,11 +141,12 @@ type resServer struct { func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) { return rs.res, nil } -func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } -func (rs *resServer) Start() {} -func (rs *resServer) Stop() {} -func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } -func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } +func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } +func (rs *resServer) Start() {} +func (rs *resServer) Stop() {} +func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } +func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } +func (rs *resServer) UpdateMember(_ context.Context, _ etcdserver.Member) error { return nil } func boolp(b bool) *bool { return &b } @@ -698,6 +704,48 @@ func TestServeMembersDelete(t *testing.T) { } } +func TestServeMembersUpdate(t *testing.T) { + u := mustNewURL(t, path.Join(membersPrefix, "1")) + b := []byte(`{"peerURLs":["http://127.0.0.1:1"]}`) + req, err := http.NewRequest("PUT", u.String(), bytes.NewReader(b)) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Content-Type", "application/json") + s := &serverRecorder{} + h := &membersHandler{ + server: s, + clock: clockwork.NewFakeClock(), + clusterInfo: &fakeCluster{id: 1}, + } + rw := httptest.NewRecorder() + + h.ServeHTTP(rw, req) + + wcode := http.StatusNoContent + if rw.Code != wcode { + t.Errorf("code=%d, want %d", rw.Code, wcode) + } + + gcid := rw.Header().Get("X-Etcd-Cluster-ID") + wcid := h.clusterInfo.ID().String() + if gcid != wcid { + t.Errorf("cid = %s, want %s", gcid, wcid) + } + + wm := etcdserver.Member{ + ID: 1, + RaftAttributes: etcdserver.RaftAttributes{ + PeerURLs: []string{"http://127.0.0.1:1"}, + }, + } + + wactions := []action{{name: "UpdateMember", params: []interface{}{wm}}} + if !reflect.DeepEqual(s.actions, wactions) { + t.Errorf("actions = %+v, want %+v", s.actions, wactions) + } +} + func TestServeMembersFail(t *testing.T) { tests := []struct { req *http.Request @@ -855,6 +903,104 @@ func TestServeMembersFail(t *testing.T) { }, nil, + http.StatusMethodNotAllowed, + }, + { + // parse body error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader("bad json")), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &resServer{}, + + http.StatusBadRequest, + }, + { + // bad content type + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/bad"}}, + }, + &errServer{}, + + http.StatusUnsupportedMediaType, + }, + { + // bad url + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://a"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{}, + + http.StatusBadRequest, + }, + { + // etcdserver.UpdateMember error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + errors.New("blah"), + }, + + http.StatusInternalServerError, + }, + { + // etcdserver.UpdateMember error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + etcdserver.ErrPeerURLexists, + }, + + http.StatusConflict, + }, + { + // etcdserver.UpdateMember error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + etcdserver.ErrIDNotFound, + }, + + http.StatusNotFound, + }, + { + // etcdserver.UpdateMember error with badly formed ID + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "bad_id")), + Method: "PUT", + }, + nil, + + http.StatusNotFound, + }, + { + // etcdserver.UpdateMember with no ID + &http.Request{ + URL: mustNewURL(t, membersPrefix), + Method: "PUT", + }, + nil, + http.StatusMethodNotAllowed, }, } @@ -995,6 +1141,43 @@ func TestServeMachines(t *testing.T) { } } +func TestGetID(t *testing.T) { + tests := []struct { + path string + + wok bool + wid types.ID + wcode int + }{ + { + "123", + true, 0x123, http.StatusOK, + }, + { + "bad_id", + false, 0, http.StatusNotFound, + }, + { + "", + false, 0, http.StatusMethodNotAllowed, + }, + } + + for i, tt := range tests { + w := httptest.NewRecorder() + id, ok := getID(tt.path, w) + if id != tt.wid { + t.Errorf("#%d: id = %d, want %d", i, id, tt.wid) + } + if ok != tt.wok { + t.Errorf("#%d: ok = %t, want %t", i, ok, tt.wok) + } + if w.Code != tt.wcode { + t.Errorf("#%d code = %d, want %d", i, w.Code, tt.wcode) + } + } +} + type dummyStats struct { data []byte } diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 476a10725..9813ed47d 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -79,6 +79,9 @@ func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error { func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error { return fs.err } +func (fs *errServer) UpdateMember(ctx context.Context, m etcdserver.Member) error { + return fs.err +} func TestWriteError(t *testing.T) { // nil error should not panic diff --git a/etcdserver/etcdhttp/httptypes/member.go b/etcdserver/etcdhttp/httptypes/member.go index 6d8554ed0..a71457bcb 100644 --- a/etcdserver/etcdhttp/httptypes/member.go +++ b/etcdserver/etcdhttp/httptypes/member.go @@ -33,6 +33,10 @@ type MemberCreateRequest struct { PeerURLs types.URLs } +type MemberUpdateRequest struct { + MemberCreateRequest +} + func (m *MemberCreateRequest) MarshalJSON() ([]byte, error) { s := struct { PeerURLs []string `json:"peerURLs"` diff --git a/etcdserver/sender.go b/etcdserver/sender.go index dbe846f76..b2512057c 100644 --- a/etcdserver/sender.go +++ b/etcdserver/sender.go @@ -21,6 +21,9 @@ import ( "fmt" "log" "net/http" + "net/url" + "path" + "sync" "time" "github.com/coreos/etcd/etcdserver/stats" @@ -108,12 +111,30 @@ func (h *sendHub) Remove(id types.ID) { delete(h.senders, id) } +func (h *sendHub) Update(m *Member) { + // TODO: return error or just panic? + if _, ok := h.senders[m.ID]; !ok { + return + } + peerURL := m.PickPeerURL() + u, err := url.Parse(peerURL) + if err != nil { + log.Panicf("unexpect peer url %s", peerURL) + } + u.Path = path.Join(u.Path, raftPrefix) + s := h.senders[m.ID] + s.mu.Lock() + defer s.mu.Unlock() + s.u = u.String() +} + type sender struct { u string cid types.ID c *http.Client fs *stats.FollowerStats q chan []byte + mu sync.RWMutex } func newSender(u string, cid types.ID, c *http.Client, fs *stats.FollowerStats) *sender { @@ -159,7 +180,9 @@ func (s *sender) handle() { // post POSTs a data payload to a url. Returns nil if the POST succeeds, // error on any failure. func (s *sender) post(data []byte) error { + s.mu.RLock() req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data)) + s.mu.RUnlock() if err != nil { return fmt.Errorf("new request to %s error: %v", s.u, err) } diff --git a/etcdserver/server.go b/etcdserver/server.go index a33f3c227..19072c765 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -89,6 +89,7 @@ type Sender interface { Send(m []raftpb.Message) Add(m *Member) Remove(id types.ID) + Update(m *Member) Stop() } @@ -114,7 +115,7 @@ type Server interface { // Stop terminates the Server and performs any necessary finalization. // Do and Process cannot be called after Stop has been invoked. Stop() - // Do takes a request and attempts to fulfil it, returning a Response. + // Do takes a request and attempts to fulfill it, returning a Response. Do(ctx context.Context, r pb.Request) (Response, error) // Process takes a raft message and applies it to the server's raft state // machine, respecting any timeout of the given context. @@ -127,6 +128,10 @@ type Server interface { // return ErrIDRemoved if member ID is removed from the cluster, or return // ErrIDNotFound if member ID is not in the cluster. RemoveMember(ctx context.Context, id uint64) error + + // UpdateMember attempts to update a existing member in the cluster. It will + // return ErrIDNotFound if the member ID does not exist. + UpdateMember(ctx context.Context, updateMemb Member) error } type Stats interface { @@ -475,6 +480,20 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error { return s.configure(ctx, cc) } +func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error { + b, err := json.Marshal(memb) + if err != nil { + return err + } + cc := raftpb.ConfChange{ + ID: GenID(), + Type: raftpb.ConfChangeUpdateNode, + NodeID: uint64(memb.ID), + Context: b, + } + return s.configure(ctx, cc) +} + // Implement the RaftTimer interface func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.raftIndex) @@ -672,6 +691,17 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { s.Cluster.RemoveMember(id) s.sender.Remove(id) log.Printf("etcdserver: removed node %s from cluster %s", id, s.Cluster.ID()) + case raftpb.ConfChangeUpdateNode: + m := new(Member) + if err := json.Unmarshal(cc.Context, m); err != nil { + log.Panicf("unmarshal member should never fail: %v", err) + } + if cc.NodeID != uint64(m.ID) { + log.Panicf("nodeID should always be equal to member ID") + } + s.Cluster.UpdateMember(m) + s.sender.Update(m) + log.Printf("etcdserver: update node %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } return nil } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 5361b264e..566e8cc21 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -447,7 +447,7 @@ func TestApplyConfChangeError(t *testing.T) { }, { raftpb.ConfChange{ - Type: raftpb.ConfChangeRemoveNode, + Type: raftpb.ConfChangeUpdateNode, NodeID: 4, }, ErrIDRemoved, @@ -503,6 +503,7 @@ func (s *fakeSender) Send(msgs []raftpb.Message) { } } func (s *fakeSender) Add(m *Member) {} +func (s *fakeSender) Update(m *Member) {} func (s *fakeSender) Remove(id types.ID) {} func (s *fakeSender) Stop() {} @@ -1017,6 +1018,41 @@ func TestRemoveMember(t *testing.T) { } } +// TestUpdateMember tests RemoveMember can propose and perform node update. +func TestUpdateMember(t *testing.T) { + n := newNodeConfChangeCommitterRecorder() + n.readyc <- raft.Ready{ + SoftState: &raft.SoftState{ + RaftState: raft.StateLeader, + Nodes: []uint64{1234, 2345, 3456}, + }, + } + cl := newTestCluster([]*Member{{ID: 1234}}) + s := &EtcdServer{ + node: n, + store: &storeRecorder{}, + sender: &nopSender{}, + storage: &storageRecorder{}, + Cluster: cl, + } + s.start() + wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} + err := s.UpdateMember(context.TODO(), wm) + gaction := n.Action() + s.Stop() + + if err != nil { + t.Fatalf("UpdateMember error: %v", err) + } + wactions := []action{action{name: "ProposeConfChange:ConfChangeUpdateNode"}, action{name: "ApplyConfChange:ConfChangeUpdateNode"}} + if !reflect.DeepEqual(gaction, wactions) { + t.Errorf("action = %v, want %v", gaction, wactions) + } + if !reflect.DeepEqual(cl.Member(1234), &wm) { + t.Errorf("member = %v, want %v", cl.Member(1234), &wm) + } +} + // TODO: test server could stop itself when being removed // TODO: test wait trigger correctness in multi-server case @@ -1446,6 +1482,7 @@ type nopSender struct{} func (s *nopSender) Send(m []raftpb.Message) {} func (s *nopSender) Add(m *Member) {} func (s *nopSender) Remove(id types.ID) {} +func (s *nopSender) Update(m *Member) {} func (s *nopSender) Stop() {} func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer { diff --git a/raft/node.go b/raft/node.go index df299c56a..a6f269826 100644 --- a/raft/node.go +++ b/raft/node.go @@ -271,6 +271,8 @@ func (n *node) run(r *raft) { r.addNode(cc.NodeID) case pb.ConfChangeRemoveNode: r.removeNode(cc.NodeID) + case pb.ConfChangeUpdateNode: + r.resetPendingConf() default: panic("unexpected conf type") } diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index db726576c..03bdb8c83 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -120,15 +120,18 @@ type ConfChangeType int32 const ( ConfChangeAddNode ConfChangeType = 0 ConfChangeRemoveNode ConfChangeType = 1 + ConfChangeUpdateNode ConfChangeType = 2 ) var ConfChangeType_name = map[int32]string{ 0: "ConfChangeAddNode", 1: "ConfChangeRemoveNode", + 2: "ConfChangeUpdateNode", } var ConfChangeType_value = map[string]int32{ "ConfChangeAddNode": 0, "ConfChangeRemoveNode": 1, + "ConfChangeUpdateNode": 2, } func (x ConfChangeType) Enum() *ConfChangeType { diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 47c9ec2ef..7b60393b8 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -60,6 +60,7 @@ message HardState { enum ConfChangeType { ConfChangeAddNode = 0; ConfChangeRemoveNode = 1; + ConfChangeUpdateNode = 2; } message ConfChange {