diff --git a/embed/config.go b/embed/config.go
index 88e357270..f094ce25b 100644
--- a/embed/config.go
+++ b/embed/config.go
@@ -56,6 +56,7 @@ const (
 	DefaultGRPCKeepAliveMinTime  = 5 * time.Second
 	DefaultGRPCKeepAliveInterval = 2 * time.Hour
 	DefaultGRPCKeepAliveTimeout  = 20 * time.Second
+	DefaultDowngradeCheckTime    = 5 * time.Second
 
 	DefaultListenPeerURLs   = "http://localhost:2380"
 	DefaultListenClientURLs = "http://localhost:2379"
@@ -330,6 +331,8 @@ type Config struct {
 	// UnsafeNoFsync disables all uses of fsync.
 	// Setting this is unsafe and will cause data loss.
 	UnsafeNoFsync bool `json:"unsafe-no-fsync"`
+
+	ExperimentalDowngradeCheckTime time.Duration `json:"experimental-downgrade-check-time"`
 }
 
 // configYAML holds the config suitable for yaml parsing
@@ -413,6 +416,8 @@ func NewConfig() *Config {
 		LogOutputs:        []string{DefaultLogOutput},
 		LogLevel:          logutil.DefaultLogLevel,
 		EnableGRPCGateway: true,
+
+		ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
 	}
 	cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
 	return cfg
diff --git a/embed/etcd.go b/embed/etcd.go
index 95e43ced9..e8aaf73c4 100644
--- a/embed/etcd.go
+++ b/embed/etcd.go
@@ -201,6 +201,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		EnableLeaseCheckpoint:       cfg.ExperimentalEnableLeaseCheckpoint,
 		CompactionBatchLimit:        cfg.ExperimentalCompactionBatchLimit,
 		WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
+		DowngradeCheckTime:          cfg.ExperimentalDowngradeCheckTime,
 	}
 	print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
 	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
@@ -303,6 +304,7 @@ func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitiali
 		zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
 		zap.String("discovery-url", sc.DiscoveryURL),
 		zap.String("discovery-proxy", sc.DiscoveryProxy),
+		zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()),
 	)
 }
diff --git a/etcdmain/config.go b/etcdmain/config.go
index 7295f34f9..76cd76d33 100644
--- a/etcdmain/config.go
+++ b/etcdmain/config.go
@@ -252,6 +252,7 @@ func newConfig() *config {
 	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
 	fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
 	fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
+	fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status checks.")
 
 	// unsafe
 	fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
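For orientation, here is a minimal sketch of driving the new option through the embed package, equivalent to passing the --experimental-downgrade-check-time flag above. The data directory, the 10s interval, and the go.etcd.io/etcd/embed import path are illustrative assumptions, not something this patch prescribes:

    package main

    import (
    	"log"
    	"time"

    	"go.etcd.io/etcd/embed" // assumed import path
    )

    func main() {
    	cfg := embed.NewConfig()
    	cfg.Dir = "default.etcd" // illustrative data directory
    	// Override the 5s DefaultDowngradeCheckTime introduced above.
    	cfg.ExperimentalDowngradeCheckTime = 10 * time.Second

    	e, err := embed.StartEtcd(cfg)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer e.Close()
    	<-e.Server.ReadyNotify()
    	log.Println("server is ready")
    }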
diff --git a/etcdserver/api/etcdhttp/peer.go b/etcdserver/api/etcdhttp/peer.go
index 0b97c05fb..0cd793025 100644
--- a/etcdserver/api/etcdhttp/peer.go
+++ b/etcdserver/api/etcdhttp/peer.go
@@ -38,7 +38,7 @@ const (
 
 // NewPeerHandler generates an http.Handler to handle etcd peer requests.
 func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
-	return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler())
+	return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
 }
 
 func newPeerHandler(
@@ -47,6 +47,7 @@ func newPeerHandler(
 	raftHandler http.Handler,
 	leaseHandler http.Handler,
 	hashKVHandler http.Handler,
+	downgradeEnabledHandler http.Handler,
 ) http.Handler {
 	if lg == nil {
 		lg = zap.NewNop()
@@ -64,6 +65,9 @@ func newPeerHandler(
 		mux.Handle(leasehttp.LeasePrefix, leaseHandler)
 		mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
 	}
+	if downgradeEnabledHandler != nil {
+		mux.Handle(etcdserver.DowngradeEnabledPath, downgradeEnabledHandler)
+	}
 	if hashKVHandler != nil {
 		mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler)
 	}
diff --git a/etcdserver/api/etcdhttp/peer_test.go b/etcdserver/api/etcdhttp/peer_test.go
index 89222d46a..58ea43b1b 100644
--- a/etcdserver/api/etcdhttp/peer_test.go
+++ b/etcdserver/api/etcdhttp/peer_test.go
@@ -83,7 +83,7 @@ var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Reque
 // TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that
 // handles raft-prefix requests well.
 func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
-	ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil)
+	ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil)
 	srv := httptest.NewServer(ph)
 	defer srv.Close()
 
@@ -231,7 +231,7 @@ func TestServeMemberPromoteFails(t *testing.T) {
 
 // TestNewPeerHandlerOnMembersPromotePrefix verifies the request with members promote prefix is routed correctly
 func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) {
-	ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil)
+	ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil)
 	srv := httptest.NewServer(ph)
 	defer srv.Close()
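To make the new peer route concrete, here is a standalone sketch (not part of the patch) of probing a member's downgrade status by hand. The peer URL is a placeholder; the handler added in etcdserver/server.go below answers a plain-text "true" or "false":

    package main

    import (
    	"fmt"
    	"io/ioutil"
    	"net/http"
    	"strconv"
    )

    func main() {
    	peerURL := "http://127.0.0.1:2380" // placeholder peer URL
    	resp, err := http.Get(peerURL + "/downgrade/enabled") // DowngradeEnabledPath
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	b, err := ioutil.ReadAll(resp.Body)
    	if err != nil {
    		panic(err)
    	}
    	enabled, err := strconv.ParseBool(string(b))
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println("downgrade enabled:", enabled)
    }

This is essentially what getDowngradeEnabled below does, minus the retry over all of a member's peer URLs and the shared RoundTripper.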
diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go
index 8b9edbb03..84c567cfd 100644
--- a/etcdserver/cluster_util.go
+++ b/etcdserver/cluster_util.go
@@ -21,6 +21,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 
@@ -369,9 +370,103 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R
 
 // getDowngradeEnabledFromRemotePeers will get the downgrade enabled status of the cluster.
 func getDowngradeEnabledFromRemotePeers(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
+	members := cl.Members()
+
+	for _, m := range members {
+		if m.ID == local {
+			continue
+		}
+		enable, err := getDowngradeEnabled(lg, m, rt)
+		if err != nil {
+			lg.Warn("failed to get downgrade enabled status", zap.String("remote-member-id", m.ID.String()), zap.Error(err))
+		} else {
+			// Since the "/downgrade/enabled" endpoint serves linearized data,
+			// this function can return once it gets a non-error response from the endpoint.
+			return enable
+		}
+	}
 	return false
 }
 
+// getDowngradeEnabled returns the downgrade enabled status of the given member
+// via its peerURLs. Returns the last error if it fails to get it.
+func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (bool, error) {
+	cc := &http.Client{
+		Transport: rt,
+	}
+	var (
+		err  error
+		resp *http.Response
+	)
+
+	for _, u := range m.PeerURLs {
+		addr := u + DowngradeEnabledPath
+		resp, err = cc.Get(addr)
+		if err != nil {
+			lg.Warn(
+				"failed to reach the peer URL",
+				zap.String("address", addr),
+				zap.String("remote-member-id", m.ID.String()),
+				zap.Error(err),
+			)
+			continue
+		}
+		var b []byte
+		b, err = ioutil.ReadAll(resp.Body)
+		resp.Body.Close()
+		if err != nil {
+			lg.Warn(
+				"failed to read body of response",
+				zap.String("address", addr),
+				zap.String("remote-member-id", m.ID.String()),
+				zap.Error(err),
+			)
+			continue
+		}
+		var enable bool
+		if enable, err = strconv.ParseBool(string(b)); err != nil {
+			lg.Warn(
+				"failed to convert response",
+				zap.String("address", addr),
+				zap.String("remote-member-id", m.ID.String()),
+				zap.Error(err),
+			)
+			continue
+		}
+		return enable, nil
+	}
+	return false, err
+}
+
+// isMatchedVersions returns true if all server versions are equal to the target version,
+// otherwise returns false. It can be used to decide whether the cluster has finished
+// downgrading to the target version.
+func isMatchedVersions(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
+	for mid, ver := range vers {
+		if ver == nil {
+			return false
+		}
+		v, err := semver.NewVersion(ver.Cluster)
+		if err != nil {
+			lg.Warn(
+				"failed to parse cluster version of remote member",
+				zap.String("remote-member-id", mid),
+				zap.String("remote-member-cluster-version", ver.Cluster),
+				zap.Error(err),
+			)
+			return false
+		}
+		if !targetVersion.Equal(*v) {
+			lg.Warn("remote server has mismatching etcd version",
+				zap.String("remote-member-id", mid),
+				zap.String("current-server-version", v.String()),
+				zap.String("target-version", targetVersion.String()),
+			)
+			return false
+		}
+	}
+	return true
+}
+
 func convertToClusterVersion(v string) (*semver.Version, error) {
 	ver, err := semver.NewVersion(v)
 	if err != nil {
diff --git a/etcdserver/cluster_util_test.go b/etcdserver/cluster_util_test.go
index 8571cc08a..67978db1d 100644
--- a/etcdserver/cluster_util_test.go
+++ b/etcdserver/cluster_util_test.go
@@ -215,3 +215,52 @@ func TestDecideAllowedVersionRange(t *testing.T) {
 		})
 	}
 }
+
+func TestIsMatchedVersions(t *testing.T) {
+	tests := []struct {
+		name             string
+		targetVersion    *semver.Version
+		versionMap       map[string]*version.Versions
+		expectedFinished bool
+	}{
+		{
+			"When downgrade finished",
+			&semver.Version{Major: 3, Minor: 4},
+			map[string]*version.Versions{
+				"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
+				"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
+				"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
+			},
+			true,
+		},
+		{
+			"When cannot parse peer version",
+			&semver.Version{Major: 3, Minor: 4},
+			map[string]*version.Versions{
+				"mem1": {Server: "3.4.1", Cluster: "3.4"},
+				"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
+				"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
+			},
+			false,
+		},
+		{
+			"When downgrade not finished",
+			&semver.Version{Major: 3, Minor: 4},
+			map[string]*version.Versions{
+				"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
+				"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
+				"mem3": {Server: "3.5.2", Cluster: "3.5.0"},
+			},
+			false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			actual := isMatchedVersions(zap.NewNop(), tt.targetVersion, tt.versionMap)
+			if actual != tt.expectedFinished {
+				t.Errorf("expected downgrade finished is %v; got %v", tt.expectedFinished, actual)
+			}
+		})
+	}
+}
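A quick standalone illustration of the comparison isMatchedVersions performs, mirroring the test cases above: a target of 3.4 is treated as 3.4.0 and equals a reported cluster version of "3.4.0", while the truncated "3.4" in the second test case is not valid semver and fails to parse:

    package main

    import (
    	"fmt"

    	"github.com/coreos/go-semver/semver"
    )

    func main() {
    	target := &semver.Version{Major: 3, Minor: 4} // i.e. 3.4.0

    	v := semver.Must(semver.NewVersion("3.4.0"))
    	fmt.Println(target.Equal(*v)) // true: this member is done downgrading

    	// A two-component version string is not dotted-tri semver, so parsing
    	// fails and isMatchedVersions reports the downgrade as unfinished.
    	if _, err := semver.NewVersion("3.4"); err != nil {
    		fmt.Println("parse error:", err)
    	}
    }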
%v", tt.expectedFinished, actual) + } + }) + } +} diff --git a/etcdserver/config.go b/etcdserver/config.go index 773333756..d2d4dd9f9 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -162,6 +162,8 @@ type ServerConfig struct { // UnsafeNoFsync disables all uses of fsync. // Setting this is unsafe and will cause data loss. UnsafeNoFsync bool `json:"unsafe-no-fsync"` + + DowngradeCheckTime time.Duration } // VerifyBootstrap sanity-checks the initial config for bootstrap case diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index a57b993d7..46fdf4a47 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -337,11 +337,6 @@ func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo return nil, ErrCorrupt } -type ServerPeerV2 interface { - ServerPeer - HashKVHandler() http.Handler -} - const PeerHashKVPath = "/members/hashkv" type hashKVHandler struct { diff --git a/etcdserver/server.go b/etcdserver/server.go index fb5d04ea6..3db0c706e 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -25,6 +25,7 @@ import ( "os" "path" "regexp" + "strconv" "sync" "sync/atomic" "time" @@ -101,6 +102,8 @@ const ( recommendedMaxRequestBytes = 10 * 1024 * 1024 readyPercent = 0.9 + + DowngradeEnabledPath = "/downgrade/enabled" ) var ( @@ -705,6 +708,7 @@ func (s *EtcdServer) Start() { s.GoAttach(s.monitorVersions) s.GoAttach(s.linearizableReadLoop) s.GoAttach(s.monitorKVHash) + s.GoAttach(s.monitorDowngrade) } // start prepares and starts server in a new goroutine. It is no longer safe to @@ -814,6 +818,56 @@ func (s *EtcdServer) LeaseHandler() http.Handler { func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() } +type ServerPeerV2 interface { + ServerPeer + HashKVHandler() http.Handler + DowngradeEnabledHandler() http.Handler +} + +func (s *EtcdServer) DowngradeInfo() *membership.DowngradeInfo { return s.cluster.DowngradeInfo() } + +type downgradeEnabledHandler struct { + lg *zap.Logger + cluster api.Cluster + server *EtcdServer +} + +func (s *EtcdServer) DowngradeEnabledHandler() http.Handler { + return &downgradeEnabledHandler{ + lg: s.getLogger(), + cluster: s.cluster, + server: s, + } +} + +func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("Allow", http.MethodGet) + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) + + if r.URL.Path != DowngradeEnabledPath { + http.Error(w, "bad path", http.StatusBadRequest) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), h.server.Cfg.ReqTimeout()) + defer cancel() + + // serve with linearized downgrade info + if err := h.server.linearizableReadNotify(ctx); err != nil { + http.Error(w, fmt.Sprintf("failed linearized read: %v", err), + http.StatusInternalServerError) + return + } + enabled := h.server.DowngradeInfo().Enabled + w.Header().Set("Content-Type", "text/plain") + w.Write([]byte(strconv.FormatBool(enabled))) +} + // Process takes a raft message and applies it to the server's raft state // machine, respecting any timeout of the given context. 
diff --git a/etcdserver/server.go b/etcdserver/server.go
index fb5d04ea6..3db0c706e 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -25,6 +25,7 @@ import (
 	"os"
 	"path"
 	"regexp"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -101,6 +102,8 @@ const (
 	recommendedMaxRequestBytes = 10 * 1024 * 1024
 
 	readyPercent = 0.9
+
+	DowngradeEnabledPath = "/downgrade/enabled"
 )
 
 var (
@@ -705,6 +708,7 @@ func (s *EtcdServer) Start() {
 	s.GoAttach(s.monitorVersions)
 	s.GoAttach(s.linearizableReadLoop)
 	s.GoAttach(s.monitorKVHash)
+	s.GoAttach(s.monitorDowngrade)
 }
 
 // start prepares and starts server in a new goroutine. It is no longer safe to
@@ -814,6 +818,56 @@ func (s *EtcdServer) LeaseHandler() http.Handler {
 
 func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
 
+type ServerPeerV2 interface {
+	ServerPeer
+	HashKVHandler() http.Handler
+	DowngradeEnabledHandler() http.Handler
+}
+
+func (s *EtcdServer) DowngradeInfo() *membership.DowngradeInfo { return s.cluster.DowngradeInfo() }
+
+type downgradeEnabledHandler struct {
+	lg      *zap.Logger
+	cluster api.Cluster
+	server  *EtcdServer
+}
+
+func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
+	return &downgradeEnabledHandler{
+		lg:      s.getLogger(),
+		cluster: s.cluster,
+		server:  s,
+	}
+}
+
+func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != http.MethodGet {
+		w.Header().Set("Allow", http.MethodGet)
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
+
+	if r.URL.Path != DowngradeEnabledPath {
+		http.Error(w, "bad path", http.StatusBadRequest)
+		return
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), h.server.Cfg.ReqTimeout())
+	defer cancel()
+
+	// serve with linearized downgrade info
+	if err := h.server.linearizableReadNotify(ctx); err != nil {
+		http.Error(w, fmt.Sprintf("failed linearized read: %v", err),
+			http.StatusInternalServerError)
+		return
+	}
+	enabled := h.server.DowngradeInfo().Enabled
+	w.Header().Set("Content-Type", "text/plain")
+	w.Write([]byte(strconv.FormatBool(enabled)))
+}
+
 // Process takes a raft message and applies it to the server's raft state
 // machine, respecting any timeout of the given context.
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
@@ -2318,6 +2372,41 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
 	}
 }
 
+func (s *EtcdServer) monitorDowngrade() {
+	t := s.Cfg.DowngradeCheckTime
+	if t == 0 {
+		return
+	}
+	lg := s.getLogger()
+	for {
+		select {
+		case <-time.After(t):
+		case <-s.stopping:
+			return
+		}
+
+		if !s.isLeader() {
+			continue
+		}
+
+		d := s.cluster.DowngradeInfo()
+		if !d.Enabled {
+			continue
+		}
+
+		targetVersion := d.TargetVersion
+		v := semver.Must(semver.NewVersion(targetVersion))
+		if isMatchedVersions(lg, v, getVersions(lg, s.cluster, s.id, s.peerRt)) {
+			lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
+			ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
+			if _, err := s.downgradeCancel(ctx); err != nil {
+				lg.Warn("failed to cancel downgrade", zap.Error(err))
+			}
+			cancel()
+		}
+	}
+}
+
 func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
 	switch err {
 	case context.Canceled:
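monitorDowngrade follows the same shape as the other monitors attached in Start(): wake on a timer, exit when s.stopping closes, and act only when this member leads and a downgrade is in flight. A self-contained sketch of just that loop skeleton, with all names made up for illustration:

    package main

    import (
    	"fmt"
    	"time"
    )

    // monitor reduces monitorDowngrade to its control flow: wake on a fixed
    // interval, exit when stopping closes, run one status check per wake-up.
    func monitor(interval time.Duration, stopping <-chan struct{}, check func()) {
    	for {
    		select {
    		case <-time.After(interval):
    		case <-stopping:
    			return
    		}
    		check()
    	}
    }

    func main() {
    	stop := make(chan struct{})
    	done := make(chan struct{})
    	go func() {
    		monitor(20*time.Millisecond, stop, func() { fmt.Println("downgrade status check") })
    		close(done)
    	}()
    	time.Sleep(100 * time.Millisecond)
    	close(stop)
    	<-done
    }

The real loop additionally guards on leadership and DowngradeInfo().Enabled before doing any work, and cancels the downgrade via downgradeCancel once isMatchedVersions reports every member at the target cluster version.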