Merge pull request #12362 from ptabor/20201001-deflake-unit-race

Fix "race" - auth unit tests leaking goroutines
release-3.5
Gyuho Lee 2020-10-04 20:47:52 -07:00 committed by GitHub
commit fdb3f89730
17 changed files with 164 additions and 97 deletions
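The recurring change across this commit is a shared TestMain helper: instead of each package hand-rolling goroutine-leak checks around m.Run, packages now delegate to pkg/testutil. A package opts in with a main_test.go like the sketch below, which mirrors the auth/main_test.go added in this commit (the go.etcd.io/etcd/v3 import path follows this branch's module layout):

package auth

import (
	"testing"

	"go.etcd.io/etcd/v3/pkg/testutil"
)

// TestMain wraps m.Run with leak detection: if every test passes but
// goroutines are still running afterwards, the binary exits non-zero.
func TestMain(m *testing.M) {
	testutil.MustTestMainWithLeakDetection(m)
}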


@@ -125,7 +125,7 @@ script:
sudo HOST_TMP_DIR=/tmp TEST_OPTS="VERBOSE='1'" make docker-test-coverage
;;
linux-amd64-fmt-unit-go-tip-2-cpu)
GOARCH=amd64 PASSES='fmt unit' 'CPU=2' ./test -p=2
GOARCH=amd64 PASSES='fmt unit' CPU='2' RACE='false' ./test -p=2
;;
linux-386-unit-1-cpu)
docker run --rm \

auth/main_test.go (new file, 15 lines)

@@ -0,0 +1,15 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package auth
import (
"testing"
"go.etcd.io/etcd/v3/pkg/testutil"
)
func TestMain(m *testing.M) {
testutil.MustTestMainWithLeakDetection(m)
}


@@ -36,6 +36,7 @@ const (
)
// var for testing purposes
// TODO: Remove this mutable global state - as it's race-prone.
var (
simpleTokenTTLDefault = 300 * time.Second
simpleTokenTTLResolution = 1 * time.Second
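The TODO above flags a real hazard: package-level variables that tests overwrite are unsynchronized shared state, so any parallel test or leaked goroutine that reads them races with the writer. A generic illustration of the failure mode (hypothetical names, not etcd code) that go test -race will flag:

package example

import (
	"testing"
	"time"
)

// tokenTTL stands in for a mutable package-level test knob.
var tokenTTL = 300 * time.Second

func TestShortTTL(t *testing.T) {
	t.Parallel()
	tokenTTL = time.Second // unsynchronized write to a shared global
}

func TestDefaultTTL(t *testing.T) {
	t.Parallel()
	_ = tokenTTL // concurrent read; the race detector fires
}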


@@ -50,6 +50,7 @@ func TestSimpleTokenDisabled(t *testing.T) {
func TestSimpleTokenAssign(t *testing.T) {
tp := newTokenProviderSimple(zap.NewExample(), dummyIndexWaiter, simpleTokenTTLDefault)
tp.enable()
defer tp.disable()
ctx := context.WithValue(context.WithValue(context.TODO(), AuthenticateParamIndex{}, uint64(1)), AuthenticateParamSimpleTokenPrefix{}, "dummy")
token, err := tp.assign(ctx, "user1", 0)
if err != nil {


@@ -64,10 +64,10 @@ func TestNewAuthStoreRevision(t *testing.T) {
// no changes to commit
b2 := backend.NewDefaultBackend(tPath)
defer b2.Close()
as = NewAuthStore(zap.NewExample(), b2, nil, tp, bcrypt.MinCost)
defer as.Close()
new := as.Revision()
as.Close()
b2.Close()
if old != new {
t.Fatalf("expected revision %d, got %d", old, new)
@@ -77,6 +77,7 @@ func TestNewAuthStoreRevision(t *testing.T) {
// TestNewAuthStoreBcryptCost ensures that NewAuthStore uses the default when the given bcrypt cost is invalid
func TestNewAuthStoreBcryptCost(t *testing.T) {
b, tPath := backend.NewDefaultTmpBackend()
defer b.Close()
defer os.Remove(tPath)
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
@@ -87,13 +88,11 @@ func TestNewAuthStoreBcryptCost(t *testing.T) {
invalidCosts := [2]int{bcrypt.MinCost - 1, bcrypt.MaxCost + 1}
for _, invalidCost := range invalidCosts {
as := NewAuthStore(zap.NewExample(), b, nil, tp, invalidCost)
defer as.Close()
if as.BcryptCost() != bcrypt.DefaultCost {
t.Fatalf("expected DefaultCost when bcryptcost is invalid")
}
as.Close()
}
b.Close()
}
func encodePassword(s string) string {
@@ -175,6 +174,7 @@ func TestUserAdd(t *testing.T) {
func TestRecover(t *testing.T) {
as, tearDown := setupAuthStore(t)
defer as.Close()
defer tearDown(t)
as.enabled = false
@@ -654,6 +654,7 @@ func TestIsAuthEnabled(t *testing.T) {
// TestAuthInfoFromCtxRace ensures that access to authStore.revision is thread-safe.
func TestAuthInfoFromCtxRace(t *testing.T) {
b, tPath := backend.NewDefaultTmpBackend()
defer b.Close()
defer os.Remove(tPath)
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
@@ -709,7 +710,8 @@ func TestIsAdminPermitted(t *testing.T) {
}
func TestRecoverFromSnapshot(t *testing.T) {
as, _ := setupAuthStore(t)
as, teardown := setupAuthStore(t)
defer teardown(t)
ua := &pb.AuthUserAddRequest{Name: "foo", Options: &authpb.UserAddOptions{NoPassword: false}}
_, err := as.UserAdd(ua) // add an existing user
@@ -733,9 +735,7 @@ func TestRecoverFromSnapshot(t *testing.T) {
t.Fatal(err)
}
as2 := NewAuthStore(zap.NewExample(), as.be, nil, tp, bcrypt.MinCost)
defer func(a *authStore) {
a.Close()
}(as2)
defer as2.Close()
if !as2.IsAuthEnabled() {
t.Fatal("recovering authStore from existing backend failed")
@@ -808,13 +808,16 @@ func TestHammerSimpleAuthenticate(t *testing.T) {
// TestRolesOrder tests that authpb.User.Roles is sorted.
func TestRolesOrder(t *testing.T) {
b, tPath := backend.NewDefaultTmpBackend()
defer b.Close()
defer os.Remove(tPath)
tp, err := NewTokenProvider(zap.NewExample(), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
defer tp.disable()
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
defer as.Close()
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
@@ -863,6 +866,7 @@ func TestAuthInfoFromCtxWithRootJWT(t *testing.T) {
// testAuthInfoFromCtxWithRoot ensures "WithRoot" properly embeds token in the context.
func testAuthInfoFromCtxWithRoot(t *testing.T, opts string) {
b, tPath := backend.NewDefaultTmpBackend()
defer b.Close()
defer os.Remove(tPath)
tp, err := NewTokenProvider(zap.NewExample(), opts, dummyIndexWaiter, simpleTokenTTLDefault)


@@ -5,16 +5,11 @@
package integration
import (
"os"
"testing"
"go.etcd.io/etcd/v3/pkg/testutil"
)
func TestMain(m *testing.M) {
v := m.Run()
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
}
os.Exit(v)
testutil.MustTestMainWithLeakDetection(m)
}


@@ -5,16 +5,11 @@
package integration
import (
"os"
"testing"
"go.etcd.io/etcd/v3/pkg/testutil"
)
func TestMain(m *testing.M) {
v := m.Run()
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
}
os.Exit(v)
testutil.MustTestMainWithLeakDetection(m)
}


@@ -43,21 +43,27 @@ func TestV3ClientMetrics(t *testing.T) {
ln net.Listener
)
// listen for all Prometheus metrics
srv := &http.Server{Handler: promhttp.Handler()}
srv.SetKeepAlivesEnabled(false)
ln, err := transport.NewUnixListener(addr)
if err != nil {
t.Errorf("Error: %v occurred while listening on addr: %v", err, addr)
}
donec := make(chan struct{})
defer func() {
ln.Close()
<-donec
}()
// listen for all Prometheus metrics
go func() {
var err error
defer close(donec)
srv := &http.Server{Handler: promhttp.Handler()}
srv.SetKeepAlivesEnabled(false)
ln, err = transport.NewUnixListener(addr)
if err != nil {
t.Errorf("Error: %v occurred while listening on addr: %v", err, addr)
}
err = srv.Serve(ln)
if err != nil && !transport.IsClosedConnError(err) {
t.Errorf("Err serving http requests: %v", err)
@@ -88,7 +94,7 @@ func TestV3ClientMetrics(t *testing.T) {
pBefore := sumCountersForMetricAndLabels(t, url, "grpc_client_started_total", "Put", "unary")
_, err := cli.Put(context.Background(), "foo", "bar")
_, err = cli.Put(context.Background(), "foo", "bar")
if err != nil {
t.Errorf("Error putting value in key store")
}
@@ -109,9 +115,6 @@ func TestV3ClientMetrics(t *testing.T) {
if wBefore+1 != wAfter {
t.Errorf("grpc_client_msg_received_total expected %d, got %d", 1, wAfter-wBefore)
}
ln.Close()
<-donec
}
func sumCountersForMetricAndLabels(t *testing.T, url string, metricName string, matchingLabelValues ...string) int {
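The reshuffle above addresses two leaks at once: the listener is now created synchronously, so the serving goroutine and the test no longer race over ln, and teardown moves into a defer, so the goroutine is always joined before the test returns. A minimal sketch of the same pattern (hypothetical test; a plain TCP listener stands in for transport.NewUnixListener):

package example

import (
	"net"
	"net/http"
	"testing"
)

func TestServePattern(t *testing.T) {
	srv := &http.Server{Handler: http.NotFoundHandler()}
	srv.SetKeepAlivesEnabled(false)

	// Listen synchronously: ln is fully initialized before any goroutine reads it.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}

	donec := make(chan struct{})
	defer func() {
		ln.Close() // unblocks srv.Serve below
		<-donec    // join the goroutine before the test returns
	}()

	go func() {
		defer close(donec)
		// Serve exits with a "closed connection" error once ln is closed.
		if err := srv.Serve(ln); err != nil {
			t.Logf("server exited: %v", err)
		}
	}()

	// ... exercise the server via ln.Addr() here ...
}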


@@ -264,13 +264,13 @@ func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
streams: make(map[grpc.ServerStream]struct{}),
}
go func() {
s.GoAttach(func() {
election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
noLeaderCnt := 0
for {
select {
case <-s.StopNotify():
case <-s.StoppingNotify():
return
case <-time.After(election):
if s.Leader() == types.ID(raft.None) {
@@ -295,7 +295,7 @@ func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
}
}
}
}()
})
return smap
}


@@ -180,7 +180,7 @@ func (s *EtcdServer) checkHashKV() error {
Action: pb.AlarmRequest_ACTIVATE,
Alarm: pb.AlarmType_CORRUPT,
}
s.goAttach(func() {
s.GoAttach(func() {
s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
})
}


@@ -698,13 +698,13 @@ func (s *EtcdServer) adjustTicks() {
// should be implemented in goroutines.
func (s *EtcdServer) Start() {
s.start()
s.goAttach(func() { s.adjustTicks() })
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
s.goAttach(s.monitorVersions)
s.goAttach(s.linearizableReadLoop)
s.goAttach(s.monitorKVHash)
s.GoAttach(func() { s.adjustTicks() })
s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.GoAttach(s.purgeFile)
s.GoAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
s.GoAttach(s.monitorVersions)
s.GoAttach(s.linearizableReadLoop)
s.GoAttach(s.monitorKVHash)
}
// start prepares and starts server in a new goroutine. It is no longer safe to
@@ -939,7 +939,7 @@ func (s *EtcdServer) run() {
}
defer func() {
s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
s.wgMu.Lock() // block concurrent waitgroup adds in GoAttach while stopping
close(s.stopping)
s.wgMu.Unlock()
s.cancel()
@@ -986,7 +986,7 @@ func (s *EtcdServer) run() {
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
case leases := <-expiredLeaseC:
s.goAttach(func() {
s.GoAttach(func() {
// Increases throughput of expired leases deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, lease := range leases {
@@ -996,7 +996,7 @@ func (s *EtcdServer) run() {
return
}
lid := lease.ID
s.goAttach(func() {
s.GoAttach(func() {
ctx := s.authStore.WithRoot(s.ctx)
_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
if lerr == nil {
@@ -1347,6 +1347,10 @@ func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
// when the server is stopped.
func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
// StoppingNotify returns a channel that receives an empty struct
// when the server is being stopped.
func (s *EtcdServer) StoppingNotify() <-chan struct{} { return s.stopping }
func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
func (s *EtcdServer) LeaderStats() []byte {
@@ -1767,7 +1771,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
// There is no promise that node has leader when do SYNC request,
// so it uses goroutine to propose.
ctx, cancel := context.WithTimeout(s.ctx, timeout)
s.goAttach(func() {
s.GoAttach(func() {
s.r.Propose(ctx, data)
cancel()
})
@@ -1908,7 +1912,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
s.r.transport.SendSnapshot(merged)
lg.Info("sending merged snapshot", fields...)
s.goAttach(func() {
s.GoAttach(func() {
select {
case ok := <-merged.CloseNotify():
// delay releasing inflight snapshot for another 30 seconds to
@@ -2051,7 +2055,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
zap.Error(ar.err),
)
s.goAttach(func() {
s.GoAttach(func() {
a := &pb.AlarmRequest{
MemberID: uint64(s.ID()),
Action: pb.AlarmRequest_ACTIVATE,
@@ -2144,7 +2148,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
// the go routine created below.
s.KV().Commit()
s.goAttach(func() {
s.GoAttach(func() {
lg := s.getLogger()
d, err := clone.SaveNoCopy()
@@ -2268,12 +2272,12 @@ func (s *EtcdServer) monitorVersions() {
if v != nil {
verStr = v.String()
}
s.goAttach(func() { s.updateClusterVersion(verStr) })
s.GoAttach(func() { s.updateClusterVersion(verStr) })
continue
}
if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) {
s.goAttach(func() { s.updateClusterVersion(v.String()) })
s.GoAttach(func() { s.updateClusterVersion(v.String()) })
}
}
}
@@ -2372,15 +2376,16 @@ func (s *EtcdServer) restoreAlarms() error {
return nil
}
// goAttach creates a goroutine on a given function and tracks it using
// GoAttach creates a goroutine on a given function and tracks it using
// the etcdserver waitgroup.
func (s *EtcdServer) goAttach(f func()) {
// The passed function should interrupt on s.StoppingNotify().
func (s *EtcdServer) GoAttach(f func()) {
s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
defer s.wgMu.RUnlock()
select {
case <-s.stopping:
lg := s.getLogger()
lg.Warn("server has stopped; skipping goAttach")
lg.Warn("server has stopped; skipping GoAttach")
return
default:
}
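Renaming goAttach to the exported GoAttach lets code outside the etcdserver package (such as monitorLeader in the hunk further up) run goroutines tracked by the server's waitgroup, and the new doc comment states the contract: attached functions must return once StoppingNotify fires, because stopping closes before the waitgroup is drained, so a goroutine watching only StopNotify would stall shutdown. A self-contained sketch of the mechanism, using illustrative types rather than etcd's exact implementation:

package example

import (
	"sync"
	"time"
)

type server struct {
	wgMu     sync.RWMutex
	wg       sync.WaitGroup
	stopping chan struct{}
}

// GoAttach runs f in a tracked goroutine; registration takes the read
// lock so it cannot race with Stop closing the stopping channel.
func (s *server) GoAttach(f func()) {
	s.wgMu.RLock() // blocks while Stop holds the write lock
	defer s.wgMu.RUnlock()
	select {
	case <-s.stopping:
		return // already stopping: refuse to start (and leak) a goroutine
	default:
	}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		f()
	}()
}

// StoppingNotify mirrors the accessor added in this commit.
func (s *server) StoppingNotify() <-chan struct{} { return s.stopping }

func (s *server) Stop() {
	s.wgMu.Lock() // block concurrent GoAttach registrations
	close(s.stopping)
	s.wgMu.Unlock()
	s.wg.Wait() // returns only after every attached goroutine exits
}

// monitor shows the monitorLeader-style loop shape: periodic work that
// exits promptly once the server begins stopping.
func monitor(s *server, period time.Duration, tick func()) {
	s.GoAttach(func() {
		for {
			select {
			case <-s.StoppingNotify():
				return
			case <-time.After(period):
				tick()
			}
		}
	})
}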


@@ -429,9 +429,11 @@ func (c *cluster) waitMembersMatch(t testing.TB, membs []client.Member) {
}
}
// WaitLeader returns index of the member in c.Members that is leader (or -1).
func (c *cluster) WaitLeader(t testing.TB) int { return c.waitLeader(t, c.Members) }
// waitLeader waits until given members agree on the same leader.
// waitLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list (or -1).
func (c *cluster) waitLeader(t testing.TB, membs []*member) int {
possibleLead := make(map[uint64]bool)
var lead uint64


@@ -5,16 +5,11 @@
package integration
import (
"os"
"testing"
"go.etcd.io/etcd/v3/pkg/testutil"
)
func TestMain(m *testing.M) {
v := m.Run()
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
}
os.Exit(v)
testutil.MustTestMainWithLeakDetection(m)
}


@@ -23,6 +23,28 @@ import (
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
)
// MustFetchNotEmptyMetric attempts to fetch the given 'metric' from 'member',
// waiting for a non-empty value or until 'timeout' fires.
func MustFetchNotEmptyMetric(tb testing.TB, member *member, metric string, timeout <-chan time.Time) string {
metricValue := ""
tick := time.Tick(tickDuration)
for metricValue == "" {
tb.Logf("Waiting for metric: %v", metric)
select {
case <-timeout:
tb.Fatalf("Failed to fetch metric %v", metric)
return ""
case <-tick:
var err error
metricValue, err = member.Metric(metric)
if err != nil {
tb.Fatal(err)
}
}
}
return metricValue
}
// TestV3WatchRestoreSnapshotUnsync tests whether a slow follower can restore
// from the leader's snapshot, and still notify on watchers from an old revision
// that were created in the synced watcher group in the first place.
@@ -55,8 +77,11 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
}
clus.Members[0].InjectPartition(t, clus.Members[1:]...)
clus.waitLeader(t, clus.Members[1:])
initialLead := clus.waitLeader(t, clus.Members[1:])
t.Logf("elected lead: %v", clus.Members[initialLead].s.ID())
t.Logf("sleeping for 2 seconds")
time.Sleep(2 * time.Second)
t.Logf("sleeping for 2 seconds DONE")
kvc := toGRPC(clus.Client(1)).KV
@@ -71,26 +96,32 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
// trigger snapshot send from leader to this slow follower
// which then calls watchable store Restore
clus.Members[0].RecoverPartition(t, clus.Members[1:]...)
// We don't expect leadership change here, just recompute the leader's index
// within clus.Members list.
lead := clus.WaitLeader(t)
sends, err := clus.Members[lead].Metric("etcd_network_snapshot_send_inflights_total")
if err != nil {
t.Fatal(err)
}
if sends != "0" && sends != "1" {
// Sending is scheduled on the fifo 'sched' within EtcdServer::run,
// so it can start with a delay after recovery.
send := MustFetchNotEmptyMetric(t, clus.Members[lead],
"etcd_network_snapshot_send_inflights_total",
time.After(5*time.Second))
if send != "0" && send != "1" {
// 0 if already sent, 1 if sending
t.Fatalf("inflight snapshot sends expected 0 or 1, got %q", sends)
}
receives, err := clus.Members[(lead+1)%3].Metric("etcd_network_snapshot_receive_inflights_total")
if err != nil {
t.Fatal(err)
t.Fatalf("inflight snapshot snapshot_send_inflights_total expected 0 or 1, got %q", send)
}
receives := MustFetchNotEmptyMetric(t, clus.Members[(lead+1)%3],
"etcd_network_snapshot_receive_inflights_total",
time.After(5*time.Second))
if receives != "0" && receives != "1" {
// 0 if already received, 1 if receiving
t.Fatalf("inflight snapshot receives expected 0 or 1, got %q", receives)
}
t.Logf("sleeping for 2 seconds")
time.Sleep(2 * time.Second)
t.Logf("sleeping for 2 seconds DONE")
// slow follower now applies leader snapshot
// should be able to notify on old-revision watchers in unsynced


@@ -333,8 +333,15 @@ func (s *store) Restore(b backend.Backend) error {
s.b = b
s.kvindex = newTreeIndex(s.lg)
s.currentRev = 1
s.compactMainRev = -1
{
// During restore the metrics might report 'special' values
s.revMu.Lock()
s.currentRev = 1
s.compactMainRev = -1
s.revMu.Unlock()
}
s.fifoSched = schedule.NewFIFOScheduler()
s.stopc = make(chan struct{})
s.ci.SetBatchTx(b.BatchTx())
@@ -358,6 +365,7 @@ func (s *store) restore() error {
_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
s.revMu.Lock()
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
s.lg.Info(
@@ -366,6 +374,7 @@ func (s *store) restore() error {
zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
zap.Int64("restored-compact-revision", s.compactMainRev),
)
s.revMu.Unlock()
}
_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
@@ -394,14 +403,20 @@ func (s *store) restore() error {
revToBytes(newMin, min)
}
close(rkvc)
s.currentRev = <-revc
// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
// the correct revision should be set to compaction revision in the case, not the largest revision
// we have seen.
if s.currentRev < s.compactMainRev {
s.currentRev = s.compactMainRev
{
s.revMu.Lock()
s.currentRev = <-revc
// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
// the correct revision should be set to compaction revision in the case, not the largest revision
// we have seen.
if s.currentRev < s.compactMainRev {
s.currentRev = s.compactMainRev
}
s.revMu.Unlock()
}
if scheduledCompact <= s.compactMainRev {
scheduledCompact = 0
}
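Both kvstore.go hunks apply one rule: every write to currentRev and compactMainRev during restore now happens under revMu, so concurrent readers (notably the metrics that might otherwise report the transient 'special' values) never observe a half-updated pair. A sketch of the reader side this pairs with, using an illustrative struct; the mvcc store's actual accessors are assumed to take the same lock but are not shown in this diff:

package example

import "sync"

type revStore struct {
	revMu          sync.RWMutex
	currentRev     int64
	compactMainRev int64
}

// Rev is what metrics and reads would call; the RLock here pairs with
// the Lock held around the restore-time writes above.
func (s *revStore) Rev() int64 {
	s.revMu.RLock()
	defer s.revMu.RUnlock()
	return s.currentRev
}

func (s *revStore) CompactRev() int64 {
	s.revMu.RLock()
	defer s.revMu.RUnlock()
	return s.compactMainRev
}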


@@ -24,11 +24,7 @@ running(leaking) after all tests.
import "go.etcd.io/etcd/v3/pkg/testutil"
func TestMain(m *testing.M) {
v := m.Run()
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
}
os.Exit(v)
testutil.MustTestMainWithLeakDetection(m)
}
func TestSample(t *testing.T) {
@@ -38,10 +34,6 @@ running(leaking) after all tests.
*/
func CheckLeakedGoroutine() bool {
if testing.Short() {
// not counting goroutines for leakage in -short mode
return false
}
gs := interestingGoroutines()
if len(gs) == 0 {
return false
@@ -66,9 +58,6 @@ func CheckLeakedGoroutine() bool {
// Waits for go-routines shutdown for 'd'.
func CheckAfterTest(d time.Duration) error {
http.DefaultTransport.(*http.Transport).CloseIdleConnections()
if testing.Short() {
return nil
}
var bad string
badSubstring := map[string]string{
").writeLoop(": "a Transport",
@@ -140,3 +129,19 @@ func interestingGoroutines() (gs []string) {
sort.Strings(gs)
return gs
}
// MustTestMainWithLeakDetection expands standard m.Run with leaked
// goroutines detection.
func MustTestMainWithLeakDetection(m *testing.M) {
v := m.Run()
http.DefaultTransport.(*http.Transport).CloseIdleConnections()
// Let the other goroutines finalize.
runtime.Gosched()
if v == 0 && CheckLeakedGoroutine() {
os.Exit(1)
}
os.Exit(v)
}


@@ -72,7 +72,7 @@ function run {
log_callout "% ${repro}"
"${@}"
local error_code=$?
if [ ${error_code} != 0 ]; then
if [ ${error_code} -ne 0 ]; then
log_error -e "FAIL: (code:${error_code}):\n % ${repro}"
return ${error_code}
fi