From 4ccbcb91c8d31670a88ac81fddd314cf6720f50d Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 3 Nov 2015 15:13:50 -0800 Subject: [PATCH] rafthttp: add functions to create listener and roundTripper This moves the code to create listener and roundTripper for raft communication to the same place, and use explicit functions to build them. This prevents possible development errors in the future. --- etcdmain/etcd.go | 2 +- etcdserver/server.go | 5 +---- rafthttp/transport.go | 8 ++------ rafthttp/util.go | 27 +++++++++++++++++++++++++++ 4 files changed, 31 insertions(+), 11 deletions(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 8cde657c7..dc83626be 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -208,7 +208,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) } var l net.Listener - l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) + l, err = rafthttp.NewListener(u, cfg.peerTLSInfo) if err != nil { return nil, err } diff --git a/etcdserver/server.go b/etcdserver/server.go index b97d2e82a..8a8ee76a4 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -40,7 +40,6 @@ import ( "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/runtime" "github.com/coreos/etcd/pkg/timeutil" - "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/wait" "github.com/coreos/etcd/raft" @@ -211,12 +210,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { haveWAL := wal.Exist(cfg.WALDir()) ss := snap.New(cfg.SnapDir()) - // use timeout transport to pair with remote timeout listeners - pt, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, cfg.peerDialTimeout(), 0, 0) + prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout()) if err != nil { return nil, err } - prt := http.RoundTripper(pt) var remotes []*Member switch { case !haveWAL && !cfg.NewCluster: diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 3625bd001..8232003fb 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -136,15 +136,11 @@ type Transport struct { func (t *Transport) Start() error { var err error - // Read/write timeout is set for stream roundTripper to promptly - // find out broken status, which minimizes the number of messages - // sent on broken connection. - t.streamRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, ConnReadTimeout, ConnWriteTimeout) + t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout) if err != nil { return err } - // use timeout transport to pair with remote timeout listeners - t.pipelineRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, 0, 0) + t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout) if err != nil { return err } diff --git a/rafthttp/util.go b/rafthttp/util.go index dbf09c223..75a66cfd7 100644 --- a/rafthttp/util.go +++ b/rafthttp/util.go @@ -18,11 +18,14 @@ import ( "encoding/binary" "fmt" "io" + "net" "net/http" "net/url" "strings" + "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" + "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/version" @@ -30,6 +33,30 @@ import ( var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster") +// NewListener returns a listener for raft message transfer between peers. +// It uses timeout listener to identify broken streams promptly. +func NewListener(u url.URL, tlsInfo transport.TLSInfo) (net.Listener, error) { + return transport.NewTimeoutListener(u.Host, u.Scheme, tlsInfo, ConnReadTimeout, ConnWriteTimeout) +} + +// NewRoundTripper returns a roundTripper used to send requests +// to rafthttp listener of remote peers. +func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) { + // It uses timeout transport to pair with remote timeout listeners. + // It sets no read/write timeout, because message in requests may + // take long time to write out before reading out the response. + return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0) +} + +// newStreamRoundTripper returns a roundTripper used to send stream requests +// to rafthttp listener of remote peers. +// Read/write timeout is set for stream roundTripper to promptly +// find out broken status, which minimizes the number of messages +// sent on broken connection. +func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) { + return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout) +} + func writeEntryTo(w io.Writer, ent *raftpb.Entry) error { size := ent.Size() if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {