clientv3: add Err() to WatchResponse

Checking for number of events as a failure condition was a kludge.
release-2.3
Anthony Romano 2016-03-03 13:13:10 -08:00
parent dc7f9a89b0
commit 1e16758029
5 changed files with 65 additions and 17 deletions

View File

@ -139,7 +139,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
for kv == nil {
wr, ok := <-wch
if !ok || len(wr.Events) == 0 {
if !ok || wr.Err() != nil {
cancel()
return
}

View File

@ -20,7 +20,6 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
v3 "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/storage/storagepb"
)
@ -52,10 +51,7 @@ func waitUpdate(ctx context.Context, client *v3.Client, key string, opts ...v3.O
if !ok {
return ctx.Err()
}
if len(wresp.Events) == 0 {
return v3rpc.ErrCompacted
}
return nil
return wresp.Err()
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {

View File

@ -23,6 +23,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
storagepb "github.com/coreos/etcd/storage/storagepb"
@ -347,8 +348,8 @@ func TestWatchInvalidFutureRevision(t *testing.T) {
if !ok {
t.Fatalf("expected wresp 'open'(ok true), but got ok %v", ok)
}
if !wresp.Canceled {
t.Fatalf("wresp.Canceled expected 'true', but got %v", wresp.Canceled)
if wresp.Err() != v3rpc.ErrFutureRev {
t.Fatalf("wresp.Err() expected ErrFutureRev, but got %v", wresp.Err())
}
_, ok = <-rch // ensure the channel is closed
@ -356,3 +357,42 @@ func TestWatchInvalidFutureRevision(t *testing.T) {
t.Fatalf("expected wresp 'closed'(ok false), but got ok %v", ok)
}
}
// TestWatchCompactRevision ensures the CompactRevision error is given on a
// compaction event ahead of a watcher.
func TestWatchCompactRevision(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
// set some keys
kv := clientv3.NewKV(clus.RandClient())
for i := 0; i < 5; i++ {
if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
t.Fatal(err)
}
}
w := clientv3.NewWatcher(clus.RandClient())
defer w.Close()
if err := kv.Compact(context.TODO(), 4); err != nil {
t.Fatal(err)
}
wch := w.Watch(context.Background(), "foo", clientv3.WithRev(2))
// get compacted error message
wresp, ok := <-wch
if !ok {
t.Fatalf("expected wresp, but got closed channel")
}
if wresp.Err() != v3rpc.ErrCompacted {
t.Fatalf("wresp.Err() expected ErrCompacteed, but got %v", wresp.Err())
}
// ensure the channel is closed
if wresp, ok = <-wch; ok {
t.Fatalf("expected closed channel, but got %v", wresp)
}
}

View File

@ -20,6 +20,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
storagepb "github.com/coreos/etcd/storage/storagepb"
)
@ -41,14 +42,25 @@ type Watcher interface {
type WatchResponse struct {
Header pb.ResponseHeader
Events []*storagepb.Event
// CompactRevision is set to the compaction revision that
// caused the watcher to cancel.
// CompactRevision is the minimum revision the watcher may receive.
CompactRevision int64
// Canceled is 'true' when it has received wrong watch start revision.
// Canceled is set to indicate the channel is about to close.
Canceled bool
}
// Err is the error value if this WatchResponse holds an error.
func (wr *WatchResponse) Err() error {
if wr.CompactRevision != 0 {
return v3rpc.ErrCompacted
}
if wr.Canceled {
return v3rpc.ErrFutureRev
}
return nil
}
// watcher implements the Watcher interface
type watcher struct {
c *Client
@ -179,12 +191,13 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
return
}
if resp.Canceled || resp.CompactRevision != 0 {
// compaction after start revision
// a cancel at id creation time means the start revision has
// been compacted out of the store
ret := make(chan WatchResponse, 1)
ret <- WatchResponse{
Header: *resp.Header,
CompactRevision: resp.CompactRevision,
Canceled: resp.Canceled}
Canceled: true}
close(ret)
pendingReq.retc <- ret
return
@ -375,8 +388,7 @@ func (w *watcher) serveStream(ws *watcherStream) {
}
select {
case outc <- *curWr:
if len(wrs[0].Events) == 0 {
// compaction message
if wrs[0].Err() != nil {
closing = true
break
}

View File

@ -53,7 +53,7 @@ func snapshotCommandFunc(cmd *cobra.Command, args []string) {
func snapshotToStdout(c *clientv3.Client) {
// must explicitly fetch first revision since no retry on stdout
wr := <-c.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1))
if len(wr.Events) > 0 {
if wr.Err() == nil {
wr.CompactRevision = 1
}
if rev := snapshot(os.Stdout, c, wr.CompactRevision+1); rev != 0 {
@ -111,7 +111,7 @@ func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 {
wc := s.SyncUpdates(context.TODO())
for wr := range wc {
if len(wr.Events) == 0 {
if wr.Err() != nil {
return wr.CompactRevision
}
for _, ev := range wr.Events {