From fcabfc89e991664e849c989acaa77ef01827ee43 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 25 Aug 2021 11:05:53 +0300 Subject: [PATCH] Introduce VectoredReadOp Read requests can now take vectored responses from the filesystem implementation and send them to FUSE device via the writev() system call. This allows file systems to send data without copying it into the library-provided buffer if the data is already in memory. The change also speeds up normal ReadFileOps as a side effect because it removes extra memory allocations. --- connection.go | 12 +++- conversions.go | 67 ++++++++++-------- debug.go | 5 ++ fuseops/ops.go | 31 ++++++++ fuseutil/file_system.go | 4 ++ fuseutil/not_implemented_file_system.go | 6 ++ internal/buffer/in_message.go | 14 +++- internal/buffer/out_message.go | 94 +++++++++++-------------- internal/buffer/out_message_test.go | 24 +++++-- mount_config.go | 5 ++ writev.go | 29 ++++++++ 11 files changed, 199 insertions(+), 92 deletions(-) create mode 100644 writev.go diff --git a/connection.go b/connection.go index ea99282..f214d7d 100644 --- a/connection.go +++ b/connection.go @@ -333,7 +333,7 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) { // Loop past transient errors. for { - // Attempt a reaed. + // Attempt a read. err := m.Init(c.dev) // Special cases: @@ -400,7 +400,7 @@ func (c *Connection) ReadOp() (_ context.Context, op interface{}, _ error) { // Convert the message to an op. outMsg := c.getOutMessage() - op, err = convertInMessage(inMsg, outMsg, c.protocol) + op, err = convertInMessage(&c.cfg, inMsg, outMsg, c.protocol) if err != nil { c.putOutMessage(outMsg) return nil, nil, fmt.Errorf("convertInMessage: %v", err) @@ -505,10 +505,16 @@ func (c *Connection) Reply(ctx context.Context, opErr error) { noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr) if !noResponse { - err := c.writeMessage(outMsg.Bytes()) + var err error + if outMsg.Sglist != nil { + _, err = writev(int(c.dev.Fd()), outMsg.Sglist) + } else { + err = c.writeMessage(outMsg.Bytes()) + } if err != nil && c.errorLogger != nil { c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.Bytes()) } + outMsg.Sglist = nil } } diff --git a/conversions.go b/conversions.go index dddb3c0..6750369 100644 --- a/conversions.go +++ b/conversions.go @@ -38,6 +38,7 @@ import ( // // The caller is responsible for arranging for the message to be destroyed. func convertInMessage( + config *MountConfig, inMsg *buffer.InMessage, outMsg *buffer.OutMessage, protocol fusekernel.Protocol) (o interface{}, err error) { @@ -271,24 +272,28 @@ func convertInMessage( return nil, errors.New("Corrupt OpRead") } - to := &fuseops.ReadFileOp{ - Inode: fuseops.InodeID(inMsg.Header().Nodeid), - Handle: fuseops.HandleID(in.Fh), - Offset: int64(in.Offset), - OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, + if !config.UseVectoredRead { + // Use part of the incoming message storage as a read buffer + buf := inMsg.GetFree(int(in.Size)) + to := &fuseops.ReadFileOp{ + Inode: fuseops.InodeID(inMsg.Header().Nodeid), + Handle: fuseops.HandleID(in.Fh), + Offset: int64(in.Offset), + Dst: buf, + OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, + } + o = to + } else { + // Don't allocate any buffers when zero-copy is used + to := &fuseops.VectoredReadOp{ + Inode: fuseops.InodeID(inMsg.Header().Nodeid), + Handle: fuseops.HandleID(in.Fh), + Offset: int64(in.Offset), + Size: int64(in.Size), + OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, + } + o = to } - o = to - - readSize := int(in.Size) - p := outMsg.GrowNoZero(readSize) - if p == nil { - return nil, fmt.Errorf("Can't grow for %d-byte read", readSize) - } - - sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst)) - sh.Data = uintptr(p) - sh.Len = readSize - sh.Cap = readSize case fusekernel.OpReaddir: in := (*fusekernel.ReadIn)(inMsg.Consume(fusekernel.ReadInSize(protocol))) @@ -476,15 +481,19 @@ func convertInMessage( o = to readSize := int(in.Size) - p := outMsg.GrowNoZero(readSize) - if p == nil { - return nil, fmt.Errorf("Can't grow for %d-byte read", readSize) - } + if readSize > 0 { + p := outMsg.GrowNoZero(readSize) + if p == nil { + return nil, fmt.Errorf("Can't grow for %d-byte read", readSize) + } - sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst)) - sh.Data = uintptr(p) - sh.Len = readSize - sh.Cap = readSize + sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst)) + sh.Data = uintptr(p) + sh.Len = readSize + sh.Cap = readSize + } else { + to.Dst = nil + } case fusekernel.OpListxattr: type input fusekernel.ListxattrIn @@ -705,9 +714,11 @@ func (c *Connection) kernelResponseForOp( } case *fuseops.ReadFileOp: - // convertInMessage already set up the destination buffer to be at the end - // of the out message. We need only shrink to the right size based on how - // much the user read. + m.Append(o.Dst) + m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead) + + case *fuseops.VectoredReadOp: + m.Append(o.Data...) m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead) case *fuseops.WriteFileOp: diff --git a/debug.go b/debug.go index 2a4dcc0..8b0d0e1 100644 --- a/debug.go +++ b/debug.go @@ -97,6 +97,11 @@ func describeRequest(op interface{}) (s string) { addComponent("offset %d", typed.Offset) addComponent("%d bytes", len(typed.Dst)) + case *fuseops.VectoredReadOp: + addComponent("handle %d", typed.Handle) + addComponent("offset %d", typed.Offset) + addComponent("%d bytes", typed.Size) + case *fuseops.WriteFileOp: addComponent("handle %d", typed.Handle) addComponent("offset %d", typed.Offset) diff --git a/fuseops/ops.go b/fuseops/ops.go index 8af3d51..c3a3e68 100644 --- a/fuseops/ops.go +++ b/fuseops/ops.go @@ -654,6 +654,37 @@ type ReadFileOp struct { OpContext OpContext } +// Vectored read - same as ReadFileOp, but the buffer isn't provided by the library. +// The file system returns a list of slices instead. +type VectoredReadOp struct { + // The file inode that we are reading, and the handle previously returned by + // CreateFile or OpenFile when opening that inode. + Inode InodeID + Handle HandleID + + // The offset within the file at which to read. + Offset int64 + + // The size of the read. + Size int64 + + // Set by the file system: data to send back to the client. + Data [][]byte + + // Set by the file system: the number of bytes read. + // + // The FUSE documentation requires that exactly the requested number of bytes + // be returned, except in the case of EOF or error (http://goo.gl/ZgfBkF). + // This appears to be because it uses file mmapping machinery + // (http://goo.gl/SGxnaN) to read a page at a time. It appears to understand + // where EOF is by checking the inode size (http://goo.gl/0BkqKD), returned + // by a previous call to LookUpInode, GetInodeAttributes, etc. + // + // If direct IO is enabled, semantics should match those of read(2). + BytesRead int + OpContext OpContext +} + // Write data to a file previously opened with CreateFile or OpenFile. // // When the user writes data using write(2), the write goes into the page diff --git a/fuseutil/file_system.go b/fuseutil/file_system.go index 5eb8bca..711a4b4 100644 --- a/fuseutil/file_system.go +++ b/fuseutil/file_system.go @@ -52,6 +52,7 @@ type FileSystem interface { ReleaseDirHandle(context.Context, *fuseops.ReleaseDirHandleOp) error OpenFile(context.Context, *fuseops.OpenFileOp) error ReadFile(context.Context, *fuseops.ReadFileOp) error + VectoredRead(context.Context, *fuseops.VectoredReadOp) error WriteFile(context.Context, *fuseops.WriteFileOp) error SyncFile(context.Context, *fuseops.SyncFileOp) error FlushFile(context.Context, *fuseops.FlushFileOp) error @@ -190,6 +191,9 @@ func (s *fileSystemServer) handleOp( case *fuseops.ReadFileOp: err = s.fs.ReadFile(ctx, typed) + case *fuseops.VectoredReadOp: + err = s.fs.VectoredRead(ctx, typed) + case *fuseops.WriteFileOp: err = s.fs.WriteFile(ctx, typed) diff --git a/fuseutil/not_implemented_file_system.go b/fuseutil/not_implemented_file_system.go index 4d29cfa..9aa59d2 100644 --- a/fuseutil/not_implemented_file_system.go +++ b/fuseutil/not_implemented_file_system.go @@ -138,6 +138,12 @@ func (fs *NotImplementedFileSystem) ReadFile( return fuse.ENOSYS } +func (fs *NotImplementedFileSystem) VectoredRead( + ctx context.Context, + op *fuseops.VectoredReadOp) error { + return fuse.ENOSYS +} + func (fs *NotImplementedFileSystem) WriteFile( ctx context.Context, op *fuseops.WriteFileOp) error { diff --git a/internal/buffer/in_message.go b/internal/buffer/in_message.go index cfa1d31..a63b5cc 100644 --- a/internal/buffer/in_message.go +++ b/internal/buffer/in_message.go @@ -42,13 +42,16 @@ func init() { type InMessage struct { remaining []byte storage []byte + size int } // Initialize with the data read by a single call to r.Read. The first call to // Consume will consume the bytes directly after the fusekernel.InHeader // struct. func (m *InMessage) Init(r io.Reader) error { - m.storage = make([]byte, bufSize, bufSize) + if m.storage == nil { + m.storage = make([]byte, bufSize, bufSize) + } n, err := r.Read(m.storage[:]) if err != nil { return err @@ -60,6 +63,7 @@ func (m *InMessage) Init(r io.Reader) error { return fmt.Errorf("Unexpectedly read only %d bytes.", n) } + m.size = n m.remaining = m.storage[headerSize:n] // Check the header's length. @@ -108,3 +112,11 @@ func (m *InMessage) ConsumeBytes(n uintptr) []byte { return b } + +// Get the next n bytes after the message to use them as a temporary buffer +func (m *InMessage) GetFree(n int) []byte { + if n <= 0 || n > len(m.storage)-m.size { + return nil + } + return m.storage[m.size : m.size+n] +} diff --git a/internal/buffer/out_message.go b/internal/buffer/out_message.go index 7e89afb..2a280bb 100644 --- a/internal/buffer/out_message.go +++ b/internal/buffer/out_message.go @@ -16,7 +16,6 @@ package buffer import ( "fmt" - "log" "reflect" "unsafe" @@ -33,30 +32,15 @@ const OutMessageHeaderSize = int(unsafe.Sizeof(fusekernel.OutHeader{})) // // Must be initialized with Reset. type OutMessage struct { - // The offset into payload to which we're currently writing. - payloadOffset int - header fusekernel.OutHeader - payload [MaxReadSize]byte -} - -// Make sure that the header and payload are contiguous. -func init() { - a := unsafe.Offsetof(OutMessage{}.header) + uintptr(OutMessageHeaderSize) - b := unsafe.Offsetof(OutMessage{}.payload) - - if a != b { - log.Panicf( - "header ends at offset %d, but payload starts at offset %d", - a, b) - } + Sglist [][]byte } // Reset resets m so that it's ready to be used again. Afterward, the contents // are solely a zeroed fusekernel.OutHeader struct. func (m *OutMessage) Reset() { - m.payloadOffset = 0 m.header = fusekernel.OutHeader{} + m.Sglist = nil } // OutHeader returns a pointer to the header at the start of the message. @@ -69,25 +53,15 @@ func (m *OutMessage) OutHeader() *fusekernel.OutHeader { // insufficient space, it returns nil. func (m *OutMessage) Grow(n int) unsafe.Pointer { p := m.GrowNoZero(n) - if p != nil { - jacobsa_fuse_memclr(p, uintptr(n)) - } - return p } // GrowNoZero is equivalent to Grow, except the new segment is not zeroed. Use // with caution! func (m *OutMessage) GrowNoZero(n int) unsafe.Pointer { - // Will we overflow the buffer? - o := m.payloadOffset - if len(m.payload)-o < n { - return nil - } - - p := unsafe.Pointer(uintptr(unsafe.Pointer(&m.payload)) + uintptr(o)) - m.payloadOffset = o + n - + b := make([]byte, n) + m.Append(b) + p := unsafe.Pointer(&b[0]) return p } @@ -100,51 +74,63 @@ func (m *OutMessage) ShrinkTo(n int) { n, m.Len())) } - - m.payloadOffset = n - OutMessageHeaderSize + if n == OutMessageHeaderSize { + m.Sglist = nil + } else { + i := 1 + n -= OutMessageHeaderSize + for len(m.Sglist) > i && n >= len(m.Sglist[i]) { + n -= len(m.Sglist[i]) + i++ + } + if n > 0 { + m.Sglist[i] = m.Sglist[i][0 : n] + i++ + } + m.Sglist = m.Sglist[0 : i] + } } // Append is equivalent to growing by len(src), then copying src over the new // segment. Int panics if there is not enough room available. -func (m *OutMessage) Append(src []byte) { - p := m.GrowNoZero(len(src)) - if p == nil { - panic(fmt.Sprintf("Can't grow %d bytes", len(src))) +func (m *OutMessage) Append(src ...[]byte) { + if m.Sglist == nil { + m.Sglist = append(m.Sglist, nil) + *(*reflect.SliceHeader)(unsafe.Pointer(&m.Sglist[0])) = reflect.SliceHeader{ + Data: uintptr(unsafe.Pointer(&m.header)), + Len: OutMessageHeaderSize, + Cap: OutMessageHeaderSize, + } } - - sh := (*reflect.SliceHeader)(unsafe.Pointer(&src)) - jacobsa_fuse_memmove(p, unsafe.Pointer(sh.Data), uintptr(sh.Len)) - + m.Sglist = append(m.Sglist, src...) return } // AppendString is like Append, but accepts string input. func (m *OutMessage) AppendString(src string) { - p := m.GrowNoZero(len(src)) - if p == nil { - panic(fmt.Sprintf("Can't grow %d bytes", len(src))) - } - - sh := (*reflect.StringHeader)(unsafe.Pointer(&src)) - jacobsa_fuse_memmove(p, unsafe.Pointer(sh.Data), uintptr(sh.Len)) - + m.Append([]byte(src)) return } // Len returns the current size of the message, including the leading header. func (m *OutMessage) Len() int { - return OutMessageHeaderSize + m.payloadOffset + if m.Sglist == nil { + return OutMessageHeaderSize + } + r := 0 + for _, b := range m.Sglist { + r += len(b) + } + return r } -// Bytes returns a reference to the current contents of the buffer, including -// the leading header. +// Bytes returns a byte slice containing the current header. func (m *OutMessage) Bytes() []byte { - l := m.Len() + l := OutMessageHeaderSize sh := reflect.SliceHeader{ Data: uintptr(unsafe.Pointer(&m.header)), Len: l, Cap: l, } - return *(*[]byte)(unsafe.Pointer(&sh)) } diff --git a/internal/buffer/out_message_test.go b/internal/buffer/out_message_test.go index 6292794..4945405 100644 --- a/internal/buffer/out_message_test.go +++ b/internal/buffer/out_message_test.go @@ -107,7 +107,10 @@ func TestOutMessageAppend(t *testing.T) { t.Errorf("om.Len() = %d, want %d", got, want) } - b := om.Bytes() + b := []byte(nil) + for i := 0; i < len(om.Sglist); i++ { + b = append(b, om.Sglist[i]...) + } if got, want := len(b), wantLen; got != want { t.Fatalf("len(om.Bytes()) = %d, want %d", got, want) } @@ -137,7 +140,10 @@ func TestOutMessageAppendString(t *testing.T) { t.Errorf("om.Len() = %d, want %d", got, want) } - b := om.Bytes() + b := []byte(nil) + for i := 0; i < len(om.Sglist); i++ { + b = append(b, om.Sglist[i]...) + } if got, want := len(b), wantLen; got != want { t.Fatalf("len(om.Bytes()) = %d, want %d", got, want) } @@ -168,7 +174,10 @@ func TestOutMessageShrinkTo(t *testing.T) { t.Errorf("om.Len() = %d, want %d", got, want) } - b := om.Bytes() + b := []byte(nil) + for i := 0; i < len(om.Sglist); i++ { + b = append(b, om.Sglist[i]...) + } if got, want := len(b), wantLen; got != want { t.Fatalf("len(om.Bytes()) = %d, want %d", got, want) } @@ -283,7 +292,10 @@ func TestOutMessageGrow(t *testing.T) { t.Errorf("om.Len() = %d, want %d", got, want) } - b := om.Bytes() + b := []byte(nil) + for i := 0; i < len(om.Sglist); i++ { + b = append(b, om.Sglist[i]...) + } if got, want := len(b), wantLen; got != want { t.Fatalf("len(om.Len()) = %d, want %d", got, want) } @@ -304,7 +316,7 @@ func BenchmarkOutMessageReset(b *testing.B) { om.Reset() } - b.SetBytes(int64(unsafe.Offsetof(om.payload))) + b.SetBytes(int64(om.Len())) }) // Many megabytes worth of buffers, which should defeat the CPU cache. @@ -321,7 +333,7 @@ func BenchmarkOutMessageReset(b *testing.B) { oms[i%numMessages].Reset() } - b.SetBytes(int64(unsafe.Offsetof(oms[0].payload))) + b.SetBytes(int64(oms[0].Len())) }) } diff --git a/mount_config.go b/mount_config.go index a93c321..16bfcc3 100644 --- a/mount_config.go +++ b/mount_config.go @@ -156,6 +156,11 @@ type MountConfig struct { // actually utilise any form of qualifiable UNIX permissions. DisableDefaultPermissions bool + // Use VectoredReadOp instead of ReadFileOp. + // Vectored read allows file systems to reduce memory copying overhead if + // the data is already in memory when they return it to FUSE. + UseVectoredRead bool + // OS X only. // // The name of the mounted volume, as displayed in the Finder. If empty, a diff --git a/writev.go b/writev.go new file mode 100644 index 0000000..05c9358 --- /dev/null +++ b/writev.go @@ -0,0 +1,29 @@ +package fuse + +import ( + "syscall" + "unsafe" +) + +func writev(fd int, packet [][]byte) (n int, err error) { + iovecs := make([]syscall.Iovec, 0, len(packet)) + for _, v := range packet { + if len(v) == 0 { + continue + } + vec := syscall.Iovec{ + Base: &v[0], + } + vec.SetLen(len(v)) + iovecs = append(iovecs, vec) + } + n1, _, e1 := syscall.Syscall( + syscall.SYS_WRITEV, + uintptr(fd), uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs)), + ) + n = int(n1) + if e1 != 0 { + err = syscall.Errno(e1) + } + return +}