clientv3: send create event over outc
parent
8825392da2
commit
0a3d45a307
|
@ -771,6 +771,36 @@ func TestWatchWithCreatedNotification(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestWatchWithCreatedNotificationDropConn ensures that
|
||||
// a watcher with created notify does not post duplicate
|
||||
// created events from disconnect.
|
||||
func TestWatchWithCreatedNotificationDropConn(t *testing.T) {
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
client := cluster.RandClient()
|
||||
|
||||
wch := client.Watch(context.Background(), "a", clientv3.WithCreatedNotify())
|
||||
|
||||
resp := <-wch
|
||||
|
||||
if !resp.Created {
|
||||
t.Fatalf("expected created event, got %v", resp)
|
||||
}
|
||||
|
||||
cluster.Members[0].DropConnections()
|
||||
|
||||
// try to receive from watch channel again
|
||||
// ensure it doesn't post another createNotify
|
||||
select {
|
||||
case wresp := <-wch:
|
||||
t.Fatalf("got unexpected watch response: %+v\n", wresp)
|
||||
case <-time.After(time.Second):
|
||||
// watcher may not reconnect by the time it hits the select,
|
||||
// so it wouldn't have a chance to filter out the second create event
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatchCancelOnServer ensures client watcher cancels propagate back to the server.
|
||||
func TestWatchCancelOnServer(t *testing.T) {
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
|
|
|
@ -586,17 +586,6 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
|
|||
curWr := emptyWr
|
||||
outc := ws.outc
|
||||
|
||||
if len(ws.buf) > 0 && ws.buf[0].Created {
|
||||
select {
|
||||
case ws.initReq.retc <- ws.outc:
|
||||
// send first creation event and only if requested
|
||||
if !ws.initReq.createdNotify {
|
||||
ws.buf = ws.buf[1:]
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
if len(ws.buf) > 0 {
|
||||
curWr = ws.buf[0]
|
||||
} else {
|
||||
|
@ -614,13 +603,36 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
|
|||
// shutdown from closeSubstream
|
||||
return
|
||||
}
|
||||
// TODO pause channel if buffer gets too large
|
||||
ws.buf = append(ws.buf, wr)
|
||||
|
||||
if wr.Created {
|
||||
if ws.initReq.retc != nil {
|
||||
ws.initReq.retc <- ws.outc
|
||||
// to prevent next write from taking the slot in buffered channel
|
||||
// and posting duplicate create events
|
||||
ws.initReq.retc = nil
|
||||
|
||||
// send first creation event only if requested
|
||||
if ws.initReq.createdNotify {
|
||||
ws.outc <- *wr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nextRev = wr.Header.Revision
|
||||
if len(wr.Events) > 0 {
|
||||
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
|
||||
}
|
||||
ws.initReq.rev = nextRev
|
||||
|
||||
// created event is already sent above,
|
||||
// watcher should not post duplicate events
|
||||
if wr.Created {
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO pause channel if buffer gets too large
|
||||
ws.buf = append(ws.buf, wr)
|
||||
|
||||
case <-ws.initReq.ctx.Done():
|
||||
return
|
||||
case <-resumec:
|
||||
|
|
Loading…
Reference in New Issue