Synchronize on ops in flight in tear-down, fixing forgetfs_test.
parent
0461ddac4f
commit
6305cb930b
|
@ -16,6 +16,7 @@ package fuse
|
|||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/jacobsa/bazilfuse"
|
||||
"github.com/jacobsa/fuse/fuseops"
|
||||
|
@ -23,8 +24,9 @@ import (
|
|||
|
||||
// A connection to the fuse kernel process.
|
||||
type Connection struct {
|
||||
logger *log.Logger
|
||||
wrapped *bazilfuse.Conn
|
||||
logger *log.Logger
|
||||
wrapped *bazilfuse.Conn
|
||||
opsInFlight sync.WaitGroup
|
||||
}
|
||||
|
||||
// Responsibility for closing the wrapped connection is transferred to the
|
||||
|
@ -72,12 +74,13 @@ func (c *Connection) ReadOp() (op fuseops.Op, err error) {
|
|||
}
|
||||
|
||||
// Convert it, if possible.
|
||||
if op = fuseops.Convert(bfReq, c.logger); op == nil {
|
||||
if op = fuseops.Convert(bfReq, c.logger, &c.opsInFlight); op == nil {
|
||||
c.logger.Printf("Returning ENOSYS for unknown bazilfuse request: %v", bfReq)
|
||||
bfReq.RespondError(ENOSYS)
|
||||
continue
|
||||
}
|
||||
|
||||
c.opsInFlight.Add(1)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -88,7 +91,9 @@ func (c *Connection) waitForReady() (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// Close the connection and wait for in-flight ops.
|
||||
func (c *Connection) close() (err error) {
|
||||
err = c.wrapped.Close()
|
||||
c.opsInFlight.Wait()
|
||||
return
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package fuseops
|
|||
import (
|
||||
"log"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jacobsa/bazilfuse"
|
||||
|
@ -31,7 +32,10 @@ import (
|
|||
//
|
||||
// This function is an implementation detail of the fuse package, and must not
|
||||
// be called by anyone else.
|
||||
func Convert(r bazilfuse.Request, logger *log.Logger) (o Op) {
|
||||
func Convert(
|
||||
r bazilfuse.Request,
|
||||
logger *log.Logger,
|
||||
opsInFlight *sync.WaitGroup) (o Op) {
|
||||
var co *commonOp
|
||||
|
||||
switch typed := r.(type) {
|
||||
|
@ -214,7 +218,7 @@ func Convert(r bazilfuse.Request, logger *log.Logger) (o Op) {
|
|||
return
|
||||
}
|
||||
|
||||
co.init(reflect.TypeOf(o).String(), r, logger)
|
||||
co.init(reflect.TypeOf(o).String(), r, logger, opsInFlight)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -264,20 +268,23 @@ func convertChildInodeEntry(
|
|||
|
||||
// A helper for embedding common behavior.
|
||||
type commonOp struct {
|
||||
opType string
|
||||
ctx context.Context
|
||||
r bazilfuse.Request
|
||||
logger *log.Logger
|
||||
opType string
|
||||
ctx context.Context
|
||||
r bazilfuse.Request
|
||||
logger *log.Logger
|
||||
opsInFlight *sync.WaitGroup
|
||||
}
|
||||
|
||||
func (o *commonOp) init(
|
||||
opType string,
|
||||
r bazilfuse.Request,
|
||||
logger *log.Logger) {
|
||||
logger *log.Logger,
|
||||
opsInFlight *sync.WaitGroup) {
|
||||
o.opType = opType
|
||||
o.ctx = context.Background()
|
||||
o.r = r
|
||||
o.logger = logger
|
||||
o.opsInFlight = opsInFlight
|
||||
}
|
||||
|
||||
func (o *commonOp) Header() OpHeader {
|
||||
|
|
|
@ -53,6 +53,8 @@ type InitOp struct {
|
|||
}
|
||||
|
||||
func (o *InitOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -135,6 +137,8 @@ type LookUpInodeOp struct {
|
|||
}
|
||||
|
||||
func (o *LookUpInodeOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -165,6 +169,8 @@ type GetInodeAttributesOp struct {
|
|||
}
|
||||
|
||||
func (o *GetInodeAttributesOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -203,6 +209,8 @@ type SetInodeAttributesOp struct {
|
|||
}
|
||||
|
||||
func (o *SetInodeAttributesOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -267,6 +275,8 @@ type ForgetInodeOp struct {
|
|||
}
|
||||
|
||||
func (o *ForgetInodeOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -305,6 +315,8 @@ type MkDirOp struct {
|
|||
}
|
||||
|
||||
func (o *MkDirOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -361,6 +373,8 @@ type CreateFileOp struct {
|
|||
}
|
||||
|
||||
func (o *CreateFileOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -398,6 +412,8 @@ type RmDirOp struct {
|
|||
}
|
||||
|
||||
func (o *RmDirOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -422,6 +438,8 @@ type UnlinkOp struct {
|
|||
}
|
||||
|
||||
func (o *UnlinkOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -462,6 +480,8 @@ type OpenDirOp struct {
|
|||
}
|
||||
|
||||
func (o *OpenDirOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -566,6 +586,8 @@ type ReadDirOp struct {
|
|||
}
|
||||
|
||||
func (o *ReadDirOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -597,6 +619,8 @@ type ReleaseDirHandleOp struct {
|
|||
}
|
||||
|
||||
func (o *ReleaseDirHandleOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -636,6 +660,8 @@ type OpenFileOp struct {
|
|||
}
|
||||
|
||||
func (o *OpenFileOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -679,6 +705,8 @@ type ReadFileOp struct {
|
|||
}
|
||||
|
||||
func (o *ReadFileOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -763,6 +791,8 @@ type WriteFileOp struct {
|
|||
}
|
||||
|
||||
func (o *WriteFileOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -801,6 +831,8 @@ type SyncFileOp struct {
|
|||
}
|
||||
|
||||
func (o *SyncFileOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -866,6 +898,8 @@ type FlushFileOp struct {
|
|||
}
|
||||
|
||||
func (o *FlushFileOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
@ -893,6 +927,8 @@ type ReleaseFileHandleOp struct {
|
|||
}
|
||||
|
||||
func (o *ReleaseFileHandleOp) Respond(err error) {
|
||||
defer o.commonOp.opsInFlight.Done()
|
||||
|
||||
if err != nil {
|
||||
o.commonOp.respondErr(err)
|
||||
return
|
||||
|
|
|
@ -44,9 +44,12 @@ func (mfs *MountedFileSystem) Dir() string {
|
|||
return mfs.dir
|
||||
}
|
||||
|
||||
// Block until a mounted file system has been unmounted. The return value will
|
||||
// be non-nil if anything unexpected happened while serving. May be called
|
||||
// multiple times.
|
||||
// Block until a mounted file system has been unmounted. Do not return
|
||||
// successfully until all ops read from the connection have been responded to
|
||||
// (i.e. the file system server has finished processing all in-flight ops).
|
||||
//
|
||||
// The return value will be non-nil if anything unexpected happened while
|
||||
// serving. May be called multiple times.
|
||||
func (mfs *MountedFileSystem) Join(ctx context.Context) error {
|
||||
select {
|
||||
case <-mfs.joinStatusAvailable:
|
||||
|
|
|
@ -129,5 +129,12 @@ func (t *SampleTest) destroy() (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// Join the file system.
|
||||
err = t.mfs.Join(t.Ctx)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("mfs.Join: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue