From e40da39143c4eaa52598fd57d5ac0578457d0208 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sun, 13 Nov 2016 16:51:31 -0800 Subject: [PATCH 1/2] grpcproxy: only coalesce watchers that have received create response Current watchers may have nextrev=0; check response count instead. --- proxy/grpcproxy/watch_broadcasts.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/proxy/grpcproxy/watch_broadcasts.go b/proxy/grpcproxy/watch_broadcasts.go index 38421a448..fc18b7425 100644 --- a/proxy/grpcproxy/watch_broadcasts.go +++ b/proxy/grpcproxy/watch_broadcasts.go @@ -60,8 +60,10 @@ func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) { continue } wbswb.mu.Lock() - // NB: victim lock already held - if wb.nextrev >= wbswb.nextrev && wbswb.nextrev != 0 { + // 1. check if wbswb is behind wb so it won't skip any events in wb + // 2. ensure wbswb started; nextrev == 0 may mean wbswb is waiting + // for a current watcher and expects a create event from the server. + if wb.nextrev >= wbswb.nextrev && wbswb.responses > 0 { for w := range wb.receivers { wbswb.receivers[w] = struct{}{} wbs.watchers[w] = wbswb From 90ea3fbadc86f333899f185d8938799e95c87a60 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 14 Nov 2016 09:06:09 -0800 Subject: [PATCH 2/2] grpcproxy: do not resend create event after leader loss Only set CreateNotify if no watch responses have been received. --- proxy/grpcproxy/watch_broadcast.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/proxy/grpcproxy/watch_broadcast.go b/proxy/grpcproxy/watch_broadcast.go index d88ccfaa2..a82c842fb 100644 --- a/proxy/grpcproxy/watch_broadcast.go +++ b/proxy/grpcproxy/watch_broadcast.go @@ -54,13 +54,20 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) defer close(wb.donec) // loop because leader loss will close channel for cctx.Err() == nil { - wch := wp.cw.Watch(cctx, w.wr.key, + opts := []clientv3.OpOption{ clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify(), - clientv3.WithCreatedNotify(), clientv3.WithRev(wb.nextrev), clientv3.WithPrevKV(), - ) + } + // The create notification should be the first response; + // if the watch is recreated following leader loss, it + // shouldn't post a second create response to the client. + if wb.responses == 0 { + opts = append(opts, clientv3.WithCreatedNotify()) + } + wch := wp.cw.Watch(cctx, w.wr.key, opts...) + for wr := range wch { wb.bcast(wr) update(wb)