diff --git a/samples/interruptfs/interrupt_fs.go b/samples/interruptfs/interrupt_fs.go index 3208ab3..f6724bb 100644 --- a/samples/interruptfs/interrupt_fs.go +++ b/samples/interruptfs/interrupt_fs.go @@ -39,22 +39,29 @@ var fooAttrs = fuseops.InodeAttributes{ Size: 1234, } -// A file system containing exactly one file, named "foo". Reads to the file -// always hang until interrupted. Exposes a method for synchronizing with the -// arrival of a read. +// A file system containing exactly one file, named "foo". ReadFile and +// FlushFile ops can be made to hang until interrupted. Exposes a method for +// synchronizing with the arrival of a read or a flush. // // Must be created with New. type InterruptFS struct { fuseutil.NotImplementedFileSystem - mu sync.Mutex - readInFlight bool - readInFlightChanged sync.Cond + mu sync.Mutex + + blockForReads bool // GUARDED_BY(mu) + blockForFlushes bool // GUARDED_BY(mu) + + // Must hold the mutex when closing these. + readReceived chan struct{} + flushReceived chan struct{} } func New() (fs *InterruptFS) { - fs = &InterruptFS{} - fs.readInFlightChanged.L = &fs.mu + fs = &InterruptFS{ + readReceived: make(chan struct{}), + flushReceived: make(chan struct{}), + } return } @@ -64,15 +71,29 @@ func New() (fs *InterruptFS) { //////////////////////////////////////////////////////////////////////// // Block until the first read is received. -// -// LOCKS_EXCLUDED(fs.mu) -func (fs *InterruptFS) WaitForReadInFlight() { +func (fs *InterruptFS) WaitForFirstRead() { + <-fs.readReceived +} + +// Block until the first flush is received. +func (fs *InterruptFS) WaitForFirstFlush() { + <-fs.flushReceived +} + +// Enable blocking until interrupted for the next (and subsequent) read ops. +func (fs *InterruptFS) EnableReadBlocking() { fs.mu.Lock() defer fs.mu.Unlock() - for !fs.readInFlight { - fs.readInFlightChanged.Wait() - } + fs.blockForReads = true +} + +// Enable blocking until interrupted for the next (and subsequent) flush ops. +func (fs *InterruptFS) EnableFlushBlocking() { + fs.mu.Lock() + defer fs.mu.Unlock() + + fs.blockForFlushes = true } //////////////////////////////////////////////////////////////////////// @@ -128,22 +149,55 @@ func (fs *InterruptFS) OpenFile( func (fs *InterruptFS) ReadFile( ctx context.Context, op *fuseops.ReadFileOp) (err error) { - // Signal that a read has been received. fs.mu.Lock() - fs.readInFlight = true - fs.readInFlightChanged.Broadcast() + shouldBlock := fs.blockForReads + + // Signal that a read has been received, if this is the first. + select { + case <-fs.readReceived: + default: + close(fs.readReceived) + } fs.mu.Unlock() - // Wait for cancellation. - done := ctx.Done() - if done == nil { - panic("Expected non-nil channel.") + // Wait for cancellation if enabled. + if shouldBlock { + done := ctx.Done() + if done == nil { + panic("Expected non-nil channel.") + } + + <-done + err = ctx.Err() + } + + return +} + +func (fs *InterruptFS) FlushFile( + ctx context.Context, + op *fuseops.FlushFileOp) (err error) { + fs.mu.Lock() + shouldBlock := fs.blockForFlushes + + // Signal that a flush has been received, if this is the first. + select { + case <-fs.flushReceived: + default: + close(fs.flushReceived) + } + fs.mu.Unlock() + + // Wait for cancellation if enabled. + if shouldBlock { + done := ctx.Done() + if done == nil { + panic("Expected non-nil channel.") + } + + <-done + err = ctx.Err() } - <-done - - // Return the context's error. - err = ctx.Err() - return } diff --git a/samples/interruptfs/interrupt_fs_test.go b/samples/interruptfs/interrupt_fs_test.go index 5cf7f41..089e2da 100644 --- a/samples/interruptfs/interrupt_fs_test.go +++ b/samples/interruptfs/interrupt_fs_test.go @@ -73,6 +73,7 @@ func (t *InterruptFSTest) StatFoo() { func (t *InterruptFSTest) InterruptedDuringRead() { var err error + t.fs.EnableReadBlocking() // Start a sub-process that attempts to read the file. cmd := exec.Command("cat", path.Join(t.Dir, "foo")) @@ -92,7 +93,7 @@ func (t *InterruptFSTest) InterruptedDuringRead() { }() // Wait for the read to make it to the file system. - t.fs.WaitForReadInFlight() + t.fs.WaitForFirstRead() // The command should be hanging on the read, and not yet have returned. select { @@ -111,3 +112,45 @@ func (t *InterruptFSTest) InterruptedDuringRead() { ExpectThat(err, Error(HasSubstr("signal"))) ExpectThat(err, Error(HasSubstr("interrupt"))) } + +func (t *InterruptFSTest) InterruptedDuringFlush() { + var err error + t.fs.EnableFlushBlocking() + + // Start a sub-process that attempts to read the file. + cmd := exec.Command("cat", path.Join(t.Dir, "foo")) + + var cmdOutput bytes.Buffer + cmd.Stdout = &cmdOutput + cmd.Stderr = &cmdOutput + + err = cmd.Start() + AssertEq(nil, err) + + // Wait for the command in the background, writing to a channel when it is + // finished. + cmdErr := make(chan error) + go func() { + cmdErr <- cmd.Wait() + }() + + // Wait for the flush to make it to the file system. + t.fs.WaitForFirstFlush() + + // The command should be hanging on the flush, and not yet have returned. + select { + case err = <-cmdErr: + AddFailure("Command returned early with error: %v", err) + AbortTest() + + case <-time.After(10 * time.Millisecond): + } + + // Send SIGINT. + cmd.Process.Signal(os.Interrupt) + + // Now the command should return, with an appropriate error. + err = <-cmdErr + ExpectThat(err, Error(HasSubstr("signal"))) + ExpectThat(err, Error(HasSubstr("interrupt"))) +}