tests/integration: Demonstrate manual progress notification race
This will fail basically every time, as the progress notification request catches the watcher in an asynchronised state. Signed-off-by: Peter Wortmann <peter.wortmann@skao.int>storage-doc
parent
b504ac1840
commit
af25936fb7
|
@ -1397,3 +1397,71 @@ func TestV3WatchCloseCancelRace(t *testing.T) {
|
||||||
t.Fatalf("expected %s watch, got %s", expected, minWatches)
|
t.Fatalf("expected %s watch, got %s", expected, minWatches)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestV3WatchProgressWaitsForSync checks that progress notifications
|
||||||
|
// don't get sent until the watcher is synchronised
|
||||||
|
func TestV3WatchProgressWaitsForSync(t *testing.T) {
|
||||||
|
|
||||||
|
// Disable for gRPC proxy, as it does not support requesting
|
||||||
|
// progress notifications
|
||||||
|
if integration.ThroughProxy {
|
||||||
|
t.Skip("grpc proxy currently does not support requesting progress notifications")
|
||||||
|
}
|
||||||
|
|
||||||
|
integration.BeforeTest(t)
|
||||||
|
|
||||||
|
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
client := clus.RandClient()
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Write a couple values into key to make sure there's a
|
||||||
|
// non-trivial amount of history.
|
||||||
|
count := 1001
|
||||||
|
t.Logf("Writing key 'foo' %d times", count)
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
_, err := client.Put(ctx, "foo", fmt.Sprintf("bar%d", i))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create watch channel starting at revision 1 (i.e. it starts
|
||||||
|
// unsynced because of the update above)
|
||||||
|
wch := client.Watch(ctx, "foo", clientv3.WithRev(1))
|
||||||
|
|
||||||
|
// Immediately request a progress notification. As the client
|
||||||
|
// is unsynchronised, the server will have to defer the
|
||||||
|
// notification internally.
|
||||||
|
err := client.RequestProgress(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify that we get the watch responses first. Note that
|
||||||
|
// events might be spread across multiple packets.
|
||||||
|
var event_count = 0
|
||||||
|
for event_count < count {
|
||||||
|
wr := <-wch
|
||||||
|
if wr.Err() != nil {
|
||||||
|
t.Fatal(fmt.Errorf("watch error: %w", wr.Err()))
|
||||||
|
}
|
||||||
|
if wr.IsProgressNotify() {
|
||||||
|
t.Fatal("Progress notification from unsynced client!")
|
||||||
|
}
|
||||||
|
if wr.Header.Revision != int64(count+1) {
|
||||||
|
t.Fatal("Incomplete watch response!")
|
||||||
|
}
|
||||||
|
event_count += len(wr.Events)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ... followed by the requested progress notification
|
||||||
|
wr2 := <-wch
|
||||||
|
if wr2.Err() != nil {
|
||||||
|
t.Fatal(fmt.Errorf("watch error: %w", wr2.Err()))
|
||||||
|
}
|
||||||
|
if !wr2.IsProgressNotify() {
|
||||||
|
t.Fatal("Did not receive progress notification!")
|
||||||
|
}
|
||||||
|
if wr2.Header.Revision != int64(count+1) {
|
||||||
|
t.Fatal("Wrong revision in progress notification!")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue