raft: add Ready.MustSync

Add Ready.MustSync which indicates that the hard state and raft log
entries in a Ready message must be synchronously written to persistent
storage.
release-3.2
Peter Mattis 2017-02-13 15:09:54 -05:00
parent 2925f02aac
commit ab03a42f06
4 changed files with 25 additions and 10 deletions

View File

@ -83,6 +83,10 @@ type Ready struct {
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
func isHardStateEqual(a, b pb.HardState) bool {
@ -517,5 +521,17 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
if len(r.readStates) != 0 {
rd.ReadStates = r.readStates
}
rd.MustSync = MustSync(rd.HardState, prevHardSt, len(rd.Entries))
return rd
}
// MustSync returns true if the hard state and count of Raft entries indicate
// that a synchronous write to persistent storage is required.
func MustSync(st, prevst pb.HardState, entsnum int) bool {
// Persistent state on all servers:
// (Updated on stable storage before responding to RPCs)
// currentTerm
// votedFor
// log entries[]
return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}

View File

@ -487,11 +487,13 @@ func TestNodeStart(t *testing.T) {
CommittedEntries: []raftpb.Entry{
{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
},
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
MustSync: true,
},
}
storage := NewMemoryStorage()
@ -544,6 +546,7 @@ func TestNodeRestart(t *testing.T) {
HardState: st,
// commit up to index commit index in st
CommittedEntries: entries[:st.Commit],
MustSync: true,
}
storage := NewMemoryStorage()
@ -588,6 +591,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
HardState: st,
// commit up to index commit index in st
CommittedEntries: entries,
MustSync: true,
}
s := NewMemoryStorage()

View File

@ -273,11 +273,13 @@ func TestRawNodeStart(t *testing.T) {
CommittedEntries: []raftpb.Entry{
{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
},
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
MustSync: true,
},
}
@ -326,6 +328,7 @@ func TestRawNodeRestart(t *testing.T) {
HardState: emptyState,
// commit up to commit index in st
CommittedEntries: entries[:st.Commit],
MustSync: true,
}
storage := NewMemoryStorage()
@ -362,6 +365,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) {
HardState: emptyState,
// commit up to commit index in st
CommittedEntries: entries,
MustSync: true,
}
s := NewMemoryStorage()

View File

@ -552,7 +552,7 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
return nil
}
mustSync := mustSync(st, w.state, len(ents))
mustSync := raft.MustSync(st, w.state, len(ents))
// TODO(xiangli): no more reference operator
for i := range ents {
@ -618,15 +618,6 @@ func (w *WAL) seq() uint64 {
return seq
}
func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
// Persistent state on all servers:
// (Updated on stable storage before responding to RPCs)
// currentTerm
// votedFor
// log entries[]
return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}
func closeAll(rcs ...io.ReadCloser) error {
for _, f := range rcs {
if err := f.Close(); err != nil {