Updated the interruptfs API.

geesefs-0-30-9
Aaron Jacobs 2015-07-29 14:51:10 +10:00
parent 94e31a27b6
commit 0796c46a9c
1 changed files with 80 additions and 26 deletions

View File

@ -39,22 +39,29 @@ var fooAttrs = fuseops.InodeAttributes{
Size: 1234, Size: 1234,
} }
// A file system containing exactly one file, named "foo". Reads to the file // A file system containing exactly one file, named "foo". ReadFile and
// always hang until interrupted. Exposes a method for synchronizing with the // FlushFile ops can be made to hang until interrupted. Exposes a method for
// arrival of a read. // synchronizing with the arrival of a read or a flush.
// //
// Must be created with New. // Must be created with New.
type InterruptFS struct { type InterruptFS struct {
fuseutil.NotImplementedFileSystem fuseutil.NotImplementedFileSystem
mu sync.Mutex mu sync.Mutex
readInFlight bool
readInFlightChanged sync.Cond blockForReads bool // GUARDED_BY(mu)
blockForFlushes bool // GUARDED_BY(mu)
// Must hold the mutex when closing these.
readReceived chan struct{}
flushReceived chan struct{}
} }
func New() (fs *InterruptFS) { func New() (fs *InterruptFS) {
fs = &InterruptFS{} fs = &InterruptFS{
fs.readInFlightChanged.L = &fs.mu readReceived: make(chan struct{}),
flushReceived: make(chan struct{}),
}
return return
} }
@ -64,15 +71,29 @@ func New() (fs *InterruptFS) {
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
// Block until the first read is received. // Block until the first read is received.
// func (fs *InterruptFS) WaitForFirstRead() {
// LOCKS_EXCLUDED(fs.mu) <-fs.readReceived
func (fs *InterruptFS) WaitForReadInFlight() { }
// Block until the first flush is received.
func (fs *InterruptFS) WaitForFirstFlush() {
<-fs.flushReceived
}
// Enable blocking until interrupted for the next (and subsequent) read ops.
func (fs *InterruptFS) EnableReadBlocking() {
fs.mu.Lock() fs.mu.Lock()
defer fs.mu.Unlock() defer fs.mu.Unlock()
for !fs.readInFlight { fs.blockForReads = true
fs.readInFlightChanged.Wait() }
}
// Enable blocking until interrupted for the next (and subsequent) flush ops.
func (fs *InterruptFS) EnableFlushBlocking() {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.blockForFlushes = true
} }
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
@ -128,22 +149,55 @@ func (fs *InterruptFS) OpenFile(
func (fs *InterruptFS) ReadFile( func (fs *InterruptFS) ReadFile(
ctx context.Context, ctx context.Context,
op *fuseops.ReadFileOp) (err error) { op *fuseops.ReadFileOp) (err error) {
// Signal that a read has been received.
fs.mu.Lock() fs.mu.Lock()
fs.readInFlight = true shouldBlock := fs.blockForReads
fs.readInFlightChanged.Broadcast()
// Signal that a read has been received, if this is the first.
select {
case <-fs.readReceived:
default:
close(fs.readReceived)
}
fs.mu.Unlock() fs.mu.Unlock()
// Wait for cancellation. // Wait for cancellation if enabled.
done := ctx.Done() if shouldBlock {
if done == nil { done := ctx.Done()
panic("Expected non-nil channel.") if done == nil {
panic("Expected non-nil channel.")
}
<-done
err = ctx.Err()
}
return
}
func (fs *InterruptFS) FlushFile(
ctx context.Context,
op *fuseops.FlushFileOp) (err error) {
fs.mu.Lock()
shouldBlock := fs.blockForFlushes
// Signal that a flush has been received, if this is the first.
select {
case <-fs.flushReceived:
default:
close(fs.flushReceived)
}
fs.mu.Unlock()
// Wait for cancellation if enabled.
if shouldBlock {
done := ctx.Done()
if done == nil {
panic("Expected non-nil channel.")
}
<-done
err = ctx.Err()
} }
<-done
// Return the context's error.
err = ctx.Err()
return return
} }