Merge pull request #2203 from xiang90/raft_test

raft: add raft test suite
release-2.0
Xiang Li 2015-02-01 14:57:09 -08:00
commit d16c5e1e81
4 changed files with 192 additions and 1 deletions

2
Godeps/Godeps.json generated
View File

@ -1,6 +1,6 @@
{
"ImportPath": "github.com/coreos/etcd",
"GoVersion": "go1.3.1",
"GoVersion": "go1.4.1",
"Packages": [
"./..."
],

73
raft/rafttest/network.go Normal file
View File

@ -0,0 +1,73 @@
package rafttest
import (
"time"
"github.com/coreos/etcd/raft/raftpb"
)
type network interface {
send(m raftpb.Message)
recv() chan raftpb.Message
// drop message at given rate (1.0 drops all messages)
drop(from, to uint64, rate float64)
// delay message for (0, d] randomly at given rate (1.0 delay all messages)
// do we need rate here?
delay(from, to uint64, d time.Duration, rate float64)
}
type raftNetwork struct {
recvQueues map[uint64]chan raftpb.Message
}
func newRaftNetwork(nodes ...uint64) *raftNetwork {
pn := &raftNetwork{
recvQueues: make(map[uint64]chan raftpb.Message, 0),
}
for _, n := range nodes {
pn.recvQueues[n] = make(chan raftpb.Message, 1024)
}
return pn
}
func (rn *raftNetwork) nodeNetwork(id uint64) *nodeNetwork {
return &nodeNetwork{id: id, raftNetwork: rn}
}
func (rn *raftNetwork) send(m raftpb.Message) {
to := rn.recvQueues[m.To]
if to == nil {
panic("sent to nil")
}
to <- m
}
func (rn *raftNetwork) recvFrom(from uint64) chan raftpb.Message {
fromc := rn.recvQueues[from]
if fromc == nil {
panic("recv from nil")
}
return fromc
}
func (rn *raftNetwork) drop(from, to uint64, rate float64) {
panic("unimplemented")
}
func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) {
panic("unimplemented")
}
type nodeNetwork struct {
id uint64
*raftNetwork
}
func (nt *nodeNetwork) send(m raftpb.Message) {
nt.raftNetwork.send(m)
}
func (nt *nodeNetwork) recv() chan raftpb.Message {
return nt.recvFrom(nt.id)
}

84
raft/rafttest/node.go Normal file
View File

@ -0,0 +1,84 @@
package rafttest
import (
"log"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)
type node struct {
raft.Node
paused bool
nt network
stopc chan struct{}
// stable
storage *raft.MemoryStorage
state raftpb.HardState
}
func startNode(id uint64, peers []raft.Peer, nt network) *node {
st := raft.NewMemoryStorage()
rn := raft.StartNode(id, peers, 10, 1, st)
n := &node{
Node: rn,
storage: st,
nt: nt,
stopc: make(chan struct{}),
}
ticker := time.Tick(5 * time.Millisecond)
go func() {
for {
select {
case <-ticker:
n.Tick()
case rd := <-n.Ready():
if !raft.IsEmptyHardState(rd.HardState) {
n.state = rd.HardState
}
n.storage.Append(rd.Entries)
go func() {
for _, m := range rd.Messages {
nt.send(m)
}
}()
n.Advance()
case m := <-n.nt.recv():
n.Step(context.TODO(), m)
case <-n.stopc:
log.Printf("raft.%d: stop", id)
return
}
}
}()
return n
}
func (n *node) stop() { close(n.stopc) }
// restart restarts the node with the given delay.
// All in memory state of node is reset to initialized state.
// All stable MUST be unchanged.
func (n *node) restart(delay time.Duration) {
panic("unimplemented")
}
// pause pauses the node.
// The paused node buffers the received messages and replies
// all of them when it resumes.
func (n *node) pause() {
panic("unimplemented")
}
// resume resumes the paused node.
func (n *node) resume() {
panic("unimplemented")
}
func (n *node) isPaused() bool {
return n.paused
}

View File

@ -0,0 +1,34 @@
package rafttest
import (
"testing"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/raft"
)
func TestBasicProgress(t *testing.T) {
peers := []raft.Peer{{1, nil}, {2, nil}, {3, nil}, {4, nil}, {5, nil}}
nt := newRaftNetwork(1, 2, 3, 4, 5)
nodes := make([]*node, 0)
for i := 1; i <= 5; i++ {
n := startNode(uint64(i), peers, nt.nodeNetwork(uint64(i)))
nodes = append(nodes, n)
}
time.Sleep(50 * time.Millisecond)
for i := 0; i < 1000; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}
time.Sleep(100 * time.Millisecond)
for _, n := range nodes {
n.stop()
if n.state.Commit < 1000 {
t.Errorf("commit = %d, want > 1000", n.state.Commit)
}
}
}