From ab03a42f069f9bfa1313409ac546484bc2130d64 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Mon, 13 Feb 2017 15:09:54 -0500 Subject: [PATCH] raft: add Ready.MustSync Add Ready.MustSync which indicates that the hard state and raft log entries in a Ready message must be synchronously written to persistent storage. --- raft/node.go | 16 ++++++++++++++++ raft/node_test.go | 4 ++++ raft/rawnode_test.go | 4 ++++ wal/wal.go | 11 +---------- 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/raft/node.go b/raft/node.go index c8410fdc7..5da1c1193 100644 --- a/raft/node.go +++ b/raft/node.go @@ -83,6 +83,10 @@ type Ready struct { // If it contains a MsgSnap message, the application MUST report back to raft // when the snapshot has been received or has failed by calling ReportSnapshot. Messages []pb.Message + + // MustSync indicates whether the HardState and Entries must be synchronously + // written to disk or if an asynchronous write is permissible. + MustSync bool } func isHardStateEqual(a, b pb.HardState) bool { @@ -517,5 +521,17 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { if len(r.readStates) != 0 { rd.ReadStates = r.readStates } + rd.MustSync = MustSync(rd.HardState, prevHardSt, len(rd.Entries)) return rd } + +// MustSync returns true if the hard state and count of Raft entries indicate +// that a synchronous write to persistent storage is required. +func MustSync(st, prevst pb.HardState, entsnum int) bool { + // Persistent state on all servers: + // (Updated on stable storage before responding to RPCs) + // currentTerm + // votedFor + // log entries[] + return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term +} diff --git a/raft/node_test.go b/raft/node_test.go index 94414d9e1..ec3c74515 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -487,11 +487,13 @@ func TestNodeStart(t *testing.T) { CommittedEntries: []raftpb.Entry{ {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, }, + MustSync: true, }, { HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1}, Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, + MustSync: true, }, } storage := NewMemoryStorage() @@ -544,6 +546,7 @@ func TestNodeRestart(t *testing.T) { HardState: st, // commit up to index commit index in st CommittedEntries: entries[:st.Commit], + MustSync: true, } storage := NewMemoryStorage() @@ -588,6 +591,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) { HardState: st, // commit up to index commit index in st CommittedEntries: entries, + MustSync: true, } s := NewMemoryStorage() diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index b978ad2da..4ccf72de4 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -273,11 +273,13 @@ func TestRawNodeStart(t *testing.T) { CommittedEntries: []raftpb.Entry{ {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, }, + MustSync: true, }, { HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1}, Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, + MustSync: true, }, } @@ -326,6 +328,7 @@ func TestRawNodeRestart(t *testing.T) { HardState: emptyState, // commit up to commit index in st CommittedEntries: entries[:st.Commit], + MustSync: true, } storage := NewMemoryStorage() @@ -362,6 +365,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) { HardState: emptyState, // commit up to commit index in st CommittedEntries: entries, + MustSync: true, } s := NewMemoryStorage() diff --git a/wal/wal.go b/wal/wal.go index 69ed6b239..80511b3e9 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -552,7 +552,7 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { return nil } - mustSync := mustSync(st, w.state, len(ents)) + mustSync := raft.MustSync(st, w.state, len(ents)) // TODO(xiangli): no more reference operator for i := range ents { @@ -618,15 +618,6 @@ func (w *WAL) seq() uint64 { return seq } -func mustSync(st, prevst raftpb.HardState, entsnum int) bool { - // Persistent state on all servers: - // (Updated on stable storage before responding to RPCs) - // currentTerm - // votedFor - // log entries[] - return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term -} - func closeAll(rcs ...io.ReadCloser) error { for _, f := range rcs { if err := f.Close(); err != nil {