raft: remove quorum() dependency from readOnly
This now delegates the quorum computation to r.prs, which will allow it to generalize in a straightforward way when etcd-io/etcd#7625 is addressed.release-3.4
parent
57a1b39fcd
commit
02b0d80234
|
@ -320,6 +320,10 @@ func (p *prs) quorum() int {
|
||||||
return len(p.nodes)/2 + 1
|
return len(p.nodes)/2 + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *prs) hasQuorum(m map[uint64]struct{}) bool {
|
||||||
|
return len(m) >= p.quorum()
|
||||||
|
}
|
||||||
|
|
||||||
// committed returns the largest log index known to be committed based on what
|
// committed returns the largest log index known to be committed based on what
|
||||||
// the voting members of the group have acknowledged.
|
// the voting members of the group have acknowledged.
|
||||||
func (p *prs) committed() uint64 {
|
func (p *prs) committed() uint64 {
|
||||||
|
|
|
@ -1000,6 +1000,8 @@ func stepLeader(r *raft, m pb.Message) error {
|
||||||
switch r.readOnly.option {
|
switch r.readOnly.option {
|
||||||
case ReadOnlySafe:
|
case ReadOnlySafe:
|
||||||
r.readOnly.addRequest(r.raftLog.committed, m)
|
r.readOnly.addRequest(r.raftLog.committed, m)
|
||||||
|
// The local node automatically acks the request.
|
||||||
|
r.readOnly.recvAck(r.id, m.Entries[0].Data)
|
||||||
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
|
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
|
||||||
case ReadOnlyLeaseBased:
|
case ReadOnlyLeaseBased:
|
||||||
ri := r.raftLog.committed
|
ri := r.raftLog.committed
|
||||||
|
@ -1097,8 +1099,7 @@ func stepLeader(r *raft, m pb.Message) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ackCount := r.readOnly.recvAck(m)
|
if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) {
|
||||||
if ackCount < r.prs.quorum() {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,26 +50,25 @@ func newReadOnly(option ReadOnlyOption) *readOnly {
|
||||||
// the read only request.
|
// the read only request.
|
||||||
// `m` is the original read only request message from the local or remote node.
|
// `m` is the original read only request message from the local or remote node.
|
||||||
func (ro *readOnly) addRequest(index uint64, m pb.Message) {
|
func (ro *readOnly) addRequest(index uint64, m pb.Message) {
|
||||||
ctx := string(m.Entries[0].Data)
|
s := string(m.Entries[0].Data)
|
||||||
if _, ok := ro.pendingReadIndex[ctx]; ok {
|
if _, ok := ro.pendingReadIndex[s]; ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
|
ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
|
||||||
ro.readIndexQueue = append(ro.readIndexQueue, ctx)
|
ro.readIndexQueue = append(ro.readIndexQueue, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
// recvAck notifies the readonly struct that the raft state machine received
|
// recvAck notifies the readonly struct that the raft state machine received
|
||||||
// an acknowledgment of the heartbeat that attached with the read only request
|
// an acknowledgment of the heartbeat that attached with the read only request
|
||||||
// context.
|
// context.
|
||||||
func (ro *readOnly) recvAck(m pb.Message) int {
|
func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} {
|
||||||
rs, ok := ro.pendingReadIndex[string(m.Context)]
|
rs, ok := ro.pendingReadIndex[string(context)]
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
rs.acks[m.From] = struct{}{}
|
rs.acks[id] = struct{}{}
|
||||||
// add one to include an ack from local node
|
return rs.acks
|
||||||
return len(rs.acks) + 1
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// advance advances the read only request queue kept by the readonly struct.
|
// advance advances the read only request queue kept by the readonly struct.
|
||||||
|
|
Loading…
Reference in New Issue