functional/tester: clean up "broadcastOperation"

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
release-3.4
Gyuho Lee 2018-04-09 10:16:52 -07:00
parent cafa3b9217
commit 0e609154c4
1 changed files with 33 additions and 15 deletions

View File

@ -24,6 +24,7 @@ import (
"net/url"
"path/filepath"
"strings"
"sync"
"time"
"github.com/coreos/etcd/functional/rpcpb"
@ -302,23 +303,40 @@ func (clus *Cluster) Restart() error {
}
func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
var wg sync.WaitGroup
wg.Add(len(clus.agentStreams))
errc := make(chan error, len(clus.agentStreams))
for i := range clus.agentStreams {
err := clus.sendOperation(i, op)
if err != nil {
if op == rpcpb.Operation_DestroyEtcdAgent &&
strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") {
// agent server has already closed;
// so this error is expected
clus.lg.Info(
"successfully destroyed",
zap.String("member", clus.Members[i].EtcdClientEndpoint),
)
continue
}
return err
}
go func(idx int, o rpcpb.Operation) {
defer wg.Done()
errc <- clus.sendOperation(idx, o)
}(i, op)
}
return nil
wg.Wait()
close(errc)
errs := []string{}
for err := range errc {
if err == nil {
continue
}
if err != nil &&
op == rpcpb.Operation_DestroyEtcdAgent &&
strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") {
// agent server has already closed;
// so this error is expected
clus.lg.Info("successfully destroyed all")
continue
}
errs = append(errs, err.Error())
}
if len(errs) == 0 {
return nil
}
return errors.New(strings.Join(errs, ", "))
}
func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {