grpcproxy: do not send duplicate events to watchers

release-3.1
Xiang Li 2016-07-18 22:05:39 -07:00
parent fcc96c9ebd
commit 783675f91c
3 changed files with 13 additions and 11 deletions

View File

@ -29,6 +29,7 @@ type watcher struct {
id int64
wr watchRange
rev int64
filters []mvcc.FilterFunc
progress bool
ch chan<- *pb.WatchResponse
@ -39,15 +40,17 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
return
}
// todo: filter out the events that this watcher already seen.
events := make([]*mvccpb.Event, 0, len(wr.Events))
for i := range wr.Events {
filtered := false
ev := (*mvccpb.Event)(wr.Events[i])
if ev.Kv.ModRevision <= w.rev {
continue
} else {
w.rev = ev.Kv.ModRevision
}
filtered := false
if len(w.filters) != 0 {
for _, filter := range w.filters {
if filter(*ev) {

View File

@ -69,7 +69,7 @@ func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingl
gropu, ok := wgs.groups[ws.w.wr]
if ok {
if ws.rev >= gropu.rev {
if ws.w.rev >= gropu.rev {
gropu.add(receiverID{streamID: ws.sws.id, watcherID: ws.w.id}, ws.w)
return true
}

View File

@ -31,8 +31,7 @@ type watcherSingle struct {
w watcher
rev int64 // current revision
lastSeenRev int64
lastStoreRev int64 // last seen revision of the remote mvcc store
donec chan struct{}
}
@ -53,9 +52,8 @@ func (ws watcherSingle) run() {
defer close(ws.donec)
for wr := range ws.ch {
ws.rev = wr.Header.Revision
ws.lastStoreRev = wr.Header.Revision
ws.w.send(wr)
ws.lastSeenRev = wr.Events[len(wr.Events)-1].Kv.ModRevision
if ws.sws.maybeCoalesceWatcher(ws) {
return
@ -64,9 +62,10 @@ func (ws watcherSingle) run() {
}
// canPromote returns true if a watcherSingle can promote itself to a watchergroup
// when it already caught up with the current revision.
// when it already caught up with the last seen revision from the response header
// of an etcd server.
func (ws watcherSingle) canPromote() bool {
return ws.rev == ws.lastSeenRev
return ws.w.rev == ws.lastStoreRev
}
func (ws watcherSingle) stop() {