From c3de53c23c0d78de5f3bad1df572c5078d78f49d Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 27 Apr 2016 09:48:54 -0700 Subject: [PATCH] v3rpc: fill lease header --- etcdserver/api/v3rpc/header.go | 3 +++ etcdserver/api/v3rpc/lease.go | 24 ++++++++++++++++++------ etcdserver/apply.go | 4 +++- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/etcdserver/api/v3rpc/header.go b/etcdserver/api/v3rpc/header.go index dcb559219..3101a7d39 100644 --- a/etcdserver/api/v3rpc/header.go +++ b/etcdserver/api/v3rpc/header.go @@ -40,4 +40,7 @@ func (h *header) fill(rh *pb.ResponseHeader) { rh.ClusterId = uint64(h.clusterID) rh.MemberId = uint64(h.memberID) rh.RaftTerm = h.raftTimer.Term() + if rh.Revision == 0 { + rh.Revision = h.rev() + } } diff --git a/etcdserver/api/v3rpc/lease.go b/etcdserver/api/v3rpc/lease.go index b21b1d111..dd8cc529d 100644 --- a/etcdserver/api/v3rpc/lease.go +++ b/etcdserver/api/v3rpc/lease.go @@ -25,11 +25,12 @@ import ( ) type LeaseServer struct { - le etcdserver.Lessor + hdr header + le etcdserver.Lessor } -func NewLeaseServer(le etcdserver.Lessor) pb.LeaseServer { - return &LeaseServer{le: le} +func NewLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer { + return &LeaseServer{le: s, hdr: newHeader(s)} } func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { @@ -37,15 +38,17 @@ func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) if err == lease.ErrLeaseExists { return nil, rpctypes.ErrLeaseExist } + ls.hdr.fill(resp.Header) return resp, err } func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { - r, err := ls.le.LeaseRevoke(ctx, rr) + resp, err := ls.le.LeaseRevoke(ctx, rr) if err != nil { return nil, rpctypes.ErrLeaseNotFound } - return r, nil + ls.hdr.fill(resp.Header) + return resp, nil } func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error { @@ -58,6 +61,15 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro return err } + // Create header before we sent out the renew request. + // This can make sure that the revision is strictly smaller or equal to + // when the keepalive happened at the local server (when the local server is the leader) + // or remote leader. + // Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded + // at rev 4. + resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}} + ls.hdr.fill(resp.Header) + ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID)) if err == lease.ErrLeaseNotFound { return rpctypes.ErrLeaseNotFound @@ -67,7 +79,7 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro return err } - resp := &pb.LeaseKeepAliveResponse{ID: req.ID, TTL: ttl} + resp.TTL = ttl err = stream.Send(resp) if err != nil { return err diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 0eed6f257..7edfcc971 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -406,13 +406,15 @@ func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantR if err == nil { resp.ID = int64(l.ID) resp.TTL = l.TTL + resp.Header = &pb.ResponseHeader{Revision: a.s.KV().Rev()} } + return resp, err } func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { err := a.s.lessor.Revoke(lease.LeaseID(lc.ID)) - return &pb.LeaseRevokeResponse{}, err + return &pb.LeaseRevokeResponse{Header: &pb.ResponseHeader{Revision: a.s.KV().Rev()}}, err } func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {