diff --git a/connection.go b/connection.go index 2d5d9ee..4e83b8b 100644 --- a/connection.go +++ b/connection.go @@ -38,6 +38,11 @@ type Connection struct { errorLogger *log.Logger wrapped *fuseshim.Conn + // The device through which we're talking to the kernel, and the protocol + // version that we're using to talk to it. + dev *os.File + protocol fusekernel.Protocol + // The context from which all op contexts inherit. parentCtx context.Context @@ -66,6 +71,8 @@ func newConnection( debugLogger: debugLogger, errorLogger: errorLogger, wrapped: wrapped, + dev: wrapped.Dev, + protocol: wrapped.Protocol(), parentCtx: parentCtx, cancelFuncs: make(map[uint64]func()), } @@ -224,13 +231,8 @@ func (c *Connection) readMessage() (m *buffer.InMessage, err error) { // Loop past transient errors. for { - // Lock and read. - // - // TODO(jacobsa): Ensure that we document concurrency constraints that make - // it safe, then kill the lock here. - c.wrapped.Rio.RLock() - err = m.Init(c.wrapped.Dev) - c.wrapped.Rio.RUnlock() + // Attempt a reaed. + err = m.Init(c.dev) // Special cases: // @@ -260,6 +262,22 @@ func (c *Connection) readMessage() (m *buffer.InMessage, err error) { } } +// Write the supplied message to the kernel. +func (c *Connection) writeMessage(msg []byte) (err error) { + // Avoid the retry loop in os.File.Write. + n, err := syscall.Write(int(c.dev.Fd()), msg) + if err != nil { + return + } + + if n != len(msg) { + err = fmt.Errorf("Wrote %d bytes; expected %d", n, len(msg)) + return + } + + return +} + // Read the next op from the kernel process. Return io.EOF if the kernel has // closed the connection. // @@ -317,9 +335,9 @@ func (c *Connection) ReadOp() (op fuseops.Op, err error) { } // Send the reply to the kernel. - err = c.wrapped.WriteToKernel(replyMsg) + err = c.writeMessage(replyMsg) if err != nil { - err = fmt.Errorf("WriteToKernel: %v", err) + err = fmt.Errorf("writeMessage: %v", err) return } @@ -330,7 +348,7 @@ func (c *Connection) ReadOp() (op fuseops.Op, err error) { op, err = fuseops.Convert( opCtx, m, - c.wrapped.Protocol(), + c.protocol, debugLogForOp, c.errorLogger, sendReply) @@ -370,6 +388,9 @@ func (c *Connection) waitForReady() (err error) { // Close the connection. Must not be called until operations that were read // from the connection have been responded to. func (c *Connection) close() (err error) { - err = c.wrapped.Close() + // Posix doesn't say that close can be called concurrently with read or + // write, but luckily we exclude the possibility of a race by requiring the + // user to respond to all ops first. + err = c.dev.Close() return }