lease: force leader to apply its pending committed index for lease operations
suppose a lease granting request from a follower goes through and followed by a lease look up or renewal, the leader might not apply the lease grant request locally. So the leader might not find the lease from the lease look up or renewal request which will result lease not found error. To fix this issue, we force the leader to apply its pending commited index before looking up lease. FIX #6978release-3.1
parent
86a43849fb
commit
fef4a79528
|
@ -31,8 +31,9 @@ const (
|
|||
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
|
||||
func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
|
||||
var lh http.Handler
|
||||
if l := s.Lessor(); l != nil {
|
||||
lh = leasehttp.NewHandler(l)
|
||||
l := s.Lessor()
|
||||
if l != nil {
|
||||
lh = leasehttp.NewHandler(l, func() <-chan struct{} { return s.ApplyWait() })
|
||||
}
|
||||
return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
|
||||
}
|
||||
|
|
|
@ -565,6 +565,8 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler()
|
|||
|
||||
func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }
|
||||
|
||||
func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
|
||||
|
||||
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
|
||||
if s.cluster.IsIDRemoved(types.ID(m.From)) {
|
||||
plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
|
||||
|
|
|
@ -233,6 +233,91 @@ func TestV3LeaseExists(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestV3LeaseRenewStress keeps creating lease and renewing it immediately to ensure the renewal goes through.
|
||||
// it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
|
||||
// related issue https://github.com/coreos/etcd/issues/6978
|
||||
func TestV3LeaseRenewStress(t *testing.T) {
|
||||
testLeaseStress(t, stressLeaseRenew)
|
||||
}
|
||||
|
||||
// TestV3LeaseTimeToLiveStress keeps creating lease and retriving it immediately to ensure the lease can be retrived.
|
||||
// it was oberserved that the immediate lease retrival after granting a lease from follower resulted lease not found.
|
||||
// related issue https://github.com/coreos/etcd/issues/6978
|
||||
func TestV3LeaseTimeToLiveStress(t *testing.T) {
|
||||
testLeaseStress(t, stressLeaseTimeToLive)
|
||||
}
|
||||
|
||||
func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
errc := make(chan error)
|
||||
|
||||
for i := 0; i < 30; i++ {
|
||||
for j := 0; j < 3; j++ {
|
||||
go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < 90; i++ {
|
||||
if err := <-errc; err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
|
||||
defer func() {
|
||||
if tctx.Err() != nil {
|
||||
reterr = nil
|
||||
}
|
||||
}()
|
||||
lac, err := lc.LeaseKeepAlive(tctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for tctx.Err() == nil {
|
||||
resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
|
||||
if gerr != nil {
|
||||
continue
|
||||
}
|
||||
err = lac.Send(&pb.LeaseKeepAliveRequest{ID: resp.ID})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
rresp, rxerr := lac.Recv()
|
||||
if rxerr != nil {
|
||||
continue
|
||||
}
|
||||
if rresp.TTL == 0 {
|
||||
return fmt.Errorf("TTL shouldn't be 0 so soon")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func stressLeaseTimeToLive(tctx context.Context, lc pb.LeaseClient) (reterr error) {
|
||||
defer func() {
|
||||
if tctx.Err() != nil {
|
||||
reterr = nil
|
||||
}
|
||||
}()
|
||||
for tctx.Err() == nil {
|
||||
resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
|
||||
if gerr != nil {
|
||||
continue
|
||||
}
|
||||
_, kerr := lc.LeaseTimeToLive(tctx, &pb.LeaseTimeToLiveRequest{ID: resp.ID})
|
||||
if rpctypes.Error(kerr) == rpctypes.ErrLeaseNotFound {
|
||||
return kerr
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestV3PutOnNonExistLease(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
|
|
|
@ -16,9 +16,11 @@ package leasehttp
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/lease"
|
||||
|
@ -30,14 +32,19 @@ import (
|
|||
var (
|
||||
LeasePrefix = "/leases"
|
||||
LeaseInternalPrefix = "/leases/internal"
|
||||
applyTimeout = time.Second
|
||||
ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out")
|
||||
)
|
||||
|
||||
// NewHandler returns an http Handler for lease renewals
|
||||
func NewHandler(l lease.Lessor) http.Handler {
|
||||
return &leaseHandler{l}
|
||||
func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler {
|
||||
return &leaseHandler{l, waitch}
|
||||
}
|
||||
|
||||
type leaseHandler struct{ l lease.Lessor }
|
||||
type leaseHandler struct {
|
||||
l lease.Lessor
|
||||
waitch func() <-chan struct{}
|
||||
}
|
||||
|
||||
func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
|
@ -59,6 +66,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-h.waitch():
|
||||
case <-time.After(applyTimeout):
|
||||
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
|
||||
return
|
||||
}
|
||||
ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
|
||||
if err != nil {
|
||||
if err == lease.ErrLeaseNotFound {
|
||||
|
@ -83,7 +96,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-h.waitch():
|
||||
case <-time.After(applyTimeout):
|
||||
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
|
||||
return
|
||||
}
|
||||
l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
|
||||
if l == nil {
|
||||
http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
|
||||
|
@ -148,6 +166,10 @@ func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundT
|
|||
return -1, err
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusRequestTimeout {
|
||||
return -1, ErrLeaseHTTPTimeout
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return -1, lease.ErrLeaseNotFound
|
||||
}
|
||||
|
@ -196,6 +218,10 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string
|
|||
errc <- err
|
||||
return
|
||||
}
|
||||
if resp.StatusCode == http.StatusRequestTimeout {
|
||||
errc <- ErrLeaseHTTPTimeout
|
||||
return
|
||||
}
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
errc <- lease.ErrLeaseNotFound
|
||||
return
|
||||
|
|
|
@ -18,11 +18,13 @@ import (
|
|||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
|
@ -38,7 +40,7 @@ func TestRenewHTTP(t *testing.T) {
|
|||
t.Fatalf("failed to create lease: %v", err)
|
||||
}
|
||||
|
||||
ts := httptest.NewServer(NewHandler(le))
|
||||
ts := httptest.NewServer(NewHandler(le, waitReady))
|
||||
defer ts.Close()
|
||||
|
||||
ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport)
|
||||
|
@ -62,7 +64,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
|
|||
t.Fatalf("failed to create lease: %v", err)
|
||||
}
|
||||
|
||||
ts := httptest.NewServer(NewHandler(le))
|
||||
ts := httptest.NewServer(NewHandler(le, waitReady))
|
||||
defer ts.Close()
|
||||
|
||||
resp, err := TimeToLiveHTTP(context.TODO(), l.ID, true, ts.URL+LeaseInternalPrefix, http.DefaultTransport)
|
||||
|
@ -76,3 +78,50 @@ func TestTimeToLiveHTTP(t *testing.T) {
|
|||
t.Fatalf("granted TTL expected 5, got %d", resp.LeaseTimeToLiveResponse.GrantedTTL)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRenewHTTPTimeout(t *testing.T) {
|
||||
testApplyTimeout(t, func(l *lease.Lease, serverURL string) error {
|
||||
_, err := RenewHTTP(context.TODO(), l.ID, serverURL+LeasePrefix, http.DefaultTransport)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func TestTimeToLiveHTTPTimeout(t *testing.T) {
|
||||
testApplyTimeout(t, func(l *lease.Lease, serverURL string) error {
|
||||
_, err := TimeToLiveHTTP(context.TODO(), l.ID, true, serverURL+LeaseInternalPrefix, http.DefaultTransport)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
|
||||
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
|
||||
defer os.Remove(tmpPath)
|
||||
defer be.Close()
|
||||
|
||||
le := lease.NewLessor(be, int64(5))
|
||||
le.Promote(time.Second)
|
||||
l, err := le.Grant(1, int64(5))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create lease: %v", err)
|
||||
}
|
||||
|
||||
ts := httptest.NewServer(NewHandler(le, waitNotReady))
|
||||
defer ts.Close()
|
||||
err = f(l, ts.URL)
|
||||
if err == nil {
|
||||
t.Fatalf("expected timeout error, got nil")
|
||||
}
|
||||
if strings.Compare(err.Error(), ErrLeaseHTTPTimeout.Error()) != 0 {
|
||||
t.Fatalf("expected (%v), got (%v)", ErrLeaseHTTPTimeout.Error(), err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func waitReady() <-chan struct{} {
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
func waitNotReady() <-chan struct{} {
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue