etcdserver: fix server.Stop()

Stop should be idempotent. It should simply send a stop signal to the server.
It is the server's responsibility to stop the go-routines and related components.
release-2.0
Xiang Li 2014-11-13 13:47:12 -08:00
parent 8319d4dcbe
commit 30dfdb0ea9
2 changed files with 15 additions and 12 deletions

View File

@ -157,7 +157,7 @@ type RaftTimer interface {
type EtcdServer struct { type EtcdServer struct {
w wait.Wait w wait.Wait
done chan struct{} done chan struct{}
stopped chan struct{} stop chan struct{}
id types.ID id types.ID
attributes Attributes attributes Attributes
@ -301,7 +301,7 @@ func (s *EtcdServer) start() {
} }
s.w = wait.New() s.w = wait.New()
s.done = make(chan struct{}) s.done = make(chan struct{})
s.stopped = make(chan struct{}) s.stop = make(chan struct{})
s.stats.Initialize() s.stats.Initialize()
// TODO: if this is an empty log, writes all peer infos // TODO: if this is an empty log, writes all peer infos
// into the first entry // into the first entry
@ -371,8 +371,10 @@ func (s *EtcdServer) run() {
} }
case <-syncC: case <-syncC:
s.sync(defaultSyncTimeout) s.sync(defaultSyncTimeout)
case <-s.done: case <-s.stop:
close(s.stopped) s.node.Stop()
s.sender.Stop()
close(s.done)
return return
} }
} }
@ -381,10 +383,12 @@ func (s *EtcdServer) run() {
// Stop stops the server gracefully, and shuts down the running goroutine. // Stop stops the server gracefully, and shuts down the running goroutine.
// Stop should be called after a Start(s), otherwise it will block forever. // Stop should be called after a Start(s), otherwise it will block forever.
func (s *EtcdServer) Stop() { func (s *EtcdServer) Stop() {
s.node.Stop() select {
close(s.done) case s.stop <- struct{}{}:
<-s.stopped case <-s.done:
s.sender.Stop() return
}
<-s.done
} }
// Do interprets r and performs an operation on s.store according to r.Method // Do interprets r and performs an operation on s.store according to r.Method

View File

@ -1108,10 +1108,9 @@ func TestPublishStopped(t *testing.T) {
Cluster: &Cluster{}, Cluster: &Cluster{},
w: &waitRecorder{}, w: &waitRecorder{},
done: make(chan struct{}), done: make(chan struct{}),
stopped: make(chan struct{}), stop: make(chan struct{}),
} }
close(srv.stopped) close(srv.done)
srv.Stop()
srv.publish(time.Hour) srv.publish(time.Hour)
} }
@ -1123,7 +1122,7 @@ func TestPublishRetry(t *testing.T) {
w: &waitRecorder{}, w: &waitRecorder{},
done: make(chan struct{}), done: make(chan struct{}),
} }
time.AfterFunc(500*time.Microsecond, srv.Stop) time.AfterFunc(500*time.Microsecond, func() { close(srv.done) })
srv.publish(10 * time.Nanosecond) srv.publish(10 * time.Nanosecond)
action := n.Action() action := n.Action()