diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 8f009638a..a85711cba 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -210,7 +210,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) } var l net.Listener - l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) + l, err = rafthttp.NewListener(u, cfg.peerTLSInfo) if err != nil { return nil, err } diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index 60ec32d0f..74991a573 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -29,8 +29,8 @@ import ( // isMemberBootstrapped tries to check if the given member has been bootstrapped // in the given cluster. -func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool { - rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), time.Second, false, tr) +func isMemberBootstrapped(cl *cluster, member string, rt http.RoundTripper) bool { + rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), time.Second, false, rt) if err != nil { return false } @@ -52,14 +52,14 @@ func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool { // response, an error is returned. // Each request has a 10-second timeout. Because the upper limit of TTL is 5s, // 10 second is enough for building connection and finishing request. -func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*cluster, error) { - return getClusterFromRemotePeers(urls, 10*time.Second, true, tr) +func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*cluster, error) { + return getClusterFromRemotePeers(urls, 10*time.Second, true, rt) } // If logerr is true, it prints out more error messages. 
-func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, tr *http.Transport) (*cluster, error) { +func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*cluster, error) { cc := &http.Client{ - Transport: tr, + Transport: rt, Timeout: timeout, } for _, u := range urls { @@ -114,7 +114,7 @@ func getRemotePeerURLs(cl Cluster, local string) []string { // The key of the returned map is the member's ID. The value of the returned map // is the semver versions string, including server and cluster. // If it fails to get the version of a member, the key will be nil. -func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*version.Versions { +func getVersions(cl Cluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions { members := cl.Members() vers := make(map[string]*version.Versions) for _, m := range members { @@ -126,7 +126,7 @@ func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*ver vers[m.ID.String()] = &version.Versions{Server: version.Version, Cluster: cv} continue } - ver, err := getVersion(m, tr) + ver, err := getVersion(m, rt) if err != nil { plog.Warningf("cannot get the version of member %s (%v)", m.ID, err) vers[m.ID.String()] = nil @@ -172,8 +172,8 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version { // cluster version in the range of [MinClusterVersion, Version] and no known members has a cluster version // out of the range. // We set this rule since when the local member joins, another member might be offline. 
-func isCompatibleWithCluster(cl Cluster, local types.ID, tr *http.Transport) bool { - vers := getVersions(cl, local, tr) +func isCompatibleWithCluster(cl Cluster, local types.ID, rt http.RoundTripper) bool { + vers := getVersions(cl, local, rt) minV := semver.Must(semver.NewVersion(version.MinClusterVersion)) maxV := semver.Must(semver.NewVersion(version.Version)) maxV = &semver.Version{ @@ -214,9 +214,9 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min // getVersion returns the Versions of the given member via its // peerURLs. Returns the last error if it fails to get the version. -func getVersion(m *Member, tr *http.Transport) (*version.Versions, error) { +func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) { cc := &http.Client{ - Transport: tr, + Transport: rt, } var ( err error diff --git a/etcdserver/server.go b/etcdserver/server.go index 7bf7874b4..27ffc8a05 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -40,7 +40,6 @@ import ( "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/runtime" "github.com/coreos/etcd/pkg/timeutil" - "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/wait" "github.com/coreos/etcd/raft" @@ -171,8 +170,8 @@ type EtcdServer struct { // consistent index used to hold the offset of current executing entry // It is initialized to 0 before executing any entry. 
consistIndex consistentIndex - // versionTr used to send requests for peer version - versionTr *http.Transport + // versionRt used to send requests for peer version + versionRt http.RoundTripper reqIDGen *idutil.Generator // forceVersionC is used to force the version monitor loop @@ -211,7 +210,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { haveWAL := wal.Exist(cfg.WALDir()) ss := snap.New(cfg.SnapDir()) - pt, err := transport.NewTransport(cfg.PeerTLSInfo, cfg.peerDialTimeout()) + prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout()) if err != nil { return nil, err } @@ -225,14 +224,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err != nil { return nil, err } - existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), pt) + existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt) if err != nil { return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err) } if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) } - if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, pt) { + if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) { return nil, fmt.Errorf("incomptible with current running cluster") } @@ -250,7 +249,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { return nil, err } m := cl.MemberByName(cfg.Name) - if isMemberBootstrapped(cl, cfg.Name, pt) { + if isMemberBootstrapped(cl, cfg.Name, prt) { return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID) } if cfg.ShouldDiscover() { @@ -338,7 +337,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { stats: sstats, lstats: lstats, SyncTicker: time.Tick(500 * time.Millisecond), - versionTr: pt, + versionRt: prt, reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), forceVersionC: make(chan struct{}), } @@ -1090,7 +1089,7 @@ func (s 
*EtcdServer) monitorVersions() { continue } - v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionTr)) + v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionRt)) if v != nil { // only keep major.minor version for comparasion v = &semver.Version{ diff --git a/rafthttp/transport.go b/rafthttp/transport.go index f0020d117..127dab4ef 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -137,14 +137,11 @@ type Transport struct { func (t *Transport) Start() error { var err error - // Read/write timeout is set for stream roundTripper to promptly - // find out broken status, which minimizes the number of messages - // sent on broken connection. - t.streamRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, ConnReadTimeout, ConnWriteTimeout) + t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout) if err != nil { return err } - t.pipelineRt, err = transport.NewTransport(t.TLSInfo, t.DialTimeout) + t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout) if err != nil { return err } diff --git a/rafthttp/util.go b/rafthttp/util.go index dbf09c223..75a66cfd7 100644 --- a/rafthttp/util.go +++ b/rafthttp/util.go @@ -18,11 +18,14 @@ import ( "encoding/binary" "fmt" "io" + "net" "net/http" "net/url" "strings" + "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" + "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/version" @@ -30,6 +33,30 @@ import ( var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster") +// NewListener returns a listener for raft message transfer between peers. +// It uses timeout listener to identify broken streams promptly. 
+func NewListener(u url.URL, tlsInfo transport.TLSInfo) (net.Listener, error) { + return transport.NewTimeoutListener(u.Host, u.Scheme, tlsInfo, ConnReadTimeout, ConnWriteTimeout) +} + +// NewRoundTripper returns a roundTripper used to send requests +// to the rafthttp listener of remote peers. +func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) { + // It uses a timeout transport to pair with remote timeout listeners. + // It sets no read/write timeout, because a message in a request may + // take a long time to write out before the response is read. + return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0) +} + +// newStreamRoundTripper returns a roundTripper used to send stream requests +// to the rafthttp listener of remote peers. +// Read/write timeouts are set on the stream roundTripper to promptly +// detect a broken connection, which minimizes the number of messages +// sent on it. +func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) { + return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout) +} + func writeEntryTo(w io.Writer, ent *raftpb.Entry) error { size := ent.Size() if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {