From 421d5fbe723decf60a91e8ce2f63d6db863a033e Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 30 Oct 2014 09:42:42 -0700 Subject: [PATCH] raft: add tests based on section 5.3 in raft paper --- raft/log.go | 3 + raft/raft_paper_test.go | 399 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 402 insertions(+) diff --git a/raft/log.go b/raft/log.go index cb3e26e6d..2f302c48f 100644 --- a/raft/log.go +++ b/raft/log.go @@ -41,6 +41,9 @@ func newLog() *raftLog { } func (l *raftLog) load(ents []pb.Entry) { + if l.offset != ents[0].Index { + panic("entries loaded don't match offset index") + } l.ents = ents l.unstable = l.offset + uint64(len(ents)) } diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 005768666..93201647a 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -379,8 +379,407 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) { } } +// TestLeaderStartReplication tests that when receiving client proposals, +// the leader appends the proposal to its log as a new entry, then issues +// AppendEntries RPCs in parallel to each of the other servers to replicate +// the entry. Also, when sending an AppendEntries RPC, the leader includes +// the index and term of the entry in its log that immediately precedes +// the new entries. +// Also, it writes the new entry into stable storage. +// Reference: section 5.3 +func TestLeaderStartReplication(t *testing.T) { + r := newRaft(1, []uint64{1, 2, 3}, 10, 1) + r.becomeCandidate() + r.becomeLeader() + commitNoopEntry(r) + li := r.raftLog.lastIndex() + + ents := []pb.Entry{{Data: []byte("some data")}} + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: ents}) + + if g := r.raftLog.lastIndex(); g != li+1 { + t.Errorf("lastIndex = %d, want %d", g, li+1) + } + if g := r.raftLog.committed; g != li { + t.Errorf("committed = %d, want %d", g, li) + } + msgs := r.readMessages() + sort.Sort(messageSlice(msgs)) + wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}} + wmsgs := []pb.Message{ + {From: 1, To: 2, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li}, + {From: 1, To: 3, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li}, + } + if !reflect.DeepEqual(msgs, wmsgs) { + t.Errorf("msgs = %+v, want %+v", msgs, wmsgs) + } + if g := r.raftLog.unstableEnts(); !reflect.DeepEqual(g, wents) { + t.Errorf("ents = %+v, want %+v", g, wents) + } +} + +// TestLeaderCommitEntry tests that when the entry has been safely replicated, +// the leader gives out the applied entries, which can be applied to its state +// machine. +// Also, the leader keeps track of the highest index it knows to be committed, +// and it includes that index in future AppendEntries RPCs so that the other +// servers eventually find out. +// Reference: section 5.3 +func TestLeaderCommitEntry(t *testing.T) { + r := newRaft(1, []uint64{1, 2, 3}, 10, 1) + r.becomeCandidate() + r.becomeLeader() + commitNoopEntry(r) + li := r.raftLog.lastIndex() + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + + for _, m := range r.readMessages() { + r.Step(acceptAndReply(m)) + } + + if g := r.raftLog.committed; g != li+1 { + t.Errorf("committed = %d, want %d", g, li+1) + } + wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}} + if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) { + t.Errorf("nextEnts = %+v, want %+v", g, wents) + } + msgs := r.readMessages() + sort.Sort(messageSlice(msgs)) + for i, m := range msgs { + if w := uint64(i + 2); m.To != w { + t.Errorf("to = %x, want %x", m.To, w) + } + if m.Type != pb.MsgApp { + t.Errorf("type = %s, want %s", m.Type, pb.MsgApp) + } + if m.Commit != li+1 { + t.Errorf("commit = %d, want %d", m.Commit, li+1) + } + } +} + +// TestLeaderAcknowledgeCommit tests that a log entry is committed once the +// leader that created the entry has replicated it on a majority of the servers. +// Reference: section 5.3 +func TestLeaderAcknowledgeCommit(t *testing.T) { + tests := []struct { + size int + acceptors map[uint64]bool + wack bool + }{ + {1, nil, true}, + {3, nil, false}, + {3, map[uint64]bool{2: true}, true}, + {3, map[uint64]bool{2: true, 3: true}, true}, + {5, nil, false}, + {5, map[uint64]bool{2: true}, false}, + {5, map[uint64]bool{2: true, 3: true}, true}, + {5, map[uint64]bool{2: true, 3: true, 4: true}, true}, + {5, map[uint64]bool{2: true, 3: true, 4: true, 5: true}, true}, + } + for i, tt := range tests { + r := newRaft(1, idsBySize(tt.size), 10, 1) + r.becomeCandidate() + r.becomeLeader() + commitNoopEntry(r) + li := r.raftLog.lastIndex() + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + + for _, m := range r.readMessages() { + if tt.acceptors[m.To] { + r.Step(acceptAndReply(m)) + } + } + + if g := r.raftLog.committed > li; g != tt.wack { + t.Errorf("#%d: ack commit = %v, want %v", i, g, tt.wack) + } + } +} + +// TestLeaderCommitPrecedingEntries tests that when leader commits a log entry, +// it also commits all preceding entries in the leader’s log, including +// entries created by previous leaders. +// Also, it applies the entry to its local state machine (in log order). +// Reference: section 5.3 +func TestLeaderCommitPrecedingEntries(t *testing.T) { + tests := [][]pb.Entry{ + {}, + {{Term: 2, Index: 1}}, + {{Term: 1, Index: 1}, {Term: 2, Index: 2}}, + {{Term: 1, Index: 1}}, + } + for i, tt := range tests { + r := newRaft(1, []uint64{1, 2, 3}, 10, 1) + r.loadEnts(append([]pb.Entry{{}}, tt...)) + r.loadState(pb.HardState{Term: 2}) + r.becomeCandidate() + r.becomeLeader() + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + + for _, m := range r.readMessages() { + r.Step(acceptAndReply(m)) + } + + li := uint64(len(tt)) + wents := append(tt, pb.Entry{Term: 3, Index: li + 1}, pb.Entry{Term: 3, Index: li + 2, Data: []byte("some data")}) + if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) { + t.Errorf("#%d: ents = %+v, want %+v", i, g, wents) + } + } +} + +// TestFollowerCommitEntry tests that once a follower learns that a log entry +// is committed, it applies the entry to its local state machine (in log order). +// Reference: section 5.3 +func TestFollowerCommitEntry(t *testing.T) { + tests := []struct { + ents []pb.Entry + commit uint64 + }{ + { + []pb.Entry{ + {Term: 1, Index: 1, Data: []byte("some data")}, + }, + 1, + }, + { + []pb.Entry{ + {Term: 1, Index: 1, Data: []byte("some data")}, + {Term: 1, Index: 2, Data: []byte("some data2")}, + }, + 2, + }, + { + []pb.Entry{ + {Term: 1, Index: 1, Data: []byte("some data2")}, + {Term: 1, Index: 2, Data: []byte("some data")}, + }, + 2, + }, + { + []pb.Entry{ + {Term: 1, Index: 1, Data: []byte("some data")}, + {Term: 1, Index: 2, Data: []byte("some data2")}, + }, + 1, + }, + } + for i, tt := range tests { + r := newRaft(1, []uint64{1, 2, 3}, 10, 1) + r.becomeFollower(1, 2) + + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit}) + + if g := r.raftLog.committed; g != tt.commit { + t.Errorf("#%d: committed = %d, want %d", i, g, tt.commit) + } + wents := tt.ents[:int(tt.commit)] + if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) { + t.Errorf("#%d: nextEnts = %v, want %v", i, g, wents) + } + } +} + +// TestFollowerCheckMsgApp tests that if the follower does not find an +// entry in its log with the same index and term as the one in AppendEntries RPC, +// then it refuses the new entries. Otherwise it replies that it accepts the +// append entries. +// Reference: section 5.3 +func TestFollowerCheckMsgApp(t *testing.T) { + ents := []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}} + tests := []struct { + term uint64 + index uint64 + wreject bool + }{ + {ents[0].Term, ents[0].Index, false}, + {ents[1].Term, ents[1].Index, false}, + {ents[2].Term, ents[2].Index, false}, + {ents[1].Term, ents[1].Index + 1, true}, + {ents[1].Term + 1, ents[1].Index, true}, + {3, 3, true}, + } + for i, tt := range tests { + r := newRaft(1, []uint64{1, 2, 3}, 10, 1) + r.loadEnts(ents) + r.loadState(pb.HardState{Commit: 2}) + r.becomeFollower(2, 2) + + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index}) + + msgs := r.readMessages() + wmsgs := []pb.Message{ + {From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.index, Reject: tt.wreject}, + } + if !reflect.DeepEqual(msgs, wmsgs) { + t.Errorf("#%d: msgs = %+v, want %+v", i, msgs, wmsgs) + } + } +} + +// TestFollowerAppendEntries tests that when AppendEntries RPC is valid, +// the follower will delete the existing conflict entry and all that follow it, +// and append any new entries not already in the log. +// Also, it writes the new entry into stable storage. +// Reference: section 5.3 +func TestFollowerAppendEntries(t *testing.T) { + tests := []struct { + index, term uint64 + ents []pb.Entry + wents []pb.Entry + wunstable []pb.Entry + }{ + { + 2, 2, + []pb.Entry{{Term: 3, Index: 3}}, + []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}}, + []pb.Entry{{Term: 3, Index: 3}}, + }, + { + 1, 1, + []pb.Entry{{Term: 3, Index: 3}, {Term: 4, Index: 4}}, + []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 3, Index: 3}, {Term: 4, Index: 4}}, + []pb.Entry{{Term: 3, Index: 3}, {Term: 4, Index: 4}}, + }, + { + 0, 0, + []pb.Entry{{Term: 1, Index: 1}}, + []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}}, + nil, + }, + { + 0, 0, + []pb.Entry{{Term: 3, Index: 3}}, + []pb.Entry{{}, {Term: 3, Index: 3}}, + []pb.Entry{{Term: 3, Index: 3}}, + }, + } + for i, tt := range tests { + r := newRaft(1, []uint64{1, 2, 3}, 10, 1) + r.loadEnts([]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}}) + r.becomeFollower(2, 2) + + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents}) + + if g := r.raftLog.ents; !reflect.DeepEqual(g, tt.wents) { + t.Errorf("#%d: ents = %+v, want %+v", i, g, tt.wents) + } + if g := r.raftLog.unstableEnts(); !reflect.DeepEqual(g, tt.wunstable) { + t.Errorf("#%d: unstableEnts = %+v, want %+v", i, g, tt.wunstable) + } + } +} + +// TestLeaderSyncFollowerLog tests that the leader could bring a follower's log +// into consistency with its own. +// Reference: section 5.3, figure 7 +func TestLeaderSyncFollowerLog(t *testing.T) { + ents := []pb.Entry{ + {}, + {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}, + {Term: 4, Index: 4}, {Term: 4, Index: 5}, + {Term: 5, Index: 6}, {Term: 5, Index: 7}, + {Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10}, + } + term := uint64(8) + tests := [][]pb.Entry{ + { + {}, + {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}, + {Term: 4, Index: 4}, {Term: 4, Index: 5}, + {Term: 5, Index: 6}, {Term: 5, Index: 7}, + {Term: 6, Index: 8}, {Term: 6, Index: 9}, + }, + { + {}, + {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}, + {Term: 4, Index: 4}, + }, + { + {}, + {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}, + {Term: 4, Index: 4}, {Term: 4, Index: 5}, + {Term: 5, Index: 6}, {Term: 5, Index: 7}, + {Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10}, {Term: 6, Index: 11}, + }, + { + {}, + {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}, + {Term: 4, Index: 4}, {Term: 4, Index: 5}, + {Term: 5, Index: 6}, {Term: 5, Index: 7}, + {Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10}, + {Term: 7, Index: 11}, {Term: 7, Index: 12}, + }, + { + {}, + {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}, + {Term: 4, Index: 4}, {Term: 4, Index: 5}, {Term: 4, Index: 6}, {Term: 4, Index: 7}, + }, + { + {}, + {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}, + {Term: 2, Index: 4}, {Term: 2, Index: 5}, {Term: 2, Index: 6}, + {Term: 3, Index: 7}, {Term: 3, Index: 8}, {Term: 3, Index: 9}, {Term: 3, Index: 10}, {Term: 3, Index: 11}, + }, + } + for i, tt := range tests { + lead := newRaft(1, []uint64{1, 2, 3}, 10, 1) + lead.loadEnts(ents) + lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term}) + follower := newRaft(2, []uint64{1, 2, 3}, 10, 1) + follower.loadEnts(tt) + follower.loadState(pb.HardState{Term: term - 1}) + // It is necessary to have a three-node cluster. + // The second may have more up-to-date log than the first one, so the + // first node needs the vote from the third node to become the leader. + n := newNetwork(lead, follower, nopStepper) + n.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + n.send(pb.Message{From: 3, To: 1, Type: pb.MsgVoteResp, Term: 1}) + + n.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + + if g := diffu(ltoa(lead.raftLog), ltoa(follower.raftLog)); g != "" { + t.Errorf("#%d: log diff:\n%s", i, g) + } + } +} + type messageSlice []pb.Message func (s messageSlice) Len() int { return len(s) } func (s messageSlice) Less(i, j int) bool { return fmt.Sprint(s[i]) < fmt.Sprint(s[j]) } func (s messageSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func commitNoopEntry(r *raft) { + if r.state != StateLeader { + panic("it should only be used when it is the leader") + } + r.bcastAppend() + // simulate the response of MsgApp + msgs := r.readMessages() + for _, m := range msgs { + if m.Type != pb.MsgApp || len(m.Entries) != 1 || m.Entries[0].Data != nil { + panic("not a message to append noop entry") + } + r.Step(acceptAndReply(m)) + } + // ignore further messages to refresh followers' commmit index + r.readMessages() + r.raftLog.resetNextEnts() + r.raftLog.resetUnstable() +} + +func acceptAndReply(m pb.Message) pb.Message { + if m.Type != pb.MsgApp { + panic("type should be MsgApp") + } + return pb.Message{ + From: m.To, + To: m.From, + Term: m.Term, + Type: pb.MsgAppResp, + Index: m.Index + uint64(len(m.Entries)), + } +}