Merge branch 'main' into main

dependabot/go_modules/go.uber.org/atomic-1.10.0
Allen Ray 2022-08-29 12:07:27 -04:00 committed by GitHub
commit c52108942b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
69 changed files with 1223 additions and 498 deletions

View File

@ -6,6 +6,9 @@ Previous change logs can be found at [CHANGELOG-3.4](https://github.com/etcd-io/
## v3.5.5 (TBD)
### Deprecations
- Deprecated [SetKeepAlive and SetKeepAlivePeriod in limitListenerConn](https://github.com/etcd-io/etcd/pull/14366).
### Package `clientv3`
- Fix [do not overwrite authTokenBundle on dial](https://github.com/etcd-io/etcd/pull/14132).

View File

@ -17,6 +17,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
### Deprecations
- Deprecated [V2 discovery](https://etcd.io/docs/v3.5/dev-internal/discovery_protocol/).
- Deprecated [SetKeepAlive and SetKeepAlivePeriod in limitListenerConn](https://github.com/etcd-io/etcd/pull/14356).
- Removed [etcdctl defrag --data-dir](https://github.com/etcd-io/etcd/pull/13793).
- Removed [etcdctl snapshot status](https://github.com/etcd-io/etcd/pull/13809).
- Removed [etcdctl snapshot restore](https://github.com/etcd-io/etcd/pull/13809).
@ -57,6 +58,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
- Add [v3 discovery](https://github.com/etcd-io/etcd/pull/13635) to bootstrap a new etcd cluster.
- Add [field `storage`](https://github.com/etcd-io/etcd/pull/13772) into the response body of endpoint `/version`.
- Add [`etcd --max-concurrent-streams`](https://github.com/etcd-io/etcd/pull/14169) flag to configure the max concurrent streams each client can open at a time, and defaults to math.MaxUint32.
- Add [`etcd grpc-proxy --experimental-enable-grpc-logging`](https://github.com/etcd-io/etcd/pull/14266) flag to logging all grpc requests and responses.
- Fix [non mutating requests pass through quotaKVServer when NOSPACE](https://github.com/etcd-io/etcd/pull/13435)
- Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13467).
- Fix [Provide a better liveness probe for when etcd runs as a Kubernetes pod](https://github.com/etcd-io/etcd/pull/13399)
@ -76,8 +78,9 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
- Fix [Restrict the max size of each WAL entry to the remaining size of the WAL file](https://github.com/etcd-io/etcd/pull/14122).
- Fix [memberID equals zero in corruption alarm](https://github.com/etcd-io/etcd/pull/14272)
### gRPC Proxy
### etcd grpc-proxy
- Add [`etcd grpc-proxy start --endpoints-auto-sync-interval`](https://github.com/etcd-io/etcd/pull/14354) flag to enable and configure interval of auto sync of endpoints with server.
- Add [listen-cipher-suites](https://github.com/etcd-io/etcd/pull/14308) flag.
### tools/benchmark

View File

@ -5,7 +5,6 @@ etcd is Apache 2.0 licensed and accepts contributions via GitHub pull requests.
# Email and chat
- Email: [etcd-dev](https://groups.google.com/forum/?hl=en#!forum/etcd-dev)
- IRC: #[etcd](irc://irc.freenode.org:6667/#etcd) IRC channel on freenode.org
- Slack: [#etcd](https://kubernetes.slack.com/messages/C3HD8ARJ5/details/)
## Getting started

View File

@ -15,13 +15,13 @@ The `main` branch is our development branch. All new features land here first.
To try new and experimental features, pull `main` and play with it. Note that `main` may not be stable because new features may introduce bugs.
Before the release of the next stable version, feature PRs will be frozen. A [release manager](../dev-internal/release/#release-management) will be assigned to major/minor version and will lead the etcd community in test, bug-fix and documentation of the release for one to two weeks.
Before the release of the next stable version, feature PRs will be frozen. A [release manager](./release.md/#release-management) will be assigned to major/minor version and will lead the etcd community in test, bug-fix and documentation of the release for one to two weeks.
### Stable branches
All branches with prefix `release-` are considered _stable_ branches.
After every minor release ([semver.org](https://semver.org/)), we will have a new stable branch for that release, managed by a [patch release manager](../dev-internal/release/#release-management). We will keep fixing the backwards-compatible bugs for the latest two stable releases. A _patch_ release to each supported release branch, incorporating any bug fixes, will be once every two weeks, given any patches.
After every minor release ([semver.org](https://semver.org/)), we will have a new stable branch for that release, managed by a [patch release manager](./release.md/#release-management). We will keep fixing the backwards-compatible bugs for the latest two stable releases. A _patch_ release to each supported release branch, incorporating any bug fixes, will be once every two weeks, given any patches.
[main]: https://github.com/etcd-io/etcd/tree/main

View File

@ -71,5 +71,5 @@ git log ...${PREV_VERSION} --pretty=format:"%an" | sort | uniq | tr '\n' ',' | s
## Post release
- Create new stable branch through `git push origin ${VERSION_MAJOR}.${VERSION_MINOR}` if this is a major stable release. This assumes `origin` corresponds to "https://github.com/etcd-io/etcd\".
- Create new stable branch through `git push origin ${VERSION_MAJOR}.${VERSION_MINOR}` if this is a major stable release. This assumes `origin` corresponds to "https://github.com/etcd-io/etcd".
- Bump [hardcoded Version in the repository](https://github.com/etcd-io/etcd/blob/v3.4.15/version/version.go#L30) to the version `${VERSION}+git`.

View File

@ -9,7 +9,7 @@ A PR can have various labels, milestone, reviewer etc. The detailed list of labe
https://github.com/kubernetes/kubernetes/labels
Following are few example searches on PR for convenience:
* [Open PRS for milestone etcd-v3.4](https://github.com/etcd-io/etcd/pulls?utf8=%E2%9C%93&q=is%3Apr+is%3Aopen+milestone%3Aetcd-v3.4)
* [Open PRS for milestone etcd-v3.6](https://github.com/etcd-io/etcd/pulls?utf8=%E2%9C%93&q=is%3Apr+is%3Aopen+milestone%3Aetcd-v3.6)
* [PRs under investigation](https://github.com/etcd-io/etcd/labels/Investigating)
## Scope

View File

@ -14,7 +14,7 @@ please see [CONTRIBUTING](./CONTRIBUTING.md) guide.
## Maintainers
[Maintainers](./MAINTAINERS) are first and foremost contributors that have shown they
Maintainers are first and foremost contributors that have shown they
are committed to the long term success of a project. Maintainership is about building
trust with the current maintainers of the project and being a person that they can
depend on to make decisions in the best interest of the project in a consistent manner.
@ -39,18 +39,35 @@ below.
- Resolution of bugs triaged to a package/feature
- Regularly review pull requests to the pkg subsystem
Contributors who are interested in becoming a maintainer, if performing these
responsibilities, should discuss their interest with the existing maintainers. New
maintainers must be nominated by an existing maintainer and must be elected by a
supermajority of maintainers with a fallback on lazy consensus after three business weeks
inactive voting period and as long as two maintainers are on board. Maintainers can be
removed by a supermajority of the maintainers and moved to emeritus status.
### Nomination and retiring of maintainers
Life priorities, interests, and passions can change. If a maintainer needs to step
down, inform other maintainers about this intention, and if possible, help find someone
to pick up the related work. At the very least, ensure the related work can be continued.
Afterward, create a pull request to remove yourself from the [MAINTAINERS](./MAINTAINERS)
file.
[Maintainers](./MAINTAINERS) file on the `main` branch reflects the latest
state of project maintainers. List of existing maintainers should be kept up to
date by existing maintainers to properly reflect community health and to gain
better understanding of recruiting need for new maintainers. Changes to list of
maintainers should be done by opening a pull request and CCing all the existing
maintainers.
Contributors who are interested in becoming a maintainer, if performing relevant
responsibilities, should discuss their interest with the existing maintainers.
New maintainers must be nominated by an existing maintainer and must be elected
by a supermajority of maintainers with a fallback on lazy consensus after three
business weeks inactive voting period and as long as two maintainers are on board.
Life priorities, interests, and passions can change. Maintainers can retire and
move to the [emeritus status](./README.md#etcd-emeritus-maintainers). If a
maintainer needs to step down, they should inform other maintainers, if possible,
help find someone to pick up the related work. At the very least, ensure the
related work can be continued. Afterward they can remove themselves from list of
existing maintainers.
If a maintainer is not been performing their duties for period of 12 months,
they can be removed by other maintainers. In that case inactive maintainer will
be first notified via an email. If situation doesn't improve, they will be
removed. If an emeritus maintainer wants to regain an active role, they can do
so by renewing their contributions. Active maintainers should welcome such a move.
Retiring of other maintainers or regaining the status should require approval
of at least two active maintainers.
## Reviewers

View File

@ -158,7 +158,7 @@ Now it's time to dig into the full etcd API and other guides.
## Contact
- Mailing list: [etcd-dev](https://groups.google.com/forum/?hl=en#!forum/etcd-dev)
- IRC: #[etcd](irc://irc.freenode.org:6667/#etcd) on freenode.org
- Slack: [#etcd](https://kubernetes.slack.com/messages/C3HD8ARJ5/details/)
- Planning: [milestones](https://github.com/etcd-io/etcd/milestones)
- Bugs: [issues](https://github.com/etcd-io/etcd/issues)
@ -182,7 +182,7 @@ See [PR management](https://github.com/etcd-io/etcd/blob/main/Documentation/cont
## etcd Emeritus Maintainers
These emeritus maintainers dedicated a part of their career to etcd and reviewed code, triaged bugs, and pushed the project forward over a substantial period of time. Their contribution is greatly appreciated.
These emeritus maintainers dedicated a part of their career to etcd and reviewed code, triaged bugs and pushed the project forward over a substantial period of time. Their contribution is greatly appreciated.
* Fanmin Shi
* Anthony Romano

View File

@ -21,26 +21,29 @@ import (
"time"
)
type keepAliveConn interface {
SetKeepAlive(bool) error
SetKeepAlivePeriod(d time.Duration) error
}
// NewKeepAliveListener returns a listener that listens on the given address.
// Be careful when wrap around KeepAliveListener with another Listener if TLSInfo is not nil.
// Some pkgs (like go/http) might expect Listener to return TLSConn type to start TLS handshake.
// http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html
//
// Note(ahrtr):
// only `net.TCPConn` supports `SetKeepAlive` and `SetKeepAlivePeriod`
// by default, so if you want to wrap multiple layers of net.Listener,
// the `keepaliveListener` should be the one which is closest to the
// original `net.Listener` implementation, namely `TCPListener`.
func NewKeepAliveListener(l net.Listener, scheme string, tlscfg *tls.Config) (net.Listener, error) {
kal := &keepaliveListener{
Listener: l,
}
if scheme == "https" {
if tlscfg == nil {
return nil, fmt.Errorf("cannot listen on TLS for given listener: KeyFile and CertFile are not presented")
}
return newTLSKeepaliveListener(l, tlscfg), nil
return newTLSKeepaliveListener(kal, tlscfg), nil
}
return &keepaliveListener{
Listener: l,
}, nil
return kal, nil
}
type keepaliveListener struct{ net.Listener }
@ -50,13 +53,43 @@ func (kln *keepaliveListener) Accept() (net.Conn, error) {
if err != nil {
return nil, err
}
kac := c.(keepAliveConn)
kac, err := createKeepaliveConn(c)
if err != nil {
return nil, fmt.Errorf("create keepalive connection failed, %w", err)
}
// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
// default on linux: 30 + 8 * 30
// default on osx: 30 + 8 * 75
kac.SetKeepAlive(true)
kac.SetKeepAlivePeriod(30 * time.Second)
return c, nil
if err := kac.SetKeepAlive(true); err != nil {
return nil, fmt.Errorf("SetKeepAlive failed, %w", err)
}
if err := kac.SetKeepAlivePeriod(30 * time.Second); err != nil {
return nil, fmt.Errorf("SetKeepAlivePeriod failed, %w", err)
}
return kac, nil
}
func createKeepaliveConn(c net.Conn) (*keepAliveConn, error) {
tcpc, ok := c.(*net.TCPConn)
if !ok {
return nil, ErrNotTCP
}
return &keepAliveConn{tcpc}, nil
}
type keepAliveConn struct {
*net.TCPConn
}
// SetKeepAlive sets keepalive
func (l *keepAliveConn) SetKeepAlive(doKeepAlive bool) error {
return l.TCPConn.SetKeepAlive(doKeepAlive)
}
// SetKeepAlivePeriod sets keepalive period
func (l *keepAliveConn) SetKeepAlivePeriod(d time.Duration) error {
return l.TCPConn.SetKeepAlivePeriod(d)
}
// A tlsKeepaliveListener implements a network listener (net.Listener) for TLS connections.
@ -72,12 +105,7 @@ func (l *tlsKeepaliveListener) Accept() (c net.Conn, err error) {
if err != nil {
return
}
kac := c.(keepAliveConn)
// detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl
// default on linux: 30 + 8 * 30
// default on osx: 30 + 8 * 75
kac.SetKeepAlive(true)
kac.SetKeepAlivePeriod(30 * time.Second)
c = tls.Server(c, l.config)
return c, nil
}

View File

@ -40,6 +40,9 @@ func TestNewKeepAliveListener(t *testing.T) {
if err != nil {
t.Fatalf("unexpected Accept error: %v", err)
}
if _, ok := conn.(*keepAliveConn); !ok {
t.Fatalf("Unexpected conn type: %T, wanted *keepAliveConn", conn)
}
conn.Close()
ln.Close()

View File

@ -63,6 +63,9 @@ func (l *limitListenerConn) Close() error {
return err
}
// SetKeepAlive sets keepalive
//
// Deprecated: use (*keepAliveConn) SetKeepAlive instead.
func (l *limitListenerConn) SetKeepAlive(doKeepAlive bool) error {
tcpc, ok := l.Conn.(*net.TCPConn)
if !ok {
@ -71,6 +74,9 @@ func (l *limitListenerConn) SetKeepAlive(doKeepAlive bool) error {
return tcpc.SetKeepAlive(doKeepAlive)
}
// SetKeepAlivePeriod sets keepalive period
//
// Deprecated: use (*keepAliveConn) SetKeepAlivePeriod instead.
func (l *limitListenerConn) SetKeepAlivePeriod(d time.Duration) error {
tcpc, ok := l.Conn.(*net.TCPConn)
if !ok {

View File

@ -68,7 +68,7 @@ func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, err
fallthrough
case lnOpts.IsTimeout(), lnOpts.IsSocketOpts():
// timeout listener with socket options.
ln, err := lnOpts.ListenConfig.Listen(context.TODO(), "tcp", addr)
ln, err := newKeepAliveListener(&lnOpts.ListenConfig, addr)
if err != nil {
return nil, err
}
@ -78,7 +78,7 @@ func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, err
writeTimeout: lnOpts.writeTimeout,
}
case lnOpts.IsTimeout():
ln, err := net.Listen("tcp", addr)
ln, err := newKeepAliveListener(nil, addr)
if err != nil {
return nil, err
}
@ -88,7 +88,7 @@ func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, err
writeTimeout: lnOpts.writeTimeout,
}
default:
ln, err := net.Listen("tcp", addr)
ln, err := newKeepAliveListener(nil, addr)
if err != nil {
return nil, err
}
@ -102,6 +102,19 @@ func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, err
return wrapTLS(scheme, lnOpts.tlsInfo, lnOpts.Listener)
}
func newKeepAliveListener(cfg *net.ListenConfig, addr string) (ln net.Listener, err error) {
if cfg != nil {
ln, err = cfg.Listen(context.TODO(), "tcp", addr)
} else {
ln, err = net.Listen("tcp", addr)
}
if err != nil {
return
}
return NewKeepAliveListener(ln, "tcp", nil)
}
func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) {
if scheme != "https" && scheme != "unixs" {
return l, nil

View File

@ -205,6 +205,15 @@ func TestNewListenerWithSocketOpts(t *testing.T) {
if !test.expectedErr && err != nil {
t.Fatalf("unexpected error: %v", err)
}
if test.scheme == "http" {
lnOpts := newListenOpts(test.opts...)
if !lnOpts.IsSocketOpts() && !lnOpts.IsTimeout() {
if _, ok := ln.(*keepaliveListener); !ok {
t.Fatalf("ln: unexpected listener type: %T, wanted *keepaliveListener", ln)
}
}
}
})
}
}

View File

@ -397,7 +397,7 @@ func (l *lessor) closeRequireLeader() {
}
}
func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (karesp *LeaseKeepAliveResponse, ferr error) {
cctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -406,6 +406,15 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
return nil, toErr(ctx, err)
}
defer func() {
if err := stream.CloseSend(); err != nil {
if ferr == nil {
ferr = toErr(ctx, err)
}
return
}
}()
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
if err != nil {
return nil, toErr(ctx, err)
@ -416,7 +425,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
return nil, toErr(ctx, rerr)
}
karesp := &LeaseKeepAliveResponse{
karesp = &LeaseKeepAliveResponse{
ResponseHeader: resp.GetHeader(),
ID: LeaseID(resp.ID),
TTL: resp.TTL,

View File

@ -43,12 +43,8 @@ require (
replace (
go.etcd.io/etcd/api/v3 => ../api
go.etcd.io/etcd/client/pkg/v3 => ../client/pkg
go.etcd.io/etcd/client/v2 => ../client/v2
go.etcd.io/etcd/client/v3 => ../client/v3
go.etcd.io/etcd/etcdutl/v3 => ../etcdutl
go.etcd.io/etcd/pkg/v3 => ../pkg
go.etcd.io/etcd/raft/v3 => ../raft
go.etcd.io/etcd/server/v3 => ../server
)
// Bad imports are sometimes causing attempts to pull that code.

View File

@ -18,6 +18,7 @@ package expect
import (
"bufio"
"context"
"fmt"
"io"
"os"
@ -33,6 +34,8 @@ import (
const DEBUG_LINES_TAIL = 40
type ExpectProcess struct {
name string
cmd *exec.Cmd
fpty *os.File
wg sync.WaitGroup
@ -40,6 +43,7 @@ type ExpectProcess struct {
mu sync.Mutex // protects lines and err
lines []string
count int // increment whenever new line gets added
cur int // current read position
err error
// StopSignal is the signal Stop sends to the process; defaults to SIGTERM.
@ -48,15 +52,16 @@ type ExpectProcess struct {
// NewExpect creates a new process for expect testing.
func NewExpect(name string, arg ...string) (ep *ExpectProcess, err error) {
// if env[] is nil, use current system env
return NewExpectWithEnv(name, arg, nil)
// if env[] is nil, use current system env and the default command as name
return NewExpectWithEnv(name, arg, nil, name)
}
// NewExpectWithEnv creates a new process with user defined env variables for expect testing.
func NewExpectWithEnv(name string, args []string, env []string) (ep *ExpectProcess, err error) {
func NewExpectWithEnv(name string, args []string, env []string, serverProcessConfigName string) (ep *ExpectProcess, err error) {
cmd := exec.Command(name, args...)
cmd.Env = env
ep = &ExpectProcess{
name: serverProcessConfigName,
cmd: cmd,
StopSignal: syscall.SIGTERM,
}
@ -72,6 +77,10 @@ func NewExpectWithEnv(name string, args []string, env []string) (ep *ExpectProce
return ep, nil
}
func (ep *ExpectProcess) Pid() int {
return ep.cmd.Process.Pid
}
func (ep *ExpectProcess) read() {
defer ep.wg.Done()
printDebugLines := os.Getenv("EXPECT_DEBUG") != ""
@ -81,7 +90,7 @@ func (ep *ExpectProcess) read() {
ep.mu.Lock()
if l != "" {
if printDebugLines {
fmt.Printf("%s-%d: %s", ep.cmd.Path, ep.cmd.Process.Pid, l)
fmt.Printf("%s (%s) (%d): %s", ep.cmd.Path, ep.name, ep.cmd.Process.Pid, l)
}
ep.lines = append(ep.lines, l)
ep.count++
@ -96,7 +105,7 @@ func (ep *ExpectProcess) read() {
}
// ExpectFunc returns the first line satisfying the function f.
func (ep *ExpectProcess) ExpectFunc(f func(string) bool) (string, error) {
func (ep *ExpectProcess) ExpectFunc(ctx context.Context, f func(string) bool) (string, error) {
i := 0
for {
@ -114,7 +123,13 @@ func (ep *ExpectProcess) ExpectFunc(f func(string) bool) (string, error) {
break
}
ep.mu.Unlock()
time.Sleep(time.Millisecond * 10)
select {
case <-ctx.Done():
return "", fmt.Errorf("failed to find match string: %w", ctx.Err())
case <-time.After(time.Millisecond * 10):
// continue loop
}
}
ep.mu.Lock()
lastLinesIndex := len(ep.lines) - DEBUG_LINES_TAIL
@ -128,9 +143,15 @@ func (ep *ExpectProcess) ExpectFunc(f func(string) bool) (string, error) {
ep.err, lastLines)
}
// ExpectWithContext returns the first line containing the given string.
func (ep *ExpectProcess) ExpectWithContext(ctx context.Context, s string) (string, error) {
return ep.ExpectFunc(ctx, func(txt string) bool { return strings.Contains(txt, s) })
}
// Expect returns the first line containing the given string.
// Deprecated: please use ExpectWithContext instead.
func (ep *ExpectProcess) Expect(s string) (string, error) {
return ep.ExpectFunc(func(txt string) bool { return strings.Contains(txt, s) })
return ep.ExpectWithContext(context.Background(), s)
}
// LineCount returns the number of recorded lines since
@ -198,3 +219,15 @@ func (ep *ExpectProcess) Lines() []string {
defer ep.mu.Unlock()
return ep.lines
}
// ReadLine returns line by line.
func (ep *ExpectProcess) ReadLine() string {
ep.mu.Lock()
defer ep.mu.Unlock()
if ep.count > ep.cur {
line := ep.lines[ep.cur]
ep.cur++
return line
}
return ""
}

View File

@ -17,9 +17,12 @@
package expect
import (
"context"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestExpectFunc(t *testing.T) {
@ -28,7 +31,7 @@ func TestExpectFunc(t *testing.T) {
t.Fatal(err)
}
wstr := "hello world\r\n"
l, eerr := ep.ExpectFunc(func(a string) bool { return len(a) > 10 })
l, eerr := ep.ExpectFunc(context.Background(), func(a string) bool { return len(a) > 10 })
if eerr != nil {
t.Fatal(eerr)
}
@ -40,6 +43,33 @@ func TestExpectFunc(t *testing.T) {
}
}
func TestExpectFuncTimeout(t *testing.T) {
ep, err := NewExpect("tail", "-f", "/dev/null")
if err != nil {
t.Fatal(err)
}
go func() {
// It's enough to have "talkative" process to stuck in the infinite loop of reading
for {
err := ep.Send("new line\n")
if err != nil {
return
}
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
_, err = ep.ExpectFunc(ctx, func(a string) bool { return false })
require.ErrorAs(t, err, &context.DeadlineExceeded)
if err = ep.Stop(); err != nil {
t.Fatal(err)
}
}
func TestEcho(t *testing.T) {
ep, err := NewExpect("echo", "hello world")
if err != nil {

View File

@ -216,7 +216,7 @@ func TestLogMaybeAppend(t *testing.T) {
lastterm, lastindex, lastindex + 2, []pb.Entry{{Index: lastindex + 1, Term: 4}, {Index: lastindex + 2, Term: 4}},
lastindex + 2, true, lastindex + 2, false,
},
// match with the the entry in the middle
// match with the entry in the middle
{
lastterm - 1, lastindex - 1, lastindex, []pb.Entry{{Index: lastindex, Term: 4}},
lastindex, true, lastindex, false,

View File

@ -1691,7 +1691,7 @@ func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.Co
r.maybeSendAppend(id, false /* sendIfEmpty */)
})
}
// If the the leadTransferee was removed or demoted, abort the leadership transfer.
// If the leadTransferee was removed or demoted, abort the leadership transfer.
if _, tOK := r.prs.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()
}

View File

@ -146,6 +146,12 @@ function generic_checker {
fi
}
function killall_functional_test {
log_callout "Killing all etcd-agent and etcd processes..."
killall -9 etcd-agent
killall -9 etcd
}
function functional_pass {
run ./tests/functional/build.sh || exit 1
@ -166,32 +172,32 @@ function functional_pass {
done
done
trap killall_functional_test INT
log_callout "functional test START!"
run ./bin/etcd-tester --config ./tests/functional/functional.yaml -test.v && log_success "'etcd-tester' succeeded"
local etcd_tester_exit_code=$?
if [[ "${etcd_tester_exit_code}" -ne "0" ]]; then
log_error "ETCD_TESTER_EXIT_CODE:" ${etcd_tester_exit_code}
exit 1
fi
# shellcheck disable=SC2206
agent_pids=($agent_pids)
kill -s TERM "${agent_pids[@]}" || true
if [[ "${etcd_tester_exit_code}" -ne "0" ]]; then
log_error "ETCD_TESTER_EXIT_CODE:" ${etcd_tester_exit_code}
log_error -e "\\nFAILED! 'tail -1000 /tmp/etcd-functional-1/etcd.log'"
tail -1000 /tmp/etcd-functional-1/etcd.log
tail -100 /tmp/etcd-functional-1/etcd.log
log_error -e "\\nFAILED! 'tail -1000 /tmp/etcd-functional-2/etcd.log'"
tail -1000 /tmp/etcd-functional-2/etcd.log
tail -100 /tmp/etcd-functional-2/etcd.log
log_error -e "\\nFAILED! 'tail -1000 /tmp/etcd-functional-3/etcd.log'"
tail -1000 /tmp/etcd-functional-3/etcd.log
tail -100 /tmp/etcd-functional-3/etcd.log
log_error "--- FAIL: exit code" ${etcd_tester_exit_code}
return ${etcd_tester_exit_code}
exit ${etcd_tester_exit_code}
fi
log_success "functional test PASS!"
}

View File

@ -965,6 +965,8 @@ func NewAuthStore(lg *zap.Logger, be AuthBackend, tp TokenProvider, bcryptCost i
as.setupMetricsReporter()
as.refreshRangePermCache(tx)
tx.Unlock()
be.ForceCommit()

View File

@ -666,12 +666,6 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
}
if network == "tcp" {
if sctx.l, err = transport.NewKeepAliveListener(sctx.l, network, nil); err != nil {
return nil, err
}
}
defer func(u url.URL) {
if err == nil {
return

View File

@ -19,6 +19,7 @@ import (
"os"
"runtime"
"strings"
"time"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/logutil"
@ -207,6 +208,8 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
select {
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
case <-time.After(cfg.ExperimentalWaitClusterReadyTimeout):
e.GetLogger().Warn("startEtcd: timed out waiting for the ready notification")
}
return e.Server.StopNotify(), e.Err(), nil
}

View File

@ -43,6 +43,9 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb"
"go.etcd.io/etcd/server/v3/proxy/grpcproxy"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
@ -55,15 +58,16 @@ import (
)
var (
grpcProxyListenAddr string
grpcProxyMetricsListenAddr string
grpcProxyEndpoints []string
grpcProxyDNSCluster string
grpcProxyDNSClusterServiceName string
grpcProxyInsecureDiscovery bool
grpcProxyDataDir string
grpcMaxCallSendMsgSize int
grpcMaxCallRecvMsgSize int
grpcProxyListenAddr string
grpcProxyMetricsListenAddr string
grpcProxyEndpoints []string
grpcProxyEndpointsAutoSyncInterval time.Duration
grpcProxyDNSCluster string
grpcProxyDNSClusterServiceName string
grpcProxyInsecureDiscovery bool
grpcProxyDataDir string
grpcMaxCallSendMsgSize int
grpcMaxCallRecvMsgSize int
// tls for connecting to etcd
@ -91,6 +95,7 @@ var (
grpcProxyEnablePprof bool
grpcProxyEnableOrdering bool
grpcProxyEnableLogging bool
grpcProxyDebug bool
@ -132,6 +137,7 @@ func newGRPCProxyStartCommand() *cobra.Command {
cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for endpoint /metrics requests on an additional interface")
cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
cmd.Flags().DurationVar(&grpcProxyEndpointsAutoSyncInterval, "endpoints-auto-sync-interval", 0, "etcd endpoints auto sync interval (disabled by default)")
cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
@ -162,6 +168,7 @@ func newGRPCProxyStartCommand() *cobra.Command {
// experimental flags
cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")
cmd.Flags().BoolVar(&grpcProxyEnableLogging, "experimental-enable-grpc-logging", false, "logging all grpc requests and responses")
cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
@ -343,8 +350,9 @@ func newProxyClientCfg(lg *zap.Logger, eps []string, tls *transport.TLSInfo) (*c
func newClientCfg(lg *zap.Logger, eps []string) (*clientv3.Config, error) {
// set tls if any one tls option set
cfg := clientv3.Config{
Endpoints: eps,
DialTimeout: 5 * time.Second,
Endpoints: eps,
AutoSyncInterval: grpcProxyEndpointsAutoSyncInterval,
DialTimeout: 5 * time.Second,
}
if grpcMaxCallSendMsgSize > 0 {
@ -440,9 +448,32 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
electionp := grpcproxy.NewElectionProxy(client)
lockp := grpcproxy.NewLockProxy(client)
alwaysLoggingDeciderServer := func(ctx context.Context, fullMethodName string, servingObject interface{}) bool { return true }
grpcChainStreamList := []grpc.StreamServerInterceptor{
grpc_prometheus.StreamServerInterceptor,
}
grpcChainUnaryList := []grpc.UnaryServerInterceptor{
grpc_prometheus.UnaryServerInterceptor,
}
if grpcProxyEnableLogging {
grpcChainStreamList = append(grpcChainStreamList,
grpc_ctxtags.StreamServerInterceptor(),
grpc_zap.PayloadStreamServerInterceptor(lg, alwaysLoggingDeciderServer),
)
grpcChainUnaryList = append(grpcChainUnaryList,
grpc_ctxtags.UnaryServerInterceptor(),
grpc_zap.PayloadUnaryServerInterceptor(lg, alwaysLoggingDeciderServer),
)
}
gopts := []grpc.ServerOption{
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpcChainStreamList...,
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpcChainUnaryList...,
)),
grpc.MaxConcurrentStreams(math.MaxUint32),
}
if grpcKeepAliveMinTime > time.Duration(0) {

View File

@ -346,8 +346,9 @@ func (sws *serverWatchStream) recvLoop() error {
}
default:
// we probably should not shutdown the entire stream when
// receive an valid command.
// receive an invalid command.
// so just do nothing instead.
sws.lg.Sugar().Infof("invalid watch request type %T received in gRPC stream", uv)
continue
}
}

View File

@ -685,7 +685,7 @@ func TestKVRestore(t *testing.T) {
keysBefore := readGaugeInt(keysGauge)
s.Close()
// ns should recover the the previous state from backend.
// ns should recover the previous state from backend.
ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore {

View File

@ -35,14 +35,14 @@ func TestAlarm(t *testing.T) {
testutils.ExecuteUntil(ctx, t, func() {
// test small put still works
smallbuf := strings.Repeat("a", 64)
if err := clus.Client().Put("1st_test", smallbuf, config.PutOptions{}); err != nil {
if err := clus.Client().Put(ctx, "1st_test", smallbuf, config.PutOptions{}); err != nil {
t.Fatalf("alarmTest: put kv error (%v)", err)
}
// write some chunks to fill up the database
buf := strings.Repeat("b", os.Getpagesize())
for {
if err := clus.Client().Put("2nd_test", buf, config.PutOptions{}); err != nil {
if err := clus.Client().Put(ctx, "2nd_test", buf, config.PutOptions{}); err != nil {
if !strings.Contains(err.Error(), "etcdserver: mvcc: database space exceeded") {
t.Fatal(err)
}
@ -51,20 +51,20 @@ func TestAlarm(t *testing.T) {
}
// quota alarm should now be on
alarmResp, err := clus.Client().AlarmList()
alarmResp, err := clus.Client().AlarmList(ctx)
if err != nil {
t.Fatalf("alarmTest: Alarm error (%v)", err)
}
// check that Put is rejected when alarm is on
if err := clus.Client().Put("3rd_test", smallbuf, config.PutOptions{}); err != nil {
if err := clus.Client().Put(ctx, "3rd_test", smallbuf, config.PutOptions{}); err != nil {
if !strings.Contains(err.Error(), "etcdserver: mvcc: database space exceeded") {
t.Fatal(err)
}
}
// get latest revision to compact
sresp, err := clus.Client().Status()
sresp, err := clus.Client().Status(ctx)
if err != nil {
t.Fatalf("get endpoint status error: %v", err)
}
@ -77,12 +77,12 @@ func TestAlarm(t *testing.T) {
}
// make some space
_, err = clus.Client().Compact(rvs, config.CompactOption{Physical: true, Timeout: 10 * time.Second})
_, err = clus.Client().Compact(ctx, rvs, config.CompactOption{Physical: true, Timeout: 10 * time.Second})
if err != nil {
t.Fatalf("alarmTest: Compact error (%v)", err)
}
if err = clus.Client().Defragment(config.DefragOption{Timeout: 10 * time.Second}); err != nil {
if err = clus.Client().Defragment(ctx, config.DefragOption{Timeout: 10 * time.Second}); err != nil {
t.Fatalf("alarmTest: defrag error (%v)", err)
}
@ -92,14 +92,14 @@ func TestAlarm(t *testing.T) {
MemberID: alarm.MemberID,
Alarm: alarm.Alarm,
}
_, err = clus.Client().AlarmDisarm(alarmMember)
_, err = clus.Client().AlarmDisarm(ctx, alarmMember)
if err != nil {
t.Fatalf("alarmTest: Alarm error (%v)", err)
}
}
// put one more key below quota
if err := clus.Client().Put("4th_test", smallbuf, config.PutOptions{}); err != nil {
if err := clus.Client().Put(ctx, "4th_test", smallbuf, config.PutOptions{}); err != nil {
t.Fatal(err)
}
})

View File

@ -50,11 +50,11 @@ func TestCompact(t *testing.T) {
testutils.ExecuteUntil(ctx, t, func() {
var kvs = []testutils.KV{{Key: "key", Val: "val1"}, {Key: "key", Val: "val2"}, {Key: "key", Val: "val3"}}
for i := range kvs {
if err := clus.Client().Put(kvs[i].Key, kvs[i].Val, config.PutOptions{}); err != nil {
if err := clus.Client().Put(ctx, kvs[i].Key, kvs[i].Val, config.PutOptions{}); err != nil {
t.Fatalf("compactTest #%d: put kv error (%v)", i, err)
}
}
get, err := clus.Client().Get("key", config.GetOptions{Revision: 3})
get, err := clus.Client().Get(ctx, "key", config.GetOptions{Revision: 3})
if err != nil {
t.Fatalf("compactTest: Get kv by revision error (%v)", err)
}
@ -62,12 +62,12 @@ func TestCompact(t *testing.T) {
getkvs := testutils.KeyValuesFromGetResponse(get)
assert.Equal(t, kvs[1:2], getkvs)
_, err = clus.Client().Compact(4, tc.options)
_, err = clus.Client().Compact(ctx, 4, tc.options)
if err != nil {
t.Fatalf("compactTest: Compact error (%v)", err)
}
get, err = clus.Client().Get("key", config.GetOptions{Revision: 3})
get, err = clus.Client().Get(ctx, "key", config.GetOptions{Revision: 3})
if err != nil {
if !strings.Contains(err.Error(), "required revision has been compacted") {
t.Fatalf("compactTest: Get compact key error (%v)", err)
@ -76,7 +76,7 @@ func TestCompact(t *testing.T) {
t.Fatalf("expected '...has been compacted' error, got <nil>")
}
_, err = clus.Client().Compact(2, tc.options)
_, err = clus.Client().Compact(ctx, 2, tc.options)
if err != nil {
if !strings.Contains(err.Error(), "required revision has been compacted") {
t.Fatal(err)

View File

@ -33,16 +33,16 @@ func TestDefragOnline(t *testing.T) {
defer clus.Close()
var kvs = []testutils.KV{{Key: "key", Val: "val1"}, {Key: "key", Val: "val2"}, {Key: "key", Val: "val3"}}
for i := range kvs {
if err := clus.Client().Put(kvs[i].Key, kvs[i].Val, config.PutOptions{}); err != nil {
if err := clus.Client().Put(ctx, kvs[i].Key, kvs[i].Val, config.PutOptions{}); err != nil {
t.Fatalf("compactTest #%d: put kv error (%v)", i, err)
}
}
_, err := clus.Client().Compact(4, config.CompactOption{Physical: true, Timeout: 10 * time.Second})
_, err := clus.Client().Compact(ctx, 4, config.CompactOption{Physical: true, Timeout: 10 * time.Second})
if err != nil {
t.Fatalf("defrag_test: compact with revision error (%v)", err)
}
if err = clus.Client().Defragment(options); err != nil {
if err = clus.Client().Defragment(ctx, options); err != nil {
t.Fatalf("defrag_test: defrag error (%v)", err)
}
})

View File

@ -30,7 +30,7 @@ func TestEndpointStatus(t *testing.T) {
clus := testRunner.NewCluster(ctx, t, config.ClusterConfig{ClusterSize: 3})
defer clus.Close()
testutils.ExecuteUntil(ctx, t, func() {
_, err := clus.Client().Status()
_, err := clus.Client().Status(ctx)
if err != nil {
t.Fatalf("get endpoint status error: %v", err)
}
@ -44,7 +44,7 @@ func TestEndpointHashKV(t *testing.T) {
clus := testRunner.NewCluster(ctx, t, config.ClusterConfig{ClusterSize: 3})
defer clus.Close()
testutils.ExecuteUntil(ctx, t, func() {
_, err := clus.Client().HashKV(0)
_, err := clus.Client().HashKV(ctx, 0)
if err != nil {
t.Fatalf("get endpoint hashkv error: %v", err)
}
@ -58,7 +58,7 @@ func TestEndpointHealth(t *testing.T) {
clus := testRunner.NewCluster(ctx, t, config.ClusterConfig{ClusterSize: 3})
defer clus.Close()
testutils.ExecuteUntil(ctx, t, func() {
if err := clus.Client().Health(); err != nil {
if err := clus.Client().Health(ctx); err != nil {
t.Fatalf("get endpoint health error: %v", err)
}
})

View File

@ -38,10 +38,10 @@ func TestKVPut(t *testing.T) {
testutils.ExecuteUntil(ctx, t, func() {
key, value := "foo", "bar"
if err := cc.Put(key, value, config.PutOptions{}); err != nil {
if err := cc.Put(ctx, key, value, config.PutOptions{}); err != nil {
t.Fatalf("count not put key %q, err: %s", key, err)
}
resp, err := cc.Get(key, config.GetOptions{})
resp, err := cc.Get(ctx, key, config.GetOptions{})
if err != nil {
t.Fatalf("count not get key %q, err: %s", key, err)
}
@ -78,7 +78,7 @@ func TestKVGet(t *testing.T) {
)
for i := range kvs {
if err := cc.Put(kvs[i], "bar", config.PutOptions{}); err != nil {
if err := cc.Put(ctx, kvs[i], "bar", config.PutOptions{}); err != nil {
t.Fatalf("count not put key %q, err: %s", kvs[i], err)
}
}
@ -107,7 +107,7 @@ func TestKVGet(t *testing.T) {
{begin: "", options: config.GetOptions{Prefix: true, Order: clientv3.SortDescend, SortBy: clientv3.SortByKey}, wkv: reversedKvs},
}
for _, tt := range tests {
resp, err := cc.Get(tt.begin, tt.options)
resp, err := cc.Get(ctx, tt.begin, tt.options)
if err != nil {
t.Fatalf("count not get key %q, err: %s", tt.begin, err)
}
@ -178,16 +178,16 @@ func TestKVDelete(t *testing.T) {
}
for _, tt := range tests {
for i := range kvs {
if err := cc.Put(kvs[i], "bar", config.PutOptions{}); err != nil {
if err := cc.Put(ctx, kvs[i], "bar", config.PutOptions{}); err != nil {
t.Fatalf("count not put key %q, err: %s", kvs[i], err)
}
}
del, err := cc.Delete(tt.deleteKey, tt.options)
del, err := cc.Delete(ctx, tt.deleteKey, tt.options)
if err != nil {
t.Fatalf("count not get key %q, err: %s", tt.deleteKey, err)
}
assert.Equal(t, tt.wantDeleted, int(del.Deleted))
get, err := cc.Get("", config.GetOptions{Prefix: true})
get, err := cc.Get(ctx, "", config.GetOptions{Prefix: true})
if err != nil {
t.Fatalf("count not get key, err: %s", err)
}
@ -230,7 +230,7 @@ func TestKVGetNoQuorum(t *testing.T) {
cc := clus.Members()[2].Client()
testutils.ExecuteUntil(ctx, t, func() {
key := "foo"
_, err := cc.Get(key, tc.options)
_, err := cc.Get(ctx, key, tc.options)
gotError := err != nil
if gotError != tc.wantError {
t.Fatalf("Unexpeted result, wantError: %v, gotErr: %v, err: %s", tc.wantError, gotError, err)

View File

@ -63,10 +63,10 @@ func TestLeaseGrantTimeToLive(t *testing.T) {
testutils.ExecuteUntil(ctx, t, func() {
ttl := int64(10)
leaseResp, err := cc.Grant(ttl)
leaseResp, err := cc.Grant(ctx, ttl)
require.NoError(t, err)
ttlResp, err := cc.TimeToLive(leaseResp.ID, config.LeaseOption{})
ttlResp, err := cc.TimeToLive(ctx, leaseResp.ID, config.LeaseOption{})
require.NoError(t, err)
require.Equal(t, ttl, ttlResp.GrantedTTL)
})
@ -108,7 +108,7 @@ func TestLeaseGrantAndList(t *testing.T) {
testutils.ExecuteUntil(ctx, t, func() {
createdLeases := []clientv3.LeaseID{}
for i := 0; i < nc.leaseCount; i++ {
leaseResp, err := cc.Grant(10)
leaseResp, err := cc.Grant(ctx, 10)
t.Logf("Grant returned: resp:%s err:%v", leaseResp.String(), err)
require.NoError(t, err)
createdLeases = append(createdLeases, leaseResp.ID)
@ -119,7 +119,7 @@ func TestLeaseGrantAndList(t *testing.T) {
// or by hitting an up to date member.
leases := []clientv3.LeaseStatus{}
require.Eventually(t, func() bool {
resp, err := cc.LeaseList()
resp, err := cc.Leases(ctx)
if err != nil {
return false
}
@ -153,23 +153,23 @@ func TestLeaseGrantTimeToLiveExpired(t *testing.T) {
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
leaseResp, err := cc.Grant(2)
leaseResp, err := cc.Grant(ctx, 2)
require.NoError(t, err)
err = cc.Put("foo", "bar", config.PutOptions{LeaseID: leaseResp.ID})
err = cc.Put(ctx, "foo", "bar", config.PutOptions{LeaseID: leaseResp.ID})
require.NoError(t, err)
getResp, err := cc.Get("foo", config.GetOptions{})
getResp, err := cc.Get(ctx, "foo", config.GetOptions{})
require.NoError(t, err)
require.Equal(t, int64(1), getResp.Count)
time.Sleep(3 * time.Second)
ttlResp, err := cc.TimeToLive(leaseResp.ID, config.LeaseOption{})
ttlResp, err := cc.TimeToLive(ctx, leaseResp.ID, config.LeaseOption{})
require.NoError(t, err)
require.Equal(t, int64(-1), ttlResp.TTL)
getResp, err = cc.Get("foo", config.GetOptions{})
getResp, err = cc.Get(ctx, "foo", config.GetOptions{})
require.NoError(t, err)
// Value should expire with the lease
require.Equal(t, int64(0), getResp.Count)
@ -190,15 +190,15 @@ func TestLeaseGrantKeepAliveOnce(t *testing.T) {
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
leaseResp, err := cc.Grant(2)
leaseResp, err := cc.Grant(ctx, 2)
require.NoError(t, err)
_, err = cc.LeaseKeepAliveOnce(leaseResp.ID)
_, err = cc.KeepAliveOnce(ctx, leaseResp.ID)
require.NoError(t, err)
time.Sleep(2 * time.Second) // Wait for the original lease to expire
ttlResp, err := cc.TimeToLive(leaseResp.ID, config.LeaseOption{})
ttlResp, err := cc.TimeToLive(ctx, leaseResp.ID, config.LeaseOption{})
require.NoError(t, err)
// We still have a lease!
require.Greater(t, int64(2), ttlResp.TTL)
@ -219,24 +219,24 @@ func TestLeaseGrantRevoke(t *testing.T) {
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
leaseResp, err := cc.Grant(20)
leaseResp, err := cc.Grant(ctx, 20)
require.NoError(t, err)
err = cc.Put("foo", "bar", config.PutOptions{LeaseID: leaseResp.ID})
err = cc.Put(ctx, "foo", "bar", config.PutOptions{LeaseID: leaseResp.ID})
require.NoError(t, err)
getResp, err := cc.Get("foo", config.GetOptions{})
getResp, err := cc.Get(ctx, "foo", config.GetOptions{})
require.NoError(t, err)
require.Equal(t, int64(1), getResp.Count)
_, err = cc.LeaseRevoke(leaseResp.ID)
_, err = cc.Revoke(ctx, leaseResp.ID)
require.NoError(t, err)
ttlResp, err := cc.TimeToLive(leaseResp.ID, config.LeaseOption{})
ttlResp, err := cc.TimeToLive(ctx, leaseResp.ID, config.LeaseOption{})
require.NoError(t, err)
require.Equal(t, int64(-1), ttlResp.TTL)
getResp, err = cc.Get("foo", config.GetOptions{})
getResp, err = cc.Get(ctx, "foo", config.GetOptions{})
require.NoError(t, err)
// Value should expire with the lease
require.Equal(t, int64(0), getResp.Count)

View File

@ -16,9 +16,10 @@ package common
import (
"context"
"go.etcd.io/etcd/tests/v3/framework/testutils"
"testing"
"time"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)
func TestMemberList(t *testing.T) {
@ -33,7 +34,7 @@ func TestMemberList(t *testing.T) {
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
resp, err := cc.MemberList()
resp, err := cc.MemberList(ctx)
if err != nil {
t.Fatalf("could not get member list, err: %s", err)
}

View File

@ -37,7 +37,7 @@ func TestRoleAdd_Simple(t *testing.T) {
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
_, err := cc.RoleAdd("root")
_, err := cc.RoleAdd(ctx, "root")
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}
@ -54,15 +54,15 @@ func TestRoleAdd_Error(t *testing.T) {
defer clus.Close()
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
_, err := cc.RoleAdd("test-role")
_, err := cc.RoleAdd(ctx, "test-role")
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}
_, err = cc.RoleAdd("test-role")
_, err = cc.RoleAdd(ctx, "test-role")
if err == nil || !strings.Contains(err.Error(), rpctypes.ErrRoleAlreadyExist.Error()) {
t.Fatalf("want (%v) error, but got (%v)", rpctypes.ErrRoleAlreadyExist, err)
}
_, err = cc.RoleAdd("")
_, err = cc.RoleAdd(ctx, "")
if err == nil || !strings.Contains(err.Error(), rpctypes.ErrRoleEmpty.Error()) {
t.Fatalf("want (%v) error, but got (%v)", rpctypes.ErrRoleEmpty, err)
}
@ -77,21 +77,21 @@ func TestRootRole(t *testing.T) {
defer clus.Close()
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
_, err := cc.RoleAdd("root")
_, err := cc.RoleAdd(ctx, "root")
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}
resp, err := cc.RoleGet("root")
resp, err := cc.RoleGet(ctx, "root")
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}
t.Logf("get role resp %+v", resp)
// granting to root should be refused by server and a no-op
_, err = cc.RoleGrantPermission("root", "foo", "", clientv3.PermissionType(clientv3.PermReadWrite))
_, err = cc.RoleGrantPermission(ctx, "root", "foo", "", clientv3.PermissionType(clientv3.PermReadWrite))
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}
resp2, err := cc.RoleGet("root")
resp2, err := cc.RoleGet(ctx, "root")
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}
@ -107,27 +107,27 @@ func TestRoleGrantRevokePermission(t *testing.T) {
defer clus.Close()
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
_, err := cc.RoleAdd("role1")
_, err := cc.RoleAdd(ctx, "role1")
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}
_, err = cc.RoleGrantPermission("role1", "bar", "", clientv3.PermissionType(clientv3.PermRead))
_, err = cc.RoleGrantPermission(ctx, "role1", "bar", "", clientv3.PermissionType(clientv3.PermRead))
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}
_, err = cc.RoleGrantPermission("role1", "bar", "", clientv3.PermissionType(clientv3.PermWrite))
_, err = cc.RoleGrantPermission(ctx, "role1", "bar", "", clientv3.PermissionType(clientv3.PermWrite))
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}
_, err = cc.RoleGrantPermission("role1", "bar", "foo", clientv3.PermissionType(clientv3.PermReadWrite))
_, err = cc.RoleGrantPermission(ctx, "role1", "bar", "foo", clientv3.PermissionType(clientv3.PermReadWrite))
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}
_, err = cc.RoleRevokePermission("role1", "foo", "")
_, err = cc.RoleRevokePermission(ctx, "role1", "foo", "")
if err == nil || !strings.Contains(err.Error(), rpctypes.ErrPermissionNotGranted.Error()) {
t.Fatalf("want error (%v), but got (%v)", rpctypes.ErrPermissionNotGranted, err)
}
_, err = cc.RoleRevokePermission("role1", "bar", "foo")
_, err = cc.RoleRevokePermission(ctx, "role1", "bar", "foo")
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}
@ -142,11 +142,11 @@ func TestRoleDelete(t *testing.T) {
defer clus.Close()
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
_, err := cc.RoleAdd("role1")
_, err := cc.RoleAdd(ctx, "role1")
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}
_, err = cc.RoleDelete("role1")
_, err = cc.RoleDelete(ctx, "role1")
if err != nil {
t.Fatalf("want no error, but got (%v)", err)
}

View File

@ -35,7 +35,7 @@ func TestStatus(t *testing.T) {
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
rs, err := cc.Status()
rs, err := cc.Status(ctx)
if err != nil {
t.Fatalf("could not get status, err: %s", err)
}

View File

@ -62,14 +62,14 @@ func TestTxnSucc(t *testing.T) {
defer clus.Close()
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
if err := cc.Put("key1", "value1", config.PutOptions{}); err != nil {
if err := cc.Put(ctx, "key1", "value1", config.PutOptions{}); err != nil {
t.Fatalf("could not create key:%s, value:%s", "key1", "value1")
}
if err := cc.Put("key2", "value2", config.PutOptions{}); err != nil {
if err := cc.Put(ctx, "key2", "value2", config.PutOptions{}); err != nil {
t.Fatalf("could not create key:%s, value:%s", "key2", "value2")
}
for _, req := range reqs {
resp, err := cc.Txn(req.compare, req.ifSucess, req.ifFail, config.TxnOptions{
resp, err := cc.Txn(ctx, req.compare, req.ifSucess, req.ifFail, config.TxnOptions{
Interactive: true,
})
if err != nil {
@ -106,11 +106,11 @@ func TestTxnFail(t *testing.T) {
defer clus.Close()
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
if err := cc.Put("key1", "value1", config.PutOptions{}); err != nil {
if err := cc.Put(ctx, "key1", "value1", config.PutOptions{}); err != nil {
t.Fatalf("could not create key:%s, value:%s", "key1", "value1")
}
for _, req := range reqs {
resp, err := cc.Txn(req.compare, req.ifSucess, req.ifFail, config.TxnOptions{
resp, err := cc.Txn(ctx, req.compare, req.ifSucess, req.ifFail, config.TxnOptions{
Interactive: true,
})
if err != nil {

View File

@ -71,7 +71,7 @@ func TestUserAdd_Simple(t *testing.T) {
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
resp, err := cc.UserAdd(nc.username, nc.password, config.UserAddOptions{NoPassword: nc.noPassword})
resp, err := cc.UserAdd(ctx, nc.username, nc.password, config.UserAddOptions{NoPassword: nc.noPassword})
if nc.expectedError != "" {
if err != nil {
assert.Contains(t, err.Error(), nc.expectedError)
@ -108,12 +108,12 @@ func TestUserAdd_DuplicateUserNotAllowed(t *testing.T) {
user := "barb"
password := "rhubarb"
_, err := cc.UserAdd(user, password, config.UserAddOptions{})
_, err := cc.UserAdd(ctx, user, password, config.UserAddOptions{})
if err != nil {
t.Fatalf("first user creation should succeed, err: %v", err)
}
_, err = cc.UserAdd(user, password, config.UserAddOptions{})
_, err = cc.UserAdd(ctx, user, password, config.UserAddOptions{})
if err == nil {
t.Fatalf("duplicate user creation should fail")
}
@ -135,7 +135,7 @@ func TestUserList(t *testing.T) {
testutils.ExecuteUntil(ctx, t, func() {
// No Users Yet
resp, err := cc.UserList()
resp, err := cc.UserList(ctx)
if err != nil {
t.Fatalf("user listing should succeed, err: %v", err)
}
@ -146,13 +146,13 @@ func TestUserList(t *testing.T) {
user := "barb"
password := "rhubarb"
_, err = cc.UserAdd(user, password, config.UserAddOptions{})
_, err = cc.UserAdd(ctx, user, password, config.UserAddOptions{})
if err != nil {
t.Fatalf("user creation should succeed, err: %v", err)
}
// Users!
resp, err = cc.UserList()
resp, err = cc.UserList(ctx)
if err != nil {
t.Fatalf("user listing should succeed, err: %v", err)
}
@ -178,12 +178,12 @@ func TestUserDelete(t *testing.T) {
user := "barb"
password := "rhubarb"
_, err := cc.UserAdd(user, password, config.UserAddOptions{})
_, err := cc.UserAdd(ctx, user, password, config.UserAddOptions{})
if err != nil {
t.Fatalf("user creation should succeed, err: %v", err)
}
resp, err := cc.UserList()
resp, err := cc.UserList(ctx)
if err != nil {
t.Fatalf("user listing should succeed, err: %v", err)
}
@ -192,12 +192,12 @@ func TestUserDelete(t *testing.T) {
}
// Delete barb, sorry barb!
_, err = cc.UserDelete(user)
_, err = cc.UserDelete(ctx, user)
if err != nil {
t.Fatalf("user deletion should succeed at first, err: %v", err)
}
resp, err = cc.UserList()
resp, err = cc.UserList(ctx)
if err != nil {
t.Fatalf("user listing should succeed, err: %v", err)
}
@ -206,7 +206,7 @@ func TestUserDelete(t *testing.T) {
}
// Try to delete barb again
_, err = cc.UserDelete(user)
_, err = cc.UserDelete(ctx, user)
if err == nil {
t.Fatalf("deleting a non-existent user should fail")
}
@ -231,17 +231,17 @@ func TestUserChangePassword(t *testing.T) {
password := "rhubarb"
newPassword := "potato"
_, err := cc.UserAdd(user, password, config.UserAddOptions{})
_, err := cc.UserAdd(ctx, user, password, config.UserAddOptions{})
if err != nil {
t.Fatalf("user creation should succeed, err: %v", err)
}
err = cc.UserChangePass(user, newPassword)
err = cc.UserChangePass(ctx, user, newPassword)
if err != nil {
t.Fatalf("user password change should succeed, err: %v", err)
}
err = cc.UserChangePass("non-existent-user", newPassword)
err = cc.UserChangePass(ctx, "non-existent-user", newPassword)
if err == nil {
t.Fatalf("user password change for non-existent user should fail")
}

View File

@ -0,0 +1,82 @@
package common
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)
func TestWatch(t *testing.T) {
testRunner.BeforeTest(t)
watchTimeout := 1 * time.Second
for _, tc := range clusterTestCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
clus := testRunner.NewCluster(ctx, t, tc.config)
defer clus.Close()
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
tests := []struct {
puts []testutils.KV
watchKey string
opts config.WatchOptions
wanted []testutils.KV
}{
{ // watch by revision
puts: []testutils.KV{{Key: "bar", Val: "revision_1"}, {Key: "bar", Val: "revision_2"}, {Key: "bar", Val: "revision_3"}},
watchKey: "bar",
opts: config.WatchOptions{Revision: 3},
wanted: []testutils.KV{{Key: "bar", Val: "revision_2"}, {Key: "bar", Val: "revision_3"}},
},
{ // watch 1 key
puts: []testutils.KV{{Key: "sample", Val: "value"}},
watchKey: "sample",
opts: config.WatchOptions{Revision: 1},
wanted: []testutils.KV{{Key: "sample", Val: "value"}},
},
{ // watch 3 keys by prefix
puts: []testutils.KV{{Key: "foo1", Val: "val1"}, {Key: "foo2", Val: "val2"}, {Key: "foo3", Val: "val3"}},
watchKey: "foo",
opts: config.WatchOptions{Revision: 1, Prefix: true},
wanted: []testutils.KV{{Key: "foo1", Val: "val1"}, {Key: "foo2", Val: "val2"}, {Key: "foo3", Val: "val3"}},
},
{ // watch 3 keys by range
puts: []testutils.KV{{Key: "key1", Val: "val1"}, {Key: "key3", Val: "val3"}, {Key: "key2", Val: "val2"}},
watchKey: "key",
opts: config.WatchOptions{Revision: 1, RangeEnd: "key3"},
wanted: []testutils.KV{{Key: "key1", Val: "val1"}, {Key: "key2", Val: "val2"}},
},
}
for _, tt := range tests {
wCtx, wCancel := context.WithCancel(ctx)
wch := cc.Watch(wCtx, tt.watchKey, tt.opts)
if wch == nil {
t.Fatalf("failed to watch %s", tt.watchKey)
}
for j := range tt.puts {
if err := cc.Put(ctx, tt.puts[j].Key, tt.puts[j].Val, config.PutOptions{}); err != nil {
t.Fatalf("can't not put key %q, err: %s", tt.puts[j].Key, err)
}
}
kvs, err := testutils.KeyValuesFromWatchChan(wch, len(tt.wanted), watchTimeout)
if err != nil {
wCancel()
t.Fatalf("failed to get key-values from watch channel %s", err)
}
wCancel()
assert.Equal(t, tt.wanted, kvs)
}
})
})
}
}

View File

@ -61,7 +61,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
for i := 0; i < len(epc.Procs); i++ {
expectLog(t, epc.Procs[i], "The server is ready to downgrade")
e2e.AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
}
t.Log("Cluster is ready for downgrade")
@ -73,7 +73,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
startEtcd(t, epc.Procs[i], lastReleaseBinary)
}
t.Log("All members downgraded, validating downgrade")
expectLog(t, leader(t, epc), "the cluster has been downgraded")
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
}
@ -124,7 +124,7 @@ func startEtcd(t *testing.T, ep e2e.EtcdProcess, execPath string) {
func downgradeEnable(t *testing.T, epc *e2e.EtcdProcessCluster, ver semver.Version) {
c := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
err := c.DowngradeEnable(ver.String())
err := c.DowngradeEnable(context.TODO(), ver.String())
if err != nil {
t.Fatal(err)
}
@ -164,17 +164,6 @@ func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e
}
}
func expectLog(t *testing.T, ep e2e.EtcdProcess, expectLog string) {
t.Helper()
var err error
testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
_, err = ep.Logs().Expect(expectLog)
})
if err != nil {
t.Fatal(err)
}
}
func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

View File

@ -100,6 +100,8 @@ func corruptTest(cx ctlCtx) {
func TestPeriodicCheckDetectsCorruption(t *testing.T) {
checkTime := time.Second
e2e.BeforeTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
KeepDataDir: true,
@ -117,11 +119,11 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
cc := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i), config.PutOptions{})
err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i), config.PutOptions{})
assert.NoError(t, err, "error on put")
}
members, err := cc.MemberList()
members, err := cc.MemberList(ctx)
assert.NoError(t, err, "error on member list")
var memberID uint64
for _, m := range members.Members {
@ -137,7 +139,7 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
err = epc.Procs[0].Restart()
assert.NoError(t, err)
time.Sleep(checkTime * 11 / 10)
alarmResponse, err := cc.AlarmList()
alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms)
}
@ -145,6 +147,8 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
func TestCompactHashCheckDetectCorruption(t *testing.T) {
checkTime := time.Second
e2e.BeforeTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
KeepDataDir: true,
@ -163,10 +167,10 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
cc := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i), config.PutOptions{})
err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i), config.PutOptions{})
assert.NoError(t, err, "error on put")
}
members, err := cc.MemberList()
members, err := cc.MemberList(ctx)
assert.NoError(t, err, "error on member list")
var memberID uint64
for _, m := range members.Members {
@ -181,10 +185,10 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
err = epc.Procs[0].Restart()
assert.NoError(t, err)
_, err = cc.Compact(5, config.CompactOption{})
_, err = cc.Compact(ctx, 5, config.CompactOption{})
assert.NoError(t, err)
time.Sleep(checkTime * 11 / 10)
alarmResponse, err := cc.AlarmList()
alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms)
}

View File

@ -77,6 +77,8 @@ func TestCtlV3AuthJWTExpire(t *testing.T) {
}
func TestCtlV3AuthRevisionConsistency(t *testing.T) { testCtl(t, authTestRevisionConsistency) }
func TestCtlV3AuthTestCacheReload(t *testing.T) { testCtl(t, authTestCacheReload) }
func authEnableTest(cx ctlCtx) {
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
@ -1326,3 +1328,85 @@ func ctlV3User(cx ctlCtx, args []string, expStr string, stdIn []string) error {
_, err = proc.Expect(expStr)
return err
}
// authTestCacheReload tests the permissions when a member restarts
func authTestCacheReload(cx ctlCtx) {
authData := []struct {
user string
role string
pass string
}{
{
user: "root",
role: "root",
pass: "123",
},
{
user: "user0",
role: "role0",
pass: "123",
},
}
node0 := cx.epc.Procs[0]
endpoint := node0.EndpointsV3()[0]
// create a client
c, err := clientv3.New(clientv3.Config{Endpoints: []string{endpoint}, DialTimeout: 3 * time.Second})
if err != nil {
cx.t.Fatal(err)
}
defer c.Close()
for _, authObj := range authData {
// add role
if _, err = c.RoleAdd(context.TODO(), authObj.role); err != nil {
cx.t.Fatal(err)
}
// add user
if _, err = c.UserAdd(context.TODO(), authObj.user, authObj.pass); err != nil {
cx.t.Fatal(err)
}
// grant role to user
if _, err = c.UserGrantRole(context.TODO(), authObj.user, authObj.role); err != nil {
cx.t.Fatal(err)
}
}
// role grant permission to role0
if _, err = c.RoleGrantPermission(context.TODO(), authData[1].role, "foo", "", clientv3.PermissionType(clientv3.PermReadWrite)); err != nil {
cx.t.Fatal(err)
}
// enable auth
if _, err = c.AuthEnable(context.TODO()); err != nil {
cx.t.Fatal(err)
}
// create another client with ID:Password
c2, err := clientv3.New(clientv3.Config{Endpoints: []string{endpoint}, Username: authData[1].user, Password: authData[1].pass, DialTimeout: 3 * time.Second})
if err != nil {
cx.t.Fatal(err)
}
defer c2.Close()
// create foo since that is within the permission set
// expectation is to succeed
if _, err = c2.Put(context.TODO(), "foo", "bar"); err != nil {
cx.t.Fatal(err)
}
// restart the node
node0.WithStopSignal(syscall.SIGINT)
if err := node0.Restart(); err != nil {
cx.t.Fatal(err)
}
// nothing has changed, but it fails without refreshing cache after restart
if _, err = c2.Put(context.TODO(), "foo", "bar2"); err != nil {
cx.t.Fatal(err)
}
}

View File

@ -15,6 +15,7 @@
package e2e
import (
"context"
"os"
"strings"
"testing"
@ -106,7 +107,7 @@ func ctlV3Elect(cx ctlCtx, name, proposal string) (*expect.ExpectProcess, <-chan
return proc, outc, err
}
go func() {
s, xerr := proc.ExpectFunc(func(string) bool { return true })
s, xerr := proc.ExpectFunc(context.TODO(), func(string) bool { return true })
if xerr != nil {
cx.t.Errorf("expect failed (%v)", xerr)
}

View File

@ -18,6 +18,7 @@
package e2e
import (
"context"
"fmt"
"strings"
"testing"
@ -83,6 +84,8 @@ func TestAuthority(t *testing.T) {
for _, clusterSize := range []int{1, 3} {
t.Run(fmt.Sprintf("Size: %d, Scenario: %q", clusterSize, tc.name), func(t *testing.T) {
e2e.BeforeTest(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg := e2e.NewConfigNoTLS()
cfg.ClusterSize = clusterSize
@ -101,7 +104,7 @@ func TestAuthority(t *testing.T) {
endpoints := templateEndpoints(t, tc.clientURLPattern, epc)
client := e2e.NewEtcdctl(cfg, endpoints)
err = client.Put("foo", "bar", config.PutOptions{})
err = client.Put(ctx, "foo", "bar", config.PutOptions{})
if err != nil {
t.Fatal(err)
}
@ -148,7 +151,7 @@ func firstMatch(t *testing.T, expectLine string, logs ...e2e.LogsExpect) string
match := make(chan string, len(logs))
for i := range logs {
go func(l e2e.LogsExpect) {
line, _ := l.Expect(expectLine)
line, _ := l.ExpectWithContext(context.TODO(), expectLine)
match <- line
}(logs[i])
}

View File

@ -15,6 +15,7 @@
package e2e
import (
"context"
"fmt"
"os"
"strings"
@ -127,7 +128,7 @@ func ctlV3Lock(cx ctlCtx, name string) (*expect.ExpectProcess, <-chan string, er
return proc, outc, err
}
go func() {
s, xerr := proc.ExpectFunc(func(string) bool { return true })
s, xerr := proc.ExpectFunc(context.TODO(), func(string) bool { return true })
if xerr != nil {
cx.t.Errorf("expect failed (%v)", xerr)
}

View File

@ -121,12 +121,11 @@ func dialWithSchemeTest(cx ctlCtx) {
}
type ctlCtx struct {
t *testing.T
apiPrefix string
cfg e2e.EtcdProcessClusterConfig
quotaBackendBytes int64
corruptFunc func(string) error
noStrictReconfig bool
t *testing.T
apiPrefix string
cfg e2e.EtcdProcessClusterConfig
corruptFunc func(string) error
noStrictReconfig bool
epc *e2e.EtcdProcessCluster
@ -143,9 +142,6 @@ type ctlCtx struct {
initialCorruptCheck bool
// for compaction
compactPhysical bool
// dir that was used during the test
dataDir string
}
@ -156,6 +152,7 @@ func (cx *ctlCtx) applyOpts(opts []ctlOption) {
for _, opt := range opts {
opt(cx)
}
cx.initialCorruptCheck = true
}
@ -179,10 +176,6 @@ func withInteractive() ctlOption {
return func(cx *ctlCtx) { cx.interactive = true }
}
func withQuota(b int64) ctlOption {
return func(cx *ctlCtx) { cx.quotaBackendBytes = b }
}
func withInitialCorruptCheck() ctlOption {
return func(cx *ctlCtx) { cx.initialCorruptCheck = true }
}
@ -232,9 +225,6 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
if !ret.quorum {
ret.cfg = *e2e.ConfigStandalone(ret.cfg)
}
if ret.quotaBackendBytes > 0 {
ret.cfg.QuotaBackendBytes = ret.quotaBackendBytes
}
ret.cfg.NoStrictReconfig = ret.noStrictReconfig
if ret.initialCorruptCheck {
ret.cfg.InitialCorruptCheck = ret.initialCorruptCheck

View File

@ -52,43 +52,21 @@ func watchTest(cx ctlCtx) {
wkv []kvExec
}{
{ // watch 1 key
puts: []kv{{"sample", "value"}},
args: []string{"sample", "--rev", "1"},
wkv: []kvExec{{key: "sample", val: "value"}},
},
{ // watch 1 key with env
puts: []kv{{"sample", "value"}},
envKey: "sample",
args: []string{"--rev", "1"},
wkv: []kvExec{{key: "sample", val: "value"}},
},
// coverage tests get extra arguments:
// ./bin/etcdctl_test -test.coverprofile=e2e.1525392462795198897.coverprofile -test.outputdir=../..
// do not test watch exec commands
{ // watch 3 keys by prefix
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
args: []string{"key", "--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch 3 keys by prefix, with env
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
envKey: "key",
args: []string{"--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch by revision
puts: []kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}},
args: []string{"etcd", "--rev", "2"},
wkv: []kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}},
},
{ // watch 3 keys by range
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
args: []string{"key", "key3", "--rev", "1"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
},
{ // watch 3 keys by range, with env
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
envKey: "key",

View File

@ -52,11 +52,6 @@ func watchTest(cx ctlCtx) {
wkv []kvExec
}{
{ // watch 1 key
puts: []kv{{"sample", "value"}},
args: []string{"sample", "--rev", "1"},
wkv: []kvExec{{key: "sample", val: "value"}},
},
{ // watch 1 key with env
puts: []kv{{"sample", "value"}},
envKey: "sample",
@ -101,27 +96,12 @@ func watchTest(cx ctlCtx) {
args: []string{"sample", "--rev", "1", "samplx", "--", "echo", "watch event received"},
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
},
{ // watch 3 keys by prefix
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
args: []string{"key", "--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch 3 keys by prefix, with env
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
envKey: "key",
args: []string{"--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch by revision
puts: []kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}},
args: []string{"etcd", "--rev", "2"},
wkv: []kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}},
},
{ // watch 3 keys by range
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
args: []string{"key", "key3", "--rev", "1"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
},
{ // watch 3 keys by range, with env
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
envKey: "key",

View File

@ -0,0 +1,171 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)
func TestGrpcProxyAutoSync(t *testing.T) {
e2e.SkipInShortMode(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var (
node1Name = "node1"
node1ClientURL = "http://localhost:12379"
node1PeerURL = "http://localhost:12380"
node2Name = "node2"
node2ClientURL = "http://localhost:22379"
node2PeerURL = "http://localhost:22380"
proxyClientURL = "127.0.0.1:32379"
autoSyncInterval = 1 * time.Second
)
// Run cluster of one node
proc1, err := runEtcdNode(
node1Name, t.TempDir(),
node1ClientURL, node1PeerURL,
"new", fmt.Sprintf("%s=%s", node1Name, node1PeerURL),
)
require.NoError(t, err)
// Run grpc-proxy instance
proxyProc, err := e2e.SpawnCmd([]string{e2e.BinDir + "/etcd", "grpc-proxy", "start",
"--advertise-client-url", proxyClientURL, "--listen-addr", proxyClientURL,
"--endpoints", node1ClientURL,
"--endpoints-auto-sync-interval", autoSyncInterval.String(),
}, nil)
require.NoError(t, err)
proxyCtl := e2e.NewEtcdctl(&e2e.EtcdProcessClusterConfig{}, []string{proxyClientURL})
err = proxyCtl.Put(ctx, "k1", "v1", config.PutOptions{})
require.NoError(t, err)
memberCtl := e2e.NewEtcdctl(&e2e.EtcdProcessClusterConfig{}, []string{node1ClientURL})
_, err = memberCtl.MemberAdd(ctx, node2Name, []string{node2PeerURL})
if err != nil {
t.Fatal(err)
}
// Run new member
proc2, err := runEtcdNode(
node2Name, t.TempDir(),
node2ClientURL, node2PeerURL,
"existing", fmt.Sprintf("%s=%s,%s=%s", node1Name, node1PeerURL, node2Name, node2PeerURL),
)
require.NoError(t, err)
// Wait for auto sync of endpoints
err = waitForEndpointInLog(proxyProc, node2ClientURL)
require.NoError(t, err)
memberList, err := memberCtl.MemberList(ctx)
require.NoError(t, err)
node1MemberID, err := findMemberIDByEndpoint(memberList.Members, node1ClientURL)
require.NoError(t, err)
// Second node could be not ready yet
for i := 0; i < 10; i++ {
_, err = memberCtl.MemberRemove(ctx, node1MemberID)
if err != nil && strings.Contains(err.Error(), rpctypes.ErrGRPCUnhealthy.Error()) {
time.Sleep(500 * time.Millisecond)
continue
}
break
}
// Remove node1 from member list and stop this nod
require.NoError(t, err)
require.NoError(t, proc1.Stop())
var resp *clientv3.GetResponse
for i := 0; i < 10; i++ {
resp, err = proxyCtl.Get(ctx, "k1", config.GetOptions{})
if err != nil && strings.Contains(err.Error(), rpctypes.ErrGRPCLeaderChanged.Error()) {
time.Sleep(500 * time.Millisecond)
continue
}
}
require.NoError(t, err)
kvs := testutils.KeyValuesFromGetResponse(resp)
assert.Equal(t, []testutils.KV{{Key: "k1", Val: "v1"}}, kvs)
require.NoError(t, proc2.Stop())
require.NoError(t, proxyProc.Stop())
}
func runEtcdNode(name, dataDir, clientURL, peerURL, clusterState, initialCluster string) (*expect.ExpectProcess, error) {
proc, err := e2e.SpawnCmd([]string{e2e.BinDir + "/etcd",
"--name", name,
"--data-dir", dataDir,
"--listen-client-urls", clientURL, "--advertise-client-urls", clientURL,
"--listen-peer-urls", peerURL, "--initial-advertise-peer-urls", peerURL,
"--initial-cluster-token", "etcd-cluster",
"--initial-cluster-state", clusterState,
"--initial-cluster", initialCluster,
}, nil)
if err != nil {
return nil, err
}
_, err = proc.Expect("ready to serve client requests")
return proc, err
}
func findMemberIDByEndpoint(members []*etcdserverpb.Member, endpoint string) (uint64, error) {
for _, m := range members {
if m.ClientURLs[0] == endpoint {
return m.ID, nil
}
}
return 0, fmt.Errorf("member not found")
}
func waitForEndpointInLog(proxyProc *expect.ExpectProcess, endpoint string) error {
endpoint = strings.Replace(endpoint, "http://", "", 1)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := proxyProc.ExpectFunc(ctx, func(s string) bool {
if strings.Contains(s, endpoint) && strings.Contains(s, "Resolver state updated") {
return true
}
return false
})
return err
}

View File

@ -0,0 +1,44 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"testing"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
func TestInitDaemonNotifyWithoutQuorum(t *testing.T) {
// Initialize a cluster with 3 members
epc, err := e2e.InitEtcdProcessCluster(t, e2e.NewConfigAutoTLS())
if err != nil {
t.Fatalf("Failed to initilize the etcd cluster: %v", err)
}
// Remove two members, so that only one etcd will get started
epc.Procs = epc.Procs[:1]
// Start the etcd cluster with only one member
if err := epc.Start(); err != nil {
t.Fatalf("Failed to start the etcd cluster: %v", err)
}
// Expect log message indicating time out waiting for quorum hit
e2e.AssertProcessLogs(t, epc.Procs[0], "startEtcd: timed out waiting for the ready notification")
// Expect log message indicating systemd notify message has been sent
e2e.AssertProcessLogs(t, epc.Procs[0], "notifying init daemon")
epc.Close()
}

View File

@ -16,6 +16,7 @@ package e2e
import (
"bytes"
"context"
"fmt"
"sort"
"strings"
@ -97,6 +98,8 @@ func TestV2DeprecationSnapshotMatches(t *testing.T) {
e2e.BeforeTest(t)
lastReleaseData := t.TempDir()
currentReleaseData := t.TempDir()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
lastReleaseBinary := e2e.BinDir + "/etcd-last-release"
currentReleaseBinary := e2e.BinDir + "/etcd"
@ -106,10 +109,10 @@ func TestV2DeprecationSnapshotMatches(t *testing.T) {
}
snapshotCount := 10
epc := runEtcdAndCreateSnapshot(t, lastReleaseBinary, lastReleaseData, snapshotCount)
members1 := addAndRemoveKeysAndMembers(t, e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3()), snapshotCount)
members1 := addAndRemoveKeysAndMembers(ctx, t, e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3()), snapshotCount)
assert.NoError(t, epc.Close())
epc = runEtcdAndCreateSnapshot(t, currentReleaseBinary, currentReleaseData, snapshotCount)
members2 := addAndRemoveKeysAndMembers(t, e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3()), snapshotCount)
members2 := addAndRemoveKeysAndMembers(ctx, t, e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3()), snapshotCount)
assert.NoError(t, epc.Close())
assertSnapshotsMatch(t, lastReleaseData, currentReleaseData, func(data []byte) []byte {
@ -130,6 +133,8 @@ func TestV2DeprecationSnapshotMatches(t *testing.T) {
func TestV2DeprecationSnapshotRecover(t *testing.T) {
e2e.BeforeTest(t)
dataDir := t.TempDir()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
lastReleaseBinary := e2e.BinDir + "/etcd-last-release"
currentReleaseBinary := e2e.BinDir + "/etcd"
@ -141,10 +146,10 @@ func TestV2DeprecationSnapshotRecover(t *testing.T) {
cc := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
lastReleaseGetResponse, err := cc.Get("", config.GetOptions{Prefix: true})
lastReleaseGetResponse, err := cc.Get(ctx, "", config.GetOptions{Prefix: true})
assert.NoError(t, err)
lastReleaseMemberListResponse, err := cc.MemberList()
lastReleaseMemberListResponse, err := cc.MemberList(ctx)
assert.NoError(t, err)
assert.NoError(t, epc.Close())
@ -153,10 +158,10 @@ func TestV2DeprecationSnapshotRecover(t *testing.T) {
assert.NoError(t, err)
cc = e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
currentReleaseGetResponse, err := cc.Get("", config.GetOptions{Prefix: true})
currentReleaseGetResponse, err := cc.Get(ctx, "", config.GetOptions{Prefix: true})
assert.NoError(t, err)
currentReleaseMemberListResponse, err := cc.MemberList()
currentReleaseMemberListResponse, err := cc.MemberList(ctx)
assert.NoError(t, err)
assert.Equal(t, lastReleaseGetResponse.Kvs, currentReleaseGetResponse.Kvs)
@ -171,33 +176,33 @@ func runEtcdAndCreateSnapshot(t testing.TB, binary, dataDir string, snapshotCoun
return epc
}
func addAndRemoveKeysAndMembers(t testing.TB, cc *e2e.EtcdctlV3, snapshotCount int) (members []uint64) {
func addAndRemoveKeysAndMembers(ctx context.Context, t testing.TB, cc *e2e.EtcdctlV3, snapshotCount int) (members []uint64) {
// Execute some non-trivial key&member operation
for i := 0; i < snapshotCount*3; i++ {
err := cc.Put(fmt.Sprintf("%d", i), "1", config.PutOptions{})
err := cc.Put(ctx, fmt.Sprintf("%d", i), "1", config.PutOptions{})
assert.NoError(t, err)
}
member1, err := cc.MemberAddAsLearner("member1", []string{"http://127.0.0.1:2000"})
member1, err := cc.MemberAddAsLearner(ctx, "member1", []string{"http://127.0.0.1:2000"})
assert.NoError(t, err)
members = append(members, member1.Member.ID)
for i := 0; i < snapshotCount*2; i++ {
_, err = cc.Delete(fmt.Sprintf("%d", i), config.DeleteOptions{})
_, err = cc.Delete(ctx, fmt.Sprintf("%d", i), config.DeleteOptions{})
assert.NoError(t, err)
}
_, err = cc.MemberRemove(member1.Member.ID)
_, err = cc.MemberRemove(ctx, member1.Member.ID)
assert.NoError(t, err)
for i := 0; i < snapshotCount; i++ {
err = cc.Put(fmt.Sprintf("%d", i), "2", config.PutOptions{})
err = cc.Put(ctx, fmt.Sprintf("%d", i), "2", config.PutOptions{})
assert.NoError(t, err)
}
member2, err := cc.MemberAddAsLearner("member2", []string{"http://127.0.0.1:2001"})
member2, err := cc.MemberAddAsLearner(ctx, "member2", []string{"http://127.0.0.1:2001"})
assert.NoError(t, err)
members = append(members, member2.Member.ID)
for i := 0; i < snapshotCount/2; i++ {
err = cc.Put(fmt.Sprintf("%d", i), "3", config.PutOptions{})
err = cc.Put(ctx, fmt.Sprintf("%d", i), "3", config.PutOptions{})
assert.NoError(t, err)
}
return members

View File

@ -15,6 +15,7 @@
package e2e
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
@ -249,7 +250,7 @@ func testV3CurlAuth(cx ctlCtx) {
testutil.AssertNil(cx.t, err)
defer proc.Close()
cURLRes, err := proc.ExpectFunc(lineFunc)
cURLRes, err := proc.ExpectFunc(context.Background(), lineFunc)
testutil.AssertNil(cx.t, err)
authRes := make(map[string]interface{})
@ -287,7 +288,7 @@ func testV3CurlCampaign(cx ctlCtx) {
Endpoint: path.Join(cx.apiPrefix, "/election/campaign"),
Value: string(cdata),
})
lines, err := e2e.SpawnWithExpectLines(cargs, cx.envMap, `"leader":{"name":"`)
lines, err := e2e.SpawnWithExpectLines(context.TODO(), cargs, cx.envMap, `"leader":{"name":"`)
if err != nil {
cx.t.Fatalf("failed post campaign request (%s) (%v)", cx.apiPrefix, err)
}

View File

@ -63,3 +63,9 @@ type LeaseOption struct {
type UserAddOptions struct {
NoPassword bool
}
type WatchOptions struct {
Prefix bool
Revision int64
RangeEnd string
}

View File

@ -114,7 +114,7 @@ func (c *e2eCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, mem
t.Fatal("WaitMembersForLeader timeout")
default:
}
_, err := cc.Get("0", config.GetOptions{Timeout: 10*config.TickDuration + time.Second})
_, err := cc.Get(ctx, "0", config.GetOptions{Timeout: 10*config.TickDuration + time.Second})
if err == nil || strings.Contains(err.Error(), "Key not found") {
break
}
@ -129,7 +129,7 @@ func (c *e2eCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, mem
default:
}
for i := range membs {
resp, err := membs[i].Client().Status()
resp, err := membs[i].Client().Status(ctx)
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
// if member[i] has stopped

View File

@ -19,13 +19,15 @@ import (
"net/url"
"os"
"path"
"regexp"
"strings"
"testing"
"time"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.etcd.io/etcd/server/v3/etcdserver"
)
const EtcdProcessBasePort = 20000
@ -38,6 +40,9 @@ const (
ClientTLSAndNonTLS
)
// allow alphanumerics, underscores and dashes
var testNameCleanRegex = regexp.MustCompile(`[^a-zA-Z0-9 \-_]+`)
func NewConfigNoTLS() *EtcdProcessClusterConfig {
return &EtcdProcessClusterConfig{ClusterSize: 3,
InitialToken: "new",
@ -284,7 +289,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
}
purl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
name := fmt.Sprintf("test-%d", i)
name := fmt.Sprintf("%s-test-%d", testNameCleanRegex.ReplaceAllString(tb.Name(), ""), i)
dataDirPath := cfg.DataDirPath
if cfg.DataDirPath == "" {
dataDirPath = tb.TempDir()

View File

@ -26,8 +26,9 @@ import (
"strconv"
"strings"
"go.etcd.io/etcd/pkg/v3/expect"
"go.uber.org/zap"
"go.etcd.io/etcd/pkg/v3/expect"
)
type proxyEtcdProcess struct {
@ -109,6 +110,7 @@ func (p *proxyEtcdProcess) Logs() LogsExpect {
type proxyProc struct {
lg *zap.Logger
name string
execPath string
args []string
ep string
@ -124,7 +126,7 @@ func (pp *proxyProc) start() error {
if pp.proc != nil {
panic("already started")
}
proc, err := SpawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), nil)
proc, err := SpawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), nil, pp.name)
if err != nil {
return err
}
@ -188,6 +190,7 @@ func newProxyV2Proc(cfg *EtcdServerProcessConfig) *proxyV2Proc {
}
return &proxyV2Proc{
proxyProc: proxyProc{
name: cfg.Name,
lg: cfg.lg,
execPath: cfg.ExecPath,
args: append(args, cfg.TlsArgs...),
@ -251,6 +254,7 @@ func newProxyV3Proc(cfg *EtcdServerProcessConfig) *proxyV3Proc {
return &proxyV3Proc{
proxyProc{
name: cfg.Name,
lg: cfg.lg,
execPath: cfg.ExecPath,
args: append(args, tlsArgs...),

View File

@ -15,13 +15,17 @@
package e2e
import (
"context"
"fmt"
"net/url"
"os"
"testing"
"time"
"go.uber.org/zap"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/expect"
"go.uber.org/zap"
)
var (
@ -47,7 +51,7 @@ type EtcdProcess interface {
}
type LogsExpect interface {
Expect(string) (string, error)
ExpectWithContext(context.Context, string) (string, error)
Lines() []string
LineCount() int
}
@ -100,33 +104,33 @@ func (ep *EtcdServerProcess) Start() error {
panic("already started")
}
ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name))
proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars)
proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name)
if err != nil {
return err
}
ep.proc = proc
err = ep.waitReady()
if err == nil {
ep.cfg.lg.Info("started server.", zap.String("name", ep.cfg.Name))
ep.cfg.lg.Info("started server.", zap.String("name", ep.cfg.Name), zap.Int("pid", ep.proc.Pid()))
}
return err
}
func (ep *EtcdServerProcess) Restart() error {
ep.cfg.lg.Info("restaring server...", zap.String("name", ep.cfg.Name))
ep.cfg.lg.Info("restarting server...", zap.String("name", ep.cfg.Name))
if err := ep.Stop(); err != nil {
return err
}
ep.donec = make(chan struct{})
err := ep.Start()
if err == nil {
ep.cfg.lg.Info("restared server", zap.String("name", ep.cfg.Name))
ep.cfg.lg.Info("restarted server", zap.String("name", ep.cfg.Name))
}
return err
}
func (ep *EtcdServerProcess) Stop() (err error) {
ep.cfg.lg.Info("stoping server...", zap.String("name", ep.cfg.Name))
ep.cfg.lg.Info("stopping server...", zap.String("name", ep.cfg.Name))
if ep == nil || ep.proc == nil {
return nil
}
@ -174,7 +178,18 @@ func (ep *EtcdServerProcess) Config() *EtcdServerProcessConfig { return ep.cfg }
func (ep *EtcdServerProcess) Logs() LogsExpect {
if ep.proc == nil {
ep.cfg.lg.Panic("Please grap logs before process is stopped")
ep.cfg.lg.Panic("Please grab logs before process is stopped")
}
return ep.proc
}
func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) {
t.Helper()
var err error
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = ep.Logs().ExpectWithContext(ctx, expectLog)
if err != nil {
t.Fatal(err)
}
}

View File

@ -15,10 +15,17 @@
package e2e
import (
"go.etcd.io/etcd/pkg/v3/expect"
"strings"
"go.uber.org/zap"
"go.etcd.io/etcd/pkg/v3/expect"
)
func SpawnCmd(args []string, envVars map[string]string) (*expect.ExpectProcess, error) {
return SpawnCmdWithLogger(zap.NewNop(), args, envVars)
return SpawnNamedCmd(strings.Join(args, "_"), args, envVars)
}
func SpawnNamedCmd(processName string, args []string, envVars map[string]string) (*expect.ExpectProcess, error) {
return SpawnCmdWithLogger(zap.NewNop(), args, envVars, processName)
}

View File

@ -36,7 +36,7 @@ var (
coverDir = integration.MustAbsPath(os.Getenv("COVERDIR"))
)
func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string) (*expect.ExpectProcess, error) {
func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string, name string) (*expect.ExpectProcess, error) {
cmd := args[0]
env := mergeEnvVariables(envVars)
switch {
@ -63,9 +63,13 @@ func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string
// when withFlagByEnv() is used in testCtl(), env variables for ctl is set to os.env.
// they must be included in ctl_cov_env.
all_args := append(args[1:], covArgs...)
lg.Info("spawning process", zap.Strings("args", all_args), zap.String("working-dir", wd))
ep, err := expect.NewExpectWithEnv(cmd, all_args, env)
allArgs := append(args[1:], covArgs...)
lg.Info("spawning process in cov test",
zap.Strings("args", args),
zap.String("working-dir", wd),
zap.String("name", name),
zap.Strings("environment-variables", env))
ep, err := expect.NewExpectWithEnv(cmd, allArgs, env, name)
if err != nil {
return nil, err
}

View File

@ -21,13 +21,14 @@ import (
"os"
"strings"
"go.etcd.io/etcd/pkg/v3/expect"
"go.uber.org/zap"
"go.etcd.io/etcd/pkg/v3/expect"
)
const noOutputLineCount = 0 // regular binaries emit no extra lines
func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string) (*expect.ExpectProcess, error) {
func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string, name string) (*expect.ExpectProcess, error) {
wd, err := os.Getwd()
if err != nil {
return nil, err
@ -35,9 +36,17 @@ func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string
env := mergeEnvVariables(envVars)
if strings.HasSuffix(args[0], "/etcdctl3") {
env = append(env, "ETCDCTL_API=3")
lg.Info("spawning process with ETCDCTL_API=3", zap.Strings("args", args), zap.String("working-dir", wd), zap.Strings("environment-variables", env))
return expect.NewExpectWithEnv(CtlBinPath, args[1:], env)
lg.Info("spawning process with ETCDCTL_API=3",
zap.Strings("args", args),
zap.String("working-dir", wd),
zap.String("name", name),
zap.Strings("environment-variables", env))
return expect.NewExpectWithEnv(CtlBinPath, args[1:], env, name)
}
lg.Info("spawning process", zap.Strings("args", args), zap.String("working-dir", wd), zap.Strings("environment-variables", env))
return expect.NewExpectWithEnv(args[0], args[1:], env)
lg.Info("spawning process",
zap.Strings("args", args),
zap.String("working-dir", wd),
zap.String("name", name),
zap.Strings("environment-variables", env))
return expect.NewExpectWithEnv(args[0], args[1:], env, name)
}

View File

@ -15,6 +15,7 @@
package e2e
import (
"context"
"encoding/json"
"fmt"
"io"
@ -39,11 +40,12 @@ func NewEtcdctl(cfg *EtcdProcessClusterConfig, endpoints []string) *EtcdctlV3 {
}
}
func (ctl *EtcdctlV3) DowngradeEnable(version string) error {
return SpawnWithExpect(ctl.cmdArgs("downgrade", "enable", version), "Downgrade enable success")
func (ctl *EtcdctlV3) DowngradeEnable(ctx context.Context, version string) error {
_, err := SpawnWithExpectLines(ctx, ctl.cmdArgs("downgrade", "enable", version), nil, "Downgrade enable success")
return err
}
func (ctl *EtcdctlV3) Get(key string, o config.GetOptions) (*clientv3.GetResponse, error) {
func (ctl *EtcdctlV3) Get(ctx context.Context, key string, o config.GetOptions) (*clientv3.GetResponse, error) {
resp := clientv3.GetResponse{}
var args []string
if o.Timeout != 0 {
@ -102,23 +104,25 @@ func (ctl *EtcdctlV3) Get(key string, o config.GetOptions) (*clientv3.GetRespons
if err != nil {
return nil, err
}
_, err = cmd.Expect("Count")
defer cmd.Close()
_, err = cmd.ExpectWithContext(ctx, "Count")
return &resp, err
}
err := ctl.spawnJsonCmd(&resp, args...)
err := ctl.spawnJsonCmd(ctx, &resp, args...)
return &resp, err
}
func (ctl *EtcdctlV3) Put(key, value string, opts config.PutOptions) error {
func (ctl *EtcdctlV3) Put(ctx context.Context, key, value string, opts config.PutOptions) error {
args := ctl.cmdArgs()
args = append(args, "put", key, value)
if opts.LeaseID != 0 {
args = append(args, "--lease", strconv.FormatInt(int64(opts.LeaseID), 16))
}
return SpawnWithExpect(args, "OK")
_, err := SpawnWithExpectLines(ctx, args, nil, "OK")
return err
}
func (ctl *EtcdctlV3) Delete(key string, o config.DeleteOptions) (*clientv3.DeleteResponse, error) {
func (ctl *EtcdctlV3) Delete(ctx context.Context, key string, o config.DeleteOptions) (*clientv3.DeleteResponse, error) {
args := []string{"del", key}
if o.End != "" {
args = append(args, o.End)
@ -130,11 +134,11 @@ func (ctl *EtcdctlV3) Delete(key string, o config.DeleteOptions) (*clientv3.Dele
args = append(args, "--from-key")
}
var resp clientv3.DeleteResponse
err := ctl.spawnJsonCmd(&resp, args...)
err := ctl.spawnJsonCmd(ctx, &resp, args...)
return &resp, err
}
func (ctl *EtcdctlV3) Txn(compares, ifSucess, ifFail []string, o config.TxnOptions) (*clientv3.TxnResponse, error) {
func (ctl *EtcdctlV3) Txn(ctx context.Context, compares, ifSucess, ifFail []string, o config.TxnOptions) (*clientv3.TxnResponse, error) {
args := ctl.cmdArgs()
args = append(args, "txn")
if o.Interactive {
@ -145,7 +149,8 @@ func (ctl *EtcdctlV3) Txn(compares, ifSucess, ifFail []string, o config.TxnOptio
if err != nil {
return nil, err
}
_, err = cmd.Expect("compares:")
defer cmd.Close()
_, err = cmd.ExpectWithContext(ctx, "compares:")
if err != nil {
return nil, err
}
@ -157,7 +162,7 @@ func (ctl *EtcdctlV3) Txn(compares, ifSucess, ifFail []string, o config.TxnOptio
if err := cmd.Send("\r"); err != nil {
return nil, err
}
_, err = cmd.Expect("success requests (get, put, del):")
_, err = cmd.ExpectWithContext(ctx, "success requests (get, put, del):")
if err != nil {
return nil, err
}
@ -170,7 +175,7 @@ func (ctl *EtcdctlV3) Txn(compares, ifSucess, ifFail []string, o config.TxnOptio
return nil, err
}
_, err = cmd.Expect("failure requests (get, put, del):")
_, err = cmd.ExpectWithContext(ctx, "failure requests (get, put, del):")
if err != nil {
return nil, err
}
@ -183,7 +188,7 @@ func (ctl *EtcdctlV3) Txn(compares, ifSucess, ifFail []string, o config.TxnOptio
return nil, err
}
var line string
line, err = cmd.Expect("header")
line, err = cmd.ExpectWithContext(ctx, "header")
if err != nil {
return nil, err
}
@ -229,21 +234,27 @@ func AddTxnResponse(resp *clientv3.TxnResponse, jsonData string) {
}
}
}
func (ctl *EtcdctlV3) MemberList() (*clientv3.MemberListResponse, error) {
func (ctl *EtcdctlV3) MemberList(ctx context.Context) (*clientv3.MemberListResponse, error) {
var resp clientv3.MemberListResponse
err := ctl.spawnJsonCmd(&resp, "member", "list")
err := ctl.spawnJsonCmd(ctx, &resp, "member", "list")
return &resp, err
}
func (ctl *EtcdctlV3) MemberAddAsLearner(name string, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
func (ctl *EtcdctlV3) MemberAdd(ctx context.Context, name string, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
var resp clientv3.MemberAddResponse
err := ctl.spawnJsonCmd(&resp, "member", "add", name, "--learner", "--peer-urls", strings.Join(peerAddrs, ","))
err := ctl.spawnJsonCmd(ctx, &resp, "member", "add", name, "--peer-urls", strings.Join(peerAddrs, ","))
return &resp, err
}
func (ctl *EtcdctlV3) MemberRemove(id uint64) (*clientv3.MemberRemoveResponse, error) {
func (ctl *EtcdctlV3) MemberAddAsLearner(ctx context.Context, name string, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
var resp clientv3.MemberAddResponse
err := ctl.spawnJsonCmd(ctx, &resp, "member", "add", name, "--learner", "--peer-urls", strings.Join(peerAddrs, ","))
return &resp, err
}
func (ctl *EtcdctlV3) MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) {
var resp clientv3.MemberRemoveResponse
err := ctl.spawnJsonCmd(&resp, "member", "remove", fmt.Sprintf("%x", id))
err := ctl.spawnJsonCmd(ctx, &resp, "member", "remove", fmt.Sprintf("%x", id))
return &resp, err
}
@ -275,7 +286,7 @@ func (ctl *EtcdctlV3) flags() map[string]string {
return fmap
}
func (ctl *EtcdctlV3) Compact(rev int64, o config.CompactOption) (*clientv3.CompactResponse, error) {
func (ctl *EtcdctlV3) Compact(ctx context.Context, rev int64, o config.CompactOption) (*clientv3.CompactResponse, error) {
args := ctl.cmdArgs("compact", fmt.Sprint(rev))
if o.Timeout != 0 {
args = append(args, fmt.Sprintf("--command-timeout=%s", o.Timeout))
@ -284,15 +295,16 @@ func (ctl *EtcdctlV3) Compact(rev int64, o config.CompactOption) (*clientv3.Comp
args = append(args, "--physical")
}
return nil, SpawnWithExpect(args, fmt.Sprintf("compacted revision %v", rev))
_, err := SpawnWithExpectLines(ctx, args, nil, fmt.Sprintf("compacted revision %v", rev))
return nil, err
}
func (ctl *EtcdctlV3) Status() ([]*clientv3.StatusResponse, error) {
func (ctl *EtcdctlV3) Status(ctx context.Context) ([]*clientv3.StatusResponse, error) {
var epStatus []*struct {
Endpoint string
Status *clientv3.StatusResponse
}
err := ctl.spawnJsonCmd(&epStatus, "endpoint", "status")
err := ctl.spawnJsonCmd(ctx, &epStatus, "endpoint", "status")
if err != nil {
return nil, err
}
@ -303,12 +315,12 @@ func (ctl *EtcdctlV3) Status() ([]*clientv3.StatusResponse, error) {
return resp, err
}
func (ctl *EtcdctlV3) HashKV(rev int64) ([]*clientv3.HashKVResponse, error) {
func (ctl *EtcdctlV3) HashKV(ctx context.Context, rev int64) ([]*clientv3.HashKVResponse, error) {
var epHashKVs []*struct {
Endpoint string
HashKV *clientv3.HashKVResponse
}
err := ctl.spawnJsonCmd(&epHashKVs, "endpoint", "hashkv", "--endpoints", strings.Join(ctl.endpoints, ","), "--rev", fmt.Sprint(rev))
err := ctl.spawnJsonCmd(ctx, &epHashKVs, "endpoint", "hashkv", "--endpoints", strings.Join(ctl.endpoints, ","), "--rev", fmt.Sprint(rev))
if err != nil {
return nil, err
}
@ -319,25 +331,27 @@ func (ctl *EtcdctlV3) HashKV(rev int64) ([]*clientv3.HashKVResponse, error) {
return resp, err
}
func (ctl *EtcdctlV3) Health() error {
func (ctl *EtcdctlV3) Health(ctx context.Context) error {
args := ctl.cmdArgs()
args = append(args, "endpoint", "health")
lines := make([]string, len(ctl.endpoints))
for i := range lines {
lines[i] = "is healthy"
}
return SpawnWithExpects(args, map[string]string{}, lines...)
_, err := SpawnWithExpectLines(ctx, args, nil, lines...)
return err
}
func (ctl *EtcdctlV3) Grant(ttl int64) (*clientv3.LeaseGrantResponse, error) {
func (ctl *EtcdctlV3) Grant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) {
args := ctl.cmdArgs()
args = append(args, "lease", "grant", strconv.FormatInt(ttl, 10), "-w", "json")
cmd, err := SpawnCmd(args, nil)
if err != nil {
return nil, err
}
defer cmd.Close()
var resp clientv3.LeaseGrantResponse
line, err := cmd.Expect("ID")
line, err := cmd.ExpectWithContext(ctx, "ID")
if err != nil {
return nil, err
}
@ -345,7 +359,7 @@ func (ctl *EtcdctlV3) Grant(ttl int64) (*clientv3.LeaseGrantResponse, error) {
return &resp, err
}
func (ctl *EtcdctlV3) TimeToLive(id clientv3.LeaseID, o config.LeaseOption) (*clientv3.LeaseTimeToLiveResponse, error) {
func (ctl *EtcdctlV3) TimeToLive(ctx context.Context, id clientv3.LeaseID, o config.LeaseOption) (*clientv3.LeaseTimeToLiveResponse, error) {
args := ctl.cmdArgs()
args = append(args, "lease", "timetolive", strconv.FormatInt(int64(id), 16), "-w", "json")
if o.WithAttachedKeys {
@ -355,8 +369,9 @@ func (ctl *EtcdctlV3) TimeToLive(id clientv3.LeaseID, o config.LeaseOption) (*cl
if err != nil {
return nil, err
}
defer cmd.Close()
var resp clientv3.LeaseTimeToLiveResponse
line, err := cmd.Expect("id")
line, err := cmd.ExpectWithContext(ctx, "id")
if err != nil {
return nil, err
}
@ -364,7 +379,7 @@ func (ctl *EtcdctlV3) TimeToLive(id clientv3.LeaseID, o config.LeaseOption) (*cl
return &resp, err
}
func (ctl *EtcdctlV3) Defragment(o config.DefragOption) error {
func (ctl *EtcdctlV3) Defragment(ctx context.Context, o config.DefragOption) error {
args := append(ctl.cmdArgs(), "defrag")
if o.Timeout != 0 {
args = append(args, fmt.Sprintf("--command-timeout=%s", o.Timeout))
@ -373,18 +388,19 @@ func (ctl *EtcdctlV3) Defragment(o config.DefragOption) error {
for i := range lines {
lines[i] = "Finished defragmenting etcd member"
}
_, err := SpawnWithExpectLines(args, map[string]string{}, lines...)
_, err := SpawnWithExpectLines(ctx, args, map[string]string{}, lines...)
return err
}
func (ctl *EtcdctlV3) LeaseList() (*clientv3.LeaseLeasesResponse, error) {
func (ctl *EtcdctlV3) Leases(ctx context.Context) (*clientv3.LeaseLeasesResponse, error) {
args := ctl.cmdArgs("lease", "list", "-w", "json")
cmd, err := SpawnCmd(args, nil)
if err != nil {
return nil, err
}
defer cmd.Close()
var resp clientv3.LeaseLeasesResponse
line, err := cmd.Expect("id")
line, err := cmd.ExpectWithContext(ctx, "id")
if err != nil {
return nil, err
}
@ -392,14 +408,15 @@ func (ctl *EtcdctlV3) LeaseList() (*clientv3.LeaseLeasesResponse, error) {
return &resp, err
}
func (ctl *EtcdctlV3) LeaseKeepAliveOnce(id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error) {
func (ctl *EtcdctlV3) KeepAliveOnce(ctx context.Context, id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error) {
args := ctl.cmdArgs("lease", "keep-alive", strconv.FormatInt(int64(id), 16), "--once", "-w", "json")
cmd, err := SpawnCmd(args, nil)
if err != nil {
return nil, err
}
defer cmd.Close()
var resp clientv3.LeaseKeepAliveResponse
line, err := cmd.Expect("ID")
line, err := cmd.ExpectWithContext(ctx, "ID")
if err != nil {
return nil, err
}
@ -407,27 +424,28 @@ func (ctl *EtcdctlV3) LeaseKeepAliveOnce(id clientv3.LeaseID) (*clientv3.LeaseKe
return &resp, err
}
func (ctl *EtcdctlV3) LeaseRevoke(id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error) {
func (ctl *EtcdctlV3) Revoke(ctx context.Context, id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error) {
var resp clientv3.LeaseRevokeResponse
err := ctl.spawnJsonCmd(&resp, "lease", "revoke", strconv.FormatInt(int64(id), 16))
err := ctl.spawnJsonCmd(ctx, &resp, "lease", "revoke", strconv.FormatInt(int64(id), 16))
return &resp, err
}
func (ctl *EtcdctlV3) AlarmList() (*clientv3.AlarmResponse, error) {
func (ctl *EtcdctlV3) AlarmList(ctx context.Context) (*clientv3.AlarmResponse, error) {
var resp clientv3.AlarmResponse
err := ctl.spawnJsonCmd(&resp, "alarm", "list")
err := ctl.spawnJsonCmd(ctx, &resp, "alarm", "list")
return &resp, err
}
func (ctl *EtcdctlV3) AlarmDisarm(_ *clientv3.AlarmMember) (*clientv3.AlarmResponse, error) {
func (ctl *EtcdctlV3) AlarmDisarm(ctx context.Context, _ *clientv3.AlarmMember) (*clientv3.AlarmResponse, error) {
args := ctl.cmdArgs()
args = append(args, "alarm", "disarm", "-w", "json")
ep, err := SpawnCmd(args, nil)
if err != nil {
return nil, err
}
defer ep.Close()
var resp clientv3.AlarmResponse
line, err := ep.Expect("alarm")
line, err := ep.ExpectWithContext(ctx, "alarm")
if err != nil {
return nil, err
}
@ -435,7 +453,7 @@ func (ctl *EtcdctlV3) AlarmDisarm(_ *clientv3.AlarmMember) (*clientv3.AlarmRespo
return &resp, err
}
func (ctl *EtcdctlV3) UserAdd(name, password string, opts config.UserAddOptions) (*clientv3.AuthUserAddResponse, error) {
func (ctl *EtcdctlV3) UserAdd(ctx context.Context, name, password string, opts config.UserAddOptions) (*clientv3.AuthUserAddResponse, error) {
args := ctl.cmdArgs()
args = append(args, "user", "add")
if password == "" {
@ -454,6 +472,7 @@ func (ctl *EtcdctlV3) UserAdd(name, password string, opts config.UserAddOptions)
if err != nil {
return nil, err
}
defer cmd.Close()
// If no password is provided, and NoPassword isn't set, the CLI will always
// wait for a password, send an enter in this case for an "empty" password.
@ -465,7 +484,7 @@ func (ctl *EtcdctlV3) UserAdd(name, password string, opts config.UserAddOptions)
}
var resp clientv3.AuthUserAddResponse
line, err := cmd.Expect("header")
line, err := cmd.ExpectWithContext(ctx, "header")
if err != nil {
return nil, err
}
@ -473,81 +492,127 @@ func (ctl *EtcdctlV3) UserAdd(name, password string, opts config.UserAddOptions)
return &resp, err
}
func (ctl *EtcdctlV3) UserList() (*clientv3.AuthUserListResponse, error) {
func (ctl *EtcdctlV3) UserList(ctx context.Context) (*clientv3.AuthUserListResponse, error) {
var resp clientv3.AuthUserListResponse
err := ctl.spawnJsonCmd(&resp, "user", "list")
err := ctl.spawnJsonCmd(ctx, &resp, "user", "list")
return &resp, err
}
func (ctl *EtcdctlV3) UserDelete(name string) (*clientv3.AuthUserDeleteResponse, error) {
func (ctl *EtcdctlV3) UserDelete(ctx context.Context, name string) (*clientv3.AuthUserDeleteResponse, error) {
var resp clientv3.AuthUserDeleteResponse
err := ctl.spawnJsonCmd(&resp, "user", "delete", name)
err := ctl.spawnJsonCmd(ctx, &resp, "user", "delete", name)
return &resp, err
}
func (ctl *EtcdctlV3) UserChangePass(user, newPass string) error {
func (ctl *EtcdctlV3) UserChangePass(ctx context.Context, user, newPass string) error {
args := ctl.cmdArgs()
args = append(args, "user", "passwd", user, "--interactive=false")
cmd, err := SpawnCmd(args, nil)
if err != nil {
return err
}
defer cmd.Close()
err = cmd.Send(newPass + "\n")
if err != nil {
return err
}
_, err = cmd.Expect("Password updated")
_, err = cmd.ExpectWithContext(ctx, "Password updated")
return err
}
func (ctl *EtcdctlV3) RoleAdd(name string) (*clientv3.AuthRoleAddResponse, error) {
func (ctl *EtcdctlV3) RoleAdd(ctx context.Context, name string) (*clientv3.AuthRoleAddResponse, error) {
var resp clientv3.AuthRoleAddResponse
err := ctl.spawnJsonCmd(&resp, "role", "add", name)
err := ctl.spawnJsonCmd(ctx, &resp, "role", "add", name)
return &resp, err
}
func (ctl *EtcdctlV3) RoleGrantPermission(name string, key, rangeEnd string, permType clientv3.PermissionType) (*clientv3.AuthRoleGrantPermissionResponse, error) {
func (ctl *EtcdctlV3) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType clientv3.PermissionType) (*clientv3.AuthRoleGrantPermissionResponse, error) {
permissionType := authpb.Permission_Type_name[int32(permType)]
var resp clientv3.AuthRoleGrantPermissionResponse
err := ctl.spawnJsonCmd(&resp, "role", "grant-permission", name, permissionType, key, rangeEnd)
err := ctl.spawnJsonCmd(ctx, &resp, "role", "grant-permission", name, permissionType, key, rangeEnd)
return &resp, err
}
func (ctl *EtcdctlV3) RoleGet(role string) (*clientv3.AuthRoleGetResponse, error) {
func (ctl *EtcdctlV3) RoleGet(ctx context.Context, role string) (*clientv3.AuthRoleGetResponse, error) {
var resp clientv3.AuthRoleGetResponse
err := ctl.spawnJsonCmd(&resp, "role", "get", role)
err := ctl.spawnJsonCmd(ctx, &resp, "role", "get", role)
return &resp, err
}
func (ctl *EtcdctlV3) RoleList() (*clientv3.AuthRoleListResponse, error) {
func (ctl *EtcdctlV3) RoleList(ctx context.Context) (*clientv3.AuthRoleListResponse, error) {
var resp clientv3.AuthRoleListResponse
err := ctl.spawnJsonCmd(&resp, "role", "list")
err := ctl.spawnJsonCmd(ctx, &resp, "role", "list")
return &resp, err
}
func (ctl *EtcdctlV3) RoleRevokePermission(role string, key, rangeEnd string) (*clientv3.AuthRoleRevokePermissionResponse, error) {
func (ctl *EtcdctlV3) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*clientv3.AuthRoleRevokePermissionResponse, error) {
var resp clientv3.AuthRoleRevokePermissionResponse
err := ctl.spawnJsonCmd(&resp, "role", "revoke-permission", role, key, rangeEnd)
err := ctl.spawnJsonCmd(ctx, &resp, "role", "revoke-permission", role, key, rangeEnd)
return &resp, err
}
func (ctl *EtcdctlV3) RoleDelete(role string) (*clientv3.AuthRoleDeleteResponse, error) {
func (ctl *EtcdctlV3) RoleDelete(ctx context.Context, role string) (*clientv3.AuthRoleDeleteResponse, error) {
var resp clientv3.AuthRoleDeleteResponse
err := ctl.spawnJsonCmd(&resp, "role", "delete", role)
err := ctl.spawnJsonCmd(ctx, &resp, "role", "delete", role)
return &resp, err
}
func (ctl *EtcdctlV3) spawnJsonCmd(output interface{}, args ...string) error {
func (ctl *EtcdctlV3) spawnJsonCmd(ctx context.Context, output interface{}, args ...string) error {
args = append(args, "-w", "json")
cmd, err := SpawnCmd(append(ctl.cmdArgs(), args...), nil)
if err != nil {
return err
}
line, err := cmd.Expect("header")
defer cmd.Close()
line, err := cmd.ExpectWithContext(ctx, "header")
if err != nil {
return err
}
return json.Unmarshal([]byte(line), output)
}
func (ctl *EtcdctlV3) Watch(ctx context.Context, key string, opts config.WatchOptions) clientv3.WatchChan {
args := ctl.cmdArgs()
args = append(args, "watch", key)
if opts.RangeEnd != "" {
args = append(args, opts.RangeEnd)
}
args = append(args, "-w", "json")
if opts.Prefix {
args = append(args, "--prefix")
}
if opts.Revision != 0 {
args = append(args, "--rev", fmt.Sprint(opts.Revision))
}
proc, err := SpawnCmd(args, nil)
if err != nil {
return nil
}
ch := make(chan clientv3.WatchResponse)
go func() {
defer proc.Stop()
for {
select {
case <-ctx.Done():
close(ch)
return
default:
if line := proc.ReadLine(); line != "" {
var resp clientv3.WatchResponse
json.Unmarshal([]byte(line), &resp)
if resp.Canceled {
close(ch)
return
}
if len(resp.Events) > 0 {
ch <- resp
}
}
}
}
}()
return ch
}

View File

@ -15,6 +15,7 @@
package e2e
import (
"context"
"encoding/json"
"fmt"
"math/rand"
@ -36,7 +37,7 @@ func WaitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error
}
return false
}
_, err := exproc.ExpectFunc(matchSet)
_, err := exproc.ExpectFunc(context.Background(), matchSet)
return err
}
@ -49,22 +50,23 @@ func SpawnWithExpectWithEnv(args []string, envVars map[string]string, expected s
}
func SpawnWithExpects(args []string, envVars map[string]string, xs ...string) error {
_, err := SpawnWithExpectLines(args, envVars, xs...)
_, err := SpawnWithExpectLines(context.TODO(), args, envVars, xs...)
return err
}
func SpawnWithExpectLines(args []string, envVars map[string]string, xs ...string) ([]string, error) {
func SpawnWithExpectLines(ctx context.Context, args []string, envVars map[string]string, xs ...string) ([]string, error) {
proc, err := SpawnCmd(args, envVars)
if err != nil {
return nil, err
}
defer proc.Close()
// process until either stdout or stderr contains
// the expected string
var (
lines []string
)
for _, txt := range xs {
l, lerr := proc.Expect(txt)
l, lerr := proc.ExpectWithContext(ctx, txt)
if lerr != nil {
proc.Close()
return nil, fmt.Errorf("%v %v (expected %q, got %q). Try EXPECT_DEBUG=TRUE", args, lerr, txt, lines)

View File

@ -87,19 +87,18 @@ type integrationCluster struct {
func (c *integrationCluster) Members() (ms []Member) {
for _, m := range c.Cluster.Members {
ms = append(ms, integrationMember{Member: m, t: c.t, ctx: c.ctx})
ms = append(ms, integrationMember{Member: m, t: c.t})
}
return ms
}
type integrationMember struct {
*integration.Member
t testing.TB
ctx context.Context
t testing.TB
}
func (m integrationMember) Client() Client {
return integrationClient{Client: m.Member.Client, ctx: m.ctx}
return integrationClient{Client: m.Member.Client}
}
func (m integrationMember) Start() error {
@ -120,16 +119,14 @@ func (c *integrationCluster) Client() Client {
if err != nil {
c.t.Fatal(err)
}
return integrationClient{Client: cc, ctx: c.ctx}
return integrationClient{Client: cc}
}
type integrationClient struct {
*clientv3.Client
ctx context.Context
}
func (c integrationClient) Get(key string, o config.GetOptions) (*clientv3.GetResponse, error) {
ctx := c.ctx
func (c integrationClient) Get(ctx context.Context, key string, o config.GetOptions) (*clientv3.GetResponse, error) {
if o.Timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, o.Timeout)
@ -163,16 +160,16 @@ func (c integrationClient) Get(key string, o config.GetOptions) (*clientv3.GetRe
return c.Client.Get(ctx, key, clientOpts...)
}
func (c integrationClient) Put(key, value string, opts config.PutOptions) error {
func (c integrationClient) Put(ctx context.Context, key, value string, opts config.PutOptions) error {
clientOpts := []clientv3.OpOption{}
if opts.LeaseID != 0 {
clientOpts = append(clientOpts, clientv3.WithLease(opts.LeaseID))
}
_, err := c.Client.Put(c.ctx, key, value, clientOpts...)
_, err := c.Client.Put(ctx, key, value, clientOpts...)
return err
}
func (c integrationClient) Delete(key string, o config.DeleteOptions) (*clientv3.DeleteResponse, error) {
func (c integrationClient) Delete(ctx context.Context, key string, o config.DeleteOptions) (*clientv3.DeleteResponse, error) {
clientOpts := []clientv3.OpOption{}
if o.Prefix {
clientOpts = append(clientOpts, clientv3.WithPrefix())
@ -183,11 +180,10 @@ func (c integrationClient) Delete(key string, o config.DeleteOptions) (*clientv3
if o.End != "" {
clientOpts = append(clientOpts, clientv3.WithRange(o.End))
}
return c.Client.Delete(c.ctx, key, clientOpts...)
return c.Client.Delete(ctx, key, clientOpts...)
}
func (c integrationClient) Compact(rev int64, o config.CompactOption) (*clientv3.CompactResponse, error) {
ctx := c.ctx
func (c integrationClient) Compact(ctx context.Context, rev int64, o config.CompactOption) (*clientv3.CompactResponse, error) {
if o.Timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, o.Timeout)
@ -200,19 +196,11 @@ func (c integrationClient) Compact(rev int64, o config.CompactOption) (*clientv3
return c.Client.Compact(ctx, rev, clientOpts...)
}
func (c integrationClient) AlarmList() (*clientv3.AlarmResponse, error) {
return c.Client.AlarmList(c.ctx)
}
func (c integrationClient) AlarmDisarm(alarmMember *clientv3.AlarmMember) (*clientv3.AlarmResponse, error) {
return c.Client.AlarmDisarm(c.ctx, alarmMember)
}
func (c integrationClient) Status() ([]*clientv3.StatusResponse, error) {
func (c integrationClient) Status(ctx context.Context) ([]*clientv3.StatusResponse, error) {
endpoints := c.Client.Endpoints()
var resp []*clientv3.StatusResponse
for _, ep := range endpoints {
status, err := c.Client.Status(c.ctx, ep)
status, err := c.Client.Status(ctx, ep)
if err != nil {
return nil, err
}
@ -221,11 +209,11 @@ func (c integrationClient) Status() ([]*clientv3.StatusResponse, error) {
return resp, nil
}
func (c integrationClient) HashKV(rev int64) ([]*clientv3.HashKVResponse, error) {
func (c integrationClient) HashKV(ctx context.Context, rev int64) ([]*clientv3.HashKVResponse, error) {
endpoints := c.Client.Endpoints()
var resp []*clientv3.HashKVResponse
for _, ep := range endpoints {
hashKV, err := c.Client.HashKV(c.ctx, ep, rev)
hashKV, err := c.Client.HashKV(ctx, ep, rev)
if err != nil {
return nil, err
}
@ -234,9 +222,9 @@ func (c integrationClient) HashKV(rev int64) ([]*clientv3.HashKVResponse, error)
return resp, nil
}
func (c integrationClient) Health() error {
func (c integrationClient) Health(ctx context.Context) error {
cli := healthpb.NewHealthClient(c.Client.ActiveConnection())
resp, err := cli.Check(c.ctx, &healthpb.HealthCheckRequest{})
resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
if err != nil {
return err
}
@ -246,8 +234,7 @@ func (c integrationClient) Health() error {
return nil
}
func (c integrationClient) Defragment(o config.DefragOption) error {
ctx := c.ctx
func (c integrationClient) Defragment(ctx context.Context, o config.DefragOption) error {
if o.Timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, o.Timeout)
@ -262,76 +249,28 @@ func (c integrationClient) Defragment(o config.DefragOption) error {
return nil
}
func (c integrationClient) Grant(ttl int64) (*clientv3.LeaseGrantResponse, error) {
return c.Client.Grant(c.ctx, ttl)
}
func (c integrationClient) TimeToLive(id clientv3.LeaseID, o config.LeaseOption) (*clientv3.LeaseTimeToLiveResponse, error) {
func (c integrationClient) TimeToLive(ctx context.Context, id clientv3.LeaseID, o config.LeaseOption) (*clientv3.LeaseTimeToLiveResponse, error) {
leaseOpts := []clientv3.LeaseOption{}
if o.WithAttachedKeys {
leaseOpts = append(leaseOpts, clientv3.WithAttachedKeys())
}
return c.Client.TimeToLive(c.ctx, id, leaseOpts...)
return c.Client.TimeToLive(ctx, id, leaseOpts...)
}
func (c integrationClient) LeaseList() (*clientv3.LeaseLeasesResponse, error) {
return c.Client.Leases(c.ctx)
}
func (c integrationClient) LeaseKeepAliveOnce(id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error) {
return c.Client.KeepAliveOnce(c.ctx, id)
}
func (c integrationClient) LeaseRevoke(id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error) {
return c.Client.Revoke(c.ctx, id)
}
func (c integrationClient) UserAdd(name, password string, opts config.UserAddOptions) (*clientv3.AuthUserAddResponse, error) {
return c.Client.UserAddWithOptions(c.ctx, name, password, &clientv3.UserAddOptions{
func (c integrationClient) UserAdd(ctx context.Context, name, password string, opts config.UserAddOptions) (*clientv3.AuthUserAddResponse, error) {
return c.Client.UserAddWithOptions(ctx, name, password, &clientv3.UserAddOptions{
NoPassword: opts.NoPassword,
})
}
func (c integrationClient) UserList() (*clientv3.AuthUserListResponse, error) {
return c.Client.UserList(c.ctx)
}
func (c integrationClient) UserDelete(name string) (*clientv3.AuthUserDeleteResponse, error) {
return c.Client.UserDelete(c.ctx, name)
}
func (c integrationClient) UserChangePass(user, newPass string) error {
_, err := c.Client.UserChangePassword(c.ctx, user, newPass)
func (c integrationClient) UserChangePass(ctx context.Context, user, newPass string) error {
_, err := c.Client.UserChangePassword(ctx, user, newPass)
return err
}
func (c integrationClient) RoleAdd(name string) (*clientv3.AuthRoleAddResponse, error) {
return c.Client.RoleAdd(c.ctx, name)
}
func (c integrationClient) RoleGrantPermission(name string, key, rangeEnd string, permType clientv3.PermissionType) (*clientv3.AuthRoleGrantPermissionResponse, error) {
return c.Client.RoleGrantPermission(c.ctx, name, key, rangeEnd, permType)
}
func (c integrationClient) RoleGet(role string) (*clientv3.AuthRoleGetResponse, error) {
return c.Client.RoleGet(c.ctx, role)
}
func (c integrationClient) RoleList() (*clientv3.AuthRoleListResponse, error) {
return c.Client.RoleList(c.ctx)
}
func (c integrationClient) RoleRevokePermission(role string, key, rangeEnd string) (*clientv3.AuthRoleRevokePermissionResponse, error) {
return c.Client.RoleRevokePermission(c.ctx, role, key, rangeEnd)
}
func (c integrationClient) RoleDelete(role string) (*clientv3.AuthRoleDeleteResponse, error) {
return c.Client.RoleDelete(c.ctx, role)
}
func (c integrationClient) Txn(compares, ifSucess, ifFail []string, o config.TxnOptions) (*clientv3.TxnResponse, error) {
txn := c.Client.Txn(c.ctx)
func (c integrationClient) Txn(ctx context.Context, compares, ifSucess, ifFail []string, o config.TxnOptions) (*clientv3.TxnResponse, error) {
txn := c.Client.Txn(ctx)
cmps := []clientv3.Cmp{}
for _, c := range compares {
cmp, err := etcdctlcmd.ParseCompare(c)
@ -373,6 +312,17 @@ func getOps(ss []string) ([]clientv3.Op, error) {
return ops, nil
}
func (c integrationClient) MemberList() (*clientv3.MemberListResponse, error) {
return c.Client.MemberList(c.ctx)
func (c integrationClient) Watch(ctx context.Context, key string, opts config.WatchOptions) clientv3.WatchChan {
opOpts := []clientv3.OpOption{}
if opts.Prefix {
opOpts = append(opOpts, clientv3.WithPrefix())
}
if opts.Revision != 0 {
opOpts = append(opOpts, clientv3.WithRev(opts.Revision))
}
if opts.RangeEnd != "" {
opOpts = append(opOpts, clientv3.WithRange(opts.RangeEnd))
}
return c.Client.Watch(ctx, key, opOpts...)
}

View File

@ -42,35 +42,36 @@ type Member interface {
}
type Client interface {
Put(key, value string, opts config.PutOptions) error
Get(key string, opts config.GetOptions) (*clientv3.GetResponse, error)
Delete(key string, opts config.DeleteOptions) (*clientv3.DeleteResponse, error)
Compact(rev int64, opts config.CompactOption) (*clientv3.CompactResponse, error)
Status() ([]*clientv3.StatusResponse, error)
HashKV(rev int64) ([]*clientv3.HashKVResponse, error)
Health() error
Defragment(opts config.DefragOption) error
AlarmList() (*clientv3.AlarmResponse, error)
AlarmDisarm(alarmMember *clientv3.AlarmMember) (*clientv3.AlarmResponse, error)
Grant(ttl int64) (*clientv3.LeaseGrantResponse, error)
TimeToLive(id clientv3.LeaseID, opts config.LeaseOption) (*clientv3.LeaseTimeToLiveResponse, error)
LeaseList() (*clientv3.LeaseLeasesResponse, error)
LeaseKeepAliveOnce(id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error)
LeaseRevoke(id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error)
Put(context context.Context, key, value string, opts config.PutOptions) error
Get(context context.Context, key string, opts config.GetOptions) (*clientv3.GetResponse, error)
Delete(context context.Context, key string, opts config.DeleteOptions) (*clientv3.DeleteResponse, error)
Compact(context context.Context, rev int64, opts config.CompactOption) (*clientv3.CompactResponse, error)
Status(context context.Context) ([]*clientv3.StatusResponse, error)
HashKV(context context.Context, rev int64) ([]*clientv3.HashKVResponse, error)
Health(context context.Context) error
Defragment(context context.Context, opts config.DefragOption) error
AlarmList(context context.Context) (*clientv3.AlarmResponse, error)
AlarmDisarm(context context.Context, alarmMember *clientv3.AlarmMember) (*clientv3.AlarmResponse, error)
Grant(context context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error)
TimeToLive(context context.Context, id clientv3.LeaseID, opts config.LeaseOption) (*clientv3.LeaseTimeToLiveResponse, error)
Leases(context context.Context) (*clientv3.LeaseLeasesResponse, error)
KeepAliveOnce(context context.Context, id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error)
Revoke(context context.Context, id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error)
UserAdd(context context.Context, name, password string, opts config.UserAddOptions) (*clientv3.AuthUserAddResponse, error)
UserList(context context.Context) (*clientv3.AuthUserListResponse, error)
UserDelete(context context.Context, name string) (*clientv3.AuthUserDeleteResponse, error)
UserChangePass(context context.Context, user, newPass string) error
UserAdd(name, password string, opts config.UserAddOptions) (*clientv3.AuthUserAddResponse, error)
UserList() (*clientv3.AuthUserListResponse, error)
UserDelete(name string) (*clientv3.AuthUserDeleteResponse, error)
UserChangePass(user, newPass string) error
RoleAdd(context context.Context, name string) (*clientv3.AuthRoleAddResponse, error)
RoleGrantPermission(context context.Context, name string, key, rangeEnd string, permType clientv3.PermissionType) (*clientv3.AuthRoleGrantPermissionResponse, error)
RoleGet(context context.Context, role string) (*clientv3.AuthRoleGetResponse, error)
RoleList(context context.Context) (*clientv3.AuthRoleListResponse, error)
RoleRevokePermission(context context.Context, role string, key, rangeEnd string) (*clientv3.AuthRoleRevokePermissionResponse, error)
RoleDelete(context context.Context, role string) (*clientv3.AuthRoleDeleteResponse, error)
RoleAdd(name string) (*clientv3.AuthRoleAddResponse, error)
RoleGrantPermission(name string, key, rangeEnd string, permType clientv3.PermissionType) (*clientv3.AuthRoleGrantPermissionResponse, error)
RoleGet(role string) (*clientv3.AuthRoleGetResponse, error)
RoleList() (*clientv3.AuthRoleListResponse, error)
RoleRevokePermission(role string, key, rangeEnd string) (*clientv3.AuthRoleRevokePermissionResponse, error)
RoleDelete(role string) (*clientv3.AuthRoleDeleteResponse, error)
Txn(context context.Context, compares, ifSucess, ifFail []string, o config.TxnOptions) (*clientv3.TxnResponse, error)
Txn(compares, ifSucess, ifFail []string, o config.TxnOptions) (*clientv3.TxnResponse, error)
MemberList(context context.Context) (*clientv3.MemberListResponse, error)
MemberList() (*clientv3.MemberListResponse, error)
Watch(ctx context.Context, key string, opts config.WatchOptions) clientv3.WatchChan
}

View File

@ -15,6 +15,9 @@
package testutils
import (
"errors"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
@ -35,3 +38,26 @@ func KeyValuesFromGetResponse(resp *clientv3.GetResponse) (kvs []KV) {
}
return kvs
}
func KeyValuesFromWatchResponse(resp clientv3.WatchResponse) (kvs []KV) {
for _, event := range resp.Events {
kvs = append(kvs, KV{Key: string(event.Kv.Key), Val: string(event.Kv.Value)})
}
return kvs
}
func KeyValuesFromWatchChan(wch clientv3.WatchChan, wantedLen int, timeout time.Duration) (kvs []KV, err error) {
for {
select {
case watchResp, ok := <-wch:
if ok {
kvs = append(kvs, KeyValuesFromWatchResponse(watchResp)...)
if len(kvs) == wantedLen {
return kvs, nil
}
}
case <-time.After(timeout):
return nil, errors.New("closed watcher channel should not block")
}
}
}

View File

@ -125,7 +125,7 @@ func (srv *Server) createEtcd(fromSnapshot bool, failpoints string) error {
func (srv *Server) runEtcd() error {
errc := make(chan error)
go func() {
time.Sleep(5 * time.Second)
time.Sleep(1 * time.Second)
// server advertise client/peer listener had to start first
// before setting up proxy listener
errc <- srv.startProxy()
@ -137,17 +137,19 @@ func (srv *Server) runEtcd() error {
zap.String("command-path", srv.etcdCmd.Path),
)
err := srv.etcdCmd.Start()
perr := <-errc
srv.lg.Info(
"started etcd command",
zap.String("command-path", srv.etcdCmd.Path),
zap.Strings("command-args", srv.etcdCmd.Args),
zap.Errors("errors", []error{err, perr}),
zap.Strings("envs", srv.etcdCmd.Env),
zap.Error(err),
)
if err != nil {
return err
}
return perr
return <-errc
}
select {
@ -218,6 +220,11 @@ func (srv *Server) startProxy() error {
return err
}
srv.lg.Info("Checking client target's connectivity", zap.String("target", listenClientURL.Host))
if err := checkTCPConnect(srv.lg, listenClientURL.Host); err != nil {
return fmt.Errorf("check client target failed, %w", err)
}
srv.lg.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String()))
srv.advertiseClientPortToProxy[advertiseClientURLPort] = proxy.NewServer(proxy.ServerConfig{
Logger: srv.lg,
@ -226,6 +233,7 @@ func (srv *Server) startProxy() error {
})
select {
case err = <-srv.advertiseClientPortToProxy[advertiseClientURLPort].Error():
srv.lg.Info("starting client proxy failed", zap.Error(err))
return err
case <-time.After(2 * time.Second):
srv.lg.Info("started proxy on client traffic", zap.String("url", advertiseClientURL.String()))
@ -242,6 +250,11 @@ func (srv *Server) startProxy() error {
return err
}
srv.lg.Info("Checking peer target's connectivity", zap.String("target", listenPeerURL.Host))
if err := checkTCPConnect(srv.lg, listenPeerURL.Host); err != nil {
return fmt.Errorf("check peer target failed, %w", err)
}
srv.lg.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
srv.advertisePeerPortToProxy[advertisePeerURLPort] = proxy.NewServer(proxy.ServerConfig{
Logger: srv.lg,
@ -250,6 +263,7 @@ func (srv *Server) startProxy() error {
})
select {
case err = <-srv.advertisePeerPortToProxy[advertisePeerURLPort].Error():
srv.lg.Info("starting peer proxy failed", zap.Error(err))
return err
case <-time.After(2 * time.Second):
srv.lg.Info("started proxy on peer traffic", zap.String("url", advertisePeerURL.String()))

View File

@ -19,7 +19,6 @@ import (
"net"
"os"
"os/exec"
"strings"
"go.etcd.io/etcd/pkg/v3/proxy"
"go.etcd.io/etcd/server/v3/embed"
@ -102,17 +101,16 @@ func (srv *Server) StartServe() error {
zap.String("listener-address", srv.ln.Addr().String()),
)
err = srv.grpcServer.Serve(srv.ln)
if err != nil && strings.Contains(err.Error(), "use of closed network connection") {
srv.lg.Info(
"gRPC server is shut down",
if err != nil {
srv.lg.Warn(
"gRPC server is stopped with error",
zap.String("address", srv.address),
zap.Error(err),
)
} else {
srv.lg.Warn(
"gRPC server returned with error",
srv.lg.Info(
"gRPC server is stopped",
zap.String("address", srv.address),
zap.Error(err),
)
}
return err

View File

@ -126,6 +126,23 @@ func loadFileData(filePath string) ([]byte, error) {
return data, nil
}
func checkTCPConnect(lg *zap.Logger, target string) error {
for i := 0; i < 10; i++ {
if conn, err := net.Dial("tcp", target); err != nil {
lg.Error("The target isn't reachable", zap.Int("retries", i), zap.String("target", target), zap.Error(err))
} else {
if conn != nil {
conn.Close()
lg.Info("The target is reachable", zap.Int("retries", i), zap.String("target", target))
return nil
}
lg.Error("The target isn't reachable due to the returned conn is nil", zap.Int("retries", i), zap.String("target", target))
}
time.Sleep(time.Second)
}
return fmt.Errorf("timed out waiting for the target (%s) to be reachable", target)
}
func cleanPageCache() error {
// https://www.kernel.org/doc/Documentation/sysctl/vm.txt
// https://github.com/torvalds/linux/blob/master/fs/drop_caches.c

View File

@ -33,13 +33,18 @@ import (
var errExpected = errors.New("expected error")
func isErrorExpected(err error) bool {
return clientv3test.IsClientTimeout(err) || clientv3test.IsServerCtxTimeout(err) ||
err == rpctypes.ErrTimeout || err == rpctypes.ErrTimeoutDueToLeaderFail
}
// TestBalancerUnderNetworkPartitionPut tests when one member becomes isolated,
// first Put request fails, and following retry succeeds with client balancer
// switching to others.
func TestBalancerUnderNetworkPartitionPut(t *testing.T) {
testBalancerUnderNetworkPartition(t, func(cli *clientv3.Client, ctx context.Context) error {
_, err := cli.Put(ctx, "a", "b")
if clientv3test.IsClientTimeout(err) || clientv3test.IsServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
if isErrorExpected(err) {
return errExpected
}
return err
@ -49,7 +54,7 @@ func TestBalancerUnderNetworkPartitionPut(t *testing.T) {
func TestBalancerUnderNetworkPartitionDelete(t *testing.T) {
testBalancerUnderNetworkPartition(t, func(cli *clientv3.Client, ctx context.Context) error {
_, err := cli.Delete(ctx, "a")
if clientv3test.IsClientTimeout(err) || clientv3test.IsServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
if isErrorExpected(err) {
return errExpected
}
return err
@ -62,7 +67,7 @@ func TestBalancerUnderNetworkPartitionTxn(t *testing.T) {
If(clientv3.Compare(clientv3.Version("foo"), "=", 0)).
Then(clientv3.OpPut("foo", "bar")).
Else(clientv3.OpPut("foo", "baz")).Commit()
if clientv3test.IsClientTimeout(err) || clientv3test.IsServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
if isErrorExpected(err) {
return errExpected
}
return err
@ -75,7 +80,7 @@ func TestBalancerUnderNetworkPartitionTxn(t *testing.T) {
func TestBalancerUnderNetworkPartitionLinearizableGetWithLongTimeout(t *testing.T) {
testBalancerUnderNetworkPartition(t, func(cli *clientv3.Client, ctx context.Context) error {
_, err := cli.Get(ctx, "a")
if clientv3test.IsClientTimeout(err) || clientv3test.IsServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
if isErrorExpected(err) {
return errExpected
}
return err

View File

@ -421,3 +421,79 @@ func TestV3AuthOldRevConcurrent(t *testing.T) {
}
wg.Wait()
}
func TestV3AuthRestartMember(t *testing.T) {
integration.BeforeTest(t)
// create a cluster with 1 member
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
// create a client
c, cerr := integration.NewClient(t, clientv3.Config{
Endpoints: clus.Client(0).Endpoints(),
DialTimeout: 5 * time.Second,
})
testutil.AssertNil(t, cerr)
defer c.Close()
authData := []struct {
user string
role string
pass string
}{
{
user: "root",
role: "root",
pass: "123",
},
{
user: "user0",
role: "role0",
pass: "123",
},
}
for _, authObj := range authData {
// add a role
_, err := c.RoleAdd(context.TODO(), authObj.role)
testutil.AssertNil(t, err)
// add a user
_, err = c.UserAdd(context.TODO(), authObj.user, authObj.pass)
testutil.AssertNil(t, err)
// grant role to user
_, err = c.UserGrantRole(context.TODO(), authObj.user, authObj.role)
testutil.AssertNil(t, err)
}
// role grant permission to role0
_, err := c.RoleGrantPermission(context.TODO(), authData[1].role, "foo", "", clientv3.PermissionType(clientv3.PermReadWrite))
testutil.AssertNil(t, err)
// enable auth
_, err = c.AuthEnable(context.TODO())
testutil.AssertNil(t, err)
// create another client with ID:Password
c2, cerr := integration.NewClient(t, clientv3.Config{
Endpoints: clus.Client(0).Endpoints(),
DialTimeout: 5 * time.Second,
Username: authData[1].user,
Password: authData[1].pass,
})
testutil.AssertNil(t, cerr)
defer c2.Close()
// create foo since that is within the permission set
// expectation is to succeed
_, err = c2.Put(context.TODO(), "foo", "bar")
testutil.AssertNil(t, err)
clus.Members[0].Stop(t)
err = clus.Members[0].Restart(t)
testutil.AssertNil(t, err)
// nothing has changed, but it fails without refreshing cache after restart
_, err = c2.Put(context.TODO(), "foo", "bar2")
testutil.AssertNil(t, err)
}