grpcproxy: support prevKV watcher
Makes all server watchers PrevKV, discards if client watcher is not PrevKV.release-3.1
parent
3c97e7a475
commit
6604306398
|
@ -211,6 +211,7 @@ func (wps *watchProxyStream) recvLoop() error {
|
|||
|
||||
nextrev: cr.StartRevision,
|
||||
progress: cr.ProgressNotify,
|
||||
prevKV: cr.PrevKv,
|
||||
filters: v3rpc.FiltersFromRequest(cr),
|
||||
}
|
||||
if !w.wr.valid() {
|
||||
|
|
|
@ -59,6 +59,7 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast))
|
|||
clientv3.WithProgressNotify(),
|
||||
clientv3.WithCreatedNotify(),
|
||||
clientv3.WithRev(wb.nextrev),
|
||||
clientv3.WithPrevKV(),
|
||||
)
|
||||
for wr := range wch {
|
||||
wb.bcast(wr)
|
||||
|
|
|
@ -37,6 +37,7 @@ type watcher struct {
|
|||
wr watchRange
|
||||
filters []mvcc.FilterFunc
|
||||
progress bool
|
||||
prevKV bool
|
||||
|
||||
// id is the id returned to the client on its watch stream.
|
||||
id int64
|
||||
|
@ -78,18 +79,22 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
|
|||
}
|
||||
|
||||
filtered := false
|
||||
if len(w.filters) != 0 {
|
||||
for _, filter := range w.filters {
|
||||
if filter(*ev) {
|
||||
filtered = true
|
||||
break
|
||||
}
|
||||
for _, filter := range w.filters {
|
||||
if filter(*ev) {
|
||||
filtered = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !filtered {
|
||||
events = append(events, ev)
|
||||
if filtered {
|
||||
continue
|
||||
}
|
||||
|
||||
if !w.prevKV {
|
||||
evCopy := *ev
|
||||
evCopy.PrevKv = nil
|
||||
ev = &evCopy
|
||||
}
|
||||
events = append(events, ev)
|
||||
}
|
||||
|
||||
if lastRev >= w.nextrev {
|
||||
|
|
Loading…
Reference in New Issue