etcdserver: transfer leadership when stopping

release-3.1
Gyu-Ho Lee 2016-08-13 12:41:04 -07:00
parent 7b11c288fe
commit 64a0e34602
4 changed files with 179 additions and 3 deletions

View File

@ -26,6 +26,7 @@ var (
ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrNoLeader = errors.New("etcdserver: no leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")

View File

@ -775,9 +775,57 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
ep.snapi = ep.appliedi
}
// Stop stops the server gracefully, and shuts down the running goroutine.
// Stop should be called after a Start(s), otherwise it will block forever.
func (s *EtcdServer) Stop() {
func (s *EtcdServer) isMultiNode() bool {
return s.cluster != nil && len(s.cluster.MemberIDs()) > 1
}
func (s *EtcdServer) isLeader() bool {
return uint64(s.ID()) == s.Lead()
}
// transferLeadership transfers the leader to the given transferee.
// TODO: maybe expose to client?
func (s *EtcdServer) transferLeadership(ctx context.Context, lead, transferee uint64) error {
now := time.Now()
interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
plog.Infof("%s starts leadership transfer from %s to %s", s.ID(), types.ID(lead), types.ID(transferee))
s.r.TransferLeadership(ctx, lead, transferee)
for s.Lead() != transferee {
select {
case <-ctx.Done(): // time out
return ErrTimeoutLeaderTransfer
case <-time.After(interval):
}
}
// TODO: drain all requests, or drop all messages to the old leader
plog.Infof("%s finished leadership transfer from %s to to %s (took %v)", s.ID(), types.ID(lead), types.ID(transferee), time.Since(now))
return nil
}
// TransferLeadership transfers the leader to the chosen transferee.
func (s *EtcdServer) TransferLeadership() error {
if !s.isMultiNode() || !s.isLeader() {
plog.Printf("skipping leader transfer since multi-node %v or is-leader %v", s.isMultiNode(), s.isLeader())
return nil
}
transferee, ok := longestConnected(s.r.transport, s.cluster.MemberIDs())
if !ok {
return ErrUnhealthy
}
tm := s.Cfg.ReqTimeout()
ctx, cancel := context.WithTimeout(context.TODO(), tm)
err := s.transferLeadership(ctx, s.Lead(), uint64(transferee))
cancel()
return err
}
// HardStop stops the server without coordination with other members in the cluster.
func (s *EtcdServer) HardStop() {
select {
case s.stop <- struct{}{}:
case <-s.done:
@ -786,6 +834,17 @@ func (s *EtcdServer) Stop() {
<-s.done
}
// Stop stops the server gracefully, and shuts down the running goroutine.
// Stop should be called after a Start(s), otherwise it will block forever.
// When stopping leader, Stop transfers its leadership to one of its peers
// before stopping the server.
func (s *EtcdServer) Stop() {
if err := s.TransferLeadership(); err != nil {
plog.Warningf("%s failed to transfer leadership (%v)", s.ID(), err)
}
s.HardStop()
}
// ReadyNotify returns a channel that will be closed when the server
// is ready to serve client requests
func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }

View File

@ -52,3 +52,30 @@ func numConnectedSince(transport rafthttp.Transporter, since time.Time, self typ
}
return connectedNum
}
// longestConnected chooses the member with longest active-since-time.
// It returns false, if nothing is active.
func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool) {
var longest types.ID
var oldest time.Time
for _, id := range membs {
tm := tp.ActiveSince(id)
if tm.IsZero() { // inactive
continue
}
if oldest.IsZero() { // first longest candidate
oldest = tm
longest = id
}
if tm.Before(oldest) {
oldest = tm
longest = id
}
}
if uint64(longest) == 0 {
return longest, false
}
return longest, true
}

89
etcdserver/util_test.go Normal file
View File

@ -0,0 +1,89 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import (
"net/http"
"testing"
"time"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/snap"
)
func TestLongestConnected(t *testing.T) {
umap, err := types.NewURLsMap("mem1=http://10.1:2379,mem2=http://10.2:2379,mem3=http://10.3:2379")
if err != nil {
t.Fatal(err)
}
clus, err := membership.NewClusterFromURLsMap("test", umap)
if err != nil {
t.Fatal(err)
}
memberIDs := clus.MemberIDs()
tr := newNopTransporterWithActiveTime(memberIDs)
transferee, ok := longestConnected(tr, memberIDs)
if !ok {
t.Fatalf("unexpected ok %v", ok)
}
if memberIDs[0] != transferee {
t.Fatalf("expected first member %s to be transferee, got %s", memberIDs[0], transferee)
}
// make all members non-active
amap := make(map[types.ID]time.Time)
for _, id := range memberIDs {
amap[id] = time.Time{}
}
tr.(*nopTransporterWithActiveTime).reset(amap)
_, ok2 := longestConnected(tr, memberIDs)
if ok2 {
t.Fatalf("unexpected ok %v", ok)
}
}
type nopTransporterWithActiveTime struct {
activeMap map[types.ID]time.Time
}
// newNopTransporterWithActiveTime creates nopTransporterWithActiveTime with the first member
// being the most stable (longest active-since time).
func newNopTransporterWithActiveTime(memberIDs []types.ID) rafthttp.Transporter {
am := make(map[types.ID]time.Time)
for i, id := range memberIDs {
am[id] = time.Now().Add(time.Duration(i) * time.Second)
}
return &nopTransporterWithActiveTime{activeMap: am}
}
func (s *nopTransporterWithActiveTime) Start() error { return nil }
func (s *nopTransporterWithActiveTime) Handler() http.Handler { return nil }
func (s *nopTransporterWithActiveTime) Send(m []raftpb.Message) {}
func (s *nopTransporterWithActiveTime) SendSnapshot(m snap.Message) {}
func (s *nopTransporterWithActiveTime) AddRemote(id types.ID, us []string) {}
func (s *nopTransporterWithActiveTime) AddPeer(id types.ID, us []string) {}
func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID) {}
func (s *nopTransporterWithActiveTime) RemoveAllPeers() {}
func (s *nopTransporterWithActiveTime) UpdatePeer(id types.ID, us []string) {}
func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time { return s.activeMap[id] }
func (s *nopTransporterWithActiveTime) Stop() {}
func (s *nopTransporterWithActiveTime) Pause() {}
func (s *nopTransporterWithActiveTime) Resume() {}
func (s *nopTransporterWithActiveTime) reset(am map[types.ID]time.Time) { s.activeMap = am }