diff --git a/embed/config.go b/embed/config.go index c501a66f2..05f582311 100644 --- a/embed/config.go +++ b/embed/config.go @@ -281,8 +281,9 @@ type Config struct { ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"` // ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases. - ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` - ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` + ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` + ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` + ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` // ForceNewCluster starts a new cluster even if previously started; unsafe. ForceNewCluster bool `json:"force-new-cluster"` diff --git a/embed/etcd.go b/embed/etcd.go index ec86319f0..85a4c7932 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -155,51 +155,52 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType) srvcfg := etcdserver.ServerConfig{ - Name: cfg.Name, - ClientURLs: cfg.ACUrls, - PeerURLs: cfg.APUrls, - DataDir: cfg.Dir, - DedicatedWALDir: cfg.WalDir, - SnapshotCount: cfg.SnapshotCount, - SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries, - MaxSnapFiles: cfg.MaxSnapFiles, - MaxWALFiles: cfg.MaxWalFiles, - InitialPeerURLsMap: urlsmap, - InitialClusterToken: token, - DiscoveryURL: cfg.Durl, - DiscoveryProxy: cfg.Dproxy, - NewCluster: cfg.IsNewCluster(), - PeerTLSInfo: cfg.PeerTLSInfo, - TickMs: cfg.TickMs, - ElectionTicks: cfg.ElectionTicks(), - InitialElectionTickAdvance: cfg.InitialElectionTickAdvance, - AutoCompactionRetention: autoCompactionRetention, - AutoCompactionMode: cfg.AutoCompactionMode, - QuotaBackendBytes: cfg.QuotaBackendBytes, - BackendBatchLimit: cfg.BackendBatchLimit, - BackendFreelistType: backendFreelistType, - BackendBatchInterval: cfg.BackendBatchInterval, - MaxTxnOps: cfg.MaxTxnOps, - MaxRequestBytes: cfg.MaxRequestBytes, - StrictReconfigCheck: cfg.StrictReconfigCheck, - ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, - AuthToken: cfg.AuthToken, - BcryptCost: cfg.BcryptCost, - TokenTTL: cfg.AuthTokenTTL, - CORS: cfg.CORS, - HostWhitelist: cfg.HostWhitelist, - InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, - CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, - PreVote: cfg.PreVote, - Logger: cfg.logger, - LoggerConfig: cfg.loggerConfig, - LoggerCore: cfg.loggerCore, - LoggerWriteSyncer: cfg.loggerWriteSyncer, - ForceNewCluster: cfg.ForceNewCluster, - EnableGRPCGateway: cfg.EnableGRPCGateway, - UnsafeNoFsync: cfg.UnsafeNoFsync, - EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, - CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, + Name: cfg.Name, + ClientURLs: cfg.ACUrls, + PeerURLs: cfg.APUrls, + DataDir: cfg.Dir, + DedicatedWALDir: cfg.WalDir, + SnapshotCount: cfg.SnapshotCount, + SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries, + MaxSnapFiles: cfg.MaxSnapFiles, + MaxWALFiles: cfg.MaxWalFiles, + InitialPeerURLsMap: urlsmap, + InitialClusterToken: token, + DiscoveryURL: cfg.Durl, + DiscoveryProxy: cfg.Dproxy, + NewCluster: cfg.IsNewCluster(), + PeerTLSInfo: cfg.PeerTLSInfo, + TickMs: cfg.TickMs, + ElectionTicks: cfg.ElectionTicks(), + InitialElectionTickAdvance: cfg.InitialElectionTickAdvance, + AutoCompactionRetention: autoCompactionRetention, + AutoCompactionMode: cfg.AutoCompactionMode, + QuotaBackendBytes: cfg.QuotaBackendBytes, + BackendBatchLimit: cfg.BackendBatchLimit, + BackendFreelistType: backendFreelistType, + BackendBatchInterval: cfg.BackendBatchInterval, + MaxTxnOps: cfg.MaxTxnOps, + MaxRequestBytes: cfg.MaxRequestBytes, + StrictReconfigCheck: cfg.StrictReconfigCheck, + ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, + AuthToken: cfg.AuthToken, + BcryptCost: cfg.BcryptCost, + TokenTTL: cfg.AuthTokenTTL, + CORS: cfg.CORS, + HostWhitelist: cfg.HostWhitelist, + InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, + CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, + PreVote: cfg.PreVote, + Logger: cfg.logger, + LoggerConfig: cfg.loggerConfig, + LoggerCore: cfg.loggerCore, + LoggerWriteSyncer: cfg.loggerWriteSyncer, + ForceNewCluster: cfg.ForceNewCluster, + EnableGRPCGateway: cfg.EnableGRPCGateway, + UnsafeNoFsync: cfg.UnsafeNoFsync, + EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, + CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, + WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, } print(e.cfg.logger, *cfg, srvcfg, memberInitialized) if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { diff --git a/etcdmain/config.go b/etcdmain/config.go index 3fc21bc94..7d0c7d7b2 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -251,6 +251,7 @@ func newConfig() *config { fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.") fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.") fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.") + fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.") // unsafe fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.") diff --git a/etcdmain/help.go b/etcdmain/help.go index 320065f4a..6784cd567 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -210,6 +210,8 @@ Experimental feature: ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch. --experimental-peer-skip-client-san-verification 'false' Skip verification of SAN field in client certificate for peer connections. + --experimental-watch-progress-notify-interval '10m' + Duration of periodical watch progress notification. Unsafe feature: --force-new-cluster 'false' diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index c623d3d94..2144779fc 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -31,6 +31,8 @@ import ( "go.uber.org/zap" ) +const minWatchProgressInterval = 100 * time.Millisecond + type watchServer struct { lg *zap.Logger @@ -61,6 +63,16 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer { if srv.lg == nil { srv.lg = zap.NewNop() } + if s.Cfg.WatchProgressNotifyInterval > 0 { + if s.Cfg.WatchProgressNotifyInterval < minWatchProgressInterval { + srv.lg.Warn( + "adjusting watch progress notify interval to minimum period", + zap.Duration("min-watch-progress-notify-interval", minWatchProgressInterval), + ) + s.Cfg.WatchProgressNotifyInterval = minWatchProgressInterval + } + SetProgressReportInterval(s.Cfg.WatchProgressNotifyInterval) + } return srv } diff --git a/etcdserver/config.go b/etcdserver/config.go index 20f0d4647..773333756 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -157,6 +157,8 @@ type ServerConfig struct { EnableGRPCGateway bool + WatchProgressNotifyInterval time.Duration + // UnsafeNoFsync disables all uses of fsync. // Setting this is unsafe and will cause data loss. UnsafeNoFsync bool `json:"unsafe-no-fsync"`