diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index d17d3fa36..62e8df4d2 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -211,6 +211,7 @@ func (wps *watchProxyStream) recvLoop() error { nextrev: cr.StartRevision, progress: cr.ProgressNotify, + prevKV: cr.PrevKv, filters: v3rpc.FiltersFromRequest(cr), } if !w.wr.valid() { diff --git a/proxy/grpcproxy/watch_broadcast.go b/proxy/grpcproxy/watch_broadcast.go index c22bf08b9..d88ccfaa2 100644 --- a/proxy/grpcproxy/watch_broadcast.go +++ b/proxy/grpcproxy/watch_broadcast.go @@ -59,6 +59,7 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) clientv3.WithProgressNotify(), clientv3.WithCreatedNotify(), clientv3.WithRev(wb.nextrev), + clientv3.WithPrevKV(), ) for wr := range wch { wb.bcast(wr) diff --git a/proxy/grpcproxy/watcher.go b/proxy/grpcproxy/watcher.go index 96ff08420..e860a69ce 100644 --- a/proxy/grpcproxy/watcher.go +++ b/proxy/grpcproxy/watcher.go @@ -37,6 +37,7 @@ type watcher struct { wr watchRange filters []mvcc.FilterFunc progress bool + prevKV bool // id is the id returned to the client on its watch stream. id int64 @@ -78,18 +79,22 @@ func (w *watcher) send(wr clientv3.WatchResponse) { } filtered := false - if len(w.filters) != 0 { - for _, filter := range w.filters { - if filter(*ev) { - filtered = true - break - } + for _, filter := range w.filters { + if filter(*ev) { + filtered = true + break } } - - if !filtered { - events = append(events, ev) + if filtered { + continue } + + if !w.prevKV { + evCopy := *ev + evCopy.PrevKv = nil + ev = &evCopy + } + events = append(events, ev) } if lastRev >= w.nextrev {