diff --git a/connection.go b/connection.go index 6c2f16f..7210dbc 100644 --- a/connection.go +++ b/connection.go @@ -15,6 +15,7 @@ package fuse import ( + "errors" "fmt" "log" "path" @@ -24,6 +25,7 @@ import ( "golang.org/x/net/context" "github.com/jacobsa/fuse/fuseops" + "github.com/jacobsa/fuse/internal/fusekernel" "github.com/jacobsa/fuse/internal/fuseshim" ) @@ -41,11 +43,11 @@ type Connection struct { mu sync.Mutex - // A map from fuseshim request ID (*not* the op ID for logging used above) to - // a function that cancel's its associated context. + // A map from fuse "unique" request ID (*not* the op ID for logging used + // above) to a function that cancel's its associated context. // // GUARDED_BY(mu) - cancelFuncs map[fuseshim.RequestID]func() + cancelFuncs map[uint64]func() } // Responsibility for closing the wrapped connection is transferred to the @@ -62,7 +64,7 @@ func newConnection( errorLogger: errorLogger, wrapped: wrapped, parentCtx: parentCtx, - cancelFuncs: make(map[fuseshim.RequestID]func()), + cancelFuncs: make(map[uint64]func()), } return @@ -104,28 +106,27 @@ func (c *Connection) debugLog( // LOCKS_EXCLUDED(c.mu) func (c *Connection) recordCancelFunc( - reqID fuseshim.RequestID, + fuseID uint64, f func()) { c.mu.Lock() defer c.mu.Unlock() - if _, ok := c.cancelFuncs[reqID]; ok { - panic(fmt.Sprintf("Already have cancel func for request %v", reqID)) + if _, ok := c.cancelFuncs[fuseID]; ok { + panic(fmt.Sprintf("Already have cancel func for request %v", fuseID)) } - c.cancelFuncs[reqID] = f + c.cancelFuncs[fuseID] = f } // Set up state for an op that is about to be returned to the user, given its -// underlying fuseshim request. +// underlying fuse opcode and request ID. // // Return a context that should be used for the op. // // LOCKS_EXCLUDED(c.mu) func (c *Connection) beginOp( - bfReq fuseshim.Request) (ctx context.Context) { - reqID := bfReq.Hdr().ID - + opCode uint32, + fuseID uint64) (ctx context.Context) { // Start with the parent context. ctx = c.parentCtx @@ -137,46 +138,46 @@ func (c *Connection) beginOp( // should not record any state keyed on their ID. // // Cf. https://github.com/osxfuse/osxfuse/issues/208 - if _, ok := bfReq.(*fuseshim.ForgetRequest); !ok { + if opCode != fusekernel.OpForget { var cancel func() ctx, cancel = context.WithCancel(ctx) - c.recordCancelFunc(reqID, cancel) + c.recordCancelFunc(fuseID, cancel) } return } // Clean up all state associated with an op to which the user has responded, -// given its underlying fuseshim request. This must be called before a response -// is sent to the kernel, to avoid a race where the request's ID might be -// reused by osxfuse. +// given its underlying fuse opcode and request ID. This must be called before +// a response is sent to the kernel, to avoid a race where the request's ID +// might be reused by osxfuse. // // LOCKS_EXCLUDED(c.mu) -func (c *Connection) finishOp(bfReq fuseshim.Request) { +func (c *Connection) finishOp( + opCode uint32, + fuseID uint64) { c.mu.Lock() defer c.mu.Unlock() - reqID := bfReq.Hdr().ID - // Even though the op is finished, context.WithCancel requires us to arrange // for the cancellation function to be invoked. We also must remove it from // our map. // // Special case: we don't do this for Forget requests. See the note in // beginOp above. - if _, ok := bfReq.(*fuseshim.ForgetRequest); !ok { - cancel, ok := c.cancelFuncs[reqID] + if opCode != fusekernel.OpForget { + cancel, ok := c.cancelFuncs[fuseID] if !ok { - panic(fmt.Sprintf("Unknown request ID in finishOp: %v", reqID)) + panic(fmt.Sprintf("Unknown request ID in finishOp: %v", fuseID)) } cancel() - delete(c.cancelFuncs, reqID) + delete(c.cancelFuncs, fuseID) } } // LOCKS_EXCLUDED(c.mu) -func (c *Connection) handleInterrupt(req *fuseshim.InterruptRequest) { +func (c *Connection) handleInterrupt(fuseID uint64) { c.mu.Lock() defer c.mu.Unlock() @@ -194,7 +195,7 @@ func (c *Connection) handleInterrupt(req *fuseshim.InterruptRequest) { // // Cf. https://github.com/osxfuse/osxfuse/issues/208 // Cf. http://comments.gmane.org/gmane.comp.file-systems.fuse.devel/14675 - cancel, ok := c.cancelFuncs[req.IntrID] + cancel, ok := c.cancelFuncs[fuseID] if !ok { return } @@ -212,20 +213,55 @@ func (c *Connection) handleInterrupt(req *fuseshim.InterruptRequest) { func (c *Connection) ReadOp() (op fuseops.Op, err error) { // Keep going until we find a request we know how to convert. for { - // Read a fuseshim request. - var bfReq fuseshim.Request - bfReq, err = c.wrapped.ReadRequest() - + // Read the next message from the fuseshim connection. + var m *fuseshim.Message + m, err = c.wrapped.ReadMessage() if err != nil { return } - // Choose an ID for this operation. + // Choose an ID for this operation for the purposes of logging. opID := c.nextOpID c.nextOpID++ + // Set up op dependencies. + opCtx := c.beginOp(m.Hdr.Opcode, m.Hdr.Unique) + + var debugLogForOp func(int, string, ...interface{}) + if c.debugLogger != nil { + debugLogForOp = func(calldepth int, format string, v ...interface{}) { + c.debugLog(opID, calldepth+1, format, v...) + } + } + + sendReply := func( + fuseID uint64, + msg []byte, + opErr error) (err error) { + // TODO(jacobsa): Turn this into a method and maybe kill the fuseID + // parameter. + // + // TODO(jacobsa): Don't forget to destroy the message. + err = errors.New("TODO") + return + } + + // Convert the message to an Op. + op, err = fuseops.Convert( + opCtx, + m, + c.wrapped.Protocol(), + debugLogForOp, + c.errorLogger, + sendReply) + + if err != nil { + err = fmt.Errorf("fuseops.Convert: %v", err) + return + } + // Log the receipt of the operation. - c.debugLog(opID, 1, "<- %v", bfReq) + c.debugLog(opID, 1, "<- %v", op) // Special case: responding to statfs is required to make mounting work on // OS X. We don't currently expose the capability for the file system to @@ -242,25 +278,6 @@ func (c *Connection) ReadOp() (op fuseops.Op, err error) { continue } - // Set up op dependencies. - opCtx := c.beginOp(bfReq) - - var debugLogForOp func(int, string, ...interface{}) - if c.debugLogger != nil { - debugLogForOp = func(calldepth int, format string, v ...interface{}) { - c.debugLog(opID, calldepth+1, format, v...) - } - } - - finished := func(err error) { c.finishOp(bfReq) } - - op = fuseops.Convert( - opCtx, - bfReq, - debugLogForOp, - c.errorLogger, - finished) - return } } diff --git a/internal/fuseshim/fuse.go b/internal/fuseshim/fuse.go index 900fcab..666615d 100644 --- a/internal/fuseshim/fuse.go +++ b/internal/fuseshim/fuse.go @@ -532,7 +532,7 @@ func (c *Conn) Protocol() fusekernel.Protocol { // Read and sanity check a message from the kernel. Return io.EOF when the // kernel has hung up. The offset will point to the limit of the header. // -// The message must later be returned by calling m.Destroy. +// The message must later be disposed of by calling m.Destroy. func (c *Conn) ReadMessage() (m *Message, err error) { m = getMessage(c) loop: