diff --git a/connection.go b/connection.go index ea99282..f214d7d 100644 --- a/connection.go +++ b/connection.go @@ -333,7 +333,7 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) { // Loop past transient errors. for { - // Attempt a reaed. + // Attempt a read. err := m.Init(c.dev) // Special cases: @@ -400,7 +400,7 @@ func (c *Connection) ReadOp() (_ context.Context, op interface{}, _ error) { // Convert the message to an op. outMsg := c.getOutMessage() - op, err = convertInMessage(inMsg, outMsg, c.protocol) + op, err = convertInMessage(&c.cfg, inMsg, outMsg, c.protocol) if err != nil { c.putOutMessage(outMsg) return nil, nil, fmt.Errorf("convertInMessage: %v", err) @@ -505,10 +505,16 @@ func (c *Connection) Reply(ctx context.Context, opErr error) { noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr) if !noResponse { - err := c.writeMessage(outMsg.Bytes()) + var err error + if outMsg.Sglist != nil { + _, err = writev(int(c.dev.Fd()), outMsg.Sglist) + } else { + err = c.writeMessage(outMsg.Bytes()) + } if err != nil && c.errorLogger != nil { c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.Bytes()) } + outMsg.Sglist = nil } } diff --git a/conversions.go b/conversions.go index dddb3c0..6750369 100644 --- a/conversions.go +++ b/conversions.go @@ -38,6 +38,7 @@ import ( // // The caller is responsible for arranging for the message to be destroyed. 
func convertInMessage( + config *MountConfig, inMsg *buffer.InMessage, outMsg *buffer.OutMessage, protocol fusekernel.Protocol) (o interface{}, err error) { @@ -271,24 +272,28 @@ func convertInMessage( return nil, errors.New("Corrupt OpRead") } - to := &fuseops.ReadFileOp{ - Inode: fuseops.InodeID(inMsg.Header().Nodeid), - Handle: fuseops.HandleID(in.Fh), - Offset: int64(in.Offset), - OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, + if !config.UseVectoredRead { + // Use part of the incoming message storage as a read buffer + buf := inMsg.GetFree(int(in.Size)) + to := &fuseops.ReadFileOp{ + Inode: fuseops.InodeID(inMsg.Header().Nodeid), + Handle: fuseops.HandleID(in.Fh), + Offset: int64(in.Offset), + Dst: buf, + OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, + } + o = to + } else { + // Don't allocate any buffers when zero-copy is used + to := &fuseops.VectoredReadOp{ + Inode: fuseops.InodeID(inMsg.Header().Nodeid), + Handle: fuseops.HandleID(in.Fh), + Offset: int64(in.Offset), + Size: int64(in.Size), + OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, + } + o = to } - o = to - - readSize := int(in.Size) - p := outMsg.GrowNoZero(readSize) - if p == nil { - return nil, fmt.Errorf("Can't grow for %d-byte read", readSize) - } - - sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst)) - sh.Data = uintptr(p) - sh.Len = readSize - sh.Cap = readSize case fusekernel.OpReaddir: in := (*fusekernel.ReadIn)(inMsg.Consume(fusekernel.ReadInSize(protocol))) @@ -476,15 +481,19 @@ func convertInMessage( o = to readSize := int(in.Size) - p := outMsg.GrowNoZero(readSize) - if p == nil { - return nil, fmt.Errorf("Can't grow for %d-byte read", readSize) - } + if readSize > 0 { + p := outMsg.GrowNoZero(readSize) + if p == nil { + return nil, fmt.Errorf("Can't grow for %d-byte read", readSize) + } - sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst)) - sh.Data = uintptr(p) - sh.Len = readSize - sh.Cap = readSize + sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst)) 
+ sh.Data = uintptr(p) + sh.Len = readSize + sh.Cap = readSize + } else { + to.Dst = nil + } case fusekernel.OpListxattr: type input fusekernel.ListxattrIn @@ -705,9 +714,11 @@ func (c *Connection) kernelResponseForOp( } case *fuseops.ReadFileOp: - // convertInMessage already set up the destination buffer to be at the end - // of the out message. We need only shrink to the right size based on how - // much the user read. + m.Append(o.Dst) + m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead) + + case *fuseops.VectoredReadOp: + m.Append(o.Data...) m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead) case *fuseops.WriteFileOp: diff --git a/debug.go b/debug.go index 2a4dcc0..8b0d0e1 100644 --- a/debug.go +++ b/debug.go @@ -97,6 +97,11 @@ func describeRequest(op interface{}) (s string) { addComponent("offset %d", typed.Offset) addComponent("%d bytes", len(typed.Dst)) + case *fuseops.VectoredReadOp: + addComponent("handle %d", typed.Handle) + addComponent("offset %d", typed.Offset) + addComponent("%d bytes", typed.Size) + case *fuseops.WriteFileOp: addComponent("handle %d", typed.Handle) addComponent("offset %d", typed.Offset) diff --git a/fuseops/ops.go b/fuseops/ops.go index 8af3d51..c3a3e68 100644 --- a/fuseops/ops.go +++ b/fuseops/ops.go @@ -654,6 +654,37 @@ type ReadFileOp struct { OpContext OpContext } +// Vectored read - same as ReadFileOp, but the buffer isn't provided by the library. +// The file system returns a list of slices instead. +type VectoredReadOp struct { + // The file inode that we are reading, and the handle previously returned by + // CreateFile or OpenFile when opening that inode. + Inode InodeID + Handle HandleID + + // The offset within the file at which to read. + Offset int64 + + // The size of the read. + Size int64 + + // Set by the file system: data to send back to the client. + Data [][]byte + + // Set by the file system: the number of bytes read. 
+ // + // The FUSE documentation requires that exactly the requested number of bytes + // be returned, except in the case of EOF or error (http://goo.gl/ZgfBkF). + // This appears to be because it uses file mmapping machinery + // (http://goo.gl/SGxnaN) to read a page at a time. It appears to understand + // where EOF is by checking the inode size (http://goo.gl/0BkqKD), returned + // by a previous call to LookUpInode, GetInodeAttributes, etc. + // + // If direct IO is enabled, semantics should match those of read(2). + BytesRead int + OpContext OpContext +} + // Write data to a file previously opened with CreateFile or OpenFile. // // When the user writes data using write(2), the write goes into the page diff --git a/fuseutil/file_system.go b/fuseutil/file_system.go index 5eb8bca..711a4b4 100644 --- a/fuseutil/file_system.go +++ b/fuseutil/file_system.go @@ -52,6 +52,7 @@ type FileSystem interface { ReleaseDirHandle(context.Context, *fuseops.ReleaseDirHandleOp) error OpenFile(context.Context, *fuseops.OpenFileOp) error ReadFile(context.Context, *fuseops.ReadFileOp) error + VectoredRead(context.Context, *fuseops.VectoredReadOp) error WriteFile(context.Context, *fuseops.WriteFileOp) error SyncFile(context.Context, *fuseops.SyncFileOp) error FlushFile(context.Context, *fuseops.FlushFileOp) error @@ -190,6 +191,9 @@ func (s *fileSystemServer) handleOp( case *fuseops.ReadFileOp: err = s.fs.ReadFile(ctx, typed) + case *fuseops.VectoredReadOp: + err = s.fs.VectoredRead(ctx, typed) + case *fuseops.WriteFileOp: err = s.fs.WriteFile(ctx, typed) diff --git a/fuseutil/not_implemented_file_system.go b/fuseutil/not_implemented_file_system.go index 4d29cfa..9aa59d2 100644 --- a/fuseutil/not_implemented_file_system.go +++ b/fuseutil/not_implemented_file_system.go @@ -138,6 +138,12 @@ func (fs *NotImplementedFileSystem) ReadFile( return fuse.ENOSYS } +func (fs *NotImplementedFileSystem) VectoredRead( + ctx context.Context, + op *fuseops.VectoredReadOp) error { + return fuse.ENOSYS 
+} + func (fs *NotImplementedFileSystem) WriteFile( ctx context.Context, op *fuseops.WriteFileOp) error { diff --git a/internal/buffer/in_message.go b/internal/buffer/in_message.go index cfa1d31..a63b5cc 100644 --- a/internal/buffer/in_message.go +++ b/internal/buffer/in_message.go @@ -42,13 +42,16 @@ func init() { type InMessage struct { remaining []byte storage []byte + size int } // Initialize with the data read by a single call to r.Read. The first call to // Consume will consume the bytes directly after the fusekernel.InHeader // struct. func (m *InMessage) Init(r io.Reader) error { - m.storage = make([]byte, bufSize, bufSize) + if m.storage == nil { + m.storage = make([]byte, bufSize, bufSize) + } n, err := r.Read(m.storage[:]) if err != nil { return err @@ -60,6 +63,7 @@ func (m *InMessage) Init(r io.Reader) error { return fmt.Errorf("Unexpectedly read only %d bytes.", n) } + m.size = n m.remaining = m.storage[headerSize:n] // Check the header's length. @@ -108,3 +112,11 @@ func (m *InMessage) ConsumeBytes(n uintptr) []byte { return b } + +// Get the next n bytes after the message to use them as a temporary buffer +func (m *InMessage) GetFree(n int) []byte { + if n <= 0 || n > len(m.storage)-m.size { + return nil + } + return m.storage[m.size : m.size+n] +} diff --git a/internal/buffer/out_message.go b/internal/buffer/out_message.go index 7e89afb..2a280bb 100644 --- a/internal/buffer/out_message.go +++ b/internal/buffer/out_message.go @@ -16,7 +16,6 @@ package buffer import ( "fmt" - "log" "reflect" "unsafe" @@ -33,30 +32,15 @@ const OutMessageHeaderSize = int(unsafe.Sizeof(fusekernel.OutHeader{})) // // Must be initialized with Reset. type OutMessage struct { - // The offset into payload to which we're currently writing. - payloadOffset int - header fusekernel.OutHeader - payload [MaxReadSize]byte -} - -// Make sure that the header and payload are contiguous. 
-func init() { - a := unsafe.Offsetof(OutMessage{}.header) + uintptr(OutMessageHeaderSize) - b := unsafe.Offsetof(OutMessage{}.payload) - - if a != b { - log.Panicf( - "header ends at offset %d, but payload starts at offset %d", - a, b) - } + Sglist [][]byte } // Reset resets m so that it's ready to be used again. Afterward, the contents // are solely a zeroed fusekernel.OutHeader struct. func (m *OutMessage) Reset() { - m.payloadOffset = 0 m.header = fusekernel.OutHeader{} + m.Sglist = nil } // OutHeader returns a pointer to the header at the start of the message. @@ -69,25 +53,20 @@ func (m *OutMessage) OutHeader() *fusekernel.OutHeader { // insufficient space, it returns nil. func (m *OutMessage) Grow(n int) unsafe.Pointer { p := m.GrowNoZero(n) - if p != nil { - jacobsa_fuse_memclr(p, uintptr(n)) - } - return p } // GrowNoZero is equivalent to Grow, except the new segment is not zeroed. Use // with caution! func (m *OutMessage) GrowNoZero(n int) unsafe.Pointer { - // Will we overflow the buffer? - o := m.payloadOffset - if len(m.payload)-o < n { - return nil - } - - p := unsafe.Pointer(uintptr(unsafe.Pointer(&m.payload)) + uintptr(o)) - m.payloadOffset = o + n - + if n == 0 { + // No segment to add, and &b[0] below would panic on an empty slice. + // Callers treat nil as failure, so return a pointer to a throwaway byte. + return unsafe.Pointer(new(byte)) + } + b := make([]byte, n) + m.Append(b) + p := unsafe.Pointer(&b[0]) return p } @@ -100,51 +79,63 @@ func (m *OutMessage) ShrinkTo(n int) { n, m.Len())) } - - m.payloadOffset = n - OutMessageHeaderSize + if n == OutMessageHeaderSize { + m.Sglist = nil + } else { + i := 1 + n -= OutMessageHeaderSize + for len(m.Sglist) > i && n >= len(m.Sglist[i]) { + n -= len(m.Sglist[i]) + i++ + } + if n > 0 { + m.Sglist[i] = m.Sglist[i][0 : n] + i++ + } + m.Sglist = m.Sglist[0 : i] + } } // Append is equivalent to growing by len(src), then copying src over the new // segment. Int panics if there is not enough room available.
-func (m *OutMessage) Append(src []byte) { - p := m.GrowNoZero(len(src)) - if p == nil { - panic(fmt.Sprintf("Can't grow %d bytes", len(src))) +func (m *OutMessage) Append(src ...[]byte) { + if m.Sglist == nil { + m.Sglist = append(m.Sglist, nil) + *(*reflect.SliceHeader)(unsafe.Pointer(&m.Sglist[0])) = reflect.SliceHeader{ + Data: uintptr(unsafe.Pointer(&m.header)), + Len: OutMessageHeaderSize, + Cap: OutMessageHeaderSize, + } } - - sh := (*reflect.SliceHeader)(unsafe.Pointer(&src)) - jacobsa_fuse_memmove(p, unsafe.Pointer(sh.Data), uintptr(sh.Len)) - + m.Sglist = append(m.Sglist, src...) return } // AppendString is like Append, but accepts string input. func (m *OutMessage) AppendString(src string) { - p := m.GrowNoZero(len(src)) - if p == nil { - panic(fmt.Sprintf("Can't grow %d bytes", len(src))) - } - - sh := (*reflect.StringHeader)(unsafe.Pointer(&src)) - jacobsa_fuse_memmove(p, unsafe.Pointer(sh.Data), uintptr(sh.Len)) - + m.Append([]byte(src)) return } // Len returns the current size of the message, including the leading header. func (m *OutMessage) Len() int { - return OutMessageHeaderSize + m.payloadOffset + if m.Sglist == nil { + return OutMessageHeaderSize + } + r := 0 + for _, b := range m.Sglist { + r += len(b) + } + return r } -// Bytes returns a reference to the current contents of the buffer, including -// the leading header. +// Bytes returns a byte slice containing the current header. 
func (m *OutMessage) Bytes() []byte { - l := m.Len() + l := OutMessageHeaderSize sh := reflect.SliceHeader{ Data: uintptr(unsafe.Pointer(&m.header)), Len: l, Cap: l, } - return *(*[]byte)(unsafe.Pointer(&sh)) } diff --git a/internal/buffer/out_message_test.go b/internal/buffer/out_message_test.go index 6292794..4945405 100644 --- a/internal/buffer/out_message_test.go +++ b/internal/buffer/out_message_test.go @@ -107,7 +107,10 @@ func TestOutMessageAppend(t *testing.T) { t.Errorf("om.Len() = %d, want %d", got, want) } - b := om.Bytes() + b := []byte(nil) + for i := 0; i < len(om.Sglist); i++ { + b = append(b, om.Sglist[i]...) + } if got, want := len(b), wantLen; got != want { t.Fatalf("len(om.Bytes()) = %d, want %d", got, want) } @@ -137,7 +140,10 @@ func TestOutMessageAppendString(t *testing.T) { t.Errorf("om.Len() = %d, want %d", got, want) } - b := om.Bytes() + b := []byte(nil) + for i := 0; i < len(om.Sglist); i++ { + b = append(b, om.Sglist[i]...) + } if got, want := len(b), wantLen; got != want { t.Fatalf("len(om.Bytes()) = %d, want %d", got, want) } @@ -168,7 +174,10 @@ func TestOutMessageShrinkTo(t *testing.T) { t.Errorf("om.Len() = %d, want %d", got, want) } - b := om.Bytes() + b := []byte(nil) + for i := 0; i < len(om.Sglist); i++ { + b = append(b, om.Sglist[i]...) + } if got, want := len(b), wantLen; got != want { t.Fatalf("len(om.Bytes()) = %d, want %d", got, want) } @@ -283,7 +292,10 @@ func TestOutMessageGrow(t *testing.T) { t.Errorf("om.Len() = %d, want %d", got, want) } - b := om.Bytes() + b := []byte(nil) + for i := 0; i < len(om.Sglist); i++ { + b = append(b, om.Sglist[i]...) + } if got, want := len(b), wantLen; got != want { t.Fatalf("len(om.Len()) = %d, want %d", got, want) } @@ -304,7 +316,7 @@ func BenchmarkOutMessageReset(b *testing.B) { om.Reset() } - b.SetBytes(int64(unsafe.Offsetof(om.payload))) + b.SetBytes(int64(om.Len())) }) // Many megabytes worth of buffers, which should defeat the CPU cache. 
@@ -321,7 +333,7 @@ func BenchmarkOutMessageReset(b *testing.B) { oms[i%numMessages].Reset() } - b.SetBytes(int64(unsafe.Offsetof(oms[0].payload))) + b.SetBytes(int64(oms[0].Len())) }) } diff --git a/mount_config.go b/mount_config.go index a93c321..16bfcc3 100644 --- a/mount_config.go +++ b/mount_config.go @@ -156,6 +156,11 @@ type MountConfig struct { // actually utilise any form of qualifiable UNIX permissions. DisableDefaultPermissions bool + // Use VectoredReadOp instead of ReadFileOp. + // Vectored read allows file systems to reduce memory copying overhead if + // the data is already in memory when they return it to FUSE. + UseVectoredRead bool + // OS X only. // // The name of the mounted volume, as displayed in the Finder. If empty, a diff --git a/writev.go b/writev.go new file mode 100644 index 0000000..05c9358 --- /dev/null +++ b/writev.go @@ -0,0 +1,36 @@ +package fuse + +import ( + "syscall" + "unsafe" +) + +// writev sends the non-empty buffers in packet to fd with a single writev(2) +// system call and returns the byte count reported by the kernel. If packet +// holds no non-empty buffer, nothing is written and the result is (0, nil). +func writev(fd int, packet [][]byte) (n int, err error) { + iovecs := make([]syscall.Iovec, 0, len(packet)) + for _, v := range packet { + if len(v) == 0 { + continue + } + vec := syscall.Iovec{ + Base: &v[0], + } + vec.SetLen(len(v)) + iovecs = append(iovecs, vec) + } + if len(iovecs) == 0 { + // Nothing to send; taking &iovecs[0] below would panic on an empty slice. + return 0, nil + } + n1, _, e1 := syscall.Syscall( + syscall.SYS_WRITEV, + uintptr(fd), uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs)), + ) + n = int(n1) + if e1 != 0 { + err = syscall.Errno(e1) + } + return +}