Fixed much of Connection.ReadOp.
parent
290ac455fd
commit
72906b755a
119
connection.go
119
connection.go
|
@ -15,6 +15,7 @@
|
||||||
package fuse
|
package fuse
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"path"
|
"path"
|
||||||
|
@ -24,6 +25,7 @@ import (
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/jacobsa/fuse/fuseops"
|
"github.com/jacobsa/fuse/fuseops"
|
||||||
|
"github.com/jacobsa/fuse/internal/fusekernel"
|
||||||
"github.com/jacobsa/fuse/internal/fuseshim"
|
"github.com/jacobsa/fuse/internal/fuseshim"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -41,11 +43,11 @@ type Connection struct {
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
||||||
// A map from fuseshim request ID (*not* the op ID for logging used above) to
|
// A map from fuse "unique" request ID (*not* the op ID for logging used
|
||||||
// a function that cancel's its associated context.
|
// above) to a function that cancel's its associated context.
|
||||||
//
|
//
|
||||||
// GUARDED_BY(mu)
|
// GUARDED_BY(mu)
|
||||||
cancelFuncs map[fuseshim.RequestID]func()
|
cancelFuncs map[uint64]func()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Responsibility for closing the wrapped connection is transferred to the
|
// Responsibility for closing the wrapped connection is transferred to the
|
||||||
|
@ -62,7 +64,7 @@ func newConnection(
|
||||||
errorLogger: errorLogger,
|
errorLogger: errorLogger,
|
||||||
wrapped: wrapped,
|
wrapped: wrapped,
|
||||||
parentCtx: parentCtx,
|
parentCtx: parentCtx,
|
||||||
cancelFuncs: make(map[fuseshim.RequestID]func()),
|
cancelFuncs: make(map[uint64]func()),
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -104,28 +106,27 @@ func (c *Connection) debugLog(
|
||||||
|
|
||||||
// LOCKS_EXCLUDED(c.mu)
|
// LOCKS_EXCLUDED(c.mu)
|
||||||
func (c *Connection) recordCancelFunc(
|
func (c *Connection) recordCancelFunc(
|
||||||
reqID fuseshim.RequestID,
|
fuseID uint64,
|
||||||
f func()) {
|
f func()) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
if _, ok := c.cancelFuncs[reqID]; ok {
|
if _, ok := c.cancelFuncs[fuseID]; ok {
|
||||||
panic(fmt.Sprintf("Already have cancel func for request %v", reqID))
|
panic(fmt.Sprintf("Already have cancel func for request %v", fuseID))
|
||||||
}
|
}
|
||||||
|
|
||||||
c.cancelFuncs[reqID] = f
|
c.cancelFuncs[fuseID] = f
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set up state for an op that is about to be returned to the user, given its
|
// Set up state for an op that is about to be returned to the user, given its
|
||||||
// underlying fuseshim request.
|
// underlying fuse opcode and request ID.
|
||||||
//
|
//
|
||||||
// Return a context that should be used for the op.
|
// Return a context that should be used for the op.
|
||||||
//
|
//
|
||||||
// LOCKS_EXCLUDED(c.mu)
|
// LOCKS_EXCLUDED(c.mu)
|
||||||
func (c *Connection) beginOp(
|
func (c *Connection) beginOp(
|
||||||
bfReq fuseshim.Request) (ctx context.Context) {
|
opCode uint32,
|
||||||
reqID := bfReq.Hdr().ID
|
fuseID uint64) (ctx context.Context) {
|
||||||
|
|
||||||
// Start with the parent context.
|
// Start with the parent context.
|
||||||
ctx = c.parentCtx
|
ctx = c.parentCtx
|
||||||
|
|
||||||
|
@ -137,46 +138,46 @@ func (c *Connection) beginOp(
|
||||||
// should not record any state keyed on their ID.
|
// should not record any state keyed on their ID.
|
||||||
//
|
//
|
||||||
// Cf. https://github.com/osxfuse/osxfuse/issues/208
|
// Cf. https://github.com/osxfuse/osxfuse/issues/208
|
||||||
if _, ok := bfReq.(*fuseshim.ForgetRequest); !ok {
|
if opCode != fusekernel.OpForget {
|
||||||
var cancel func()
|
var cancel func()
|
||||||
ctx, cancel = context.WithCancel(ctx)
|
ctx, cancel = context.WithCancel(ctx)
|
||||||
c.recordCancelFunc(reqID, cancel)
|
c.recordCancelFunc(fuseID, cancel)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up all state associated with an op to which the user has responded,
|
// Clean up all state associated with an op to which the user has responded,
|
||||||
// given its underlying fuseshim request. This must be called before a response
|
// given its underlying fuse opcode and request ID. This must be called before
|
||||||
// is sent to the kernel, to avoid a race where the request's ID might be
|
// a response is sent to the kernel, to avoid a race where the request's ID
|
||||||
// reused by osxfuse.
|
// might be reused by osxfuse.
|
||||||
//
|
//
|
||||||
// LOCKS_EXCLUDED(c.mu)
|
// LOCKS_EXCLUDED(c.mu)
|
||||||
func (c *Connection) finishOp(bfReq fuseshim.Request) {
|
func (c *Connection) finishOp(
|
||||||
|
opCode uint32,
|
||||||
|
fuseID uint64) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
reqID := bfReq.Hdr().ID
|
|
||||||
|
|
||||||
// Even though the op is finished, context.WithCancel requires us to arrange
|
// Even though the op is finished, context.WithCancel requires us to arrange
|
||||||
// for the cancellation function to be invoked. We also must remove it from
|
// for the cancellation function to be invoked. We also must remove it from
|
||||||
// our map.
|
// our map.
|
||||||
//
|
//
|
||||||
// Special case: we don't do this for Forget requests. See the note in
|
// Special case: we don't do this for Forget requests. See the note in
|
||||||
// beginOp above.
|
// beginOp above.
|
||||||
if _, ok := bfReq.(*fuseshim.ForgetRequest); !ok {
|
if opCode != fusekernel.OpForget {
|
||||||
cancel, ok := c.cancelFuncs[reqID]
|
cancel, ok := c.cancelFuncs[fuseID]
|
||||||
if !ok {
|
if !ok {
|
||||||
panic(fmt.Sprintf("Unknown request ID in finishOp: %v", reqID))
|
panic(fmt.Sprintf("Unknown request ID in finishOp: %v", fuseID))
|
||||||
}
|
}
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
delete(c.cancelFuncs, reqID)
|
delete(c.cancelFuncs, fuseID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// LOCKS_EXCLUDED(c.mu)
|
// LOCKS_EXCLUDED(c.mu)
|
||||||
func (c *Connection) handleInterrupt(req *fuseshim.InterruptRequest) {
|
func (c *Connection) handleInterrupt(fuseID uint64) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
@ -194,7 +195,7 @@ func (c *Connection) handleInterrupt(req *fuseshim.InterruptRequest) {
|
||||||
//
|
//
|
||||||
// Cf. https://github.com/osxfuse/osxfuse/issues/208
|
// Cf. https://github.com/osxfuse/osxfuse/issues/208
|
||||||
// Cf. http://comments.gmane.org/gmane.comp.file-systems.fuse.devel/14675
|
// Cf. http://comments.gmane.org/gmane.comp.file-systems.fuse.devel/14675
|
||||||
cancel, ok := c.cancelFuncs[req.IntrID]
|
cancel, ok := c.cancelFuncs[fuseID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -212,20 +213,55 @@ func (c *Connection) handleInterrupt(req *fuseshim.InterruptRequest) {
|
||||||
func (c *Connection) ReadOp() (op fuseops.Op, err error) {
|
func (c *Connection) ReadOp() (op fuseops.Op, err error) {
|
||||||
// Keep going until we find a request we know how to convert.
|
// Keep going until we find a request we know how to convert.
|
||||||
for {
|
for {
|
||||||
// Read a fuseshim request.
|
// Read the next message from the fuseshim connection.
|
||||||
var bfReq fuseshim.Request
|
var m *fuseshim.Message
|
||||||
bfReq, err = c.wrapped.ReadRequest()
|
m, err = c.wrapped.ReadMessage()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Choose an ID for this operation.
|
// Choose an ID for this operation for the purposes of logging.
|
||||||
opID := c.nextOpID
|
opID := c.nextOpID
|
||||||
c.nextOpID++
|
c.nextOpID++
|
||||||
|
|
||||||
|
// Set up op dependencies.
|
||||||
|
opCtx := c.beginOp(m.Hdr.Opcode, m.Hdr.Unique)
|
||||||
|
|
||||||
|
var debugLogForOp func(int, string, ...interface{})
|
||||||
|
if c.debugLogger != nil {
|
||||||
|
debugLogForOp = func(calldepth int, format string, v ...interface{}) {
|
||||||
|
c.debugLog(opID, calldepth+1, format, v...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sendReply := func(
|
||||||
|
fuseID uint64,
|
||||||
|
msg []byte,
|
||||||
|
opErr error) (err error) {
|
||||||
|
// TODO(jacobsa): Turn this into a method and maybe kill the fuseID
|
||||||
|
// parameter.
|
||||||
|
//
|
||||||
|
// TODO(jacobsa): Don't forget to destroy the message.
|
||||||
|
err = errors.New("TODO")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert the message to an Op.
|
||||||
|
op, err = fuseops.Convert(
|
||||||
|
opCtx,
|
||||||
|
m,
|
||||||
|
c.wrapped.Protocol(),
|
||||||
|
debugLogForOp,
|
||||||
|
c.errorLogger,
|
||||||
|
sendReply)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("fuseops.Convert: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Log the receipt of the operation.
|
// Log the receipt of the operation.
|
||||||
c.debugLog(opID, 1, "<- %v", bfReq)
|
c.debugLog(opID, 1, "<- %v", op)
|
||||||
|
|
||||||
// Special case: responding to statfs is required to make mounting work on
|
// Special case: responding to statfs is required to make mounting work on
|
||||||
// OS X. We don't currently expose the capability for the file system to
|
// OS X. We don't currently expose the capability for the file system to
|
||||||
|
@ -242,25 +278,6 @@ func (c *Connection) ReadOp() (op fuseops.Op, err error) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set up op dependencies.
|
|
||||||
opCtx := c.beginOp(bfReq)
|
|
||||||
|
|
||||||
var debugLogForOp func(int, string, ...interface{})
|
|
||||||
if c.debugLogger != nil {
|
|
||||||
debugLogForOp = func(calldepth int, format string, v ...interface{}) {
|
|
||||||
c.debugLog(opID, calldepth+1, format, v...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
finished := func(err error) { c.finishOp(bfReq) }
|
|
||||||
|
|
||||||
op = fuseops.Convert(
|
|
||||||
opCtx,
|
|
||||||
bfReq,
|
|
||||||
debugLogForOp,
|
|
||||||
c.errorLogger,
|
|
||||||
finished)
|
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -532,7 +532,7 @@ func (c *Conn) Protocol() fusekernel.Protocol {
|
||||||
// Read and sanity check a message from the kernel. Return io.EOF when the
|
// Read and sanity check a message from the kernel. Return io.EOF when the
|
||||||
// kernel has hung up. The offset will point to the limit of the header.
|
// kernel has hung up. The offset will point to the limit of the header.
|
||||||
//
|
//
|
||||||
// The message must later be returned by calling m.Destroy.
|
// The message must later be disposed of by calling m.Destroy.
|
||||||
func (c *Conn) ReadMessage() (m *Message, err error) {
|
func (c *Conn) ReadMessage() (m *Message, err error) {
|
||||||
m = getMessage(c)
|
m = getMessage(c)
|
||||||
loop:
|
loop:
|
||||||
|
|
Loading…
Reference in New Issue