From 64a0e346022d828b6b3771e3142b9a1be5a781a5 Mon Sep 17 00:00:00 2001
From: Gyu-Ho Lee
Date: Sat, 13 Aug 2016 12:41:04 -0700
Subject: [PATCH] etcdserver: transfer leadership when stopping

---
Reviewer notes (this section is not part of the commit message):
  * errors.go: adds the ErrTimeoutLeaderTransfer sentinel error.
  * server.go: adds isMultiNode (s.cluster is non-nil and lists more than one
    member ID) and isLeader (s.ID() equals s.Lead()), adds the leadership
    transfer helpers, and turns the previous Stop body into HardStop; the new
    Stop calls TransferLeadership and then HardStop.
  * util.go: adds longestConnected, which TransferLeadership uses to pick the
    transferee.
  * util_test.go: new file with a test for longestConnected.

 etcdserver/errors.go    |  1 +
 etcdserver/server.go    | 65 ++++++++++++++++++++++++++++--
 etcdserver/util.go      | 27 +++++++++++++
 etcdserver/util_test.go | 89 +++++++++++++++++++++++++++++++++++++++++
 4 files changed, 179 insertions(+), 3 deletions(-)
 create mode 100644 etcdserver/util_test.go

diff --git a/etcdserver/errors.go b/etcdserver/errors.go
index 032960aff..ce9d0cd30 100644
--- a/etcdserver/errors.go
+++ b/etcdserver/errors.go
@@ -26,6 +26,7 @@ var (
 	ErrTimeout                    = errors.New("etcdserver: request timed out")
 	ErrTimeoutDueToLeaderFail     = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
 	ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
+	ErrTimeoutLeaderTransfer      = errors.New("etcdserver: request timed out, leader transfer took too long")
 	ErrNotEnoughStartedMembers    = errors.New("etcdserver: re-configuration failed due to not enough started members")
 	ErrNoLeader                   = errors.New("etcdserver: no leader")
 	ErrRequestTooLarge            = errors.New("etcdserver: request is too large")
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 67d3043e6..aa830a603 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -775,9 +775,57 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
 	ep.snapi = ep.appliedi
 }
 
-// Stop stops the server gracefully, and shuts down the running goroutine.
-// Stop should be called after a Start(s), otherwise it will block forever.
-func (s *EtcdServer) Stop() {
+func (s *EtcdServer) isMultiNode() bool {
+	return s.cluster != nil && len(s.cluster.MemberIDs()) > 1
+}
+
+func (s *EtcdServer) isLeader() bool {
+	return uint64(s.ID()) == s.Lead()
+}
+
+// transferLeadership asks raft to move leadership from lead to transferee, then polls s.Lead() until it reports transferee or ctx is done.
+// TODO: maybe expose to client?
+func (s *EtcdServer) transferLeadership(ctx context.Context, lead, transferee uint64) error {
+	now := time.Now()
+	interval := time.Duration(s.Cfg.TickMs) * time.Millisecond // poll period: s.Cfg.TickMs milliseconds
+
+	plog.Infof("%s starts leadership transfer from %s to %s", s.ID(), types.ID(lead), types.ID(transferee))
+	s.r.TransferLeadership(ctx, lead, transferee)
+	for s.Lead() != transferee { // wait until this server observes transferee as the leader
+		select {
+		case <-ctx.Done(): // ctx timed out or was canceled before the leader changed
+			return ErrTimeoutLeaderTransfer
+		case <-time.After(interval):
+		}
+	}
+
+	// TODO: drain all requests, or drop all messages to the old leader
+
+	plog.Infof("%s finished leadership transfer from %s to %s (took %v)", s.ID(), types.ID(lead), types.ID(transferee), time.Since(now))
+	return nil
+}
+
+// TransferLeadership hands leadership to the peer picked by longestConnected; it returns nil without acting unless this server leads a multi-member cluster.
+func (s *EtcdServer) TransferLeadership() error {
+	if !s.isMultiNode() || !s.isLeader() {
+		plog.Printf("skipping leader transfer since multi-node %v or is-leader %v", s.isMultiNode(), s.isLeader())
+		return nil
+	}
+
+	transferee, ok := longestConnected(s.r.transport, s.cluster.MemberIDs())
+	if !ok { // longestConnected found no member to transfer to
+		return ErrUnhealthy
+	}
+
+	tm := s.Cfg.ReqTimeout() // upper bound on how long the transfer may take
+	ctx, cancel := context.WithTimeout(context.TODO(), tm)
+	err := s.transferLeadership(ctx, s.Lead(), uint64(transferee))
+	cancel() // release the timeout context's resources
+	return err
+}
+
+// HardStop stops the server without coordination with other members in the cluster.
+func (s *EtcdServer) HardStop() {
 	select {
 	case s.stop <- struct{}{}:
 	case <-s.done:
@@ -786,6 +834,17 @@ func (s *EtcdServer) Stop() {
 	<-s.done
 }
 
+// Stop stops the server gracefully, and shuts down the running goroutine.
+// Stop should be called after a Start(s), otherwise it will block forever.
+// When the server being stopped is the leader, Stop first tries to transfer
+// its leadership to one of its peers before stopping the server.
+func (s *EtcdServer) Stop() {
+	if err := s.TransferLeadership(); err != nil {
+		plog.Warningf("%s failed to transfer leadership (%v)", s.ID(), err)
+	}
+	s.HardStop() // runs whether or not the leadership transfer succeeded
+}
+
 // ReadyNotify returns a channel that will be closed when the server
 // is ready to serve client requests
 func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
diff --git a/etcdserver/util.go b/etcdserver/util.go
index 8365bf7cf..f189cd946 100644
--- a/etcdserver/util.go
+++ b/etcdserver/util.go
@@ -52,3 +52,30 @@ func numConnectedSince(transport rafthttp.Transporter, since time.Time, self typ
 	}
 	return connectedNum
 }
+
+// longestConnected returns the member of membs with the earliest non-zero tp.ActiveSince time.
+// The bool is false when nothing was selected (every ActiveSince is zero, or the pick has ID 0).
+func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool) {
+	var longest types.ID
+	var oldest time.Time
+	for _, id := range membs {
+		tm := tp.ActiveSince(id)
+		if tm.IsZero() { // zero time is treated as inactive: skip this member
+			continue
+		}
+
+		if oldest.IsZero() { // first active member seen becomes the initial candidate
+			oldest = tm
+			longest = id
+		}
+
+		if tm.Before(oldest) { // active since earlier than the current candidate
+			oldest = tm
+			longest = id
+		}
+	}
+	if uint64(longest) == 0 { // still the zero ID: no member was selected
+		return longest, false
+	}
+	return longest, true
+}
diff --git a/etcdserver/util_test.go b/etcdserver/util_test.go
new file mode 100644
index 000000000..79edabd12
--- /dev/null
+++ b/etcdserver/util_test.go
@@ -0,0 +1,89 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import (
+	"net/http"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver/membership"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
+	"github.com/coreos/etcd/snap"
+)
+
+func TestLongestConnected(t *testing.T) {
+	umap, err := types.NewURLsMap("mem1=http://10.1:2379,mem2=http://10.2:2379,mem3=http://10.3:2379")
+	if err != nil {
+		t.Fatal(err)
+	}
+	clus, err := membership.NewClusterFromURLsMap("test", umap)
+	if err != nil {
+		t.Fatal(err)
+	}
+	memberIDs := clus.MemberIDs()
+
+	tr := newNopTransporterWithActiveTime(memberIDs) // memberIDs[0] gets the earliest active-since time
+	transferee, ok := longestConnected(tr, memberIDs)
+	if !ok {
+		t.Fatalf("unexpected ok %v", ok)
+	}
+	if memberIDs[0] != transferee {
+		t.Fatalf("expected first member %s to be transferee, got %s", memberIDs[0], transferee)
+	}
+
+	// give every member a zero active-since time, so that none counts as active
+	amap := make(map[types.ID]time.Time)
+	for _, id := range memberIDs {
+		amap[id] = time.Time{}
+	}
+	tr.(*nopTransporterWithActiveTime).reset(amap)
+
+	_, ok2 := longestConnected(tr, memberIDs)
+	if ok2 {
+		t.Fatalf("unexpected ok %v", ok2)
+	}
+}
+
+type nopTransporterWithActiveTime struct {
+	activeMap map[types.ID]time.Time // per-member time reported by ActiveSince
+}
+
+// newNopTransporterWithActiveTime creates nopTransporterWithActiveTime with the first member
+// being the most stable (longest active-since time).
+func newNopTransporterWithActiveTime(memberIDs []types.ID) rafthttp.Transporter {
+	am := make(map[types.ID]time.Time)
+	for i, id := range memberIDs {
+		am[id] = time.Now().Add(time.Duration(i) * time.Second) // member i is stamped i seconds after now, so index 0 is the earliest
+	}
+	return &nopTransporterWithActiveTime{activeMap: am}
+}
+
+func (s *nopTransporterWithActiveTime) Start() error                        { return nil }
+func (s *nopTransporterWithActiveTime) Handler() http.Handler               { return nil }
+func (s *nopTransporterWithActiveTime) Send(m []raftpb.Message)             {}
+func (s *nopTransporterWithActiveTime) SendSnapshot(m snap.Message)         {}
+func (s *nopTransporterWithActiveTime) AddRemote(id types.ID, us []string)  {}
+func (s *nopTransporterWithActiveTime) AddPeer(id types.ID, us []string)    {}
+func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID)              {}
+func (s *nopTransporterWithActiveTime) RemoveAllPeers()                     {}
+func (s *nopTransporterWithActiveTime) UpdatePeer(id types.ID, us []string) {}
+func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time   { return s.activeMap[id] } // zero time.Time for ids not in the map
+func (s *nopTransporterWithActiveTime) Stop()                               {}
+func (s *nopTransporterWithActiveTime) Pause()                              {}
+func (s *nopTransporterWithActiveTime) Resume()                             {}
+func (s *nopTransporterWithActiveTime) reset(am map[types.ID]time.Time)     { s.activeMap = am } // replaces the whole active-time map