Merge pull request #5090 from ajityagaty/lease_id
clientv3: Use LeaseID in all the client APIs.release-3.0
commit
c1455a4f10
|
@ -39,7 +39,7 @@ func ExampleLease_grant() {
|
|||
}
|
||||
|
||||
// after 5 seconds, the key 'foo' will be removed
|
||||
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(clientv3.LeaseID(resp.ID)))
|
||||
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
@ -60,13 +60,13 @@ func ExampleLease_revoke() {
|
|||
log.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(clientv3.LeaseID(resp.ID)))
|
||||
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// revoking lease expires the key attached to its lease ID
|
||||
_, err = cli.Revoke(context.TODO(), clientv3.LeaseID(resp.ID))
|
||||
_, err = cli.Revoke(context.TODO(), resp.ID)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
@ -94,13 +94,13 @@ func ExampleLease_keepAlive() {
|
|||
log.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(clientv3.LeaseID(resp.ID)))
|
||||
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// the key 'foo' will be kept forever
|
||||
_, err = cli.KeepAlive(context.TODO(), clientv3.LeaseID(resp.ID))
|
||||
_, err = cli.KeepAlive(context.TODO(), resp.ID)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
@ -121,13 +121,13 @@ func ExampleLease_keepAliveOnce() {
|
|||
log.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(clientv3.LeaseID(resp.ID)))
|
||||
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// to renew the lease only once
|
||||
_, err = cli.KeepAliveOnce(context.TODO(), clientv3.LeaseID(resp.ID))
|
||||
_, err = cli.KeepAliveOnce(context.TODO(), resp.ID)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -24,12 +24,25 @@ import (
|
|||
)
|
||||
|
||||
type (
|
||||
LeaseGrantResponse pb.LeaseGrantResponse
|
||||
LeaseRevokeResponse pb.LeaseRevokeResponse
|
||||
LeaseKeepAliveResponse pb.LeaseKeepAliveResponse
|
||||
LeaseID int64
|
||||
LeaseRevokeResponse pb.LeaseRevokeResponse
|
||||
LeaseID int64
|
||||
)
|
||||
|
||||
// LeaseGrantResponse is used to convert the protobuf grant response.
|
||||
type LeaseGrantResponse struct {
|
||||
*pb.ResponseHeader
|
||||
ID LeaseID
|
||||
TTL int64
|
||||
Error string
|
||||
}
|
||||
|
||||
// LeaseKeepAliveResponse is used to convert the protobuf keepalive response.
|
||||
type LeaseKeepAliveResponse struct {
|
||||
*pb.ResponseHeader
|
||||
ID LeaseID
|
||||
TTL int64
|
||||
}
|
||||
|
||||
const (
|
||||
// a small buffer to store unsent lease responses.
|
||||
leaseResponseChSize = 16
|
||||
|
@ -112,7 +125,13 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
|
|||
r := &pb.LeaseGrantRequest{TTL: ttl}
|
||||
resp, err := l.getRemote().LeaseGrant(cctx, r)
|
||||
if err == nil {
|
||||
return (*LeaseGrantResponse)(resp), nil
|
||||
gresp := &LeaseGrantResponse{
|
||||
ResponseHeader: resp.GetHeader(),
|
||||
ID: LeaseID(resp.ID),
|
||||
TTL: resp.TTL,
|
||||
Error: resp.Error,
|
||||
}
|
||||
return gresp, nil
|
||||
}
|
||||
if isHalted(cctx, err) {
|
||||
return nil, err
|
||||
|
@ -245,7 +264,13 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
|
|||
if rerr != nil {
|
||||
return nil, rerr
|
||||
}
|
||||
return (*LeaseKeepAliveResponse)(resp), nil
|
||||
|
||||
karesp := &LeaseKeepAliveResponse{
|
||||
ResponseHeader: resp.GetHeader(),
|
||||
ID: LeaseID(resp.ID),
|
||||
TTL: resp.TTL,
|
||||
}
|
||||
return karesp, nil
|
||||
}
|
||||
|
||||
func (l *lessor) recvKeepAliveLoop() {
|
||||
|
@ -286,28 +311,32 @@ func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
|
|||
|
||||
// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
|
||||
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
|
||||
id := LeaseID(resp.ID)
|
||||
karesp := &LeaseKeepAliveResponse{
|
||||
ResponseHeader: resp.GetHeader(),
|
||||
ID: LeaseID(resp.ID),
|
||||
TTL: resp.TTL,
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
ka, ok := l.keepAlives[id]
|
||||
ka, ok := l.keepAlives[karesp.ID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if resp.TTL <= 0 {
|
||||
if karesp.TTL <= 0 {
|
||||
// lease expired; close all keep alive channels
|
||||
delete(l.keepAlives, id)
|
||||
delete(l.keepAlives, karesp.ID)
|
||||
ka.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// send update to all channels
|
||||
nextDeadline := time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second)
|
||||
nextDeadline := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
|
||||
for _, ch := range ka.chs {
|
||||
select {
|
||||
case ch <- (*LeaseKeepAliveResponse)(resp):
|
||||
case ch <- karesp:
|
||||
ka.deadline = nextDeadline
|
||||
default:
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue