clientv3/integration: Fix leaked goroutine in case of skipped test.

release-3.5
Piotr Tabor 2020-10-01 17:56:28 +02:00
parent 528f5315d6
commit 220f711a2a
4 changed files with 44 additions and 36 deletions

View File

@ -43,21 +43,27 @@ func TestV3ClientMetrics(t *testing.T) {
ln net.Listener
)
// listen for all Prometheus metrics
srv := &http.Server{Handler: promhttp.Handler()}
srv.SetKeepAlivesEnabled(false)
ln, err := transport.NewUnixListener(addr)
if err != nil {
t.Errorf("Error: %v occurred while listening on addr: %v", err, addr)
}
donec := make(chan struct{})
defer func() {
ln.Close()
<-donec
}()
// listen for all Prometheus metrics
go func() {
var err error
defer close(donec)
srv := &http.Server{Handler: promhttp.Handler()}
srv.SetKeepAlivesEnabled(false)
ln, err = transport.NewUnixListener(addr)
if err != nil {
t.Errorf("Error: %v occurred while listening on addr: %v", err, addr)
}
err = srv.Serve(ln)
if err != nil && !transport.IsClosedConnError(err) {
t.Errorf("Err serving http requests: %v", err)
@ -88,7 +94,7 @@ func TestV3ClientMetrics(t *testing.T) {
pBefore := sumCountersForMetricAndLabels(t, url, "grpc_client_started_total", "Put", "unary")
_, err := cli.Put(context.Background(), "foo", "bar")
_, err = cli.Put(context.Background(), "foo", "bar")
if err != nil {
t.Errorf("Error putting value in key store")
}
@ -109,9 +115,6 @@ func TestV3ClientMetrics(t *testing.T) {
if wBefore+1 != wAfter {
t.Errorf("grpc_client_msg_received_total expected %d, got %d", 1, wAfter-wBefore)
}
ln.Close()
<-donec
}
func sumCountersForMetricAndLabels(t *testing.T, url string, metricName string, matchingLabelValues ...string) int {

View File

@ -264,13 +264,13 @@ func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
streams: make(map[grpc.ServerStream]struct{}),
}
go func() {
s.GoAttach(func() {
election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
noLeaderCnt := 0
for {
select {
case <-s.StopNotify():
case <-s.StoppingNotify():
return
case <-time.After(election):
if s.Leader() == types.ID(raft.None) {
@ -295,7 +295,7 @@ func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
}
}
}
}()
})
return smap
}

View File

@ -180,7 +180,7 @@ func (s *EtcdServer) checkHashKV() error {
Action: pb.AlarmRequest_ACTIVATE,
Alarm: pb.AlarmType_CORRUPT,
}
s.goAttach(func() {
s.GoAttach(func() {
s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
})
}

View File

@ -698,13 +698,13 @@ func (s *EtcdServer) adjustTicks() {
// should be implemented in goroutines.
func (s *EtcdServer) Start() {
s.start()
s.goAttach(func() { s.adjustTicks() })
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
s.goAttach(s.monitorVersions)
s.goAttach(s.linearizableReadLoop)
s.goAttach(s.monitorKVHash)
s.GoAttach(func() { s.adjustTicks() })
s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.GoAttach(s.purgeFile)
s.GoAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
s.GoAttach(s.monitorVersions)
s.GoAttach(s.linearizableReadLoop)
s.GoAttach(s.monitorKVHash)
}
// start prepares and starts server in a new goroutine. It is no longer safe to
@ -939,7 +939,7 @@ func (s *EtcdServer) run() {
}
defer func() {
s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
s.wgMu.Lock() // block concurrent waitgroup adds in GoAttach while stopping
close(s.stopping)
s.wgMu.Unlock()
s.cancel()
@ -986,7 +986,7 @@ func (s *EtcdServer) run() {
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
case leases := <-expiredLeaseC:
s.goAttach(func() {
s.GoAttach(func() {
// Increases throughput of expired leases deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, lease := range leases {
@ -996,7 +996,7 @@ func (s *EtcdServer) run() {
return
}
lid := lease.ID
s.goAttach(func() {
s.GoAttach(func() {
ctx := s.authStore.WithRoot(s.ctx)
_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
if lerr == nil {
@ -1347,6 +1347,10 @@ func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
// when the server is stopped.
func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
// StoppingNotify returns a channel that receives a empty struct
// when the server is being stopped.
func (s *EtcdServer) StoppingNotify() <-chan struct{} { return s.stopping }
func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
func (s *EtcdServer) LeaderStats() []byte {
@ -1767,7 +1771,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
// There is no promise that node has leader when do SYNC request,
// so it uses goroutine to propose.
ctx, cancel := context.WithTimeout(s.ctx, timeout)
s.goAttach(func() {
s.GoAttach(func() {
s.r.Propose(ctx, data)
cancel()
})
@ -1908,7 +1912,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
s.r.transport.SendSnapshot(merged)
lg.Info("sending merged snapshot", fields...)
s.goAttach(func() {
s.GoAttach(func() {
select {
case ok := <-merged.CloseNotify():
// delay releasing inflight snapshot for another 30 seconds to
@ -2051,7 +2055,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
zap.Error(ar.err),
)
s.goAttach(func() {
s.GoAttach(func() {
a := &pb.AlarmRequest{
MemberID: uint64(s.ID()),
Action: pb.AlarmRequest_ACTIVATE,
@ -2144,7 +2148,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
// the go routine created below.
s.KV().Commit()
s.goAttach(func() {
s.GoAttach(func() {
lg := s.getLogger()
d, err := clone.SaveNoCopy()
@ -2268,12 +2272,12 @@ func (s *EtcdServer) monitorVersions() {
if v != nil {
verStr = v.String()
}
s.goAttach(func() { s.updateClusterVersion(verStr) })
s.GoAttach(func() { s.updateClusterVersion(verStr) })
continue
}
if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) {
s.goAttach(func() { s.updateClusterVersion(v.String()) })
s.GoAttach(func() { s.updateClusterVersion(v.String()) })
}
}
}
@ -2372,15 +2376,16 @@ func (s *EtcdServer) restoreAlarms() error {
return nil
}
// goAttach creates a goroutine on a given function and tracks it using
// GoAttach creates a goroutine on a given function and tracks it using
// the etcdserver waitgroup.
func (s *EtcdServer) goAttach(f func()) {
// The passed function should interrupt on s.StoppingNotify().
func (s *EtcdServer) GoAttach(f func()) {
s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
defer s.wgMu.RUnlock()
select {
case <-s.stopping:
lg := s.getLogger()
lg.Warn("server has stopped; skipping goAttach")
lg.Warn("server has stopped; skipping GoAttach")
return
default:
}