etcdserver: use roundTripper to request peerURL

It uses roundTripper instead of Transport because roundTripper is
sufficient for its requirements.
release-2.3
Yicheng Qin 2015-11-04 10:49:42 -08:00
parent 0eee88a3d9
commit 32819f6b3f
2 changed files with 20 additions and 19 deletions

View File

@ -29,8 +29,8 @@ import (
// isMemberBootstrapped tries to check if the given member has been bootstrapped // isMemberBootstrapped tries to check if the given member has been bootstrapped
// in the given cluster. // in the given cluster.
func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool { func isMemberBootstrapped(cl *cluster, member string, rt http.RoundTripper) bool {
rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), time.Second, false, tr) rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), time.Second, false, rt)
if err != nil { if err != nil {
return false return false
} }
@ -52,14 +52,14 @@ func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool {
// response, an error is returned. // response, an error is returned.
// Each request has a 10-second timeout. Because the upper limit of TTL is 5s, // Each request has a 10-second timeout. Because the upper limit of TTL is 5s,
// 10 second is enough for building connection and finishing request. // 10 second is enough for building connection and finishing request.
func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*cluster, error) { func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*cluster, error) {
return getClusterFromRemotePeers(urls, 10*time.Second, true, tr) return getClusterFromRemotePeers(urls, 10*time.Second, true, rt)
} }
// If logerr is true, it prints out more error messages. // If logerr is true, it prints out more error messages.
func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, tr *http.Transport) (*cluster, error) { func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*cluster, error) {
cc := &http.Client{ cc := &http.Client{
Transport: tr, Transport: rt,
Timeout: timeout, Timeout: timeout,
} }
for _, u := range urls { for _, u := range urls {
@ -114,7 +114,7 @@ func getRemotePeerURLs(cl Cluster, local string) []string {
// The key of the returned map is the member's ID. The value of the returned map // The key of the returned map is the member's ID. The value of the returned map
// is the semver versions string, including server and cluster. // is the semver versions string, including server and cluster.
// If it fails to get the version of a member, the key will be nil. // If it fails to get the version of a member, the key will be nil.
func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*version.Versions { func getVersions(cl Cluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
members := cl.Members() members := cl.Members()
vers := make(map[string]*version.Versions) vers := make(map[string]*version.Versions)
for _, m := range members { for _, m := range members {
@ -126,7 +126,7 @@ func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*ver
vers[m.ID.String()] = &version.Versions{Server: version.Version, Cluster: cv} vers[m.ID.String()] = &version.Versions{Server: version.Version, Cluster: cv}
continue continue
} }
ver, err := getVersion(m, tr) ver, err := getVersion(m, rt)
if err != nil { if err != nil {
plog.Warningf("cannot get the version of member %s (%v)", m.ID, err) plog.Warningf("cannot get the version of member %s (%v)", m.ID, err)
vers[m.ID.String()] = nil vers[m.ID.String()] = nil
@ -172,8 +172,8 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version {
// cluster version in the range of [MinClusterVersion, Version] and no known members has a cluster version // cluster version in the range of [MinClusterVersion, Version] and no known members has a cluster version
// out of the range. // out of the range.
// We set this rule since when the local member joins, another member might be offline. // We set this rule since when the local member joins, another member might be offline.
func isCompatibleWithCluster(cl Cluster, local types.ID, tr *http.Transport) bool { func isCompatibleWithCluster(cl Cluster, local types.ID, rt http.RoundTripper) bool {
vers := getVersions(cl, local, tr) vers := getVersions(cl, local, rt)
minV := semver.Must(semver.NewVersion(version.MinClusterVersion)) minV := semver.Must(semver.NewVersion(version.MinClusterVersion))
maxV := semver.Must(semver.NewVersion(version.Version)) maxV := semver.Must(semver.NewVersion(version.Version))
maxV = &semver.Version{ maxV = &semver.Version{
@ -214,9 +214,9 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min
// getVersion returns the Versions of the given member via its // getVersion returns the Versions of the given member via its
// peerURLs. Returns the last error if it fails to get the version. // peerURLs. Returns the last error if it fails to get the version.
func getVersion(m *Member, tr *http.Transport) (*version.Versions, error) { func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) {
cc := &http.Client{ cc := &http.Client{
Transport: tr, Transport: rt,
} }
var ( var (
err error err error

View File

@ -171,8 +171,8 @@ type EtcdServer struct {
// consistent index used to hold the offset of current executing entry // consistent index used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry. // It is initialized to 0 before executing any entry.
consistIndex consistentIndex consistIndex consistentIndex
// versionTr used to send requests for peer version // versionRt used to send requests for peer version
versionTr *http.Transport versionRt http.RoundTripper
reqIDGen *idutil.Generator reqIDGen *idutil.Generator
// forceVersionC is used to force the version monitor loop // forceVersionC is used to force the version monitor loop
@ -216,6 +216,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
prt := http.RoundTripper(pt)
var remotes []*Member var remotes []*Member
switch { switch {
case !haveWAL && !cfg.NewCluster: case !haveWAL && !cfg.NewCluster:
@ -226,14 +227,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), pt) existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err) return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err)
} }
if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
} }
if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, pt) { if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) {
return nil, fmt.Errorf("incomptible with current running cluster") return nil, fmt.Errorf("incomptible with current running cluster")
} }
@ -251,7 +252,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
return nil, err return nil, err
} }
m := cl.MemberByName(cfg.Name) m := cl.MemberByName(cfg.Name)
if isMemberBootstrapped(cl, cfg.Name, pt) { if isMemberBootstrapped(cl, cfg.Name, prt) {
return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID) return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
} }
if cfg.ShouldDiscover() { if cfg.ShouldDiscover() {
@ -339,7 +340,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
stats: sstats, stats: sstats,
lstats: lstats, lstats: lstats,
SyncTicker: time.Tick(500 * time.Millisecond), SyncTicker: time.Tick(500 * time.Millisecond),
versionTr: pt, versionRt: prt,
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
forceVersionC: make(chan struct{}), forceVersionC: make(chan struct{}),
} }
@ -1091,7 +1092,7 @@ func (s *EtcdServer) monitorVersions() {
continue continue
} }
v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionTr)) v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionRt))
if v != nil { if v != nil {
// only keep major.minor version for comparasion // only keep major.minor version for comparasion
v = &semver.Version{ v = &semver.Version{