etcd/raft/node_test.go

404 lines
9.8 KiB
Go
Raw Normal View History

/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
2014-05-29 00:53:26 +04:00
package raft
2014-08-29 03:41:42 +04:00
import (
"reflect"
"testing"
2014-09-12 02:31:47 +04:00
"time"
2014-05-29 00:53:26 +04:00
2014-10-14 11:35:28 +04:00
"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
2014-10-27 19:46:37 +03:00
"github.com/coreos/etcd/pkg/testutil"
2014-09-04 07:06:16 +04:00
"github.com/coreos/etcd/raft/raftpb"
2014-08-29 03:41:42 +04:00
)
2014-09-12 06:00:40 +04:00
// TestNodeStep ensures that node.Step sends msgProp to propc chan
// and other kinds of messages to recvc chan.
2014-09-12 02:31:47 +04:00
func TestNodeStep(t *testing.T) {
2014-10-12 11:34:22 +04:00
for i, msgn := range raftpb.MessageType_name {
2014-09-17 23:23:44 +04:00
n := &node{
2014-09-12 02:31:47 +04:00
propc: make(chan raftpb.Message, 1),
recvc: make(chan raftpb.Message, 1),
}
2014-10-12 11:34:22 +04:00
msgt := raftpb.MessageType(i)
n.Step(context.TODO(), raftpb.Message{Type: msgt})
2014-09-12 06:00:40 +04:00
// Proposal goes to proc chan. Others go to recvc chan.
2014-10-12 11:34:22 +04:00
if msgt == raftpb.MsgProp {
2014-09-12 02:31:47 +04:00
select {
case <-n.propc:
default:
2014-10-12 11:34:22 +04:00
t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn)
2014-09-12 02:31:47 +04:00
}
} else {
2014-10-12 11:34:22 +04:00
if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup {
select {
case <-n.recvc:
2014-10-12 11:34:22 +04:00
t.Errorf("%d: step should ignore %s", msgt, msgn)
default:
}
} else {
select {
case <-n.recvc:
default:
2014-10-12 11:34:22 +04:00
t.Errorf("%d: cannot receive %s on recvc chan", msgt, msgn)
}
2014-09-12 02:31:47 +04:00
}
}
}
}
// Cancel and Stop should unblock Step()
func TestNodeStepUnblock(t *testing.T) {
2014-09-12 06:00:40 +04:00
// a node without buffer to block step
2014-09-17 23:23:44 +04:00
n := &node{
2014-09-12 02:31:47 +04:00
propc: make(chan raftpb.Message),
done: make(chan struct{}),
}
ctx, cancel := context.WithCancel(context.Background())
stopFunc := func() { close(n.done) }
tests := []struct {
unblock func()
werr error
}{
{stopFunc, ErrStopped},
{cancel, context.Canceled},
}
for i, tt := range tests {
errc := make(chan error, 1)
go func() {
2014-10-12 11:34:22 +04:00
err := n.Step(ctx, raftpb.Message{Type: raftpb.MsgProp})
2014-09-12 02:31:47 +04:00
errc <- err
}()
tt.unblock()
select {
case err := <-errc:
if err != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
2014-09-12 02:31:47 +04:00
}
//clean up side-effect
if ctx.Err() != nil {
ctx = context.TODO()
}
select {
case <-n.done:
n.done = make(chan struct{})
default:
}
case <-time.After(time.Millisecond * 100):
t.Errorf("#%d: failed to unblock step", i)
}
}
}
2014-11-13 01:39:07 +03:00
// TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
func TestNodePropose(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
msgs = append(msgs, m)
}
n := newNode()
r := newRaft(1, []uint64{1}, 10, 1)
go n.run(r)
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
// change the step function to appendStep until this raft becomes leader
if rd.SoftState.Lead == r.id {
r.step = appendStep
n.Advance()
break
}
n.Advance()
}
n.Propose(context.TODO(), []byte("somedata"))
n.Stop()
if len(msgs) != 1 {
t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
}
if msgs[0].Type != raftpb.MsgProp {
t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
}
if !reflect.DeepEqual(msgs[0].Entries[0].Data, []byte("somedata")) {
t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, []byte("somedata"))
}
}
2014-09-12 07:32:55 +04:00
// TestBlockProposal ensures that node will block proposal when it does not
2014-09-12 22:48:08 +04:00
// know who is the current leader; node will accept proposal when it knows
2014-09-12 07:32:55 +04:00
// who is the current leader.
func TestBlockProposal(t *testing.T) {
n := newNode()
2014-10-08 14:29:53 +04:00
r := newRaft(1, []uint64{1}, 10, 1)
2014-09-12 09:13:28 +04:00
go n.run(r)
2014-09-12 23:28:15 +04:00
defer n.Stop()
2014-09-12 09:13:28 +04:00
errc := make(chan error, 1)
go func() {
errc <- n.Propose(context.TODO(), []byte("somedata"))
}()
2014-10-27 19:46:37 +03:00
testutil.ForceGosched()
2014-09-12 09:13:28 +04:00
select {
case err := <-errc:
t.Errorf("err = %v, want blocking", err)
default:
}
n.Campaign(context.TODO())
2014-10-27 19:46:37 +03:00
testutil.ForceGosched()
2014-09-12 09:13:28 +04:00
select {
case err := <-errc:
if err != nil {
t.Errorf("err = %v, want %v", err, nil)
2014-09-12 07:32:55 +04:00
}
2014-09-12 09:13:28 +04:00
default:
t.Errorf("blocking proposal, want unblocking")
2014-09-12 07:32:55 +04:00
}
2014-09-12 09:13:28 +04:00
}
2014-09-12 07:32:55 +04:00
2014-11-12 22:39:22 +03:00
// TestNodeTick ensures that node.Tick() will increase the
// elapsed of the underly raft state machine.
func TestNodeTick(t *testing.T) {
n := newNode()
r := newRaft(1, []uint64{1}, 10, 1)
go n.run(r)
elapsed := r.elapsed
n.Tick()
n.Stop()
if r.elapsed != elapsed+1 {
t.Errorf("elapsed = %d, want %d", r.elapsed, elapsed+1)
}
}
2014-09-12 09:13:28 +04:00
func TestReadyContainUpdates(t *testing.T) {
tests := []struct {
rd Ready
wcontain bool
}{
{Ready{}, false},
2014-09-16 04:35:02 +04:00
{Ready{SoftState: &SoftState{Lead: 1}}, true},
{Ready{HardState: raftpb.HardState{Vote: 1}}, true},
2014-09-12 09:13:28 +04:00
{Ready{Entries: make([]raftpb.Entry, 1, 1)}, true},
{Ready{CommittedEntries: make([]raftpb.Entry, 1, 1)}, true},
{Ready{Messages: make([]raftpb.Message, 1, 1)}, true},
2014-09-17 05:18:45 +04:00
{Ready{Snapshot: raftpb.Snapshot{Index: 1}}, true},
2014-09-12 09:13:28 +04:00
}
for i, tt := range tests {
2014-09-16 04:35:02 +04:00
if g := tt.rd.containsUpdates(); g != tt.wcontain {
t.Errorf("#%d: containUpdates = %v, want %v", i, g, tt.wcontain)
2014-09-12 09:13:28 +04:00
}
2014-09-12 07:32:55 +04:00
}
}
// TestNodeStart ensures that a node can be started correctly. The node should
// start with correct configuration change entries, and can accept and commit
// proposals.
func TestNodeStart(t *testing.T) {
2014-08-29 03:41:42 +04:00
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
ccdata, err := cc.Marshal()
if err != nil {
t.Fatalf("unexpected marshal error: %v", err)
}
2014-09-04 22:18:09 +04:00
wants := []Ready{
2014-08-29 03:41:42 +04:00
{
SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RaftState: StateLeader},
HardState: raftpb.HardState{Term: 1, Commit: 2},
Entries: []raftpb.Entry{
{},
{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
{Term: 1, Index: 2},
},
CommittedEntries: []raftpb.Entry{
{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
{Term: 1, Index: 2},
},
2014-08-29 03:41:42 +04:00
},
{
HardState: raftpb.HardState{Term: 1, Commit: 3},
Entries: []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}},
CommittedEntries: []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}},
2014-08-29 03:41:42 +04:00
},
}
n := StartNode(1, []Peer{{ID: 1}}, 10, 1)
n.ApplyConfChange(cc)
2014-09-04 22:18:09 +04:00
n.Campaign(ctx)
if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
} else {
n.Advance()
2014-09-04 22:18:09 +04:00
}
n.Propose(ctx, []byte("foo"))
if g := <-n.Ready(); !reflect.DeepEqual(g, wants[1]) {
t.Errorf("#%d: g = %+v,\n w %+v", 2, g, wants[1])
} else {
n.Advance()
2014-08-29 03:41:42 +04:00
}
select {
2014-09-04 22:18:09 +04:00
case rd := <-n.Ready():
2014-08-29 03:41:42 +04:00
t.Errorf("unexpected Ready: %+v", rd)
default:
}
}
2014-09-05 08:15:39 +04:00
func TestNodeRestart(t *testing.T) {
entries := []raftpb.Entry{
{},
2014-09-05 08:15:39 +04:00
{Term: 1, Index: 1},
{Term: 1, Index: 2, Data: []byte("foo")},
}
2014-09-16 04:35:02 +04:00
st := raftpb.HardState{Term: 1, Commit: 1}
2014-09-05 08:15:39 +04:00
want := Ready{
2014-09-16 04:35:02 +04:00
HardState: emptyState,
2014-09-09 03:16:58 +04:00
// commit upto index commit index in st
CommittedEntries: entries[1 : st.Commit+1],
2014-09-05 08:15:39 +04:00
}
n := RestartNode(1, 10, 1, nil, st, entries)
2014-09-05 08:15:39 +04:00
if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
t.Errorf("g = %+v,\n w %+v", g, want)
} else {
n.Advance()
2014-09-05 08:15:39 +04:00
}
select {
case rd := <-n.Ready():
t.Errorf("unexpected Ready: %+v", rd)
default:
}
}
2014-09-12 09:13:28 +04:00
2014-09-17 05:18:45 +04:00
// TestCompacts ensures Node.Compact creates a correct raft snapshot and compacts
// the raft log (call raft.compact)
2014-10-07 11:53:53 +04:00
func TestNodeCompact(t *testing.T) {
2014-09-17 05:18:45 +04:00
ctx := context.Background()
n := newNode()
2014-10-08 14:29:53 +04:00
r := newRaft(1, []uint64{1}, 10, 1)
2014-09-17 05:18:45 +04:00
go n.run(r)
n.Campaign(ctx)
n.Propose(ctx, []byte("foo"))
w := raftpb.Snapshot{
Term: 1,
Index: 2, // one nop + one proposal
Data: []byte("a snapshot"),
Nodes: []uint64{1},
2014-09-17 05:18:45 +04:00
}
2014-10-27 19:46:37 +03:00
testutil.ForceGosched()
2014-09-17 05:18:45 +04:00
select {
case <-n.Ready():
n.Advance()
2014-09-17 05:18:45 +04:00
default:
t.Fatalf("unexpected proposal failure: unable to commit entry")
}
n.Compact(w.Index, w.Nodes, w.Data)
2014-10-27 19:46:37 +03:00
testutil.ForceGosched()
2014-09-17 05:18:45 +04:00
select {
case rd := <-n.Ready():
if !reflect.DeepEqual(rd.Snapshot, w) {
t.Errorf("snap = %+v, want %+v", rd.Snapshot, w)
}
n.Advance()
2014-09-17 05:18:45 +04:00
default:
t.Fatalf("unexpected compact failure: unable to create a snapshot")
}
2014-10-27 19:46:37 +03:00
testutil.ForceGosched()
2014-09-17 05:18:45 +04:00
// TODO: this test the run updates the snapi correctly... should be tested
// separately with other kinds of updates
select {
case <-n.Ready():
t.Fatalf("unexpected more ready")
default:
}
n.Stop()
if r.raftLog.offset != w.Index {
t.Errorf("log.offset = %d, want %d", r.raftLog.offset, w.Index)
}
}
func TestNodeAdvance(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
n := StartNode(1, []Peer{{ID: 1}}, 10, 1)
n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1})
n.Campaign(ctx)
<-n.Ready()
n.Propose(ctx, []byte("foo"))
select {
case rd := <-n.Ready():
t.Fatalf("unexpected Ready before Advance: %+v", rd)
default:
}
n.Advance()
select {
case <-n.Ready():
default:
t.Errorf("expect Ready after Advance, but there is no Ready available")
}
}
func TestSoftStateEqual(t *testing.T) {
tests := []struct {
st *SoftState
we bool
}{
{&SoftState{}, true},
{&SoftState{Lead: 1}, false},
{&SoftState{RaftState: StateLeader}, false},
2014-10-08 14:29:53 +04:00
{&SoftState{Nodes: []uint64{1, 2}}, false},
}
for i, tt := range tests {
if g := tt.st.equal(&SoftState{}); g != tt.we {
t.Errorf("#%d, equal = %v, want %v", i, g, tt.we)
}
}
}
func TestIsHardStateEqual(t *testing.T) {
2014-09-12 22:48:08 +04:00
tests := []struct {
2014-09-16 04:35:02 +04:00
st raftpb.HardState
2014-09-12 22:48:08 +04:00
we bool
}{
{emptyState, true},
2014-09-16 04:35:02 +04:00
{raftpb.HardState{Vote: 1}, false},
{raftpb.HardState{Commit: 1}, false},
{raftpb.HardState{Term: 1}, false},
2014-09-12 22:48:08 +04:00
}
for i, tt := range tests {
if isHardStateEqual(tt.st, emptyState) != tt.we {
t.Errorf("#%d, equal = %v, want %v", i, isHardStateEqual(tt.st, emptyState), tt.we)
2014-09-12 22:48:08 +04:00
}
}
}