Moved in-flight waiting from Connection into Server.

For #9.
geesefs-0-30-9
Aaron Jacobs 2015-06-09 11:03:33 +10:00
commit c1d30a3823
3 changed files with 17 additions and 14 deletions

View File

@ -43,7 +43,6 @@ type Connection struct {
debugLogger *log.Logger debugLogger *log.Logger
errorLogger *log.Logger errorLogger *log.Logger
wrapped *bazilfuse.Conn wrapped *bazilfuse.Conn
opsInFlight sync.WaitGroup
// The context from which all op contexts inherit. // The context from which all op contexts inherit.
parentCtx context.Context parentCtx context.Context
@ -217,9 +216,6 @@ func (c *Connection) beginOp(
bfReq bazilfuse.Request) (ctx context.Context) { bfReq bazilfuse.Request) (ctx context.Context) {
reqID := bfReq.Hdr().ID reqID := bfReq.Hdr().ID
// Note that the op is in flight.
c.opsInFlight.Add(1)
// Choose a parent context. // Choose a parent context.
ctx = c.maybeTraceByPID(int(bfReq.Hdr().Pid)) ctx = c.maybeTraceByPID(int(bfReq.Hdr().Pid))
@ -267,9 +263,6 @@ func (c *Connection) finishOp(bfReq bazilfuse.Request) {
cancel() cancel()
delete(c.cancelFuncs, reqID) delete(c.cancelFuncs, reqID)
} }
// Decrement the in-flight counter.
c.opsInFlight.Done()
} }
// LOCKS_EXCLUDED(c.mu) // LOCKS_EXCLUDED(c.mu)
@ -365,9 +358,9 @@ func (c *Connection) waitForReady() (err error) {
return return
} }
// Close the connection and wait for in-flight ops. // Close the connection. Must not be called until operations that were read
// from the connection have been responded to.
func (c *Connection) close() (err error) { func (c *Connection) close() (err error) {
err = c.wrapped.Close() err = c.wrapped.Close()
c.opsInFlight.Wait()
return return
} }

View File

@ -18,6 +18,7 @@ import (
"flag" "flag"
"io" "io"
"math/rand" "math/rand"
"sync"
"time" "time"
"github.com/jacobsa/fuse" "github.com/jacobsa/fuse"
@ -73,14 +74,19 @@ type FileSystem interface {
// cf. http://goo.gl/jnkHPO, fuse-devel thread "Fuse guarantees on concurrent // cf. http://goo.gl/jnkHPO, fuse-devel thread "Fuse guarantees on concurrent
// requests"). // requests").
func NewFileSystemServer(fs FileSystem) fuse.Server { func NewFileSystemServer(fs FileSystem) fuse.Server {
return fileSystemServer{fs} return &fileSystemServer{
fs: fs,
}
} }
type fileSystemServer struct { type fileSystemServer struct {
fs FileSystem fs FileSystem
opsInFlight sync.WaitGroup
} }
func (s fileSystemServer) ServeOps(c *fuse.Connection) { func (s *fileSystemServer) ServeOps(c *fuse.Connection) {
defer s.opsInFlight.Wait()
for { for {
op, err := c.ReadOp() op, err := c.ReadOp()
if err == io.EOF { if err == io.EOF {
@ -91,11 +97,14 @@ func (s fileSystemServer) ServeOps(c *fuse.Connection) {
panic(err) panic(err)
} }
s.opsInFlight.Add(1)
go s.handleOp(op) go s.handleOp(op)
} }
} }
func (s fileSystemServer) handleOp(op fuseops.Op) { func (s *fileSystemServer) handleOp(op fuseops.Op) {
defer s.opsInFlight.Done()
// Delay if requested. // Delay if requested.
if *fRandomDelays { if *fRandomDelays {
const delayLimit = 100 * time.Microsecond const delayLimit = 100 * time.Microsecond

View File

@ -26,7 +26,8 @@ import (
// A type that knows how to serve ops read from a connection. // A type that knows how to serve ops read from a connection.
type Server interface { type Server interface {
// Read and serve ops from the supplied connection until EOF. // Read and serve ops from the supplied connection until EOF. Do not return
// until all operations have been responded to.
ServeOps(*Connection) ServeOps(*Connection)
} }