// Copyright 2015 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package rafttest import ( "math/rand" "sync" "time" "go.etcd.io/etcd/raft/raftpb" ) // a network interface type iface interface { send(m raftpb.Message) recv() chan raftpb.Message disconnect() connect() } // a network type network interface { // drop message at given rate (1.0 drops all messages) drop(from, to uint64, rate float64) // delay message for (0, d] randomly at given rate (1.0 delay all messages) // do we need rate here? delay(from, to uint64, d time.Duration, rate float64) disconnect(id uint64) connect(id uint64) // heal heals the network heal() } type raftNetwork struct { rand *rand.Rand mu sync.Mutex disconnected map[uint64]bool dropmap map[conn]float64 delaymap map[conn]delay recvQueues map[uint64]chan raftpb.Message } type conn struct { from, to uint64 } type delay struct { d time.Duration rate float64 } func newRaftNetwork(nodes ...uint64) *raftNetwork { pn := &raftNetwork{ rand: rand.New(rand.NewSource(1)), recvQueues: make(map[uint64]chan raftpb.Message), dropmap: make(map[conn]float64), delaymap: make(map[conn]delay), disconnected: make(map[uint64]bool), } for _, n := range nodes { pn.recvQueues[n] = make(chan raftpb.Message, 1024) } return pn } func (rn *raftNetwork) nodeNetwork(id uint64) iface { return &nodeNetwork{id: id, raftNetwork: rn} } func (rn *raftNetwork) send(m raftpb.Message) { rn.mu.Lock() to := rn.recvQueues[m.To] if rn.disconnected[m.To] { to = nil } drop := rn.dropmap[conn{m.From, m.To}] dl := rn.delaymap[conn{m.From, m.To}] rn.mu.Unlock() if to == nil { return } if drop != 0 && rn.rand.Float64() < drop { return } // TODO: shall we dl without blocking the send call? if dl.d != 0 && rn.rand.Float64() < dl.rate { rd := rn.rand.Int63n(int64(dl.d)) time.Sleep(time.Duration(rd)) } // use marshal/unmarshal to copy message to avoid data race. b, err := m.Marshal() if err != nil { panic(err) } var cm raftpb.Message err = cm.Unmarshal(b) if err != nil { panic(err) } select { case to <- cm: default: // drop messages when the receiver queue is full. } } func (rn *raftNetwork) recvFrom(from uint64) chan raftpb.Message { rn.mu.Lock() fromc := rn.recvQueues[from] if rn.disconnected[from] { fromc = nil } rn.mu.Unlock() return fromc } func (rn *raftNetwork) drop(from, to uint64, rate float64) { rn.mu.Lock() defer rn.mu.Unlock() rn.dropmap[conn{from, to}] = rate } func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) { rn.mu.Lock() defer rn.mu.Unlock() rn.delaymap[conn{from, to}] = delay{d, rate} } func (rn *raftNetwork) heal() { rn.mu.Lock() defer rn.mu.Unlock() rn.dropmap = make(map[conn]float64) rn.delaymap = make(map[conn]delay) } func (rn *raftNetwork) disconnect(id uint64) { rn.mu.Lock() defer rn.mu.Unlock() rn.disconnected[id] = true } func (rn *raftNetwork) connect(id uint64) { rn.mu.Lock() defer rn.mu.Unlock() rn.disconnected[id] = false } type nodeNetwork struct { id uint64 *raftNetwork } func (nt *nodeNetwork) connect() { nt.raftNetwork.connect(nt.id) } func (nt *nodeNetwork) disconnect() { nt.raftNetwork.disconnect(nt.id) } func (nt *nodeNetwork) send(m raftpb.Message) { nt.raftNetwork.send(m) } func (nt *nodeNetwork) recv() chan raftpb.Message { return nt.recvFrom(nt.id) }