diff --git a/etcdserver/sendhub.go b/etcdserver/sendhub.go index 529c5c416..870e41398 100644 --- a/etcdserver/sendhub.go +++ b/etcdserver/sendhub.go @@ -32,6 +32,16 @@ const ( raftPrefix = "/raft" ) +type SendHub interface { + rafthttp.SenderFinder + Send(m []raftpb.Message) + Add(m *Member) + Remove(id types.ID) + Update(m *Member) + Stop() + ShouldStopNotify() <-chan struct{} +} + type sendHub struct { tr http.RoundTripper cl ClusterInfo @@ -129,3 +139,16 @@ func (h *sendHub) Update(m *Member) { u.Path = path.Join(u.Path, raftPrefix) h.senders[m.ID].Update(u.String()) } + +// for testing +func (h *sendHub) pause() { + for _, s := range h.senders { + s.Pause() + } +} + +func (h *sendHub) resume() { + for _, s := range h.senders { + s.Resume() + } +} diff --git a/etcdserver/server.go b/etcdserver/server.go index d1fda20d9..0d52615b0 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -90,16 +90,6 @@ type Response struct { err error } -type SendHub interface { - rafthttp.SenderFinder - Send(m []raftpb.Message) - Add(m *Member) - Remove(id types.ID) - Update(m *Member) - Stop() - ShouldStopNotify() <-chan struct{} -} - type Storage interface { // Save function saves ents and state to the underlying stable storage. // Save MUST block until st and ents are on stable storage. @@ -860,6 +850,17 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) { log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index) } +// for testing +func (s *EtcdServer) PauseSending() { + hub := s.sendhub.(*sendHub) + hub.pause() +} + +func (s *EtcdServer) ResumeSending() { + hub := s.sendhub.(*sendHub) + hub.resume() +} + // checkClientURLsEmptyFromPeers does its best to get the cluster from peers, // and if this succeeds, checks that the member of the given id exists in the // cluster, and its ClientURLs is empty. diff --git a/integration/cluster_test.go b/integration/cluster_test.go index f333a78f9..de90dab20 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -34,6 +34,7 @@ import ( "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/etcdhttp" "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" + "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/rafthttp" @@ -391,8 +392,9 @@ type member struct { etcdserver.ServerConfig PeerListeners, ClientListeners []net.Listener - s *etcdserver.EtcdServer - hss []*httptest.Server + raftHandler *testutil.PauseableHandler + s *etcdserver.EtcdServer + hss []*httptest.Server } func mustNewMember(t *testing.T, name string) *member { @@ -469,10 +471,12 @@ func (m *member) Launch() error { m.s.SyncTicker = time.Tick(500 * time.Millisecond) m.s.Start() + m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)} + for _, ln := range m.PeerListeners { hs := &httptest.Server{ Listener: ln, - Config: &http.Server{Handler: etcdhttp.NewPeerHandler(m.s)}, + Config: &http.Server{Handler: m.raftHandler}, } hs.Start() m.hss = append(m.hss, hs) @@ -488,6 +492,16 @@ func (m *member) Launch() error { return nil } +func (m *member) Pause() { + m.raftHandler.Pause() + m.s.PauseSending() +} + +func (m *member) Resume() { + m.raftHandler.Resume() + m.s.ResumeSending() +} + // Stop stops the member, but the data dir of the member is preserved. func (m *member) Stop(t *testing.T) { m.s.Stop() diff --git a/integration/member_test.go b/integration/member_test.go index b0679b318..f844db51a 100644 --- a/integration/member_test.go +++ b/integration/member_test.go @@ -4,8 +4,24 @@ import ( "io/ioutil" "os" "testing" + "time" ) +func TestPauseMember(t *testing.T) { + defer afterTest(t) + c := NewCluster(t, 5) + c.Launch(t) + defer c.Terminate(t) + + for i := 0; i < 5; i++ { + c.Members[i].Pause() + time.Sleep(20 * tickDuration) + c.Members[i].Resume() + } + c.waitLeader(t) + clusterMustProgress(t, c) +} + func TestRestartMember(t *testing.T) { defer afterTest(t) c := NewCluster(t, 3) diff --git a/pkg/testutil/pauseable_handler.go b/pkg/testutil/pauseable_handler.go new file mode 100644 index 000000000..6062caccf --- /dev/null +++ b/pkg/testutil/pauseable_handler.go @@ -0,0 +1,61 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package testutil + +import ( + "net/http" + "sync" +) + +type PauseableHandler struct { + Next http.Handler + mu sync.Mutex + paused bool +} + +func (ph *PauseableHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ph.mu.Lock() + paused := ph.paused + ph.mu.Unlock() + if !paused { + ph.Next.ServeHTTP(w, r) + } else { + hj, ok := w.(http.Hijacker) + if !ok { + panic("webserver doesn't support hijacking") + return + } + conn, _, err := hj.Hijack() + if err != nil { + panic(err.Error()) + return + } + conn.Close() + } +} + +func (ph *PauseableHandler) Pause() { + ph.mu.Lock() + defer ph.mu.Unlock() + ph.paused = true +} + +func (ph *PauseableHandler) Resume() { + ph.mu.Lock() + defer ph.mu.Unlock() + ph.paused = false +} diff --git a/rafthttp/sender.go b/rafthttp/sender.go index 008e70673..13290e8f4 100644 --- a/rafthttp/sender.go +++ b/rafthttp/sender.go @@ -51,6 +51,13 @@ type Sender interface { // Stop performs any necessary finalization and terminates the Sender // elegantly. Stop() + + // Pause pauses the sender. The sender will simply drops all incoming + // messages without retruning an error. + Pause() + + // Resume resumes a paused sender. + Resume() } func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender { @@ -85,8 +92,9 @@ type sender struct { strmSrvMu sync.Mutex q chan []byte - mu sync.RWMutex - wg sync.WaitGroup + paused bool + mu sync.RWMutex + wg sync.WaitGroup } func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) { @@ -112,6 +120,13 @@ func (s *sender) Update(u string) { // TODO (xiangli): reasonable retry logic func (s *sender) Send(m raftpb.Message) error { + s.mu.RLock() + pause := s.paused + s.mu.RUnlock() + if pause { + return nil + } + s.maybeStopStream(m.Term) if shouldInitStream(m) && !s.hasStreamClient() { s.initStream(types.ID(m.From), types.ID(m.To), m.Term) @@ -152,6 +167,18 @@ func (s *sender) Stop() { } } +func (s *sender) Pause() { + s.mu.Lock() + defer s.mu.Unlock() + s.paused = true +} + +func (s *sender) Resume() { + s.mu.Lock() + defer s.mu.Unlock() + s.paused = false +} + func (s *sender) maybeStopStream(term uint64) { if s.strmCln != nil && term > s.strmCln.term { s.strmCln.stop()