Arrange for a cancellation channel.
parent
4fdecd8417
commit
d22c1c64ae
|
@ -38,6 +38,14 @@ type Connection struct {
|
||||||
|
|
||||||
// For logging purposes only.
|
// For logging purposes only.
|
||||||
nextOpID uint32
|
nextOpID uint32
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
|
||||||
|
// A map from bazilfuse request ID (*not* the op ID for logging used above)
|
||||||
|
// to a function that cancel's its associated context.
|
||||||
|
//
|
||||||
|
// GUARDED_BY(mu)
|
||||||
|
cancelFuncs map[bazilfuse.RequestID]func()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Responsibility for closing the wrapped connection is transferred to the
|
// Responsibility for closing the wrapped connection is transferred to the
|
||||||
|
@ -85,21 +93,47 @@ func (c *Connection) log(
|
||||||
c.logger.Println(msg)
|
c.logger.Println(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LOCKS_EXCLUDED(c.mu)
|
||||||
|
func (c *Connection) recordCancelFunc(
|
||||||
|
reqID bazilfuse.RequestID,
|
||||||
|
f func())
|
||||||
|
|
||||||
// Set up state for an op that is about to be returned to the user, given its
|
// Set up state for an op that is about to be returned to the user, given its
|
||||||
// bazilfuse request ID.
|
// bazilfuse request ID.
|
||||||
//
|
//
|
||||||
// Return a context that should be used for the op.
|
// Return a context that should be used for the op.
|
||||||
|
//
|
||||||
|
// LOCKS_EXCLUDED(c.mu)
|
||||||
func (c *Connection) beginOp(reqID bazilfuse.RequestID) (ctx context.Context) {
|
func (c *Connection) beginOp(reqID bazilfuse.RequestID) (ctx context.Context) {
|
||||||
|
// Note that the op is in flight.
|
||||||
c.opsInFlight.Add(1)
|
c.opsInFlight.Add(1)
|
||||||
|
|
||||||
// TODO(jacobsa): Use WithCancel and stash a cancellation function.
|
// Set up a cancellation function.
|
||||||
ctx = c.parentCtx
|
ctx, cancel := context.WithCancel(c.parentCtx)
|
||||||
|
c.recordCancelFunc(reqID, cancel)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up all state associated with an op to which the user has responded.
|
// Clean up all state associated with an op to which the user has responded.
|
||||||
|
//
|
||||||
|
// LOCKS_EXCLUDED(c.mu)
|
||||||
func (c *Connection) finishOp(reqID bazilfuse.RequestID) {
|
func (c *Connection) finishOp(reqID bazilfuse.RequestID) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
// Even though the op is finished, context.WithCancel requires us to arrange
|
||||||
|
// for the cancellation function to be invoked. We also must remove it from
|
||||||
|
// our map.
|
||||||
|
cancel, ok := c.cancelFuncs[reqID]
|
||||||
|
if !ok {
|
||||||
|
panic(fmt.Sprintf("Unknown request ID in finishOp: %v", reqID))
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
delete(c.cancelFuncs, reqID)
|
||||||
|
|
||||||
|
// Decrement the in-flight counter.
|
||||||
c.opsInFlight.Done()
|
c.opsInFlight.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,6 +142,8 @@ func (c *Connection) finishOp(reqID bazilfuse.RequestID) {
|
||||||
//
|
//
|
||||||
// This function delivers ops in exactly the order they are received from
|
// This function delivers ops in exactly the order they are received from
|
||||||
// /dev/fuse. It must not be called multiple times concurrently.
|
// /dev/fuse. It must not be called multiple times concurrently.
|
||||||
|
//
|
||||||
|
// LOCKS_EXCLUDED(c.mu)
|
||||||
func (c *Connection) ReadOp() (op fuseops.Op, err error) {
|
func (c *Connection) ReadOp() (op fuseops.Op, err error) {
|
||||||
// Keep going until we find a request we know how to convert.
|
// Keep going until we find a request we know how to convert.
|
||||||
for {
|
for {
|
||||||
|
|
Loading…
Reference in New Issue