From 91f768f9ae4e7e6cc5b1be84ed263b432edba32b Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Fri, 17 Jan 2014 19:28:00 -0800 Subject: [PATCH 01/20] refactor(cors): Rename cors_handler.go to cors.go --- server/{cors_handler.go => cors.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename server/{cors_handler.go => cors.go} (100%) diff --git a/server/cors_handler.go b/server/cors.go similarity index 100% rename from server/cors_handler.go rename to server/cors.go From 9c8a23c333df9f5a71af955004b13d17a6043c45 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Wed, 15 Jan 2014 23:17:04 -0800 Subject: [PATCH 02/20] refactor(PeerServer): Use a config struct in PeerServer --- etcd.go | 17 ++++--- server/join_command.go | 2 +- server/peer_server.go | 82 +++++++++++++++++----------------- server/peer_server_handlers.go | 30 ++++++------- server/transporter.go | 6 +-- tests/server_utils.go | 13 +++++- 6 files changed, 84 insertions(+), 66 deletions(-) diff --git a/etcd.go b/etcd.go index 32116d2f8..800dcb1f3 100644 --- a/etcd.go +++ b/etcd.go @@ -103,11 +103,18 @@ func main() { registry := server.NewRegistry(store) // Create peer server. - heartbeatTimeout := time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond - electionTimeout := time.Duration(config.Peer.ElectionTimeout) * time.Millisecond - ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount, heartbeatTimeout, electionTimeout, &mb) - ps.MaxClusterSize = config.MaxClusterSize - ps.RetryTimes = config.MaxRetryAttempts + psConfig := server.PeerServerConfig{ + Name: info.Name, + Path: config.DataDir, + URL: info.RaftURL, + BindAddr: info.RaftListenHost, + SnapshotCount: config.SnapshotCount, + HeartbeatTimeout: time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond, + ElectionTimeout: time.Duration(config.Peer.ElectionTimeout) * time.Millisecond, + MaxClusterSize: config.MaxClusterSize, + RetryTimes: config.MaxRetryAttempts, + } + ps := server.NewPeerServer(psConfig, &peerTLSConfig, &info.RaftTLS, registry, store, &mb) // Create client server. s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &tlsConfig, &info.EtcdTLS, ps, registry, store, &mb) diff --git a/server/join_command.go b/server/join_command.go index ed262b1bd..2cf5cd71a 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -52,7 +52,7 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) { } // Check peer number in the cluster - if ps.registry.Count() == ps.MaxClusterSize { + if ps.registry.Count() == ps.Config.MaxClusterSize { log.Debug("Reject join request from ", c.Name) return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex()) } diff --git a/server/peer_server.go b/server/peer_server.go index 4e494b89b..56149a5f4 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -25,15 +25,25 @@ const retryInterval = 10 const ThresholdMonitorTimeout = 5 * time.Second +type PeerServerConfig struct { + Name string + Path string + URL string + BindAddr string + SnapshotCount int + HeartbeatTimeout time.Duration + ElectionTimeout time.Duration + MaxClusterSize int + RetryTimes int +} + type PeerServer struct { + Config PeerServerConfig raftServer raft.Server server *Server httpServer *http.Server listener net.Listener joinIndex uint64 - name string - url string - bindAddr string tlsConf *TLSConfig tlsInfo *TLSInfo followersStats *raftFollowersStats @@ -41,10 +51,6 @@ type PeerServer struct { registry *Registry store store.Store snapConf *snapshotConf - MaxClusterSize int - RetryTimes int - HeartbeatTimeout time.Duration - ElectionTimeout time.Duration closeChan chan bool timeoutThresholdChan chan interface{} @@ -65,22 +71,20 @@ type snapshotConf struct { snapshotThr uint64 } -func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int, heartbeatTimeout, electionTimeout time.Duration, mb *metrics.Bucket) *PeerServer { - +func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, mb *metrics.Bucket) *PeerServer { s := &PeerServer{ - name: name, - url: url, - bindAddr: bindAddr, + Config: psConfig, + tlsConf: tlsConf, tlsInfo: tlsInfo, registry: registry, store: store, followersStats: &raftFollowersStats{ - Leader: name, + Leader: psConfig.Name, Followers: make(map[string]*raftFollowerStats), }, serverStats: &raftServerStats{ - Name: name, + Name: psConfig.Name, StartTime: time.Now(), sendRateQueue: &statsQueue{ back: -1, @@ -89,8 +93,6 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon back: -1, }, }, - HeartbeatTimeout: heartbeatTimeout, - ElectionTimeout: electionTimeout, timeoutThresholdChan: make(chan interface{}, 1), @@ -101,7 +103,7 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s) // Create raft server - raftServer, err := raft.NewServer(name, path, raftTransporter, s.store, s, "") + raftServer, err := raft.NewServer(psConfig.Name, psConfig.Path, raftTransporter, s.store, s, "") if err != nil { log.Fatal(err) } @@ -110,7 +112,7 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon checkingInterval: time.Second * 3, // this is not accurate, we will update raft to provide an api lastIndex: raftServer.CommitIndex(), - snapshotThr: uint64(snapshotCount), + snapshotThr: uint64(psConfig.SnapshotCount), } s.raftServer = raftServer @@ -134,14 +136,14 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error { err := s.raftServer.LoadSnapshot() if err == nil { - log.Debugf("%s finished load snapshot", s.name) + log.Debugf("%s finished load snapshot", s.Config.Name) } else { log.Debug(err) } } - s.raftServer.SetElectionTimeout(s.ElectionTimeout) - s.raftServer.SetHeartbeatTimeout(s.HeartbeatTimeout) + s.raftServer.SetElectionTimeout(s.Config.ElectionTimeout) + s.raftServer.SetHeartbeatTimeout(s.Config.HeartbeatTimeout) s.raftServer.Start() @@ -155,7 +157,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error { } else { // Rejoin the previous cluster - cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.name) + cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) for i := 0; i < len(cluster); i++ { u, err := url.Parse(cluster[i]) if err != nil { @@ -168,7 +170,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error { log.Warn("the entire cluster is down! this peer will restart the cluster.") } - log.Debugf("%s restart as a follower", s.name) + log.Debugf("%s restart as a follower", s.Config.Name) } s.closeChan = make(chan bool) @@ -255,17 +257,17 @@ func (s *PeerServer) SetServer(server *Server) { func (s *PeerServer) startAsLeader() { // leader need to join self as a peer for { - _, err := s.raftServer.Do(NewJoinCommand(store.MinVersion(), store.MaxVersion(), s.raftServer.Name(), s.url, s.server.URL())) + _, err := s.raftServer.Do(NewJoinCommand(store.MinVersion(), store.MaxVersion(), s.raftServer.Name(), s.Config.URL, s.server.URL())) if err == nil { break } } - log.Debugf("%s start as a leader", s.name) + log.Debugf("%s start as a leader", s.Config.Name) } func (s *PeerServer) startAsFollower(cluster []string) { // start as a follower in a existing cluster - for i := 0; i < s.RetryTimes; i++ { + for i := 0; i < s.Config.RetryTimes; i++ { ok := s.joinCluster(cluster) if ok { return @@ -274,19 +276,19 @@ func (s *PeerServer) startAsFollower(cluster []string) { time.Sleep(time.Second * retryInterval) } - log.Fatalf("Cannot join the cluster via given peers after %x retries", s.RetryTimes) + log.Fatalf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes) } // Start to listen and response raft command func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) error { - log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.bindAddr, s.url) + log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.Config.Name, s.Config.BindAddr, s.Config.URL) router := mux.NewRouter() s.httpServer = &http.Server{ Handler: router, TLSConfig: &tlsConf, - Addr: s.bindAddr, + Addr: s.Config.BindAddr, } // internal commands @@ -333,7 +335,7 @@ func getVersion(t *transporter, versionURL url.URL) (int, error) { // Upgradable checks whether all peers in a cluster support an upgrade to the next store version. func (s *PeerServer) Upgradable() error { nextVersion := s.store.Version() + 1 - for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.name) { + for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) { u, err := url.Parse(peerURL) if err != nil { return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err) @@ -361,7 +363,7 @@ func (s *PeerServer) joinCluster(cluster []string) bool { err := s.joinByPeer(s.raftServer, peer, s.tlsConf.Scheme) if err == nil { - log.Debugf("%s success join to the cluster via peer %s", s.name, peer) + log.Debugf("%s success join to the cluster via peer %s", s.Config.Name, peer) return true } else { @@ -392,7 +394,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion()) } - json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.url, s.server.URL())) + json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL())) joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"} @@ -417,7 +419,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) if resp.StatusCode == http.StatusTemporaryRedirect { address := resp.Header.Get("Location") log.Debugf("Send Join Request to %s", address) - json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.url, s.server.URL())) + json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL())) resp, req, err = t.Post(address, &b) } else if resp.StatusCode == http.StatusBadRequest { @@ -477,21 +479,21 @@ func (s *PeerServer) raftEventLogger(event raft.Event) { switch event.Type() { case raft.StateChangeEventType: - log.Infof("%s: state changed from '%v' to '%v'.", s.name, prevValue, value) + log.Infof("%s: state changed from '%v' to '%v'.", s.Config.Name, prevValue, value) case raft.TermChangeEventType: - log.Infof("%s: term #%v started.", s.name, value) + log.Infof("%s: term #%v started.", s.Config.Name, value) case raft.LeaderChangeEventType: - log.Infof("%s: leader changed from '%v' to '%v'.", s.name, prevValue, value) + log.Infof("%s: leader changed from '%v' to '%v'.", s.Config.Name, prevValue, value) case raft.AddPeerEventType: - log.Infof("%s: peer added: '%v'", s.name, value) + log.Infof("%s: peer added: '%v'", s.Config.Name, value) case raft.RemovePeerEventType: - log.Infof("%s: peer removed: '%v'", s.name, value) + log.Infof("%s: peer removed: '%v'", s.Config.Name, value) case raft.HeartbeatTimeoutEventType: var name = "" if peer, ok := value.(*raft.Peer); ok { name = peer.Name } - log.Infof("%s: warning: heartbeat timed out: '%v'", s.name, name) + log.Infof("%s: warning: heartbeat timed out: '%v'", s.Config.Name, name) case raft.ElectionTimeoutThresholdEventType: select { case s.timeoutThresholdChan <- value: @@ -538,7 +540,7 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) { for { select { case value := <-s.timeoutThresholdChan: - log.Infof("%s: warning: heartbeat near election timeout: %v", s.name, value) + log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value) case <-closeChan: return } diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index e1b485bec..a4ef84710 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -15,7 +15,7 @@ import ( // Get all the current logs func (ps *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] GET %s/log", ps.url) + log.Debugf("[recv] GET %s/log", ps.Config.URL) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(ps.raftServer.LogEntries()) @@ -27,11 +27,11 @@ func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) if _, err := rvreq.Decode(req.Body); err != nil { http.Error(w, "", http.StatusBadRequest) - log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.url, err) + log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.Config.URL, err) return } - log.Debugf("[recv] POST %s/vote [%s]", ps.url, rvreq.CandidateName) + log.Debugf("[recv] POST %s/vote [%s]", ps.Config.URL, rvreq.CandidateName) resp := ps.raftServer.RequestVote(rvreq) @@ -55,11 +55,11 @@ func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http. if _, err := aereq.Decode(req.Body); err != nil { http.Error(w, "", http.StatusBadRequest) - log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.url, err) + log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.Config.URL, err) return } - log.Debugf("[recv] POST %s/log/append [%d]", ps.url, len(aereq.Entries)) + log.Debugf("[recv] POST %s/log/append [%d]", ps.Config.URL, len(aereq.Entries)) ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) @@ -90,11 +90,11 @@ func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Reque if _, err := ssreq.Decode(req.Body); err != nil { http.Error(w, "", http.StatusBadRequest) - log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.url, err) + log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.Config.URL, err) return } - log.Debugf("[recv] POST %s/snapshot", ps.url) + log.Debugf("[recv] POST %s/snapshot", ps.Config.URL) resp := ps.raftServer.RequestSnapshot(ssreq) @@ -117,11 +117,11 @@ func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *ht if _, err := ssrreq.Decode(req.Body); err != nil { http.Error(w, "", http.StatusBadRequest) - log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.url, err) + log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.Config.URL, err) return } - log.Debugf("[recv] POST %s/snapshotRecovery", ps.url) + log.Debugf("[recv] POST %s/snapshotRecovery", ps.Config.URL) resp := ps.raftServer.SnapshotRecoveryRequest(ssrreq) @@ -140,7 +140,7 @@ func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *ht // Get the port that listening for etcd connecting of the server func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s/etcdURL/ ", ps.url) + log.Debugf("[recv] Get %s/etcdURL/ ", ps.Config.URL) w.WriteHeader(http.StatusOK) w.Write([]byte(ps.server.URL())) } @@ -195,21 +195,21 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request // Response to the name request func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s/name/ ", ps.url) + log.Debugf("[recv] Get %s/name/ ", ps.Config.URL) w.WriteHeader(http.StatusOK) - w.Write([]byte(ps.name)) + w.Write([]byte(ps.Config.Name)) } // Response to the name request func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s/version/ ", ps.url) + log.Debugf("[recv] Get %s/version/ ", ps.Config.URL) w.WriteHeader(http.StatusOK) w.Write([]byte(strconv.Itoa(ps.store.Version()))) } // Checks whether a given version is supported. func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s%s ", ps.url, req.URL.Path) + log.Debugf("[recv] Get %s%s ", ps.Config.URL, req.URL.Path) vars := mux.Vars(req) version, _ := strconv.Atoi(vars["version"]) if version >= store.MinVersion() && version <= store.MaxVersion() { @@ -221,7 +221,7 @@ func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.R // Upgrades the current store version to the next version. func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s/version", ps.url) + log.Debugf("[recv] Get %s/version", ps.Config.URL) // Check if upgrade is possible for all nodes. if err := ps.Upgradable(); err != nil { diff --git a/server/transporter.go b/server/transporter.go index 22e113605..1aa5442fd 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -27,8 +27,8 @@ type dialer func(network, addr string) (net.Conn, error) // whether the user give the server cert and key func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter { // names for each type of timeout, for the sake of clarity - dialTimeout := (3 * peerServer.HeartbeatTimeout) + peerServer.ElectionTimeout - responseHeaderTimeout := (3 * peerServer.HeartbeatTimeout) + peerServer.ElectionTimeout + dialTimeout := (3 * peerServer.Config.HeartbeatTimeout) + peerServer.Config.ElectionTimeout + responseHeaderTimeout := (3 * peerServer.Config.HeartbeatTimeout) + peerServer.Config.ElectionTimeout t := transporter{} @@ -227,7 +227,7 @@ func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) // Cancel the on fly HTTP transaction when timeout happens. func (t *transporter) CancelWhenTimeout(req *http.Request) { go func() { - time.Sleep(t.peerServer.HeartbeatTimeout) + time.Sleep(t.peerServer.Config.HeartbeatTimeout) t.transport.CancelRequest(req) }() } diff --git a/tests/server_utils.go b/tests/server_utils.go index 6e3d369c3..5a3c05b35 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -26,8 +26,17 @@ func RunServer(f func(*server.Server)) { store := store.New() registry := server.NewRegistry(store) - ps := server.NewPeerServer(testName, path, "http://"+testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount, testHeartbeatTimeout, testElectionTimeout, nil) - ps.MaxClusterSize = 9 + psConfig := server.PeerServerConfig{ + Name: testName, + Path: path, + URL: "http://"+testRaftURL, + BindAddr: testRaftURL, + SnapshotCount: testSnapshotCount, + HeartbeatTimeout: testHeartbeatTimeout, + ElectionTimeout: testElectionTimeout, + MaxClusterSize: 9, + } + ps := server.NewPeerServer(psConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, nil) s := server.New(testName, "http://"+testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store, nil) ps.SetServer(s) From c47760382e28a15ab6615cbfcc04005d6b118668 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Wed, 15 Jan 2014 23:24:14 -0800 Subject: [PATCH 03/20] refactor(Server): Use a config struct in Server --- etcd.go | 7 ++++++- server/server.go | 26 +++++++++++++++----------- tests/server_utils.go | 9 ++++++++- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/etcd.go b/etcd.go index 800dcb1f3..c60577085 100644 --- a/etcd.go +++ b/etcd.go @@ -117,7 +117,12 @@ func main() { ps := server.NewPeerServer(psConfig, &peerTLSConfig, &info.RaftTLS, registry, store, &mb) // Create client server. - s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &tlsConfig, &info.EtcdTLS, ps, registry, store, &mb) + sConfig := server.ServerConfig{ + Name: info.Name, + URL: info.EtcdURL, + BindAddr: info.EtcdListenHost, + } + s := server.New(sConfig, &tlsConfig, &info.EtcdTLS, ps, registry, store, &mb) if err := s.AllowOrigins(config.CorsOrigins); err != nil { panic(err) } diff --git a/server/server.go b/server/server.go index 0674a2b5e..a660bc593 100644 --- a/server/server.go +++ b/server/server.go @@ -22,15 +22,20 @@ import ( "github.com/gorilla/mux" ) +type ServerConfig struct { + Name string + URL string + BindAddr string +} + // This is the default implementation of the Server interface. type Server struct { http.Server + Config ServerConfig peerServer *PeerServer registry *Registry listener net.Listener store store.Store - name string - url string tlsConf *TLSConfig tlsInfo *TLSInfo router *mux.Router @@ -39,20 +44,19 @@ type Server struct { } // Creates a new Server. -func New(name string, urlStr string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server { +func New(sConfig ServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server { r := mux.NewRouter() cors := &corsHandler{router: r} s := &Server{ + Config: sConfig, Server: http.Server{ Handler: cors, TLSConfig: &tlsConf.Server, - Addr: bindAddr, + Addr: sConfig.BindAddr, }, - name: name, store: store, registry: registry, - url: urlStr, tlsConf: tlsConf, tlsInfo: tlsInfo, peerServer: peerServer, @@ -96,7 +100,7 @@ func (s *Server) Term() uint64 { // The server URL. func (s *Server) URL() string { - return s.url + return s.Config.URL } // Retrives the Peer URL for a given node name. @@ -143,7 +147,7 @@ func (s *Server) installV2() { func (s *Server) installMod() { r := s.router - r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler(s.url))) + r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler(s.Config.URL))) } func (s *Server) installDebug() { @@ -176,7 +180,7 @@ func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Reque // Wrap the standard HandleFunc interface to pass in the server reference. return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) { // Log request. - log.Debugf("[recv] %s %s %s [%s]", req.Method, s.url, req.URL.Path, req.RemoteAddr) + log.Debugf("[recv] %s %s %s [%s]", req.Method, s.Config.URL, req.URL.Path, req.RemoteAddr) // Execute handler function and return error if necessary. if err := f(w, req); err != nil { @@ -193,7 +197,7 @@ func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Reque // Start to listen and response etcd client command func (s *Server) ListenAndServe() error { - log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.name, s.Server.Addr, s.url) + log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.Config.Name, s.Server.Addr, s.Config.URL) if s.tlsConf.Scheme == "http" { return s.listenAndServe() @@ -353,7 +357,7 @@ func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) erro // Handler to return all the known peers in the current cluster. func (s *Server) GetPeersHandler(w http.ResponseWriter, req *http.Request) error { - peers := s.registry.ClientURLs(s.peerServer.RaftServer().Leader(), s.name) + peers := s.registry.ClientURLs(s.peerServer.RaftServer().Leader(), s.Config.Name) w.WriteHeader(http.StatusOK) w.Write([]byte(strings.Join(peers, ", "))) return nil diff --git a/tests/server_utils.go b/tests/server_utils.go index 5a3c05b35..cc785d2e7 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -37,7 +37,14 @@ func RunServer(f func(*server.Server)) { MaxClusterSize: 9, } ps := server.NewPeerServer(psConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, nil) - s := server.New(testName, "http://"+testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store, nil) + + sConfig := server.ServerConfig{ + Name: testName, + URL: "http://"+testClientURL, + BindAddr: testClientURL, + } + s := server.New(sConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store, nil) + ps.SetServer(s) // Start up peer server. From a93d60be90bfe41eabc0a18f56a3b4144dda5f8e Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Fri, 17 Jan 2014 20:04:10 -0800 Subject: [PATCH 04/20] refactor(cors): Break apart CORS data and middleware --- etcd.go | 11 ++++++++--- server/cors.go | 34 +++++++++++++++++----------------- server/peer_server.go | 1 + server/peer_server_handlers.go | 4 ++-- server/server.go | 33 ++++++++++++--------------------- tests/server_utils.go | 3 +++ 6 files changed, 43 insertions(+), 43 deletions(-) diff --git a/etcd.go b/etcd.go index c60577085..26ec45ebc 100644 --- a/etcd.go +++ b/etcd.go @@ -98,6 +98,12 @@ func main() { } } + // Retrieve CORS configuration + corsInfo, err := server.NewCORSInfo(config.CorsOrigins) + if err != nil { + log.Fatal("CORS:", err) + } + // Create etcd key-value store and registry. store := store.New() registry := server.NewRegistry(store) @@ -113,6 +119,7 @@ func main() { ElectionTimeout: time.Duration(config.Peer.ElectionTimeout) * time.Millisecond, MaxClusterSize: config.MaxClusterSize, RetryTimes: config.MaxRetryAttempts, + CORS: corsInfo, } ps := server.NewPeerServer(psConfig, &peerTLSConfig, &info.RaftTLS, registry, store, &mb) @@ -121,11 +128,9 @@ func main() { Name: info.Name, URL: info.EtcdURL, BindAddr: info.EtcdListenHost, + CORS: corsInfo, } s := server.New(sConfig, &tlsConfig, &info.EtcdTLS, ps, registry, store, &mb) - if err := s.AllowOrigins(config.CorsOrigins); err != nil { - panic(err) - } if config.Trace() { s.EnableTracing() diff --git a/server/cors.go b/server/cors.go index fec3c4abc..a3728b689 100644 --- a/server/cors.go +++ b/server/cors.go @@ -20,50 +20,50 @@ import ( "fmt" "net/http" "net/url" - - "github.com/gorilla/mux" ) -type corsHandler struct { - router *mux.Router - corsOrigins map[string]bool +type corsInfo struct { + origins map[string]bool } -// AllowOrigins sets a comma-delimited list of origins that are allowed. -func (s *corsHandler) AllowOrigins(origins []string) error { +func NewCORSInfo(origins []string) (*corsInfo, error) { // Construct a lookup of all origins. m := make(map[string]bool) for _, v := range origins { if v != "*" { if _, err := url.Parse(v); err != nil { - return fmt.Errorf("Invalid CORS origin: %s", err) + return nil, fmt.Errorf("Invalid CORS origin: %s", err) } } m[v] = true } - s.corsOrigins = m - return nil + return &corsInfo{m}, nil } // OriginAllowed determines whether the server will allow a given CORS origin. -func (c *corsHandler) OriginAllowed(origin string) bool { - return c.corsOrigins["*"] || c.corsOrigins[origin] +func (c *corsInfo) OriginAllowed(origin string) bool { + return c.origins["*"] || c.origins[origin] +} + +type corsHTTPMiddleware struct { + next http.Handler + info *corsInfo } // addHeader adds the correct cors headers given an origin -func (h *corsHandler) addHeader(w http.ResponseWriter, origin string) { +func (h *corsHTTPMiddleware) addHeader(w http.ResponseWriter, origin string) { w.Header().Add("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") w.Header().Add("Access-Control-Allow-Origin", origin) } // ServeHTTP adds the correct CORS headers based on the origin and returns immediatly // with a 200 OK if the method is OPTIONS. -func (h *corsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { +func (h *corsHTTPMiddleware) ServeHTTP(w http.ResponseWriter, req *http.Request) { // Write CORS header. - if h.OriginAllowed("*") { + if h.info.OriginAllowed("*") { h.addHeader(w, "*") - } else if origin := req.Header.Get("Origin"); h.OriginAllowed(origin) { + } else if origin := req.Header.Get("Origin"); h.info.OriginAllowed(origin) { h.addHeader(w, origin) } @@ -72,5 +72,5 @@ func (h *corsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - h.router.ServeHTTP(w, req) + h.next.ServeHTTP(w, req) } diff --git a/server/peer_server.go b/server/peer_server.go index 56149a5f4..bb5f8eb3a 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -35,6 +35,7 @@ type PeerServerConfig struct { ElectionTimeout time.Duration MaxClusterSize int RetryTimes int + CORS *corsInfo } type PeerServer struct { diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index a4ef84710..a2c498101 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -150,9 +150,9 @@ func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) command := &JoinCommand{} // Write CORS header. - if ps.server.OriginAllowed("*") { + if ps.Config.CORS.OriginAllowed("*") { w.Header().Add("Access-Control-Allow-Origin", "*") - } else if ps.server.OriginAllowed(req.Header.Get("Origin")) { + } else if ps.Config.CORS.OriginAllowed(req.Header.Get("Origin")) { w.Header().Add("Access-Control-Allow-Origin", req.Header.Get("Origin")) } diff --git a/server/server.go b/server/server.go index a660bc593..7a64d17fa 100644 --- a/server/server.go +++ b/server/server.go @@ -26,27 +26,28 @@ type ServerConfig struct { Name string URL string BindAddr string + CORS *corsInfo } // This is the default implementation of the Server interface. type Server struct { http.Server - Config ServerConfig - peerServer *PeerServer - registry *Registry - listener net.Listener - store store.Store - tlsConf *TLSConfig - tlsInfo *TLSInfo - router *mux.Router - corsHandler *corsHandler + Config ServerConfig + peerServer *PeerServer + registry *Registry + listener net.Listener + store store.Store + tlsConf *TLSConfig + tlsInfo *TLSInfo + router *mux.Router + corsMiddleware *corsHTTPMiddleware metrics *metrics.Bucket } // Creates a new Server. func New(sConfig ServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server { r := mux.NewRouter() - cors := &corsHandler{router: r} + cors := &corsHTTPMiddleware{r, sConfig.CORS} s := &Server{ Config: sConfig, @@ -61,7 +62,7 @@ func New(sConfig ServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer tlsInfo: tlsInfo, peerServer: peerServer, router: r, - corsHandler: cors, + corsMiddleware: cors, metrics: mb, } @@ -326,16 +327,6 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque } } -// OriginAllowed determines whether the server will allow a given CORS origin. -func (s *Server) OriginAllowed(origin string) bool { - return s.corsHandler.OriginAllowed(origin) -} - -// AllowOrigins sets a comma-delimited list of origins that are allowed. -func (s *Server) AllowOrigins(origins []string) error { - return s.corsHandler.AllowOrigins(origins) -} - // Handler to return the current version of etcd. func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) diff --git a/tests/server_utils.go b/tests/server_utils.go index cc785d2e7..93f3795b9 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -25,6 +25,7 @@ func RunServer(f func(*server.Server)) { store := store.New() registry := server.NewRegistry(store) + corsInfo, _ := server.NewCORSInfo([]string{}) psConfig := server.PeerServerConfig{ Name: testName, @@ -35,6 +36,7 @@ func RunServer(f func(*server.Server)) { HeartbeatTimeout: testHeartbeatTimeout, ElectionTimeout: testElectionTimeout, MaxClusterSize: 9, + CORS: corsInfo, } ps := server.NewPeerServer(psConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, nil) @@ -42,6 +44,7 @@ func RunServer(f func(*server.Server)) { Name: testName, URL: "http://"+testClientURL, BindAddr: testClientURL, + CORS: corsInfo, } s := server.New(sConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store, nil) From c0ff8f60261d4023e400609f3f56facc608cce21 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Sat, 18 Jan 2014 10:09:54 -0800 Subject: [PATCH 05/20] chore(imports): Shift around some imports --- etcd.go | 3 ++- server/peer_server.go | 5 +++-- server/server.go | 5 +++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/etcd.go b/etcd.go index 26ec45ebc..e03b6f1ad 100644 --- a/etcd.go +++ b/etcd.go @@ -22,11 +22,12 @@ import ( "runtime" "time" + "github.com/coreos/raft" + "github.com/coreos/etcd/log" "github.com/coreos/etcd/metrics" "github.com/coreos/etcd/server" "github.com/coreos/etcd/store" - "github.com/coreos/raft" ) func main() { diff --git a/server/peer_server.go b/server/peer_server.go index bb5f8eb3a..92da00434 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -13,12 +13,13 @@ import ( "strconv" "time" + "github.com/coreos/raft" + "github.com/gorilla/mux" + etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/log" "github.com/coreos/etcd/metrics" "github.com/coreos/etcd/store" - "github.com/coreos/raft" - "github.com/gorilla/mux" ) const retryInterval = 10 diff --git a/server/server.go b/server/server.go index 7a64d17fa..c690c060a 100644 --- a/server/server.go +++ b/server/server.go @@ -10,6 +10,9 @@ import ( "strings" "time" + "github.com/coreos/raft" + "github.com/gorilla/mux" + etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/log" "github.com/coreos/etcd/metrics" @@ -18,8 +21,6 @@ import ( "github.com/coreos/etcd/server/v2" "github.com/coreos/etcd/store" _ "github.com/coreos/etcd/store/v2" - "github.com/coreos/raft" - "github.com/gorilla/mux" ) type ServerConfig struct { From 91fc6aabd293a26b72642c32500392b6c78bb654 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Sat, 18 Jan 2014 10:10:06 -0800 Subject: [PATCH 06/20] chore(gofmt): Run gofmt --- etcd.go | 24 ++++++++++++------------ server/peer_server.go | 26 +++++++++++++------------- server/server.go | 12 ++++++------ 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/etcd.go b/etcd.go index e03b6f1ad..d24814840 100644 --- a/etcd.go +++ b/etcd.go @@ -111,25 +111,25 @@ func main() { // Create peer server. psConfig := server.PeerServerConfig{ - Name: info.Name, - Path: config.DataDir, - URL: info.RaftURL, - BindAddr: info.RaftListenHost, - SnapshotCount: config.SnapshotCount, + Name: info.Name, + Path: config.DataDir, + URL: info.RaftURL, + BindAddr: info.RaftListenHost, + SnapshotCount: config.SnapshotCount, HeartbeatTimeout: time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond, - ElectionTimeout: time.Duration(config.Peer.ElectionTimeout) * time.Millisecond, - MaxClusterSize: config.MaxClusterSize, - RetryTimes: config.MaxRetryAttempts, - CORS: corsInfo, + ElectionTimeout: time.Duration(config.Peer.ElectionTimeout) * time.Millisecond, + MaxClusterSize: config.MaxClusterSize, + RetryTimes: config.MaxRetryAttempts, + CORS: corsInfo, } ps := server.NewPeerServer(psConfig, &peerTLSConfig, &info.RaftTLS, registry, store, &mb) // Create client server. sConfig := server.ServerConfig{ - Name: info.Name, - URL: info.EtcdURL, + Name: info.Name, + URL: info.EtcdURL, BindAddr: info.EtcdListenHost, - CORS: corsInfo, + CORS: corsInfo, } s := server.New(sConfig, &tlsConfig, &info.EtcdTLS, ps, registry, store, &mb) diff --git a/server/peer_server.go b/server/peer_server.go index 92da00434..b5d8fde38 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -40,19 +40,19 @@ type PeerServerConfig struct { } type PeerServer struct { - Config PeerServerConfig - raftServer raft.Server - server *Server - httpServer *http.Server - listener net.Listener - joinIndex uint64 - tlsConf *TLSConfig - tlsInfo *TLSInfo - followersStats *raftFollowersStats - serverStats *raftServerStats - registry *Registry - store store.Store - snapConf *snapshotConf + Config PeerServerConfig + raftServer raft.Server + server *Server + httpServer *http.Server + listener net.Listener + joinIndex uint64 + tlsConf *TLSConfig + tlsInfo *TLSInfo + followersStats *raftFollowersStats + serverStats *raftServerStats + registry *Registry + store store.Store + snapConf *snapshotConf closeChan chan bool timeoutThresholdChan chan interface{} diff --git a/server/server.go b/server/server.go index c690c060a..99f440e66 100644 --- a/server/server.go +++ b/server/server.go @@ -57,12 +57,12 @@ func New(sConfig ServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer TLSConfig: &tlsConf.Server, Addr: sConfig.BindAddr, }, - store: store, - registry: registry, - tlsConf: tlsConf, - tlsInfo: tlsInfo, - peerServer: peerServer, - router: r, + store: store, + registry: registry, + tlsConf: tlsConf, + tlsInfo: tlsInfo, + peerServer: peerServer, + router: r, corsMiddleware: cors, metrics: mb, } From d0c4916fe9b2afaa273a2a7bc9782321a866ab9f Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 09:59:33 -0800 Subject: [PATCH 07/20] refactor(server): move listener init out of server.go --- etcd.go | 16 +++++++-- server/listener.go | 39 ++++++++++++++++++++++ server/server.go | 77 ++++++------------------------------------- tests/server_utils.go | 9 +++-- 4 files changed, 68 insertions(+), 73 deletions(-) create mode 100644 server/listener.go diff --git a/etcd.go b/etcd.go index d24814840..96ab64957 100644 --- a/etcd.go +++ b/etcd.go @@ -18,6 +18,7 @@ package main import ( "fmt" + "net" "os" "runtime" "time" @@ -128,20 +129,29 @@ func main() { sConfig := server.ServerConfig{ Name: info.Name, URL: info.EtcdURL, - BindAddr: info.EtcdListenHost, CORS: corsInfo, } - s := server.New(sConfig, &tlsConfig, &info.EtcdTLS, ps, registry, store, &mb) + s := server.New(sConfig, ps, registry, store, &mb) if config.Trace() { s.EnableTracing() } + var sListener net.Listener + if tlsConfig.Scheme == "https" { + sListener, err = server.NewTLSListener(info.EtcdListenHost, info.EtcdTLS.CertFile, info.EtcdTLS.KeyFile) + } else { + sListener, err = server.NewListener(info.EtcdListenHost) + } + if err != nil { + panic(err) + } + ps.SetServer(s) // Run peer server in separate thread while the client server blocks. go func() { log.Fatal(ps.ListenAndServe(config.Snapshot, config.Peers)) }() - log.Fatal(s.ListenAndServe()) + log.Fatal(s.Serve(sListener)) } diff --git a/server/listener.go b/server/listener.go new file mode 100644 index 000000000..291d022ae --- /dev/null +++ b/server/listener.go @@ -0,0 +1,39 @@ +package server + +import ( + "crypto/tls" + "net" +) + +func NewListener(addr string) (net.Listener, error) { + if addr == "" { + addr = ":http" + } + l, e := net.Listen("tcp", addr) + if e != nil { + return nil, e + } + return l, nil +} + +func NewTLSListener(addr, keyFile, certFile string) (net.Listener, error) { + if addr == "" { + addr = ":https" + } + config := &tls.Config{} + config.NextProtos = []string{"http/1.1"} + + var err error + config.Certificates = make([]tls.Certificate, 1) + config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, err + } + + conn, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + + return tls.NewListener(conn, config), nil +} diff --git a/server/server.go b/server/server.go index 99f440e66..c3bedfaf6 100644 --- a/server/server.go +++ b/server/server.go @@ -1,7 +1,6 @@ package server import ( - "crypto/tls" "encoding/json" "fmt" "net" @@ -24,10 +23,9 @@ import ( ) type ServerConfig struct { - Name string - URL string - BindAddr string - CORS *corsInfo + Name string + URL string + CORS *corsInfo } // This is the default implementation of the Server interface. @@ -36,31 +34,25 @@ type Server struct { Config ServerConfig peerServer *PeerServer registry *Registry - listener net.Listener store store.Store - tlsConf *TLSConfig - tlsInfo *TLSInfo router *mux.Router corsMiddleware *corsHTTPMiddleware metrics *metrics.Bucket + listener net.Listener } // Creates a new Server. -func New(sConfig ServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server { +func New(sConfig ServerConfig, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server { r := mux.NewRouter() cors := &corsHTTPMiddleware{r, sConfig.CORS} s := &Server{ Config: sConfig, Server: http.Server{ - Handler: cors, - TLSConfig: &tlsConf.Server, - Addr: sConfig.BindAddr, + Handler: cors, }, store: store, registry: registry, - tlsConf: tlsConf, - tlsInfo: tlsInfo, peerServer: peerServer, router: r, corsMiddleware: cors, @@ -198,59 +190,10 @@ func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Reque } // Start to listen and response etcd client command -func (s *Server) ListenAndServe() error { - log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.Config.Name, s.Server.Addr, s.Config.URL) - - if s.tlsConf.Scheme == "http" { - return s.listenAndServe() - } else { - return s.listenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile) - } -} - -// Overridden version of net/http added so we can manage the listener. -func (s *Server) listenAndServe() error { - addr := s.Server.Addr - if addr == "" { - addr = ":http" - } - l, e := net.Listen("tcp", addr) - if e != nil { - return e - } - s.listener = l - return s.Server.Serve(l) -} - -// Overridden version of net/http added so we can manage the listener. -func (s *Server) listenAndServeTLS(certFile, keyFile string) error { - addr := s.Server.Addr - if addr == "" { - addr = ":https" - } - config := &tls.Config{} - if s.Server.TLSConfig != nil { - *config = *s.Server.TLSConfig - } - if config.NextProtos == nil { - config.NextProtos = []string{"http/1.1"} - } - - var err error - config.Certificates = make([]tls.Certificate, 1) - config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return err - } - - conn, err := net.Listen("tcp", addr) - if err != nil { - return err - } - - tlsListener := tls.NewListener(conn, config) - s.listener = tlsListener - return s.Server.Serve(tlsListener) +func (s *Server) Serve(listener net.Listener) error { + log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.Config.Name, listener.Addr(), s.Config.URL) + s.listener = listener + return s.Server.Serve(listener) } // Stops the server. diff --git a/tests/server_utils.go b/tests/server_utils.go index 93f3795b9..2977224e9 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -43,10 +43,13 @@ func RunServer(f func(*server.Server)) { sConfig := server.ServerConfig{ Name: testName, URL: "http://"+testClientURL, - BindAddr: testClientURL, CORS: corsInfo, } - s := server.New(sConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store, nil) + s := server.New(sConfig, ps, registry, store, nil) + sListener, err := server.NewListener(testClientURL) + if err != nil { + panic(err) + } ps.SetServer(s) @@ -61,7 +64,7 @@ func RunServer(f func(*server.Server)) { // Start up etcd server. go func() { c <- true - s.ListenAndServe() + s.Serve(sListener) }() <-c From 7bd4d05a380b341b99da6d7ae5e8ab241b3d6470 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 10:38:36 -0800 Subject: [PATCH 08/20] refactor(peer-server): move listener init out of peer_server.go --- etcd.go | 14 ++++- server/peer_server.go | 118 ++++++++++-------------------------------- tests/server_utils.go | 8 ++- 3 files changed, 45 insertions(+), 95 deletions(-) diff --git a/etcd.go b/etcd.go index 96ab64957..5f6222149 100644 --- a/etcd.go +++ b/etcd.go @@ -114,8 +114,8 @@ func main() { psConfig := server.PeerServerConfig{ Name: info.Name, Path: config.DataDir, + Scheme: peerTLSConfig.Scheme, URL: info.RaftURL, - BindAddr: info.RaftListenHost, SnapshotCount: config.SnapshotCount, HeartbeatTimeout: time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond, ElectionTimeout: time.Duration(config.Peer.ElectionTimeout) * time.Millisecond, @@ -125,6 +125,16 @@ func main() { } ps := server.NewPeerServer(psConfig, &peerTLSConfig, &info.RaftTLS, registry, store, &mb) + var psListener net.Listener + if psConfig.Scheme == "https" { + psListener, err = server.NewTLSListener(info.RaftListenHost, info.RaftTLS.CertFile, info.RaftTLS.KeyFile) + } else { + psListener, err = server.NewListener(info.RaftListenHost) + } + if err != nil { + panic(err) + } + // Create client server. sConfig := server.ServerConfig{ Name: info.Name, @@ -151,7 +161,7 @@ func main() { // Run peer server in separate thread while the client server blocks. go func() { - log.Fatal(ps.ListenAndServe(config.Snapshot, config.Peers)) + log.Fatal(ps.Serve(psListener, config.Snapshot, config.Peers)) }() log.Fatal(s.Serve(sListener)) } diff --git a/server/peer_server.go b/server/peer_server.go index b5d8fde38..5eea6b363 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -2,7 +2,6 @@ package server import ( "bytes" - "crypto/tls" "encoding/binary" "encoding/json" "fmt" @@ -29,8 +28,8 @@ const ThresholdMonitorTimeout = 5 * time.Second type PeerServerConfig struct { Name string Path string + Scheme string URL string - BindAddr string SnapshotCount int HeartbeatTimeout time.Duration ElectionTimeout time.Duration @@ -43,8 +42,6 @@ type PeerServer struct { Config PeerServerConfig raftServer raft.Server server *Server - httpServer *http.Server - listener net.Listener joinIndex uint64 tlsConf *TLSConfig tlsInfo *TLSInfo @@ -54,6 +51,8 @@ type PeerServer struct { store store.Store snapConf *snapshotConf + listener net.Listener + closeChan chan bool timeoutThresholdChan chan interface{} @@ -77,8 +76,6 @@ func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSIn s := &PeerServer{ Config: psConfig, - tlsConf: tlsConf, - tlsInfo: tlsInfo, registry: registry, store: store, followersStats: &raftFollowersStats{ @@ -132,7 +129,7 @@ func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSIn } // Start the raft server -func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error { +func (s *PeerServer) Serve(listener net.Listener, snapshot bool, cluster []string) error { // LoadSnapshot if snapshot { err := s.raftServer.LoadSnapshot() @@ -185,56 +182,29 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error { go s.monitorSnapshot() } - // start to response to raft requests - return s.startTransport(s.tlsConf.Scheme, s.tlsConf.Server) + router := mux.NewRouter() + httpServer := &http.Server{Handler: router} + + // internal commands + router.HandleFunc("/name", s.NameHttpHandler) + router.HandleFunc("/version", s.VersionHttpHandler) + router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler) + router.HandleFunc("/upgrade", s.UpgradeHttpHandler) + router.HandleFunc("/join", s.JoinHttpHandler) + router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler) + router.HandleFunc("/vote", s.VoteHttpHandler) + router.HandleFunc("/log", s.GetLogHttpHandler) + router.HandleFunc("/log/append", s.AppendEntriesHttpHandler) + router.HandleFunc("/snapshot", s.SnapshotHttpHandler) + router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler) + router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler) + + s.listener = listener + log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.Config.Name, listener.Addr(), s.Config.URL) + httpServer.Serve(listener) + return nil } -// Overridden version of net/http added so we can manage the listener. -func (s *PeerServer) listenAndServe() error { - addr := s.httpServer.Addr - if addr == "" { - addr = ":http" - } - l, e := net.Listen("tcp", addr) - if e != nil { - return e - } - s.listener = l - return s.httpServer.Serve(l) -} - -// Overridden version of net/http added so we can manage the listener. -func (s *PeerServer) listenAndServeTLS(certFile, keyFile string) error { - addr := s.httpServer.Addr - if addr == "" { - addr = ":https" - } - config := &tls.Config{} - if s.httpServer.TLSConfig != nil { - *config = *s.httpServer.TLSConfig - } - if config.NextProtos == nil { - config.NextProtos = []string{"http/1.1"} - } - - var err error - config.Certificates = make([]tls.Certificate, 1) - config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return err - } - - conn, err := net.Listen("tcp", addr) - if err != nil { - return err - } - - tlsListener := tls.NewListener(conn, config) - s.listener = tlsListener - return s.httpServer.Serve(tlsListener) -} - -// Stops the server. func (s *PeerServer) Close() { if s.closeChan != nil { close(s.closeChan) @@ -281,40 +251,6 @@ func (s *PeerServer) startAsFollower(cluster []string) { log.Fatalf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes) } -// Start to listen and response raft command -func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) error { - log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.Config.Name, s.Config.BindAddr, s.Config.URL) - - router := mux.NewRouter() - - s.httpServer = &http.Server{ - Handler: router, - TLSConfig: &tlsConf, - Addr: s.Config.BindAddr, - } - - // internal commands - router.HandleFunc("/name", s.NameHttpHandler) - router.HandleFunc("/version", s.VersionHttpHandler) - router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler) - router.HandleFunc("/upgrade", s.UpgradeHttpHandler) - router.HandleFunc("/join", s.JoinHttpHandler) - router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler) - router.HandleFunc("/vote", s.VoteHttpHandler) - router.HandleFunc("/log", s.GetLogHttpHandler) - router.HandleFunc("/log/append", s.AppendEntriesHttpHandler) - router.HandleFunc("/snapshot", s.SnapshotHttpHandler) - router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler) - router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler) - - if scheme == "http" { - return s.listenAndServe() - } else { - return s.listenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile) - } - -} - // getVersion fetches the peer version of a cluster. func getVersion(t *transporter, versionURL url.URL) (int, error) { resp, req, err := t.Get(versionURL.String()) @@ -344,7 +280,7 @@ func (s *PeerServer) Upgradable() error { } t, _ := s.raftServer.Transporter().(*transporter) - checkURL := (&url.URL{Host: u.Host, Scheme: s.tlsConf.Scheme, Path: fmt.Sprintf("/version/%d/check", nextVersion)}).String() + checkURL := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme, Path: fmt.Sprintf("/version/%d/check", nextVersion)}).String() resp, _, err := t.Get(checkURL) if err != nil { return fmt.Errorf("PeerServer: Cannot check version compatibility: %s", u.Host) @@ -363,7 +299,7 @@ func (s *PeerServer) joinCluster(cluster []string) bool { continue } - err := s.joinByPeer(s.raftServer, peer, s.tlsConf.Scheme) + err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme) if err == nil { log.Debugf("%s success join to the cluster via peer %s", s.Config.Name, peer) return true diff --git a/tests/server_utils.go b/tests/server_utils.go index 2977224e9..596f960a0 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -31,7 +31,7 @@ func RunServer(f func(*server.Server)) { Name: testName, Path: path, URL: "http://"+testRaftURL, - BindAddr: testRaftURL, + Scheme: "http", SnapshotCount: testSnapshotCount, HeartbeatTimeout: testHeartbeatTimeout, ElectionTimeout: testElectionTimeout, @@ -39,6 +39,10 @@ func RunServer(f func(*server.Server)) { CORS: corsInfo, } ps := server.NewPeerServer(psConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, nil) + psListener, err := server.NewListener(testRaftURL) + if err != nil { + panic(err) + } sConfig := server.ServerConfig{ Name: testName, @@ -57,7 +61,7 @@ func RunServer(f func(*server.Server)) { c := make(chan bool) go func() { c <- true - ps.ListenAndServe(false, []string{}) + ps.Serve(psListener, false, []string{}) }() <-c From 86718167e88ad5c16554bb4f14f73da5b174db9b Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 11:07:46 -0800 Subject: [PATCH 09/20] refactor(peer_server): move stats construction to factories --- server/peer_server.go | 18 +++--------------- server/raft_follower_stats.go | 7 +++++++ server/raft_server_stats.go | 13 +++++++++++++ 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/server/peer_server.go b/server/peer_server.go index 5eea6b363..bec33486d 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -75,23 +75,11 @@ type snapshotConf struct { func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, mb *metrics.Bucket) *PeerServer { s := &PeerServer{ Config: psConfig, - registry: registry, store: store, - followersStats: &raftFollowersStats{ - Leader: psConfig.Name, - Followers: make(map[string]*raftFollowerStats), - }, - serverStats: &raftServerStats{ - Name: psConfig.Name, - StartTime: time.Now(), - sendRateQueue: &statsQueue{ - back: -1, - }, - recvRateQueue: &statsQueue{ - back: -1, - }, - }, + + followersStats: newRaftFollowersStats(psConfig.Name), + serverStats: newRaftServerStats(psConfig.Name), timeoutThresholdChan: make(chan interface{}, 1), diff --git a/server/raft_follower_stats.go b/server/raft_follower_stats.go index 96b76c85b..b22bb803e 100644 --- a/server/raft_follower_stats.go +++ b/server/raft_follower_stats.go @@ -10,6 +10,13 @@ type raftFollowersStats struct { Followers map[string]*raftFollowerStats `json:"followers"` } +func newRaftFollowersStats(name string) *raftFollowersStats { + return &raftFollowersStats{ + Leader: name, + Followers: make(map[string]*raftFollowerStats), + } +} + type raftFollowerStats struct { Latency struct { Current float64 `json:"current"` diff --git a/server/raft_server_stats.go b/server/raft_server_stats.go index 8f37752d2..518863539 100644 --- a/server/raft_server_stats.go +++ b/server/raft_server_stats.go @@ -29,6 +29,19 @@ type raftServerStats struct { recvRateQueue *statsQueue } +func newRaftServerStats(name string) *raftServerStats { + return &raftServerStats{ + Name: name, + StartTime: time.Now(), + sendRateQueue: &statsQueue{ + back: -1, + }, + recvRateQueue: &statsQueue{ + back: -1, + }, + } +} + func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { ss.State = raft.Follower if leaderName != ss.LeaderInfo.Name { From 60bbc57aeb02cefd10bc436c64bb7d535c458264 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 11:29:05 -0800 Subject: [PATCH 10/20] refactor(transporter): pass in timeouts --- server/peer_server.go | 4 +++- server/transporter.go | 23 +++++++++++------------ 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/server/peer_server.go b/server/peer_server.go index bec33486d..d6a30f92a 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -87,7 +87,9 @@ func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSIn } // Create transporter for raft - raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s) + dialTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout + responseHeaderTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout + raftTransporter := newTransporter(psConfig.Scheme, tlsConf.Client, s, psConfig.HeartbeatTimeout, dialTimeout, responseHeaderTimeout) // Create raft server raftServer, err := raft.NewServer(psConfig.Name, psConfig.Path, raftTransporter, s.store, s, "") diff --git a/server/transporter.go b/server/transporter.go index 1aa5442fd..f9479f7a0 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -15,9 +15,11 @@ import ( // Transporter layer for communication between raft nodes type transporter struct { + requestTimeout time.Duration + + peerServer *PeerServer client *http.Client transport *http.Transport - peerServer *PeerServer } type dialer func(network, addr string) (net.Conn, error) @@ -25,13 +27,7 @@ type dialer func(network, addr string) (net.Conn, error) // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter { - // names for each type of timeout, for the sake of clarity - dialTimeout := (3 * peerServer.Config.HeartbeatTimeout) + peerServer.Config.ElectionTimeout - responseHeaderTimeout := (3 * peerServer.Config.HeartbeatTimeout) + peerServer.Config.ElectionTimeout - - t := transporter{} - +func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { tr := &http.Transport{ Dial: func(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, dialTimeout) @@ -44,9 +40,12 @@ func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) * tr.DisableCompression = true } - t.client = &http.Client{Transport: tr} - t.transport = tr - t.peerServer = peerServer + t := transporter{ + client: &http.Client{Transport: tr}, + transport: tr, + peerServer: peerServer, + requestTimeout: requestTimeout, + } return &t } @@ -227,7 +226,7 @@ func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) // Cancel the on fly HTTP transaction when timeout happens. func (t *transporter) CancelWhenTimeout(req *http.Request) { go func() { - time.Sleep(t.peerServer.Config.HeartbeatTimeout) + time.Sleep(t.requestTimeout) t.transport.CancelRequest(req) }() } From ffa2b07dc4893f4c8f842f4f4c860f489b6a7b5e Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 11:39:41 -0800 Subject: [PATCH 11/20] refactor(transporter): Pass in everything the transporter needs --- server/peer_server.go | 9 +++++---- server/transporter.go | 32 ++++++++++++++++++-------------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/server/peer_server.go b/server/peer_server.go index d6a30f92a..781ed4db0 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -73,13 +73,14 @@ type snapshotConf struct { } func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, mb *metrics.Bucket) *PeerServer { + followersStats := newRaftFollowersStats(psConfig.Name) + serverStats := newRaftServerStats(psConfig.Name) s := &PeerServer{ Config: psConfig, registry: registry, store: store, - - followersStats: newRaftFollowersStats(psConfig.Name), - serverStats: newRaftServerStats(psConfig.Name), + followersStats: followersStats, + serverStats: serverStats, timeoutThresholdChan: make(chan interface{}, 1), @@ -89,7 +90,7 @@ func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSIn // Create transporter for raft dialTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout responseHeaderTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout - raftTransporter := newTransporter(psConfig.Scheme, tlsConf.Client, s, psConfig.HeartbeatTimeout, dialTimeout, responseHeaderTimeout) + raftTransporter := newTransporter(psConfig.Scheme, tlsConf.Client, followersStats, serverStats, registry, psConfig.HeartbeatTimeout, dialTimeout, responseHeaderTimeout) // Create raft server raftServer, err := raft.NewServer(psConfig.Name, psConfig.Path, raftTransporter, s.store, s, "") diff --git a/server/transporter.go b/server/transporter.go index f9479f7a0..98be6fd1c 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -16,10 +16,12 @@ import ( // Transporter layer for communication between raft nodes type transporter struct { requestTimeout time.Duration + followersStats *raftFollowersStats + serverStats *raftServerStats + registry *Registry - peerServer *PeerServer - client *http.Client - transport *http.Transport + client *http.Client + transport *http.Transport } type dialer func(network, addr string) (net.Conn, error) @@ -27,7 +29,7 @@ type dialer func(network, addr string) (net.Conn, error) // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { +func newTransporter(scheme string, tlsConf tls.Config, followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { tr := &http.Transport{ Dial: func(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, dialTimeout) @@ -41,10 +43,12 @@ func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer, d } t := transporter{ - client: &http.Client{Transport: tr}, - transport: tr, - peerServer: peerServer, + client: &http.Client{Transport: tr}, + transport: tr, requestTimeout: requestTimeout, + followersStats: followersStats, + serverStats: serverStats, + registry: registry, } return &t @@ -61,18 +65,18 @@ func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Pe size := b.Len() - t.peerServer.serverStats.SendAppendReq(size) + t.serverStats.SendAppendReq(size) - u, _ := t.peerServer.registry.PeerURL(peer.Name) + u, _ := t.registry.PeerURL(peer.Name) log.Debugf("Send LogEntries to %s ", u) - thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name] + thisFollowerStats, ok := t.followersStats.Followers[peer.Name] if !ok { //this is the first time this follower has been seen thisFollowerStats = &raftFollowerStats{} thisFollowerStats.Latency.Minimum = 1 << 63 - t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats + t.followersStats.Followers[peer.Name] = thisFollowerStats } start := time.Now() @@ -118,7 +122,7 @@ func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req * return nil } - u, _ := t.peerServer.registry.PeerURL(peer.Name) + u, _ := t.registry.PeerURL(peer.Name) log.Debugf("Send Vote from %s to %s", server.Name(), u) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b) @@ -151,7 +155,7 @@ func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, r return nil } - u, _ := t.peerServer.registry.PeerURL(peer.Name) + u, _ := t.registry.PeerURL(peer.Name) log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) @@ -184,7 +188,7 @@ func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft return nil } - u, _ := t.peerServer.registry.PeerURL(peer.Name) + u, _ := t.registry.PeerURL(peer.Name) log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) From a2ee62039438cf5cf81278f758e9d38518378a62 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 12:42:13 -0800 Subject: [PATCH 12/20] refactor(raft): init raft transporter & server in main --- etcd.go | 24 +++++++++++++++--- server/listener.go | 2 +- server/peer_server.go | 46 ++++++++++++----------------------- server/raft_follower_stats.go | 2 +- server/raft_server_stats.go | 2 +- server/transporter.go | 2 +- tests/server_utils.go | 22 ++++++++++++++--- 7 files changed, 59 insertions(+), 41 deletions(-) diff --git a/etcd.go b/etcd.go index 5f6222149..ac1be5f1b 100644 --- a/etcd.go +++ b/etcd.go @@ -110,6 +110,16 @@ func main() { store := store.New() registry := server.NewRegistry(store) + // Create stats objects + followersStats := server.NewRaftFollowersStats(info.Name) + serverStats := server.NewRaftServerStats(info.Name) + + // Calculate all of our timeouts + heartbeatTimeout := time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond + electionTimeout := time.Duration(config.Peer.ElectionTimeout) * time.Millisecond + dialTimeout := (3 * heartbeatTimeout) + electionTimeout + responseHeaderTimeout := (3 * heartbeatTimeout) + electionTimeout + // Create peer server. psConfig := server.PeerServerConfig{ Name: info.Name, @@ -117,13 +127,11 @@ func main() { Scheme: peerTLSConfig.Scheme, URL: info.RaftURL, SnapshotCount: config.SnapshotCount, - HeartbeatTimeout: time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond, - ElectionTimeout: time.Duration(config.Peer.ElectionTimeout) * time.Millisecond, MaxClusterSize: config.MaxClusterSize, RetryTimes: config.MaxRetryAttempts, CORS: corsInfo, } - ps := server.NewPeerServer(psConfig, &peerTLSConfig, &info.RaftTLS, registry, store, &mb) + ps := server.NewPeerServer(psConfig, registry, store, &mb, followersStats, serverStats) var psListener net.Listener if psConfig.Scheme == "https" { @@ -135,6 +143,16 @@ func main() { panic(err) } + // Create Raft transporter and server + raftTransporter := server.NewTransporter(peerTLSConfig.Scheme, peerTLSConfig.Client, followersStats, serverStats, registry, heartbeatTimeout, dialTimeout, responseHeaderTimeout) + raftServer, err := raft.NewServer(info.Name, config.DataDir, raftTransporter, store, ps, "") + if err != nil { + log.Fatal(err) + } + raftServer.SetElectionTimeout(electionTimeout) + raftServer.SetHeartbeatTimeout(heartbeatTimeout) + ps.SetRaftServer(raftServer) + // Create client server. sConfig := server.ServerConfig{ Name: info.Name, diff --git a/server/listener.go b/server/listener.go index 291d022ae..dd3cfa9e1 100644 --- a/server/listener.go +++ b/server/listener.go @@ -16,7 +16,7 @@ func NewListener(addr string) (net.Listener, error) { return l, nil } -func NewTLSListener(addr, keyFile, certFile string) (net.Listener, error) { +func NewTLSListener(addr, certFile, keyFile string) (net.Listener, error) { if addr == "" { addr = ":https" } diff --git a/server/peer_server.go b/server/peer_server.go index 781ed4db0..5576cc6d7 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -43,8 +43,6 @@ type PeerServer struct { raftServer raft.Server server *Server joinIndex uint64 - tlsConf *TLSConfig - tlsInfo *TLSInfo followersStats *raftFollowersStats serverStats *raftServerStats registry *Registry @@ -72,9 +70,7 @@ type snapshotConf struct { snapshotThr uint64 } -func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, mb *metrics.Bucket) *PeerServer { - followersStats := newRaftFollowersStats(psConfig.Name) - serverStats := newRaftServerStats(psConfig.Name) +func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer { s := &PeerServer{ Config: psConfig, registry: registry, @@ -86,37 +82,28 @@ func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSIn metrics: mb, } + return s +} - // Create transporter for raft - dialTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout - responseHeaderTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout - raftTransporter := newTransporter(psConfig.Scheme, tlsConf.Client, followersStats, serverStats, registry, psConfig.HeartbeatTimeout, dialTimeout, responseHeaderTimeout) - - // Create raft server - raftServer, err := raft.NewServer(psConfig.Name, psConfig.Path, raftTransporter, s.store, s, "") - if err != nil { - log.Fatal(err) - } - +func (s *PeerServer) SetRaftServer(raftServer raft.Server) { s.snapConf = &snapshotConf{ checkingInterval: time.Second * 3, // this is not accurate, we will update raft to provide an api lastIndex: raftServer.CommitIndex(), - snapshotThr: uint64(psConfig.SnapshotCount), + snapshotThr: uint64(s.Config.SnapshotCount), } + raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger) + raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger) + raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger) + raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger) + raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger) + raftServer.AddEventListener(raft.HeartbeatTimeoutEventType, s.raftEventLogger) + raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger) + + raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent) + s.raftServer = raftServer - s.raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger) - s.raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger) - s.raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger) - s.raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger) - s.raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger) - s.raftServer.AddEventListener(raft.HeartbeatTimeoutEventType, s.raftEventLogger) - s.raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger) - - s.raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent) - - return s } // Start the raft server @@ -132,9 +119,6 @@ func (s *PeerServer) Serve(listener net.Listener, snapshot bool, cluster []strin } } - s.raftServer.SetElectionTimeout(s.Config.ElectionTimeout) - s.raftServer.SetHeartbeatTimeout(s.Config.HeartbeatTimeout) - s.raftServer.Start() if s.raftServer.IsLogEmpty() { diff --git a/server/raft_follower_stats.go b/server/raft_follower_stats.go index b22bb803e..1166c02d6 100644 --- a/server/raft_follower_stats.go +++ b/server/raft_follower_stats.go @@ -10,7 +10,7 @@ type raftFollowersStats struct { Followers map[string]*raftFollowerStats `json:"followers"` } -func newRaftFollowersStats(name string) *raftFollowersStats { +func NewRaftFollowersStats(name string) *raftFollowersStats { return &raftFollowersStats{ Leader: name, Followers: make(map[string]*raftFollowerStats), diff --git a/server/raft_server_stats.go b/server/raft_server_stats.go index 518863539..05bc04f85 100644 --- a/server/raft_server_stats.go +++ b/server/raft_server_stats.go @@ -29,7 +29,7 @@ type raftServerStats struct { recvRateQueue *statsQueue } -func newRaftServerStats(name string) *raftServerStats { +func NewRaftServerStats(name string) *raftServerStats { return &raftServerStats{ Name: name, StartTime: time.Now(), diff --git a/server/transporter.go b/server/transporter.go index 98be6fd1c..3b0cb6d68 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -29,7 +29,7 @@ type dialer func(network, addr string) (net.Conn, error) // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config, followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { +func NewTransporter(scheme string, tlsConf tls.Config, followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { tr := &http.Transport{ Dial: func(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, dialTimeout) diff --git a/tests/server_utils.go b/tests/server_utils.go index 596f960a0..cbe11aa1e 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -5,6 +5,8 @@ import ( "os" "time" + "github.com/coreos/raft" + "github.com/coreos/etcd/server" "github.com/coreos/etcd/store" ) @@ -27,23 +29,37 @@ func RunServer(f func(*server.Server)) { registry := server.NewRegistry(store) corsInfo, _ := server.NewCORSInfo([]string{}) + serverStats := server.NewRaftServerStats(testName) + followersStats := server.NewRaftFollowersStats(testName) + psConfig := server.PeerServerConfig{ Name: testName, Path: path, URL: "http://"+testRaftURL, Scheme: "http", SnapshotCount: testSnapshotCount, - HeartbeatTimeout: testHeartbeatTimeout, - ElectionTimeout: testElectionTimeout, MaxClusterSize: 9, CORS: corsInfo, } - ps := server.NewPeerServer(psConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, nil) + ps := server.NewPeerServer(psConfig, registry, store, nil, followersStats, serverStats) psListener, err := server.NewListener(testRaftURL) if err != nil { panic(err) } + // Create Raft transporter and server + tls := &server.TLSConfig{Scheme: "http"} + dialTimeout := (3 * testHeartbeatTimeout) + testElectionTimeout + responseHeaderTimeout := (3 * testHeartbeatTimeout) + testElectionTimeout + raftTransporter := server.NewTransporter(tls.Scheme, tls.Client, followersStats, serverStats, registry, testHeartbeatTimeout, dialTimeout, responseHeaderTimeout) + raftServer, err := raft.NewServer(testName, path, raftTransporter, store, ps, "") + if err != nil { + panic(err) + } + raftServer.SetElectionTimeout(testElectionTimeout) + raftServer.SetHeartbeatTimeout(testHeartbeatTimeout) + ps.SetRaftServer(raftServer) + sConfig := server.ServerConfig{ Name: testName, URL: "http://"+testClientURL, From 074099a1b21eda2deef31bd131689aa9a5886732 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 16:10:49 -0800 Subject: [PATCH 13/20] refactor(cors): Simplify corsInfo struct --- server/cors.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/server/cors.go b/server/cors.go index a3728b689..c9698573a 100644 --- a/server/cors.go +++ b/server/cors.go @@ -22,9 +22,7 @@ import ( "net/url" ) -type corsInfo struct { - origins map[string]bool -} +type corsInfo map[string]bool func NewCORSInfo(origins []string) (*corsInfo, error) { // Construct a lookup of all origins. @@ -38,12 +36,13 @@ func NewCORSInfo(origins []string) (*corsInfo, error) { m[v] = true } - return &corsInfo{m}, nil + info := corsInfo(m) + return &info, nil } // OriginAllowed determines whether the server will allow a given CORS origin. -func (c *corsInfo) OriginAllowed(origin string) bool { - return c.origins["*"] || c.origins[origin] +func (c corsInfo) OriginAllowed(origin string) bool { + return c["*"] || c[origin] } type corsHTTPMiddleware struct { From 5c3a3db2d850db3cfcd765c47bd7d8fc81af9882 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 16:24:58 -0800 Subject: [PATCH 14/20] refactor(server): treat Server as an http.Handler --- etcd.go | 7 ++- server/cors.go | 16 +++---- server/server.go | 120 ++++++++++++++++++++++------------------------- 3 files changed, 69 insertions(+), 74 deletions(-) diff --git a/etcd.go b/etcd.go index ac1be5f1b..8f98fa34e 100644 --- a/etcd.go +++ b/etcd.go @@ -19,6 +19,7 @@ package main import ( "fmt" "net" + "net/http" "os" "runtime" "time" @@ -157,7 +158,6 @@ func main() { sConfig := server.ServerConfig{ Name: info.Name, URL: info.EtcdURL, - CORS: corsInfo, } s := server.New(sConfig, ps, registry, store, &mb) @@ -181,5 +181,8 @@ func main() { go func() { log.Fatal(ps.Serve(psListener, config.Snapshot, config.Peers)) }() - log.Fatal(s.Serve(sListener)) + + log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.Config.Name, sListener.Addr(), s.Config.URL) + sHTTP := &server.CORSHTTPMiddleware{s, corsInfo} + log.Fatal(http.Serve(sListener, sHTTP)) } diff --git a/server/cors.go b/server/cors.go index c9698573a..d585fbfc9 100644 --- a/server/cors.go +++ b/server/cors.go @@ -45,24 +45,24 @@ func (c corsInfo) OriginAllowed(origin string) bool { return c["*"] || c[origin] } -type corsHTTPMiddleware struct { - next http.Handler - info *corsInfo +type CORSHTTPMiddleware struct { + Handler http.Handler + Info *corsInfo } // addHeader adds the correct cors headers given an origin -func (h *corsHTTPMiddleware) addHeader(w http.ResponseWriter, origin string) { +func (h *CORSHTTPMiddleware) addHeader(w http.ResponseWriter, origin string) { w.Header().Add("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") w.Header().Add("Access-Control-Allow-Origin", origin) } // ServeHTTP adds the correct CORS headers based on the origin and returns immediatly // with a 200 OK if the method is OPTIONS. -func (h *corsHTTPMiddleware) ServeHTTP(w http.ResponseWriter, req *http.Request) { +func (h *CORSHTTPMiddleware) ServeHTTP(w http.ResponseWriter, req *http.Request) { // Write CORS header. - if h.info.OriginAllowed("*") { + if h.Info.OriginAllowed("*") { h.addHeader(w, "*") - } else if origin := req.Header.Get("Origin"); h.info.OriginAllowed(origin) { + } else if origin := req.Header.Get("Origin"); h.Info.OriginAllowed(origin) { h.addHeader(w, origin) } @@ -71,5 +71,5 @@ func (h *corsHTTPMiddleware) ServeHTTP(w http.ResponseWriter, req *http.Request) return } - h.next.ServeHTTP(w, req) + h.Handler.ServeHTTP(w, req) } diff --git a/server/server.go b/server/server.go index c3bedfaf6..dd6d08ff0 100644 --- a/server/server.go +++ b/server/server.go @@ -25,51 +25,36 @@ import ( type ServerConfig struct { Name string URL string - CORS *corsInfo } // This is the default implementation of the Server interface. type Server struct { - http.Server Config ServerConfig peerServer *PeerServer registry *Registry store store.Store - router *mux.Router - corsMiddleware *corsHTTPMiddleware - metrics *metrics.Bucket + metrics *metrics.Bucket + listener net.Listener + + trace bool } // Creates a new Server. func New(sConfig ServerConfig, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server { - r := mux.NewRouter() - cors := &corsHTTPMiddleware{r, sConfig.CORS} - s := &Server{ - Config: sConfig, - Server: http.Server{ - Handler: cors, - }, + Config: sConfig, store: store, registry: registry, peerServer: peerServer, - router: r, - corsMiddleware: cors, metrics: mb, } - // Install the routes. - s.handleFunc("/version", s.GetVersionHandler).Methods("GET") - s.installV1() - s.installV2() - s.installMod() - return s } func (s *Server) EnableTracing() { - s.installDebug() + s.trace = true } // The current state of the server in the cluster. @@ -112,64 +97,62 @@ func (s *Server) Store() store.Store { return s.store } -func (s *Server) installV1() { - s.handleFuncV1("/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET") - s.handleFuncV1("/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT") - s.handleFuncV1("/v1/keys/{key:.*}", v1.DeleteKeyHandler).Methods("DELETE") - s.handleFuncV1("/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "POST") - s.handleFunc("/v1/leader", s.GetLeaderHandler).Methods("GET") - s.handleFunc("/v1/machines", s.GetPeersHandler).Methods("GET") - s.handleFunc("/v1/peers", s.GetPeersHandler).Methods("GET") - s.handleFunc("/v1/stats/self", s.GetStatsHandler).Methods("GET") - s.handleFunc("/v1/stats/leader", s.GetLeaderStatsHandler).Methods("GET") - s.handleFunc("/v1/stats/store", s.GetStoreStatsHandler).Methods("GET") +func (s *Server) installV1(r *mux.Router) { + s.handleFuncV1(r, "/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET") + s.handleFuncV1(r, "/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT") + s.handleFuncV1(r, "/v1/keys/{key:.*}", v1.DeleteKeyHandler).Methods("DELETE") + s.handleFuncV1(r, "/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "POST") + s.handleFunc(r, "/v1/leader", s.GetLeaderHandler).Methods("GET") + s.handleFunc(r, "/v1/machines", s.GetPeersHandler).Methods("GET") + s.handleFunc(r, "/v1/peers", s.GetPeersHandler).Methods("GET") + s.handleFunc(r, "/v1/stats/self", s.GetStatsHandler).Methods("GET") + s.handleFunc(r, "/v1/stats/leader", s.GetLeaderStatsHandler).Methods("GET") + s.handleFunc(r, "/v1/stats/store", s.GetStoreStatsHandler).Methods("GET") } -func (s *Server) installV2() { - s.handleFuncV2("/v2/keys/{key:.*}", v2.GetHandler).Methods("GET") - s.handleFuncV2("/v2/keys/{key:.*}", v2.PostHandler).Methods("POST") - s.handleFuncV2("/v2/keys/{key:.*}", v2.PutHandler).Methods("PUT") - s.handleFuncV2("/v2/keys/{key:.*}", v2.DeleteHandler).Methods("DELETE") - s.handleFunc("/v2/leader", s.GetLeaderHandler).Methods("GET") - s.handleFunc("/v2/machines", s.GetPeersHandler).Methods("GET") - s.handleFunc("/v2/peers", s.GetPeersHandler).Methods("GET") - s.handleFunc("/v2/stats/self", s.GetStatsHandler).Methods("GET") - s.handleFunc("/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET") - s.handleFunc("/v2/stats/store", s.GetStoreStatsHandler).Methods("GET") - s.handleFunc("/v2/speedTest", s.SpeedTestHandler).Methods("GET") +func (s *Server) installV2(r *mux.Router) { + s.handleFuncV2(r, "/v2/keys/{key:.*}", v2.GetHandler).Methods("GET") + s.handleFuncV2(r, "/v2/keys/{key:.*}", v2.PostHandler).Methods("POST") + s.handleFuncV2(r, "/v2/keys/{key:.*}", v2.PutHandler).Methods("PUT") + s.handleFuncV2(r, "/v2/keys/{key:.*}", v2.DeleteHandler).Methods("DELETE") + s.handleFunc(r, "/v2/leader", s.GetLeaderHandler).Methods("GET") + s.handleFunc(r, "/v2/machines", s.GetPeersHandler).Methods("GET") + s.handleFunc(r, "/v2/peers", s.GetPeersHandler).Methods("GET") + s.handleFunc(r, "/v2/stats/self", s.GetStatsHandler).Methods("GET") + s.handleFunc(r, "/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET") + s.handleFunc(r, "/v2/stats/store", s.GetStoreStatsHandler).Methods("GET") + s.handleFunc(r, "/v2/speedTest", s.SpeedTestHandler).Methods("GET") } -func (s *Server) installMod() { - r := s.router +func (s *Server) installMod(r *mux.Router) { r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler(s.Config.URL))) } -func (s *Server) installDebug() { - s.handleFunc("/debug/metrics", s.GetMetricsHandler).Methods("GET") - s.router.HandleFunc("/debug/pprof", pprof.Index) - s.router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - s.router.HandleFunc("/debug/pprof/profile", pprof.Profile) - s.router.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - s.router.HandleFunc("/debug/pprof/{name}", pprof.Index) +func (s *Server) installDebug(r *mux.Router) { + s.handleFunc(r, "/debug/metrics", s.GetMetricsHandler).Methods("GET") + r.HandleFunc("/debug/pprof", pprof.Index) + r.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + r.HandleFunc("/debug/pprof/profile", pprof.Profile) + r.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + r.HandleFunc("/debug/pprof/{name}", pprof.Index) } // Adds a v1 server handler to the router. -func (s *Server) handleFuncV1(path string, f func(http.ResponseWriter, *http.Request, v1.Server) error) *mux.Route { - return s.handleFunc(path, func(w http.ResponseWriter, req *http.Request) error { +func (s *Server) handleFuncV1(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request, v1.Server) error) *mux.Route { + return s.handleFunc(r, path, func(w http.ResponseWriter, req *http.Request) error { return f(w, req, s) }) } // Adds a v2 server handler to the router. -func (s *Server) handleFuncV2(path string, f func(http.ResponseWriter, *http.Request, v2.Server) error) *mux.Route { - return s.handleFunc(path, func(w http.ResponseWriter, req *http.Request) error { +func (s *Server) handleFuncV2(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request, v2.Server) error) *mux.Route { + return s.handleFunc(r, path, func(w http.ResponseWriter, req *http.Request) error { return f(w, req, s) }) } // Adds a server handler to the router. -func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Request) error) *mux.Route { - r := s.router +func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request) error) *mux.Route { // Wrap the standard HandleFunc interface to pass in the server reference. return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) { @@ -189,11 +172,20 @@ func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Reque }) } -// Start to listen and response etcd client command -func (s *Server) Serve(listener net.Listener) error { - log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.Config.Name, listener.Addr(), s.Config.URL) - s.listener = listener - return s.Server.Serve(listener) +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + router := mux.NewRouter() + + // Install the routes. + s.handleFunc(router, "/version", s.GetVersionHandler).Methods("GET") + s.installV1(router) + s.installV2(router) + s.installMod(router) + + if s.trace { + s.installDebug(router) + } + + router.ServeHTTP(w, r) } // Stops the server. From 0abd860f7e655903a3dee37964b3126c94fee13a Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 17:22:09 -0800 Subject: [PATCH 15/20] refactor(server): drop Serve code; rename cors object * server/cors.go renamed to http/cors.go * all CORS code removed from Server and PeerServer * Server and PeerServer fulfill http.Handler, now passed to http.Serve * non-HTTP code in PeerServer.Serve moved to PeerServer.Start --- etcd.go | 12 +++++++---- {server => http}/cors.go | 18 ++++++++-------- server/peer_server.go | 38 +++++++++++++++++----------------- server/peer_server_handlers.go | 7 ------- server/server.go | 18 +++++++--------- tests/server_utils.go | 14 ++++++------- 6 files changed, 50 insertions(+), 57 deletions(-) rename {server => http}/cors.go (80%) diff --git a/etcd.go b/etcd.go index 8f98fa34e..385fc47c2 100644 --- a/etcd.go +++ b/etcd.go @@ -26,6 +26,7 @@ import ( "github.com/coreos/raft" + ehttp "github.com/coreos/etcd/http" "github.com/coreos/etcd/log" "github.com/coreos/etcd/metrics" "github.com/coreos/etcd/server" @@ -102,7 +103,7 @@ func main() { } // Retrieve CORS configuration - corsInfo, err := server.NewCORSInfo(config.CorsOrigins) + corsInfo, err := ehttp.NewCORSInfo(config.CorsOrigins) if err != nil { log.Fatal("CORS:", err) } @@ -130,7 +131,6 @@ func main() { SnapshotCount: config.SnapshotCount, MaxClusterSize: config.MaxClusterSize, RetryTimes: config.MaxRetryAttempts, - CORS: corsInfo, } ps := server.NewPeerServer(psConfig, registry, store, &mb, followersStats, serverStats) @@ -177,12 +177,16 @@ func main() { ps.SetServer(s) + ps.Start(config.Snapshot, config.Peers) + // Run peer server in separate thread while the client server blocks. go func() { - log.Fatal(ps.Serve(psListener, config.Snapshot, config.Peers)) + log.Infof("raft server [name %s, listen on %s, advertised url %s]", ps.Config.Name, psListener.Addr(), ps.Config.URL) + sHTTP := &ehttp.CORSHandler{ps, corsInfo} + log.Fatal(http.Serve(psListener, sHTTP)) }() log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.Config.Name, sListener.Addr(), s.Config.URL) - sHTTP := &server.CORSHTTPMiddleware{s, corsInfo} + sHTTP := &ehttp.CORSHandler{s, corsInfo} log.Fatal(http.Serve(sListener, sHTTP)) } diff --git a/server/cors.go b/http/cors.go similarity index 80% rename from server/cors.go rename to http/cors.go index d585fbfc9..16e616eed 100644 --- a/server/cors.go +++ b/http/cors.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package server +package http import ( "fmt" @@ -22,9 +22,9 @@ import ( "net/url" ) -type corsInfo map[string]bool +type CORSInfo map[string]bool -func NewCORSInfo(origins []string) (*corsInfo, error) { +func NewCORSInfo(origins []string) (*CORSInfo, error) { // Construct a lookup of all origins. m := make(map[string]bool) for _, v := range origins { @@ -36,29 +36,29 @@ func NewCORSInfo(origins []string) (*corsInfo, error) { m[v] = true } - info := corsInfo(m) + info := CORSInfo(m) return &info, nil } // OriginAllowed determines whether the server will allow a given CORS origin. -func (c corsInfo) OriginAllowed(origin string) bool { +func (c CORSInfo) OriginAllowed(origin string) bool { return c["*"] || c[origin] } -type CORSHTTPMiddleware struct { +type CORSHandler struct { Handler http.Handler - Info *corsInfo + Info *CORSInfo } // addHeader adds the correct cors headers given an origin -func (h *CORSHTTPMiddleware) addHeader(w http.ResponseWriter, origin string) { +func (h *CORSHandler) addHeader(w http.ResponseWriter, origin string) { w.Header().Add("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") w.Header().Add("Access-Control-Allow-Origin", origin) } // ServeHTTP adds the correct CORS headers based on the origin and returns immediatly // with a 200 OK if the method is OPTIONS. -func (h *CORSHTTPMiddleware) ServeHTTP(w http.ResponseWriter, req *http.Request) { +func (h *CORSHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // Write CORS header. if h.Info.OriginAllowed("*") { h.addHeader(w, "*") diff --git a/server/peer_server.go b/server/peer_server.go index 5576cc6d7..705cd9c13 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "net" "net/http" "net/url" "strconv" @@ -35,11 +34,11 @@ type PeerServerConfig struct { ElectionTimeout time.Duration MaxClusterSize int RetryTimes int - CORS *corsInfo } type PeerServer struct { Config PeerServerConfig + handler http.Handler raftServer raft.Server server *Server joinIndex uint64 @@ -49,8 +48,6 @@ type PeerServer struct { store store.Store snapConf *snapshotConf - listener net.Listener - closeChan chan bool timeoutThresholdChan chan interface{} @@ -82,6 +79,9 @@ func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.St metrics: mb, } + + s.handler = s.buildHTTPHandler() + return s } @@ -107,7 +107,7 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) { } // Start the raft server -func (s *PeerServer) Serve(listener net.Listener, snapshot bool, cluster []string) error { +func (s *PeerServer) Start(snapshot bool, cluster []string) error { // LoadSnapshot if snapshot { err := s.raftServer.LoadSnapshot() @@ -157,8 +157,18 @@ func (s *PeerServer) Serve(listener net.Listener, snapshot bool, cluster []strin go s.monitorSnapshot() } + return nil +} + +func (s *PeerServer) Stop() { + if s.closeChan != nil { + close(s.closeChan) + s.closeChan = nil + } +} + +func (s *PeerServer) buildHTTPHandler() http.Handler { router := mux.NewRouter() - httpServer := &http.Server{Handler: router} // internal commands router.HandleFunc("/name", s.NameHttpHandler) @@ -174,21 +184,11 @@ func (s *PeerServer) Serve(listener net.Listener, snapshot bool, cluster []strin router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler) router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler) - s.listener = listener - log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.Config.Name, listener.Addr(), s.Config.URL) - httpServer.Serve(listener) - return nil + return router } -func (s *PeerServer) Close() { - if s.closeChan != nil { - close(s.closeChan) - s.closeChan = nil - } - if s.listener != nil { - s.listener.Close() - s.listener = nil - } +func (s *PeerServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } // Retrieves the underlying Raft server. diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index a2c498101..fdcbf3df5 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -149,13 +149,6 @@ func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Reques func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) { command := &JoinCommand{} - // Write CORS header. - if ps.Config.CORS.OriginAllowed("*") { - w.Header().Add("Access-Control-Allow-Origin", "*") - } else if ps.Config.CORS.OriginAllowed(req.Header.Get("Origin")) { - w.Header().Add("Access-Control-Allow-Origin", req.Header.Get("Origin")) - } - err := decodeJsonRequest(req, command) if err != nil { w.WriteHeader(http.StatusInternalServerError) diff --git a/server/server.go b/server/server.go index dd6d08ff0..f4b823355 100644 --- a/server/server.go +++ b/server/server.go @@ -3,7 +3,6 @@ package server import ( "encoding/json" "fmt" - "net" "net/http" "net/http/pprof" "strings" @@ -30,13 +29,12 @@ type ServerConfig struct { // This is the default implementation of the Server interface. type Server struct { Config ServerConfig + handler http.Handler peerServer *PeerServer registry *Registry store store.Store metrics *metrics.Bucket - listener net.Listener - trace bool } @@ -50,6 +48,8 @@ func New(sConfig ServerConfig, peerServer *PeerServer, registry *Registry, store metrics: mb, } + s.handler = s.buildHTTPHandler() + return s } @@ -172,7 +172,7 @@ func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWrit }) } -func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (s *Server) buildHTTPHandler() http.Handler { router := mux.NewRouter() // Install the routes. @@ -185,15 +185,11 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.installDebug(router) } - router.ServeHTTP(w, r) + return router } -// Stops the server. -func (s *Server) Close() { - if s.listener != nil { - s.listener.Close() - s.listener = nil - } +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } // Dispatch command to the current leader diff --git a/tests/server_utils.go b/tests/server_utils.go index cbe11aa1e..5a0cb5031 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -2,6 +2,7 @@ package tests import ( "io/ioutil" + "net/http" "os" "time" @@ -27,7 +28,6 @@ func RunServer(f func(*server.Server)) { store := store.New() registry := server.NewRegistry(store) - corsInfo, _ := server.NewCORSInfo([]string{}) serverStats := server.NewRaftServerStats(testName) followersStats := server.NewRaftFollowersStats(testName) @@ -39,7 +39,6 @@ func RunServer(f func(*server.Server)) { Scheme: "http", SnapshotCount: testSnapshotCount, MaxClusterSize: 9, - CORS: corsInfo, } ps := server.NewPeerServer(psConfig, registry, store, nil, followersStats, serverStats) psListener, err := server.NewListener(testRaftURL) @@ -63,7 +62,6 @@ func RunServer(f func(*server.Server)) { sConfig := server.ServerConfig{ Name: testName, URL: "http://"+testClientURL, - CORS: corsInfo, } s := server.New(sConfig, ps, registry, store, nil) sListener, err := server.NewListener(testClientURL) @@ -77,14 +75,15 @@ func RunServer(f func(*server.Server)) { c := make(chan bool) go func() { c <- true - ps.Serve(psListener, false, []string{}) + ps.Start(false, []string{}) + http.Serve(psListener, ps) }() <-c // Start up etcd server. go func() { c <- true - s.Serve(sListener) + http.Serve(sListener, s) }() <-c @@ -95,6 +94,7 @@ func RunServer(f func(*server.Server)) { f(s) // Clean up servers. - ps.Close() - s.Close() + ps.Stop() + psListener.Close() + sListener.Close() } From a7d9efa90048b1fe88df4dea652bea7eccc20e39 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 19:17:12 -0800 Subject: [PATCH 16/20] refactor(server): Remove ServerConfig struct --- etcd.go | 8 ++------ server/server.go | 35 ++++++++++++++++------------------- tests/server_utils.go | 6 +----- 3 files changed, 19 insertions(+), 30 deletions(-) diff --git a/etcd.go b/etcd.go index 385fc47c2..4316579b2 100644 --- a/etcd.go +++ b/etcd.go @@ -155,11 +155,7 @@ func main() { ps.SetRaftServer(raftServer) // Create client server. - sConfig := server.ServerConfig{ - Name: info.Name, - URL: info.EtcdURL, - } - s := server.New(sConfig, ps, registry, store, &mb) + s := server.New(info.Name, info.EtcdURL, ps, registry, store, &mb) if config.Trace() { s.EnableTracing() @@ -186,7 +182,7 @@ func main() { log.Fatal(http.Serve(psListener, sHTTP)) }() - log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.Config.Name, sListener.Addr(), s.Config.URL) + log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.Name, sListener.Addr(), s.URL()) sHTTP := &ehttp.CORSHandler{s, corsInfo} log.Fatal(http.Serve(sListener, sHTTP)) } diff --git a/server/server.go b/server/server.go index f4b823355..de0801679 100644 --- a/server/server.go +++ b/server/server.go @@ -21,30 +21,27 @@ import ( _ "github.com/coreos/etcd/store/v2" ) -type ServerConfig struct { - Name string - URL string -} - // This is the default implementation of the Server interface. type Server struct { - Config ServerConfig - handler http.Handler - peerServer *PeerServer - registry *Registry - store store.Store + Name string + url string + handler http.Handler + peerServer *PeerServer + registry *Registry + store store.Store metrics *metrics.Bucket trace bool } // Creates a new Server. -func New(sConfig ServerConfig, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server { +func New(name, url string, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server { s := &Server{ - Config: sConfig, - store: store, - registry: registry, - peerServer: peerServer, + Name: name, + url: url, + store: store, + registry: registry, + peerServer: peerServer, metrics: mb, } @@ -79,7 +76,7 @@ func (s *Server) Term() uint64 { // The server URL. func (s *Server) URL() string { - return s.Config.URL + return s.url } // Retrives the Peer URL for a given node name. @@ -125,7 +122,7 @@ func (s *Server) installV2(r *mux.Router) { } func (s *Server) installMod(r *mux.Router) { - r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler(s.Config.URL))) + r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler(s.URL()))) } func (s *Server) installDebug(r *mux.Router) { @@ -157,7 +154,7 @@ func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWrit // Wrap the standard HandleFunc interface to pass in the server reference. return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) { // Log request. - log.Debugf("[recv] %s %s %s [%s]", req.Method, s.Config.URL, req.URL.Path, req.RemoteAddr) + log.Debugf("[recv] %s %s %s [%s]", req.Method, s.URL(), req.URL.Path, req.RemoteAddr) // Execute handler function and return error if necessary. if err := f(w, req); err != nil { @@ -280,7 +277,7 @@ func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) erro // Handler to return all the known peers in the current cluster. func (s *Server) GetPeersHandler(w http.ResponseWriter, req *http.Request) error { - peers := s.registry.ClientURLs(s.peerServer.RaftServer().Leader(), s.Config.Name) + peers := s.registry.ClientURLs(s.peerServer.RaftServer().Leader(), s.Name) w.WriteHeader(http.StatusOK) w.Write([]byte(strings.Join(peers, ", "))) return nil diff --git a/tests/server_utils.go b/tests/server_utils.go index 5a0cb5031..8f5415a07 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -59,11 +59,7 @@ func RunServer(f func(*server.Server)) { raftServer.SetHeartbeatTimeout(testHeartbeatTimeout) ps.SetRaftServer(raftServer) - sConfig := server.ServerConfig{ - Name: testName, - URL: "http://"+testClientURL, - } - s := server.New(sConfig, ps, registry, store, nil) + s := server.New(testName, "http://"+testClientURL, ps, registry, store, nil) sListener, err := server.NewListener(testClientURL) if err != nil { panic(err) From 19980a7033a329e979bab806f4b9e6d0f69a17de Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 20:16:53 -0800 Subject: [PATCH 17/20] refactor(peerserver): remove timeouts from PeerServerConfig --- server/peer_server.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/peer_server.go b/server/peer_server.go index 705cd9c13..afa66adad 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -30,8 +30,6 @@ type PeerServerConfig struct { Scheme string URL string SnapshotCount int - HeartbeatTimeout time.Duration - ElectionTimeout time.Duration MaxClusterSize int RetryTimes int } From f158dfcd772efa557e81d33c186c861067c24ea6 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 20:22:29 -0800 Subject: [PATCH 18/20] refactor(peerserver): Remove PeerServerConfig.Path --- etcd.go | 1 - server/peer_server.go | 1 - tests/server_utils.go | 1 - 3 files changed, 3 deletions(-) diff --git a/etcd.go b/etcd.go index 4316579b2..b56a4bcf5 100644 --- a/etcd.go +++ b/etcd.go @@ -125,7 +125,6 @@ func main() { // Create peer server. psConfig := server.PeerServerConfig{ Name: info.Name, - Path: config.DataDir, Scheme: peerTLSConfig.Scheme, URL: info.RaftURL, SnapshotCount: config.SnapshotCount, diff --git a/server/peer_server.go b/server/peer_server.go index afa66adad..924d09fda 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -26,7 +26,6 @@ const ThresholdMonitorTimeout = 5 * time.Second type PeerServerConfig struct { Name string - Path string Scheme string URL string SnapshotCount int diff --git a/tests/server_utils.go b/tests/server_utils.go index 8f5415a07..0552e1a03 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -34,7 +34,6 @@ func RunServer(f func(*server.Server)) { psConfig := server.PeerServerConfig{ Name: testName, - Path: path, URL: "http://"+testRaftURL, Scheme: "http", SnapshotCount: testSnapshotCount, From 089021ca6dc4110d27b2e47388ec7d76a646b626 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 20:31:09 -0800 Subject: [PATCH 19/20] refacotor(transporter): make TLS config explicit --- etcd.go | 5 ++++- server/transporter.go | 12 ++++++------ tests/server_utils.go | 3 +-- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/etcd.go b/etcd.go index b56a4bcf5..b167a0576 100644 --- a/etcd.go +++ b/etcd.go @@ -144,7 +144,10 @@ func main() { } // Create Raft transporter and server - raftTransporter := server.NewTransporter(peerTLSConfig.Scheme, peerTLSConfig.Client, followersStats, serverStats, registry, heartbeatTimeout, dialTimeout, responseHeaderTimeout) + raftTransporter := server.NewTransporter(followersStats, serverStats, registry, heartbeatTimeout, dialTimeout, responseHeaderTimeout) + if psConfig.Scheme == "https" { + raftTransporter.SetTLSConfig(peerTLSConfig.Client) + } raftServer, err := raft.NewServer(info.Name, config.DataDir, raftTransporter, store, ps, "") if err != nil { log.Fatal(err) diff --git a/server/transporter.go b/server/transporter.go index 3b0cb6d68..3a04df77c 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -29,7 +29,7 @@ type dialer func(network, addr string) (net.Conn, error) // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func NewTransporter(scheme string, tlsConf tls.Config, followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { +func NewTransporter(followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { tr := &http.Transport{ Dial: func(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, dialTimeout) @@ -37,11 +37,6 @@ func NewTransporter(scheme string, tlsConf tls.Config, followersStats *raftFollo ResponseHeaderTimeout: responseHeaderTimeout, } - if scheme == "https" { - tr.TLSClientConfig = &tlsConf - tr.DisableCompression = true - } - t := transporter{ client: &http.Client{Transport: tr}, transport: tr, @@ -54,6 +49,11 @@ func NewTransporter(scheme string, tlsConf tls.Config, followersStats *raftFollo return &t } +func (t *transporter) SetTLSConfig(tlsConf tls.Config) { + t.transport.TLSClientConfig = &tlsConf + t.transport.DisableCompression = true +} + // Sends AppendEntries RPCs to a peer when the server is the leader. func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { var b bytes.Buffer diff --git a/tests/server_utils.go b/tests/server_utils.go index 0552e1a03..143ccd7c4 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -46,10 +46,9 @@ func RunServer(f func(*server.Server)) { } // Create Raft transporter and server - tls := &server.TLSConfig{Scheme: "http"} dialTimeout := (3 * testHeartbeatTimeout) + testElectionTimeout responseHeaderTimeout := (3 * testHeartbeatTimeout) + testElectionTimeout - raftTransporter := server.NewTransporter(tls.Scheme, tls.Client, followersStats, serverStats, registry, testHeartbeatTimeout, dialTimeout, responseHeaderTimeout) + raftTransporter := server.NewTransporter(followersStats, serverStats, registry, testHeartbeatTimeout, dialTimeout, responseHeaderTimeout) raftServer, err := raft.NewServer(testName, path, raftTransporter, store, ps, "") if err != nil { panic(err) From 2fe22f18908b2acb8f293d8003adfaf9a2c1d78f Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Tue, 21 Jan 2014 08:08:20 -0800 Subject: [PATCH 20/20] refactor(servers): emit http.Handlers from *Server --- etcd.go | 4 ++-- server/peer_server.go | 9 +-------- server/server.go | 8 +------- tests/server_utils.go | 4 ++-- 4 files changed, 6 insertions(+), 19 deletions(-) diff --git a/etcd.go b/etcd.go index b167a0576..dc373fff3 100644 --- a/etcd.go +++ b/etcd.go @@ -180,11 +180,11 @@ func main() { // Run peer server in separate thread while the client server blocks. go func() { log.Infof("raft server [name %s, listen on %s, advertised url %s]", ps.Config.Name, psListener.Addr(), ps.Config.URL) - sHTTP := &ehttp.CORSHandler{ps, corsInfo} + sHTTP := &ehttp.CORSHandler{ps.HTTPHandler(), corsInfo} log.Fatal(http.Serve(psListener, sHTTP)) }() log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.Name, sListener.Addr(), s.URL()) - sHTTP := &ehttp.CORSHandler{s, corsInfo} + sHTTP := &ehttp.CORSHandler{s.HTTPHandler(), corsInfo} log.Fatal(http.Serve(sListener, sHTTP)) } diff --git a/server/peer_server.go b/server/peer_server.go index 924d09fda..4fc880334 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -35,7 +35,6 @@ type PeerServerConfig struct { type PeerServer struct { Config PeerServerConfig - handler http.Handler raftServer raft.Server server *Server joinIndex uint64 @@ -77,8 +76,6 @@ func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.St metrics: mb, } - s.handler = s.buildHTTPHandler() - return s } @@ -164,7 +161,7 @@ func (s *PeerServer) Stop() { } } -func (s *PeerServer) buildHTTPHandler() http.Handler { +func (s *PeerServer) HTTPHandler() http.Handler { router := mux.NewRouter() // internal commands @@ -184,10 +181,6 @@ func (s *PeerServer) buildHTTPHandler() http.Handler { return router } -func (s *PeerServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.handler.ServeHTTP(w, r) -} - // Retrieves the underlying Raft server. func (s *PeerServer) RaftServer() raft.Server { return s.raftServer diff --git a/server/server.go b/server/server.go index de0801679..6a55ca2f2 100644 --- a/server/server.go +++ b/server/server.go @@ -45,8 +45,6 @@ func New(name, url string, peerServer *PeerServer, registry *Registry, store sto metrics: mb, } - s.handler = s.buildHTTPHandler() - return s } @@ -169,7 +167,7 @@ func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWrit }) } -func (s *Server) buildHTTPHandler() http.Handler { +func (s *Server) HTTPHandler() http.Handler { router := mux.NewRouter() // Install the routes. @@ -185,10 +183,6 @@ func (s *Server) buildHTTPHandler() http.Handler { return router } -func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.handler.ServeHTTP(w, r) -} - // Dispatch command to the current leader func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error { ps := s.peerServer diff --git a/tests/server_utils.go b/tests/server_utils.go index 143ccd7c4..f3dc05af6 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -70,14 +70,14 @@ func RunServer(f func(*server.Server)) { go func() { c <- true ps.Start(false, []string{}) - http.Serve(psListener, ps) + http.Serve(psListener, ps.HTTPHandler()) }() <-c // Start up etcd server. go func() { c <- true - http.Serve(sListener, s) + http.Serve(sListener, s.HTTPHandler()) }() <-c