From 0eee88a3d92052c029a828837ff8e471a8dc7fe0 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 2 Nov 2015 21:53:48 -0800 Subject: [PATCH 1/3] etcdserver: use timeout transport as peer transport This pairs with remote timeout listeners. etcd uses timeout listener, and times out the accepted connections if there is no activity. So the idle connections may time out easily. Because timeout transport doesn't reuse connections, it prevents using timed-out connections. This fixes the problem that etcd fails to get the version of peers. --- etcdserver/server.go | 3 ++- rafthttp/transport.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 33e116cbb..7465d1712 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -211,7 +211,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { haveWAL := wal.Exist(cfg.WALDir()) ss := snap.New(cfg.SnapDir()) - pt, err := transport.NewTransport(cfg.PeerTLSInfo, cfg.peerDialTimeout()) + // use timeout transport to pair with remote timeout listeners + pt, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, cfg.peerDialTimeout(), 0, 0) if err != nil { return nil, err } diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 6a6fa12f6..3625bd001 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -143,7 +143,8 @@ func (t *Transport) Start() error { if err != nil { return err } - t.pipelineRt, err = transport.NewTransport(t.TLSInfo, t.DialTimeout) + // use timeout transport to pair with remote timeout listeners + t.pipelineRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, 0, 0) if err != nil { return err } From 32819f6b3fc9469fc15e45c41891a3ca2a822b21 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 4 Nov 2015 10:49:42 -0800 Subject: [PATCH 2/3] etcdserver: use roundTripper to request peerURL It uses roundTripper instead of Transport because roundTripper is sufficient for its requirements.
--- etcdserver/cluster_util.go | 24 ++++++++++++------------ etcdserver/server.go | 15 ++++++++------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index 60ec32d0f..74991a573 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -29,8 +29,8 @@ import ( // isMemberBootstrapped tries to check if the given member has been bootstrapped // in the given cluster. -func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool { - rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), time.Second, false, tr) +func isMemberBootstrapped(cl *cluster, member string, rt http.RoundTripper) bool { + rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), time.Second, false, rt) if err != nil { return false } @@ -52,14 +52,14 @@ func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool { // response, an error is returned. // Each request has a 10-second timeout. Because the upper limit of TTL is 5s, // 10 second is enough for building connection and finishing request. -func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*cluster, error) { - return getClusterFromRemotePeers(urls, 10*time.Second, true, tr) +func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*cluster, error) { + return getClusterFromRemotePeers(urls, 10*time.Second, true, rt) } // If logerr is true, it prints out more error messages. -func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, tr *http.Transport) (*cluster, error) { +func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*cluster, error) { cc := &http.Client{ - Transport: tr, + Transport: rt, Timeout: timeout, } for _, u := range urls { @@ -114,7 +114,7 @@ func getRemotePeerURLs(cl Cluster, local string) []string { // The key of the returned map is the member's ID. 
The value of the returned map // is the semver versions string, including server and cluster. // If it fails to get the version of a member, the key will be nil. -func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*version.Versions { +func getVersions(cl Cluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions { members := cl.Members() vers := make(map[string]*version.Versions) for _, m := range members { @@ -126,7 +126,7 @@ func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*ver vers[m.ID.String()] = &version.Versions{Server: version.Version, Cluster: cv} continue } - ver, err := getVersion(m, tr) + ver, err := getVersion(m, rt) if err != nil { plog.Warningf("cannot get the version of member %s (%v)", m.ID, err) vers[m.ID.String()] = nil @@ -172,8 +172,8 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version { // cluster version in the range of [MinClusterVersion, Version] and no known members has a cluster version // out of the range. // We set this rule since when the local member joins, another member might be offline. -func isCompatibleWithCluster(cl Cluster, local types.ID, tr *http.Transport) bool { - vers := getVersions(cl, local, tr) +func isCompatibleWithCluster(cl Cluster, local types.ID, rt http.RoundTripper) bool { + vers := getVersions(cl, local, rt) minV := semver.Must(semver.NewVersion(version.MinClusterVersion)) maxV := semver.Must(semver.NewVersion(version.Version)) maxV = &semver.Version{ @@ -214,9 +214,9 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min // getVersion returns the Versions of the given member via its // peerURLs. Returns the last error if it fails to get the version. 
-func getVersion(m *Member, tr *http.Transport) (*version.Versions, error) { +func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) { cc := &http.Client{ - Transport: tr, + Transport: rt, } var ( err error diff --git a/etcdserver/server.go b/etcdserver/server.go index 7465d1712..b97d2e82a 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -171,8 +171,8 @@ type EtcdServer struct { // consistent index used to hold the offset of current executing entry // It is initialized to 0 before executing any entry. consistIndex consistentIndex - // versionTr used to send requests for peer version - versionTr *http.Transport + // versionRt used to send requests for peer version + versionRt http.RoundTripper reqIDGen *idutil.Generator // forceVersionC is used to force the version monitor loop @@ -216,6 +216,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err != nil { return nil, err } + prt := http.RoundTripper(pt) var remotes []*Member switch { case !haveWAL && !cfg.NewCluster: @@ -226,14 +227,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err != nil { return nil, err } - existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), pt) + existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt) if err != nil { return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err) } if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) } - if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, pt) { + if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) { return nil, fmt.Errorf("incomptible with current running cluster") } @@ -251,7 +252,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { return nil, err } m := cl.MemberByName(cfg.Name) - if isMemberBootstrapped(cl, cfg.Name, pt) { + if isMemberBootstrapped(cl, cfg.Name, prt) 
{ return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID) } if cfg.ShouldDiscover() { @@ -339,7 +340,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { stats: sstats, lstats: lstats, SyncTicker: time.Tick(500 * time.Millisecond), - versionTr: pt, + versionRt: prt, reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), forceVersionC: make(chan struct{}), } @@ -1091,7 +1092,7 @@ func (s *EtcdServer) monitorVersions() { continue } - v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionTr)) + v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionRt)) if v != nil { // only keep major.minor version for comparasion v = &semver.Version{ From 4ccbcb91c8d31670a88ac81fddd314cf6720f50d Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 3 Nov 2015 15:13:50 -0800 Subject: [PATCH 3/3] rafthttp: add functions to create listener and roundTripper This moves the code to create listener and roundTripper for raft communication to the same place, and uses explicit functions to build them. This prevents possible development errors in the future. --- etcdmain/etcd.go | 2 +- etcdserver/server.go | 5 +---- rafthttp/transport.go | 8 ++------ rafthttp/util.go | 27 +++++++++++++++++++++++++++ 4 files changed, 31 insertions(+), 11 deletions(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 8cde657c7..dc83626be 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -208,7 +208,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented.
Ignored peer key/cert files.", u.String()) } var l net.Listener - l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) + l, err = rafthttp.NewListener(u, cfg.peerTLSInfo) if err != nil { return nil, err } diff --git a/etcdserver/server.go b/etcdserver/server.go index b97d2e82a..8a8ee76a4 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -40,7 +40,6 @@ import ( "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/runtime" "github.com/coreos/etcd/pkg/timeutil" - "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/wait" "github.com/coreos/etcd/raft" @@ -211,12 +210,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { haveWAL := wal.Exist(cfg.WALDir()) ss := snap.New(cfg.SnapDir()) - // use timeout transport to pair with remote timeout listeners - pt, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, cfg.peerDialTimeout(), 0, 0) + prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout()) if err != nil { return nil, err } - prt := http.RoundTripper(pt) var remotes []*Member switch { case !haveWAL && !cfg.NewCluster: diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 3625bd001..8232003fb 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -136,15 +136,11 @@ type Transport struct { func (t *Transport) Start() error { var err error - // Read/write timeout is set for stream roundTripper to promptly - // find out broken status, which minimizes the number of messages - // sent on broken connection. 
- t.streamRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, ConnReadTimeout, ConnWriteTimeout) + t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout) if err != nil { return err } - // use timeout transport to pair with remote timeout listeners - t.pipelineRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, 0, 0) + t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout) if err != nil { return err } diff --git a/rafthttp/util.go b/rafthttp/util.go index dbf09c223..75a66cfd7 100644 --- a/rafthttp/util.go +++ b/rafthttp/util.go @@ -18,11 +18,14 @@ import ( "encoding/binary" "fmt" "io" + "net" "net/http" "net/url" "strings" + "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" + "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/version" @@ -30,6 +33,30 @@ import ( var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster") +// NewListener returns a listener for raft message transfer between peers. +// It uses timeout listener to identify broken streams promptly. +func NewListener(u url.URL, tlsInfo transport.TLSInfo) (net.Listener, error) { + return transport.NewTimeoutListener(u.Host, u.Scheme, tlsInfo, ConnReadTimeout, ConnWriteTimeout) +} + +// NewRoundTripper returns a roundTripper used to send requests +// to rafthttp listener of remote peers. +func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) { + // It uses timeout transport to pair with remote timeout listeners. + // It sets no read/write timeout, because message in requests may + // take long time to write out before reading out the response. + return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0) +} + +// newStreamRoundTripper returns a roundTripper used to send stream requests +// to rafthttp listener of remote peers. 
+// Read/write timeout is set for stream roundTripper to promptly +// find out broken status, which minimizes the number of messages +// sent on broken connection. +func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) { + return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout) +} + func writeEntryTo(w io.Writer, ent *raftpb.Entry) error { size := ent.Size() if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {