Merge pull request #15283 from serathius/linearizability-blackhole-refactor

tests: Refactor blackholing and add separate BlackholePeerUntilSnapsh…
Marek Siarkowicz 2023-02-14 11:56:37 +01:00 committed by GitHub
commit 6c439aa9e2
1 changed file with 88 additions and 78 deletions


@@ -53,7 +53,7 @@ var (
	CompactBeforeCommitBatchPanic Failpoint = goPanicFailpoint{"compactBeforeCommitBatch", triggerCompact, AnyMember}
	CompactAfterCommitBatchPanic Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact, AnyMember}
	RaftBeforeLeaderSendPanic Failpoint = goPanicFailpoint{"raftBeforeLeaderSend", nil, Leader}
-	BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{duration: time.Second}
+	BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{}
	DelayPeerNetwork Failpoint = delayPeerNetworkFailpoint{duration: time.Second, baseLatency: 75 * time.Millisecond, randomizedLatency: 50 * time.Millisecond}
	oneNodeClusterFailpoints = []Failpoint{
		KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic,
@@ -241,76 +241,8 @@ func triggerCompact(_ *testing.T, ctx context.Context, member e2e.EtcdProcess, _
	return nil
}
-// latestRevisionForEndpoint gets latest revision of the first endpoint in Client.Endpoints list
-func latestRevisionForEndpoint(ctx context.Context, c *clientv3.Client) (int64, error) {
-	cntx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
-	defer cancel()
-	resp, err := c.Status(cntx, c.Endpoints()[0])
-	if err != nil {
-		return 0, err
-	}
-	return resp.Header.Revision, err
-}
func triggerBlackholeUntilSnapshot(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error {
-	leader := clus.Procs[clus.WaitLeader(t)]
-	lc, err := clientv3.New(clientv3.Config{
-		Endpoints: []string{leader.Config().ClientURL},
-		Logger: zap.NewNop(),
-		DialKeepAliveTime: 1 * time.Millisecond,
-		DialKeepAliveTimeout: 5 * time.Millisecond,
-	})
-	if err != nil {
-		return err
-	}
-	defer lc.Close()
-	mc, err := clientv3.New(clientv3.Config{
-		Endpoints: []string{member.Config().ClientURL},
-		Logger: zap.NewNop(),
-		DialKeepAliveTime: 1 * time.Millisecond,
-		DialKeepAliveTimeout: 5 * time.Millisecond,
-	})
-	if err != nil {
-		return err
-	}
-	defer mc.Close()
-	proxy := member.PeerProxy()
-	// Blackholing will cause peers to not be able to use streamWriters registered with member
-	// but peer traffic is still possible because member has 'pipeline' with peers
-	// TODO: find a way to stop all traffic
-	proxy.BlackholeTx()
-	proxy.BlackholeRx()
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-		// Have to refresh revBlackholedMem. It can still increase as member processes changes that are received but not yet applied.
-		revBlackholedMem, err := latestRevisionForEndpoint(ctx, mc)
-		if err != nil {
-			return err
-		}
-		revLeader, err := latestRevisionForEndpoint(ctx, lc)
-		if err != nil {
-			return err
-		}
-		t.Logf("Leader: [%s], Member: [%s], revLeader: %d, revBlackholedMem: %d", leader.Config().Name, member.Config().Name, revLeader, revBlackholedMem)
-		// Blackholed member has to be sufficiently behind to trigger snapshot transfer.
-		// Need to make sure leader compacted latest revBlackholedMem inside EtcdServer.snapshot.
-		// That's why we wait for clus.Cfg.SnapshotCount (to trigger snapshot) + clus.Cfg.SnapshotCatchUpEntries (EtcdServer.snapshot compaction offset)
-		if revLeader-revBlackholedMem > int64(clus.Cfg.SnapshotCount+clus.Cfg.SnapshotCatchUpEntries) {
-			break
-		}
-		time.Sleep(100 * time.Millisecond)
-	}
-	proxy.UnblackholeTx()
-	proxy.UnblackholeRx()
-	return nil
+	return triggerBlackhole(t, ctx, member, clus, true)
}
type randomFailpoint struct {
@@ -343,24 +275,102 @@ func (f randomFailpoint) Available(e2e.EtcdProcess) bool {
	return true
}
-type blackholePeerNetworkFailpoint struct {
-	duration time.Duration
-}
+type blackholePeerNetworkFailpoint struct{}
func (f blackholePeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
	member := clus.Procs[rand.Int()%len(clus.Procs)]
+	return triggerBlackhole(t, ctx, member, clus, false)
+}
+func triggerBlackhole(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
	proxy := member.PeerProxy()
	// Blackholing will cause peers to not be able to use streamWriters registered with member
	// but peer traffic is still possible because member has 'pipeline' with peers
	// TODO: find a way to stop all traffic
+	t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
	proxy.BlackholeTx()
	proxy.BlackholeRx()
-	lg.Info("Blackholing traffic from and to member", zap.String("member", member.Config().Name))
-	time.Sleep(f.duration)
-	lg.Info("Traffic restored from and to member", zap.String("member", member.Config().Name))
-	proxy.UnblackholeTx()
-	proxy.UnblackholeRx()
+	defer func() {
+		t.Logf("Traffic restored from and to member %q", member.Config().Name)
+		proxy.UnblackholeTx()
+		proxy.UnblackholeRx()
+	}()
+	if shouldWaitTillSnapshot {
+		return waitTillSnapshot(ctx, t, clus, member)
+	} else {
+		time.Sleep(time.Second)
		return nil
+	}
}
+func waitTillSnapshot(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, blackholedMember e2e.EtcdProcess) error {
+	var endpoints []string
+	for _, ep := range clus.EndpointsV3() {
+		if ep == blackholedMember.Config().ClientURL {
+			continue
+		}
+		endpoints = append(endpoints, ep)
+	}
+	clusterClient, err := clientv3.New(clientv3.Config{
+		Endpoints: endpoints,
+		Logger: zap.NewNop(),
+		DialKeepAliveTime: 1 * time.Millisecond,
+		DialKeepAliveTimeout: 5 * time.Millisecond,
+	})
+	if err != nil {
+		return err
+	}
+	defer clusterClient.Close()
+	blackholedMemberClient, err := clientv3.New(clientv3.Config{
+		Endpoints: []string{blackholedMember.Config().ClientURL},
+		Logger: zap.NewNop(),
+		DialKeepAliveTime: 1 * time.Millisecond,
+		DialKeepAliveTimeout: 5 * time.Millisecond,
+	})
+	if err != nil {
+		return err
+	}
+	defer blackholedMemberClient.Close()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		// Have to refresh blackholedMemberRevision. It can still increase as blackholedMember processes changes that are received but not yet applied.
+		blackholedMemberRevision, err := latestRevisionForEndpoint(ctx, blackholedMemberClient)
+		if err != nil {
+			return err
+		}
+		clusterRevision, err := latestRevisionForEndpoint(ctx, clusterClient)
+		if err != nil {
+			return err
+		}
+		t.Logf("clusterRevision: %d, blackholedMemberRevision: %d", clusterRevision, blackholedMemberRevision)
+		// Blackholed member has to be sufficiently behind to trigger snapshot transfer.
+		// Need to make sure the leader compacted past the latest blackholedMemberRevision inside EtcdServer.snapshot.
+		// That's why we wait for clus.Cfg.SnapshotCount (to trigger snapshot) + clus.Cfg.SnapshotCatchUpEntries (EtcdServer.snapshot compaction offset).
+		if clusterRevision-blackholedMemberRevision > int64(clus.Cfg.SnapshotCount+clus.Cfg.SnapshotCatchUpEntries) {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	return nil
+}
+// latestRevisionForEndpoint gets latest revision of the first endpoint in Client.Endpoints list
+func latestRevisionForEndpoint(ctx context.Context, c *clientv3.Client) (int64, error) {
+	cntx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
+	defer cancel()
+	resp, err := c.Status(cntx, c.Endpoints()[0])
+	if err != nil {
+		return 0, err
+	}
+	return resp.Header.Revision, err
+}
func (f blackholePeerNetworkFailpoint) Name() string {
	return "blackhole"
}
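
For readers skimming the diff, the condition inside waitTillSnapshot is the heart of the new blackhole-until-snapshot failpoint: traffic stays blackholed until the member trails the rest of the cluster by more than clus.Cfg.SnapshotCount + clus.Cfg.SnapshotCatchUpEntries revisions, so that by the time it reconnects the leader has both taken a snapshot and compacted its raft log past the member's last applied entry, forcing a snapshot transfer. Below is a minimal, self-contained sketch of that arithmetic only; the snapshotCount and snapshotCatchUpEntries values are made up for illustration and are not the values configured by the robustness tests.

// Sketch only: mirrors the snapshot-lag threshold used by waitTillSnapshot.
package main

import "fmt"

// snapshotTransferExpected reports whether the blackholed member is far enough
// behind the cluster that unblocking it should trigger a snapshot transfer.
func snapshotTransferExpected(clusterRev, blackholedRev, snapshotCount, snapshotCatchUpEntries int64) bool {
	return clusterRev-blackholedRev > snapshotCount+snapshotCatchUpEntries
}

func main() {
	// Illustration values only; real tests read these from clus.Cfg.
	const snapshotCount, snapshotCatchUpEntries = 100, 10

	fmt.Println(snapshotTransferExpected(200, 150, snapshotCount, snapshotCatchUpEntries)) // false: lag 50 <= 110
	fmt.Println(snapshotTransferExpected(340, 200, snapshotCount, snapshotCatchUpEntries)) // true: lag 140 > 110
}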