rafthttp: always cancel in-flight request when pipeline.send

This fits the way for go1.5 to cancel request.
release-2.2
Yicheng Qin 2015-08-24 10:33:21 -07:00
parent 27b9963959
commit 61a75b3d48
4 changed files with 75 additions and 19 deletions

View File

@ -0,0 +1,35 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !go1.5
package rafthttp
import (
"errors"
"net/http"
)
func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
c := make(chan struct{}, 1)
t.mu.Lock()
t.cancel[req] = c
t.mu.Unlock()
select {
case <-t.unblockc:
return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
case <-c:
return nil, errors.New("request canceled")
}
}

View File

@ -0,0 +1,37 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build go1.5
package rafthttp
import (
"errors"
"net/http"
)
func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
c := make(chan struct{}, 1)
t.mu.Lock()
t.cancel[req] = c
t.mu.Unlock()
select {
case <-t.unblockc:
return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
case <-req.Cancel:
return nil, errors.New("request canceled")
case <-c:
return nil, errors.New("request canceled")
}
}

View File

@ -25,6 +25,7 @@ import (
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/httputil"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
@ -43,10 +44,6 @@ const (
var errStopped = errors.New("stopped")
type canceler interface {
CancelRequest(*http.Request)
}
type pipeline struct {
from, to types.ID
cid types.ID
@ -149,15 +146,14 @@ func (p *pipeline) post(data []byte) (err error) {
}
}()
done := make(chan struct{}, 1)
cancel := httputil.RequestCanceler(p.tr, req)
go func() {
select {
case <-done:
case <-p.stopc:
waitSchedule()
stopped = true
if cancel, ok := p.tr.(canceler); ok {
cancel.CancelRequest(req)
}
cancel()
}
}()

View File

@ -223,18 +223,6 @@ func newRoundTripperBlocker() *roundTripperBlocker {
cancel: make(map[*http.Request]chan struct{}),
}
}
func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
c := make(chan struct{}, 1)
t.mu.Lock()
t.cancel[req] = c
t.mu.Unlock()
select {
case <-t.unblockc:
return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
case <-c:
return nil, errors.New("request canceled")
}
}
func (t *roundTripperBlocker) unblock() {
close(t.unblockc)
}