raft: ensure CheckQuorum is enabled when readonlyoption is lease based

release-3.3
Xiang 2017-09-17 10:46:12 -07:00
parent 085adc5b8b
commit 9801fd7297
2 changed files with 6 additions and 28 deletions

View File

@ -171,6 +171,7 @@ type Config struct {
// If the clock drift is unbounded, leader might keep the lease longer than it
// should (clock can move backward/pause without any bound). ReadIndex is not safe
// in that case.
// CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
ReadOnlyOption ReadOnlyOption
// Logger is the logger used for raft log. For multinode which can host
@ -213,6 +214,10 @@ func (c *Config) validate() error {
c.Logger = raftLogger
}
if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum {
return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
}
return nil
}
@ -870,10 +875,7 @@ func stepLeader(r *raft, m pb.Message) {
r.readOnly.addRequest(r.raftLog.committed, m)
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
var ri uint64
if r.checkQuorum {
ri = r.raftLog.committed
}
ri := r.raftLog.committed
if m.From == None || m.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else {

View File

@ -1909,30 +1909,6 @@ func TestReadOnlyOptionLease(t *testing.T) {
}
}
func TestReadOnlyOptionLeaseWithoutCheckQuorum(t *testing.T) {
a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
a.readOnly.option = ReadOnlyLeaseBased
b.readOnly.option = ReadOnlyLeaseBased
c.readOnly.option = ReadOnlyLeaseBased
nt := newNetwork(a, b, c)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
ctx := []byte("ctx1")
nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}})
rs := b.readStates[0]
if rs.Index != None {
t.Errorf("readIndex = %d, want %d", rs.Index, None)
}
if !bytes.Equal(rs.RequestCtx, ctx) {
t.Errorf("requestCtx = %v, want %v", rs.RequestCtx, ctx)
}
}
// TestReadOnlyForNewLeader ensures that a leader only accepts MsgReadIndex message
// when it commits at least one log entry at it term.
func TestReadOnlyForNewLeader(t *testing.T) {