diff --git a/raft/multinode.go b/raft/multinode.go index 27773a2ae..b4da1992d 100644 --- a/raft/multinode.go +++ b/raft/multinode.go @@ -182,7 +182,16 @@ func (mn *multiNode) run() { select { case gc := <-mn.groupc: // TODO(bdarnell): pass applied through gc and into newRaft. Or get rid of it? - r := newRaft(mn.id, nil, mn.election, mn.heartbeat, gc.storage, 0) + // TODO(bdarnell): make maxSizePerMsg(InflightMsgs) configurable + c := &Config{ + ID: mn.id, + ElectionTick: mn.election, + HeartbeatTick: mn.heartbeat, + Storage: gc.storage, + MaxSizePerMsg: noLimit, + MaxInflightMsgs: 256, + } + r := newRaft(c) group = &groupState{ id: gc.id, raft: r, diff --git a/raft/node.go b/raft/node.go index c9761577f..a32dbb401 100644 --- a/raft/node.go +++ b/raft/node.go @@ -144,9 +144,17 @@ type Peer struct { // the election and heartbeat timeouts in units of ticks. // It appends a ConfChangeAddNode entry for each given peer to the initial log. func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage) Node { - n := newNode() - r := newRaft(id, nil, election, heartbeat, storage, 0) - + c := &Config{ + ID: id, + Peers: nil, + ElectionTick: election, + HeartbeatTick: heartbeat, + Storage: storage, + // TODO(xiangli): make this configurable + MaxSizePerMsg: noLimit, + MaxInflightMsgs: 256, + } + r := newRaft(c) // become the follower at term 1 and apply initial configuration // entires of term 1 r.becomeFollower(1, None) @@ -177,6 +185,7 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage r.addNode(peer.ID) } + n := newNode() go n.run(r) return &n } @@ -186,9 +195,20 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage // If the caller has an existing state machine, pass in the last log index that // has been applied to it; otherwise use zero. func RestartNode(id uint64, election, heartbeat int, storage Storage, applied uint64) Node { - n := newNode() - r := newRaft(id, nil, election, heartbeat, storage, applied) + c := &Config{ + ID: id, + Peers: nil, + ElectionTick: election, + HeartbeatTick: heartbeat, + Storage: storage, + Applied: applied, + // TODO(xiangli): make this configurable + MaxSizePerMsg: noLimit, + MaxInflightMsgs: 256, + } + r := newRaft(c) + n := newNode() go n.run(r) return &n } diff --git a/raft/node_bench_test.go b/raft/node_bench_test.go index 21af86494..9175cb834 100644 --- a/raft/node_bench_test.go +++ b/raft/node_bench_test.go @@ -26,7 +26,7 @@ func BenchmarkOneNode(b *testing.B) { n := newNode() s := NewMemoryStorage() - r := newRaft(1, []uint64{1}, 10, 1, s, 0) + r := newTestRaft(1, []uint64{1}, 10, 1, s) go n.run(r) defer n.Stop() diff --git a/raft/node_test.go b/raft/node_test.go index b4a7440f3..7e610e8b9 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -114,7 +114,7 @@ func TestNodePropose(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newRaft(1, []uint64{1}, 10, 1, s, 0) + r := newTestRaft(1, []uint64{1}, 10, 1, s) go n.run(r) n.Campaign(context.TODO()) for { @@ -152,7 +152,7 @@ func TestNodeProposeConfig(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newRaft(1, []uint64{1}, 10, 1, s, 0) + r := newTestRaft(1, []uint64{1}, 10, 1, s) go n.run(r) n.Campaign(context.TODO()) for { @@ -190,7 +190,7 @@ func TestNodeProposeConfig(t *testing.T) { // who is the current leader. func TestBlockProposal(t *testing.T) { n := newNode() - r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) go n.run(r) defer n.Stop() @@ -223,7 +223,7 @@ func TestBlockProposal(t *testing.T) { func TestNodeTick(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newRaft(1, []uint64{1}, 10, 1, s, 0) + r := newTestRaft(1, []uint64{1}, 10, 1, s) go n.run(r) elapsed := r.elapsed n.Tick() @@ -238,7 +238,7 @@ func TestNodeTick(t *testing.T) { func TestNodeStop(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newRaft(1, []uint64{1}, 10, 1, s, 0) + r := newTestRaft(1, []uint64{1}, 10, 1, s) donec := make(chan struct{}) go func() { diff --git a/raft/raft.go b/raft/raft.go index 65fe5277c..6b41b4510 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -51,6 +51,75 @@ func (st StateType) String() string { return stmap[uint64(st)] } +// Config contains the parameters to start a raft. +type Config struct { + // ID is the identity of the local raft. ID cannot be 0. + ID uint64 + // Peers contains the IDs of all nodes (including self) in + // the raft cluster. It should only be set when starting a new + // raft cluster. + // Restarting raft from previous configuration will panic if + // Peers is set. + Peers []uint64 + + // ElectionTick is the election timeout. If a follower does not + // receive any message from the leader of current term during + // ElectionTick, it will become candidate and start an election. + // ElectionTick must be greater than HeartbeatTick. We suggest + // to use ElectionTick = 10 * HeartbeatTick to avoid unnecessary + // leader switching. + ElectionTick int + // HeartbeatTick is the heartbeat interval. A leader sends heartbeat + // message to maintain the leadership every heartbeat interval. + HeartbeatTick int + + // Storage is the storage for raft. raft generates entires and + // states to be stored in storage. raft reads the persisted entires + // and states out of Storage when it needs. raft reads out the previous + // state and configuration out of storage when restarting. + Storage Storage + // Applied is the last applied index. It should only be set when restarting + // raft. raft will not return entries to the application smaller or equal to Applied. + // If Applied is unset when restarting, raft might return previous applied entries. + // This is a very application dependent configuration. + Applied uint64 + + // MaxSizePerMsg limits the max size of each append message. Smaller value lowers + // the raft recovery cost(initial probing and message lost during normal operation). + // On the other side, it might affect the throughput during normal replication. + // Note: math.MaxUint64 for unlimited, 0 for at most one entry per message. + MaxSizePerMsg uint64 + // MaxInflightMsgs limits the max number of in-flight append messages during optimistic + // replication phase. The application transportation layer usually has its own sending + // buffer over TCP/UDP. Setting MaxInflightMsgs to avoid overflowing that sending buffer. + // TODO (xiangli): feedback to application to limit the proposal rate? + MaxInflightMsgs int +} + +func (c *Config) validate() error { + if c.ID == None { + return errors.New("cannot use none as id") + } + + if c.HeartbeatTick <= 0 { + return errors.New("heartbeat tick must be greater than 0") + } + + if c.ElectionTick <= c.HeartbeatTick { + return errors.New("election tick must be greater than heartbeat tick") + } + + if c.Storage == nil { + return errors.New("storage cannot be nil") + } + + if c.MaxInflightMsgs <= 0 { + return errors.New("max inflight messages must be greater than 0") + } + + return nil +} + type raft struct { pb.HardState @@ -83,16 +152,16 @@ type raft struct { step stepFunc } -func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage, - applied uint64) *raft { - if id == None { - panic("cannot use none id") +func newRaft(c *Config) *raft { + if err := c.validate(); err != nil { + panic(err.Error()) } - raftlog := newLog(storage) - hs, cs, err := storage.InitialState() + raftlog := newLog(c.Storage) + hs, cs, err := c.Storage.InitialState() if err != nil { panic(err) // TODO(bdarnell) } + peers := c.Peers if len(cs.Nodes) > 0 { if len(peers) > 0 { // TODO(bdarnell): the peers argument is always nil except in @@ -103,27 +172,27 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage peers = cs.Nodes } r := &raft{ - id: id, + id: c.ID, lead: None, raftLog: raftlog, // 4MB for now and hard code it // TODO(xiang): add a config arguement into newRaft after we add // the max inflight message field. - maxMsgSize: 4 * 1024 * 1024, - maxInflight: 256, + maxMsgSize: c.MaxSizePerMsg, + maxInflight: c.MaxInflightMsgs, prs: make(map[uint64]*Progress), - electionTimeout: election, - heartbeatTimeout: heartbeat, + electionTimeout: c.ElectionTick, + heartbeatTimeout: c.HeartbeatTick, } - r.rand = rand.New(rand.NewSource(int64(id))) + r.rand = rand.New(rand.NewSource(int64(c.ID))) for _, p := range peers { r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)} } if !isHardStateEqual(hs, emptyState) { r.loadState(hs) } - if applied > 0 { - raftlog.appliedTo(applied) + if c.Applied > 0 { + raftlog.appliedTo(c.Applied) } r.becomeFollower(r.Term, None) diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index 3c9a60164..6be4c1635 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -24,7 +24,7 @@ import ( // 1. msgApp can fill the sending window until full // 2. when the window is full, no more msgApp can be sent. func TestMsgAppFlowControlFull(t *testing.T) { - r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() @@ -60,7 +60,7 @@ func TestMsgAppFlowControlFull(t *testing.T) { // 1. vaild msgAppResp.index moves the windows to pass all smaller or equal index. // 2. out-of-dated msgAppResp has no effect on the silding window. func TestMsgAppFlowControlMoveForward(t *testing.T) { - r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() @@ -105,7 +105,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { // TestMsgAppFlowControlRecvHeartbeat ensures a heartbeat response // frees one slot if the window is full. func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { - r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 3d9e496cf..be5473bf2 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -52,7 +52,7 @@ func TestLeaderUpdateTermFromMessage(t *testing.T) { // it immediately reverts to follower state. // Reference: section 5.1 func testUpdateTermFromMessage(t *testing.T, state StateType) { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) switch state { case StateFollower: r.becomeFollower(1, 2) @@ -82,7 +82,7 @@ func TestRejectStaleTermMessage(t *testing.T) { fakeStep := func(r *raft, m pb.Message) { called = true } - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) r.step = fakeStep r.loadState(pb.HardState{Term: 2}) @@ -96,7 +96,7 @@ func TestRejectStaleTermMessage(t *testing.T) { // TestStartAsFollower tests that when servers start up, they begin as followers. // Reference: section 5.2 func TestStartAsFollower(t *testing.T) { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) if r.state != StateFollower { t.Errorf("state = %s, want %s", r.state, StateFollower) } @@ -109,7 +109,7 @@ func TestStartAsFollower(t *testing.T) { func TestLeaderBcastBeat(t *testing.T) { // heartbeat interval hi := 1 - r := newRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() for i := 0; i < 10; i++ { @@ -151,7 +151,7 @@ func TestCandidateStartNewElection(t *testing.T) { func testNonleaderStartElection(t *testing.T, state StateType) { // election timeout et := 10 - r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage()) switch state { case StateFollower: r.becomeFollower(1, 2) @@ -215,7 +215,7 @@ func TestLeaderElectionInOneRoundRPC(t *testing.T) { {5, map[uint64]bool{}, StateCandidate}, } for i, tt := range tests { - r := newRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage()) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) for id, vote := range tt.votes { @@ -248,7 +248,7 @@ func TestFollowerVote(t *testing.T) { {2, 1, true}, } for i, tt := range tests { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) r.loadState(pb.HardState{Term: 1, Vote: tt.vote}) r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote}) @@ -274,7 +274,7 @@ func TestCandidateFallback(t *testing.T) { {From: 2, To: 1, Term: 2, Type: pb.MsgApp}, } for i, tt := range tests { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) if r.state != StateCandidate { t.Fatalf("unexpected state = %s, want %s", r.state, StateCandidate) @@ -307,7 +307,7 @@ func TestCandidateElectionTimeoutRandomized(t *testing.T) { // Reference: section 5.2 func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) { et := 10 - r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage()) timeouts := make(map[int]bool) for round := 0; round < 50*et; round++ { switch state { @@ -353,7 +353,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) { rs := make([]*raft, size) ids := idsBySize(size) for k := range rs { - rs[k] = newRaft(ids[k], ids, et, 1, NewMemoryStorage(), 0) + rs[k] = newTestRaft(ids[k], ids, et, 1, NewMemoryStorage()) } conflicts := 0 for round := 0; round < 1000; round++ { @@ -396,7 +396,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) { // Reference: section 5.3 func TestLeaderStartReplication(t *testing.T) { s := NewMemoryStorage() - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s, 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s) r.becomeCandidate() r.becomeLeader() commitNoopEntry(r, s) @@ -435,7 +435,7 @@ func TestLeaderStartReplication(t *testing.T) { // Reference: section 5.3 func TestLeaderCommitEntry(t *testing.T) { s := NewMemoryStorage() - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s, 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s) r.becomeCandidate() r.becomeLeader() commitNoopEntry(r, s) @@ -489,7 +489,7 @@ func TestLeaderAcknowledgeCommit(t *testing.T) { } for i, tt := range tests { s := NewMemoryStorage() - r := newRaft(1, idsBySize(tt.size), 10, 1, s, 0) + r := newTestRaft(1, idsBySize(tt.size), 10, 1, s) r.becomeCandidate() r.becomeLeader() commitNoopEntry(r, s) @@ -523,7 +523,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append(tt) - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage) r.loadState(pb.HardState{Term: 2}) r.becomeCandidate() r.becomeLeader() @@ -578,7 +578,7 @@ func TestFollowerCommitEntry(t *testing.T) { }, } for i, tt := range tests { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) r.becomeFollower(1, 2) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit}) @@ -621,7 +621,7 @@ func TestFollowerCheckMsgApp(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append(ents) - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage) r.loadState(pb.HardState{Commit: 1}) r.becomeFollower(2, 2) @@ -677,7 +677,7 @@ func TestFollowerAppendEntries(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}) - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage) r.becomeFollower(2, 2) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents}) @@ -746,11 +746,11 @@ func TestLeaderSyncFollowerLog(t *testing.T) { for i, tt := range tests { leadStorage := NewMemoryStorage() leadStorage.Append(ents) - lead := newRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage, 0) + lead := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage) lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term}) followerStorage := NewMemoryStorage() followerStorage.Append(tt) - follower := newRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage, 0) + follower := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage) follower.loadState(pb.HardState{Term: term - 1}) // It is necessary to have a three-node cluster. // The second may have more up-to-date log than the first one, so the @@ -779,7 +779,7 @@ func TestVoteRequest(t *testing.T) { {[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3}, } for j, tt := range tests { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) r.Step(pb.Message{ From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents, }) @@ -842,7 +842,7 @@ func TestVoter(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append(tt.ents) - r := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index}) @@ -878,7 +878,7 @@ func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append(ents) - r := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) r.loadState(pb.HardState{Term: 2}) // become leader at term 3 r.becomeCandidate() diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index 21ec5355c..f75c784c2 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -32,7 +32,7 @@ var ( func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { storage := NewMemoryStorage() - sm := newRaft(1, []uint64{1}, 10, 1, storage, 0) + sm := newTestRaft(1, []uint64{1}, 10, 1, storage) sm.restore(testingSnap) sm.becomeCandidate() @@ -50,7 +50,7 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { func TestPendingSnapshotPauseReplication(t *testing.T) { storage := NewMemoryStorage() - sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0) + sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) sm.restore(testingSnap) sm.becomeCandidate() @@ -67,7 +67,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) { func TestSnapshotFailure(t *testing.T) { storage := NewMemoryStorage() - sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0) + sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) sm.restore(testingSnap) sm.becomeCandidate() @@ -90,7 +90,7 @@ func TestSnapshotFailure(t *testing.T) { func TestSnapshotSucceed(t *testing.T) { storage := NewMemoryStorage() - sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0) + sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) sm.restore(testingSnap) sm.becomeCandidate() @@ -113,7 +113,7 @@ func TestSnapshotSucceed(t *testing.T) { func TestSnapshotAbort(t *testing.T) { storage := NewMemoryStorage() - sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0) + sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) sm.restore(testingSnap) sm.becomeCandidate() diff --git a/raft/raft_test.go b/raft/raft_test.go index 81b0f10c8..815cb1d08 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -262,7 +262,7 @@ func TestProgressResume(t *testing.T) { // TestProgressResumeByHeartbeat ensures raft.heartbeat reset progress.paused by heartbeat. func TestProgressResumeByHeartbeat(t *testing.T) { - r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() r.prs[2].Paused = true @@ -274,7 +274,7 @@ func TestProgressResumeByHeartbeat(t *testing.T) { } func TestProgressPaused(t *testing.T) { - r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) @@ -466,9 +466,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) { } func TestDuelingCandidates(t *testing.T) { - a := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) - b := newRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) - c := newRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) nt := newNetwork(a, b, c) nt.cut(1, 3) @@ -736,7 +736,7 @@ func TestCommit(t *testing.T) { storage.Append(tt.logs) storage.hardState = pb.HardState{Term: tt.smTerm} - sm := newRaft(1, []uint64{1}, 5, 1, storage, 0) + sm := newTestRaft(1, []uint64{1}, 5, 1, storage) for j := 0; j < len(tt.matches); j++ { sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1) } @@ -761,7 +761,7 @@ func TestIsElectionTimeout(t *testing.T) { } for i, tt := range tests { - sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0) + sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) sm.elapsed = tt.elapse c := 0 for j := 0; j < 10000; j++ { @@ -786,7 +786,7 @@ func TestStepIgnoreOldTermMsg(t *testing.T) { fakeStep := func(r *raft, m pb.Message) { called = true } - sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0) + sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) sm.step = fakeStep sm.Term = 2 sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1}) @@ -828,7 +828,7 @@ func TestHandleMsgApp(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}) - sm := newRaft(1, []uint64{1}, 10, 1, storage, 0) + sm := newTestRaft(1, []uint64{1}, 10, 1, storage) sm.becomeFollower(2, None) sm.handleAppendEntries(tt.m) @@ -862,7 +862,7 @@ func TestHandleHeartbeat(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) - sm := newRaft(1, []uint64{1, 2}, 5, 1, storage, 0) + sm := newTestRaft(1, []uint64{1, 2}, 5, 1, storage) sm.becomeFollower(2, 2) sm.raftLog.commitTo(commit) sm.handleHeartbeat(tt.m) @@ -883,7 +883,7 @@ func TestHandleHeartbeat(t *testing.T) { func TestHandleHeartbeatResp(t *testing.T) { storage := NewMemoryStorage() storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) - sm := newRaft(1, []uint64{1, 2}, 5, 1, storage, 0) + sm := newTestRaft(1, []uint64{1, 2}, 5, 1, storage) sm.becomeCandidate() sm.becomeLeader() sm.raftLog.commitTo(sm.raftLog.lastIndex()) @@ -942,7 +942,7 @@ func TestHandleHeartbeatResp(t *testing.T) { // TestMsgAppRespWaitReset verifies the waitReset behavior of a leader // MsgAppResp. func TestMsgAppRespWaitReset(t *testing.T) { - sm := newRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage(), 0) + sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) sm.becomeCandidate() sm.becomeLeader() @@ -1036,7 +1036,7 @@ func TestRecvMsgVote(t *testing.T) { } for i, tt := range tests { - sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0) + sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) sm.state = tt.state switch tt.state { case StateFollower: @@ -1096,7 +1096,7 @@ func TestStateTransition(t *testing.T) { } }() - sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0) + sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) sm.state = tt.from switch tt.to { @@ -1135,7 +1135,7 @@ func TestAllServerStepdown(t *testing.T) { tterm := uint64(3) for i, tt := range tests { - sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) switch tt.state { case StateFollower: sm.becomeFollower(1, None) @@ -1194,7 +1194,7 @@ func TestLeaderAppResp(t *testing.T) { for i, tt := range tests { // sm term is 1 after it becomes the leader. // thus the last log term must be 1 to be committed. - sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) sm.raftLog = &raftLog{ storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}, unstable: unstable{offset: 3}, @@ -1242,7 +1242,7 @@ func TestBcastBeat(t *testing.T) { } storage := NewMemoryStorage() storage.ApplySnapshot(s) - sm := newRaft(1, nil, 10, 1, storage, 0) + sm := newTestRaft(1, nil, 10, 1, storage) sm.Term = 1 sm.becomeCandidate() @@ -1301,7 +1301,7 @@ func TestRecvMsgBeat(t *testing.T) { } for i, tt := range tests { - sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}} sm.Term = 1 sm.state = tt.state @@ -1344,7 +1344,7 @@ func TestLeaderIncreaseNext(t *testing.T) { } for i, tt := range tests { - sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) + sm := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) sm.raftLog.append(previousEnts...) sm.becomeCandidate() sm.becomeLeader() @@ -1360,7 +1360,7 @@ func TestLeaderIncreaseNext(t *testing.T) { } func TestSendAppendForProgressProbe(t *testing.T) { - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() r.readMessages() @@ -1406,7 +1406,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { } func TestSendAppendForProgressReplicate(t *testing.T) { - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() r.readMessages() @@ -1423,7 +1423,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { } func TestSendAppendForProgressSnapshot(t *testing.T) { - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() r.readMessages() @@ -1443,7 +1443,7 @@ func TestRecvMsgUnreachable(t *testing.T) { previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} s := NewMemoryStorage() s.Append(previousEnts) - r := newRaft(1, []uint64{1, 2}, 10, 1, s, 0) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, s) r.becomeCandidate() r.becomeLeader() r.readMessages() @@ -1472,7 +1472,7 @@ func TestRestore(t *testing.T) { } storage := NewMemoryStorage() - sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0) + sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) if ok := sm.restore(s); !ok { t.Fatal("restore fail, want succeed") } @@ -1497,7 +1497,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) { previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} commit := uint64(1) storage := NewMemoryStorage() - sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0) + sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) sm.raftLog.append(previousEnts...) sm.raftLog.commitTo(commit) @@ -1538,7 +1538,7 @@ func TestProvideSnap(t *testing.T) { }, } storage := NewMemoryStorage() - sm := newRaft(1, []uint64{1}, 10, 1, storage, 0) + sm := newTestRaft(1, []uint64{1}, 10, 1, storage) sm.restore(s) sm.becomeCandidate() @@ -1569,7 +1569,7 @@ func TestRestoreFromSnapMsg(t *testing.T) { } m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s} - sm := newRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) + sm := newTestRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage()) sm.Step(m) // TODO(bdarnell): what should this test? @@ -1604,7 +1604,7 @@ func TestSlowNodeRestore(t *testing.T) { // it appends the entry to log and sets pendingConf to be true. func TestStepConfig(t *testing.T) { // a raft that cannot make progress - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() index := r.raftLog.lastIndex() @@ -1622,7 +1622,7 @@ func TestStepConfig(t *testing.T) { // the proposal to noop and keep its original state. func TestStepIgnoreConfig(t *testing.T) { // a raft that cannot make progress - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) @@ -1649,7 +1649,7 @@ func TestRecoverPendingConfig(t *testing.T) { {pb.EntryConfChange, true}, } for i, tt := range tests { - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r.appendEntry(pb.Entry{Type: tt.entType}) r.becomeCandidate() r.becomeLeader() @@ -1668,7 +1668,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) { t.Errorf("expect panic, but nothing happens") } }() - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) r.becomeCandidate() @@ -1678,7 +1678,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) { // TestAddNode tests that addNode could update pendingConf and nodes correctly. func TestAddNode(t *testing.T) { - r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) r.pendingConf = true r.addNode(2) if r.pendingConf != false { @@ -1694,7 +1694,7 @@ func TestAddNode(t *testing.T) { // TestRemoveNode tests that removeNode could update pendingConf, nodes and // and removed list correctly. func TestRemoveNode(t *testing.T) { - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r.pendingConf = true r.removeNode(2) if r.pendingConf != false { @@ -1718,7 +1718,7 @@ func TestPromotable(t *testing.T) { {[]uint64{2, 3}, false}, } for i, tt := range tests { - r := newRaft(id, tt.peers, 5, 1, NewMemoryStorage(), 0) + r := newTestRaft(id, tt.peers, 5, 1, NewMemoryStorage()) if g := r.promotable(); g != tt.wp { t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp) } @@ -1740,7 +1740,7 @@ func TestRaftNodes(t *testing.T) { }, } for i, tt := range tests { - r := newRaft(1, tt.ids, 10, 1, NewMemoryStorage(), 0) + r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage()) if !reflect.DeepEqual(r.nodes(), tt.wids) { t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids) } @@ -1752,7 +1752,7 @@ func ents(terms ...uint64) *raft { for i, term := range terms { storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}}) } - sm := newRaft(1, []uint64{}, 5, 1, storage, 0) + sm := newTestRaft(1, []uint64{}, 5, 1, storage) sm.reset(0) return sm } @@ -1780,7 +1780,7 @@ func newNetwork(peers ...Interface) *network { switch v := p.(type) { case nil: nstorage[id] = NewMemoryStorage() - sm := newRaft(id, peerAddrs, 10, 1, nstorage[id], 0) + sm := newTestRaft(id, peerAddrs, 10, 1, nstorage[id]) npeers[id] = sm case *raft: v.id = id @@ -1880,3 +1880,17 @@ func idsBySize(size int) []uint64 { } return ids } + +func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft { + c := &Config{ + ID: id, + Peers: peers, + ElectionTick: election, + HeartbeatTick: heartbeat, + Storage: storage, + MaxSizePerMsg: noLimit, + MaxInflightMsgs: 256, + } + + return newRaft(c) +}