diff --git a/raft/rafttest/network.go b/raft/rafttest/network.go index d305a44b1..0da9e045c 100644 --- a/raft/rafttest/network.go +++ b/raft/rafttest/network.go @@ -7,17 +7,25 @@ import ( "github.com/coreos/etcd/raft/raftpb" ) -type network interface { +// a network interface +type iface interface { send(m raftpb.Message) recv() chan raftpb.Message + disconnect() + connect() +} + +// a network +type network interface { // drop message at given rate (1.0 drops all messages) drop(from, to uint64, rate float64) // delay message for (0, d] randomly at given rate (1.0 delay all messages) // do we need rate here? delay(from, to uint64, d time.Duration, rate float64) - disconnect(id uint64) connect(id uint64) + // heal heals the network + heal() } type raftNetwork struct { @@ -38,7 +46,7 @@ func newRaftNetwork(nodes ...uint64) *raftNetwork { return pn } -func (rn *raftNetwork) nodeNetwork(id uint64) *nodeNetwork { +func (rn *raftNetwork) nodeNetwork(id uint64) iface { return &nodeNetwork{id: id, raftNetwork: rn} } @@ -75,6 +83,8 @@ func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) { panic("unimplemented") } +func (rn *raftNetwork) heal() {} + func (rn *raftNetwork) disconnect(id uint64) { rn.mu.Lock() defer rn.mu.Unlock() @@ -92,6 +102,14 @@ type nodeNetwork struct { *raftNetwork } +func (nt *nodeNetwork) connect() { + nt.raftNetwork.connect(nt.id) +} + +func (nt *nodeNetwork) disconnect() { + nt.raftNetwork.disconnect(nt.id) +} + func (nt *nodeNetwork) send(m raftpb.Message) { nt.raftNetwork.send(m) } diff --git a/raft/rafttest/node.go b/raft/rafttest/node.go index fdceb92f5..c3778f2b1 100644 --- a/raft/rafttest/node.go +++ b/raft/rafttest/node.go @@ -13,7 +13,7 @@ type node struct { raft.Node id uint64 paused bool - nt network + iface iface stopc chan struct{} // stable @@ -21,14 +21,14 @@ type node struct { state raftpb.HardState } -func startNode(id uint64, peers []raft.Peer, nt network) *node { +func startNode(id uint64, peers []raft.Peer, iface iface) *node { st := raft.NewMemoryStorage() rn := raft.StartNode(id, peers, 10, 1, st) n := &node{ Node: rn, id: id, storage: st, - nt: nt, + iface: iface, } n.start() return n @@ -51,11 +51,11 @@ func (n *node) start() { n.storage.Append(rd.Entries) go func() { for _, m := range rd.Messages { - n.nt.send(m) + n.iface.send(m) } }() n.Advance() - case m := <-n.nt.recv(): + case m := <-n.iface.recv(): n.Step(context.TODO(), m) case <-n.stopc: n.Stop() @@ -72,7 +72,7 @@ func (n *node) start() { // All in memory state of node is discarded. // All stable MUST be unchanged. func (n *node) stop() { - n.nt.disconnect(n.id) + n.iface.disconnect() n.stopc <- struct{}{} // wait for the shutdown <-n.stopc @@ -85,7 +85,7 @@ func (n *node) restart() { <-n.stopc n.Node = raft.RestartNode(n.id, 10, 1, n.storage, 0) n.start() - n.nt.connect(n.id) + n.iface.connect() } // pause pauses the node.