commit
e201f4b824
|
@ -182,7 +182,16 @@ func (mn *multiNode) run() {
|
||||||
select {
|
select {
|
||||||
case gc := <-mn.groupc:
|
case gc := <-mn.groupc:
|
||||||
// TODO(bdarnell): pass applied through gc and into newRaft. Or get rid of it?
|
// TODO(bdarnell): pass applied through gc and into newRaft. Or get rid of it?
|
||||||
r := newRaft(mn.id, nil, mn.election, mn.heartbeat, gc.storage, 0)
|
// TODO(bdarnell): make maxSizePerMsg(InflightMsgs) configurable
|
||||||
|
c := &Config{
|
||||||
|
ID: mn.id,
|
||||||
|
ElectionTick: mn.election,
|
||||||
|
HeartbeatTick: mn.heartbeat,
|
||||||
|
Storage: gc.storage,
|
||||||
|
MaxSizePerMsg: noLimit,
|
||||||
|
MaxInflightMsgs: 256,
|
||||||
|
}
|
||||||
|
r := newRaft(c)
|
||||||
group = &groupState{
|
group = &groupState{
|
||||||
id: gc.id,
|
id: gc.id,
|
||||||
raft: r,
|
raft: r,
|
||||||
|
|
30
raft/node.go
30
raft/node.go
|
@ -144,9 +144,17 @@ type Peer struct {
|
||||||
// the election and heartbeat timeouts in units of ticks.
|
// the election and heartbeat timeouts in units of ticks.
|
||||||
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
|
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
|
||||||
func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage) Node {
|
func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage) Node {
|
||||||
n := newNode()
|
c := &Config{
|
||||||
r := newRaft(id, nil, election, heartbeat, storage, 0)
|
ID: id,
|
||||||
|
Peers: nil,
|
||||||
|
ElectionTick: election,
|
||||||
|
HeartbeatTick: heartbeat,
|
||||||
|
Storage: storage,
|
||||||
|
// TODO(xiangli): make this configurable
|
||||||
|
MaxSizePerMsg: noLimit,
|
||||||
|
MaxInflightMsgs: 256,
|
||||||
|
}
|
||||||
|
r := newRaft(c)
|
||||||
// become the follower at term 1 and apply initial configuration
|
// become the follower at term 1 and apply initial configuration
|
||||||
// entires of term 1
|
// entires of term 1
|
||||||
r.becomeFollower(1, None)
|
r.becomeFollower(1, None)
|
||||||
|
@ -177,6 +185,7 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage
|
||||||
r.addNode(peer.ID)
|
r.addNode(peer.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
n := newNode()
|
||||||
go n.run(r)
|
go n.run(r)
|
||||||
return &n
|
return &n
|
||||||
}
|
}
|
||||||
|
@ -186,9 +195,20 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage
|
||||||
// If the caller has an existing state machine, pass in the last log index that
|
// If the caller has an existing state machine, pass in the last log index that
|
||||||
// has been applied to it; otherwise use zero.
|
// has been applied to it; otherwise use zero.
|
||||||
func RestartNode(id uint64, election, heartbeat int, storage Storage, applied uint64) Node {
|
func RestartNode(id uint64, election, heartbeat int, storage Storage, applied uint64) Node {
|
||||||
n := newNode()
|
c := &Config{
|
||||||
r := newRaft(id, nil, election, heartbeat, storage, applied)
|
ID: id,
|
||||||
|
Peers: nil,
|
||||||
|
ElectionTick: election,
|
||||||
|
HeartbeatTick: heartbeat,
|
||||||
|
Storage: storage,
|
||||||
|
Applied: applied,
|
||||||
|
// TODO(xiangli): make this configurable
|
||||||
|
MaxSizePerMsg: noLimit,
|
||||||
|
MaxInflightMsgs: 256,
|
||||||
|
}
|
||||||
|
r := newRaft(c)
|
||||||
|
|
||||||
|
n := newNode()
|
||||||
go n.run(r)
|
go n.run(r)
|
||||||
return &n
|
return &n
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ func BenchmarkOneNode(b *testing.B) {
|
||||||
|
|
||||||
n := newNode()
|
n := newNode()
|
||||||
s := NewMemoryStorage()
|
s := NewMemoryStorage()
|
||||||
r := newRaft(1, []uint64{1}, 10, 1, s, 0)
|
r := newTestRaft(1, []uint64{1}, 10, 1, s)
|
||||||
go n.run(r)
|
go n.run(r)
|
||||||
|
|
||||||
defer n.Stop()
|
defer n.Stop()
|
||||||
|
|
|
@ -114,7 +114,7 @@ func TestNodePropose(t *testing.T) {
|
||||||
|
|
||||||
n := newNode()
|
n := newNode()
|
||||||
s := NewMemoryStorage()
|
s := NewMemoryStorage()
|
||||||
r := newRaft(1, []uint64{1}, 10, 1, s, 0)
|
r := newTestRaft(1, []uint64{1}, 10, 1, s)
|
||||||
go n.run(r)
|
go n.run(r)
|
||||||
n.Campaign(context.TODO())
|
n.Campaign(context.TODO())
|
||||||
for {
|
for {
|
||||||
|
@ -152,7 +152,7 @@ func TestNodeProposeConfig(t *testing.T) {
|
||||||
|
|
||||||
n := newNode()
|
n := newNode()
|
||||||
s := NewMemoryStorage()
|
s := NewMemoryStorage()
|
||||||
r := newRaft(1, []uint64{1}, 10, 1, s, 0)
|
r := newTestRaft(1, []uint64{1}, 10, 1, s)
|
||||||
go n.run(r)
|
go n.run(r)
|
||||||
n.Campaign(context.TODO())
|
n.Campaign(context.TODO())
|
||||||
for {
|
for {
|
||||||
|
@ -190,7 +190,7 @@ func TestNodeProposeConfig(t *testing.T) {
|
||||||
// who is the current leader.
|
// who is the current leader.
|
||||||
func TestBlockProposal(t *testing.T) {
|
func TestBlockProposal(t *testing.T) {
|
||||||
n := newNode()
|
n := newNode()
|
||||||
r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||||
go n.run(r)
|
go n.run(r)
|
||||||
defer n.Stop()
|
defer n.Stop()
|
||||||
|
|
||||||
|
@ -223,7 +223,7 @@ func TestBlockProposal(t *testing.T) {
|
||||||
func TestNodeTick(t *testing.T) {
|
func TestNodeTick(t *testing.T) {
|
||||||
n := newNode()
|
n := newNode()
|
||||||
s := NewMemoryStorage()
|
s := NewMemoryStorage()
|
||||||
r := newRaft(1, []uint64{1}, 10, 1, s, 0)
|
r := newTestRaft(1, []uint64{1}, 10, 1, s)
|
||||||
go n.run(r)
|
go n.run(r)
|
||||||
elapsed := r.elapsed
|
elapsed := r.elapsed
|
||||||
n.Tick()
|
n.Tick()
|
||||||
|
@ -238,7 +238,7 @@ func TestNodeTick(t *testing.T) {
|
||||||
func TestNodeStop(t *testing.T) {
|
func TestNodeStop(t *testing.T) {
|
||||||
n := newNode()
|
n := newNode()
|
||||||
s := NewMemoryStorage()
|
s := NewMemoryStorage()
|
||||||
r := newRaft(1, []uint64{1}, 10, 1, s, 0)
|
r := newTestRaft(1, []uint64{1}, 10, 1, s)
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
|
97
raft/raft.go
97
raft/raft.go
|
@ -51,6 +51,75 @@ func (st StateType) String() string {
|
||||||
return stmap[uint64(st)]
|
return stmap[uint64(st)]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Config contains the parameters to start a raft.
|
||||||
|
type Config struct {
|
||||||
|
// ID is the identity of the local raft. ID cannot be 0.
|
||||||
|
ID uint64
|
||||||
|
// Peers contains the IDs of all nodes (including self) in
|
||||||
|
// the raft cluster. It should only be set when starting a new
|
||||||
|
// raft cluster.
|
||||||
|
// Restarting raft from previous configuration will panic if
|
||||||
|
// Peers is set.
|
||||||
|
Peers []uint64
|
||||||
|
|
||||||
|
// ElectionTick is the election timeout. If a follower does not
|
||||||
|
// receive any message from the leader of current term during
|
||||||
|
// ElectionTick, it will become candidate and start an election.
|
||||||
|
// ElectionTick must be greater than HeartbeatTick. We suggest
|
||||||
|
// to use ElectionTick = 10 * HeartbeatTick to avoid unnecessary
|
||||||
|
// leader switching.
|
||||||
|
ElectionTick int
|
||||||
|
// HeartbeatTick is the heartbeat interval. A leader sends heartbeat
|
||||||
|
// message to maintain the leadership every heartbeat interval.
|
||||||
|
HeartbeatTick int
|
||||||
|
|
||||||
|
// Storage is the storage for raft. raft generates entires and
|
||||||
|
// states to be stored in storage. raft reads the persisted entires
|
||||||
|
// and states out of Storage when it needs. raft reads out the previous
|
||||||
|
// state and configuration out of storage when restarting.
|
||||||
|
Storage Storage
|
||||||
|
// Applied is the last applied index. It should only be set when restarting
|
||||||
|
// raft. raft will not return entries to the application smaller or equal to Applied.
|
||||||
|
// If Applied is unset when restarting, raft might return previous applied entries.
|
||||||
|
// This is a very application dependent configuration.
|
||||||
|
Applied uint64
|
||||||
|
|
||||||
|
// MaxSizePerMsg limits the max size of each append message. Smaller value lowers
|
||||||
|
// the raft recovery cost(initial probing and message lost during normal operation).
|
||||||
|
// On the other side, it might affect the throughput during normal replication.
|
||||||
|
// Note: math.MaxUint64 for unlimited, 0 for at most one entry per message.
|
||||||
|
MaxSizePerMsg uint64
|
||||||
|
// MaxInflightMsgs limits the max number of in-flight append messages during optimistic
|
||||||
|
// replication phase. The application transportation layer usually has its own sending
|
||||||
|
// buffer over TCP/UDP. Setting MaxInflightMsgs to avoid overflowing that sending buffer.
|
||||||
|
// TODO (xiangli): feedback to application to limit the proposal rate?
|
||||||
|
MaxInflightMsgs int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Config) validate() error {
|
||||||
|
if c.ID == None {
|
||||||
|
return errors.New("cannot use none as id")
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.HeartbeatTick <= 0 {
|
||||||
|
return errors.New("heartbeat tick must be greater than 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.ElectionTick <= c.HeartbeatTick {
|
||||||
|
return errors.New("election tick must be greater than heartbeat tick")
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Storage == nil {
|
||||||
|
return errors.New("storage cannot be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.MaxInflightMsgs <= 0 {
|
||||||
|
return errors.New("max inflight messages must be greater than 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type raft struct {
|
type raft struct {
|
||||||
pb.HardState
|
pb.HardState
|
||||||
|
|
||||||
|
@ -83,16 +152,16 @@ type raft struct {
|
||||||
step stepFunc
|
step stepFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage,
|
func newRaft(c *Config) *raft {
|
||||||
applied uint64) *raft {
|
if err := c.validate(); err != nil {
|
||||||
if id == None {
|
panic(err.Error())
|
||||||
panic("cannot use none id")
|
|
||||||
}
|
}
|
||||||
raftlog := newLog(storage)
|
raftlog := newLog(c.Storage)
|
||||||
hs, cs, err := storage.InitialState()
|
hs, cs, err := c.Storage.InitialState()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err) // TODO(bdarnell)
|
panic(err) // TODO(bdarnell)
|
||||||
}
|
}
|
||||||
|
peers := c.Peers
|
||||||
if len(cs.Nodes) > 0 {
|
if len(cs.Nodes) > 0 {
|
||||||
if len(peers) > 0 {
|
if len(peers) > 0 {
|
||||||
// TODO(bdarnell): the peers argument is always nil except in
|
// TODO(bdarnell): the peers argument is always nil except in
|
||||||
|
@ -103,27 +172,27 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage
|
||||||
peers = cs.Nodes
|
peers = cs.Nodes
|
||||||
}
|
}
|
||||||
r := &raft{
|
r := &raft{
|
||||||
id: id,
|
id: c.ID,
|
||||||
lead: None,
|
lead: None,
|
||||||
raftLog: raftlog,
|
raftLog: raftlog,
|
||||||
// 4MB for now and hard code it
|
// 4MB for now and hard code it
|
||||||
// TODO(xiang): add a config arguement into newRaft after we add
|
// TODO(xiang): add a config arguement into newRaft after we add
|
||||||
// the max inflight message field.
|
// the max inflight message field.
|
||||||
maxMsgSize: 4 * 1024 * 1024,
|
maxMsgSize: c.MaxSizePerMsg,
|
||||||
maxInflight: 256,
|
maxInflight: c.MaxInflightMsgs,
|
||||||
prs: make(map[uint64]*Progress),
|
prs: make(map[uint64]*Progress),
|
||||||
electionTimeout: election,
|
electionTimeout: c.ElectionTick,
|
||||||
heartbeatTimeout: heartbeat,
|
heartbeatTimeout: c.HeartbeatTick,
|
||||||
}
|
}
|
||||||
r.rand = rand.New(rand.NewSource(int64(id)))
|
r.rand = rand.New(rand.NewSource(int64(c.ID)))
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
|
r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
|
||||||
}
|
}
|
||||||
if !isHardStateEqual(hs, emptyState) {
|
if !isHardStateEqual(hs, emptyState) {
|
||||||
r.loadState(hs)
|
r.loadState(hs)
|
||||||
}
|
}
|
||||||
if applied > 0 {
|
if c.Applied > 0 {
|
||||||
raftlog.appliedTo(applied)
|
raftlog.appliedTo(c.Applied)
|
||||||
}
|
}
|
||||||
r.becomeFollower(r.Term, None)
|
r.becomeFollower(r.Term, None)
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ import (
|
||||||
// 1. msgApp can fill the sending window until full
|
// 1. msgApp can fill the sending window until full
|
||||||
// 2. when the window is full, no more msgApp can be sent.
|
// 2. when the window is full, no more msgApp can be sent.
|
||||||
func TestMsgAppFlowControlFull(t *testing.T) {
|
func TestMsgAppFlowControlFull(t *testing.T) {
|
||||||
r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ func TestMsgAppFlowControlFull(t *testing.T) {
|
||||||
// 1. vaild msgAppResp.index moves the windows to pass all smaller or equal index.
|
// 1. vaild msgAppResp.index moves the windows to pass all smaller or equal index.
|
||||||
// 2. out-of-dated msgAppResp has no effect on the silding window.
|
// 2. out-of-dated msgAppResp has no effect on the silding window.
|
||||||
func TestMsgAppFlowControlMoveForward(t *testing.T) {
|
func TestMsgAppFlowControlMoveForward(t *testing.T) {
|
||||||
r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
|
|
||||||
|
@ -105,7 +105,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
|
||||||
// TestMsgAppFlowControlRecvHeartbeat ensures a heartbeat response
|
// TestMsgAppFlowControlRecvHeartbeat ensures a heartbeat response
|
||||||
// frees one slot if the window is full.
|
// frees one slot if the window is full.
|
||||||
func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
|
func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
|
||||||
r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
|
|
||||||
|
|
|
@ -52,7 +52,7 @@ func TestLeaderUpdateTermFromMessage(t *testing.T) {
|
||||||
// it immediately reverts to follower state.
|
// it immediately reverts to follower state.
|
||||||
// Reference: section 5.1
|
// Reference: section 5.1
|
||||||
func testUpdateTermFromMessage(t *testing.T, state StateType) {
|
func testUpdateTermFromMessage(t *testing.T, state StateType) {
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
switch state {
|
switch state {
|
||||||
case StateFollower:
|
case StateFollower:
|
||||||
r.becomeFollower(1, 2)
|
r.becomeFollower(1, 2)
|
||||||
|
@ -82,7 +82,7 @@ func TestRejectStaleTermMessage(t *testing.T) {
|
||||||
fakeStep := func(r *raft, m pb.Message) {
|
fakeStep := func(r *raft, m pb.Message) {
|
||||||
called = true
|
called = true
|
||||||
}
|
}
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
r.step = fakeStep
|
r.step = fakeStep
|
||||||
r.loadState(pb.HardState{Term: 2})
|
r.loadState(pb.HardState{Term: 2})
|
||||||
|
|
||||||
|
@ -96,7 +96,7 @@ func TestRejectStaleTermMessage(t *testing.T) {
|
||||||
// TestStartAsFollower tests that when servers start up, they begin as followers.
|
// TestStartAsFollower tests that when servers start up, they begin as followers.
|
||||||
// Reference: section 5.2
|
// Reference: section 5.2
|
||||||
func TestStartAsFollower(t *testing.T) {
|
func TestStartAsFollower(t *testing.T) {
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
if r.state != StateFollower {
|
if r.state != StateFollower {
|
||||||
t.Errorf("state = %s, want %s", r.state, StateFollower)
|
t.Errorf("state = %s, want %s", r.state, StateFollower)
|
||||||
}
|
}
|
||||||
|
@ -109,7 +109,7 @@ func TestStartAsFollower(t *testing.T) {
|
||||||
func TestLeaderBcastBeat(t *testing.T) {
|
func TestLeaderBcastBeat(t *testing.T) {
|
||||||
// heartbeat interval
|
// heartbeat interval
|
||||||
hi := 1
|
hi := 1
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage())
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
|
@ -151,7 +151,7 @@ func TestCandidateStartNewElection(t *testing.T) {
|
||||||
func testNonleaderStartElection(t *testing.T, state StateType) {
|
func testNonleaderStartElection(t *testing.T, state StateType) {
|
||||||
// election timeout
|
// election timeout
|
||||||
et := 10
|
et := 10
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage())
|
||||||
switch state {
|
switch state {
|
||||||
case StateFollower:
|
case StateFollower:
|
||||||
r.becomeFollower(1, 2)
|
r.becomeFollower(1, 2)
|
||||||
|
@ -215,7 +215,7 @@ func TestLeaderElectionInOneRoundRPC(t *testing.T) {
|
||||||
{5, map[uint64]bool{}, StateCandidate},
|
{5, map[uint64]bool{}, StateCandidate},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
r := newRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage())
|
||||||
|
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
||||||
for id, vote := range tt.votes {
|
for id, vote := range tt.votes {
|
||||||
|
@ -248,7 +248,7 @@ func TestFollowerVote(t *testing.T) {
|
||||||
{2, 1, true},
|
{2, 1, true},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
r.loadState(pb.HardState{Term: 1, Vote: tt.vote})
|
r.loadState(pb.HardState{Term: 1, Vote: tt.vote})
|
||||||
|
|
||||||
r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote})
|
r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote})
|
||||||
|
@ -274,7 +274,7 @@ func TestCandidateFallback(t *testing.T) {
|
||||||
{From: 2, To: 1, Term: 2, Type: pb.MsgApp},
|
{From: 2, To: 1, Term: 2, Type: pb.MsgApp},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
||||||
if r.state != StateCandidate {
|
if r.state != StateCandidate {
|
||||||
t.Fatalf("unexpected state = %s, want %s", r.state, StateCandidate)
|
t.Fatalf("unexpected state = %s, want %s", r.state, StateCandidate)
|
||||||
|
@ -307,7 +307,7 @@ func TestCandidateElectionTimeoutRandomized(t *testing.T) {
|
||||||
// Reference: section 5.2
|
// Reference: section 5.2
|
||||||
func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) {
|
func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) {
|
||||||
et := 10
|
et := 10
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage())
|
||||||
timeouts := make(map[int]bool)
|
timeouts := make(map[int]bool)
|
||||||
for round := 0; round < 50*et; round++ {
|
for round := 0; round < 50*et; round++ {
|
||||||
switch state {
|
switch state {
|
||||||
|
@ -353,7 +353,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
|
||||||
rs := make([]*raft, size)
|
rs := make([]*raft, size)
|
||||||
ids := idsBySize(size)
|
ids := idsBySize(size)
|
||||||
for k := range rs {
|
for k := range rs {
|
||||||
rs[k] = newRaft(ids[k], ids, et, 1, NewMemoryStorage(), 0)
|
rs[k] = newTestRaft(ids[k], ids, et, 1, NewMemoryStorage())
|
||||||
}
|
}
|
||||||
conflicts := 0
|
conflicts := 0
|
||||||
for round := 0; round < 1000; round++ {
|
for round := 0; round < 1000; round++ {
|
||||||
|
@ -396,7 +396,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
|
||||||
// Reference: section 5.3
|
// Reference: section 5.3
|
||||||
func TestLeaderStartReplication(t *testing.T) {
|
func TestLeaderStartReplication(t *testing.T) {
|
||||||
s := NewMemoryStorage()
|
s := NewMemoryStorage()
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s, 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s)
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
commitNoopEntry(r, s)
|
commitNoopEntry(r, s)
|
||||||
|
@ -435,7 +435,7 @@ func TestLeaderStartReplication(t *testing.T) {
|
||||||
// Reference: section 5.3
|
// Reference: section 5.3
|
||||||
func TestLeaderCommitEntry(t *testing.T) {
|
func TestLeaderCommitEntry(t *testing.T) {
|
||||||
s := NewMemoryStorage()
|
s := NewMemoryStorage()
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s, 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s)
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
commitNoopEntry(r, s)
|
commitNoopEntry(r, s)
|
||||||
|
@ -489,7 +489,7 @@ func TestLeaderAcknowledgeCommit(t *testing.T) {
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
s := NewMemoryStorage()
|
s := NewMemoryStorage()
|
||||||
r := newRaft(1, idsBySize(tt.size), 10, 1, s, 0)
|
r := newTestRaft(1, idsBySize(tt.size), 10, 1, s)
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
commitNoopEntry(r, s)
|
commitNoopEntry(r, s)
|
||||||
|
@ -523,7 +523,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) {
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.Append(tt)
|
storage.Append(tt)
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
|
||||||
r.loadState(pb.HardState{Term: 2})
|
r.loadState(pb.HardState{Term: 2})
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
|
@ -578,7 +578,7 @@ func TestFollowerCommitEntry(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
r.becomeFollower(1, 2)
|
r.becomeFollower(1, 2)
|
||||||
|
|
||||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit})
|
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit})
|
||||||
|
@ -621,7 +621,7 @@ func TestFollowerCheckMsgApp(t *testing.T) {
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.Append(ents)
|
storage.Append(ents)
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
|
||||||
r.loadState(pb.HardState{Commit: 1})
|
r.loadState(pb.HardState{Commit: 1})
|
||||||
r.becomeFollower(2, 2)
|
r.becomeFollower(2, 2)
|
||||||
|
|
||||||
|
@ -677,7 +677,7 @@ func TestFollowerAppendEntries(t *testing.T) {
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}})
|
storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}})
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
|
||||||
r.becomeFollower(2, 2)
|
r.becomeFollower(2, 2)
|
||||||
|
|
||||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents})
|
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents})
|
||||||
|
@ -746,11 +746,11 @@ func TestLeaderSyncFollowerLog(t *testing.T) {
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
leadStorage := NewMemoryStorage()
|
leadStorage := NewMemoryStorage()
|
||||||
leadStorage.Append(ents)
|
leadStorage.Append(ents)
|
||||||
lead := newRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage, 0)
|
lead := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage)
|
||||||
lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term})
|
lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term})
|
||||||
followerStorage := NewMemoryStorage()
|
followerStorage := NewMemoryStorage()
|
||||||
followerStorage.Append(tt)
|
followerStorage.Append(tt)
|
||||||
follower := newRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage, 0)
|
follower := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage)
|
||||||
follower.loadState(pb.HardState{Term: term - 1})
|
follower.loadState(pb.HardState{Term: term - 1})
|
||||||
// It is necessary to have a three-node cluster.
|
// It is necessary to have a three-node cluster.
|
||||||
// The second may have more up-to-date log than the first one, so the
|
// The second may have more up-to-date log than the first one, so the
|
||||||
|
@ -779,7 +779,7 @@ func TestVoteRequest(t *testing.T) {
|
||||||
{[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3},
|
{[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3},
|
||||||
}
|
}
|
||||||
for j, tt := range tests {
|
for j, tt := range tests {
|
||||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
r.Step(pb.Message{
|
r.Step(pb.Message{
|
||||||
From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents,
|
From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents,
|
||||||
})
|
})
|
||||||
|
@ -842,7 +842,7 @@ func TestVoter(t *testing.T) {
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.Append(tt.ents)
|
storage.Append(tt.ents)
|
||||||
r := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
|
r := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
|
||||||
|
|
||||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index})
|
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index})
|
||||||
|
|
||||||
|
@ -878,7 +878,7 @@ func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) {
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.Append(ents)
|
storage.Append(ents)
|
||||||
r := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
|
r := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
|
||||||
r.loadState(pb.HardState{Term: 2})
|
r.loadState(pb.HardState{Term: 2})
|
||||||
// become leader at term 3
|
// become leader at term 3
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
|
|
|
@ -32,7 +32,7 @@ var (
|
||||||
|
|
||||||
func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
|
func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
sm := newRaft(1, []uint64{1}, 10, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{1}, 10, 1, storage)
|
||||||
sm.restore(testingSnap)
|
sm.restore(testingSnap)
|
||||||
|
|
||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
|
@ -50,7 +50,7 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
|
||||||
|
|
||||||
func TestPendingSnapshotPauseReplication(t *testing.T) {
|
func TestPendingSnapshotPauseReplication(t *testing.T) {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
|
||||||
sm.restore(testingSnap)
|
sm.restore(testingSnap)
|
||||||
|
|
||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
|
@ -67,7 +67,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) {
|
||||||
|
|
||||||
func TestSnapshotFailure(t *testing.T) {
|
func TestSnapshotFailure(t *testing.T) {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
|
||||||
sm.restore(testingSnap)
|
sm.restore(testingSnap)
|
||||||
|
|
||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
|
@ -90,7 +90,7 @@ func TestSnapshotFailure(t *testing.T) {
|
||||||
|
|
||||||
func TestSnapshotSucceed(t *testing.T) {
|
func TestSnapshotSucceed(t *testing.T) {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
|
||||||
sm.restore(testingSnap)
|
sm.restore(testingSnap)
|
||||||
|
|
||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
|
@ -113,7 +113,7 @@ func TestSnapshotSucceed(t *testing.T) {
|
||||||
|
|
||||||
func TestSnapshotAbort(t *testing.T) {
|
func TestSnapshotAbort(t *testing.T) {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
|
||||||
sm.restore(testingSnap)
|
sm.restore(testingSnap)
|
||||||
|
|
||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
|
|
|
@ -262,7 +262,7 @@ func TestProgressResume(t *testing.T) {
|
||||||
|
|
||||||
// TestProgressResumeByHeartbeat ensures raft.heartbeat reset progress.paused by heartbeat.
|
// TestProgressResumeByHeartbeat ensures raft.heartbeat reset progress.paused by heartbeat.
|
||||||
func TestProgressResumeByHeartbeat(t *testing.T) {
|
func TestProgressResumeByHeartbeat(t *testing.T) {
|
||||||
r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.prs[2].Paused = true
|
r.prs[2].Paused = true
|
||||||
|
@ -274,7 +274,7 @@ func TestProgressResumeByHeartbeat(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProgressPaused(t *testing.T) {
|
func TestProgressPaused(t *testing.T) {
|
||||||
r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||||
|
@ -466,9 +466,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDuelingCandidates(t *testing.T) {
|
func TestDuelingCandidates(t *testing.T) {
|
||||||
a := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
b := newRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
c := newRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
|
|
||||||
nt := newNetwork(a, b, c)
|
nt := newNetwork(a, b, c)
|
||||||
nt.cut(1, 3)
|
nt.cut(1, 3)
|
||||||
|
@ -736,7 +736,7 @@ func TestCommit(t *testing.T) {
|
||||||
storage.Append(tt.logs)
|
storage.Append(tt.logs)
|
||||||
storage.hardState = pb.HardState{Term: tt.smTerm}
|
storage.hardState = pb.HardState{Term: tt.smTerm}
|
||||||
|
|
||||||
sm := newRaft(1, []uint64{1}, 5, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{1}, 5, 1, storage)
|
||||||
for j := 0; j < len(tt.matches); j++ {
|
for j := 0; j < len(tt.matches); j++ {
|
||||||
sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1)
|
sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1)
|
||||||
}
|
}
|
||||||
|
@ -761,7 +761,7 @@ func TestIsElectionTimeout(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
|
sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||||
sm.elapsed = tt.elapse
|
sm.elapsed = tt.elapse
|
||||||
c := 0
|
c := 0
|
||||||
for j := 0; j < 10000; j++ {
|
for j := 0; j < 10000; j++ {
|
||||||
|
@ -786,7 +786,7 @@ func TestStepIgnoreOldTermMsg(t *testing.T) {
|
||||||
fakeStep := func(r *raft, m pb.Message) {
|
fakeStep := func(r *raft, m pb.Message) {
|
||||||
called = true
|
called = true
|
||||||
}
|
}
|
||||||
sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
|
sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||||
sm.step = fakeStep
|
sm.step = fakeStep
|
||||||
sm.Term = 2
|
sm.Term = 2
|
||||||
sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
|
sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
|
||||||
|
@ -828,7 +828,7 @@ func TestHandleMsgApp(t *testing.T) {
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}})
|
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}})
|
||||||
sm := newRaft(1, []uint64{1}, 10, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{1}, 10, 1, storage)
|
||||||
sm.becomeFollower(2, None)
|
sm.becomeFollower(2, None)
|
||||||
|
|
||||||
sm.handleAppendEntries(tt.m)
|
sm.handleAppendEntries(tt.m)
|
||||||
|
@ -862,7 +862,7 @@ func TestHandleHeartbeat(t *testing.T) {
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
|
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
|
||||||
sm := newRaft(1, []uint64{1, 2}, 5, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{1, 2}, 5, 1, storage)
|
||||||
sm.becomeFollower(2, 2)
|
sm.becomeFollower(2, 2)
|
||||||
sm.raftLog.commitTo(commit)
|
sm.raftLog.commitTo(commit)
|
||||||
sm.handleHeartbeat(tt.m)
|
sm.handleHeartbeat(tt.m)
|
||||||
|
@ -883,7 +883,7 @@ func TestHandleHeartbeat(t *testing.T) {
|
||||||
func TestHandleHeartbeatResp(t *testing.T) {
|
func TestHandleHeartbeatResp(t *testing.T) {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
|
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
|
||||||
sm := newRaft(1, []uint64{1, 2}, 5, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{1, 2}, 5, 1, storage)
|
||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
sm.becomeLeader()
|
sm.becomeLeader()
|
||||||
sm.raftLog.commitTo(sm.raftLog.lastIndex())
|
sm.raftLog.commitTo(sm.raftLog.lastIndex())
|
||||||
|
@ -942,7 +942,7 @@ func TestHandleHeartbeatResp(t *testing.T) {
|
||||||
// TestMsgAppRespWaitReset verifies the waitReset behavior of a leader
|
// TestMsgAppRespWaitReset verifies the waitReset behavior of a leader
|
||||||
// MsgAppResp.
|
// MsgAppResp.
|
||||||
func TestMsgAppRespWaitReset(t *testing.T) {
|
func TestMsgAppRespWaitReset(t *testing.T) {
|
||||||
sm := newRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage(), 0)
|
sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
|
||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
sm.becomeLeader()
|
sm.becomeLeader()
|
||||||
|
|
||||||
|
@ -1036,7 +1036,7 @@ func TestRecvMsgVote(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
|
sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||||
sm.state = tt.state
|
sm.state = tt.state
|
||||||
switch tt.state {
|
switch tt.state {
|
||||||
case StateFollower:
|
case StateFollower:
|
||||||
|
@ -1096,7 +1096,7 @@ func TestStateTransition(t *testing.T) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
|
sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||||
sm.state = tt.from
|
sm.state = tt.from
|
||||||
|
|
||||||
switch tt.to {
|
switch tt.to {
|
||||||
|
@ -1135,7 +1135,7 @@ func TestAllServerStepdown(t *testing.T) {
|
||||||
tterm := uint64(3)
|
tterm := uint64(3)
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
switch tt.state {
|
switch tt.state {
|
||||||
case StateFollower:
|
case StateFollower:
|
||||||
sm.becomeFollower(1, None)
|
sm.becomeFollower(1, None)
|
||||||
|
@ -1194,7 +1194,7 @@ func TestLeaderAppResp(t *testing.T) {
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
// sm term is 1 after it becomes the leader.
|
// sm term is 1 after it becomes the leader.
|
||||||
// thus the last log term must be 1 to be committed.
|
// thus the last log term must be 1 to be committed.
|
||||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
sm.raftLog = &raftLog{
|
sm.raftLog = &raftLog{
|
||||||
storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
|
storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
|
||||||
unstable: unstable{offset: 3},
|
unstable: unstable{offset: 3},
|
||||||
|
@ -1242,7 +1242,7 @@ func TestBcastBeat(t *testing.T) {
|
||||||
}
|
}
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.ApplySnapshot(s)
|
storage.ApplySnapshot(s)
|
||||||
sm := newRaft(1, nil, 10, 1, storage, 0)
|
sm := newTestRaft(1, nil, 10, 1, storage)
|
||||||
sm.Term = 1
|
sm.Term = 1
|
||||||
|
|
||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
|
@ -1301,7 +1301,7 @@ func TestRecvMsgBeat(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||||
sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}}
|
sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}}
|
||||||
sm.Term = 1
|
sm.Term = 1
|
||||||
sm.state = tt.state
|
sm.state = tt.state
|
||||||
|
@ -1344,7 +1344,7 @@ func TestLeaderIncreaseNext(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
sm := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||||
sm.raftLog.append(previousEnts...)
|
sm.raftLog.append(previousEnts...)
|
||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
sm.becomeLeader()
|
sm.becomeLeader()
|
||||||
|
@ -1360,7 +1360,7 @@ func TestLeaderIncreaseNext(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSendAppendForProgressProbe(t *testing.T) {
|
func TestSendAppendForProgressProbe(t *testing.T) {
|
||||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.readMessages()
|
r.readMessages()
|
||||||
|
@ -1406,7 +1406,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSendAppendForProgressReplicate(t *testing.T) {
|
func TestSendAppendForProgressReplicate(t *testing.T) {
|
||||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.readMessages()
|
r.readMessages()
|
||||||
|
@ -1423,7 +1423,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSendAppendForProgressSnapshot(t *testing.T) {
|
func TestSendAppendForProgressSnapshot(t *testing.T) {
|
||||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.readMessages()
|
r.readMessages()
|
||||||
|
@ -1443,7 +1443,7 @@ func TestRecvMsgUnreachable(t *testing.T) {
|
||||||
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
|
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
|
||||||
s := NewMemoryStorage()
|
s := NewMemoryStorage()
|
||||||
s.Append(previousEnts)
|
s.Append(previousEnts)
|
||||||
r := newRaft(1, []uint64{1, 2}, 10, 1, s, 0)
|
r := newTestRaft(1, []uint64{1, 2}, 10, 1, s)
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.readMessages()
|
r.readMessages()
|
||||||
|
@ -1472,7 +1472,7 @@ func TestRestore(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
|
||||||
if ok := sm.restore(s); !ok {
|
if ok := sm.restore(s); !ok {
|
||||||
t.Fatal("restore fail, want succeed")
|
t.Fatal("restore fail, want succeed")
|
||||||
}
|
}
|
||||||
|
@ -1497,7 +1497,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) {
|
||||||
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
|
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
|
||||||
commit := uint64(1)
|
commit := uint64(1)
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
|
||||||
sm.raftLog.append(previousEnts...)
|
sm.raftLog.append(previousEnts...)
|
||||||
sm.raftLog.commitTo(commit)
|
sm.raftLog.commitTo(commit)
|
||||||
|
|
||||||
|
@ -1538,7 +1538,7 @@ func TestProvideSnap(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
sm := newRaft(1, []uint64{1}, 10, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{1}, 10, 1, storage)
|
||||||
sm.restore(s)
|
sm.restore(s)
|
||||||
|
|
||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
|
@ -1569,7 +1569,7 @@ func TestRestoreFromSnapMsg(t *testing.T) {
|
||||||
}
|
}
|
||||||
m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
|
m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
|
||||||
|
|
||||||
sm := newRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
sm := newTestRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||||
sm.Step(m)
|
sm.Step(m)
|
||||||
|
|
||||||
// TODO(bdarnell): what should this test?
|
// TODO(bdarnell): what should this test?
|
||||||
|
@ -1604,7 +1604,7 @@ func TestSlowNodeRestore(t *testing.T) {
|
||||||
// it appends the entry to log and sets pendingConf to be true.
|
// it appends the entry to log and sets pendingConf to be true.
|
||||||
func TestStepConfig(t *testing.T) {
|
func TestStepConfig(t *testing.T) {
|
||||||
// a raft that cannot make progress
|
// a raft that cannot make progress
|
||||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
index := r.raftLog.lastIndex()
|
index := r.raftLog.lastIndex()
|
||||||
|
@ -1622,7 +1622,7 @@ func TestStepConfig(t *testing.T) {
|
||||||
// the proposal to noop and keep its original state.
|
// the proposal to noop and keep its original state.
|
||||||
func TestStepIgnoreConfig(t *testing.T) {
|
func TestStepIgnoreConfig(t *testing.T) {
|
||||||
// a raft that cannot make progress
|
// a raft that cannot make progress
|
||||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
|
||||||
|
@ -1649,7 +1649,7 @@ func TestRecoverPendingConfig(t *testing.T) {
|
||||||
{pb.EntryConfChange, true},
|
{pb.EntryConfChange, true},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||||
r.appendEntry(pb.Entry{Type: tt.entType})
|
r.appendEntry(pb.Entry{Type: tt.entType})
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
|
@ -1668,7 +1668,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
|
||||||
t.Errorf("expect panic, but nothing happens")
|
t.Errorf("expect panic, but nothing happens")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||||
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
|
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
|
||||||
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
|
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
|
@ -1678,7 +1678,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
|
||||||
|
|
||||||
// TestAddNode tests that addNode could update pendingConf and nodes correctly.
|
// TestAddNode tests that addNode could update pendingConf and nodes correctly.
|
||||||
func TestAddNode(t *testing.T) {
|
func TestAddNode(t *testing.T) {
|
||||||
r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||||
r.pendingConf = true
|
r.pendingConf = true
|
||||||
r.addNode(2)
|
r.addNode(2)
|
||||||
if r.pendingConf != false {
|
if r.pendingConf != false {
|
||||||
|
@ -1694,7 +1694,7 @@ func TestAddNode(t *testing.T) {
|
||||||
// TestRemoveNode tests that removeNode could update pendingConf, nodes and
|
// TestRemoveNode tests that removeNode could update pendingConf, nodes and
|
||||||
// and removed list correctly.
|
// and removed list correctly.
|
||||||
func TestRemoveNode(t *testing.T) {
|
func TestRemoveNode(t *testing.T) {
|
||||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||||
r.pendingConf = true
|
r.pendingConf = true
|
||||||
r.removeNode(2)
|
r.removeNode(2)
|
||||||
if r.pendingConf != false {
|
if r.pendingConf != false {
|
||||||
|
@ -1718,7 +1718,7 @@ func TestPromotable(t *testing.T) {
|
||||||
{[]uint64{2, 3}, false},
|
{[]uint64{2, 3}, false},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
r := newRaft(id, tt.peers, 5, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(id, tt.peers, 5, 1, NewMemoryStorage())
|
||||||
if g := r.promotable(); g != tt.wp {
|
if g := r.promotable(); g != tt.wp {
|
||||||
t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
|
t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
|
||||||
}
|
}
|
||||||
|
@ -1740,7 +1740,7 @@ func TestRaftNodes(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
r := newRaft(1, tt.ids, 10, 1, NewMemoryStorage(), 0)
|
r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage())
|
||||||
if !reflect.DeepEqual(r.nodes(), tt.wids) {
|
if !reflect.DeepEqual(r.nodes(), tt.wids) {
|
||||||
t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids)
|
t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids)
|
||||||
}
|
}
|
||||||
|
@ -1752,7 +1752,7 @@ func ents(terms ...uint64) *raft {
|
||||||
for i, term := range terms {
|
for i, term := range terms {
|
||||||
storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}})
|
storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}})
|
||||||
}
|
}
|
||||||
sm := newRaft(1, []uint64{}, 5, 1, storage, 0)
|
sm := newTestRaft(1, []uint64{}, 5, 1, storage)
|
||||||
sm.reset(0)
|
sm.reset(0)
|
||||||
return sm
|
return sm
|
||||||
}
|
}
|
||||||
|
@ -1780,7 +1780,7 @@ func newNetwork(peers ...Interface) *network {
|
||||||
switch v := p.(type) {
|
switch v := p.(type) {
|
||||||
case nil:
|
case nil:
|
||||||
nstorage[id] = NewMemoryStorage()
|
nstorage[id] = NewMemoryStorage()
|
||||||
sm := newRaft(id, peerAddrs, 10, 1, nstorage[id], 0)
|
sm := newTestRaft(id, peerAddrs, 10, 1, nstorage[id])
|
||||||
npeers[id] = sm
|
npeers[id] = sm
|
||||||
case *raft:
|
case *raft:
|
||||||
v.id = id
|
v.id = id
|
||||||
|
@ -1880,3 +1880,17 @@ func idsBySize(size int) []uint64 {
|
||||||
}
|
}
|
||||||
return ids
|
return ids
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
|
||||||
|
c := &Config{
|
||||||
|
ID: id,
|
||||||
|
Peers: peers,
|
||||||
|
ElectionTick: election,
|
||||||
|
HeartbeatTick: heartbeat,
|
||||||
|
Storage: storage,
|
||||||
|
MaxSizePerMsg: noLimit,
|
||||||
|
MaxInflightMsgs: 256,
|
||||||
|
}
|
||||||
|
|
||||||
|
return newRaft(c)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue