Compare commits

...

6 Commits

Author SHA1 Message Date
Vitaliy Filippov a7dcac672f Fix RenameOp compatibility with macfuse 4.x
Closed-source macfuse 4.x has broken compatibility with osxfuse 3.x:
it passes an additional 64-bit field (flags) after RenameIn even though
we don't enable support for RENAME_SWAP/RENAME_EXCL.

The simplest fix is just to check for the presence of all-zero flags
and strip them when they're present.
2021-10-08 17:26:53 +03:00
Vitaliy Filippov ec521aa7b7 Use newer mounting method (similar to fusermount) with macfuse 4.x
macfuse 4.x turns out to be incompatible with the old mounting method, where
you open the device yourself; it only supports the newer method, where you
receive a file descriptor from `mount_macfuse` through a unix socket.
2021-09-30 18:31:06 +03:00
Vitaliy Filippov 575b70f3fd Allow to use "zero-copy" writes 2021-09-01 14:14:51 +03:00
Vitaliy Filippov 513d4815ce Add vectored read to readbenchfs
You can now run `./readbenchfs --mount_point dir --vectored` and then
`dd if=dir/test of=/dev/null iflag=direct bs=1M status=progress` to test
vectored read speed.

My results with GOMAXPROCS=1:
- Before vectored read patch: 390 MB/s read
- Non-vectored read after vectored read patch: 830 MB/s read
- Vectored read: 1200 MB/s read
2021-09-01 14:14:15 +03:00
Vitaliy Filippov fcabfc89e9 Introduce VectoredReadOp
Read requests can now take vectored responses from the filesystem
implementation and send them to FUSE device via the writev() system call.

This allows file systems to send data without copying it into the
library-provided buffer if the data is already in memory.

The change also speeds up normal ReadFileOps as a side effect because
it removes extra memory allocations.
2021-09-01 14:14:15 +03:00
Vitaliy Filippov f1a2d0d300 Add ReadBenchFS to test linear read speed 2021-08-31 13:15:55 +03:00
17 changed files with 650 additions and 208 deletions

View File

@ -333,7 +333,7 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) {
// Loop past transient errors. // Loop past transient errors.
for { for {
// Attempt a reaed. // Attempt a read.
err := m.Init(c.dev) err := m.Init(c.dev)
// Special cases: // Special cases:
@ -400,7 +400,7 @@ func (c *Connection) ReadOp() (_ context.Context, op interface{}, _ error) {
// Convert the message to an op. // Convert the message to an op.
outMsg := c.getOutMessage() outMsg := c.getOutMessage()
op, err = convertInMessage(inMsg, outMsg, c.protocol) op, err = convertInMessage(&c.cfg, inMsg, outMsg, c.protocol)
if err != nil { if err != nil {
c.putOutMessage(outMsg) c.putOutMessage(outMsg)
return nil, nil, fmt.Errorf("convertInMessage: %v", err) return nil, nil, fmt.Errorf("convertInMessage: %v", err)
@ -480,8 +480,15 @@ func (c *Connection) Reply(ctx context.Context, opErr error) {
outMsg := state.outMsg outMsg := state.outMsg
fuseID := inMsg.Header().Unique fuseID := inMsg.Header().Unique
suppressReuse := false
if wr, ok := op.(*fuseops.WriteFileOp); ok {
suppressReuse = wr.SuppressReuse
}
// Make sure we destroy the messages when we're done. // Make sure we destroy the messages when we're done.
if !suppressReuse {
defer c.putInMessage(inMsg) defer c.putInMessage(inMsg)
}
defer c.putOutMessage(outMsg) defer c.putOutMessage(outMsg)
// Clean up state for this op. // Clean up state for this op.
@ -505,10 +512,16 @@ func (c *Connection) Reply(ctx context.Context, opErr error) {
noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr) noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr)
if !noResponse { if !noResponse {
err := c.writeMessage(outMsg.Bytes()) var err error
if outMsg.Sglist != nil {
_, err = writev(int(c.dev.Fd()), outMsg.Sglist)
} else {
err = c.writeMessage(outMsg.Bytes())
}
if err != nil && c.errorLogger != nil { if err != nil && c.errorLogger != nil {
c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.Bytes()) c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.Bytes())
} }
outMsg.Sglist = nil
} }
} }

View File

@ -38,6 +38,7 @@ import (
// //
// The caller is responsible for arranging for the message to be destroyed. // The caller is responsible for arranging for the message to be destroyed.
func convertInMessage( func convertInMessage(
config *MountConfig,
inMsg *buffer.InMessage, inMsg *buffer.InMessage,
outMsg *buffer.OutMessage, outMsg *buffer.OutMessage,
protocol fusekernel.Protocol) (o interface{}, err error) { protocol fusekernel.Protocol) (o interface{}, err error) {
@ -206,6 +207,15 @@ func convertInMessage(
} }
names := inMsg.ConsumeBytes(inMsg.Len()) names := inMsg.ConsumeBytes(inMsg.Len())
// closed-source macfuse 4.x has broken compatibility with osxfuse 3.x:
// it passes an additional 64-bit field (flags) after RenameIn regardless
// that we don't enable the support for RENAME_SWAP/RENAME_EXCL
// the simplest fix is just to check for the presence of all-zero flags
if len(names) >= 8 &&
names[0] == 0 && names[1] == 0 && names[2] == 0 && names[3] == 0 &&
names[4] == 0 && names[5] == 0 && names[6] == 0 && names[7] == 0 {
names = names[8 : ]
}
// names should be "old\x00new\x00" // names should be "old\x00new\x00"
if len(names) < 4 { if len(names) < 4 {
return nil, errors.New("Corrupt OpRename") return nil, errors.New("Corrupt OpRename")
@ -271,24 +281,28 @@ func convertInMessage(
return nil, errors.New("Corrupt OpRead") return nil, errors.New("Corrupt OpRead")
} }
if !config.UseVectoredRead {
// Use part of the incoming message storage as a read buffer
buf := inMsg.GetFree(int(in.Size))
to := &fuseops.ReadFileOp{ to := &fuseops.ReadFileOp{
Inode: fuseops.InodeID(inMsg.Header().Nodeid), Inode: fuseops.InodeID(inMsg.Header().Nodeid),
Handle: fuseops.HandleID(in.Fh), Handle: fuseops.HandleID(in.Fh),
Offset: int64(in.Offset), Offset: int64(in.Offset),
Dst: buf,
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
}
o = to
} else {
// Don't allocate any buffers when zero-copy is used
to := &fuseops.VectoredReadOp{
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
Handle: fuseops.HandleID(in.Fh),
Offset: int64(in.Offset),
Size: int64(in.Size),
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
} }
o = to o = to
readSize := int(in.Size)
p := outMsg.GrowNoZero(readSize)
if p == nil {
return nil, fmt.Errorf("Can't grow for %d-byte read", readSize)
} }
sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst))
sh.Data = uintptr(p)
sh.Len = readSize
sh.Cap = readSize
case fusekernel.OpReaddir: case fusekernel.OpReaddir:
in := (*fusekernel.ReadIn)(inMsg.Consume(fusekernel.ReadInSize(protocol))) in := (*fusekernel.ReadIn)(inMsg.Consume(fusekernel.ReadInSize(protocol)))
@ -476,6 +490,7 @@ func convertInMessage(
o = to o = to
readSize := int(in.Size) readSize := int(in.Size)
if readSize > 0 {
p := outMsg.GrowNoZero(readSize) p := outMsg.GrowNoZero(readSize)
if p == nil { if p == nil {
return nil, fmt.Errorf("Can't grow for %d-byte read", readSize) return nil, fmt.Errorf("Can't grow for %d-byte read", readSize)
@ -485,6 +500,9 @@ func convertInMessage(
sh.Data = uintptr(p) sh.Data = uintptr(p)
sh.Len = readSize sh.Len = readSize
sh.Cap = readSize sh.Cap = readSize
} else {
to.Dst = nil
}
case fusekernel.OpListxattr: case fusekernel.OpListxattr:
type input fusekernel.ListxattrIn type input fusekernel.ListxattrIn
@ -705,9 +723,11 @@ func (c *Connection) kernelResponseForOp(
} }
case *fuseops.ReadFileOp: case *fuseops.ReadFileOp:
// convertInMessage already set up the destination buffer to be at the end m.Append(o.Dst)
// of the out message. We need only shrink to the right size based on how m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead)
// much the user read.
case *fuseops.VectoredReadOp:
m.Append(o.Data...)
m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead) m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead)
case *fuseops.WriteFileOp: case *fuseops.WriteFileOp:

View File

@ -97,6 +97,11 @@ func describeRequest(op interface{}) (s string) {
addComponent("offset %d", typed.Offset) addComponent("offset %d", typed.Offset)
addComponent("%d bytes", len(typed.Dst)) addComponent("%d bytes", len(typed.Dst))
case *fuseops.VectoredReadOp:
addComponent("handle %d", typed.Handle)
addComponent("offset %d", typed.Offset)
addComponent("%d bytes", typed.Size)
case *fuseops.WriteFileOp: case *fuseops.WriteFileOp:
addComponent("handle %d", typed.Handle) addComponent("handle %d", typed.Handle)
addComponent("offset %d", typed.Offset) addComponent("offset %d", typed.Offset)

View File

@ -654,6 +654,37 @@ type ReadFileOp struct {
OpContext OpContext OpContext OpContext
} }
// VectoredReadOp is a vectored read: the same request as ReadFileOp, except
// that the library does not provide a destination buffer. The file system
// returns a list of byte slices in Data instead.
type VectoredReadOp struct {
// The file inode that we are reading, and the handle previously returned by
// CreateFile or OpenFile when opening that inode.
Inode InodeID
Handle HandleID
// The offset within the file at which to read.
Offset int64
// The size of the read: the number of bytes requested.
Size int64
// Set by the file system: data to send back to the client, as a list of
// slices that together hold the bytes read.
Data [][]byte
// Set by the file system: the number of bytes read.
//
// The FUSE documentation requires that exactly the requested number of bytes
// be returned, except in the case of EOF or error (http://goo.gl/ZgfBkF).
// This appears to be because it uses file mmapping machinery
// (http://goo.gl/SGxnaN) to read a page at a time. It appears to understand
// where EOF is by checking the inode size (http://goo.gl/0BkqKD), returned
// by a previous call to LookUpInode, GetInodeAttributes, etc.
//
// If direct IO is enabled, semantics should match those of read(2).
BytesRead int
OpContext OpContext
}
// Write data to a file previously opened with CreateFile or OpenFile. // Write data to a file previously opened with CreateFile or OpenFile.
// //
// When the user writes data using write(2), the write goes into the page // When the user writes data using write(2), the write goes into the page
@ -711,6 +742,17 @@ type WriteFileOp struct {
// because it uses file mmapping machinery (http://goo.gl/SGxnaN) to write a // because it uses file mmapping machinery (http://goo.gl/SGxnaN) to write a
// page at a time. // page at a time.
Data []byte Data []byte
// Set by the file system: "no reuse" flag.
//
// By default, the Data buffer is reused by the library, so the file system
// must copy the data if it wants to use it later.
//
// However, if the file system sets this flag to true, the library doesn't
// reuse this buffer, so the file system can safely store and use Data slice
// without copying memory.
SuppressReuse bool
OpContext OpContext OpContext OpContext
} }

View File

@ -52,6 +52,7 @@ type FileSystem interface {
ReleaseDirHandle(context.Context, *fuseops.ReleaseDirHandleOp) error ReleaseDirHandle(context.Context, *fuseops.ReleaseDirHandleOp) error
OpenFile(context.Context, *fuseops.OpenFileOp) error OpenFile(context.Context, *fuseops.OpenFileOp) error
ReadFile(context.Context, *fuseops.ReadFileOp) error ReadFile(context.Context, *fuseops.ReadFileOp) error
VectoredRead(context.Context, *fuseops.VectoredReadOp) error
WriteFile(context.Context, *fuseops.WriteFileOp) error WriteFile(context.Context, *fuseops.WriteFileOp) error
SyncFile(context.Context, *fuseops.SyncFileOp) error SyncFile(context.Context, *fuseops.SyncFileOp) error
FlushFile(context.Context, *fuseops.FlushFileOp) error FlushFile(context.Context, *fuseops.FlushFileOp) error
@ -190,6 +191,9 @@ func (s *fileSystemServer) handleOp(
case *fuseops.ReadFileOp: case *fuseops.ReadFileOp:
err = s.fs.ReadFile(ctx, typed) err = s.fs.ReadFile(ctx, typed)
case *fuseops.VectoredReadOp:
err = s.fs.VectoredRead(ctx, typed)
case *fuseops.WriteFileOp: case *fuseops.WriteFileOp:
err = s.fs.WriteFile(ctx, typed) err = s.fs.WriteFile(ctx, typed)

View File

@ -138,6 +138,12 @@ func (fs *NotImplementedFileSystem) ReadFile(
return fuse.ENOSYS return fuse.ENOSYS
} }
// VectoredRead is the default handler for VectoredReadOp. It does no work and
// reports the operation as unimplemented by returning fuse.ENOSYS.
func (fs *NotImplementedFileSystem) VectoredRead(
ctx context.Context,
op *fuseops.VectoredReadOp) error {
return fuse.ENOSYS
}
func (fs *NotImplementedFileSystem) WriteFile( func (fs *NotImplementedFileSystem) WriteFile(
ctx context.Context, ctx context.Context,
op *fuseops.WriteFileOp) error { op *fuseops.WriteFileOp) error {

View File

@ -42,13 +42,16 @@ func init() {
type InMessage struct { type InMessage struct {
remaining []byte remaining []byte
storage []byte storage []byte
size int
} }
// Initialize with the data read by a single call to r.Read. The first call to // Initialize with the data read by a single call to r.Read. The first call to
// Consume will consume the bytes directly after the fusekernel.InHeader // Consume will consume the bytes directly after the fusekernel.InHeader
// struct. // struct.
func (m *InMessage) Init(r io.Reader) error { func (m *InMessage) Init(r io.Reader) error {
if m.storage == nil {
m.storage = make([]byte, bufSize, bufSize) m.storage = make([]byte, bufSize, bufSize)
}
n, err := r.Read(m.storage[:]) n, err := r.Read(m.storage[:])
if err != nil { if err != nil {
return err return err
@ -60,6 +63,7 @@ func (m *InMessage) Init(r io.Reader) error {
return fmt.Errorf("Unexpectedly read only %d bytes.", n) return fmt.Errorf("Unexpectedly read only %d bytes.", n)
} }
m.size = n
m.remaining = m.storage[headerSize:n] m.remaining = m.storage[headerSize:n]
// Check the header's length. // Check the header's length.
@ -108,3 +112,11 @@ func (m *InMessage) ConsumeBytes(n uintptr) []byte {
return b return b
} }
// GetFree returns the n bytes of storage that directly follow the message
// (that is, starting at offset m.size), for use as a temporary buffer. It
// returns nil when n is not positive or when fewer than n bytes of storage
// remain after the message.
func (m *InMessage) GetFree(n int) []byte {
	start := m.size
	avail := len(m.storage) - start

	if n > 0 && n <= avail {
		return m.storage[start : start+n]
	}
	return nil
}

View File

@ -17,4 +17,4 @@ package buffer
// The maximum fuse write request size that InMessage can acommodate. // The maximum fuse write request size that InMessage can acommodate.
// //
// As of kernel 4.20 Linux accepts writes up to 256 pages or 1MiB // As of kernel 4.20 Linux accepts writes up to 256 pages or 1MiB
const MaxWriteSize = 1 << 20 const MaxWriteSize = 1 << 17

View File

@ -16,7 +16,6 @@ package buffer
import ( import (
"fmt" "fmt"
"log"
"reflect" "reflect"
"unsafe" "unsafe"
@ -33,30 +32,15 @@ const OutMessageHeaderSize = int(unsafe.Sizeof(fusekernel.OutHeader{}))
// //
// Must be initialized with Reset. // Must be initialized with Reset.
type OutMessage struct { type OutMessage struct {
// The offset into payload to which we're currently writing.
payloadOffset int
header fusekernel.OutHeader header fusekernel.OutHeader
payload [MaxReadSize]byte Sglist [][]byte
}
// Make sure that the header and payload are contiguous.
func init() {
a := unsafe.Offsetof(OutMessage{}.header) + uintptr(OutMessageHeaderSize)
b := unsafe.Offsetof(OutMessage{}.payload)
if a != b {
log.Panicf(
"header ends at offset %d, but payload starts at offset %d",
a, b)
}
} }
// Reset resets m so that it's ready to be used again. Afterward, the contents // Reset resets m so that it's ready to be used again. Afterward, the contents
// are solely a zeroed fusekernel.OutHeader struct. // are solely a zeroed fusekernel.OutHeader struct.
func (m *OutMessage) Reset() { func (m *OutMessage) Reset() {
m.payloadOffset = 0
m.header = fusekernel.OutHeader{} m.header = fusekernel.OutHeader{}
m.Sglist = nil
} }
// OutHeader returns a pointer to the header at the start of the message. // OutHeader returns a pointer to the header at the start of the message.
@ -69,25 +53,15 @@ func (m *OutMessage) OutHeader() *fusekernel.OutHeader {
// insufficient space, it returns nil. // insufficient space, it returns nil.
func (m *OutMessage) Grow(n int) unsafe.Pointer { func (m *OutMessage) Grow(n int) unsafe.Pointer {
p := m.GrowNoZero(n) p := m.GrowNoZero(n)
if p != nil {
jacobsa_fuse_memclr(p, uintptr(n))
}
return p return p
} }
// GrowNoZero is equivalent to Grow, except the new segment is not zeroed. Use // GrowNoZero is equivalent to Grow, except the new segment is not zeroed. Use
// with caution! // with caution!
func (m *OutMessage) GrowNoZero(n int) unsafe.Pointer { func (m *OutMessage) GrowNoZero(n int) unsafe.Pointer {
// Will we overflow the buffer? b := make([]byte, n)
o := m.payloadOffset m.Append(b)
if len(m.payload)-o < n { p := unsafe.Pointer(&b[0])
return nil
}
p := unsafe.Pointer(uintptr(unsafe.Pointer(&m.payload)) + uintptr(o))
m.payloadOffset = o + n
return p return p
} }
@ -100,51 +74,63 @@ func (m *OutMessage) ShrinkTo(n int) {
n, n,
m.Len())) m.Len()))
} }
if n == OutMessageHeaderSize {
m.payloadOffset = n - OutMessageHeaderSize m.Sglist = nil
} else {
i := 1
n -= OutMessageHeaderSize
for len(m.Sglist) > i && n >= len(m.Sglist[i]) {
n -= len(m.Sglist[i])
i++
}
if n > 0 {
m.Sglist[i] = m.Sglist[i][0 : n]
i++
}
m.Sglist = m.Sglist[0 : i]
}
} }
// Append is equivalent to growing by len(src), then copying src over the new // Append is equivalent to growing by len(src), then copying src over the new
// segment. Int panics if there is not enough room available. // segment. Int panics if there is not enough room available.
func (m *OutMessage) Append(src []byte) { func (m *OutMessage) Append(src ...[]byte) {
p := m.GrowNoZero(len(src)) if m.Sglist == nil {
if p == nil { m.Sglist = append(m.Sglist, nil)
panic(fmt.Sprintf("Can't grow %d bytes", len(src))) *(*reflect.SliceHeader)(unsafe.Pointer(&m.Sglist[0])) = reflect.SliceHeader{
Data: uintptr(unsafe.Pointer(&m.header)),
Len: OutMessageHeaderSize,
Cap: OutMessageHeaderSize,
} }
}
sh := (*reflect.SliceHeader)(unsafe.Pointer(&src)) m.Sglist = append(m.Sglist, src...)
jacobsa_fuse_memmove(p, unsafe.Pointer(sh.Data), uintptr(sh.Len))
return return
} }
// AppendString is like Append, but accepts string input. // AppendString is like Append, but accepts string input.
func (m *OutMessage) AppendString(src string) { func (m *OutMessage) AppendString(src string) {
p := m.GrowNoZero(len(src)) m.Append([]byte(src))
if p == nil {
panic(fmt.Sprintf("Can't grow %d bytes", len(src)))
}
sh := (*reflect.StringHeader)(unsafe.Pointer(&src))
jacobsa_fuse_memmove(p, unsafe.Pointer(sh.Data), uintptr(sh.Len))
return return
} }
// Len returns the current size of the message, including the leading header. // Len returns the current size of the message, including the leading header.
func (m *OutMessage) Len() int { func (m *OutMessage) Len() int {
return OutMessageHeaderSize + m.payloadOffset if m.Sglist == nil {
return OutMessageHeaderSize
}
r := 0
for _, b := range m.Sglist {
r += len(b)
}
return r
} }
// Bytes returns a reference to the current contents of the buffer, including // Bytes returns a byte slice containing the current header.
// the leading header.
func (m *OutMessage) Bytes() []byte { func (m *OutMessage) Bytes() []byte {
l := m.Len() l := OutMessageHeaderSize
sh := reflect.SliceHeader{ sh := reflect.SliceHeader{
Data: uintptr(unsafe.Pointer(&m.header)), Data: uintptr(unsafe.Pointer(&m.header)),
Len: l, Len: l,
Cap: l, Cap: l,
} }
return *(*[]byte)(unsafe.Pointer(&sh)) return *(*[]byte)(unsafe.Pointer(&sh))
} }

View File

@ -107,7 +107,10 @@ func TestOutMessageAppend(t *testing.T) {
t.Errorf("om.Len() = %d, want %d", got, want) t.Errorf("om.Len() = %d, want %d", got, want)
} }
b := om.Bytes() b := []byte(nil)
for i := 0; i < len(om.Sglist); i++ {
b = append(b, om.Sglist[i]...)
}
if got, want := len(b), wantLen; got != want { if got, want := len(b), wantLen; got != want {
t.Fatalf("len(om.Bytes()) = %d, want %d", got, want) t.Fatalf("len(om.Bytes()) = %d, want %d", got, want)
} }
@ -137,7 +140,10 @@ func TestOutMessageAppendString(t *testing.T) {
t.Errorf("om.Len() = %d, want %d", got, want) t.Errorf("om.Len() = %d, want %d", got, want)
} }
b := om.Bytes() b := []byte(nil)
for i := 0; i < len(om.Sglist); i++ {
b = append(b, om.Sglist[i]...)
}
if got, want := len(b), wantLen; got != want { if got, want := len(b), wantLen; got != want {
t.Fatalf("len(om.Bytes()) = %d, want %d", got, want) t.Fatalf("len(om.Bytes()) = %d, want %d", got, want)
} }
@ -168,7 +174,10 @@ func TestOutMessageShrinkTo(t *testing.T) {
t.Errorf("om.Len() = %d, want %d", got, want) t.Errorf("om.Len() = %d, want %d", got, want)
} }
b := om.Bytes() b := []byte(nil)
for i := 0; i < len(om.Sglist); i++ {
b = append(b, om.Sglist[i]...)
}
if got, want := len(b), wantLen; got != want { if got, want := len(b), wantLen; got != want {
t.Fatalf("len(om.Bytes()) = %d, want %d", got, want) t.Fatalf("len(om.Bytes()) = %d, want %d", got, want)
} }
@ -283,7 +292,10 @@ func TestOutMessageGrow(t *testing.T) {
t.Errorf("om.Len() = %d, want %d", got, want) t.Errorf("om.Len() = %d, want %d", got, want)
} }
b := om.Bytes() b := []byte(nil)
for i := 0; i < len(om.Sglist); i++ {
b = append(b, om.Sglist[i]...)
}
if got, want := len(b), wantLen; got != want { if got, want := len(b), wantLen; got != want {
t.Fatalf("len(om.Len()) = %d, want %d", got, want) t.Fatalf("len(om.Len()) = %d, want %d", got, want)
} }
@ -304,7 +316,7 @@ func BenchmarkOutMessageReset(b *testing.B) {
om.Reset() om.Reset()
} }
b.SetBytes(int64(unsafe.Offsetof(om.payload))) b.SetBytes(int64(om.Len()))
}) })
// Many megabytes worth of buffers, which should defeat the CPU cache. // Many megabytes worth of buffers, which should defeat the CPU cache.
@ -321,7 +333,7 @@ func BenchmarkOutMessageReset(b *testing.B) {
oms[i%numMessages].Reset() oms[i%numMessages].Reset()
} }
b.SetBytes(int64(unsafe.Offsetof(oms[0].payload))) b.SetBytes(int64(oms[0].Len()))
}) })
} }

View File

@ -17,7 +17,10 @@ package fuse
import ( import (
"context" "context"
"fmt" "fmt"
"net"
"os" "os"
"os/exec"
"syscall"
) )
// Server is an interface for any type that knows how to serve ops read from a // Server is an interface for any type that knows how to serve ops read from a
@ -93,3 +96,82 @@ func Mount(
return mfs, nil return mfs, nil
} }
// fusermount runs a mount helper (fusermount, mount_macfuse or mount_osxfuse)
// and returns the FUSE device file that the helper passes back over a unix
// socket.
//
// binary is the helper to execute and argv its arguments. additionalEnv holds
// extra "KEY=value" entries appended to the current environment. The helper
// is told through _FUSE_COMMFD=3 that the socket is its file descriptor 3.
// When wait is true the helper is run to completion before the descriptor is
// read; otherwise it is only started and is reaped in the background.
//
// An error is returned if the helper cannot be run, or if it does not send
// exactly one file descriptor (including when it exits without sending one).
func fusermount(binary string, argv []string, additionalEnv []string, wait bool) (*os.File, error) {
	// Create a socket pair.
	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
	if err != nil {
		return nil, fmt.Errorf("Socketpair: %w", err)
	}

	// Wrap the sockets into os.File objects that we will pass off to the helper.
	writeFile := os.NewFile(uintptr(fds[0]), "fusermount-child-writes")
	defer writeFile.Close() // covers the early returns; a second Close is harmless

	readFile := os.NewFile(uintptr(fds[1]), "fusermount-parent-reads")
	defer readFile.Close()

	// Start fusermount/mount_macfuse/mount_osxfuse.
	cmd := exec.Command(binary, argv...)
	cmd.Env = append(os.Environ(), "_FUSE_COMMFD=3")
	cmd.Env = append(cmd.Env, additionalEnv...)
	cmd.ExtraFiles = []*os.File{writeFile}
	cmd.Stderr = os.Stderr

	// Run the command.
	if wait {
		err = cmd.Run()
	} else {
		err = cmd.Start()
	}
	if err != nil {
		return nil, fmt.Errorf("running %v: %w", binary, err)
	}

	if !wait {
		// Reap the helper once it exits so that it does not linger as a zombie.
		// Its exit status is deliberately ignored: failures surface below as a
		// missing file descriptor.
		go func() { _ = cmd.Wait() }()
	}

	// The helper owns its own copy of the socket now. Drop ours so that the
	// read below sees EOF, instead of blocking forever, if the helper exits
	// without sending a descriptor.
	writeFile.Close()

	// Wrap the socket file in a connection.
	c, err := net.FileConn(readFile)
	if err != nil {
		return nil, fmt.Errorf("FileConn: %w", err)
	}
	defer c.Close()

	// We expect to have a Unix domain socket.
	uc, ok := c.(*net.UnixConn)
	if !ok {
		return nil, fmt.Errorf("Expected UnixConn, got %T", c)
	}

	// Read a message.
	buf := make([]byte, 32) // expect 1 byte
	oob := make([]byte, 32) // expect 24 bytes
	_, oobn, _, _, err := uc.ReadMsgUnix(buf, oob)
	if err != nil {
		return nil, fmt.Errorf("ReadMsgUnix: %w", err)
	}

	// Parse the message.
	scms, err := syscall.ParseSocketControlMessage(oob[:oobn])
	if err != nil {
		return nil, fmt.Errorf("ParseSocketControlMessage: %w", err)
	}

	// We expect one message.
	if len(scms) != 1 {
		return nil, fmt.Errorf("expected 1 SocketControlMessage; got scms = %#v", scms)
	}
	scm := scms[0]

	// Pull out the FD returned by the helper.
	gotFds, err := syscall.ParseUnixRights(&scm)
	if err != nil {
		return nil, fmt.Errorf("syscall.ParseUnixRights: %w", err)
	}
	if len(gotFds) != 1 {
		return nil, fmt.Errorf("wanted 1 fd; got %#v", gotFds)
	}

	// Turn the FD into an os.File.
	return os.NewFile(uintptr(gotFds[0]), "/dev/fuse"), nil
}

View File

@ -156,6 +156,11 @@ type MountConfig struct {
// actually utilise any form of qualifiable UNIX permissions. // actually utilise any form of qualifiable UNIX permissions.
DisableDefaultPermissions bool DisableDefaultPermissions bool
// Use VectoredReadOp instead of ReadFileOp.
// Vectored read allows file systems to reduce memory copying overhead if
// the data is already in memory when they return it to FUSE.
UseVectoredRead bool
// OS X only. // OS X only.
// //
// The name of the mounted volume, as displayed in the Finder. If empty, a // The name of the mounted volume, as displayed in the Finder. If empty, a

View File

@ -40,6 +40,10 @@ type osxfuseInstallation struct {
// Environment variable used to pass the "called by library" flag. // Environment variable used to pass the "called by library" flag.
LibVar string LibVar string
// Open device manually (false) or receive the FD through a UNIX socket,
// like with fusermount (true)
UseCommFD bool
} }
var ( var (
@ -51,6 +55,7 @@ var (
Mount: "/Library/Filesystems/macfuse.fs/Contents/Resources/mount_macfuse", Mount: "/Library/Filesystems/macfuse.fs/Contents/Resources/mount_macfuse",
DaemonVar: "_FUSE_DAEMON_PATH", DaemonVar: "_FUSE_DAEMON_PATH",
LibVar: "_FUSE_CALL_BY_LIB", LibVar: "_FUSE_CALL_BY_LIB",
UseCommFD: true,
}, },
// v3 // v3
@ -106,6 +111,36 @@ func openOSXFUSEDev(devPrefix string) (dev *os.File, err error) {
} }
} }
// convertMountArgs builds the pieces of a darwin mount-helper invocation from
// cfg. It returns the helper's leading arguments (the mount options and the
// iosize option), the extra environment entries, and an error if any option
// key or value contains a comma.
//
// libVar is always set to the empty string in the returned environment;
// daemonVar, when non-empty, is set to os.Args[0].
func convertMountArgs(daemonVar string, libVar string,
	cfg *MountConfig) ([]string, []string, error) {
	// Commas separate options and the mount helper understands no escaping,
	// so reject any option that contains one.
	for name, value := range cfg.toMap() {
		if !strings.ContainsRune(name, ',') && !strings.ContainsRune(value, ',') {
			continue
		}
		return nil, nil, fmt.Errorf(
			"mount options cannot contain commas on darwin: %q=%q",
			name,
			value)
	}

	helperEnv := make([]string, 0, 2)
	helperEnv = append(helperEnv, libVar+"=")
	if daemonVar != "" {
		helperEnv = append(helperEnv, daemonVar+"="+os.Args[0])
	}

	options := cfg.toOptionsString()

	// Tell osxfuse-kext how large our buffer is. It must split writes larger
	// than this into multiple writes. OSXFUSE seems to ignore
	// InitResponse.MaxWrite, and uses this instead.
	ioSize := "iosize=" + strconv.FormatUint(buffer.MaxWriteSize, 10)

	helperArgs := []string{"-o", options, "-o", ioSize}
	return helperArgs, helperEnv, nil
}
func callMount( func callMount(
bin string, bin string,
daemonVar string, daemonVar string,
@ -115,39 +150,21 @@ func callMount(
dev *os.File, dev *os.File,
ready chan<- error) error { ready chan<- error) error {
// The mount helper doesn't understand any escaping. argv, env, err := convertMountArgs(daemonVar, libVar, cfg)
for k, v := range cfg.toMap() { if err != nil {
if strings.Contains(k, ",") || strings.Contains(v, ",") { return err
return fmt.Errorf(
"mount options cannot contain commas on darwin: %q=%q",
k,
v)
}
} }
// Call the mount helper, passing in the device file and saving output into a // Call the mount helper, passing in the device file and saving output into a
// buffer. // buffer.
cmd := exec.Command( argv = append(argv,
bin,
"-o", cfg.toOptionsString(),
// Tell osxfuse-kext how large our buffer is. It must split
// writes larger than this into multiple writes.
//
// OSXFUSE seems to ignore InitResponse.MaxWrite, and uses
// this instead.
"-o", "iosize="+strconv.FormatUint(buffer.MaxWriteSize, 10),
// refers to fd passed in cmd.ExtraFiles // refers to fd passed in cmd.ExtraFiles
"3", "3",
dir, dir,
) )
cmd := exec.Command(bin, argv...)
cmd.ExtraFiles = []*os.File{dev} cmd.ExtraFiles = []*os.File{dev}
cmd.Env = os.Environ() cmd.Env = env
cmd.Env = append(cmd.Env, libVar+"=")
daemon := os.Args[0]
if daemonVar != "" {
cmd.Env = append(cmd.Env, daemonVar+"="+daemon)
}
var buf bytes.Buffer var buf bytes.Buffer
cmd.Stdout = &buf cmd.Stdout = &buf
@ -174,6 +191,23 @@ func callMount(
return nil return nil
} }
// callMountCommFD mounts dir by running the mount helper bin through
// fusermount, which hands back the device file over a unix socket instead of
// having it opened here. The helper is started without waiting for it to
// finish.
//
// The helper arguments and environment come from convertMountArgs, with dir
// appended as the final argument and _FUSE_COMMVERS=2 added to the
// environment.
func callMountCommFD(
	bin string,
	daemonVar string,
	libVar string,
	dir string,
	cfg *MountConfig) (*os.File, error) {
	helperArgs, helperEnv, err := convertMountArgs(daemonVar, libVar, cfg)
	if err != nil {
		return nil, err
	}

	helperArgs = append(helperArgs, dir)
	helperEnv = append(helperEnv, "_FUSE_COMMVERS=2")

	return fusermount(bin, helperArgs, helperEnv, false)
}
// Begin the process of mounting at the given directory, returning a connection // Begin the process of mounting at the given directory, returning a connection
// to the kernel. Mounting continues in the background, and is complete when an // to the kernel. Mounting continues in the background, and is complete when an
// error is written to the supplied channel. The file system may need to // error is written to the supplied channel. The file system may need to
@ -189,6 +223,16 @@ func mount(
continue continue
} }
if loc.UseCommFD {
// Call the mount binary with the device.
ready <- nil
dev, err = callMountCommFD(loc.Mount, loc.DaemonVar, loc.LibVar, dir, cfg)
if err != nil {
return nil, fmt.Errorf("callMount: %v", err)
}
return
}
// Open the device. // Open the device.
dev, err = openOSXFUSEDev(loc.DevicePrefix) dev, err = openOSXFUSEDev(loc.DevicePrefix)

View File

@ -1,10 +1,8 @@
package fuse package fuse
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"net"
"os" "os"
"os/exec" "os/exec"
"syscall" "syscall"
@ -23,92 +21,6 @@ func findFusermount() (string, error) {
return path, nil return path, nil
} }
func fusermount(dir string, cfg *MountConfig) (*os.File, error) {
// Create a socket pair.
fds, err := syscall.Socketpair(syscall.AF_FILE, syscall.SOCK_STREAM, 0)
if err != nil {
return nil, fmt.Errorf("Socketpair: %v", err)
}
// Wrap the sockets into os.File objects that we will pass off to fusermount.
writeFile := os.NewFile(uintptr(fds[0]), "fusermount-child-writes")
defer writeFile.Close()
readFile := os.NewFile(uintptr(fds[1]), "fusermount-parent-reads")
defer readFile.Close()
// Start fusermount, passing it a buffer in which to write stderr.
var stderr bytes.Buffer
fusermount, err := findFusermount()
if err != nil {
return nil, err
}
cmd := exec.Command(
fusermount,
"-o", cfg.toOptionsString(),
"--",
dir,
)
cmd.Env = append(os.Environ(), "_FUSE_COMMFD=3")
cmd.ExtraFiles = []*os.File{writeFile}
cmd.Stderr = &stderr
// Run the command.
err = cmd.Run()
if err != nil {
return nil, fmt.Errorf("running fusermount: %v\n\nstderr:\n%s", err, stderr.Bytes())
}
// Wrap the socket file in a connection.
c, err := net.FileConn(readFile)
if err != nil {
return nil, fmt.Errorf("FileConn: %v", err)
}
defer c.Close()
// We expect to have a Unix domain socket.
uc, ok := c.(*net.UnixConn)
if !ok {
return nil, fmt.Errorf("Expected UnixConn, got %T", c)
}
// Read a message.
buf := make([]byte, 32) // expect 1 byte
oob := make([]byte, 32) // expect 24 bytes
_, oobn, _, _, err := uc.ReadMsgUnix(buf, oob)
if err != nil {
return nil, fmt.Errorf("ReadMsgUnix: %v", err)
}
// Parse the message.
scms, err := syscall.ParseSocketControlMessage(oob[:oobn])
if err != nil {
return nil, fmt.Errorf("ParseSocketControlMessage: %v", err)
}
// We expect one message.
if len(scms) != 1 {
return nil, fmt.Errorf("expected 1 SocketControlMessage; got scms = %#v", scms)
}
scm := scms[0]
// Pull out the FD returned by fusermount
gotFds, err := syscall.ParseUnixRights(&scm)
if err != nil {
return nil, fmt.Errorf("syscall.ParseUnixRights: %v", err)
}
if len(gotFds) != 1 {
return nil, fmt.Errorf("wanted 1 fd; got %#v", gotFds)
}
// Turn the FD into an os.File.
return os.NewFile(uintptr(gotFds[0]), "/dev/fuse"), nil
}
func enableFunc(flag uintptr) func(uintptr) uintptr { func enableFunc(flag uintptr) func(uintptr) uintptr {
return func(v uintptr) uintptr { return func(v uintptr) uintptr {
return v | flag return v | flag
@ -198,7 +110,16 @@ func mount(dir string, cfg *MountConfig, ready chan<- error) (*os.File, error) {
// have the CAP_SYS_ADMIN capability. // have the CAP_SYS_ADMIN capability.
dev, err := directmount(dir, cfg) dev, err := directmount(dir, cfg)
if err == errFallback { if err == errFallback {
return fusermount(dir, cfg) fusermountPath, err := findFusermount()
if err != nil {
return nil, err
}
argv := []string{
"-o", cfg.toOptionsString(),
"--",
dir,
}
return fusermount(fusermountPath, argv, []string{}, true)
} }
return dev, err return dev, err
} }

View File

@ -0,0 +1,48 @@
package main
import (
"context"
"flag"
"log"
"os"
"github.com/jacobsa/fuse"
"github.com/jacobsa/fuse/samples/readbenchfs"
)
// Command-line flags controlling how the benchmark file system is mounted.
var (
	// fMountPoint is the directory to mount on; main refuses to run if empty.
	fMountPoint = flag.String("mount_point", "", "Path to mount point.")
	// fReadOnly is passed through to fuse.MountConfig.ReadOnly.
	fReadOnly = flag.Bool("read_only", false, "Mount in read-only mode.")
	// fVectored is passed through to fuse.MountConfig.UseVectoredRead.
	fVectored = flag.Bool("vectored", false, "Use vectored read.")
	// fDebug turns on a stderr debug logger for the fuse package.
	fDebug = flag.Bool("debug", false, "Enable debug logging.")
)
func main() {
flag.Parse()
server, err := readbenchfs.NewReadBenchServer()
if err != nil {
log.Fatalf("makeFS: %v", err)
}
// Mount the file system.
if *fMountPoint == "" {
log.Fatalf("You must set --mount_point.")
}
cfg := &fuse.MountConfig{
ReadOnly: *fReadOnly,
UseVectoredRead: *fVectored,
}
if *fDebug {
cfg.DebugLogger = log.New(os.Stderr, "fuse: ", 0)
}
mfs, err := fuse.Mount(*fMountPoint, server, cfg)
if err != nil {
log.Fatalf("Mount: %v", err)
}
// Wait for it to be unmounted.
if err = mfs.Join(context.Background()); err != nil {
log.Fatalf("Join: %v", err)
}
}

View File

@ -0,0 +1,213 @@
// Copyright 2021 Vitaliy Filippov
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package readbenchfs
import (
"golang.org/x/net/context"
"os"
"io"
"math/rand"
"github.com/jacobsa/fuse"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/fuse/fuseutil"
)
// readBenchFS is a minimal file system used to benchmark read throughput.
// Operations it does not override fall through to the embedded
// fuseutil.NotImplementedFileSystem.
type readBenchFS struct {
	fuseutil.NotImplementedFileSystem
	// buf is the backing data that ReadFile and VectoredRead serve from,
	// indexed by the file offset modulo len(buf).
	buf []byte
}

// FILE_SIZE is the size in bytes (1024^4) reported for the "test" file and
// used as the upper bound for reads.
const FILE_SIZE = 1024*1024*1024*1024

// Compile-time check that *readBenchFS satisfies fuseutil.FileSystem.
var _ fuseutil.FileSystem = &readBenchFS{}
// NewReadBenchServer returns a fuse.Server wrapping a readBenchFS whose
// backing buffer is filled with pseudo-random bytes. The returned error is
// always nil.
func NewReadBenchServer() (server fuse.Server, err error) {
	// Use 1 GB of random data so that the working set exceeds the CPU cache.
	data := make([]byte, 1024*1024*1024)
	rand.Read(data)

	benchFS := &readBenchFS{buf: data}
	return fuseutil.NewFileSystemServer(benchFS), nil
}
// StatFS succeeds without filling in any file system statistics.
func (fs *readBenchFS) StatFS(ctx context.Context, op *fuseops.StatFSOp) error {
	return nil
}
// LookUpInode resolves the name "test" to inode 2, a read-only file of
// FILE_SIZE bytes. Every other name yields fuse.ENOENT. The parent inode is
// not consulted.
func (fs *readBenchFS) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error {
	if op.Name != "test" {
		return fuse.ENOENT
	}

	attrs := fuseops.InodeAttributes{
		Size:  FILE_SIZE,
		Nlink: 1,
		Mode:  0444,
	}
	op.Entry = fuseops.ChildInodeEntry{
		Child:      2,
		Attributes: attrs,
	}
	return nil
}
// GetInodeAttributes reports attributes for the two inodes this file system
// knows about: inode 1 is a directory with mode 0755 and inode 2 is a
// read-only file of FILE_SIZE bytes. Any other inode yields fuse.ENOENT.
func (fs *readBenchFS) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) error {
	switch op.Inode {
	case 1:
		// The directory that contains the benchmark file.
		op.Attributes = fuseops.InodeAttributes{
			Nlink: 1,
			Mode:  0755 | os.ModeDir,
		}
	case 2:
		// The benchmark file itself.
		op.Attributes = fuseops.InodeAttributes{
			Size:  FILE_SIZE,
			Nlink: 1,
			Mode:  0444,
		}
	default:
		return fuse.ENOENT
	}
	return nil
}
// OpenDir accepts every request: any directory may be opened.
func (fs *readBenchFS) OpenDir(ctx context.Context, op *fuseops.OpenDirOp) error {
	return nil
}
// ReadDir lists the contents of inode 1, which is the single file "test"
// (inode 2). Any other inode yields fuse.ENOENT. A non-zero offset means the
// only entry has already been returned, so nothing more is written.
func (fs *readBenchFS) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error {
	if op.Inode != 1 {
		return fuse.ENOENT
	}
	if op.Offset > 0 {
		return nil
	}

	listing := []fuseutil.Dirent{
		{
			Offset: 1,
			Inode:  2,
			Name:   "test",
			Type:   fuseutil.DT_File,
		},
	}

	for _, entry := range listing[op.Offset:] {
		written := fuseutil.WriteDirent(op.Dst[op.BytesRead:], entry)
		// Zero bytes written means the entry did not fit in the remaining
		// space of op.Dst; stop here.
		if written == 0 {
			break
		}
		op.BytesRead += written
	}
	return nil
}
// OpenFile accepts every request: any file may be opened.
func (fs *readBenchFS) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error {
	return nil
}
// ReadFile fills op.Dst with the file contents starting at op.Offset. The
// file is the backing buffer repeated end to end up to FILE_SIZE bytes, so
// byte i of the file is fs.buf[i % len(fs.buf)].
//
// It returns io.EOF when op.Offset is beyond FILE_SIZE; otherwise it sets
// op.BytesRead to the number of bytes available (clipped at FILE_SIZE) and
// returns nil.
func (fs *readBenchFS) ReadFile(
	ctx context.Context,
	op *fuseops.ReadFileOp) error {
	if op.Offset > FILE_SIZE {
		return io.EOF
	}
	end := op.Offset + int64(len(op.Dst))
	if end > FILE_SIZE {
		end = FILE_SIZE
	}
	buflen := int64(len(fs.buf))
	for pos := op.Offset; pos < end; {
		// [s, e) is the piece of the backing buffer that corresponds to the
		// file range starting at pos, cut off at the end of the buffer or at
		// the end of the request, whichever comes first.
		s := pos % buflen
		e := buflen
		if e-s > end-pos {
			e = s + end - pos
		}
		copy(op.Dst[pos-op.Offset:], fs.buf[s:e])
		// Advance by the number of bytes just copied. e is an index into the
		// buffer, not a file offset, so it must not be added to op.Offset;
		// doing so skipped the remainder of any read that wrapped around the
		// end of the buffer.
		pos += e - s
	}
	op.BytesRead = int(end - op.Offset)
	return nil
}
// VectoredRead serves the same data as ReadFile but without copying: it
// appends slices of the backing buffer to op.Data. The file is the backing
// buffer repeated end to end up to FILE_SIZE bytes, so a request that crosses
// the end of the buffer produces more than one slice.
//
// It returns io.EOF when op.Offset is beyond FILE_SIZE; otherwise it sets
// op.BytesRead to the total length of the appended slices and returns nil.
func (fs *readBenchFS) VectoredRead(
	ctx context.Context,
	op *fuseops.VectoredReadOp) error {
	if op.Offset > FILE_SIZE {
		return io.EOF
	}
	end := op.Offset + op.Size
	if end > FILE_SIZE {
		end = FILE_SIZE
	}
	buflen := int64(len(fs.buf))
	for pos := op.Offset; pos < end; {
		// [s, e) is the piece of the backing buffer that corresponds to the
		// file range starting at pos, cut off at the end of the buffer or at
		// the end of the request, whichever comes first.
		s := pos % buflen
		e := buflen
		if e-s > end-pos {
			e = s + end - pos
		}
		op.Data = append(op.Data, fs.buf[s:e])
		// Advance by the length of the slice just appended. e is an index
		// into the buffer, not a file offset, so it must not be added to
		// op.Offset; doing so dropped the remainder of any read that wrapped
		// around the end of the buffer while BytesRead still claimed it.
		pos += e - s
	}
	op.BytesRead = int(end - op.Offset)
	return nil
}
// ReleaseDirHandle is a no-op that always succeeds.
func (fs *readBenchFS) ReleaseDirHandle(ctx context.Context, op *fuseops.ReleaseDirHandleOp) error {
	return nil
}
// GetXattr succeeds without writing any attribute data into op.
func (fs *readBenchFS) GetXattr(ctx context.Context, op *fuseops.GetXattrOp) error {
	return nil
}
// ListXattr succeeds without writing any attribute names into op.
func (fs *readBenchFS) ListXattr(ctx context.Context, op *fuseops.ListXattrOp) error {
	return nil
}
// ForgetInode is a no-op that always succeeds.
func (fs *readBenchFS) ForgetInode(ctx context.Context, op *fuseops.ForgetInodeOp) error {
	return nil
}
// ReleaseFileHandle is a no-op that always succeeds.
func (fs *readBenchFS) ReleaseFileHandle(ctx context.Context, op *fuseops.ReleaseFileHandleOp) error {
	return nil
}
// FlushFile is a no-op that always succeeds.
func (fs *readBenchFS) FlushFile(ctx context.Context, op *fuseops.FlushFileOp) error {
	return nil
}

29
writev.go Normal file
View File

@ -0,0 +1,29 @@
package fuse
import (
"syscall"
"unsafe"
)
// writev writes the buffers in packet to fd with a single writev system
// call. Empty buffers are skipped. If nothing remains to be written (packet
// is empty or contains only empty buffers) it returns 0, nil without issuing
// a system call.
//
// n is the raw result of the system call converted to int; err is the
// syscall.Errno reported by the call, or nil on success.
func writev(fd int, packet [][]byte) (n int, err error) {
	iovecs := make([]syscall.Iovec, 0, len(packet))
	for _, v := range packet {
		// An empty slice has no first element to take the address of.
		if len(v) == 0 {
			continue
		}
		vec := syscall.Iovec{
			Base: &v[0],
		}
		vec.SetLen(len(v))
		iovecs = append(iovecs, vec)
	}
	// Guard the &iovecs[0] below, which would otherwise panic with an index
	// out of range when there is nothing to write.
	if len(iovecs) == 0 {
		return 0, nil
	}
	n1, _, e1 := syscall.Syscall(
		syscall.SYS_WRITEV,
		uintptr(fd), uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs)),
	)
	n = int(n1)
	if e1 != 0 {
		err = syscall.Errno(e1)
	}
	return n, err
}