rafttest: make raft test reliable

release-3.1
Xiang Li 2016-11-04 15:40:50 -07:00
parent 91360e1495
commit e5987dea37
3 changed files with 105 additions and 40 deletions

View File

@ -100,8 +100,20 @@ func (rn *raftNetwork) send(m raftpb.Message) {
time.Sleep(time.Duration(rd))
}
// use marshal/unmarshal to copy message to avoid data race.
b, err := m.Marshal()
if err != nil {
panic(err)
}
var cm raftpb.Message
err = cm.Unmarshal(b)
if err != nil {
panic(err)
}
select {
case to <- m:
case to <- cm:
default:
// drop messages when the receiver queue is full.
}

View File

@ -16,6 +16,7 @@ package rafttest
import (
"log"
"sync"
"time"
"github.com/coreos/etcd/raft"
@ -32,7 +33,9 @@ type node struct {
// stable
storage *raft.MemoryStorage
state raftpb.HardState
mu sync.Mutex // guards state
state raftpb.HardState
}
func startNode(id uint64, peers []raft.Peer, iface iface) *node {
@ -68,7 +71,9 @@ func (n *node) start() {
n.Tick()
case rd := <-n.Ready():
if !raft.IsEmptyHardState(rd.HardState) {
n.mu.Lock()
n.state = rd.HardState
n.mu.Unlock()
n.storage.SetHardState(n.state)
}
n.storage.Append(rd.Entries)
@ -79,7 +84,7 @@ func (n *node) start() {
}
n.Advance()
case m := <-n.iface.recv():
n.Step(context.TODO(), m)
go n.Step(context.TODO(), m)
case <-n.stopc:
n.Stop()
log.Printf("raft.%d: stop", n.id)

View File

@ -33,18 +33,18 @@ func TestBasicProgress(t *testing.T) {
nodes = append(nodes, n)
}
time.Sleep(10 * time.Millisecond)
waitLeader(nodes)
for i := 0; i < 10000; i++ {
for i := 0; i < 100; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
time.Sleep(500 * time.Millisecond)
if !waitCommitConverge(nodes, 100) {
t.Errorf("commits failed to converge!")
}
for _, n := range nodes {
n.stop()
if n.state.Commit != 10006 {
t.Errorf("commit = %d, want = 10006", n.state.Commit)
}
}
}
@ -59,31 +59,32 @@ func TestRestart(t *testing.T) {
nodes = append(nodes, n)
}
time.Sleep(50 * time.Millisecond)
for i := 0; i < 300; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
nodes[1].stop()
for i := 0; i < 300; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
nodes[2].stop()
for i := 0; i < 300; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
nodes[2].restart()
for i := 0; i < 300; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
nodes[1].restart()
l := waitLeader(nodes)
k1, k2 := (l+1)%5, (l+2)%5
for i := 0; i < 30; i++ {
nodes[l].Propose(context.TODO(), []byte("somedata"))
}
nodes[k1].stop()
for i := 0; i < 30; i++ {
nodes[(l+3)%5].Propose(context.TODO(), []byte("somedata"))
}
nodes[k2].stop()
for i := 0; i < 30; i++ {
nodes[(l+4)%5].Propose(context.TODO(), []byte("somedata"))
}
nodes[k2].restart()
for i := 0; i < 30; i++ {
nodes[l].Propose(context.TODO(), []byte("somedata"))
}
nodes[k1].restart()
if !waitCommitConverge(nodes, 120) {
t.Errorf("commits failed to converge!")
}
// give some time for nodes to catch up with the raft leader
time.Sleep(500 * time.Millisecond)
for _, n := range nodes {
n.stop()
if n.state.Commit != 1206 {
t.Errorf("commit = %d, want = 1206", n.state.Commit)
}
}
}
@ -98,30 +99,77 @@ func TestPause(t *testing.T) {
nodes = append(nodes, n)
}
time.Sleep(50 * time.Millisecond)
for i := 0; i < 300; i++ {
waitLeader(nodes)
for i := 0; i < 30; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
nodes[1].pause()
for i := 0; i < 300; i++ {
for i := 0; i < 30; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
nodes[2].pause()
for i := 0; i < 300; i++ {
for i := 0; i < 30; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
nodes[2].resume()
for i := 0; i < 300; i++ {
for i := 0; i < 30; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
nodes[1].resume()
// give some time for nodes to catch up with the raft leader
time.Sleep(300 * time.Millisecond)
if !waitCommitConverge(nodes, 120) {
t.Errorf("commits failed to converge!")
}
for _, n := range nodes {
n.stop()
if n.state.Commit != 1206 {
t.Errorf("commit = %d, want = 1206", n.state.Commit)
}
}
func waitLeader(ns []*node) int {
var l map[uint64]struct{}
var lindex int
for {
l = make(map[uint64]struct{})
for i, n := range ns {
lead := n.Status().SoftState.Lead
if lead != 0 {
l[lead] = struct{}{}
if n.id == lead {
lindex = i
}
}
}
if len(l) == 1 {
return lindex
}
}
}
func waitCommitConverge(ns []*node, target uint64) bool {
var c map[uint64]struct{}
for i := 0; i < 50; i++ {
c = make(map[uint64]struct{})
var good int
for _, n := range ns {
commit := n.Node.Status().HardState.Commit
c[commit] = struct{}{}
if commit > target {
good++
}
}
if len(c) == 1 && good == len(ns) {
return true
}
time.Sleep(100 * time.Millisecond)
}
return false
}