From 32819f6b3fc9469fc15e45c41891a3ca2a822b21 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 4 Nov 2015 10:49:42 -0800 Subject: [PATCH] etcdserver: use roundTripper to request peerURL It uses roundTripper instead of Transport because roundTripper is sufficient for its requirements. --- etcdserver/cluster_util.go | 24 ++++++++++++------------ etcdserver/server.go | 15 ++++++++------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index 60ec32d0f..74991a573 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -29,8 +29,8 @@ import ( // isMemberBootstrapped tries to check if the given member has been bootstrapped // in the given cluster. -func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool { - rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), time.Second, false, tr) +func isMemberBootstrapped(cl *cluster, member string, rt http.RoundTripper) bool { + rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), time.Second, false, rt) if err != nil { return false } @@ -52,14 +52,14 @@ func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool { // response, an error is returned. // Each request has a 10-second timeout. Because the upper limit of TTL is 5s, // 10 second is enough for building connection and finishing request. -func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*cluster, error) { - return getClusterFromRemotePeers(urls, 10*time.Second, true, tr) +func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*cluster, error) { + return getClusterFromRemotePeers(urls, 10*time.Second, true, rt) } // If logerr is true, it prints out more error messages. -func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, tr *http.Transport) (*cluster, error) { +func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*cluster, error) { cc := &http.Client{ - Transport: tr, + Transport: rt, Timeout: timeout, } for _, u := range urls { @@ -114,7 +114,7 @@ func getRemotePeerURLs(cl Cluster, local string) []string { // The key of the returned map is the member's ID. The value of the returned map // is the semver versions string, including server and cluster. // If it fails to get the version of a member, the key will be nil. -func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*version.Versions { +func getVersions(cl Cluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions { members := cl.Members() vers := make(map[string]*version.Versions) for _, m := range members { @@ -126,7 +126,7 @@ func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*ver vers[m.ID.String()] = &version.Versions{Server: version.Version, Cluster: cv} continue } - ver, err := getVersion(m, tr) + ver, err := getVersion(m, rt) if err != nil { plog.Warningf("cannot get the version of member %s (%v)", m.ID, err) vers[m.ID.String()] = nil @@ -172,8 +172,8 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version { // cluster version in the range of [MinClusterVersion, Version] and no known members has a cluster version // out of the range. // We set this rule since when the local member joins, another member might be offline. -func isCompatibleWithCluster(cl Cluster, local types.ID, tr *http.Transport) bool { - vers := getVersions(cl, local, tr) +func isCompatibleWithCluster(cl Cluster, local types.ID, rt http.RoundTripper) bool { + vers := getVersions(cl, local, rt) minV := semver.Must(semver.NewVersion(version.MinClusterVersion)) maxV := semver.Must(semver.NewVersion(version.Version)) maxV = &semver.Version{ @@ -214,9 +214,9 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min // getVersion returns the Versions of the given member via its // peerURLs. Returns the last error if it fails to get the version. -func getVersion(m *Member, tr *http.Transport) (*version.Versions, error) { +func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) { cc := &http.Client{ - Transport: tr, + Transport: rt, } var ( err error diff --git a/etcdserver/server.go b/etcdserver/server.go index 7465d1712..b97d2e82a 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -171,8 +171,8 @@ type EtcdServer struct { // consistent index used to hold the offset of current executing entry // It is initialized to 0 before executing any entry. consistIndex consistentIndex - // versionTr used to send requests for peer version - versionTr *http.Transport + // versionRt used to send requests for peer version + versionRt http.RoundTripper reqIDGen *idutil.Generator // forceVersionC is used to force the version monitor loop @@ -216,6 +216,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err != nil { return nil, err } + prt := http.RoundTripper(pt) var remotes []*Member switch { case !haveWAL && !cfg.NewCluster: @@ -226,14 +227,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err != nil { return nil, err } - existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), pt) + existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt) if err != nil { return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err) } if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) } - if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, pt) { + if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) { return nil, fmt.Errorf("incomptible with current running cluster") } @@ -251,7 +252,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { return nil, err } m := cl.MemberByName(cfg.Name) - if isMemberBootstrapped(cl, cfg.Name, pt) { + if isMemberBootstrapped(cl, cfg.Name, prt) { return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID) } if cfg.ShouldDiscover() { @@ -339,7 +340,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { stats: sstats, lstats: lstats, SyncTicker: time.Tick(500 * time.Millisecond), - versionTr: pt, + versionRt: prt, reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), forceVersionC: make(chan struct{}), } @@ -1091,7 +1092,7 @@ func (s *EtcdServer) monitorVersions() { continue } - v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionTr)) + v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionRt)) if v != nil { // only keep major.minor version for comparasion v = &semver.Version{