From bbb262ee48256894f8b3cb496554cda9c725206d Mon Sep 17 00:00:00 2001 From: Aaron Jacobs Date: Wed, 29 Jul 2015 11:02:29 +1000 Subject: [PATCH] Read directly into out messages for files. --- connection.go | 7 +-- conversions.go | 110 +++++++++++++++++++-------------- internal/buffer/out_message.go | 16 ++++- 3 files changed, 81 insertions(+), 52 deletions(-) diff --git a/connection.go b/connection.go index d888e68..9c3ce74 100644 --- a/connection.go +++ b/connection.go @@ -378,8 +378,10 @@ func (c *Connection) ReadOp() (ctx context.Context, op interface{}, err error) { } // Convert the message to an op. - op, err = convertInMessage(inMsg, c.protocol) + outMsg := c.getOutMessage() + op, err = convertInMessage(inMsg, outMsg, c.protocol) if err != nil { + c.putOutMessage(outMsg) err = fmt.Errorf("convertInMessage: %v", err) return } @@ -396,9 +398,6 @@ func (c *Connection) ReadOp() (ctx context.Context, op interface{}, err error) { continue } - // Allocate an output message up front, to be used later when replying. - outMsg := c.getOutMessage() - // Set up a context that remembers information about this op. ctx = c.beginOp(inMsg.Header().Opcode, inMsg.Header().Unique) ctx = context.WithValue(ctx, contextKey, opState{inMsg, outMsg, op, opID}) diff --git a/conversions.go b/conversions.go index 558d1d1..3fd23bb 100644 --- a/conversions.go +++ b/conversions.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "os" + "reflect" "syscall" "time" "unsafe" @@ -37,11 +38,12 @@ import ( // // The caller is responsible for arranging for the message to be destroyed. func convertInMessage( - m *buffer.InMessage, + inMsg *buffer.InMessage, + outMsg *buffer.OutMessage, protocol fusekernel.Protocol) (o interface{}, err error) { - switch m.Header().Opcode { + switch inMsg.Header().Opcode { case fusekernel.OpLookup: - buf := m.ConsumeBytes(m.Len()) + buf := inMsg.ConsumeBytes(inMsg.Len()) n := len(buf) if n == 0 || buf[n-1] != '\x00' { err = errors.New("Corrupt OpLookup") @@ -49,25 +51,25 @@ func convertInMessage( } o = &fuseops.LookUpInodeOp{ - Parent: fuseops.InodeID(m.Header().Nodeid), + Parent: fuseops.InodeID(inMsg.Header().Nodeid), Name: string(buf[:n-1]), } case fusekernel.OpGetattr: o = &fuseops.GetInodeAttributesOp{ - Inode: fuseops.InodeID(m.Header().Nodeid), + Inode: fuseops.InodeID(inMsg.Header().Nodeid), } case fusekernel.OpSetattr: type input fusekernel.SetattrIn - in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) if in == nil { err = errors.New("Corrupt OpSetattr") return } to := &fuseops.SetInodeAttributesOp{ - Inode: fuseops.InodeID(m.Header().Nodeid), + Inode: fuseops.InodeID(inMsg.Header().Nodeid), } o = to @@ -93,25 +95,25 @@ func convertInMessage( case fusekernel.OpForget: type input fusekernel.ForgetIn - in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) if in == nil { err = errors.New("Corrupt OpForget") return } o = &fuseops.ForgetInodeOp{ - Inode: fuseops.InodeID(m.Header().Nodeid), + Inode: fuseops.InodeID(inMsg.Header().Nodeid), N: in.Nlookup, } case fusekernel.OpMkdir: - in := (*fusekernel.MkdirIn)(m.Consume(fusekernel.MkdirInSize(protocol))) + in := (*fusekernel.MkdirIn)(inMsg.Consume(fusekernel.MkdirInSize(protocol))) if in == nil { err = errors.New("Corrupt OpMkdir") return } - name := m.ConsumeBytes(m.Len()) + name := inMsg.ConsumeBytes(inMsg.Len()) i := bytes.IndexByte(name, '\x00') if i < 0 { err = errors.New("Corrupt OpMkdir") @@ -120,7 +122,7 @@ func convertInMessage( name = name[:i] o = &fuseops.MkDirOp{ - Parent: fuseops.InodeID(m.Header().Nodeid), + Parent: fuseops.InodeID(inMsg.Header().Nodeid), Name: string(name), // On Linux, vfs_mkdir calls through to the inode with at most @@ -133,13 +135,13 @@ func convertInMessage( } case fusekernel.OpCreate: - in := (*fusekernel.CreateIn)(m.Consume(fusekernel.CreateInSize(protocol))) + in := (*fusekernel.CreateIn)(inMsg.Consume(fusekernel.CreateInSize(protocol))) if in == nil { err = errors.New("Corrupt OpCreate") return } - name := m.ConsumeBytes(m.Len()) + name := inMsg.ConsumeBytes(inMsg.Len()) i := bytes.IndexByte(name, '\x00') if i < 0 { err = errors.New("Corrupt OpCreate") @@ -148,14 +150,14 @@ func convertInMessage( name = name[:i] o = &fuseops.CreateFileOp{ - Parent: fuseops.InodeID(m.Header().Nodeid), + Parent: fuseops.InodeID(inMsg.Header().Nodeid), Name: string(name), Mode: convertFileMode(in.Mode), } case fusekernel.OpSymlink: // The message is "newName\0target\0". - names := m.ConsumeBytes(m.Len()) + names := inMsg.ConsumeBytes(inMsg.Len()) if len(names) == 0 || names[len(names)-1] != 0 { err = errors.New("Corrupt OpSymlink") return @@ -168,20 +170,20 @@ func convertInMessage( newName, target := names[0:i], names[i+1:len(names)-1] o = &fuseops.CreateSymlinkOp{ - Parent: fuseops.InodeID(m.Header().Nodeid), + Parent: fuseops.InodeID(inMsg.Header().Nodeid), Name: string(newName), Target: string(target), } case fusekernel.OpRename: type input fusekernel.RenameIn - in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) if in == nil { err = errors.New("Corrupt OpRename") return } - names := m.ConsumeBytes(m.Len()) + names := inMsg.ConsumeBytes(inMsg.Len()) // names should be "old\x00new\x00" if len(names) < 4 { err = errors.New("Corrupt OpRename") @@ -199,14 +201,14 @@ func convertInMessage( oldName, newName := names[:i], names[i+1:len(names)-1] o = &fuseops.RenameOp{ - OldParent: fuseops.InodeID(m.Header().Nodeid), + OldParent: fuseops.InodeID(inMsg.Header().Nodeid), OldName: string(oldName), NewParent: fuseops.InodeID(in.Newdir), NewName: string(newName), } case fusekernel.OpUnlink: - buf := m.ConsumeBytes(m.Len()) + buf := inMsg.ConsumeBytes(inMsg.Len()) n := len(buf) if n == 0 || buf[n-1] != '\x00' { err = errors.New("Corrupt OpUnlink") @@ -214,12 +216,12 @@ func convertInMessage( } o = &fuseops.UnlinkOp{ - Parent: fuseops.InodeID(m.Header().Nodeid), + Parent: fuseops.InodeID(inMsg.Header().Nodeid), Name: string(buf[:n-1]), } case fusekernel.OpRmdir: - buf := m.ConsumeBytes(m.Len()) + buf := inMsg.ConsumeBytes(inMsg.Len()) n := len(buf) if n == 0 || buf[n-1] != '\x00' { err = errors.New("Corrupt OpRmdir") @@ -227,43 +229,56 @@ func convertInMessage( } o = &fuseops.RmDirOp{ - Parent: fuseops.InodeID(m.Header().Nodeid), + Parent: fuseops.InodeID(inMsg.Header().Nodeid), Name: string(buf[:n-1]), } case fusekernel.OpOpen: o = &fuseops.OpenFileOp{ - Inode: fuseops.InodeID(m.Header().Nodeid), + Inode: fuseops.InodeID(inMsg.Header().Nodeid), } case fusekernel.OpOpendir: o = &fuseops.OpenDirOp{ - Inode: fuseops.InodeID(m.Header().Nodeid), + Inode: fuseops.InodeID(inMsg.Header().Nodeid), } case fusekernel.OpRead: - in := (*fusekernel.ReadIn)(m.Consume(fusekernel.ReadInSize(protocol))) + in := (*fusekernel.ReadIn)(inMsg.Consume(fusekernel.ReadInSize(protocol))) if in == nil { err = errors.New("Corrupt OpRead") return } - o = &fuseops.ReadFileOp{ - Inode: fuseops.InodeID(m.Header().Nodeid), + to := &fuseops.ReadFileOp{ + Inode: fuseops.InodeID(inMsg.Header().Nodeid), Handle: fuseops.HandleID(in.Fh), Offset: int64(in.Offset), Dst: make([]byte, in.Size), } + o = to + + readSize := int(in.Size) + p := outMsg.GrowNoZero(uintptr(readSize)) + if p == nil { + err = fmt.Errorf("Can't grow for %d-byte read", readSize) + return + } + + sh := (*reflect.SliceHeader)(unsafe.Pointer(&to.Dst)) + sh.Data = uintptr(p) + sh.Len = readSize + sh.Cap = readSize case fusekernel.OpReaddir: - in := (*fusekernel.ReadIn)(m.Consume(fusekernel.ReadInSize(protocol))) + in := (*fusekernel.ReadIn)(inMsg.Consume(fusekernel.ReadInSize(protocol))) if in == nil { err = errors.New("Corrupt OpReaddir") return } o = &fuseops.ReadDirOp{ - Inode: fuseops.InodeID(m.Header().Nodeid), + Inode: fuseops.InodeID(inMsg.Header().Nodeid), Handle: fuseops.HandleID(in.Fh), Offset: fuseops.DirOffset(in.Offset), Dst: make([]byte, in.Size), @@ -271,7 +286,7 @@ func convertInMessage( case fusekernel.OpRelease: type input fusekernel.ReleaseIn - in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) if in == nil { err = errors.New("Corrupt OpRelease") return @@ -283,7 +298,7 @@ func convertInMessage( case fusekernel.OpReleasedir: type input fusekernel.ReleaseIn - in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) if in == nil { err = errors.New("Corrupt OpReleasedir") return @@ -294,20 +309,20 @@ func convertInMessage( } case fusekernel.OpWrite: - in := (*fusekernel.WriteIn)(m.Consume(fusekernel.WriteInSize(protocol))) + in := (*fusekernel.WriteIn)(inMsg.Consume(fusekernel.WriteInSize(protocol))) if in == nil { err = errors.New("Corrupt OpWrite") return } - buf := m.ConsumeBytes(m.Len()) + buf := inMsg.ConsumeBytes(inMsg.Len()) if len(buf) < int(in.Size) { err = errors.New("Corrupt OpWrite") return } o = &fuseops.WriteFileOp{ - Inode: fuseops.InodeID(m.Header().Nodeid), + Inode: fuseops.InodeID(inMsg.Header().Nodeid), Handle: fuseops.HandleID(in.Fh), Data: buf, Offset: int64(in.Offset), @@ -315,33 +330,33 @@ func convertInMessage( case fusekernel.OpFsync: type input fusekernel.FsyncIn - in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) if in == nil { err = errors.New("Corrupt OpFsync") return } o = &fuseops.SyncFileOp{ - Inode: fuseops.InodeID(m.Header().Nodeid), + Inode: fuseops.InodeID(inMsg.Header().Nodeid), Handle: fuseops.HandleID(in.Fh), } case fusekernel.OpFlush: type input fusekernel.FlushIn - in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) if in == nil { err = errors.New("Corrupt OpFlush") return } o = &fuseops.FlushFileOp{ - Inode: fuseops.InodeID(m.Header().Nodeid), + Inode: fuseops.InodeID(inMsg.Header().Nodeid), Handle: fuseops.HandleID(in.Fh), } case fusekernel.OpReadlink: o = &fuseops.ReadSymlinkOp{ - Inode: fuseops.InodeID(m.Header().Nodeid), + Inode: fuseops.InodeID(inMsg.Header().Nodeid), } case fusekernel.OpStatfs: @@ -349,7 +364,7 @@ func convertInMessage( case fusekernel.OpInterrupt: type input fusekernel.InterruptIn - in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) if in == nil { err = errors.New("Corrupt OpInterrupt") return @@ -361,7 +376,7 @@ func convertInMessage( case fusekernel.OpInit: type input fusekernel.InitIn - in := (*input)(m.Consume(unsafe.Sizeof(input{}))) + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) if in == nil { err = errors.New("Corrupt OpInit") return @@ -375,8 +390,8 @@ func convertInMessage( default: o = &unknownOp{ - opCode: m.Header().Opcode, - inode: fuseops.InodeID(m.Header().Nodeid), + opCode: inMsg.Header().Opcode, + inode: fuseops.InodeID(inMsg.Header().Nodeid), } } @@ -484,7 +499,10 @@ func (c *Connection) kernelResponseForOp( out.Fh = uint64(o.Handle) case *fuseops.ReadFileOp: - m.Append(o.Dst[:o.BytesRead]) + // convertInMessage already set up the destination buffer to be at the end + // of the out message. We need only shrink to the right size based on how + // much the user read. + m.Shrink(uintptr(m.Len() - (int(buffer.OutMessageInitialSize) + o.BytesRead))) case *fuseops.WriteFileOp: out := (*fusekernel.WriteOut)(m.Grow(unsafe.Sizeof(fusekernel.WriteOut{}))) diff --git a/internal/buffer/out_message.go b/internal/buffer/out_message.go index 59b5d42..aedc1e7 100644 --- a/internal/buffer/out_message.go +++ b/internal/buffer/out_message.go @@ -25,6 +25,9 @@ import ( const outHeaderSize = unsafe.Sizeof(fusekernel.OutHeader{}) +// OutMessage structs begin life with Len() == OutMessageInitialSize. +const OutMessageInitialSize = outHeaderSize + // We size out messages to be large enough to hold a header for the response // plus the largest read that may come in. const outMessageSize = outHeaderSize + MaxReadSize @@ -53,8 +56,8 @@ func init() { // Reset the message so that it is ready to be used again. Afterward, the // contents are solely a zeroed header. func (m *OutMessage) Reset() { - m.offset = outHeaderSize - memclr(unsafe.Pointer(&m.storage), outHeaderSize) + m.offset = OutMessageInitialSize + memclr(unsafe.Pointer(&m.storage), OutMessageInitialSize) } // Return a pointer to the header at the start of the message. @@ -87,6 +90,15 @@ func (b *OutMessage) GrowNoZero(size uintptr) (p unsafe.Pointer) { return } +// Throw away the last n bytes. Panics if n is out of range. +func (b *OutMessage) Shrink(n uintptr) { + if n > b.offset-OutMessageInitialSize { + panic(fmt.Sprintf("Shrink(%d) out of range for offset %d", n, b.offset)) + } + + b.offset -= n +} + // Equivalent to growing by the length of p, then copying p over the new // segment. Panics if there is not enough room available. func (b *OutMessage) Append(src []byte) {