integration: put keys after watcher ack in TestV3WatchFromCurrentRevision
Watcher would miss events since the keys would be created after sending the watcher request but before etcd registered the watcher.release-2.3
parent
c15b2a5077
commit
019a145304
|
@ -45,10 +45,6 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
|
|||
Key: []byte("foo")}}},
|
||||
|
||||
[]*pb.WatchResponse{
|
||||
{
|
||||
Header: &pb.ResponseHeader{Revision: 1},
|
||||
Created: true,
|
||||
},
|
||||
{
|
||||
Header: &pb.ResponseHeader{Revision: 2},
|
||||
Created: false,
|
||||
|
@ -68,12 +64,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
|
|||
CreateRequest: &pb.WatchCreateRequest{
|
||||
Key: []byte("helloworld")}}},
|
||||
|
||||
[]*pb.WatchResponse{
|
||||
{
|
||||
Header: &pb.ResponseHeader{Revision: 1},
|
||||
Created: true,
|
||||
},
|
||||
},
|
||||
[]*pb.WatchResponse{},
|
||||
},
|
||||
// watch the prefix, matching
|
||||
{
|
||||
|
@ -83,10 +74,6 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
|
|||
Prefix: []byte("foo")}}},
|
||||
|
||||
[]*pb.WatchResponse{
|
||||
{
|
||||
Header: &pb.ResponseHeader{Revision: 1},
|
||||
Created: true,
|
||||
},
|
||||
{
|
||||
Header: &pb.ResponseHeader{Revision: 2},
|
||||
Created: false,
|
||||
|
@ -106,12 +93,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
|
|||
CreateRequest: &pb.WatchCreateRequest{
|
||||
Prefix: []byte("helloworld")}}},
|
||||
|
||||
[]*pb.WatchResponse{
|
||||
{
|
||||
Header: &pb.ResponseHeader{Revision: 1},
|
||||
Created: true,
|
||||
},
|
||||
},
|
||||
[]*pb.WatchResponse{},
|
||||
},
|
||||
// multiple puts, one watcher with matching key
|
||||
{
|
||||
|
@ -121,10 +103,6 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
|
|||
Key: []byte("foo")}}},
|
||||
|
||||
[]*pb.WatchResponse{
|
||||
{
|
||||
Header: &pb.ResponseHeader{Revision: 1},
|
||||
Created: true,
|
||||
},
|
||||
{
|
||||
Header: &pb.ResponseHeader{Revision: 2},
|
||||
Created: false,
|
||||
|
@ -165,10 +143,6 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
|
|||
Prefix: []byte("foo")}}},
|
||||
|
||||
[]*pb.WatchResponse{
|
||||
{
|
||||
Header: &pb.ResponseHeader{Revision: 1},
|
||||
Created: true,
|
||||
},
|
||||
{
|
||||
Header: &pb.ResponseHeader{Revision: 2},
|
||||
Created: false,
|
||||
|
@ -218,6 +192,23 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
|
|||
t.Fatalf("#%d: wStream.Send error: %v", i, err)
|
||||
}
|
||||
|
||||
// ensure watcher request created a new watcher
|
||||
cresp, err := wStream.Recv()
|
||||
if err != nil {
|
||||
t.Errorf("#%d: wStream.Recv error: %v", i, err)
|
||||
continue
|
||||
}
|
||||
if cresp.Created != true {
|
||||
t.Errorf("#%d: did not create watchid, got +%v", i, cresp)
|
||||
continue
|
||||
}
|
||||
createdWatchId := cresp.WatchId
|
||||
if cresp.Header == nil || cresp.Header.Revision != 1 {
|
||||
t.Errorf("#%d: header revision got +%v, wanted revison 1", i, cresp)
|
||||
continue
|
||||
}
|
||||
|
||||
// asynchronously create keys
|
||||
go func() {
|
||||
for _, k := range tt.putKeys {
|
||||
kvc := clus.RandClient().KV
|
||||
|
@ -228,7 +219,7 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
var createdWatchId int64
|
||||
// check stream results
|
||||
for j, wresp := range tt.wresps {
|
||||
resp, err := wStream.Recv()
|
||||
if err != nil {
|
||||
|
@ -245,9 +236,6 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
|
|||
if wresp.Created != resp.Created {
|
||||
t.Errorf("#%d.%d: resp.Created got = %v, want = %v", i, j, resp.Created, wresp.Created)
|
||||
}
|
||||
if resp.Created {
|
||||
createdWatchId = resp.WatchId
|
||||
}
|
||||
if resp.WatchId != createdWatchId {
|
||||
t.Errorf("#%d.%d: resp.WatchId got = %d, want = %d", i, j, resp.WatchId, createdWatchId)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue