etcdhttp: add PUT and DELETE on /v2/admin/members/

release-2.0
Yicheng Qin 2014-10-13 21:06:03 -07:00
parent 09f9884c6a
commit 31264e7eb5
3 changed files with 223 additions and 4 deletions

View File

@ -24,6 +24,7 @@ const (
keysPrefix = "/v2/keys"
membersPrefix = "/v2/members"
deprecatedMachinesPrefix = "/v2/machines"
adminMembersPrefix = "/v2/admin/members/"
raftPrefix = "/raft"
// time to wait for response from EtcdServer requests
@ -51,6 +52,7 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
// TODO: add serveMembers
mux.HandleFunc(membersPrefix, sh.serveMachines)
mux.HandleFunc(deprecatedMachinesPrefix, sh.serveMachines)
mux.HandleFunc(adminMembersPrefix, sh.serveAdminMembers)
mux.HandleFunc("/", http.NotFound)
return mux
}
@ -118,6 +120,47 @@ func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(strings.Join(endpoints, ", ")))
}
func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "PUT", "DELETE") {
return
}
ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
defer cancel()
idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix)
id, err := strconv.ParseUint(idStr, 16, 64)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
switch r.Method {
case "PUT":
r.ParseForm()
peerURLs := r.PostForm["PeerURLs"]
log.Printf("etcdhttp: add node %x with peer urls %v", id, peerURLs)
m := etcdserver.Member{
ID: id,
RaftAttributes: etcdserver.RaftAttributes{
PeerURLs: peerURLs,
},
}
if err := h.server.AddMember(ctx, m); err != nil {
log.Printf("etcdhttp: error adding node %x: %v", id, err)
writeError(w, err)
return
}
w.WriteHeader(http.StatusCreated)
case "DELETE":
log.Printf("etcdhttp: remove node %x", id)
if err := h.server.RemoveMember(ctx, id); err != nil {
log.Printf("etcdhttp: error removing node %x: %v", id, err)
writeError(w, err)
return
}
w.WriteHeader(http.StatusNoContent)
}
}
func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "POST") {
return

View File

@ -703,7 +703,7 @@ func TestAllowMethod(t *testing.T) {
}
// errServer implements the etcd.Server interface for testing.
// It returns the given error from any Do/Process calls.
// It returns the given error from any Do/Process/AddMember/RemoveMember calls.
type errServer struct {
err error
}
@ -716,6 +716,12 @@ func (fs *errServer) Process(ctx context.Context, m raftpb.Message) error {
}
func (fs *errServer) Start() {}
func (fs *errServer) Stop() {}
func (fs *errServer) AddMember(ctx context.Context, m etcdserver.Member) error {
return fs.err
}
func (fs *errServer) RemoveMember(ctx context.Context, id uint64) error {
return fs.err
}
// errReader implements io.Reader to facilitate a broken request.
type errReader struct{}
@ -839,9 +845,11 @@ type resServer struct {
func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) {
return rs.res, nil
}
func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil }
func (rs *resServer) Start() {}
func (rs *resServer) Stop() {}
func (rs *resServer) Process(_ context.Context, _ raftpb.Message) error { return nil }
func (rs *resServer) Start() {}
func (rs *resServer) Stop() {}
func (rs *resServer) AddMember(_ context.Context, _ etcdserver.Member) error { return nil }
func (rs *resServer) RemoveMember(_ context.Context, _ uint64) error { return nil }
func mustMarshalEvent(t *testing.T, ev *store.Event) string {
b := new(bytes.Buffer)
@ -1234,6 +1242,170 @@ func TestHandleWatchStreaming(t *testing.T) {
}
}
func TestServeAdminMembersFail(t *testing.T) {
tests := []struct {
req *http.Request
server etcdserver.Server
wcode int
}{
{
// bad method
&http.Request{
Method: "CONNECT",
},
&resServer{},
http.StatusMethodNotAllowed,
},
{
// bad method
&http.Request{
Method: "TRACE",
},
&resServer{},
http.StatusMethodNotAllowed,
},
{
// parse id error
&http.Request{
URL: mustNewURL(t, path.Join(adminMembersPrefix, "badID")),
Method: "PUT",
},
&resServer{},
http.StatusBadRequest,
},
{
// etcdserver.AddMember error
&http.Request{
URL: mustNewURL(t, path.Join(adminMembersPrefix, "1")),
Method: "PUT",
},
&errServer{
errors.New("blah"),
},
http.StatusInternalServerError,
},
{
// etcdserver.RemoveMember error
&http.Request{
URL: mustNewURL(t, path.Join(adminMembersPrefix, "1")),
Method: "DELETE",
},
&errServer{
errors.New("blah"),
},
http.StatusInternalServerError,
},
}
for i, tt := range tests {
h := &serverHandler{
server: tt.server,
}
rw := httptest.NewRecorder()
h.serveAdminMembers(rw, tt.req)
if rw.Code != tt.wcode {
t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
}
}
}
type action struct {
name string
params []interface{}
}
type serverRecorder struct {
actions []action
}
func (s *serverRecorder) Do(_ context.Context, r etcdserverpb.Request) (etcdserver.Response, error) {
s.actions = append(s.actions, action{name: "Do", params: []interface{}{r}})
return etcdserver.Response{}, nil
}
func (s *serverRecorder) Process(_ context.Context, m raftpb.Message) error {
s.actions = append(s.actions, action{name: "Process", params: []interface{}{m}})
return nil
}
func (s *serverRecorder) Start() {}
func (s *serverRecorder) Stop() {}
func (s *serverRecorder) AddMember(_ context.Context, m etcdserver.Member) error {
s.actions = append(s.actions, action{name: "AddMember", params: []interface{}{m}})
return nil
}
func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error {
s.actions = append(s.actions, action{name: "RemoveMember", params: []interface{}{id}})
return nil
}
func TestServeAdminMembersPut(t *testing.T) {
u := mustNewURL(t, path.Join(adminMembersPrefix, "BEEF"))
form := url.Values{"PeerURLs": []string{"http://a", "http://b"}}
body := strings.NewReader(form.Encode())
req, err := http.NewRequest("PUT", u.String(), body)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
s := &serverRecorder{}
h := &serverHandler{
server: s,
}
rw := httptest.NewRecorder()
h.serveAdminMembers(rw, req)
wcode := http.StatusCreated
if rw.Code != wcode {
t.Errorf("code=%d, want %d", rw.Code, wcode)
}
g := rw.Body.String()
if g != "" {
t.Errorf("got body=%q, want %q", g, "")
}
wm := etcdserver.Member{
ID: 0xBEEF,
RaftAttributes: etcdserver.RaftAttributes{
PeerURLs: []string{"http://a", "http://b"},
},
}
wactions := []action{{name: "AddMember", params: []interface{}{wm}}}
if !reflect.DeepEqual(s.actions, wactions) {
t.Errorf("actions = %+v, want %+v", s.actions, wactions)
}
}
func TestServeAdminMembersDelete(t *testing.T) {
req := &http.Request{
Method: "DELETE",
URL: mustNewURL(t, path.Join(adminMembersPrefix, "BEEF")),
}
s := &serverRecorder{}
h := &serverHandler{
server: s,
}
rw := httptest.NewRecorder()
h.serveAdminMembers(rw, req)
wcode := http.StatusNoContent
if rw.Code != wcode {
t.Errorf("code=%d, want %d", rw.Code, wcode)
}
g := rw.Body.String()
if g != "" {
t.Errorf("got body=%q, want %q", g, "")
}
wactions := []action{{name: "RemoveMember", params: []interface{}{uint64(0xBEEF)}}}
if !reflect.DeepEqual(s.actions, wactions) {
t.Errorf("actions = %+v, want %+v", s.actions, wactions)
}
}
type fakeCluster struct {
members []etcdserver.Member
}

View File

@ -74,6 +74,10 @@ type Server interface {
// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
Process(ctx context.Context, m raftpb.Message) error
// AddMember attempts to add a member into the cluster.
AddMember(ctx context.Context, memb Member) error
// RemoveMember attempts to remove a member from the cluster.
RemoveMember(ctx context.Context, id uint64) error
}
type RaftTimer interface {