Merge pull request #4120 from xiang90/ctrl_w

*: support watcher cancellation inside watchStream
release-2.3
Xiang Li 2016-01-03 09:14:12 -08:00
commit c832d7f6e2
5 changed files with 83 additions and 6 deletions

View File

@ -19,6 +19,7 @@ import (
"fmt"
"io"
"os"
"strconv"
"strings"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
@ -67,7 +68,7 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
// TODO: support start and end revision
segs := strings.Split(l, " ")
if len(segs) != 2 {
fmt.Fprintf(os.Stderr, "Invalid watch request format: use watch key or watchprefix prefix\n")
fmt.Fprintf(os.Stderr, "Invalid watch request format: use \"watch [key]\", \"watchprefix [prefix]\" or \"cancel [watcher ID]\"\n")
continue
}
@ -77,8 +78,15 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
r = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte(segs[1])}}
case "watchprefix":
r = &pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte(segs[1])}}
case "cancel":
id, perr := strconv.ParseInt(segs[1], 10, 64)
if perr != nil {
fmt.Fprintf(os.Stderr, "Invalid cancel ID (%v)\n", perr)
continue
}
r = &pb.WatchRequest{CancelRequest: &pb.WatchCancelRequest{WatchId: id}}
default:
fmt.Fprintf(os.Stderr, "Invalid watch request format: use watch key or watchprefix prefix\n")
fmt.Fprintf(os.Stderr, "Invalid watch request type: use watch, watchprefix or cancel\n")
continue
}
@ -103,6 +111,8 @@ func recvLoop(wStream pb.Watch_WatchClient) {
// TODO: handle canceled/compacted and other control response types
case resp.Created:
fmt.Printf("watcher created: id %08x\n", resp.WatchId)
case resp.Canceled:
fmt.Printf("watcher canceled: id %08x\n", resp.WatchId)
default:
for _, ev := range resp.Events {
fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))

View File

@ -91,8 +91,18 @@ func (sws *serverWatchStream) recvLoop() error {
WatchId: id,
Created: true,
}
case req.CancelRequest != nil:
id := req.CancelRequest.WatchId
err := sws.watchStream.Cancel(id)
if err == nil {
sws.ctrlStream <- &pb.WatchResponse{
// TODO: fill in response header.
WatchId: id,
Canceled: true,
}
}
// TODO: do we need to return error back to client?
default:
// TODO: support cancellation
panic("not implemented")
}
}

View File

@ -182,6 +182,7 @@ func (s *watchableStore) NewWatchStream() WatchStream {
return &watchStream{
watchable: s,
ch: make(chan []storagepb.Event, chanBufLen),
cancels: make(map[int64]CancelFunc),
}
}

View File

@ -15,11 +15,16 @@
package storage
import (
"errors"
"sync"
"github.com/coreos/etcd/storage/storagepb"
)
var (
ErrWatcherNotExist = errors.New("storage: watcher does not exist")
)
type WatchStream interface {
// Watch creates a watcher. The watcher watches the events happening or
// happened on the given key or key prefix from the given startRev.
@ -30,11 +35,17 @@ type WatchStream interface {
//
// The returned `id` is the ID of this watcher. It appears as WatchID
// in events that are sent to the created watcher through stream channel.
//
// TODO: remove the returned CancelFunc. Always use Cancel.
Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc)
// Chan returns a chan. All watched events will be sent to the returned chan.
Chan() <-chan []storagepb.Event
// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
// returned.
Cancel(id int64) error
// Close closes the WatchChan and release all related resources.
Close()
}
@ -49,7 +60,7 @@ type watchStream struct {
// nextID is the ID pre-allocated for next new watcher in this stream
nextID int64
closed bool
cancels []CancelFunc
cancels map[int64]CancelFunc
}
// TODO: return error if ws is closed?
@ -65,8 +76,7 @@ func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) (id int64,
_, c := ws.watchable.watch(key, prefix, startRev, id, ws.ch)
// TODO: cancelFunc needs to be removed from the cancels when it is called.
ws.cancels = append(ws.cancels, c)
ws.cancels[id] = c
return id, c
}
@ -74,6 +84,16 @@ func (ws *watchStream) Chan() <-chan []storagepb.Event {
return ws.ch
}
func (ws *watchStream) Cancel(id int64) error {
cancel, ok := ws.cancels[id]
if !ok {
return ErrWatcherNotExist
}
cancel()
delete(ws.cancels, id)
return nil
}
func (ws *watchStream) Close() {
ws.mu.Lock()
defer ws.mu.Unlock()

View File

@ -65,3 +65,39 @@ func TestWatcherWatchID(t *testing.T) {
cancel()
}
}
// TestWatchStreamCancel ensures cancel calls the cancel func of the watcher
// with given id inside watchStream.
func TestWatchStreamCancelWatcherByID(t *testing.T) {
s := WatchableKV(newWatchableStore(tmpPath))
defer cleanup(s, tmpPath)
w := s.NewWatchStream()
defer w.Close()
id, _ := w.Watch([]byte("foo"), false, 0)
tests := []struct {
cancelID int64
werr error
}{
// no error should be returned when cancel the created watcher.
{id, nil},
// not exist error should be returned when cancel again.
{id, ErrWatcherNotExist},
// not exist error should be returned when cancel a bad id.
{id + 1, ErrWatcherNotExist},
}
for i, tt := range tests {
gerr := w.Cancel(tt.cancelID)
if gerr != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, gerr, tt.werr)
}
}
if l := len(w.(*watchStream).cancels); l != 0 {
t.Errorf("cancels = %d, want 0", l)
}
}