Merge pull request #1071 from jonboulle/serve_mux

etcdserver/etcdhttp: switch to using http.ServeMux
release-2.0
Jonathan Boulle 2014-09-15 16:05:58 -07:00
commit 699bc50365
4 changed files with 40 additions and 50 deletions

View File

@ -28,57 +28,56 @@ import (
const (
keysPrefix = "/v2/keys"
machinesPrefix = "/v2/machines"
raftPrefix = "/raft"
DefaultTimeout = 500 * time.Millisecond
)
var errClosed = errors.New("etcdhttp: client closed connection")
// Handler implements the http.Handler interface and serves etcd client and
// raft communication.
type Handler struct {
Timeout time.Duration
Server etcdserver.Server
// NewHandler generates a muxed http.Handler with the given parameters.
func NewHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler {
sh := &serverHandler{
timeout: timeout,
server: server,
peers: peers,
}
if sh.timeout == 0 {
sh.timeout = DefaultTimeout
}
mux := http.NewServeMux()
mux.HandleFunc(raftPrefix, sh.serveRaft)
mux.HandleFunc(keysPrefix, sh.serveKeys)
mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
// TODO: dynamic configuration may make this outdated. take care of it.
// TODO: dynamic configuration may introduce race also.
Peers Peers
mux.HandleFunc(machinesPrefix, sh.serveMachines)
mux.HandleFunc("/", http.NotFound)
return mux
}
func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// TODO: set read/write timeout?
timeout := h.Timeout
if timeout == 0 {
timeout = DefaultTimeout
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
switch {
case strings.HasPrefix(r.URL.Path, "/raft"):
h.serveRaft(ctx, w, r)
case strings.HasPrefix(r.URL.Path, keysPrefix):
h.serveKeys(ctx, w, r)
case strings.HasPrefix(r.URL.Path, machinesPrefix):
h.serveMachines(w, r)
default:
http.NotFound(w, r)
}
// serverHandler provides http.Handlers for etcd client and raft communication.
type serverHandler struct {
timeout time.Duration
server etcdserver.Server
peers Peers
}
func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.Request) {
func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
return
}
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()
rr, err := parseRequest(r, genID())
if err != nil {
writeError(w, err)
return
}
resp, err := h.Server.Do(ctx, rr)
resp, err := h.server.Do(ctx, rr)
if err != nil {
writeError(w, err)
return
@ -106,18 +105,19 @@ func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.R
// serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
// TODO: rethink the format of machine list because it is not json format.
func (h Handler) serveMachines(w http.ResponseWriter, r *http.Request) {
func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET", "HEAD") {
return
}
endpoints := h.Peers.Endpoints()
endpoints := h.peers.Endpoints()
w.Write([]byte(strings.Join(endpoints, ", ")))
}
func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.Request) {
func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "POST") {
return
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println("etcdhttp: error reading raft message:", err)
@ -127,7 +127,7 @@ func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.R
log.Println("etcdhttp: error unmarshaling raft message:", err)
}
log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
if err := h.Server.Process(ctx, m); err != nil {
if err := h.server.Process(context.TODO(), m); err != nil {
log.Println("etcdhttp: error processing raft message:", err)
writeError(w, err)
return

View File

@ -10,6 +10,7 @@ import (
"strings"
"sync"
"testing"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
@ -587,8 +588,8 @@ func TestV2MachinesEndpoint(t *testing.T) {
{"POST", http.StatusMethodNotAllowed},
}
h := Handler{Peers: Peers{}}
s := httptest.NewServer(h)
m := NewHandler(nil, Peers{}, time.Hour)
s := httptest.NewServer(m)
defer s.Close()
for _, tt := range tests {
@ -610,13 +611,13 @@ func TestV2MachinesEndpoint(t *testing.T) {
func TestServeMachines(t *testing.T) {
peers := Peers{}
peers.Set("0xBEEF0=localhost:8080&0xBEEF1=localhost:8081&0xBEEF2=localhost:8082")
h := Handler{Peers: peers}
writer := httptest.NewRecorder()
req, err := http.NewRequest("GET", "", nil)
if err != nil {
t.Fatal(err)
}
h := &serverHandler{peers: peers}
h.serveMachines(writer, req)
w := "http://localhost:8080, http://localhost:8081, http://localhost:8082"
if g := writer.Body.String(); g != w {

View File

@ -36,11 +36,7 @@ func TestSet(t *testing.T) {
srv.Start()
defer srv.Stop()
h := etcdhttp.Handler{
Timeout: time.Hour,
Server: srv,
}
h := etcdhttp.NewHandler(srv, nil, time.Hour)
s := httptest.NewServer(h)
defer s.Close()
@ -50,7 +46,7 @@ func TestSet(t *testing.T) {
}
if resp.StatusCode != 201 {
t.Errorf("StatusCode = %d, expected %d", 201, resp.StatusCode)
t.Errorf("StatusCode = %d, expected %d", resp.StatusCode, 201)
}
g := new(store.Event)

View File

@ -83,14 +83,7 @@ func startEtcd() http.Handler {
Ticker: time.Tick(100 * time.Millisecond),
}
s.Start()
h := etcdhttp.Handler{
Timeout: *timeout,
Server: s,
Peers: *peers,
}
return &h
return etcdhttp.NewHandler(s, *peers, *timeout)
}
// startRaft starts a raft node from the given wal dir.