diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 1806cf0ab..618421c66 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -28,57 +28,56 @@ import ( const ( keysPrefix = "/v2/keys" machinesPrefix = "/v2/machines" + raftPrefix = "/raft" DefaultTimeout = 500 * time.Millisecond ) var errClosed = errors.New("etcdhttp: client closed connection") -// Handler implements the http.Handler interface and serves etcd client and -// raft communication. -type Handler struct { - Timeout time.Duration - Server etcdserver.Server +// NewHandler generates a muxed http.Handler with the given parameters. +func NewHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler { + sh := &serverHandler{ + timeout: timeout, + server: server, + peers: peers, + } + if sh.timeout == 0 { + sh.timeout = DefaultTimeout + } + mux := http.NewServeMux() + mux.HandleFunc(raftPrefix, sh.serveRaft) + mux.HandleFunc(keysPrefix, sh.serveKeys) + mux.HandleFunc(keysPrefix+"/", sh.serveKeys) // TODO: dynamic configuration may make this outdated. take care of it. // TODO: dynamic configuration may introduce race also. - Peers Peers + mux.HandleFunc(machinesPrefix, sh.serveMachines) + mux.HandleFunc("/", http.NotFound) + return mux } -func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // TODO: set read/write timeout? - - timeout := h.Timeout - if timeout == 0 { - timeout = DefaultTimeout - } - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - switch { - case strings.HasPrefix(r.URL.Path, "/raft"): - h.serveRaft(ctx, w, r) - case strings.HasPrefix(r.URL.Path, keysPrefix): - h.serveKeys(ctx, w, r) - case strings.HasPrefix(r.URL.Path, machinesPrefix): - h.serveMachines(w, r) - default: - http.NotFound(w, r) - } +// serverHandler provides http.Handlers for etcd client and raft communication. +type serverHandler struct { + timeout time.Duration + server etcdserver.Server + peers Peers } -func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.Request) { +func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") { return } + ctx, cancel := context.WithTimeout(context.Background(), h.timeout) + defer cancel() + rr, err := parseRequest(r, genID()) if err != nil { writeError(w, err) return } - resp, err := h.Server.Do(ctx, rr) + resp, err := h.server.Do(ctx, rr) if err != nil { writeError(w, err) return @@ -106,18 +105,19 @@ func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.R // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'. // TODO: rethink the format of machine list because it is not json format. -func (h Handler) serveMachines(w http.ResponseWriter, r *http.Request) { +func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET", "HEAD") { return } - endpoints := h.Peers.Endpoints() + endpoints := h.peers.Endpoints() w.Write([]byte(strings.Join(endpoints, ", "))) } -func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.Request) { +func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "POST") { return } + b, err := ioutil.ReadAll(r.Body) if err != nil { log.Println("etcdhttp: error reading raft message:", err) @@ -127,7 +127,7 @@ func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.R log.Println("etcdhttp: error unmarshaling raft message:", err) } log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m) - if err := h.Server.Process(ctx, m); err != nil { + if err := h.server.Process(context.TODO(), m); err != nil { log.Println("etcdhttp: error processing raft message:", err) writeError(w, err) return diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 19cafccc8..dffff09dd 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -10,6 +10,7 @@ import ( "strings" "sync" "testing" + "time" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -587,8 +588,8 @@ func TestV2MachinesEndpoint(t *testing.T) { {"POST", http.StatusMethodNotAllowed}, } - h := Handler{Peers: Peers{}} - s := httptest.NewServer(h) + m := NewHandler(nil, Peers{}, time.Hour) + s := httptest.NewServer(m) defer s.Close() for _, tt := range tests { @@ -610,13 +611,13 @@ func TestV2MachinesEndpoint(t *testing.T) { func TestServeMachines(t *testing.T) { peers := Peers{} peers.Set("0xBEEF0=localhost:8080&0xBEEF1=localhost:8081&0xBEEF2=localhost:8082") - h := Handler{Peers: peers} writer := httptest.NewRecorder() req, err := http.NewRequest("GET", "", nil) if err != nil { t.Fatal(err) } + h := &serverHandler{peers: peers} h.serveMachines(writer, req) w := "http://localhost:8080, http://localhost:8081, http://localhost:8082" if g := writer.Body.String(); g != w { diff --git a/functional/http_functional_test.go b/functional/http_functional_test.go index 663d1dfc3..f3c6751e2 100644 --- a/functional/http_functional_test.go +++ b/functional/http_functional_test.go @@ -36,11 +36,7 @@ func TestSet(t *testing.T) { srv.Start() defer srv.Stop() - h := etcdhttp.Handler{ - Timeout: time.Hour, - Server: srv, - } - + h := etcdhttp.NewHandler(srv, nil, time.Hour) s := httptest.NewServer(h) defer s.Close() @@ -50,7 +46,7 @@ func TestSet(t *testing.T) { } if resp.StatusCode != 201 { - t.Errorf("StatusCode = %d, expected %d", 201, resp.StatusCode) + t.Errorf("StatusCode = %d, expected %d", resp.StatusCode, 201) } g := new(store.Event) diff --git a/main.go b/main.go index 8526615e7..cb4323be4 100644 --- a/main.go +++ b/main.go @@ -83,14 +83,7 @@ func startEtcd() http.Handler { Ticker: time.Tick(100 * time.Millisecond), } s.Start() - - h := etcdhttp.Handler{ - Timeout: *timeout, - Server: s, - Peers: *peers, - } - - return &h + return etcdhttp.NewHandler(s, *peers, *timeout) } // startRaft starts a raft node from the given wal dir.