*: add experimental flag for watch notify interval
parent
06f89cc4f8
commit
9a698476bf
|
@ -281,8 +281,9 @@ type Config struct {
|
||||||
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
|
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
|
||||||
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
|
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
|
||||||
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
|
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
|
||||||
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
|
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
|
||||||
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
|
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
|
||||||
|
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
|
||||||
|
|
||||||
// ForceNewCluster starts a new cluster even if previously started; unsafe.
|
// ForceNewCluster starts a new cluster even if previously started; unsafe.
|
||||||
ForceNewCluster bool `json:"force-new-cluster"`
|
ForceNewCluster bool `json:"force-new-cluster"`
|
||||||
|
|
|
@ -155,51 +155,52 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||||
backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
|
backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
|
||||||
|
|
||||||
srvcfg := etcdserver.ServerConfig{
|
srvcfg := etcdserver.ServerConfig{
|
||||||
Name: cfg.Name,
|
Name: cfg.Name,
|
||||||
ClientURLs: cfg.ACUrls,
|
ClientURLs: cfg.ACUrls,
|
||||||
PeerURLs: cfg.APUrls,
|
PeerURLs: cfg.APUrls,
|
||||||
DataDir: cfg.Dir,
|
DataDir: cfg.Dir,
|
||||||
DedicatedWALDir: cfg.WalDir,
|
DedicatedWALDir: cfg.WalDir,
|
||||||
SnapshotCount: cfg.SnapshotCount,
|
SnapshotCount: cfg.SnapshotCount,
|
||||||
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
|
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
|
||||||
MaxSnapFiles: cfg.MaxSnapFiles,
|
MaxSnapFiles: cfg.MaxSnapFiles,
|
||||||
MaxWALFiles: cfg.MaxWalFiles,
|
MaxWALFiles: cfg.MaxWalFiles,
|
||||||
InitialPeerURLsMap: urlsmap,
|
InitialPeerURLsMap: urlsmap,
|
||||||
InitialClusterToken: token,
|
InitialClusterToken: token,
|
||||||
DiscoveryURL: cfg.Durl,
|
DiscoveryURL: cfg.Durl,
|
||||||
DiscoveryProxy: cfg.Dproxy,
|
DiscoveryProxy: cfg.Dproxy,
|
||||||
NewCluster: cfg.IsNewCluster(),
|
NewCluster: cfg.IsNewCluster(),
|
||||||
PeerTLSInfo: cfg.PeerTLSInfo,
|
PeerTLSInfo: cfg.PeerTLSInfo,
|
||||||
TickMs: cfg.TickMs,
|
TickMs: cfg.TickMs,
|
||||||
ElectionTicks: cfg.ElectionTicks(),
|
ElectionTicks: cfg.ElectionTicks(),
|
||||||
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
|
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
|
||||||
AutoCompactionRetention: autoCompactionRetention,
|
AutoCompactionRetention: autoCompactionRetention,
|
||||||
AutoCompactionMode: cfg.AutoCompactionMode,
|
AutoCompactionMode: cfg.AutoCompactionMode,
|
||||||
QuotaBackendBytes: cfg.QuotaBackendBytes,
|
QuotaBackendBytes: cfg.QuotaBackendBytes,
|
||||||
BackendBatchLimit: cfg.BackendBatchLimit,
|
BackendBatchLimit: cfg.BackendBatchLimit,
|
||||||
BackendFreelistType: backendFreelistType,
|
BackendFreelistType: backendFreelistType,
|
||||||
BackendBatchInterval: cfg.BackendBatchInterval,
|
BackendBatchInterval: cfg.BackendBatchInterval,
|
||||||
MaxTxnOps: cfg.MaxTxnOps,
|
MaxTxnOps: cfg.MaxTxnOps,
|
||||||
MaxRequestBytes: cfg.MaxRequestBytes,
|
MaxRequestBytes: cfg.MaxRequestBytes,
|
||||||
StrictReconfigCheck: cfg.StrictReconfigCheck,
|
StrictReconfigCheck: cfg.StrictReconfigCheck,
|
||||||
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
|
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
|
||||||
AuthToken: cfg.AuthToken,
|
AuthToken: cfg.AuthToken,
|
||||||
BcryptCost: cfg.BcryptCost,
|
BcryptCost: cfg.BcryptCost,
|
||||||
TokenTTL: cfg.AuthTokenTTL,
|
TokenTTL: cfg.AuthTokenTTL,
|
||||||
CORS: cfg.CORS,
|
CORS: cfg.CORS,
|
||||||
HostWhitelist: cfg.HostWhitelist,
|
HostWhitelist: cfg.HostWhitelist,
|
||||||
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
|
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
|
||||||
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
|
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
|
||||||
PreVote: cfg.PreVote,
|
PreVote: cfg.PreVote,
|
||||||
Logger: cfg.logger,
|
Logger: cfg.logger,
|
||||||
LoggerConfig: cfg.loggerConfig,
|
LoggerConfig: cfg.loggerConfig,
|
||||||
LoggerCore: cfg.loggerCore,
|
LoggerCore: cfg.loggerCore,
|
||||||
LoggerWriteSyncer: cfg.loggerWriteSyncer,
|
LoggerWriteSyncer: cfg.loggerWriteSyncer,
|
||||||
ForceNewCluster: cfg.ForceNewCluster,
|
ForceNewCluster: cfg.ForceNewCluster,
|
||||||
EnableGRPCGateway: cfg.EnableGRPCGateway,
|
EnableGRPCGateway: cfg.EnableGRPCGateway,
|
||||||
UnsafeNoFsync: cfg.UnsafeNoFsync,
|
UnsafeNoFsync: cfg.UnsafeNoFsync,
|
||||||
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
|
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
|
||||||
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
|
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
|
||||||
|
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
|
||||||
}
|
}
|
||||||
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
|
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
|
||||||
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
|
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
|
||||||
|
|
|
@ -251,6 +251,7 @@ func newConfig() *config {
|
||||||
fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")
|
fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")
|
||||||
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
|
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
|
||||||
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
|
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
|
||||||
|
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
|
||||||
|
|
||||||
// unsafe
|
// unsafe
|
||||||
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
|
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
|
||||||
|
|
|
@ -210,6 +210,8 @@ Experimental feature:
|
||||||
ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
|
ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
|
||||||
--experimental-peer-skip-client-san-verification 'false'
|
--experimental-peer-skip-client-san-verification 'false'
|
||||||
Skip verification of SAN field in client certificate for peer connections.
|
Skip verification of SAN field in client certificate for peer connections.
|
||||||
|
--experimental-watch-progress-notify-interval '10m'
|
||||||
|
Duration of periodical watch progress notification.
|
||||||
|
|
||||||
Unsafe feature:
|
Unsafe feature:
|
||||||
--force-new-cluster 'false'
|
--force-new-cluster 'false'
|
||||||
|
|
|
@ -31,6 +31,8 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const minWatchProgressInterval = 100 * time.Millisecond
|
||||||
|
|
||||||
type watchServer struct {
|
type watchServer struct {
|
||||||
lg *zap.Logger
|
lg *zap.Logger
|
||||||
|
|
||||||
|
@ -61,6 +63,16 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
|
||||||
if srv.lg == nil {
|
if srv.lg == nil {
|
||||||
srv.lg = zap.NewNop()
|
srv.lg = zap.NewNop()
|
||||||
}
|
}
|
||||||
|
if s.Cfg.WatchProgressNotifyInterval > 0 {
|
||||||
|
if s.Cfg.WatchProgressNotifyInterval < minWatchProgressInterval {
|
||||||
|
srv.lg.Warn(
|
||||||
|
"adjusting watch progress notify interval to minimum period",
|
||||||
|
zap.Duration("min-watch-progress-notify-interval", minWatchProgressInterval),
|
||||||
|
)
|
||||||
|
s.Cfg.WatchProgressNotifyInterval = minWatchProgressInterval
|
||||||
|
}
|
||||||
|
SetProgressReportInterval(s.Cfg.WatchProgressNotifyInterval)
|
||||||
|
}
|
||||||
return srv
|
return srv
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -157,6 +157,8 @@ type ServerConfig struct {
|
||||||
|
|
||||||
EnableGRPCGateway bool
|
EnableGRPCGateway bool
|
||||||
|
|
||||||
|
WatchProgressNotifyInterval time.Duration
|
||||||
|
|
||||||
// UnsafeNoFsync disables all uses of fsync.
|
// UnsafeNoFsync disables all uses of fsync.
|
||||||
// Setting this is unsafe and will cause data loss.
|
// Setting this is unsafe and will cause data loss.
|
||||||
UnsafeNoFsync bool `json:"unsafe-no-fsync"`
|
UnsafeNoFsync bool `json:"unsafe-no-fsync"`
|
||||||
|
|
Loading…
Reference in New Issue