Merge pull request #6321 from gyuho/lease-information

*: lease timetolive
release-3.1
Gyu-Ho Lee 2016-09-09 08:43:28 +09:00 committed by GitHub
commit 0b675845f6
21 changed files with 1723 additions and 368 deletions

View File

@ -59,6 +59,7 @@ for grpc-gateway
| LeaseGrant | LeaseGrantRequest | LeaseGrantResponse | LeaseGrant creates a lease which expires if the server does not receive a keepAlive within a given time to live period. All keys attached to the lease will be expired and deleted if the lease expires. Each expired key generates a delete event in the event history. |
| LeaseRevoke | LeaseRevokeRequest | LeaseRevokeResponse | LeaseRevoke revokes a lease. All keys attached to the lease will expire and be deleted. |
| LeaseKeepAlive | LeaseKeepAliveRequest | LeaseKeepAliveResponse | LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client to the server and streaming keep alive responses from the server to the client. |
| LeaseTimeToLive | LeaseTimeToLiveRequest | LeaseTimeToLiveResponse | LeaseTimeToLive retrieves lease information. |
@ -510,6 +511,27 @@ Empty field.
##### message `LeaseTimeToLiveRequest` (etcdserver/etcdserverpb/rpc.proto)
| Field | Description | Type |
| ----- | ----------- | ---- |
| ID | ID is the lease ID for the lease. | int64 |
| keys | keys is true to query all the keys attached to this lease. | bool |
##### message `LeaseTimeToLiveResponse` (etcdserver/etcdserverpb/rpc.proto)
| Field | Description | Type |
| ----- | ----------- | ---- |
| header | | ResponseHeader |
| ID | ID is the lease ID from the keep alive request. | int64 |
| TTL | TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. | int64 |
| grantedTTL | GrantedTTL is the initial granted time in seconds upon lease creation/renewal. | int64 |
| keys | Keys is the list of keys attached to this lease. | (slice of) bytes |
##### message `Member` (etcdserver/etcdserverpb/rpc.proto)
| Field | Description | Type |
@ -799,6 +821,22 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
##### message `LeaseInternalRequest` (lease/leasepb/lease.proto)
| Field | Description | Type |
| ----- | ----------- | ---- |
| LeaseTimeToLiveRequest | | etcdserverpb.LeaseTimeToLiveRequest |
##### message `LeaseInternalResponse` (lease/leasepb/lease.proto)
| Field | Description | Type |
| ----- | ----------- | ---- |
| LeaseTimeToLiveResponse | | etcdserverpb.LeaseTimeToLiveResponse |
##### message `Permission` (auth/authpb/auth.proto)
Permission is a single entity

View File

@ -636,6 +636,33 @@
]
}
},
"/v3alpha/kv/lease/timetolive": {
"post": {
"summary": "LeaseTimeToLive retrieves lease information.",
"operationId": "LeaseTimeToLive",
"responses": {
"200": {
"description": "",
"schema": {
"$ref": "#/definitions/etcdserverpbLeaseTimeToLiveResponse"
}
}
},
"parameters": [
{
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/etcdserverpbLeaseTimeToLiveRequest"
}
}
],
"tags": [
"Lease"
]
}
},
"/v3alpha/kv/put": {
"post": {
"summary": "Put puts the given key into the key-value store.\nA put request increments the revision of the key-value store\nand generates one event in the event history.",
@ -1614,6 +1641,52 @@
}
}
},
"etcdserverpbLeaseTimeToLiveRequest": {
"type": "object",
"properties": {
"ID": {
"type": "string",
"format": "int64",
"description": "ID is the lease ID for the lease."
},
"keys": {
"type": "boolean",
"format": "boolean",
"description": "keys is true to query all the keys attached to this lease."
}
}
},
"etcdserverpbLeaseTimeToLiveResponse": {
"type": "object",
"properties": {
"ID": {
"type": "string",
"format": "int64",
"description": "ID is the lease ID from the keep alive request."
},
"TTL": {
"type": "string",
"format": "int64",
"description": "TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds."
},
"grantedTTL": {
"type": "string",
"format": "int64",
"description": "GrantedTTL is the initial granted time in seconds upon lease creation/renewal."
},
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
},
"keys": {
"type": "array",
"items": {
"type": "string",
"format": "byte"
},
"description": "Keys is the list of keys attached to this lease."
}
}
},
"etcdserverpbMember": {
"type": "object",
"properties": {

View File

@ -15,6 +15,8 @@
package integration
import (
"reflect"
"sort"
"testing"
"time"
@ -455,3 +457,56 @@ func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
clus.Members[0].Restart(t)
}
func TestLeaseTimeToLive(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
lapi := clientv3.NewLease(clus.RandClient())
defer lapi.Close()
resp, err := lapi.Grant(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}
kv := clientv3.NewKV(clus.RandClient())
keys := []string{"foo1", "foo2"}
for i := range keys {
if _, err = kv.Put(context.TODO(), keys[i], "bar", clientv3.WithLease(resp.ID)); err != nil {
t.Fatal(err)
}
}
lresp, lerr := lapi.TimeToLive(context.Background(), resp.ID, clientv3.WithAttachedKeys())
if lerr != nil {
t.Fatal(lerr)
}
if lresp.ID != resp.ID {
t.Fatalf("leaseID expected %d, got %d", resp.ID, lresp.ID)
}
if lresp.GrantedTTL != int64(10) {
t.Fatalf("GrantedTTL expected %d, got %d", 10, lresp.GrantedTTL)
}
if lresp.TTL == 0 || lresp.TTL > lresp.GrantedTTL {
t.Fatalf("unexpected TTL %d (granted %d)", lresp.TTL, lresp.GrantedTTL)
}
ks := make([]string, len(lresp.Keys))
for i := range lresp.Keys {
ks[i] = string(lresp.Keys[i])
}
sort.Strings(ks)
if !reflect.DeepEqual(ks, keys) {
t.Fatalf("keys expected %v, got %v", keys, ks)
}
lresp, lerr = lapi.TimeToLive(context.Background(), resp.ID)
if lerr != nil {
t.Fatal(lerr)
}
if len(lresp.Keys) != 0 {
t.Fatalf("unexpected keys %+v", lresp.Keys)
}
}

View File

@ -44,6 +44,21 @@ type LeaseKeepAliveResponse struct {
TTL int64
}
// LeaseTimeToLiveResponse is used to convert the protobuf lease timetolive response.
type LeaseTimeToLiveResponse struct {
*pb.ResponseHeader
ID LeaseID `json:"id"`
// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds.
TTL int64 `json:"ttl"`
// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
GrantedTTL int64 `json:"granted-ttl"`
// Keys is the list of keys attached to this lease.
Keys [][]byte `json:"keys"`
}
const (
// defaultTTL is the assumed lease TTL used for the first keepalive
// deadline before the actual TTL is known to the client.
@ -61,6 +76,9 @@ type Lease interface {
// Revoke revokes the given lease.
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// TimeToLive retrieves the lease information of the given lease ID.
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// KeepAlive keeps the given lease alive forever.
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
@ -170,6 +188,30 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
}
}
func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
cctx, cancel := context.WithCancel(ctx)
done := cancelWhenStop(cancel, l.stopCtx.Done())
defer close(done)
for {
r := toLeaseTimeToLiveRequest(id, opts...)
resp, err := l.remote.LeaseTimeToLive(cctx, r)
if err == nil {
gresp := &LeaseTimeToLiveResponse{
ResponseHeader: resp.GetHeader(),
ID: LeaseID(resp.ID),
TTL: resp.TTL,
GrantedTTL: resp.GrantedTTL,
Keys: resp.Keys,
}
return gresp, nil
}
if isHaltErr(cctx, err) {
return nil, toErr(cctx, err)
}
}
}
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)

View File

@ -83,7 +83,6 @@ func (op Op) toRequestOp() *pb.RequestOp {
return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}}
case tDeleteRange:
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}}
default:
panic("Unknown Op")
@ -320,3 +319,32 @@ func WithPrevKV() OpOption {
op.prevKV = true
}
}
// LeaseOp represents an Operation that lease can execute.
type LeaseOp struct {
id LeaseID
// for TimeToLive
attachedKeys bool
}
// LeaseOption configures lease operations.
type LeaseOption func(*LeaseOp)
func (op *LeaseOp) applyOpts(opts []LeaseOption) {
for _, opt := range opts {
opt(op)
}
}
// WithAttachedKeys requests lease timetolive API to return
// attached keys of given lease ID.
func WithAttachedKeys() LeaseOption {
return func(op *LeaseOp) { op.attachedKeys = true }
}
func toLeaseTimeToLiveRequest(id LeaseID, opts ...LeaseOption) *pb.LeaseTimeToLiveRequest {
ret := &LeaseOp{id: id}
ret.applyOpts(opts)
return &pb.LeaseTimeToLiveRequest{ID: int64(id), Keys: ret.attachedKeys}
}

View File

@ -21,8 +21,35 @@ import (
"testing"
)
func TestCtlV3LeaseKeepAlive(t *testing.T) { testCtl(t, leaseTestKeepAlive) }
func TestCtlV3LeaseRevoke(t *testing.T) { testCtl(t, leaseTestRevoke) }
func TestCtlV3LeaseGrantTimeToLive(t *testing.T) { testCtl(t, leaseTestGrantTimeToLive) }
func TestCtlV3LeaseKeepAlive(t *testing.T) { testCtl(t, leaseTestKeepAlive) }
func TestCtlV3LeaseRevoke(t *testing.T) { testCtl(t, leaseTestRevoke) }
func leaseTestGrantTimeToLive(cx ctlCtx) {
id, err := ctlV3LeaseGrant(cx, 10)
if err != nil {
cx.t.Fatal(err)
}
cmdArgs := append(cx.PrefixArgs(), "lease", "timetolive", id, "--keys")
proc, err := spawnCmd(cmdArgs)
if err != nil {
cx.t.Fatal(err)
}
line, err := proc.Expect(" granted with TTL(")
if err != nil {
cx.t.Fatal(err)
}
if err = proc.Close(); err != nil {
cx.t.Fatal(err)
}
if !strings.Contains(line, ", attached keys") {
cx.t.Fatalf("expected 'attached keys', got %q", line)
}
if !strings.Contains(line, id) {
cx.t.Fatalf("expected leaseID %q, got %q", id, line)
}
}
func leaseTestKeepAlive(cx ctlCtx) {
// put with TTL 10 seconds and keep-alive

View File

@ -326,6 +326,40 @@ LEASE REVOKE destroys a given lease, deleting all attached keys.
lease 32695410dcc0ca06 revoked
```
### LEASE TIMETOLIVE \<leaseID\>
LEASE TIMETOLIVE retrieves the lease information with the given lease ID.
#### Return value
- On success, prints lease information.
- On failure, prints an error message and returns with a non-zero exit code.
#### Example
```bash
./etcdctl lease grant 500
lease 2d8257079fa1bc0c granted with TTL(500s)
./etcdctl put foo1 bar --lease=2d8257079fa1bc0c
./etcdctl put foo2 bar --lease=2d8257079fa1bc0c
./etcdctl lease timetolive 2d8257079fa1bc0c
lease 2d8257079fa1bc0c granted with TTL(500s), remaining(481s)
./etcdctl lease timetolive 2d8257079fa1bc0c --keys
lease 2d8257079fa1bc0c granted with TTL(500s), remaining(472s), attached keys([foo2 foo1])
./etcdctl lease timetolive 2d8257079fa1bc0c --write-out=json
{"cluster_id":17186838941855831277,"member_id":4845372305070271874,"revision":3,"raft_term":2,"id":3279279168933706764,"ttl":465,"granted-ttl":500,"keys":null}
./etcdctl lease timetolive 2d8257079fa1bc0c --write-out=json --keys
{"cluster_id":17186838941855831277,"member_id":4845372305070271874,"revision":3,"raft_term":2,"id":3279279168933706764,"ttl":459,"granted-ttl":500,"keys":["Zm9vMQ==","Zm9vMg=="]}
```
### LEASE KEEP-ALIVE \<leaseID\>
LEASE KEEP-ALIVE periodically refreshes a lease so it does not expire.

View File

@ -32,6 +32,7 @@ func NewLeaseCommand() *cobra.Command {
lc.AddCommand(NewLeaseGrantCommand())
lc.AddCommand(NewLeaseRevokeCommand())
lc.AddCommand(NewLeaseTimeToLiveCommand())
lc.AddCommand(NewLeaseKeepAliveCommand())
return lc
@ -87,13 +88,9 @@ func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitBadArgs, fmt.Errorf("lease revoke command needs 1 argument"))
}
id, err := strconv.ParseInt(args[0], 16, 64)
if err != nil {
ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
}
id := leaseFromArgs(args[0])
ctx, cancel := commandCtx(cmd)
_, err = mustClientFromCmd(cmd).Revoke(ctx, v3.LeaseID(id))
_, err := mustClientFromCmd(cmd).Revoke(ctx, id)
cancel()
if err != nil {
ExitWithError(ExitError, fmt.Errorf("failed to revoke lease (%v)\n", err))
@ -101,6 +98,37 @@ func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) {
fmt.Printf("lease %016x revoked\n", id)
}
var timeToLiveKeys bool
// NewLeaseTimeToLiveCommand returns the cobra command for "lease timetolive".
func NewLeaseTimeToLiveCommand() *cobra.Command {
lc := &cobra.Command{
Use: "timetolive <leaseID>",
Short: "Get lease information",
Run: leaseTimeToLiveCommandFunc,
}
lc.Flags().BoolVar(&timeToLiveKeys, "keys", false, "Get keys attached to this lease")
return lc
}
// leaseTimeToLiveCommandFunc executes the "lease timetolive" command.
func leaseTimeToLiveCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
ExitWithError(ExitBadArgs, fmt.Errorf("lease timetolive command needs lease ID as argument"))
}
var opts []v3.LeaseOption
if timeToLiveKeys {
opts = append(opts, v3.WithAttachedKeys())
}
resp, rerr := mustClientFromCmd(cmd).TimeToLive(context.TODO(), leaseFromArgs(args[0]), opts...)
if rerr != nil {
ExitWithError(ExitBadConnection, rerr)
}
display.TimeToLive(*resp, timeToLiveKeys)
}
// NewLeaseKeepAliveCommand returns the cobra command for "lease keep-alive".
func NewLeaseKeepAliveCommand() *cobra.Command {
lc := &cobra.Command{
@ -119,12 +147,8 @@ func leaseKeepAliveCommandFunc(cmd *cobra.Command, args []string) {
ExitWithError(ExitBadArgs, fmt.Errorf("lease keep-alive command needs lease ID as argument"))
}
id, err := strconv.ParseInt(args[0], 16, 64)
if err != nil {
ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
}
respc, kerr := mustClientFromCmd(cmd).KeepAlive(context.TODO(), v3.LeaseID(id))
id := leaseFromArgs(args[0])
respc, kerr := mustClientFromCmd(cmd).KeepAlive(context.TODO(), id)
if kerr != nil {
ExitWithError(ExitBadConnection, kerr)
}
@ -134,3 +158,11 @@ func leaseKeepAliveCommandFunc(cmd *cobra.Command, args []string) {
}
fmt.Printf("lease %016x expired or revoked.\n", id)
}
func leaseFromArgs(arg string) v3.LeaseID {
id, err := strconv.ParseInt(arg, 16, 64)
if err != nil {
ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
}
return v3.LeaseID(id)
}

View File

@ -35,6 +35,8 @@ type printer interface {
Txn(v3.TxnResponse)
Watch(v3.WatchResponse)
TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool)
MemberList(v3.MemberListResponse)
EndpointStatus([]epStatus)
@ -159,6 +161,18 @@ func (s *simplePrinter) Watch(resp v3.WatchResponse) {
}
}
func (s *simplePrinter) TimeToLive(resp v3.LeaseTimeToLiveResponse, keys bool) {
txt := fmt.Sprintf("lease %016x granted with TTL(%ds), remaining(%ds)", resp.ID, resp.GrantedTTL, resp.TTL)
if keys {
ks := make([]string, len(resp.Keys))
for i := range resp.Keys {
ks[i] = string(resp.Keys[i])
}
txt += fmt.Sprintf(", attached keys(%v)", ks)
}
fmt.Println(txt)
}
func (s *simplePrinter) Alarm(resp v3.AlarmResponse) {
for _, e := range resp.Alarms {
fmt.Printf("%+v\n", e)
@ -203,6 +217,9 @@ func (tp *tablePrinter) Txn(r v3.TxnResponse) {
func (tp *tablePrinter) Watch(r v3.WatchResponse) {
ExitWithError(ExitBadFeature, errors.New("table is not supported as output format"))
}
func (tp *tablePrinter) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) {
ExitWithError(ExitBadFeature, errors.New("table is not supported as output format"))
}
func (tp *tablePrinter) Alarm(r v3.AlarmResponse) {
ExitWithError(ExitBadFeature, errors.New("table is not supported as output format"))
}
@ -236,15 +253,16 @@ func (tp *tablePrinter) DBStatus(r dbstatus) {
type jsonPrinter struct{}
func (p *jsonPrinter) Del(r v3.DeleteResponse) { printJSON(r) }
func (p *jsonPrinter) Get(r v3.GetResponse) { printJSON(r) }
func (p *jsonPrinter) Put(r v3.PutResponse) { printJSON(r) }
func (p *jsonPrinter) Txn(r v3.TxnResponse) { printJSON(r) }
func (p *jsonPrinter) Watch(r v3.WatchResponse) { printJSON(r) }
func (p *jsonPrinter) Alarm(r v3.AlarmResponse) { printJSON(r) }
func (p *jsonPrinter) MemberList(r v3.MemberListResponse) { printJSON(r) }
func (p *jsonPrinter) EndpointStatus(r []epStatus) { printJSON(r) }
func (p *jsonPrinter) DBStatus(r dbstatus) { printJSON(r) }
func (p *jsonPrinter) Del(r v3.DeleteResponse) { printJSON(r) }
func (p *jsonPrinter) Get(r v3.GetResponse) { printJSON(r) }
func (p *jsonPrinter) Put(r v3.PutResponse) { printJSON(r) }
func (p *jsonPrinter) Txn(r v3.TxnResponse) { printJSON(r) }
func (p *jsonPrinter) Watch(r v3.WatchResponse) { printJSON(r) }
func (p *jsonPrinter) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) { printJSON(r) }
func (p *jsonPrinter) Alarm(r v3.AlarmResponse) { printJSON(r) }
func (p *jsonPrinter) MemberList(r v3.MemberListResponse) { printJSON(r) }
func (p *jsonPrinter) EndpointStatus(r []epStatus) { printJSON(r) }
func (p *jsonPrinter) DBStatus(r dbstatus) { printJSON(r) }
func printJSON(v interface{}) {
b, err := json.Marshal(v)
@ -283,6 +301,10 @@ func (p *pbPrinter) Watch(r v3.WatchResponse) {
}
}
func (p *pbPrinter) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) {
ExitWithError(ExitBadFeature, errors.New("only support simple or json as output format"))
}
func (p *pbPrinter) Alarm(r v3.AlarmResponse) {
printPB((*pb.AlarmResponse)(&r))
}

View File

@ -26,7 +26,6 @@ import (
const (
peerMembersPrefix = "/members"
leasesPrefix = "/leases"
)
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
@ -49,7 +48,8 @@ func newPeerHandler(cluster api.Cluster, raftHandler http.Handler, leaseHandler
mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
mux.Handle(peerMembersPrefix, mh)
if leaseHandler != nil {
mux.Handle(leasesPrefix, leaseHandler)
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}
mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion))
return mux

View File

@ -54,6 +54,15 @@ func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeReques
return resp, nil
}
func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
resp, err := ls.le.LeaseTimeToLive(ctx, rr)
if err != nil {
return nil, rpctypes.ErrGRPCLeaseNotFound
}
ls.hdr.fill(resp.Header)
return resp, nil
}
func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
for {
req, err := stream.Recv()

View File

@ -45,6 +45,8 @@
LeaseRevokeResponse
LeaseKeepAliveRequest
LeaseKeepAliveResponse
LeaseTimeToLiveRequest
LeaseTimeToLiveResponse
Member
MemberAddRequest
MemberAddResponse

File diff suppressed because it is too large Load Diff

View File

@ -222,6 +222,19 @@ func request_Lease_LeaseKeepAlive_0(ctx context.Context, marshaler runtime.Marsh
return stream, metadata, nil
}
func request_Lease_LeaseTimeToLive_0(ctx context.Context, marshaler runtime.Marshaler, client LeaseClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq LeaseTimeToLiveRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil {
return nil, metadata, grpc.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.LeaseTimeToLive(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func request_Cluster_MemberAdd_0(ctx context.Context, marshaler runtime.Marshaler, client ClusterClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq MemberAddRequest
var metadata runtime.ServerMetadata
@ -935,6 +948,34 @@ func RegisterLeaseHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc
})
mux.Handle("POST", pattern_Lease_LeaseTimeToLive_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, req)
if err != nil {
runtime.HTTPError(ctx, outboundMarshaler, w, req, err)
}
resp, md, err := request_Lease_LeaseTimeToLive_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, outboundMarshaler, w, req, err)
return
}
forward_Lease_LeaseTimeToLive_0(ctx, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@ -944,6 +985,8 @@ var (
pattern_Lease_LeaseRevoke_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"v3alpha", "kv", "lease", "revoke"}, ""))
pattern_Lease_LeaseKeepAlive_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v3alpha", "lease", "keepalive"}, ""))
pattern_Lease_LeaseTimeToLive_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"v3alpha", "kv", "lease", "timetolive"}, ""))
)
var (
@ -952,6 +995,8 @@ var (
forward_Lease_LeaseRevoke_0 = runtime.ForwardResponseMessage
forward_Lease_LeaseKeepAlive_0 = runtime.ForwardResponseStream
forward_Lease_LeaseTimeToLive_0 = runtime.ForwardResponseMessage
)
// RegisterClusterHandlerFromEndpoint is same as RegisterClusterHandler but

View File

@ -104,8 +104,15 @@ service Lease {
};
}
// LeaseTimeToLive retrieves lease information.
rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse) {
option (google.api.http) = {
post: "/v3alpha/kv/lease/timetolive"
body: "*"
};
}
// TODO(xiangli) List all existing Leases?
// TODO(xiangli) Get details information (expirations, leased keys, etc.) of a lease?
}
service Cluster {
@ -658,6 +665,25 @@ message LeaseKeepAliveResponse {
int64 TTL = 3;
}
message LeaseTimeToLiveRequest {
// ID is the lease ID for the lease.
int64 ID = 1;
// keys is true to query all the keys attached to this lease.
bool keys = 2;
}
message LeaseTimeToLiveResponse {
ResponseHeader header = 1;
// ID is the lease ID from the keep alive request.
int64 ID = 2;
// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds.
int64 TTL = 3;
// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
int64 grantedTTL = 4;
// Keys is the list of keys attached to this lease.
repeated bytes keys = 5;
}
message Member {
// ID is the member ID for this member.
uint64 ID = 1;

View File

@ -21,8 +21,10 @@ import (
"github.com/coreos/etcd/auth"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/mvcc"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
@ -59,6 +61,9 @@ type Lessor interface {
// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
// is returned.
LeaseRenew(id lease.LeaseID) (int64, error)
// LeaseTimeToLive retrieves lease information.
LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
}
type Authenticator interface {
@ -219,7 +224,7 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
ttl, err := s.lessor.Renew(id)
if err == nil {
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
@ -227,6 +232,61 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
}
// renewals don't go through raft; forward to leader manually
leader, err := s.waitLeader()
if err != nil {
return -1, err
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout())
if err == nil {
break
}
}
return ttl, err
}
func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
if s.Leader() == s.ID() {
// primary; timetolive directly from leader
le := s.lessor.Lookup(lease.LeaseID(r.ID))
if le == nil {
return nil, lease.ErrLeaseNotFound
}
// TODO: fill out ResponseHeader
resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL}
if r.Keys {
ks := le.Keys()
kbs := make([][]byte, len(ks))
for i := range ks {
kbs[i] = []byte(ks[i])
}
resp.Keys = kbs
}
return resp, nil
}
// manually request to leader
leader, err := s.waitLeader()
if err != nil {
return nil, err
}
var lresp *pb.LeaseTimeToLiveResponse
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeaseInternalPrefix
var iresp *leasepb.LeaseInternalResponse
iresp, err = leasehttp.TimeToLiveHTTP(ctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
if err == nil {
lresp = iresp.LeaseTimeToLiveResponse
break
}
}
return lresp, nil
}
func (s *EtcdServer) waitLeader() (*membership.Member, error) {
leader := s.cluster.Member(s.Leader())
for i := 0; i < 5 && leader == nil; i++ {
// wait an election
@ -235,21 +295,13 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
case <-time.After(dur):
leader = s.cluster.Member(s.Leader())
case <-s.done:
return -1, ErrStopped
return nil, ErrStopped
}
}
if leader == nil || len(leader.PeerURLs) == 0 {
return -1, ErrNoLeader
return nil, ErrNoLeader
}
for _, url := range leader.PeerURLs {
lurl := url + "/leases"
ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout())
if err == nil {
break
}
}
return ttl, err
return leader, nil
}
func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {

View File

@ -23,6 +23,14 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/pkg/httputil"
"golang.org/x/net/context"
)
var (
LeasePrefix = "/leases"
LeaseInternalPrefix = "/leases/internal"
)
// NewHandler returns an http Handler for lease renewals
@ -44,28 +52,70 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
lreq := pb.LeaseKeepAliveRequest{}
if err := lreq.Unmarshal(b); err != nil {
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return
}
var v []byte
switch r.URL.Path {
case LeasePrefix:
lreq := pb.LeaseKeepAliveRequest{}
if err := lreq.Unmarshal(b); err != nil {
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return
}
ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
if err != nil {
if err == lease.ErrLeaseNotFound {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
if err != nil {
if err == lease.ErrLeaseNotFound {
http.Error(w, err.Error(), http.StatusNotFound)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// TODO: fill out ResponseHeader
resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl}
v, err = resp.Marshal()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
case LeaseInternalPrefix:
lreq := leasepb.LeaseInternalRequest{}
if err := lreq.Unmarshal(b); err != nil {
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return
}
// TODO: fill out ResponseHeader
resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl}
v, err := resp.Marshal()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
if l == nil {
http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
return
}
// TODO: fill out ResponseHeader
resp := &leasepb.LeaseInternalResponse{
LeaseTimeToLiveResponse: &pb.LeaseTimeToLiveResponse{
Header: &pb.ResponseHeader{},
ID: lreq.LeaseTimeToLiveRequest.ID,
TTL: int64(l.Remaining().Seconds()),
GrantedTTL: l.TTL,
},
}
if lreq.LeaseTimeToLiveRequest.Keys {
ks := l.Keys()
kbs := make([][]byte, len(ks))
for i := range ks {
kbs[i] = []byte(ks[i])
}
resp.LeaseTimeToLiveResponse.Keys = kbs
}
v, err = resp.Marshal()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
default:
http.Error(w, fmt.Sprintf("unknown request path %q", r.URL.Path), http.StatusBadRequest)
return
}
@ -111,3 +161,65 @@ func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time.
}
return lresp.TTL, nil
}
// TimeToLiveHTTP retrieves lease information of the given lease ID.
func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string, rt http.RoundTripper) (*leasepb.LeaseInternalResponse, error) {
// will post lreq protobuf to leader
lreq, err := (&leasepb.LeaseInternalRequest{&pb.LeaseTimeToLiveRequest{ID: int64(id), Keys: keys}}).Marshal()
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/protobuf")
cancel := httputil.RequestCanceler(req)
cc := &http.Client{Transport: rt}
var b []byte
errc := make(chan error)
go func() {
// TODO detect if leader failed and retry?
resp, err := cc.Do(req)
if err != nil {
errc <- err
return
}
b, err = ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
errc <- err
return
}
if resp.StatusCode == http.StatusNotFound {
errc <- lease.ErrLeaseNotFound
return
}
if resp.StatusCode != http.StatusOK {
errc <- fmt.Errorf("lease: unknown error(%s)", string(b))
return
}
errc <- nil
}()
select {
case derr := <-errc:
if derr != nil {
return nil, derr
}
case <-ctx.Done():
cancel()
return nil, ctx.Err()
}
lresp := &leasepb.LeaseInternalResponse{}
if err := lresp.Unmarshal(b); err != nil {
return nil, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
}
if lresp.LeaseTimeToLiveResponse.ID != int64(id) {
return nil, fmt.Errorf("lease: renew id mismatch")
}
return lresp, nil
}

View File

@ -10,6 +10,8 @@
It has these top-level messages:
Lease
LeaseInternalRequest
LeaseInternalResponse
*/
package leasepb
@ -19,9 +21,11 @@ import (
proto "github.com/golang/protobuf/proto"
math "math"
io "io"
)
import io "io"
import etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
@ -42,8 +46,28 @@ func (m *Lease) String() string { return proto.CompactTextString(m) }
func (*Lease) ProtoMessage() {}
func (*Lease) Descriptor() ([]byte, []int) { return fileDescriptorLease, []int{0} }
type LeaseInternalRequest struct {
LeaseTimeToLiveRequest *etcdserverpb.LeaseTimeToLiveRequest `protobuf:"bytes,1,opt,name=LeaseTimeToLiveRequest,json=leaseTimeToLiveRequest" json:"LeaseTimeToLiveRequest,omitempty"`
}
func (m *LeaseInternalRequest) Reset() { *m = LeaseInternalRequest{} }
func (m *LeaseInternalRequest) String() string { return proto.CompactTextString(m) }
func (*LeaseInternalRequest) ProtoMessage() {}
func (*LeaseInternalRequest) Descriptor() ([]byte, []int) { return fileDescriptorLease, []int{1} }
type LeaseInternalResponse struct {
LeaseTimeToLiveResponse *etcdserverpb.LeaseTimeToLiveResponse `protobuf:"bytes,1,opt,name=LeaseTimeToLiveResponse,json=leaseTimeToLiveResponse" json:"LeaseTimeToLiveResponse,omitempty"`
}
func (m *LeaseInternalResponse) Reset() { *m = LeaseInternalResponse{} }
func (m *LeaseInternalResponse) String() string { return proto.CompactTextString(m) }
func (*LeaseInternalResponse) ProtoMessage() {}
func (*LeaseInternalResponse) Descriptor() ([]byte, []int) { return fileDescriptorLease, []int{2} }
func init() {
proto.RegisterType((*Lease)(nil), "leasepb.Lease")
proto.RegisterType((*LeaseInternalRequest)(nil), "leasepb.LeaseInternalRequest")
proto.RegisterType((*LeaseInternalResponse)(nil), "leasepb.LeaseInternalResponse")
}
func (m *Lease) Marshal() (data []byte, err error) {
size := m.Size()
@ -73,6 +97,62 @@ func (m *Lease) MarshalTo(data []byte) (int, error) {
return i, nil
}
func (m *LeaseInternalRequest) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *LeaseInternalRequest) MarshalTo(data []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.LeaseTimeToLiveRequest != nil {
data[i] = 0xa
i++
i = encodeVarintLease(data, i, uint64(m.LeaseTimeToLiveRequest.Size()))
n1, err := m.LeaseTimeToLiveRequest.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n1
}
return i, nil
}
func (m *LeaseInternalResponse) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *LeaseInternalResponse) MarshalTo(data []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.LeaseTimeToLiveResponse != nil {
data[i] = 0xa
i++
i = encodeVarintLease(data, i, uint64(m.LeaseTimeToLiveResponse.Size()))
n2, err := m.LeaseTimeToLiveResponse.MarshalTo(data[i:])
if err != nil {
return 0, err
}
i += n2
}
return i, nil
}
func encodeFixed64Lease(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
@ -112,6 +192,26 @@ func (m *Lease) Size() (n int) {
return n
}
func (m *LeaseInternalRequest) Size() (n int) {
var l int
_ = l
if m.LeaseTimeToLiveRequest != nil {
l = m.LeaseTimeToLiveRequest.Size()
n += 1 + l + sovLease(uint64(l))
}
return n
}
func (m *LeaseInternalResponse) Size() (n int) {
var l int
_ = l
if m.LeaseTimeToLiveResponse != nil {
l = m.LeaseTimeToLiveResponse.Size()
n += 1 + l + sovLease(uint64(l))
}
return n
}
func sovLease(x uint64) (n int) {
for {
n++
@ -213,6 +313,172 @@ func (m *Lease) Unmarshal(data []byte) error {
}
return nil
}
func (m *LeaseInternalRequest) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLease
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: LeaseInternalRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LeaseInternalRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseTimeToLiveRequest", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLease
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthLease
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.LeaseTimeToLiveRequest == nil {
m.LeaseTimeToLiveRequest = &etcdserverpb.LeaseTimeToLiveRequest{}
}
if err := m.LeaseTimeToLiveRequest.Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipLease(data[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthLease
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *LeaseInternalResponse) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLease
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: LeaseInternalResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LeaseInternalResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LeaseTimeToLiveResponse", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLease
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthLease
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.LeaseTimeToLiveResponse == nil {
m.LeaseTimeToLiveResponse = &etcdserverpb.LeaseTimeToLiveResponse{}
}
if err := m.LeaseTimeToLiveResponse.Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipLease(data[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthLease
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipLease(data []byte) (n int, err error) {
l := len(data)
iNdEx := 0
@ -319,13 +585,20 @@ var (
)
var fileDescriptorLease = []byte{
// 126 bytes of a gzipped FileDescriptorProto
// 239 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0x49, 0x4d, 0x2c,
0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x07, 0x73, 0x0a, 0x92, 0xa4, 0x44, 0xd2,
0xf3, 0xd3, 0xf3, 0xc1, 0x62, 0xfa, 0x20, 0x16, 0x44, 0x5a, 0x49, 0x93, 0x8b, 0xd5, 0x07, 0xa4,
0x40, 0x88, 0x8f, 0x8b, 0xc9, 0xd3, 0x45, 0x82, 0x51, 0x81, 0x51, 0x83, 0x39, 0x88, 0x29, 0xd3,
0x45, 0x48, 0x80, 0x8b, 0x39, 0x24, 0xc4, 0x47, 0x82, 0x09, 0x2c, 0xc0, 0x5c, 0x12, 0xe2, 0xe3,
0x24, 0x71, 0xe2, 0xa1, 0x1c, 0xc3, 0x85, 0x87, 0x72, 0x0c, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78,
0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x8c, 0xc7, 0x72, 0x0c, 0x49, 0x6c, 0x60, 0xb3, 0x8c,
0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x0d, 0xa0, 0x42, 0x1a, 0x79, 0x00, 0x00, 0x00,
0xf3, 0xd3, 0xf3, 0xc1, 0x62, 0xfa, 0x20, 0x16, 0x44, 0x5a, 0x4a, 0x2d, 0xb5, 0x24, 0x39, 0x45,
0x1f, 0x44, 0x14, 0xa7, 0x16, 0x95, 0xa5, 0x16, 0x21, 0x31, 0x0b, 0x92, 0xf4, 0x8b, 0x0a, 0x92,
0x21, 0xea, 0x94, 0x34, 0xb9, 0x58, 0x7d, 0x40, 0x06, 0x09, 0xf1, 0x71, 0x31, 0x79, 0xba, 0x48,
0x30, 0x2a, 0x30, 0x6a, 0x30, 0x07, 0x31, 0x65, 0xba, 0x08, 0x09, 0x70, 0x31, 0x87, 0x84, 0xf8,
0x48, 0x30, 0x81, 0x05, 0x98, 0x4b, 0x42, 0x7c, 0x94, 0x4a, 0xb8, 0x44, 0xc0, 0x4a, 0x3d, 0xf3,
0x4a, 0x52, 0x8b, 0xf2, 0x12, 0x73, 0x82, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84, 0x62, 0xb8,
0xc4, 0xc0, 0xe2, 0x21, 0x99, 0xb9, 0xa9, 0x21, 0xf9, 0x3e, 0x99, 0x65, 0xa9, 0x50, 0x19, 0xb0,
0x69, 0xdc, 0x46, 0x2a, 0x7a, 0xc8, 0x76, 0xeb, 0x61, 0x57, 0x1b, 0x24, 0x96, 0x83, 0x55, 0x5c,
0xa9, 0x82, 0x4b, 0x14, 0xcd, 0xd6, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0xa1, 0x78, 0x2e, 0x71,
0x0c, 0xa3, 0x20, 0x52, 0x50, 0x7b, 0x55, 0x09, 0xd8, 0x0b, 0x51, 0x1c, 0x24, 0x9e, 0x83, 0x5d,
0xc2, 0x49, 0xe2, 0xc4, 0x43, 0x39, 0x86, 0x0b, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc,
0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x19, 0x8f, 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0x61,
0x67, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x65, 0xaa, 0x74, 0x2e, 0x91, 0x01, 0x00, 0x00,
}

View File

@ -2,6 +2,7 @@ syntax = "proto3";
package leasepb;
import "gogoproto/gogo.proto";
import "etcd/etcdserver/etcdserverpb/rpc.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
@ -13,3 +14,11 @@ message Lease {
int64 ID = 1;
int64 TTL = 2;
}
message LeaseInternalRequest {
etcdserverpb.LeaseTimeToLiveRequest LeaseTimeToLiveRequest = 1;
}
message LeaseInternalResponse {
etcdserverpb.LeaseTimeToLiveResponse LeaseTimeToLiveResponse = 1;
}

View File

@ -480,6 +480,20 @@ func (l *Lease) refresh(extend time.Duration) {
// forever sets the expiry of lease to be forever.
func (l *Lease) forever() { l.expiry = forever }
// Keys returns all the keys attached to the lease.
func (l *Lease) Keys() []string {
keys := make([]string, 0, len(l.itemSet))
for k := range l.itemSet {
keys = append(keys, k.Key)
}
return keys
}
// Remaining returns the remaining time of the lease.
func (l *Lease) Remaining() time.Duration {
return l.expiry.Sub(time.Now())
}
type LeaseItem struct {
Key string
}

View File

@ -41,6 +41,11 @@ func (lp *leaseProxy) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest
return pb.NewLeaseClient(conn).LeaseRevoke(ctx, rr)
}
func (lp *leaseProxy) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
conn := lp.client.ActiveConnection()
return pb.NewLeaseClient(conn).LeaseTimeToLive(ctx, rr)
}
func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
conn := lp.client.ActiveConnection()
ctx, cancel := context.WithCancel(stream.Context())