etcd/rafthttp/transport_test.go

155 lines
4.1 KiB
Go
Raw Normal View History

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2014-11-07 03:48:03 +03:00
package rafthttp
2014-11-07 03:48:03 +03:00
import (
2014-11-13 02:41:11 +03:00
"net/http"
2015-02-26 11:13:35 +03:00
"reflect"
2014-11-07 03:48:03 +03:00
"testing"
2014-11-13 02:41:11 +03:00
"time"
2014-11-07 03:48:03 +03:00
2015-08-03 04:07:16 +03:00
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing"
2014-11-07 03:48:03 +03:00
"github.com/coreos/etcd/etcdserver/stats"
2014-11-13 02:41:11 +03:00
"github.com/coreos/etcd/pkg/testutil"
2014-11-07 03:48:03 +03:00
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
2014-11-07 03:48:03 +03:00
)
2015-02-26 11:13:35 +03:00
// TestTransportSend tests that transport can send messages using correct
// underlying peer, and drop local or unknown-target messages.
func TestTransportSend(t *testing.T) {
ss := &stats.ServerStats{}
ss.Initialize()
peer1 := newFakePeer()
peer2 := newFakePeer()
tr := &Transport{
ServerStats: ss,
2015-02-26 11:13:35 +03:00
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
}
wmsgsIgnored := []raftpb.Message{
// bad local message
{Type: raftpb.MsgBeat},
// bad remote message
{Type: raftpb.MsgProp, To: 3},
}
wmsgsTo1 := []raftpb.Message{
// good message
{Type: raftpb.MsgProp, To: 1},
{Type: raftpb.MsgApp, To: 1},
}
wmsgsTo2 := []raftpb.Message{
// good message
{Type: raftpb.MsgProp, To: 2},
{Type: raftpb.MsgApp, To: 2},
}
tr.Send(wmsgsIgnored)
tr.Send(wmsgsTo1)
tr.Send(wmsgsTo2)
if !reflect.DeepEqual(peer1.msgs, wmsgsTo1) {
t.Errorf("msgs to peer 1 = %+v, want %+v", peer1.msgs, wmsgsTo1)
}
if !reflect.DeepEqual(peer2.msgs, wmsgsTo2) {
t.Errorf("msgs to peer 2 = %+v, want %+v", peer2.msgs, wmsgsTo2)
}
}
2014-12-29 04:44:26 +03:00
func TestTransportAdd(t *testing.T) {
2014-11-07 03:48:03 +03:00
ls := stats.NewLeaderStats("")
tr := &Transport{
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
2014-12-29 04:44:26 +03:00
}
2015-04-06 18:38:13 +03:00
tr.AddPeer(1, []string{"http://localhost:2380"})
2014-11-07 03:48:03 +03:00
if _, ok := ls.Followers["1"]; !ok {
t.Errorf("FollowerStats[1] is nil, want exists")
}
2014-12-29 04:44:26 +03:00
s, ok := tr.peers[types.ID(1)]
2014-11-07 03:48:03 +03:00
if !ok {
tr.Stop()
2014-11-07 03:48:03 +03:00
t.Fatalf("senders[1] is nil, want exists")
}
2014-12-29 04:44:26 +03:00
// duplicate AddPeer is ignored
2015-04-06 18:38:13 +03:00
tr.AddPeer(1, []string{"http://localhost:2380"})
2014-12-29 04:44:26 +03:00
ns := tr.peers[types.ID(1)]
if s != ns {
t.Errorf("sender = %v, want %v", ns, s)
}
tr.Stop()
2014-11-07 03:48:03 +03:00
}
2014-12-29 04:44:26 +03:00
func TestTransportRemove(t *testing.T) {
tr := &Transport{
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
2014-12-29 04:44:26 +03:00
}
2015-04-06 18:38:13 +03:00
tr.AddPeer(1, []string{"http://localhost:2380"})
2014-12-29 04:44:26 +03:00
tr.RemovePeer(types.ID(1))
defer tr.Stop()
2014-11-07 03:48:03 +03:00
2014-12-29 04:44:26 +03:00
if _, ok := tr.peers[types.ID(1)]; ok {
2014-11-07 03:48:03 +03:00
t.Fatalf("senders[1] exists, want removed")
}
}
2014-11-13 02:41:11 +03:00
2015-02-26 11:13:35 +03:00
func TestTransportUpdate(t *testing.T) {
peer := newFakePeer()
tr := &Transport{
2015-08-03 04:07:16 +03:00
peers: map[types.ID]Peer{types.ID(1): peer},
prober: probing.NewProber(nil),
2015-02-26 11:13:35 +03:00
}
2015-04-06 18:38:13 +03:00
u := "http://localhost:2380"
2015-02-26 11:13:35 +03:00
tr.UpdatePeer(types.ID(1), []string{u})
2015-04-06 18:38:13 +03:00
wurls := types.URLs(testutil.MustNewURLs(t, []string{"http://localhost:2380"}))
2015-02-27 18:54:06 +03:00
if !reflect.DeepEqual(peer.urls, wurls) {
t.Errorf("urls = %+v, want %+v", peer.urls, wurls)
2015-02-26 11:13:35 +03:00
}
}
2015-01-03 07:00:29 +03:00
func TestTransportErrorc(t *testing.T) {
errorc := make(chan error, 1)
tr := &Transport{
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
2014-12-29 04:44:26 +03:00
}
2015-04-06 18:38:13 +03:00
tr.AddPeer(1, []string{"http://localhost:2380"})
defer tr.Stop()
2014-11-14 10:03:34 +03:00
select {
2015-01-03 07:00:29 +03:00
case <-errorc:
t.Fatalf("received unexpected from errorc")
2014-11-14 10:03:34 +03:00
case <-time.After(10 * time.Millisecond):
}
2015-10-18 09:30:30 +03:00
tr.peers[1].send(raftpb.Message{})
2014-11-14 10:03:34 +03:00
testutil.WaitSchedule()
2014-11-14 10:03:34 +03:00
select {
2015-01-03 07:00:29 +03:00
case <-errorc:
2014-11-14 10:03:34 +03:00
default:
2015-01-03 07:00:29 +03:00
t.Fatalf("cannot receive error from errorc")
2014-11-14 10:03:34 +03:00
}
}