Merge pull request #11850 from jackkleeman/cancel-watch
clientv3: cancel watches proactively on client context cancellationrelease-3.5
commit
6d067997e9
|
@ -624,8 +624,9 @@ func (w *watchGrpcStream) run() {
|
|||
},
|
||||
}
|
||||
req := &pb.WatchRequest{RequestUnion: cr}
|
||||
lg.Info("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
|
||||
if err := wc.Send(req); err != nil {
|
||||
lg.Warningf("error when sending request: %v", err)
|
||||
lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -649,6 +650,21 @@ func (w *watchGrpcStream) run() {
|
|||
return
|
||||
|
||||
case ws := <-w.closingc:
|
||||
if ws.id != -1 {
|
||||
// client is closing an established watch; close it on the server proactively instead of waiting
|
||||
// to close when the next message arrives
|
||||
cancelSet[ws.id] = struct{}{}
|
||||
cr := &pb.WatchRequest_CancelRequest{
|
||||
CancelRequest: &pb.WatchCancelRequest{
|
||||
WatchId: ws.id,
|
||||
},
|
||||
}
|
||||
req := &pb.WatchRequest{RequestUnion: cr}
|
||||
lg.Info("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
|
||||
if err := wc.Send(req); err != nil {
|
||||
lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
|
||||
}
|
||||
}
|
||||
w.closeSubstream(ws)
|
||||
delete(closing, ws)
|
||||
// no more watchers on this stream, shutdown
|
||||
|
|
|
@ -1213,3 +1213,35 @@ func TestV3WatchWithPrevKV(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3WatchCancellation ensures that watch cancellation frees up server resources.
|
||||
func TestV3WatchCancellation(t *testing.T) {
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
cli := clus.RandClient()
|
||||
|
||||
// increment watcher total count and keep a stream open
|
||||
cli.Watch(ctx, "/foo")
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
cli.Watch(ctx, "/foo")
|
||||
cancel()
|
||||
}
|
||||
|
||||
// Wait a little for cancellations to take hold
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
minWatches, err := clus.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if minWatches != "1" {
|
||||
t.Fatalf("expected one watch, got %s", minWatches)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue