diff --git a/tools/functional-tester/tester/cluster.go b/tools/functional-tester/tester/cluster.go index bb4a13019..f94287515 100644 --- a/tools/functional-tester/tester/cluster.go +++ b/tools/functional-tester/tester/cluster.go @@ -265,11 +265,11 @@ func (clus *Cluster) updateFailures() { for _, cs := range clus.Tester.FailureCases { switch cs { case "KILL_ONE_FOLLOWER": - clus.failures = append(clus.failures, newFailureKillOne()) // TODO + clus.failures = append(clus.failures, newFailureKillOneFollower()) case "KILL_LEADER": clus.failures = append(clus.failures, newFailureKillLeader()) case "KILL_ONE_FOLLOWER_FOR_LONG": - clus.failures = append(clus.failures, newFailureKillOneForLongTime()) // TODO + clus.failures = append(clus.failures, newFailureKillOneFollowerForLongTime()) case "KILL_LEADER_FOR_LONG": clus.failures = append(clus.failures, newFailureKillLeaderForLongTime()) case "KILL_QUORUM": @@ -277,17 +277,17 @@ func (clus *Cluster) updateFailures() { case "KILL_ALL": clus.failures = append(clus.failures, newFailureKillAll()) case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER": - clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOne()) // TODO - case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_ONE": - clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOne()) // TODO + clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollower()) + case "BLACKHOLE_PEER_PORT_TX_RX_LEADER": + clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeader()) case "BLACKHOLE_PEER_PORT_TX_RX_ALL": clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll()) case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER": - clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneMember()) // TODO + clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower()) case "DELAY_PEER_PORT_TX_RX_LEADER": - clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader()) // TODO + clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader()) case "DELAY_PEER_PORT_TX_RX_ALL": - clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll()) // TODO + clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll()) case "FAILPOINTS": fpFailures, fperr := failpointFailures(clus) if len(fpFailures) == 0 { diff --git a/tools/functional-tester/tester/cluster_test.go b/tools/functional-tester/tester/cluster_test.go index 82a785621..eab971142 100644 --- a/tools/functional-tester/tester/cluster_test.go +++ b/tools/functional-tester/tester/cluster_test.go @@ -126,7 +126,7 @@ func Test_newCluster(t *testing.T) { "KILL_QUORUM", "KILL_ALL", "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER", - "BLACKHOLE_PEER_PORT_TX_RX_LEADER_ONE", + "BLACKHOLE_PEER_PORT_TX_RX_LEADER", "BLACKHOLE_PEER_PORT_TX_RX_ALL", "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER", "DELAY_PEER_PORT_TX_RX_LEADER", diff --git a/tools/functional-tester/tester/failure.go b/tools/functional-tester/tester/failure.go index 5ff1e856e..b0fbb0b86 100644 --- a/tools/functional-tester/tester/failure.go +++ b/tools/functional-tester/tester/failure.go @@ -14,6 +14,12 @@ package tester +import ( + "fmt" + "math/rand" + "time" +) + // Failure defines failure injection interface. // To add a fail case: // 1. implement "Failure" interface @@ -28,3 +34,173 @@ type Failure interface { // Desc returns a description of the failure Desc() string } + +type description string + +func (d description) Desc() string { return string(d) } + +type injectMemberFunc func(*Cluster, int) error +type recoverMemberFunc func(*Cluster, int) error + +type failureByFunc struct { + description + injectMember injectMemberFunc + recoverMember recoverMemberFunc +} + +type failureFollower struct { + failureByFunc + last int + lead int +} + +type failureLeader struct { + failureByFunc + last int + lead int +} + +func (f *failureFollower) updateIndex(clus *Cluster) error { + idx, err := clus.GetLeader() + if err != nil { + return err + } + f.lead = idx + + n := len(clus.Members) + if f.last == -1 { // first run + f.last = clus.rd % n + if f.last == f.lead { + f.last = (f.last + 1) % n + } + } else { + f.last = (f.last + 1) % n + if f.last == f.lead { + f.last = (f.last + 1) % n + } + } + return nil +} + +func (f *failureLeader) updateIndex(clus *Cluster) error { + idx, err := clus.GetLeader() + if err != nil { + return err + } + f.lead = idx + f.last = idx + return nil +} + +type failureQuorum failureByFunc +type failureAll failureByFunc + +// failureUntilSnapshot injects a failure and waits for a snapshot event +type failureUntilSnapshot struct{ Failure } + +func (f *failureFollower) Inject(clus *Cluster) error { + if err := f.updateIndex(clus); err != nil { + return err + } + return f.injectMember(clus, f.last) +} + +func (f *failureFollower) Recover(clus *Cluster) error { + return f.recoverMember(clus, f.last) +} + +func (f *failureLeader) Inject(clus *Cluster) error { + if err := f.updateIndex(clus); err != nil { + return err + } + return f.injectMember(clus, f.last) +} + +func (f *failureLeader) Recover(clus *Cluster) error { + return f.recoverMember(clus, f.last) +} + +func (f *failureQuorum) Inject(clus *Cluster) error { + for i := range killMap(len(clus.Members), clus.rd) { + if err := f.injectMember(clus, i); err != nil { + return err + } + } + return nil +} + +func (f *failureQuorum) Recover(clus *Cluster) error { + for i := range killMap(len(clus.Members), clus.rd) { + if err := f.recoverMember(clus, i); err != nil { + return err + } + } + return nil +} + +func (f *failureAll) Inject(clus *Cluster) error { + for i := range clus.Members { + if err := f.injectMember(clus, i); err != nil { + return err + } + } + return nil +} + +func (f *failureAll) Recover(clus *Cluster) error { + for i := range clus.Members { + if err := f.recoverMember(clus, i); err != nil { + return err + } + } + return nil +} + +const snapshotCount = 10000 + +func (f *failureUntilSnapshot) Inject(clus *Cluster) error { + if err := f.Failure.Inject(clus); err != nil { + return err + } + if len(clus.Members) < 3 { + return nil + } + // maxRev may fail since failure just injected, retry if failed. + startRev, err := clus.maxRev() + for i := 0; i < 10 && startRev == 0; i++ { + startRev, err = clus.maxRev() + } + if startRev == 0 { + return err + } + lastRev := startRev + // Normal healthy cluster could accept 1000req/s at least. + // Give it 3-times time to create a new snapshot. + retry := snapshotCount / 1000 * 3 + for j := 0; j < retry; j++ { + lastRev, _ = clus.maxRev() + // If the number of proposals committed is bigger than snapshot count, + // a new snapshot should have been created. + if lastRev-startRev > snapshotCount { + return nil + } + time.Sleep(time.Second) + } + return fmt.Errorf("cluster too slow: only commit %d requests in %ds", lastRev-startRev, retry) +} + +func (f *failureUntilSnapshot) Desc() string { + return f.Failure.Desc() + " for a long time and expect it to recover from an incoming snapshot" +} + +func killMap(size int, seed int) map[int]bool { + m := make(map[int]bool) + r := rand.New(rand.NewSource(int64(seed))) + majority := size/2 + 1 + for { + m[r.Intn(size)] = true + if len(m) >= majority { + return m + } + } +} diff --git a/tools/functional-tester/tester/failure_case_failpoints.go b/tools/functional-tester/tester/failure_case_failpoints.go index e6ceaf442..ede60916a 100644 --- a/tools/functional-tester/tester/failure_case_failpoints.go +++ b/tools/functional-tester/tester/failure_case_failpoints.go @@ -83,8 +83,26 @@ func failuresFromFailpoint(fp string, failpointCommands []string) (fs []Failure) for _, fcmd := range failpointCommands { inject := makeInjectFailpoint(fp, fcmd) fs = append(fs, []Failure{ - &failureOne{ - description: description(fmt.Sprintf("failpoint %s (one: %s)", fp, fcmd)), + &failureFollower{ + failureByFunc: failureByFunc{ + description: description(fmt.Sprintf("failpoint %s (one: %s)", fp, fcmd)), + injectMember: inject, + recoverMember: recov, + }, + last: -1, + lead: -1, + }, + &failureLeader{ + failureByFunc: failureByFunc{ + description: description(fmt.Sprintf("failpoint %s (leader: %s)", fp, fcmd)), + injectMember: inject, + recoverMember: recov, + }, + last: -1, + lead: -1, + }, + &failureQuorum{ + description: description(fmt.Sprintf("failpoint %s (quorum: %s)", fp, fcmd)), injectMember: inject, recoverMember: recov, }, @@ -93,19 +111,6 @@ func failuresFromFailpoint(fp string, failpointCommands []string) (fs []Failure) injectMember: inject, recoverMember: recov, }, - &failureQuorum{ - description: description(fmt.Sprintf("failpoint %s (majority: %s)", fp, fcmd)), - injectMember: inject, - recoverMember: recov, - }, - &failureLeader{ - failureByFunc{ - description: description(fmt.Sprintf("failpoint %s (leader: %s)", fp, fcmd)), - injectMember: inject, - recoverMember: recov, - }, - 0, - }, }...) } return fs diff --git a/tools/functional-tester/tester/failure_case_kill.go b/tools/functional-tester/tester/failure_case_kill.go index ddbfab350..d59fddf8f 100644 --- a/tools/functional-tester/tester/failure_case_kill.go +++ b/tools/functional-tester/tester/failure_case_kill.go @@ -14,15 +14,7 @@ package tester -import ( - "fmt" - "math/rand" - "time" - - "github.com/coreos/etcd/tools/functional-tester/rpcpb" -) - -const snapshotCount = 10000 +import "github.com/coreos/etcd/tools/functional-tester/rpcpb" func injectKill(clus *Cluster, idx int) error { return clus.sendOperation(idx, rpcpb.Operation_KillEtcd) @@ -32,182 +24,44 @@ func recoverKill(clus *Cluster, idx int) error { return clus.sendOperation(idx, rpcpb.Operation_RestartEtcd) } -func newFailureKillAll() Failure { - return &failureAll{ - description: "kill all members", - injectMember: injectKill, - recoverMember: recoverKill, - } -} - -func newFailureKillQuorum() Failure { - return &failureQuorum{ - description: "kill quorum of the cluster", - injectMember: injectKill, - recoverMember: recoverKill, - } -} - -func newFailureKillOne() Failure { - return &failureOne{ - description: "kill one random member", +func newFailureKillOneFollower() Failure { + ff := failureByFunc{ + description: "kill one follower", injectMember: injectKill, recoverMember: recoverKill, } + return &failureFollower{ff, -1, -1} } func newFailureKillLeader() Failure { ff := failureByFunc{ - description: "kill leader member", + description: "kill leader", injectMember: injectKill, recoverMember: recoverKill, } - return &failureLeader{ff, 0} + return &failureLeader{ff, -1, -1} } -func newFailureKillOneForLongTime() Failure { - return &failureUntilSnapshot{newFailureKillOne()} +func newFailureKillQuorum() Failure { + return &failureQuorum{ + description: "kill quorum", + injectMember: injectKill, + recoverMember: recoverKill, + } +} + +func newFailureKillAll() Failure { + return &failureAll{ + description: "kill all", + injectMember: injectKill, + recoverMember: recoverKill, + } +} + +func newFailureKillOneFollowerForLongTime() Failure { + return &failureUntilSnapshot{newFailureKillOneFollower()} } func newFailureKillLeaderForLongTime() Failure { return &failureUntilSnapshot{newFailureKillLeader()} } - -type description string - -func (d description) Desc() string { return string(d) } - -type injectMemberFunc func(*Cluster, int) error -type recoverMemberFunc func(*Cluster, int) error - -type failureByFunc struct { - description - injectMember injectMemberFunc - recoverMember recoverMemberFunc -} - -// TODO: support kill follower -type failureOne failureByFunc -type failureAll failureByFunc -type failureQuorum failureByFunc - -type failureLeader struct { - failureByFunc - idx int -} - -// failureUntilSnapshot injects a failure and waits for a snapshot event -type failureUntilSnapshot struct{ Failure } - -func (f *failureOne) Inject(clus *Cluster) error { - return f.injectMember(clus, clus.rd%len(clus.Members)) -} - -func (f *failureOne) Recover(clus *Cluster) error { - if err := f.recoverMember(clus, clus.rd%len(clus.Members)); err != nil { - return err - } - clus.logger.Info("wait health after recovering failureOne") - return clus.WaitHealth() -} - -func (f *failureAll) Inject(clus *Cluster) error { - for i := range clus.Members { - if err := f.injectMember(clus, i); err != nil { - return err - } - } - return nil -} - -func (f *failureAll) Recover(clus *Cluster) error { - for i := range clus.Members { - if err := f.recoverMember(clus, i); err != nil { - return err - } - } - clus.logger.Info("wait health after recovering failureAll") - return clus.WaitHealth() -} - -func (f *failureQuorum) Inject(clus *Cluster) error { - for i := range killMap(len(clus.Members), clus.rd) { - if err := f.injectMember(clus, i); err != nil { - return err - } - } - return nil -} - -func (f *failureQuorum) Recover(clus *Cluster) error { - for i := range killMap(len(clus.Members), clus.rd) { - if err := f.recoverMember(clus, i); err != nil { - return err - } - } - return nil -} - -func (f *failureLeader) Inject(clus *Cluster) error { - idx, err := clus.GetLeader() - if err != nil { - return err - } - f.idx = idx - return f.injectMember(clus, idx) -} - -func (f *failureLeader) Recover(clus *Cluster) error { - if err := f.recoverMember(clus, f.idx); err != nil { - return err - } - clus.logger.Info("wait health after recovering failureLeader") - return clus.WaitHealth() -} - -func (f *failureUntilSnapshot) Inject(clus *Cluster) error { - if err := f.Failure.Inject(clus); err != nil { - return err - } - if len(clus.Members) < 3 { - return nil - } - // maxRev may fail since failure just injected, retry if failed. - startRev, err := clus.maxRev() - for i := 0; i < 10 && startRev == 0; i++ { - startRev, err = clus.maxRev() - } - if startRev == 0 { - return err - } - lastRev := startRev - // Normal healthy cluster could accept 1000req/s at least. - // Give it 3-times time to create a new snapshot. - retry := snapshotCount / 1000 * 3 - for j := 0; j < retry; j++ { - lastRev, _ = clus.maxRev() - // If the number of proposals committed is bigger than snapshot count, - // a new snapshot should have been created. - if lastRev-startRev > snapshotCount { - return nil - } - time.Sleep(time.Second) - } - return fmt.Errorf("cluster too slow: only commit %d requests in %ds", lastRev-startRev, retry) -} - -func (f *failureUntilSnapshot) Desc() string { - return f.Failure.Desc() + " for a long time and expect it to recover from an incoming snapshot" -} - -func killMap(size int, seed int) map[int]bool { - m := make(map[int]bool) - r := rand.New(rand.NewSource(int64(seed))) - majority := size/2 + 1 - for { - m[r.Intn(size)] = true - if len(m) >= majority { - return m - } - } -} diff --git a/tools/functional-tester/tester/failure_case_blackhole.go b/tools/functional-tester/tester/failure_case_network_blackhole.go similarity index 71% rename from tools/functional-tester/tester/failure_case_blackhole.go rename to tools/functional-tester/tester/failure_case_network_blackhole.go index 44e698d04..6951c892f 100644 --- a/tools/functional-tester/tester/failure_case_blackhole.go +++ b/tools/functional-tester/tester/failure_case_network_blackhole.go @@ -24,12 +24,26 @@ func recoverBlackholePeerPortTxRx(clus *Cluster, idx int) error { return clus.sendOperation(idx, rpcpb.Operation_UnblackholePeerPortTxRx) } -func newFailureBlackholePeerPortTxRxOne() Failure { - f := &failureOne{ - description: "blackhole peer port on one member", +func newFailureBlackholePeerPortTxRxOneFollower() Failure { + ff := failureByFunc{ + description: "blackhole peer port on one follower", injectMember: injectBlackholePeerPortTxRx, recoverMember: recoverBlackholePeerPortTxRx, } + f := &failureFollower{ff, -1, -1} + return &failureDelay{ + Failure: f, + delayDuration: triggerElectionDur, + } +} + +func newFailureBlackholePeerPortTxRxLeader() Failure { + ff := failureByFunc{ + description: "blackhole peer port on leader", + injectMember: injectBlackholePeerPortTxRx, + recoverMember: recoverBlackholePeerPortTxRx, + } + f := &failureLeader{ff, -1, -1} return &failureDelay{ Failure: f, delayDuration: triggerElectionDur, @@ -38,7 +52,7 @@ func newFailureBlackholePeerPortTxRxOne() Failure { func newFailureBlackholePeerPortTxRxAll() Failure { f := &failureAll{ - description: "blackhole peer port on all members", + description: "blackhole peer port on all", injectMember: injectBlackholePeerPortTxRx, recoverMember: recoverBlackholePeerPortTxRx, } diff --git a/tools/functional-tester/tester/failure_case_slow_network.go b/tools/functional-tester/tester/failure_case_network_slow.go similarity index 86% rename from tools/functional-tester/tester/failure_case_slow_network.go rename to tools/functional-tester/tester/failure_case_network_slow.go index 5bd932680..3011ba140 100644 --- a/tools/functional-tester/tester/failure_case_slow_network.go +++ b/tools/functional-tester/tester/failure_case_network_slow.go @@ -45,13 +45,14 @@ func recoverDelayPeerPortTxRx(clus *Cluster, idx int) error { return err } -func newFailureDelayPeerPortTxRxOneMember() Failure { - desc := fmt.Sprintf("delay one member's network by adding %d ms latency", slowNetworkLatency) - f := &failureOne{ +func newFailureDelayPeerPortTxRxOneFollower() Failure { + desc := fmt.Sprintf("delay follower peer port by adding %d ms latency", slowNetworkLatency) + ff := failureByFunc{ description: description(desc), injectMember: injectDelayPeerPortTxRx, recoverMember: recoverDelayPeerPortTxRx, } + f := &failureFollower{ff, -1, -1} return &failureDelay{ Failure: f, delayDuration: triggerElectionDur, @@ -59,13 +60,13 @@ func newFailureDelayPeerPortTxRxOneMember() Failure { } func newFailureDelayPeerPortTxRxLeader() Failure { - desc := fmt.Sprintf("delay leader's network by adding %d ms latency", slowNetworkLatency) + desc := fmt.Sprintf("delay leader peer port by adding %d ms latency", slowNetworkLatency) ff := failureByFunc{ description: description(desc), injectMember: injectDelayPeerPortTxRx, recoverMember: recoverDelayPeerPortTxRx, } - f := &failureLeader{ff, 0} + f := &failureLeader{ff, -1, -1} return &failureDelay{ Failure: f, delayDuration: triggerElectionDur, @@ -74,7 +75,7 @@ func newFailureDelayPeerPortTxRxLeader() Failure { func newFailureDelayPeerPortTxRxAll() Failure { f := &failureAll{ - description: "delay all members' network", + description: "delay all peer port", injectMember: injectDelayPeerPortTxRx, recoverMember: recoverDelayPeerPortTxRx, } diff --git a/tools/functional-tester/tester/local-test.yaml b/tools/functional-tester/tester/local-test.yaml index 46ec27460..4104691d4 100644 --- a/tools/functional-tester/tester/local-test.yaml +++ b/tools/functional-tester/tester/local-test.yaml @@ -92,7 +92,7 @@ tester-config: - KILL_QUORUM - KILL_ALL - BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER - - BLACKHOLE_PEER_PORT_TX_RX_LEADER_ONE + - BLACKHOLE_PEER_PORT_TX_RX_LEADER - BLACKHOLE_PEER_PORT_TX_RX_ALL - DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER - DELAY_PEER_PORT_TX_RX_LEADER