Compare commits
6 Commits
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | e8004f04a5 | |
Vitaliy Filippov | 775aacf12c | |
Vitaliy Filippov | 2be4ecc37d | |
Vitaliy Filippov | ac82ada21e | |
Vitaliy Filippov | 694c1bf9db | |
Vitaliy Filippov | b5cbfcd8b6 |
|
@ -338,7 +338,7 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) {
|
|||
|
||||
// Loop past transient errors.
|
||||
for {
|
||||
// Attempt a reaed.
|
||||
// Attempt a read.
|
||||
err := m.Init(c.dev)
|
||||
|
||||
// Special cases:
|
||||
|
@ -405,7 +405,7 @@ func (c *Connection) ReadOp() (_ context.Context, op interface{}, _ error) {
|
|||
|
||||
// Convert the message to an op.
|
||||
outMsg := c.getOutMessage()
|
||||
op, err = convertInMessage(inMsg, outMsg, c.protocol)
|
||||
op, err = convertInMessage(&c.cfg, inMsg, outMsg, c.protocol)
|
||||
if err != nil {
|
||||
c.putOutMessage(outMsg)
|
||||
return nil, nil, fmt.Errorf("convertInMessage: %v", err)
|
||||
|
@ -485,8 +485,15 @@ func (c *Connection) Reply(ctx context.Context, opErr error) {
|
|||
outMsg := state.outMsg
|
||||
fuseID := inMsg.Header().Unique
|
||||
|
||||
suppressReuse := false
|
||||
if wr, ok := op.(*fuseops.WriteFileOp); ok {
|
||||
suppressReuse = wr.SuppressReuse
|
||||
}
|
||||
|
||||
// Make sure we destroy the messages when we're done.
|
||||
defer c.putInMessage(inMsg)
|
||||
if !suppressReuse {
|
||||
defer c.putInMessage(inMsg)
|
||||
}
|
||||
defer c.putOutMessage(outMsg)
|
||||
|
||||
// Clean up state for this op.
|
||||
|
@ -510,10 +517,16 @@ func (c *Connection) Reply(ctx context.Context, opErr error) {
|
|||
noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr)
|
||||
|
||||
if !noResponse {
|
||||
err := c.writeMessage(outMsg.Bytes())
|
||||
if err != nil && c.errorLogger != nil {
|
||||
c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.Bytes())
|
||||
var err error
|
||||
if outMsg.Sglist != nil {
|
||||
_, err = writev(int(c.dev.Fd()), outMsg.Sglist)
|
||||
} else {
|
||||
err = c.writeMessage(outMsg.OutHeaderBytes())
|
||||
}
|
||||
if err != nil && c.errorLogger != nil {
|
||||
c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.OutHeaderBytes())
|
||||
}
|
||||
outMsg.Sglist = nil
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ import (
|
|||
//
|
||||
// The caller is responsible for arranging for the message to be destroyed.
|
||||
func convertInMessage(
|
||||
config *MountConfig,
|
||||
inMsg *buffer.InMessage,
|
||||
outMsg *buffer.OutMessage,
|
||||
protocol fusekernel.Protocol) (o interface{}, err error) {
|
||||
|
@ -284,24 +285,28 @@ func convertInMessage(
|
|||
return nil, errors.New("Corrupt OpRead")
|
||||
}
|
||||
|
||||
to := &fuseops.ReadFileOp{
|
||||
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
|
||||
Handle: fuseops.HandleID(in.Fh),
|
||||
Offset: int64(in.Offset),
|
||||
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
|
||||
if !config.UseVectoredRead {
|
||||
// Use part of the incoming message storage as the read buffer
|
||||
buf := inMsg.GetFree(int(in.Size))
|
||||
to := &fuseops.ReadFileOp{
|
||||
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
|
||||
Handle: fuseops.HandleID(in.Fh),
|
||||
Offset: int64(in.Offset),
|
||||
Dst: buf,
|
||||
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
|
||||
}
|
||||
o = to
|
||||
} else {
|
||||
// Don't allocate any buffers when zero-copy is used
|
||||
to := &fuseops.VectoredReadOp{
|
||||
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
|
||||
Handle: fuseops.HandleID(in.Fh),
|
||||
Offset: int64(in.Offset),
|
||||
Size: int64(in.Size),
|
||||
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
|
||||
}
|
||||
o = to
|
||||
}
|
||||
o = to
|
||||
|
||||
readSize := int(in.Size)
|
||||
p := outMsg.GrowNoZero(readSize)
|
||||
if p == nil {
|
||||
return nil, fmt.Errorf("Can't grow for %d-byte read", readSize)
|
||||
}
|
||||
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst))
|
||||
sh.Data = uintptr(p)
|
||||
sh.Len = readSize
|
||||
sh.Cap = readSize
|
||||
|
||||
case fusekernel.OpReaddir:
|
||||
in := (*fusekernel.ReadIn)(inMsg.Consume(fusekernel.ReadInSize(protocol)))
|
||||
|
@ -318,7 +323,7 @@ func convertInMessage(
|
|||
o = to
|
||||
|
||||
readSize := int(in.Size)
|
||||
p := outMsg.GrowNoZero(readSize)
|
||||
p := outMsg.Grow(readSize)
|
||||
if p == nil {
|
||||
return nil, fmt.Errorf("Can't grow for %d-byte read", readSize)
|
||||
}
|
||||
|
@ -489,15 +494,19 @@ func convertInMessage(
|
|||
o = to
|
||||
|
||||
readSize := int(in.Size)
|
||||
p := outMsg.GrowNoZero(readSize)
|
||||
if p == nil {
|
||||
return nil, fmt.Errorf("Can't grow for %d-byte read", readSize)
|
||||
}
|
||||
if readSize > 0 {
|
||||
p := outMsg.Grow(readSize)
|
||||
if p == nil {
|
||||
return nil, fmt.Errorf("Can't grow for %d-byte read", readSize)
|
||||
}
|
||||
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst))
|
||||
sh.Data = uintptr(p)
|
||||
sh.Len = readSize
|
||||
sh.Cap = readSize
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst))
|
||||
sh.Data = uintptr(p)
|
||||
sh.Len = readSize
|
||||
sh.Cap = readSize
|
||||
} else {
|
||||
to.Dst = nil
|
||||
}
|
||||
|
||||
case fusekernel.OpListxattr:
|
||||
type input fusekernel.ListxattrIn
|
||||
|
@ -514,7 +523,7 @@ func convertInMessage(
|
|||
|
||||
readSize := int(in.Size)
|
||||
if readSize != 0 {
|
||||
p := outMsg.GrowNoZero(readSize)
|
||||
p := outMsg.Grow(readSize)
|
||||
if p == nil {
|
||||
return nil, fmt.Errorf("Can't grow for %d-byte read", readSize)
|
||||
}
|
||||
|
@ -718,9 +727,11 @@ func (c *Connection) kernelResponseForOp(
|
|||
}
|
||||
|
||||
case *fuseops.ReadFileOp:
|
||||
// convertInMessage already set up the destination buffer to be at the end
|
||||
// of the out message. We need only shrink to the right size based on how
|
||||
// much the user read.
|
||||
m.Append(o.Dst)
|
||||
m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead)
|
||||
|
||||
case *fuseops.VectoredReadOp:
|
||||
m.Append(o.Data...)
|
||||
m.ShrinkTo(buffer.OutMessageHeaderSize + o.BytesRead)
|
||||
|
||||
case *fuseops.WriteFileOp:
|
||||
|
|
5
debug.go
5
debug.go
|
@ -97,6 +97,11 @@ func describeRequest(op interface{}) (s string) {
|
|||
addComponent("offset %d", typed.Offset)
|
||||
addComponent("%d bytes", len(typed.Dst))
|
||||
|
||||
case *fuseops.VectoredReadOp:
|
||||
addComponent("handle %d", typed.Handle)
|
||||
addComponent("offset %d", typed.Offset)
|
||||
addComponent("%d bytes", typed.Size)
|
||||
|
||||
case *fuseops.WriteFileOp:
|
||||
addComponent("handle %d", typed.Handle)
|
||||
addComponent("offset %d", typed.Offset)
|
||||
|
|
|
@ -654,6 +654,37 @@ type ReadFileOp struct {
|
|||
OpContext OpContext
|
||||
}
|
||||
|
||||
// Vectored read - same as ReadFileOp, but the buffer isn't provided by the library.
|
||||
// The file system returns a list of slices instead.
|
||||
type VectoredReadOp struct {
|
||||
// The file inode that we are reading, and the handle previously returned by
|
||||
// CreateFile or OpenFile when opening that inode.
|
||||
Inode InodeID
|
||||
Handle HandleID
|
||||
|
||||
// The offset within the file at which to read.
|
||||
Offset int64
|
||||
|
||||
// The size of the read.
|
||||
Size int64
|
||||
|
||||
// Set by the file system: data to send back to the client.
|
||||
Data [][]byte
|
||||
|
||||
// Set by the file system: the number of bytes read.
|
||||
//
|
||||
// The FUSE documentation requires that exactly the requested number of bytes
|
||||
// be returned, except in the case of EOF or error (http://goo.gl/ZgfBkF).
|
||||
// This appears to be because it uses file mmapping machinery
|
||||
// (http://goo.gl/SGxnaN) to read a page at a time. It appears to understand
|
||||
// where EOF is by checking the inode size (http://goo.gl/0BkqKD), returned
|
||||
// by a previous call to LookUpInode, GetInodeAttributes, etc.
|
||||
//
|
||||
// If direct IO is enabled, semantics should match those of read(2).
|
||||
BytesRead int
|
||||
OpContext OpContext
|
||||
}
|
||||
|
||||
// Write data to a file previously opened with CreateFile or OpenFile.
|
||||
//
|
||||
// When the user writes data using write(2), the write goes into the page
|
||||
|
@ -710,7 +741,18 @@ type WriteFileOp struct {
|
|||
// be written, except on error (http://goo.gl/KUpwwn). This appears to be
|
||||
// because it uses file mmapping machinery (http://goo.gl/SGxnaN) to write a
|
||||
// page at a time.
|
||||
Data []byte
|
||||
Data []byte
|
||||
|
||||
// Set by the file system: "no reuse" flag.
|
||||
//
|
||||
// By default, the Data buffer is reused by the library, so the file system
|
||||
// must copy the data if it wants to use it later.
|
||||
//
|
||||
// However, if the file system sets this flag to true, the library doesn't
|
||||
// reuse this buffer, so the file system can safely store and use Data slice
|
||||
// without copying memory.
|
||||
SuppressReuse bool
|
||||
|
||||
OpContext OpContext
|
||||
}
|
||||
|
||||
|
|
|
@ -52,6 +52,7 @@ type FileSystem interface {
|
|||
ReleaseDirHandle(context.Context, *fuseops.ReleaseDirHandleOp) error
|
||||
OpenFile(context.Context, *fuseops.OpenFileOp) error
|
||||
ReadFile(context.Context, *fuseops.ReadFileOp) error
|
||||
VectoredRead(context.Context, *fuseops.VectoredReadOp) error
|
||||
WriteFile(context.Context, *fuseops.WriteFileOp) error
|
||||
SyncFile(context.Context, *fuseops.SyncFileOp) error
|
||||
FlushFile(context.Context, *fuseops.FlushFileOp) error
|
||||
|
@ -190,6 +191,9 @@ func (s *fileSystemServer) handleOp(
|
|||
case *fuseops.ReadFileOp:
|
||||
err = s.fs.ReadFile(ctx, typed)
|
||||
|
||||
case *fuseops.VectoredReadOp:
|
||||
err = s.fs.VectoredRead(ctx, typed)
|
||||
|
||||
case *fuseops.WriteFileOp:
|
||||
err = s.fs.WriteFile(ctx, typed)
|
||||
|
||||
|
|
|
@ -138,6 +138,12 @@ func (fs *NotImplementedFileSystem) ReadFile(
|
|||
return fuse.ENOSYS
|
||||
}
|
||||
|
||||
func (fs *NotImplementedFileSystem) VectoredRead(
|
||||
ctx context.Context,
|
||||
op *fuseops.VectoredReadOp) error {
|
||||
return fuse.ENOSYS
|
||||
}
|
||||
|
||||
func (fs *NotImplementedFileSystem) WriteFile(
|
||||
ctx context.Context,
|
||||
op *fuseops.WriteFileOp) error {
|
||||
|
|
|
@ -42,6 +42,7 @@ func init() {
|
|||
type InMessage struct {
|
||||
remaining []byte
|
||||
storage []byte
|
||||
size int
|
||||
}
|
||||
|
||||
// NewInMessage creates a new InMessage with its storage initialized.
|
||||
|
@ -66,6 +67,7 @@ func (m *InMessage) Init(r io.Reader) error {
|
|||
return fmt.Errorf("Unexpectedly read only %d bytes.", n)
|
||||
}
|
||||
|
||||
m.size = n
|
||||
m.remaining = m.storage[headerSize:n]
|
||||
|
||||
// Check the header's length.
|
||||
|
@ -114,3 +116,11 @@ func (m *InMessage) ConsumeBytes(n uintptr) []byte {
|
|||
|
||||
return b
|
||||
}
|
||||
|
||||
// Get the next n bytes after the message to use them as a temporary buffer
|
||||
func (m *InMessage) GetFree(n int) []byte {
|
||||
if n <= 0 || n > len(m.storage)-m.size {
|
||||
return nil
|
||||
}
|
||||
return m.storage[m.size : m.size+n]
|
||||
}
|
||||
|
|
|
@ -17,4 +17,4 @@ package buffer
|
|||
// The maximum fuse write request size that InMessage can acommodate.
|
||||
//
|
||||
// As of kernel 4.20 Linux accepts writes up to 256 pages or 1MiB
|
||||
const MaxWriteSize = 1 << 20
|
||||
const MaxWriteSize = 1 << 17
|
||||
|
|
|
@ -16,7 +16,6 @@ package buffer
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"reflect"
|
||||
"unsafe"
|
||||
|
||||
|
@ -33,30 +32,15 @@ const OutMessageHeaderSize = int(unsafe.Sizeof(fusekernel.OutHeader{}))
|
|||
//
|
||||
// Must be initialized with Reset.
|
||||
type OutMessage struct {
|
||||
// The offset into payload to which we're currently writing.
|
||||
payloadOffset int
|
||||
|
||||
header fusekernel.OutHeader
|
||||
payload [MaxReadSize]byte
|
||||
}
|
||||
|
||||
// Make sure that the header and payload are contiguous.
|
||||
func init() {
|
||||
a := unsafe.Offsetof(OutMessage{}.header) + uintptr(OutMessageHeaderSize)
|
||||
b := unsafe.Offsetof(OutMessage{}.payload)
|
||||
|
||||
if a != b {
|
||||
log.Panicf(
|
||||
"header ends at offset %d, but payload starts at offset %d",
|
||||
a, b)
|
||||
}
|
||||
header fusekernel.OutHeader
|
||||
Sglist [][]byte
|
||||
}
|
||||
|
||||
// Reset resets m so that it's ready to be used again. Afterward, the contents
|
||||
// are solely a zeroed fusekernel.OutHeader struct.
|
||||
func (m *OutMessage) Reset() {
|
||||
m.payloadOffset = 0
|
||||
m.header = fusekernel.OutHeader{}
|
||||
m.Sglist = nil
|
||||
}
|
||||
|
||||
// OutHeader returns a pointer to the header at the start of the message.
|
||||
|
@ -64,30 +48,12 @@ func (m *OutMessage) OutHeader() *fusekernel.OutHeader {
|
|||
return &m.header
|
||||
}
|
||||
|
||||
// Grow grows m's buffer by the given number of bytes, returning a pointer to
|
||||
// the start of the new segment, which is guaranteed to be zeroed. If there is
|
||||
// insufficient space, it returns nil.
|
||||
// Grow adds a new buffer of <n> bytes to the message, returning a pointer to
|
||||
// the start of the new segment, which is guaranteed to be zeroed.
|
||||
func (m *OutMessage) Grow(n int) unsafe.Pointer {
|
||||
p := m.GrowNoZero(n)
|
||||
if p != nil {
|
||||
jacobsa_fuse_memclr(p, uintptr(n))
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// GrowNoZero is equivalent to Grow, except the new segment is not zeroed. Use
|
||||
// with caution!
|
||||
func (m *OutMessage) GrowNoZero(n int) unsafe.Pointer {
|
||||
// Will we overflow the buffer?
|
||||
o := m.payloadOffset
|
||||
if len(m.payload)-o < n {
|
||||
return nil
|
||||
}
|
||||
|
||||
p := unsafe.Pointer(uintptr(unsafe.Pointer(&m.payload)) + uintptr(o))
|
||||
m.payloadOffset = o + n
|
||||
|
||||
b := make([]byte, n)
|
||||
m.Append(b)
|
||||
p := unsafe.Pointer(&b[0])
|
||||
return p
|
||||
}
|
||||
|
||||
|
@ -100,51 +66,62 @@ func (m *OutMessage) ShrinkTo(n int) {
|
|||
n,
|
||||
m.Len()))
|
||||
}
|
||||
|
||||
m.payloadOffset = n - OutMessageHeaderSize
|
||||
if n == OutMessageHeaderSize {
|
||||
m.Sglist = nil
|
||||
} else {
|
||||
i := 1
|
||||
n -= OutMessageHeaderSize
|
||||
for len(m.Sglist) > i && n >= len(m.Sglist[i]) {
|
||||
n -= len(m.Sglist[i])
|
||||
i++
|
||||
}
|
||||
if n > 0 {
|
||||
m.Sglist[i] = m.Sglist[i][0:n]
|
||||
i++
|
||||
}
|
||||
m.Sglist = m.Sglist[0:i]
|
||||
}
|
||||
}
|
||||
|
||||
// Append is equivalent to growing by len(src), then copying src over the new
|
||||
// segment. Int panics if there is not enough room available.
|
||||
func (m *OutMessage) Append(src []byte) {
|
||||
p := m.GrowNoZero(len(src))
|
||||
if p == nil {
|
||||
panic(fmt.Sprintf("Can't grow %d bytes", len(src)))
|
||||
func (m *OutMessage) Append(src ...[]byte) {
|
||||
if m.Sglist == nil {
|
||||
// First element of Sglist is pre-filled with a pointer to the header
|
||||
// to allow sending it with a single writev() call without copying the
|
||||
// slice again
|
||||
m.Sglist = append(m.Sglist, m.OutHeaderBytes())
|
||||
}
|
||||
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&src))
|
||||
jacobsa_fuse_memmove(p, unsafe.Pointer(sh.Data), uintptr(sh.Len))
|
||||
|
||||
m.Sglist = append(m.Sglist, src...)
|
||||
return
|
||||
}
|
||||
|
||||
// AppendString is like Append, but accepts string input.
|
||||
func (m *OutMessage) AppendString(src string) {
|
||||
p := m.GrowNoZero(len(src))
|
||||
if p == nil {
|
||||
panic(fmt.Sprintf("Can't grow %d bytes", len(src)))
|
||||
}
|
||||
|
||||
sh := (*reflect.StringHeader)(unsafe.Pointer(&src))
|
||||
jacobsa_fuse_memmove(p, unsafe.Pointer(sh.Data), uintptr(sh.Len))
|
||||
|
||||
m.Append([]byte(src))
|
||||
return
|
||||
}
|
||||
|
||||
// Len returns the current size of the message, including the leading header.
|
||||
func (m *OutMessage) Len() int {
|
||||
return OutMessageHeaderSize + m.payloadOffset
|
||||
if m.Sglist == nil {
|
||||
return OutMessageHeaderSize
|
||||
}
|
||||
// First element of Sglist is the header, so we don't need to count it here
|
||||
r := 0
|
||||
for _, b := range m.Sglist {
|
||||
r += len(b)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// Bytes returns a reference to the current contents of the buffer, including
|
||||
// the leading header.
|
||||
func (m *OutMessage) Bytes() []byte {
|
||||
l := m.Len()
|
||||
// OutHeaderBytes returns a byte slice containing the current header.
|
||||
func (m *OutMessage) OutHeaderBytes() []byte {
|
||||
l := OutMessageHeaderSize
|
||||
sh := reflect.SliceHeader{
|
||||
Data: uintptr(unsafe.Pointer(&m.header)),
|
||||
Len: l,
|
||||
Cap: l,
|
||||
}
|
||||
|
||||
return *(*[]byte)(unsafe.Pointer(&sh))
|
||||
}
|
||||
|
|
|
@ -107,9 +107,12 @@ func TestOutMessageAppend(t *testing.T) {
|
|||
t.Errorf("om.Len() = %d, want %d", got, want)
|
||||
}
|
||||
|
||||
b := om.Bytes()
|
||||
b := []byte(nil)
|
||||
for i := 0; i < len(om.Sglist); i++ {
|
||||
b = append(b, om.Sglist[i]...)
|
||||
}
|
||||
if got, want := len(b), wantLen; got != want {
|
||||
t.Fatalf("len(om.Bytes()) = %d, want %d", got, want)
|
||||
t.Fatalf("len(om.OutHeaderBytes()) = %d, want %d", got, want)
|
||||
}
|
||||
|
||||
want := append(
|
||||
|
@ -137,9 +140,12 @@ func TestOutMessageAppendString(t *testing.T) {
|
|||
t.Errorf("om.Len() = %d, want %d", got, want)
|
||||
}
|
||||
|
||||
b := om.Bytes()
|
||||
b := []byte(nil)
|
||||
for i := 0; i < len(om.Sglist); i++ {
|
||||
b = append(b, om.Sglist[i]...)
|
||||
}
|
||||
if got, want := len(b), wantLen; got != want {
|
||||
t.Fatalf("len(om.Bytes()) = %d, want %d", got, want)
|
||||
t.Fatalf("len(om.OutHeaderBytes()) = %d, want %d", got, want)
|
||||
}
|
||||
|
||||
want := append(
|
||||
|
@ -168,9 +174,12 @@ func TestOutMessageShrinkTo(t *testing.T) {
|
|||
t.Errorf("om.Len() = %d, want %d", got, want)
|
||||
}
|
||||
|
||||
b := om.Bytes()
|
||||
b := []byte(nil)
|
||||
for i := 0; i < len(om.Sglist); i++ {
|
||||
b = append(b, om.Sglist[i]...)
|
||||
}
|
||||
if got, want := len(b), wantLen; got != want {
|
||||
t.Fatalf("len(om.Bytes()) = %d, want %d", got, want)
|
||||
t.Fatalf("len(om.OutHeaderBytes()) = %d, want %d", got, want)
|
||||
}
|
||||
|
||||
want := append(
|
||||
|
@ -201,7 +210,7 @@ func TestOutMessageHeader(t *testing.T) {
|
|||
*h = want
|
||||
|
||||
// Check that the result is as expected.
|
||||
b := om.Bytes()
|
||||
b := om.OutHeaderBytes()
|
||||
if len(b) != int(unsafe.Sizeof(want)) {
|
||||
t.Fatalf("unexpected length %d; want %d", len(b), unsafe.Sizeof(want))
|
||||
}
|
||||
|
@ -225,9 +234,7 @@ func TestOutMessageReset(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure a non-zero payload length.
|
||||
if p := om.GrowNoZero(128); p == nil {
|
||||
t.Fatal("GrowNoZero failed")
|
||||
}
|
||||
p := om.Grow(128)
|
||||
|
||||
// Reset.
|
||||
om.Reset()
|
||||
|
@ -259,10 +266,7 @@ func TestOutMessageGrow(t *testing.T) {
|
|||
// Set up garbage where the payload will soon be.
|
||||
const payloadSize = 1234
|
||||
{
|
||||
p := om.GrowNoZero(payloadSize)
|
||||
if p == nil {
|
||||
t.Fatal("GrowNoZero failed")
|
||||
}
|
||||
p := om.Grow(payloadSize)
|
||||
|
||||
err := fillWithGarbage(p, payloadSize)
|
||||
if err != nil {
|
||||
|
@ -283,7 +287,10 @@ func TestOutMessageGrow(t *testing.T) {
|
|||
t.Errorf("om.Len() = %d, want %d", got, want)
|
||||
}
|
||||
|
||||
b := om.Bytes()
|
||||
b := []byte(nil)
|
||||
for i := 0; i < len(om.Sglist); i++ {
|
||||
b = append(b, om.Sglist[i]...)
|
||||
}
|
||||
if got, want := len(b), wantLen; got != want {
|
||||
t.Fatalf("len(om.Len()) = %d, want %d", got, want)
|
||||
}
|
||||
|
@ -304,7 +311,7 @@ func BenchmarkOutMessageReset(b *testing.B) {
|
|||
om.Reset()
|
||||
}
|
||||
|
||||
b.SetBytes(int64(unsafe.Offsetof(om.payload)))
|
||||
b.SetBytes(int64(om.Len()))
|
||||
})
|
||||
|
||||
// Many megabytes worth of buffers, which should defeat the CPU cache.
|
||||
|
@ -321,7 +328,7 @@ func BenchmarkOutMessageReset(b *testing.B) {
|
|||
oms[i%numMessages].Reset()
|
||||
}
|
||||
|
||||
b.SetBytes(int64(unsafe.Offsetof(oms[0].payload)))
|
||||
b.SetBytes(int64(oms[0].Len()))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
19
mount.go
19
mount.go
|
@ -20,6 +20,7 @@ import (
|
|||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
|
@ -71,6 +72,9 @@ func Mount(
|
|||
if cfgCopy.OpContext == nil {
|
||||
cfgCopy.OpContext = context.Background()
|
||||
}
|
||||
if cfgCopy.ReaderThreads < 1 {
|
||||
cfgCopy.ReaderThreads = 1
|
||||
}
|
||||
|
||||
// Create a Connection object wrapping the device.
|
||||
connection, err := newConnection(
|
||||
|
@ -83,11 +87,16 @@ func Mount(
|
|||
}
|
||||
|
||||
// Serve the connection in the background. When done, set the join status.
|
||||
go func() {
|
||||
server.ServeOps(connection)
|
||||
mfs.joinStatus = connection.close()
|
||||
close(mfs.joinStatusAvailable)
|
||||
}()
|
||||
atomic.AddInt64(&mfs.joinRemaining, int64(cfgCopy.ReaderThreads))
|
||||
for i := 0; i < cfgCopy.ReaderThreads; i++ {
|
||||
go func() {
|
||||
server.ServeOps(connection)
|
||||
if atomic.AddInt64(&mfs.joinRemaining, -1) == 0 {
|
||||
mfs.joinStatus = connection.close()
|
||||
close(mfs.joinStatusAvailable)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait for the mount process to complete.
|
||||
if err := <-ready; err != nil {
|
||||
|
|
|
@ -156,6 +156,16 @@ type MountConfig struct {
|
|||
// actually utilise any form of qualifiable UNIX permissions.
|
||||
DisableDefaultPermissions bool
|
||||
|
||||
// Use VectoredReadOp instead of ReadFileOp.
|
||||
// Vectored read allows file systems to reduce memory copying overhead if
|
||||
// the data is already in memory when they return it to FUSE.
|
||||
UseVectoredRead bool
|
||||
|
||||
// Number of goroutines (and hopefully threads) to use for reading from
|
||||
// the FUSE file descriptor. You can try to use more than 1 if memory
|
||||
// copying during write operations is a bottleneck for you
|
||||
ReaderThreads int
|
||||
|
||||
// OS X only.
|
||||
//
|
||||
// The name of the mounted volume, as displayed in the Finder. If empty, a
|
||||
|
|
|
@ -23,6 +23,7 @@ type MountedFileSystem struct {
|
|||
|
||||
// The result to return from Join. Not valid until the channel is closed.
|
||||
joinStatus error
|
||||
joinRemaining int64
|
||||
joinStatusAvailable chan struct{}
|
||||
}
|
||||
|
||||
|
|
|
@ -368,6 +368,8 @@ func (fs *cachingFS) ReadFile(
|
|||
ctx context.Context,
|
||||
op *fuseops.ReadFileOp) error {
|
||||
var err error
|
||||
op.BytesRead, err = io.ReadFull(rand.Reader, op.Dst)
|
||||
dst := make([]byte, op.Size)
|
||||
op.BytesRead, err = io.ReadFull(rand.Reader, dst)
|
||||
op.Data = [][]byte{dst}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -246,7 +246,8 @@ func (fs *dynamicFS) ReadFile(
|
|||
}
|
||||
reader := strings.NewReader(contents)
|
||||
var err error
|
||||
op.BytesRead, err = reader.ReadAt(op.Dst, op.Offset)
|
||||
op.Data = [][]byte{ make([]byte, op.Size) }
|
||||
op.BytesRead, err = reader.ReadAt(op.Data[0], op.Offset)
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ import (
|
|||
"github.com/jacobsa/fuse/fuseutil"
|
||||
)
|
||||
|
||||
const FooContents = "xxxx"
|
||||
var FooContents = []byte("xxxx")
|
||||
|
||||
const fooInodeID = fuseops.RootInodeID + 1
|
||||
|
||||
|
@ -171,7 +171,8 @@ func (fs *errorFS) ReadFile(
|
|||
return fmt.Errorf("Unexpected request: %#v", op)
|
||||
}
|
||||
|
||||
op.BytesRead = copy(op.Dst, FooContents)
|
||||
op.Data = [][]byte{FooContents}
|
||||
op.BytesRead = len(FooContents)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -196,7 +196,12 @@ func (fs *flushFS) ReadFile(
|
|||
}
|
||||
|
||||
// Read what we can.
|
||||
op.BytesRead = copy(op.Dst, fs.fooContents[op.Offset:])
|
||||
end := op.Offset+op.Size
|
||||
if end > int64(len(fs.fooContents)) {
|
||||
end = int64(len(fs.fooContents))
|
||||
}
|
||||
op.Data = [][]byte{ fs.fooContents[op.Offset : end] }
|
||||
op.BytesRead = int(end-op.Offset)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -250,7 +250,8 @@ func (fs *helloFS) ReadFile(
|
|||
reader := strings.NewReader("Hello, world!")
|
||||
|
||||
var err error
|
||||
op.BytesRead, err = reader.ReadAt(op.Dst, op.Offset)
|
||||
op.Data = [][]byte{ make([]byte, op.Size) }
|
||||
op.BytesRead, err = reader.ReadAt(op.Data[0], op.Offset)
|
||||
|
||||
// Special case: FUSE doesn't expect us to return io.EOF.
|
||||
if err == io.EOF {
|
||||
|
|
|
@ -692,7 +692,8 @@ func (fs *memFS) ReadFile(
|
|||
|
||||
// Serve the request.
|
||||
var err error
|
||||
op.BytesRead, err = inode.ReadAt(op.Dst, op.Offset)
|
||||
op.Data = [][]byte{ make([]byte, op.Size) }
|
||||
op.BytesRead, err = inode.ReadAt(op.Data[0], op.Offset)
|
||||
|
||||
// Don't return EOF errors; we just indicate EOF to fuse using a short read.
|
||||
if err == io.EOF {
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"github.com/jacobsa/fuse"
|
||||
"github.com/jacobsa/fuse/samples/readbenchfs"
|
||||
"log"
|
||||
"os"
|
||||
)
|
||||
|
||||
var fMountPoint = flag.String("mount_point", "", "Path to mount point.")
|
||||
var fReadOnly = flag.Bool("read_only", false, "Mount in read-only mode.")
|
||||
var fVectored = flag.Bool("vectored", false, "Use vectored read.")
|
||||
var fDebug = flag.Bool("debug", false, "Enable debug logging.")
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
server, err := readbenchfs.NewReadBenchServer()
|
||||
if err != nil {
|
||||
log.Fatalf("makeFS: %v", err)
|
||||
}
|
||||
|
||||
// Mount the file system.
|
||||
if *fMountPoint == "" {
|
||||
log.Fatalf("You must set --mount_point.")
|
||||
}
|
||||
|
||||
cfg := &fuse.MountConfig{
|
||||
ReadOnly: *fReadOnly,
|
||||
UseVectoredRead: *fVectored,
|
||||
}
|
||||
|
||||
if *fDebug {
|
||||
cfg.DebugLogger = log.New(os.Stderr, "fuse: ", 0)
|
||||
}
|
||||
|
||||
mfs, err := fuse.Mount(*fMountPoint, server, cfg)
|
||||
if err != nil {
|
||||
log.Fatalf("Mount: %v", err)
|
||||
}
|
||||
|
||||
// Wait for it to be unmounted.
|
||||
if err = mfs.Join(context.Background()); err != nil {
|
||||
log.Fatalf("Join: %v", err)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,186 @@
|
|||
// Copyright 2021 Vitaliy Filippov
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package readbenchfs
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
|
||||
"github.com/jacobsa/fuse"
|
||||
"github.com/jacobsa/fuse/fuseops"
|
||||
"github.com/jacobsa/fuse/fuseutil"
|
||||
)
|
||||
|
||||
type readBenchFS struct {
|
||||
fuseutil.NotImplementedFileSystem
|
||||
buf []byte
|
||||
}
|
||||
|
||||
// 1 TB
|
||||
const fileSize = 1024 * 1024 * 1024 * 1024
|
||||
|
||||
var _ fuseutil.FileSystem = &readBenchFS{}
|
||||
|
||||
func NewReadBenchServer() (server fuse.Server, err error) {
|
||||
// 1 GB of random data to exceed CPU cache
|
||||
buf := make([]byte, 1024*1024*1024)
|
||||
rand.Read(buf)
|
||||
server = fuseutil.NewFileSystemServer(&readBenchFS{
|
||||
buf: buf,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) StatFS(ctx context.Context, op *fuseops.StatFSOp) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error {
|
||||
if op.Name == "test" {
|
||||
op.Entry = fuseops.ChildInodeEntry{
|
||||
Child: 2,
|
||||
Attributes: fuseops.InodeAttributes{
|
||||
Size: fileSize,
|
||||
Nlink: 1,
|
||||
Mode: 0444,
|
||||
},
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return fuse.ENOENT
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) error {
|
||||
if op.Inode == 1 {
|
||||
op.Attributes = fuseops.InodeAttributes{
|
||||
Nlink: 1,
|
||||
Mode: 0755 | os.ModeDir,
|
||||
}
|
||||
return nil
|
||||
} else if op.Inode == 2 {
|
||||
op.Attributes = fuseops.InodeAttributes{
|
||||
Size: fileSize,
|
||||
Nlink: 1,
|
||||
Mode: 0444,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return fuse.ENOENT
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) OpenDir(ctx context.Context, op *fuseops.OpenDirOp) error {
|
||||
// Allow opening any directory.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error {
|
||||
if op.Inode != 1 {
|
||||
return fuse.ENOENT
|
||||
}
|
||||
if op.Offset > 0 {
|
||||
return nil
|
||||
}
|
||||
entries := []fuseutil.Dirent{
|
||||
fuseutil.Dirent{
|
||||
Offset: 1,
|
||||
Inode: 2,
|
||||
Name: "test",
|
||||
Type: fuseutil.DT_File,
|
||||
},
|
||||
}
|
||||
for _, e := range entries[op.Offset:] {
|
||||
n := fuseutil.WriteDirent(op.Dst[op.BytesRead:], e)
|
||||
if n == 0 {
|
||||
break
|
||||
}
|
||||
op.BytesRead += n
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error {
|
||||
// Allow opening any file.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) error {
|
||||
if op.Offset > fileSize {
|
||||
return io.EOF
|
||||
}
|
||||
end := op.Offset + int64(len(op.Dst))
|
||||
if end > fileSize {
|
||||
end = fileSize
|
||||
}
|
||||
buflen := int64(len(fs.buf))
|
||||
for pos := op.Offset; pos < end; {
|
||||
s := pos % buflen
|
||||
e := buflen
|
||||
if e-s > end-pos {
|
||||
e = s + end - pos
|
||||
}
|
||||
copy(op.Dst[pos-op.Offset:], fs.buf[s:])
|
||||
pos = op.Offset + e
|
||||
}
|
||||
op.BytesRead = int(end - op.Offset)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) VectoredRead(ctx context.Context, op *fuseops.VectoredReadOp) error {
|
||||
if op.Offset > fileSize {
|
||||
return io.EOF
|
||||
}
|
||||
end := op.Offset + op.Size
|
||||
if end > fileSize {
|
||||
end = fileSize
|
||||
}
|
||||
buflen := int64(len(fs.buf))
|
||||
for pos := op.Offset; pos < end; {
|
||||
s := pos % buflen
|
||||
e := buflen
|
||||
if e-s > end-pos {
|
||||
e = s + end - pos
|
||||
}
|
||||
op.Data = append(op.Data, fs.buf[s:e])
|
||||
pos = op.Offset + e
|
||||
}
|
||||
op.BytesRead = int(end - op.Offset)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) ReleaseDirHandle(ctx context.Context, op *fuseops.ReleaseDirHandleOp) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) GetXattr(ctx context.Context, op *fuseops.GetXattrOp) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) ListXattr(ctx context.Context, op *fuseops.ListXattrOp) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) ForgetInode(ctx context.Context, op *fuseops.ForgetInodeOp) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) ReleaseFileHandle(ctx context.Context, op *fuseops.ReleaseFileHandleOp) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *readBenchFS) FlushFile(ctx context.Context, op *fuseops.FlushFileOp) error {
|
||||
return nil
|
||||
}
|
|
@ -160,8 +160,13 @@ func (fs *readonlyLoopbackFs) ReadFile(
|
|||
return fuse.EIO
|
||||
}
|
||||
|
||||
contents = contents[op.Offset:]
|
||||
op.BytesRead = copy(op.Dst, contents)
|
||||
end := op.Offset+op.Size
|
||||
if end > int64(len(contents)) {
|
||||
end = int64(len(contents))
|
||||
}
|
||||
|
||||
op.Data = [][]byte{ contents[op.Offset : end] }
|
||||
op.BytesRead = int(end-op.Offset)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
package fuse
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
func writev(fd int, packet [][]byte) (n int, err error) {
|
||||
iovecs := make([]syscall.Iovec, 0, len(packet))
|
||||
for _, v := range packet {
|
||||
if len(v) == 0 {
|
||||
continue
|
||||
}
|
||||
vec := syscall.Iovec{
|
||||
Base: &v[0],
|
||||
}
|
||||
vec.SetLen(len(v))
|
||||
iovecs = append(iovecs, vec)
|
||||
}
|
||||
n1, _, e1 := syscall.Syscall(
|
||||
syscall.SYS_WRITEV,
|
||||
uintptr(fd), uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs)),
|
||||
)
|
||||
n = int(n1)
|
||||
if e1 != 0 {
|
||||
err = syscall.Errno(e1)
|
||||
}
|
||||
return
|
||||
}
|
Loading…
Reference in New Issue