etcd: add /v2/admin/machines/ endpoint

release-2.0
Yicheng Qin 2014-07-09 13:11:14 -07:00
parent f95f53e446
commit edd8d7e534
3 changed files with 182 additions and 2 deletions

View File

@ -27,8 +27,9 @@ const (
v2LeaderPrefix = "/v2/leader"
v2StoreStatsPrefix = "/v2/stats/store"
v2configKVPrefix = "/_etcd/config"
v2adminConfigPrefix = "/v2/admin/config"
v2configKVPrefix = "/_etcd/config"
v2adminConfigPrefix = "/v2/admin/config"
v2adminMachinesPrefix = "/v2/admin/machines/"
raftPrefix = "/raft"
)
@ -107,6 +108,7 @@ func New(c *config.Config, id int64) *Server {
m.Handle(v2LeaderPrefix, handlerErr(s.serveLeader))
m.Handle(v2StoreStatsPrefix, handlerErr(s.serveStoreStats))
m.Handle(v2adminConfigPrefix, handlerErr(s.serveAdminConfig))
m.Handle(v2adminMachinesPrefix, handlerErr(s.serveAdminMachines))
s.Handler = m
return s
}

View File

@ -2,11 +2,29 @@ package etcd
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"path/filepath"
"strings"
"github.com/coreos/etcd/store"
)
const (
stateFollower = "follower"
stateCandidate = "candidate"
stateLeader = "leader"
)
// machineMessage represents information about a peer or standby in the registry.
type machineMessage struct {
Name string `json:"name"`
State string `json:"state"`
ClientURL string `json:"clientURL"`
PeerURL string `json:"peerURL"`
}
func (s *Server) serveAdminConfig(w http.ResponseWriter, r *http.Request) error {
switch r.Method {
case "GET":
@ -34,3 +52,70 @@ func (s *Server) serveAdminConfig(w http.ResponseWriter, r *http.Request) error
json.NewEncoder(w).Encode(s.ClusterConfig())
return nil
}
func (s *Server) serveAdminMachines(w http.ResponseWriter, r *http.Request) error {
switch r.Method {
case "GET":
name := strings.TrimPrefix(r.URL.Path, v2adminMachinesPrefix)
var info interface{}
var err error
if name != "" {
info, err = s.someMachineMessage(name)
} else {
info, err = s.allMachineMessages()
}
if err != nil {
return err
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(info)
case "DELETE":
// todo: remove the machine
panic("unimplemented")
default:
return allow(w, "GET", "DELETE")
}
return nil
}
// someMachineMessage return machine message of specified name.
func (s *Server) someMachineMessage(name string) (*machineMessage, error) {
p := filepath.Join(v2machineKVPrefix, name)
e, err := s.Get(p, false, false)
if err != nil {
return nil, err
}
lead := fmt.Sprint(s.node.Leader())
return newMachineMessage(e.Node, lead), nil
}
func (s *Server) allMachineMessages() ([]*machineMessage, error) {
e, err := s.Get(v2machineKVPrefix, false, false)
if err != nil {
return nil, err
}
lead := fmt.Sprint(s.node.Leader())
ms := make([]*machineMessage, len(e.Node.Nodes))
for i, n := range e.Node.Nodes {
ms[i] = newMachineMessage(n, lead)
}
return ms, nil
}
func newMachineMessage(n *store.NodeExtern, lead string) *machineMessage {
_, name := filepath.Split(n.Key)
q, err := url.ParseQuery(*n.Value)
if err != nil {
panic("fail to parse the info for machine " + name)
}
m := &machineMessage{
Name: name,
State: stateFollower,
ClientURL: q["etcd"][0],
PeerURL: q["raft"][0],
}
if name == lead {
m.State = stateLeader
}
return m
}

View File

@ -3,6 +3,7 @@ package etcd
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"reflect"
@ -219,6 +220,98 @@ func TestPutAdminConfigEndPoint(t *testing.T) {
}
}
func TestGetAdminMachineEndPoint(t *testing.T) {
es, hs := buildCluster(3, false)
waitCluster(t, es)
for i := range es {
for j := range hs {
name := fmt.Sprint(es[i].id)
r, err := http.Get(hs[j].URL + v2adminMachinesPrefix + name)
if err != nil {
t.Errorf("%v", err)
continue
}
if g := r.StatusCode; g != 200 {
t.Errorf("#%d on %d: status = %d, want %d", i, j, g, 200)
}
if g := r.Header.Get("Content-Type"); g != "application/json" {
t.Errorf("#%d on %d: ContentType = %d, want application/json", i, j, g)
}
m := new(machineMessage)
err = json.NewDecoder(r.Body).Decode(m)
r.Body.Close()
if err != nil {
t.Errorf("%v", err)
continue
}
wm := &machineMessage{
Name: name,
State: stateFollower,
ClientURL: hs[i].URL,
PeerURL: hs[i].URL,
}
if i == 0 {
wm.State = stateLeader
}
if !reflect.DeepEqual(m, wm) {
t.Errorf("#%d on %d: body = %+v, want %+v", i, j, m, wm)
}
}
}
for i := range es {
es[len(es)-i-1].Stop()
}
for i := range hs {
hs[len(hs)-i-1].Close()
}
afterTest(t)
}
func TestGetAdminMachinesEndPoint(t *testing.T) {
es, hs := buildCluster(3, false)
waitCluster(t, es)
w := make([]*machineMessage, len(hs))
for i := range hs {
w[i] = &machineMessage{
Name: fmt.Sprint(es[i].id),
State: stateFollower,
ClientURL: hs[i].URL,
PeerURL: hs[i].URL,
}
}
w[0].State = stateLeader
for i := range hs {
r, err := http.Get(hs[i].URL + v2adminMachinesPrefix)
if err != nil {
t.Errorf("%v", err)
continue
}
m := make([]*machineMessage, 0)
err = json.NewDecoder(r.Body).Decode(&m)
r.Body.Close()
if err != nil {
t.Errorf("%v", err)
continue
}
if !reflect.DeepEqual(m, w) {
t.Errorf("on %d: machines = %+v, want %+v", i, m, w)
}
}
for i := range es {
es[len(es)-i-1].Stop()
}
for i := range hs {
hs[len(hs)-i-1].Close()
}
afterTest(t)
}
// barrier ensures that all servers have made further progress on applied index
// compared to the base one.
func barrier(t *testing.T, base int, es []*Server) {