support v3 discovery to bootstrap a new etcd cluster

ahrtr 2022-01-22 08:35:36 +08:00
parent 6105a6f0e8
commit ebc86d12c0
13 changed files with 1618 additions and 25 deletions

View File

@ -12,6 +12,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
- `etcd` will no longer start on data dir created by newer versions (for example etcd v3.6 will not run on v3.7+ data dir). To downgrade data dir please check out `etcdutl migrate` command.
### Deprecations
- Deprecated [V2 discovery](https://etcd.io/docs/v3.5/dev-internal/discovery_protocol/).
### etcdctl v3
- Add command to generate [shell completion](https://github.com/etcd-io/etcd/pull/13133).
@ -38,6 +42,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
- Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to handle upgrade from v3.5.2 clusters with this feature enabled.
- Add [`etcdctl make-mirror --rev`](https://github.com/etcd-io/etcd/pull/13519) flag to support incremental mirror.
- Add [`etcd --experimental-wait-cluster-ready-timeout`](https://github.com/etcd-io/etcd/pull/13525) flag to wait for cluster to be ready before serving client requests.
- Add [v3 discovery](https://github.com/etcd-io/etcd/pull/13635) to bootstrap a new etcd cluster.
- Fix [non mutating requests pass through quotaKVServer when NOSPACE](https://github.com/etcd-io/etcd/pull/13435).
- Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13467).
- Fix [Provide a better liveness probe for when etcd runs as a Kubernetes pod](https://github.com/etcd-io/etcd/pull/13399).

View File

@ -25,6 +25,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/netutil"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@ -34,12 +35,16 @@ import (
// ServerConfig holds the configuration of etcd as taken from the command line or discovery.
type ServerConfig struct {
Name string
DiscoveryURL string
DiscoveryProxy string
ClientURLs types.URLs
PeerURLs types.URLs
DataDir string
Name string
EnableV2Discovery bool
DiscoveryURL string
DiscoveryProxy string
DiscoveryCfg v3discovery.DiscoveryConfig
ClientURLs types.URLs
PeerURLs types.URLs
DataDir string
// DedicatedWALDir config will make the etcd to write the WAL to the WALDir
// rather than the dataDir/member/wal.
DedicatedWALDir string

View File

@ -15,6 +15,7 @@
package embed
import (
"errors"
"fmt"
"net"
"net/http"
@ -36,6 +37,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery"
bolt "go.etcd.io/bbolt"
"go.uber.org/multierr"
@ -62,6 +64,11 @@ const (
DefaultDowngradeCheckTime = 5 * time.Second
DefaultWaitClusterReadyTimeout = 5 * time.Second
DefaultDiscoveryDialTimeout = 2 * time.Second
DefaultDiscoveryRequestTimeOut = 5 * time.Second
DefaultDiscoveryKeepAliveTime = 2 * time.Second
DefaultDiscoveryKeepAliveTimeOut = 6 * time.Second
DefaultListenPeerURLs = "http://localhost:2380"
DefaultListenClientURLs = "http://localhost:2379"
@ -88,6 +95,8 @@ const (
// It's enabled by default.
DefaultStrictReconfigCheck = true
DefaultEnableV2Discovery = true
// maxElectionMs specifies the maximum value of election timeout.
// More details are listed in ../Documentation/tuning.md#time-parameters.
maxElectionMs = 50000
@ -213,11 +222,15 @@ type Config struct {
// Note that cipher suites are prioritized in the given order.
CipherSuites []string `json:"cipher-suites"`
ClusterState string `json:"initial-cluster-state"`
DNSCluster string `json:"discovery-srv"`
DNSClusterServiceName string `json:"discovery-srv-name"`
Dproxy string `json:"discovery-proxy"`
Durl string `json:"discovery"`
ClusterState string `json:"initial-cluster-state"`
DNSCluster string `json:"discovery-srv"`
DNSClusterServiceName string `json:"discovery-srv-name"`
Dproxy string `json:"discovery-proxy"`
EnableV2Discovery bool `json:"enable-v2-discovery"`
Durl string `json:"discovery"`
DiscoveryCfg v3discovery.DiscoveryConfig `json:"discovery-config"`
InitialCluster string `json:"initial-cluster"`
InitialClusterToken string `json:"initial-cluster-token"`
StrictReconfigCheck bool `json:"strict-reconfig-check"`
@ -504,6 +517,14 @@ func NewConfig() *Config {
ExperimentalMaxLearners: membership.DefaultMaxLearners,
V2Deprecation: config.V2_DEPR_DEFAULT,
EnableV2Discovery: DefaultEnableV2Discovery,
DiscoveryCfg: v3discovery.DiscoveryConfig{
DialTimeout: DefaultDiscoveryDialTimeout,
RequestTimeOut: DefaultDiscoveryRequestTimeOut,
KeepAliveTime: DefaultDiscoveryKeepAliveTime,
KeepAliveTimeout: DefaultDiscoveryKeepAliveTimeOut,
},
}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
return cfg
@ -667,6 +688,22 @@ func (cfg *Config) Validate() error {
return ErrConflictBootstrapFlags
}
// Check if both v2 discovery and v3 discovery flags are passed.
v2discoveryFlagsExist := cfg.Dproxy != ""
v3discoveryFlagsExist := cfg.DiscoveryCfg.CertFile != "" ||
cfg.DiscoveryCfg.KeyFile != "" ||
cfg.DiscoveryCfg.TrustedCAFile != "" ||
cfg.DiscoveryCfg.User != "" ||
cfg.DiscoveryCfg.Password != ""
if cfg.EnableV2Discovery && v3discoveryFlagsExist {
return errors.New("v2 discovery is enabled, but some v3 discovery " +
"settings (discovery-cert, discovery-key, discovery-cacert, " +
"discovery-user, discovery-password) are set")
}
if !cfg.EnableV2Discovery && v2discoveryFlagsExist {
return errors.New("v3 discovery is enabled, but --discovery-proxy is set")
}
if cfg.TickMs == 0 {
return fmt.Errorf("--heartbeat-interval must be >0 (set to %dms)", cfg.TickMs)
}
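As an aside, here is a minimal sketch of bootstrapping an embedded server through v3 discovery with the new configuration fields; the data dir, discovery URL and cluster token are made-up values, and the sketch assumes the DiscoveryCfg defaults set in NewConfig:

package main

import (
	"log"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Name = "infra1"
	cfg.Dir = "/tmp/infra1.etcd" // hypothetical data dir
	// Hypothetical discovery endpoint; the cluster token is carried as the URL path.
	cfg.Durl = "http://discovery.example.com:2379/my-cluster-token"
	cfg.EnableV2Discovery = false // opt into the new v3 discovery path
	// cfg.DiscoveryCfg keeps the defaults from NewConfig (2s dial, 5s request, ...).

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify() // wait until the server has joined the cluster
}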

View File

@ -175,8 +175,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
EnableV2Discovery: cfg.EnableV2Discovery,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
DiscoveryCfg: cfg.DiscoveryCfg,
NewCluster: cfg.IsNewCluster(),
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
@ -345,6 +347,18 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
zap.String("discovery-url", sc.DiscoveryURL),
zap.String("discovery-proxy", sc.DiscoveryProxy),
zap.String("discovery-dial-timeout", sc.DiscoveryCfg.DialTimeout.String()),
zap.String("discovery-request-timeout", sc.DiscoveryCfg.RequestTimeOut.String()),
zap.String("discovery-keepalive-time", sc.DiscoveryCfg.KeepAliveTime.String()),
zap.String("discovery-keepalive-timeout", sc.DiscoveryCfg.KeepAliveTimeout.String()),
zap.Bool("discovery-insecure-transport", sc.DiscoveryCfg.InsecureTransport),
zap.Bool("discovery-insecure-skip-tls-verify", sc.DiscoveryCfg.InsecureSkipVerify),
zap.String("discovery-cert", sc.DiscoveryCfg.CertFile),
zap.String("discovery-key", sc.DiscoveryCfg.KeyFile),
zap.String("discovery-cacert", sc.DiscoveryCfg.TrustedCAFile),
zap.String("discovery-user", sc.DiscoveryCfg.User),
zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()),
zap.Int("max-learners", sc.ExperimentalMaxLearners),
)

View File

@ -185,10 +185,23 @@ func newConfig() *config {
"advertise-client-urls",
"List of this member's client URLs to advertise to the public.",
)
fs.BoolVar(&cfg.ec.EnableV2Discovery, "enable-v2-discovery", cfg.ec.EnableV2Discovery, "Enable cluster bootstrapping using v2 discovery. Will be deprecated in v3.7, and decommissioned in v3.8.")
fs.StringVar(&cfg.ec.Durl, "discovery", cfg.ec.Durl, "Discovery URL used to bootstrap the cluster.")
fs.Var(cfg.cf.fallback, "discovery-fallback", fmt.Sprintf("Valid values include %q", cfg.cf.fallback.Valids()))
fs.StringVar(&cfg.ec.Dproxy, "discovery-proxy", cfg.ec.Dproxy, "HTTP proxy to use for traffic to discovery service.")
fs.DurationVar(&cfg.ec.DiscoveryCfg.DialTimeout, "discovery-dial-timeout", cfg.ec.DiscoveryCfg.DialTimeout, "V3 discovery: dial timeout for client connections.")
fs.DurationVar(&cfg.ec.DiscoveryCfg.RequestTimeOut, "discovery-request-timeout", cfg.ec.DiscoveryCfg.RequestTimeOut, "V3 discovery: timeout for discovery requests (excluding dial timeout).")
fs.DurationVar(&cfg.ec.DiscoveryCfg.KeepAliveTime, "discovery-keepalive-time", cfg.ec.DiscoveryCfg.KeepAliveTime, "V3 discovery: keepalive time for client connections.")
fs.DurationVar(&cfg.ec.DiscoveryCfg.KeepAliveTimeout, "discovery-keepalive-timeout", cfg.ec.DiscoveryCfg.KeepAliveTimeout, "V3 discovery: keepalive timeout for client connections.")
fs.BoolVar(&cfg.ec.DiscoveryCfg.InsecureTransport, "discovery-insecure-transport", true, "V3 discovery: disable transport security for client connections.")
fs.BoolVar(&cfg.ec.DiscoveryCfg.InsecureSkipVerify, "discovery-insecure-skip-tls-verify", false, "V3 discovery: skip server certificate verification (CAUTION: this option should be enabled only for testing purposes).")
fs.StringVar(&cfg.ec.DiscoveryCfg.CertFile, "discovery-cert", "", "V3 discovery: identify secure client using this TLS certificate file.")
fs.StringVar(&cfg.ec.DiscoveryCfg.KeyFile, "discovery-key", "", "V3 discovery: identify secure client using this TLS key file.")
fs.StringVar(&cfg.ec.DiscoveryCfg.TrustedCAFile, "discovery-cacert", "", "V3 discovery: verify certificates of TLS-enabled secure servers using this CA bundle.")
fs.StringVar(&cfg.ec.DiscoveryCfg.User, "discovery-user", "", "V3 discovery: username[:password] for authentication (prompt if password is not supplied).")
fs.StringVar(&cfg.ec.DiscoveryCfg.Password, "discovery-password", "", "V3 discovery: password for authentication (if this option is used, --discovery-user option shouldn't include password).")
fs.StringVar(&cfg.ec.Dproxy, "discovery-proxy", cfg.ec.Dproxy, "HTTP proxy to use for traffic to discovery service. Will be deprecated in v3.7, and decommissioned in v3.8.")
fs.StringVar(&cfg.ec.DNSCluster, "discovery-srv", cfg.ec.DNSCluster, "DNS domain used to bootstrap initial cluster.")
fs.StringVar(&cfg.ec.DNSClusterServiceName, "discovery-srv-name", cfg.ec.DNSClusterServiceName, "Service name to query when using DNS discovery.")
fs.StringVar(&cfg.ec.InitialCluster, "initial-cluster", cfg.ec.InitialCluster, "Initial cluster configuration for bootstrapping.")

View File

@ -17,6 +17,7 @@ package etcdmain
import (
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
@ -34,6 +35,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery"
"go.etcd.io/etcd/server/v3/proxy/httpproxy"
"go.uber.org/zap"
@ -318,7 +320,12 @@ func startProxy(cfg *config) error {
if cfg.ec.Durl != "" {
var s string
s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy)
if cfg.ec.EnableV2Discovery {
lg.Warn("V2 discovery is deprecated!")
s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy)
} else {
s, err = v3discovery.GetCluster(lg, cfg.ec.Durl, &cfg.ec.DiscoveryCfg)
}
if err != nil {
return err
}

View File

@ -104,13 +104,37 @@ Clustering:
--advertise-client-urls 'http://localhost:2379'
List of this member's client URLs to advertise to the public.
The client URLs advertised should be accessible to machines that talk to etcd cluster. etcd client libraries parse these URLs to connect to the cluster.
--enable-v2-discovery 'true'
Enable cluster bootstrapping using v2 discovery. Will be deprecated in v3.7, and decommissioned in v3.8.
--discovery ''
Discovery URL used to bootstrap the cluster.
--discovery-dial-timeout '2s'
V3 discovery: dial timeout for client connections.
--discovery-request-timeout '5s'
V3 discovery: timeout for discovery requests (excluding dial timeout).
--discovery-keepalive-time '2s'
V3 discovery: keepalive time for client connections.
--discovery-keepalive-timeout '6s'
V3 discovery: keepalive timeout for client connections.
--discovery-insecure-transport 'true'
V3 discovery: disable transport security for client connections.
--discovery-insecure-skip-tls-verify 'false'
V3 discovery: skip server certificate verification (CAUTION: this option should be enabled only for testing purposes).
--discovery-cert ''
V3 discovery: identify secure client using this TLS certificate file.
--discovery-key ''
V3 discovery: identify secure client using this TLS key file.
--discovery-cacert ''
V3 discovery: verify certificates of TLS-enabled secure servers using this CA bundle.
--discovery-user ''
V3 discovery: username[:password] for authentication (prompt if password is not supplied).
--discovery-password ''
V3 discovery: password for authentication (if this option is used, --discovery-user option shouldn't include password).
--discovery-fallback 'proxy'
Expected behavior ('exit' or 'proxy') when the discovery service fails.
"proxy" supports v2 API only.
--discovery-proxy ''
HTTP proxy to use for traffic to discovery service.
HTTP proxy to use for traffic to discovery service. Will be deprecated in v3.7, and decommissioned in v3.8.
--discovery-srv ''
DNS srv domain used to bootstrap the cluster.
--discovery-srv-name ''
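For illustration only (hypothetical endpoint and token), a member bootstrapping through a TLS-protected v3 discovery service could be started along these lines:

  etcd --name infra1 --enable-v2-discovery=false \
       --discovery https://discovery.example.com:2379/my-cluster-token \
       --discovery-cacert ca.crt --discovery-cert client.crt --discovery-key client.key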

View File

@ -0,0 +1,580 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package v3discovery provides an implementation of the cluster discovery that
// is used by etcd with the v3 client.
package v3discovery
import (
"context"
"crypto/tls"
"errors"
"math"
"net/url"
"path"
"sort"
"strconv"
"strings"
"time"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/client/v3"
"github.com/jonboulle/clockwork"
"go.uber.org/zap"
)
const (
discoveryPrefix = "/_etcd/registry"
)
var (
ErrInvalidURL = errors.New("discovery: invalid peer URL")
ErrBadSizeKey = errors.New("discovery: size key is bad")
ErrSizeNotFound = errors.New("discovery: size key not found")
ErrFullCluster = errors.New("discovery: cluster is full")
ErrTooManyRetries = errors.New("discovery: too many retries")
)
var (
// Number of retries discovery will attempt before giving up and erroring out.
nRetries = uint(math.MaxUint32)
maxExponentialRetries = uint(8)
)
type DiscoveryConfig struct {
Url string `json:"discovery"`
DialTimeout time.Duration `json:"discovery-dial-timeout"`
RequestTimeOut time.Duration `json:"discovery-request-timeout"`
KeepAliveTime time.Duration `json:"discovery-keepalive-time"`
KeepAliveTimeout time.Duration `json:"discovery-keepalive-timeout"`
InsecureTransport bool `json:"discovery-insecure-transport"`
InsecureSkipVerify bool `json:"discovery-insecure-skip-tls-verify"`
CertFile string `json:"discovery-cert"`
KeyFile string `json:"discovery-key"`
TrustedCAFile string `json:"discovery-cacert"`
User string `json:"discovery-user"`
Password string `json:"discovery-password"`
}
type memberInfo struct {
// peerRegKey is the key used by the member when registering in the
// discovery service.
// Format: "/_etcd/registry/<ClusterToken>/members/<memberId>".
peerRegKey string
// peerURLsMap format: "peerName=peerURLs", i.e., "member1=http://127.0.0.1:2380".
peerURLsMap string
// createRev is the member's CreateRevision in the etcd cluster backing
// the discovery service.
createRev int64
}
type clusterInfo struct {
clusterToken string
members []memberInfo
}
// key prefix for each cluster: "/_etcd/registry/<ClusterToken>".
func getClusterKeyPrefix(cluster string) string {
return path.Join(discoveryPrefix, cluster)
}
// key format for cluster size: "/_etcd/registry/<ClusterToken>/_config/size".
func getClusterSizeKey(cluster string) string {
return path.Join(getClusterKeyPrefix(cluster), "_config/size")
}
// key prefix for each member: "/_etcd/registry/<ClusterToken>/members".
func getMemberKeyPrefix(clusterToken string) string {
return path.Join(getClusterKeyPrefix(clusterToken), "members")
}
// key format for each member: "/_etcd/registry/<ClusterToken>/members/<memberId>".
func getMemberKey(cluster, memberId string) string {
return path.Join(getMemberKeyPrefix(cluster), memberId)
}
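// For example (illustrative values): with cluster token "6c007a14875d53d9" and member ID
// "8e9e05c52164694d", the size key is "/_etcd/registry/6c007a14875d53d9/_config/size" and
// the member key is "/_etcd/registry/6c007a14875d53d9/members/8e9e05c52164694d".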
// GetCluster will connect to the discovery service at the given URL and
// retrieve a string describing the cluster.
func GetCluster(lg *zap.Logger, dUrl string, cfg *DiscoveryConfig) (cs string, rerr error) {
d, err := newDiscovery(lg, dUrl, cfg, 0)
if err != nil {
return "", err
}
defer d.close()
defer func() {
if rerr != nil {
d.lg.Error(
"discovery failed to get cluster",
zap.String("cluster", cs),
zap.Error(rerr),
)
} else {
d.lg.Info(
"discovery got cluster successfully",
zap.String("cluster", cs),
)
}
}()
return d.getCluster()
}
// JoinCluster will connect to the discovery service at the given url, and
// register the server represented by the given id and config to the cluster.
// The parameter `config` is supposed to be in the format "memberName=peerURLs",
// such as "member1=http://127.0.0.1:2380".
//
// The final returned string has the same format as "--initial-cluster", such as
// "infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380".
func JoinCluster(lg *zap.Logger, durl string, cfg *DiscoveryConfig, id types.ID, config string) (cs string, rerr error) {
d, err := newDiscovery(lg, durl, cfg, id)
if err != nil {
return "", err
}
defer d.close()
defer func() {
if rerr != nil {
d.lg.Error(
"discovery failed to join cluster",
zap.String("cluster", cs),
zap.Error(rerr),
)
} else {
d.lg.Info(
"discovery joined cluster successfully",
zap.String("cluster", cs),
)
}
}()
return d.joinCluster(config)
}
type discovery struct {
lg *zap.Logger
clusterToken string
memberId types.ID
c *clientv3.Client
retries uint
durl string
cfg *DiscoveryConfig
clock clockwork.Clock
}
func newDiscovery(lg *zap.Logger, durl string, dcfg *DiscoveryConfig, id types.ID) (*discovery, error) {
if lg == nil {
lg = zap.NewNop()
}
u, err := url.Parse(durl)
if err != nil {
return nil, err
}
token := u.Path
u.Path = ""
lg = lg.With(zap.String("discovery-url", durl))
cfg, err := newClientCfg(dcfg, u.String(), lg)
if err != nil {
return nil, err
}
c, err := clientv3.New(*cfg)
if err != nil {
return nil, err
}
return &discovery{
lg: lg,
clusterToken: token,
memberId: id,
c: c,
durl: u.String(),
cfg: dcfg,
clock: clockwork.NewRealClock(),
}, nil
}
// The following function follows the same logic as etcdctl; refer to
// https://github.com/etcd-io/etcd/blob/f9a8c49c695b098d66a07948666664ea10d01a82/etcdctl/ctlv3/command/global.go#L191-L250
func newClientCfg(dcfg *DiscoveryConfig, dUrl string, lg *zap.Logger) (*clientv3.Config, error) {
var cfgtls *transport.TLSInfo
if dcfg.CertFile != "" || dcfg.KeyFile != "" || dcfg.TrustedCAFile != "" {
cfgtls = &transport.TLSInfo{
CertFile: dcfg.CertFile,
KeyFile: dcfg.KeyFile,
TrustedCAFile: dcfg.TrustedCAFile,
Logger: lg,
}
}
cfg := &clientv3.Config{
Endpoints: []string{dUrl},
DialTimeout: dcfg.DialTimeout,
DialKeepAliveTime: dcfg.KeepAliveTime,
DialKeepAliveTimeout: dcfg.KeepAliveTimeout,
Username: dcfg.User,
Password: dcfg.Password,
}
if cfgtls != nil {
if clientTLS, err := cfgtls.ClientConfig(); err == nil {
cfg.TLS = clientTLS
} else {
return nil, err
}
}
// If key/cert is not given but the user wants a secure connection, we
// should still set up an empty TLS configuration for gRPC to set up a
// secure connection.
if cfg.TLS == nil && !dcfg.InsecureTransport {
cfg.TLS = &tls.Config{}
}
// If the user wants to skip TLS verification, then we should set
// the InsecureSkipVerify flag in the TLS configuration.
if cfg.TLS != nil && dcfg.InsecureSkipVerify {
cfg.TLS.InsecureSkipVerify = true
}
return cfg, nil
}
func (d *discovery) getCluster() (string, error) {
cls, clusterSize, rev, err := d.checkCluster()
if err != nil {
if err == ErrFullCluster {
return cls.getInitClusterStr(clusterSize)
}
return "", err
}
for cls.Len() < clusterSize {
d.waitPeers(cls, clusterSize, rev)
}
return cls.getInitClusterStr(clusterSize)
}
func (d *discovery) joinCluster(config string) (string, error) {
_, _, _, err := d.checkCluster()
if err != nil {
return "", err
}
if err := d.registerSelf(config); err != nil {
return "", err
}
cls, clusterSize, rev, err := d.checkCluster()
if err != nil {
return "", err
}
for cls.Len() < clusterSize {
d.waitPeers(cls, clusterSize, rev)
}
return cls.getInitClusterStr(clusterSize)
}
func (d *discovery) getClusterSize() (int, error) {
configKey := getClusterSizeKey(d.clusterToken)
ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RequestTimeOut)
defer cancel()
resp, err := d.c.Get(ctx, configKey)
if err != nil {
d.lg.Warn(
"failed to get cluster size from discovery service",
zap.String("clusterSizeKey", configKey),
zap.Error(err),
)
return 0, err
}
if len(resp.Kvs) == 0 {
return 0, ErrSizeNotFound
}
clusterSize, err := strconv.ParseInt(string(resp.Kvs[0].Value), 10, 0)
if err != nil || clusterSize <= 0 {
return 0, ErrBadSizeKey
}
return int(clusterSize), nil
}
func (d *discovery) getClusterMembers() (*clusterInfo, int64, error) {
membersKeyPrefix := getMemberKeyPrefix(d.clusterToken)
ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RequestTimeOut)
defer cancel()
resp, err := d.c.Get(ctx, membersKeyPrefix, clientv3.WithPrefix())
if err != nil {
d.lg.Warn(
"failed to get cluster members from discovery service",
zap.String("membersKeyPrefix", membersKeyPrefix),
zap.Error(err),
)
return nil, 0, err
}
cls := &clusterInfo{clusterToken: d.clusterToken}
for _, kv := range resp.Kvs {
mKey := strings.TrimSpace(string(kv.Key))
mValue := strings.TrimSpace(string(kv.Value))
if err := cls.add(mKey, mValue, kv.CreateRevision); err != nil {
d.lg.Warn(
err.Error(),
zap.String("memberKey", mKey),
zap.String("memberInfo", mValue),
)
} else {
d.lg.Info(
"found peer from discovery service",
zap.String("memberKey", mKey),
zap.String("memberInfo", mValue),
)
}
}
return cls, resp.Header.Revision, nil
}
func (d *discovery) checkClusterRetry() (*clusterInfo, int, int64, error) {
if d.retries < nRetries {
d.logAndBackoffForRetry("cluster status check")
return d.checkCluster()
}
return nil, 0, 0, ErrTooManyRetries
}
func (d *discovery) checkCluster() (*clusterInfo, int, int64, error) {
clusterSize, err := d.getClusterSize()
if err != nil {
if err == ErrSizeNotFound || err == ErrBadSizeKey {
return nil, 0, 0, err
}
return d.checkClusterRetry()
}
cls, rev, err := d.getClusterMembers()
if err != nil {
return d.checkClusterRetry()
}
d.retries = 0
// find self position
memberSelfId := getMemberKey(d.clusterToken, d.memberId.String())
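// Walk the members, which cls.add keeps sorted by CreateRevision in ascending
// order. If this member is not among the first `clusterSize` registrants,
// the cluster is considered full and ErrFullCluster is returned.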
idx := 0
for _, m := range cls.members {
if m.peerRegKey == memberSelfId {
break
}
if idx >= clusterSize-1 {
return cls, clusterSize, rev, ErrFullCluster
}
idx++
}
return cls, clusterSize, rev, nil
}
func (d *discovery) registerSelfRetry(contents string) error {
if d.retries < nRetries {
d.logAndBackoffForRetry("register member itself")
return d.registerSelf(contents)
}
return ErrTooManyRetries
}
func (d *discovery) registerSelf(contents string) error {
ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RequestTimeOut)
memberKey := getMemberKey(d.clusterToken, d.memberId.String())
_, err := d.c.Put(ctx, memberKey, contents)
cancel()
if err != nil {
d.lg.Warn(
"failed to register members itself to the discovery service",
zap.String("memberKey", memberKey),
zap.Error(err),
)
return d.registerSelfRetry(contents)
}
d.retries = 0
d.lg.Info(
"register member itself successfully",
zap.String("memberKey", memberKey),
zap.String("memberInfo", contents),
)
return nil
}
func (d *discovery) waitPeers(cls *clusterInfo, clusterSize int, rev int64) {
// watch from the next revision
membersKeyPrefix := getMemberKeyPrefix(d.clusterToken)
w := d.c.Watch(context.Background(), membersKeyPrefix, clientv3.WithPrefix(), clientv3.WithRev(rev+1))
d.lg.Info(
"waiting for peers from discovery service",
zap.Int("clusterSize", clusterSize),
zap.Int("found-peers", cls.Len()),
)
// wait for peers until all needed peers have been returned
for wresp := range w {
for _, ev := range wresp.Events {
mKey := strings.TrimSpace(string(ev.Kv.Key))
mValue := strings.TrimSpace(string(ev.Kv.Value))
if err := cls.add(mKey, mValue, ev.Kv.CreateRevision); err != nil {
d.lg.Warn(
err.Error(),
zap.String("memberKey", mKey),
zap.String("memberInfo", mValue),
)
} else {
d.lg.Info(
"found peer from discovery service",
zap.String("memberKey", mKey),
zap.String("memberInfo", mValue),
)
}
}
if cls.Len() >= clusterSize {
break
}
}
d.lg.Info(
"found all needed peers from discovery service",
zap.Int("clusterSize", clusterSize),
zap.Int("found-peers", cls.Len()),
)
}
func (d *discovery) logAndBackoffForRetry(step string) {
d.retries++
// logAndBackoffForRetry stops increasing the exponential backoff once the
// retry count exceeds maxExponentialRetries, and uses a constant backoff afterward.
retries := d.retries
if retries > maxExponentialRetries {
retries = maxExponentialRetries
}
retryTimeInSecond := time.Duration(0x1<<retries) * time.Second
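// For example: the 1st retry waits 2s, the 3rd 8s, and every retry from the
// 8th onward waits the capped maximum of 256s.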
d.lg.Warn(
"retry connecting to discovery service",
zap.String("reason", step),
zap.Duration("backoff", retryTimeInSecond),
)
d.clock.Sleep(retryTimeInSecond)
}
func (d *discovery) close() error {
if d.c != nil {
return d.c.Close()
}
return nil
}
func (cls *clusterInfo) Len() int { return len(cls.members) }
func (cls *clusterInfo) Less(i, j int) bool {
return cls.members[i].createRev < cls.members[j].createRev
}
func (cls *clusterInfo) Swap(i, j int) {
cls.members[i], cls.members[j] = cls.members[j], cls.members[i]
}
func (cls *clusterInfo) add(memberKey, memberValue string, rev int64) error {
membersKeyPrefix := getMemberKeyPrefix(cls.clusterToken)
if !strings.HasPrefix(memberKey, membersKeyPrefix) {
// It should never happen because previously we used exactly the
// same ${membersKeyPrefix} to get or watch the member list.
return errors.New("invalid peer registry key")
}
if strings.IndexRune(memberValue, '=') == -1 {
// It must be in the format "member1=http://127.0.0.1:2380".
return errors.New("invalid peer info returned from discovery service")
}
if cls.exist(memberKey) {
return errors.New("found duplicate peer from discovery service")
}
cls.members = append(cls.members, memberInfo{
peerRegKey: memberKey,
peerURLsMap: memberValue,
createRev: rev,
})
// When multiple members register at the same time, the number of
// registered members may be larger than the configured cluster size.
// So we sort all the members by CreateRevision in ascending order,
// and keep the first ${clusterSize} members in this case.
sort.Sort(cls)
return nil
}
func (cls *clusterInfo) exist(mKey string) bool {
// Usually there are just a couple of members, so performance shouldn't be a problem.
for _, m := range cls.members {
if mKey == m.peerRegKey {
return true
}
}
return false
}
func (cls *clusterInfo) getInitClusterStr(clusterSize int) (string, error) {
peerURLs := cls.getPeerURLs()
if len(peerURLs) > clusterSize {
peerURLs = peerURLs[:clusterSize]
}
us := strings.Join(peerURLs, ",")
_, err := types.NewURLsMap(us)
if err != nil {
return us, ErrInvalidURL
}
return us, nil
}
func (cls *clusterInfo) getPeerURLs() []string {
var peerURLs []string
for _, peer := range cls.members {
peerURLs = append(peerURLs, peer.peerURLsMap)
}
return peerURLs
}
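To show how the package is meant to be consumed, here is a minimal, hypothetical sketch of a caller registering a member through a v3 discovery endpoint; the URL, token, member ID and peer URL are made-up values:

package main

import (
	"fmt"
	"log"
	"time"

	"go.etcd.io/etcd/client/pkg/v3/types"
	"go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery"
	"go.uber.org/zap"
)

func main() {
	lg, _ := zap.NewProduction()
	cfg := &v3discovery.DiscoveryConfig{
		DialTimeout:       2 * time.Second,
		RequestTimeOut:    5 * time.Second,
		KeepAliveTime:     2 * time.Second,
		KeepAliveTimeout:  6 * time.Second,
		InsecureTransport: true, // plain HTTP for this sketch
	}
	// The cluster token is carried as the path of the discovery URL.
	durl := "http://discovery.example.com:2379/my-cluster-token"
	// Register this member and block until the configured cluster size is reached.
	initialCluster, err := v3discovery.JoinCluster(lg, durl, cfg, types.ID(0x8e9e05c52164694d), "infra1=http://127.0.0.1:2380")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(initialCluster) // same format as --initial-cluster
}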

View File

@ -0,0 +1,783 @@
package v3discovery
import (
"context"
"errors"
"fmt"
"testing"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/client/v3"
"github.com/jonboulle/clockwork"
"go.uber.org/zap"
)
// fakeKVForClusterSize is used to test getClusterSize.
type fakeKVForClusterSize struct {
*fakeBaseKV
clusterSizeStr string
}
// We only need to overwrite the method `Get`.
func (fkv *fakeKVForClusterSize) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
if fkv.clusterSizeStr == "" {
// cluster size isn't configured in this case.
return &clientv3.GetResponse{}, nil
}
return &clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Value: []byte(fkv.clusterSizeStr),
},
},
}, nil
}
func TestGetClusterSize(t *testing.T) {
cases := []struct {
name string
clusterSizeStr string
expectedErr error
expectedSize int
}{
{
name: "cluster size not defined",
clusterSizeStr: "",
expectedErr: ErrSizeNotFound,
},
{
name: "invalid cluster size",
clusterSizeStr: "invalidSize",
expectedErr: ErrBadSizeKey,
},
{
name: "valid cluster size",
clusterSizeStr: "3",
expectedErr: nil,
expectedSize: 3,
},
}
lg, err := zap.NewProduction()
if err != nil {
t.Errorf("Failed to create a logger, error: %v", err)
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
d := &discovery{
lg: lg,
c: &clientv3.Client{
KV: &fakeKVForClusterSize{
fakeBaseKV: &fakeBaseKV{},
clusterSizeStr: tc.clusterSizeStr,
},
},
cfg: &DiscoveryConfig{},
clusterToken: "fakeToken",
}
if cs, err := d.getClusterSize(); err != tc.expectedErr {
t.Errorf("Unexpected error, expected: %v got: %v", tc.expectedErr, err)
} else {
if err == nil && cs != tc.expectedSize {
t.Errorf("Unexpected cluster size, expected: %d got: %d", tc.expectedSize, cs)
}
}
})
}
}
// fakeKVForClusterMembers is used to test getClusterMembers.
type fakeKVForClusterMembers struct {
*fakeBaseKV
members []memberInfo
}
// We only need to overwrite the method `Get`.
func (fkv *fakeKVForClusterMembers) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
kvs := memberInfoToKeyValues(fkv.members)
return &clientv3.GetResponse{
Header: &etcdserverpb.ResponseHeader{
Revision: 10,
},
Kvs: kvs,
}, nil
}
func memberInfoToKeyValues(members []memberInfo) []*mvccpb.KeyValue {
kvs := make([]*mvccpb.KeyValue, 0)
for _, mi := range members {
kvs = append(kvs, &mvccpb.KeyValue{
Key: []byte(mi.peerRegKey),
Value: []byte(mi.peerURLsMap),
CreateRevision: mi.createRev,
})
}
return kvs
}
func TestGetClusterMembers(t *testing.T) {
actualMemberInfo := []memberInfo{
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(),
peerURLsMap: "infra1=http://192.168.0.100:2380",
createRev: 8,
},
{
// invalid peer registry key
peerRegKey: "/invalidPrefix/fakeToken/members/" + types.ID(102).String(),
peerURLsMap: "infra2=http://192.168.0.102:2380",
createRev: 6,
},
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(),
peerURLsMap: "infra2=http://192.168.0.102:2380",
createRev: 6,
},
{
// invalid peer info format
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(),
peerURLsMap: "http://192.168.0.102:2380",
createRev: 6,
},
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(103).String(),
peerURLsMap: "infra3=http://192.168.0.103:2380",
createRev: 7,
},
{
// duplicate peer
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(),
peerURLsMap: "infra1=http://192.168.0.100:2380",
createRev: 2,
},
}
// sort by CreateRevision
expectedMemberInfo := []memberInfo{
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(),
peerURLsMap: "infra2=http://192.168.0.102:2380",
createRev: 6,
},
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(103).String(),
peerURLsMap: "infra3=http://192.168.0.103:2380",
createRev: 7,
},
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(),
peerURLsMap: "infra1=http://192.168.0.100:2380",
createRev: 8,
},
}
lg, err := zap.NewProduction()
if err != nil {
t.Errorf("Failed to create a logger, error: %v", err)
}
d := &discovery{
lg: lg,
c: &clientv3.Client{
KV: &fakeKVForClusterMembers{
fakeBaseKV: &fakeBaseKV{},
members: actualMemberInfo,
},
},
cfg: &DiscoveryConfig{},
clusterToken: "fakeToken",
}
clsInfo, _, err := d.getClusterMembers()
if err != nil {
t.Errorf("Failed to get cluster members, error: %v", err)
}
if clsInfo.Len() != len(expectedMemberInfo) {
t.Errorf("unexpected member count, expected: %d, got: %d", len(expectedMemberInfo), clsInfo.Len())
}
for i, m := range clsInfo.members {
if m != expectedMemberInfo[i] {
t.Errorf("unexpected member[%d], expected: %v, got: %v", i, expectedMemberInfo[i], m)
}
}
}
// fakeKVForCheckCluster is used to test checkCluster.
type fakeKVForCheckCluster struct {
*fakeBaseKV
t *testing.T
token string
clusterSizeStr string
members []memberInfo
getSizeRetries int
getMembersRetries int
}
// We only need to overwrite the method `Get`.
func (fkv *fakeKVForCheckCluster) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
clusterSizeKey := fmt.Sprintf("/_etcd/registry/%s/_config/size", fkv.token)
clusterMembersKey := fmt.Sprintf("/_etcd/registry/%s/members", fkv.token)
if key == clusterSizeKey {
if fkv.getSizeRetries > 0 {
fkv.getSizeRetries--
// discovery client should retry on error.
return nil, errors.New("get cluster size failed")
}
return &clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Value: []byte(fkv.clusterSizeStr),
},
},
}, nil
} else if key == clusterMembersKey {
if fkv.getMembersRetries > 0 {
fkv.getMembersRetries--
// discovery client should retry on error.
return nil, errors.New("get cluster members failed")
}
kvs := memberInfoToKeyValues(fkv.members)
return &clientv3.GetResponse{
Header: &etcdserverpb.ResponseHeader{
Revision: 10,
},
Kvs: kvs,
}, nil
} else {
fkv.t.Errorf("unexpected key: %s", key)
return nil, fmt.Errorf("unexpected key: %s", key)
}
}
func TestCheckCluster(t *testing.T) {
actualMemberInfo := []memberInfo{
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(),
peerURLsMap: "infra1=http://192.168.0.100:2380",
createRev: 8,
},
{
// invalid peer registry key
peerRegKey: "/invalidPrefix/fakeToken/members/" + types.ID(102).String(),
peerURLsMap: "infra2=http://192.168.0.102:2380",
createRev: 6,
},
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(),
peerURLsMap: "infra2=http://192.168.0.102:2380",
createRev: 6,
},
{
// invalid peer info format
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(),
peerURLsMap: "http://192.168.0.102:2380",
createRev: 6,
},
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(103).String(),
peerURLsMap: "infra3=http://192.168.0.103:2380",
createRev: 7,
},
{
// duplicate peer
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(),
peerURLsMap: "infra1=http://192.168.0.100:2380",
createRev: 2,
},
}
// sort by CreateRevision
expectedMemberInfo := []memberInfo{
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(),
peerURLsMap: "infra2=http://192.168.0.102:2380",
createRev: 6,
},
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(103).String(),
peerURLsMap: "infra3=http://192.168.0.103:2380",
createRev: 7,
},
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(),
peerURLsMap: "infra1=http://192.168.0.100:2380",
createRev: 8,
},
}
cases := []struct {
name string
memberId types.ID
getSizeRetries int
getMembersRetries int
expectedError error
}{
{
name: "no retries",
memberId: 101,
getSizeRetries: 0,
getMembersRetries: 0,
expectedError: nil,
},
{
name: "2 retries for getClusterSize",
memberId: 102,
getSizeRetries: 2,
getMembersRetries: 0,
expectedError: nil,
},
{
name: "2 retries for getClusterMembers",
memberId: 103,
getSizeRetries: 0,
getMembersRetries: 2,
expectedError: nil,
},
{
name: "error due to cluster full",
memberId: 104,
getSizeRetries: 0,
getMembersRetries: 0,
expectedError: ErrFullCluster,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
lg, err := zap.NewProduction()
if err != nil {
t.Errorf("Failed to create a logger, error: %v", err)
}
fkv := &fakeKVForCheckCluster{
fakeBaseKV: &fakeBaseKV{},
t: t,
token: "fakeToken",
clusterSizeStr: "3",
members: actualMemberInfo,
getSizeRetries: tc.getSizeRetries,
getMembersRetries: tc.getMembersRetries,
}
d := &discovery{
lg: lg,
c: &clientv3.Client{
KV: fkv,
},
cfg: &DiscoveryConfig{},
clusterToken: "fakeToken",
memberId: tc.memberId,
clock: clockwork.NewRealClock(),
}
clsInfo, _, _, err := d.checkCluster()
if err != tc.expectedError {
t.Errorf("Unexpected error, expected: %v, got: %v", tc.expectedError, err)
}
if err == nil {
if fkv.getSizeRetries != 0 || fkv.getMembersRetries != 0 {
t.Errorf("Discovery client did not retry checking cluster on error, remaining etries: (%d, %d)", fkv.getSizeRetries, fkv.getMembersRetries)
}
if clsInfo.Len() != len(expectedMemberInfo) {
t.Errorf("Unexpected member count, expected: %d, got: %d", len(expectedMemberInfo), clsInfo.Len())
}
for mIdx, m := range clsInfo.members {
if m != expectedMemberInfo[mIdx] {
t.Errorf("Unexpected member[%d], expected: %v, got: %v", mIdx, expectedMemberInfo[mIdx], m)
}
}
}
})
}
}
// fakeKVForRegisterSelf is used to test registerSelf.
type fakeKVForRegisterSelf struct {
*fakeBaseKV
t *testing.T
expectedRegKey string
expectedRegValue string
retries int
}
// We only need to overwrite the method `Put`.
func (fkv *fakeKVForRegisterSelf) Put(ctx context.Context, key string, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
if key != fkv.expectedRegKey {
fkv.t.Errorf("unexpected register key, expected: %s, got: %s", fkv.expectedRegKey, key)
}
if val != fkv.expectedRegValue {
fkv.t.Errorf("unexpected register value, expected: %s, got: %s", fkv.expectedRegValue, val)
}
if fkv.retries > 0 {
fkv.retries--
// discovery client should retry on error.
return nil, errors.New("register self failed")
}
return nil, nil
}
func TestRegisterSelf(t *testing.T) {
cases := []struct {
name string
token string
memberId types.ID
expectedRegKey string
expectedRegValue string
retries int // when retries > 0, return an error on the Put request.
}{
{
name: "no retry with token1",
token: "token1",
memberId: 101,
expectedRegKey: "/_etcd/registry/token1/members/" + types.ID(101).String(),
expectedRegValue: "infra=http://127.0.0.1:2380",
retries: 0,
},
{
name: "no retry with token2",
token: "token2",
memberId: 102,
expectedRegKey: "/_etcd/registry/token2/members/" + types.ID(102).String(),
expectedRegValue: "infra=http://127.0.0.1:2380",
retries: 0,
},
{
name: "2 retries",
token: "token3",
memberId: 103,
expectedRegKey: "/_etcd/registry/token3/members/" + types.ID(103).String(),
expectedRegValue: "infra=http://127.0.0.1:2380",
retries: 2,
},
}
lg, err := zap.NewProduction()
if err != nil {
t.Errorf("Failed to create a logger, error: %v", err)
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
fkv := &fakeKVForRegisterSelf{
fakeBaseKV: &fakeBaseKV{},
t: t,
expectedRegKey: tc.expectedRegKey,
expectedRegValue: tc.expectedRegValue,
retries: tc.retries,
}
d := &discovery{
lg: lg,
clusterToken: tc.token,
memberId: tc.memberId,
cfg: &DiscoveryConfig{},
c: &clientv3.Client{
KV: fkv,
},
clock: clockwork.NewRealClock(),
}
if err := d.registerSelf(tc.expectedRegValue); err != nil {
t.Errorf("Error occuring on register member self: %v", err)
}
if fkv.retries != 0 {
t.Errorf("Discovery client did not retry registering itself on error, remaining retries: %d", fkv.retries)
}
})
}
}
// fakeWatcherForWaitPeers is used to test waitPeers.
type fakeWatcherForWaitPeers struct {
*fakeBaseWatcher
t *testing.T
token string
members []memberInfo
}
// We only need to overwrite the method `Watch`.
func (fw *fakeWatcherForWaitPeers) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
expectedWatchKey := fmt.Sprintf("/_etcd/registry/%s/members", fw.token)
if key != expectedWatchKey {
fw.t.Errorf("unexpected watch key, expected: %s, got: %s", expectedWatchKey, key)
}
ch := make(chan clientv3.WatchResponse, 1)
go func() {
for _, mi := range fw.members {
ch <- clientv3.WatchResponse{
Events: []*clientv3.Event{
{
Kv: &mvccpb.KeyValue{
Key: []byte(mi.peerRegKey),
Value: []byte(mi.peerURLsMap),
CreateRevision: mi.createRev,
},
},
},
}
}
close(ch)
}()
return ch
}
func TestWaitPeers(t *testing.T) {
actualMemberInfo := []memberInfo{
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(),
peerURLsMap: "infra1=http://192.168.0.100:2380",
createRev: 8,
},
{
// invalid peer registry key
peerRegKey: "/invalidPrefix/fakeToken/members/" + types.ID(102).String(),
peerURLsMap: "infra2=http://192.168.0.102:2380",
createRev: 6,
},
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(),
peerURLsMap: "infra2=http://192.168.0.102:2380",
createRev: 6,
},
{
// invalid peer info format
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(),
peerURLsMap: "http://192.168.0.102:2380",
createRev: 6,
},
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(103).String(),
peerURLsMap: "infra3=http://192.168.0.103:2380",
createRev: 7,
},
{
// duplicate peer
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(),
peerURLsMap: "infra1=http://192.168.0.100:2380",
createRev: 2,
},
}
// sort by CreateRevision
expectedMemberInfo := []memberInfo{
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(),
peerURLsMap: "infra2=http://192.168.0.102:2380",
createRev: 6,
},
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(103).String(),
peerURLsMap: "infra3=http://192.168.0.103:2380",
createRev: 7,
},
{
peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(),
peerURLsMap: "infra1=http://192.168.0.100:2380",
createRev: 8,
},
}
lg, err := zap.NewProduction()
if err != nil {
t.Errorf("Failed to create a logger, error: %v", err)
}
d := &discovery{
lg: lg,
c: &clientv3.Client{
KV: &fakeBaseKV{},
Watcher: &fakeWatcherForWaitPeers{
fakeBaseWatcher: &fakeBaseWatcher{},
t: t,
token: "fakeToken",
members: actualMemberInfo,
},
},
cfg: &DiscoveryConfig{},
clusterToken: "fakeToken",
}
cls := clusterInfo{
clusterToken: "fakeToken",
}
d.waitPeers(&cls, 3, 0)
if cls.Len() != len(expectedMemberInfo) {
t.Errorf("unexpected member number returned by watch, expected: %d, got: %d", len(expectedMemberInfo), cls.Len())
}
for i, m := range cls.members {
if m != expectedMemberInfo[i] {
t.Errorf("unexpected member[%d] returned by watch, expected: %v, got: %v", i, expectedMemberInfo[i], m)
}
}
}
func TestGetInitClusterStr(t *testing.T) {
cases := []struct {
name string
members []memberInfo
clusterSize int
expectedResult string
expectedError error
}{
{
name: "1 member",
members: []memberInfo{
{
peerURLsMap: "infra2=http://192.168.0.102:2380",
},
},
clusterSize: 1,
expectedResult: "infra2=http://192.168.0.102:2380",
expectedError: nil,
},
{
name: "2 members",
members: []memberInfo{
{
peerURLsMap: "infra2=http://192.168.0.102:2380",
},
{
peerURLsMap: "infra3=http://192.168.0.103:2380",
},
},
clusterSize: 2,
expectedResult: "infra2=http://192.168.0.102:2380,infra3=http://192.168.0.103:2380",
expectedError: nil,
},
{
name: "3 members",
members: []memberInfo{
{
peerURLsMap: "infra2=http://192.168.0.102:2380",
},
{
peerURLsMap: "infra3=http://192.168.0.103:2380",
},
{
peerURLsMap: "infra1=http://192.168.0.100:2380",
},
},
clusterSize: 3,
expectedResult: "infra2=http://192.168.0.102:2380,infra3=http://192.168.0.103:2380,infra1=http://192.168.0.100:2380",
expectedError: nil,
},
{
name: "should ignore redundant member",
members: []memberInfo{
{
peerURLsMap: "infra2=http://192.168.0.102:2380",
},
{
peerURLsMap: "infra3=http://192.168.0.103:2380",
},
{
peerURLsMap: "infra1=http://192.168.0.100:2380",
},
{
peerURLsMap: "infra4=http://192.168.0.104:2380",
},
},
clusterSize: 3,
expectedResult: "infra2=http://192.168.0.102:2380,infra3=http://192.168.0.103:2380,infra1=http://192.168.0.100:2380",
expectedError: nil,
},
{
name: "invalid_peer_url",
members: []memberInfo{
{
peerURLsMap: "infra2=http://192.168.0.102:2380",
},
{
peerURLsMap: "infra3=http://192.168.0.103", //not host:port
},
},
clusterSize: 2,
expectedResult: "infra2=http://192.168.0.102:2380,infra3=http://192.168.0.103:2380",
expectedError: ErrInvalidURL,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
clsInfo := &clusterInfo{
members: tc.members,
}
retStr, err := clsInfo.getInitClusterStr(tc.clusterSize)
if err != tc.expectedError {
t.Errorf("Unexpected error, expected: %v, got: %v", tc.expectedError, err)
}
if err == nil {
if retStr != tc.expectedResult {
t.Errorf("Unexpected result, expected: %s, got: %s", tc.expectedResult, retStr)
}
}
})
}
}
// fakeBaseKV is the base struct implementing the interface `clientv3.KV`.
type fakeBaseKV struct{}
func (fkv *fakeBaseKV) Put(ctx context.Context, key string, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
return nil, nil
}
func (fkv *fakeBaseKV) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
return nil, nil
}
func (fkv *fakeBaseKV) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
return nil, nil
}
func (fkv *fakeBaseKV) Compact(ctx context.Context, rev int64, opts ...clientv3.CompactOption) (*clientv3.CompactResponse, error) {
return nil, nil
}
func (fkv *fakeBaseKV) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
return clientv3.OpResponse{}, nil
}
func (fkv *fakeBaseKV) Txn(ctx context.Context) clientv3.Txn {
return nil
}
// fakeBaseWatcher is the base struct implementing the interface `clientv3.Watcher`.
type fakeBaseWatcher struct{}
func (fw *fakeBaseWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
return nil
}
func (fw *fakeBaseWatcher) RequestProgress(ctx context.Context) error {
return nil
}
func (fw *fakeBaseWatcher) Close() error {
return nil
}

View File

@ -40,6 +40,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/storage/backend"
@ -328,7 +329,11 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*
}
if cfg.ShouldDiscover() {
var str string
str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
if cfg.EnableV2Discovery {
str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
} else {
str, err = v3discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, &cfg.DiscoveryCfg, m.ID, cfg.InitialPeerURLsMap.String())
}
if err != nil {
return nil, &DiscoveryError{Op: "join", Err: err}
}

View File

@ -0,0 +1,112 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"fmt"
"strconv"
"strings"
"testing"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
func TestClusterOf1UsingV3Discovery(t *testing.T) {
testClusterUsingV3Discovery(t, 1, e2e.ClientNonTLS, false)
}
func TestClusterOf3UsingV3Discovery(t *testing.T) {
testClusterUsingV3Discovery(t, 3, e2e.ClientTLS, true)
}
func TestTLSClusterOf3UsingV3Discovery(t *testing.T) {
testClusterUsingV3Discovery(t, 5, e2e.ClientTLS, false)
}
func testClusterUsingV3Discovery(t *testing.T, clusterSize int, clientTlsType e2e.ClientConnType, isClientAutoTls bool) {
e2e.BeforeTest(t)
// step 1: start the discovery service
ds, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
InitialToken: "new",
BasePort: 2000,
ClusterSize: 1,
ClientTLS: clientTlsType,
IsClientAutoTLS: isClientAutoTls,
})
if err != nil {
t.Fatalf("could not start discovery etcd cluster (%v)", err)
}
defer ds.Close()
// step 2: configure the cluster size
clusterToken := "8A591FAB-1D72-41FA-BDF2-A27162FDA1E0"
configSizeKey := fmt.Sprintf("/_etcd/registry/%s/_config/size", clusterToken)
configSizeValStr := strconv.Itoa(clusterSize)
if err := ctlV3Put(ctlCtx{epc: ds}, configSizeKey, configSizeValStr, ""); err != nil {
t.Errorf("failed to configure cluster size to discovery serivce, error: %v", err)
}
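// For illustration: the same key could be written manually with etcdctl, e.g.
// `etcdctl --endpoints=<discovery-endpoint> put /_etcd/registry/<token>/_config/size 3`.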
// step 3: start the etcd cluster
epc, err := bootstrapEtcdClusterUsingV3Discovery(t, ds.EndpointsV3()[0], clusterToken, clusterSize, clientTlsType, isClientAutoTls)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
defer epc.Close()
// step 4: sanity test on the etcd cluster
etcdctl := []string{e2e.CtlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ",")}
if err := e2e.SpawnWithExpect(append(etcdctl, "put", "key", "value"), "OK"); err != nil {
t.Fatal(err)
}
if err := e2e.SpawnWithExpect(append(etcdctl, "get", "key"), "value"); err != nil {
t.Fatal(err)
}
}
func bootstrapEtcdClusterUsingV3Discovery(t *testing.T, durl string, clusterToken string, clusterSize int, clientTlsType e2e.ClientConnType, isClientAutoTls bool) (*e2e.EtcdProcessCluster, error) {
// cluster configuration
cfg := &e2e.EtcdProcessClusterConfig{
BasePort: 3000,
ClusterSize: clusterSize,
IsPeerTLS: true,
IsPeerAutoTLS: true,
Discovery: fmt.Sprintf("%s/%s", durl, clusterToken),
}
// initialize the cluster
epc, err := e2e.InitEtcdProcessCluster(t, cfg)
if err != nil {
return epc, err
}
// populate discovery-related security configuration
for _, ep := range epc.Procs {
epCfg := ep.Config()
epCfg.Args = append(epCfg.Args, "--enable-v2-discovery=false")
if clientTlsType == e2e.ClientTLS {
if isClientAutoTls {
epCfg.Args = append(epCfg.Args, "--discovery-insecure-transport=false")
epCfg.Args = append(epCfg.Args, "--discovery-insecure-skip-tls-verify=true")
} else {
epCfg.Args = append(epCfg.Args, "--discovery-cacert="+e2e.CaPath)
epCfg.Args = append(epCfg.Args, "--discovery-cert="+e2e.CertPath)
epCfg.Args = append(epCfg.Args, "--discovery-key="+e2e.PrivateKeyPath)
}
}
}
// start the cluster
return e2e.StartEtcdProcessCluster(epc, cfg)
}

View File

@ -181,16 +181,7 @@ func NewEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdPr
return nil, err
}
if cfg.RollingStart {
if err := epc.RollingStart(); err != nil {
return nil, fmt.Errorf("Cannot rolling-start: %v", err)
}
} else {
if err := epc.Start(); err != nil {
return nil, fmt.Errorf("Cannot start: %v", err)
}
}
return epc, nil
return StartEtcdProcessCluster(epc, cfg)
}
// InitEtcdProcessCluster initializes a new cluster based on the given config.
@ -218,6 +209,21 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP
return epc, nil
}
// StartEtcdProcessCluster launches a new cluster from etcd processes.
func StartEtcdProcessCluster(epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) {
if cfg.RollingStart {
if err := epc.RollingStart(); err != nil {
return nil, fmt.Errorf("cannot rolling-start: %v", err)
}
} else {
if err := epc.Start(); err != nil {
return nil, fmt.Errorf("cannot start: %v", err)
}
}
return epc, nil
}
func (cfg *EtcdProcessClusterConfig) ClientScheme() string {
if cfg.ClientTLS == ClientTLS {
return "https"

View File

@ -588,6 +588,8 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS)
clientScheme := SchemeFromTLSInfo(mcfg.ClientTLS)
m.EnableV2Discovery = embed.DefaultEnableV2Discovery
pln := newLocalListener(t)
m.PeerListeners = []net.Listener{pln}
m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})