etcdserver: recover transport when recovering from a snapshot

release-2.0
Xiang Li 2015-02-12 10:48:06 -08:00
parent d7840b75c3
commit c16cc3a6a3
3 changed files with 25 additions and 0 deletions

View File

@ -392,6 +392,16 @@ func (s *EtcdServer) run() {
log.Panicf("recovery store error: %v", err)
}
s.Cluster.Recover()
// recover raft transport
s.r.transport.RemoveAllPeers()
for _, m := range s.Cluster.Members() {
if m.ID == s.ID() {
continue
}
s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
appliedi = rd.Snapshot.Metadata.Index
confState = rd.Snapshot.Metadata.ConfState
log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)

View File

@ -1393,6 +1393,7 @@ func (s *nopTransporter) Handler() http.Handler { return nil }
func (s *nopTransporter) Send(m []raftpb.Message) {}
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) RemoveAllPeers() {}
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
func (s *nopTransporter) Stop() {}
func (s *nopTransporter) Pause() {}

View File

@ -37,6 +37,7 @@ type Transporter interface {
Send(m []raftpb.Message)
AddPeer(id types.ID, urls []string)
RemovePeer(id types.ID)
RemoveAllPeers()
UpdatePeer(id types.ID, urls []string)
Stop()
}
@ -132,6 +133,19 @@ func (t *transport) AddPeer(id types.ID, urls []string) {
func (t *transport) RemovePeer(id types.ID) {
t.mu.Lock()
defer t.mu.Unlock()
t.removePeer(id)
}
func (t *transport) RemoveAllPeers() {
t.mu.Lock()
defer t.mu.Unlock()
for id, _ := range t.peers {
t.removePeer(id)
}
}
// the caller of this function must have the peers mutex.
func (t *transport) removePeer(id types.ID) {
if peer, ok := t.peers[id]; ok {
peer.Stop()
} else {