From 59214978a22a7893aace389bab3750a9bea4a00f Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Thu, 22 Jan 2015 11:37:02 -0500 Subject: [PATCH 1/2] raft: Add applied index as an argument to newRaft and RestartNode. --- etcdserver/raft.go | 4 +-- raft/node.go | 10 ++++--- raft/node_bench_test.go | 2 +- raft/node_test.go | 14 ++++----- raft/raft.go | 6 +++- raft/raft_paper_test.go | 44 ++++++++++++++-------------- raft/raft_test.go | 64 ++++++++++++++++++++--------------------- 7 files changed, 75 insertions(+), 69 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index d7d69a2fd..15915fff0 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -116,7 +116,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.N } s.SetHardState(st) s.Append(ents) - n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s) + n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s, 0) return id, n, s, w } @@ -157,7 +157,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type } s.SetHardState(st) s.Append(ents) - n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s) + n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s, 0) return id, n, s, w } diff --git a/raft/node.go b/raft/node.go index 1a3fd6cd7..85f4ac823 100644 --- a/raft/node.go +++ b/raft/node.go @@ -135,7 +135,7 @@ type Peer struct { // It appends a ConfChangeAddNode entry for each given peer to the initial log. func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage) Node { n := newNode() - r := newRaft(id, nil, election, heartbeat, storage) + r := newRaft(id, nil, election, heartbeat, storage, 0) // become the follower at term 1 and apply initial configuration // entires of term 1 @@ -171,11 +171,13 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage return &n } -// RestartNode is identical to StartNode but does not take a list of peers. +// RestartNode is similar to StartNode but does not take a list of peers. // The current membership of the cluster will be restored from the Storage. -func RestartNode(id uint64, election, heartbeat int, storage Storage) Node { +// If the caller has an existing state machine, pass in the last log index that +// has been applied to it; otherwise use zero. +func RestartNode(id uint64, election, heartbeat int, storage Storage, applied uint64) Node { n := newNode() - r := newRaft(id, nil, election, heartbeat, storage) + r := newRaft(id, nil, election, heartbeat, storage, applied) go n.run(r) return &n diff --git a/raft/node_bench_test.go b/raft/node_bench_test.go index 89a37d330..b137b6432 100644 --- a/raft/node_bench_test.go +++ b/raft/node_bench_test.go @@ -28,7 +28,7 @@ func BenchmarkOneNode(b *testing.B) { n := newNode() s := NewMemoryStorage() - r := newRaft(1, []uint64{1}, 10, 1, s) + r := newRaft(1, []uint64{1}, 10, 1, s, 0) go n.run(r) defer n.Stop() diff --git a/raft/node_test.go b/raft/node_test.go index f148e7e06..b04a4be54 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -116,7 +116,7 @@ func TestNodePropose(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newRaft(1, []uint64{1}, 10, 1, s) + r := newRaft(1, []uint64{1}, 10, 1, s, 0) go n.run(r) n.Campaign(context.TODO()) for { @@ -154,7 +154,7 @@ func TestNodeProposeConfig(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newRaft(1, []uint64{1}, 10, 1, s) + r := newRaft(1, []uint64{1}, 10, 1, s, 0) go n.run(r) n.Campaign(context.TODO()) for { @@ -192,7 +192,7 @@ func TestNodeProposeConfig(t *testing.T) { // who is the current leader. func TestBlockProposal(t *testing.T) { n := newNode() - r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0) go n.run(r) defer n.Stop() @@ -225,7 +225,7 @@ func TestBlockProposal(t *testing.T) { func TestNodeTick(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newRaft(1, []uint64{1}, 10, 1, s) + r := newRaft(1, []uint64{1}, 10, 1, s, 0) go n.run(r) elapsed := r.elapsed n.Tick() @@ -240,7 +240,7 @@ func TestNodeTick(t *testing.T) { func TestNodeStop(t *testing.T) { n := newNode() s := NewMemoryStorage() - r := newRaft(1, []uint64{1}, 10, 1, s) + r := newRaft(1, []uint64{1}, 10, 1, s, 0) donec := make(chan struct{}) go func() { @@ -364,7 +364,7 @@ func TestNodeRestart(t *testing.T) { storage := NewMemoryStorage() storage.SetHardState(st) storage.Append(entries) - n := RestartNode(1, 10, 1, storage) + n := RestartNode(1, 10, 1, storage, 0) if g := <-n.Ready(); !reflect.DeepEqual(g, want) { t.Errorf("g = %+v,\n w %+v", g, want) } @@ -400,7 +400,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) { s.SetHardState(st) s.ApplySnapshot(snap) s.Append(entries) - n := RestartNode(1, 10, 1, s) + n := RestartNode(1, 10, 1, s, 0) if g := <-n.Ready(); !reflect.DeepEqual(g, want) { t.Errorf("g = %+v,\n w %+v", g, want) } else { diff --git a/raft/raft.go b/raft/raft.go index 8ba2a2365..a93dacfc5 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -139,7 +139,8 @@ type raft struct { step stepFunc } -func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft { +func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage, + applied uint64) *raft { if id == None { panic("cannot use none id") } @@ -172,6 +173,9 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage if !isHardStateEqual(hs, emptyState) { r.loadState(hs) } + if applied > 0 { + raftlog.appliedTo(applied) + } r.becomeFollower(r.Term, None) nodesStrs := make([]string, 0) diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 45da732ab..1ec986ad2 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -56,7 +56,7 @@ func TestLeaderUpdateTermFromMessage(t *testing.T) { // it immediately reverts to follower state. // Reference: section 5.1 func testUpdateTermFromMessage(t *testing.T, state StateType) { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) switch state { case StateFollower: r.becomeFollower(1, 2) @@ -86,7 +86,7 @@ func TestRejectStaleTermMessage(t *testing.T) { fakeStep := func(r *raft, m pb.Message) { called = true } - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) r.step = fakeStep r.loadState(pb.HardState{Term: 2}) @@ -100,7 +100,7 @@ func TestRejectStaleTermMessage(t *testing.T) { // TestStartAsFollower tests that when servers start up, they begin as followers. // Reference: section 5.2 func TestStartAsFollower(t *testing.T) { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) if r.state != StateFollower { t.Errorf("state = %s, want %s", r.state, StateFollower) } @@ -113,7 +113,7 @@ func TestStartAsFollower(t *testing.T) { func TestLeaderBcastBeat(t *testing.T) { // heartbeat interval hi := 1 - r := newRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage(), 0) r.becomeCandidate() r.becomeLeader() for i := 0; i < 10; i++ { @@ -155,7 +155,7 @@ func TestCandidateStartNewElection(t *testing.T) { func testNonleaderStartElection(t *testing.T, state StateType) { // election timeout et := 10 - r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage(), 0) switch state { case StateFollower: r.becomeFollower(1, 2) @@ -219,7 +219,7 @@ func TestLeaderElectionInOneRoundRPC(t *testing.T) { {5, map[uint64]bool{}, StateCandidate}, } for i, tt := range tests { - r := newRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage()) + r := newRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage(), 0) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) for id, vote := range tt.votes { @@ -252,7 +252,7 @@ func TestFollowerVote(t *testing.T) { {2, 1, true}, } for i, tt := range tests { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) r.loadState(pb.HardState{Term: 1, Vote: tt.vote}) r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote}) @@ -278,7 +278,7 @@ func TestCandidateFallback(t *testing.T) { {From: 2, To: 1, Term: 2, Type: pb.MsgApp}, } for i, tt := range tests { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) if r.state != StateCandidate { t.Fatalf("unexpected state = %s, want %s", r.state, StateCandidate) @@ -311,7 +311,7 @@ func TestCandidateElectionTimeoutRandomized(t *testing.T) { // Reference: section 5.2 func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) { et := 10 - r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage(), 0) timeouts := make(map[int]bool) for round := 0; round < 50*et; round++ { switch state { @@ -357,7 +357,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) { rs := make([]*raft, size) ids := idsBySize(size) for k := range rs { - rs[k] = newRaft(ids[k], ids, et, 1, NewMemoryStorage()) + rs[k] = newRaft(ids[k], ids, et, 1, NewMemoryStorage(), 0) } conflicts := 0 for round := 0; round < 1000; round++ { @@ -400,7 +400,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) { // Reference: section 5.3 func TestLeaderStartReplication(t *testing.T) { s := NewMemoryStorage() - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s, 0) r.becomeCandidate() r.becomeLeader() commitNoopEntry(r, s) @@ -439,7 +439,7 @@ func TestLeaderStartReplication(t *testing.T) { // Reference: section 5.3 func TestLeaderCommitEntry(t *testing.T) { s := NewMemoryStorage() - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s, 0) r.becomeCandidate() r.becomeLeader() commitNoopEntry(r, s) @@ -493,7 +493,7 @@ func TestLeaderAcknowledgeCommit(t *testing.T) { } for i, tt := range tests { s := NewMemoryStorage() - r := newRaft(1, idsBySize(tt.size), 10, 1, s) + r := newRaft(1, idsBySize(tt.size), 10, 1, s, 0) r.becomeCandidate() r.becomeLeader() commitNoopEntry(r, s) @@ -527,7 +527,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append(tt) - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0) r.loadState(pb.HardState{Term: 2}) r.becomeCandidate() r.becomeLeader() @@ -582,7 +582,7 @@ func TestFollowerCommitEntry(t *testing.T) { }, } for i, tt := range tests { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) r.becomeFollower(1, 2) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit}) @@ -619,7 +619,7 @@ func TestFollowerCheckMsgApp(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append(ents) - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0) r.loadState(pb.HardState{Commit: 2}) r.becomeFollower(2, 2) @@ -675,7 +675,7 @@ func TestFollowerAppendEntries(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}) - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0) r.becomeFollower(2, 2) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents}) @@ -744,11 +744,11 @@ func TestLeaderSyncFollowerLog(t *testing.T) { for i, tt := range tests { leadStorage := NewMemoryStorage() leadStorage.Append(ents) - lead := newRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage) + lead := newRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage, 0) lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term}) followerStorage := NewMemoryStorage() followerStorage.Append(tt) - follower := newRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage) + follower := newRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage, 0) follower.loadState(pb.HardState{Term: term - 1}) // It is necessary to have a three-node cluster. // The second may have more up-to-date log than the first one, so the @@ -777,7 +777,7 @@ func TestVoteRequest(t *testing.T) { {[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3}, } for i, tt := range tests { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) r.Step(pb.Message{ From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents, }) @@ -840,7 +840,7 @@ func TestVoter(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append(tt.ents) - r := newRaft(1, []uint64{1, 2}, 10, 1, storage) + r := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index}) @@ -876,7 +876,7 @@ func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append(ents) - r := newRaft(1, []uint64{1, 2}, 10, 1, storage) + r := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0) r.loadState(pb.HardState{Term: 2}) // become leader at term 3 r.becomeCandidate() diff --git a/raft/raft_test.go b/raft/raft_test.go index 136f251f8..267196120 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -195,7 +195,7 @@ func TestProgressWaitReset(t *testing.T) { // TestProgressDecr ensures raft.heartbeat decreases progress.wait by heartbeat. func TestProgressDecr(t *testing.T) { - r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0) r.becomeCandidate() r.becomeLeader() r.prs[2].Wait = r.heartbeatTimeout * 2 @@ -207,7 +207,7 @@ func TestProgressDecr(t *testing.T) { } func TestProgressWait(t *testing.T) { - r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0) r.becomeCandidate() r.becomeLeader() r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) @@ -399,9 +399,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) { } func TestDuelingCandidates(t *testing.T) { - a := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - b := newRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - c := newRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + a := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + b := newRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) + c := newRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) nt := newNetwork(a, b, c) nt.cut(1, 3) @@ -669,7 +669,7 @@ func TestCommit(t *testing.T) { storage.Append(tt.logs) storage.hardState = pb.HardState{Term: tt.smTerm} - sm := newRaft(1, []uint64{1}, 5, 1, storage) + sm := newRaft(1, []uint64{1}, 5, 1, storage, 0) for j := 0; j < len(tt.matches); j++ { sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1) } @@ -694,7 +694,7 @@ func TestIsElectionTimeout(t *testing.T) { } for i, tt := range tests { - sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0) sm.elapsed = tt.elapse c := 0 for j := 0; j < 10000; j++ { @@ -719,7 +719,7 @@ func TestStepIgnoreOldTermMsg(t *testing.T) { fakeStep := func(r *raft, m pb.Message) { called = true } - sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0) sm.step = fakeStep sm.Term = 2 sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1}) @@ -761,7 +761,7 @@ func TestHandleMsgApp(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}) - sm := newRaft(1, []uint64{1}, 10, 1, storage) + sm := newRaft(1, []uint64{1}, 10, 1, storage, 0) sm.becomeFollower(2, None) sm.handleAppendEntries(tt.m) @@ -795,7 +795,7 @@ func TestHandleHeartbeat(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) - sm := newRaft(1, []uint64{1, 2}, 5, 1, storage) + sm := newRaft(1, []uint64{1, 2}, 5, 1, storage, 0) sm.becomeFollower(2, 2) sm.raftLog.commitTo(commit) sm.handleHeartbeat(tt.m) @@ -816,7 +816,7 @@ func TestHandleHeartbeat(t *testing.T) { func TestHandleHeartbeatResp(t *testing.T) { storage := NewMemoryStorage() storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) - sm := newRaft(1, []uint64{1, 2}, 5, 1, storage) + sm := newRaft(1, []uint64{1, 2}, 5, 1, storage, 0) sm.becomeCandidate() sm.becomeLeader() sm.raftLog.commitTo(sm.raftLog.lastIndex()) @@ -904,7 +904,7 @@ func TestRecvMsgVote(t *testing.T) { } for i, tt := range tests { - sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0) sm.state = tt.state switch tt.state { case StateFollower: @@ -964,7 +964,7 @@ func TestStateTransition(t *testing.T) { } }() - sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0) sm.state = tt.from switch tt.to { @@ -1003,7 +1003,7 @@ func TestAllServerStepdown(t *testing.T) { tterm := uint64(3) for i, tt := range tests { - sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) switch tt.state { case StateFollower: sm.becomeFollower(1, None) @@ -1062,7 +1062,7 @@ func TestLeaderAppResp(t *testing.T) { for i, tt := range tests { // sm term is 1 after it becomes the leader. // thus the last log term must be 1 to be committed. - sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) sm.raftLog = &raftLog{ storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}, unstable: unstable{offset: 3}, @@ -1110,7 +1110,7 @@ func TestBcastBeat(t *testing.T) { } storage := NewMemoryStorage() storage.ApplySnapshot(s) - sm := newRaft(1, nil, 10, 1, storage) + sm := newRaft(1, nil, 10, 1, storage, 0) sm.Term = 1 sm.becomeCandidate() @@ -1169,7 +1169,7 @@ func TestRecvMsgBeat(t *testing.T) { } for i, tt := range tests { - sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0) sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}} sm.Term = 1 sm.state = tt.state @@ -1212,7 +1212,7 @@ func TestLeaderIncreaseNext(t *testing.T) { } for i, tt := range tests { - sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) sm.raftLog.append(previousEnts...) sm.becomeCandidate() sm.becomeLeader() @@ -1236,7 +1236,7 @@ func TestRestore(t *testing.T) { } storage := NewMemoryStorage() - sm := newRaft(1, []uint64{1, 2}, 10, 1, storage) + sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0) if ok := sm.restore(s); !ok { t.Fatal("restore fail, want succeed") } @@ -1261,7 +1261,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) { previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} commit := uint64(1) storage := NewMemoryStorage() - sm := newRaft(1, []uint64{1, 2}, 10, 1, storage) + sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0) sm.raftLog.append(previousEnts...) sm.raftLog.commitTo(commit) @@ -1302,7 +1302,7 @@ func TestProvideSnap(t *testing.T) { }, } storage := NewMemoryStorage() - sm := newRaft(1, []uint64{1}, 10, 1, storage) + sm := newRaft(1, []uint64{1}, 10, 1, storage, 0) sm.restore(s) sm.becomeCandidate() @@ -1333,7 +1333,7 @@ func TestRestoreFromSnapMsg(t *testing.T) { } m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s} - sm := newRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + sm := newRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) sm.Step(m) // TODO(bdarnell): what should this test? @@ -1367,7 +1367,7 @@ func TestSlowNodeRestore(t *testing.T) { // it appends the entry to log and sets pendingConf to be true. func TestStepConfig(t *testing.T) { // a raft that cannot make progress - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) r.becomeCandidate() r.becomeLeader() index := r.raftLog.lastIndex() @@ -1385,7 +1385,7 @@ func TestStepConfig(t *testing.T) { // the proposal to noop and keep its original state. func TestStepIgnoreConfig(t *testing.T) { // a raft that cannot make progress - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) r.becomeCandidate() r.becomeLeader() r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) @@ -1412,7 +1412,7 @@ func TestRecoverPendingConfig(t *testing.T) { {pb.EntryConfChange, true}, } for i, tt := range tests { - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) r.appendEntry(pb.Entry{Type: tt.entType}) r.becomeCandidate() r.becomeLeader() @@ -1431,7 +1431,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) { t.Errorf("expect panic, but nothing happens") } }() - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) r.becomeCandidate() @@ -1441,7 +1441,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) { // TestAddNode tests that addNode could update pendingConf and nodes correctly. func TestAddNode(t *testing.T) { - r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0) r.pendingConf = true r.addNode(2) if r.pendingConf != false { @@ -1457,7 +1457,7 @@ func TestAddNode(t *testing.T) { // TestRemoveNode tests that removeNode could update pendingConf, nodes and // and removed list correctly. func TestRemoveNode(t *testing.T) { - r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) + r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0) r.pendingConf = true r.removeNode(2) if r.pendingConf != false { @@ -1481,7 +1481,7 @@ func TestPromotable(t *testing.T) { {[]uint64{2, 3}, false}, } for i, tt := range tests { - r := newRaft(id, tt.peers, 5, 1, NewMemoryStorage()) + r := newRaft(id, tt.peers, 5, 1, NewMemoryStorage(), 0) if g := r.promotable(); g != tt.wp { t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp) } @@ -1503,7 +1503,7 @@ func TestRaftNodes(t *testing.T) { }, } for i, tt := range tests { - r := newRaft(1, tt.ids, 10, 1, NewMemoryStorage()) + r := newRaft(1, tt.ids, 10, 1, NewMemoryStorage(), 0) if !reflect.DeepEqual(r.nodes(), tt.wids) { t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids) } @@ -1515,7 +1515,7 @@ func ents(terms ...uint64) *raft { for i, term := range terms { storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}}) } - sm := newRaft(1, []uint64{}, 5, 1, storage) + sm := newRaft(1, []uint64{}, 5, 1, storage, 0) sm.reset(0) return sm } @@ -1543,7 +1543,7 @@ func newNetwork(peers ...Interface) *network { switch v := p.(type) { case nil: nstorage[id] = NewMemoryStorage() - sm := newRaft(id, peerAddrs, 10, 1, nstorage[id]) + sm := newRaft(id, peerAddrs, 10, 1, nstorage[id], 0) npeers[id] = sm case *raft: v.id = id From 8c3a6508e9f22d4693bcc7c8eaa39e88bb81739c Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Thu, 22 Jan 2015 12:03:35 -0500 Subject: [PATCH 2/2] raft: Add applied to the newRaft log message. --- raft/raft.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index a93dacfc5..6c4d48354 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -183,8 +183,8 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n)) } - log.Printf("raft: newRaft %x [peers: [%s], term: %d, commit: %d, lastindex: %d, lastterm: %d]", - r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm()) + log.Printf("raft: newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]", + r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm()) return r }