From 1d3afd4bb5d3dda36267b4e2fb2cd1aa10ae42d7 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 7 Aug 2017 00:48:05 -0700 Subject: [PATCH] etcdhttp, v2http, etcdserver: use etcdserver.{Server,ServerV2} interfaces --- etcdserver/api/cluster.go | 3 -- etcdserver/api/etcdhttp/base.go | 2 +- etcdserver/api/etcdhttp/metrics.go | 6 +-- etcdserver/api/etcdhttp/peer.go | 9 +--- etcdserver/api/etcdhttp/peer_test.go | 1 - etcdserver/api/v2http/client.go | 34 ++++++------- etcdserver/api/v2http/client_test.go | 75 +++++++++++++++------------- etcdserver/api/v2http/http_test.go | 8 +-- etcdserver/api/v3rpc/maintenance.go | 3 +- etcdserver/api/v3rpc/member.go | 14 +++--- etcdserver/server.go | 70 +++++++++++++++++--------- etcdserver/server_test.go | 3 +- etcdserver/v2_server.go | 10 +++- 13 files changed, 128 insertions(+), 110 deletions(-) diff --git a/etcdserver/api/cluster.go b/etcdserver/api/cluster.go index 87face4a1..654c25804 100644 --- a/etcdserver/api/cluster.go +++ b/etcdserver/api/cluster.go @@ -33,9 +33,6 @@ type Cluster interface { // Member retrieves a particular member based on ID, or nil if the // member does not exist in the cluster Member(id types.ID) *membership.Member - // IsIDRemoved checks whether the given ID has been removed from this - // cluster at some point in the past - IsIDRemoved(id types.ID) bool // Version is the cluster-wide minimum major.minor version. Version() *semver.Version } diff --git a/etcdserver/api/etcdhttp/base.go b/etcdserver/api/etcdhttp/base.go index e7dc144f6..f0d3b0bd3 100644 --- a/etcdserver/api/etcdhttp/base.go +++ b/etcdserver/api/etcdhttp/base.go @@ -43,7 +43,7 @@ const ( // HandleBasic adds handlers to a mux for serving JSON etcd client requests // that do not access the v2 store. -func HandleBasic(mux *http.ServeMux, server *etcdserver.EtcdServer) { +func HandleBasic(mux *http.ServeMux, server etcdserver.ServerPeer) { mux.HandleFunc(varsPath, serveVars) mux.HandleFunc(configPath+"/local/log", logHandleFunc) HandleMetricsHealth(mux, server) diff --git a/etcdserver/api/etcdhttp/metrics.go b/etcdserver/api/etcdhttp/metrics.go index 81b636056..23e7aff86 100644 --- a/etcdserver/api/etcdhttp/metrics.go +++ b/etcdserver/api/etcdhttp/metrics.go @@ -33,7 +33,7 @@ const ( ) // HandleMetricsHealth registers metrics and health handlers. -func HandleMetricsHealth(mux *http.ServeMux, srv *etcdserver.EtcdServer) { +func HandleMetricsHealth(mux *http.ServeMux, srv etcdserver.ServerV2) { mux.Handle(pathMetrics, prometheus.Handler()) mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) })) } @@ -44,7 +44,7 @@ func HandlePrometheus(mux *http.ServeMux) { } // HandleHealth registers health handler on '/health'. -func HandleHealth(mux *http.ServeMux, srv *etcdserver.EtcdServer) { +func HandleHealth(mux *http.ServeMux, srv etcdserver.ServerV2) { mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) })) } @@ -74,7 +74,7 @@ type Health struct { Errors []string `json:"errors,omitempty"` } -func checkHealth(srv *etcdserver.EtcdServer) Health { +func checkHealth(srv etcdserver.ServerV2) Health { h := Health{Health: false} as := srv.Alarms() diff --git a/etcdserver/api/etcdhttp/peer.go b/etcdserver/api/etcdhttp/peer.go index 721bae3c6..0a9213b01 100644 --- a/etcdserver/api/etcdhttp/peer.go +++ b/etcdserver/api/etcdhttp/peer.go @@ -29,13 +29,8 @@ const ( ) // NewPeerHandler generates an http.Handler to handle etcd peer requests. -func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler { - var lh http.Handler - l := s.Lessor() - if l != nil { - lh = leasehttp.NewHandler(l, func() <-chan struct{} { return s.ApplyWait() }) - } - return newPeerHandler(s.Cluster(), s.RaftHandler(), lh) +func NewPeerHandler(s etcdserver.ServerPeer) http.Handler { + return newPeerHandler(s.Cluster(), s.RaftHandler(), s.LeaseHandler()) } func newPeerHandler(cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler { diff --git a/etcdserver/api/etcdhttp/peer_test.go b/etcdserver/api/etcdhttp/peer_test.go index c2b14f195..40dbef8ae 100644 --- a/etcdserver/api/etcdhttp/peer_test.go +++ b/etcdserver/api/etcdhttp/peer_test.go @@ -47,7 +47,6 @@ func (c *fakeCluster) Members() []*membership.Member { return []*membership.Member(ms) } func (c *fakeCluster) Member(id types.ID) *membership.Member { return c.members[uint64(id)] } -func (c *fakeCluster) IsIDRemoved(id types.ID) bool { return false } func (c *fakeCluster) Version() *semver.Version { return nil } // TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that diff --git a/etcdserver/api/v2http/client.go b/etcdserver/api/v2http/client.go index aa1e71ec3..c9d86508e 100644 --- a/etcdserver/api/v2http/client.go +++ b/etcdserver/api/v2http/client.go @@ -50,22 +50,21 @@ const ( ) // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests. -func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler { +func NewClientHandler(server etcdserver.ServerPeer, timeout time.Duration) http.Handler { mux := http.NewServeMux() etcdhttp.HandleBasic(mux, server) handleV2(mux, server, timeout) return requestLogger(mux) } -func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Duration) { +func handleV2(mux *http.ServeMux, server etcdserver.ServerV2, timeout time.Duration) { sec := auth.NewStore(server, timeout) kh := &keysHandler{ sec: sec, server: server, cluster: server.Cluster(), - timer: server, timeout: timeout, - clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled, + clientCertAuthEnabled: server.ClientCertAuthEnabled(), } sh := &statsHandler{ @@ -78,7 +77,7 @@ func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Du cluster: server.Cluster(), timeout: timeout, clock: clockwork.NewRealClock(), - clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled, + clientCertAuthEnabled: server.ClientCertAuthEnabled(), } mah := &machinesHandler{cluster: server.Cluster()} @@ -86,7 +85,7 @@ func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Du sech := &authHandler{ sec: sec, cluster: server.Cluster(), - clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled, + clientCertAuthEnabled: server.ClientCertAuthEnabled(), } mux.HandleFunc("/", http.NotFound) mux.Handle(keysPrefix, kh) @@ -102,9 +101,8 @@ func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Du type keysHandler struct { sec auth.Store - server etcdserver.Server + server etcdserver.ServerV2 cluster api.Cluster - timer etcdserver.RaftTimer timeout time.Duration clientCertAuthEnabled bool } @@ -142,7 +140,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } switch { case resp.Event != nil: - if err := writeKeyEvent(w, resp.Event, noValueOnSuccess, h.timer); err != nil { + if err := writeKeyEvent(w, resp, noValueOnSuccess); err != nil { // Should never be reached plog.Errorf("error writing event (%v)", err) } @@ -150,7 +148,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case resp.Watcher != nil: ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout) defer cancel() - handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer) + handleKeyWatch(ctx, w, resp, rr.Stream) default: writeKeyError(w, errors.New("received response with no Event/Watcher!")) } @@ -170,7 +168,7 @@ func (h *machinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { type membersHandler struct { sec auth.Store - server etcdserver.Server + server etcdserver.ServerV2 cluster api.Cluster timeout time.Duration clock clockwork.Clock @@ -503,14 +501,15 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque // writeKeyEvent trims the prefix of key path in a single Event under // StoreKeysPrefix, serializes it and writes the resulting JSON to the given // ResponseWriter, along with the appropriate headers. -func writeKeyEvent(w http.ResponseWriter, ev *store.Event, noValueOnSuccess bool, rt etcdserver.RaftTimer) error { +func writeKeyEvent(w http.ResponseWriter, resp etcdserver.Response, noValueOnSuccess bool) error { + ev := resp.Event if ev == nil { return errors.New("cannot write empty Event!") } w.Header().Set("Content-Type", "application/json") w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex)) - w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) - w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) + w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index)) + w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term)) if ev.IsCreated() { w.WriteHeader(http.StatusCreated) @@ -552,7 +551,8 @@ func writeKeyError(w http.ResponseWriter, err error) { } } -func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) { +func handleKeyWatch(ctx context.Context, w http.ResponseWriter, resp etcdserver.Response, stream bool) { + wa := resp.Watcher defer wa.Remove() ech := wa.EventChan() var nch <-chan bool @@ -562,8 +562,8 @@ func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher w.Header().Set("Content-Type", "application/json") w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex())) - w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) - w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) + w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index)) + w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term)) w.WriteHeader(http.StatusOK) // Ensure headers are flushed early, in case of long polling diff --git a/etcdserver/api/v2http/client_test.go b/etcdserver/api/v2http/client_test.go index 896021b22..0a58082a4 100644 --- a/etcdserver/api/v2http/client_test.go +++ b/etcdserver/api/v2http/client_test.go @@ -30,6 +30,7 @@ import ( etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api" "github.com/coreos/etcd/etcdserver/api/v2http/httptypes" "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" @@ -87,14 +88,26 @@ func mustNewMethodRequest(t *testing.T, m, p string) *http.Request { } } +type fakeServer struct { + dummyRaftTimer + dummyStats +} + +func (s *fakeServer) Leader() types.ID { return types.ID(1) } +func (s *fakeServer) Alarms() []*etcdserverpb.AlarmMember { return nil } +func (s *fakeServer) Cluster() api.Cluster { return nil } +func (s *fakeServer) ClusterVersion() *semver.Version { return nil } +func (s *fakeServer) RaftHandler() http.Handler { return nil } +func (s *fakeServer) Do(ctx context.Context, r etcdserverpb.Request) (rr etcdserver.Response, err error) { + return +} +func (s *fakeServer) ClientCertAuthEnabled() bool { return false } + type serverRecorder struct { + fakeServer actions []action } -func (s *serverRecorder) Start() {} -func (s *serverRecorder) Stop() {} -func (s *serverRecorder) Leader() types.ID { return types.ID(1) } -func (s *serverRecorder) ID() types.ID { return types.ID(1) } func (s *serverRecorder) Do(_ context.Context, r etcdserverpb.Request) (etcdserver.Response, error) { s.actions = append(s.actions, action{name: "Do", params: []interface{}{r}}) return etcdserver.Response{}, nil @@ -117,8 +130,6 @@ func (s *serverRecorder) UpdateMember(_ context.Context, m membership.Member) ([ return nil, nil } -func (s *serverRecorder) ClusterVersion() *semver.Version { return nil } - type action struct { name string params []interface{} @@ -138,13 +149,10 @@ func (fr *flushingRecorder) Flush() { // resServer implements the etcd.Server interface for testing. // It returns the given response from any Do calls, and nil error type resServer struct { + fakeServer res etcdserver.Response } -func (rs *resServer) Start() {} -func (rs *resServer) Stop() {} -func (rs *resServer) ID() types.ID { return types.ID(1) } -func (rs *resServer) Leader() types.ID { return types.ID(1) } func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) { return rs.res, nil } @@ -158,7 +166,6 @@ func (rs *resServer) RemoveMember(_ context.Context, _ uint64) ([]*membership.Me func (rs *resServer) UpdateMember(_ context.Context, _ membership.Member) ([]*membership.Member, error) { return nil, nil } -func (rs *resServer) ClusterVersion() *semver.Version { return nil } func boolp(b bool) *bool { return &b } @@ -874,7 +881,7 @@ func TestServeMembersUpdate(t *testing.T) { func TestServeMembersFail(t *testing.T) { tests := []struct { req *http.Request - server etcdserver.Server + server etcdserver.ServerV2 wcode int }{ @@ -941,7 +948,7 @@ func TestServeMembersFail(t *testing.T) { Header: map[string][]string{"Content-Type": {"application/json"}}, }, &errServer{ - errors.New("Error while adding a member"), + err: errors.New("Error while adding a member"), }, http.StatusInternalServerError, @@ -955,7 +962,7 @@ func TestServeMembersFail(t *testing.T) { Header: map[string][]string{"Content-Type": {"application/json"}}, }, &errServer{ - membership.ErrIDExists, + err: membership.ErrIDExists, }, http.StatusConflict, @@ -969,7 +976,7 @@ func TestServeMembersFail(t *testing.T) { Header: map[string][]string{"Content-Type": {"application/json"}}, }, &errServer{ - membership.ErrPeerURLexists, + err: membership.ErrPeerURLexists, }, http.StatusConflict, @@ -981,7 +988,7 @@ func TestServeMembersFail(t *testing.T) { Method: "DELETE", }, &errServer{ - errors.New("Error while removing member"), + err: errors.New("Error while removing member"), }, http.StatusInternalServerError, @@ -993,7 +1000,7 @@ func TestServeMembersFail(t *testing.T) { Method: "DELETE", }, &errServer{ - membership.ErrIDRemoved, + err: membership.ErrIDRemoved, }, http.StatusGone, @@ -1005,7 +1012,7 @@ func TestServeMembersFail(t *testing.T) { Method: "DELETE", }, &errServer{ - membership.ErrIDNotFound, + err: membership.ErrIDNotFound, }, http.StatusNotFound, @@ -1075,7 +1082,7 @@ func TestServeMembersFail(t *testing.T) { Header: map[string][]string{"Content-Type": {"application/json"}}, }, &errServer{ - errors.New("blah"), + err: errors.New("blah"), }, http.StatusInternalServerError, @@ -1089,7 +1096,7 @@ func TestServeMembersFail(t *testing.T) { Header: map[string][]string{"Content-Type": {"application/json"}}, }, &errServer{ - membership.ErrPeerURLexists, + err: membership.ErrPeerURLexists, }, http.StatusConflict, @@ -1103,7 +1110,7 @@ func TestServeMembersFail(t *testing.T) { Header: map[string][]string{"Content-Type": {"application/json"}}, }, &errServer{ - membership.ErrIDNotFound, + err: membership.ErrIDNotFound, }, http.StatusNotFound, @@ -1153,7 +1160,7 @@ func TestServeMembersFail(t *testing.T) { func TestWriteEvent(t *testing.T) { // nil event should not panic rec := httptest.NewRecorder() - writeKeyEvent(rec, nil, false, dummyRaftTimer{}) + writeKeyEvent(rec, etcdserver.Response{}, false) h := rec.Header() if len(h) > 0 { t.Fatalf("unexpected non-empty headers: %#v", h) @@ -1199,7 +1206,8 @@ func TestWriteEvent(t *testing.T) { for i, tt := range tests { rw := httptest.NewRecorder() - writeKeyEvent(rw, tt.ev, tt.noValue, dummyRaftTimer{}) + resp := etcdserver.Response{Event: tt.ev, Term: 5, Index: 100} + writeKeyEvent(rw, resp, tt.noValue) if gct := rw.Header().Get("Content-Type"); gct != "application/json" { t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct) } @@ -1411,7 +1419,7 @@ func TestServeStoreStats(t *testing.T) { func TestBadServeKeys(t *testing.T) { testBadCases := []struct { req *http.Request - server etcdserver.Server + server etcdserver.ServerV2 wcode int wbody string @@ -1451,7 +1459,7 @@ func TestBadServeKeys(t *testing.T) { // etcdserver.Server error mustNewRequest(t, "foo"), &errServer{ - errors.New("Internal Server Error"), + err: errors.New("Internal Server Error"), }, http.StatusInternalServerError, @@ -1461,7 +1469,7 @@ func TestBadServeKeys(t *testing.T) { // etcdserver.Server etcd error mustNewRequest(t, "foo"), &errServer{ - etcdErr.NewError(etcdErr.EcodeKeyNotFound, "/1/pant", 0), + err: etcdErr.NewError(etcdErr.EcodeKeyNotFound, "/1/pant", 0), }, http.StatusNotFound, @@ -1471,7 +1479,7 @@ func TestBadServeKeys(t *testing.T) { // non-event/watcher response from etcdserver.Server mustNewRequest(t, "foo"), &resServer{ - etcdserver.Response{}, + res: etcdserver.Response{}, }, http.StatusInternalServerError, @@ -1529,7 +1537,7 @@ func TestServeKeysGood(t *testing.T) { }, } server := &resServer{ - etcdserver.Response{ + res: etcdserver.Response{ Event: &store.Event{ Action: store.Get, Node: &store.NodeExtern{}, @@ -1540,7 +1548,6 @@ func TestServeKeysGood(t *testing.T) { h := &keysHandler{ timeout: time.Hour, server: server, - timer: &dummyRaftTimer{}, cluster: &fakeCluster{id: 1}, } rw := httptest.NewRecorder() @@ -1597,7 +1604,6 @@ func TestServeKeysEvent(t *testing.T) { timeout: time.Hour, server: server, cluster: &fakeCluster{id: 1}, - timer: &dummyRaftTimer{}, } for _, tt := range tests { @@ -1632,7 +1638,7 @@ func TestServeKeysWatch(t *testing.T) { echan: ec, } server := &resServer{ - etcdserver.Response{ + res: etcdserver.Response{ Watcher: dw, }, } @@ -1640,7 +1646,6 @@ func TestServeKeysWatch(t *testing.T) { timeout: time.Hour, server: server, cluster: &fakeCluster{id: 1}, - timer: &dummyRaftTimer{}, } go func() { ec <- &store.Event{ @@ -1764,7 +1769,8 @@ func TestHandleWatch(t *testing.T) { } tt.doToChan(wa.echan) - handleKeyWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{}) + resp := etcdserver.Response{Term: 5, Index: 100, Watcher: wa} + handleKeyWatch(tt.getCtx(), rw, resp, false) wcode := http.StatusOK wct := "application/json" @@ -1808,7 +1814,8 @@ func TestHandleWatchStreaming(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) done := make(chan struct{}) go func() { - handleKeyWatch(ctx, rw, wa, true, dummyRaftTimer{}) + resp := etcdserver.Response{Watcher: wa} + handleKeyWatch(ctx, rw, resp, true) close(done) }() diff --git a/etcdserver/api/v2http/http_test.go b/etcdserver/api/v2http/http_test.go index 61225409e..5b68a49a5 100644 --- a/etcdserver/api/v2http/http_test.go +++ b/etcdserver/api/v2http/http_test.go @@ -48,19 +48,15 @@ func (c *fakeCluster) Members() []*membership.Member { return []*membership.Member(ms) } func (c *fakeCluster) Member(id types.ID) *membership.Member { return c.members[uint64(id)] } -func (c *fakeCluster) IsIDRemoved(id types.ID) bool { return false } func (c *fakeCluster) Version() *semver.Version { return nil } // errServer implements the etcd.Server interface for testing. // It returns the given error from any Do/Process/AddMember/RemoveMember calls. type errServer struct { err error + fakeServer } -func (fs *errServer) Start() {} -func (fs *errServer) Stop() {} -func (fs *errServer) ID() types.ID { return types.ID(1) } -func (fs *errServer) Leader() types.ID { return types.ID(1) } func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver.Response, error) { return etcdserver.Response{}, fs.err } @@ -77,8 +73,6 @@ func (fs *errServer) UpdateMember(ctx context.Context, m membership.Member) ([]* return nil, fs.err } -func (fs *errServer) ClusterVersion() *semver.Version { return nil } - func TestWriteError(t *testing.T) { // nil error should not panic rec := httptest.NewRecorder() diff --git a/etcdserver/api/v3rpc/maintenance.go b/etcdserver/api/v3rpc/maintenance.go index fdbb118ce..a88aec995 100644 --- a/etcdserver/api/v3rpc/maintenance.go +++ b/etcdserver/api/v3rpc/maintenance.go @@ -46,8 +46,7 @@ type LeaderTransferrer interface { } type RaftStatusGetter interface { - Index() uint64 - Term() uint64 + etcdserver.RaftTimer ID() types.ID Leader() types.ID } diff --git a/etcdserver/api/v3rpc/member.go b/etcdserver/api/v3rpc/member.go index 91a59389b..ff271e8dd 100644 --- a/etcdserver/api/v3rpc/member.go +++ b/etcdserver/api/v3rpc/member.go @@ -27,16 +27,14 @@ import ( ) type ClusterServer struct { - cluster api.Cluster - server etcdserver.Server - raftTimer etcdserver.RaftTimer + cluster api.Cluster + server etcdserver.ServerV3 } -func NewClusterServer(s *etcdserver.EtcdServer) *ClusterServer { +func NewClusterServer(s etcdserver.ServerV3) *ClusterServer { return &ClusterServer{ - cluster: s.Cluster(), - server: s, - raftTimer: s, + cluster: s.Cluster(), + server: s, } } @@ -86,7 +84,7 @@ func (cs *ClusterServer) MemberList(ctx context.Context, r *pb.MemberListRequest } func (cs *ClusterServer) header() *pb.ResponseHeader { - return &pb.ResponseHeader{ClusterId: uint64(cs.cluster.ID()), MemberId: uint64(cs.server.ID()), RaftTerm: cs.raftTimer.Term()} + return &pb.ResponseHeader{ClusterId: uint64(cs.cluster.ID()), MemberId: uint64(cs.server.ID()), RaftTerm: cs.server.Term()} } func membersToProtoMembers(membs []*membership.Member) []*pb.Member { diff --git a/etcdserver/server.go b/etcdserver/server.go index ac7601ecb..358f25a76 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -38,6 +38,7 @@ import ( "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/lease/leasehttp" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/pkg/fileutil" @@ -108,29 +109,33 @@ func init() { } type Response struct { + Term uint64 + Index uint64 Event *store.Event Watcher store.Watcher err error } -type Server interface { - // Start performs any initialization of the Server necessary for it to - // begin serving requests. It must be called before Do or Process. - // Start must be non-blocking; any long-running server functionality - // should be implemented in goroutines. - Start() - // Stop terminates the Server and performs any necessary finalization. - // Do and Process cannot be called after Stop has been invoked. - Stop() - // ID returns the ID of the Server. +type ServerV2 interface { + Server + // Do takes a V2 request and attempts to fulfill it, returning a Response. + Do(ctx context.Context, r pb.Request) (Response, error) + stats.Stats + ClientCertAuthEnabled() bool +} + +type ServerV3 interface { + Server ID() types.ID + RaftTimer +} + +func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled } + +type Server interface { // Leader returns the ID of the leader Server. Leader() types.ID - // Do takes a request and attempts to fulfill it, returning a Response. - Do(ctx context.Context, r pb.Request) (Response, error) - // Process takes a raft message and applies it to the server's raft state - // machine, respecting any timeout of the given context. - Process(ctx context.Context, m raftpb.Message) error + // AddMember attempts to add a member into the cluster. It will return // ErrIDRemoved if member ID is removed from the cluster, or return // ErrIDExists if member ID exists in the cluster. @@ -139,7 +144,6 @@ type Server interface { // return ErrIDRemoved if member ID is removed from the cluster, or return // ErrIDNotFound if member ID is not in the cluster. RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) - // UpdateMember attempts to update an existing member in the cluster. It will // return ErrIDNotFound if the member ID does not exist. UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error) @@ -159,6 +163,8 @@ type Server interface { // the leader is etcd 2.0. etcd 2.0 leader will not update clusterVersion since // this feature is introduced post 2.0. ClusterVersion() *semver.Version + Cluster() api.Cluster + Alarms() []*pb.AlarmMember } // EtcdServer is the production implementation of the Server interface @@ -514,9 +520,10 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { return srv, nil } -// Start prepares and starts server in a new goroutine. It is no longer safe to -// modify a server's fields after it has been sent to Start. -// It also starts a goroutine to publish its server information. +// Start performs any initialization of the Server necessary for it to +// begin serving requests. It must be called before Do or Process. +// Start must be non-blocking; any long-running server functionality +// should be implemented in goroutines. func (s *EtcdServer) Start() { s.start() s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) @@ -576,14 +583,27 @@ func (s *EtcdServer) purgeFile() { func (s *EtcdServer) ID() types.ID { return s.id } -func (s *EtcdServer) Cluster() *membership.RaftCluster { return s.cluster } - -func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() } - -func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor } +func (s *EtcdServer) Cluster() api.Cluster { return s.cluster } func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) } +type ServerPeer interface { + ServerV2 + RaftHandler() http.Handler + LeaseHandler() http.Handler +} + +func (s *EtcdServer) LeaseHandler() http.Handler { + if s.lessor == nil { + return nil + } + return leasehttp.NewHandler(s.lessor, s.ApplyWait) +} + +func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() } + +// Process takes a raft message and applies it to the server's raft state +// machine, respecting any timeout of the given context. func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { if s.cluster.IsIDRemoved(types.ID(m.From)) { plog.Warningf("reject message from removed member %s", types.ID(m.From).String()) @@ -992,6 +1012,8 @@ func (s *EtcdServer) HardStop() { // Stop should be called after a Start(s), otherwise it will block forever. // When stopping leader, Stop transfers its leadership to one of its peers // before stopping the server. +// Stop terminates the Server and performs any necessary finalization. +// Do and Process cannot be called after Stop has been invoked. func (s *EtcdServer) Stop() { if err := s.TransferLeadership(); err != nil { plog.Warningf("%s failed to transfer leadership (%v)", s.ID(), err) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 66c99247b..d17fab9c8 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -701,7 +701,8 @@ func TestDoProposal(t *testing.T) { if err != nil { t.Fatalf("#%d: err = %v, want nil", i, err) } - wresp := Response{Event: &store.Event{}} + // resp.Index is set in Do() based on the raft state; may either be 0 or 1 + wresp := Response{Event: &store.Event{}, Index: resp.Index} if !reflect.DeepEqual(resp, wresp) { t.Errorf("#%d: resp = %v, want %v", i, resp, wresp) } diff --git a/etcdserver/v2_server.go b/etcdserver/v2_server.go index 72c4eb7c5..b0a64ad62 100644 --- a/etcdserver/v2_server.go +++ b/etcdserver/v2_server.go @@ -96,12 +96,18 @@ func (a *v2apiStore) Head(ctx context.Context, r *pb.Request) (Response, error) return Response{Event: ev}, nil } -// Do interprets r and performs an operation on s.store according to r.Method +func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { + resp, err := s.do(ctx, r) + resp.Term, resp.Index = s.Term(), s.Index() + return resp, err +} + +// do interprets r and performs an operation on s.store according to r.Method // and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with // Quorum == true, r will be sent through consensus before performing its // respective operation. Do will block until an action is performed or there is // an error. -func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { +func (s *EtcdServer) do(ctx context.Context, r pb.Request) (Response, error) { r.ID = s.reqIDGen.Next() if r.Method == "GET" && r.Quorum { r.Method = "QGET"