diff --git a/pkg/netutil/isolate_linux.go b/pkg/netutil/isolate_linux.go new file mode 100644 index 000000000..1a322553b --- /dev/null +++ b/pkg/netutil/isolate_linux.go @@ -0,0 +1,42 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package netutil + +import ( + "fmt" + "os/exec" +) + +// DropPort drops all network packets that are received from the given port and sent to the given port. +func DropPort(port int) error { + cmdStr := fmt.Sprintf("sudo iptables -A OUTPUT --destination-port %d -j DROP", port) + if _, err := exec.Command("/bin/sh", "-c", cmdStr).Output(); err != nil { + return err + } + cmdStr = fmt.Sprintf("sudo iptables -A INPUT --destination-port %d -j DROP", port) + _, err := exec.Command("/bin/sh", "-c", cmdStr).Output() + return err +} + +// RecoverPort stops dropping network packets at given port. +func RecoverPort(port int) error { + cmdStr := fmt.Sprintf("sudo iptables -D OUTPUT --destination-port %d -j DROP", port) + if _, err := exec.Command("/bin/sh", "-c", cmdStr).Output(); err != nil { + return err + } + cmdStr = fmt.Sprintf("sudo iptables -D INPUT --destination-port %d -j DROP", port) + _, err := exec.Command("/bin/sh", "-c", cmdStr).Output() + return err +} diff --git a/pkg/netutil/isolate_stub.go b/pkg/netutil/isolate_stub.go new file mode 100644 index 000000000..c8f70f9b6 --- /dev/null +++ b/pkg/netutil/isolate_stub.go @@ -0,0 +1,21 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !linux + +package netutil + +func DropPort(port int) error { return nil } + +func RecoverPort(port int) error { return nil } diff --git a/tools/functional-tester/etcd-agent/agent.go b/tools/functional-tester/etcd-agent/agent.go index d63143ce2..97ab8cf1a 100644 --- a/tools/functional-tester/etcd-agent/agent.go +++ b/tools/functional-tester/etcd-agent/agent.go @@ -22,6 +22,7 @@ import ( "path" "time" + "github.com/coreos/etcd/pkg/netutil" "github.com/coreos/etcd/tools/functional-tester/etcd-agent/client" ) @@ -132,6 +133,14 @@ func (a *Agent) terminate() error { return nil } +func (a *Agent) dropPort(port int) error { + return netutil.DropPort(port) +} + +func (a *Agent) recoverPort(port int) error { + return netutil.RecoverPort(port) +} + func (a *Agent) status() client.Status { return client.Status{State: a.state} } diff --git a/tools/functional-tester/etcd-agent/client/client.go b/tools/functional-tester/etcd-agent/client/client.go index ea92f99ea..5e9fe7ad4 100644 --- a/tools/functional-tester/etcd-agent/client/client.go +++ b/tools/functional-tester/etcd-agent/client/client.go @@ -34,8 +34,10 @@ type Agent interface { Cleanup() error // Terminate stops the exiting etcd the agent started and removes its data dir. Terminate() error - // Isoloate isolates the network of etcd - Isolate() error + // DropPort drops all network packets at the given port. + DropPort(port int) error + // RecoverPort stops dropping all network packets at the given port. + RecoverPort(port int) error // Status returns the status of etcd on the agent Status() (Status, error) } @@ -83,8 +85,12 @@ func (a *agent) Terminate() error { return a.rpcClient.Call("Agent.RPCTerminate", struct{}{}, nil) } -func (a *agent) Isolate() error { - panic("not implemented") +func (a *agent) DropPort(port int) error { + return a.rpcClient.Call("Agent.RPCDropPort", port, nil) +} + +func (a *agent) RecoverPort(port int) error { + return a.rpcClient.Call("Agent.RPCRecoverPort", port, nil) } func (a *agent) Status() (Status, error) { diff --git a/tools/functional-tester/etcd-agent/rpc.go b/tools/functional-tester/etcd-agent/rpc.go index 2910a13ab..32b857587 100644 --- a/tools/functional-tester/etcd-agent/rpc.go +++ b/tools/functional-tester/etcd-agent/rpc.go @@ -84,8 +84,22 @@ func (a *Agent) RPCTerminate(args struct{}, reply *struct{}) error { return nil } -func (a *Agent) RPCIsolate(args struct{}, reply *struct{}) error { - panic("not implemented") +func (a *Agent) RPCDropPort(port int, reply *struct{}) error { + log.Printf("rpc: drop port %d", port) + err := a.dropPort(port) + if err != nil { + log.Println("rpc: error dropping port", err) + } + return nil +} + +func (a *Agent) RPCRecoverPort(port int, reply *struct{}) error { + log.Printf("rpc: recover port %d", port) + err := a.recoverPort(port) + if err != nil { + log.Println("rpc: error recovering port", err) + } + return nil } func (a *Agent) RPCStatus(args struct{}, status *client.Status) error { diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index 0a5d7c25b..947281857 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -27,6 +27,8 @@ import ( "github.com/coreos/etcd/tools/functional-tester/etcd-agent/client" ) +const peerURLPort = 2380 + type cluster struct { agentEndpoints []string datadir string @@ -76,7 +78,7 @@ func (c *cluster) Bootstrap() error { return err } clientURLs[i] = fmt.Sprintf("http://%s:2379", host) - peerURLs[i] = fmt.Sprintf("http://%s:2380", host) + peerURLs[i] = fmt.Sprintf("http://%s:%d", host, peerURLPort) members[i] = fmt.Sprintf("%s=%s", names[i], peerURLs[i]) } @@ -196,8 +198,10 @@ func setHealthKey(us []string) error { if err != nil { return err } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) kapi := etcdclient.NewKeysAPI(c) - _, err = kapi.Set(context.TODO(), "health", "good", nil) + _, err = kapi.Set(ctx, "health", "good", nil) + cancel() if err != nil { return err } diff --git a/tools/functional-tester/etcd-tester/failure.go b/tools/functional-tester/etcd-tester/failure.go index 79efe4c1b..3f4fb5159 100644 --- a/tools/functional-tester/etcd-tester/failure.go +++ b/tools/functional-tester/etcd-tester/failure.go @@ -172,3 +172,57 @@ func (f *failureKillOneForLongTime) Recover(c *cluster, round int) error { } return c.WaitHealth() } + +type failureIsolate struct { + description +} + +func newFailureIsolate() *failureIsolate { + return &failureIsolate{ + description: "isolate one member", + } +} + +func (f *failureIsolate) Inject(c *cluster, round int) error { + i := round % c.Size + if err := c.Agents[i].DropPort(peerURLPort); err != nil { + return err + } + return nil +} + +func (f *failureIsolate) Recover(c *cluster, round int) error { + i := round % c.Size + if err := c.Agents[i].RecoverPort(peerURLPort); err != nil { + return err + } + return c.WaitHealth() +} + +type failureIsolateAll struct { + description +} + +func newFailureIsolateAll() *failureIsolateAll { + return &failureIsolateAll{ + description: "isolate all members", + } +} + +func (f *failureIsolateAll) Inject(c *cluster, round int) error { + for _, a := range c.Agents { + if err := a.DropPort(peerURLPort); err != nil { + return err + } + } + return nil +} + +func (f *failureIsolateAll) Recover(c *cluster, round int) error { + for _, a := range c.Agents { + if err := a.RecoverPort(peerURLPort); err != nil { + return err + } + } + return c.WaitHealth() +} diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go index 35856b81f..4af87bf14 100644 --- a/tools/functional-tester/etcd-tester/main.go +++ b/tools/functional-tester/etcd-tester/main.go @@ -40,6 +40,8 @@ func main() { newFailureKillMajority(), newFailureKillOne(), newFailureKillOneForLongTime(), + newFailureIsolate(), + newFailureIsolateAll(), }, cluster: c, limit: *limit, diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go index ba2d49f01..dce7931e0 100644 --- a/tools/functional-tester/etcd-tester/stresser.go +++ b/tools/functional-tester/etcd-tester/stresser.go @@ -61,8 +61,10 @@ func (s *stresser) Stress() error { for i := 0; i < s.N; i++ { go func() { for { + setctx, setcancel := context.WithTimeout(ctx, time.Second) key := fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange)) - _, err := kv.Set(ctx, key, randStr(s.KeySize), nil) + _, err := kv.Set(setctx, key, randStr(s.KeySize), nil) + setcancel() if err == context.Canceled { return }