diff --git a/raft/raft.go b/raft/raft.go index 77bef9697..414344cea 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -104,7 +104,8 @@ type stateMachine struct { state stateType - commit int + commit int + applied int votes map[int]bool @@ -180,11 +181,12 @@ func (sm *stateMachine) sendAppend() { m.Index = in.next - 1 m.LogTerm = sm.log[in.next-1].Term m.Entries = sm.log[in.next:] + m.Commit = sm.commit sm.send(m) } } -func (sm *stateMachine) theN() int { +func (sm *stateMachine) maybeCommit() bool { // TODO(bmizerany): optimize.. Currently naive mis := make([]int, len(sm.ins)) for i := range mis { @@ -192,18 +194,20 @@ func (sm *stateMachine) theN() int { } sort.Sort(sort.Reverse(sort.IntSlice(mis))) mci := mis[sm.q()-1] - if sm.log[mci].Term == sm.term { - return mci + + if mci > sm.commit && sm.log[mci].Term == sm.term { + sm.commit = mci + return true } - return -1 + return false } +// nextEnts returns the appliable entries and updates the applied index func (sm *stateMachine) nextEnts() (ents []Entry) { - ci := sm.theN() - if ci > sm.commit { - ents = sm.log[sm.commit+1 : ci] - sm.commit = ci + if sm.commit > sm.applied { + ents = sm.log[sm.applied+1 : sm.commit+1] + sm.applied = sm.commit } return ents } @@ -306,6 +310,7 @@ func (sm *stateMachine) Step(m Message) { handleAppendEntries := func() { if sm.isLogOk(m.Index, m.LogTerm) { + sm.commit = m.Commit sm.append(m.Index, m.Entries...) sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.li()}) } else { @@ -323,6 +328,9 @@ func (sm *stateMachine) Step(m Message) { sm.sendAppend() } else { in.update(m.Index) + if sm.maybeCommit() { + sm.sendAppend() + } } } case stateCandidate: diff --git a/raft/raft_test.go b/raft/raft_test.go index d0eb529dc..8586e81f1 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1,6 +1,7 @@ package raft import ( + "bytes" "fmt" "reflect" "testing" @@ -56,6 +57,63 @@ func TestLeaderElection(t *testing.T) { } } +func TestLogReplication(t *testing.T) { + tests := []struct { + *network + msgs []Message + wcommit int + }{ + { + newNetwork(nil, nil, nil), + []Message{ + Message{To: 0, Type: msgProp, Data: []byte("somedata")}, + }, + 1, + }, + { + newNetwork(nil, nil, nil), + []Message{ + Message{To: 0, Type: msgProp, Data: []byte("somedata")}, + Message{To: 1, Type: msgHup}, + Message{To: 1, Type: msgProp, Data: []byte("somedata")}, + }, + 2, + }, + } + + for i, tt := range tests { + tt.tee = stepperFunc(func(m Message) { + t.Logf("#%d: m = %+v", i, m) + }) + tt.Step(Message{To: 0, Type: msgHup}) + + for _, m := range tt.msgs { + tt.Step(m) + } + + for j, ism := range tt.ss { + sm := ism.(*nsm) + + if sm.commit != tt.wcommit { + t.Errorf("#%d.%d: commit = %d, want %d", i, j, sm.commit, tt.wcommit) + } + + ents := sm.nextEnts() + props := make([]Message, 0) + for _, m := range tt.msgs { + if m.Type == msgProp { + props = append(props, m) + } + } + for k, m := range props { + if !bytes.Equal(ents[k].Data, m.Data) { + t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Data) + } + } + } + } +} + func TestDualingCandidates(t *testing.T) { a := &nsm{stateMachine{log: defaultLog}, nil} c := &nsm{stateMachine{log: defaultLog}, nil} @@ -242,7 +300,7 @@ func TestProposalByProxy(t *testing.T) { } } -func TestTheN(t *testing.T) { +func TestCommit(t *testing.T) { tests := []struct { matches []int logs []Entry @@ -251,17 +309,17 @@ func TestTheN(t *testing.T) { }{ // odd {[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1}, - {[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, -1}, + {[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, {[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2}, - {[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, -1}, + {[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, // even {[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1}, - {[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, -1}, + {[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, {[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1}, - {[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, -1}, + {[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, {[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2}, - {[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, -1}, + {[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0}, } for i, tt := range tests { @@ -270,9 +328,9 @@ func TestTheN(t *testing.T) { ins[j] = &index{tt.matches[j], tt.matches[j] + 1} } sm := &stateMachine{log: tt.logs, ins: ins, k: len(ins), term: tt.smTerm} - g := sm.theN() - if g != tt.w { - t.Errorf("#%d: theN = %d, want %d", i, g, tt.w) + sm.maybeCommit() + if g := sm.commit; g != tt.w { + t.Errorf("#%d: commit = %d, want %d", i, g, tt.w) } } }