Made "create in parallel" tests reusable.

geesefs-0-30-9
Aaron Jacobs 2015-05-21 15:43:41 +10:00
parent 1e7620a379
commit aa62bac439
2 changed files with 10 additions and 343 deletions

View File

@ -1253,21 +1253,21 @@ func (t *MemFSTest) DeleteSymlink() {
}
func (t *MemFSTest) CreateInParallel_NoTruncate() {
runCreateInParallelTest_NoTruncate(t.Ctx, t.Dir)
fusetesting.RunCreateInParallelTest_NoTruncate(t.Ctx, t.Dir)
}
func (t *MemFSTest) CreateInParallel_Truncate() {
runCreateInParallelTest_Truncate(t.Ctx, t.Dir)
fusetesting.RunCreateInParallelTest_Truncate(t.Ctx, t.Dir)
}
func (t *MemFSTest) CreateInParallel_Exclusive() {
runCreateInParallelTest_Exclusive(t.Ctx, t.Dir)
fusetesting.RunCreateInParallelTest_Exclusive(t.Ctx, t.Dir)
}
func (t *MemFSTest) MkdirInParallel() {
runMkdirInParallelTest(t.Ctx, t.Dir)
fusetesting.RunMkdirInParallelTest(t.Ctx, t.Dir)
}
func (t *MemFSTest) SymlinkInParallel() {
runSymlinkInParallelTest(t.Ctx, t.Dir)
fusetesting.RunSymlinkInParallelTest(t.Ctx, t.Dir)
}

View File

@ -18,20 +18,16 @@
package memfs_test
import (
"fmt"
"io"
"io/ioutil"
"os"
"path"
"runtime"
"sync/atomic"
"testing"
"time"
"golang.org/x/net/context"
"github.com/jacobsa/fuse/fusetesting"
"github.com/jacobsa/gcloud/syncutil"
. "github.com/jacobsa/oglematchers"
. "github.com/jacobsa/ogletest"
)
@ -48,335 +44,6 @@ func getFileOffset(f *os.File) (offset int64, err error) {
return
}
func runCreateInParallelTest_NoTruncate(
ctx context.Context,
dir string) {
// Ensure that we get parallelism for this test.
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
// Try for awhile to see if anything breaks.
const duration = 500 * time.Millisecond
startTime := time.Now()
for time.Since(startTime) < duration {
filename := path.Join(dir, "foo")
// Set up a function that opens the file with O_CREATE and then appends a
// byte to it.
worker := func(id byte) (err error) {
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
err = fmt.Errorf("Worker %d: Open: %v", id, err)
return
}
defer f.Close()
_, err = f.Write([]byte{id})
if err != nil {
err = fmt.Errorf("Worker %d: Write: %v", id, err)
return
}
return
}
// Run several workers in parallel.
const numWorkers = 16
b := syncutil.NewBundle(ctx)
for i := 0; i < numWorkers; i++ {
id := byte(i)
b.Add(func(ctx context.Context) (err error) {
err = worker(id)
return
})
}
err := b.Join()
AssertEq(nil, err)
// Read the contents of the file. We should see each worker's ID once.
contents, err := ioutil.ReadFile(filename)
AssertEq(nil, err)
idsSeen := make(map[byte]struct{})
for i, _ := range contents {
id := contents[i]
AssertLt(id, numWorkers)
if _, ok := idsSeen[id]; ok {
AddFailure("Duplicate ID: %d", id)
}
idsSeen[id] = struct{}{}
}
AssertEq(numWorkers, len(idsSeen))
// Delete the file.
err = os.Remove(filename)
AssertEq(nil, err)
}
}
func runCreateInParallelTest_Truncate(
ctx context.Context,
dir string) {
// Ensure that we get parallelism for this test.
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
// Try for awhile to see if anything breaks.
const duration = 500 * time.Millisecond
startTime := time.Now()
for time.Since(startTime) < duration {
filename := path.Join(dir, "foo")
// Set up a function that opens the file with O_CREATE and O_TRUNC and then
// appends a byte to it.
worker := func(id byte) (err error) {
f, err := os.OpenFile(
filename,
os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_TRUNC,
0600)
if err != nil {
err = fmt.Errorf("Worker %d: Open: %v", id, err)
return
}
defer f.Close()
_, err = f.Write([]byte{id})
if err != nil {
err = fmt.Errorf("Worker %d: Write: %v", id, err)
return
}
return
}
// Run several workers in parallel.
const numWorkers = 16
b := syncutil.NewBundle(ctx)
for i := 0; i < numWorkers; i++ {
id := byte(i)
b.Add(func(ctx context.Context) (err error) {
err = worker(id)
return
})
}
err := b.Join()
AssertEq(nil, err)
// Read the contents of the file. We should see at least one ID (the last
// one that truncated), and at most all of them.
contents, err := ioutil.ReadFile(filename)
AssertEq(nil, err)
idsSeen := make(map[byte]struct{})
for i, _ := range contents {
id := contents[i]
AssertLt(id, numWorkers)
if _, ok := idsSeen[id]; ok {
AddFailure("Duplicate ID: %d", id)
}
idsSeen[id] = struct{}{}
}
AssertGe(len(idsSeen), 1)
AssertLe(len(idsSeen), numWorkers)
// Delete the file.
err = os.Remove(filename)
AssertEq(nil, err)
}
}
func runCreateInParallelTest_Exclusive(
ctx context.Context,
dir string) {
// Ensure that we get parallelism for this test.
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
// Try for awhile to see if anything breaks.
const duration = 500 * time.Millisecond
startTime := time.Now()
for time.Since(startTime) < duration {
filename := path.Join(dir, "foo")
// Set up a function that opens the file with O_CREATE and O_EXCL, and then
// appends a byte to it if it was successfully opened.
var openCount uint64
worker := func(id byte) (err error) {
f, err := os.OpenFile(
filename,
os.O_CREATE|os.O_EXCL|os.O_WRONLY|os.O_APPEND,
0600)
// If we failed to open due to the file already existing, just leave.
if os.IsExist(err) {
err = nil
return
}
// Propgate other errors.
if err != nil {
err = fmt.Errorf("Worker %d: Open: %v", id, err)
return
}
atomic.AddUint64(&openCount, 1)
defer f.Close()
_, err = f.Write([]byte{id})
if err != nil {
err = fmt.Errorf("Worker %d: Write: %v", id, err)
return
}
return
}
// Run several workers in parallel.
const numWorkers = 16
b := syncutil.NewBundle(ctx)
for i := 0; i < numWorkers; i++ {
id := byte(i)
b.Add(func(ctx context.Context) (err error) {
err = worker(id)
return
})
}
err := b.Join()
AssertEq(nil, err)
// Exactly one worker should have opened successfully.
AssertEq(1, openCount)
// Read the contents of the file. It should contain that one worker's ID.
contents, err := ioutil.ReadFile(filename)
AssertEq(nil, err)
AssertEq(1, len(contents))
AssertLt(contents[0], numWorkers)
// Delete the file.
err = os.Remove(filename)
AssertEq(nil, err)
}
}
func runMkdirInParallelTest(
ctx context.Context,
dir string) {
// Ensure that we get parallelism for this test.
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
// Try for awhile to see if anything breaks.
const duration = 500 * time.Millisecond
startTime := time.Now()
for time.Since(startTime) < duration {
filename := path.Join(dir, "foo")
// Set up a function that creates the directory, ignoring EEXIST errors.
worker := func(id byte) (err error) {
err = os.Mkdir(filename, 0700)
if os.IsExist(err) {
err = nil
}
if err != nil {
err = fmt.Errorf("Worker %d: Mkdir: %v", id, err)
return
}
return
}
// Run several workers in parallel.
const numWorkers = 16
b := syncutil.NewBundle(ctx)
for i := 0; i < numWorkers; i++ {
id := byte(i)
b.Add(func(ctx context.Context) (err error) {
err = worker(id)
return
})
}
err := b.Join()
AssertEq(nil, err)
// The directory should have been created, once.
entries, err := fusetesting.ReadDirPicky(dir)
AssertEq(nil, err)
AssertEq(1, len(entries))
AssertEq("foo", entries[0].Name())
// Delete the directory.
err = os.Remove(filename)
AssertEq(nil, err)
}
}
func runSymlinkInParallelTest(
ctx context.Context,
dir string) {
// Ensure that we get parallelism for this test.
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
// Try for awhile to see if anything breaks.
const duration = 500 * time.Millisecond
startTime := time.Now()
for time.Since(startTime) < duration {
filename := path.Join(dir, "foo")
// Set up a function that creates the symlink, ignoring EEXIST errors.
worker := func(id byte) (err error) {
err = os.Symlink("blah", filename)
if os.IsExist(err) {
err = nil
}
if err != nil {
err = fmt.Errorf("Worker %d: Symlink: %v", id, err)
return
}
return
}
// Run several workers in parallel.
const numWorkers = 16
b := syncutil.NewBundle(ctx)
for i := 0; i < numWorkers; i++ {
id := byte(i)
b.Add(func(ctx context.Context) (err error) {
err = worker(id)
return
})
}
err := b.Join()
AssertEq(nil, err)
// The symlink should have been created, once.
entries, err := fusetesting.ReadDirPicky(dir)
AssertEq(nil, err)
AssertEq(1, len(entries))
AssertEq("foo", entries[0].Name())
// Delete the directory.
err = os.Remove(filename)
AssertEq(nil, err)
}
}
////////////////////////////////////////////////////////////////////////
// Boilerplate
////////////////////////////////////////////////////////////////////////
@ -760,21 +427,21 @@ func (t *PosixTest) RmdirWhileOpenedForReading() {
}
func (t *PosixTest) CreateInParallel_NoTruncate() {
runCreateInParallelTest_NoTruncate(t.ctx, t.dir)
fusetesting.RunCreateInParallelTest_NoTruncate(t.ctx, t.dir)
}
func (t *PosixTest) CreateInParallel_Truncate() {
runCreateInParallelTest_Truncate(t.ctx, t.dir)
fusetesting.RunCreateInParallelTest_Truncate(t.ctx, t.dir)
}
func (t *PosixTest) CreateInParallel_Exclusive() {
runCreateInParallelTest_Exclusive(t.ctx, t.dir)
fusetesting.RunCreateInParallelTest_Exclusive(t.ctx, t.dir)
}
func (t *PosixTest) MkdirInParallel() {
runMkdirInParallelTest(t.ctx, t.dir)
fusetesting.RunMkdirInParallelTest(t.ctx, t.dir)
}
func (t *PosixTest) SymlinkInParallel() {
runSymlinkInParallelTest(t.ctx, t.dir)
fusetesting.RunSymlinkInParallelTest(t.ctx, t.dir)
}