tests: Enable progress notify in linearizability tests

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
dependabot/go_modules/go.uber.org/atomic-1.10.0
Marek Siarkowicz 2023-02-02 11:37:54 +01:00 committed by Benjamin Wang
parent 586eaccc4d
commit 39d98522d6
3 changed files with 24 additions and 5 deletions

View File

@ -185,6 +185,7 @@ type EtcdProcessClusterConfig struct {
WarningUnaryRequestDuration time.Duration
ExperimentalWarningUnaryRequestDuration time.Duration
PeerProxy bool
WatchProcessNotifyInterval time.Duration
}
func DefaultConfig() *EtcdProcessClusterConfig {
@ -336,6 +337,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit }
}
func WithWatchProcessNotifyInterval(interval time.Duration) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.WatchProcessNotifyInterval = interval }
}
func WithPeerProxy(enabled bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.PeerProxy = enabled }
}
@ -573,6 +578,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.ExperimentalWarningUnaryRequestDuration != 0 {
args = append(args, "--experimental-warning-unary-request-duration", cfg.ExperimentalWarningUnaryRequestDuration.String())
}
if cfg.WatchProcessNotifyInterval != 0 {
args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String())
}
if cfg.SnapshotCatchUpEntries > 0 {
args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.SnapshotCatchUpEntries))
}

View File

@ -103,6 +103,7 @@ func TestLinearizability(t *testing.T) {
e2e.WithSnapshotCount(100),
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
e2e.WithWatchProcessNotifyInterval(100*time.Millisecond),
),
})
scenarios = append(scenarios, scenario{
@ -114,6 +115,7 @@ func TestLinearizability(t *testing.T) {
e2e.WithPeerProxy(true),
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
e2e.WithWatchProcessNotifyInterval(100*time.Millisecond),
),
})
}
@ -177,7 +179,8 @@ func TestLinearizability(t *testing.T) {
waitBetweenTriggers: waitBetweenFailpointTriggers,
}, *scenario.traffic)
forcestopCluster(clus)
validateWatchResponses(t, watchResponses)
watchProgressNotifyEnabled := clus.Cfg.WatchProcessNotifyInterval != 0
validateWatchResponses(t, watchResponses, watchProgressNotifyEnabled)
longestHistory, remainingEvents := watchEventHistory(watchResponses)
validateEventsMatch(t, longestHistory, remainingEvents)
operations = patchOperationBasedOnWatchEvents(operations, longestHistory)

View File

@ -56,6 +56,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger
wg.Wait()
return memberResponses
}
func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps []watchResponse) {
var lastRevision int64 = 0
for {
@ -64,7 +65,7 @@ func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps
return resps
default:
}
for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1)) {
for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1), clientv3.WithProgressNotify()) {
resps = append(resps, watchResponse{resp, time.Now()})
lastRevision = resp.Header.Revision
if resp.Err() != nil {
@ -74,18 +75,22 @@ func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps
}
}
func validateWatchResponses(t *testing.T, responses [][]watchResponse) {
func validateWatchResponses(t *testing.T, responses [][]watchResponse, expectProgressNotify bool) {
for _, memberResponses := range responses {
validateMemberWatchResponses(t, memberResponses)
validateMemberWatchResponses(t, memberResponses, expectProgressNotify)
}
}
func validateMemberWatchResponses(t *testing.T, responses []watchResponse) {
func validateMemberWatchResponses(t *testing.T, responses []watchResponse, expectProgressNotify bool) {
var gotProgressNotify = false
var lastRevision int64 = 1
for _, resp := range responses {
if resp.Header.Revision < lastRevision {
t.Errorf("Revision should never decrease")
}
if resp.IsProgressNotify() && resp.Header.Revision == lastRevision {
gotProgressNotify = true
}
if resp.Header.Revision == lastRevision && len(resp.Events) != 0 {
t.Errorf("Got two non-empty responses about same revision")
}
@ -100,6 +105,9 @@ func validateMemberWatchResponses(t *testing.T, responses []watchResponse) {
}
lastRevision = resp.Header.Revision
}
if gotProgressNotify != expectProgressNotify {
t.Errorf("Expected progress notify: %v, got: %v", expectProgressNotify, gotProgressNotify)
}
}
func toWatchEvents(responses []watchResponse) (events []watchEvent) {