Merge pull request #1992 from xiang90/rm_leader

*: support removing the leader from a 2 members cluster
release-2.0
Xiang Li 2015-01-02 14:15:12 -08:00
commit 2a83e350b1
3 changed files with 25 additions and 9 deletions

View File

@ -420,7 +420,9 @@ func (s *EtcdServer) run() {
}
if len(ents) > 0 {
if appliedi, shouldstop = s.apply(ents, &confState); shouldstop {
return
m1 := fmt.Sprintf("etcdserver: removed local member %s from cluster %s", s.ID(), s.Cluster.ID())
m2 := fmt.Sprint("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
go s.stopWithDelay(10*100*time.Millisecond, m1, m2)
}
}
}
@ -445,14 +447,26 @@ func (s *EtcdServer) run() {
// Stop stops the server gracefully, and shuts down the running goroutine.
// Stop should be called after a Start(s), otherwise it will block forever.
func (s *EtcdServer) Stop() {
s.stopWithMessages()
}
func (s *EtcdServer) stopWithMessages(msgs ...string) {
select {
case s.stop <- struct{}{}:
for _, msg := range msgs {
log.Println(msg)
}
case <-s.done:
return
}
<-s.done
}
func (s *EtcdServer) stopWithDelay(d time.Duration, msgs ...string) {
time.Sleep(d)
s.stopWithMessages(msgs...)
}
// StopNotify returns a channel that receives a empty struct
// when the server is stopped.
func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
@ -662,6 +676,8 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
// The given entries should not be empty.
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
var applied uint64
var shouldstop bool
var err error
for i := range es {
e := es[i]
switch e.Type {
@ -672,11 +688,8 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
shouldstop, err := s.applyConfChange(cc, confState)
shouldstop, err = s.applyConfChange(cc, confState)
s.w.Trigger(cc.ID, err)
if shouldstop {
return applied, true
}
default:
log.Panicf("entry type should be either EntryNormal or EntryConfChange")
}
@ -684,7 +697,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
atomic.StoreUint64(&s.raftTerm, e.Term)
applied = e.Index
}
return applied, false
return applied, shouldstop
}
// applyRequest interprets r as a call to store.X and returns a Response interpreted
@ -765,8 +778,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
id := types.ID(cc.NodeID)
s.Cluster.RemoveMember(id)
if id == s.id {
log.Printf("etcdserver: removed local member %s from cluster %s", id, s.Cluster.ID())
log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
return true, nil
} else {
s.transport.RemovePeer(id)

View File

@ -113,7 +113,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {
defer c.Terminate(t)
// TODO: remove the last but one member
for i := 0; i < size-2; i++ {
for i := 0; i < size-1; i++ {
id := c.Members[len(c.Members)-1].s.ID()
c.RemoveMember(t, uint64(id))
c.waitLeader(t, c.Members)

View File

@ -284,6 +284,11 @@ func (n *node) run(r *raft) {
case pb.ConfChangeAddNode:
r.addNode(cc.NodeID)
case pb.ConfChangeRemoveNode:
// block incoming proposal when local node is
// removed
if cc.NodeID == r.id {
n.propc = nil
}
r.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode:
r.resetPendingConf()