From 904513fa5c41e3f92302cc374229cafcee56a7dd Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Sun, 26 Nov 2017 23:55:27 -0800 Subject: [PATCH 1/3] etcdctl/ctlv3: support "exec-watch" in watch command Signed-off-by: Gyu-Ho Lee --- etcdctl/ctlv3/command/watch_command.go | 135 +++++++++++-- etcdctl/ctlv3/command/watch_command_test.go | 213 ++++++++++++++++++++ 2 files changed, 331 insertions(+), 17 deletions(-) create mode 100644 etcdctl/ctlv3/command/watch_command_test.go diff --git a/etcdctl/ctlv3/command/watch_command.go b/etcdctl/ctlv3/command/watch_command.go index 33edaf28a..453d33096 100644 --- a/etcdctl/ctlv3/command/watch_command.go +++ b/etcdctl/ctlv3/command/watch_command.go @@ -17,8 +17,10 @@ package command import ( "bufio" "context" + "errors" "fmt" "os" + "os/exec" "strings" "github.com/coreos/etcd/clientv3" @@ -26,6 +28,12 @@ import ( "github.com/spf13/cobra" ) +var ( + errBadArgsNum = errors.New("bad number of arguments") + errBadArgsNumSeparator = errors.New("bad number of arguments (found separator --, but no commands)") + errBadArgsInteractiveWatch = errors.New("args[0] must be 'watch' for interactive calls") +) + var ( watchRev int64 watchPrefix bool @@ -36,7 +44,7 @@ var ( // NewWatchCommand returns the cobra command for "watch". func NewWatchCommand() *cobra.Command { cmd := &cobra.Command{ - Use: "watch [options] [key or prefix] [range_end]", + Use: "watch [options] [key or prefix] [range_end] [--] [exec-command arg1 arg2 ...]", Short: "Watches events stream on keys or prefixes", Run: watchCommandFunc, } @@ -52,24 +60,29 @@ func NewWatchCommand() *cobra.Command { // watchCommandFunc executes the "watch" command. func watchCommandFunc(cmd *cobra.Command, args []string) { if watchInteractive { - watchInteractiveFunc(cmd, args) + watchInteractiveFunc(cmd, os.Args) return } - c := mustClientFromCmd(cmd) - wc, err := getWatchChan(c, args) + watchArgs, execArgs, err := parseWatchArgs(os.Args, args, false) if err != nil { ExitWithError(ExitBadArgs, err) } - printWatchCh(wc) + c := mustClientFromCmd(cmd) + wc, err := getWatchChan(c, watchArgs) + if err != nil { + ExitWithError(ExitBadArgs, err) + } + + printWatchCh(c, wc, execArgs) if err = c.Close(); err != nil { ExitWithError(ExitBadConnection, err) } ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server")) } -func watchInteractiveFunc(cmd *cobra.Command, args []string) { +func watchInteractiveFunc(cmd *cobra.Command, osArgs []string) { c := mustClientFromCmd(cmd) reader := bufio.NewReader(os.Stdin) @@ -92,25 +105,25 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) { continue } - flagset := NewWatchCommand().Flags() - err = flagset.Parse(args[1:]) + watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, true) + if perr != nil { + ExitWithError(ExitBadArgs, perr) + } + + ch, err := getWatchChan(c, watchArgs) if err != nil { fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err) continue } - ch, err := getWatchChan(c, flagset.Args()) - if err != nil { - fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err) - continue - } - go printWatchCh(ch) + go printWatchCh(c, ch, execArgs) } } func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) { - if len(args) < 1 || len(args) > 2 { - return nil, fmt.Errorf("bad number of arguments") + if len(args) < 1 { + return nil, errBadArgsNum } + key := args[0] opts := []clientv3.OpOption{clientv3.WithRev(watchRev)} if len(args) == 2 { @@ -128,11 +141,99 @@ func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) return c.Watch(clientv3.WithRequireLeader(context.Background()), key, opts...), nil } -func printWatchCh(ch clientv3.WatchChan) { +func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string) { for resp := range ch { if resp.Canceled { fmt.Fprintf(os.Stderr, "watch was canceled (%v)\n", resp.Err()) } display.Watch(resp) + + if len(execArgs) > 0 { + cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...) + cmd.Env = os.Environ() + cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr + if err := cmd.Run(); err != nil { + fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err) + } + } } } + +// "commandArgs" is the command arguments after "spf13/cobra" parses +// all "watch" command flags, strips out special characters (e.g. "--"). +// "orArgs" is the raw arguments passed to "watch" command +// (e.g. ./bin/etcdctl watch foo --rev 1 bar). +// "--" characters are invalid arguments for "spf13/cobra" library, +// so no need to handle such cases. +func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs []string, execArgs []string, err error) { + watchArgs = commandArgs + + // remove preceding commands (e.g. "watch foo bar" in interactive mode) + idx := 0 + for idx = range watchArgs { + if watchArgs[idx] == "watch" { + break + } + } + if idx < len(watchArgs)-1 { + watchArgs = watchArgs[idx+1:] + } else if interactive { // "watch" not found + return nil, nil, errBadArgsInteractiveWatch + } + if len(watchArgs) < 1 { + return nil, nil, errBadArgsNum + } + + // remove preceding commands (e.g. ./bin/etcdctl watch) + for idx = range osArgs { + if osArgs[idx] == "watch" { + break + } + } + if idx < len(osArgs)-1 { + osArgs = osArgs[idx+1:] + } else { + return nil, nil, errBadArgsNum + } + + argsWithSep := osArgs + if interactive { // interactive mode pass "--" to the command args + argsWithSep = watchArgs + } + foundSep := false + for idx = range argsWithSep { + if argsWithSep[idx] == "--" && idx > 0 { + foundSep = true + break + } + } + if interactive { + flagset := NewWatchCommand().Flags() + if err := flagset.Parse(argsWithSep); err != nil { + return nil, nil, err + } + watchArgs = flagset.Args() + } + if !foundSep { + return watchArgs, nil, nil + } + + if idx == len(argsWithSep)-1 { + // "watch foo bar --" should error + return nil, nil, errBadArgsNumSeparator + } + execArgs = argsWithSep[idx+1:] + + // "watch foo bar --rev 1 -- echo hello" or "watch foo --rev 1 bar -- echo hello", + // then "watchArgs" is "foo bar echo hello" + // so need ignore args after "argsWithSep[idx]", which is "--" + endIdx := 0 + for endIdx = len(watchArgs) - 1; endIdx >= 0; endIdx-- { + if watchArgs[endIdx] == argsWithSep[idx+1] { + break + } + } + watchArgs = watchArgs[:endIdx] + + return watchArgs, execArgs, nil +} diff --git a/etcdctl/ctlv3/command/watch_command_test.go b/etcdctl/ctlv3/command/watch_command_test.go new file mode 100644 index 000000000..0a65fdf31 --- /dev/null +++ b/etcdctl/ctlv3/command/watch_command_test.go @@ -0,0 +1,213 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "reflect" + "testing" +) + +func Test_parseWatchArgs(t *testing.T) { + tt := []struct { + osArgs []string // raw arguments to "watch" command + commandArgs []string // arguments after "spf13/cobra" preprocessing + interactive bool + + watchArgs []string + execArgs []string + err error + }{ + { + osArgs: []string{"./bin/etcdctl", "watch", "foo", "bar"}, + commandArgs: []string{"foo", "bar"}, + interactive: false, + watchArgs: []string{"foo", "bar"}, + execArgs: nil, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "foo", "bar", "--"}, + commandArgs: []string{"foo", "bar"}, + interactive: false, + watchArgs: nil, + execArgs: nil, + err: errBadArgsNumSeparator, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "foo"}, + commandArgs: []string{"foo"}, + interactive: false, + watchArgs: []string{"foo"}, + execArgs: nil, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "--rev", "1", "foo"}, + commandArgs: []string{"foo"}, + interactive: false, + watchArgs: []string{"foo"}, + execArgs: nil, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "foo", "--rev", "1"}, + commandArgs: []string{"foo"}, + interactive: false, + watchArgs: []string{"foo"}, + execArgs: nil, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "foo", "--", "echo", "Hello", "World"}, + commandArgs: []string{"foo", "echo", "Hello", "World"}, + interactive: false, + watchArgs: []string{"foo"}, + execArgs: []string{"echo", "Hello", "World"}, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "foo", "--rev", "1", "--", "echo", "Hello", "World"}, + commandArgs: []string{"foo", "echo", "Hello", "World"}, + interactive: false, + watchArgs: []string{"foo"}, + execArgs: []string{"echo", "Hello", "World"}, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "foo", "bar", "--", "echo", "Hello", "World"}, + commandArgs: []string{"foo", "bar", "echo", "Hello", "World"}, + interactive: false, + watchArgs: []string{"foo", "bar"}, + execArgs: []string{"echo", "Hello", "World"}, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "--rev", "1", "foo", "bar", "--", "echo", "Hello", "World"}, + commandArgs: []string{"foo", "bar", "echo", "Hello", "World"}, + interactive: false, + watchArgs: []string{"foo", "bar"}, + execArgs: []string{"echo", "Hello", "World"}, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "foo", "--rev", "1", "bar", "--", "echo", "Hello", "World"}, + commandArgs: []string{"foo", "bar", "echo", "Hello", "World"}, + interactive: false, + watchArgs: []string{"foo", "bar"}, + execArgs: []string{"echo", "Hello", "World"}, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "foo", "bar", "--rev", "1", "--", "echo", "Hello", "World"}, + commandArgs: []string{"foo", "bar", "echo", "Hello", "World"}, + interactive: false, + watchArgs: []string{"foo", "bar"}, + execArgs: []string{"echo", "Hello", "World"}, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "-i"}, + commandArgs: []string{"foo", "bar", "--", "echo", "Hello", "World"}, + interactive: true, + watchArgs: nil, + execArgs: nil, + err: errBadArgsInteractiveWatch, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "-i"}, + commandArgs: []string{"watch", "foo"}, + interactive: true, + watchArgs: []string{"foo"}, + execArgs: nil, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "-i"}, + commandArgs: []string{"watch", "foo", "bar"}, + interactive: true, + watchArgs: []string{"foo", "bar"}, + execArgs: nil, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "-i"}, + commandArgs: []string{"watch", "foo", "--rev", "1"}, + interactive: true, + watchArgs: []string{"foo"}, + execArgs: nil, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "-i"}, + commandArgs: []string{"watch", "foo", "--rev", "1", "--", "echo", "Hello", "World"}, + interactive: true, + watchArgs: []string{"foo"}, + execArgs: []string{"echo", "Hello", "World"}, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "-i"}, + commandArgs: []string{"watch", "--rev", "1", "foo", "--", "echo", "Hello", "World"}, + interactive: true, + watchArgs: []string{"foo"}, + execArgs: []string{"echo", "Hello", "World"}, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "-i"}, + commandArgs: []string{"watch", "foo", "bar", "--", "echo", "Hello", "World"}, + interactive: true, + watchArgs: []string{"foo", "bar"}, + execArgs: []string{"echo", "Hello", "World"}, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "-i"}, + commandArgs: []string{"watch", "--rev", "1", "foo", "bar", "--", "echo", "Hello", "World"}, + interactive: true, + watchArgs: []string{"foo", "bar"}, + execArgs: []string{"echo", "Hello", "World"}, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "-i"}, + commandArgs: []string{"watch", "foo", "--rev", "1", "bar", "--", "echo", "Hello", "World"}, + interactive: true, + watchArgs: []string{"foo", "bar"}, + execArgs: []string{"echo", "Hello", "World"}, + err: nil, + }, + { + osArgs: []string{"./bin/etcdctl", "watch", "-i"}, + commandArgs: []string{"watch", "foo", "bar", "--rev", "1", "--", "echo", "Hello", "World"}, + interactive: true, + watchArgs: []string{"foo", "bar"}, + execArgs: []string{"echo", "Hello", "World"}, + err: nil, + }, + } + for i, ts := range tt { + watchArgs, execArgs, err := parseWatchArgs(ts.osArgs, ts.commandArgs, ts.interactive) + if err != ts.err { + t.Fatalf("#%d: error expected %v, got %v", i, ts.err, err) + } + if !reflect.DeepEqual(watchArgs, ts.watchArgs) { + t.Fatalf("#%d: watchArgs expected %q, got %v", i, ts.watchArgs, watchArgs) + } + if !reflect.DeepEqual(execArgs, ts.execArgs) { + t.Fatalf("#%d: execArgs expected %q, got %v", i, ts.execArgs, execArgs) + } + } +} From e89fc2054259662ee3fd3d7c1436c905201a34af Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Sun, 26 Nov 2017 23:55:47 -0800 Subject: [PATCH 2/3] etcdctl: document watch exec in v3 Signed-off-by: Gyu-Ho Lee --- etcdctl/README.md | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/etcdctl/README.md b/etcdctl/README.md index 2eeb3e639..c32ab80fd 100644 --- a/etcdctl/README.md +++ b/etcdctl/README.md @@ -337,7 +337,7 @@ Prints the compacted revision. # compacted revision 1234 ``` -### WATCH [options] [key or prefix] [range_end] +### WATCH [options] [key or prefix] [range_end] [--] [exec-command arg1 arg2 ...] Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `range-end` is given. The watch command runs until it encounters an error or is terminated by the user. If range_end is given, it must be lexicographically greater than key or "\x00". @@ -378,6 +378,16 @@ watch [options] \n # bar ``` +Receive events and execute `echo watch event received`: + +```bash +./etcdctl watch foo -- echo watch event received +# PUT +# foo +# bar +# watch event received +``` + ##### Interactive ```bash @@ -392,6 +402,17 @@ watch foo # bar ``` +Receive events and execute `echo watch event received`: + +```bash +./etcdctl watch -i +watch foo -- echo watch event received +# PUT +# foo +# bar +# watch event received +``` + ### LEASE \ LEASE provides commands for key lease management. From 25222c22d9726bfa75a2b97b2ea2254ca97927ab Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Sun, 26 Nov 2017 23:56:03 -0800 Subject: [PATCH 3/3] e2e: test watch exec in v3 etcdctl Signed-off-by: Gyu-Ho Lee --- e2e/ctl_v3_auth_test.go | 10 ++++---- e2e/ctl_v3_make_mirror_test.go | 9 ++++--- e2e/ctl_v3_watch_test.go | 45 ++++++++++++++++++++++++++++++---- 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/e2e/ctl_v3_auth_test.go b/e2e/ctl_v3_auth_test.go index d59461868..2565cdfdc 100644 --- a/e2e/ctl_v3_auth_test.go +++ b/e2e/ctl_v3_auth_test.go @@ -780,32 +780,32 @@ func authTestWatch(cx ctlCtx) { puts []kv args []string - wkv []kv + wkv []kvExec want bool }{ { // watch 1 key, should be successful []kv{{"key", "value"}}, []string{"key", "--rev", "1"}, - []kv{{"key", "value"}}, + []kvExec{{key: "key", val: "value"}}, true, }, { // watch 3 keys by range, should be successful []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}}, []string{"key", "key3", "--rev", "1"}, - []kv{{"key1", "val1"}, {"key2", "val2"}}, + []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}}, true, }, { // watch 1 key, should not be successful []kv{}, []string{"key5", "--rev", "1"}, - []kv{}, + []kvExec{}, false, }, { // watch 3 keys by range, should not be successful []kv{}, []string{"key", "key6", "--rev", "1"}, - []kv{}, + []kvExec{}, false, }, } diff --git a/e2e/ctl_v3_make_mirror_test.go b/e2e/ctl_v3_make_mirror_test.go index 8bb92e1d9..ef1cf24c5 100644 --- a/e2e/ctl_v3_make_mirror_test.go +++ b/e2e/ctl_v3_make_mirror_test.go @@ -28,16 +28,17 @@ func makeMirrorTest(cx ctlCtx) { var ( flags = []string{} kvs = []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}} + kvs2 = []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}} prefix = "key" ) - testMirrorCommand(cx, flags, kvs, kvs, prefix, prefix) + testMirrorCommand(cx, flags, kvs, kvs2, prefix, prefix) } func makeMirrorModifyDestPrefixTest(cx ctlCtx) { var ( flags = []string{"--prefix", "o_", "--dest-prefix", "d_"} kvs = []kv{{"o_key1", "val1"}, {"o_key2", "val2"}, {"o_key3", "val3"}} - kvs2 = []kv{{"d_key1", "val1"}, {"d_key2", "val2"}, {"d_key3", "val3"}} + kvs2 = []kvExec{{key: "d_key1", val: "val1"}, {key: "d_key2", val: "val2"}, {key: "d_key3", val: "val3"}} srcprefix = "o_" destprefix = "d_" ) @@ -48,7 +49,7 @@ func makeMirrorNoDestPrefixTest(cx ctlCtx) { var ( flags = []string{"--prefix", "o_", "--no-dest-prefix"} kvs = []kv{{"o_key1", "val1"}, {"o_key2", "val2"}, {"o_key3", "val3"}} - kvs2 = []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}} + kvs2 = []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}} srcprefix = "o_" destprefix = "key" ) @@ -56,7 +57,7 @@ func makeMirrorNoDestPrefixTest(cx ctlCtx) { testMirrorCommand(cx, flags, kvs, kvs2, srcprefix, destprefix) } -func testMirrorCommand(cx ctlCtx, flags []string, sourcekvs, destkvs []kv, srcprefix, destprefix string) { +func testMirrorCommand(cx ctlCtx, flags []string, sourcekvs []kv, destkvs []kvExec, srcprefix, destprefix string) { // set up another cluster to mirror with mirrorcfg := configAutoTLS mirrorcfg.clusterSize = 1 diff --git a/e2e/ctl_v3_watch_test.go b/e2e/ctl_v3_watch_test.go index bc9d64ac9..9356729b9 100644 --- a/e2e/ctl_v3_watch_test.go +++ b/e2e/ctl_v3_watch_test.go @@ -38,32 +38,62 @@ func TestCtlV3WatchInteractivePeerTLS(t *testing.T) { testCtl(t, watchTest, withInteractive(), withCfg(configPeerTLS)) } +type kvExec struct { + key, val string + execOutput string +} + func watchTest(cx ctlCtx) { tests := []struct { puts []kv args []string - wkv []kv + wkv []kvExec }{ { // watch 1 key []kv{{"sample", "value"}}, []string{"sample", "--rev", "1"}, + []kvExec{{key: "sample", val: "value"}}, + }, + { // watch 1 key with "echo watch event received" []kv{{"sample", "value"}}, + []string{"sample", "--rev", "1", "--", "echo", "watch event received"}, + []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}}, + }, + { // watch 1 key with "echo watch event received" + []kv{{"sample", "value"}}, + []string{"--rev", "1", "sample", "--", "echo", "watch event received"}, + []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}}, + }, + { // watch 1 key with "echo \"Hello World!\"" + []kv{{"sample", "value"}}, + []string{"--rev", "1", "sample", "--", "echo", "\"Hello World!\""}, + []kvExec{{key: "sample", val: "value", execOutput: "Hello World!"}}, + }, + { // watch 1 key with "echo watch event received" + []kv{{"sample", "value"}}, + []string{"sample", "samplx", "--rev", "1", "--", "echo", "watch event received"}, + []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}}, + }, + { // watch 1 key with "echo watch event received" + []kv{{"sample", "value"}}, + []string{"sample", "--rev", "1", "samplx", "--", "echo", "watch event received"}, + []kvExec{{key: "sample", val: "value", execOutput: "watch event received"}}, }, { // watch 3 keys by prefix []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}}, []string{"key", "--rev", "1", "--prefix"}, - []kv{{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}}, + []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}, {key: "key3", val: "val3"}}, }, { // watch by revision []kv{{"etcd", "revision_1"}, {"etcd", "revision_2"}, {"etcd", "revision_3"}}, []string{"etcd", "--rev", "2"}, - []kv{{"etcd", "revision_2"}, {"etcd", "revision_3"}}, + []kvExec{{key: "etcd", val: "revision_2"}, {key: "etcd", val: "revision_3"}}, }, { // watch 3 keys by range []kv{{"key1", "val1"}, {"key3", "val3"}, {"key2", "val2"}}, []string{"key", "key3", "--rev", "1"}, - []kv{{"key1", "val1"}, {"key2", "val2"}}, + []kvExec{{key: "key1", val: "val1"}, {key: "key2", val: "val2"}}, }, } @@ -97,7 +127,7 @@ func setupWatchArgs(cx ctlCtx, args []string) []string { return cmdArgs } -func ctlV3Watch(cx ctlCtx, args []string, kvs ...kv) error { +func ctlV3Watch(cx ctlCtx, args []string, kvs ...kvExec) error { cmdArgs := setupWatchArgs(cx, args) proc, err := spawnCmd(cmdArgs) @@ -119,6 +149,11 @@ func ctlV3Watch(cx ctlCtx, args []string, kvs ...kv) error { if _, err = proc.Expect(elem.val); err != nil { return err } + if elem.execOutput != "" { + if _, err = proc.Expect(elem.execOutput); err != nil { + return err + } + } } return proc.Stop() }