From 02b0d8023421616bf77732a0e706662017fe063d Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 26 Apr 2019 15:56:39 +0200 Subject: [PATCH] raft: remove quorum() dependency from readOnly This now delegates the quorum computation to r.prs, which will allow it to generalize in a straightforward way when etcd-io/etcd#7625 is addressed. --- raft/progress.go | 4 ++++ raft/raft.go | 5 +++-- raft/read_only.go | 19 +++++++++---------- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index 2833730c6..cd48c8277 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -320,6 +320,10 @@ func (p *prs) quorum() int { return len(p.nodes)/2 + 1 } +func (p *prs) hasQuorum(m map[uint64]struct{}) bool { + return len(m) >= p.quorum() +} + // committed returns the largest log index known to be committed based on what // the voting members of the group have acknowledged. func (p *prs) committed() uint64 { diff --git a/raft/raft.go b/raft/raft.go index 4c630dfd8..5bf372f18 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -1000,6 +1000,8 @@ func stepLeader(r *raft, m pb.Message) error { switch r.readOnly.option { case ReadOnlySafe: r.readOnly.addRequest(r.raftLog.committed, m) + // The local node automatically acks the request. + r.readOnly.recvAck(r.id, m.Entries[0].Data) r.bcastHeartbeatWithCtx(m.Entries[0].Data) case ReadOnlyLeaseBased: ri := r.raftLog.committed @@ -1097,8 +1099,7 @@ func stepLeader(r *raft, m pb.Message) error { return nil } - ackCount := r.readOnly.recvAck(m) - if ackCount < r.prs.quorum() { + if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) { return nil } diff --git a/raft/read_only.go b/raft/read_only.go index 380533831..955fe7986 100644 --- a/raft/read_only.go +++ b/raft/read_only.go @@ -50,26 +50,25 @@ func newReadOnly(option ReadOnlyOption) *readOnly { // the read only request. // `m` is the original read only request message from the local or remote node. func (ro *readOnly) addRequest(index uint64, m pb.Message) { - ctx := string(m.Entries[0].Data) - if _, ok := ro.pendingReadIndex[ctx]; ok { + s := string(m.Entries[0].Data) + if _, ok := ro.pendingReadIndex[s]; ok { return } - ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} - ro.readIndexQueue = append(ro.readIndexQueue, ctx) + ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} + ro.readIndexQueue = append(ro.readIndexQueue, s) } // recvAck notifies the readonly struct that the raft state machine received // an acknowledgment of the heartbeat that attached with the read only request // context. -func (ro *readOnly) recvAck(m pb.Message) int { - rs, ok := ro.pendingReadIndex[string(m.Context)] +func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} { + rs, ok := ro.pendingReadIndex[string(context)] if !ok { - return 0 + return nil } - rs.acks[m.From] = struct{}{} - // add one to include an ack from local node - return len(rs.acks) + 1 + rs.acks[id] = struct{}{} + return rs.acks } // advance advances the read only request queue kept by the readonly struct.