rafttest: separate network interface and network

release-2.0
Xiang Li 2015-02-03 22:47:16 -08:00
parent fca9805f84
commit 83edf0d862
2 changed files with 28 additions and 10 deletions

View File

@ -7,17 +7,25 @@ import (
"github.com/coreos/etcd/raft/raftpb"
)
type network interface {
// a network interface
type iface interface {
send(m raftpb.Message)
recv() chan raftpb.Message
disconnect()
connect()
}
// a network
type network interface {
// drop message at given rate (1.0 drops all messages)
drop(from, to uint64, rate float64)
// delay message for (0, d] randomly at given rate (1.0 delay all messages)
// do we need rate here?
delay(from, to uint64, d time.Duration, rate float64)
disconnect(id uint64)
connect(id uint64)
// heal heals the network
heal()
}
type raftNetwork struct {
@ -38,7 +46,7 @@ func newRaftNetwork(nodes ...uint64) *raftNetwork {
return pn
}
func (rn *raftNetwork) nodeNetwork(id uint64) *nodeNetwork {
func (rn *raftNetwork) nodeNetwork(id uint64) iface {
return &nodeNetwork{id: id, raftNetwork: rn}
}
@ -75,6 +83,8 @@ func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) {
panic("unimplemented")
}
func (rn *raftNetwork) heal() {}
func (rn *raftNetwork) disconnect(id uint64) {
rn.mu.Lock()
defer rn.mu.Unlock()
@ -92,6 +102,14 @@ type nodeNetwork struct {
*raftNetwork
}
func (nt *nodeNetwork) connect() {
nt.raftNetwork.connect(nt.id)
}
func (nt *nodeNetwork) disconnect() {
nt.raftNetwork.disconnect(nt.id)
}
func (nt *nodeNetwork) send(m raftpb.Message) {
nt.raftNetwork.send(m)
}

View File

@ -13,7 +13,7 @@ type node struct {
raft.Node
id uint64
paused bool
nt network
iface iface
stopc chan struct{}
// stable
@ -21,14 +21,14 @@ type node struct {
state raftpb.HardState
}
func startNode(id uint64, peers []raft.Peer, nt network) *node {
func startNode(id uint64, peers []raft.Peer, iface iface) *node {
st := raft.NewMemoryStorage()
rn := raft.StartNode(id, peers, 10, 1, st)
n := &node{
Node: rn,
id: id,
storage: st,
nt: nt,
iface: iface,
}
n.start()
return n
@ -51,11 +51,11 @@ func (n *node) start() {
n.storage.Append(rd.Entries)
go func() {
for _, m := range rd.Messages {
n.nt.send(m)
n.iface.send(m)
}
}()
n.Advance()
case m := <-n.nt.recv():
case m := <-n.iface.recv():
n.Step(context.TODO(), m)
case <-n.stopc:
n.Stop()
@ -72,7 +72,7 @@ func (n *node) start() {
// All in memory state of node is discarded.
// All stable MUST be unchanged.
func (n *node) stop() {
n.nt.disconnect(n.id)
n.iface.disconnect()
n.stopc <- struct{}{}
// wait for the shutdown
<-n.stopc
@ -85,7 +85,7 @@ func (n *node) restart() {
<-n.stopc
n.Node = raft.RestartNode(n.id, 10, 1, n.storage, 0)
n.start()
n.nt.connect(n.id)
n.iface.connect()
}
// pause pauses the node.