Merge pull request #4987 from hongchaodeng/ev

expose APIs to recognize event type
release-3.0
Xiang Li 2016-04-07 10:18:25 -07:00
commit 2e5ee26300
4 changed files with 154 additions and 6 deletions

View File

@ -101,7 +101,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
t.Fatalf("expected watcher channel, got nil")
}
readyc <- struct{}{}
evs := []*storagepb.Event{}
evs := []*clientv3.Event{}
for i := 0; i < numKeyUpdates*2; i++ {
resp, ok := <-prefixc
if !ok {
@ -430,3 +430,75 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
t.Fatalf("watch response expected in %v, but timed out", pi)
}
}
func TestWatchEventType(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
client := cluster.RandClient()
ctx := context.Background()
watchChan := client.Watch(ctx, "/", clientv3.WithPrefix())
if _, err := client.Put(ctx, "/toDelete", "foo"); err != nil {
t.Fatalf("Put failed: %v", err)
}
if _, err := client.Put(ctx, "/toDelete", "bar"); err != nil {
t.Fatalf("Put failed: %v", err)
}
if _, err := client.Delete(ctx, "/toDelete"); err != nil {
t.Fatalf("Delete failed: %v", err)
}
lcr, err := client.Lease.Create(ctx, 1)
if err != nil {
t.Fatalf("lease create failed: %v", err)
}
if _, err := client.Put(ctx, "/toExpire", "foo", clientv3.WithLease(clientv3.LeaseID(lcr.ID))); err != nil {
t.Fatalf("Put failed: %v", err)
}
tests := []struct {
et storagepb.Event_EventType
isCreate bool
isModify bool
}{{
et: clientv3.EventTypePut,
isCreate: true,
}, {
et: clientv3.EventTypePut,
isModify: true,
}, {
et: clientv3.EventTypeDelete,
}, {
et: clientv3.EventTypePut,
isCreate: true,
}, {
et: clientv3.EventTypeDelete,
}}
var res []*clientv3.Event
for {
select {
case wres := <-watchChan:
res = append(res, wres.Events...)
case <-time.After(10 * time.Second):
t.Fatalf("Should receive %d events and then break out loop", len(tests))
}
if len(res) == len(tests) {
break
}
}
for i, tt := range tests {
ev := res[i]
if tt.et != ev.Type {
t.Errorf("#%d: event type want=%s, get=%s", i, tt.et, ev.Type)
}
if tt.isCreate && !ev.IsCreate() {
t.Errorf("#%d: event should be CreateEvent", i)
}
if tt.isModify && !ev.IsModify() {
t.Errorf("#%d: event should be ModifyEvent", i)
}
}
}

View File

@ -25,6 +25,13 @@ import (
"google.golang.org/grpc"
)
const (
EventTypeDelete = storagepb.DELETE
EventTypePut = storagepb.PUT
)
type Event storagepb.Event
type WatchChan <-chan WatchResponse
type Watcher interface {
@ -41,7 +48,7 @@ type Watcher interface {
type WatchResponse struct {
Header pb.ResponseHeader
Events []*storagepb.Event
Events []*Event
// CompactRevision is the minimum revision the watcher may receive.
CompactRevision int64
@ -52,6 +59,16 @@ type WatchResponse struct {
Canceled bool
}
// IsCreate returns true if the event tells that the key is newly created.
func (e *Event) IsCreate() bool {
return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
}
// IsModify returns true if the event tells that a new value is put on existing key.
func (e *Event) IsModify() bool {
return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
}
// Err is the error value if this WatchResponse holds an error.
func (wr *WatchResponse) Err() error {
if wr.CompactRevision != 0 {
@ -352,10 +369,14 @@ func (w *watcher) dispatchEvent(pbresp *pb.WatchResponse) bool {
w.mu.RLock()
defer w.mu.RUnlock()
ws, ok := w.streams[pbresp.WatchId]
events := make([]*Event, len(pbresp.Events))
for i, ev := range pbresp.Events {
events[i] = (*Event)(ev)
}
if ok {
wr := &WatchResponse{
Header: *pbresp.Header,
Events: pbresp.Events,
Events: events,
CompactRevision: pbresp.CompactRevision,
Canceled: pbresp.Canceled}
ws.recvc <- wr

55
clientv3/watch_test.go Normal file
View File

@ -0,0 +1,55 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clientv3
import (
"testing"
"github.com/coreos/etcd/storage/storagepb"
)
func TestEvent(t *testing.T) {
tests := []struct {
ev *Event
isCreate bool
isModify bool
}{{
ev: &Event{
Type: EventTypePut,
Kv: &storagepb.KeyValue{
CreateRevision: 3,
ModRevision: 3,
},
},
isCreate: true,
}, {
ev: &Event{
Type: EventTypePut,
Kv: &storagepb.KeyValue{
CreateRevision: 3,
ModRevision: 4,
},
},
isModify: false,
}}
for i, tt := range tests {
if tt.isCreate && !tt.ev.IsCreate() {
t.Errorf("#%d: event should be Create event", i)
}
if tt.isModify && !tt.ev.IsModify() {
t.Errorf("#%d: event should be Modify event", i)
}
}
}

View File

@ -21,7 +21,7 @@ import (
)
// WaitEvents waits on a key until it observes the given events and returns the final one.
func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*clientv3.Event, error) {
wc := c.Watch(context.Background(), key, clientv3.WithRev(rev))
if wc == nil {
return nil, ErrNoWatcher
@ -29,7 +29,7 @@ func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event
return waitEvents(wc, evs), nil
}
func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) {
func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*clientv3.Event, error) {
wc := c.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithRev(rev))
if wc == nil {
return nil, ErrNoWatcher
@ -37,7 +37,7 @@ func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storag
return waitEvents(wc, evs), nil
}
func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *storagepb.Event {
func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *clientv3.Event {
i := 0
for wresp := range wc {
for _, ev := range wresp.Events {