diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 461488f51..362b91569 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -263,12 +263,13 @@ func (c *Cluster) SetStore(st store.Store) { c.store = st } // ensures that it is still valid. func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { members, removed := membersFromStore(c.store) - if removed[types.ID(cc.NodeID)] { + id := types.ID(cc.NodeID) + if removed[id] { return ErrIDRemoved } switch cc.Type { case raftpb.ConfChangeAddNode: - if members[types.ID(cc.NodeID)] != nil { + if members[id] != nil { return ErrIDExists } urls := make(map[string]bool) @@ -287,11 +288,33 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { } } case raftpb.ConfChangeRemoveNode: - if members[types.ID(cc.NodeID)] == nil { + if members[id] == nil { return ErrIDNotFound } + case raftpb.ConfChangeUpdateNode: + if members[id] == nil { + return ErrIDNotFound + } + urls := make(map[string]bool) + for _, m := range members { + if m.ID == id { + continue + } + for _, u := range m.PeerURLs { + urls[u] = true + } + } + m := new(Member) + if err := json.Unmarshal(cc.Context, m); err != nil { + log.Panicf("unmarshal member should never fail: %v", err) + } + for _, u := range m.PeerURLs { + if urls[u] { + return ErrPeerURLexists + } + } default: - log.Panicf("ConfChange type should be either AddNode or RemoveNode") + log.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode") } return nil } @@ -341,6 +364,20 @@ func (c *Cluster) UpdateMemberAttributes(id types.ID, attr Attributes) { c.members[id].Attributes = attr } +func (c *Cluster) UpdateMember(nm *Member) { + c.Lock() + defer c.Unlock() + b, err := json.Marshal(nm.RaftAttributes) + if err != nil { + log.Panicf("marshal raftAttributes should never fail: %v", err) + } + p := path.Join(memberStoreKey(nm.ID), raftAttributesSuffix) + if _, err := c.store.Update(p, string(b), store.Permanent); err != nil { + log.Panicf("update raftAttributes should never fail: %v", err) + } + c.members[nm.ID].RaftAttributes = nm.RaftAttributes +} + // nodeToMember builds member through a store node. // the child nodes of the given node should be sorted by key. func nodeToMember(n *store.NodeExtern) (*Member, error) { diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 101d84e8d..08c34a9b5 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -362,7 +362,25 @@ func TestClusterValidateConfigurationChange(t *testing.T) { cl.RemoveMember(4) attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}} - cxt, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) + ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}} + ctx5, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 3)}} + ctx2to3, err := json.Marshal(&Member{ID: types.ID(2), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}} + ctx2to5, err := json.Marshal(&Member{ID: types.ID(2), RaftAttributes: attr}) if err != nil { t.Fatal(err) } @@ -403,7 +421,7 @@ func TestClusterValidateConfigurationChange(t *testing.T) { raftpb.ConfChange{ Type: raftpb.ConfChangeAddNode, NodeID: 5, - Context: cxt, + Context: ctx, }, ErrPeerURLexists, }, @@ -414,6 +432,39 @@ func TestClusterValidateConfigurationChange(t *testing.T) { }, ErrIDNotFound, }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: 5, + Context: ctx5, + }, + nil, + }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: 5, + Context: ctx, + }, + ErrIDNotFound, + }, + // try to change the peer url of 2 to the peer url of 3 + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: 2, + Context: ctx2to3, + }, + ErrPeerURLexists, + }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: 2, + Context: ctx2to5, + }, + nil, + }, } for i, tt := range tests { err := cl.ValidateConfigurationChange(tt.cc) diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go index 576c61431..e9e47ed7a 100644 --- a/etcdserver/etcdhttp/client.go +++ b/etcdserver/etcdhttp/client.go @@ -148,7 +148,7 @@ type membersHandler struct { } func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "GET", "POST", "DELETE") { + if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") { return } w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String()) @@ -168,25 +168,13 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Printf("etcdhttp: %v", err) } case "POST": - ctype := r.Header.Get("Content-Type") - if ctype != "application/json" { - writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype))) - return - } - b, err := ioutil.ReadAll(r.Body) - if err != nil { - writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) - return - } req := httptypes.MemberCreateRequest{} - if err := json.Unmarshal(b, &req); err != nil { - writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) + if ok := unmarshalRequest(r, &req, w); !ok { return } - now := h.clock.Now() m := etcdserver.NewMember("", req.PeerURLs, "", &now) - err = h.server.AddMember(ctx, *m) + err := h.server.AddMember(ctx, *m) switch { case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists: writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error())) @@ -203,28 +191,47 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Printf("etcdhttp: %v", err) } case "DELETE": - idStr := trimPrefix(r.URL.Path, membersPrefix) - if idStr == "" { - http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + id, ok := getID(r.URL.Path, w) + if !ok { return } - id, err := types.IDFromString(idStr) - if err != nil { - writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) - return - } - err = h.server.RemoveMember(ctx, uint64(id)) + err := h.server.RemoveMember(ctx, uint64(id)) switch { case err == etcdserver.ErrIDRemoved: - writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", idStr))) + writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id))) case err == etcdserver.ErrIDNotFound: - writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) + writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) case err != nil: log.Printf("etcdhttp: error removing node %s: %v", id, err) writeError(w, err) default: w.WriteHeader(http.StatusNoContent) } + case "PUT": + id, ok := getID(r.URL.Path, w) + if !ok { + return + } + req := httptypes.MemberUpdateRequest{} + if ok := unmarshalRequest(r, &req, w); !ok { + return + } + m := etcdserver.Member{ + ID: id, + RaftAttributes: etcdserver.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()}, + } + err := h.server.UpdateMember(ctx, m) + switch { + case err == etcdserver.ErrPeerURLexists: + writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error())) + case err == etcdserver.ErrIDNotFound: + writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) + case err != nil: + log.Printf("etcdhttp: error updating node %s: %v", m.ID, err) + writeError(w, err) + default: + w.WriteHeader(http.StatusNoContent) + } } } @@ -506,6 +513,38 @@ func trimErrorPrefix(err error, prefix string) error { return err } +func unmarshalRequest(r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool { + ctype := r.Header.Get("Content-Type") + if ctype != "application/json" { + writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype))) + return false + } + b, err := ioutil.ReadAll(r.Body) + if err != nil { + writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) + return false + } + if err := req.UnmarshalJSON(b); err != nil { + writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) + return false + } + return true +} + +func getID(p string, w http.ResponseWriter) (types.ID, bool) { + idStr := trimPrefix(p, membersPrefix) + if idStr == "" { + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return 0, false + } + id, err := types.IDFromString(idStr) + if err != nil { + writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) + return 0, false + } + return id, true +} + // getUint64 extracts a uint64 by the given key from a Form. If the key does // not exist in the form, 0 is returned. If the key exists but the value is // badly formed, an error is returned. If multiple values are present only the diff --git a/etcdserver/etcdhttp/client_test.go b/etcdserver/etcdhttp/client_test.go index 78e0a4e0a..bf4f21e6c 100644 --- a/etcdserver/etcdhttp/client_test.go +++ b/etcdserver/etcdhttp/client_test.go @@ -111,6 +111,11 @@ func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error { return nil } +func (s *serverRecorder) UpdateMember(_ context.Context, m etcdserver.Member) error { + s.actions = append(s.actions, action{name: "UpdateMember", params: []interface{}{m}}) + return nil +} + type action struct { name string params []interface{} @@ -136,11 +141,12 @@ type resServer struct { func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) { return rs.res, nil } -func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } -func (rs *resServer) Start() {} -func (rs *resServer) Stop() {} -func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } -func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } +func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil } +func (rs *resServer) Start() {} +func (rs *resServer) Stop() {} +func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil } +func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil } +func (rs *resServer) UpdateMember(_ context.Context, _ etcdserver.Member) error { return nil } func boolp(b bool) *bool { return &b } @@ -698,6 +704,48 @@ func TestServeMembersDelete(t *testing.T) { } } +func TestServeMembersUpdate(t *testing.T) { + u := mustNewURL(t, path.Join(membersPrefix, "1")) + b := []byte(`{"peerURLs":["http://127.0.0.1:1"]}`) + req, err := http.NewRequest("PUT", u.String(), bytes.NewReader(b)) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Content-Type", "application/json") + s := &serverRecorder{} + h := &membersHandler{ + server: s, + clock: clockwork.NewFakeClock(), + clusterInfo: &fakeCluster{id: 1}, + } + rw := httptest.NewRecorder() + + h.ServeHTTP(rw, req) + + wcode := http.StatusNoContent + if rw.Code != wcode { + t.Errorf("code=%d, want %d", rw.Code, wcode) + } + + gcid := rw.Header().Get("X-Etcd-Cluster-ID") + wcid := h.clusterInfo.ID().String() + if gcid != wcid { + t.Errorf("cid = %s, want %s", gcid, wcid) + } + + wm := etcdserver.Member{ + ID: 1, + RaftAttributes: etcdserver.RaftAttributes{ + PeerURLs: []string{"http://127.0.0.1:1"}, + }, + } + + wactions := []action{{name: "UpdateMember", params: []interface{}{wm}}} + if !reflect.DeepEqual(s.actions, wactions) { + t.Errorf("actions = %+v, want %+v", s.actions, wactions) + } +} + func TestServeMembersFail(t *testing.T) { tests := []struct { req *http.Request @@ -855,6 +903,104 @@ func TestServeMembersFail(t *testing.T) { }, nil, + http.StatusMethodNotAllowed, + }, + { + // parse body error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader("bad json")), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &resServer{}, + + http.StatusBadRequest, + }, + { + // bad content type + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/bad"}}, + }, + &errServer{}, + + http.StatusUnsupportedMediaType, + }, + { + // bad url + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://a"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{}, + + http.StatusBadRequest, + }, + { + // etcdserver.UpdateMember error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + errors.New("blah"), + }, + + http.StatusInternalServerError, + }, + { + // etcdserver.UpdateMember error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + etcdserver.ErrPeerURLexists, + }, + + http.StatusConflict, + }, + { + // etcdserver.UpdateMember error + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "0")), + Method: "PUT", + Body: ioutil.NopCloser(strings.NewReader(`{"PeerURLs": ["http://127.0.0.1:1"]}`)), + Header: map[string][]string{"Content-Type": []string{"application/json"}}, + }, + &errServer{ + etcdserver.ErrIDNotFound, + }, + + http.StatusNotFound, + }, + { + // etcdserver.UpdateMember error with badly formed ID + &http.Request{ + URL: mustNewURL(t, path.Join(membersPrefix, "bad_id")), + Method: "PUT", + }, + nil, + + http.StatusNotFound, + }, + { + // etcdserver.UpdateMember with no ID + &http.Request{ + URL: mustNewURL(t, membersPrefix), + Method: "PUT", + }, + nil, + http.StatusMethodNotAllowed, }, } @@ -995,6 +1141,43 @@ func TestServeMachines(t *testing.T) { } } +func TestGetID(t *testing.T) { + tests := []struct { + path string + + wok bool + wid types.ID + wcode int + }{ + { + "123", + true, 0x123, http.StatusOK, + }, + { + "bad_id", + false, 0, http.StatusNotFound, + }, + { + "", + false, 0, http.StatusMethodNotAllowed, + }, + } + + for i, tt := range tests { + w := httptest.NewRecorder() + id, ok := getID(tt.path, w) + if id != tt.wid { + t.Errorf("#%d: id = %d, want %d", i, id, tt.wid) + } + if ok != tt.wok { + t.Errorf("#%d: ok = %t, want %t", i, ok, tt.wok) + } + if w.Code != tt.wcode { + t.Errorf("#%d code = %d, want %d", i, w.Code, tt.wcode) + } + } +} + type dummyStats struct { data []byte } diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 476a10725..9813ed47d 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -79,6 +79,9 @@ func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error { func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error { return fs.err } +func (fs *errServer) UpdateMember(ctx context.Context, m etcdserver.Member) error { + return fs.err +} func TestWriteError(t *testing.T) { // nil error should not panic diff --git a/etcdserver/etcdhttp/httptypes/member.go b/etcdserver/etcdhttp/httptypes/member.go index 6d8554ed0..a71457bcb 100644 --- a/etcdserver/etcdhttp/httptypes/member.go +++ b/etcdserver/etcdhttp/httptypes/member.go @@ -33,6 +33,10 @@ type MemberCreateRequest struct { PeerURLs types.URLs } +type MemberUpdateRequest struct { + MemberCreateRequest +} + func (m *MemberCreateRequest) MarshalJSON() ([]byte, error) { s := struct { PeerURLs []string `json:"peerURLs"` diff --git a/etcdserver/sender.go b/etcdserver/sender.go index dbe846f76..b2512057c 100644 --- a/etcdserver/sender.go +++ b/etcdserver/sender.go @@ -21,6 +21,9 @@ import ( "fmt" "log" "net/http" + "net/url" + "path" + "sync" "time" "github.com/coreos/etcd/etcdserver/stats" @@ -108,12 +111,30 @@ func (h *sendHub) Remove(id types.ID) { delete(h.senders, id) } +func (h *sendHub) Update(m *Member) { + // TODO: return error or just panic? + if _, ok := h.senders[m.ID]; !ok { + return + } + peerURL := m.PickPeerURL() + u, err := url.Parse(peerURL) + if err != nil { + log.Panicf("unexpect peer url %s", peerURL) + } + u.Path = path.Join(u.Path, raftPrefix) + s := h.senders[m.ID] + s.mu.Lock() + defer s.mu.Unlock() + s.u = u.String() +} + type sender struct { u string cid types.ID c *http.Client fs *stats.FollowerStats q chan []byte + mu sync.RWMutex } func newSender(u string, cid types.ID, c *http.Client, fs *stats.FollowerStats) *sender { @@ -159,7 +180,9 @@ func (s *sender) handle() { // post POSTs a data payload to a url. Returns nil if the POST succeeds, // error on any failure. func (s *sender) post(data []byte) error { + s.mu.RLock() req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data)) + s.mu.RUnlock() if err != nil { return fmt.Errorf("new request to %s error: %v", s.u, err) } diff --git a/etcdserver/server.go b/etcdserver/server.go index a33f3c227..19072c765 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -89,6 +89,7 @@ type Sender interface { Send(m []raftpb.Message) Add(m *Member) Remove(id types.ID) + Update(m *Member) Stop() } @@ -114,7 +115,7 @@ type Server interface { // Stop terminates the Server and performs any necessary finalization. // Do and Process cannot be called after Stop has been invoked. Stop() - // Do takes a request and attempts to fulfil it, returning a Response. + // Do takes a request and attempts to fulfill it, returning a Response. Do(ctx context.Context, r pb.Request) (Response, error) // Process takes a raft message and applies it to the server's raft state // machine, respecting any timeout of the given context. @@ -127,6 +128,10 @@ type Server interface { // return ErrIDRemoved if member ID is removed from the cluster, or return // ErrIDNotFound if member ID is not in the cluster. RemoveMember(ctx context.Context, id uint64) error + + // UpdateMember attempts to update a existing member in the cluster. It will + // return ErrIDNotFound if the member ID does not exist. + UpdateMember(ctx context.Context, updateMemb Member) error } type Stats interface { @@ -475,6 +480,20 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error { return s.configure(ctx, cc) } +func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error { + b, err := json.Marshal(memb) + if err != nil { + return err + } + cc := raftpb.ConfChange{ + ID: GenID(), + Type: raftpb.ConfChangeUpdateNode, + NodeID: uint64(memb.ID), + Context: b, + } + return s.configure(ctx, cc) +} + // Implement the RaftTimer interface func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.raftIndex) @@ -672,6 +691,17 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { s.Cluster.RemoveMember(id) s.sender.Remove(id) log.Printf("etcdserver: removed node %s from cluster %s", id, s.Cluster.ID()) + case raftpb.ConfChangeUpdateNode: + m := new(Member) + if err := json.Unmarshal(cc.Context, m); err != nil { + log.Panicf("unmarshal member should never fail: %v", err) + } + if cc.NodeID != uint64(m.ID) { + log.Panicf("nodeID should always be equal to member ID") + } + s.Cluster.UpdateMember(m) + s.sender.Update(m) + log.Printf("etcdserver: update node %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } return nil } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 5361b264e..566e8cc21 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -447,7 +447,7 @@ func TestApplyConfChangeError(t *testing.T) { }, { raftpb.ConfChange{ - Type: raftpb.ConfChangeRemoveNode, + Type: raftpb.ConfChangeUpdateNode, NodeID: 4, }, ErrIDRemoved, @@ -503,6 +503,7 @@ func (s *fakeSender) Send(msgs []raftpb.Message) { } } func (s *fakeSender) Add(m *Member) {} +func (s *fakeSender) Update(m *Member) {} func (s *fakeSender) Remove(id types.ID) {} func (s *fakeSender) Stop() {} @@ -1017,6 +1018,41 @@ func TestRemoveMember(t *testing.T) { } } +// TestUpdateMember tests RemoveMember can propose and perform node update. +func TestUpdateMember(t *testing.T) { + n := newNodeConfChangeCommitterRecorder() + n.readyc <- raft.Ready{ + SoftState: &raft.SoftState{ + RaftState: raft.StateLeader, + Nodes: []uint64{1234, 2345, 3456}, + }, + } + cl := newTestCluster([]*Member{{ID: 1234}}) + s := &EtcdServer{ + node: n, + store: &storeRecorder{}, + sender: &nopSender{}, + storage: &storageRecorder{}, + Cluster: cl, + } + s.start() + wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} + err := s.UpdateMember(context.TODO(), wm) + gaction := n.Action() + s.Stop() + + if err != nil { + t.Fatalf("UpdateMember error: %v", err) + } + wactions := []action{action{name: "ProposeConfChange:ConfChangeUpdateNode"}, action{name: "ApplyConfChange:ConfChangeUpdateNode"}} + if !reflect.DeepEqual(gaction, wactions) { + t.Errorf("action = %v, want %v", gaction, wactions) + } + if !reflect.DeepEqual(cl.Member(1234), &wm) { + t.Errorf("member = %v, want %v", cl.Member(1234), &wm) + } +} + // TODO: test server could stop itself when being removed // TODO: test wait trigger correctness in multi-server case @@ -1446,6 +1482,7 @@ type nopSender struct{} func (s *nopSender) Send(m []raftpb.Message) {} func (s *nopSender) Add(m *Member) {} func (s *nopSender) Remove(id types.ID) {} +func (s *nopSender) Update(m *Member) {} func (s *nopSender) Stop() {} func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer { diff --git a/raft/node.go b/raft/node.go index df299c56a..a6f269826 100644 --- a/raft/node.go +++ b/raft/node.go @@ -271,6 +271,8 @@ func (n *node) run(r *raft) { r.addNode(cc.NodeID) case pb.ConfChangeRemoveNode: r.removeNode(cc.NodeID) + case pb.ConfChangeUpdateNode: + r.resetPendingConf() default: panic("unexpected conf type") } diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index db726576c..03bdb8c83 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -120,15 +120,18 @@ type ConfChangeType int32 const ( ConfChangeAddNode ConfChangeType = 0 ConfChangeRemoveNode ConfChangeType = 1 + ConfChangeUpdateNode ConfChangeType = 2 ) var ConfChangeType_name = map[int32]string{ 0: "ConfChangeAddNode", 1: "ConfChangeRemoveNode", + 2: "ConfChangeUpdateNode", } var ConfChangeType_value = map[string]int32{ "ConfChangeAddNode": 0, "ConfChangeRemoveNode": 1, + "ConfChangeUpdateNode": 2, } func (x ConfChangeType) Enum() *ConfChangeType { diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 47c9ec2ef..7b60393b8 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -60,6 +60,7 @@ message HardState { enum ConfChangeType { ConfChangeAddNode = 0; ConfChangeRemoveNode = 1; + ConfChangeUpdateNode = 2; } message ConfChange {