diff --git a/config/config.go b/config/config.go index 655f93ccc..ed9c5592d 100644 --- a/config/config.go +++ b/config/config.go @@ -142,16 +142,6 @@ func (c *Config) Load(arguments []string) error { return err } - // Sanitize all the input fields. - if err := c.Sanitize(); err != nil { - return fmt.Errorf("sanitize: %v", err) - } - - // Force remove server configuration if specified. - if c.Force { - c.Reset() - } - return nil } diff --git a/config/config_test.go b/config/config_test.go index bc33cd212..050b54cfc 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -479,7 +479,7 @@ func TestConfigCustomConfigOverrideSystemConfig(t *testing.T) { c := New() c.SystemPath = p1 assert.Nil(t, c.Load([]string{"-config", p2}), "") - assert.Equal(t, c.Addr, "http://127.0.0.1:6000", "") + assert.Equal(t, c.Addr, "127.0.0.1:6000", "") }) }) } @@ -494,7 +494,7 @@ func TestConfigEnvVarOverrideCustomConfig(t *testing.T) { c := New() c.SystemPath = "" assert.Nil(t, c.Load([]string{"-config", path}), "") - assert.Equal(t, c.Peer.Addr, "http://127.0.0.1:8000", "") + assert.Equal(t, c.Peer.Addr, "127.0.0.1:8000", "") }) } @@ -506,7 +506,7 @@ func TestConfigCLIArgsOverrideEnvVar(t *testing.T) { c := New() c.SystemPath = "" assert.Nil(t, c.Load([]string{"-addr", "127.0.0.1:2000"}), "") - assert.Equal(t, c.Addr, "http://127.0.0.1:2000", "") + assert.Equal(t, c.Addr, "127.0.0.1:2000", "") } //-------------------------------------- diff --git a/etcd/etcd.go b/etcd/etcd.go index 0ee505fc1..3cbdde297 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -70,6 +70,16 @@ func New(c *config.Config) *Etcd { // Run the etcd instance. func (e *Etcd) Run() { + // Sanitize all the input fields. + if err := e.Config.Sanitize(); err != nil { + log.Fatalf("failed sanitizing configuration: %v", err) + } + + // Force remove server configuration if specified. + if e.Config.Force { + e.Config.Reset() + } + // Enable options. if e.Config.VeryVeryVerbose { log.Verbose = true diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index 859913604..4d5b92579 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -34,10 +34,6 @@ func TestRunStop(t *testing.T) { config.Addr = "localhost:0" config.Peer.Addr = "localhost:0" - if err := config.Sanitize(); err != nil { - t.Fatal(err) - } - etcd := New(config) go etcd.Run() <-etcd.ReadyNotify() diff --git a/http/cors.go b/http/cors.go index d6b11140e..eabd6ca7f 100644 --- a/http/cors.go +++ b/http/cors.go @@ -59,6 +59,10 @@ func (h *CORSHandler) addHeader(w http.ResponseWriter, origin string) { // ServeHTTP adds the correct CORS headers based on the origin and returns immediately // with a 200 OK if the method is OPTIONS. func (h *CORSHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + // It is important to flush before leaving the goroutine. + // Or it may miss the latest info written. + defer w.(http.Flusher).Flush() + // Write CORS header. if h.Info.OriginAllowed("*") { h.addHeader(w, "*") diff --git a/server/peer_server.go b/server/peer_server.go index 2c9fc6321..056981f6f 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -696,8 +696,8 @@ func (s *PeerServer) monitorActiveSize() { // Retrieve target active size and actual active size. activeSize := s.ClusterConfig().ActiveSize - peerCount := s.registry.Count() peers := s.registry.Names() + peerCount := s.registry.Count() if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name { peers = append(peers[:index], peers[index+1:]...) } diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index 669d7ebf4..d93f5dc36 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -223,8 +223,9 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht // Retrieves a list of peers and standbys. func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) { machines := make([]*machineMessage, 0) + leader := ps.raftServer.Leader() for _, name := range ps.registry.Names() { - if msg := ps.getMachineMessage(name); msg != nil { + if msg := ps.getMachineMessage(name, leader); msg != nil { machines = append(machines, msg) } } @@ -234,21 +235,27 @@ func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Re // Retrieve single peer or standby. func (ps *PeerServer) getMachineHttpHandler(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) - json.NewEncoder(w).Encode(ps.getMachineMessage(vars["name"])) + m := ps.getMachineMessage(vars["name"], ps.raftServer.Leader()) + json.NewEncoder(w).Encode(m) } -func (ps *PeerServer) getMachineMessage(name string) *machineMessage { +func (ps *PeerServer) getMachineMessage(name string, leader string) *machineMessage { if !ps.registry.Exists(name) { return nil } clientURL, _ := ps.registry.ClientURL(name) peerURL, _ := ps.registry.PeerURL(name) - return &machineMessage{ + msg := &machineMessage{ Name: name, + State: raft.Follower, ClientURL: clientURL, PeerURL: peerURL, } + if name == leader { + msg.State = raft.Leader + } + return msg } // Response to the name request @@ -300,6 +307,7 @@ func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Reques // machineMessage represents information about a peer or standby in the registry. type machineMessage struct { Name string `json:"name"` + State string `json:"state"` ClientURL string `json:"clientURL"` PeerURL string `json:"peerURL"` } diff --git a/server/raft_server_stats.go b/server/raft_server_stats.go index 5f880d196..099d7d65a 100644 --- a/server/raft_server_stats.go +++ b/server/raft_server_stats.go @@ -33,7 +33,7 @@ type raftServerStats struct { } func NewRaftServerStats(name string) *raftServerStats { - return &raftServerStats{ + stats := &raftServerStats{ Name: name, StartTime: time.Now(), sendRateQueue: &statsQueue{ @@ -43,6 +43,8 @@ func NewRaftServerStats(name string) *raftServerStats { back: -1, }, } + stats.LeaderInfo.startTime = time.Now() + return stats } func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { diff --git a/server/registry.go b/server/registry.go index 459205afe..7ddcb5048 100644 --- a/server/registry.go +++ b/server/registry.go @@ -38,19 +38,6 @@ func NewRegistry(s store.Store) *Registry { } } -// Names returns a list of cached peer names. -func (r *Registry) Names() []string { - r.Lock() - defer r.Unlock() - - names := make([]string, 0, len(r.peers)) - for name := range r.peers { - names = append(names, name) - } - sort.Sort(sort.StringSlice(names)) - return names -} - // Register adds a peer to the registry. func (r *Registry) Register(name string, peerURL string, machURL string) error { // Write data to store. @@ -167,6 +154,17 @@ func (r *Registry) UpdatePeerURL(name string, peerURL string) error { return nil } +func (r *Registry) name(key, name string) (string, bool) { + return name, true +} + +// Names returns a list of cached peer names. +func (r *Registry) Names() []string { + names := r.urls(RegistryKey, "", "", r.name) + sort.Sort(sort.StringSlice(names)) + return names +} + // Retrieves the Client URLs for all nodes. func (r *Registry) ClientURLs(leaderName, selfName string) []string { return r.urls(RegistryKey, leaderName, selfName, r.clientURL) diff --git a/server/v2/tests/get_handler_test.go b/server/v2/tests/get_handler_test.go index 1c7825d07..4d04caeb5 100644 --- a/server/v2/tests/get_handler_test.go +++ b/server/v2/tests/get_handler_test.go @@ -90,6 +90,13 @@ func TestV2GetKeyRecursively(t *testing.T) { // func TestV2WatchKey(t *testing.T) { tests.RunServer(func(s *server.Server) { + // There exists a little gap between etcd ready to serve and + // it actually serves the first request, which means the response + // delay could be a little bigger. + // This test is time sensitive, so it does one request to ensure + // that the server is working. + tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar")) + var watchResp *http.Response c := make(chan bool) go func() { diff --git a/tests/functional/cluster_config_test.go b/tests/functional/cluster_config_test.go index c75ce1de4..c531fb1cb 100644 --- a/tests/functional/cluster_config_test.go +++ b/tests/functional/cluster_config_test.go @@ -2,6 +2,7 @@ package test import ( "bytes" + "encoding/json" "os" "testing" "time" @@ -27,3 +28,25 @@ func TestClusterConfig(t *testing.T) { assert.Equal(t, body["activeSize"], 3) assert.Equal(t, body["promoteDelay"], 60) } + +// TestGetMachines tests '/v2/admin/machines' sends back messages of all machines. +func TestGetMachines(t *testing.T) { + _, etcds, err := CreateCluster(3, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) + assert.NoError(t, err) + defer DestroyCluster(etcds) + + time.Sleep(1 * time.Second) + + resp, err := tests.Get("http://localhost:7001/v2/admin/machines") + if !assert.Equal(t, err, nil) { + t.FailNow() + } + assert.Equal(t, resp.StatusCode, 200) + machines := make([]map[string]interface{}, 0) + b := tests.ReadBody(resp) + json.Unmarshal(b, &machines) + assert.Equal(t, len(machines), 3) + if machines[0]["state"] != "leader" && machines[1]["state"] != "leader" && machines[2]["state"] != "leader" { + t.Errorf("no leader in the cluster") + } +} diff --git a/tests/functional/simple_snapshot_test.go b/tests/functional/simple_snapshot_test.go index e7692e735..4062d091a 100644 --- a/tests/functional/simple_snapshot_test.go +++ b/tests/functional/simple_snapshot_test.go @@ -56,7 +56,7 @@ func TestSnapshot(t *testing.T) { index, _ := strconv.Atoi(snapshots[0].Name()[2:5]) - if index < 507 || index > 515 { + if index < 503 || index > 515 { t.Fatal("wrong name of snapshot :", snapshots[0].Name()) } @@ -89,7 +89,7 @@ func TestSnapshot(t *testing.T) { index, _ = strconv.Atoi(snapshots[0].Name()[2:6]) - if index < 1014 || index > 1025 { + if index < 1010 || index > 1025 { t.Fatal("wrong name of snapshot :", snapshots[0].Name()) } } diff --git a/tests/server_utils.go b/tests/server_utils.go index 0bc483616..d69eabbed 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -1,17 +1,9 @@ package tests import ( - "io/ioutil" - "net/http" - "os" - "sync" - "time" - - "github.com/coreos/etcd/third_party/github.com/goraft/raft" - - "github.com/coreos/etcd/metrics" + "github.com/coreos/etcd/config" + "github.com/coreos/etcd/etcd" "github.com/coreos/etcd/server" - "github.com/coreos/etcd/store" ) const ( @@ -19,94 +11,30 @@ const ( testClientURL = "localhost:4401" testRaftURL = "localhost:7701" testSnapshotCount = 10000 - testHeartbeatInterval = time.Duration(50) * time.Millisecond - testElectionTimeout = time.Duration(200) * time.Millisecond + testHeartbeatInterval = 50 + testElectionTimeout = 200 + testDataDir = "/tmp/ETCDTEST" ) -// Starts a server in a temporary directory. +// Starts a new server. func RunServer(f func(*server.Server)) { - path, _ := ioutil.TempDir("", "etcd-") - defer os.RemoveAll(path) + c := config.New() - store := store.New() - registry := server.NewRegistry(store) + c.Name = testName + c.Addr = testClientURL + c.Peer.Addr = testRaftURL - serverStats := server.NewRaftServerStats(testName) - followersStats := server.NewRaftFollowersStats(testName) + c.DataDir = testDataDir + c.Force = true - psConfig := server.PeerServerConfig{ - Name: testName, - URL: "http://" + testRaftURL, - Scheme: "http", - SnapshotCount: testSnapshotCount, - } - - mb := metrics.NewBucket("") - - ps := server.NewPeerServer(psConfig, registry, store, &mb, followersStats, serverStats) - psListener := server.NewListener("http", testRaftURL, nil) - - // Create Raft transporter and server - dialTimeout := (3 * testHeartbeatInterval) + testElectionTimeout - responseHeaderTimeout := (3 * testHeartbeatInterval) + testElectionTimeout - raftTransporter := server.NewTransporter(followersStats, serverStats, registry, testHeartbeatInterval, dialTimeout, responseHeaderTimeout) - raftServer, err := raft.NewServer(testName, path, raftTransporter, store, ps, "") - if err != nil { - panic(err) - } - raftServer.SetElectionTimeout(testElectionTimeout) - raftServer.SetHeartbeatInterval(testHeartbeatInterval) - ps.SetRaftServer(raftServer) - - s := server.New(testName, "http://"+testClientURL, ps, registry, store, nil) - sListener := server.NewListener("http", testClientURL, nil) - - ps.SetServer(s) - - w := &sync.WaitGroup{} - - // Start up peer server. - c := make(chan bool) - go func() { - c <- true - ps.Start(false, "", []string{}) - h := waitHandler{w, ps.HTTPHandler()} - http.Serve(psListener, &h) - }() - <-c - - // Start up etcd server. - go func() { - c <- true - h := waitHandler{w, s.HTTPHandler()} - http.Serve(sListener, &h) - }() - <-c - - // Wait to make sure servers have started. - time.Sleep(50 * time.Millisecond) + c.Peer.HeartbeatInterval = testHeartbeatInterval + c.Peer.ElectionTimeout = testElectionTimeout + c.SnapshotCount = testSnapshotCount + i := etcd.New(c) + go i.Run() + <-i.ReadyNotify() // Execute the function passed in. - f(s) - - // Clean up servers. - ps.Stop() - psListener.Close() - sListener.Close() - w.Wait() -} - -type waitHandler struct { - wg *sync.WaitGroup - handler http.Handler -} - -func (h *waitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - h.wg.Add(1) - defer h.wg.Done() - h.handler.ServeHTTP(w, r) - - //important to flush before decrementing the wait group. - //we won't get a chance to once main() ends. - w.(http.Flusher).Flush() + f(i.Server) + i.Stop() }