Merge pull request #14345 from nic-chen/tests/watch

tests: Migrate watch test to common framework
dependabot/go_modules/go.uber.org/atomic-1.10.0
Marek Siarkowicz 2022-08-25 13:35:34 +02:00 committed by GitHub
commit f36a8782f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 201 additions and 44 deletions

View File

@ -43,6 +43,7 @@ type ExpectProcess struct {
mu sync.Mutex // protects lines and err
lines []string
count int // increment whenever new line gets added
cur int // current read position
err error
// StopSignal is the signal Stop sends to the process; defaults to SIGTERM.
@ -218,3 +219,15 @@ func (ep *ExpectProcess) Lines() []string {
defer ep.mu.Unlock()
return ep.lines
}
// ReadLine returns line by line.
func (ep *ExpectProcess) ReadLine() string {
ep.mu.Lock()
defer ep.mu.Unlock()
if ep.count > ep.cur {
line := ep.lines[ep.cur]
ep.cur++
return line
}
return ""
}

View File

@ -0,0 +1,82 @@
package common
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)
func TestWatch(t *testing.T) {
testRunner.BeforeTest(t)
watchTimeout := 1 * time.Second
for _, tc := range clusterTestCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
clus := testRunner.NewCluster(ctx, t, tc.config)
defer clus.Close()
cc := clus.Client()
testutils.ExecuteUntil(ctx, t, func() {
tests := []struct {
puts []testutils.KV
watchKey string
opts config.WatchOptions
wanted []testutils.KV
}{
{ // watch by revision
puts: []testutils.KV{{Key: "bar", Val: "revision_1"}, {Key: "bar", Val: "revision_2"}, {Key: "bar", Val: "revision_3"}},
watchKey: "bar",
opts: config.WatchOptions{Revision: 3},
wanted: []testutils.KV{{Key: "bar", Val: "revision_2"}, {Key: "bar", Val: "revision_3"}},
},
{ // watch 1 key
puts: []testutils.KV{{Key: "sample", Val: "value"}},
watchKey: "sample",
opts: config.WatchOptions{Revision: 1},
wanted: []testutils.KV{{Key: "sample", Val: "value"}},
},
{ // watch 3 keys by prefix
puts: []testutils.KV{{Key: "foo1", Val: "val1"}, {Key: "foo2", Val: "val2"}, {Key: "foo3", Val: "val3"}},
watchKey: "foo",
opts: config.WatchOptions{Revision: 1, Prefix: true},
wanted: []testutils.KV{{Key: "foo1", Val: "val1"}, {Key: "foo2", Val: "val2"}, {Key: "foo3", Val: "val3"}},
},
{ // watch 3 keys by range
puts: []testutils.KV{{Key: "key1", Val: "val1"}, {Key: "key3", Val: "val3"}, {Key: "key2", Val: "val2"}},
watchKey: "key",
opts: config.WatchOptions{Revision: 1, RangeEnd: "key3"},
wanted: []testutils.KV{{Key: "key1", Val: "val1"}, {Key: "key2", Val: "val2"}},
},
}
for _, tt := range tests {
wCtx, wCancel := context.WithCancel(ctx)
wch := cc.Watch(wCtx, tt.watchKey, tt.opts)
if wch == nil {
t.Fatalf("failed to watch %s", tt.watchKey)
}
for j := range tt.puts {
if err := cc.Put(tt.puts[j].Key, tt.puts[j].Val, config.PutOptions{}); err != nil {
t.Fatalf("can't not put key %q, err: %s", tt.puts[j].Key, err)
}
}
kvs, err := testutils.KeyValuesFromWatchChan(wch, len(tt.wanted), watchTimeout)
if err != nil {
wCancel()
t.Fatalf("failed to get key-values from watch channel %s", err)
}
wCancel()
assert.Equal(t, tt.wanted, kvs)
}
})
})
}
}

View File

@ -52,43 +52,21 @@ func watchTest(cx ctlCtx) {
wkv []kvExec
}{
{ // watch 1 key
puts: []kv{{"sample", "value"}},
args: []string{"sample", "--rev", "1"},
wkv: []kvExec{{key: "sample", val: "value"}},
},
{ // watch 1 key with env
puts: []kv{{"sample", "value"}},
envKey: "sample",
args: []string{"--rev", "1"},
wkv: []kvExec{{key: "sample", val: "value"}},
},
// coverage tests get extra arguments:
// ./bin/etcdctl_test -test.coverprofile=e2e.1525392462795198897.coverprofile -test.outputdir=../..
// do not test watch exec commands
{ // watch 3 keys by prefix
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
args: []string{"key", "--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch 3 keys by prefix, with env
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
envKey: "key",
args: []string{"--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch by revision
puts: []kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}},
args: []string{"etcd", "--rev", "2"},
wkv: []kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}},
},
{ // watch 3 keys by range
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
args: []string{"key", "key3", "--rev", "1"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
},
{ // watch 3 keys by range, with env
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
envKey: "key",

View File

@ -52,11 +52,6 @@ func watchTest(cx ctlCtx) {
wkv []kvExec
}{
{ // watch 1 key
puts: []kv{{"sample", "value"}},
args: []string{"sample", "--rev", "1"},
wkv: []kvExec{{key: "sample", val: "value"}},
},
{ // watch 1 key with env
puts: []kv{{"sample", "value"}},
envKey: "sample",
@ -101,27 +96,12 @@ func watchTest(cx ctlCtx) {
args: []string{"sample", "--rev", "1", "samplx", "--", "echo", "watch event received"},
wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}},
},
{ // watch 3 keys by prefix
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
args: []string{"key", "--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch 3 keys by prefix, with env
puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}},
envKey: "key",
args: []string{"--rev", "1", "--prefix"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}},
},
{ // watch by revision
puts: []kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}},
args: []string{"etcd", "--rev", "2"},
wkv: []kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}},
},
{ // watch 3 keys by range
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
args: []string{"key", "key3", "--rev", "1"},
wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}},
},
{ // watch 3 keys by range, with env
puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}},
envKey: "key",

View File

@ -63,3 +63,9 @@ type LeaseOption struct {
type UserAddOptions struct {
NoPassword bool
}
type WatchOptions struct {
Prefix bool
Revision int64
RangeEnd string
}

View File

@ -15,6 +15,7 @@
package e2e
import (
"context"
"encoding/json"
"fmt"
"io"
@ -102,6 +103,7 @@ func (ctl *EtcdctlV3) Get(key string, o config.GetOptions) (*clientv3.GetRespons
if err != nil {
return nil, err
}
defer cmd.Close()
_, err = cmd.Expect("Count")
return &resp, err
}
@ -145,6 +147,7 @@ func (ctl *EtcdctlV3) Txn(compares, ifSucess, ifFail []string, o config.TxnOptio
if err != nil {
return nil, err
}
defer cmd.Close()
_, err = cmd.Expect("compares:")
if err != nil {
return nil, err
@ -336,6 +339,7 @@ func (ctl *EtcdctlV3) Grant(ttl int64) (*clientv3.LeaseGrantResponse, error) {
if err != nil {
return nil, err
}
defer cmd.Close()
var resp clientv3.LeaseGrantResponse
line, err := cmd.Expect("ID")
if err != nil {
@ -355,6 +359,7 @@ func (ctl *EtcdctlV3) TimeToLive(id clientv3.LeaseID, o config.LeaseOption) (*cl
if err != nil {
return nil, err
}
defer cmd.Close()
var resp clientv3.LeaseTimeToLiveResponse
line, err := cmd.Expect("id")
if err != nil {
@ -383,6 +388,7 @@ func (ctl *EtcdctlV3) LeaseList() (*clientv3.LeaseLeasesResponse, error) {
if err != nil {
return nil, err
}
defer cmd.Close()
var resp clientv3.LeaseLeasesResponse
line, err := cmd.Expect("id")
if err != nil {
@ -398,6 +404,7 @@ func (ctl *EtcdctlV3) LeaseKeepAliveOnce(id clientv3.LeaseID) (*clientv3.LeaseKe
if err != nil {
return nil, err
}
defer cmd.Close()
var resp clientv3.LeaseKeepAliveResponse
line, err := cmd.Expect("ID")
if err != nil {
@ -426,6 +433,7 @@ func (ctl *EtcdctlV3) AlarmDisarm(_ *clientv3.AlarmMember) (*clientv3.AlarmRespo
if err != nil {
return nil, err
}
defer ep.Close()
var resp clientv3.AlarmResponse
line, err := ep.Expect("alarm")
if err != nil {
@ -454,6 +462,7 @@ func (ctl *EtcdctlV3) UserAdd(name, password string, opts config.UserAddOptions)
if err != nil {
return nil, err
}
defer cmd.Close()
// If no password is provided, and NoPassword isn't set, the CLI will always
// wait for a password, send an enter in this case for an "empty" password.
@ -492,7 +501,7 @@ func (ctl *EtcdctlV3) UserChangePass(user, newPass string) error {
if err != nil {
return err
}
defer cmd.Close()
err = cmd.Send(newPass + "\n")
if err != nil {
return err
@ -545,9 +554,55 @@ func (ctl *EtcdctlV3) spawnJsonCmd(output interface{}, args ...string) error {
if err != nil {
return err
}
defer cmd.Close()
line, err := cmd.Expect("header")
if err != nil {
return err
}
return json.Unmarshal([]byte(line), output)
}
func (ctl *EtcdctlV3) Watch(ctx context.Context, key string, opts config.WatchOptions) clientv3.WatchChan {
args := ctl.cmdArgs()
args = append(args, "watch", key)
if opts.RangeEnd != "" {
args = append(args, opts.RangeEnd)
}
args = append(args, "-w", "json")
if opts.Prefix {
args = append(args, "--prefix")
}
if opts.Revision != 0 {
args = append(args, "--rev", fmt.Sprint(opts.Revision))
}
proc, err := SpawnCmd(args, nil)
if err != nil {
return nil
}
ch := make(chan clientv3.WatchResponse)
go func() {
defer proc.Stop()
for {
select {
case <-ctx.Done():
close(ch)
return
default:
if line := proc.ReadLine(); line != "" {
var resp clientv3.WatchResponse
json.Unmarshal([]byte(line), &resp)
if resp.Canceled {
close(ch)
return
}
if len(resp.Events) > 0 {
ch <- resp
}
}
}
}
}()
return ch
}

View File

@ -59,6 +59,7 @@ func SpawnWithExpectLines(args []string, envVars map[string]string, xs ...string
if err != nil {
return nil, err
}
defer proc.Close()
// process until either stdout or stderr contains
// the expected string
var (

View File

@ -376,3 +376,18 @@ func getOps(ss []string) ([]clientv3.Op, error) {
func (c integrationClient) MemberList() (*clientv3.MemberListResponse, error) {
return c.Client.MemberList(c.ctx)
}
func (c integrationClient) Watch(ctx context.Context, key string, opts config.WatchOptions) clientv3.WatchChan {
opOpts := []clientv3.OpOption{}
if opts.Prefix {
opOpts = append(opOpts, clientv3.WithPrefix())
}
if opts.Revision != 0 {
opOpts = append(opOpts, clientv3.WithRev(opts.Revision))
}
if opts.RangeEnd != "" {
opOpts = append(opOpts, clientv3.WithRange(opts.RangeEnd))
}
return c.Client.Watch(ctx, key, opOpts...)
}

View File

@ -57,7 +57,6 @@ type Client interface {
LeaseList() (*clientv3.LeaseLeasesResponse, error)
LeaseKeepAliveOnce(id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error)
LeaseRevoke(id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error)
UserAdd(name, password string, opts config.UserAddOptions) (*clientv3.AuthUserAddResponse, error)
UserList() (*clientv3.AuthUserListResponse, error)
UserDelete(name string) (*clientv3.AuthUserDeleteResponse, error)
@ -73,4 +72,6 @@ type Client interface {
Txn(compares, ifSucess, ifFail []string, o config.TxnOptions) (*clientv3.TxnResponse, error)
MemberList() (*clientv3.MemberListResponse, error)
Watch(ctx context.Context, key string, opts config.WatchOptions) clientv3.WatchChan
}

View File

@ -15,6 +15,9 @@
package testutils
import (
"errors"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
@ -35,3 +38,26 @@ func KeyValuesFromGetResponse(resp *clientv3.GetResponse) (kvs []KV) {
}
return kvs
}
func KeyValuesFromWatchResponse(resp clientv3.WatchResponse) (kvs []KV) {
for _, event := range resp.Events {
kvs = append(kvs, KV{Key: string(event.Kv.Key), Val: string(event.Kv.Value)})
}
return kvs
}
func KeyValuesFromWatchChan(wch clientv3.WatchChan, wantedLen int, timeout time.Duration) (kvs []KV, err error) {
for {
select {
case watchResp, ok := <-wch:
if ok {
kvs = append(kvs, KeyValuesFromWatchResponse(watchResp)...)
if len(kvs) == wantedLen {
return kvs, nil
}
}
case <-time.After(timeout):
return nil, errors.New("closed watcher channel should not block")
}
}
}