From 018fb8e6d988c6d25338de1efc9746b220da4813 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 9 Jun 2015 19:26:23 -0700 Subject: [PATCH] pkg/testutil: ForceGosched -> WaitSchedule ForceGosched() performs bad when GOMAXPROCS>1. When GOMAXPROCS=1, it could promise that other goroutines run long enough because it always yield the processor to other goroutines. But it cannot yield processor to goroutine running on other processors. So when GOMAXPROCS>1, the yield may finish when goroutine on the other processor just runs for little time. Here is a test to confirm the case: ``` package main import ( "fmt" "runtime" "testing" ) func ForceGosched() { // possibility enough to sched up to 10 go routines. for i := 0; i < 10000; i++ { runtime.Gosched() } } var d int func loop(c chan struct{}) { for { select { case <-c: for i := 0; i < 1000; i++ { fmt.Sprintf("come to time %d", i) } d++ } } } func TestLoop(t *testing.T) { c := make(chan struct{}, 1) go loop(c) c <- struct{}{} ForceGosched() if d != 1 { t.Fatal("d is not incremented") } } ``` `go test -v -race` runs well, but `GOMAXPROCS=2 go test -v -race` fails. Change the functionality to waiting for schedule to happen. --- etcdserver/server_test.go | 12 ++++++------ pkg/testutil/testutil.go | 12 ++++-------- raft/node_test.go | 4 ++-- rafthttp/pipeline_test.go | 10 +++++----- rafthttp/stream_test.go | 6 +++--- rafthttp/transport_test.go | 2 +- 6 files changed, 21 insertions(+), 25 deletions(-) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 8a922a8d6..73538fefe 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -607,7 +607,7 @@ func TestSync(t *testing.T) { }) srv.sync(10 * time.Second) timer.Stop() - testutil.ForceGosched() + testutil.WaitSchedule() action := n.Action() if len(action) != 1 { @@ -642,7 +642,7 @@ func TestSyncTimeout(t *testing.T) { timer.Stop() // give time for goroutine in sync to cancel - testutil.ForceGosched() + testutil.WaitSchedule() w := []testutil.Action{{Name: "Propose blocked"}} if g := n.Action(); !reflect.DeepEqual(g, w) { t.Errorf("action = %v, want %v", g, w) @@ -676,7 +676,7 @@ func TestSyncTrigger(t *testing.T) { } // trigger a sync request st <- time.Time{} - testutil.ForceGosched() + testutil.WaitSchedule() action := n.Action() if len(action) != 1 { @@ -710,7 +710,7 @@ func TestSnapshot(t *testing.T) { store: st, } srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}}) - testutil.ForceGosched() + testutil.WaitSchedule() gaction := st.Action() if len(gaction) != 2 { t.Fatalf("len(action) = %d, want 1", len(gaction)) @@ -786,7 +786,7 @@ func TestRecvSnapshot(t *testing.T) { s.start() n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}} // make goroutines move forward to receive snapshot - testutil.ForceGosched() + testutil.WaitSchedule() s.Stop() wactions := []testutil.Action{{Name: "Recovery"}} @@ -827,7 +827,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) { }, } // make goroutines move forward to receive snapshot - testutil.ForceGosched() + testutil.WaitSchedule() s.Stop() actions := st.Action() diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index 33523f54d..2eac84eb2 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -16,17 +16,13 @@ package testutil import ( "net/url" - "runtime" "testing" + "time" ) -// WARNING: This is a hack. -// Remove this when we are able to block/check the status of the go-routines. -func ForceGosched() { - // possibility enough to sched up to 10 go routines. - for i := 0; i < 10000; i++ { - runtime.Gosched() - } +// TODO: improve this when we are able to know the schedule or status of target go-routine. +func WaitSchedule() { + time.Sleep(3 * time.Millisecond) } func MustNewURLs(t *testing.T, urls []string) []url.URL { diff --git a/raft/node_test.go b/raft/node_test.go index b407136b5..f637971af 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -199,7 +199,7 @@ func TestBlockProposal(t *testing.T) { errc <- n.Propose(context.TODO(), []byte("somedata")) }() - testutil.ForceGosched() + testutil.WaitSchedule() select { case err := <-errc: t.Errorf("err = %v, want blocking", err) @@ -207,7 +207,7 @@ func TestBlockProposal(t *testing.T) { } n.Campaign(context.TODO()) - testutil.ForceGosched() + testutil.WaitSchedule() select { case err := <-errc: if err != nil { diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index e7da528ec..c8af450da 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -39,7 +39,7 @@ func TestPipelineSend(t *testing.T) { p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} - testutil.ForceGosched() + testutil.WaitSchedule() p.stop() if tr.Request() == nil { @@ -60,7 +60,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) { // keep the sender busy and make the buffer full // nothing can go out as we block the sender - testutil.ForceGosched() + testutil.WaitSchedule() for i := 0; i < connPerPipeline+pipelineBufSize; i++ { select { case p.msgc <- raftpb.Message{}: @@ -68,7 +68,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) { t.Errorf("failed to send out message") } // force the sender to grab data - testutil.ForceGosched() + testutil.WaitSchedule() } // try to send a data when we are sure the buffer is full @@ -80,7 +80,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) { // unblock the senders and force them to send out the data tr.unblock() - testutil.ForceGosched() + testutil.WaitSchedule() // It could send new data after previous ones succeed select { @@ -99,7 +99,7 @@ func TestPipelineSendFailed(t *testing.T) { p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} - testutil.ForceGosched() + testutil.WaitSchedule() p.stop() fs.Lock() diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index f36064752..ad5d472cd 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -33,7 +33,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { prevwfc := wfc wfc = &fakeWriteFlushCloser{} sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) - testutil.ForceGosched() + testutil.WaitSchedule() // previous attached connection should be closed if prevwfc != nil && prevwfc.closed != true { t.Errorf("#%d: close of previous connection = %v, want true", i, prevwfc.closed) @@ -44,7 +44,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { } sw.msgc <- raftpb.Message{} - testutil.ForceGosched() + testutil.WaitSchedule() // still working if _, ok := sw.writec(); ok != true { t.Errorf("#%d: working status = %v, want true", i, ok) @@ -73,7 +73,7 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) sw.msgc <- raftpb.Message{} - testutil.ForceGosched() + testutil.WaitSchedule() // no longer working if _, ok := sw.writec(); ok != false { t.Errorf("working = %v, want false", ok) diff --git a/rafthttp/transport_test.go b/rafthttp/transport_test.go index c56c48b81..61043c69b 100644 --- a/rafthttp/transport_test.go +++ b/rafthttp/transport_test.go @@ -137,7 +137,7 @@ func TestTransportErrorc(t *testing.T) { } tr.peers[1].Send(raftpb.Message{}) - testutil.ForceGosched() + testutil.WaitSchedule() select { case <-errorc: default: