From f269c42aad6bd4b98b6f612a7113a6f1a17feed0 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 26 Apr 2018 14:21:15 -0700 Subject: [PATCH 1/5] compactor: support structured logger Signed-off-by: Gyuho Lee --- compactor/compactor.go | 15 ++++- compactor/periodic.go | 124 ++++++++++++++++++++++--------------- compactor/periodic_test.go | 7 ++- compactor/revision.go | 90 +++++++++++++++++---------- compactor/revision_test.go | 5 +- 5 files changed, 153 insertions(+), 88 deletions(-) diff --git a/compactor/compactor.go b/compactor/compactor.go index 8100b6938..8faf8d300 100644 --- a/compactor/compactor.go +++ b/compactor/compactor.go @@ -22,6 +22,8 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/pkg/capnslog" + "github.com/jonboulle/clockwork" + "go.uber.org/zap" ) var ( @@ -54,12 +56,19 @@ type RevGetter interface { Rev() int64 } -func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) { +// New returns a new Compactor based on given "mode". +func New( + lg *zap.Logger, + mode string, + retention time.Duration, + rg RevGetter, + c Compactable, +) (Compactor, error) { switch mode { case ModePeriodic: - return NewPeriodic(retention, rg, c), nil + return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil case ModeRevision: - return NewRevision(int64(retention), rg, c), nil + return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil default: return nil, fmt.Errorf("unsupported compaction mode %s", mode) } diff --git a/compactor/periodic.go b/compactor/periodic.go index 9d9164e9c..466cda4bd 100644 --- a/compactor/periodic.go +++ b/compactor/periodic.go @@ -23,11 +23,13 @@ import ( "github.com/coreos/etcd/mvcc" "github.com/jonboulle/clockwork" + "go.uber.org/zap" ) // Periodic compacts the log by purging revisions older than // the configured retention time. type Periodic struct { + lg *zap.Logger clock clockwork.Clock period time.Duration @@ -43,22 +45,19 @@ type Periodic struct { paused bool } -// NewPeriodic creates a new instance of Periodic compactor that purges +// newPeriodic creates a new instance of Periodic compactor that purges // the log older than h Duration. -func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic { - return newPeriodic(clockwork.NewRealClock(), h, rg, c) -} - -func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic { - t := &Periodic{ +func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic { + pc := &Periodic{ + lg: lg, clock: clock, period: h, rg: rg, c: c, revs: make([]int64, 0), } - t.ctx, t.cancel = context.WithCancel(context.Background()) - return t + pc.ctx, pc.cancel = context.WithCancel(context.Background()) + return pc } /* @@ -96,50 +95,77 @@ Compaction period 5-sec: */ // Run runs periodic compactor. 
-func (t *Periodic) Run() { - compactInterval := t.getCompactInterval() - retryInterval := t.getRetryInterval() - retentions := t.getRetentions() +func (pc *Periodic) Run() { + compactInterval := pc.getCompactInterval() + retryInterval := pc.getRetryInterval() + retentions := pc.getRetentions() go func() { - lastSuccess := t.clock.Now() - baseInterval := t.period + lastSuccess := pc.clock.Now() + baseInterval := pc.period for { - t.revs = append(t.revs, t.rg.Rev()) - if len(t.revs) > retentions { - t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago + pc.revs = append(pc.revs, pc.rg.Rev()) + if len(pc.revs) > retentions { + pc.revs = pc.revs[1:] // pc.revs[0] is always the rev at pc.period ago } select { - case <-t.ctx.Done(): + case <-pc.ctx.Done(): return - case <-t.clock.After(retryInterval): - t.mu.Lock() - p := t.paused - t.mu.Unlock() + case <-pc.clock.After(retryInterval): + pc.mu.Lock() + p := pc.paused + pc.mu.Unlock() if p { continue } } - if t.clock.Now().Sub(lastSuccess) < baseInterval { + if pc.clock.Now().Sub(lastSuccess) < baseInterval { continue } // wait up to initial given period - if baseInterval == t.period { + if baseInterval == pc.period { baseInterval = compactInterval } - rev := t.revs[0] + rev := pc.revs[0] - plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period) - _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) - if err == nil || err == mvcc.ErrCompacted { - lastSuccess = t.clock.Now() - plog.Noticef("Finished auto-compaction at revision %d", rev) + if pc.lg != nil { + pc.lg.Info( + "starting auto periodic compaction", + zap.Int64("revision", rev), + zap.Duration("compact-period", pc.period), + ) } else { - plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) - plog.Noticef("Retry after %v", retryInterval) + plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, pc.period) + } + _, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev}) + if err == nil || err == mvcc.ErrCompacted { + if pc.lg != nil { + pc.lg.Info( + "completed auto periodic compaction", + zap.Int64("revision", rev), + zap.Duration("compact-period", pc.period), + zap.Duration("took", time.Since(lastSuccess)), + ) + } else { + plog.Noticef("Finished auto-compaction at revision %d", rev) + } + lastSuccess = pc.clock.Now() + } else { + if pc.lg != nil { + pc.lg.Warn( + "failed auto periodic compaction", + zap.Int64("revision", rev), + zap.Duration("compact-period", pc.period), + zap.Duration("retry-interval", retryInterval), + zap.Error(err), + ) + } else { + plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) + plog.Noticef("Retry after %v", retryInterval) + } } } }() @@ -149,22 +175,22 @@ func (t *Periodic) Run() { // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute) // if given compaction period x is >1-hour, compact every hour. // (e.g. 
--auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour) -func (t *Periodic) getCompactInterval() time.Duration { - itv := t.period +func (pc *Periodic) getCompactInterval() time.Duration { + itv := pc.period if itv > time.Hour { itv = time.Hour } return itv } -func (t *Periodic) getRetentions() int { - return int(t.period/t.getRetryInterval()) + 1 +func (pc *Periodic) getRetentions() int { + return int(pc.period/pc.getRetryInterval()) + 1 } const retryDivisor = 10 -func (t *Periodic) getRetryInterval() time.Duration { - itv := t.period +func (pc *Periodic) getRetryInterval() time.Duration { + itv := pc.period if itv > time.Hour { itv = time.Hour } @@ -172,20 +198,20 @@ func (t *Periodic) getRetryInterval() time.Duration { } // Stop stops periodic compactor. -func (t *Periodic) Stop() { - t.cancel() +func (pc *Periodic) Stop() { + pc.cancel() } // Pause pauses periodic compactor. -func (t *Periodic) Pause() { - t.mu.Lock() - defer t.mu.Unlock() - t.paused = true +func (pc *Periodic) Pause() { + pc.mu.Lock() + pc.paused = true + pc.mu.Unlock() } // Resume resumes periodic compactor. -func (t *Periodic) Resume() { - t.mu.Lock() - defer t.mu.Unlock() - t.paused = false +func (pc *Periodic) Resume() { + pc.mu.Lock() + pc.paused = false + pc.mu.Unlock() } diff --git a/compactor/periodic_test.go b/compactor/periodic_test.go index 21e539e76..fdf90e5df 100644 --- a/compactor/periodic_test.go +++ b/compactor/periodic_test.go @@ -23,6 +23,7 @@ import ( "github.com/coreos/etcd/pkg/testutil" "github.com/jonboulle/clockwork" + "go.uber.org/zap" ) func TestPeriodicHourly(t *testing.T) { @@ -32,7 +33,7 @@ func TestPeriodicHourly(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newPeriodic(fc, retentionDuration, rg, compactable) + tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable) tb.Run() defer tb.Stop() @@ -83,7 +84,7 @@ func TestPeriodicMinutes(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newPeriodic(fc, retentionDuration, rg, compactable) + tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable) tb.Run() defer tb.Stop() @@ -131,7 +132,7 @@ func TestPeriodicPause(t *testing.T) { retentionDuration := time.Hour rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newPeriodic(fc, retentionDuration, rg, compactable) + tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable) tb.Run() tb.Pause() diff --git a/compactor/revision.go b/compactor/revision.go index 927e41c97..ace79c387 100644 --- a/compactor/revision.go +++ b/compactor/revision.go @@ -23,11 +23,14 @@ import ( "github.com/coreos/etcd/mvcc" "github.com/jonboulle/clockwork" + "go.uber.org/zap" ) // Revision compacts the log by purging revisions older than // the configured reivison number. Compaction happens every 5 minutes. type Revision struct { + lg *zap.Logger + clock clockwork.Clock retention int64 @@ -41,75 +44,100 @@ type Revision struct { paused bool } -// NewRevision creates a new instance of Revisonal compactor that purges +// newRevision creates a new instance of Revisonal compactor that purges // the log older than retention revisions from the current revision. 
-func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision { - return newRevision(clockwork.NewRealClock(), retention, rg, c) -} - -func newRevision(clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision { - t := &Revision{ +func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision { + rc := &Revision{ + lg: lg, clock: clock, retention: retention, rg: rg, c: c, } - t.ctx, t.cancel = context.WithCancel(context.Background()) - return t + rc.ctx, rc.cancel = context.WithCancel(context.Background()) + return rc } const revInterval = 5 * time.Minute // Run runs revision-based compactor. -func (t *Revision) Run() { +func (rc *Revision) Run() { prev := int64(0) go func() { for { select { - case <-t.ctx.Done(): + case <-rc.ctx.Done(): return - case <-t.clock.After(revInterval): - t.mu.Lock() - p := t.paused - t.mu.Unlock() + case <-rc.clock.After(revInterval): + rc.mu.Lock() + p := rc.paused + rc.mu.Unlock() if p { continue } } - rev := t.rg.Rev() - t.retention + rev := rc.rg.Rev() - rc.retention if rev <= 0 || rev == prev { continue } - plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention) - _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) + now := time.Now() + if rc.lg != nil { + rc.lg.Info( + "starting auto revision compaction", + zap.Int64("revision", rev), + zap.Int64("revision-compaction-retention", rc.retention), + ) + } else { + plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, rc.retention) + } + _, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev}) if err == nil || err == mvcc.ErrCompacted { prev = rev - plog.Noticef("Finished auto-compaction at revision %d", rev) + if rc.lg != nil { + rc.lg.Info( + "completed auto revision compaction", + zap.Int64("revision", rev), + zap.Int64("revision-compaction-retention", rc.retention), + zap.Duration("took", time.Since(now)), + ) + } else { + plog.Noticef("Finished auto-compaction at revision %d", rev) + } } else { - plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) - plog.Noticef("Retry after %v", revInterval) + if rc.lg != nil { + rc.lg.Warn( + "failed auto revision compaction", + zap.Int64("revision", rev), + zap.Int64("revision-compaction-retention", rc.retention), + zap.Duration("retry-interval", revInterval), + zap.Error(err), + ) + } else { + plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) + plog.Noticef("Retry after %v", revInterval) + } } } }() } // Stop stops revision-based compactor. -func (t *Revision) Stop() { - t.cancel() +func (rc *Revision) Stop() { + rc.cancel() } // Pause pauses revision-based compactor. -func (t *Revision) Pause() { - t.mu.Lock() - defer t.mu.Unlock() - t.paused = true +func (rc *Revision) Pause() { + rc.mu.Lock() + rc.paused = true + rc.mu.Unlock() } // Resume resumes revision-based compactor. 
-func (t *Revision) Resume() { - t.mu.Lock() - defer t.mu.Unlock() - t.paused = false +func (rc *Revision) Resume() { + rc.mu.Lock() + rc.paused = false + rc.mu.Unlock() } diff --git a/compactor/revision_test.go b/compactor/revision_test.go index 905683c36..e6bebcd49 100644 --- a/compactor/revision_test.go +++ b/compactor/revision_test.go @@ -23,13 +23,14 @@ import ( "github.com/coreos/etcd/pkg/testutil" "github.com/jonboulle/clockwork" + "go.uber.org/zap" ) func TestRevision(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newRevision(fc, 10, rg, compactable) + tb := newRevision(zap.NewExample(), fc, 10, rg, compactable) tb.Run() defer tb.Stop() @@ -72,7 +73,7 @@ func TestRevisionPause(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100 compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newRevision(fc, 10, rg, compactable) + tb := newRevision(zap.NewExample(), fc, 10, rg, compactable) tb.Run() tb.Pause() From 6a016cbd8664c7d207703c7b18fe7bdaa016e17a Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 26 Apr 2018 14:21:33 -0700 Subject: [PATCH 2/5] discovery: support structured logger Signed-off-by: Gyuho Lee --- discovery/discovery.go | 109 ++++++++++++++++++++++++++++++------ discovery/discovery_test.go | 8 ++- 2 files changed, 98 insertions(+), 19 deletions(-) diff --git a/discovery/discovery.go b/discovery/discovery.go index 7d1fa0d05..5ff5f8e23 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -35,6 +35,7 @@ import ( "github.com/coreos/pkg/capnslog" "github.com/jonboulle/clockwork" + "go.uber.org/zap" ) var ( @@ -59,8 +60,8 @@ var ( // JoinCluster will connect to the discovery service at the given url, and // register the server represented by the given id and config to the cluster -func JoinCluster(durl, dproxyurl string, id types.ID, config string) (string, error) { - d, err := newDiscovery(durl, dproxyurl, id) +func JoinCluster(lg *zap.Logger, durl, dproxyurl string, id types.ID, config string) (string, error) { + d, err := newDiscovery(lg, durl, dproxyurl, id) if err != nil { return "", err } @@ -69,8 +70,8 @@ func JoinCluster(durl, dproxyurl string, id types.ID, config string) (string, er // GetCluster will connect to the discovery service at the given url and // retrieve a string describing the cluster -func GetCluster(durl, dproxyurl string) (string, error) { - d, err := newDiscovery(durl, dproxyurl, 0) +func GetCluster(lg *zap.Logger, durl, dproxyurl string) (string, error) { + d, err := newDiscovery(lg, durl, dproxyurl, 0) if err != nil { return "", err } @@ -78,6 +79,7 @@ func GetCluster(durl, dproxyurl string) (string, error) { } type discovery struct { + lg *zap.Logger cluster string id types.ID c client.KeysAPI @@ -90,7 +92,7 @@ type discovery struct { // newProxyFunc builds a proxy function from the given string, which should // represent a URL that can be used as a proxy. It performs basic // sanitization of the URL and returns any error encountered. 
-func newProxyFunc(proxy string) (func(*http.Request) (*url.URL, error), error) { +func newProxyFunc(lg *zap.Logger, proxy string) (func(*http.Request) (*url.URL, error), error) { if proxy == "" { return nil, nil } @@ -111,18 +113,22 @@ func newProxyFunc(proxy string) (func(*http.Request) (*url.URL, error), error) { return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err) } - plog.Infof("using proxy %q", proxyURL.String()) + if lg != nil { + lg.Info("running proxy with discovery", zap.String("proxy-url", proxyURL.String())) + } else { + plog.Infof("using proxy %q", proxyURL.String()) + } return http.ProxyURL(proxyURL), nil } -func newDiscovery(durl, dproxyurl string, id types.ID) (*discovery, error) { +func newDiscovery(lg *zap.Logger, durl, dproxyurl string, id types.ID) (*discovery, error) { u, err := url.Parse(durl) if err != nil { return nil, err } token := u.Path u.Path = "" - pf, err := newProxyFunc(dproxyurl) + pf, err := newProxyFunc(lg, dproxyurl) if err != nil { return nil, err } @@ -143,6 +149,7 @@ func newDiscovery(durl, dproxyurl string, id types.ID) (*discovery, error) { } dc := client.NewKeysAPIWithPrefix(c, "") return &discovery{ + lg: lg, cluster: token, c: dc, id: id, @@ -225,7 +232,17 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) { return nil, 0, 0, ErrBadDiscoveryEndpoint } if ce, ok := err.(*client.ClusterError); ok { - plog.Error(ce.Detail()) + if d.lg != nil { + d.lg.Warn( + "failed to get from discovery server", + zap.String("discovery-url", d.url.String()), + zap.String("path", path.Join(configKey, "size")), + zap.Error(err), + zap.String("err-detail", ce.Detail()), + ) + } else { + plog.Error(ce.Detail()) + } return d.checkClusterRetry() } return nil, 0, 0, err @@ -240,7 +257,17 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) { cancel() if err != nil { if ce, ok := err.(*client.ClusterError); ok { - plog.Error(ce.Detail()) + if d.lg != nil { + d.lg.Warn( + "failed to get from discovery server", + zap.String("discovery-url", d.url.String()), + zap.String("path", d.cluster), + zap.Error(err), + zap.String("err-detail", ce.Detail()), + ) + } else { + plog.Error(ce.Detail()) + } return d.checkClusterRetry() } return nil, 0, 0, err @@ -276,7 +303,16 @@ func (d *discovery) logAndBackoffForRetry(step string) { retries = maxExpoentialRetries } retryTimeInSecond := time.Duration(0x1< Date: Thu, 26 Apr 2018 14:22:09 -0700 Subject: [PATCH 3/5] etcdserver: support structured logger for discovery, compactor Signed-off-by: Gyuho Lee --- etcdserver/server.go | 4 ++-- etcdserver/v2auth/auth.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 987eeb7f7..8270faca5 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -369,7 +369,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } if cfg.ShouldDiscover() { var str string - str, err = discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String()) + str, err = discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String()) if err != nil { return nil, &DiscoveryError{Op: "join", Err: err} } @@ -562,7 +562,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } srv.authStore = auth.NewAuthStore(srv.be, tp) if num := cfg.AutoCompactionRetention; num != 0 { - srv.compactor, err = compactor.New(cfg.AutoCompactionMode, num, srv.kv, srv) + srv.compactor, err = 
compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv) if err != nil { return nil, err } diff --git a/etcdserver/v2auth/auth.go b/etcdserver/v2auth/auth.go index ba34336db..fe9d228f8 100644 --- a/etcdserver/v2auth/auth.go +++ b/etcdserver/v2auth/auth.go @@ -30,8 +30,8 @@ import ( "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/v2error" "github.com/coreos/etcd/pkg/types" - "github.com/coreos/pkg/capnslog" + "github.com/coreos/pkg/capnslog" "golang.org/x/crypto/bcrypt" ) From c316e6773b6d3546a16687e88fe84e84fe53d627 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 26 Apr 2018 14:22:33 -0700 Subject: [PATCH 4/5] embed: clean up structured logging calls Signed-off-by: Gyuho Lee --- embed/config.go | 299 +++--------------- embed/config_logging.go | 288 +++++++++++++++++ ...nix.go => config_logging_journald_unix.go} | 2 + ....go => config_logging_journald_windows.go} | 0 embed/etcd.go | 24 +- 5 files changed, 345 insertions(+), 268 deletions(-) create mode 100644 embed/config_logging.go rename embed/{journald_unix.go => config_logging_journald_unix.go} (96%) rename embed/{journald_windows.go => config_logging_journald_windows.go} (100%) diff --git a/embed/config.go b/embed/config.go index fd3d26284..23d56ad63 100644 --- a/embed/config.go +++ b/embed/config.go @@ -16,7 +16,6 @@ package embed import ( "crypto/tls" - "errors" "fmt" "io/ioutil" "net" @@ -24,28 +23,22 @@ import ( "net/url" "os" "path/filepath" - "reflect" - "sort" "strings" "sync" - "syscall" "time" "github.com/coreos/etcd/compactor" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/pkg/flags" - "github.com/coreos/etcd/pkg/logutil" "github.com/coreos/etcd/pkg/netutil" "github.com/coreos/etcd/pkg/srv" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" - "github.com/coreos/pkg/capnslog" "github.com/ghodss/yaml" "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" - "google.golang.org/grpc/grpclog" ) const ( @@ -278,6 +271,7 @@ type Config struct { // - file path to append server logs to. // It can be multiple when "Logger" is zap. LogOutputs []string `json:"log-outputs"` + // Debug is true, to enable debug level logging. Debug bool `json:"debug"` @@ -399,259 +393,6 @@ func logTLSHandshakeFailure(conn *tls.Conn, err error) { } } -// GetLogger returns the logger. -func (cfg Config) GetLogger() *zap.Logger { - cfg.loggerMu.RLock() - l := cfg.logger - cfg.loggerMu.RUnlock() - return l -} - -// for testing -var grpcLogOnce = new(sync.Once) - -// setupLogging initializes etcd logging. -// Must be called after flag parsing or finishing configuring embed.Config. -func (cfg *Config) setupLogging() error { - // handle "DeprecatedLogOutput" in v3.4 - // TODO: remove "DeprecatedLogOutput" in v3.5 - len1 := len(cfg.DeprecatedLogOutput) - len2 := len(cfg.LogOutputs) - if len1 != len2 { - switch { - case len1 > len2: // deprecate "log-output" flag is used - fmt.Fprintln(os.Stderr, "'--log-output' flag has been deprecated! 
Please use '--log-outputs'!") - cfg.LogOutputs = cfg.DeprecatedLogOutput - case len1 < len2: // "--log-outputs" flag has been set with multiple writers - cfg.DeprecatedLogOutput = []string{} - } - } else { - if len1 > 1 { - return errors.New("both '--log-output' and '--log-outputs' are set; only set '--log-outputs'") - } - if len1 < 1 { - return errors.New("either '--log-output' or '--log-outputs' flag must be set") - } - if reflect.DeepEqual(cfg.DeprecatedLogOutput, cfg.LogOutputs) && cfg.DeprecatedLogOutput[0] != DefaultLogOutput { - return fmt.Errorf("'--log-output=%q' and '--log-outputs=%q' are incompatible; only set --log-outputs", cfg.DeprecatedLogOutput, cfg.LogOutputs) - } - if !reflect.DeepEqual(cfg.DeprecatedLogOutput, []string{DefaultLogOutput}) { - fmt.Fprintf(os.Stderr, "Deprecated '--log-output' flag is set to %q\n", cfg.DeprecatedLogOutput) - fmt.Fprintln(os.Stderr, "Please use '--log-outputs' flag") - } - } - - switch cfg.Logger { - case "capnslog": // TODO: deprecate this in v3.5 - cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure - cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure - - if cfg.Debug { - capnslog.SetGlobalLogLevel(capnslog.DEBUG) - grpc.EnableTracing = true - // enable info, warning, error - grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) - } else { - capnslog.SetGlobalLogLevel(capnslog.INFO) - // only discard info - grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) - } - - // TODO: deprecate with "capnslog" - if cfg.LogPkgLevels != "" { - repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd") - settings, err := repoLog.ParseLogLevelConfig(cfg.LogPkgLevels) - if err != nil { - plog.Warningf("couldn't parse log level string: %s, continuing with default levels", err.Error()) - return nil - } - repoLog.SetLogLevel(settings) - } - - if len(cfg.LogOutputs) != 1 { - fmt.Printf("--logger=capnslog supports only 1 value in '--log-outputs', got %q\n", cfg.LogOutputs) - os.Exit(1) - } - // capnslog initially SetFormatter(NewDefaultFormatter(os.Stderr)) - // where NewDefaultFormatter returns NewJournaldFormatter when syscall.Getppid() == 1 - // specify 'stdout' or 'stderr' to skip journald logging even when running under systemd - output := cfg.LogOutputs[0] - switch output { - case "stdout": - capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stdout, cfg.Debug)) - case "stderr": - capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stderr, cfg.Debug)) - case DefaultLogOutput: - default: - plog.Panicf(`unknown log-output %q (only supports %q, "stdout", "stderr")`, output, DefaultLogOutput) - } - - case "zap": - if len(cfg.LogOutputs) == 0 { - cfg.LogOutputs = []string{DefaultLogOutput} - } - if len(cfg.LogOutputs) > 1 { - for _, v := range cfg.LogOutputs { - if v == DefaultLogOutput { - panic(fmt.Errorf("multi logoutput for %q is not supported yet", DefaultLogOutput)) - } - } - } - - // TODO: use zapcore to support more features? 
- lcfg := zap.Config{ - Level: zap.NewAtomicLevelAt(zap.InfoLevel), - Development: false, - Sampling: &zap.SamplingConfig{ - Initial: 100, - Thereafter: 100, - }, - Encoding: "json", - EncoderConfig: zap.NewProductionEncoderConfig(), - - OutputPaths: make([]string, 0), - ErrorOutputPaths: make([]string, 0), - } - outputPaths, errOutputPaths := make(map[string]struct{}), make(map[string]struct{}) - isJournald := false - for _, v := range cfg.LogOutputs { - switch v { - case DefaultLogOutput: - if syscall.Getppid() == 1 { - // capnslog initially SetFormatter(NewDefaultFormatter(os.Stderr)) - // where "NewDefaultFormatter" returns "NewJournaldFormatter" - // specify 'stdout' or 'stderr' to override this redirects - // when syscall.Getppid() == 1 - isJournald = true - break - } - - outputPaths["stderr"] = struct{}{} - errOutputPaths["stderr"] = struct{}{} - - case "stderr": - outputPaths["stderr"] = struct{}{} - errOutputPaths["stderr"] = struct{}{} - - case "stdout": - outputPaths["stdout"] = struct{}{} - errOutputPaths["stdout"] = struct{}{} - - default: - outputPaths[v] = struct{}{} - errOutputPaths[v] = struct{}{} - } - } - - if !isJournald { - for v := range outputPaths { - lcfg.OutputPaths = append(lcfg.OutputPaths, v) - } - for v := range errOutputPaths { - lcfg.ErrorOutputPaths = append(lcfg.ErrorOutputPaths, v) - } - sort.Strings(lcfg.OutputPaths) - sort.Strings(lcfg.ErrorOutputPaths) - - if cfg.Debug { - lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) - grpc.EnableTracing = true - } - - var err error - cfg.logger, err = lcfg.Build() - if err != nil { - return err - } - - cfg.loggerConfig = &lcfg - cfg.loggerCore = nil - cfg.loggerWriteSyncer = nil - - grpcLogOnce.Do(func() { - // debug true, enable info, warning, error - // debug false, only discard info - var gl grpclog.LoggerV2 - gl, err = logutil.NewGRPCLoggerV2(lcfg) - if err == nil { - grpclog.SetLoggerV2(gl) - } - }) - if err != nil { - return err - } - } else { - if len(cfg.LogOutputs) > 1 { - for _, v := range cfg.LogOutputs { - if v != DefaultLogOutput { - return fmt.Errorf("running as a systemd unit but other '--log-output' values (%q) are configured with 'default'; override 'default' value with something else", cfg.LogOutputs) - } - } - } - - // use stderr as fallback - syncer := getZapWriteSyncer() - lvl := zap.NewAtomicLevelAt(zap.InfoLevel) - if cfg.Debug { - lvl = zap.NewAtomicLevelAt(zap.DebugLevel) - grpc.EnableTracing = true - } - - // WARN: do not change field names in encoder config - // journald logging writer assumes field names of "level" and "caller" - cr := zapcore.NewCore( - zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), - syncer, - lvl, - ) - cfg.logger = zap.New(cr, zap.AddCaller(), zap.ErrorOutput(syncer)) - - cfg.loggerConfig = nil - cfg.loggerCore = cr - cfg.loggerWriteSyncer = syncer - - grpcLogOnce.Do(func() { - grpclog.SetLoggerV2(logutil.NewGRPCLoggerV2FromZapCore(cr, syncer)) - }) - } - - logTLSHandshakeFailure := func(conn *tls.Conn, err error) { - state := conn.ConnectionState() - remoteAddr := conn.RemoteAddr().String() - serverName := state.ServerName - if len(state.PeerCertificates) > 0 { - cert := state.PeerCertificates[0] - ips := make([]string, 0, len(cert.IPAddresses)) - for i := range cert.IPAddresses { - ips[i] = cert.IPAddresses[i].String() - } - cfg.logger.Warn( - "rejected connection", - zap.String("remote-addr", remoteAddr), - zap.String("server-name", serverName), - zap.Strings("ip-addresses", ips), - zap.Strings("dns-names", cert.DNSNames), - zap.Error(err), - ) - } 
else { - cfg.logger.Warn( - "rejected connection", - zap.String("remote-addr", remoteAddr), - zap.String("server-name", serverName), - zap.Error(err), - ) - } - } - cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure - cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure - - default: - return fmt.Errorf("unknown logger option %q", cfg.Logger) - } - - return nil -} - func ConfigFromFile(path string) (*Config, error) { cfg := &configYAML{Config: *NewConfig()} if err := cfg.configFromFile(path); err != nil { @@ -826,12 +567,13 @@ func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, tok // self's advertised peer URLs urlsmap[cfg.Name] = cfg.APUrls token = cfg.Durl + case cfg.DNSCluster != "": clusterStrs, cerr := cfg.GetDNSClusterNames() lg := cfg.logger if cerr != nil { if lg != nil { - lg.Error("failed to resolve during SRV discovery", zap.Error(cerr)) + lg.Warn("failed to resolve during SRV discovery", zap.Error(cerr)) } else { plog.Errorf("couldn't resolve during SRV discovery (%v)", cerr) } @@ -856,6 +598,7 @@ func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, tok return nil, "", fmt.Errorf("cannot find local etcd member %q in SRV records", cfg.Name) } } + default: // We're statically configured, and cluster has appropriately been set. urlsmap, err = types.NewURLsMap(cfg.InitialCluster) @@ -873,15 +616,45 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) { if cfg.DNSClusterServiceName != "" { serviceNameSuffix = "-" + cfg.DNSClusterServiceName } - // Use both etcd-server-ssl and etcd-server for discovery. Combine the results if both are available. + + lg := cfg.GetLogger() + + // Use both etcd-server-ssl and etcd-server for discovery. + // Combine the results if both are available. clusterStrs, cerr = srv.GetCluster("https", "etcd-server-ssl"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls) - defaultHTTPClusterStrs, httpCerr := srv.GetCluster("http", "etcd-server"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls) if cerr != nil { clusterStrs = make([]string, 0) } + if lg != nil { + lg.Info( + "get cluster for etcd-server-ssl SRV", + zap.String("service-scheme", "https"), + zap.String("service-name", "etcd-server-ssl"+serviceNameSuffix), + zap.String("server-name", cfg.Name), + zap.String("discovery-srv", cfg.DNSCluster), + zap.Strings("advertise-peer-urls", cfg.getAPURLs()), + zap.Strings("found-cluster", clusterStrs), + zap.Error(cerr), + ) + } + + defaultHTTPClusterStrs, httpCerr := srv.GetCluster("http", "etcd-server"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls) if httpCerr != nil { clusterStrs = append(clusterStrs, defaultHTTPClusterStrs...) } + if lg != nil { + lg.Info( + "get cluster for etcd-server SRV", + zap.String("service-scheme", "http"), + zap.String("service-name", "etcd-server"+serviceNameSuffix), + zap.String("server-name", cfg.Name), + zap.String("discovery-srv", cfg.DNSCluster), + zap.Strings("advertise-peer-urls", cfg.getAPURLs()), + zap.Strings("found-cluster", clusterStrs), + zap.Error(httpCerr), + ) + } + return clusterStrs, cerr } diff --git a/embed/config_logging.go b/embed/config_logging.go new file mode 100644 index 000000000..cae94b987 --- /dev/null +++ b/embed/config_logging.go @@ -0,0 +1,288 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package embed + +import ( + "crypto/tls" + "errors" + "fmt" + "io/ioutil" + "os" + "reflect" + "sort" + "sync" + "syscall" + + "github.com/coreos/etcd/pkg/logutil" + + "github.com/coreos/pkg/capnslog" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/grpclog" +) + +// GetLogger returns the logger. +func (cfg Config) GetLogger() *zap.Logger { + cfg.loggerMu.RLock() + l := cfg.logger + cfg.loggerMu.RUnlock() + return l +} + +// for testing +var grpcLogOnce = new(sync.Once) + +// setupLogging initializes etcd logging. +// Must be called after flag parsing or finishing configuring embed.Config. +func (cfg *Config) setupLogging() error { + // handle "DeprecatedLogOutput" in v3.4 + // TODO: remove "DeprecatedLogOutput" in v3.5 + len1 := len(cfg.DeprecatedLogOutput) + len2 := len(cfg.LogOutputs) + if len1 != len2 { + switch { + case len1 > len2: // deprecate "log-output" flag is used + fmt.Fprintln(os.Stderr, "'--log-output' flag has been deprecated! Please use '--log-outputs'!") + cfg.LogOutputs = cfg.DeprecatedLogOutput + case len1 < len2: // "--log-outputs" flag has been set with multiple writers + cfg.DeprecatedLogOutput = []string{} + } + } else { + if len1 > 1 { + return errors.New("both '--log-output' and '--log-outputs' are set; only set '--log-outputs'") + } + if len1 < 1 { + return errors.New("either '--log-output' or '--log-outputs' flag must be set") + } + if reflect.DeepEqual(cfg.DeprecatedLogOutput, cfg.LogOutputs) && cfg.DeprecatedLogOutput[0] != DefaultLogOutput { + return fmt.Errorf("'--log-output=%q' and '--log-outputs=%q' are incompatible; only set --log-outputs", cfg.DeprecatedLogOutput, cfg.LogOutputs) + } + if !reflect.DeepEqual(cfg.DeprecatedLogOutput, []string{DefaultLogOutput}) { + fmt.Fprintf(os.Stderr, "Deprecated '--log-output' flag is set to %q\n", cfg.DeprecatedLogOutput) + fmt.Fprintln(os.Stderr, "Please use '--log-outputs' flag") + } + } + + switch cfg.Logger { + case "capnslog": // TODO: deprecate this in v3.5 + cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure + cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure + + if cfg.Debug { + capnslog.SetGlobalLogLevel(capnslog.DEBUG) + grpc.EnableTracing = true + // enable info, warning, error + grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) + } else { + capnslog.SetGlobalLogLevel(capnslog.INFO) + // only discard info + grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) + } + + // TODO: deprecate with "capnslog" + if cfg.LogPkgLevels != "" { + repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd") + settings, err := repoLog.ParseLogLevelConfig(cfg.LogPkgLevels) + if err != nil { + plog.Warningf("couldn't parse log level string: %s, continuing with default levels", err.Error()) + return nil + } + repoLog.SetLogLevel(settings) + } + + if len(cfg.LogOutputs) != 1 { + fmt.Printf("--logger=capnslog supports only 1 value in '--log-outputs', got %q\n", cfg.LogOutputs) + os.Exit(1) + } + // capnslog initially SetFormatter(NewDefaultFormatter(os.Stderr)) + // where NewDefaultFormatter 
returns NewJournaldFormatter when syscall.Getppid() == 1 + // specify 'stdout' or 'stderr' to skip journald logging even when running under systemd + output := cfg.LogOutputs[0] + switch output { + case "stdout": + capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stdout, cfg.Debug)) + case "stderr": + capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stderr, cfg.Debug)) + case DefaultLogOutput: + default: + plog.Panicf(`unknown log-output %q (only supports %q, "stdout", "stderr")`, output, DefaultLogOutput) + } + + case "zap": + if len(cfg.LogOutputs) == 0 { + cfg.LogOutputs = []string{DefaultLogOutput} + } + if len(cfg.LogOutputs) > 1 { + for _, v := range cfg.LogOutputs { + if v == DefaultLogOutput { + panic(fmt.Errorf("multi logoutput for %q is not supported yet", DefaultLogOutput)) + } + } + } + + // TODO: use zapcore to support more features? + lcfg := zap.Config{ + Level: zap.NewAtomicLevelAt(zap.InfoLevel), + Development: false, + Sampling: &zap.SamplingConfig{ + Initial: 100, + Thereafter: 100, + }, + Encoding: "json", + EncoderConfig: zap.NewProductionEncoderConfig(), + + OutputPaths: make([]string, 0), + ErrorOutputPaths: make([]string, 0), + } + outputPaths, errOutputPaths := make(map[string]struct{}), make(map[string]struct{}) + isJournald := false + for _, v := range cfg.LogOutputs { + switch v { + case DefaultLogOutput: + if syscall.Getppid() == 1 { + // capnslog initially SetFormatter(NewDefaultFormatter(os.Stderr)) + // where "NewDefaultFormatter" returns "NewJournaldFormatter" + // specify 'stdout' or 'stderr' to override this redirects + // when syscall.Getppid() == 1 + isJournald = true + break + } + + outputPaths["stderr"] = struct{}{} + errOutputPaths["stderr"] = struct{}{} + + case "stderr": + outputPaths["stderr"] = struct{}{} + errOutputPaths["stderr"] = struct{}{} + + case "stdout": + outputPaths["stdout"] = struct{}{} + errOutputPaths["stdout"] = struct{}{} + + default: + outputPaths[v] = struct{}{} + errOutputPaths[v] = struct{}{} + } + } + + if !isJournald { + for v := range outputPaths { + lcfg.OutputPaths = append(lcfg.OutputPaths, v) + } + for v := range errOutputPaths { + lcfg.ErrorOutputPaths = append(lcfg.ErrorOutputPaths, v) + } + sort.Strings(lcfg.OutputPaths) + sort.Strings(lcfg.ErrorOutputPaths) + + if cfg.Debug { + lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + grpc.EnableTracing = true + } + + var err error + cfg.logger, err = lcfg.Build() + if err != nil { + return err + } + + cfg.loggerConfig = &lcfg + cfg.loggerCore = nil + cfg.loggerWriteSyncer = nil + + grpcLogOnce.Do(func() { + // debug true, enable info, warning, error + // debug false, only discard info + var gl grpclog.LoggerV2 + gl, err = logutil.NewGRPCLoggerV2(lcfg) + if err == nil { + grpclog.SetLoggerV2(gl) + } + }) + if err != nil { + return err + } + } else { + if len(cfg.LogOutputs) > 1 { + for _, v := range cfg.LogOutputs { + if v != DefaultLogOutput { + return fmt.Errorf("running as a systemd unit but other '--log-output' values (%q) are configured with 'default'; override 'default' value with something else", cfg.LogOutputs) + } + } + } + + // use stderr as fallback + syncer := getZapWriteSyncer() + lvl := zap.NewAtomicLevelAt(zap.InfoLevel) + if cfg.Debug { + lvl = zap.NewAtomicLevelAt(zap.DebugLevel) + grpc.EnableTracing = true + } + + // WARN: do not change field names in encoder config + // journald logging writer assumes field names of "level" and "caller" + cr := zapcore.NewCore( + zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), + syncer, + lvl, + ) + 
cfg.logger = zap.New(cr, zap.AddCaller(), zap.ErrorOutput(syncer)) + + cfg.loggerConfig = nil + cfg.loggerCore = cr + cfg.loggerWriteSyncer = syncer + + grpcLogOnce.Do(func() { + grpclog.SetLoggerV2(logutil.NewGRPCLoggerV2FromZapCore(cr, syncer)) + }) + } + + logTLSHandshakeFailure := func(conn *tls.Conn, err error) { + state := conn.ConnectionState() + remoteAddr := conn.RemoteAddr().String() + serverName := state.ServerName + if len(state.PeerCertificates) > 0 { + cert := state.PeerCertificates[0] + ips := make([]string, 0, len(cert.IPAddresses)) + for i := range cert.IPAddresses { + ips[i] = cert.IPAddresses[i].String() + } + cfg.logger.Warn( + "rejected connection", + zap.String("remote-addr", remoteAddr), + zap.String("server-name", serverName), + zap.Strings("ip-addresses", ips), + zap.Strings("dns-names", cert.DNSNames), + zap.Error(err), + ) + } else { + cfg.logger.Warn( + "rejected connection", + zap.String("remote-addr", remoteAddr), + zap.String("server-name", serverName), + zap.Error(err), + ) + } + } + cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure + cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure + + default: + return fmt.Errorf("unknown logger option %q", cfg.Logger) + } + + return nil +} diff --git a/embed/journald_unix.go b/embed/config_logging_journald_unix.go similarity index 96% rename from embed/journald_unix.go rename to embed/config_logging_journald_unix.go index 8944b4c4a..c58ef9ca0 100644 --- a/embed/journald_unix.go +++ b/embed/config_logging_journald_unix.go @@ -20,9 +20,11 @@ import ( "os" "github.com/coreos/etcd/pkg/logutil" + "go.uber.org/zap/zapcore" ) +// use stderr as fallback func getZapWriteSyncer() zapcore.WriteSyncer { return zapcore.AddSync(logutil.NewJournaldWriter(os.Stderr)) } diff --git a/embed/journald_windows.go b/embed/config_logging_journald_windows.go similarity index 100% rename from embed/journald_windows.go rename to embed/config_logging_journald_windows.go diff --git a/embed/etcd.go b/embed/etcd.go index a500a86e9..8d537800c 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -28,6 +28,8 @@ import ( "sync" "time" + "go.uber.org/zap/zapcore" + "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/etcdhttp" "github.com/coreos/etcd/etcdserver/api/v2http" @@ -273,6 +275,23 @@ func (e *Etcd) Config() Config { // Client requests will be terminated with request timeout. // After timeout, enforce remaning requests be closed immediately. func (e *Etcd) Close() { + fields := []zapcore.Field{ + zap.String("name", e.cfg.Name), + zap.String("data-dir", e.cfg.Dir), + zap.Strings("advertise-peer-urls", e.cfg.getAPURLs()), + zap.Strings("advertise-client-urls", e.cfg.getACURLs()), + } + lg := e.GetLogger() + if lg != nil { + lg.Info("closing etcd server", fields...) + } + defer func() { + if lg != nil { + lg.Info("closed etcd server", fields...) 
+ lg.Sync() + } + }() + e.closeOnce.Do(func() { close(e.stopc) }) // close client requests with request timeout @@ -315,11 +334,6 @@ func (e *Etcd) Close() { cancel() } } - - lg := e.GetLogger() - if lg != nil { - lg.Sync() - } } func stopServers(ctx context.Context, ss *servers) { From 829c4479f3c65b83cd40128428da8190202573a9 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 26 Apr 2018 14:22:58 -0700 Subject: [PATCH 5/5] etcdmain: support structured logging for discovery service Signed-off-by: Gyuho Lee --- etcdmain/etcd.go | 59 ++++++++++++++++++++++++++++++++---------- etcdmain/gateway.go | 19 +++++++------- etcdmain/grpc_proxy.go | 2 +- etcdmain/util.go | 51 ++++++++++++++++++++++++++++++++---- 4 files changed, 102 insertions(+), 29 deletions(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 2ca2f2664..c0a24ff90 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -63,14 +63,14 @@ func startEtcdOrProxyV2() { if err != nil { lg := cfg.ec.GetLogger() if lg != nil { - lg.Error("failed to verify flags", zap.Error(err)) + lg.Warn("failed to verify flags", zap.Error(err)) } else { plog.Errorf("error verifying flags, %v. See 'etcd --help'.", err) } switch err { case embed.ErrUnsetAdvertiseClientURLsFlag: if lg != nil { - lg.Error("advertise client URLs are not set", zap.Error(err)) + lg.Warn("advertise client URLs are not set", zap.Error(err)) } else { plog.Errorf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.") } @@ -143,7 +143,11 @@ func startEtcdOrProxyV2() { which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir) if which != dirEmpty { if lg != nil { - + lg.Info( + "server has been already initialized", + zap.String("data-dir", cfg.ec.Dir), + zap.String("dir-type", string(which)), + ) } else { plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which) } @@ -153,7 +157,14 @@ func startEtcdOrProxyV2() { case dirProxy: err = startProxy(cfg) default: - plog.Panicf("unhandled dir type %v", which) + if lg != nil { + lg.Panic( + "unknown directory type", + zap.String("dir-type", string(which)), + ) + } else { + plog.Panicf("unhandled dir type %v", which) + } } } else { shouldProxy := cfg.isProxy() @@ -162,12 +173,20 @@ func startEtcdOrProxyV2() { if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == discovery.ErrFullCluster { if cfg.shouldFallbackToProxy() { if lg != nil { - + lg.Warn( + "discovery cluster is full, falling back to proxy", + zap.String("fallback-proxy", fallbackFlagProxy), + zap.Error(err), + ) } else { plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy) } shouldProxy = true } + } else if err != nil { + if lg != nil { + lg.Warn("failed to start etcd", zap.Error(err)) + } } } if shouldProxy { @@ -180,13 +199,13 @@ func startEtcdOrProxyV2() { switch derr.Err { case discovery.ErrDuplicateID: if lg != nil { - lg.Error( + lg.Warn( "member has been registered with discovery service", zap.String("name", cfg.ec.Name), zap.String("discovery-token", cfg.ec.Durl), zap.Error(derr.Err), ) - lg.Error( + lg.Warn( "but could not find valid cluster configuration", zap.String("data-dir", cfg.ec.Dir), ) @@ -198,9 +217,10 @@ func startEtcdOrProxyV2() { plog.Infof("Please check the given data dir path if the previous bootstrap succeeded") plog.Infof("or use a new discovery token if the previous bootstrap failed.") } + case discovery.ErrDuplicateName: if lg != nil { - lg.Error( + lg.Warn( "member with duplicated name has already 
been registered", zap.String("discovery-token", cfg.ec.Durl), zap.Error(derr.Err), @@ -212,9 +232,10 @@ func startEtcdOrProxyV2() { plog.Errorf("please check (cURL) the discovery token for more information.") plog.Errorf("please do not reuse the discovery token and generate a new one to bootstrap the cluster.") } + default: if lg != nil { - lg.Error( + lg.Warn( "failed to bootstrap; discovery token was already used", zap.String("discovery-token", cfg.ec.Durl), zap.Error(err), @@ -231,7 +252,7 @@ func startEtcdOrProxyV2() { if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") { if lg != nil { - lg.Error("failed to start", zap.Error(err)) + lg.Warn("failed to start", zap.Error(err)) } else { plog.Infof("%v", err) } @@ -320,7 +341,12 @@ func startProxy(cfg *config) error { clientTLSInfo.InsecureSkipVerify = cfg.ec.ClientAutoTLS cfg.ec.PeerTLSInfo.InsecureSkipVerify = cfg.ec.PeerAutoTLS - pt, err := transport.NewTimeoutTransport(clientTLSInfo, time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond) + pt, err := transport.NewTimeoutTransport( + clientTLSInfo, + time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, + time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, + time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond, + ) if err != nil { return err } @@ -333,7 +359,12 @@ func startProxy(cfg *config) error { plog.Fatalf("could not get certs (%v)", err) } } - tr, err := transport.NewTimeoutTransport(cfg.ec.PeerTLSInfo, time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond) + tr, err := transport.NewTimeoutTransport( + cfg.ec.PeerTLSInfo, + time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, + time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, + time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond, + ) if err != nil { return err } @@ -385,6 +416,7 @@ func startProxy(cfg *config) error { } else { plog.Infof("proxy: using peer urls %v from cluster file %q", peerURLs, clusterfile) } + case os.IsNotExist(err): var urlsmap types.URLsMap urlsmap, _, err = cfg.ec.PeerURLsMapAndToken("proxy") @@ -394,7 +426,7 @@ func startProxy(cfg *config) error { if cfg.ec.Durl != "" { var s string - s, err = discovery.GetCluster(cfg.ec.Durl, cfg.ec.Dproxy) + s, err = discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy) if err != nil { return err } @@ -408,6 +440,7 @@ func startProxy(cfg *config) error { } else { plog.Infof("proxy: using peer urls %v ", peerURLs) } + default: return err } diff --git a/etcdmain/gateway.go b/etcdmain/gateway.go index 2c4e4950e..b260ddc7d 100644 --- a/etcdmain/gateway.go +++ b/etcdmain/gateway.go @@ -21,11 +21,10 @@ import ( "os" "time" - "go.uber.org/zap" - "github.com/coreos/etcd/proxy/tcpproxy" "github.com/spf13/cobra" + "go.uber.org/zap" ) var ( @@ -91,7 +90,14 @@ func stripSchema(eps []string) []string { } func startGateway(cmd *cobra.Command, args []string) { - srvs := discoverEndpoints(gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery) + var lg *zap.Logger + lg, err := zap.NewProduction() + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + srvs := discoverEndpoints(lg, gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery) if len(srvs.Endpoints) == 0 { // no endpoints discovered, fall back to provided endpoints srvs.Endpoints = 
gatewayEndpoints @@ -116,13 +122,6 @@ func startGateway(cmd *cobra.Command, args []string) { os.Exit(1) } - var lg *zap.Logger - lg, err := zap.NewProduction() - if err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - var l net.Listener l, err = net.Listen("tcp", gatewayListenAddr) if err != nil { diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index baa824894..6a1ff1f6b 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -238,7 +238,7 @@ func checkArgs() { } func mustNewClient(lg *zap.Logger) *clientv3.Client { - srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery) + srvs := discoverEndpoints(lg, grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery) eps := srvs.Endpoints if len(eps) == 0 { eps = grpcProxyEndpoints diff --git a/etcdmain/util.go b/etcdmain/util.go index 9657271d5..c8872ad46 100644 --- a/etcdmain/util.go +++ b/etcdmain/util.go @@ -20,9 +20,11 @@ import ( "github.com/coreos/etcd/pkg/srv" "github.com/coreos/etcd/pkg/transport" + + "go.uber.org/zap" ) -func discoverEndpoints(dns string, ca string, insecure bool) (s srv.SRVClients) { +func discoverEndpoints(lg *zap.Logger, dns string, ca string, insecure bool) (s srv.SRVClients) { if dns == "" { return s } @@ -32,7 +34,17 @@ func discoverEndpoints(dns string, ca string, insecure bool) (s srv.SRVClients) os.Exit(1) } endpoints := srvs.Endpoints - plog.Infof("discovered the cluster %s from %s", endpoints, dns) + + if lg != nil { + lg.Info( + "discovered cluster from SRV", + zap.String("srv-server", dns), + zap.Strings("endpoints", endpoints), + ) + } else { + plog.Infof("discovered the cluster %s from %s", endpoints, dns) + } + if insecure { return *srvs } @@ -41,12 +53,41 @@ func discoverEndpoints(dns string, ca string, insecure bool) (s srv.SRVClients) TrustedCAFile: ca, ServerName: dns, } - plog.Infof("validating discovered endpoints %v", endpoints) + + if lg != nil { + lg.Info( + "validating discovered SRV endpoints", + zap.String("srv-server", dns), + zap.Strings("endpoints", endpoints), + ) + } else { + plog.Infof("validating discovered endpoints %v", endpoints) + } + endpoints, err = transport.ValidateSecureEndpoints(tlsInfo, endpoints) if err != nil { - plog.Warningf("%v", err) + if lg != nil { + lg.Warn( + "failed to validate discovered endpoints", + zap.String("srv-server", dns), + zap.Strings("endpoints", endpoints), + zap.Error(err), + ) + } else { + plog.Warningf("%v", err) + } + } else { + if lg != nil { + lg.Info( + "using validated discovered SRV endpoints", + zap.String("srv-server", dns), + zap.Strings("endpoints", endpoints), + ) + } + } + if lg == nil { + plog.Infof("using discovered endpoints %v", endpoints) } - plog.Infof("using discovered endpoints %v", endpoints) // map endpoints back to SRVClients struct with SRV data eps := make(map[string]struct{})
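
The pattern applied across all five patches is the same: each constructor and call site gains a *zap.Logger parameter, emits structured fields when the logger is non-nil, and falls back to the legacy plog output otherwise (the updated tests pass zap.NewExample()). Below is a minimal, self-contained sketch of that fallback pattern; startCompaction and its standard-library fallback are hypothetical stand-ins for illustration, not code from this series.

package main

import (
	"log"

	"go.uber.org/zap"
)

// startCompaction mirrors the call-site pattern used in the compactor and
// discovery changes above: structured logging when a *zap.Logger is
// supplied, unstructured fallback (approximated here with the standard
// library logger) when it is nil.
func startCompaction(lg *zap.Logger, rev int64) {
	if lg != nil {
		lg.Info(
			"starting auto revision compaction",
			zap.Int64("revision", rev),
		)
	} else {
		log.Printf("Starting auto-compaction at revision %d", rev)
	}
}

func main() {
	// The tests in this series pass zap.NewExample(); an embedding server
	// would pass the logger it builds during configuration.
	startCompaction(zap.NewExample(), 100)

	// A nil logger keeps the legacy, unstructured behavior.
	startCompaction(nil, 100)
}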