storage: publish delete events on lease revocation
parent
53def2dc5e
commit
5a967eb2a0
|
@ -1057,6 +1057,55 @@ func TestV3LeaseCreateByID(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestV3LeaseExpire ensures a key is deleted once a key expires.
|
||||||
|
func TestV3LeaseExpire(t *testing.T) {
|
||||||
|
testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
|
||||||
|
// let lease lapse; wait for deleted key
|
||||||
|
|
||||||
|
wAPI := pb.NewWatchClient(clus.RandConn())
|
||||||
|
wStream, err := wAPI.Watch(context.TODO())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
creq := &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1}
|
||||||
|
wreq := &pb.WatchRequest{CreateRequest: creq}
|
||||||
|
if err := wStream.Send(wreq); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := wStream.Recv(); err != nil {
|
||||||
|
// the 'created' message
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := wStream.Recv(); err != nil {
|
||||||
|
// the 'put' message
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
errc := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
resp, err := wStream.Recv()
|
||||||
|
switch {
|
||||||
|
case err != nil:
|
||||||
|
errc <- err
|
||||||
|
case len(resp.Events) != 1:
|
||||||
|
fallthrough
|
||||||
|
case resp.Events[0].Type != storagepb.DELETE:
|
||||||
|
errc <- fmt.Errorf("expected key delete, got %v", resp)
|
||||||
|
default:
|
||||||
|
errc <- nil
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(15 * time.Second):
|
||||||
|
return fmt.Errorf("lease expiration too slow")
|
||||||
|
case err := <-errc:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// TestV3LeaseKeepAlive ensures keepalive keeps the lease alive.
|
// TestV3LeaseKeepAlive ensures keepalive keeps the lease alive.
|
||||||
func TestV3LeaseKeepAlive(t *testing.T) {
|
func TestV3LeaseKeepAlive(t *testing.T) {
|
||||||
testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
|
testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
|
||||||
|
|
|
@ -68,6 +68,10 @@ func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore {
|
||||||
synced: make(map[string]map[*watcher]struct{}),
|
synced: make(map[string]map[*watcher]struct{}),
|
||||||
stopc: make(chan struct{}),
|
stopc: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
if s.le != nil {
|
||||||
|
// use this store as the deleter so revokes trigger watch events
|
||||||
|
s.le.SetRangeDeleter(s)
|
||||||
|
}
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
go s.syncWatchersLoop()
|
go s.syncWatchersLoop()
|
||||||
return s
|
return s
|
||||||
|
|
Loading…
Reference in New Issue