From f6ed36372baf87354714b0c3eaae35823c7ca8a9 Mon Sep 17 00:00:00 2001 From: nic-chen Date: Mon, 15 Aug 2022 10:06:31 +0800 Subject: [PATCH] tests: Migrate watch test to common framework Signed-off-by: nic-chen --- pkg/expect/expect.go | 13 +++++ tests/common/watch_test.go | 82 +++++++++++++++++++++++++++ tests/e2e/ctl_v3_watch_cov_test.go | 22 ------- tests/e2e/ctl_v3_watch_no_cov_test.go | 20 ------- tests/framework/config/client.go | 6 ++ tests/framework/e2e/etcdctl.go | 57 ++++++++++++++++++- tests/framework/e2e/util.go | 1 + tests/framework/integration.go | 15 +++++ tests/framework/interface.go | 3 +- tests/framework/testutils/helpters.go | 26 +++++++++ 10 files changed, 201 insertions(+), 44 deletions(-) create mode 100644 tests/common/watch_test.go diff --git a/pkg/expect/expect.go b/pkg/expect/expect.go index 648dea34f..19adb802f 100644 --- a/pkg/expect/expect.go +++ b/pkg/expect/expect.go @@ -40,6 +40,7 @@ type ExpectProcess struct { mu sync.Mutex // protects lines and err lines []string count int // increment whenever new line gets added + cur int // current read position err error // StopSignal is the signal Stop sends to the process; defaults to SIGTERM. @@ -198,3 +199,15 @@ func (ep *ExpectProcess) Lines() []string { defer ep.mu.Unlock() return ep.lines } + +// ReadLine returns line by line. +func (ep *ExpectProcess) ReadLine() string { + ep.mu.Lock() + defer ep.mu.Unlock() + if ep.count > ep.cur { + line := ep.lines[ep.cur] + ep.cur++ + return line + } + return "" +} diff --git a/tests/common/watch_test.go b/tests/common/watch_test.go new file mode 100644 index 000000000..496e3bc1e --- /dev/null +++ b/tests/common/watch_test.go @@ -0,0 +1,82 @@ +package common + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/tests/v3/framework/config" + "go.etcd.io/etcd/tests/v3/framework/testutils" +) + +func TestWatch(t *testing.T) { + testRunner.BeforeTest(t) + watchTimeout := 1 * time.Second + for _, tc := range clusterTestCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + clus := testRunner.NewCluster(ctx, t, tc.config) + + defer clus.Close() + cc := clus.Client() + testutils.ExecuteUntil(ctx, t, func() { + tests := []struct { + puts []testutils.KV + watchKey string + opts config.WatchOptions + wanted []testutils.KV + }{ + { // watch by revision + puts: []testutils.KV{{Key: "bar", Val: "revision_1"}, {Key: "bar", Val: "revision_2"}, {Key: "bar", Val: "revision_3"}}, + watchKey: "bar", + opts: config.WatchOptions{Revision: 3}, + wanted: []testutils.KV{{Key: "bar", Val: "revision_2"}, {Key: "bar", Val: "revision_3"}}, + }, + { // watch 1 key + puts: []testutils.KV{{Key: "sample", Val: "value"}}, + watchKey: "sample", + opts: config.WatchOptions{Revision: 1}, + wanted: []testutils.KV{{Key: "sample", Val: "value"}}, + }, + { // watch 3 keys by prefix + puts: []testutils.KV{{Key: "foo1", Val: "val1"}, {Key: "foo2", Val: "val2"}, {Key: "foo3", Val: "val3"}}, + watchKey: "foo", + opts: config.WatchOptions{Revision: 1, Prefix: true}, + wanted: []testutils.KV{{Key: "foo1", Val: "val1"}, {Key: "foo2", Val: "val2"}, {Key: "foo3", Val: "val3"}}, + }, + { // watch 3 keys by range + puts: []testutils.KV{{Key: "key1", Val: "val1"}, {Key: "key3", Val: "val3"}, {Key: "key2", Val: "val2"}}, + watchKey: "key", + opts: config.WatchOptions{Revision: 1, RangeEnd: "key3"}, + wanted: []testutils.KV{{Key: "key1", Val: "val1"}, {Key: "key2", Val: "val2"}}, + }, + } + + for _, tt := range tests { + wCtx, wCancel := context.WithCancel(ctx) + wch := cc.Watch(wCtx, tt.watchKey, tt.opts) + if wch == nil { + t.Fatalf("failed to watch %s", tt.watchKey) + } + + for j := range tt.puts { + if err := cc.Put(tt.puts[j].Key, tt.puts[j].Val, config.PutOptions{}); err != nil { + t.Fatalf("can't not put key %q, err: %s", tt.puts[j].Key, err) + } + } + + kvs, err := testutils.KeyValuesFromWatchChan(wch, len(tt.wanted), watchTimeout) + if err != nil { + wCancel() + t.Fatalf("failed to get key-values from watch channel %s", err) + } + + wCancel() + assert.Equal(t, tt.wanted, kvs) + } + }) + }) + } +} diff --git a/tests/e2e/ctl_v3_watch_cov_test.go b/tests/e2e/ctl_v3_watch_cov_test.go index 8214734da..f0c77700b 100644 --- a/tests/e2e/ctl_v3_watch_cov_test.go +++ b/tests/e2e/ctl_v3_watch_cov_test.go @@ -52,43 +52,21 @@ func watchTest(cx ctlCtx) { wkv []kvExec }{ - { // watch 1 key - puts: []kv{{"sample", "value"}}, - args: []string{"sample", "--rev", "1"}, - wkv: []kvExec{{key: "sample", val: "value"}}, - }, { // watch 1 key with env puts: []kv{{"sample", "value"}}, envKey: "sample", args: []string{"--rev", "1"}, wkv: []kvExec{{key: "sample", val: "value"}}, }, - // coverage tests get extra arguments: // ./bin/etcdctl_test -test.coverprofile=e2e.1525392462795198897.coverprofile -test.outputdir=../.. // do not test watch exec commands - - { // watch 3 keys by prefix - puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}}, - args: []string{"key", "--rev", "1", "--prefix"}, - wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}}, - }, { // watch 3 keys by prefix, with env puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}}, envKey: "key", args: []string{"--rev", "1", "--prefix"}, wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}}, }, - { // watch by revision - puts: []kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}}, - args: []string{"etcd", "--rev", "2"}, - wkv: []kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}}, - }, - { // watch 3 keys by range - puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}}, - args: []string{"key", "key3", "--rev", "1"}, - wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}}, - }, { // watch 3 keys by range, with env puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}}, envKey: "key", diff --git a/tests/e2e/ctl_v3_watch_no_cov_test.go b/tests/e2e/ctl_v3_watch_no_cov_test.go index 85ca863ba..a755b5f2d 100644 --- a/tests/e2e/ctl_v3_watch_no_cov_test.go +++ b/tests/e2e/ctl_v3_watch_no_cov_test.go @@ -52,11 +52,6 @@ func watchTest(cx ctlCtx) { wkv []kvExec }{ - { // watch 1 key - puts: []kv{{"sample", "value"}}, - args: []string{"sample", "--rev", "1"}, - wkv: []kvExec{{key: "sample", val: "value"}}, - }, { // watch 1 key with env puts: []kv{{"sample", "value"}}, envKey: "sample", @@ -101,27 +96,12 @@ func watchTest(cx ctlCtx) { args: []string{"sample", "--rev", "1", "samplx", "--", "echo", "watch event received"}, wkv: []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}}, }, - { // watch 3 keys by prefix - puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}}, - args: []string{"key", "--rev", "1", "--prefix"}, - wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}}, - }, { // watch 3 keys by prefix, with env puts: []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}}, envKey: "key", args: []string{"--rev", "1", "--prefix"}, wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}}, }, - { // watch by revision - puts: []kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}}, - args: []string{"etcd", "--rev", "2"}, - wkv: []kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}}, - }, - { // watch 3 keys by range - puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}}, - args: []string{"key", "key3", "--rev", "1"}, - wkv: []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}}, - }, { // watch 3 keys by range, with env puts: []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}}, envKey: "key", diff --git a/tests/framework/config/client.go b/tests/framework/config/client.go index e8ce9ea79..041890905 100644 --- a/tests/framework/config/client.go +++ b/tests/framework/config/client.go @@ -63,3 +63,9 @@ type LeaseOption struct { type UserAddOptions struct { NoPassword bool } + +type WatchOptions struct { + Prefix bool + Revision int64 + RangeEnd string +} diff --git a/tests/framework/e2e/etcdctl.go b/tests/framework/e2e/etcdctl.go index f97cc27e2..d6376c9fc 100644 --- a/tests/framework/e2e/etcdctl.go +++ b/tests/framework/e2e/etcdctl.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "encoding/json" "fmt" "io" @@ -102,6 +103,7 @@ func (ctl *EtcdctlV3) Get(key string, o config.GetOptions) (*clientv3.GetRespons if err != nil { return nil, err } + defer cmd.Close() _, err = cmd.Expect("Count") return &resp, err } @@ -145,6 +147,7 @@ func (ctl *EtcdctlV3) Txn(compares, ifSucess, ifFail []string, o config.TxnOptio if err != nil { return nil, err } + defer cmd.Close() _, err = cmd.Expect("compares:") if err != nil { return nil, err @@ -336,6 +339,7 @@ func (ctl *EtcdctlV3) Grant(ttl int64) (*clientv3.LeaseGrantResponse, error) { if err != nil { return nil, err } + defer cmd.Close() var resp clientv3.LeaseGrantResponse line, err := cmd.Expect("ID") if err != nil { @@ -355,6 +359,7 @@ func (ctl *EtcdctlV3) TimeToLive(id clientv3.LeaseID, o config.LeaseOption) (*cl if err != nil { return nil, err } + defer cmd.Close() var resp clientv3.LeaseTimeToLiveResponse line, err := cmd.Expect("id") if err != nil { @@ -383,6 +388,7 @@ func (ctl *EtcdctlV3) LeaseList() (*clientv3.LeaseLeasesResponse, error) { if err != nil { return nil, err } + defer cmd.Close() var resp clientv3.LeaseLeasesResponse line, err := cmd.Expect("id") if err != nil { @@ -398,6 +404,7 @@ func (ctl *EtcdctlV3) LeaseKeepAliveOnce(id clientv3.LeaseID) (*clientv3.LeaseKe if err != nil { return nil, err } + defer cmd.Close() var resp clientv3.LeaseKeepAliveResponse line, err := cmd.Expect("ID") if err != nil { @@ -426,6 +433,7 @@ func (ctl *EtcdctlV3) AlarmDisarm(_ *clientv3.AlarmMember) (*clientv3.AlarmRespo if err != nil { return nil, err } + defer ep.Close() var resp clientv3.AlarmResponse line, err := ep.Expect("alarm") if err != nil { @@ -454,6 +462,7 @@ func (ctl *EtcdctlV3) UserAdd(name, password string, opts config.UserAddOptions) if err != nil { return nil, err } + defer cmd.Close() // If no password is provided, and NoPassword isn't set, the CLI will always // wait for a password, send an enter in this case for an "empty" password. @@ -492,7 +501,7 @@ func (ctl *EtcdctlV3) UserChangePass(user, newPass string) error { if err != nil { return err } - + defer cmd.Close() err = cmd.Send(newPass + "\n") if err != nil { return err @@ -545,9 +554,55 @@ func (ctl *EtcdctlV3) spawnJsonCmd(output interface{}, args ...string) error { if err != nil { return err } + defer cmd.Close() line, err := cmd.Expect("header") if err != nil { return err } return json.Unmarshal([]byte(line), output) } + +func (ctl *EtcdctlV3) Watch(ctx context.Context, key string, opts config.WatchOptions) clientv3.WatchChan { + args := ctl.cmdArgs() + args = append(args, "watch", key) + if opts.RangeEnd != "" { + args = append(args, opts.RangeEnd) + } + args = append(args, "-w", "json") + if opts.Prefix { + args = append(args, "--prefix") + } + if opts.Revision != 0 { + args = append(args, "--rev", fmt.Sprint(opts.Revision)) + } + proc, err := SpawnCmd(args, nil) + if err != nil { + return nil + } + + ch := make(chan clientv3.WatchResponse) + go func() { + defer proc.Stop() + for { + select { + case <-ctx.Done(): + close(ch) + return + default: + if line := proc.ReadLine(); line != "" { + var resp clientv3.WatchResponse + json.Unmarshal([]byte(line), &resp) + if resp.Canceled { + close(ch) + return + } + if len(resp.Events) > 0 { + ch <- resp + } + } + } + } + }() + + return ch +} diff --git a/tests/framework/e2e/util.go b/tests/framework/e2e/util.go index 6db4e404d..ece85a150 100644 --- a/tests/framework/e2e/util.go +++ b/tests/framework/e2e/util.go @@ -58,6 +58,7 @@ func SpawnWithExpectLines(args []string, envVars map[string]string, xs ...string if err != nil { return nil, err } + defer proc.Close() // process until either stdout or stderr contains // the expected string var ( diff --git a/tests/framework/integration.go b/tests/framework/integration.go index 631b7263e..efb22f24a 100644 --- a/tests/framework/integration.go +++ b/tests/framework/integration.go @@ -376,3 +376,18 @@ func getOps(ss []string) ([]clientv3.Op, error) { func (c integrationClient) MemberList() (*clientv3.MemberListResponse, error) { return c.Client.MemberList(c.ctx) } + +func (c integrationClient) Watch(ctx context.Context, key string, opts config.WatchOptions) clientv3.WatchChan { + opOpts := []clientv3.OpOption{} + if opts.Prefix { + opOpts = append(opOpts, clientv3.WithPrefix()) + } + if opts.Revision != 0 { + opOpts = append(opOpts, clientv3.WithRev(opts.Revision)) + } + if opts.RangeEnd != "" { + opOpts = append(opOpts, clientv3.WithRange(opts.RangeEnd)) + } + + return c.Client.Watch(ctx, key, opOpts...) +} diff --git a/tests/framework/interface.go b/tests/framework/interface.go index 39b2552b3..cb06afef3 100644 --- a/tests/framework/interface.go +++ b/tests/framework/interface.go @@ -57,7 +57,6 @@ type Client interface { LeaseList() (*clientv3.LeaseLeasesResponse, error) LeaseKeepAliveOnce(id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error) LeaseRevoke(id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error) - UserAdd(name, password string, opts config.UserAddOptions) (*clientv3.AuthUserAddResponse, error) UserList() (*clientv3.AuthUserListResponse, error) UserDelete(name string) (*clientv3.AuthUserDeleteResponse, error) @@ -73,4 +72,6 @@ type Client interface { Txn(compares, ifSucess, ifFail []string, o config.TxnOptions) (*clientv3.TxnResponse, error) MemberList() (*clientv3.MemberListResponse, error) + + Watch(ctx context.Context, key string, opts config.WatchOptions) clientv3.WatchChan } diff --git a/tests/framework/testutils/helpters.go b/tests/framework/testutils/helpters.go index c508bd274..dacd53dec 100644 --- a/tests/framework/testutils/helpters.go +++ b/tests/framework/testutils/helpters.go @@ -15,6 +15,9 @@ package testutils import ( + "errors" + "time" + clientv3 "go.etcd.io/etcd/client/v3" ) @@ -35,3 +38,26 @@ func KeyValuesFromGetResponse(resp *clientv3.GetResponse) (kvs []KV) { } return kvs } + +func KeyValuesFromWatchResponse(resp clientv3.WatchResponse) (kvs []KV) { + for _, event := range resp.Events { + kvs = append(kvs, KV{Key: string(event.Kv.Key), Val: string(event.Kv.Value)}) + } + return kvs +} + +func KeyValuesFromWatchChan(wch clientv3.WatchChan, wantedLen int, timeout time.Duration) (kvs []KV, err error) { + for { + select { + case watchResp, ok := <-wch: + if ok { + kvs = append(kvs, KeyValuesFromWatchResponse(watchResp)...) + if len(kvs) == wantedLen { + return kvs, nil + } + } + case <-time.After(timeout): + return nil, errors.New("closed watcher channel should not block") + } + } +}