diff --git a/connection.go b/connection.go index 1c92dfb..3bfdb22 100644 --- a/connection.go +++ b/connection.go @@ -16,6 +16,7 @@ package fuse import ( "log" + "sync" "github.com/jacobsa/bazilfuse" "github.com/jacobsa/fuse/fuseops" @@ -23,8 +24,9 @@ import ( // A connection to the fuse kernel process. type Connection struct { - logger *log.Logger - wrapped *bazilfuse.Conn + logger *log.Logger + wrapped *bazilfuse.Conn + opsInFlight sync.WaitGroup } // Responsibility for closing the wrapped connection is transferred to the @@ -72,12 +74,13 @@ func (c *Connection) ReadOp() (op fuseops.Op, err error) { } // Convert it, if possible. - if op = fuseops.Convert(bfReq, c.logger); op == nil { + if op = fuseops.Convert(bfReq, c.logger, &c.opsInFlight); op == nil { c.logger.Printf("Returning ENOSYS for unknown bazilfuse request: %v", bfReq) bfReq.RespondError(ENOSYS) continue } + c.opsInFlight.Add(1) return } } @@ -88,7 +91,9 @@ func (c *Connection) waitForReady() (err error) { return } +// Close the connection and wait for in-flight ops. func (c *Connection) close() (err error) { err = c.wrapped.Close() + c.opsInFlight.Wait() return } diff --git a/fuseops/convert.go b/fuseops/convert.go index 738321d..843c903 100644 --- a/fuseops/convert.go +++ b/fuseops/convert.go @@ -20,6 +20,7 @@ package fuseops import ( "log" "reflect" + "sync" "time" "github.com/jacobsa/bazilfuse" @@ -31,7 +32,10 @@ import ( // // This function is an implementation detail of the fuse package, and must not // be called by anyone else. -func Convert(r bazilfuse.Request, logger *log.Logger) (o Op) { +func Convert( + r bazilfuse.Request, + logger *log.Logger, + opsInFlight *sync.WaitGroup) (o Op) { var co *commonOp switch typed := r.(type) { @@ -214,7 +218,7 @@ func Convert(r bazilfuse.Request, logger *log.Logger) (o Op) { return } - co.init(reflect.TypeOf(o).String(), r, logger) + co.init(reflect.TypeOf(o).String(), r, logger, opsInFlight) return } @@ -264,20 +268,23 @@ func convertChildInodeEntry( // A helper for embedding common behavior. type commonOp struct { - opType string - ctx context.Context - r bazilfuse.Request - logger *log.Logger + opType string + ctx context.Context + r bazilfuse.Request + logger *log.Logger + opsInFlight *sync.WaitGroup } func (o *commonOp) init( opType string, r bazilfuse.Request, - logger *log.Logger) { + logger *log.Logger, + opsInFlight *sync.WaitGroup) { o.opType = opType o.ctx = context.Background() o.r = r o.logger = logger + o.opsInFlight = opsInFlight } func (o *commonOp) Header() OpHeader { diff --git a/fuseops/ops.go b/fuseops/ops.go index ea7af98..4f4e38b 100644 --- a/fuseops/ops.go +++ b/fuseops/ops.go @@ -53,6 +53,8 @@ type InitOp struct { } func (o *InitOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -135,6 +137,8 @@ type LookUpInodeOp struct { } func (o *LookUpInodeOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -165,6 +169,8 @@ type GetInodeAttributesOp struct { } func (o *GetInodeAttributesOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -203,6 +209,8 @@ type SetInodeAttributesOp struct { } func (o *SetInodeAttributesOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -267,6 +275,8 @@ type ForgetInodeOp struct { } func (o *ForgetInodeOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -305,6 +315,8 @@ type MkDirOp struct { } func (o *MkDirOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -361,6 +373,8 @@ type CreateFileOp struct { } func (o *CreateFileOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -398,6 +412,8 @@ type RmDirOp struct { } func (o *RmDirOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -422,6 +438,8 @@ type UnlinkOp struct { } func (o *UnlinkOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -462,6 +480,8 @@ type OpenDirOp struct { } func (o *OpenDirOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -566,6 +586,8 @@ type ReadDirOp struct { } func (o *ReadDirOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -597,6 +619,8 @@ type ReleaseDirHandleOp struct { } func (o *ReleaseDirHandleOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -636,6 +660,8 @@ type OpenFileOp struct { } func (o *OpenFileOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -679,6 +705,8 @@ type ReadFileOp struct { } func (o *ReadFileOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -763,6 +791,8 @@ type WriteFileOp struct { } func (o *WriteFileOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -801,6 +831,8 @@ type SyncFileOp struct { } func (o *SyncFileOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -866,6 +898,8 @@ type FlushFileOp struct { } func (o *FlushFileOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return @@ -893,6 +927,8 @@ type ReleaseFileHandleOp struct { } func (o *ReleaseFileHandleOp) Respond(err error) { + defer o.commonOp.opsInFlight.Done() + if err != nil { o.commonOp.respondErr(err) return diff --git a/mounted_file_system.go b/mounted_file_system.go index db3e9e1..6772793 100644 --- a/mounted_file_system.go +++ b/mounted_file_system.go @@ -44,9 +44,12 @@ func (mfs *MountedFileSystem) Dir() string { return mfs.dir } -// Block until a mounted file system has been unmounted. The return value will -// be non-nil if anything unexpected happened while serving. May be called -// multiple times. +// Block until a mounted file system has been unmounted. Do not return +// successfully until all ops read from the connection have been responded to +// (i.e. the file system server has finished processing all in-flight ops). +// +// The return value will be non-nil if anything unexpected happened while +// serving. May be called multiple times. func (mfs *MountedFileSystem) Join(ctx context.Context) error { select { case <-mfs.joinStatusAvailable: diff --git a/samples/in_process.go b/samples/in_process.go index 2fc2ecf..ae78210 100644 --- a/samples/in_process.go +++ b/samples/in_process.go @@ -129,5 +129,12 @@ func (t *SampleTest) destroy() (err error) { return } + // Join the file system. + err = t.mfs.Join(t.Ctx) + if err != nil { + err = fmt.Errorf("mfs.Join: %v", err) + return + } + return }