rafthttp: {nopProcessor, errProcessor} -> fakeRaft

release-2.1
Yicheng Qin 2015-03-02 13:31:56 -08:00
parent ee8325d62c
commit fc2d7019e5
3 changed files with 23 additions and 29 deletions

View File

@ -46,7 +46,7 @@ func TestServeRaftPrefix(t *testing.T) {
bytes.NewReader(
pbutil.MustMarshal(&raftpb.Message{}),
),
&nopProcessor{},
&fakeRaft{},
"0",
http.StatusMethodNotAllowed,
},
@ -56,7 +56,7 @@ func TestServeRaftPrefix(t *testing.T) {
bytes.NewReader(
pbutil.MustMarshal(&raftpb.Message{}),
),
&nopProcessor{},
&fakeRaft{},
"0",
http.StatusMethodNotAllowed,
},
@ -66,7 +66,7 @@ func TestServeRaftPrefix(t *testing.T) {
bytes.NewReader(
pbutil.MustMarshal(&raftpb.Message{}),
),
&nopProcessor{},
&fakeRaft{},
"0",
http.StatusMethodNotAllowed,
},
@ -74,7 +74,7 @@ func TestServeRaftPrefix(t *testing.T) {
// bad request body
"POST",
&errReader{},
&nopProcessor{},
&fakeRaft{},
"0",
http.StatusBadRequest,
},
@ -82,7 +82,7 @@ func TestServeRaftPrefix(t *testing.T) {
// bad request protobuf
"POST",
strings.NewReader("malformed garbage"),
&nopProcessor{},
&fakeRaft{},
"0",
http.StatusBadRequest,
},
@ -92,7 +92,7 @@ func TestServeRaftPrefix(t *testing.T) {
bytes.NewReader(
pbutil.MustMarshal(&raftpb.Message{}),
),
&nopProcessor{},
&fakeRaft{},
"1",
http.StatusPreconditionFailed,
},
@ -102,7 +102,7 @@ func TestServeRaftPrefix(t *testing.T) {
bytes.NewReader(
pbutil.MustMarshal(&raftpb.Message{}),
),
&errProcessor{
&fakeRaft{
err: &resWriterToError{code: http.StatusForbidden},
},
"0",
@ -114,7 +114,7 @@ func TestServeRaftPrefix(t *testing.T) {
bytes.NewReader(
pbutil.MustMarshal(&raftpb.Message{}),
),
&errProcessor{
&fakeRaft{
err: &resWriterToError{code: http.StatusInternalServerError},
},
"0",
@ -126,7 +126,7 @@ func TestServeRaftPrefix(t *testing.T) {
bytes.NewReader(
pbutil.MustMarshal(&raftpb.Message{}),
),
&errProcessor{err: errors.New("blah")},
&fakeRaft{err: errors.New("blah")},
"0",
http.StatusInternalServerError,
},
@ -136,7 +136,7 @@ func TestServeRaftPrefix(t *testing.T) {
bytes.NewReader(
pbutil.MustMarshal(&raftpb.Message{}),
),
&nopProcessor{},
&fakeRaft{},
"0",
http.StatusNoContent,
},
@ -320,19 +320,13 @@ type errReader struct{}
func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") }
type nopProcessor struct{}
func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error { return nil }
func (p *nopProcessor) ReportUnreachable(id uint64) {}
func (p *nopProcessor) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
type errProcessor struct {
type fakeRaft struct {
err error
}
func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err }
func (p *errProcessor) ReportUnreachable(id uint64) {}
func (p *errProcessor) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
func (p *fakeRaft) Process(ctx context.Context, m raftpb.Message) error { return p.err }
func (p *fakeRaft) ReportUnreachable(id uint64) {}
func (p *fakeRaft) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
type resWriterToError struct {
code int

View File

@ -32,7 +32,7 @@ import (
func TestPipelineSend(t *testing.T) {
tr := &roundTripperRecorder{}
fs := &stats.FollowerStats{}
p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &nopProcessor{}, nil)
p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
p.stop()
@ -50,7 +50,7 @@ func TestPipelineSend(t *testing.T) {
func TestPipelineExceedMaximalServing(t *testing.T) {
tr := newRoundTripperBlocker()
fs := &stats.FollowerStats{}
p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &nopProcessor{}, nil)
p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
// keep the sender busy and make the buffer full
// nothing can go out as we block the sender
@ -89,7 +89,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) {
// it increases fail count in stats.
func TestPipelineSendFailed(t *testing.T) {
fs := &stats.FollowerStats{}
p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, &nopProcessor{}, nil)
p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
p.stop()
@ -103,7 +103,7 @@ func TestPipelineSendFailed(t *testing.T) {
func TestPipelinePost(t *testing.T) {
tr := &roundTripperRecorder{}
p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, &nopProcessor{}, nil)
p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, &fakeRaft{}, nil)
if err := p.post([]byte("some data")); err != nil {
t.Fatalf("unexpect post error: %v", err)
}
@ -145,7 +145,7 @@ func TestPipelinePostBad(t *testing.T) {
{"http://10.0.0.1", http.StatusCreated, nil},
}
for i, tt := range tests {
p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &nopProcessor{}, make(chan error))
p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &fakeRaft{}, make(chan error))
err := p.post([]byte("some data"))
p.stop()
@ -166,7 +166,7 @@ func TestPipelinePostErrorc(t *testing.T) {
}
for i, tt := range tests {
errorc := make(chan error, 1)
p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &nopProcessor{}, errorc)
p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &fakeRaft{}, errorc)
p.post([]byte("some data"))
p.stop()
select {

View File

@ -17,7 +17,7 @@ import (
// to streamWriter. After that, streamWriter can use it to send messages
// continuously, and closes it when stopped.
func TestStreamWriterAttachOutgoingConn(t *testing.T) {
sw := startStreamWriter(&stats.FollowerStats{}, &nopProcessor{})
sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{})
// the expected initial state of streamWrite is not working
if g := sw.isWorking(); g != false {
t.Errorf("initial working status = %v, want false", g)
@ -63,7 +63,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) {
// TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
// outgoingConn will close the outgoingConn and fall back to non-working status.
func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
sw := startStreamWriter(&stats.FollowerStats{}, &nopProcessor{})
sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{})
defer sw.stop()
wfc := &fakeWriteFlushCloser{err: errors.New("blah")}
sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
@ -183,7 +183,7 @@ func TestStream(t *testing.T) {
srv := httptest.NewServer(h)
defer srv.Close()
sw := startStreamWriter(&stats.FollowerStats{}, &nopProcessor{})
sw := startStreamWriter(&stats.FollowerStats{}, &fakeRaft{})
defer sw.stop()
h.sw = sw