Merge pull request #4602 from gyuho/watch_option

*: combine Watch, WatchPrefix with variadic function
release-2.3
Gyu-Ho Lee 2016-02-23 20:26:54 -08:00
commit 783e6f6b0d
9 changed files with 104 additions and 68 deletions

View File

@ -35,7 +35,7 @@ func ExampleWatcher_watch() {
wc := clientv3.NewWatcher(cli)
defer wc.Close()
rch := wc.Watch(context.Background(), "foo", 0)
rch := wc.Watch(context.Background(), "foo")
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
@ -57,7 +57,7 @@ func ExampleWatcher_watchPrefix() {
wc := clientv3.NewWatcher(cli)
defer wc.Close()
rch := wc.WatchPrefix(context.Background(), "foo", 0)
rch := wc.Watch(context.Background(), "foo", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)

View File

@ -350,7 +350,7 @@ func TestKVCompact(t *testing.T) {
wc := clientv3.NewWatcher(clus.RandClient())
defer wc.Close()
wchan := wc.Watch(ctx, "foo", 3)
wchan := wc.Watch(ctx, "foo", clientv3.WithRev(3))
if wr := <-wchan; wr.CompactRevision != 7 {
t.Fatalf("wchan CompactRevision got %v, want 7", wr.CompactRevision)

View File

@ -73,7 +73,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
for _, k := range keys {
// key watcher
go func(key string) {
ch := wctx.w.Watch(context.TODO(), key, 0)
ch := wctx.w.Watch(context.TODO(), key)
if ch == nil {
t.Fatalf("expected watcher channel, got nil")
}
@ -94,7 +94,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
}
// prefix watcher on "b" (bar and baz)
go func() {
prefixc := wctx.w.WatchPrefix(context.TODO(), "b", 0)
prefixc := wctx.w.Watch(context.TODO(), "b", clientv3.WithPrefix())
if prefixc == nil {
t.Fatalf("expected watcher channel, got nil")
}
@ -181,7 +181,7 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
}
}()
// should reconnect when requesting watch
if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil {
if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}
@ -200,7 +200,7 @@ func TestWatchReconnInit(t *testing.T) {
}
func testWatchReconnInit(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil {
if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}
// take down watcher connection
@ -216,7 +216,7 @@ func TestWatchReconnRunning(t *testing.T) {
}
func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil {
if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}
putAndWatch(t, wctx, "a", "a")
@ -233,7 +233,7 @@ func TestWatchCancelInit(t *testing.T) {
func testWatchCancelInit(t *testing.T, wctx *watchctx) {
ctx, cancel := context.WithCancel(context.Background())
if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil {
if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
t.Fatalf("expected non-nil watcher channel")
}
cancel()
@ -254,7 +254,7 @@ func TestWatchCancelRunning(t *testing.T) {
func testWatchCancelRunning(t *testing.T, wctx *watchctx) {
ctx, cancel := context.WithCancel(context.Background())
if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil {
if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
t.Fatalf("expected non-nil watcher channel")
}
if _, err := wctx.kv.Put(ctx, "a", "a"); err != nil {

View File

@ -15,6 +15,8 @@
package clientv3
import (
"reflect"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
)
@ -36,10 +38,12 @@ type Op struct {
// for range
limit int64
rev int64
sort *SortOption
serializable bool
// for range, watch
rev int64
// for put
val []byte
leaseID lease.LeaseID
@ -65,6 +69,27 @@ func (op Op) toRequestUnion() *pb.RequestUnion {
}
}
func (op Op) toWatchRequest() *watchRequest {
switch op.t {
case tRange:
key := string(op.key)
prefix := ""
if op.end != nil {
prefix = key
key = ""
}
wr := &watchRequest{
key: key,
prefix: prefix,
rev: op.rev,
}
return wr
default:
panic("Only for tRange")
}
}
func (op Op) isWrite() bool {
return op.t != tRange
}
@ -111,6 +136,24 @@ func OpPut(key, val string, opts ...OpOption) Op {
return ret
}
func opWatch(key string, opts ...OpOption) Op {
ret := Op{t: tRange, key: []byte(key)}
ret.applyOpts(opts)
switch {
case ret.end != nil && !reflect.DeepEqual(ret.end, getPrefix(ret.key)):
panic("only supports single keys or prefixes")
case ret.leaseID != 0:
panic("unexpected lease in watch")
case ret.limit != 0:
panic("unexpected limit in watch")
case ret.sort != nil:
panic("unexpected sort in watch")
case ret.serializable != false:
panic("unexpected serializable in watch")
}
return ret
}
func (op *Op) applyOpts(opts []OpOption) {
for _, opt := range opts {
opt(op)
@ -129,8 +172,7 @@ func WithLease(leaseID lease.LeaseID) OpOption {
func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } }
// WithRev specifies the store revision for 'Get' request.
//
// TODO: support Watch API
// Or the start revision of 'Watch' request.
func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } }
// WithSort specifies the ordering in 'Get' request. It requires
@ -143,25 +185,28 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
}
}
// WithPrefix enables 'Get' or 'Delete' requests to operate on the
// keys with matching prefix. For example, 'Get(foo, WithPrefix())'
func getPrefix(key []byte) []byte {
end := make([]byte, len(key))
copy(end, key)
for i := len(end) - 1; i >= 0; i-- {
if end[i] < 0xff {
end[i] = end[i] + 1
end = end[:i+1]
return end
}
}
// next prefix does not exist (e.g., 0xffff);
// default to WithFromKey policy
end = []byte{0}
return end
}
// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate
// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())'
// can return 'foo1', 'foo2', and so on.
//
// TODO: support Watch API
func WithPrefix() OpOption {
return func(op *Op) {
op.end = make([]byte, len(op.key))
copy(op.end, op.key)
for i := len(op.end) - 1; i >= 0; i-- {
if op.end[i] < 0xff {
op.end[i] = op.end[i] + 1
op.end = op.end[:i+1]
return
}
}
// next prefix does not exist (e.g., 0xffff);
// default to WithFromKey policy
op.end = []byte{0}
op.end = getPrefix(op.key)
}
}

View File

@ -116,7 +116,7 @@ func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan {
// get all events since revision (or get non-compacted revision, if
// rev is too far behind)
wch := wapi.WatchPrefix(ctx, s.prefix, s.rev)
wch := wapi.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev))
for wr := range wch {
respchan <- wr
}

View File

@ -27,17 +27,12 @@ import (
type WatchChan <-chan WatchResponse
type Watcher interface {
// Watch watches on a single key. The watched events will be returned
// Watch watches on a key or prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
Watch(ctx context.Context, key string, rev int64) WatchChan
// WatchPrefix watches on a prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan
// 'opts' can be: 'WithRev' and/or 'WitchPrefix'.
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// Close closes the watcher and cancels all watch requests.
Close() error
@ -127,27 +122,16 @@ func NewWatcher(c *Client) Watcher {
return w
}
func (w *watcher) Watch(ctx context.Context, key string, rev int64) WatchChan {
return w.watch(ctx, key, "", rev)
}
// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
ow := opWatch(key, opts...)
func (w *watcher) WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan {
return w.watch(ctx, "", prefix, rev)
}
wr := ow.toWatchRequest()
wr.ctx = ctx
func (w *watcher) Close() error {
select {
case w.stopc <- struct{}{}:
case <-w.donec:
}
<-w.donec
return <-w.errc
}
// watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) WatchChan {
retc := make(chan chan WatchResponse, 1)
wr := &watchRequest{ctx: ctx, key: key, prefix: prefix, rev: rev, retc: retc}
wr.retc = retc
// submit request
select {
case w.reqc <- wr:
@ -167,6 +151,15 @@ func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) Watc
}
}
func (w *watcher) Close() error {
select {
case w.stopc <- struct{}{}:
case <-w.donec:
}
<-w.donec
return <-w.errc
}
func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
if pendingReq == nil {
// no pending request; ignore

View File

@ -23,7 +23,7 @@ import (
// WaitEvents waits on a key until it observes the given events and returns the final one.
func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
w := clientv3.NewWatcher(c)
wc := w.Watch(context.Background(), key, rev)
wc := w.Watch(context.Background(), key, clientv3.WithRev(rev))
if wc == nil {
w.Close()
return nil, ErrNoWatcher
@ -33,7 +33,7 @@ func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event
func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
w := clientv3.NewWatcher(c)
wc := w.WatchPrefix(context.Background(), prefix, rev)
wc := w.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithRev(rev))
if wc == nil {
w.Close()
return nil, ErrNoWatcher

View File

@ -54,7 +54,7 @@ func snapshotToStdout(c *clientv3.Client) {
// must explicitly fetch first revision since no retry on stdout
wapi := clientv3.NewWatcher(c)
defer wapi.Close()
wr := <-wapi.WatchPrefix(context.TODO(), "", 1)
wr := <-wapi.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1))
if len(wr.Events) > 0 {
wr.CompactRevision = 1
}

View File

@ -60,12 +60,11 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
c := mustClientFromCmd(cmd)
w := clientv3.NewWatcher(c)
var wc clientv3.WatchChan
if !watchPrefix {
wc = w.Watch(context.TODO(), args[0], watchRev)
} else {
wc = w.Watch(context.TODO(), args[0], watchRev)
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
if watchPrefix {
opts = append(opts, clientv3.WithPrefix())
}
wc := w.Watch(context.TODO(), args[0], opts...)
printWatchCh(wc)
err := w.Close()
if err == nil {
@ -114,12 +113,11 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) {
if err != nil {
key = moreargs[0]
}
var ch clientv3.WatchChan
opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
if watchPrefix {
ch = w.WatchPrefix(context.TODO(), key, watchRev)
} else {
ch = w.Watch(context.TODO(), key, watchRev)
opts = append(opts, clientv3.WithPrefix())
}
ch := w.Watch(context.TODO(), key, opts...)
go printWatchCh(ch)
}
}