diff --git a/samples/memfs/posix_test.go b/samples/memfs/posix_test.go index b9197df..1052814 100644 --- a/samples/memfs/posix_test.go +++ b/samples/memfs/posix_test.go @@ -18,13 +18,19 @@ package memfs_test import ( + "fmt" "io" "io/ioutil" "os" "path" "runtime" + "sync/atomic" "testing" + "time" + "golang.org/x/net/context" + + "github.com/jacobsa/gcloud/syncutil" . "github.com/jacobsa/oglematchers" . "github.com/jacobsa/ogletest" ) @@ -41,11 +47,234 @@ func getFileOffset(f *os.File) (offset int64, err error) { return } +func runCreateInParallelTest_NoTruncate( + ctx context.Context, + dir string) { + // Ensure that we get parallelism for this test. + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU())) + + // Try for awhile to see if anything breaks. + const duration = 500 * time.Millisecond + startTime := time.Now() + for time.Since(startTime) < duration { + filename := path.Join(dir, "foo") + + // Set up a function that opens the file with O_CREATE and then appends a + // byte to it. + worker := func(id byte) (err error) { + f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) + if err != nil { + err = fmt.Errorf("Worker %d: Open: %v", id, err) + return + } + + defer f.Close() + + _, err = f.Write([]byte{id}) + if err != nil { + err = fmt.Errorf("Worker %d: Write: %v", id, err) + return + } + + return + } + + // Run several workers in parallel. + const numWorkers = 16 + b := syncutil.NewBundle(ctx) + for i := 0; i < numWorkers; i++ { + id := byte(i) + b.Add(func(ctx context.Context) (err error) { + err = worker(id) + return + }) + } + + err := b.Join() + AssertEq(nil, err) + + // Read the contents of the file. We should see each worker's ID once. + contents, err := ioutil.ReadFile(filename) + AssertEq(nil, err) + + idsSeen := make(map[byte]struct{}) + for i, _ := range contents { + id := contents[i] + AssertLt(id, numWorkers) + + if _, ok := idsSeen[id]; ok { + AddFailure("Duplicate ID: %d", id) + } + + idsSeen[id] = struct{}{} + } + + AssertEq(numWorkers, len(idsSeen)) + + // Delete the file. + err = os.Remove(filename) + AssertEq(nil, err) + } +} + +func runCreateInParallelTest_Truncate( + ctx context.Context, + dir string) { + // Ensure that we get parallelism for this test. + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU())) + + // Try for awhile to see if anything breaks. + const duration = 500 * time.Millisecond + startTime := time.Now() + for time.Since(startTime) < duration { + filename := path.Join(dir, "foo") + + // Set up a function that opens the file with O_CREATE and O_TRUNC and then + // appends a byte to it. + worker := func(id byte) (err error) { + f, err := os.OpenFile( + filename, + os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_TRUNC, + 0600) + + if err != nil { + err = fmt.Errorf("Worker %d: Open: %v", id, err) + return + } + + defer f.Close() + + _, err = f.Write([]byte{id}) + if err != nil { + err = fmt.Errorf("Worker %d: Write: %v", id, err) + return + } + + return + } + + // Run several workers in parallel. + const numWorkers = 16 + b := syncutil.NewBundle(ctx) + for i := 0; i < numWorkers; i++ { + id := byte(i) + b.Add(func(ctx context.Context) (err error) { + err = worker(id) + return + }) + } + + err := b.Join() + AssertEq(nil, err) + + // Read the contents of the file. We should see at least one ID (the last + // one that truncated), and at most all of them. + contents, err := ioutil.ReadFile(filename) + AssertEq(nil, err) + + idsSeen := make(map[byte]struct{}) + for i, _ := range contents { + id := contents[i] + AssertLt(id, numWorkers) + + if _, ok := idsSeen[id]; ok { + AddFailure("Duplicate ID: %d", id) + } + + idsSeen[id] = struct{}{} + } + + AssertGe(len(idsSeen), 1) + AssertLe(len(idsSeen), numWorkers) + + // Delete the file. + err = os.Remove(filename) + AssertEq(nil, err) + } +} + +func runCreateInParallelTest_Exclusive( + ctx context.Context, + dir string) { + // Ensure that we get parallelism for this test. + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(runtime.NumCPU())) + + // Try for awhile to see if anything breaks. + const duration = 500 * time.Millisecond + startTime := time.Now() + for time.Since(startTime) < duration { + filename := path.Join(dir, "foo") + + // Set up a function that opens the file with O_CREATE and O_EXCL, and then + // appends a byte to it if it was successfully opened. + var openCount uint64 + worker := func(id byte) (err error) { + f, err := os.OpenFile( + filename, + os.O_CREATE|os.O_EXCL|os.O_WRONLY|os.O_APPEND, + 0600) + + // If we failed to open due to the file already existing, just leave. + if os.IsExist(err) { + err = nil + return + } + + // Propgate other errors. + if err != nil { + err = fmt.Errorf("Worker %d: Open: %v", id, err) + return + } + + atomic.AddUint64(&openCount, 1) + defer f.Close() + + _, err = f.Write([]byte{id}) + if err != nil { + err = fmt.Errorf("Worker %d: Write: %v", id, err) + return + } + + return + } + + // Run several workers in parallel. + const numWorkers = 16 + b := syncutil.NewBundle(ctx) + for i := 0; i < numWorkers; i++ { + id := byte(i) + b.Add(func(ctx context.Context) (err error) { + err = worker(id) + return + }) + } + + err := b.Join() + AssertEq(nil, err) + + // Exactly one worker should have opened successfully. + AssertEq(1, openCount) + + // Read the contents of the file. It should contain that one worker's ID. + contents, err := ioutil.ReadFile(filename) + AssertEq(nil, err) + + AssertEq(1, len(contents)) + AssertLt(contents[0], numWorkers) + + // Delete the file. + err = os.Remove(filename) + AssertEq(nil, err) + } +} + //////////////////////////////////////////////////////////////////////// // Boilerplate //////////////////////////////////////////////////////////////////////// type PosixTest struct { + ctx context.Context + // A temporary directory. dir string @@ -61,6 +290,8 @@ func init() { RegisterTestSuite(&PosixTest{}) } func (t *PosixTest) SetUp(ti *TestInfo) { var err error + t.ctx = ti.Ctx + // Create a temporary directory. t.dir, err = ioutil.TempDir("", "posix_test") if err != nil { @@ -418,3 +649,15 @@ func (t *PosixTest) RmdirWhileOpenedForReading() { ExpectThat(names, ElementsAre()) } } + +func (t *PosixTest) CreateInParallel_NoTruncate() { + runCreateInParallelTest_NoTruncate(t.ctx, t.dir) +} + +func (t *PosixTest) CreateInParallel_Truncate() { + runCreateInParallelTest_Truncate(t.ctx, t.dir) +} + +func (t *PosixTest) CreateInParallel_Exclusive() { + runCreateInParallelTest_Exclusive(t.ctx, t.dir) +}