diff --git a/conversions.go b/conversions.go index b1246bc..b30a643 100644 --- a/conversions.go +++ b/conversions.go @@ -561,6 +561,10 @@ func (c *Connection) kernelResponseForOp( out.OpenFlags |= uint32(fusekernel.OpenKeepCache) } + if o.UseDirectIO { + out.OpenFlags |= uint32(fusekernel.OpenDirectIO) + } + case *fuseops.ReadFileOp: // convertInMessage already set up the destination buffer to be at the end // of the out message. We need only shrink to the right size based on how diff --git a/fuseops/ops.go b/fuseops/ops.go index 609249f..f8fa739 100644 --- a/fuseops/ops.go +++ b/fuseops/ops.go @@ -564,6 +564,15 @@ type OpenFileOp struct { // is set to true, regardless of its value, at least for files opened in the // same mode. (Cf. https://github.com/osxfuse/osxfuse/issues/223) KeepPageCache bool + + // Whether to use direct IO for this file handle. By default, the kernel + // suppresses what it sees as redundant operations (including reads beyond + // the precomputed EOF). + // + // Enabling direct IO ensures that all client operations reach the fuse + // layer. This allows for filesystems whose file sizes are not known in + // advance, for example, because contents are generated on the fly. + UseDirectIO bool } // Read data from a file previously opened with CreateFile or OpenFile. @@ -591,6 +600,8 @@ type ReadFileOp struct { // (http://goo.gl/SGxnaN) to read a page at a time. It appears to understand // where EOF is by checking the inode size (http://goo.gl/0BkqKD), returned // by a previous call to LookUpInode, GetInodeAttributes, etc. + // + // If direct IO is enabled, semantics should match those of read(2). BytesRead int } diff --git a/samples/dynamicfs/dynamic_fs.go b/samples/dynamicfs/dynamic_fs.go new file mode 100644 index 0000000..7922389 --- /dev/null +++ b/samples/dynamicfs/dynamic_fs.go @@ -0,0 +1,284 @@ +package dynamicfs + +import ( + "fmt" + "io" + "log" + "os" + "strings" + "sync" + "time" + + "golang.org/x/net/context" + + "github.com/jacobsa/fuse" + "github.com/jacobsa/fuse/fuseops" + "github.com/jacobsa/fuse/fuseutil" + "github.com/jacobsa/timeutil" +) + +// Create a file system that contains 2 files (`age` and `weekday`) and no +// directories. Every time the `age` file is opened, its contents are refreshed +// to show the number of seconds elapsed since the file system was created (as +// opposed to mounted). Every time the `weekday` file is opened, its contents +// are refreshed to reflect the current weekday. +// +// The contents of both of these files is updated within the filesystem itself, +// i.e., these changes do not go through the kernel. Additionally, file access +// times are not updated and file size is not known in advance and is set to 0. +// This simulates a filesystem that is backed by a dynamic data source where +// file metadata is not necessarily known before the file is read. For example, +// a filesystem backed by an expensive RPC or by a stream that's generated on +// the fly might not know data size ahead of time. +// +// This implementation depends on direct IO in fuse. Without it, all read +// operations are suppressed because the kernel detects that they read beyond +// the end of the files. +func NewDynamicFS(clock timeutil.Clock) (server fuse.Server, err error) { + createTime := clock.Now() + fs := &dynamicFS{ + clock: clock, + createTime: createTime, + fileHandles: make(map[fuseops.HandleID]string), + } + server = fuseutil.NewFileSystemServer(fs) + return +} + +type dynamicFS struct { + fuseutil.NotImplementedFileSystem + mu sync.Mutex + clock timeutil.Clock + createTime time.Time + nextHandle fuseops.HandleID + fileHandles map[fuseops.HandleID]string +} + +const ( + rootInode fuseops.InodeID = fuseops.RootInodeID + iota + ageInode + weekdayInode +) + +type inodeInfo struct { + attributes fuseops.InodeAttributes + + // File or directory? + dir bool + + // For directories, children. + children []fuseutil.Dirent +} + +// We have a fixed directory structure. +var gInodeInfo = map[fuseops.InodeID]inodeInfo{ + // root + rootInode: { + attributes: fuseops.InodeAttributes{ + Nlink: 1, + Mode: 0555 | os.ModeDir, + }, + dir: true, + children: []fuseutil.Dirent{ + { + Offset: 1, + Inode: ageInode, + Name: "age", + Type: fuseutil.DT_File, + }, + { + Offset: 2, + Inode: weekdayInode, + Name: "weekday", + Type: fuseutil.DT_File, + }, + }, + }, + + // age + ageInode: { + attributes: fuseops.InodeAttributes{ + Nlink: 1, + Mode: 0444, + }, + }, + + // weekday + weekdayInode: { + attributes: fuseops.InodeAttributes{ + Nlink: 1, + Mode: 0444, + // Size left at 0. + }, + }, +} + +func findChildInode( + name string, + children []fuseutil.Dirent) (inode fuseops.InodeID, err error) { + for _, e := range children { + if e.Name == name { + inode = e.Inode + return + } + } + + err = fuse.ENOENT + return +} + +func (fs *dynamicFS) findUnusedHandle() fuseops.HandleID { + // TODO: Mutex annotation? + handle := fs.nextHandle + for _, exists := fs.fileHandles[handle]; exists; _, exists = fs.fileHandles[handle] { + handle++ + } + fs.nextHandle = handle + 1 + return handle +} + +func (fs *dynamicFS) GetInodeAttributes( + ctx context.Context, + op *fuseops.GetInodeAttributesOp) (err error) { + // Find the info for this inode. + info, ok := gInodeInfo[op.Inode] + if !ok { + err = fuse.ENOENT + return + } + // Copy over its attributes. + op.Attributes = info.attributes + return +} + +func (fs *dynamicFS) LookUpInode( + ctx context.Context, + op *fuseops.LookUpInodeOp) (err error) { + // Find the info for the parent. + parentInfo, ok := gInodeInfo[op.Parent] + if !ok { + err = fuse.ENOENT + return + } + + // Find the child within the parent. + childInode, err := findChildInode(op.Name, parentInfo.children) + if err != nil { + return + } + + // Copy over information. + op.Entry.Child = childInode + op.Entry.Attributes = gInodeInfo[childInode].attributes + + return +} + +func (fs *dynamicFS) OpenDir( + ctx context.Context, + op *fuseops.OpenDirOp) (err error) { + // Allow opening directory. + return +} + +func (fs *dynamicFS) ReadDir( + ctx context.Context, + op *fuseops.ReadDirOp) (err error) { + // Find the info for this inode. + info, ok := gInodeInfo[op.Inode] + if !ok { + err = fuse.ENOENT + return + } + + if !info.dir { + err = fuse.EIO + return + } + + entries := info.children + + // Grab the range of interest. + if op.Offset > fuseops.DirOffset(len(entries)) { + err = fuse.EIO + return + } + + entries = entries[op.Offset:] + + // Resume at the specified offset into the array. + for _, e := range entries { + n := fuseutil.WriteDirent(op.Dst[op.BytesRead:], e) + if n == 0 { + break + } + + op.BytesRead += n + } + + return +} + +func (fs *dynamicFS) OpenFile( + ctx context.Context, + op *fuseops.OpenFileOp) (err error) { + fs.mu.Lock() + defer fs.mu.Unlock() + var contents string + // Update file contents on (and only on) open. + switch op.Inode { + case ageInode: + now := fs.clock.Now() + ageInSeconds := int(now.Sub(fs.createTime).Seconds()) + contents = fmt.Sprintf("This filesystem is %d seconds old.", ageInSeconds) + case weekdayInode: + contents = fmt.Sprintf("Today is %s.", fs.clock.Now().Weekday()) + default: + err = fuse.EINVAL + return + } + handle := fs.findUnusedHandle() + fs.fileHandles[handle] = contents + op.UseDirectIO = true + op.Handle = handle + return +} + +func (fs *dynamicFS) ReadFile( + ctx context.Context, + op *fuseops.ReadFileOp) (err error) { + fs.mu.Lock() + defer fs.mu.Unlock() + contents, ok := fs.fileHandles[op.Handle] + if !ok { + log.Printf("ReadFile: no open file handle: %d", op.Handle) + err = fuse.EIO + return + } + reader := strings.NewReader(contents) + op.BytesRead, err = reader.ReadAt(op.Dst, op.Offset) + if err == io.EOF { + err = nil + } + return +} + +func (fs *dynamicFS) ReleaseFileHandle( + ctx context.Context, + op *fuseops.ReleaseFileHandleOp) (err error) { + fs.mu.Lock() + defer fs.mu.Unlock() + _, ok := fs.fileHandles[op.Handle] + if !ok { + log.Printf("ReleaseFileHandle: bad handle: %d", op.Handle) + err = fuse.EIO + return + } + delete(fs.fileHandles, op.Handle) + return +} + +func (fs *dynamicFS) StatFS(ctx context.Context, + op *fuseops.StatFSOp) (err error) { + return +} diff --git a/samples/dynamicfs/dynamic_fs_test.go b/samples/dynamicfs/dynamic_fs_test.go new file mode 100644 index 0000000..5091f27 --- /dev/null +++ b/samples/dynamicfs/dynamic_fs_test.go @@ -0,0 +1,181 @@ +package dynamicfs_test + +import ( + "testing" + + "github.com/jacobsa/fuse/fusetesting" + "github.com/jacobsa/fuse/samples" + "github.com/jacobsa/fuse/samples/dynamicfs" + + "bytes" + "fmt" + "io/ioutil" + "os" + "path" + "syscall" + "time" + + . "github.com/jacobsa/oglematchers" + . "github.com/jacobsa/ogletest" +) + +func TestDynamicFS(t *testing.T) { RunTests(t) } + +type DynamicFSTest struct { + samples.SampleTest +} + +func init() { + RegisterTestSuite(&DynamicFSTest{}) +} + +var gCreateTime = time.Date(2017, 5, 4, 14, 53, 10, 0, time.UTC) + +func (t *DynamicFSTest) SetUp(ti *TestInfo) { + var err error + t.Clock.SetTime(gCreateTime) + t.Server, err = dynamicfs.NewDynamicFS(&t.Clock) + AssertEq(nil, err) + t.SampleTest.SetUp(ti) +} + +func (t *DynamicFSTest) ReadDir_Root() { + entries, err := fusetesting.ReadDirPicky(t.Dir) + AssertEq(nil, err) + AssertEq(2, len(entries)) + + var fi os.FileInfo + fi = entries[0] + ExpectEq("age", fi.Name()) + ExpectEq(0, fi.Size()) + ExpectEq(0444, fi.Mode()) + ExpectFalse(fi.IsDir()) + + fi = entries[1] + ExpectEq("weekday", fi.Name()) + ExpectEq(0, fi.Size()) + ExpectEq(0444, fi.Mode()) + ExpectFalse(fi.IsDir()) +} + +func (t *DynamicFSTest) ReadDir_NonExistent() { + _, err := fusetesting.ReadDirPicky(path.Join(t.Dir, "nosuchfile")) + + AssertNe(nil, err) + ExpectThat(err, Error(HasSubstr("no such file"))) +} + +func (t *DynamicFSTest) Stat_Age() { + fi, err := os.Stat(path.Join(t.Dir, "age")) + AssertEq(nil, err) + + ExpectEq("age", fi.Name()) + ExpectEq(0, fi.Size()) + ExpectEq(0444, fi.Mode()) + ExpectFalse(fi.IsDir()) + ExpectEq(1, fi.Sys().(*syscall.Stat_t).Nlink) +} + +func (t *DynamicFSTest) Stat_Weekday() { + fi, err := os.Stat(path.Join(t.Dir, "weekday")) + AssertEq(nil, err) + + ExpectEq("weekday", fi.Name()) + ExpectEq(0, fi.Size()) + ExpectEq(0444, fi.Mode()) + ExpectFalse(fi.IsDir()) + ExpectEq(1, fi.Sys().(*syscall.Stat_t).Nlink) +} + +func (t *DynamicFSTest) Stat_NonExistent() { + _, err := os.Stat(path.Join(t.Dir, "nosuchfile")) + + AssertNe(nil, err) + ExpectThat(err, Error(HasSubstr("no such file"))) +} + +func (t *DynamicFSTest) ReadFile_AgeZero() { + t.Clock.SetTime(gCreateTime) + slice, err := ioutil.ReadFile(path.Join(t.Dir, "age")) + + AssertEq(nil, err) + ExpectEq("This filesystem is 0 seconds old.", string(slice)) +} + +func (t *DynamicFSTest) ReadFile_Age1000() { + t.Clock.SetTime(gCreateTime.Add(1000 * time.Second)) + slice, err := ioutil.ReadFile(path.Join(t.Dir, "age")) + + AssertEq(nil, err) + ExpectEq("This filesystem is 1000 seconds old.", string(slice)) +} + +func (t *DynamicFSTest) ReadFile_WeekdayNow() { + now := t.Clock.Now() + // Does simulated clock advance itself by default? + // Manually set time to ensure it's frozen. + t.Clock.SetTime(now) + slice, err := ioutil.ReadFile(path.Join(t.Dir, "weekday")) + + AssertEq(nil, err) + ExpectEq(fmt.Sprintf("Today is %s.", now.Weekday().String()), string(slice)) +} + +func (t *DynamicFSTest) ReadFile_WeekdayCreateTime() { + t.Clock.SetTime(gCreateTime) + slice, err := ioutil.ReadFile(path.Join(t.Dir, "weekday")) + + AssertEq(nil, err) + ExpectEq(fmt.Sprintf("Today is %s.", gCreateTime.Weekday().String()), string(slice)) +} + +func (t *DynamicFSTest) ReadFile_AgeUnchangedForHandle() { + t.Clock.SetTime(gCreateTime.Add(100 * time.Second)) + var err error + var file *os.File + file, err = os.Open(path.Join(t.Dir, "age")) + AssertEq(nil, err) + + // Ensure that all reads from the same handle return the contents created at + // file open time. + func(file *os.File) { + defer file.Close() + + var expectedContents string + var buffer bytes.Buffer + var bytesRead int64 + + expectedContents = "This filesystem is 100 seconds old." + bytesRead, err = buffer.ReadFrom(file) + AssertEq(nil, err) + ExpectEq(len(expectedContents), bytesRead) + ExpectEq(expectedContents, buffer.String()) + + t.Clock.SetTime(gCreateTime.Add(1000 * time.Second)) + // Seek back to the beginning of the file. The contents should be unchanged + // for the life of the file handle. + _, err = file.Seek(0, 0) + AssertEq(nil, err) + + buffer.Reset() + bytesRead, err = buffer.ReadFrom(file) + AssertEq(nil, err) + ExpectEq(len(expectedContents), bytesRead) + ExpectEq(expectedContents, buffer.String()) + }(file) + + // The clock was advanced while the old handle was open. The content change + // should be reflected by the new handle. + file, err = os.Open(path.Join(t.Dir, "age")) + AssertEq(nil, err) + func(file *os.File) { + defer file.Close() + + expectedContents := "This filesystem is 1000 seconds old." + buffer := bytes.Buffer{} + bytesRead, err := buffer.ReadFrom(file) + AssertEq(nil, err) + ExpectEq(len(expectedContents), bytesRead) + ExpectEq(expectedContents, buffer.String()) + }(file) +}