Compare commits
73 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
17cef6e3e9 | ||
![]() |
c07cba001b | ||
![]() |
b8878eac45 | ||
![]() |
e71e0c5c88 | ||
![]() |
bc44e367c3 | ||
![]() |
299e0f17aa | ||
![]() |
75d5e78d1f | ||
![]() |
c60dabf2f3 | ||
![]() |
8a4afdbcc2 | ||
![]() |
6fcab5af9f | ||
![]() |
008074187c | ||
![]() |
cf558ee8b7 | ||
![]() |
e800c62eca | ||
![]() |
0372cfc7ab | ||
![]() |
18dfb9cca3 | ||
![]() |
7b8270416d | ||
![]() |
a2c37485dd | ||
![]() |
67bfc310f0 | ||
![]() |
ed28c768a3 | ||
![]() |
d3a702a09d | ||
![]() |
319331192e | ||
![]() |
2acdf88406 | ||
![]() |
a8454e453f | ||
![]() |
32583af167 | ||
![]() |
85cc4deae6 | ||
![]() |
7dec4c412c | ||
![]() |
a4667f596a | ||
![]() |
0207d1df66 | ||
![]() |
99e893d285 | ||
![]() |
d5dec731db | ||
![]() |
81a2edc365 | ||
![]() |
e5424fc474 | ||
![]() |
4488595e05 | ||
![]() |
7b99863e02 | ||
![]() |
490c6139ac | ||
![]() |
31e49a4df3 | ||
![]() |
83fc96df0c | ||
![]() |
45192cf62b | ||
![]() |
1a1281005c | ||
![]() |
a4f42948e8 | ||
![]() |
2212a84adb | ||
![]() |
e42d7b5248 | ||
![]() |
b86bb615ff | ||
![]() |
ee963470f4 | ||
![]() |
36452a1c1d | ||
![]() |
4571e528f4 | ||
![]() |
37ac22205b | ||
![]() |
493f15c156 | ||
![]() |
c8b3c6f54c | ||
![]() |
368ff75a10 | ||
![]() |
7adbfa1144 | ||
![]() |
e151faf3cc | ||
![]() |
8292fd5051 | ||
![]() |
c37245ed4b | ||
![]() |
6dab8aff66 | ||
![]() |
c69efda350 | ||
![]() |
3d8e9a323d | ||
![]() |
963b242846 | ||
![]() |
6f011ce524 | ||
![]() |
36f8dee003 | ||
![]() |
47001f28bd | ||
![]() |
9a24f73f7b | ||
![]() |
7d1cf64049 | ||
![]() |
05c441f92f | ||
![]() |
434f7e83f0 | ||
![]() |
91b1a9182a | ||
![]() |
78f67988aa | ||
![]() |
54ba958911 | ||
![]() |
609e844f86 | ||
![]() |
166b4473fa | ||
![]() |
ed231df7c0 | ||
![]() |
cfe37de6c0 | ||
![]() |
f18976f4b8 |
@@ -174,3 +174,5 @@ As of version v3.2 if an etcd server is launched with the option `--client-cert-
|
||||
As of version v3.3 if an etcd server is launched with the option `--peer-cert-allowed-cn` or `--peer-cert-allowed-hostname` filtering of inter-peer connections is enabled. Nodes can only join the etcd cluster if their TLS certificate identity match the allowed one.
|
||||
See [etcd security page](https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/security.md) for more details.
|
||||
|
||||
## Notes on password strength
|
||||
`etcdctl` command line interface and etcd API don't check a strength (length, coexistence of numbers and alphabets, etc) of the password during creating a new user or updating password of an existing user. An administrator needs to care about a requirement of password strength by themselves.
|
||||
|
@@ -4,7 +4,7 @@ title: etcd gateway
|
||||
|
||||
## What is etcd gateway
|
||||
|
||||
etcd gateway is a simple TCP proxy that forwards network data to the etcd cluster. The gateway is stateless and transparent; it neither inspects client requests nor interferes with cluster responses.
|
||||
etcd gateway is a simple TCP proxy that forwards network data to the etcd cluster. The gateway is stateless and transparent; it neither inspects client requests nor interferes with cluster responses. It does not terminate TLS connections, do TLS handshakes on behalf of its clients, or verify if the connection is secured.
|
||||
|
||||
The gateway supports multiple etcd server endpoints and works on a simple round-robin policy. It only routes to available endpoints and hides failures from its clients. Other retry policies, such as weighted round-robin, may be supported in the future.
|
||||
|
||||
@@ -74,7 +74,7 @@ $ etcd gateway start --discovery-srv=example.com
|
||||
|
||||
* Comma-separated list of etcd server targets for forwarding client connections.
|
||||
* Default: `127.0.0.1:2379`
|
||||
* Invalid example: `https://127.0.0.1:2379` (gateway does not terminate TLS)
|
||||
* Invalid example: `https://127.0.0.1:2379` (gateway does not terminate TLS). Note that the gateway does not verify the HTTP schema or inspect the requests, it only forwards requests to the given endpoints.
|
||||
|
||||
#### --discovery-srv
|
||||
|
||||
@@ -103,5 +103,5 @@ $ etcd gateway start --discovery-srv=example.com
|
||||
|
||||
#### --trusted-ca-file
|
||||
|
||||
* Path to the client TLS CA file for the etcd cluster. Used to authenticate endpoints.
|
||||
* Path to the client TLS CA file for the etcd cluster to verify the endpoints returned from SRV discovery. Note that it is ONLY used for authenticating the discovered endpoints rather than creating connections for data transferring. The gateway never terminates TLS connections or create TLS connections on behalf of its clients.
|
||||
* Default: (not set)
|
||||
|
@@ -2,7 +2,7 @@
|
||||
title: Transport security model
|
||||
---
|
||||
|
||||
etcd supports automatic TLS as well as authentication through client certificates for both clients to server as well as peer (server to server / cluster) communication.
|
||||
etcd supports automatic TLS as well as authentication through client certificates for both clients to server as well as peer (server to server / cluster) communication. **Note that etcd doesn't enable [RBAC based authentication][auth] or the authentication feature in the transport layer by default to reduce friction for users getting started with the database. Further, changing this default would be a breaking change for the project which was established since 2013. An etcd cluster which doesn't enable security features can expose its data to any clients.**
|
||||
|
||||
To get up and running, first have a CA certificate and a signed key pair for one member. It is recommended to create and sign a new key pair for every member in a cluster.
|
||||
|
||||
@@ -426,8 +426,14 @@ Make sure to sign the certificates with a Subject Name the member's public IP ad
|
||||
|
||||
The certificate needs to be signed for the member's FQDN in its Subject Name, use Subject Alternative Names (short IP SANs) to add the IP address. The `etcd-ca` tool provides `--domain=` option for its `new-cert` command, and openssl can make [it][alt-name] too.
|
||||
|
||||
### Does etcd encrypt data stored on disk drives?
|
||||
No. etcd doesn't encrypt key/value data stored on disk drives. If a user need to encrypt data stored on etcd, there are some options:
|
||||
* Let client applications encrypt and decrypt the data
|
||||
* Use a feature of underlying storage systems for encrypting stored data like [dm-crypt]
|
||||
|
||||
[cfssl]: https://github.com/cloudflare/cfssl
|
||||
[tls-setup]: ../../hack/tls-setup
|
||||
[tls-guide]: https://github.com/coreos/docs/blob/master/os/generate-self-signed-certificates.md
|
||||
[alt-name]: http://wiki.cacert.org/FAQ/subjectAltName
|
||||
[auth]: authentication.md
|
||||
[dm-crypt]: https://en.wikipedia.org/wiki/Dm-crypt
|
||||
|
@@ -37,7 +37,7 @@ const (
|
||||
|
||||
// var for testing purposes
|
||||
var (
|
||||
simpleTokenTTL = 5 * time.Minute
|
||||
simpleTokenTTLDefault = 300 * time.Second
|
||||
simpleTokenTTLResolution = 1 * time.Second
|
||||
)
|
||||
|
||||
@@ -47,6 +47,7 @@ type simpleTokenTTLKeeper struct {
|
||||
stopc chan struct{}
|
||||
deleteTokenFunc func(string)
|
||||
mu *sync.Mutex
|
||||
simpleTokenTTL time.Duration
|
||||
}
|
||||
|
||||
func (tm *simpleTokenTTLKeeper) stop() {
|
||||
@@ -58,12 +59,12 @@ func (tm *simpleTokenTTLKeeper) stop() {
|
||||
}
|
||||
|
||||
func (tm *simpleTokenTTLKeeper) addSimpleToken(token string) {
|
||||
tm.tokens[token] = time.Now().Add(simpleTokenTTL)
|
||||
tm.tokens[token] = time.Now().Add(tm.simpleTokenTTL)
|
||||
}
|
||||
|
||||
func (tm *simpleTokenTTLKeeper) resetSimpleToken(token string) {
|
||||
if _, ok := tm.tokens[token]; ok {
|
||||
tm.tokens[token] = time.Now().Add(simpleTokenTTL)
|
||||
tm.tokens[token] = time.Now().Add(tm.simpleTokenTTL)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,6 +102,7 @@ type tokenSimple struct {
|
||||
simpleTokenKeeper *simpleTokenTTLKeeper
|
||||
simpleTokensMu sync.Mutex
|
||||
simpleTokens map[string]string // token -> username
|
||||
simpleTokenTTL time.Duration
|
||||
}
|
||||
|
||||
func (t *tokenSimple) genTokenPrefix() (string, error) {
|
||||
@@ -157,6 +159,10 @@ func (t *tokenSimple) invalidateUser(username string) {
|
||||
}
|
||||
|
||||
func (t *tokenSimple) enable() {
|
||||
if t.simpleTokenTTL <= 0 {
|
||||
t.simpleTokenTTL = simpleTokenTTLDefault
|
||||
}
|
||||
|
||||
delf := func(tk string) {
|
||||
if username, ok := t.simpleTokens[tk]; ok {
|
||||
if t.lg != nil {
|
||||
@@ -177,6 +183,7 @@ func (t *tokenSimple) enable() {
|
||||
stopc: make(chan struct{}),
|
||||
deleteTokenFunc: delf,
|
||||
mu: &t.simpleTokensMu,
|
||||
simpleTokenTTL: t.simpleTokenTTL,
|
||||
}
|
||||
go t.simpleTokenKeeper.run()
|
||||
}
|
||||
@@ -234,10 +241,14 @@ func (t *tokenSimple) isValidSimpleToken(ctx context.Context, token string) bool
|
||||
return false
|
||||
}
|
||||
|
||||
func newTokenProviderSimple(lg *zap.Logger, indexWaiter func(uint64) <-chan struct{}) *tokenSimple {
|
||||
func newTokenProviderSimple(lg *zap.Logger, indexWaiter func(uint64) <-chan struct{}, TokenTTL time.Duration) *tokenSimple {
|
||||
if lg == nil {
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
return &tokenSimple{
|
||||
lg: lg,
|
||||
simpleTokens: make(map[string]string),
|
||||
indexWaiter: indexWaiter,
|
||||
lg: lg,
|
||||
simpleTokens: make(map[string]string),
|
||||
indexWaiter: indexWaiter,
|
||||
simpleTokenTTL: TokenTTL,
|
||||
}
|
||||
}
|
||||
|
@@ -24,9 +24,9 @@ import (
|
||||
// TestSimpleTokenDisabled ensures that TokenProviderSimple behaves correctly when
|
||||
// disabled.
|
||||
func TestSimpleTokenDisabled(t *testing.T) {
|
||||
initialState := newTokenProviderSimple(zap.NewExample(), dummyIndexWaiter)
|
||||
initialState := newTokenProviderSimple(zap.NewExample(), dummyIndexWaiter, simpleTokenTTLDefault)
|
||||
|
||||
explicitlyDisabled := newTokenProviderSimple(zap.NewExample(), dummyIndexWaiter)
|
||||
explicitlyDisabled := newTokenProviderSimple(zap.NewExample(), dummyIndexWaiter, simpleTokenTTLDefault)
|
||||
explicitlyDisabled.enable()
|
||||
explicitlyDisabled.disable()
|
||||
|
||||
@@ -48,7 +48,7 @@ func TestSimpleTokenDisabled(t *testing.T) {
|
||||
// TestSimpleTokenAssign ensures that TokenProviderSimple can correctly assign a
|
||||
// token, look it up with info, and invalidate it by user.
|
||||
func TestSimpleTokenAssign(t *testing.T) {
|
||||
tp := newTokenProviderSimple(zap.NewExample(), dummyIndexWaiter)
|
||||
tp := newTokenProviderSimple(zap.NewExample(), dummyIndexWaiter, simpleTokenTTLDefault)
|
||||
tp.enable()
|
||||
ctx := context.WithValue(context.WithValue(context.TODO(), AuthenticateParamIndex{}, uint64(1)), AuthenticateParamSimpleTokenPrefix{}, "dummy")
|
||||
token, err := tp.assign(ctx, "user1", 0)
|
||||
|
@@ -23,6 +23,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/auth/authpb"
|
||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
@@ -59,6 +60,7 @@ var (
|
||||
ErrRoleNotFound = errors.New("auth: role not found")
|
||||
ErrRoleEmpty = errors.New("auth: role name is empty")
|
||||
ErrAuthFailed = errors.New("auth: authentication failed, invalid user ID or password")
|
||||
ErrNoPasswordUser = errors.New("auth: authentication failed, password was given for no password user")
|
||||
ErrPermissionDenied = errors.New("auth: permission denied")
|
||||
ErrRoleNotGranted = errors.New("auth: role is not granted to the user")
|
||||
ErrPermissionNotGranted = errors.New("auth: permission is not granted to the role")
|
||||
@@ -346,17 +348,27 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
|
||||
return 0, ErrAuthNotEnabled
|
||||
}
|
||||
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
var user *authpb.User
|
||||
// CompareHashAndPassword is very expensive, so we use closures
|
||||
// to avoid putting it in the critical section of the tx lock.
|
||||
revision, err := func() (uint64, error) {
|
||||
tx := as.be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
|
||||
user := getUser(as.lg, tx, username)
|
||||
if user == nil {
|
||||
return 0, ErrAuthFailed
|
||||
}
|
||||
user = getUser(as.lg, tx, username)
|
||||
if user == nil {
|
||||
return 0, ErrAuthFailed
|
||||
}
|
||||
|
||||
if user.Options != nil && user.Options.NoPassword {
|
||||
return 0, ErrAuthFailed
|
||||
if user.Options != nil && user.Options.NoPassword {
|
||||
return 0, ErrNoPasswordUser
|
||||
}
|
||||
|
||||
return getRevision(tx), nil
|
||||
}()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if bcrypt.CompareHashAndPassword(user.Password, []byte(password)) != nil {
|
||||
@@ -367,7 +379,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
|
||||
}
|
||||
return 0, ErrAuthFailed
|
||||
}
|
||||
return getRevision(tx), nil
|
||||
return revision, nil
|
||||
}
|
||||
|
||||
func (as *authStore) Recover(be backend.Backend) {
|
||||
@@ -984,7 +996,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
|
||||
if !as.IsAuthEnabled() {
|
||||
return nil
|
||||
}
|
||||
if authInfo == nil {
|
||||
if authInfo == nil || authInfo.Username == "" {
|
||||
return ErrUserEmpty
|
||||
}
|
||||
|
||||
@@ -1341,7 +1353,8 @@ func decomposeOpts(lg *zap.Logger, optstr string) (string, map[string]string, er
|
||||
func NewTokenProvider(
|
||||
lg *zap.Logger,
|
||||
tokenOpts string,
|
||||
indexWaiter func(uint64) <-chan struct{}) (TokenProvider, error) {
|
||||
indexWaiter func(uint64) <-chan struct{},
|
||||
TokenTTL time.Duration) (TokenProvider, error) {
|
||||
tokenType, typeSpecificOpts, err := decomposeOpts(lg, tokenOpts)
|
||||
if err != nil {
|
||||
return nil, ErrInvalidAuthOpts
|
||||
@@ -1354,7 +1367,7 @@ func NewTokenProvider(
|
||||
} else {
|
||||
plog.Warningf("simple token is not cryptographically signed")
|
||||
}
|
||||
return newTokenProviderSimple(lg, indexWaiter), nil
|
||||
return newTokenProviderSimple(lg, indexWaiter, TokenTTL), nil
|
||||
|
||||
case tokenTypeJWT:
|
||||
return newTokenProviderJWT(lg, typeSpecificOpts)
|
||||
|
@@ -48,7 +48,7 @@ func TestNewAuthStoreRevision(t *testing.T) {
|
||||
b, tPath := backend.NewDefaultTmpBackend()
|
||||
defer os.Remove(tPath)
|
||||
|
||||
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter)
|
||||
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -78,7 +78,7 @@ func TestNewAuthStoreBcryptCost(t *testing.T) {
|
||||
b, tPath := backend.NewDefaultTmpBackend()
|
||||
defer os.Remove(tPath)
|
||||
|
||||
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter)
|
||||
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -98,7 +98,7 @@ func TestNewAuthStoreBcryptCost(t *testing.T) {
|
||||
func setupAuthStore(t *testing.T) (store *authStore, teardownfunc func(t *testing.T)) {
|
||||
b, tPath := backend.NewDefaultTmpBackend()
|
||||
|
||||
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter)
|
||||
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -626,7 +626,7 @@ func TestAuthInfoFromCtxRace(t *testing.T) {
|
||||
b, tPath := backend.NewDefaultTmpBackend()
|
||||
defer os.Remove(tPath)
|
||||
|
||||
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter)
|
||||
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -658,6 +658,12 @@ func TestIsAdminPermitted(t *testing.T) {
|
||||
t.Errorf("expected %v, got %v", ErrUserNotFound, err)
|
||||
}
|
||||
|
||||
// empty user
|
||||
err = as.IsAdminPermitted(&AuthInfo{Username: "", Revision: 1})
|
||||
if err != ErrUserEmpty {
|
||||
t.Errorf("expected %v, got %v", ErrUserEmpty, err)
|
||||
}
|
||||
|
||||
// non-admin user
|
||||
err = as.IsAdminPermitted(&AuthInfo{Username: "foo", Revision: 1})
|
||||
if err != ErrPermissionDenied {
|
||||
@@ -692,7 +698,7 @@ func TestRecoverFromSnapshot(t *testing.T) {
|
||||
|
||||
as.Close()
|
||||
|
||||
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter)
|
||||
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -725,13 +731,13 @@ func contains(array []string, str string) bool {
|
||||
|
||||
func TestHammerSimpleAuthenticate(t *testing.T) {
|
||||
// set TTL values low to try to trigger races
|
||||
oldTTL, oldTTLRes := simpleTokenTTL, simpleTokenTTLResolution
|
||||
oldTTL, oldTTLRes := simpleTokenTTLDefault, simpleTokenTTLResolution
|
||||
defer func() {
|
||||
simpleTokenTTL = oldTTL
|
||||
simpleTokenTTLDefault = oldTTL
|
||||
simpleTokenTTLResolution = oldTTLRes
|
||||
}()
|
||||
simpleTokenTTL = 10 * time.Millisecond
|
||||
simpleTokenTTLResolution = simpleTokenTTL
|
||||
simpleTokenTTLDefault = 10 * time.Millisecond
|
||||
simpleTokenTTLResolution = simpleTokenTTLDefault
|
||||
users := make(map[string]struct{})
|
||||
|
||||
as, tearDown := setupAuthStore(t)
|
||||
@@ -774,7 +780,7 @@ func TestRolesOrder(t *testing.T) {
|
||||
b, tPath := backend.NewDefaultTmpBackend()
|
||||
defer os.Remove(tPath)
|
||||
|
||||
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter)
|
||||
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -829,7 +835,7 @@ func testAuthInfoFromCtxWithRoot(t *testing.T, opts string) {
|
||||
b, tPath := backend.NewDefaultTmpBackend()
|
||||
defer os.Remove(tPath)
|
||||
|
||||
tp, err := NewTokenProvider(zap.NewExample(), opts, dummyIndexWaiter)
|
||||
tp, err := NewTokenProvider(zap.NewExample(), opts, dummyIndexWaiter, simpleTokenTTLDefault)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@@ -65,8 +65,8 @@ func TestUserErrorAuth(t *testing.T) {
|
||||
authSetupRoot(t, authapi.Auth)
|
||||
|
||||
// unauthenticated client
|
||||
if _, err := authapi.UserAdd(context.TODO(), "foo", "bar"); err != rpctypes.ErrUserNotFound {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrUserNotFound, err)
|
||||
if _, err := authapi.UserAdd(context.TODO(), "foo", "bar"); err != rpctypes.ErrUserEmpty {
|
||||
t.Fatalf("expected %v, got %v", rpctypes.ErrUserEmpty, err)
|
||||
}
|
||||
|
||||
// wrong id or password
|
||||
|
@@ -25,6 +25,7 @@ import (
|
||||
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||||
mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
@@ -140,6 +141,7 @@ type watcher struct {
|
||||
|
||||
// streams holds all the active grpc streams keyed by ctx value.
|
||||
streams map[string]*watchGrpcStream
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
|
||||
@@ -176,6 +178,8 @@ type watchGrpcStream struct {
|
||||
resumec chan struct{}
|
||||
// closeErr is the error that closed the watch stream
|
||||
closeErr error
|
||||
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
// watchStreamRequest is a union of the supported watch request operation types
|
||||
@@ -242,6 +246,7 @@ func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
|
||||
}
|
||||
if c != nil {
|
||||
w.callOpts = c.callOpts
|
||||
w.lg = c.lg
|
||||
}
|
||||
return w
|
||||
}
|
||||
@@ -273,6 +278,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
|
||||
errc: make(chan error, 1),
|
||||
closingc: make(chan *watcherStream),
|
||||
resumec: make(chan struct{}),
|
||||
lg: w.lg,
|
||||
}
|
||||
go wgs.run()
|
||||
return wgs
|
||||
@@ -544,10 +550,18 @@ func (w *watchGrpcStream) run() {
|
||||
w.resuming = append(w.resuming, ws)
|
||||
if len(w.resuming) == 1 {
|
||||
// head of resume queue, can register a new watcher
|
||||
wc.Send(ws.initReq.toPB())
|
||||
if err := wc.Send(ws.initReq.toPB()); err != nil {
|
||||
if w.lg != nil {
|
||||
w.lg.Debug("error when sending request", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
case *progressRequest:
|
||||
wc.Send(wreq.toPB())
|
||||
if err := wc.Send(wreq.toPB()); err != nil {
|
||||
if w.lg != nil {
|
||||
w.lg.Debug("error when sending request", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// new events from the watch client
|
||||
@@ -571,7 +585,11 @@ func (w *watchGrpcStream) run() {
|
||||
}
|
||||
|
||||
if ws := w.nextResume(); ws != nil {
|
||||
wc.Send(ws.initReq.toPB())
|
||||
if err := wc.Send(ws.initReq.toPB()); err != nil {
|
||||
if w.lg != nil {
|
||||
w.lg.Debug("error when sending request", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// reset for next iteration
|
||||
@@ -616,7 +634,14 @@ func (w *watchGrpcStream) run() {
|
||||
},
|
||||
}
|
||||
req := &pb.WatchRequest{RequestUnion: cr}
|
||||
wc.Send(req)
|
||||
if w.lg != nil {
|
||||
w.lg.Debug("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
|
||||
}
|
||||
if err := wc.Send(req); err != nil {
|
||||
if w.lg != nil {
|
||||
w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// watch client failed on Recv; spawn another if possible
|
||||
@@ -629,7 +654,11 @@ func (w *watchGrpcStream) run() {
|
||||
return
|
||||
}
|
||||
if ws := w.nextResume(); ws != nil {
|
||||
wc.Send(ws.initReq.toPB())
|
||||
if err := wc.Send(ws.initReq.toPB()); err != nil {
|
||||
if w.lg != nil {
|
||||
w.lg.Debug("error when sending request", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
cancelSet = make(map[int64]struct{})
|
||||
|
||||
@@ -637,6 +666,25 @@ func (w *watchGrpcStream) run() {
|
||||
return
|
||||
|
||||
case ws := <-w.closingc:
|
||||
if ws.id != -1 {
|
||||
// client is closing an established watch; close it on the server proactively instead of waiting
|
||||
// to close when the next message arrives
|
||||
cancelSet[ws.id] = struct{}{}
|
||||
cr := &pb.WatchRequest_CancelRequest{
|
||||
CancelRequest: &pb.WatchCancelRequest{
|
||||
WatchId: ws.id,
|
||||
},
|
||||
}
|
||||
req := &pb.WatchRequest{RequestUnion: cr}
|
||||
if w.lg != nil {
|
||||
w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
|
||||
}
|
||||
if err := wc.Send(req); err != nil {
|
||||
if w.lg != nil {
|
||||
w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
w.closeSubstream(ws)
|
||||
delete(closing, ws)
|
||||
// no more watchers on this stream, shutdown
|
||||
|
@@ -273,14 +273,18 @@ type Config struct {
|
||||
AuthToken string `json:"auth-token"`
|
||||
BcryptCost uint `json:"bcrypt-cost"`
|
||||
|
||||
//The AuthTokenTTL in seconds of the simple token
|
||||
AuthTokenTTL uint `json:"auth-token-ttl"`
|
||||
|
||||
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
|
||||
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
|
||||
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
|
||||
// ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses (array and map are supported types).
|
||||
ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"`
|
||||
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
|
||||
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
|
||||
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
|
||||
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
|
||||
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
|
||||
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
|
||||
|
||||
// ForceNewCluster starts a new cluster even if previously started; unsafe.
|
||||
ForceNewCluster bool `json:"force-new-cluster"`
|
||||
@@ -335,6 +339,10 @@ type Config struct {
|
||||
// Only valid if "logger" option is "capnslog".
|
||||
// WARN: DO NOT USE THIS!
|
||||
LogPkgLevels string `json:"log-package-levels"`
|
||||
|
||||
// UnsafeNoFsync disables all uses of fsync.
|
||||
// Setting this is unsafe and will cause data loss.
|
||||
UnsafeNoFsync bool `json:"unsafe-no-fsync"`
|
||||
}
|
||||
|
||||
// configYAML holds the config suitable for yaml parsing
|
||||
@@ -406,8 +414,9 @@ func NewConfig() *Config {
|
||||
CORS: map[string]struct{}{"*": {}},
|
||||
HostWhitelist: map[string]struct{}{"*": {}},
|
||||
|
||||
AuthToken: "simple",
|
||||
BcryptCost: uint(bcrypt.DefaultCost),
|
||||
AuthToken: "simple",
|
||||
BcryptCost: uint(bcrypt.DefaultCost),
|
||||
AuthTokenTTL: 300,
|
||||
|
||||
PreVote: false, // TODO: enable by default in v3.5
|
||||
|
||||
|
@@ -178,9 +178,11 @@ func TestAutoCompactionModeParse(t *testing.T) {
|
||||
{"revision", "1", false, 1},
|
||||
{"revision", "1h", false, time.Hour},
|
||||
{"revision", "a", true, 0},
|
||||
{"revision", "-1", true, 0},
|
||||
// periodic
|
||||
{"periodic", "1", false, time.Hour},
|
||||
{"periodic", "a", true, 0},
|
||||
{"revision", "-1", true, 0},
|
||||
// err mode
|
||||
{"errmode", "1", false, 0},
|
||||
{"errmode", "1h", false, time.Hour},
|
||||
|
@@ -162,50 +162,53 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
backendFreelistType := parseBackendFreelistType(cfg.ExperimentalBackendFreelistType)
|
||||
|
||||
srvcfg := etcdserver.ServerConfig{
|
||||
Name: cfg.Name,
|
||||
ClientURLs: cfg.ACUrls,
|
||||
PeerURLs: cfg.APUrls,
|
||||
DataDir: cfg.Dir,
|
||||
DedicatedWALDir: cfg.WalDir,
|
||||
SnapshotCount: cfg.SnapshotCount,
|
||||
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
|
||||
MaxSnapFiles: cfg.MaxSnapFiles,
|
||||
MaxWALFiles: cfg.MaxWalFiles,
|
||||
InitialPeerURLsMap: urlsmap,
|
||||
InitialClusterToken: token,
|
||||
DiscoveryURL: cfg.Durl,
|
||||
DiscoveryProxy: cfg.Dproxy,
|
||||
NewCluster: cfg.IsNewCluster(),
|
||||
PeerTLSInfo: cfg.PeerTLSInfo,
|
||||
TickMs: cfg.TickMs,
|
||||
ElectionTicks: cfg.ElectionTicks(),
|
||||
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
|
||||
AutoCompactionRetention: autoCompactionRetention,
|
||||
AutoCompactionMode: cfg.AutoCompactionMode,
|
||||
QuotaBackendBytes: cfg.QuotaBackendBytes,
|
||||
BackendBatchLimit: cfg.BackendBatchLimit,
|
||||
BackendFreelistType: backendFreelistType,
|
||||
BackendBatchInterval: cfg.BackendBatchInterval,
|
||||
MaxTxnOps: cfg.MaxTxnOps,
|
||||
MaxRequestBytes: cfg.MaxRequestBytes,
|
||||
StrictReconfigCheck: cfg.StrictReconfigCheck,
|
||||
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
|
||||
AuthToken: cfg.AuthToken,
|
||||
BcryptCost: cfg.BcryptCost,
|
||||
CORS: cfg.CORS,
|
||||
HostWhitelist: cfg.HostWhitelist,
|
||||
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
|
||||
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
|
||||
PreVote: cfg.PreVote,
|
||||
Logger: cfg.logger,
|
||||
LoggerConfig: cfg.loggerConfig,
|
||||
LoggerCore: cfg.loggerCore,
|
||||
LoggerWriteSyncer: cfg.loggerWriteSyncer,
|
||||
Debug: cfg.Debug,
|
||||
ForceNewCluster: cfg.ForceNewCluster,
|
||||
EnableGRPCGateway: cfg.EnableGRPCGateway,
|
||||
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
|
||||
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
|
||||
Name: cfg.Name,
|
||||
ClientURLs: cfg.ACUrls,
|
||||
PeerURLs: cfg.APUrls,
|
||||
DataDir: cfg.Dir,
|
||||
DedicatedWALDir: cfg.WalDir,
|
||||
SnapshotCount: cfg.SnapshotCount,
|
||||
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
|
||||
MaxSnapFiles: cfg.MaxSnapFiles,
|
||||
MaxWALFiles: cfg.MaxWalFiles,
|
||||
InitialPeerURLsMap: urlsmap,
|
||||
InitialClusterToken: token,
|
||||
DiscoveryURL: cfg.Durl,
|
||||
DiscoveryProxy: cfg.Dproxy,
|
||||
NewCluster: cfg.IsNewCluster(),
|
||||
PeerTLSInfo: cfg.PeerTLSInfo,
|
||||
TickMs: cfg.TickMs,
|
||||
ElectionTicks: cfg.ElectionTicks(),
|
||||
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
|
||||
AutoCompactionRetention: autoCompactionRetention,
|
||||
AutoCompactionMode: cfg.AutoCompactionMode,
|
||||
QuotaBackendBytes: cfg.QuotaBackendBytes,
|
||||
BackendBatchLimit: cfg.BackendBatchLimit,
|
||||
BackendFreelistType: backendFreelistType,
|
||||
BackendBatchInterval: cfg.BackendBatchInterval,
|
||||
MaxTxnOps: cfg.MaxTxnOps,
|
||||
MaxRequestBytes: cfg.MaxRequestBytes,
|
||||
StrictReconfigCheck: cfg.StrictReconfigCheck,
|
||||
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
|
||||
AuthToken: cfg.AuthToken,
|
||||
BcryptCost: cfg.BcryptCost,
|
||||
TokenTTL: cfg.AuthTokenTTL,
|
||||
CORS: cfg.CORS,
|
||||
HostWhitelist: cfg.HostWhitelist,
|
||||
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
|
||||
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
|
||||
PreVote: cfg.PreVote,
|
||||
Logger: cfg.logger,
|
||||
LoggerConfig: cfg.loggerConfig,
|
||||
LoggerCore: cfg.loggerCore,
|
||||
LoggerWriteSyncer: cfg.loggerWriteSyncer,
|
||||
Debug: cfg.Debug,
|
||||
ForceNewCluster: cfg.ForceNewCluster,
|
||||
EnableGRPCGateway: cfg.EnableGRPCGateway,
|
||||
UnsafeNoFsync: cfg.UnsafeNoFsync,
|
||||
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
|
||||
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
|
||||
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
|
||||
}
|
||||
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
|
||||
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
|
||||
@@ -811,7 +814,7 @@ func (e *Etcd) GetLogger() *zap.Logger {
|
||||
|
||||
func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
|
||||
h, err := strconv.Atoi(retention)
|
||||
if err == nil {
|
||||
if err == nil && h >= 0 {
|
||||
switch mode {
|
||||
case CompactorModeRevision:
|
||||
ret = time.Duration(int64(h))
|
||||
|
@@ -60,7 +60,7 @@ func init() {
|
||||
// TODO: secure by default when etcd enables secure gRPC by default.
|
||||
rootCmd.PersistentFlags().BoolVar(&globalFlags.Insecure, "insecure-transport", true, "disable transport security for client connections")
|
||||
rootCmd.PersistentFlags().BoolVar(&globalFlags.InsecureDiscovery, "insecure-discovery", true, "accept insecure SRV records describing cluster endpoints")
|
||||
rootCmd.PersistentFlags().BoolVar(&globalFlags.InsecureSkipVerify, "insecure-skip-tls-verify", false, "skip server certificate verification")
|
||||
rootCmd.PersistentFlags().BoolVar(&globalFlags.InsecureSkipVerify, "insecure-skip-tls-verify", false, "skip server certificate verification (CAUTION: this option should be enabled only for testing purposes)")
|
||||
rootCmd.PersistentFlags().StringVar(&globalFlags.TLS.CertFile, "cert", "", "identify secure client using this TLS certificate file")
|
||||
rootCmd.PersistentFlags().StringVar(&globalFlags.TLS.KeyFile, "key", "", "identify secure client using this TLS key file")
|
||||
rootCmd.PersistentFlags().StringVar(&globalFlags.TLS.TrustedCAFile, "cacert", "", "verify certificates of TLS-enabled secure servers using this CA bundle")
|
||||
|
@@ -245,6 +245,7 @@ func newConfig() *config {
|
||||
// auth
|
||||
fs.StringVar(&cfg.ec.AuthToken, "auth-token", cfg.ec.AuthToken, "Specify auth token specific options.")
|
||||
fs.UintVar(&cfg.ec.BcryptCost, "bcrypt-cost", cfg.ec.BcryptCost, "Specify bcrypt algorithm cost factor for auth password hashing.")
|
||||
fs.UintVar(&cfg.ec.AuthTokenTTL, "auth-token-ttl", cfg.ec.AuthTokenTTL, "The lifetime in seconds of the auth token.")
|
||||
|
||||
// gateway
|
||||
fs.BoolVar(&cfg.ec.EnableGRPCGateway, "enable-grpc-gateway", true, "Enable GRPC gateway.")
|
||||
@@ -256,8 +257,10 @@ func newConfig() *config {
|
||||
fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types)")
|
||||
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
|
||||
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
|
||||
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
|
||||
|
||||
// unsafe
|
||||
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
|
||||
fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")
|
||||
|
||||
// ignored
|
||||
|
@@ -358,7 +358,7 @@ func startProxy(cfg *config) error {
|
||||
}
|
||||
|
||||
cfg.ec.Dir = filepath.Join(cfg.ec.Dir, "proxy")
|
||||
err = os.MkdirAll(cfg.ec.Dir, fileutil.PrivateDirMode)
|
||||
err = fileutil.TouchDirAll(cfg.ec.Dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -71,7 +71,7 @@ func newGatewayStartCommand() *cobra.Command {
|
||||
cmd.Flags().StringVar(&gatewayDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
|
||||
cmd.Flags().StringVar(&gatewayDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery")
|
||||
cmd.Flags().BoolVar(&gatewayInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
|
||||
cmd.Flags().StringVar(&gatewayCA, "trusted-ca-file", "", "path to the client server TLS CA file.")
|
||||
cmd.Flags().StringVar(&gatewayCA, "trusted-ca-file", "", "path to the client server TLS CA file for verifying the discovered endpoints when discovery-srv is provided.")
|
||||
|
||||
cmd.Flags().StringSliceVar(&gatewayEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
|
||||
|
||||
@@ -119,6 +119,41 @@ func startGateway(cmd *cobra.Command, args []string) {
|
||||
}
|
||||
}
|
||||
|
||||
lhost, lport, err := net.SplitHostPort(gatewayListenAddr)
|
||||
if err != nil {
|
||||
fmt.Println("failed to validate listen address:", gatewayListenAddr)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
laddrs, err := net.LookupHost(lhost)
|
||||
if err != nil {
|
||||
fmt.Println("failed to resolve listen host:", lhost)
|
||||
os.Exit(1)
|
||||
}
|
||||
laddrsMap := make(map[string]bool)
|
||||
for _, addr := range laddrs {
|
||||
laddrsMap[addr] = true
|
||||
}
|
||||
|
||||
for _, srv := range srvs.SRVs {
|
||||
var eaddrs []string
|
||||
eaddrs, err = net.LookupHost(srv.Target)
|
||||
if err != nil {
|
||||
fmt.Println("failed to resolve endpoint host:", srv.Target)
|
||||
os.Exit(1)
|
||||
}
|
||||
if fmt.Sprintf("%d", srv.Port) != lport {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, ea := range eaddrs {
|
||||
if laddrsMap[ea] {
|
||||
fmt.Printf("SRV or endpoint (%s:%d->%s:%d) should not resolve to the gateway listen addr (%s)\n", srv.Target, srv.Port, ea, srv.Port, gatewayListenAddr)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(srvs.Endpoints) == 0 {
|
||||
fmt.Println("no endpoints found")
|
||||
os.Exit(1)
|
||||
|
@@ -131,7 +131,7 @@ func newGRPCProxyStartCommand() *cobra.Command {
|
||||
cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
|
||||
cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
|
||||
cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
|
||||
cmd.Flags().BoolVar(&grpcProxyInsecureSkipTLSVerify, "insecure-skip-tls-verify", false, "skip authentication of etcd server TLS certificates")
|
||||
cmd.Flags().BoolVar(&grpcProxyInsecureSkipTLSVerify, "insecure-skip-tls-verify", false, "skip authentication of etcd server TLS certificates (CAUTION: this option should be enabled only for testing purposes)")
|
||||
|
||||
// client TLS for connecting to proxy
|
||||
cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
|
||||
@@ -286,6 +286,9 @@ func newClientCfg(lg *zap.Logger, eps []string) (*clientv3.Config, error) {
|
||||
return nil, err
|
||||
}
|
||||
clientTLS.InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify
|
||||
if clientTLS.InsecureSkipVerify {
|
||||
lg.Warn("--insecure-skip-tls-verify was given, this grpc proxy process skips authentication of etcd server TLS certificates. This option should be enabled only for testing purposes.")
|
||||
}
|
||||
cfg.TLS = clientTLS
|
||||
lg.Info("gRPC proxy client TLS", zap.String("tls-info", fmt.Sprintf("%+v", tls)))
|
||||
}
|
||||
|
@@ -162,6 +162,8 @@ Auth:
|
||||
Specify a v3 authentication token type and its options ('simple' or 'jwt').
|
||||
--bcrypt-cost ` + fmt.Sprintf("%d", bcrypt.DefaultCost) + `
|
||||
Specify the cost / strength of the bcrypt algorithm for hashing auth passwords. Valid values are between ` + fmt.Sprintf("%d", bcrypt.MinCost) + ` and ` + fmt.Sprintf("%d", bcrypt.MaxCost) + `.
|
||||
--auth-token-ttl 300
|
||||
Time (in seconds) of the auth-token-ttl.
|
||||
|
||||
Profiling and Monitoring:
|
||||
--enable-pprof 'false'
|
||||
@@ -208,6 +210,8 @@ Experimental feature:
|
||||
ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
|
||||
--experimental-peer-skip-client-san-verification 'false'
|
||||
Skip verification of SAN field in client certificate for peer connections.
|
||||
--experimental-watch-progress-notify-interval '10m'
|
||||
Duration of periodical watch progress notification.
|
||||
|
||||
Unsafe feature:
|
||||
--force-new-cluster 'false'
|
||||
|
@@ -300,6 +300,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
dbSize := humanize.Bytes(uint64(n))
|
||||
receivedBytes.WithLabelValues(from).Add(float64(n))
|
||||
|
||||
downloadTook := time.Since(start)
|
||||
if h.lg != nil {
|
||||
h.lg.Info(
|
||||
"received and saved database snapshot",
|
||||
@@ -308,9 +309,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
|
||||
zap.Int64("incoming-snapshot-size-bytes", n),
|
||||
zap.String("incoming-snapshot-size", dbSize),
|
||||
zap.String("download-took", downloadTook.String()),
|
||||
)
|
||||
} else {
|
||||
plog.Infof("successfully received and saved database snapshot [index: %d, from: %s, raft message size: %s, db size: %s]", m.Snapshot.Metadata.Index, types.ID(m.From), msgSize, dbSize)
|
||||
plog.Infof("successfully received and saved database snapshot [index: %d, from: %s, raft message size: %s, db size: %s, took: %s]", m.Snapshot.Metadata.Index, types.ID(m.From), msgSize, dbSize, downloadTook.String())
|
||||
}
|
||||
|
||||
if err := h.r.Process(context.TODO(), m); err != nil {
|
||||
|
@@ -218,7 +218,7 @@ func (d *discovery) createSelf(contents string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
|
||||
func (d *discovery) checkCluster() ([]*client.Node, uint64, uint64, error) {
|
||||
configKey := path.Join("/", d.cluster, "_config")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
|
||||
// find cluster size
|
||||
@@ -247,7 +247,7 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
|
||||
}
|
||||
return nil, 0, 0, err
|
||||
}
|
||||
size, err := strconv.Atoi(resp.Node.Value)
|
||||
size, err := strconv.ParseUint(resp.Node.Value, 10, 0)
|
||||
if err != nil {
|
||||
return nil, 0, 0, ErrBadSizeKey
|
||||
}
|
||||
@@ -288,7 +288,7 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
|
||||
if path.Base(nodes[i].Key) == path.Base(d.selfKey()) {
|
||||
break
|
||||
}
|
||||
if i >= size-1 {
|
||||
if uint64(i) >= size-1 {
|
||||
return nodes[:size], size, resp.Index, ErrFullCluster
|
||||
}
|
||||
}
|
||||
@@ -316,7 +316,7 @@ func (d *discovery) logAndBackoffForRetry(step string) {
|
||||
d.clock.Sleep(retryTimeInSecond)
|
||||
}
|
||||
|
||||
func (d *discovery) checkClusterRetry() ([]*client.Node, int, uint64, error) {
|
||||
func (d *discovery) checkClusterRetry() ([]*client.Node, uint64, uint64, error) {
|
||||
if d.retries < nRetries {
|
||||
d.logAndBackoffForRetry("cluster status check")
|
||||
return d.checkCluster()
|
||||
@@ -336,8 +336,8 @@ func (d *discovery) waitNodesRetry() ([]*client.Node, error) {
|
||||
return nil, ErrTooManyRetries
|
||||
}
|
||||
|
||||
func (d *discovery) waitNodes(nodes []*client.Node, size int, index uint64) ([]*client.Node, error) {
|
||||
if len(nodes) > size {
|
||||
func (d *discovery) waitNodes(nodes []*client.Node, size uint64, index uint64) ([]*client.Node, error) {
|
||||
if uint64(len(nodes)) > size {
|
||||
nodes = nodes[:size]
|
||||
}
|
||||
// watch from the next index
|
||||
@@ -369,16 +369,16 @@ func (d *discovery) waitNodes(nodes []*client.Node, size int, index uint64) ([]*
|
||||
}
|
||||
|
||||
// wait for others
|
||||
for len(all) < size {
|
||||
for uint64(len(all)) < size {
|
||||
if d.lg != nil {
|
||||
d.lg.Info(
|
||||
"found peers from discovery server; waiting for more",
|
||||
zap.String("discovery-url", d.url.String()),
|
||||
zap.Int("found-peers", len(all)),
|
||||
zap.Int("needed-peers", size-len(all)),
|
||||
zap.Int("needed-peers", int(size-uint64(len(all)))),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("found %d peer(s), waiting for %d more", len(all), size-len(all))
|
||||
plog.Noticef("found %d peer(s), waiting for %d more", len(all), size-uint64(len(all)))
|
||||
}
|
||||
resp, err := w.Next(context.Background())
|
||||
if err != nil {
|
||||
@@ -415,7 +415,7 @@ func (d *discovery) selfKey() string {
|
||||
return path.Join("/", d.cluster, d.id.String())
|
||||
}
|
||||
|
||||
func nodesToCluster(ns []*client.Node, size int) (string, error) {
|
||||
func nodesToCluster(ns []*client.Node, size uint64) (string, error) {
|
||||
s := make([]string, len(ns))
|
||||
for i, n := range ns {
|
||||
s[i] = n.Value
|
||||
@@ -425,7 +425,7 @@ func nodesToCluster(ns []*client.Node, size int) (string, error) {
|
||||
if err != nil {
|
||||
return us, ErrInvalidURL
|
||||
}
|
||||
if m.Len() != size {
|
||||
if uint64(m.Len()) != size {
|
||||
return us, ErrDuplicateName
|
||||
}
|
||||
return us, nil
|
||||
|
@@ -217,7 +217,7 @@ func TestCheckCluster(t *testing.T) {
|
||||
if reflect.DeepEqual(ns, tt.nodes) {
|
||||
t.Errorf("#%d: nodes = %v, want %v", i, ns, tt.nodes)
|
||||
}
|
||||
if size != tt.wsize {
|
||||
if size != uint64(tt.wsize) {
|
||||
t.Errorf("#%d: size = %v, want %d", i, size, tt.wsize)
|
||||
}
|
||||
if index != tt.index {
|
||||
@@ -301,7 +301,7 @@ func TestWaitNodes(t *testing.T) {
|
||||
fc.Advance(time.Second * (0x1 << i))
|
||||
}
|
||||
}()
|
||||
g, err := d.waitNodes(tt.nodes, 3, 0) // we do not care about index in this test
|
||||
g, err := d.waitNodes(tt.nodes, uint64(3), 0) // we do not care about index in this test
|
||||
if err != nil {
|
||||
t.Errorf("#%d: err = %v, want %v", i, err, nil)
|
||||
}
|
||||
@@ -348,7 +348,7 @@ func TestCreateSelf(t *testing.T) {
|
||||
func TestNodesToCluster(t *testing.T) {
|
||||
tests := []struct {
|
||||
nodes []*client.Node
|
||||
size int
|
||||
size uint64
|
||||
wcluster string
|
||||
werr error
|
||||
}{
|
||||
|
@@ -31,6 +31,8 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const minWatchProgressInterval = 100 * time.Millisecond
|
||||
|
||||
type watchServer struct {
|
||||
lg *zap.Logger
|
||||
|
||||
@@ -46,7 +48,7 @@ type watchServer struct {
|
||||
|
||||
// NewWatchServer returns a new watch server.
|
||||
func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
|
||||
return &watchServer{
|
||||
srv := &watchServer{
|
||||
lg: s.Cfg.Logger,
|
||||
|
||||
clusterID: int64(s.Cluster().ID()),
|
||||
@@ -58,6 +60,21 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
|
||||
watchable: s.Watchable(),
|
||||
ag: s,
|
||||
}
|
||||
if s.Cfg.WatchProgressNotifyInterval > 0 {
|
||||
if s.Cfg.WatchProgressNotifyInterval < minWatchProgressInterval {
|
||||
if srv.lg != nil {
|
||||
srv.lg.Warn(
|
||||
"adjusting watch progress notify interval to minimum period",
|
||||
zap.Duration("min-watch-progress-notify-interval", minWatchProgressInterval),
|
||||
)
|
||||
} else {
|
||||
plog.Warningf("adjusting watch progress notify interval to minimum period %v", minWatchProgressInterval)
|
||||
}
|
||||
s.Cfg.WatchProgressNotifyInterval = minWatchProgressInterval
|
||||
}
|
||||
SetProgressReportInterval(s.Cfg.WatchProgressNotifyInterval)
|
||||
}
|
||||
return srv
|
||||
}
|
||||
|
||||
var (
|
||||
|
@@ -16,6 +16,7 @@ package etcdserver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
@@ -114,7 +115,11 @@ func (a *applierV2store) Sync(r *RequestV2) Response {
|
||||
// applyV2Request interprets r as a call to v2store.X
|
||||
// and returns a Response interpreted from v2store.Event
|
||||
func (s *EtcdServer) applyV2Request(r *RequestV2) Response {
|
||||
defer warnOfExpensiveRequest(s.getLogger(), time.Now(), r, nil, nil)
|
||||
stringer := panicAlternativeStringer{
|
||||
stringer: r,
|
||||
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
|
||||
}
|
||||
defer warnOfExpensiveRequest(s.getLogger(), time.Now(), stringer, nil, nil)
|
||||
|
||||
switch r.Method {
|
||||
case "POST":
|
||||
|
@@ -31,6 +31,7 @@ import (
|
||||
func newBackend(cfg ServerConfig) backend.Backend {
|
||||
bcfg := backend.DefaultBackendConfig()
|
||||
bcfg.Path = cfg.backendPath()
|
||||
bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync
|
||||
if cfg.BackendBatchLimit != 0 {
|
||||
bcfg.BatchLimit = cfg.BackendBatchLimit
|
||||
if cfg.Logger != nil {
|
||||
|
@@ -126,6 +126,7 @@ type ServerConfig struct {
|
||||
|
||||
AuthToken string
|
||||
BcryptCost uint
|
||||
TokenTTL uint
|
||||
|
||||
// InitialCorruptCheck is true to check data corruption on boot
|
||||
// before serving any peer/client traffic.
|
||||
@@ -157,6 +158,12 @@ type ServerConfig struct {
|
||||
LeaseCheckpointInterval time.Duration
|
||||
|
||||
EnableGRPCGateway bool
|
||||
|
||||
WatchProgressNotifyInterval time.Duration
|
||||
|
||||
// UnsafeNoFsync disables all uses of fsync.
|
||||
// Setting this is unsafe and will cause data loss.
|
||||
UnsafeNoFsync bool `json:"unsafe-no-fsync"`
|
||||
}
|
||||
|
||||
// VerifyBootstrap sanity-checks the initial config for bootstrap case
|
||||
|
@@ -137,7 +137,7 @@ type loggableValueCompare struct {
|
||||
Result Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult"`
|
||||
Target Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget"`
|
||||
Key []byte `protobuf:"bytes,3,opt,name=key,proto3"`
|
||||
ValueSize int `protobuf:"bytes,7,opt,name=value_size,proto3"`
|
||||
ValueSize int64 `protobuf:"varint,7,opt,name=value_size,proto3"`
|
||||
RangeEnd []byte `protobuf:"bytes,64,opt,name=range_end,proto3"`
|
||||
}
|
||||
|
||||
@@ -146,7 +146,7 @@ func newLoggableValueCompare(c *Compare, cv *Compare_Value) *loggableValueCompar
|
||||
c.Result,
|
||||
c.Target,
|
||||
c.Key,
|
||||
len(cv.Value),
|
||||
int64(len(cv.Value)),
|
||||
c.RangeEnd,
|
||||
}
|
||||
}
|
||||
@@ -160,7 +160,7 @@ func (*loggableValueCompare) ProtoMessage() {}
|
||||
// To preserve proto encoding of the key bytes, a faked out proto type is used here.
|
||||
type loggablePutRequest struct {
|
||||
Key []byte `protobuf:"bytes,1,opt,name=key,proto3"`
|
||||
ValueSize int `protobuf:"varint,2,opt,name=value_size,proto3"`
|
||||
ValueSize int64 `protobuf:"varint,2,opt,name=value_size,proto3"`
|
||||
Lease int64 `protobuf:"varint,3,opt,name=lease,proto3"`
|
||||
PrevKv bool `protobuf:"varint,4,opt,name=prev_kv,proto3"`
|
||||
IgnoreValue bool `protobuf:"varint,5,opt,name=ignore_value,proto3"`
|
||||
@@ -170,7 +170,7 @@ type loggablePutRequest struct {
|
||||
func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest {
|
||||
return &loggablePutRequest{
|
||||
request.Key,
|
||||
len(request.Value),
|
||||
int64(len(request.Value)),
|
||||
request.Lease,
|
||||
request.PrevKv,
|
||||
request.IgnoreValue,
|
||||
|
@@ -151,6 +151,19 @@ var (
|
||||
Help: "Server or member ID in hexadecimal format. 1 for 'server_id' label with current ID.",
|
||||
},
|
||||
[]string{"server_id"})
|
||||
|
||||
fdUsed = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "os",
|
||||
Subsystem: "fd",
|
||||
Name: "used",
|
||||
Help: "The number of used file descriptors.",
|
||||
})
|
||||
fdLimit = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "os",
|
||||
Subsystem: "fd",
|
||||
Name: "limit",
|
||||
Help: "The file descriptor limit.",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -174,6 +187,8 @@ func init() {
|
||||
prometheus.MustRegister(isLearner)
|
||||
prometheus.MustRegister(learnerPromoteSucceed)
|
||||
prometheus.MustRegister(learnerPromoteFailed)
|
||||
prometheus.MustRegister(fdUsed)
|
||||
prometheus.MustRegister(fdLimit)
|
||||
|
||||
currentVersion.With(prometheus.Labels{
|
||||
"server_version": version.Version,
|
||||
@@ -184,7 +199,12 @@ func init() {
|
||||
}
|
||||
|
||||
func monitorFileDescriptor(lg *zap.Logger, done <-chan struct{}) {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
// This ticker will check File Descriptor Requirements ,and count all fds in used.
|
||||
// And recorded some logs when in used >= limit/5*4. Just recorded message.
|
||||
// If fds was more than 10K,It's low performance due to FDUsage() works.
|
||||
// So need to increase it.
|
||||
// See https://github.com/etcd-io/etcd/issues/11969 for more detail.
|
||||
ticker := time.NewTicker(10 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
used, err := runtime.FDUsage()
|
||||
@@ -196,6 +216,7 @@ func monitorFileDescriptor(lg *zap.Logger, done <-chan struct{}) {
|
||||
}
|
||||
return
|
||||
}
|
||||
fdUsed.Set(float64(used))
|
||||
limit, err := runtime.FDLimit()
|
||||
if err != nil {
|
||||
if lg != nil {
|
||||
@@ -205,6 +226,7 @@ func monitorFileDescriptor(lg *zap.Logger, done <-chan struct{}) {
|
||||
}
|
||||
return
|
||||
}
|
||||
fdLimit.Set(float64(limit))
|
||||
if used >= limit/5*4 {
|
||||
if lg != nil {
|
||||
lg.Warn("80% of file descriptors are used", zap.Uint64("used", used), zap.Uint64("limit", limit))
|
||||
|
@@ -465,6 +465,9 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
|
||||
plog.Panicf("create wal error: %v", err)
|
||||
}
|
||||
}
|
||||
if cfg.UnsafeNoFsync {
|
||||
w.SetUnsafeNoFsync()
|
||||
}
|
||||
peers := make([]raft.Peer, len(ids))
|
||||
for i, id := range ids {
|
||||
var ctx []byte
|
||||
@@ -527,7 +530,7 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
|
||||
if snapshot != nil {
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
}
|
||||
w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap)
|
||||
w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync)
|
||||
|
||||
if cfg.Logger != nil {
|
||||
cfg.Logger.Info(
|
||||
@@ -582,7 +585,7 @@ func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types
|
||||
if snapshot != nil {
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
}
|
||||
w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap)
|
||||
w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync)
|
||||
|
||||
// discard the previously uncommitted entries
|
||||
for i, ent := range ents {
|
||||
|
@@ -553,6 +553,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
||||
func(index uint64) <-chan struct{} {
|
||||
return srv.applyWait.Wait(index)
|
||||
},
|
||||
time.Duration(cfg.TokenTTL)*time.Second,
|
||||
)
|
||||
if err != nil {
|
||||
if cfg.Logger != nil {
|
||||
|
@@ -82,7 +82,7 @@ func (st *storage) Release(snap raftpb.Snapshot) error {
|
||||
// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
|
||||
// after the position of the given snap in the WAL.
|
||||
// The snap must have been previously saved to the WAL, or this call will panic.
|
||||
func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
|
||||
func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
|
||||
var (
|
||||
err error
|
||||
wmetadata []byte
|
||||
@@ -97,6 +97,9 @@ func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id
|
||||
plog.Fatalf("open wal error: %v", err)
|
||||
}
|
||||
}
|
||||
if unsafeNoFsync {
|
||||
w.SetUnsafeNoFsync()
|
||||
}
|
||||
if wmetadata, st, ents, err = w.ReadAll(); err != nil {
|
||||
w.Close()
|
||||
// we can only repair ErrUnexpectedEOF and we never repair twice.
|
||||
|
@@ -185,3 +185,21 @@ func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fm
|
||||
func isNil(msg proto.Message) bool {
|
||||
return msg == nil || reflect.ValueOf(msg).IsNil()
|
||||
}
|
||||
|
||||
// panicAlternativeStringer wraps a fmt.Stringer, and if calling String() panics, calls the alternative instead.
|
||||
// This is needed to ensure logging slow v2 requests does not panic, which occurs when running integration tests
|
||||
// with the embedded server with github.com/golang/protobuf v1.4.0+. See https://github.com/etcd-io/etcd/issues/12197.
|
||||
type panicAlternativeStringer struct {
|
||||
stringer fmt.Stringer
|
||||
alternative func() string
|
||||
}
|
||||
|
||||
func (n panicAlternativeStringer) String() (s string) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
s = n.alternative()
|
||||
}
|
||||
}()
|
||||
s = n.stringer.String()
|
||||
return s
|
||||
}
|
||||
|
@@ -90,3 +90,23 @@ func (s *nopTransporterWithActiveTime) Stop() {}
|
||||
func (s *nopTransporterWithActiveTime) Pause() {}
|
||||
func (s *nopTransporterWithActiveTime) Resume() {}
|
||||
func (s *nopTransporterWithActiveTime) reset(am map[types.ID]time.Time) { s.activeMap = am }
|
||||
|
||||
func TestPanicAlternativeStringer(t *testing.T) {
|
||||
p := panicAlternativeStringer{alternative: func() string { return "alternative" }}
|
||||
|
||||
p.stringer = testStringerFunc(func() string { panic("here") })
|
||||
if s := p.String(); s != "alternative" {
|
||||
t.Fatalf("expected 'alternative', got %q", s)
|
||||
}
|
||||
|
||||
p.stringer = testStringerFunc(func() string { return "test" })
|
||||
if s := p.String(); s != "test" {
|
||||
t.Fatalf("expected 'test', got %q", s)
|
||||
}
|
||||
}
|
||||
|
||||
type testStringerFunc func() string
|
||||
|
||||
func (s testStringerFunc) String() string {
|
||||
return s()
|
||||
}
|
||||
|
@@ -428,9 +428,10 @@ func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// internalReq doesn't need to have Password because the above s.AuthStore().CheckPassword() already did it.
|
||||
// In addition, it will let a WAL entry not record password as a plain text.
|
||||
internalReq := &pb.InternalAuthenticateRequest{
|
||||
Name: r.Name,
|
||||
Password: r.Password,
|
||||
SimpleToken: st,
|
||||
}
|
||||
|
||||
|
2
go.mod
2
go.mod
@@ -1,5 +1,7 @@
|
||||
module go.etcd.io/etcd
|
||||
|
||||
go 1.14
|
||||
|
||||
require (
|
||||
github.com/bgentry/speakeasy v0.1.0
|
||||
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa
|
||||
|
@@ -1211,3 +1211,35 @@ func TestV3WatchWithPrevKV(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3WatchCancellation ensures that watch cancellation frees up server resources.
|
||||
func TestV3WatchCancellation(t *testing.T) {
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
cli := clus.RandClient()
|
||||
|
||||
// increment watcher total count and keep a stream open
|
||||
cli.Watch(ctx, "/foo")
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
cli.Watch(ctx, "/foo")
|
||||
cancel()
|
||||
}
|
||||
|
||||
// Wait a little for cancellations to take hold
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
minWatches, err := clus.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if minWatches != "1" {
|
||||
t.Fatalf("expected one watch, got %s", minWatches)
|
||||
}
|
||||
}
|
||||
|
@@ -123,6 +123,8 @@ type BackendConfig struct {
|
||||
MmapSize uint64
|
||||
// Logger logs backend-side operations.
|
||||
Logger *zap.Logger
|
||||
// UnsafeNoFsync disables all uses of fsync.
|
||||
UnsafeNoFsync bool `json:"unsafe-no-fsync"`
|
||||
}
|
||||
|
||||
func DefaultBackendConfig() BackendConfig {
|
||||
@@ -150,6 +152,8 @@ func newBackend(bcfg BackendConfig) *backend {
|
||||
}
|
||||
bopts.InitialMmapSize = bcfg.mmapSize()
|
||||
bopts.FreelistType = bcfg.BackendFreelistType
|
||||
bopts.NoSync = bcfg.UnsafeNoFsync
|
||||
bopts.NoGrowSync = bcfg.UnsafeNoFsync
|
||||
|
||||
db, err := bolt.Open(bcfg.Path, 0600, bopts)
|
||||
if err != nil {
|
||||
|
@@ -30,9 +30,8 @@ import (
|
||||
var (
|
||||
// chanBufLen is the length of the buffered chan
|
||||
// for sending out watched events.
|
||||
// TODO: find a good buf value. 1024 is just a random one that
|
||||
// seems to be reasonable.
|
||||
chanBufLen = 1024
|
||||
// See https://github.com/etcd-io/etcd/issues/11906 for more detail.
|
||||
chanBufLen = 128
|
||||
|
||||
// maxWatchersPerSync is the number of watchers to sync in a single batch
|
||||
maxWatchersPerSync = 512
|
||||
|
@@ -18,5 +18,10 @@ package fileutil
|
||||
|
||||
import "os"
|
||||
|
||||
const (
|
||||
// PrivateDirMode grants owner to make/remove files inside the directory.
|
||||
PrivateDirMode = 0700
|
||||
)
|
||||
|
||||
// OpenDir opens a directory for syncing.
|
||||
func OpenDir(path string) (*os.File, error) { return os.Open(path) }
|
||||
|
@@ -21,6 +21,11 @@ import (
|
||||
"syscall"
|
||||
)
|
||||
|
||||
const (
|
||||
// PrivateDirMode grants owner to make/remove files inside the directory.
|
||||
PrivateDirMode = 0777
|
||||
)
|
||||
|
||||
// OpenDir opens a directory in windows with write access for syncing.
|
||||
func OpenDir(path string) (*os.File, error) {
|
||||
fd, err := openDir(path)
|
||||
|
@@ -27,8 +27,6 @@ import (
|
||||
const (
|
||||
// PrivateFileMode grants owner to read/write a file.
|
||||
PrivateFileMode = 0600
|
||||
// PrivateDirMode grants owner to make/remove files inside the directory.
|
||||
PrivateDirMode = 0700
|
||||
)
|
||||
|
||||
var plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "pkg/fileutil")
|
||||
@@ -46,14 +44,22 @@ func IsDirWriteable(dir string) error {
|
||||
// TouchDirAll is similar to os.MkdirAll. It creates directories with 0700 permission if any directory
|
||||
// does not exists. TouchDirAll also ensures the given directory is writable.
|
||||
func TouchDirAll(dir string) error {
|
||||
// If path is already a directory, MkdirAll does nothing
|
||||
// and returns nil.
|
||||
err := os.MkdirAll(dir, PrivateDirMode)
|
||||
if err != nil {
|
||||
// if mkdirAll("a/text") and "text" is not
|
||||
// a directory, this will return syscall.ENOTDIR
|
||||
return err
|
||||
// If path is already a directory, MkdirAll does nothing and returns nil, so,
|
||||
// first check if dir exist with an expected permission mode.
|
||||
if Exist(dir) {
|
||||
err := CheckDirPermission(dir, PrivateDirMode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
err := os.MkdirAll(dir, PrivateDirMode)
|
||||
if err != nil {
|
||||
// if mkdirAll("a/text") and "text" is not
|
||||
// a directory, this will return syscall.ENOTDIR
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return IsDirWriteable(dir)
|
||||
}
|
||||
|
||||
@@ -102,3 +108,22 @@ func ZeroToEnd(f *os.File) error {
|
||||
_, err = f.Seek(off, io.SeekStart)
|
||||
return err
|
||||
}
|
||||
|
||||
// CheckDirPermission checks permission on an existing dir.
|
||||
// Returns error if dir is empty or exist with a different permission than specified.
|
||||
func CheckDirPermission(dir string, perm os.FileMode) error {
|
||||
if !Exist(dir) {
|
||||
return fmt.Errorf("directory %q empty, cannot check permission.", dir)
|
||||
}
|
||||
//check the existing permission on the directory
|
||||
dirInfo, err := os.Stat(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dirMode := dirInfo.Mode().Perm()
|
||||
if dirMode != perm {
|
||||
err = fmt.Errorf("directory %q,%q exist without desired file permission %q.", dir, dirInfo.Mode(), os.FileMode(PrivateDirMode))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@@ -148,3 +148,21 @@ func TestZeroToEnd(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDirPermission(t *testing.T) {
|
||||
tmpdir, err := ioutil.TempDir(os.TempDir(), "foo")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
tmpdir2 := filepath.Join(tmpdir, "testpermission")
|
||||
// create a new dir with 0700
|
||||
if err = CreateDirAll(tmpdir2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// check dir permission with mode different than created dir
|
||||
if err = CheckDirPermission(tmpdir2, 0600); err == nil {
|
||||
t.Errorf("expected error, got nil")
|
||||
}
|
||||
}
|
||||
|
@@ -16,7 +16,7 @@
|
||||
package runtime
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
@@ -29,9 +29,20 @@ func FDLimit() (uint64, error) {
|
||||
}
|
||||
|
||||
func FDUsage() (uint64, error) {
|
||||
fds, err := ioutil.ReadDir("/proc/self/fd")
|
||||
return countFiles("/proc/self/fd")
|
||||
}
|
||||
|
||||
// countFiles reads the directory named by dirname and returns the count.
|
||||
// This is same as stdlib "io/ioutil.ReadDir" but without sorting.
|
||||
func countFiles(dirname string) (uint64, error) {
|
||||
f, err := os.Open(dirname)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return uint64(len(fds)), nil
|
||||
list, err := f.Readdir(-1)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return uint64(len(list)), nil
|
||||
}
|
||||
|
@@ -31,6 +31,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/pkg/fileutil"
|
||||
"go.etcd.io/etcd/pkg/tlsutil"
|
||||
|
||||
"go.uber.org/zap"
|
||||
@@ -114,10 +115,17 @@ func (info TLSInfo) Empty() bool {
|
||||
}
|
||||
|
||||
func SelfCert(lg *zap.Logger, dirpath string, hosts []string, additionalUsages ...x509.ExtKeyUsage) (info TLSInfo, err error) {
|
||||
if err = os.MkdirAll(dirpath, 0700); err != nil {
|
||||
info.Logger = lg
|
||||
err = fileutil.TouchDirAll(dirpath)
|
||||
if err != nil {
|
||||
if info.Logger != nil {
|
||||
info.Logger.Warn(
|
||||
"cannot create cert directory",
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
info.Logger = lg
|
||||
|
||||
certPath := filepath.Join(dirpath, "cert.pem")
|
||||
keyPath := filepath.Join(dirpath, "key.pem")
|
||||
|
@@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.4.8"
|
||||
Version = "3.4.12"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
@@ -59,6 +59,11 @@ func (d *decoder) decode(rec *walpb.Record) error {
|
||||
return d.decodeRecord(rec)
|
||||
}
|
||||
|
||||
// raft max message size is set to 1 MB in etcd server
|
||||
// assume projects set reasonable message size limit,
|
||||
// thus entry size should never exceed 10 MB
|
||||
const maxWALEntrySizeLimit = int64(10 * 1024 * 1024)
|
||||
|
||||
func (d *decoder) decodeRecord(rec *walpb.Record) error {
|
||||
if len(d.brs) == 0 {
|
||||
return io.EOF
|
||||
@@ -79,6 +84,9 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
|
||||
}
|
||||
|
||||
recBytes, padBytes := decodeFrameSize(l)
|
||||
if recBytes >= maxWALEntrySizeLimit-padBytes {
|
||||
return ErrMaxWALEntrySizeLimitExceeded
|
||||
}
|
||||
|
||||
data := make([]byte, recBytes+padBytes)
|
||||
if _, err = io.ReadFull(d.brs[0], data); err != nil {
|
||||
|
48
wal/wal.go
48
wal/wal.go
@@ -56,12 +56,15 @@ var (
|
||||
|
||||
plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "wal")
|
||||
|
||||
ErrMetadataConflict = errors.New("wal: conflicting metadata found")
|
||||
ErrFileNotFound = errors.New("wal: file not found")
|
||||
ErrCRCMismatch = errors.New("wal: crc mismatch")
|
||||
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
|
||||
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
|
||||
crcTable = crc32.MakeTable(crc32.Castagnoli)
|
||||
ErrMetadataConflict = errors.New("wal: conflicting metadata found")
|
||||
ErrFileNotFound = errors.New("wal: file not found")
|
||||
ErrCRCMismatch = errors.New("wal: crc mismatch")
|
||||
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
|
||||
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
|
||||
ErrSliceOutOfRange = errors.New("wal: slice bounds out of range")
|
||||
ErrMaxWALEntrySizeLimitExceeded = errors.New("wal: max entry size limit exceeded")
|
||||
ErrDecoderNotFound = errors.New("wal: decoder not found")
|
||||
crcTable = crc32.MakeTable(crc32.Castagnoli)
|
||||
)
|
||||
|
||||
// WAL is a logical representation of the stable storage.
|
||||
@@ -84,6 +87,8 @@ type WAL struct {
|
||||
decoder *decoder // decoder to decode records
|
||||
readClose func() error // closer for decode reader
|
||||
|
||||
unsafeNoSync bool // if set, do not fsync
|
||||
|
||||
mu sync.Mutex
|
||||
enti uint64 // index of the last entry saved to the wal
|
||||
encoder *encoder // encoder to encode records
|
||||
@@ -93,7 +98,8 @@ type WAL struct {
|
||||
}
|
||||
|
||||
// Create creates a WAL ready for appending records. The given metadata is
|
||||
// recorded at the head of each WAL file, and can be retrieved with ReadAll.
|
||||
// recorded at the head of each WAL file, and can be retrieved with ReadAll
|
||||
// after the file is Open.
|
||||
func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
|
||||
if Exist(dirpath) {
|
||||
return nil, os.ErrExist
|
||||
@@ -233,6 +239,10 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func (w *WAL) SetUnsafeNoFsync() {
|
||||
w.unsafeNoSync = true
|
||||
}
|
||||
|
||||
func (w *WAL) cleanupWAL(lg *zap.Logger) {
|
||||
var err error
|
||||
if err = w.Close(); err != nil {
|
||||
@@ -428,6 +438,10 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
||||
defer w.mu.Unlock()
|
||||
|
||||
rec := &walpb.Record{}
|
||||
|
||||
if w.decoder == nil {
|
||||
return nil, state, nil, ErrDecoderNotFound
|
||||
}
|
||||
decoder := w.decoder
|
||||
|
||||
var match bool
|
||||
@@ -435,8 +449,15 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
||||
switch rec.Type {
|
||||
case entryType:
|
||||
e := mustUnmarshalEntry(rec.Data)
|
||||
// 0 <= e.Index-w.start.Index - 1 < len(ents)
|
||||
if e.Index > w.start.Index {
|
||||
ents = append(ents[:e.Index-w.start.Index-1], e)
|
||||
// prevent "panic: runtime error: slice bounds out of range [:13038096702221461992] with capacity 0"
|
||||
up := e.Index - w.start.Index - 1
|
||||
if up > uint64(len(ents)) {
|
||||
// return error before append call causes runtime panic
|
||||
return nil, state, nil, ErrSliceOutOfRange
|
||||
}
|
||||
ents = append(ents[:up], e)
|
||||
}
|
||||
w.enti = e.Index
|
||||
|
||||
@@ -568,6 +589,14 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
|
||||
snaps = append(snaps, loadedSnap)
|
||||
case stateType:
|
||||
state = mustUnmarshalState(rec.Data)
|
||||
case crcType:
|
||||
crc := decoder.crc.Sum32()
|
||||
// current crc of decoder must match the crc of the record.
|
||||
// do no need to match 0 crc, since the decoder is a new one at this case.
|
||||
if crc != 0 && rec.Validate(crc) != nil {
|
||||
return nil, ErrCRCMismatch
|
||||
}
|
||||
decoder.updateCRC(rec.Crc)
|
||||
}
|
||||
}
|
||||
// We do not have to read out all the WAL entries
|
||||
@@ -760,6 +789,9 @@ func (w *WAL) cut() error {
|
||||
}
|
||||
|
||||
func (w *WAL) sync() error {
|
||||
if w.unsafeNoSync {
|
||||
return nil
|
||||
}
|
||||
if w.encoder != nil {
|
||||
if err := w.encoder.flush(); err != nil {
|
||||
return err
|
||||
|
106
wal/wal_test.go
106
wal/wal_test.go
@@ -645,6 +645,35 @@ func TestOpenForRead(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenWithMaxIndex(t *testing.T) {
|
||||
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(p)
|
||||
// create WAL
|
||||
w, err := Create(zap.NewExample(), p, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
es := []raftpb.Entry{{Index: uint64(math.MaxInt64)}}
|
||||
if err = w.Save(raftpb.HardState{}, es); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.Close()
|
||||
|
||||
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, _, _, err = w.ReadAll()
|
||||
if err == nil || err != ErrSliceOutOfRange {
|
||||
t.Fatalf("err = %v, want ErrSliceOutOfRange", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEmpty(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
var est raftpb.HardState
|
||||
@@ -1003,3 +1032,80 @@ func TestValidSnapshotEntries(t *testing.T) {
|
||||
t.Errorf("expected walSnaps %+v, got %+v", expected, walSnaps)
|
||||
}
|
||||
}
|
||||
|
||||
// TestValidSnapshotEntriesAfterPurgeWal ensure that there are many wal files, and after cleaning the first wal file,
|
||||
// it can work well.
|
||||
func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) {
|
||||
oldSegmentSizeBytes := SegmentSizeBytes
|
||||
SegmentSizeBytes = 64
|
||||
defer func() {
|
||||
SegmentSizeBytes = oldSegmentSizeBytes
|
||||
}()
|
||||
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(p)
|
||||
snap0 := walpb.Snapshot{Index: 0, Term: 0}
|
||||
snap1 := walpb.Snapshot{Index: 1, Term: 1}
|
||||
state1 := raftpb.HardState{Commit: 1, Term: 1}
|
||||
snap2 := walpb.Snapshot{Index: 2, Term: 1}
|
||||
snap3 := walpb.Snapshot{Index: 3, Term: 2}
|
||||
state2 := raftpb.HardState{Commit: 3, Term: 2}
|
||||
func() {
|
||||
w, werr := Create(zap.NewExample(), p, nil)
|
||||
if werr != nil {
|
||||
t.Fatal(werr)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
// snap0 is implicitly created at index 0, term 0
|
||||
if err = w.SaveSnapshot(snap1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.Save(state1, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.SaveSnapshot(snap2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.SaveSnapshot(snap3); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i := 0; i < 128; i++ {
|
||||
if err = w.Save(state2, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
files, _, ferr := selectWALFiles(nil, p, snap0)
|
||||
if ferr != nil {
|
||||
t.Fatal(ferr)
|
||||
}
|
||||
os.Remove(p + "/" + files[0])
|
||||
_, err = ValidSnapshotEntries(zap.NewExample(), p)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReadAllFail ensure ReadAll error if used without opening the WAL
|
||||
func TestReadAllFail(t *testing.T) {
|
||||
dir, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
// create initial WAL
|
||||
f, err := Create(zap.NewExample(), dir, []byte("metadata"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f.Close()
|
||||
// try to read without opening the WAL
|
||||
_, _, _, err = f.ReadAll()
|
||||
if err == nil || err != ErrDecoderNotFound {
|
||||
t.Fatalf("err = %v, want ErrDecoderNotFound", err)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user