From 54b4f52e488b4f5e5bff10a1b9e5f14eec4e12f2 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 1 Aug 2014 21:43:08 -0700 Subject: [PATCH] raft: add index to entry --- etcd/participant.go | 2 +- raft/log.go | 12 ++++++------ raft/log_test.go | 23 +++++++++-------------- raft/node.go | 2 +- raft/raft.go | 1 + raft/raft_test.go | 12 ++++++------ 6 files changed, 24 insertions(+), 28 deletions(-) diff --git a/etcd/participant.go b/etcd/participant.go index 98c3fe6a4..f41bfe9a6 100644 --- a/etcd/participant.go +++ b/etcd/participant.go @@ -201,7 +201,7 @@ func (p *participant) run() int64 { return stopMode } p.apply(node.Next()) - _, ents := node.UnstableEnts() + ents := node.UnstableEnts() p.save(ents, node.UnstableState()) p.send(node.Msgs()) if node.IsRemoved() { diff --git a/raft/log.go b/raft/log.go index 0ed5149ef..02c492eee 100644 --- a/raft/log.go +++ b/raft/log.go @@ -15,9 +15,10 @@ const ( ) type Entry struct { - Type int64 - Term int64 - Data []byte + Type int64 + Term int64 + Index int64 + Data []byte } func (e *Entry) isConfig() bool { @@ -88,11 +89,10 @@ func (l *raftLog) findConflict(from int64, ents []Entry) int64 { return -1 } -func (l *raftLog) unstableEnts() (int64, []Entry) { - offset := l.unstable +func (l *raftLog) unstableEnts() []Entry { ents := l.entries(l.unstable) l.unstable = l.lastIndex() + 1 - return offset, ents + return ents } func (l *raftLog) lastIndex() int64 { diff --git a/raft/log_test.go b/raft/log_test.go index 0fb12d106..229956022 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -77,7 +77,7 @@ func TestCompactionSideEffects(t *testing.T) { raftLog := newLog() for i = 0; i < lastIndex; i++ { - raftLog.append(int64(i), Entry{Term: int64(i + 1)}) + raftLog.append(int64(i), Entry{Term: int64(i + 1), Index: int64(i + 1)}) } raftLog.compact(500) @@ -98,13 +98,13 @@ func TestCompactionSideEffects(t *testing.T) { } } - offset, unstableEnts := raftLog.unstableEnts() - if offset != 501 { - t.Errorf("offset(unstableEntries) = %d, want = %d", offset, 500) - } + unstableEnts := raftLog.unstableEnts() if g := len(unstableEnts); g != 500 { t.Errorf("len(unstableEntries) = %d, want = %d", g, 500) } + if unstableEnts[0].Index != 501 { + t.Errorf("Index = %d, want = %d", unstableEnts[0].Index, 501) + } prev := raftLog.lastIndex() raftLog.append(raftLog.lastIndex(), Entry{Term: raftLog.lastIndex() + 1}) @@ -119,25 +119,21 @@ func TestCompactionSideEffects(t *testing.T) { } func TestUnstableEnts(t *testing.T) { - previousEnts := []Entry{{Term: 1}, {Term: 2}} + previousEnts := []Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}} tests := []struct { unstable int64 - woffset int64 wents []Entry wunstable int64 }{ - {3, 3, nil, 3}, - {1, 1, []Entry{{Term: 1}, {Term: 2}}, 3}, + {3, nil, 3}, + {1, previousEnts, 3}, } for i, tt := range tests { raftLog := newLog() raftLog.ents = append(raftLog.ents, previousEnts...) raftLog.unstable = tt.unstable - offset, ents := raftLog.unstableEnts() - if offset != tt.woffset { - t.Errorf("#%d: offset = %d, want = %d", i, offset, tt.woffset) - } + ents := raftLog.unstableEnts() if !reflect.DeepEqual(ents, tt.wents) { t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents) } @@ -145,7 +141,6 @@ func TestUnstableEnts(t *testing.T) { t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable) } } - } //TestCompaction ensures that the number of log entreis is correct after compactions. diff --git a/raft/node.go b/raft/node.go index abc53270c..361cfa154 100644 --- a/raft/node.go +++ b/raft/node.go @@ -219,7 +219,7 @@ func (n *Node) UpdateConf(t int64, c *Config) { // UnstableEnts retuens all the entries that need to be persistent. // The first return value is offset, and the second one is unstable entries. -func (n *Node) UnstableEnts() (int64, []Entry) { +func (n *Node) UnstableEnts() []Entry { return n.sm.raftLog.unstableEnts() } diff --git a/raft/raft.go b/raft/raft.go index 08ecb1bc3..b147913e3 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -301,6 +301,7 @@ func (sm *stateMachine) q() int { func (sm *stateMachine) appendEntry(e Entry) { e.Term = sm.term.Get() + e.Index = sm.raftLog.lastIndex() + 1 sm.index.Set(sm.raftLog.append(sm.raftLog.lastIndex(), e)) sm.ins[sm.id].update(sm.raftLog.lastIndex()) sm.maybeCommit() diff --git a/raft/raft_test.go b/raft/raft_test.go index f60947a38..f11c3ccaa 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -210,7 +210,7 @@ func TestDuelingCandidates(t *testing.T) { nt.recover() nt.send(Message{From: 2, To: 2, Type: msgHup}) - wlog := &raftLog{ents: []Entry{{}, Entry{Type: Normal, Data: nil, Term: 1}}, committed: 1} + wlog := &raftLog{ents: []Entry{{}, Entry{Type: Normal, Data: nil, Term: 1, Index: 1}}, committed: 1} tests := []struct { sm *stateMachine state stateType @@ -262,7 +262,7 @@ func TestCandidateConcede(t *testing.T) { if g := a.term; g != 1 { t.Errorf("term = %d, want %d", g, 1) } - wantLog := ltoa(&raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2}) + wantLog := ltoa(&raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}) for i, p := range tt.peers { if sm, ok := p.(*stateMachine); ok { l := ltoa(sm.raftLog) @@ -296,8 +296,8 @@ func TestOldMessages(t *testing.T) { l := &raftLog{ ents: []Entry{ - {}, {Type: Normal, Data: nil, Term: 1}, - {Type: Normal, Data: nil, Term: 2}, {Type: Normal, Data: nil, Term: 3}, + {}, {Type: Normal, Data: nil, Term: 1, Index: 1}, + {Type: Normal, Data: nil, Term: 2, Index: 2}, {Type: Normal, Data: nil, Term: 3, Index: 3}, }, committed: 3, } @@ -351,7 +351,7 @@ func TestProposal(t *testing.T) { wantLog := newLog() if tt.success { - wantLog = &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2} + wantLog = &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2} } base := ltoa(wantLog) for i, p := range tt.peers { @@ -385,7 +385,7 @@ func TestProposalByProxy(t *testing.T) { // propose via follower tt.send(Message{From: 1, To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}) - wantLog := &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2} + wantLog := &raftLog{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2} base := ltoa(wantLog) for i, p := range tt.peers { if sm, ok := p.(*stateMachine); ok {