diff --git a/etcdserver/etcdhttp/peers.go b/etcdserver/etcdhttp/peers.go index 4af765fe2..e60e780d6 100644 --- a/etcdserver/etcdhttp/peers.go +++ b/etcdserver/etcdhttp/peers.go @@ -85,17 +85,18 @@ func (ps Peers) Endpoints() []string { return endpoints } -func Sender(p Peers) func(msgs []raftpb.Message) { +func Sender(t *http.Transport, p Peers) func(msgs []raftpb.Message) { + c := &http.Client{Transport: t} return func(msgs []raftpb.Message) { for _, m := range msgs { // TODO: reuse go routines // limit the number of outgoing connections for the same receiver - go send(p, m) + go send(c, p, m) } } } -func send(p Peers, m raftpb.Message) { +func send(c *http.Client, p Peers, m raftpb.Message) { // TODO (xiangli): reasonable retry logic for i := 0; i < 3; i++ { url := p.Pick(m.To) @@ -116,16 +117,15 @@ func send(p Peers, m raftpb.Message) { log.Println("etcdhttp: dropping message:", err) return // drop bad message } - if httpPost(url, data) { + if httpPost(c, url, data) { return // success } // TODO: backoff } } -func httpPost(url string, data []byte) bool { - // TODO: set timeouts - resp, err := http.Post(url, "application/protobuf", bytes.NewBuffer(data)) +func httpPost(c *http.Client, url string, data []byte) bool { + resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data)) if err != nil { elog.TODO() return false diff --git a/etcdserver/etcdhttp/peers_test.go b/etcdserver/etcdhttp/peers_test.go index f733635b1..e08bc7325 100644 --- a/etcdserver/etcdhttp/peers_test.go +++ b/etcdserver/etcdhttp/peers_test.go @@ -148,7 +148,7 @@ func TestHttpPost(t *testing.T) { } for i, tt := range tests { ts := httptest.NewServer(tt.h) - if g := httpPost(ts.URL, []byte("adsf")); g != tt.w { + if g := httpPost(http.DefaultClient, ts.URL, []byte("adsf")); g != tt.w { t.Errorf("#%d: httpPost()=%t, want %t", i, g, tt.w) } if tr.Method != "POST" { @@ -161,7 +161,7 @@ func TestHttpPost(t *testing.T) { ts.Close() } - if httpPost("garbage url", []byte("data")) { + if httpPost(http.DefaultClient, "garbage url", []byte("data")) { t.Errorf("httpPost with bad URL returned true unexpectedly!") } } @@ -215,7 +215,7 @@ func TestSend(t *testing.T) { ps := Peers{ 42: []string{strings.TrimPrefix(ts.URL, "http://")}, } - send(ps, tt.m) + send(http.DefaultClient, ps, tt.m) if !tt.ok { if tr != nil { diff --git a/main.go b/main.go index c424dc4dc..53350b996 100644 --- a/main.go +++ b/main.go @@ -151,6 +151,15 @@ func startEtcd() { n = raft.RestartNode(id, peers.IDs(), 10, 1, snapshot, st, ents) } + pt := &http.Transport{ + // timeouts copied from http.DefaultTransport + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + } + s := &etcdserver.EtcdServer{ Store: st, Node: n, @@ -158,7 +167,7 @@ func startEtcd() { *wal.WAL *snap.Snapshotter }{w, snapshotter}, - Send: etcdhttp.Sender(*peers), + Send: etcdhttp.Sender(pt, *peers), Ticker: time.Tick(100 * time.Millisecond), SyncTicker: time.Tick(500 * time.Millisecond), SnapCount: *snapCount,