fusego/connection.go

371 lines
9.0 KiB
Go
Raw Normal View History

2015-03-24 07:19:42 +03:00
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fuse
2015-03-24 07:23:19 +03:00
import (
"fmt"
2015-07-24 08:07:55 +03:00
"io"
2015-03-24 07:23:19 +03:00
"log"
2015-07-24 08:07:55 +03:00
"os"
"path"
"runtime"
"sync"
2015-07-24 08:07:55 +03:00
"syscall"
2015-03-24 07:23:19 +03:00
"golang.org/x/net/context"
2015-03-24 07:23:19 +03:00
"github.com/jacobsa/fuse/fuseops"
2015-07-24 07:33:27 +03:00
"github.com/jacobsa/fuse/internal/buffer"
2015-07-24 03:19:21 +03:00
"github.com/jacobsa/fuse/internal/fusekernel"
2015-07-23 09:18:43 +03:00
"github.com/jacobsa/fuse/internal/fuseshim"
2015-03-24 07:23:19 +03:00
)
2015-03-24 07:19:42 +03:00
// A connection to the fuse kernel process.
type Connection struct {
debugLogger *log.Logger
2015-05-25 07:17:16 +03:00
errorLogger *log.Logger
2015-07-23 09:18:43 +03:00
wrapped *fuseshim.Conn
2015-04-29 04:53:17 +03:00
// The context from which all op contexts inherit.
parentCtx context.Context
2015-04-29 04:53:17 +03:00
// For logging purposes only.
nextOpID uint32
2015-05-05 03:41:09 +03:00
mu sync.Mutex
2015-07-24 03:19:21 +03:00
// A map from fuse "unique" request ID (*not* the op ID for logging used
// above) to a function that cancel's its associated context.
2015-05-05 03:41:09 +03:00
//
// GUARDED_BY(mu)
2015-07-24 03:19:21 +03:00
cancelFuncs map[uint64]func()
2015-03-24 07:19:42 +03:00
}
2015-03-24 07:34:50 +03:00
// Responsibility for closing the wrapped connection is transferred to the
// result. You must call c.close() eventually.
//
// The loggers may be nil.
2015-03-24 07:34:50 +03:00
func newConnection(
parentCtx context.Context,
debugLogger *log.Logger,
2015-05-25 07:17:16 +03:00
errorLogger *log.Logger,
2015-07-23 09:18:43 +03:00
wrapped *fuseshim.Conn) (c *Connection, err error) {
2015-03-24 07:36:09 +03:00
c = &Connection{
debugLogger: debugLogger,
2015-05-25 07:17:16 +03:00
errorLogger: errorLogger,
2015-05-05 03:42:17 +03:00
wrapped: wrapped,
parentCtx: parentCtx,
2015-07-24 03:19:21 +03:00
cancelFuncs: make(map[uint64]func()),
2015-03-24 07:36:09 +03:00
}
return
}
2015-03-24 07:23:19 +03:00
2015-04-29 04:53:17 +03:00
// Log information for an operation with the given ID. calldepth is the depth
// to use when recovering file:line information with runtime.Caller.
func (c *Connection) debugLog(
2015-04-29 04:53:17 +03:00
opID uint32,
calldepth int,
format string,
v ...interface{}) {
if c.debugLogger == nil {
return
}
// Get file:line info.
var file string
var line int
var ok bool
_, file, line, ok = runtime.Caller(calldepth)
if !ok {
file = "???"
}
2015-04-29 05:11:34 +03:00
fileLine := fmt.Sprintf("%v:%v", path.Base(file), line)
// Format the actual message to be printed.
msg := fmt.Sprintf(
2015-04-29 05:11:34 +03:00
"Op 0x%08x %24s] %v",
2015-04-29 04:51:25 +03:00
opID,
2015-04-29 05:11:34 +03:00
fileLine,
fmt.Sprintf(format, v...))
// Print it.
c.debugLogger.Println(msg)
}
2015-05-05 03:41:09 +03:00
// LOCKS_EXCLUDED(c.mu)
func (c *Connection) recordCancelFunc(
2015-07-24 03:19:21 +03:00
fuseID uint64,
2015-05-05 03:41:52 +03:00
f func()) {
c.mu.Lock()
defer c.mu.Unlock()
2015-07-24 03:19:21 +03:00
if _, ok := c.cancelFuncs[fuseID]; ok {
panic(fmt.Sprintf("Already have cancel func for request %v", fuseID))
2015-05-05 03:41:52 +03:00
}
2015-07-24 03:19:21 +03:00
c.cancelFuncs[fuseID] = f
2015-05-05 03:41:52 +03:00
}
2015-05-05 03:41:09 +03:00
// Set up state for an op that is about to be returned to the user, given its
2015-07-24 03:19:21 +03:00
// underlying fuse opcode and request ID.
//
// Return a context that should be used for the op.
2015-05-05 03:41:09 +03:00
//
// LOCKS_EXCLUDED(c.mu)
func (c *Connection) beginOp(
2015-07-24 03:19:21 +03:00
opCode uint32,
fuseID uint64) (ctx context.Context) {
// Start with the parent context.
ctx = c.parentCtx
2015-05-05 03:41:09 +03:00
// Set up a cancellation function.
//
// Special case: On Darwin, osxfuse aggressively reuses "unique" request IDs.
// This matters for Forget requests, which have no reply associated and
// therefore have IDs that are immediately eligible for reuse. For these, we
// should not record any state keyed on their ID.
//
// Cf. https://github.com/osxfuse/osxfuse/issues/208
2015-07-24 03:19:21 +03:00
if opCode != fusekernel.OpForget {
var cancel func()
ctx, cancel = context.WithCancel(ctx)
2015-07-24 03:19:21 +03:00
c.recordCancelFunc(fuseID, cancel)
}
return
2015-05-05 03:04:31 +03:00
}
2015-05-05 03:04:03 +03:00
// Clean up all state associated with an op to which the user has responded,
2015-07-24 03:19:21 +03:00
// given its underlying fuse opcode and request ID. This must be called before
// a response is sent to the kernel, to avoid a race where the request's ID
// might be reused by osxfuse.
2015-05-05 03:41:09 +03:00
//
// LOCKS_EXCLUDED(c.mu)
2015-07-24 03:19:21 +03:00
func (c *Connection) finishOp(
opCode uint32,
fuseID uint64) {
2015-05-05 03:41:09 +03:00
c.mu.Lock()
defer c.mu.Unlock()
// Even though the op is finished, context.WithCancel requires us to arrange
// for the cancellation function to be invoked. We also must remove it from
// our map.
//
// Special case: we don't do this for Forget requests. See the note in
// beginOp above.
2015-07-24 03:19:21 +03:00
if opCode != fusekernel.OpForget {
cancel, ok := c.cancelFuncs[fuseID]
if !ok {
2015-07-24 03:19:21 +03:00
panic(fmt.Sprintf("Unknown request ID in finishOp: %v", fuseID))
}
2015-05-05 03:41:09 +03:00
cancel()
2015-07-24 03:19:21 +03:00
delete(c.cancelFuncs, fuseID)
}
2015-05-05 03:04:31 +03:00
}
2015-05-05 03:04:03 +03:00
2015-05-05 05:21:57 +03:00
// LOCKS_EXCLUDED(c.mu)
2015-07-24 03:19:21 +03:00
func (c *Connection) handleInterrupt(fuseID uint64) {
2015-05-05 05:21:57 +03:00
c.mu.Lock()
defer c.mu.Unlock()
// NOTE(jacobsa): fuse.txt in the Linux kernel documentation
// (https://goo.gl/H55Dnr) defines the kernel <-> userspace protocol for
// interrupts.
//
// In particular, my reading of it is that an interrupt request cannot be
// delivered to userspace before the original request. The part about the
// race and EAGAIN appears to be aimed at userspace programs that
// concurrently process requests (cf. http://goo.gl/BES2rs).
2015-05-05 05:21:57 +03:00
//
// So in this method if we can't find the ID to be interrupted, it means that
// the request has already been replied to.
//
// Cf. https://github.com/osxfuse/osxfuse/issues/208
// Cf. http://comments.gmane.org/gmane.comp.file-systems.fuse.devel/14675
2015-07-24 03:19:21 +03:00
cancel, ok := c.cancelFuncs[fuseID]
2015-05-05 05:21:57 +03:00
if !ok {
return
}
cancel()
}
2015-07-24 07:33:27 +03:00
func (c *Connection) allocateInMessage() (m *buffer.InMessage) {
2015-07-24 08:05:27 +03:00
// TODO(jacobsa): Use a freelist.
m = new(buffer.InMessage)
return
2015-07-24 07:33:27 +03:00
}
func (c *Connection) destroyInMessage(m *buffer.InMessage) {
2015-07-24 08:05:27 +03:00
// TODO(jacobsa): Use a freelist.
2015-07-24 07:33:27 +03:00
}
// Read the next message from the kernel. The message must later be destroyed
// using destroyInMessage.
func (c *Connection) readMessage() (m *buffer.InMessage, err error) {
2015-07-24 08:04:28 +03:00
// Allocate a message.
m = c.allocateInMessage()
// Loop past transient errors.
for {
// Attempt a reaed.
2015-07-24 08:04:28 +03:00
err = m.Init(c.wrapped.Dev)
2015-07-24 08:20:43 +03:00
// Special cases:
//
// * ENODEV means fuse has hung up.
//
// * EINTR means we should try again. (This seems to happen often on
// OS X, cf. http://golang.org/issue/11180)
//
if pe, ok := err.(*os.PathError); ok {
switch pe.Err {
case syscall.ENODEV:
2015-07-24 08:07:55 +03:00
err = io.EOF
2015-07-24 08:20:43 +03:00
case syscall.EINTR:
err = nil
continue
2015-07-24 08:07:55 +03:00
}
2015-07-24 08:20:43 +03:00
}
2015-07-24 08:07:55 +03:00
2015-07-24 08:20:43 +03:00
if err != nil {
c.destroyInMessage(m)
m = nil
2015-07-24 08:04:28 +03:00
return
}
return
}
2015-07-24 07:33:27 +03:00
}
2015-03-24 07:19:42 +03:00
// Read the next op from the kernel process. Return io.EOF if the kernel has
// closed the connection.
2015-03-24 07:23:19 +03:00
//
// This function delivers ops in exactly the order they are received from
2015-04-29 04:28:16 +03:00
// /dev/fuse. It must not be called multiple times concurrently.
2015-05-05 03:41:09 +03:00
//
// LOCKS_EXCLUDED(c.mu)
2015-03-24 07:26:02 +03:00
func (c *Connection) ReadOp() (op fuseops.Op, err error) {
// Keep going until we find a request we know how to convert.
for {
2015-07-24 07:33:27 +03:00
// Read the next message from the kernel.
var m *buffer.InMessage
m, err = c.readMessage()
2015-03-24 07:26:02 +03:00
if err != nil {
return
}
2015-07-24 03:19:21 +03:00
// Choose an ID for this operation for the purposes of logging.
opID := c.nextOpID
c.nextOpID++
2015-07-24 03:19:21 +03:00
// Set up op dependencies.
2015-07-24 07:33:27 +03:00
opCtx := c.beginOp(m.Header().Opcode, m.Header().Unique)
2015-07-24 03:19:21 +03:00
var debugLogForOp func(int, string, ...interface{})
if c.debugLogger != nil {
debugLogForOp = func(calldepth int, format string, v ...interface{}) {
c.debugLog(opID, calldepth+1, format, v...)
}
}
sendReply := func(
2015-07-24 04:38:23 +03:00
op fuseops.Op,
2015-07-24 03:19:21 +03:00
fuseID uint64,
2015-07-24 04:16:49 +03:00
replyMsg []byte,
2015-07-24 03:19:21 +03:00
opErr error) (err error) {
2015-07-24 07:33:27 +03:00
// Make sure we destroy the message, as required by readMessage.
defer c.destroyInMessage(m)
2015-07-24 04:27:03 +03:00
2015-07-24 04:16:49 +03:00
// Clean up state for this op.
2015-07-24 07:33:27 +03:00
c.finishOp(m.Header().Opcode, m.Header().Unique)
2015-07-24 04:16:49 +03:00
2015-07-24 04:38:23 +03:00
// Debug logging
if c.debugLogger != nil {
if opErr == nil {
2015-07-24 04:47:39 +03:00
op.Logf("-> OK: %s", op.DebugString())
2015-07-24 04:38:23 +03:00
} else {
2015-07-24 04:47:39 +03:00
op.Logf("-> error: %v", opErr)
2015-07-24 04:38:23 +03:00
}
}
// Error logging
if opErr != nil && c.errorLogger != nil {
c.errorLogger.Printf("(%s) error: %v", op.ShortDesc(), opErr)
}
2015-07-24 04:16:49 +03:00
// Send the reply to the kernel.
2015-07-24 04:27:03 +03:00
err = c.wrapped.WriteToKernel(replyMsg)
if err != nil {
err = fmt.Errorf("WriteToKernel: %v", err)
return
}
2015-07-24 04:16:49 +03:00
2015-07-24 03:19:21 +03:00
return
}
// Convert the message to an Op.
op, err = fuseops.Convert(
opCtx,
m,
c.wrapped.Protocol(),
debugLogForOp,
c.errorLogger,
sendReply)
if err != nil {
err = fmt.Errorf("fuseops.Convert: %v", err)
return
}
// Log the receipt of the operation.
2015-07-24 04:13:17 +03:00
c.debugLog(opID, 1, "<- %v", op.ShortDesc())
2015-03-24 07:52:14 +03:00
2015-05-05 05:21:57 +03:00
// Special case: responding to statfs is required to make mounting work on
// OS X. We don't currently expose the capability for the file system to
2015-03-24 07:51:36 +03:00
// intercept this.
2015-07-24 03:32:36 +03:00
if _, ok := op.(*fuseops.InternalStatFSOp); ok {
op.Respond(nil)
2015-03-24 07:51:36 +03:00
continue
}
2015-05-05 05:21:57 +03:00
// Special case: handle interrupt requests.
2015-07-24 03:32:36 +03:00
if interruptOp, ok := op.(*fuseops.InternalInterruptOp); ok {
c.handleInterrupt(interruptOp.FuseID)
2015-05-05 05:21:57 +03:00
continue
}
2015-03-24 07:26:02 +03:00
return
}
}
2015-03-24 07:34:50 +03:00
2015-03-24 07:36:09 +03:00
func (c *Connection) waitForReady() (err error) {
<-c.wrapped.Ready
err = c.wrapped.MountError
return
}
2015-03-24 07:34:50 +03:00
// Close the connection. Must not be called until operations that were read
// from the connection have been responded to.
2015-03-24 07:36:09 +03:00
func (c *Connection) close() (err error) {
err = c.wrapped.Close()
return
}