concurrency: use current revisions for election
Watching from the leader's ModRevision could cause live-locking on observe retry loops when the ModRevision is less than the compacted revision. Instead, start watching the leader from at least the store revision of the linearized read used to detect the current leader. Fixes #7815release-3.2
parent
6486be673b
commit
50f29bd661
|
@ -165,15 +165,14 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
|
|||
client := e.session.Client()
|
||||
|
||||
defer close(ch)
|
||||
lastRev := int64(0)
|
||||
for {
|
||||
opts := append(v3.WithFirstCreate(), v3.WithRev(lastRev))
|
||||
resp, err := client.Get(ctx, e.keyPrefix, opts...)
|
||||
resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var kv *mvccpb.KeyValue
|
||||
var hdr *pb.ResponseHeader
|
||||
|
||||
if len(resp.Kvs) == 0 {
|
||||
cctx, cancel := context.WithCancel(ctx)
|
||||
|
@ -189,18 +188,27 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
|
|||
// only accept PUTs; a DELETE will make observe() spin
|
||||
for _, ev := range wr.Events {
|
||||
if ev.Type == mvccpb.PUT {
|
||||
kv = ev.Kv
|
||||
hdr, kv = &wr.Header, ev.Kv
|
||||
// may have multiple revs; hdr.rev = the last rev
|
||||
// set to kv's rev in case batch has multiple PUTs
|
||||
hdr.Revision = kv.ModRevision
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
} else {
|
||||
kv = resp.Kvs[0]
|
||||
hdr, kv = resp.Header, resp.Kvs[0]
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
cctx, cancel := context.WithCancel(ctx)
|
||||
wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
|
||||
wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
|
||||
keyDeleted := false
|
||||
for !keyDeleted {
|
||||
wr, ok := <-wch
|
||||
|
@ -209,7 +217,6 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
|
|||
}
|
||||
for _, ev := range wr.Events {
|
||||
if ev.Type == mvccpb.DELETE {
|
||||
lastRev = ev.Kv.ModRevision
|
||||
keyDeleted = true
|
||||
break
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue