Added tests demonstrating POSIX behavior when creating in parallel.
commit
eafe864b8c
|
@ -18,13 +18,19 @@
|
|||
package memfs_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/jacobsa/gcloud/syncutil"
|
||||
. "github.com/jacobsa/oglematchers"
|
||||
. "github.com/jacobsa/ogletest"
|
||||
)
|
||||
|
@ -41,11 +47,234 @@ func getFileOffset(f *os.File) (offset int64, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func runCreateInParallelTest_NoTruncate(
|
||||
ctx context.Context,
|
||||
dir string) {
|
||||
// Ensure that we get parallelism for this test.
|
||||
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
|
||||
|
||||
// Try for awhile to see if anything breaks.
|
||||
const duration = 500 * time.Millisecond
|
||||
startTime := time.Now()
|
||||
for time.Since(startTime) < duration {
|
||||
filename := path.Join(dir, "foo")
|
||||
|
||||
// Set up a function that opens the file with O_CREATE and then appends a
|
||||
// byte to it.
|
||||
worker := func(id byte) (err error) {
|
||||
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Worker %d: Open: %v", id, err)
|
||||
return
|
||||
}
|
||||
|
||||
defer f.Close()
|
||||
|
||||
_, err = f.Write([]byte{id})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Worker %d: Write: %v", id, err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Run several workers in parallel.
|
||||
const numWorkers = 16
|
||||
b := syncutil.NewBundle(ctx)
|
||||
for i := 0; i < numWorkers; i++ {
|
||||
id := byte(i)
|
||||
b.Add(func(ctx context.Context) (err error) {
|
||||
err = worker(id)
|
||||
return
|
||||
})
|
||||
}
|
||||
|
||||
err := b.Join()
|
||||
AssertEq(nil, err)
|
||||
|
||||
// Read the contents of the file. We should see each worker's ID once.
|
||||
contents, err := ioutil.ReadFile(filename)
|
||||
AssertEq(nil, err)
|
||||
|
||||
idsSeen := make(map[byte]struct{})
|
||||
for i, _ := range contents {
|
||||
id := contents[i]
|
||||
AssertLt(id, numWorkers)
|
||||
|
||||
if _, ok := idsSeen[id]; ok {
|
||||
AddFailure("Duplicate ID: %d", id)
|
||||
}
|
||||
|
||||
idsSeen[id] = struct{}{}
|
||||
}
|
||||
|
||||
AssertEq(numWorkers, len(idsSeen))
|
||||
|
||||
// Delete the file.
|
||||
err = os.Remove(filename)
|
||||
AssertEq(nil, err)
|
||||
}
|
||||
}
|
||||
|
||||
func runCreateInParallelTest_Truncate(
|
||||
ctx context.Context,
|
||||
dir string) {
|
||||
// Ensure that we get parallelism for this test.
|
||||
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
|
||||
|
||||
// Try for awhile to see if anything breaks.
|
||||
const duration = 500 * time.Millisecond
|
||||
startTime := time.Now()
|
||||
for time.Since(startTime) < duration {
|
||||
filename := path.Join(dir, "foo")
|
||||
|
||||
// Set up a function that opens the file with O_CREATE and O_TRUNC and then
|
||||
// appends a byte to it.
|
||||
worker := func(id byte) (err error) {
|
||||
f, err := os.OpenFile(
|
||||
filename,
|
||||
os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_TRUNC,
|
||||
0600)
|
||||
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Worker %d: Open: %v", id, err)
|
||||
return
|
||||
}
|
||||
|
||||
defer f.Close()
|
||||
|
||||
_, err = f.Write([]byte{id})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Worker %d: Write: %v", id, err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Run several workers in parallel.
|
||||
const numWorkers = 16
|
||||
b := syncutil.NewBundle(ctx)
|
||||
for i := 0; i < numWorkers; i++ {
|
||||
id := byte(i)
|
||||
b.Add(func(ctx context.Context) (err error) {
|
||||
err = worker(id)
|
||||
return
|
||||
})
|
||||
}
|
||||
|
||||
err := b.Join()
|
||||
AssertEq(nil, err)
|
||||
|
||||
// Read the contents of the file. We should see at least one ID (the last
|
||||
// one that truncated), and at most all of them.
|
||||
contents, err := ioutil.ReadFile(filename)
|
||||
AssertEq(nil, err)
|
||||
|
||||
idsSeen := make(map[byte]struct{})
|
||||
for i, _ := range contents {
|
||||
id := contents[i]
|
||||
AssertLt(id, numWorkers)
|
||||
|
||||
if _, ok := idsSeen[id]; ok {
|
||||
AddFailure("Duplicate ID: %d", id)
|
||||
}
|
||||
|
||||
idsSeen[id] = struct{}{}
|
||||
}
|
||||
|
||||
AssertGe(len(idsSeen), 1)
|
||||
AssertLe(len(idsSeen), numWorkers)
|
||||
|
||||
// Delete the file.
|
||||
err = os.Remove(filename)
|
||||
AssertEq(nil, err)
|
||||
}
|
||||
}
|
||||
|
||||
func runCreateInParallelTest_Exclusive(
|
||||
ctx context.Context,
|
||||
dir string) {
|
||||
// Ensure that we get parallelism for this test.
|
||||
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU()))
|
||||
|
||||
// Try for awhile to see if anything breaks.
|
||||
const duration = 500 * time.Millisecond
|
||||
startTime := time.Now()
|
||||
for time.Since(startTime) < duration {
|
||||
filename := path.Join(dir, "foo")
|
||||
|
||||
// Set up a function that opens the file with O_CREATE and O_EXCL, and then
|
||||
// appends a byte to it if it was successfully opened.
|
||||
var openCount uint64
|
||||
worker := func(id byte) (err error) {
|
||||
f, err := os.OpenFile(
|
||||
filename,
|
||||
os.O_CREATE|os.O_EXCL|os.O_WRONLY|os.O_APPEND,
|
||||
0600)
|
||||
|
||||
// If we failed to open due to the file already existing, just leave.
|
||||
if os.IsExist(err) {
|
||||
err = nil
|
||||
return
|
||||
}
|
||||
|
||||
// Propgate other errors.
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Worker %d: Open: %v", id, err)
|
||||
return
|
||||
}
|
||||
|
||||
atomic.AddUint64(&openCount, 1)
|
||||
defer f.Close()
|
||||
|
||||
_, err = f.Write([]byte{id})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Worker %d: Write: %v", id, err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Run several workers in parallel.
|
||||
const numWorkers = 16
|
||||
b := syncutil.NewBundle(ctx)
|
||||
for i := 0; i < numWorkers; i++ {
|
||||
id := byte(i)
|
||||
b.Add(func(ctx context.Context) (err error) {
|
||||
err = worker(id)
|
||||
return
|
||||
})
|
||||
}
|
||||
|
||||
err := b.Join()
|
||||
AssertEq(nil, err)
|
||||
|
||||
// Exactly one worker should have opened successfully.
|
||||
AssertEq(1, openCount)
|
||||
|
||||
// Read the contents of the file. It should contain that one worker's ID.
|
||||
contents, err := ioutil.ReadFile(filename)
|
||||
AssertEq(nil, err)
|
||||
|
||||
AssertEq(1, len(contents))
|
||||
AssertLt(contents[0], numWorkers)
|
||||
|
||||
// Delete the file.
|
||||
err = os.Remove(filename)
|
||||
AssertEq(nil, err)
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
// Boilerplate
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
|
||||
type PosixTest struct {
|
||||
ctx context.Context
|
||||
|
||||
// A temporary directory.
|
||||
dir string
|
||||
|
||||
|
@ -61,6 +290,8 @@ func init() { RegisterTestSuite(&PosixTest{}) }
|
|||
func (t *PosixTest) SetUp(ti *TestInfo) {
|
||||
var err error
|
||||
|
||||
t.ctx = ti.Ctx
|
||||
|
||||
// Create a temporary directory.
|
||||
t.dir, err = ioutil.TempDir("", "posix_test")
|
||||
if err != nil {
|
||||
|
@ -418,3 +649,15 @@ func (t *PosixTest) RmdirWhileOpenedForReading() {
|
|||
ExpectThat(names, ElementsAre())
|
||||
}
|
||||
}
|
||||
|
||||
func (t *PosixTest) CreateInParallel_NoTruncate() {
|
||||
runCreateInParallelTest_NoTruncate(t.ctx, t.dir)
|
||||
}
|
||||
|
||||
func (t *PosixTest) CreateInParallel_Truncate() {
|
||||
runCreateInParallelTest_Truncate(t.ctx, t.dir)
|
||||
}
|
||||
|
||||
func (t *PosixTest) CreateInParallel_Exclusive() {
|
||||
runCreateInParallelTest_Exclusive(t.ctx, t.dir)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue