diff --git a/etcdserver/api/v3rpc/lease.go b/etcdserver/api/v3rpc/lease.go index ae1778788..be6e20b97 100644 --- a/etcdserver/api/v3rpc/lease.go +++ b/etcdserver/api/v3rpc/lease.go @@ -79,14 +79,14 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}} ls.hdr.fill(resp.Header) - ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID)) + ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID)) if err == lease.ErrLeaseNotFound { err = nil ttl = 0 } if err != nil { - return err + return togRPCError(err) } resp.TTL = ttl diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index e18bac950..ac80f3855 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -17,7 +17,6 @@ package etcdserver import ( "bytes" "encoding/binary" - "io" "strconv" "strings" "time" @@ -27,7 +26,6 @@ import ( "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/lease/leasehttp" - "github.com/coreos/etcd/lease/leasepb" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/raft" @@ -70,7 +68,7 @@ type Lessor interface { // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error // is returned. - LeaseRenew(id lease.LeaseID) (int64, error) + LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) // LeaseTimeToLive retrieves lease information. LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) @@ -306,7 +304,7 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) return result.resp.(*pb.LeaseRevokeResponse), nil } -func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) { +func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { ttl, err := s.lessor.Renew(id) if err == nil { // already requested to primary lessor(leader) return ttl, nil @@ -315,21 +313,24 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) { return -1, err } - // renewals don't go through raft; forward to leader manually - leader, err := s.waitLeader() - if err != nil { - return -1, err - } + cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) + defer cancel() - for _, url := range leader.PeerURLs { - lurl := url + leasehttp.LeasePrefix - ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout()) - if err == nil { - break + // renewals don't go through raft; forward to leader manually + for cctx.Err() == nil && err != nil { + leader, lerr := s.waitLeader(cctx) + if lerr != nil { + return -1, lerr + } + for _, url := range leader.PeerURLs { + lurl := url + leasehttp.LeasePrefix + ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) + if err == nil || err == lease.ErrLeaseNotFound { + return ttl, err + } } - err = convertEOFToNoLeader(err) } - return ttl, err + return -1, ErrTimeout } func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { @@ -352,39 +353,32 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR return resp, nil } - // manually request to leader - leader, err := s.waitLeader() - if err != nil { - return nil, err - } + cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) + defer cancel() - for _, url := range leader.PeerURLs { - lurl := url + leasehttp.LeaseInternalPrefix - var iresp *leasepb.LeaseInternalResponse - iresp, err = leasehttp.TimeToLiveHTTP(ctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt) - if err == nil { - return iresp.LeaseTimeToLiveResponse, nil + // forward to leader + for cctx.Err() == nil { + leader, err := s.waitLeader(cctx) + if err != nil { + return nil, err + } + for _, url := range leader.PeerURLs { + lurl := url + leasehttp.LeaseInternalPrefix + resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt) + if err == nil { + return resp.LeaseTimeToLiveResponse, nil + } + if err == lease.ErrLeaseNotFound { + return nil, err + } } - err = convertEOFToNoLeader(err) } - return nil, err + return nil, ErrTimeout } -// convertEOFToNoLeader converts EOF erros to ErrNoLeader because -// lease renew, timetolive requests to followers are forwarded to leader, -// and follower might not be able to reach leader from transient network -// errors (often EOF errors). By returning ErrNoLeader, signal clients -// to retry its requests. -func convertEOFToNoLeader(err error) error { - if err == io.EOF || err == io.ErrUnexpectedEOF { - return ErrNoLeader - } - return err -} - -func (s *EtcdServer) waitLeader() (*membership.Member, error) { +func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) { leader := s.cluster.Member(s.Leader()) - for i := 0; i < 5 && leader == nil; i++ { + for leader == nil { // wait an election dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond select { @@ -392,6 +386,8 @@ func (s *EtcdServer) waitLeader() (*membership.Member, error) { leader = s.cluster.Member(s.Leader()) case <-s.stopping: return nil, ErrStopped + case <-ctx.Done(): + return nil, ErrNoLeader } } if leader == nil || len(leader.PeerURLs) == 0 { diff --git a/lease/leasehttp/http.go b/lease/leasehttp/http.go index 4f649746a..06eb93565 100644 --- a/lease/leasehttp/http.go +++ b/lease/leasehttp/http.go @@ -19,7 +19,6 @@ import ( "fmt" "io/ioutil" "net/http" - "time" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease" @@ -125,15 +124,22 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // RenewHTTP renews a lease at a given primary server. // TODO: Batch request in future? -func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time.Duration) (int64, error) { +func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) { // will post lreq protobuf to leader lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal() if err != nil { return -1, err } - cc := &http.Client{Transport: rt, Timeout: timeout} - resp, err := cc.Post(url, "application/protobuf", bytes.NewReader(lreq)) + cc := &http.Client{Transport: rt} + req, err := http.NewRequest("POST", url, bytes.NewReader(lreq)) + if err != nil { + return -1, err + } + req.Header.Set("Content-Type", "application/protobuf") + req.Cancel = ctx.Done() + + resp, err := cc.Do(req) if err != nil { return -1, err } diff --git a/lease/leasehttp/http_test.go b/lease/leasehttp/http_test.go index 93113573b..6219a4a2c 100644 --- a/lease/leasehttp/http_test.go +++ b/lease/leasehttp/http_test.go @@ -41,7 +41,7 @@ func TestRenewHTTP(t *testing.T) { ts := httptest.NewServer(NewHandler(le)) defer ts.Close() - ttl, err := RenewHTTP(l.ID, ts.URL+LeasePrefix, http.DefaultTransport, time.Second) + ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport) if err != nil { t.Fatal(err) }