From f5eaaaf440af1ada3f3d4975bd23ae3812e43ea8 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Tue, 30 Apr 2019 15:51:36 -0700 Subject: [PATCH] etcdserver: forward member promote to leader --- etcdserver/api/etcdhttp/peer.go | 98 +++++++++++++++++-- etcdserver/api/etcdhttp/peer_test.go | 140 +++++++++++++++++++++++++-- etcdserver/cluster_util.go | 46 +++++++++ etcdserver/server.go | 32 ++++++ etcdserver/v3_server.go | 12 ++- 5 files changed, 307 insertions(+), 21 deletions(-) diff --git a/etcdserver/api/etcdhttp/peer.go b/etcdserver/api/etcdhttp/peer.go index 9f3eac352..3c3d5345a 100644 --- a/etcdserver/api/etcdhttp/peer.go +++ b/etcdserver/api/etcdhttp/peer.go @@ -16,56 +16,82 @@ package etcdhttp import ( "encoding/json" + "fmt" "net/http" + "strconv" + "strings" "go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/etcdserver/api" + "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/rafthttp" "go.etcd.io/etcd/lease/leasehttp" + "go.etcd.io/etcd/pkg/types" "go.uber.org/zap" ) const ( - peerMembersPrefix = "/members" + peerMembersPath = "/members" + peerMemberPromotePrefix = "/members/promote/" ) // NewPeerHandler generates an http.Handler to handle etcd peer requests. func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeer) http.Handler { - return newPeerHandler(lg, s.Cluster(), s.RaftHandler(), s.LeaseHandler()) + return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler()) } -func newPeerHandler(lg *zap.Logger, cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler { - mh := &peerMembersHandler{ - lg: lg, - cluster: cluster, - } +func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handler, leaseHandler http.Handler) http.Handler { + peerMembersHandler := newPeerMembersHandler(lg, s.Cluster()) + peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s) mux := http.NewServeMux() mux.HandleFunc("/", http.NotFound) mux.Handle(rafthttp.RaftPrefix, raftHandler) mux.Handle(rafthttp.RaftPrefix+"/", raftHandler) - mux.Handle(peerMembersPrefix, mh) + mux.Handle(peerMembersPath, peerMembersHandler) + mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler) if leaseHandler != nil { mux.Handle(leasehttp.LeasePrefix, leaseHandler) mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler) } - mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion)) + mux.HandleFunc(versionPath, versionHandler(s.Cluster(), serveVersion)) return mux } +func newPeerMembersHandler(lg *zap.Logger, cluster api.Cluster) http.Handler { + return &peerMembersHandler{ + lg: lg, + cluster: cluster, + } +} + type peerMembersHandler struct { lg *zap.Logger cluster api.Cluster } +func newPeerMemberPromoteHandler(lg *zap.Logger, s etcdserver.Server) http.Handler { + return &peerMemberPromoteHandler{ + lg: lg, + cluster: s.Cluster(), + server: s, + } +} + +type peerMemberPromoteHandler struct { + lg *zap.Logger + cluster api.Cluster + server etcdserver.Server +} + func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r, "GET") { return } w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) - if r.URL.Path != peerMembersPrefix { + if r.URL.Path != peerMembersPath { http.Error(w, "bad path", http.StatusBadRequest) return } @@ -79,3 +105,55 @@ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } } + +func (h *peerMemberPromoteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !allowMethod(w, r, "POST") { + return + } + w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) + + if !strings.HasPrefix(r.URL.Path, peerMemberPromotePrefix) { + http.Error(w, "bad path", http.StatusBadRequest) + return + } + idStr := strings.TrimPrefix(r.URL.Path, peerMemberPromotePrefix) + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + http.Error(w, fmt.Sprintf("member %s not found in cluster", idStr), http.StatusNotFound) + return + } + + resp, err := h.server.PromoteMember(r.Context(), id) + if err != nil { + switch err { + case membership.ErrIDNotFound: + http.Error(w, err.Error(), http.StatusNotFound) + case membership.ErrMemberNotLearner: + http.Error(w, err.Error(), http.StatusPreconditionFailed) + case membership.ErrLearnerNotReady: + http.Error(w, err.Error(), http.StatusPreconditionFailed) + default: + WriteError(h.lg, w, r, err) + } + if h.lg != nil { + h.lg.Warn( + "failed to promote a member", + zap.String("member-id", types.ID(id).String()), + zap.Error(err), + ) + } else { + plog.Errorf("error promoting member %s (%v)", types.ID(id).String(), err) + } + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(resp); err != nil { + if h.lg != nil { + h.lg.Warn("failed to encode members response", zap.Error(err)) + } else { + plog.Warningf("failed to encode members response (%v)", err) + } + } +} diff --git a/etcdserver/api/etcdhttp/peer_test.go b/etcdserver/api/etcdhttp/peer_test.go index 095aa5da8..8d890c0b5 100644 --- a/etcdserver/api/etcdhttp/peer_test.go +++ b/etcdserver/api/etcdhttp/peer_test.go @@ -15,19 +15,24 @@ package etcdhttp import ( + "context" "encoding/json" + "fmt" "io/ioutil" "net/http" "net/http/httptest" "path" "sort" + "strings" "testing" "go.uber.org/zap" "github.com/coreos/go-semver/semver" + "go.etcd.io/etcd/etcdserver/api" "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/rafthttp" + pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/pkg/testutil" "go.etcd.io/etcd/pkg/types" ) @@ -51,13 +56,34 @@ func (c *fakeCluster) Members() []*membership.Member { func (c *fakeCluster) Member(id types.ID) *membership.Member { return c.members[uint64(id)] } func (c *fakeCluster) Version() *semver.Version { return nil } +type fakeServer struct { + cluster api.Cluster +} + +func (s *fakeServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) { + return nil, fmt.Errorf("AddMember not implemented in fakeServer") +} +func (s *fakeServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) { + return nil, fmt.Errorf("RemoveMember not implemented in fakeServer") +} +func (s *fakeServer) UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error) { + return nil, fmt.Errorf("UpdateMember not implemented in fakeServer") +} +func (s *fakeServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) { + return nil, fmt.Errorf("PromoteMember not implemented in fakeServer") +} +func (s *fakeServer) ClusterVersion() *semver.Version { return nil } +func (s *fakeServer) Cluster() api.Cluster { return s.cluster } +func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil } + +var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("test data")) +}) + // TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that // handles raft-prefix requests well. func TestNewPeerHandlerOnRaftPrefix(t *testing.T) { - h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("test data")) - }) - ph := newPeerHandler(zap.NewExample(), &fakeCluster{}, h, nil) + ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil) srv := httptest.NewServer(ph) defer srv.Close() @@ -80,6 +106,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) { } } +// TestServeMembersFails ensures peerMembersHandler only accepts GET request func TestServeMembersFails(t *testing.T) { tests := []struct { method string @@ -89,6 +116,10 @@ func TestServeMembersFails(t *testing.T) { "POST", http.StatusMethodNotAllowed, }, + { + "PUT", + http.StatusMethodNotAllowed, + }, { "DELETE", http.StatusMethodNotAllowed, @@ -100,8 +131,12 @@ func TestServeMembersFails(t *testing.T) { } for i, tt := range tests { rw := httptest.NewRecorder() - h := &peerMembersHandler{cluster: nil} - h.ServeHTTP(rw, &http.Request{Method: tt.method}) + h := newPeerMembersHandler(nil, &fakeCluster{}) + req, err := http.NewRequest(tt.method, "", nil) + if err != nil { + t.Fatalf("#%d: failed to create http request: %v", i, err) + } + h.ServeHTTP(rw, req) if rw.Code != tt.wcode { t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) } @@ -115,7 +150,7 @@ func TestServeMembersGet(t *testing.T) { id: 1, members: map[uint64]*membership.Member{1: &memb1, 2: &memb2}, } - h := &peerMembersHandler{cluster: cluster} + h := newPeerMembersHandler(nil, cluster) msb, err := json.Marshal([]membership.Member{memb1, memb2}) if err != nil { t.Fatal(err) @@ -128,8 +163,8 @@ func TestServeMembersGet(t *testing.T) { wct string wbody string }{ - {peerMembersPrefix, http.StatusOK, "application/json", wms}, - {path.Join(peerMembersPrefix, "bad"), http.StatusBadRequest, "text/plain; charset=utf-8", "bad path\n"}, + {peerMembersPath, http.StatusOK, "application/json", wms}, + {path.Join(peerMembersPath, "bad"), http.StatusBadRequest, "text/plain; charset=utf-8", "bad path\n"}, } for i, tt := range tests { @@ -156,3 +191,90 @@ func TestServeMembersGet(t *testing.T) { } } } + +// TestServeMemberPromoteFails ensures peerMemberPromoteHandler only accepts POST request +func TestServeMemberPromoteFails(t *testing.T) { + tests := []struct { + method string + wcode int + }{ + { + "GET", + http.StatusMethodNotAllowed, + }, + { + "PUT", + http.StatusMethodNotAllowed, + }, + { + "DELETE", + http.StatusMethodNotAllowed, + }, + { + "BAD", + http.StatusMethodNotAllowed, + }, + } + for i, tt := range tests { + rw := httptest.NewRecorder() + h := newPeerMemberPromoteHandler(nil, &fakeServer{cluster: &fakeCluster{}}) + req, err := http.NewRequest(tt.method, "", nil) + if err != nil { + t.Fatalf("#%d: failed to create http request: %v", i, err) + } + h.ServeHTTP(rw, req) + if rw.Code != tt.wcode { + t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) + } + } +} + +// TestNewPeerHandlerOnMembersPromotePrefix verifies the request with members promote prefix is routed correctly +func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) { + ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil) + srv := httptest.NewServer(ph) + defer srv.Close() + + tests := []struct { + path string + wcode int + checkBody bool + wKeyWords string + }{ + { + // does not contain member id in path + peerMemberPromotePrefix, + http.StatusNotFound, + false, + "", + }, + { + // try to promote member id = 1 + peerMemberPromotePrefix + "1", + http.StatusInternalServerError, + true, + "PromoteMember not implemented in fakeServer", + }, + } + for i, tt := range tests { + req, err := http.NewRequest("POST", srv.URL+tt.path, nil) + if err != nil { + t.Fatalf("failed to create request: %v", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("failed to get http response: %v", err) + } + body, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + t.Fatalf("unexpected ioutil.ReadAll error: %v", err) + } + if resp.StatusCode != tt.wcode { + t.Fatalf("#%d: code = %d, want %d", i, resp.StatusCode, tt.wcode) + } + if tt.checkBody && strings.Contains(string(body), tt.wKeyWords) { + t.Errorf("#%d: body: %s, want body to contain keywords: %s", i, string(body), tt.wKeyWords) + } + } +} diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index eecb890e6..2030e7958 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -15,11 +15,13 @@ package etcdserver import ( + "context" "encoding/json" "fmt" "io/ioutil" "net/http" "sort" + "strings" "time" "go.etcd.io/etcd/etcdserver/api/membership" @@ -355,3 +357,47 @@ func getVersion(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (*ve } return nil, err } + +func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.RoundTripper) ([]*membership.Member, error) { + cc := &http.Client{Transport: peerRt} + // TODO: refactor member http handler code + // cannot import etcdhttp, so manually construct url + requestUrl := url + "/members/promote/" + fmt.Sprintf("%d", id) + req, err := http.NewRequest("POST", requestUrl, nil) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + resp, err := cc.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode == http.StatusRequestTimeout { + return nil, ErrTimeout + } + if resp.StatusCode == http.StatusPreconditionFailed { + // both ErrMemberNotLearner and ErrLearnerNotReady have same http status code + if strings.Contains(string(b), membership.ErrLearnerNotReady.Error()) { + return nil, membership.ErrLearnerNotReady + } + if strings.Contains(string(b), membership.ErrMemberNotLearner.Error()) { + return nil, membership.ErrMemberNotLearner + } + return nil, fmt.Errorf("member promote: unknown error(%s)", string(b)) + } + if resp.StatusCode == http.StatusNotFound { + return nil, membership.ErrIDNotFound + } + + var membs []*membership.Member + if err := json.Unmarshal(b, &membs); err != nil { + return nil, err + } + return membs, nil +} diff --git a/etcdserver/server.go b/etcdserver/server.go index 8cd4da5e9..82ba5983f 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1635,6 +1635,38 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership // PromoteMember promotes a learner node to a voting node. func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) { + resp, err := s.promoteMember(ctx, id) + if err != ErrNotLeader { + return resp, err + } + + cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) + defer cancel() + // forward to leader + for cctx.Err() == nil { + leader, err := s.waitLeader(cctx) + if err != nil { + return nil, err + } + for _, url := range leader.PeerURLs { + resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt) + if err == nil { + return resp, nil + } + // If member promotion failed, return early. Otherwise keep retry. + if err == membership.ErrIDNotFound || err == membership.ErrLearnerNotReady || err == membership.ErrMemberNotLearner { + return nil, err + } + } + } + + if cctx.Err() == context.DeadlineExceeded { + return nil, ErrTimeout + } + return nil, ErrCanceled +} + +func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) { if err := s.checkMembershipOperationPermission(ctx); err != nil { return nil, err } diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 74d980961..b2084618b 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -260,7 +260,11 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e } } } - return -1, ErrTimeout + + if cctx.Err() == context.DeadlineExceeded { + return -1, ErrTimeout + } + return -1, ErrCanceled } func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { @@ -303,7 +307,11 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR } } } - return nil, ErrTimeout + + if cctx.Err() == context.DeadlineExceeded { + return nil, ErrTimeout + } + return nil, ErrCanceled } func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {