etcdserver: add getDowngradeEnabled http handler; attach downgrade monitor to server to monitor downgrade status.

release-3.5
yoyinzyc 2020-10-12 10:50:35 -07:00
parent 00e49d0c10
commit 3e8ffc7cda
10 changed files with 250 additions and 8 deletions
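For orientation: the new peer endpoint introduced by this commit serves a plain-text boolean at /downgrade/enabled. Below is a minimal, hypothetical sketch of how another process could query it; the peer URL and the plain-HTTP transport are assumptions, and real in-tree callers go through getDowngradeEnabled (added further down) with the configured peer RoundTripper.

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"strconv"
)

func main() {
	// Hypothetical peer URL; TLS and the peer transport are omitted here.
	resp, err := http.Get("http://127.0.0.1:2380/downgrade/enabled")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// The handler writes "true" or "false" as text/plain.
	enabled, err := strconv.ParseBool(string(b))
	if err != nil {
		panic(err)
	}
	fmt.Println("downgrade enabled:", enabled)
}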

View File

@@ -56,6 +56,7 @@ const (
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second
DefaultDowngradeCheckTime = 5 * time.Second
DefaultListenPeerURLs = "http://localhost:2380"
DefaultListenClientURLs = "http://localhost:2379"
@@ -330,6 +331,8 @@ type Config struct {
// UnsafeNoFsync disables all uses of fsync.
// Setting this is unsafe and will cause data loss.
UnsafeNoFsync bool `json:"unsafe-no-fsync"`
ExperimentalDowngradeCheckTime time.Duration `json:"experimental-downgrade-check-time"`
}
// configYAML holds the config suitable for yaml parsing
@@ -413,6 +416,8 @@ func NewConfig() *Config {
LogOutputs: []string{DefaultLogOutput},
LogLevel: logutil.DefaultLogLevel,
EnableGRPCGateway: true,
ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
return cfg
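As a usage note (not part of this diff), the new check interval is exposed on the public embed config, so an embedded server could override the 5s default set by NewConfig(). A minimal sketch, assuming the release-3.5 import path go.etcd.io/etcd/server/v3/embed:

package main

import (
	"log"
	"time"

	"go.etcd.io/etcd/server/v3/embed" // import path assumed for release-3.5
)

func main() {
	cfg := embed.NewConfig()
	// NewConfig() already sets ExperimentalDowngradeCheckTime to
	// DefaultDowngradeCheckTime (5s); override it here for illustration.
	cfg.ExperimentalDowngradeCheckTime = 10 * time.Second
	cfg.Dir = "default.etcd"

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
	log.Println("etcd is ready")
}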

View File

@@ -201,6 +201,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
}
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
@@ -303,6 +304,7 @@ func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitiali
zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
zap.String("discovery-url", sc.DiscoveryURL),
zap.String("discovery-proxy", sc.DiscoveryProxy),
zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()),
)
}

View File

@@ -252,6 +252,7 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status checks.")
// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")

View File

@@ -38,7 +38,7 @@ const (
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler())
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
}
func newPeerHandler(
@@ -47,6 +47,7 @@ func newPeerHandler(
raftHandler http.Handler,
leaseHandler http.Handler,
hashKVHandler http.Handler,
downgradeEnabledHandler http.Handler,
) http.Handler {
if lg == nil {
lg = zap.NewNop()
@@ -64,6 +65,9 @@ func newPeerHandler(
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}
if downgradeEnabledHandler != nil {
mux.Handle(etcdserver.DowngradeEnabledPath, downgradeEnabledHandler)
}
if hashKVHandler != nil {
mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler)
}

View File

@@ -83,7 +83,7 @@ var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Reque
// TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that
// handles raft-prefix requests well.
func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil)
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil)
srv := httptest.NewServer(ph)
defer srv.Close()
@@ -231,7 +231,7 @@ func TestServeMemberPromoteFails(t *testing.T) {
// TestNewPeerHandlerOnMembersPromotePrefix verifies the request with members promote prefix is routed correctly
func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) {
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil)
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil)
srv := httptest.NewServer(ph)
defer srv.Close()

View File

@@ -21,6 +21,7 @@ import (
"io/ioutil"
"net/http"
"sort"
"strconv"
"strings"
"time"
@@ -369,9 +370,103 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R
// getDowngradeEnabledFromRemotePeers will get the downgrade enabled status of the cluster.
func getDowngradeEnabledFromRemotePeers(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
members := cl.Members()
for _, m := range members {
if m.ID == local {
continue
}
enable, err := getDowngradeEnabled(lg, m, rt)
if err != nil {
lg.Warn("failed to get downgrade enabled status", zap.String("remote-member-id", m.ID.String()), zap.Error(err))
} else {
// Since the "/downgrade/enabled" endpoint serves linearizable data,
// this function can return once it gets a non-error response from the endpoint.
return enable
}
}
return false
}
// getDowngradeEnabled returns the downgrade enabled status of the given member
// via its peerURLs. Returns the last error if it fails to get it.
func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (bool, error) {
cc := &http.Client{
Transport: rt,
}
var (
err error
resp *http.Response
)
for _, u := range m.PeerURLs {
addr := u + DowngradeEnabledPath
resp, err = cc.Get(addr)
if err != nil {
lg.Warn(
"failed to reach the peer URL",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
continue
}
var b []byte
b, err = ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
lg.Warn(
"failed to read body of response",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
continue
}
var enable bool
if enable, err = strconv.ParseBool(string(b)); err != nil {
lg.Warn(
"failed to convert response",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
continue
}
return enable, nil
}
return false, err
}
// isMatchedVersions returns true if all server versions are equal to the target version; otherwise it returns false.
// It can be used to decide whether the cluster has finished downgrading to the target version.
func isMatchedVersions(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
for mid, ver := range vers {
if ver == nil {
return false
}
v, err := semver.NewVersion(ver.Cluster)
if err != nil {
lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
return false
}
if !targetVersion.Equal(*v) {
lg.Warn("remotes server has mismatching etcd version",
zap.String("remote-member-id", mid),
zap.String("current-server-version", v.String()),
zap.String("target-version", targetVersion.String()),
)
return false
}
}
return true
}
func convertToClusterVersion(v string) (*semver.Version, error) {
ver, err := semver.NewVersion(v)
if err != nil {

View File

@@ -215,3 +215,52 @@ func TestDecideAllowedVersionRange(t *testing.T) {
})
}
}
func TestIsMatchedVersions(t *testing.T) {
tests := []struct {
name string
targetVersion *semver.Version
versionMap map[string]*version.Versions
expectedFinished bool
}{
{
"When downgrade finished",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
true,
},
{
"When cannot parse peer version",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
false,
},
{
"When downgrade not finished",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.5.2", Cluster: "3.5.0"},
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := isMatchedVersions(zap.NewNop(), tt.targetVersion, tt.versionMap)
if actual != tt.expectedFinished {
t.Errorf("expected downgrade finished is %v; got %v", tt.expectedFinished, actual)
}
})
}
}

View File

@@ -162,6 +162,8 @@ type ServerConfig struct {
// UnsafeNoFsync disables all uses of fsync.
// Setting this is unsafe and will cause data loss.
UnsafeNoFsync bool `json:"unsafe-no-fsync"`
DowngradeCheckTime time.Duration
}
// VerifyBootstrap sanity-checks the initial config for bootstrap case

View File

@@ -337,11 +337,6 @@ func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo
return nil, ErrCorrupt
}
type ServerPeerV2 interface {
ServerPeer
HashKVHandler() http.Handler
}
const PeerHashKVPath = "/members/hashkv"
type hashKVHandler struct {

View File

@@ -25,6 +25,7 @@ import (
"os"
"path"
"regexp"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -101,6 +102,8 @@ const (
recommendedMaxRequestBytes = 10 * 1024 * 1024
readyPercent = 0.9
DowngradeEnabledPath = "/downgrade/enabled"
)
var (
@@ -705,6 +708,7 @@ func (s *EtcdServer) Start() {
s.GoAttach(s.monitorVersions)
s.GoAttach(s.linearizableReadLoop)
s.GoAttach(s.monitorKVHash)
s.GoAttach(s.monitorDowngrade)
}
// start prepares and starts server in a new goroutine. It is no longer safe to
@@ -814,6 +818,56 @@ func (s *EtcdServer) LeaseHandler() http.Handler {
func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
type ServerPeerV2 interface {
ServerPeer
HashKVHandler() http.Handler
DowngradeEnabledHandler() http.Handler
}
func (s *EtcdServer) DowngradeInfo() *membership.DowngradeInfo { return s.cluster.DowngradeInfo() }
type downgradeEnabledHandler struct {
lg *zap.Logger
cluster api.Cluster
server *EtcdServer
}
func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
return &downgradeEnabledHandler{
lg: s.getLogger(),
cluster: s.cluster,
server: s,
}
}
func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
if r.URL.Path != DowngradeEnabledPath {
http.Error(w, "bad path", http.StatusBadRequest)
return
}
ctx, cancel := context.WithTimeout(context.Background(), h.server.Cfg.ReqTimeout())
defer cancel()
// serve with linearized downgrade info
if err := h.server.linearizableReadNotify(ctx); err != nil {
http.Error(w, fmt.Sprintf("failed linearized read: %v", err),
http.StatusInternalServerError)
return
}
enabled := h.server.DowngradeInfo().Enabled
w.Header().Set("Content-Type", "text/plain")
w.Write([]byte(strconv.FormatBool(enabled)))
}
// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
@@ -2318,6 +2372,41 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
}
}
func (s *EtcdServer) monitorDowngrade() {
t := s.Cfg.DowngradeCheckTime
if t == 0 {
return
}
lg := s.getLogger()
for {
select {
case <-time.After(t):
case <-s.stopping:
return
}
if !s.isLeader() {
continue
}
d := s.cluster.DowngradeInfo()
if !d.Enabled {
continue
}
targetVersion := d.TargetVersion
v := semver.Must(semver.NewVersion(targetVersion))
if isMatchedVersions(s.getLogger(), v, getVersions(s.getLogger(), s.cluster, s.id, s.peerRt)) {
lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if _, err := s.downgradeCancel(ctx); err != nil {
lg.Warn("failed to cancel downgrade", zap.Error(err))
}
cancel()
}
}
}
func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
switch err {
case context.Canceled: