From eda0eefc25d8215a0416b08d009579c34e0ab797 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 3 Jan 2016 00:11:26 -0800 Subject: [PATCH] *: support watcher cancellation inside watchStream --- etcdctlv3/command/watch_command.go | 14 ++++++++++-- etcdserver/api/v3rpc/watch.go | 12 +++++++++- storage/watchable_store.go | 1 + storage/watcher.go | 26 ++++++++++++++++++--- storage/watcher_test.go | 36 ++++++++++++++++++++++++++++++ 5 files changed, 83 insertions(+), 6 deletions(-) diff --git a/etcdctlv3/command/watch_command.go b/etcdctlv3/command/watch_command.go index e5290c32d..926094bcb 100644 --- a/etcdctlv3/command/watch_command.go +++ b/etcdctlv3/command/watch_command.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "os" + "strconv" "strings" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" @@ -67,7 +68,7 @@ func watchCommandFunc(cmd *cobra.Command, args []string) { // TODO: support start and end revision segs := strings.Split(l, " ") if len(segs) != 2 { - fmt.Fprintf(os.Stderr, "Invalid watch request format: use watch key or watchprefix prefix\n") + fmt.Fprintf(os.Stderr, "Invalid watch request format: use \"watch [key]\", \"watchprefix [prefix]\" or \"cancel [watcher ID]\"\n") continue } @@ -77,8 +78,15 @@ func watchCommandFunc(cmd *cobra.Command, args []string) { r = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte(segs[1])}} case "watchprefix": r = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte(segs[1])}} + case "cancel": + id, perr := strconv.ParseInt(segs[1], 10, 64) + if perr != nil { + fmt.Fprintf(os.Stderr, "Invalid cancel ID (%v)\n", perr) + continue + } + r = &pb.WatchRequest{CancelRequest: &pb.WatchCancelRequest{WatchId: id}} default: - fmt.Fprintf(os.Stderr, "Invalid watch request format: use watch key or watchprefix prefix\n") + fmt.Fprintf(os.Stderr, "Invalid watch request type: use watch, watchprefix or cancel\n") continue } @@ -103,6 +111,8 @@ func recvLoop(wStream pb.Watch_WatchClient) { // TODO: handle canceled/compacted and other control response types case resp.Created: fmt.Printf("watcher created: id %08x\n", resp.WatchId) + case resp.Canceled: + fmt.Printf("watcher canceled: id %08x\n", resp.WatchId) default: for _, ev := range resp.Events { fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value)) diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index db8c8ab9d..df8ca437d 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -91,8 +91,18 @@ func (sws *serverWatchStream) recvLoop() error { WatchId: id, Created: true, } + case req.CancelRequest != nil: + id := req.CancelRequest.WatchId + err := sws.watchStream.Cancel(id) + if err == nil { + sws.ctrlStream <- &pb.WatchResponse{ + // TODO: fill in response header. + WatchId: id, + Canceled: true, + } + } + // TODO: do we need to return error back to client? default: - // TODO: support cancellation panic("not implemented") } } diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 41847b9e0..f1ac98770 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -182,6 +182,7 @@ func (s *watchableStore) NewWatchStream() WatchStream { return &watchStream{ watchable: s, ch: make(chan []storagepb.Event, chanBufLen), + cancels: make(map[int64]CancelFunc), } } diff --git a/storage/watcher.go b/storage/watcher.go index cc66c81ef..d2d493292 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -15,11 +15,16 @@ package storage import ( + "errors" "sync" "github.com/coreos/etcd/storage/storagepb" ) +var ( + ErrWatcherNotExist = errors.New("storage: watcher does not exist") +) + type WatchStream interface { // Watch creates a watcher. The watcher watches the events happening or // happened on the given key or key prefix from the given startRev. @@ -30,11 +35,17 @@ type WatchStream interface { // // The returned `id` is the ID of this watcher. It appears as WatchID // in events that are sent to the created watcher through stream channel. + // + // TODO: remove the returned CancelFunc. Always use Cancel. Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) // Chan returns a chan. All watched events will be sent to the returned chan. Chan() <-chan []storagepb.Event + // Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be + // returned. + Cancel(id int64) error + // Close closes the WatchChan and release all related resources. Close() } @@ -49,7 +60,7 @@ type watchStream struct { // nextID is the ID pre-allocated for next new watcher in this stream nextID int64 closed bool - cancels []CancelFunc + cancels map[int64]CancelFunc } // TODO: return error if ws is closed? @@ -65,8 +76,7 @@ func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) (id int64, _, c := ws.watchable.watch(key, prefix, startRev, id, ws.ch) - // TODO: cancelFunc needs to be removed from the cancels when it is called. - ws.cancels = append(ws.cancels, c) + ws.cancels[id] = c return id, c } @@ -74,6 +84,16 @@ func (ws *watchStream) Chan() <-chan []storagepb.Event { return ws.ch } +func (ws *watchStream) Cancel(id int64) error { + cancel, ok := ws.cancels[id] + if !ok { + return ErrWatcherNotExist + } + cancel() + delete(ws.cancels, id) + return nil +} + func (ws *watchStream) Close() { ws.mu.Lock() defer ws.mu.Unlock() diff --git a/storage/watcher_test.go b/storage/watcher_test.go index af92aa743..cbd5593a5 100644 --- a/storage/watcher_test.go +++ b/storage/watcher_test.go @@ -65,3 +65,39 @@ func TestWatcherWatchID(t *testing.T) { cancel() } } + +// TestWatchStreamCancel ensures cancel calls the cancel func of the watcher +// with given id inside watchStream. +func TestWatchStreamCancelWatcherByID(t *testing.T) { + s := WatchableKV(newWatchableStore(tmpPath)) + defer cleanup(s, tmpPath) + + w := s.NewWatchStream() + defer w.Close() + + id, _ := w.Watch([]byte("foo"), false, 0) + + tests := []struct { + cancelID int64 + werr error + }{ + // no error should be returned when cancel the created watcher. + {id, nil}, + // not exist error should be returned when cancel again. + {id, ErrWatcherNotExist}, + // not exist error should be returned when cancel a bad id. + {id + 1, ErrWatcherNotExist}, + } + + for i, tt := range tests { + gerr := w.Cancel(tt.cancelID) + + if gerr != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, gerr, tt.werr) + } + } + + if l := len(w.(*watchStream).cancels); l != 0 { + t.Errorf("cancels = %d, want 0", l) + } +}