etcd/server/server.go

367 lines
10 KiB
Go
Raw Normal View History

2013-10-11 08:42:45 +04:00
package server
import (
"encoding/json"
"fmt"
2013-10-11 08:42:45 +04:00
"net/http"
"net/http/pprof"
2013-10-13 01:56:43 +04:00
"strings"
2013-10-13 10:29:58 +04:00
"time"
2013-10-11 11:02:38 +04:00
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
2013-10-13 01:56:43 +04:00
etcdErr "github.com/coreos/etcd/error"
ehttp "github.com/coreos/etcd/http"
2013-10-13 01:56:43 +04:00
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/metrics"
2013-10-17 05:59:25 +04:00
"github.com/coreos/etcd/mod"
uhttp "github.com/coreos/etcd/pkg/http"
2013-10-13 01:56:43 +04:00
"github.com/coreos/etcd/server/v1"
2013-10-13 10:29:58 +04:00
"github.com/coreos/etcd/server/v2"
2013-10-13 01:56:43 +04:00
"github.com/coreos/etcd/store"
2013-10-27 22:47:00 +04:00
_ "github.com/coreos/etcd/store/v2"
2013-10-11 08:42:45 +04:00
)
2013-10-11 10:07:22 +04:00
// This is the default implementation of the Server interface.
2013-10-13 01:56:43 +04:00
type Server struct {
Name string
url string
handler http.Handler
peerServer *PeerServer
registry *Registry
store store.Store
metrics *metrics.Bucket
trace bool
2013-10-11 08:42:45 +04:00
}
// Creates a new Server.
func New(name, url string, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server {
2013-10-13 01:56:43 +04:00
s := &Server{
Name: name,
url: url,
store: store,
registry: registry,
peerServer: peerServer,
metrics: mb,
2013-10-11 08:42:45 +04:00
}
return s
}
// EnableTracing turns on the trace flag, which makes HTTPHandler install the
// debug routes on the routers it builds afterwards.
func (s *Server) EnableTracing() {
	s.trace = true
}
2013-10-13 10:29:58 +04:00
// The current state of the server in the cluster.
func (s *Server) State() string {
2013-10-14 23:05:55 +04:00
return s.peerServer.RaftServer().State()
2013-10-13 10:29:58 +04:00
}
// The node name of the leader in the cluster.
func (s *Server) Leader() string {
2013-10-14 23:05:55 +04:00
return s.peerServer.RaftServer().Leader()
2013-10-13 10:29:58 +04:00
}
2013-10-11 10:07:22 +04:00
// The current Raft committed index.
2013-10-13 01:56:43 +04:00
func (s *Server) CommitIndex() uint64 {
2013-10-14 23:05:55 +04:00
return s.peerServer.RaftServer().CommitIndex()
2013-10-11 10:07:22 +04:00
}
// The current Raft term.
2013-10-13 01:56:43 +04:00
func (s *Server) Term() uint64 {
2013-10-14 23:05:55 +04:00
return s.peerServer.RaftServer().Term()
2013-10-12 23:35:23 +04:00
}
// The server URL.
2013-10-13 01:56:43 +04:00
func (s *Server) URL() string {
return s.url
2013-10-11 10:07:22 +04:00
}
// PeerHost retrieves the host part of the Peer URL for a given node name by
// delegating to the registry; both results are passed through unchanged.
func (s *Server) PeerHost(name string) (string, bool) {
	host, ok := s.registry.PeerHost(name)
	return host, ok
}
2013-10-13 10:29:58 +04:00
// PeerURL retrieves the Peer URL for a given node name by delegating to the
// registry; both results are passed through unchanged.
func (s *Server) PeerURL(name string) (string, bool) {
	peerURL, ok := s.registry.PeerURL(name)
	return peerURL, ok
}
// ClientURL retrieves the Client URL for a given node name by delegating to
// the registry; both results are passed through unchanged.
func (s *Server) ClientURL(name string) (string, bool) {
	clientURL, ok := s.registry.ClientURL(name)
	return clientURL, ok
}
2013-10-13 01:56:43 +04:00
// Returns a reference to the Store.
2013-10-14 21:12:30 +04:00
func (s *Server) Store() store.Store {
2013-10-13 01:56:43 +04:00
return s.store
}
// SetRegistry replaces the registry the server uses to resolve node names to
// peer and client URLs.
func (s *Server) SetRegistry(registry *Registry) {
	s.registry = registry
}
// SetStore replaces the store returned by Store().
func (s *Server) SetStore(store store.Store) {
	s.store = store
}
func (s *Server) installV1(r *mux.Router) {
s.handleFuncV1(r, "/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET", "HEAD")
s.handleFuncV1(r, "/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT")
s.handleFuncV1(r, "/v1/keys/{key:.*}", v1.DeleteKeyHandler).Methods("DELETE")
2014-04-09 04:37:05 +04:00
s.handleFuncV1(r, "/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "HEAD", "POST")
s.handleFunc(r, "/v1/leader", s.GetLeaderHandler).Methods("GET", "HEAD")
s.handleFunc(r, "/v1/machines", s.GetPeersHandler).Methods("GET", "HEAD")
s.handleFunc(r, "/v1/peers", s.GetPeersHandler).Methods("GET", "HEAD")
s.handleFunc(r, "/v1/stats/self", s.GetStatsHandler).Methods("GET", "HEAD")
s.handleFunc(r, "/v1/stats/leader", s.GetLeaderStatsHandler).Methods("GET", "HEAD")
s.handleFunc(r, "/v1/stats/store", s.GetStoreStatsHandler).Methods("GET", "HEAD")
}
// installV2 registers the v2 API routes. They live on a dedicated router that
// is mounted on r under the "/v2" prefix and wrapped by
// ehttp.NewLowerQueryParamsHandler.
func (s *Server) installV2(r *mux.Router) {
	v2Router := mux.NewRouter()
	r.PathPrefix("/v2").Handler(ehttp.NewLowerQueryParamsHandler(v2Router))

	const keyPath = "/v2/keys/{key:.*}"

	// Key operations are selected by HTTP method.
	s.handleFuncV2(v2Router, keyPath, v2.GetHandler).Methods("GET", "HEAD")
	s.handleFuncV2(v2Router, keyPath, v2.PostHandler).Methods("POST")
	s.handleFuncV2(v2Router, keyPath, v2.PutHandler).Methods("PUT")
	s.handleFuncV2(v2Router, keyPath, v2.DeleteHandler).Methods("DELETE")

	// Every remaining endpoint answers GET and HEAD only.
	readOnly := []struct {
		path    string
		handler func(http.ResponseWriter, *http.Request) error
	}{
		{"/v2/leader", s.GetLeaderHandler},
		{"/v2/machines", s.GetPeersHandler},
		{"/v2/peers", s.GetPeersHandler},
		{"/v2/stats/self", s.GetStatsHandler},
		{"/v2/stats/leader", s.GetLeaderStatsHandler},
		{"/v2/stats/store", s.GetStoreStatsHandler},
		{"/v2/speedTest", s.SpeedTestHandler},
	}
	for _, route := range readOnly {
		s.handleFunc(v2Router, route.path, route.handler).Methods("GET", "HEAD")
	}
}
func (s *Server) installMod(r *mux.Router) {
r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler(s.URL())))
2013-10-17 05:59:25 +04:00
}
// installDebug registers the debugging routes on r: the metrics dump and the
// net/http/pprof handlers.
func (s *Server) installDebug(r *mux.Router) {
	s.handleFunc(r, "/debug/metrics", s.GetMetricsHandler).Methods("GET", "HEAD")

	// The fixed pprof paths are registered before the {name} pattern, in the
	// same order as before.
	pprofRoutes := []struct {
		path    string
		handler func(http.ResponseWriter, *http.Request)
	}{
		{"/debug/pprof", pprof.Index},
		{"/debug/pprof/cmdline", pprof.Cmdline},
		{"/debug/pprof/profile", pprof.Profile},
		{"/debug/pprof/symbol", pprof.Symbol},
		{"/debug/pprof/{name}", pprof.Index},
	}
	for _, route := range pprofRoutes {
		r.HandleFunc(route.path, route.handler)
	}
}
2013-10-13 01:56:43 +04:00
// Adds a v1 server handler to the router.
func (s *Server) handleFuncV1(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request, v1.Server) error) *mux.Route {
return s.handleFunc(r, path, func(w http.ResponseWriter, req *http.Request) error {
2013-10-13 01:56:43 +04:00
return f(w, req, s)
})
2013-10-11 10:07:22 +04:00
}
2013-10-13 10:29:58 +04:00
// Adds a v2 server handler to the router.
func (s *Server) handleFuncV2(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request, v2.Server) error) *mux.Route {
return s.handleFunc(r, path, func(w http.ResponseWriter, req *http.Request) error {
2013-10-13 10:29:58 +04:00
return f(w, req, s)
})
}
// HEADResponseWriter wraps an http.ResponseWriter and discards every body
// write, so a handler written for GET can also answer HEAD requests. Header
// and WriteHeader are promoted from the embedded writer unchanged.
type HEADResponseWriter struct {
	http.ResponseWriter
}

// Write drops b without forwarding it to the underlying writer and reports
// all of b as written.
//
// io.Writer requires a non-nil error whenever fewer than len(b) bytes are
// reported. Returning (0, nil) for a non-empty b breaks that contract and
// makes helpers such as io.Copy and bufio.Writer fail with io.ErrShortWrite.
func (w *HEADResponseWriter) Write(b []byte) (int, error) {
	return len(b), nil
}
2013-10-11 08:42:45 +04:00
// Adds a server handler to the router.
func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request) error) *mux.Route {
2013-10-11 08:42:45 +04:00
// Wrap the standard HandleFunc interface to pass in the server reference.
return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
if req.Method == "HEAD" {
w = &HEADResponseWriter{w}
}
2013-10-11 10:07:22 +04:00
// Log request.
log.Debugf("[recv] %s %s %s [%s]", req.Method, s.URL(), req.URL.Path, req.RemoteAddr)
2013-10-11 10:07:22 +04:00
2013-10-11 08:42:45 +04:00
// Execute handler function and return error if necessary.
2013-10-13 09:39:34 +04:00
if err := f(w, req); err != nil {
2013-10-11 08:42:45 +04:00
if etcdErr, ok := err.(*etcdErr.Error); ok {
2013-10-13 01:56:43 +04:00
log.Debug("Return error: ", (*etcdErr).Error())
w.Header().Set("Content-Type", "application/json")
2013-10-11 08:42:45 +04:00
etcdErr.Write(w)
} else {
2013-10-13 01:56:43 +04:00
http.Error(w, err.Error(), http.StatusInternalServerError)
2013-10-11 08:42:45 +04:00
}
}
})
}
func (s *Server) HTTPHandler() http.Handler {
router := mux.NewRouter()
// Install the routes.
s.handleFunc(router, "/version", s.GetVersionHandler).Methods("GET")
s.installV1(router)
s.installV2(router)
// Mod is deprecated temporariy due to its unstable state.
// It would be added back later.
// s.installMod(router)
if s.trace {
s.installDebug(router)
}
return router
2013-10-18 04:11:11 +04:00
}
// Dispatch command to the current leader
2013-10-13 01:56:43 +04:00
func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
ps := s.peerServer
if ps.raftServer.State() == raft.Leader {
result, err := ps.raftServer.Do(c)
if err != nil {
return err
}
if result == nil {
return etcdErr.NewError(300, "Empty result from raft", s.Store().Index())
}
// response for raft related commands[join/remove]
if b, ok := result.([]byte); ok {
w.WriteHeader(http.StatusOK)
w.Write(b)
return nil
}
var b []byte
if strings.HasPrefix(req.URL.Path, "/v1") {
2013-12-11 23:12:39 +04:00
b, _ = json.Marshal(result.(*store.Event).Response(0))
w.WriteHeader(http.StatusOK)
} else {
e, _ := result.(*store.Event)
b, _ = json.Marshal(e)
w.Header().Set("Content-Type", "application/json")
// etcd index should be the same as the event index
// which is also the last modified index of the node
2013-12-12 21:56:28 +04:00
w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index()))
w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
if e.IsCreated() {
w.WriteHeader(http.StatusCreated)
} else {
w.WriteHeader(http.StatusOK)
}
}
w.Write(b)
return nil
2014-02-19 00:29:18 +04:00
}
2014-02-19 00:29:18 +04:00
leader := ps.raftServer.Leader()
if leader == "" {
return etcdErr.NewError(300, "", s.Store().Index())
}
2014-02-19 00:29:18 +04:00
var url string
switch c.(type) {
case *JoinCommand, *RemoveCommand,
*SetClusterConfigCommand:
2014-02-19 00:29:18 +04:00
url, _ = ps.registry.PeerURL(leader)
default:
url, _ = ps.registry.ClientURL(leader)
}
2014-02-19 00:29:18 +04:00
uhttp.Redirect(url, w, req)
return nil
2013-10-11 08:42:45 +04:00
}
2013-10-13 10:29:58 +04:00
// Handler to return the current version of etcd.
func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) error {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "etcd %s", ReleaseVersion)
2013-10-13 10:29:58 +04:00
return nil
}
2013-10-13 09:39:34 +04:00
// Handler to return the current leader's raft address
func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error {
2013-10-14 23:05:55 +04:00
leader := s.peerServer.RaftServer().Leader()
2013-10-13 09:39:34 +04:00
if leader == "" {
return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", s.Store().Index())
2013-10-13 09:39:34 +04:00
}
w.WriteHeader(http.StatusOK)
url, _ := s.registry.PeerURL(leader)
w.Write([]byte(url))
return nil
}
2013-11-15 07:58:47 +04:00
// Handler to return all the known peers in the current cluster.
func (s *Server) GetPeersHandler(w http.ResponseWriter, req *http.Request) error {
peers := s.registry.ClientURLs(s.peerServer.RaftServer().Leader(), s.Name)
w.WriteHeader(http.StatusOK)
2013-11-15 07:58:47 +04:00
w.Write([]byte(strings.Join(peers, ", ")))
return nil
2013-10-13 09:39:34 +04:00
}
// Retrieves stats on the Raft server.
func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error {
w.Header().Set("Content-Type", "application/json")
w.Write(s.peerServer.Stats())
return nil
2013-10-13 09:39:34 +04:00
}
// Retrieves stats on the leader.
func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error {
2013-10-14 23:05:55 +04:00
if s.peerServer.RaftServer().State() == raft.Leader {
w.Header().Set("Content-Type", "application/json")
w.Write(s.peerServer.PeerStats())
return nil
2013-10-13 09:39:34 +04:00
}
w.WriteHeader(http.StatusForbidden)
w.Write([]byte("not current leader"))
2013-10-13 09:39:34 +04:00
return nil
}
// Retrieves stats on the leader.
func (s *Server) GetStoreStatsHandler(w http.ResponseWriter, req *http.Request) error {
w.Header().Set("Content-Type", "application/json")
w.Write(s.store.JsonStats())
return nil
2013-10-13 09:39:34 +04:00
}
2013-10-13 10:29:58 +04:00
// Executes a speed test to evaluate the performance of update replication.
func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) error {
count := 1000
c := make(chan bool, count)
for i := 0; i < count; i++ {
go func() {
for j := 0; j < 10; j++ {
2013-12-06 02:10:37 +04:00
c := s.Store().CommandFactory().CreateSetCommand("foo", false, "bar", time.Unix(0, 0))
2013-10-14 23:05:55 +04:00
s.peerServer.RaftServer().Do(c)
2013-10-13 10:29:58 +04:00
}
c <- true
}()
}
for i := 0; i < count; i++ {
<-c
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("speed test success"))
return nil
}
// GetMetricsHandler dumps the server's metrics bucket to w. It always returns
// nil.
func (s *Server) GetMetricsHandler(w http.ResponseWriter, req *http.Request) error {
	// s.metrics is a pointer to the bucket, so it is dereferenced explicitly
	// before calling Dump.
	(*s.metrics).Dump(w)
	return nil
}