diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index f6cae1a4c..d5f857d74 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -101,7 +101,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) { t.Fatalf("expected watcher channel, got nil") } readyc <- struct{}{} - evs := []*storagepb.Event{} + evs := []*clientv3.Event{} for i := 0; i < numKeyUpdates*2; i++ { resp, ok := <-prefixc if !ok { @@ -430,3 +430,75 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) { t.Fatalf("watch response expected in %v, but timed out", pi) } } + +func TestWatchEventType(t *testing.T) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + + client := cluster.RandClient() + ctx := context.Background() + watchChan := client.Watch(ctx, "/", clientv3.WithPrefix()) + + if _, err := client.Put(ctx, "/toDelete", "foo"); err != nil { + t.Fatalf("Put failed: %v", err) + } + if _, err := client.Put(ctx, "/toDelete", "bar"); err != nil { + t.Fatalf("Put failed: %v", err) + } + if _, err := client.Delete(ctx, "/toDelete"); err != nil { + t.Fatalf("Delete failed: %v", err) + } + lcr, err := client.Lease.Create(ctx, 1) + if err != nil { + t.Fatalf("lease create failed: %v", err) + } + if _, err := client.Put(ctx, "/toExpire", "foo", clientv3.WithLease(clientv3.LeaseID(lcr.ID))); err != nil { + t.Fatalf("Put failed: %v", err) + } + + tests := []struct { + et storagepb.Event_EventType + isCreate bool + isModify bool + }{{ + et: clientv3.EventTypePut, + isCreate: true, + }, { + et: clientv3.EventTypePut, + isModify: true, + }, { + et: clientv3.EventTypeDelete, + }, { + et: clientv3.EventTypePut, + isCreate: true, + }, { + et: clientv3.EventTypeDelete, + }} + + var res []*clientv3.Event + + for { + select { + case wres := <-watchChan: + res = append(res, wres.Events...) + case <-time.After(10 * time.Second): + t.Fatalf("Should receive %d events and then break out loop", len(tests)) + } + if len(res) == len(tests) { + break + } + } + + for i, tt := range tests { + ev := res[i] + if tt.et != ev.Type { + t.Errorf("#%d: event type want=%s, get=%s", i, tt.et, ev.Type) + } + if tt.isCreate && !ev.IsCreate() { + t.Errorf("#%d: event should be CreateEvent", i) + } + if tt.isModify && !ev.IsModify() { + t.Errorf("#%d: event should be ModifyEvent", i) + } + } +} diff --git a/clientv3/watch.go b/clientv3/watch.go index 5633caa5c..f9bdaff54 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -25,6 +25,13 @@ import ( "google.golang.org/grpc" ) +const ( + EventTypeDelete = storagepb.DELETE + EventTypePut = storagepb.PUT +) + +type Event storagepb.Event + type WatchChan <-chan WatchResponse type Watcher interface { @@ -41,7 +48,7 @@ type Watcher interface { type WatchResponse struct { Header pb.ResponseHeader - Events []*storagepb.Event + Events []*Event // CompactRevision is the minimum revision the watcher may receive. CompactRevision int64 @@ -52,6 +59,16 @@ type WatchResponse struct { Canceled bool } +// IsCreate returns true if the event tells that the key is newly created. +func (e *Event) IsCreate() bool { + return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision +} + +// IsModify returns true if the event tells that a new value is put on existing key. +func (e *Event) IsModify() bool { + return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision +} + // Err is the error value if this WatchResponse holds an error. func (wr *WatchResponse) Err() error { if wr.CompactRevision != 0 { @@ -352,10 +369,14 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool { w.mu.RLock() defer w.mu.RUnlock() ws, ok := w.streams[pbresp.WatchId] + events := make([]*Event, len(pbresp.Events)) + for i, ev := range pbresp.Events { + events[i] = (*Event)(ev) + } if ok { wr := &WatchResponse{ Header: *pbresp.Header, - Events: pbresp.Events, + Events: events, CompactRevision: pbresp.CompactRevision, Canceled: pbresp.Canceled} ws.recvc <- wr diff --git a/clientv3/watch_test.go b/clientv3/watch_test.go new file mode 100644 index 000000000..2220770a4 --- /dev/null +++ b/clientv3/watch_test.go @@ -0,0 +1,55 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "testing" + + "github.com/coreos/etcd/storage/storagepb" +) + +func TestEvent(t *testing.T) { + tests := []struct { + ev *Event + isCreate bool + isModify bool + }{{ + ev: &Event{ + Type: EventTypePut, + Kv: &storagepb.KeyValue{ + CreateRevision: 3, + ModRevision: 3, + }, + }, + isCreate: true, + }, { + ev: &Event{ + Type: EventTypePut, + Kv: &storagepb.KeyValue{ + CreateRevision: 3, + ModRevision: 4, + }, + }, + isModify: false, + }} + for i, tt := range tests { + if tt.isCreate && !tt.ev.IsCreate() { + t.Errorf("#%d: event should be Create event", i) + } + if tt.isModify && !tt.ev.IsModify() { + t.Errorf("#%d: event should be Modify event", i) + } + } +} diff --git a/contrib/recipes/watch.go b/contrib/recipes/watch.go index ca2e27f95..5f4ab2bb2 100644 --- a/contrib/recipes/watch.go +++ b/contrib/recipes/watch.go @@ -21,7 +21,7 @@ import ( ) // WaitEvents waits on a key until it observes the given events and returns the final one. -func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) { +func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*clientv3.Event, error) { wc := c.Watch(context.Background(), key, clientv3.WithRev(rev)) if wc == nil { return nil, ErrNoWatcher @@ -29,7 +29,7 @@ func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event return waitEvents(wc, evs), nil } -func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) { +func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*clientv3.Event, error) { wc := c.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithRev(rev)) if wc == nil { return nil, ErrNoWatcher @@ -37,7 +37,7 @@ func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storag return waitEvents(wc, evs), nil } -func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *storagepb.Event { +func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *clientv3.Event { i := 0 for wresp := range wc { for _, ev := range wresp.Events {