Merge pull request #2556 from yichengq/fix-apply-conf

etcdserver: do not apply stale conf change
release-2.1
Yicheng Qin 2015-03-27 14:00:30 -07:00
commit dd92a2b484
5 changed files with 110 additions and 49 deletions

View File

@ -30,6 +30,7 @@ import (
"github.com/coreos/etcd/pkg/netutil" "github.com/coreos/etcd/pkg/netutil"
"github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
) )
@ -66,6 +67,11 @@ type Cluster struct {
// TODO: upgrade it as last modified index // TODO: upgrade it as last modified index
index uint64 index uint64
// transport and members maintains the view of the cluster at index.
// This might be more up to date than what stores in the store since
// the index may be higher than store index, which may happen when the
// cluster is updated from remote cluster info.
transport rafthttp.Transporter
sync.Mutex // guards members and removed map sync.Mutex // guards members and removed map
members map[types.ID]*Member members map[types.ID]*Member
// removed contains the ids of removed members in the cluster. // removed contains the ids of removed members in the cluster.
@ -240,6 +246,19 @@ func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
func (c *Cluster) Recover() { func (c *Cluster) Recover() {
c.members, c.removed = membersFromStore(c.store) c.members, c.removed = membersFromStore(c.store)
// recover transport
c.transport.RemoveAllPeers()
for _, m := range c.Members() {
c.transport.AddPeer(m.ID, m.PeerURLs)
}
}
func (c *Cluster) SetTransport(tr rafthttp.Transporter) {
c.transport = tr
// add all the remote members into transport
for _, m := range c.Members() {
c.transport.AddPeer(m.ID, m.PeerURLs)
}
} }
// ValidateConfigurationChange takes a proposed ConfChange and // ValidateConfigurationChange takes a proposed ConfChange and
@ -305,7 +324,8 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
// AddMember adds a new Member into the cluster, and saves the given member's // AddMember adds a new Member into the cluster, and saves the given member's
// raftAttributes into the store. The given member should have empty attributes. // raftAttributes into the store. The given member should have empty attributes.
// A Member with a matching id must not exist. // A Member with a matching id must not exist.
func (c *Cluster) AddMember(m *Member) { // The given index indicates when the event happens.
func (c *Cluster) AddMember(m *Member, index uint64) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
b, err := json.Marshal(m.RaftAttributes) b, err := json.Marshal(m.RaftAttributes)
@ -316,22 +336,37 @@ func (c *Cluster) AddMember(m *Member) {
if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil { if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil {
log.Panicf("create raftAttributes should never fail: %v", err) log.Panicf("create raftAttributes should never fail: %v", err)
} }
c.members[m.ID] = m if index > c.index {
// TODO: check member does not exist in the cluster
// New bootstrapped member has initial cluster, which contains unadded
// peers.
c.members[m.ID] = m
c.transport.AddPeer(m.ID, m.PeerURLs)
c.index = index
}
} }
// RemoveMember removes a member from the store. // RemoveMember removes a member from the store.
// The given id MUST exist, or the function panics. // The given id MUST exist, or the function panics.
func (c *Cluster) RemoveMember(id types.ID) { // The given index indicates when the event happens.
func (c *Cluster) RemoveMember(id types.ID, index uint64) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil { if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
log.Panicf("delete member should never fail: %v", err) log.Panicf("delete member should never fail: %v", err)
} }
delete(c.members, id)
if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil { if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
log.Panicf("create removedMember should never fail: %v", err) log.Panicf("create removedMember should never fail: %v", err)
} }
c.removed[id] = true if index > c.index {
if _, ok := c.members[id]; !ok {
log.Panicf("member %s should exist in the cluster", id)
}
delete(c.members, id)
c.removed[id] = true
c.transport.RemovePeer(id)
c.index = index
}
} }
func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) { func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) {
@ -341,7 +376,9 @@ func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) {
// TODO: update store in this function // TODO: update store in this function
} }
func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { // UpdateRaftAttributes updates the raft attributes of the given id.
// The given index indicates when the event happens.
func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, index uint64) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
b, err := json.Marshal(raftAttr) b, err := json.Marshal(raftAttr)
@ -352,7 +389,11 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
if _, err := c.store.Update(p, string(b), store.Permanent); err != nil { if _, err := c.store.Update(p, string(b), store.Permanent); err != nil {
log.Panicf("update raftAttributes should never fail: %v", err) log.Panicf("update raftAttributes should never fail: %v", err)
} }
c.members[id].RaftAttributes = raftAttr if index > c.index {
c.members[id].RaftAttributes = raftAttr
c.transport.UpdatePeer(id, raftAttr.PeerURLs)
c.index = index
}
} }
// Validate ensures that there is no identical urls in the cluster peer list // Validate ensures that there is no identical urls in the cluster peer list

View File

@ -96,8 +96,9 @@ func TestClusterFromStore(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
hc := newTestCluster(nil) hc := newTestCluster(nil)
hc.SetStore(store.New()) hc.SetStore(store.New())
for _, m := range tt.mems { hc.SetTransport(&nopTransporter{})
hc.AddMember(m) for j, m := range tt.mems {
hc.AddMember(m, uint64(j))
} }
c := NewClusterFromStore("abc", hc.store) c := NewClusterFromStore("abc", hc.store)
if c.token != "abc" { if c.token != "abc" {
@ -357,11 +358,12 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {
func TestClusterValidateConfigurationChange(t *testing.T) { func TestClusterValidateConfigurationChange(t *testing.T) {
cl := newCluster("") cl := newCluster("")
cl.SetStore(store.New()) cl.SetStore(store.New())
cl.SetTransport(&nopTransporter{})
for i := 1; i <= 4; i++ { for i := 1; i <= 4; i++ {
attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}} attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}}
cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}) cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}, uint64(i))
} }
cl.RemoveMember(4) cl.RemoveMember(4, 5)
attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}} attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
@ -489,7 +491,8 @@ func TestClusterGenID(t *testing.T) {
previd := cs.ID() previd := cs.ID()
cs.SetStore(&storeRecorder{}) cs.SetStore(&storeRecorder{})
cs.AddMember(newTestMember(3, nil, "", nil)) cs.SetTransport(&nopTransporter{})
cs.AddMember(newTestMember(3, nil, "", nil), 1)
cs.genID() cs.genID()
if cs.ID() == previd { if cs.ID() == previd {
t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd) t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
@ -532,7 +535,8 @@ func TestClusterAddMember(t *testing.T) {
st := &storeRecorder{} st := &storeRecorder{}
c := newTestCluster(nil) c := newTestCluster(nil)
c.SetStore(st) c.SetStore(st)
c.AddMember(newTestMember(1, nil, "node1", nil)) c.SetTransport(&nopTransporter{})
c.AddMember(newTestMember(1, nil, "node1", nil), 1)
wactions := []testutil.Action{ wactions := []testutil.Action{
{ {
@ -617,10 +621,14 @@ func TestClusterString(t *testing.T) {
} }
func TestClusterRemoveMember(t *testing.T) { func TestClusterRemoveMember(t *testing.T) {
st := &storeRecorder{}
c := newTestCluster(nil) c := newTestCluster(nil)
c.SetStore(&storeRecorder{})
c.SetTransport(&nopTransporter{})
c.AddMember(newTestMember(1, nil, "", nil), 1)
st := &storeRecorder{}
c.SetStore(st) c.SetStore(st)
c.RemoveMember(1) c.RemoveMember(1, 2)
wactions := []testutil.Action{ wactions := []testutil.Action{
{Name: "Delete", Params: []interface{}{memberStoreKey(1), true, true}}, {Name: "Delete", Params: []interface{}{memberStoreKey(1), true, true}},

View File

@ -259,13 +259,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
} }
tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats) tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
// add all the remote members into sendhub
for _, m := range cfg.Cluster.Members() {
if m.ID != id {
tr.AddPeer(m.ID, m.PeerURLs)
}
}
srv.r.transport = tr srv.r.transport = tr
srv.Cluster.SetTransport(tr)
return srv, nil return srv, nil
} }
@ -367,14 +362,6 @@ func (s *EtcdServer) run() {
// transport setting, which may block the communication. // transport setting, which may block the communication.
if s.Cluster.index < apply.snapshot.Metadata.Index { if s.Cluster.index < apply.snapshot.Metadata.Index {
s.Cluster.Recover() s.Cluster.Recover()
// recover raft transport
s.r.transport.RemoveAllPeers()
for _, m := range s.Cluster.Members() {
if m.ID == s.ID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
} }
appliedi = apply.snapshot.Metadata.Index appliedi = apply.snapshot.Metadata.Index
@ -672,7 +659,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
case raftpb.EntryConfChange: case raftpb.EntryConfChange:
var cc raftpb.ConfChange var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data) pbutil.MustUnmarshal(&cc, e.Data)
shouldstop, err = s.applyConfChange(cc, confState) shouldstop, err = s.applyConfChange(cc, confState, e.Index)
s.w.Trigger(cc.ID, err) s.w.Trigger(cc.ID, err)
default: default:
log.Panicf("entry type should be either EntryNormal or EntryConfChange") log.Panicf("entry type should be either EntryNormal or EntryConfChange")
@ -733,9 +720,9 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
} }
} }
// applyConfChange applies a ConfChange to the server. It is only // applyConfChange applies a ConfChange to the server at the given index. It is only
// invoked with a ConfChange that has already passed through Raft // invoked with a ConfChange that has already passed through Raft.
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) { func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, index uint64) (bool, error) {
if err := s.Cluster.ValidateConfigurationChange(cc); err != nil { if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
cc.NodeID = raft.None cc.NodeID = raft.None
s.r.ApplyConfChange(cc) s.r.ApplyConfChange(cc)
@ -751,20 +738,18 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
if cc.NodeID != uint64(m.ID) { if cc.NodeID != uint64(m.ID) {
log.Panicf("nodeID should always be equal to member ID") log.Panicf("nodeID should always be equal to member ID")
} }
s.Cluster.AddMember(m) s.Cluster.AddMember(m, index)
if m.ID == s.id { if m.ID == s.id {
log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
} else { } else {
s.r.transport.AddPeer(m.ID, m.PeerURLs)
log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
} }
case raftpb.ConfChangeRemoveNode: case raftpb.ConfChangeRemoveNode:
id := types.ID(cc.NodeID) id := types.ID(cc.NodeID)
s.Cluster.RemoveMember(id) s.Cluster.RemoveMember(id, index)
if id == s.id { if id == s.id {
return true, nil return true, nil
} else { } else {
s.r.transport.RemovePeer(id)
log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID()) log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
} }
case raftpb.ConfChangeUpdateNode: case raftpb.ConfChangeUpdateNode:
@ -775,11 +760,10 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
if cc.NodeID != uint64(m.ID) { if cc.NodeID != uint64(m.ID) {
log.Panicf("nodeID should always be equal to member ID") log.Panicf("nodeID should always be equal to member ID")
} }
s.Cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) s.Cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, index)
if m.ID == s.id { if m.ID == s.id {
log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
} else { } else {
s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
} }
} }

View File

@ -413,10 +413,11 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
func TestApplyConfChangeError(t *testing.T) { func TestApplyConfChangeError(t *testing.T) {
cl := newCluster("") cl := newCluster("")
cl.SetStore(store.New()) cl.SetStore(store.New())
cl.SetTransport(&nopTransporter{})
for i := 1; i <= 4; i++ { for i := 1; i <= 4; i++ {
cl.AddMember(&Member{ID: types.ID(i)}) cl.AddMember(&Member{ID: types.ID(i)}, uint64(i))
} }
cl.RemoveMember(4) cl.RemoveMember(4, 5)
tests := []struct { tests := []struct {
cc raftpb.ConfChange cc raftpb.ConfChange
@ -457,7 +458,7 @@ func TestApplyConfChangeError(t *testing.T) {
r: raftNode{Node: n}, r: raftNode{Node: n},
Cluster: cl, Cluster: cl,
} }
_, err := srv.applyConfChange(tt.cc, nil) _, err := srv.applyConfChange(tt.cc, nil, 10)
if err != tt.werr { if err != tt.werr {
t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr) t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
} }
@ -477,8 +478,9 @@ func TestApplyConfChangeError(t *testing.T) {
func TestApplyConfChangeShouldStop(t *testing.T) { func TestApplyConfChangeShouldStop(t *testing.T) {
cl := newCluster("") cl := newCluster("")
cl.SetStore(store.New()) cl.SetStore(store.New())
cl.SetTransport(&nopTransporter{})
for i := 1; i <= 3; i++ { for i := 1; i <= 3; i++ {
cl.AddMember(&Member{ID: types.ID(i)}) cl.AddMember(&Member{ID: types.ID(i)}, uint64(i))
} }
srv := &EtcdServer{ srv := &EtcdServer{
id: 1, id: 1,
@ -493,7 +495,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
NodeID: 2, NodeID: 2,
} }
// remove non-local member // remove non-local member
shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{}) shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{}, 10)
if err != nil { if err != nil {
t.Fatalf("unexpected error %v", err) t.Fatalf("unexpected error %v", err)
} }
@ -503,7 +505,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
// remove local member // remove local member
cc.NodeID = 1 cc.NodeID = 1
shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{}) shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{}, 10)
if err != nil { if err != nil {
t.Fatalf("unexpected error %v", err) t.Fatalf("unexpected error %v", err)
} }
@ -774,6 +776,7 @@ func TestRecvSnapshot(t *testing.T) {
p := &storageRecorder{} p := &storageRecorder{}
cl := newCluster("abc") cl := newCluster("abc")
cl.SetStore(store.New()) cl.SetStore(store.New())
cl.SetTransport(&nopTransporter{})
s := &EtcdServer{ s := &EtcdServer{
r: raftNode{ r: raftNode{
Node: n, Node: n,
@ -808,6 +811,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
st := &storeRecorder{} st := &storeRecorder{}
cl := newCluster("abc") cl := newCluster("abc")
cl.SetStore(store.New()) cl.SetStore(store.New())
cl.SetTransport(&nopTransporter{})
storage := raft.NewMemoryStorage() storage := raft.NewMemoryStorage()
s := &EtcdServer{ s := &EtcdServer{
r: raftNode{ r: raftNode{
@ -853,6 +857,7 @@ func TestAddMember(t *testing.T) {
cl := newTestCluster(nil) cl := newTestCluster(nil)
st := store.New() st := store.New()
cl.SetStore(st) cl.SetStore(st)
cl.SetTransport(&nopTransporter{})
s := &EtcdServer{ s := &EtcdServer{
r: raftNode{ r: raftNode{
Node: n, Node: n,
@ -891,7 +896,7 @@ func TestRemoveMember(t *testing.T) {
cl := newTestCluster(nil) cl := newTestCluster(nil)
st := store.New() st := store.New()
cl.SetStore(store.New()) cl.SetStore(store.New())
cl.AddMember(&Member{ID: 1234}) cl.SetTransport(&nopTransporter{})
s := &EtcdServer{ s := &EtcdServer{
r: raftNode{ r: raftNode{
Node: n, Node: n,
@ -904,6 +909,7 @@ func TestRemoveMember(t *testing.T) {
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
} }
s.start() s.start()
s.AddMember(context.TODO(), Member{ID: 1234})
err := s.RemoveMember(context.TODO(), 1234) err := s.RemoveMember(context.TODO(), 1234)
gaction := n.Action() gaction := n.Action()
s.Stop() s.Stop()
@ -911,7 +917,12 @@ func TestRemoveMember(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("RemoveMember error: %v", err) t.Fatalf("RemoveMember error: %v", err)
} }
wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeRemoveNode"}, {Name: "ApplyConfChange:ConfChangeRemoveNode"}} wactions := []testutil.Action{
{Name: "ProposeConfChange:ConfChangeAddNode"},
{Name: "ApplyConfChange:ConfChangeAddNode"},
{Name: "ProposeConfChange:ConfChangeRemoveNode"},
{Name: "ApplyConfChange:ConfChangeRemoveNode"},
}
if !reflect.DeepEqual(gaction, wactions) { if !reflect.DeepEqual(gaction, wactions) {
t.Errorf("action = %v, want %v", gaction, wactions) t.Errorf("action = %v, want %v", gaction, wactions)
} }
@ -929,7 +940,7 @@ func TestUpdateMember(t *testing.T) {
cl := newTestCluster(nil) cl := newTestCluster(nil)
st := store.New() st := store.New()
cl.SetStore(st) cl.SetStore(st)
cl.AddMember(&Member{ID: 1234}) cl.SetTransport(&nopTransporter{})
s := &EtcdServer{ s := &EtcdServer{
r: raftNode{ r: raftNode{
Node: n, Node: n,
@ -942,6 +953,7 @@ func TestUpdateMember(t *testing.T) {
reqIDGen: idutil.NewGenerator(0, time.Time{}), reqIDGen: idutil.NewGenerator(0, time.Time{}),
} }
s.start() s.start()
s.AddMember(context.TODO(), Member{ID: 1234})
wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
err := s.UpdateMember(context.TODO(), wm) err := s.UpdateMember(context.TODO(), wm)
gaction := n.Action() gaction := n.Action()
@ -950,7 +962,12 @@ func TestUpdateMember(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("UpdateMember error: %v", err) t.Fatalf("UpdateMember error: %v", err)
} }
wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeUpdateNode"}, {Name: "ApplyConfChange:ConfChangeUpdateNode"}} wactions := []testutil.Action{
{Name: "ProposeConfChange:ConfChangeAddNode"},
{Name: "ApplyConfChange:ConfChangeAddNode"},
{Name: "ProposeConfChange:ConfChangeUpdateNode"},
{Name: "ApplyConfChange:ConfChangeUpdateNode"},
}
if !reflect.DeepEqual(gaction, wactions) { if !reflect.DeepEqual(gaction, wactions) {
t.Errorf("action = %v, want %v", gaction, wactions) t.Errorf("action = %v, want %v", gaction, wactions)
} }

View File

@ -136,6 +136,11 @@ func (t *transport) Stop() {
func (t *transport) AddPeer(id types.ID, us []string) { func (t *transport) AddPeer(id types.ID, us []string) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
// There is no need to build connection to itself because local message
// is not sent through transport.
if id == t.id {
return
}
if _, ok := t.peers[id]; ok { if _, ok := t.peers[id]; ok {
return return
} }
@ -150,6 +155,9 @@ func (t *transport) AddPeer(id types.ID, us []string) {
func (t *transport) RemovePeer(id types.ID) { func (t *transport) RemovePeer(id types.ID) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
if id == t.id {
return
}
t.removePeer(id) t.removePeer(id)
} }
@ -175,6 +183,9 @@ func (t *transport) removePeer(id types.ID) {
func (t *transport) UpdatePeer(id types.ID, us []string) { func (t *transport) UpdatePeer(id types.ID, us []string) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
if id == t.id {
return
}
// TODO: return error or just panic? // TODO: return error or just panic?
if _, ok := t.peers[id]; !ok { if _, ok := t.peers[id]; !ok {
return return