From 0e609154c4e1b1b902138e190ad05659b65b7a80 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Mon, 9 Apr 2018 10:16:52 -0700 Subject: [PATCH] functional/tester: clean up "broadcastOperation" Signed-off-by: Gyuho Lee --- functional/tester/cluster.go | 48 +++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/functional/tester/cluster.go b/functional/tester/cluster.go index ef2d999c4..8b15c0193 100644 --- a/functional/tester/cluster.go +++ b/functional/tester/cluster.go @@ -24,6 +24,7 @@ import ( "net/url" "path/filepath" "strings" + "sync" "time" "github.com/coreos/etcd/functional/rpcpb" @@ -302,23 +303,40 @@ func (clus *Cluster) Restart() error { } func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error { + var wg sync.WaitGroup + wg.Add(len(clus.agentStreams)) + + errc := make(chan error, len(clus.agentStreams)) for i := range clus.agentStreams { - err := clus.sendOperation(i, op) - if err != nil { - if op == rpcpb.Operation_DestroyEtcdAgent && - strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") { - // agent server has already closed; - // so this error is expected - clus.lg.Info( - "successfully destroyed", - zap.String("member", clus.Members[i].EtcdClientEndpoint), - ) - continue - } - return err - } + go func(idx int, o rpcpb.Operation) { + defer wg.Done() + errc <- clus.sendOperation(idx, o) + }(i, op) } - return nil + wg.Wait() + close(errc) + + errs := []string{} + for err := range errc { + if err == nil { + continue + } + + if err != nil && + op == rpcpb.Operation_DestroyEtcdAgent && + strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") { + // agent server has already closed; + // so this error is expected + clus.lg.Info("successfully destroyed all") + continue + } + errs = append(errs, err.Error()) + } + + if len(errs) == 0 { + return nil + } + return errors.New(strings.Join(errs, ", ")) } func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {