etcdhttp, v2http, etcdserver: use etcdserver.{Server,ServerV2} interfaces

release-3.3
Anthony Romano 2017-08-07 00:48:05 -07:00
parent 565831c21c
commit 1d3afd4bb5
13 changed files with 128 additions and 110 deletions

View File

@ -33,9 +33,6 @@ type Cluster interface {
// Member retrieves a particular member based on ID, or nil if the
// member does not exist in the cluster
Member(id types.ID) *membership.Member
// IsIDRemoved checks whether the given ID has been removed from this
// cluster at some point in the past
IsIDRemoved(id types.ID) bool
// Version is the cluster-wide minimum major.minor version.
Version() *semver.Version
}

View File

@ -43,7 +43,7 @@ const (
// HandleBasic adds handlers to a mux for serving JSON etcd client requests
// that do not access the v2 store.
func HandleBasic(mux *http.ServeMux, server *etcdserver.EtcdServer) {
func HandleBasic(mux *http.ServeMux, server etcdserver.ServerPeer) {
mux.HandleFunc(varsPath, serveVars)
mux.HandleFunc(configPath+"/local/log", logHandleFunc)
HandleMetricsHealth(mux, server)

View File

@ -33,7 +33,7 @@ const (
)
// HandleMetricsHealth registers metrics and health handlers.
func HandleMetricsHealth(mux *http.ServeMux, srv *etcdserver.EtcdServer) {
func HandleMetricsHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
mux.Handle(pathMetrics, prometheus.Handler())
mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
}
@ -44,7 +44,7 @@ func HandlePrometheus(mux *http.ServeMux) {
}
// HandleHealth registers health handler on '/health'.
func HandleHealth(mux *http.ServeMux, srv *etcdserver.EtcdServer) {
func HandleHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
}
@ -74,7 +74,7 @@ type Health struct {
Errors []string `json:"errors,omitempty"`
}
func checkHealth(srv *etcdserver.EtcdServer) Health {
func checkHealth(srv etcdserver.ServerV2) Health {
h := Health{Health: false}
as := srv.Alarms()

View File

@ -29,13 +29,8 @@ const (
)
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
var lh http.Handler
l := s.Lessor()
if l != nil {
lh = leasehttp.NewHandler(l, func() <-chan struct{} { return s.ApplyWait() })
}
return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
func NewPeerHandler(s etcdserver.ServerPeer) http.Handler {
return newPeerHandler(s.Cluster(), s.RaftHandler(), s.LeaseHandler())
}
func newPeerHandler(cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {

View File

@ -47,7 +47,6 @@ func (c *fakeCluster) Members() []*membership.Member {
return []*membership.Member(ms)
}
func (c *fakeCluster) Member(id types.ID) *membership.Member { return c.members[uint64(id)] }
func (c *fakeCluster) IsIDRemoved(id types.ID) bool { return false }
func (c *fakeCluster) Version() *semver.Version { return nil }
// TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that

View File

@ -50,22 +50,21 @@ const (
)
// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler {
func NewClientHandler(server etcdserver.ServerPeer, timeout time.Duration) http.Handler {
mux := http.NewServeMux()
etcdhttp.HandleBasic(mux, server)
handleV2(mux, server, timeout)
return requestLogger(mux)
}
func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Duration) {
func handleV2(mux *http.ServeMux, server etcdserver.ServerV2, timeout time.Duration) {
sec := auth.NewStore(server, timeout)
kh := &keysHandler{
sec: sec,
server: server,
cluster: server.Cluster(),
timer: server,
timeout: timeout,
clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
clientCertAuthEnabled: server.ClientCertAuthEnabled(),
}
sh := &statsHandler{
@ -78,7 +77,7 @@ func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Du
cluster: server.Cluster(),
timeout: timeout,
clock: clockwork.NewRealClock(),
clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
clientCertAuthEnabled: server.ClientCertAuthEnabled(),
}
mah := &machinesHandler{cluster: server.Cluster()}
@ -86,7 +85,7 @@ func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Du
sech := &authHandler{
sec: sec,
cluster: server.Cluster(),
clientCertAuthEnabled: server.Cfg.ClientCertAuthEnabled,
clientCertAuthEnabled: server.ClientCertAuthEnabled(),
}
mux.HandleFunc("/", http.NotFound)
mux.Handle(keysPrefix, kh)
@ -102,9 +101,8 @@ func handleV2(mux *http.ServeMux, server *etcdserver.EtcdServer, timeout time.Du
type keysHandler struct {
sec auth.Store
server etcdserver.Server
server etcdserver.ServerV2
cluster api.Cluster
timer etcdserver.RaftTimer
timeout time.Duration
clientCertAuthEnabled bool
}
@ -142,7 +140,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
switch {
case resp.Event != nil:
if err := writeKeyEvent(w, resp.Event, noValueOnSuccess, h.timer); err != nil {
if err := writeKeyEvent(w, resp, noValueOnSuccess); err != nil {
// Should never be reached
plog.Errorf("error writing event (%v)", err)
}
@ -150,7 +148,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case resp.Watcher != nil:
ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
defer cancel()
handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
handleKeyWatch(ctx, w, resp, rr.Stream)
default:
writeKeyError(w, errors.New("received response with no Event/Watcher!"))
}
@ -170,7 +168,7 @@ func (h *machinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type membersHandler struct {
sec auth.Store
server etcdserver.Server
server etcdserver.ServerV2
cluster api.Cluster
timeout time.Duration
clock clockwork.Clock
@ -503,14 +501,15 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque
// writeKeyEvent trims the prefix of key path in a single Event under
// StoreKeysPrefix, serializes it and writes the resulting JSON to the given
// ResponseWriter, along with the appropriate headers.
func writeKeyEvent(w http.ResponseWriter, ev *store.Event, noValueOnSuccess bool, rt etcdserver.RaftTimer) error {
func writeKeyEvent(w http.ResponseWriter, resp etcdserver.Response, noValueOnSuccess bool) error {
ev := resp.Event
if ev == nil {
return errors.New("cannot write empty Event!")
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index))
w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term))
if ev.IsCreated() {
w.WriteHeader(http.StatusCreated)
@ -552,7 +551,8 @@ func writeKeyError(w http.ResponseWriter, err error) {
}
}
func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
func handleKeyWatch(ctx context.Context, w http.ResponseWriter, resp etcdserver.Response, stream bool) {
wa := resp.Watcher
defer wa.Remove()
ech := wa.EventChan()
var nch <-chan bool
@ -562,8 +562,8 @@ func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index))
w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term))
w.WriteHeader(http.StatusOK)
// Ensure headers are flushed early, in case of long polling

View File

@ -30,6 +30,7 @@ import (
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api"
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
@ -87,14 +88,26 @@ func mustNewMethodRequest(t *testing.T, m, p string) *http.Request {
}
}
type fakeServer struct {
dummyRaftTimer
dummyStats
}
func (s *fakeServer) Leader() types.ID { return types.ID(1) }
func (s *fakeServer) Alarms() []*etcdserverpb.AlarmMember { return nil }
func (s *fakeServer) Cluster() api.Cluster { return nil }
func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
func (s *fakeServer) RaftHandler() http.Handler { return nil }
func (s *fakeServer) Do(ctx context.Context, r etcdserverpb.Request) (rr etcdserver.Response, err error) {
return
}
func (s *fakeServer) ClientCertAuthEnabled() bool { return false }
type serverRecorder struct {
fakeServer
actions []action
}
func (s *serverRecorder) Start() {}
func (s *serverRecorder) Stop() {}
func (s *serverRecorder) Leader() types.ID { return types.ID(1) }
func (s *serverRecorder) ID() types.ID { return types.ID(1) }
func (s *serverRecorder) Do(_ context.Context, r etcdserverpb.Request) (etcdserver.Response, error) {
s.actions = append(s.actions, action{name: "Do", params: []interface{}{r}})
return etcdserver.Response{}, nil
@ -117,8 +130,6 @@ func (s *serverRecorder) UpdateMember(_ context.Context, m membership.Member) ([
return nil, nil
}
func (s *serverRecorder) ClusterVersion() *semver.Version { return nil }
type action struct {
name string
params []interface{}
@ -138,13 +149,10 @@ func (fr *flushingRecorder) Flush() {
// resServer implements the etcd.Server interface for testing.
// It returns the given response from any Do calls, and nil error
type resServer struct {
fakeServer
res etcdserver.Response
}
func (rs *resServer) Start() {}
func (rs *resServer) Stop() {}
func (rs *resServer) ID() types.ID { return types.ID(1) }
func (rs *resServer) Leader() types.ID { return types.ID(1) }
func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) {
return rs.res, nil
}
@ -158,7 +166,6 @@ func (rs *resServer) RemoveMember(_ context.Context, _ uint64) ([]*membership.Me
func (rs *resServer) UpdateMember(_ context.Context, _ membership.Member) ([]*membership.Member, error) {
return nil, nil
}
func (rs *resServer) ClusterVersion() *semver.Version { return nil }
func boolp(b bool) *bool { return &b }
@ -874,7 +881,7 @@ func TestServeMembersUpdate(t *testing.T) {
func TestServeMembersFail(t *testing.T) {
tests := []struct {
req *http.Request
server etcdserver.Server
server etcdserver.ServerV2
wcode int
}{
@ -941,7 +948,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}},
},
&errServer{
errors.New("Error while adding a member"),
err: errors.New("Error while adding a member"),
},
http.StatusInternalServerError,
@ -955,7 +962,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}},
},
&errServer{
membership.ErrIDExists,
err: membership.ErrIDExists,
},
http.StatusConflict,
@ -969,7 +976,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}},
},
&errServer{
membership.ErrPeerURLexists,
err: membership.ErrPeerURLexists,
},
http.StatusConflict,
@ -981,7 +988,7 @@ func TestServeMembersFail(t *testing.T) {
Method: "DELETE",
},
&errServer{
errors.New("Error while removing member"),
err: errors.New("Error while removing member"),
},
http.StatusInternalServerError,
@ -993,7 +1000,7 @@ func TestServeMembersFail(t *testing.T) {
Method: "DELETE",
},
&errServer{
membership.ErrIDRemoved,
err: membership.ErrIDRemoved,
},
http.StatusGone,
@ -1005,7 +1012,7 @@ func TestServeMembersFail(t *testing.T) {
Method: "DELETE",
},
&errServer{
membership.ErrIDNotFound,
err: membership.ErrIDNotFound,
},
http.StatusNotFound,
@ -1075,7 +1082,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}},
},
&errServer{
errors.New("blah"),
err: errors.New("blah"),
},
http.StatusInternalServerError,
@ -1089,7 +1096,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}},
},
&errServer{
membership.ErrPeerURLexists,
err: membership.ErrPeerURLexists,
},
http.StatusConflict,
@ -1103,7 +1110,7 @@ func TestServeMembersFail(t *testing.T) {
Header: map[string][]string{"Content-Type": {"application/json"}},
},
&errServer{
membership.ErrIDNotFound,
err: membership.ErrIDNotFound,
},
http.StatusNotFound,
@ -1153,7 +1160,7 @@ func TestServeMembersFail(t *testing.T) {
func TestWriteEvent(t *testing.T) {
// nil event should not panic
rec := httptest.NewRecorder()
writeKeyEvent(rec, nil, false, dummyRaftTimer{})
writeKeyEvent(rec, etcdserver.Response{}, false)
h := rec.Header()
if len(h) > 0 {
t.Fatalf("unexpected non-empty headers: %#v", h)
@ -1199,7 +1206,8 @@ func TestWriteEvent(t *testing.T) {
for i, tt := range tests {
rw := httptest.NewRecorder()
writeKeyEvent(rw, tt.ev, tt.noValue, dummyRaftTimer{})
resp := etcdserver.Response{Event: tt.ev, Term: 5, Index: 100}
writeKeyEvent(rw, resp, tt.noValue)
if gct := rw.Header().Get("Content-Type"); gct != "application/json" {
t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct)
}
@ -1411,7 +1419,7 @@ func TestServeStoreStats(t *testing.T) {
func TestBadServeKeys(t *testing.T) {
testBadCases := []struct {
req *http.Request
server etcdserver.Server
server etcdserver.ServerV2
wcode int
wbody string
@ -1451,7 +1459,7 @@ func TestBadServeKeys(t *testing.T) {
// etcdserver.Server error
mustNewRequest(t, "foo"),
&errServer{
errors.New("Internal Server Error"),
err: errors.New("Internal Server Error"),
},
http.StatusInternalServerError,
@ -1461,7 +1469,7 @@ func TestBadServeKeys(t *testing.T) {
// etcdserver.Server etcd error
mustNewRequest(t, "foo"),
&errServer{
etcdErr.NewError(etcdErr.EcodeKeyNotFound, "/1/pant", 0),
err: etcdErr.NewError(etcdErr.EcodeKeyNotFound, "/1/pant", 0),
},
http.StatusNotFound,
@ -1471,7 +1479,7 @@ func TestBadServeKeys(t *testing.T) {
// non-event/watcher response from etcdserver.Server
mustNewRequest(t, "foo"),
&resServer{
etcdserver.Response{},
res: etcdserver.Response{},
},
http.StatusInternalServerError,
@ -1529,7 +1537,7 @@ func TestServeKeysGood(t *testing.T) {
},
}
server := &resServer{
etcdserver.Response{
res: etcdserver.Response{
Event: &store.Event{
Action: store.Get,
Node: &store.NodeExtern{},
@ -1540,7 +1548,6 @@ func TestServeKeysGood(t *testing.T) {
h := &keysHandler{
timeout: time.Hour,
server: server,
timer: &dummyRaftTimer{},
cluster: &fakeCluster{id: 1},
}
rw := httptest.NewRecorder()
@ -1597,7 +1604,6 @@ func TestServeKeysEvent(t *testing.T) {
timeout: time.Hour,
server: server,
cluster: &fakeCluster{id: 1},
timer: &dummyRaftTimer{},
}
for _, tt := range tests {
@ -1632,7 +1638,7 @@ func TestServeKeysWatch(t *testing.T) {
echan: ec,
}
server := &resServer{
etcdserver.Response{
res: etcdserver.Response{
Watcher: dw,
},
}
@ -1640,7 +1646,6 @@ func TestServeKeysWatch(t *testing.T) {
timeout: time.Hour,
server: server,
cluster: &fakeCluster{id: 1},
timer: &dummyRaftTimer{},
}
go func() {
ec <- &store.Event{
@ -1764,7 +1769,8 @@ func TestHandleWatch(t *testing.T) {
}
tt.doToChan(wa.echan)
handleKeyWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{})
resp := etcdserver.Response{Term: 5, Index: 100, Watcher: wa}
handleKeyWatch(tt.getCtx(), rw, resp, false)
wcode := http.StatusOK
wct := "application/json"
@ -1808,7 +1814,8 @@ func TestHandleWatchStreaming(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
handleKeyWatch(ctx, rw, wa, true, dummyRaftTimer{})
resp := etcdserver.Response{Watcher: wa}
handleKeyWatch(ctx, rw, resp, true)
close(done)
}()

View File

@ -48,19 +48,15 @@ func (c *fakeCluster) Members() []*membership.Member {
return []*membership.Member(ms)
}
func (c *fakeCluster) Member(id types.ID) *membership.Member { return c.members[uint64(id)] }
func (c *fakeCluster) IsIDRemoved(id types.ID) bool { return false }
func (c *fakeCluster) Version() *semver.Version { return nil }
// errServer implements the etcd.Server interface for testing.
// It returns the given error from any Do/Process/AddMember/RemoveMember calls.
type errServer struct {
err error
fakeServer
}
func (fs *errServer) Start() {}
func (fs *errServer) Stop() {}
func (fs *errServer) ID() types.ID { return types.ID(1) }
func (fs *errServer) Leader() types.ID { return types.ID(1) }
func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver.Response, error) {
return etcdserver.Response{}, fs.err
}
@ -77,8 +73,6 @@ func (fs *errServer) UpdateMember(ctx context.Context, m membership.Member) ([]*
return nil, fs.err
}
func (fs *errServer) ClusterVersion() *semver.Version { return nil }
func TestWriteError(t *testing.T) {
// nil error should not panic
rec := httptest.NewRecorder()

View File

@ -46,8 +46,7 @@ type LeaderTransferrer interface {
}
type RaftStatusGetter interface {
Index() uint64
Term() uint64
etcdserver.RaftTimer
ID() types.ID
Leader() types.ID
}

View File

@ -27,16 +27,14 @@ import (
)
type ClusterServer struct {
cluster api.Cluster
server etcdserver.Server
raftTimer etcdserver.RaftTimer
cluster api.Cluster
server etcdserver.ServerV3
}
func NewClusterServer(s *etcdserver.EtcdServer) *ClusterServer {
func NewClusterServer(s etcdserver.ServerV3) *ClusterServer {
return &ClusterServer{
cluster: s.Cluster(),
server: s,
raftTimer: s,
cluster: s.Cluster(),
server: s,
}
}
@ -86,7 +84,7 @@ func (cs *ClusterServer) MemberList(ctx context.Context, r *pb.MemberListRequest
}
func (cs *ClusterServer) header() *pb.ResponseHeader {
return &pb.ResponseHeader{ClusterId: uint64(cs.cluster.ID()), MemberId: uint64(cs.server.ID()), RaftTerm: cs.raftTimer.Term()}
return &pb.ResponseHeader{ClusterId: uint64(cs.cluster.ID()), MemberId: uint64(cs.server.ID()), RaftTerm: cs.server.Term()}
}
func membersToProtoMembers(membs []*membership.Member) []*pb.Member {

View File

@ -38,6 +38,7 @@ import (
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/pkg/fileutil"
@ -108,29 +109,33 @@ func init() {
}
type Response struct {
Term uint64
Index uint64
Event *store.Event
Watcher store.Watcher
err error
}
type Server interface {
// Start performs any initialization of the Server necessary for it to
// begin serving requests. It must be called before Do or Process.
// Start must be non-blocking; any long-running server functionality
// should be implemented in goroutines.
Start()
// Stop terminates the Server and performs any necessary finalization.
// Do and Process cannot be called after Stop has been invoked.
Stop()
// ID returns the ID of the Server.
type ServerV2 interface {
Server
// Do takes a V2 request and attempts to fulfill it, returning a Response.
Do(ctx context.Context, r pb.Request) (Response, error)
stats.Stats
ClientCertAuthEnabled() bool
}
type ServerV3 interface {
Server
ID() types.ID
RaftTimer
}
func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled }
type Server interface {
// Leader returns the ID of the leader Server.
Leader() types.ID
// Do takes a request and attempts to fulfill it, returning a Response.
Do(ctx context.Context, r pb.Request) (Response, error)
// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
Process(ctx context.Context, m raftpb.Message) error
// AddMember attempts to add a member into the cluster. It will return
// ErrIDRemoved if member ID is removed from the cluster, or return
// ErrIDExists if member ID exists in the cluster.
@ -139,7 +144,6 @@ type Server interface {
// return ErrIDRemoved if member ID is removed from the cluster, or return
// ErrIDNotFound if member ID is not in the cluster.
RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
// UpdateMember attempts to update an existing member in the cluster. It will
// return ErrIDNotFound if the member ID does not exist.
UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
@ -159,6 +163,8 @@ type Server interface {
// the leader is etcd 2.0. etcd 2.0 leader will not update clusterVersion since
// this feature is introduced post 2.0.
ClusterVersion() *semver.Version
Cluster() api.Cluster
Alarms() []*pb.AlarmMember
}
// EtcdServer is the production implementation of the Server interface
@ -514,9 +520,10 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
return srv, nil
}
// Start prepares and starts server in a new goroutine. It is no longer safe to
// modify a server's fields after it has been sent to Start.
// It also starts a goroutine to publish its server information.
// Start performs any initialization of the Server necessary for it to
// begin serving requests. It must be called before Do or Process.
// Start must be non-blocking; any long-running server functionality
// should be implemented in goroutines.
func (s *EtcdServer) Start() {
s.start()
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
@ -576,14 +583,27 @@ func (s *EtcdServer) purgeFile() {
func (s *EtcdServer) ID() types.ID { return s.id }
func (s *EtcdServer) Cluster() *membership.RaftCluster { return s.cluster }
func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }
func (s *EtcdServer) Cluster() api.Cluster { return s.cluster }
func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
type ServerPeer interface {
ServerV2
RaftHandler() http.Handler
LeaseHandler() http.Handler
}
func (s *EtcdServer) LeaseHandler() http.Handler {
if s.lessor == nil {
return nil
}
return leasehttp.NewHandler(s.lessor, s.ApplyWait)
}
func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.cluster.IsIDRemoved(types.ID(m.From)) {
plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
@ -992,6 +1012,8 @@ func (s *EtcdServer) HardStop() {
// Stop should be called after a Start(s), otherwise it will block forever.
// When stopping leader, Stop transfers its leadership to one of its peers
// before stopping the server.
// Stop terminates the Server and performs any necessary finalization.
// Do and Process cannot be called after Stop has been invoked.
func (s *EtcdServer) Stop() {
if err := s.TransferLeadership(); err != nil {
plog.Warningf("%s failed to transfer leadership (%v)", s.ID(), err)

View File

@ -701,7 +701,8 @@ func TestDoProposal(t *testing.T) {
if err != nil {
t.Fatalf("#%d: err = %v, want nil", i, err)
}
wresp := Response{Event: &store.Event{}}
// resp.Index is set in Do() based on the raft state; may either be 0 or 1
wresp := Response{Event: &store.Event{}, Index: resp.Index}
if !reflect.DeepEqual(resp, wresp) {
t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
}

View File

@ -96,12 +96,18 @@ func (a *v2apiStore) Head(ctx context.Context, r *pb.Request) (Response, error)
return Response{Event: ev}, nil
}
// Do interprets r and performs an operation on s.store according to r.Method
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
resp, err := s.do(ctx, r)
resp.Term, resp.Index = s.Term(), s.Index()
return resp, err
}
// do interprets r and performs an operation on s.store according to r.Method
// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
// Quorum == true, r will be sent through consensus before performing its
// respective operation. Do will block until an action is performed or there is
// an error.
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
func (s *EtcdServer) do(ctx context.Context, r pb.Request) (Response, error) {
r.ID = s.reqIDGen.Next()
if r.Method == "GET" && r.Quorum {
r.Method = "QGET"