clientv3: add "zap.Config" to replace global logger
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>release-3.4
parent
08da08bb19
commit
a76681073d
|
@ -46,15 +46,16 @@ var (
|
|||
ErrOldCluster = errors.New("etcdclient: old cluster version")
|
||||
|
||||
roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
|
||||
logger *zap.Logger
|
||||
)
|
||||
|
||||
func init() {
|
||||
logger = zap.NewNop() // zap.NewExample()
|
||||
balancer.RegisterBuilder(balancer.Config{
|
||||
Policy: picker.RoundrobinBalanced,
|
||||
Name: roundRobinBalancerName,
|
||||
Logger: logger,
|
||||
|
||||
// TODO: configure from clientv3.Config
|
||||
Logger: zap.NewNop(),
|
||||
// Logger: zap.NewExample(),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -86,6 +87,8 @@ type Client struct {
|
|||
tokenCred *authTokenCredential
|
||||
|
||||
callOpts []grpc.CallOption
|
||||
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
// New creates a new etcdv3 client from a given configuration.
|
||||
|
@ -274,8 +277,8 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts []
|
|||
opts = append(opts,
|
||||
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
|
||||
// Streams that are safe to retry are enabled individually.
|
||||
grpc.WithStreamInterceptor(c.streamClientInterceptor(logger, withMax(0), rrBackoff)),
|
||||
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(logger, withMax(defaultUnaryMaxRetries), rrBackoff)),
|
||||
grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMax(0), rrBackoff)),
|
||||
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax(defaultUnaryMaxRetries), rrBackoff)),
|
||||
)
|
||||
|
||||
return opts, nil
|
||||
|
@ -410,6 +413,16 @@ func newClient(cfg *Config) (*Client, error) {
|
|||
callOpts: defaultCallOpts,
|
||||
}
|
||||
|
||||
lcfg := DefaultLogConfig
|
||||
if cfg.LogConfig != nil {
|
||||
lcfg = *cfg.LogConfig
|
||||
}
|
||||
var err error
|
||||
client.lg, err = lcfg.Build()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cfg.Username != "" && cfg.Password != "" {
|
||||
client.Username = cfg.Username
|
||||
client.Password = cfg.Password
|
||||
|
@ -434,7 +447,6 @@ func newClient(cfg *Config) (*Client, error) {
|
|||
|
||||
// Prepare a 'endpoint://<unique-client-id>/' resolver for the client and create a endpoint target to pass
|
||||
// to dial so the client knows to use this resolver.
|
||||
var err error
|
||||
client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", strconv.FormatInt(time.Now().UnixNano(), 36)))
|
||||
if err != nil {
|
||||
client.cancel()
|
||||
|
@ -485,10 +497,10 @@ func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFracti
|
|||
n := uint(len(c.Endpoints()))
|
||||
quorum := (n/2 + 1)
|
||||
if attempt%quorum == 0 {
|
||||
logger.Info("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
|
||||
c.lg.Info("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
|
||||
return backoffutils.JitterUp(waitBetween, jitterFraction)
|
||||
}
|
||||
logger.Info("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
|
||||
c.lg.Info("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"crypto/tls"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
|
@ -72,4 +73,27 @@ type Config struct {
|
|||
// Context is the default client context; it can be used to cancel grpc dial out and
|
||||
// other operations that do not have an explicit context.
|
||||
Context context.Context
|
||||
|
||||
// LogConfig configures client-side logger.
|
||||
// If nil, use the default logger.
|
||||
// TODO: configure balancer and gRPC logger
|
||||
LogConfig *zap.Config
|
||||
}
|
||||
|
||||
// DefaultLogConfig is the default client logging configuration.
|
||||
// Default log level is "Warn". Use "zap.InfoLevel" for debugging.
|
||||
// Use "/dev/null" for output paths, to discard all logs.
|
||||
var DefaultLogConfig = zap.Config{
|
||||
Level: zap.NewAtomicLevelAt(zap.WarnLevel),
|
||||
Development: false,
|
||||
Sampling: &zap.SamplingConfig{
|
||||
Initial: 100,
|
||||
Thereafter: 100,
|
||||
},
|
||||
Encoding: "json",
|
||||
EncoderConfig: zap.NewProductionEncoderConfig(),
|
||||
|
||||
// Use "/dev/null" to discard all
|
||||
OutputPaths: []string{"stderr"},
|
||||
ErrorOutputPaths: []string{"stderr"},
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
|
|||
}
|
||||
continue
|
||||
}
|
||||
if !isSafeRetry(lastErr, callOpts) {
|
||||
if !isSafeRetry(c.lg, lastErr, callOpts) {
|
||||
return lastErr
|
||||
}
|
||||
}
|
||||
|
@ -215,14 +215,13 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}
|
|||
if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
|
||||
gterr := s.client.getToken(s.ctx)
|
||||
if gterr != nil {
|
||||
logger.Info("retry failed to fetch new auth token", zap.Error(gterr))
|
||||
s.client.lg.Info("retry failed to fetch new auth token", zap.Error(gterr))
|
||||
return false, err // return the original error for simplicity
|
||||
}
|
||||
return true, err
|
||||
|
||||
}
|
||||
return isSafeRetry(err, s.callOpts), err
|
||||
|
||||
return isSafeRetry(s.client.lg, err, s.callOpts), err
|
||||
}
|
||||
|
||||
func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) {
|
||||
|
@ -262,7 +261,7 @@ func waitRetryBackoff(attempt uint, ctx context.Context, callOpts *options) erro
|
|||
}
|
||||
|
||||
// isSafeRetry returns "true", if request is safe for retry with the given error.
|
||||
func isSafeRetry(err error, callOpts *options) bool {
|
||||
func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool {
|
||||
if isContextError(err) {
|
||||
return false
|
||||
}
|
||||
|
@ -272,7 +271,7 @@ func isSafeRetry(err error, callOpts *options) bool {
|
|||
case nonRepeatable:
|
||||
return isSafeRetryMutableRPC(err)
|
||||
default:
|
||||
logger.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
|
||||
lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue