From a76681073d5784ea2f71e51b5628048b8929356e Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 14 Jun 2018 13:33:52 -0700 Subject: [PATCH] clientv3: add "zap.Config" to replace global logger Signed-off-by: Gyuho Lee --- clientv3/client.go | 28 ++++++++++++++++++++-------- clientv3/config.go | 24 ++++++++++++++++++++++++ clientv3/retry_interceptor.go | 11 +++++------ 3 files changed, 49 insertions(+), 14 deletions(-) diff --git a/clientv3/client.go b/clientv3/client.go index 43210f69b..8a0cc1cc4 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -46,15 +46,16 @@ var ( ErrOldCluster = errors.New("etcdclient: old cluster version") roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String()) - logger *zap.Logger ) func init() { - logger = zap.NewNop() // zap.NewExample() balancer.RegisterBuilder(balancer.Config{ Policy: picker.RoundrobinBalanced, Name: roundRobinBalancerName, - Logger: logger, + + // TODO: configure from clientv3.Config + Logger: zap.NewNop(), + // Logger: zap.NewExample(), }) } @@ -86,6 +87,8 @@ type Client struct { tokenCred *authTokenCredential callOpts []grpc.CallOption + + lg *zap.Logger } // New creates a new etcdv3 client from a given configuration. @@ -274,8 +277,8 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts [] opts = append(opts, // Disable stream retry by default since go-grpc-middleware/retry does not support client streams. // Streams that are safe to retry are enabled individually. - grpc.WithStreamInterceptor(c.streamClientInterceptor(logger, withMax(0), rrBackoff)), - grpc.WithUnaryInterceptor(c.unaryClientInterceptor(logger, withMax(defaultUnaryMaxRetries), rrBackoff)), + grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMax(0), rrBackoff)), + grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax(defaultUnaryMaxRetries), rrBackoff)), ) return opts, nil @@ -410,6 +413,16 @@ func newClient(cfg *Config) (*Client, error) { callOpts: defaultCallOpts, } + lcfg := DefaultLogConfig + if cfg.LogConfig != nil { + lcfg = *cfg.LogConfig + } + var err error + client.lg, err = lcfg.Build() + if err != nil { + return nil, err + } + if cfg.Username != "" && cfg.Password != "" { client.Username = cfg.Username client.Password = cfg.Password @@ -434,7 +447,6 @@ func newClient(cfg *Config) (*Client, error) { // Prepare a 'endpoint:///' resolver for the client and create a endpoint target to pass // to dial so the client knows to use this resolver. - var err error client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", strconv.FormatInt(time.Now().UnixNano(), 36))) if err != nil { client.cancel() @@ -485,10 +497,10 @@ func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFracti n := uint(len(c.Endpoints())) quorum := (n/2 + 1) if attempt%quorum == 0 { - logger.Info("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction)) + c.lg.Info("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction)) return backoffutils.JitterUp(waitBetween, jitterFraction) } - logger.Info("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum)) + c.lg.Info("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum)) return 0 } } diff --git a/clientv3/config.go b/clientv3/config.go index 79d6e2a98..c81d56466 100644 --- a/clientv3/config.go +++ b/clientv3/config.go @@ -19,6 +19,7 @@ import ( "crypto/tls" "time" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -72,4 +73,27 @@ type Config struct { // Context is the default client context; it can be used to cancel grpc dial out and // other operations that do not have an explicit context. Context context.Context + + // LogConfig configures client-side logger. + // If nil, use the default logger. + // TODO: configure balancer and gRPC logger + LogConfig *zap.Config +} + +// DefaultLogConfig is the default client logging configuration. +// Default log level is "Warn". Use "zap.InfoLevel" for debugging. +// Use "/dev/null" for output paths, to discard all logs. +var DefaultLogConfig = zap.Config{ + Level: zap.NewAtomicLevelAt(zap.WarnLevel), + Development: false, + Sampling: &zap.SamplingConfig{ + Initial: 100, + Thereafter: 100, + }, + Encoding: "json", + EncoderConfig: zap.NewProductionEncoderConfig(), + + // Use "/dev/null" to discard all + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, } diff --git a/clientv3/retry_interceptor.go b/clientv3/retry_interceptor.go index 21e0add76..9fcec4291 100644 --- a/clientv3/retry_interceptor.go +++ b/clientv3/retry_interceptor.go @@ -70,7 +70,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt } continue } - if !isSafeRetry(lastErr, callOpts) { + if !isSafeRetry(c.lg, lastErr, callOpts) { return lastErr } } @@ -215,14 +215,13 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{} if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken { gterr := s.client.getToken(s.ctx) if gterr != nil { - logger.Info("retry failed to fetch new auth token", zap.Error(gterr)) + s.client.lg.Info("retry failed to fetch new auth token", zap.Error(gterr)) return false, err // return the original error for simplicity } return true, err } - return isSafeRetry(err, s.callOpts), err - + return isSafeRetry(s.client.lg, err, s.callOpts), err } func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) { @@ -262,7 +261,7 @@ func waitRetryBackoff(attempt uint, ctx context.Context, callOpts *options) erro } // isSafeRetry returns "true", if request is safe for retry with the given error. -func isSafeRetry(err error, callOpts *options) bool { +func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool { if isContextError(err) { return false } @@ -272,7 +271,7 @@ func isSafeRetry(err error, callOpts *options) bool { case nonRepeatable: return isSafeRetryMutableRPC(err) default: - logger.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String())) + lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String())) return false } }