From 100c7af28ac0f34e23e3b765c159779404d4a25e Mon Sep 17 00:00:00 2001 From: Aaron Jacobs Date: Tue, 9 Jun 2015 10:47:35 +1000 Subject: [PATCH 1/3] Put a wait group in fileSystemServer. --- fuseutil/file_system.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/fuseutil/file_system.go b/fuseutil/file_system.go index 511e0a7..40650a8 100644 --- a/fuseutil/file_system.go +++ b/fuseutil/file_system.go @@ -18,6 +18,7 @@ import ( "flag" "io" "math/rand" + "sync" "time" "github.com/jacobsa/fuse" @@ -73,11 +74,14 @@ type FileSystem interface { // cf. http://goo.gl/jnkHPO, fuse-devel thread "Fuse guarantees on concurrent // requests"). func NewFileSystemServer(fs FileSystem) fuse.Server { - return fileSystemServer{fs} + return fileSystemServer{ + fs: fs, + } } type fileSystemServer struct { - fs FileSystem + fs FileSystem + opsInFlight sync.WaitGroup } func (s fileSystemServer) ServeOps(c *fuse.Connection) { @@ -91,6 +95,7 @@ func (s fileSystemServer) ServeOps(c *fuse.Connection) { panic(err) } + s.opsInFlight.Add(1) go s.handleOp(op) } } @@ -171,4 +176,5 @@ func (s fileSystemServer) handleOp(op fuseops.Op) { } op.Respond(err) + s.opsInFlight.Done() } From 5ae9856c295dd4afd7892036a59d8bb6314c6f5d Mon Sep 17 00:00:00 2001 From: Aaron Jacobs Date: Tue, 9 Jun 2015 11:01:11 +1000 Subject: [PATCH 2/3] Define that Server.ServeOps must wait for in-flights. --- fuseutil/file_system.go | 11 +++++++---- mounted_file_system.go | 3 ++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/fuseutil/file_system.go b/fuseutil/file_system.go index 40650a8..b8d894e 100644 --- a/fuseutil/file_system.go +++ b/fuseutil/file_system.go @@ -74,7 +74,7 @@ type FileSystem interface { // cf. http://goo.gl/jnkHPO, fuse-devel thread "Fuse guarantees on concurrent // requests"). func NewFileSystemServer(fs FileSystem) fuse.Server { - return fileSystemServer{ + return &fileSystemServer{ fs: fs, } } @@ -84,7 +84,9 @@ type fileSystemServer struct { opsInFlight sync.WaitGroup } -func (s fileSystemServer) ServeOps(c *fuse.Connection) { +func (s *fileSystemServer) ServeOps(c *fuse.Connection) { + defer s.opsInFlight.Wait() + for { op, err := c.ReadOp() if err == io.EOF { @@ -100,7 +102,9 @@ func (s fileSystemServer) ServeOps(c *fuse.Connection) { } } -func (s fileSystemServer) handleOp(op fuseops.Op) { +func (s *fileSystemServer) handleOp(op fuseops.Op) { + defer s.opsInFlight.Done() + // Delay if requested. if *fRandomDelays { const delayLimit = 100 * time.Microsecond @@ -176,5 +180,4 @@ func (s fileSystemServer) handleOp(op fuseops.Op) { } op.Respond(err) - s.opsInFlight.Done() } diff --git a/mounted_file_system.go b/mounted_file_system.go index 6b52ad4..4055a3d 100644 --- a/mounted_file_system.go +++ b/mounted_file_system.go @@ -26,7 +26,8 @@ import ( // A type that knows how to serve ops read from a connection. type Server interface { - // Read and serve ops from the supplied connection until EOF. + // Read and serve ops from the supplied connection until EOF. Do not return + // until all operations have been responded to. ServeOps(*Connection) } From 6c85a93914510381d3d2a69d6480402d8eddb223 Mon Sep 17 00:00:00 2001 From: Aaron Jacobs Date: Tue, 9 Jun 2015 11:02:08 +1000 Subject: [PATCH 3/3] Removed synchronization from Connection.Close. --- connection.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/connection.go b/connection.go index 11543f8..9170f58 100644 --- a/connection.go +++ b/connection.go @@ -43,7 +43,6 @@ type Connection struct { debugLogger *log.Logger errorLogger *log.Logger wrapped *bazilfuse.Conn - opsInFlight sync.WaitGroup // The context from which all op contexts inherit. parentCtx context.Context @@ -217,9 +216,6 @@ func (c *Connection) beginOp( bfReq bazilfuse.Request) (ctx context.Context) { reqID := bfReq.Hdr().ID - // Note that the op is in flight. - c.opsInFlight.Add(1) - // Choose a parent context. ctx = c.maybeTraceByPID(int(bfReq.Hdr().Pid)) @@ -267,9 +263,6 @@ func (c *Connection) finishOp(bfReq bazilfuse.Request) { cancel() delete(c.cancelFuncs, reqID) } - - // Decrement the in-flight counter. - c.opsInFlight.Done() } // LOCKS_EXCLUDED(c.mu) @@ -365,9 +358,9 @@ func (c *Connection) waitForReady() (err error) { return } -// Close the connection and wait for in-flight ops. +// Close the connection. Must not be called until operations that were read +// from the connection have been responded to. func (c *Connection) close() (err error) { err = c.wrapped.Close() - c.opsInFlight.Wait() return }