From d22c1c64ae3d0ae3858c67fb71f7b7631986a25a Mon Sep 17 00:00:00 2001 From: Aaron Jacobs Date: Tue, 5 May 2015 10:41:09 +1000 Subject: [PATCH] Arrange for a cancellation channel. --- connection.go | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/connection.go b/connection.go index 56ed8a4..8b6ff52 100644 --- a/connection.go +++ b/connection.go @@ -38,6 +38,14 @@ type Connection struct { // For logging purposes only. nextOpID uint32 + + mu sync.Mutex + + // A map from bazilfuse request ID (*not* the op ID for logging used above) + // to a function that cancel's its associated context. + // + // GUARDED_BY(mu) + cancelFuncs map[bazilfuse.RequestID]func() } // Responsibility for closing the wrapped connection is transferred to the @@ -85,21 +93,47 @@ func (c *Connection) log( c.logger.Println(msg) } +// LOCKS_EXCLUDED(c.mu) +func (c *Connection) recordCancelFunc( + reqID bazilfuse.RequestID, + f func()) + // Set up state for an op that is about to be returned to the user, given its // bazilfuse request ID. // // Return a context that should be used for the op. +// +// LOCKS_EXCLUDED(c.mu) func (c *Connection) beginOp(reqID bazilfuse.RequestID) (ctx context.Context) { + // Note that the op is in flight. c.opsInFlight.Add(1) - // TODO(jacobsa): Use WithCancel and stash a cancellation function. - ctx = c.parentCtx + // Set up a cancellation function. + ctx, cancel := context.WithCancel(c.parentCtx) + c.recordCancelFunc(reqID, cancel) return } // Clean up all state associated with an op to which the user has responded. +// +// LOCKS_EXCLUDED(c.mu) func (c *Connection) finishOp(reqID bazilfuse.RequestID) { + c.mu.Lock() + defer c.mu.Unlock() + + // Even though the op is finished, context.WithCancel requires us to arrange + // for the cancellation function to be invoked. We also must remove it from + // our map. + cancel, ok := c.cancelFuncs[reqID] + if !ok { + panic(fmt.Sprintf("Unknown request ID in finishOp: %v", reqID)) + } + + cancel() + delete(c.cancelFuncs, reqID) + + // Decrement the in-flight counter. c.opsInFlight.Done() } @@ -108,6 +142,8 @@ func (c *Connection) finishOp(reqID bazilfuse.RequestID) { // // This function delivers ops in exactly the order they are received from // /dev/fuse. It must not be called multiple times concurrently. +// +// LOCKS_EXCLUDED(c.mu) func (c *Connection) ReadOp() (op fuseops.Op, err error) { // Keep going until we find a request we know how to convert. for {