From a42d1dc1fef0349bdeb7f032915e77fa448df147 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 30 Mar 2016 04:19:58 -0700 Subject: [PATCH] *: drain http.Response.Body before closing --- etcdserver/cluster_util.go | 3 ++- pkg/httputil/{cancelreq.go => httputil.go} | 14 +++++++++++++- rafthttp/snapshot_sender.go | 2 +- rafthttp/stream.go | 10 +++++----- 4 files changed, 21 insertions(+), 8 deletions(-) rename pkg/httputil/{cancelreq.go => httputil.go} (59%) diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index 5f3ffdce5..cfbd1fea9 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -22,6 +22,7 @@ import ( "sort" "time" + "github.com/coreos/etcd/pkg/httputil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/version" "github.com/coreos/go-semver/semver" @@ -231,7 +232,7 @@ func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) { } // etcd 2.0 does not have version endpoint on peer url. if resp.StatusCode == http.StatusNotFound { - resp.Body.Close() + httputil.GracefulClose(resp) return &version.Versions{ Server: "2.0.0", Cluster: "2.0.0", diff --git a/pkg/httputil/cancelreq.go b/pkg/httputil/httputil.go similarity index 59% rename from pkg/httputil/cancelreq.go rename to pkg/httputil/httputil.go index ab9fce1ab..daf43bd8f 100644 --- a/pkg/httputil/cancelreq.go +++ b/pkg/httputil/httputil.go @@ -7,7 +7,11 @@ // Package httputil provides HTTP utility functions. package httputil -import "net/http" +import ( + "io" + "io/ioutil" + "net/http" +) func RequestCanceler(rt http.RoundTripper, req *http.Request) func() { ch := make(chan struct{}) @@ -17,3 +21,11 @@ func RequestCanceler(rt http.RoundTripper, req *http.Request) func() { close(ch) } } + +// GracefulClose drains http.Response.Body until it hits EOF +// and closes it. This prevents TCP/TLS connections from closing, +// therefore available for reuse. +func GracefulClose(resp *http.Response) { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() +} diff --git a/rafthttp/snapshot_sender.go b/rafthttp/snapshot_sender.go index 12e62d7c5..a871b886a 100644 --- a/rafthttp/snapshot_sender.go +++ b/rafthttp/snapshot_sender.go @@ -124,7 +124,7 @@ func (s *snapshotSender) post(req *http.Request) (err error) { // close the response body when timeouts. // prevents from reading the body forever when the other side dies right after // successfully receives the request body. - time.AfterFunc(snapResponseReadTimeout, func() { resp.Body.Close() }) + time.AfterFunc(snapResponseReadTimeout, func() { httputil.GracefulClose(resp) }) body, err := ioutil.ReadAll(resp.Body) result <- responseAndError{resp, body, err} }() diff --git a/rafthttp/stream.go b/rafthttp/stream.go index af8185f82..8886cfa2c 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -417,14 +417,14 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { rv := serverVersion(resp.Header) lv := semver.Must(semver.NewVersion(version.Version)) if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) { - resp.Body.Close() + httputil.GracefulClose(resp) cr.picker.unreachable(u) return nil, errUnsupportedStreamType } switch resp.StatusCode { case http.StatusGone: - resp.Body.Close() + httputil.GracefulClose(resp) cr.picker.unreachable(u) err := fmt.Errorf("the member has been permanently removed from the cluster") select { @@ -435,7 +435,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { case http.StatusOK: return resp.Body, nil case http.StatusNotFound: - resp.Body.Close() + httputil.GracefulClose(resp) cr.picker.unreachable(u) return nil, fmt.Errorf("remote member %s could not recognize local member", cr.remote) case http.StatusPreconditionFailed: @@ -444,7 +444,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { cr.picker.unreachable(u) return nil, err } - resp.Body.Close() + httputil.GracefulClose(resp) cr.picker.unreachable(u) switch strings.TrimSuffix(string(b), "\n") { @@ -459,7 +459,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b)) } default: - resp.Body.Close() + httputil.GracefulClose(resp) cr.picker.unreachable(u) return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode) }