clientv3: backoff on reestablishing watches when Unavailable errors are encountered

release-3.4
Jordan Liggitt 2018-06-13 00:54:22 -04:00
parent 88acced1cd
commit d1579c95a2
No known key found for this signature in database
GPG Key ID: 39928704103C7229
2 changed files with 28 additions and 0 deletions

View File

@ -527,6 +527,20 @@ func isHaltErr(ctx context.Context, err error) bool {
return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal
}
// isUnavailableErr returns true if the given error is an unavailable error
func isUnavailableErr(ctx context.Context, err error) bool {
if ctx != nil && ctx.Err() != nil {
return false
}
if err == nil {
return false
}
ev, _ := status.FromError(err)
// Unavailable codes mean the system will be right back.
// (e.g., can't connect, lost leader)
return ev.Code() == codes.Unavailable
}
func toErr(ctx context.Context, err error) error {
if err == nil {
return nil

View File

@ -830,10 +830,13 @@ func (w *watchGrpcStream) joinSubstreams() {
}
}
var maxBackoff = 100 * time.Millisecond
// openWatchClient retries opening a watch client until success or halt.
// manually retry in case "ws==nil && err==nil"
// TODO: remove FailFast=false
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
backoff := time.Millisecond
for {
select {
case <-w.ctx.Done():
@ -849,6 +852,17 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
if isHaltErr(w.ctx, err) {
return nil, v3rpc.Error(err)
}
if isUnavailableErr(w.ctx, err) {
// retry, but backoff
if backoff < maxBackoff {
// 25% backoff factor
backoff = backoff + backoff/4
if backoff > maxBackoff {
backoff = maxBackoff
}
}
time.Sleep(backoff)
}
}
return ws, nil
}