rafthttp: use customized transport for probing
We need to support TLS verification when probing.release-2.2
parent
a637e86372
commit
b6580a9591
|
@ -104,7 +104,7 @@
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/xiang90/probing",
|
"ImportPath": "github.com/xiang90/probing",
|
||||||
"Rev": "11caf1c32ca4055f97e55541e92a75966635981d"
|
"Rev": "6a0cc1ae81b4cc11db5e491e030e4b98fba79c19"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "golang.org/x/crypto/bcrypt",
|
"ImportPath": "golang.org/x/crypto/bcrypt",
|
||||||
|
|
|
@ -24,10 +24,17 @@ type Prober interface {
|
||||||
type prober struct {
|
type prober struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
targets map[string]*status
|
targets map[string]*status
|
||||||
|
tr http.RoundTripper
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProber() Prober {
|
func NewProber(tr http.RoundTripper) Prober {
|
||||||
return &prober{targets: make(map[string]*status)}
|
p := &prober{targets: make(map[string]*status)}
|
||||||
|
if tr == nil {
|
||||||
|
p.tr = http.DefaultTransport
|
||||||
|
} else {
|
||||||
|
p.tr = tr
|
||||||
|
}
|
||||||
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []string) error {
|
func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []string) error {
|
||||||
|
@ -48,7 +55,11 @@ func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []s
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
resp, err := http.Get(endpoints[pinned])
|
req, err := http.NewRequest("GET", endpoints[pinned], nil)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
resp, err := p.tr.RoundTrip(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.recordFailure()
|
s.recordFailure()
|
||||||
pinned = (pinned + 1) % len(endpoints)
|
pinned = (pinned + 1) % len(endpoints)
|
||||||
|
|
|
@ -13,7 +13,7 @@ var (
|
||||||
func TestProbe(t *testing.T) {
|
func TestProbe(t *testing.T) {
|
||||||
s := httptest.NewServer(NewHandler())
|
s := httptest.NewServer(NewHandler())
|
||||||
|
|
||||||
p := NewProber()
|
p := NewProber(nil)
|
||||||
p.AddHTTP(testID, time.Millisecond, []string{s.URL})
|
p.AddHTTP(testID, time.Millisecond, []string{s.URL})
|
||||||
defer p.Remove(testID)
|
defer p.Remove(testID)
|
||||||
|
|
||||||
|
@ -48,7 +48,7 @@ func TestProbeReset(t *testing.T) {
|
||||||
s := httptest.NewServer(NewHandler())
|
s := httptest.NewServer(NewHandler())
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
|
|
||||||
p := NewProber()
|
p := NewProber(nil)
|
||||||
p.AddHTTP(testID, time.Millisecond, []string{s.URL})
|
p.AddHTTP(testID, time.Millisecond, []string{s.URL})
|
||||||
defer p.Remove(testID)
|
defer p.Remove(testID)
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ func TestProbeRemove(t *testing.T) {
|
||||||
s := httptest.NewServer(NewHandler())
|
s := httptest.NewServer(NewHandler())
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
|
|
||||||
p := NewProber()
|
p := NewProber(nil)
|
||||||
p.AddHTTP(testID, time.Millisecond, []string{s.URL})
|
p.AddHTTP(testID, time.Millisecond, []string{s.URL})
|
||||||
|
|
||||||
p.Remove(testID)
|
p.Remove(testID)
|
||||||
|
|
|
@ -100,7 +100,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan
|
||||||
remotes: make(map[types.ID]*remote),
|
remotes: make(map[types.ID]*remote),
|
||||||
peers: make(map[types.ID]Peer),
|
peers: make(map[types.ID]Peer),
|
||||||
|
|
||||||
prober: probing.NewProber(),
|
prober: probing.NewProber(rt),
|
||||||
errorc: errorc,
|
errorc: errorc,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,7 @@ func TestTransportAdd(t *testing.T) {
|
||||||
leaderStats: ls,
|
leaderStats: ls,
|
||||||
term: term,
|
term: term,
|
||||||
peers: make(map[types.ID]Peer),
|
peers: make(map[types.ID]Peer),
|
||||||
prober: probing.NewProber(),
|
prober: probing.NewProber(nil),
|
||||||
}
|
}
|
||||||
tr.AddPeer(1, []string{"http://localhost:2380"})
|
tr.AddPeer(1, []string{"http://localhost:2380"})
|
||||||
|
|
||||||
|
@ -106,7 +106,7 @@ func TestTransportRemove(t *testing.T) {
|
||||||
roundTripper: &roundTripperRecorder{},
|
roundTripper: &roundTripperRecorder{},
|
||||||
leaderStats: stats.NewLeaderStats(""),
|
leaderStats: stats.NewLeaderStats(""),
|
||||||
peers: make(map[types.ID]Peer),
|
peers: make(map[types.ID]Peer),
|
||||||
prober: probing.NewProber(),
|
prober: probing.NewProber(nil),
|
||||||
}
|
}
|
||||||
tr.AddPeer(1, []string{"http://localhost:2380"})
|
tr.AddPeer(1, []string{"http://localhost:2380"})
|
||||||
tr.RemovePeer(types.ID(1))
|
tr.RemovePeer(types.ID(1))
|
||||||
|
@ -121,7 +121,7 @@ func TestTransportUpdate(t *testing.T) {
|
||||||
peer := newFakePeer()
|
peer := newFakePeer()
|
||||||
tr := &transport{
|
tr := &transport{
|
||||||
peers: map[types.ID]Peer{types.ID(1): peer},
|
peers: map[types.ID]Peer{types.ID(1): peer},
|
||||||
prober: probing.NewProber(),
|
prober: probing.NewProber(nil),
|
||||||
}
|
}
|
||||||
u := "http://localhost:2380"
|
u := "http://localhost:2380"
|
||||||
tr.UpdatePeer(types.ID(1), []string{u})
|
tr.UpdatePeer(types.ID(1), []string{u})
|
||||||
|
@ -137,7 +137,7 @@ func TestTransportErrorc(t *testing.T) {
|
||||||
roundTripper: newRespRoundTripper(http.StatusForbidden, nil),
|
roundTripper: newRespRoundTripper(http.StatusForbidden, nil),
|
||||||
leaderStats: stats.NewLeaderStats(""),
|
leaderStats: stats.NewLeaderStats(""),
|
||||||
peers: make(map[types.ID]Peer),
|
peers: make(map[types.ID]Peer),
|
||||||
prober: probing.NewProber(),
|
prober: probing.NewProber(nil),
|
||||||
errorc: errorc,
|
errorc: errorc,
|
||||||
}
|
}
|
||||||
tr.AddPeer(1, []string{"http://localhost:2380"})
|
tr.AddPeer(1, []string{"http://localhost:2380"})
|
||||||
|
|
Loading…
Reference in New Issue