From 40197f06987aac9c3a539e9022ad1f1e573326e7 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 23 Mar 2015 23:19:38 -0700 Subject: [PATCH] etcdserver: not apply stale conf change in cluster and transport --- etcdserver/cluster.go | 55 +++++++++++++++++++++++++++++++++----- etcdserver/cluster_test.go | 24 +++++++++++------ etcdserver/server.go | 32 ++++++---------------- etcdserver/server_test.go | 37 ++++++++++++++++++------- rafthttp/transport.go | 11 ++++++++ 5 files changed, 110 insertions(+), 49 deletions(-) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index 1b94342e0..cdd2284d3 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -30,6 +30,7 @@ import ( "github.com/coreos/etcd/pkg/netutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/store" ) @@ -66,6 +67,11 @@ type Cluster struct { // TODO: upgrade it as last modified index index uint64 + // transport and members maintains the view of the cluster at index. + // This might be more up to date than what stores in the store since + // the index may be higher than store index, which may happen when the + // cluster is updated from remote cluster info. + transport rafthttp.Transporter sync.Mutex // guards members and removed map members map[types.ID]*Member // removed contains the ids of removed members in the cluster. @@ -240,6 +246,19 @@ func (c *Cluster) UpdateIndex(index uint64) { c.index = index } func (c *Cluster) Recover() { c.members, c.removed = membersFromStore(c.store) + // recover transport + c.transport.RemoveAllPeers() + for _, m := range c.Members() { + c.transport.AddPeer(m.ID, m.PeerURLs) + } +} + +func (c *Cluster) SetTransport(tr rafthttp.Transporter) { + c.transport = tr + // add all the remote members into transport + for _, m := range c.Members() { + c.transport.AddPeer(m.ID, m.PeerURLs) + } } // ValidateConfigurationChange takes a proposed ConfChange and @@ -305,7 +324,8 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { // AddMember adds a new Member into the cluster, and saves the given member's // raftAttributes into the store. The given member should have empty attributes. // A Member with a matching id must not exist. -func (c *Cluster) AddMember(m *Member) { +// The given index indicates when the event happens. +func (c *Cluster) AddMember(m *Member, index uint64) { c.Lock() defer c.Unlock() b, err := json.Marshal(m.RaftAttributes) @@ -316,22 +336,37 @@ func (c *Cluster) AddMember(m *Member) { if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil { log.Panicf("create raftAttributes should never fail: %v", err) } - c.members[m.ID] = m + if index > c.index { + // TODO: check member does not exist in the cluster + // New bootstrapped member has initial cluster, which contains unadded + // peers. + c.members[m.ID] = m + c.transport.AddPeer(m.ID, m.PeerURLs) + c.index = index + } } // RemoveMember removes a member from the store. // The given id MUST exist, or the function panics. -func (c *Cluster) RemoveMember(id types.ID) { +// The given index indicates when the event happens. +func (c *Cluster) RemoveMember(id types.ID, index uint64) { c.Lock() defer c.Unlock() if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil { log.Panicf("delete member should never fail: %v", err) } - delete(c.members, id) if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil { log.Panicf("create removedMember should never fail: %v", err) } - c.removed[id] = true + if index > c.index { + if _, ok := c.members[id]; !ok { + log.Panicf("member %s should exist in the cluster", id) + } + delete(c.members, id) + c.removed[id] = true + c.transport.RemovePeer(id) + c.index = index + } } func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) { @@ -341,7 +376,9 @@ func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) { // TODO: update store in this function } -func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { +// UpdateRaftAttributes updates the raft attributes of the given id. +// The given index indicates when the event happens. +func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, index uint64) { c.Lock() defer c.Unlock() b, err := json.Marshal(raftAttr) @@ -352,7 +389,11 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { if _, err := c.store.Update(p, string(b), store.Permanent); err != nil { log.Panicf("update raftAttributes should never fail: %v", err) } - c.members[id].RaftAttributes = raftAttr + if index > c.index { + c.members[id].RaftAttributes = raftAttr + c.transport.UpdatePeer(id, raftAttr.PeerURLs) + c.index = index + } } // Validate ensures that there is no identical urls in the cluster peer list diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 7b3ec4732..9af9ce175 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -96,8 +96,9 @@ func TestClusterFromStore(t *testing.T) { for i, tt := range tests { hc := newTestCluster(nil) hc.SetStore(store.New()) - for _, m := range tt.mems { - hc.AddMember(m) + hc.SetTransport(&nopTransporter{}) + for j, m := range tt.mems { + hc.AddMember(m, uint64(j)) } c := NewClusterFromStore("abc", hc.store) if c.token != "abc" { @@ -357,11 +358,12 @@ func TestClusterValidateAndAssignIDs(t *testing.T) { func TestClusterValidateConfigurationChange(t *testing.T) { cl := newCluster("") cl.SetStore(store.New()) + cl.SetTransport(&nopTransporter{}) for i := 1; i <= 4; i++ { attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}} - cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}) + cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}, uint64(i)) } - cl.RemoveMember(4) + cl.RemoveMember(4, 5) attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}} ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) @@ -489,7 +491,8 @@ func TestClusterGenID(t *testing.T) { previd := cs.ID() cs.SetStore(&storeRecorder{}) - cs.AddMember(newTestMember(3, nil, "", nil)) + cs.SetTransport(&nopTransporter{}) + cs.AddMember(newTestMember(3, nil, "", nil), 1) cs.genID() if cs.ID() == previd { t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd) @@ -532,7 +535,8 @@ func TestClusterAddMember(t *testing.T) { st := &storeRecorder{} c := newTestCluster(nil) c.SetStore(st) - c.AddMember(newTestMember(1, nil, "node1", nil)) + c.SetTransport(&nopTransporter{}) + c.AddMember(newTestMember(1, nil, "node1", nil), 1) wactions := []testutil.Action{ { @@ -617,10 +621,14 @@ func TestClusterString(t *testing.T) { } func TestClusterRemoveMember(t *testing.T) { - st := &storeRecorder{} c := newTestCluster(nil) + c.SetStore(&storeRecorder{}) + c.SetTransport(&nopTransporter{}) + c.AddMember(newTestMember(1, nil, "", nil), 1) + + st := &storeRecorder{} c.SetStore(st) - c.RemoveMember(1) + c.RemoveMember(1, 2) wactions := []testutil.Action{ {Name: "Delete", Params: []interface{}{memberStoreKey(1), true, true}}, diff --git a/etcdserver/server.go b/etcdserver/server.go index 033aab3d6..ba25755c0 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -258,13 +258,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats) - // add all the remote members into sendhub - for _, m := range cfg.Cluster.Members() { - if m.ID != id { - tr.AddPeer(m.ID, m.PeerURLs) - } - } srv.r.transport = tr + srv.Cluster.SetTransport(tr) return srv, nil } @@ -366,14 +361,6 @@ func (s *EtcdServer) run() { // transport setting, which may block the communication. if s.Cluster.index < apply.snapshot.Metadata.Index { s.Cluster.Recover() - // recover raft transport - s.r.transport.RemoveAllPeers() - for _, m := range s.Cluster.Members() { - if m.ID == s.ID() { - continue - } - s.r.transport.AddPeer(m.ID, m.PeerURLs) - } } appliedi = apply.snapshot.Metadata.Index @@ -671,7 +658,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint case raftpb.EntryConfChange: var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) - shouldstop, err = s.applyConfChange(cc, confState) + shouldstop, err = s.applyConfChange(cc, confState, e.Index) s.w.Trigger(cc.ID, err) default: log.Panicf("entry type should be either EntryNormal or EntryConfChange") @@ -732,9 +719,9 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response { } } -// applyConfChange applies a ConfChange to the server. It is only -// invoked with a ConfChange that has already passed through Raft -func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) { +// applyConfChange applies a ConfChange to the server at the given index. It is only +// invoked with a ConfChange that has already passed through Raft. +func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, index uint64) (bool, error) { if err := s.Cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.r.ApplyConfChange(cc) @@ -750,20 +737,18 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if cc.NodeID != uint64(m.ID) { log.Panicf("nodeID should always be equal to member ID") } - s.Cluster.AddMember(m) + s.Cluster.AddMember(m, index) if m.ID == s.id { log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } else { - s.r.transport.AddPeer(m.ID, m.PeerURLs) log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) - s.Cluster.RemoveMember(id) + s.Cluster.RemoveMember(id, index) if id == s.id { return true, nil } else { - s.r.transport.RemovePeer(id) log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID()) } case raftpb.ConfChangeUpdateNode: @@ -774,11 +759,10 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if cc.NodeID != uint64(m.ID) { log.Panicf("nodeID should always be equal to member ID") } - s.Cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) + s.Cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, index) if m.ID == s.id { log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } else { - s.r.transport.UpdatePeer(m.ID, m.PeerURLs) log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index a7cd4a42b..130f8ffd0 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -413,10 +413,11 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { func TestApplyConfChangeError(t *testing.T) { cl := newCluster("") cl.SetStore(store.New()) + cl.SetTransport(&nopTransporter{}) for i := 1; i <= 4; i++ { - cl.AddMember(&Member{ID: types.ID(i)}) + cl.AddMember(&Member{ID: types.ID(i)}, uint64(i)) } - cl.RemoveMember(4) + cl.RemoveMember(4, 5) tests := []struct { cc raftpb.ConfChange @@ -457,7 +458,7 @@ func TestApplyConfChangeError(t *testing.T) { r: raftNode{Node: n}, Cluster: cl, } - _, err := srv.applyConfChange(tt.cc, nil) + _, err := srv.applyConfChange(tt.cc, nil, 10) if err != tt.werr { t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr) } @@ -477,8 +478,9 @@ func TestApplyConfChangeError(t *testing.T) { func TestApplyConfChangeShouldStop(t *testing.T) { cl := newCluster("") cl.SetStore(store.New()) + cl.SetTransport(&nopTransporter{}) for i := 1; i <= 3; i++ { - cl.AddMember(&Member{ID: types.ID(i)}) + cl.AddMember(&Member{ID: types.ID(i)}, uint64(i)) } srv := &EtcdServer{ id: 1, @@ -493,7 +495,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { NodeID: 2, } // remove non-local member - shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{}) + shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{}, 10) if err != nil { t.Fatalf("unexpected error %v", err) } @@ -503,7 +505,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { // remove local member cc.NodeID = 1 - shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{}) + shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{}, 10) if err != nil { t.Fatalf("unexpected error %v", err) } @@ -774,6 +776,7 @@ func TestRecvSnapshot(t *testing.T) { p := &storageRecorder{} cl := newCluster("abc") cl.SetStore(store.New()) + cl.SetTransport(&nopTransporter{}) s := &EtcdServer{ r: raftNode{ Node: n, @@ -808,6 +811,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) { st := &storeRecorder{} cl := newCluster("abc") cl.SetStore(store.New()) + cl.SetTransport(&nopTransporter{}) storage := raft.NewMemoryStorage() s := &EtcdServer{ r: raftNode{ @@ -853,6 +857,7 @@ func TestAddMember(t *testing.T) { cl := newTestCluster(nil) st := store.New() cl.SetStore(st) + cl.SetTransport(&nopTransporter{}) s := &EtcdServer{ r: raftNode{ Node: n, @@ -891,7 +896,7 @@ func TestRemoveMember(t *testing.T) { cl := newTestCluster(nil) st := store.New() cl.SetStore(store.New()) - cl.AddMember(&Member{ID: 1234}) + cl.SetTransport(&nopTransporter{}) s := &EtcdServer{ r: raftNode{ Node: n, @@ -904,6 +909,7 @@ func TestRemoveMember(t *testing.T) { reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() + s.AddMember(context.TODO(), Member{ID: 1234}) err := s.RemoveMember(context.TODO(), 1234) gaction := n.Action() s.Stop() @@ -911,7 +917,12 @@ func TestRemoveMember(t *testing.T) { if err != nil { t.Fatalf("RemoveMember error: %v", err) } - wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeRemoveNode"}, {Name: "ApplyConfChange:ConfChangeRemoveNode"}} + wactions := []testutil.Action{ + {Name: "ProposeConfChange:ConfChangeAddNode"}, + {Name: "ApplyConfChange:ConfChangeAddNode"}, + {Name: "ProposeConfChange:ConfChangeRemoveNode"}, + {Name: "ApplyConfChange:ConfChangeRemoveNode"}, + } if !reflect.DeepEqual(gaction, wactions) { t.Errorf("action = %v, want %v", gaction, wactions) } @@ -929,7 +940,7 @@ func TestUpdateMember(t *testing.T) { cl := newTestCluster(nil) st := store.New() cl.SetStore(st) - cl.AddMember(&Member{ID: 1234}) + cl.SetTransport(&nopTransporter{}) s := &EtcdServer{ r: raftNode{ Node: n, @@ -942,6 +953,7 @@ func TestUpdateMember(t *testing.T) { reqIDGen: idutil.NewGenerator(0, time.Time{}), } s.start() + s.AddMember(context.TODO(), Member{ID: 1234}) wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} err := s.UpdateMember(context.TODO(), wm) gaction := n.Action() @@ -950,7 +962,12 @@ func TestUpdateMember(t *testing.T) { if err != nil { t.Fatalf("UpdateMember error: %v", err) } - wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeUpdateNode"}, {Name: "ApplyConfChange:ConfChangeUpdateNode"}} + wactions := []testutil.Action{ + {Name: "ProposeConfChange:ConfChangeAddNode"}, + {Name: "ApplyConfChange:ConfChangeAddNode"}, + {Name: "ProposeConfChange:ConfChangeUpdateNode"}, + {Name: "ApplyConfChange:ConfChangeUpdateNode"}, + } if !reflect.DeepEqual(gaction, wactions) { t.Errorf("action = %v, want %v", gaction, wactions) } diff --git a/rafthttp/transport.go b/rafthttp/transport.go index aaff28c22..9af0ef15d 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -136,6 +136,11 @@ func (t *transport) Stop() { func (t *transport) AddPeer(id types.ID, us []string) { t.mu.Lock() defer t.mu.Unlock() + // There is no need to build connection to itself because local message + // is not sent through transport. + if id == t.id { + return + } if _, ok := t.peers[id]; ok { return } @@ -150,6 +155,9 @@ func (t *transport) AddPeer(id types.ID, us []string) { func (t *transport) RemovePeer(id types.ID) { t.mu.Lock() defer t.mu.Unlock() + if id == t.id { + return + } t.removePeer(id) } @@ -175,6 +183,9 @@ func (t *transport) removePeer(id types.ID) { func (t *transport) UpdatePeer(id types.ID, us []string) { t.mu.Lock() defer t.mu.Unlock() + if id == t.id { + return + } // TODO: return error or just panic? if _, ok := t.peers[id]; !ok { return