diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index baa6e7d28..ca35b3b89 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -31,6 +31,7 @@ import ( etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/pkg/flags" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/store" ) @@ -239,6 +240,27 @@ func (c *Cluster) SetID(id types.ID) { c.id = id } func (c *Cluster) SetStore(st store.Store) { c.store = st } +func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { + appliedMembers, appliedRemoved := membersFromStore(c.store) + + if appliedRemoved[types.ID(cc.NodeID)] { + return ErrIDRemoved + } + switch cc.Type { + case raftpb.ConfChangeAddNode: + if appliedMembers[types.ID(cc.NodeID)] != nil { + return ErrIDExists + } + case raftpb.ConfChangeRemoveNode: + if appliedMembers[types.ID(cc.NodeID)] == nil { + return ErrIDNotFound + } + default: + log.Panicf("ConfChange type should be either AddNode or RemoveNode") + } + return nil +} + // AddMember puts a new Member into the store. // A Member with a matching id must not exist. func (c *Cluster) AddMember(m *Member) { diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 7d016b196..96ea42cbf 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -93,13 +93,11 @@ func TestClusterFromStore(t *testing.T) { }, } for i, tt := range tests { - st := store.New() hc := newTestCluster(nil) - hc.SetStore(st) for _, m := range tt.mems { hc.AddMember(&m) } - c := NewClusterFromStore("abc", st) + c := NewClusterFromStore("abc", hc.store) if c.token != "abc" { t.Errorf("#%d: token = %v, want %v", i, c.token, "abc") } @@ -535,8 +533,9 @@ func TestNodeToMember(t *testing.T) { func newTestCluster(membs []Member) *Cluster { c := &Cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)} - for i, m := range membs { - c.members[m.ID] = &membs[i] + c.store = store.New() + for i := range membs { + c.AddMember(&membs[i]) } return c } diff --git a/etcdserver/member.go b/etcdserver/member.go index 39d7954a9..cce6a5a58 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -45,7 +45,6 @@ type Member struct { ID types.ID `json:"id"` RaftAttributes Attributes - verified bool } // newMember creates a Member without an ID and generates one based on the diff --git a/etcdserver/server.go b/etcdserver/server.go index 584f269ee..0dbe65a3a 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -328,7 +328,7 @@ func (s *EtcdServer) run() { // race them. // TODO: apply configuration change into ClusterStore. if len(rd.CommittedEntries) != 0 { - appliedi = s.apply(rd.CommittedEntries, nodes) + appliedi = s.apply(rd.CommittedEntries) } if rd.Snapshot.Index > snapi { @@ -559,7 +559,7 @@ func getExpirationTime(r *pb.Request) time.Time { return t } -func (s *EtcdServer) apply(es []raftpb.Entry, nodes []uint64) uint64 { +func (s *EtcdServer) apply(es []raftpb.Entry) uint64 { var applied uint64 for i := range es { e := es[i] @@ -571,7 +571,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, nodes []uint64) uint64 { case raftpb.EntryConfChange: var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) - s.w.Trigger(cc.ID, s.applyConfChange(cc, nodes)) + s.w.Trigger(cc.ID, s.applyConfChange(cc)) default: log.Panicf("entry type should be either EntryNormal or EntryConfChange") } @@ -633,8 +633,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { } } -func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error { - if err := s.checkConfChange(cc, nodes); err != nil { +func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error { + if err := s.Cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.node.ApplyConfChange(cc) return err @@ -659,25 +659,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error return nil } -func (s *EtcdServer) checkConfChange(cc raftpb.ConfChange, nodes []uint64) error { - if s.Cluster.IsIDRemoved(types.ID(cc.NodeID)) { - return ErrIDRemoved - } - switch cc.Type { - case raftpb.ConfChangeAddNode: - if containsUint64(nodes, cc.NodeID) { - return ErrIDExists - } - case raftpb.ConfChangeRemoveNode: - if !containsUint64(nodes, cc.NodeID) { - return ErrIDNotFound - } - default: - log.Panicf("ConfChange type should be either AddNode or RemoveNode") - } - return nil -} - // TODO: non-blocking snapshot func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) { d, err := s.store.Save() diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index d621b86ce..995bce519 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -421,8 +421,13 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { // TODO: test ErrIDRemoved func TestApplyConfChangeError(t *testing.T) { - nodes := []uint64{1, 2, 3} - removed := map[types.ID]bool{4: true} + cl := newCluster("") + cl.SetStore(store.New()) + for i := 1; i <= 4; i++ { + cl.AddMember(&Member{ID: types.ID(i)}) + } + cl.RemoveMember(4) + tests := []struct { cc raftpb.ConfChange werr error @@ -458,12 +463,11 @@ func TestApplyConfChangeError(t *testing.T) { } for i, tt := range tests { n := &nodeRecorder{} - cl := &Cluster{removed: removed} srv := &EtcdServer{ node: n, Cluster: cl, } - err := srv.applyConfChange(tt.cc, nodes) + err := srv.applyConfChange(tt.cc) if err != tt.werr { t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr) } @@ -506,11 +510,12 @@ func testServer(t *testing.T, ns uint64) { n := raft.StartNode(id, members, 10, 1) tk := time.NewTicker(10 * time.Millisecond) defer tk.Stop() + st := store.New() cl := newCluster("abc") - cl.SetStore(&storeRecorder{}) + cl.SetStore(st) srv := &EtcdServer{ node: n, - store: store.New(), + store: st, send: send, storage: &storageRecorder{}, Ticker: tk.C, @@ -536,8 +541,8 @@ func testServer(t *testing.T, ns uint64) { g, w := resp.Event.Node, &store.NodeExtern{ Key: "/foo", - ModifiedIndex: uint64(i), - CreatedIndex: uint64(i), + ModifiedIndex: uint64(i) + 2*ns, + CreatedIndex: uint64(i) + 2*ns, Value: stringp("bar"), } @@ -576,7 +581,7 @@ func TestDoProposal(t *testing.T) { // this makes <-tk always successful, which accelerates internal clock close(tk) cl := newCluster("abc") - cl.SetStore(&storeRecorder{}) + cl.SetStore(store.New()) srv := &EtcdServer{ node: n, store: st, @@ -833,13 +838,15 @@ func TestTriggerSnap(t *testing.T) { n.Campaign(ctx) st := &storeRecorder{} p := &storageRecorder{} + cl := newCluster("abc") + cl.SetStore(store.New()) s := &EtcdServer{ store: st, send: func(_ []raftpb.Message) {}, storage: p, node: n, snapCount: 10, - Cluster: &Cluster{}, + Cluster: cl, } s.start() @@ -928,7 +935,7 @@ func TestAddMember(t *testing.T) { }, } cl := newTestCluster(nil) - cl.SetStore(&storeRecorder{}) + cl.SetStore(store.New()) s := &EtcdServer{ node: n, store: &storeRecorder{}, @@ -964,7 +971,6 @@ func TestRemoveMember(t *testing.T) { }, } cl := newTestCluster([]Member{{ID: 1234}}) - cl.SetStore(&storeRecorder{}) s := &EtcdServer{ node: n, store: &storeRecorder{},