diff --git a/connection.go b/connection.go index e1d03da..2d5d9ee 100644 --- a/connection.go +++ b/connection.go @@ -16,14 +16,18 @@ package fuse import ( "fmt" + "io" "log" + "os" "path" "runtime" "sync" + "syscall" "golang.org/x/net/context" "github.com/jacobsa/fuse/fuseops" + "github.com/jacobsa/fuse/internal/buffer" "github.com/jacobsa/fuse/internal/fusekernel" "github.com/jacobsa/fuse/internal/fuseshim" ) @@ -202,6 +206,60 @@ func (c *Connection) handleInterrupt(fuseID uint64) { cancel() } +func (c *Connection) allocateInMessage() (m *buffer.InMessage) { + // TODO(jacobsa): Use a freelist. + m = new(buffer.InMessage) + return +} + +func (c *Connection) destroyInMessage(m *buffer.InMessage) { + // TODO(jacobsa): Use a freelist. +} + +// Read the next message from the kernel. The message must later be destroyed +// using destroyInMessage. +func (c *Connection) readMessage() (m *buffer.InMessage, err error) { + // Allocate a message. + m = c.allocateInMessage() + + // Loop past transient errors. + for { + // Lock and read. + // + // TODO(jacobsa): Ensure that we document concurrency constraints that make + // it safe, then kill the lock here. + c.wrapped.Rio.RLock() + err = m.Init(c.wrapped.Dev) + c.wrapped.Rio.RUnlock() + + // Special cases: + // + // * ENODEV means fuse has hung up. + // + // * EINTR means we should try again. (This seems to happen often on + // OS X, cf. http://golang.org/issue/11180) + // + if pe, ok := err.(*os.PathError); ok { + switch pe.Err { + case syscall.ENODEV: + err = io.EOF + + case syscall.EINTR: + err = nil + continue + } + } + + if err != nil { + c.destroyInMessage(m) + m = nil + return + } + + return + } +} + // Read the next op from the kernel process. Return io.EOF if the kernel has // closed the connection. // @@ -212,9 +270,9 @@ func (c *Connection) handleInterrupt(fuseID uint64) { func (c *Connection) ReadOp() (op fuseops.Op, err error) { // Keep going until we find a request we know how to convert. for { - // Read the next message from the fuseshim connection. - var m *fuseshim.Message - m, err = c.wrapped.ReadMessage() + // Read the next message from the kernel. + var m *buffer.InMessage + m, err = c.readMessage() if err != nil { return } @@ -224,7 +282,7 @@ func (c *Connection) ReadOp() (op fuseops.Op, err error) { c.nextOpID++ // Set up op dependencies. - opCtx := c.beginOp(m.Hdr.Opcode, m.Hdr.Unique) + opCtx := c.beginOp(m.Header().Opcode, m.Header().Unique) var debugLogForOp func(int, string, ...interface{}) if c.debugLogger != nil { @@ -238,12 +296,11 @@ func (c *Connection) ReadOp() (op fuseops.Op, err error) { fuseID uint64, replyMsg []byte, opErr error) (err error) { - // Make sure we destroy the message, as required by - // fuseshim.Connection.ReadMessage. - defer m.Destroy() + // Make sure we destroy the message, as required by readMessage. + defer c.destroyInMessage(m) // Clean up state for this op. - c.finishOp(m.Hdr.Opcode, m.Hdr.Unique) + c.finishOp(m.Header().Opcode, m.Header().Unique) // Debug logging if c.debugLogger != nil { diff --git a/fuseops/common_op.go b/fuseops/common_op.go index 203793c..fb3a224 100644 --- a/fuseops/common_op.go +++ b/fuseops/common_op.go @@ -36,7 +36,7 @@ type internalOp interface { // // Special case: a zero return value means that the kernel is not expecting a // response. - kernelResponse() (b buffer.Buffer) + kernelResponse() (b buffer.OutMessage) } // A function that sends a reply message back to the kernel for the request @@ -142,11 +142,11 @@ func (o *commonOp) Respond(err error) { // If successful, we ask the op for an appopriate response to the kernel, and // it is responsible for leaving room for the fusekernel.OutHeader struct. // Otherwise, create our own. - var b buffer.Buffer + var b buffer.OutMessage if err == nil { b = o.op.kernelResponse() } else { - b = buffer.New(0) + b = buffer.NewOutMessage(0) } // Fill in the header if a reply is needed. diff --git a/fuseops/convert.go b/fuseops/convert.go index fb30d8e..0f265cd 100644 --- a/fuseops/convert.go +++ b/fuseops/convert.go @@ -23,6 +23,7 @@ import ( "time" "unsafe" + "github.com/jacobsa/fuse/internal/buffer" "github.com/jacobsa/fuse/internal/fusekernel" "github.com/jacobsa/fuse/internal/fuseshim" "golang.org/x/net/context" @@ -39,7 +40,7 @@ import ( // responsible for arranging for the message to be destroyed. func Convert( opCtx context.Context, - m *fuseshim.Message, + m *buffer.InMessage, protocol fusekernel.Protocol, debugLogForOp func(int, string, ...interface{}), errorLogger *log.Logger, @@ -47,9 +48,9 @@ func Convert( var co *commonOp var io internalOp - switch m.Hdr.Opcode { + switch m.Header().Opcode { case fusekernel.OpLookup: - buf := m.Bytes() + buf := m.ConsumeBytes(m.Len()) n := len(buf) if n == 0 || buf[n-1] != '\x00' { err = errors.New("Corrupt OpLookup") @@ -58,7 +59,7 @@ func Convert( to := &LookUpInodeOp{ protocol: protocol, - Parent: InodeID(m.Hdr.Nodeid), + Parent: InodeID(m.Header().Nodeid), Name: string(buf[:n-1]), } io = to @@ -67,21 +68,22 @@ func Convert( case fusekernel.OpGetattr: to := &GetInodeAttributesOp{ protocol: protocol, - Inode: InodeID(m.Hdr.Nodeid), + Inode: InodeID(m.Header().Nodeid), } io = to co = &to.commonOp case fusekernel.OpSetattr: - in := (*fusekernel.SetattrIn)(m.Data()) - if m.Len() < unsafe.Sizeof(*in) { + type input fusekernel.SetattrIn + in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + if in == nil { err = errors.New("Corrupt OpSetattr") return } to := &SetInodeAttributesOp{ protocol: protocol, - Inode: InodeID(m.Hdr.Nodeid), + Inode: InodeID(m.Header().Nodeid), } valid := fusekernel.SetattrValid(in.Valid) @@ -108,27 +110,28 @@ func Convert( co = &to.commonOp case fusekernel.OpForget: - in := (*fusekernel.ForgetIn)(m.Data()) - if m.Len() < unsafe.Sizeof(*in) { + type input fusekernel.ForgetIn + in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + if in == nil { err = errors.New("Corrupt OpForget") return } to := &ForgetInodeOp{ - Inode: InodeID(m.Hdr.Nodeid), + Inode: InodeID(m.Header().Nodeid), N: in.Nlookup, } io = to co = &to.commonOp case fusekernel.OpMkdir: - size := fusekernel.MkdirInSize(protocol) - if m.Len() < size { + in := (*fusekernel.MkdirIn)(m.Consume(fusekernel.MkdirInSize(protocol))) + if in == nil { err = errors.New("Corrupt OpMkdir") return } - in := (*fusekernel.MkdirIn)(m.Data()) - name := m.Bytes()[size:] + + name := m.ConsumeBytes(m.Len()) i := bytes.IndexByte(name, '\x00') if i < 0 { err = errors.New("Corrupt OpMkdir") @@ -138,7 +141,7 @@ func Convert( to := &MkDirOp{ protocol: protocol, - Parent: InodeID(m.Hdr.Nodeid), + Parent: InodeID(m.Header().Nodeid), Name: string(name), // On Linux, vfs_mkdir calls through to the inode with at most @@ -154,13 +157,13 @@ func Convert( co = &to.commonOp case fusekernel.OpCreate: - size := fusekernel.CreateInSize(protocol) - if m.Len() < size { + in := (*fusekernel.CreateIn)(m.Consume(fusekernel.CreateInSize(protocol))) + if in == nil { err = errors.New("Corrupt OpCreate") return } - in := (*fusekernel.CreateIn)(m.Data()) - name := m.Bytes()[size:] + + name := m.ConsumeBytes(m.Len()) i := bytes.IndexByte(name, '\x00') if i < 0 { err = errors.New("Corrupt OpCreate") @@ -170,7 +173,7 @@ func Convert( to := &CreateFileOp{ protocol: protocol, - Parent: InodeID(m.Hdr.Nodeid), + Parent: InodeID(m.Header().Nodeid), Name: string(name), Mode: fuseshim.FileMode(in.Mode), } @@ -178,8 +181,8 @@ func Convert( co = &to.commonOp case fusekernel.OpSymlink: - // m.Bytes() is "newName\0target\0" - names := m.Bytes() + // The message is "newName\0target\0". + names := m.ConsumeBytes(m.Len()) if len(names) == 0 || names[len(names)-1] != 0 { err = errors.New("Corrupt OpSymlink") return @@ -193,7 +196,7 @@ func Convert( to := &CreateSymlinkOp{ protocol: protocol, - Parent: InodeID(m.Hdr.Nodeid), + Parent: InodeID(m.Header().Nodeid), Name: string(newName), Target: string(target), } @@ -201,12 +204,14 @@ func Convert( co = &to.commonOp case fusekernel.OpRename: - in := (*fusekernel.RenameIn)(m.Data()) - if m.Len() < unsafe.Sizeof(*in) { + type input fusekernel.RenameIn + in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + if in == nil { err = errors.New("Corrupt OpRename") return } - names := m.Bytes()[unsafe.Sizeof(*in):] + + names := m.ConsumeBytes(m.Len()) // names should be "old\x00new\x00" if len(names) < 4 { err = errors.New("Corrupt OpRename") @@ -224,7 +229,7 @@ func Convert( oldName, newName := names[:i], names[i+1:len(names)-1] to := &RenameOp{ - OldParent: InodeID(m.Hdr.Nodeid), + OldParent: InodeID(m.Header().Nodeid), OldName: string(oldName), NewParent: InodeID(in.Newdir), NewName: string(newName), @@ -233,7 +238,7 @@ func Convert( co = &to.commonOp case fusekernel.OpUnlink: - buf := m.Bytes() + buf := m.ConsumeBytes(m.Len()) n := len(buf) if n == 0 || buf[n-1] != '\x00' { err = errors.New("Corrupt OpUnlink") @@ -241,14 +246,14 @@ func Convert( } to := &UnlinkOp{ - Parent: InodeID(m.Hdr.Nodeid), + Parent: InodeID(m.Header().Nodeid), Name: string(buf[:n-1]), } io = to co = &to.commonOp case fusekernel.OpRmdir: - buf := m.Bytes() + buf := m.ConsumeBytes(m.Len()) n := len(buf) if n == 0 || buf[n-1] != '\x00' { err = errors.New("Corrupt OpRmdir") @@ -256,7 +261,7 @@ func Convert( } to := &RmDirOp{ - Parent: InodeID(m.Hdr.Nodeid), + Parent: InodeID(m.Header().Nodeid), Name: string(buf[:n-1]), } io = to @@ -264,27 +269,27 @@ func Convert( case fusekernel.OpOpen: to := &OpenFileOp{ - Inode: InodeID(m.Hdr.Nodeid), + Inode: InodeID(m.Header().Nodeid), } io = to co = &to.commonOp case fusekernel.OpOpendir: to := &OpenDirOp{ - Inode: InodeID(m.Hdr.Nodeid), + Inode: InodeID(m.Header().Nodeid), } io = to co = &to.commonOp case fusekernel.OpRead: - in := (*fusekernel.ReadIn)(m.Data()) - if m.Len() < fusekernel.ReadInSize(protocol) { + in := (*fusekernel.ReadIn)(m.Consume(fusekernel.ReadInSize(protocol))) + if in == nil { err = errors.New("Corrupt OpRead") return } to := &ReadFileOp{ - Inode: InodeID(m.Hdr.Nodeid), + Inode: InodeID(m.Header().Nodeid), Handle: HandleID(in.Fh), Offset: int64(in.Offset), Size: int(in.Size), @@ -293,14 +298,14 @@ func Convert( co = &to.commonOp case fusekernel.OpReaddir: - in := (*fusekernel.ReadIn)(m.Data()) - if m.Len() < fusekernel.ReadInSize(protocol) { + in := (*fusekernel.ReadIn)(m.Consume(fusekernel.ReadInSize(protocol))) + if in == nil { err = errors.New("Corrupt OpReaddir") return } to := &ReadDirOp{ - Inode: InodeID(m.Hdr.Nodeid), + Inode: InodeID(m.Header().Nodeid), Handle: HandleID(in.Fh), Offset: DirOffset(in.Offset), Size: int(in.Size), @@ -309,8 +314,9 @@ func Convert( co = &to.commonOp case fusekernel.OpRelease: - in := (*fusekernel.ReleaseIn)(m.Data()) - if m.Len() < unsafe.Sizeof(*in) { + type input fusekernel.ReleaseIn + in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + if in == nil { err = errors.New("Corrupt OpRelease") return } @@ -322,8 +328,9 @@ func Convert( co = &to.commonOp case fusekernel.OpReleasedir: - in := (*fusekernel.ReleaseIn)(m.Data()) - if m.Len() < unsafe.Sizeof(*in) { + type input fusekernel.ReleaseIn + in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + if in == nil { err = errors.New("Corrupt OpReleasedir") return } @@ -335,21 +342,20 @@ func Convert( co = &to.commonOp case fusekernel.OpWrite: - in := (*fusekernel.WriteIn)(m.Data()) - size := fusekernel.WriteInSize(protocol) - if m.Len() < size { + in := (*fusekernel.WriteIn)(m.Consume(fusekernel.WriteInSize(protocol))) + if in == nil { err = errors.New("Corrupt OpWrite") return } - buf := m.Bytes()[size:] + buf := m.ConsumeBytes(m.Len()) if len(buf) < int(in.Size) { err = errors.New("Corrupt OpWrite") return } to := &WriteFileOp{ - Inode: InodeID(m.Hdr.Nodeid), + Inode: InodeID(m.Header().Nodeid), Handle: HandleID(in.Fh), Data: buf, Offset: int64(in.Offset), @@ -358,28 +364,30 @@ func Convert( co = &to.commonOp case fusekernel.OpFsync: - in := (*fusekernel.FsyncIn)(m.Data()) - if m.Len() < unsafe.Sizeof(*in) { + type input fusekernel.FsyncIn + in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + if in == nil { err = errors.New("Corrupt OpFsync") return } to := &SyncFileOp{ - Inode: InodeID(m.Hdr.Nodeid), + Inode: InodeID(m.Header().Nodeid), Handle: HandleID(in.Fh), } io = to co = &to.commonOp case fusekernel.OpFlush: - in := (*fusekernel.FlushIn)(m.Data()) - if m.Len() < unsafe.Sizeof(*in) { + type input fusekernel.FlushIn + in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + if in == nil { err = errors.New("Corrupt OpFlush") return } to := &FlushFileOp{ - Inode: InodeID(m.Hdr.Nodeid), + Inode: InodeID(m.Header().Nodeid), Handle: HandleID(in.Fh), } io = to @@ -387,7 +395,7 @@ func Convert( case fusekernel.OpReadlink: to := &ReadSymlinkOp{ - Inode: InodeID(m.Hdr.Nodeid), + Inode: InodeID(m.Header().Nodeid), } io = to co = &to.commonOp @@ -398,8 +406,9 @@ func Convert( co = &to.commonOp case fusekernel.OpInterrupt: - in := (*fusekernel.InterruptIn)(m.Data()) - if m.Len() < unsafe.Sizeof(*in) { + type input fusekernel.InterruptIn + in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + if in == nil { err = errors.New("Corrupt OpInterrupt") return } @@ -412,8 +421,8 @@ func Convert( default: to := &unknownOp{ - opCode: m.Hdr.Opcode, - inode: InodeID(m.Hdr.Nodeid), + opCode: m.Header().Opcode, + inode: InodeID(m.Header().Nodeid), } io = to co = &to.commonOp @@ -422,7 +431,7 @@ func Convert( co.init( opCtx, io, - m.Hdr.Unique, + m.Header().Unique, sendReply, debugLogForOp, errorLogger) diff --git a/fuseops/ops.go b/fuseops/ops.go index 97cf058..65f951b 100644 --- a/fuseops/ops.go +++ b/fuseops/ops.go @@ -88,9 +88,9 @@ func (o *LookUpInodeOp) ShortDesc() (desc string) { return } -func (o *LookUpInodeOp) kernelResponse() (b buffer.Buffer) { +func (o *LookUpInodeOp) kernelResponse() (b buffer.OutMessage) { size := fusekernel.EntryOutSize(o.protocol) - b = buffer.New(size) + b = buffer.NewOutMessage(size) out := (*fusekernel.EntryOut)(b.Grow(size)) convertChildInodeEntry(&o.Entry, out) @@ -123,9 +123,9 @@ func (o *GetInodeAttributesOp) DebugString() string { o.Attributes.DebugString()) } -func (o *GetInodeAttributesOp) kernelResponse() (b buffer.Buffer) { +func (o *GetInodeAttributesOp) kernelResponse() (b buffer.OutMessage) { size := fusekernel.AttrOutSize(o.protocol) - b = buffer.New(size) + b = buffer.NewOutMessage(size) out := (*fusekernel.AttrOut)(b.Grow(size)) out.AttrValid, out.AttrValidNsec = convertExpirationTime(o.AttributesExpiration) convertAttributes(o.Inode, &o.Attributes, &out.Attr) @@ -157,9 +157,9 @@ type SetInodeAttributesOp struct { AttributesExpiration time.Time } -func (o *SetInodeAttributesOp) kernelResponse() (b buffer.Buffer) { +func (o *SetInodeAttributesOp) kernelResponse() (b buffer.OutMessage) { size := fusekernel.AttrOutSize(o.protocol) - b = buffer.New(size) + b = buffer.NewOutMessage(size) out := (*fusekernel.AttrOut)(b.Grow(size)) out.AttrValid, out.AttrValidNsec = convertExpirationTime(o.AttributesExpiration) convertAttributes(o.Inode, &o.Attributes, &out.Attr) @@ -216,7 +216,7 @@ type ForgetInodeOp struct { N uint64 } -func (o *ForgetInodeOp) kernelResponse() (b buffer.Buffer) { +func (o *ForgetInodeOp) kernelResponse() (b buffer.OutMessage) { // No response. return } @@ -259,9 +259,9 @@ func (o *MkDirOp) ShortDesc() (desc string) { return } -func (o *MkDirOp) kernelResponse() (b buffer.Buffer) { +func (o *MkDirOp) kernelResponse() (b buffer.OutMessage) { size := fusekernel.EntryOutSize(o.protocol) - b = buffer.New(size) + b = buffer.NewOutMessage(size) out := (*fusekernel.EntryOut)(b.Grow(size)) convertChildInodeEntry(&o.Entry, out) @@ -311,9 +311,9 @@ func (o *CreateFileOp) ShortDesc() (desc string) { return } -func (o *CreateFileOp) kernelResponse() (b buffer.Buffer) { +func (o *CreateFileOp) kernelResponse() (b buffer.OutMessage) { eSize := fusekernel.EntryOutSize(o.protocol) - b = buffer.New(eSize + unsafe.Sizeof(fusekernel.OpenOut{})) + b = buffer.NewOutMessage(eSize + unsafe.Sizeof(fusekernel.OpenOut{})) e := (*fusekernel.EntryOut)(b.Grow(eSize)) convertChildInodeEntry(&o.Entry, e) @@ -357,9 +357,9 @@ func (o *CreateSymlinkOp) ShortDesc() (desc string) { return } -func (o *CreateSymlinkOp) kernelResponse() (b buffer.Buffer) { +func (o *CreateSymlinkOp) kernelResponse() (b buffer.OutMessage) { size := fusekernel.EntryOutSize(o.protocol) - b = buffer.New(size) + b = buffer.NewOutMessage(size) out := (*fusekernel.EntryOut)(b.Grow(size)) convertChildInodeEntry(&o.Entry, out) @@ -418,8 +418,8 @@ type RenameOp struct { NewName string } -func (o *RenameOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(0) +func (o *RenameOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(0) return } @@ -439,8 +439,8 @@ type RmDirOp struct { Name string } -func (o *RmDirOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(0) +func (o *RmDirOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(0) return } @@ -459,8 +459,8 @@ type UnlinkOp struct { Name string } -func (o *UnlinkOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(0) +func (o *UnlinkOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(0) return } @@ -491,8 +491,8 @@ type OpenDirOp struct { Handle HandleID } -func (o *OpenDirOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(unsafe.Sizeof(fusekernel.OpenOut{})) +func (o *OpenDirOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(unsafe.Sizeof(fusekernel.OpenOut{})) out := (*fusekernel.OpenOut)(b.Grow(unsafe.Sizeof(fusekernel.OpenOut{}))) out.Fh = uint64(o.Handle) @@ -589,8 +589,8 @@ type ReadDirOp struct { Data []byte } -func (o *ReadDirOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(uintptr(len(o.Data))) +func (o *ReadDirOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(uintptr(len(o.Data))) b.Append(o.Data) return } @@ -612,8 +612,8 @@ type ReleaseDirHandleOp struct { Handle HandleID } -func (o *ReleaseDirHandleOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(0) +func (o *ReleaseDirHandleOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(0) return } @@ -643,8 +643,8 @@ type OpenFileOp struct { Handle HandleID } -func (o *OpenFileOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(unsafe.Sizeof(fusekernel.OpenOut{})) +func (o *OpenFileOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(unsafe.Sizeof(fusekernel.OpenOut{})) out := (*fusekernel.OpenOut)(b.Grow(unsafe.Sizeof(fusekernel.OpenOut{}))) out.Fh = uint64(o.Handle) @@ -680,8 +680,8 @@ type ReadFileOp struct { Data []byte } -func (o *ReadFileOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(uintptr(len(o.Data))) +func (o *ReadFileOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(uintptr(len(o.Data))) b.Append(o.Data) return } @@ -756,8 +756,8 @@ type WriteFileOp struct { Data []byte } -func (o *WriteFileOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(unsafe.Sizeof(fusekernel.WriteOut{})) +func (o *WriteFileOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(unsafe.Sizeof(fusekernel.WriteOut{})) out := (*fusekernel.WriteOut)(b.Grow(unsafe.Sizeof(fusekernel.WriteOut{}))) out.Size = uint32(len(o.Data)) @@ -788,8 +788,8 @@ type SyncFileOp struct { Handle HandleID } -func (o *SyncFileOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(0) +func (o *SyncFileOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(0) return } @@ -848,8 +848,8 @@ type FlushFileOp struct { Handle HandleID } -func (o *FlushFileOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(0) +func (o *FlushFileOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(0) return } @@ -870,8 +870,8 @@ type ReleaseFileHandleOp struct { Handle HandleID } -func (o *ReleaseFileHandleOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(0) +func (o *ReleaseFileHandleOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(0) return } @@ -888,7 +888,7 @@ func (o *unknownOp) ShortDesc() (desc string) { return } -func (o *unknownOp) kernelResponse() (b buffer.Buffer) { +func (o *unknownOp) kernelResponse() (b buffer.OutMessage) { panic(fmt.Sprintf("Should never get here for unknown op: %s", o.ShortDesc())) } @@ -907,8 +907,8 @@ type ReadSymlinkOp struct { Target string } -func (o *ReadSymlinkOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(uintptr(len(o.Target))) +func (o *ReadSymlinkOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(uintptr(len(o.Target))) b.AppendString(o.Target) return } @@ -929,8 +929,8 @@ type InternalStatFSOp struct { commonOp } -func (o *InternalStatFSOp) kernelResponse() (b buffer.Buffer) { - b = buffer.New(unsafe.Sizeof(fusekernel.StatfsOut{})) +func (o *InternalStatFSOp) kernelResponse() (b buffer.OutMessage) { + b = buffer.NewOutMessage(unsafe.Sizeof(fusekernel.StatfsOut{})) b.Grow(unsafe.Sizeof(fusekernel.StatfsOut{})) return @@ -942,6 +942,6 @@ type InternalInterruptOp struct { FuseID uint64 } -func (o *InternalInterruptOp) kernelResponse() (b buffer.Buffer) { +func (o *InternalInterruptOp) kernelResponse() (b buffer.OutMessage) { panic("Shouldn't get here.") } diff --git a/internal/buffer/in_message.go b/internal/buffer/in_message.go new file mode 100644 index 0000000..f87f3b0 --- /dev/null +++ b/internal/buffer/in_message.go @@ -0,0 +1,115 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buffer + +import ( + "fmt" + "io" + "syscall" + "unsafe" + + "github.com/jacobsa/fuse/internal/fusekernel" +) + +// All requests read from the kernel, without data, are shorter than +// this. +const pageSize = 4096 + +func init() { + // Confirm the page size. + if syscall.Getpagesize() != pageSize { + panic(fmt.Sprintf("Page size is unexpectedly %d", syscall.Getpagesize())) + } +} + +// We size the buffer to have enough room for a fuse request plus data +// associated with a write request. +const bufSize = pageSize + MaxWriteSize + +// An incoming message from the kernel, including leading fusekernel.InHeader +// struct. Provides storage for messages and convenient access to their +// contents. +type InMessage struct { + remaining []byte + storage [bufSize]byte +} + +// Initialize with the data read by a single call to r.Read. The first call to +// Consume will consume the bytes directly after the fusekernel.InHeader +// struct. +func (m *InMessage) Init(r io.Reader) (err error) { + n, err := r.Read(m.storage[:]) + if err != nil { + return + } + + // Make sure the message is long enough. + const headerSize = unsafe.Sizeof(fusekernel.InHeader{}) + if uintptr(n) < headerSize { + err = fmt.Errorf("Unexpectedly read only %d bytes.", n) + return + } + + m.remaining = m.storage[headerSize:n] + + // Check the header's length. + if int(m.Header().Len) != n { + err = fmt.Errorf( + "Header says %d bytes, but we read %d", + m.Header().Len, + n) + + return + } + + return +} + +// Return a reference to the header read in the most recent call to Init. +func (m *InMessage) Header() (h *fusekernel.InHeader) { + h = (*fusekernel.InHeader)(unsafe.Pointer(&m.storage[0])) + return +} + +// Return the number of bytes left to consume. +func (m *InMessage) Len() uintptr { + return uintptr(len(m.remaining)) +} + +// Consume the next n bytes from the message, returning a nil pointer if there +// are fewer than n bytes available. +func (m *InMessage) Consume(n uintptr) (p unsafe.Pointer) { + if m.Len() == 0 || n > m.Len() { + return + } + + p = unsafe.Pointer(&m.remaining[0]) + m.remaining = m.remaining[n:] + + return +} + +// Equivalent to Consume, except returns a slice of bytes. The result will be +// nil if Consume would fail. +func (m *InMessage) ConsumeBytes(n uintptr) (b []byte) { + if n > m.Len() { + return + } + + b = m.remaining[:n] + m.remaining = m.remaining[n:] + + return +} diff --git a/internal/buffer/in_message_darwin.go b/internal/buffer/in_message_darwin.go new file mode 100644 index 0000000..af37a02 --- /dev/null +++ b/internal/buffer/in_message_darwin.go @@ -0,0 +1,21 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buffer + +// The maximum fuse write request size that InMessage can acommodate. +// +// Experimentally, OS X appears to cap the size of writes to 1 MiB, regardless +// of whether a larger size is specified in the mount options. +const MaxWriteSize = 1 << 20 diff --git a/internal/buffer/in_message_linux.go b/internal/buffer/in_message_linux.go new file mode 100644 index 0000000..964c7da --- /dev/null +++ b/internal/buffer/in_message_linux.go @@ -0,0 +1,21 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buffer + +// The maximum fuse write request size that InMessage can acommodate. +// +// Experimentally, Linux appears to refuse to honor a MaxWrite setting in an +// INIT response of more than 128 KiB. +const MaxWriteSize = 1 << 17 diff --git a/internal/buffer/buffer.go b/internal/buffer/out_message.go similarity index 81% rename from internal/buffer/buffer.go rename to internal/buffer/out_message.go index fe69a79..0fe8ef4 100644 --- a/internal/buffer/buffer.go +++ b/internal/buffer/out_message.go @@ -21,25 +21,26 @@ import ( "github.com/jacobsa/fuse/internal/fusekernel" ) -// Buffer provides a mechanism for constructing a single contiguous fuse +// OutMessage provides a mechanism for constructing a single contiguous fuse // message from multiple segments, where the first segment is always a // fusekernel.OutHeader message. // -// Must be created with New. Exception: the zero value has Bytes() == nil. -type Buffer struct { +// Must be created with NewOutMessage. Exception: the zero value has +// Bytes() == nil. +type OutMessage struct { slice []byte } // Create a new buffer whose initial contents are a zeroed fusekernel.OutHeader // message, and with room enough to grow by extra bytes. -func New(extra uintptr) (b Buffer) { +func NewOutMessage(extra uintptr) (b OutMessage) { const headerSize = unsafe.Sizeof(fusekernel.OutHeader{}) b.slice = make([]byte, headerSize, headerSize+extra) return } // Return a pointer to the header at the start of the buffer. -func (b *Buffer) OutHeader() (h *fusekernel.OutHeader) { +func (b *OutMessage) OutHeader() (h *fusekernel.OutHeader) { sh := (*reflect.SliceHeader)(unsafe.Pointer(&b.slice)) h = (*fusekernel.OutHeader)(unsafe.Pointer(sh.Data)) return @@ -48,7 +49,7 @@ func (b *Buffer) OutHeader() (h *fusekernel.OutHeader) { // Grow the buffer by the supplied number of bytes, returning a pointer to the // start of the new segment. The sum of the arguments given to Grow must not // exceed the argument given to New when creating the buffer. -func (b *Buffer) Grow(size uintptr) (p unsafe.Pointer) { +func (b *OutMessage) Grow(size uintptr) (p unsafe.Pointer) { sh := (*reflect.SliceHeader)(unsafe.Pointer(&b.slice)) p = unsafe.Pointer(sh.Data + uintptr(sh.Len)) b.slice = b.slice[:len(b.slice)+int(size)] @@ -56,7 +57,7 @@ func (b *Buffer) Grow(size uintptr) (p unsafe.Pointer) { } // Equivalent to growing by the length of p, then copying p into the new segment. -func (b *Buffer) Append(p []byte) { +func (b *OutMessage) Append(p []byte) { sh := reflect.SliceHeader{ Data: uintptr(b.Grow(uintptr(len(p)))), Len: len(p), @@ -67,7 +68,7 @@ func (b *Buffer) Append(p []byte) { } // Equivalent to growing by the length of s, then copying s into the new segment. -func (b *Buffer) AppendString(s string) { +func (b *OutMessage) AppendString(s string) { sh := reflect.SliceHeader{ Data: uintptr(b.Grow(uintptr(len(s)))), Len: len(s), @@ -78,6 +79,6 @@ func (b *Buffer) AppendString(s string) { } // Return a reference to the current contents of the buffer. -func (b *Buffer) Bytes() []byte { +func (b *OutMessage) Bytes() []byte { return b.slice } diff --git a/internal/fuseshim/fuse.go b/internal/fuseshim/fuse.go index 4748db2..22fde82 100644 --- a/internal/fuseshim/fuse.go +++ b/internal/fuseshim/fuse.go @@ -125,10 +125,10 @@ type Conn struct { MountError error // File handle for kernel communication. Only safe to access if - // rio or wio is held. - dev *os.File - wio sync.RWMutex - rio sync.RWMutex + // Rio or Wio is held. + Dev *os.File + Wio sync.RWMutex + Rio sync.RWMutex // Protocol version negotiated with InitRequest/InitResponse. proto fusekernel.Protocol @@ -162,7 +162,7 @@ func Mount(dir string, options ...MountOption) (*Conn, error) { if err != nil { return nil, err } - c.dev = f + c.Dev = f if err := initMount(c, &conf); err != nil { c.Close() @@ -513,16 +513,16 @@ func (malformedMessage) String() string { // Close closes the FUSE connection. func (c *Conn) Close() error { - c.wio.Lock() - defer c.wio.Unlock() - c.rio.Lock() - defer c.rio.Unlock() - return c.dev.Close() + c.Wio.Lock() + defer c.Wio.Unlock() + c.Rio.Lock() + defer c.Rio.Unlock() + return c.Dev.Close() } -// caller must hold wio or rio +// caller must hold Wio or Rio func (c *Conn) fd() int { - return int(c.dev.Fd()) + return int(c.Dev.Fd()) } func (c *Conn) Protocol() fusekernel.Protocol { @@ -536,9 +536,9 @@ func (c *Conn) Protocol() fusekernel.Protocol { func (c *Conn) ReadMessage() (m *Message, err error) { m = getMessage(c) loop: - c.rio.RLock() + c.Rio.RLock() n, err := syscall.Read(c.fd(), m.buf) - c.rio.RUnlock() + c.Rio.RUnlock() if err == syscall.EINTR { // OSXFUSE sends EINTR to userspace when a request interrupt // completed before it got sent to userspace? @@ -1068,8 +1068,8 @@ func (c *Conn) writeToKernel(msg []byte) error { } func (c *Conn) WriteToKernel(msg []byte) error { - c.wio.RLock() - defer c.wio.RUnlock() + c.Wio.RLock() + defer c.Wio.RUnlock() _, err := syscall.Write(c.fd(), msg) return err }