diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 1b544ca41..de2d65cac 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -771,6 +771,36 @@ func TestWatchWithCreatedNotification(t *testing.T) { } } +// TestWatchWithCreatedNotificationDropConn ensures that +// a watcher with created notify does not post duplicate +// created events from disconnect. +func TestWatchWithCreatedNotificationDropConn(t *testing.T) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + + client := cluster.RandClient() + + wch := client.Watch(context.Background(), "a", clientv3.WithCreatedNotify()) + + resp := <-wch + + if !resp.Created { + t.Fatalf("expected created event, got %v", resp) + } + + cluster.Members[0].DropConnections() + + // try to receive from watch channel again + // ensure it doesn't post another createNotify + select { + case wresp := <-wch: + t.Fatalf("got unexpected watch response: %+v\n", wresp) + case <-time.After(time.Second): + // watcher may not reconnect by the time it hits the select, + // so it wouldn't have a chance to filter out the second create event + } +} + // TestWatchCancelOnServer ensures client watcher cancels propagate back to the server. func TestWatchCancelOnServer(t *testing.T) { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) diff --git a/clientv3/watch.go b/clientv3/watch.go index 5281f8f5c..2b0e657ca 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -586,17 +586,6 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{ curWr := emptyWr outc := ws.outc - if len(ws.buf) > 0 && ws.buf[0].Created { - select { - case ws.initReq.retc <- ws.outc: - // send first creation event and only if requested - if !ws.initReq.createdNotify { - ws.buf = ws.buf[1:] - } - default: - } - } - if len(ws.buf) > 0 { curWr = ws.buf[0] } else { @@ -614,13 +603,36 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{ // shutdown from closeSubstream return } - // TODO pause channel if buffer gets too large - ws.buf = append(ws.buf, wr) + + if wr.Created { + if ws.initReq.retc != nil { + ws.initReq.retc <- ws.outc + // to prevent next write from taking the slot in buffered channel + // and posting duplicate create events + ws.initReq.retc = nil + + // send first creation event only if requested + if ws.initReq.createdNotify { + ws.outc <- *wr + } + } + } + nextRev = wr.Header.Revision if len(wr.Events) > 0 { nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1 } ws.initReq.rev = nextRev + + // created event is already sent above, + // watcher should not post duplicate events + if wr.Created { + continue + } + + // TODO pause channel if buffer gets too large + ws.buf = append(ws.buf, wr) + case <-ws.initReq.ctx.Done(): return case <-resumec: