diff --git a/proxy/grpcproxy/watch_broadcast.go b/proxy/grpcproxy/watch_broadcast.go index d88ccfaa2..a82c842fb 100644 --- a/proxy/grpcproxy/watch_broadcast.go +++ b/proxy/grpcproxy/watch_broadcast.go @@ -54,13 +54,20 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) defer close(wb.donec) // loop because leader loss will close channel for cctx.Err() == nil { - wch := wp.cw.Watch(cctx, w.wr.key, + opts := []clientv3.OpOption{ clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify(), - clientv3.WithCreatedNotify(), clientv3.WithRev(wb.nextrev), clientv3.WithPrevKV(), - ) + } + // The create notification should be the first response; + // if the watch is recreated following leader loss, it + // shouldn't post a second create response to the client. + if wb.responses == 0 { + opts = append(opts, clientv3.WithCreatedNotify()) + } + wch := wp.cw.Watch(cctx, w.wr.key, opts...) + for wr := range wch { wb.bcast(wr) update(wb) diff --git a/proxy/grpcproxy/watch_broadcasts.go b/proxy/grpcproxy/watch_broadcasts.go index 38421a448..fc18b7425 100644 --- a/proxy/grpcproxy/watch_broadcasts.go +++ b/proxy/grpcproxy/watch_broadcasts.go @@ -60,8 +60,10 @@ func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) { continue } wbswb.mu.Lock() - // NB: victim lock already held - if wb.nextrev >= wbswb.nextrev && wbswb.nextrev != 0 { + // 1. check if wbswb is behind wb so it won't skip any events in wb + // 2. ensure wbswb started; nextrev == 0 may mean wbswb is waiting + // for a current watcher and expects a create event from the server. + if wb.nextrev >= wbswb.nextrev && wbswb.responses > 0 { for w := range wb.receivers { wbswb.receivers[w] = struct{}{} wbs.watchers[w] = wbswb