From 019a1453047017a49b94bde47fbefd69e7f2a2dd Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sun, 14 Feb 2016 17:47:38 -0800 Subject: [PATCH] integration: put keys after watcher ack in TestV3WatchFromCurrentRevision Watcher would miss events since the keys would be created after sending the watcher request but before etcd registered the watcher. --- integration/v3_watch_test.go | 52 ++++++++++++++---------------------- 1 file changed, 20 insertions(+), 32 deletions(-) diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index ee19f3bc5..a19b14f6f 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -45,10 +45,6 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { Key: []byte("foo")}}}, []*pb.WatchResponse{ - { - Header: &pb.ResponseHeader{Revision: 1}, - Created: true, - }, { Header: &pb.ResponseHeader{Revision: 2}, Created: false, @@ -68,12 +64,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { CreateRequest: &pb.WatchCreateRequest{ Key: []byte("helloworld")}}}, - []*pb.WatchResponse{ - { - Header: &pb.ResponseHeader{Revision: 1}, - Created: true, - }, - }, + []*pb.WatchResponse{}, }, // watch the prefix, matching { @@ -83,10 +74,6 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { Prefix: []byte("foo")}}}, []*pb.WatchResponse{ - { - Header: &pb.ResponseHeader{Revision: 1}, - Created: true, - }, { Header: &pb.ResponseHeader{Revision: 2}, Created: false, @@ -106,12 +93,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { CreateRequest: &pb.WatchCreateRequest{ Prefix: []byte("helloworld")}}}, - []*pb.WatchResponse{ - { - Header: &pb.ResponseHeader{Revision: 1}, - Created: true, - }, - }, + []*pb.WatchResponse{}, }, // multiple puts, one watcher with matching key { @@ -121,10 +103,6 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { Key: []byte("foo")}}}, []*pb.WatchResponse{ - { - Header: &pb.ResponseHeader{Revision: 1}, - Created: true, - }, { Header: &pb.ResponseHeader{Revision: 2}, Created: false, @@ -165,10 +143,6 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { Prefix: []byte("foo")}}}, []*pb.WatchResponse{ - { - Header: &pb.ResponseHeader{Revision: 1}, - Created: true, - }, { Header: &pb.ResponseHeader{Revision: 2}, Created: false, @@ -218,6 +192,23 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { t.Fatalf("#%d: wStream.Send error: %v", i, err) } + // ensure watcher request created a new watcher + cresp, err := wStream.Recv() + if err != nil { + t.Errorf("#%d: wStream.Recv error: %v", i, err) + continue + } + if cresp.Created != true { + t.Errorf("#%d: did not create watchid, got +%v", i, cresp) + continue + } + createdWatchId := cresp.WatchId + if cresp.Header == nil || cresp.Header.Revision != 1 { + t.Errorf("#%d: header revision got +%v, wanted revison 1", i, cresp) + continue + } + + // asynchronously create keys go func() { for _, k := range tt.putKeys { kvc := clus.RandClient().KV @@ -228,7 +219,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { } }() - var createdWatchId int64 + // check stream results for j, wresp := range tt.wresps { resp, err := wStream.Recv() if err != nil { @@ -245,9 +236,6 @@ func TestV3WatchFromCurrentRevision(t *testing.T) { if wresp.Created != resp.Created { t.Errorf("#%d.%d: resp.Created got = %v, want = %v", i, j, resp.Created, wresp.Created) } - if resp.Created { - createdWatchId = resp.WatchId - } if resp.WatchId != createdWatchId { t.Errorf("#%d.%d: resp.WatchId got = %d, want = %d", i, j, resp.WatchId, createdWatchId) }