functional: remove SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT command

Problem: both SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT and test.sh
attempt to stop the agents and remove the data directories, so cleanup
is performed twice.

Solution: since test.sh creates the directories and starts the test, it
should also be the one responsible for cleanup (sketched below).

See https://github.com/etcd-io/etcd/issues/14384

Signed-off-by: Bogdan Kanivets <bkanivets@apple.com>
Bogdan Kanivets 2022-08-24 11:00:24 -07:00
parent a3f15645d8
commit c31d758189
7 changed files with 18 additions and 106 deletions
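
The cleanup model the commit moves to is a shell EXIT trap in test.sh: the script that forks the agents also tears everything down on every exit path, so agents no longer need to stop themselves over RPC. A minimal sketch of that pattern (the cleanup body here is illustrative, not the real test.sh helper; only the trap-on-exit idea mirrors the change below):

    #!/usr/bin/env bash
    # Single-owner cleanup: the script that starts the agents also stops them.
    cleanup() {
      kill $(jobs -p) 2>/dev/null || true   # stop background etcd-agent processes
      rm -rf /tmp/etcd-functional-*         # drop the data directories the test created
    }
    trap cleanup 0   # 0 is the shell's EXIT pseudo-signal; runs on every exit path

    ./bin/etcd-agent --network tcp --address 127.0.0.1:19027 < /dev/null &
    # ... run ./bin/etcd-tester; cleanup fires when this script exits, pass or fail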


@@ -161,8 +161,6 @@ function functional_pass {
   # TODO: These ports should be dynamically allocated instead of hard-coded.
   for a in 1 2 3; do
     ./bin/etcd-agent --network tcp --address 127.0.0.1:${a}9027 < /dev/null &
-    pid="$!"
-    agent_pids="${agent_pids} $pid"
   done

   for a in 1 2 3; do
@@ -172,26 +170,22 @@ function functional_pass {
     done
   done

-  trap killall_functional_test INT
+  trap killall_functional_test 0

   log_callout "functional test START!"
   run ./bin/etcd-tester --config ./tests/functional/functional.yaml -test.v && log_success "'etcd-tester' succeeded"
   local etcd_tester_exit_code=$?

-  # shellcheck disable=SC2206
-  agent_pids=($agent_pids)
-  kill -s TERM "${agent_pids[@]}" || true
-
   if [[ "${etcd_tester_exit_code}" -ne "0" ]]; then
     log_error "ETCD_TESTER_EXIT_CODE:" ${etcd_tester_exit_code}
-    log_error -e "\\nFAILED! 'tail -1000 /tmp/etcd-functional-1/etcd.log'"
+    log_error -e "\\nFAILED! 'tail -100 /tmp/etcd-functional-1/etcd.log'"
     tail -100 /tmp/etcd-functional-1/etcd.log
-    log_error -e "\\nFAILED! 'tail -1000 /tmp/etcd-functional-2/etcd.log'"
+    log_error -e "\\nFAILED! 'tail -100 /tmp/etcd-functional-2/etcd.log'"
     tail -100 /tmp/etcd-functional-2/etcd.log
-    log_error -e "\\nFAILED! 'tail -1000 /tmp/etcd-functional-3/etcd.log'"
+    log_error -e "\\nFAILED! 'tail -100 /tmp/etcd-functional-3/etcd.log'"
     tail -100 /tmp/etcd-functional-3/etcd.log
     log_error "--- FAIL: exit code" ${etcd_tester_exit_code}


@@ -65,8 +65,6 @@ func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response
 	case rpcpb.Operation_SIGQUIT_ETCD_AND_ARCHIVE_DATA:
 		return srv.handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA()
-	case rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT:
-		return srv.handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT()
 	case rpcpb.Operation_BLACKHOLE_PEER_PORT_TX_RX:
 		return srv.handle_BLACKHOLE_PEER_PORT_TX_RX(), nil
@@ -639,33 +637,6 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, erro
 	}, nil
 }

-// stop proxy, etcd, delete data directory
-func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb.Response, error) {
-	if err := srv.stopEtcd(syscall.SIGQUIT); err != nil {
-		return nil, err
-	}
-
-	if srv.etcdServer != nil {
-		srv.etcdServer.GetLogger().Sync()
-	} else {
-		srv.etcdLogFile.Sync()
-		srv.etcdLogFile.Close()
-	}
-
-	if err := os.RemoveAll(srv.Member.BaseDir); err != nil {
-		return nil, err
-	}
-	srv.lg.Info("removed base directory", zap.String("dir", srv.Member.BaseDir))
-
-	// stop agent server
-	srv.Stop()
-
-	return &rpcpb.Response{
-		Success: true,
-		Status:  "destroyed etcd and agent",
-	}, nil
-}
-
 func (srv *Server) handle_BLACKHOLE_PEER_PORT_TX_RX() *rpcpb.Response {
 	for port, px := range srv.advertisePeerPortToProxy {
 		srv.lg.Info("blackholing", zap.Int("peer-port", port))


@@ -41,7 +41,6 @@ func TestFunctional(t *testing.T) {
 	if err = clus.Send_INITIAL_START_ETCD(); err != nil {
 		t.Fatal("Bootstrap failed", zap.Error(err))
 	}
-	defer clus.Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT()

 	t.Log("wait health after bootstrap")
 	if err = clus.WaitHealth(); err != nil {


@@ -147,9 +147,6 @@ const (
 	// SIGQUIT_ETCD_AND_ARCHIVE_DATA is sent when consistency check failed,
 	// thus need to archive etcd data directories.
 	Operation_SIGQUIT_ETCD_AND_ARCHIVE_DATA Operation = 40
-	// SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT destroys etcd process,
-	// etcd data, and agent server.
-	Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT Operation = 41
 	// BLACKHOLE_PEER_PORT_TX_RX drops all outgoing/incoming packets from/to
 	// the peer port on target member's peer port.
 	Operation_BLACKHOLE_PEER_PORT_TX_RX Operation = 100
@@ -172,7 +169,6 @@ var Operation_name = map[int32]string{
 	31:  "RESTORE_RESTART_FROM_SNAPSHOT",
 	32:  "RESTART_FROM_SNAPSHOT",
 	40:  "SIGQUIT_ETCD_AND_ARCHIVE_DATA",
-	41:  "SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT",
 	100: "BLACKHOLE_PEER_PORT_TX_RX",
 	101: "UNBLACKHOLE_PEER_PORT_TX_RX",
 	200: "DELAY_PEER_PORT_TX_RX",
@@ -180,20 +176,19 @@ var Operation_name = map[int32]string{
 }

 var Operation_value = map[string]int32{
-	"NOT_STARTED":                                 0,
-	"INITIAL_START_ETCD":                          10,
-	"RESTART_ETCD":                                11,
-	"SIGTERM_ETCD":                                20,
-	"SIGQUIT_ETCD_AND_REMOVE_DATA":                21,
-	"SAVE_SNAPSHOT":                               30,
-	"RESTORE_RESTART_FROM_SNAPSHOT":               31,
-	"RESTART_FROM_SNAPSHOT":                       32,
-	"SIGQUIT_ETCD_AND_ARCHIVE_DATA":               40,
-	"SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT": 41,
-	"BLACKHOLE_PEER_PORT_TX_RX":                   100,
-	"UNBLACKHOLE_PEER_PORT_TX_RX":                 101,
-	"DELAY_PEER_PORT_TX_RX":                       200,
-	"UNDELAY_PEER_PORT_TX_RX":                     201,
+	"NOT_STARTED":                   0,
+	"INITIAL_START_ETCD":            10,
+	"RESTART_ETCD":                  11,
+	"SIGTERM_ETCD":                  20,
+	"SIGQUIT_ETCD_AND_REMOVE_DATA":  21,
+	"SAVE_SNAPSHOT":                 30,
+	"RESTORE_RESTART_FROM_SNAPSHOT": 31,
+	"RESTART_FROM_SNAPSHOT":         32,
+	"SIGQUIT_ETCD_AND_ARCHIVE_DATA": 40,
+	"BLACKHOLE_PEER_PORT_TX_RX":     100,
+	"UNBLACKHOLE_PEER_PORT_TX_RX":   101,
+	"DELAY_PEER_PORT_TX_RX":         200,
+	"UNDELAY_PEER_PORT_TX_RX":       201,
 }

 func (x Operation) String() string {
func (x Operation) String() string {


@@ -276,9 +276,6 @@ enum Operation {
   // SIGQUIT_ETCD_AND_ARCHIVE_DATA is sent when consistency check failed,
   // thus need to archive etcd data directories.
   SIGQUIT_ETCD_AND_ARCHIVE_DATA = 40;
-  // SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT destroys etcd process,
-  // etcd data, and agent server.
-  SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT = 41;

   // BLACKHOLE_PEER_PORT_TX_RX drops all outgoing/incoming packets from/to
   // the peer port on target member's peer port.


@@ -424,28 +424,7 @@ func (clus *Cluster) broadcast(op rpcpb.Operation) error {
 		if err == nil {
 			continue
 		}
-		if err != nil {
-			destroyed := false
-			if op == rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT {
-				if err == io.EOF {
-					destroyed = true
-				}
-				if strings.Contains(err.Error(),
-					"rpc error: code = Unavailable desc = transport is closing") {
-					// agent server has already closed;
-					// so this error is expected
-					destroyed = true
-				}
-				if strings.Contains(err.Error(),
-					"desc = os: process already finished") {
-					destroyed = true
-				}
-			}
-
-			if !destroyed {
-				errs = append(errs, err.Error())
-			}
-		}
+		errs = append(errs, err.Error())
 	}

 	if len(errs) == 0 {
@@ -578,28 +557,6 @@ func (clus *Cluster) sendOpWithResp(idx int, op rpcpb.Operation) (*rpcpb.Respons
 	return resp, nil
 }

-// Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT terminates all tester connections to agents and etcd servers.
-func (clus *Cluster) Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() {
-	err := clus.broadcast(rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT)
-	if err != nil {
-		clus.lg.Warn("destroying etcd/agents FAIL", zap.Error(err))
-	} else {
-		clus.lg.Info("destroying etcd/agents PASS")
-	}
-
-	for i, conn := range clus.agentConns {
-		err := conn.Close()
-		clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
-	}
-
-	if clus.testerHTTPServer != nil {
-		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
-		err := clus.testerHTTPServer.Shutdown(ctx)
-		cancel()
-		clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.Addr), zap.Error(err))
-	}
-}
-
 // WaitHealth ensures all members are healthy
 // by writing a test key to etcd cluster.
 func (clus *Cluster) WaitHealth() error {


@@ -341,7 +341,6 @@ func (clus *Cluster) failed(err error) {
 		zap.Int("case-total", len(clus.cases)),
 		zap.Error(err),
 	)

-	clus.Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT()
 	os.Exit(2)
 }