Merge pull request #6842 from heyitsanthony/watch-prevkv

grpcproxy: support prevKV watcher
release-3.1
Anthony Romano 2016-11-11 16:32:51 -08:00 committed by GitHub
commit a2e86c1371
3 changed files with 16 additions and 9 deletions

View File

@ -211,6 +211,7 @@ func (wps *watchProxyStream) recvLoop() error {
nextrev: cr.StartRevision, nextrev: cr.StartRevision,
progress: cr.ProgressNotify, progress: cr.ProgressNotify,
prevKV: cr.PrevKv,
filters: v3rpc.FiltersFromRequest(cr), filters: v3rpc.FiltersFromRequest(cr),
} }
if !w.wr.valid() { if !w.wr.valid() {

View File

@ -59,6 +59,7 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast))
clientv3.WithProgressNotify(), clientv3.WithProgressNotify(),
clientv3.WithCreatedNotify(), clientv3.WithCreatedNotify(),
clientv3.WithRev(wb.nextrev), clientv3.WithRev(wb.nextrev),
clientv3.WithPrevKV(),
) )
for wr := range wch { for wr := range wch {
wb.bcast(wr) wb.bcast(wr)

View File

@ -37,6 +37,7 @@ type watcher struct {
wr watchRange wr watchRange
filters []mvcc.FilterFunc filters []mvcc.FilterFunc
progress bool progress bool
prevKV bool
// id is the id returned to the client on its watch stream. // id is the id returned to the client on its watch stream.
id int64 id int64
@ -78,18 +79,22 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
} }
filtered := false filtered := false
if len(w.filters) != 0 { for _, filter := range w.filters {
for _, filter := range w.filters { if filter(*ev) {
if filter(*ev) { filtered = true
filtered = true break
break
}
} }
} }
if filtered {
if !filtered { continue
events = append(events, ev)
} }
if !w.prevKV {
evCopy := *ev
evCopy.PrevKv = nil
ev = &evCopy
}
events = append(events, ev)
} }
if lastRev >= w.nextrev { if lastRev >= w.nextrev {