commit
8baa2401b1
|
@ -22,6 +22,8 @@ import (
|
|||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"github.com/jonboulle/clockwork"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -54,12 +56,19 @@ type RevGetter interface {
|
|||
Rev() int64
|
||||
}
|
||||
|
||||
func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) {
|
||||
// New returns a new Compactor based on given "mode".
|
||||
func New(
|
||||
lg *zap.Logger,
|
||||
mode string,
|
||||
retention time.Duration,
|
||||
rg RevGetter,
|
||||
c Compactable,
|
||||
) (Compactor, error) {
|
||||
switch mode {
|
||||
case ModePeriodic:
|
||||
return NewPeriodic(retention, rg, c), nil
|
||||
return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil
|
||||
case ModeRevision:
|
||||
return NewRevision(int64(retention), rg, c), nil
|
||||
return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported compaction mode %s", mode)
|
||||
}
|
||||
|
|
|
@ -23,11 +23,13 @@ import (
|
|||
"github.com/coreos/etcd/mvcc"
|
||||
|
||||
"github.com/jonboulle/clockwork"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Periodic compacts the log by purging revisions older than
|
||||
// the configured retention time.
|
||||
type Periodic struct {
|
||||
lg *zap.Logger
|
||||
clock clockwork.Clock
|
||||
period time.Duration
|
||||
|
||||
|
@ -43,22 +45,19 @@ type Periodic struct {
|
|||
paused bool
|
||||
}
|
||||
|
||||
// NewPeriodic creates a new instance of Periodic compactor that purges
|
||||
// newPeriodic creates a new instance of Periodic compactor that purges
|
||||
// the log older than h Duration.
|
||||
func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
|
||||
return newPeriodic(clockwork.NewRealClock(), h, rg, c)
|
||||
}
|
||||
|
||||
func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
|
||||
t := &Periodic{
|
||||
func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
|
||||
pc := &Periodic{
|
||||
lg: lg,
|
||||
clock: clock,
|
||||
period: h,
|
||||
rg: rg,
|
||||
c: c,
|
||||
revs: make([]int64, 0),
|
||||
}
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
return t
|
||||
pc.ctx, pc.cancel = context.WithCancel(context.Background())
|
||||
return pc
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -96,50 +95,77 @@ Compaction period 5-sec:
|
|||
*/
|
||||
|
||||
// Run runs periodic compactor.
|
||||
func (t *Periodic) Run() {
|
||||
compactInterval := t.getCompactInterval()
|
||||
retryInterval := t.getRetryInterval()
|
||||
retentions := t.getRetentions()
|
||||
func (pc *Periodic) Run() {
|
||||
compactInterval := pc.getCompactInterval()
|
||||
retryInterval := pc.getRetryInterval()
|
||||
retentions := pc.getRetentions()
|
||||
|
||||
go func() {
|
||||
lastSuccess := t.clock.Now()
|
||||
baseInterval := t.period
|
||||
lastSuccess := pc.clock.Now()
|
||||
baseInterval := pc.period
|
||||
for {
|
||||
t.revs = append(t.revs, t.rg.Rev())
|
||||
if len(t.revs) > retentions {
|
||||
t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago
|
||||
pc.revs = append(pc.revs, pc.rg.Rev())
|
||||
if len(pc.revs) > retentions {
|
||||
pc.revs = pc.revs[1:] // pc.revs[0] is always the rev at pc.period ago
|
||||
}
|
||||
|
||||
select {
|
||||
case <-t.ctx.Done():
|
||||
case <-pc.ctx.Done():
|
||||
return
|
||||
case <-t.clock.After(retryInterval):
|
||||
t.mu.Lock()
|
||||
p := t.paused
|
||||
t.mu.Unlock()
|
||||
case <-pc.clock.After(retryInterval):
|
||||
pc.mu.Lock()
|
||||
p := pc.paused
|
||||
pc.mu.Unlock()
|
||||
if p {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if t.clock.Now().Sub(lastSuccess) < baseInterval {
|
||||
if pc.clock.Now().Sub(lastSuccess) < baseInterval {
|
||||
continue
|
||||
}
|
||||
|
||||
// wait up to initial given period
|
||||
if baseInterval == t.period {
|
||||
if baseInterval == pc.period {
|
||||
baseInterval = compactInterval
|
||||
}
|
||||
rev := t.revs[0]
|
||||
rev := pc.revs[0]
|
||||
|
||||
plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
|
||||
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
|
||||
if err == nil || err == mvcc.ErrCompacted {
|
||||
lastSuccess = t.clock.Now()
|
||||
plog.Noticef("Finished auto-compaction at revision %d", rev)
|
||||
if pc.lg != nil {
|
||||
pc.lg.Info(
|
||||
"starting auto periodic compaction",
|
||||
zap.Int64("revision", rev),
|
||||
zap.Duration("compact-period", pc.period),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
|
||||
plog.Noticef("Retry after %v", retryInterval)
|
||||
plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, pc.period)
|
||||
}
|
||||
_, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev})
|
||||
if err == nil || err == mvcc.ErrCompacted {
|
||||
if pc.lg != nil {
|
||||
pc.lg.Info(
|
||||
"completed auto periodic compaction",
|
||||
zap.Int64("revision", rev),
|
||||
zap.Duration("compact-period", pc.period),
|
||||
zap.Duration("took", time.Since(lastSuccess)),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("Finished auto-compaction at revision %d", rev)
|
||||
}
|
||||
lastSuccess = pc.clock.Now()
|
||||
} else {
|
||||
if pc.lg != nil {
|
||||
pc.lg.Warn(
|
||||
"failed auto periodic compaction",
|
||||
zap.Int64("revision", rev),
|
||||
zap.Duration("compact-period", pc.period),
|
||||
zap.Duration("retry-interval", retryInterval),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
|
||||
plog.Noticef("Retry after %v", retryInterval)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -149,22 +175,22 @@ func (t *Periodic) Run() {
|
|||
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
|
||||
// if given compaction period x is >1-hour, compact every hour.
|
||||
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
|
||||
func (t *Periodic) getCompactInterval() time.Duration {
|
||||
itv := t.period
|
||||
func (pc *Periodic) getCompactInterval() time.Duration {
|
||||
itv := pc.period
|
||||
if itv > time.Hour {
|
||||
itv = time.Hour
|
||||
}
|
||||
return itv
|
||||
}
|
||||
|
||||
func (t *Periodic) getRetentions() int {
|
||||
return int(t.period/t.getRetryInterval()) + 1
|
||||
func (pc *Periodic) getRetentions() int {
|
||||
return int(pc.period/pc.getRetryInterval()) + 1
|
||||
}
|
||||
|
||||
const retryDivisor = 10
|
||||
|
||||
func (t *Periodic) getRetryInterval() time.Duration {
|
||||
itv := t.period
|
||||
func (pc *Periodic) getRetryInterval() time.Duration {
|
||||
itv := pc.period
|
||||
if itv > time.Hour {
|
||||
itv = time.Hour
|
||||
}
|
||||
|
@ -172,20 +198,20 @@ func (t *Periodic) getRetryInterval() time.Duration {
|
|||
}
|
||||
|
||||
// Stop stops periodic compactor.
|
||||
func (t *Periodic) Stop() {
|
||||
t.cancel()
|
||||
func (pc *Periodic) Stop() {
|
||||
pc.cancel()
|
||||
}
|
||||
|
||||
// Pause pauses periodic compactor.
|
||||
func (t *Periodic) Pause() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = true
|
||||
func (pc *Periodic) Pause() {
|
||||
pc.mu.Lock()
|
||||
pc.paused = true
|
||||
pc.mu.Unlock()
|
||||
}
|
||||
|
||||
// Resume resumes periodic compactor.
|
||||
func (t *Periodic) Resume() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = false
|
||||
func (pc *Periodic) Resume() {
|
||||
pc.mu.Lock()
|
||||
pc.paused = false
|
||||
pc.mu.Unlock()
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/coreos/etcd/pkg/testutil"
|
||||
|
||||
"github.com/jonboulle/clockwork"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func TestPeriodicHourly(t *testing.T) {
|
||||
|
@ -32,7 +33,7 @@ func TestPeriodicHourly(t *testing.T) {
|
|||
fc := clockwork.NewFakeClock()
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := newPeriodic(fc, retentionDuration, rg, compactable)
|
||||
tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
|
||||
|
||||
tb.Run()
|
||||
defer tb.Stop()
|
||||
|
@ -83,7 +84,7 @@ func TestPeriodicMinutes(t *testing.T) {
|
|||
fc := clockwork.NewFakeClock()
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := newPeriodic(fc, retentionDuration, rg, compactable)
|
||||
tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
|
||||
|
||||
tb.Run()
|
||||
defer tb.Stop()
|
||||
|
@ -131,7 +132,7 @@ func TestPeriodicPause(t *testing.T) {
|
|||
retentionDuration := time.Hour
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := newPeriodic(fc, retentionDuration, rg, compactable)
|
||||
tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
|
||||
|
||||
tb.Run()
|
||||
tb.Pause()
|
||||
|
|
|
@ -23,11 +23,14 @@ import (
|
|||
"github.com/coreos/etcd/mvcc"
|
||||
|
||||
"github.com/jonboulle/clockwork"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Revision compacts the log by purging revisions older than
|
||||
// the configured reivison number. Compaction happens every 5 minutes.
|
||||
type Revision struct {
|
||||
lg *zap.Logger
|
||||
|
||||
clock clockwork.Clock
|
||||
retention int64
|
||||
|
||||
|
@ -41,75 +44,100 @@ type Revision struct {
|
|||
paused bool
|
||||
}
|
||||
|
||||
// NewRevision creates a new instance of Revisonal compactor that purges
|
||||
// newRevision creates a new instance of Revisonal compactor that purges
|
||||
// the log older than retention revisions from the current revision.
|
||||
func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision {
|
||||
return newRevision(clockwork.NewRealClock(), retention, rg, c)
|
||||
}
|
||||
|
||||
func newRevision(clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
|
||||
t := &Revision{
|
||||
func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
|
||||
rc := &Revision{
|
||||
lg: lg,
|
||||
clock: clock,
|
||||
retention: retention,
|
||||
rg: rg,
|
||||
c: c,
|
||||
}
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
return t
|
||||
rc.ctx, rc.cancel = context.WithCancel(context.Background())
|
||||
return rc
|
||||
}
|
||||
|
||||
const revInterval = 5 * time.Minute
|
||||
|
||||
// Run runs revision-based compactor.
|
||||
func (t *Revision) Run() {
|
||||
func (rc *Revision) Run() {
|
||||
prev := int64(0)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-t.ctx.Done():
|
||||
case <-rc.ctx.Done():
|
||||
return
|
||||
case <-t.clock.After(revInterval):
|
||||
t.mu.Lock()
|
||||
p := t.paused
|
||||
t.mu.Unlock()
|
||||
case <-rc.clock.After(revInterval):
|
||||
rc.mu.Lock()
|
||||
p := rc.paused
|
||||
rc.mu.Unlock()
|
||||
if p {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
rev := t.rg.Rev() - t.retention
|
||||
rev := rc.rg.Rev() - rc.retention
|
||||
if rev <= 0 || rev == prev {
|
||||
continue
|
||||
}
|
||||
|
||||
plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention)
|
||||
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
|
||||
now := time.Now()
|
||||
if rc.lg != nil {
|
||||
rc.lg.Info(
|
||||
"starting auto revision compaction",
|
||||
zap.Int64("revision", rev),
|
||||
zap.Int64("revision-compaction-retention", rc.retention),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, rc.retention)
|
||||
}
|
||||
_, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev})
|
||||
if err == nil || err == mvcc.ErrCompacted {
|
||||
prev = rev
|
||||
plog.Noticef("Finished auto-compaction at revision %d", rev)
|
||||
if rc.lg != nil {
|
||||
rc.lg.Info(
|
||||
"completed auto revision compaction",
|
||||
zap.Int64("revision", rev),
|
||||
zap.Int64("revision-compaction-retention", rc.retention),
|
||||
zap.Duration("took", time.Since(now)),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("Finished auto-compaction at revision %d", rev)
|
||||
}
|
||||
} else {
|
||||
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
|
||||
plog.Noticef("Retry after %v", revInterval)
|
||||
if rc.lg != nil {
|
||||
rc.lg.Warn(
|
||||
"failed auto revision compaction",
|
||||
zap.Int64("revision", rev),
|
||||
zap.Int64("revision-compaction-retention", rc.retention),
|
||||
zap.Duration("retry-interval", revInterval),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
|
||||
plog.Noticef("Retry after %v", revInterval)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop stops revision-based compactor.
|
||||
func (t *Revision) Stop() {
|
||||
t.cancel()
|
||||
func (rc *Revision) Stop() {
|
||||
rc.cancel()
|
||||
}
|
||||
|
||||
// Pause pauses revision-based compactor.
|
||||
func (t *Revision) Pause() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = true
|
||||
func (rc *Revision) Pause() {
|
||||
rc.mu.Lock()
|
||||
rc.paused = true
|
||||
rc.mu.Unlock()
|
||||
}
|
||||
|
||||
// Resume resumes revision-based compactor.
|
||||
func (t *Revision) Resume() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = false
|
||||
func (rc *Revision) Resume() {
|
||||
rc.mu.Lock()
|
||||
rc.paused = false
|
||||
rc.mu.Unlock()
|
||||
}
|
||||
|
|
|
@ -23,13 +23,14 @@ import (
|
|||
"github.com/coreos/etcd/pkg/testutil"
|
||||
|
||||
"github.com/jonboulle/clockwork"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func TestRevision(t *testing.T) {
|
||||
fc := clockwork.NewFakeClock()
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := newRevision(fc, 10, rg, compactable)
|
||||
tb := newRevision(zap.NewExample(), fc, 10, rg, compactable)
|
||||
|
||||
tb.Run()
|
||||
defer tb.Stop()
|
||||
|
@ -72,7 +73,7 @@ func TestRevisionPause(t *testing.T) {
|
|||
fc := clockwork.NewFakeClock()
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := newRevision(fc, 10, rg, compactable)
|
||||
tb := newRevision(zap.NewExample(), fc, 10, rg, compactable)
|
||||
|
||||
tb.Run()
|
||||
tb.Pause()
|
||||
|
|
|
@ -35,6 +35,7 @@ import (
|
|||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"github.com/jonboulle/clockwork"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -59,8 +60,8 @@ var (
|
|||
|
||||
// JoinCluster will connect to the discovery service at the given url, and
|
||||
// register the server represented by the given id and config to the cluster
|
||||
func JoinCluster(durl, dproxyurl string, id types.ID, config string) (string, error) {
|
||||
d, err := newDiscovery(durl, dproxyurl, id)
|
||||
func JoinCluster(lg *zap.Logger, durl, dproxyurl string, id types.ID, config string) (string, error) {
|
||||
d, err := newDiscovery(lg, durl, dproxyurl, id)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -69,8 +70,8 @@ func JoinCluster(durl, dproxyurl string, id types.ID, config string) (string, er
|
|||
|
||||
// GetCluster will connect to the discovery service at the given url and
|
||||
// retrieve a string describing the cluster
|
||||
func GetCluster(durl, dproxyurl string) (string, error) {
|
||||
d, err := newDiscovery(durl, dproxyurl, 0)
|
||||
func GetCluster(lg *zap.Logger, durl, dproxyurl string) (string, error) {
|
||||
d, err := newDiscovery(lg, durl, dproxyurl, 0)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -78,6 +79,7 @@ func GetCluster(durl, dproxyurl string) (string, error) {
|
|||
}
|
||||
|
||||
type discovery struct {
|
||||
lg *zap.Logger
|
||||
cluster string
|
||||
id types.ID
|
||||
c client.KeysAPI
|
||||
|
@ -90,7 +92,7 @@ type discovery struct {
|
|||
// newProxyFunc builds a proxy function from the given string, which should
|
||||
// represent a URL that can be used as a proxy. It performs basic
|
||||
// sanitization of the URL and returns any error encountered.
|
||||
func newProxyFunc(proxy string) (func(*http.Request) (*url.URL, error), error) {
|
||||
func newProxyFunc(lg *zap.Logger, proxy string) (func(*http.Request) (*url.URL, error), error) {
|
||||
if proxy == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -111,18 +113,22 @@ func newProxyFunc(proxy string) (func(*http.Request) (*url.URL, error), error) {
|
|||
return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
|
||||
}
|
||||
|
||||
plog.Infof("using proxy %q", proxyURL.String())
|
||||
if lg != nil {
|
||||
lg.Info("running proxy with discovery", zap.String("proxy-url", proxyURL.String()))
|
||||
} else {
|
||||
plog.Infof("using proxy %q", proxyURL.String())
|
||||
}
|
||||
return http.ProxyURL(proxyURL), nil
|
||||
}
|
||||
|
||||
func newDiscovery(durl, dproxyurl string, id types.ID) (*discovery, error) {
|
||||
func newDiscovery(lg *zap.Logger, durl, dproxyurl string, id types.ID) (*discovery, error) {
|
||||
u, err := url.Parse(durl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
token := u.Path
|
||||
u.Path = ""
|
||||
pf, err := newProxyFunc(dproxyurl)
|
||||
pf, err := newProxyFunc(lg, dproxyurl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -143,6 +149,7 @@ func newDiscovery(durl, dproxyurl string, id types.ID) (*discovery, error) {
|
|||
}
|
||||
dc := client.NewKeysAPIWithPrefix(c, "")
|
||||
return &discovery{
|
||||
lg: lg,
|
||||
cluster: token,
|
||||
c: dc,
|
||||
id: id,
|
||||
|
@ -225,7 +232,17 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
|
|||
return nil, 0, 0, ErrBadDiscoveryEndpoint
|
||||
}
|
||||
if ce, ok := err.(*client.ClusterError); ok {
|
||||
plog.Error(ce.Detail())
|
||||
if d.lg != nil {
|
||||
d.lg.Warn(
|
||||
"failed to get from discovery server",
|
||||
zap.String("discovery-url", d.url.String()),
|
||||
zap.String("path", path.Join(configKey, "size")),
|
||||
zap.Error(err),
|
||||
zap.String("err-detail", ce.Detail()),
|
||||
)
|
||||
} else {
|
||||
plog.Error(ce.Detail())
|
||||
}
|
||||
return d.checkClusterRetry()
|
||||
}
|
||||
return nil, 0, 0, err
|
||||
|
@ -240,7 +257,17 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
|
|||
cancel()
|
||||
if err != nil {
|
||||
if ce, ok := err.(*client.ClusterError); ok {
|
||||
plog.Error(ce.Detail())
|
||||
if d.lg != nil {
|
||||
d.lg.Warn(
|
||||
"failed to get from discovery server",
|
||||
zap.String("discovery-url", d.url.String()),
|
||||
zap.String("path", d.cluster),
|
||||
zap.Error(err),
|
||||
zap.String("err-detail", ce.Detail()),
|
||||
)
|
||||
} else {
|
||||
plog.Error(ce.Detail())
|
||||
}
|
||||
return d.checkClusterRetry()
|
||||
}
|
||||
return nil, 0, 0, err
|
||||
|
@ -276,7 +303,16 @@ func (d *discovery) logAndBackoffForRetry(step string) {
|
|||
retries = maxExpoentialRetries
|
||||
}
|
||||
retryTimeInSecond := time.Duration(0x1<<retries) * time.Second
|
||||
plog.Infof("%s: error connecting to %s, retrying in %s", step, d.url, retryTimeInSecond)
|
||||
if d.lg != nil {
|
||||
d.lg.Info(
|
||||
"retry connecting to discovery service",
|
||||
zap.String("url", d.url.String()),
|
||||
zap.String("reason", step),
|
||||
zap.Duration("backoff", retryTimeInSecond),
|
||||
)
|
||||
} else {
|
||||
plog.Infof("%s: error connecting to %s, retrying in %s", step, d.url, retryTimeInSecond)
|
||||
}
|
||||
d.clock.Sleep(retryTimeInSecond)
|
||||
}
|
||||
|
||||
|
@ -310,15 +346,40 @@ func (d *discovery) waitNodes(nodes []*client.Node, size int, index uint64) ([]*
|
|||
copy(all, nodes)
|
||||
for _, n := range all {
|
||||
if path.Base(n.Key) == path.Base(d.selfKey()) {
|
||||
plog.Noticef("found self %s in the cluster", path.Base(d.selfKey()))
|
||||
if d.lg != nil {
|
||||
d.lg.Info(
|
||||
"found self from discovery server",
|
||||
zap.String("discovery-url", d.url.String()),
|
||||
zap.String("self", path.Base(d.selfKey())),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("found self %s in the cluster", path.Base(d.selfKey()))
|
||||
}
|
||||
} else {
|
||||
plog.Noticef("found peer %s in the cluster", path.Base(n.Key))
|
||||
if d.lg != nil {
|
||||
d.lg.Info(
|
||||
"found peer from discovery server",
|
||||
zap.String("discovery-url", d.url.String()),
|
||||
zap.String("peer", path.Base(n.Key)),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("found peer %s in the cluster", path.Base(n.Key))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// wait for others
|
||||
for len(all) < size {
|
||||
plog.Noticef("found %d peer(s), waiting for %d more", len(all), size-len(all))
|
||||
if d.lg != nil {
|
||||
d.lg.Info(
|
||||
"found peers from discovery server; waiting for more",
|
||||
zap.String("discovery-url", d.url.String()),
|
||||
zap.Int("found-peers", len(all)),
|
||||
zap.Int("needed-peers", size-len(all)),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("found %d peer(s), waiting for %d more", len(all), size-len(all))
|
||||
}
|
||||
resp, err := w.Next(context.Background())
|
||||
if err != nil {
|
||||
if ce, ok := err.(*client.ClusterError); ok {
|
||||
|
@ -327,10 +388,26 @@ func (d *discovery) waitNodes(nodes []*client.Node, size int, index uint64) ([]*
|
|||
}
|
||||
return nil, err
|
||||
}
|
||||
plog.Noticef("found peer %s in the cluster", path.Base(resp.Node.Key))
|
||||
if d.lg != nil {
|
||||
d.lg.Info(
|
||||
"found peer from discovery server",
|
||||
zap.String("discovery-url", d.url.String()),
|
||||
zap.String("peer", path.Base(resp.Node.Key)),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("found peer %s in the cluster", path.Base(resp.Node.Key))
|
||||
}
|
||||
all = append(all, resp.Node)
|
||||
}
|
||||
plog.Noticef("found %d needed peer(s)", len(all))
|
||||
if d.lg != nil {
|
||||
d.lg.Info(
|
||||
"found all needed peers from discovery server",
|
||||
zap.String("discovery-url", d.url.String()),
|
||||
zap.Int("found-peers", len(all)),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("found %d needed peer(s)", len(all))
|
||||
}
|
||||
return all, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/coreos/etcd/client"
|
||||
|
||||
"github.com/jonboulle/clockwork"
|
||||
|
@ -36,7 +38,7 @@ const (
|
|||
)
|
||||
|
||||
func TestNewProxyFuncUnset(t *testing.T) {
|
||||
pf, err := newProxyFunc("")
|
||||
pf, err := newProxyFunc(zap.NewExample(), "")
|
||||
if pf != nil {
|
||||
t.Fatal("unexpected non-nil proxyFunc")
|
||||
}
|
||||
|
@ -51,7 +53,7 @@ func TestNewProxyFuncBad(t *testing.T) {
|
|||
"http://foo.com/%1",
|
||||
}
|
||||
for i, in := range tests {
|
||||
pf, err := newProxyFunc(in)
|
||||
pf, err := newProxyFunc(zap.NewExample(), in)
|
||||
if pf != nil {
|
||||
t.Errorf("#%d: unexpected non-nil proxyFunc", i)
|
||||
}
|
||||
|
@ -67,7 +69,7 @@ func TestNewProxyFunc(t *testing.T) {
|
|||
"http://disco.foo.bar": "http://disco.foo.bar",
|
||||
}
|
||||
for in, w := range tests {
|
||||
pf, err := newProxyFunc(in)
|
||||
pf, err := newProxyFunc(zap.NewExample(), in)
|
||||
if pf == nil {
|
||||
t.Errorf("%s: unexpected nil proxyFunc", in)
|
||||
continue
|
||||
|
|
299
embed/config.go
299
embed/config.go
|
@ -16,7 +16,6 @@ package embed
|
|||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
|
@ -24,28 +23,22 @@ import (
|
|||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/compactor"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/pkg/flags"
|
||||
"github.com/coreos/etcd/pkg/logutil"
|
||||
"github.com/coreos/etcd/pkg/netutil"
|
||||
"github.com/coreos/etcd/pkg/srv"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"github.com/ghodss/yaml"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -278,6 +271,7 @@ type Config struct {
|
|||
// - file path to append server logs to.
|
||||
// It can be multiple when "Logger" is zap.
|
||||
LogOutputs []string `json:"log-outputs"`
|
||||
|
||||
// Debug is true, to enable debug level logging.
|
||||
Debug bool `json:"debug"`
|
||||
|
||||
|
@ -399,259 +393,6 @@ func logTLSHandshakeFailure(conn *tls.Conn, err error) {
|
|||
}
|
||||
}
|
||||
|
||||
// GetLogger returns the logger.
|
||||
func (cfg Config) GetLogger() *zap.Logger {
|
||||
cfg.loggerMu.RLock()
|
||||
l := cfg.logger
|
||||
cfg.loggerMu.RUnlock()
|
||||
return l
|
||||
}
|
||||
|
||||
// for testing
|
||||
var grpcLogOnce = new(sync.Once)
|
||||
|
||||
// setupLogging initializes etcd logging.
|
||||
// Must be called after flag parsing or finishing configuring embed.Config.
|
||||
func (cfg *Config) setupLogging() error {
|
||||
// handle "DeprecatedLogOutput" in v3.4
|
||||
// TODO: remove "DeprecatedLogOutput" in v3.5
|
||||
len1 := len(cfg.DeprecatedLogOutput)
|
||||
len2 := len(cfg.LogOutputs)
|
||||
if len1 != len2 {
|
||||
switch {
|
||||
case len1 > len2: // deprecate "log-output" flag is used
|
||||
fmt.Fprintln(os.Stderr, "'--log-output' flag has been deprecated! Please use '--log-outputs'!")
|
||||
cfg.LogOutputs = cfg.DeprecatedLogOutput
|
||||
case len1 < len2: // "--log-outputs" flag has been set with multiple writers
|
||||
cfg.DeprecatedLogOutput = []string{}
|
||||
}
|
||||
} else {
|
||||
if len1 > 1 {
|
||||
return errors.New("both '--log-output' and '--log-outputs' are set; only set '--log-outputs'")
|
||||
}
|
||||
if len1 < 1 {
|
||||
return errors.New("either '--log-output' or '--log-outputs' flag must be set")
|
||||
}
|
||||
if reflect.DeepEqual(cfg.DeprecatedLogOutput, cfg.LogOutputs) && cfg.DeprecatedLogOutput[0] != DefaultLogOutput {
|
||||
return fmt.Errorf("'--log-output=%q' and '--log-outputs=%q' are incompatible; only set --log-outputs", cfg.DeprecatedLogOutput, cfg.LogOutputs)
|
||||
}
|
||||
if !reflect.DeepEqual(cfg.DeprecatedLogOutput, []string{DefaultLogOutput}) {
|
||||
fmt.Fprintf(os.Stderr, "Deprecated '--log-output' flag is set to %q\n", cfg.DeprecatedLogOutput)
|
||||
fmt.Fprintln(os.Stderr, "Please use '--log-outputs' flag")
|
||||
}
|
||||
}
|
||||
|
||||
switch cfg.Logger {
|
||||
case "capnslog": // TODO: deprecate this in v3.5
|
||||
cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure
|
||||
cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure
|
||||
|
||||
if cfg.Debug {
|
||||
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
|
||||
grpc.EnableTracing = true
|
||||
// enable info, warning, error
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
|
||||
} else {
|
||||
capnslog.SetGlobalLogLevel(capnslog.INFO)
|
||||
// only discard info
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
|
||||
}
|
||||
|
||||
// TODO: deprecate with "capnslog"
|
||||
if cfg.LogPkgLevels != "" {
|
||||
repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd")
|
||||
settings, err := repoLog.ParseLogLevelConfig(cfg.LogPkgLevels)
|
||||
if err != nil {
|
||||
plog.Warningf("couldn't parse log level string: %s, continuing with default levels", err.Error())
|
||||
return nil
|
||||
}
|
||||
repoLog.SetLogLevel(settings)
|
||||
}
|
||||
|
||||
if len(cfg.LogOutputs) != 1 {
|
||||
fmt.Printf("--logger=capnslog supports only 1 value in '--log-outputs', got %q\n", cfg.LogOutputs)
|
||||
os.Exit(1)
|
||||
}
|
||||
// capnslog initially SetFormatter(NewDefaultFormatter(os.Stderr))
|
||||
// where NewDefaultFormatter returns NewJournaldFormatter when syscall.Getppid() == 1
|
||||
// specify 'stdout' or 'stderr' to skip journald logging even when running under systemd
|
||||
output := cfg.LogOutputs[0]
|
||||
switch output {
|
||||
case "stdout":
|
||||
capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stdout, cfg.Debug))
|
||||
case "stderr":
|
||||
capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stderr, cfg.Debug))
|
||||
case DefaultLogOutput:
|
||||
default:
|
||||
plog.Panicf(`unknown log-output %q (only supports %q, "stdout", "stderr")`, output, DefaultLogOutput)
|
||||
}
|
||||
|
||||
case "zap":
|
||||
if len(cfg.LogOutputs) == 0 {
|
||||
cfg.LogOutputs = []string{DefaultLogOutput}
|
||||
}
|
||||
if len(cfg.LogOutputs) > 1 {
|
||||
for _, v := range cfg.LogOutputs {
|
||||
if v == DefaultLogOutput {
|
||||
panic(fmt.Errorf("multi logoutput for %q is not supported yet", DefaultLogOutput))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: use zapcore to support more features?
|
||||
lcfg := zap.Config{
|
||||
Level: zap.NewAtomicLevelAt(zap.InfoLevel),
|
||||
Development: false,
|
||||
Sampling: &zap.SamplingConfig{
|
||||
Initial: 100,
|
||||
Thereafter: 100,
|
||||
},
|
||||
Encoding: "json",
|
||||
EncoderConfig: zap.NewProductionEncoderConfig(),
|
||||
|
||||
OutputPaths: make([]string, 0),
|
||||
ErrorOutputPaths: make([]string, 0),
|
||||
}
|
||||
outputPaths, errOutputPaths := make(map[string]struct{}), make(map[string]struct{})
|
||||
isJournald := false
|
||||
for _, v := range cfg.LogOutputs {
|
||||
switch v {
|
||||
case DefaultLogOutput:
|
||||
if syscall.Getppid() == 1 {
|
||||
// capnslog initially SetFormatter(NewDefaultFormatter(os.Stderr))
|
||||
// where "NewDefaultFormatter" returns "NewJournaldFormatter"
|
||||
// specify 'stdout' or 'stderr' to override this redirects
|
||||
// when syscall.Getppid() == 1
|
||||
isJournald = true
|
||||
break
|
||||
}
|
||||
|
||||
outputPaths["stderr"] = struct{}{}
|
||||
errOutputPaths["stderr"] = struct{}{}
|
||||
|
||||
case "stderr":
|
||||
outputPaths["stderr"] = struct{}{}
|
||||
errOutputPaths["stderr"] = struct{}{}
|
||||
|
||||
case "stdout":
|
||||
outputPaths["stdout"] = struct{}{}
|
||||
errOutputPaths["stdout"] = struct{}{}
|
||||
|
||||
default:
|
||||
outputPaths[v] = struct{}{}
|
||||
errOutputPaths[v] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
if !isJournald {
|
||||
for v := range outputPaths {
|
||||
lcfg.OutputPaths = append(lcfg.OutputPaths, v)
|
||||
}
|
||||
for v := range errOutputPaths {
|
||||
lcfg.ErrorOutputPaths = append(lcfg.ErrorOutputPaths, v)
|
||||
}
|
||||
sort.Strings(lcfg.OutputPaths)
|
||||
sort.Strings(lcfg.ErrorOutputPaths)
|
||||
|
||||
if cfg.Debug {
|
||||
lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
|
||||
grpc.EnableTracing = true
|
||||
}
|
||||
|
||||
var err error
|
||||
cfg.logger, err = lcfg.Build()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cfg.loggerConfig = &lcfg
|
||||
cfg.loggerCore = nil
|
||||
cfg.loggerWriteSyncer = nil
|
||||
|
||||
grpcLogOnce.Do(func() {
|
||||
// debug true, enable info, warning, error
|
||||
// debug false, only discard info
|
||||
var gl grpclog.LoggerV2
|
||||
gl, err = logutil.NewGRPCLoggerV2(lcfg)
|
||||
if err == nil {
|
||||
grpclog.SetLoggerV2(gl)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if len(cfg.LogOutputs) > 1 {
|
||||
for _, v := range cfg.LogOutputs {
|
||||
if v != DefaultLogOutput {
|
||||
return fmt.Errorf("running as a systemd unit but other '--log-output' values (%q) are configured with 'default'; override 'default' value with something else", cfg.LogOutputs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// use stderr as fallback
|
||||
syncer := getZapWriteSyncer()
|
||||
lvl := zap.NewAtomicLevelAt(zap.InfoLevel)
|
||||
if cfg.Debug {
|
||||
lvl = zap.NewAtomicLevelAt(zap.DebugLevel)
|
||||
grpc.EnableTracing = true
|
||||
}
|
||||
|
||||
// WARN: do not change field names in encoder config
|
||||
// journald logging writer assumes field names of "level" and "caller"
|
||||
cr := zapcore.NewCore(
|
||||
zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
|
||||
syncer,
|
||||
lvl,
|
||||
)
|
||||
cfg.logger = zap.New(cr, zap.AddCaller(), zap.ErrorOutput(syncer))
|
||||
|
||||
cfg.loggerConfig = nil
|
||||
cfg.loggerCore = cr
|
||||
cfg.loggerWriteSyncer = syncer
|
||||
|
||||
grpcLogOnce.Do(func() {
|
||||
grpclog.SetLoggerV2(logutil.NewGRPCLoggerV2FromZapCore(cr, syncer))
|
||||
})
|
||||
}
|
||||
|
||||
logTLSHandshakeFailure := func(conn *tls.Conn, err error) {
|
||||
state := conn.ConnectionState()
|
||||
remoteAddr := conn.RemoteAddr().String()
|
||||
serverName := state.ServerName
|
||||
if len(state.PeerCertificates) > 0 {
|
||||
cert := state.PeerCertificates[0]
|
||||
ips := make([]string, 0, len(cert.IPAddresses))
|
||||
for i := range cert.IPAddresses {
|
||||
ips[i] = cert.IPAddresses[i].String()
|
||||
}
|
||||
cfg.logger.Warn(
|
||||
"rejected connection",
|
||||
zap.String("remote-addr", remoteAddr),
|
||||
zap.String("server-name", serverName),
|
||||
zap.Strings("ip-addresses", ips),
|
||||
zap.Strings("dns-names", cert.DNSNames),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
cfg.logger.Warn(
|
||||
"rejected connection",
|
||||
zap.String("remote-addr", remoteAddr),
|
||||
zap.String("server-name", serverName),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure
|
||||
cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown logger option %q", cfg.Logger)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func ConfigFromFile(path string) (*Config, error) {
|
||||
cfg := &configYAML{Config: *NewConfig()}
|
||||
if err := cfg.configFromFile(path); err != nil {
|
||||
|
@ -826,12 +567,13 @@ func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, tok
|
|||
// self's advertised peer URLs
|
||||
urlsmap[cfg.Name] = cfg.APUrls
|
||||
token = cfg.Durl
|
||||
|
||||
case cfg.DNSCluster != "":
|
||||
clusterStrs, cerr := cfg.GetDNSClusterNames()
|
||||
lg := cfg.logger
|
||||
if cerr != nil {
|
||||
if lg != nil {
|
||||
lg.Error("failed to resolve during SRV discovery", zap.Error(cerr))
|
||||
lg.Warn("failed to resolve during SRV discovery", zap.Error(cerr))
|
||||
} else {
|
||||
plog.Errorf("couldn't resolve during SRV discovery (%v)", cerr)
|
||||
}
|
||||
|
@ -856,6 +598,7 @@ func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, tok
|
|||
return nil, "", fmt.Errorf("cannot find local etcd member %q in SRV records", cfg.Name)
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
// We're statically configured, and cluster has appropriately been set.
|
||||
urlsmap, err = types.NewURLsMap(cfg.InitialCluster)
|
||||
|
@ -873,15 +616,45 @@ func (cfg *Config) GetDNSClusterNames() ([]string, error) {
|
|||
if cfg.DNSClusterServiceName != "" {
|
||||
serviceNameSuffix = "-" + cfg.DNSClusterServiceName
|
||||
}
|
||||
// Use both etcd-server-ssl and etcd-server for discovery. Combine the results if both are available.
|
||||
|
||||
lg := cfg.GetLogger()
|
||||
|
||||
// Use both etcd-server-ssl and etcd-server for discovery.
|
||||
// Combine the results if both are available.
|
||||
clusterStrs, cerr = srv.GetCluster("https", "etcd-server-ssl"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls)
|
||||
defaultHTTPClusterStrs, httpCerr := srv.GetCluster("http", "etcd-server"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls)
|
||||
if cerr != nil {
|
||||
clusterStrs = make([]string, 0)
|
||||
}
|
||||
if lg != nil {
|
||||
lg.Info(
|
||||
"get cluster for etcd-server-ssl SRV",
|
||||
zap.String("service-scheme", "https"),
|
||||
zap.String("service-name", "etcd-server-ssl"+serviceNameSuffix),
|
||||
zap.String("server-name", cfg.Name),
|
||||
zap.String("discovery-srv", cfg.DNSCluster),
|
||||
zap.Strings("advertise-peer-urls", cfg.getAPURLs()),
|
||||
zap.Strings("found-cluster", clusterStrs),
|
||||
zap.Error(cerr),
|
||||
)
|
||||
}
|
||||
|
||||
defaultHTTPClusterStrs, httpCerr := srv.GetCluster("http", "etcd-server"+serviceNameSuffix, cfg.Name, cfg.DNSCluster, cfg.APUrls)
|
||||
if httpCerr != nil {
|
||||
clusterStrs = append(clusterStrs, defaultHTTPClusterStrs...)
|
||||
}
|
||||
if lg != nil {
|
||||
lg.Info(
|
||||
"get cluster for etcd-server SRV",
|
||||
zap.String("service-scheme", "http"),
|
||||
zap.String("service-name", "etcd-server"+serviceNameSuffix),
|
||||
zap.String("server-name", cfg.Name),
|
||||
zap.String("discovery-srv", cfg.DNSCluster),
|
||||
zap.Strings("advertise-peer-urls", cfg.getAPURLs()),
|
||||
zap.Strings("found-cluster", clusterStrs),
|
||||
zap.Error(httpCerr),
|
||||
)
|
||||
}
|
||||
|
||||
return clusterStrs, cerr
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,288 @@
|
|||
// Copyright 2018 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package embed
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/coreos/etcd/pkg/logutil"
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
// GetLogger returns the logger.
|
||||
func (cfg Config) GetLogger() *zap.Logger {
|
||||
cfg.loggerMu.RLock()
|
||||
l := cfg.logger
|
||||
cfg.loggerMu.RUnlock()
|
||||
return l
|
||||
}
|
||||
|
||||
// for testing
|
||||
var grpcLogOnce = new(sync.Once)
|
||||
|
||||
// setupLogging initializes etcd logging.
|
||||
// Must be called after flag parsing or finishing configuring embed.Config.
|
||||
func (cfg *Config) setupLogging() error {
|
||||
// handle "DeprecatedLogOutput" in v3.4
|
||||
// TODO: remove "DeprecatedLogOutput" in v3.5
|
||||
len1 := len(cfg.DeprecatedLogOutput)
|
||||
len2 := len(cfg.LogOutputs)
|
||||
if len1 != len2 {
|
||||
switch {
|
||||
case len1 > len2: // deprecate "log-output" flag is used
|
||||
fmt.Fprintln(os.Stderr, "'--log-output' flag has been deprecated! Please use '--log-outputs'!")
|
||||
cfg.LogOutputs = cfg.DeprecatedLogOutput
|
||||
case len1 < len2: // "--log-outputs" flag has been set with multiple writers
|
||||
cfg.DeprecatedLogOutput = []string{}
|
||||
}
|
||||
} else {
|
||||
if len1 > 1 {
|
||||
return errors.New("both '--log-output' and '--log-outputs' are set; only set '--log-outputs'")
|
||||
}
|
||||
if len1 < 1 {
|
||||
return errors.New("either '--log-output' or '--log-outputs' flag must be set")
|
||||
}
|
||||
if reflect.DeepEqual(cfg.DeprecatedLogOutput, cfg.LogOutputs) && cfg.DeprecatedLogOutput[0] != DefaultLogOutput {
|
||||
return fmt.Errorf("'--log-output=%q' and '--log-outputs=%q' are incompatible; only set --log-outputs", cfg.DeprecatedLogOutput, cfg.LogOutputs)
|
||||
}
|
||||
if !reflect.DeepEqual(cfg.DeprecatedLogOutput, []string{DefaultLogOutput}) {
|
||||
fmt.Fprintf(os.Stderr, "Deprecated '--log-output' flag is set to %q\n", cfg.DeprecatedLogOutput)
|
||||
fmt.Fprintln(os.Stderr, "Please use '--log-outputs' flag")
|
||||
}
|
||||
}
|
||||
|
||||
switch cfg.Logger {
|
||||
case "capnslog": // TODO: deprecate this in v3.5
|
||||
cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure
|
||||
cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure
|
||||
|
||||
if cfg.Debug {
|
||||
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
|
||||
grpc.EnableTracing = true
|
||||
// enable info, warning, error
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
|
||||
} else {
|
||||
capnslog.SetGlobalLogLevel(capnslog.INFO)
|
||||
// only discard info
|
||||
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
|
||||
}
|
||||
|
||||
// TODO: deprecate with "capnslog"
|
||||
if cfg.LogPkgLevels != "" {
|
||||
repoLog := capnslog.MustRepoLogger("github.com/coreos/etcd")
|
||||
settings, err := repoLog.ParseLogLevelConfig(cfg.LogPkgLevels)
|
||||
if err != nil {
|
||||
plog.Warningf("couldn't parse log level string: %s, continuing with default levels", err.Error())
|
||||
return nil
|
||||
}
|
||||
repoLog.SetLogLevel(settings)
|
||||
}
|
||||
|
||||
if len(cfg.LogOutputs) != 1 {
|
||||
fmt.Printf("--logger=capnslog supports only 1 value in '--log-outputs', got %q\n", cfg.LogOutputs)
|
||||
os.Exit(1)
|
||||
}
|
||||
// capnslog initially SetFormatter(NewDefaultFormatter(os.Stderr))
|
||||
// where NewDefaultFormatter returns NewJournaldFormatter when syscall.Getppid() == 1
|
||||
// specify 'stdout' or 'stderr' to skip journald logging even when running under systemd
|
||||
output := cfg.LogOutputs[0]
|
||||
switch output {
|
||||
case "stdout":
|
||||
capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stdout, cfg.Debug))
|
||||
case "stderr":
|
||||
capnslog.SetFormatter(capnslog.NewPrettyFormatter(os.Stderr, cfg.Debug))
|
||||
case DefaultLogOutput:
|
||||
default:
|
||||
plog.Panicf(`unknown log-output %q (only supports %q, "stdout", "stderr")`, output, DefaultLogOutput)
|
||||
}
|
||||
|
||||
case "zap":
|
||||
if len(cfg.LogOutputs) == 0 {
|
||||
cfg.LogOutputs = []string{DefaultLogOutput}
|
||||
}
|
||||
if len(cfg.LogOutputs) > 1 {
|
||||
for _, v := range cfg.LogOutputs {
|
||||
if v == DefaultLogOutput {
|
||||
panic(fmt.Errorf("multi logoutput for %q is not supported yet", DefaultLogOutput))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: use zapcore to support more features?
|
||||
lcfg := zap.Config{
|
||||
Level: zap.NewAtomicLevelAt(zap.InfoLevel),
|
||||
Development: false,
|
||||
Sampling: &zap.SamplingConfig{
|
||||
Initial: 100,
|
||||
Thereafter: 100,
|
||||
},
|
||||
Encoding: "json",
|
||||
EncoderConfig: zap.NewProductionEncoderConfig(),
|
||||
|
||||
OutputPaths: make([]string, 0),
|
||||
ErrorOutputPaths: make([]string, 0),
|
||||
}
|
||||
outputPaths, errOutputPaths := make(map[string]struct{}), make(map[string]struct{})
|
||||
isJournald := false
|
||||
for _, v := range cfg.LogOutputs {
|
||||
switch v {
|
||||
case DefaultLogOutput:
|
||||
if syscall.Getppid() == 1 {
|
||||
// capnslog initially SetFormatter(NewDefaultFormatter(os.Stderr))
|
||||
// where "NewDefaultFormatter" returns "NewJournaldFormatter"
|
||||
// specify 'stdout' or 'stderr' to override this redirects
|
||||
// when syscall.Getppid() == 1
|
||||
isJournald = true
|
||||
break
|
||||
}
|
||||
|
||||
outputPaths["stderr"] = struct{}{}
|
||||
errOutputPaths["stderr"] = struct{}{}
|
||||
|
||||
case "stderr":
|
||||
outputPaths["stderr"] = struct{}{}
|
||||
errOutputPaths["stderr"] = struct{}{}
|
||||
|
||||
case "stdout":
|
||||
outputPaths["stdout"] = struct{}{}
|
||||
errOutputPaths["stdout"] = struct{}{}
|
||||
|
||||
default:
|
||||
outputPaths[v] = struct{}{}
|
||||
errOutputPaths[v] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
if !isJournald {
|
||||
for v := range outputPaths {
|
||||
lcfg.OutputPaths = append(lcfg.OutputPaths, v)
|
||||
}
|
||||
for v := range errOutputPaths {
|
||||
lcfg.ErrorOutputPaths = append(lcfg.ErrorOutputPaths, v)
|
||||
}
|
||||
sort.Strings(lcfg.OutputPaths)
|
||||
sort.Strings(lcfg.ErrorOutputPaths)
|
||||
|
||||
if cfg.Debug {
|
||||
lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
|
||||
grpc.EnableTracing = true
|
||||
}
|
||||
|
||||
var err error
|
||||
cfg.logger, err = lcfg.Build()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cfg.loggerConfig = &lcfg
|
||||
cfg.loggerCore = nil
|
||||
cfg.loggerWriteSyncer = nil
|
||||
|
||||
grpcLogOnce.Do(func() {
|
||||
// debug true, enable info, warning, error
|
||||
// debug false, only discard info
|
||||
var gl grpclog.LoggerV2
|
||||
gl, err = logutil.NewGRPCLoggerV2(lcfg)
|
||||
if err == nil {
|
||||
grpclog.SetLoggerV2(gl)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if len(cfg.LogOutputs) > 1 {
|
||||
for _, v := range cfg.LogOutputs {
|
||||
if v != DefaultLogOutput {
|
||||
return fmt.Errorf("running as a systemd unit but other '--log-output' values (%q) are configured with 'default'; override 'default' value with something else", cfg.LogOutputs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// use stderr as fallback
|
||||
syncer := getZapWriteSyncer()
|
||||
lvl := zap.NewAtomicLevelAt(zap.InfoLevel)
|
||||
if cfg.Debug {
|
||||
lvl = zap.NewAtomicLevelAt(zap.DebugLevel)
|
||||
grpc.EnableTracing = true
|
||||
}
|
||||
|
||||
// WARN: do not change field names in encoder config
|
||||
// journald logging writer assumes field names of "level" and "caller"
|
||||
cr := zapcore.NewCore(
|
||||
zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
|
||||
syncer,
|
||||
lvl,
|
||||
)
|
||||
cfg.logger = zap.New(cr, zap.AddCaller(), zap.ErrorOutput(syncer))
|
||||
|
||||
cfg.loggerConfig = nil
|
||||
cfg.loggerCore = cr
|
||||
cfg.loggerWriteSyncer = syncer
|
||||
|
||||
grpcLogOnce.Do(func() {
|
||||
grpclog.SetLoggerV2(logutil.NewGRPCLoggerV2FromZapCore(cr, syncer))
|
||||
})
|
||||
}
|
||||
|
||||
logTLSHandshakeFailure := func(conn *tls.Conn, err error) {
|
||||
state := conn.ConnectionState()
|
||||
remoteAddr := conn.RemoteAddr().String()
|
||||
serverName := state.ServerName
|
||||
if len(state.PeerCertificates) > 0 {
|
||||
cert := state.PeerCertificates[0]
|
||||
ips := make([]string, 0, len(cert.IPAddresses))
|
||||
for i := range cert.IPAddresses {
|
||||
ips[i] = cert.IPAddresses[i].String()
|
||||
}
|
||||
cfg.logger.Warn(
|
||||
"rejected connection",
|
||||
zap.String("remote-addr", remoteAddr),
|
||||
zap.String("server-name", serverName),
|
||||
zap.Strings("ip-addresses", ips),
|
||||
zap.Strings("dns-names", cert.DNSNames),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
cfg.logger.Warn(
|
||||
"rejected connection",
|
||||
zap.String("remote-addr", remoteAddr),
|
||||
zap.String("server-name", serverName),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
cfg.ClientTLSInfo.HandshakeFailure = logTLSHandshakeFailure
|
||||
cfg.PeerTLSInfo.HandshakeFailure = logTLSHandshakeFailure
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown logger option %q", cfg.Logger)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -20,9 +20,11 @@ import (
|
|||
"os"
|
||||
|
||||
"github.com/coreos/etcd/pkg/logutil"
|
||||
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
// use stderr as fallback
|
||||
func getZapWriteSyncer() zapcore.WriteSyncer {
|
||||
return zapcore.AddSync(logutil.NewJournaldWriter(os.Stderr))
|
||||
}
|
|
@ -28,6 +28,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap/zapcore"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
||||
|
@ -273,6 +275,23 @@ func (e *Etcd) Config() Config {
|
|||
// Client requests will be terminated with request timeout.
|
||||
// After timeout, enforce remaning requests be closed immediately.
|
||||
func (e *Etcd) Close() {
|
||||
fields := []zapcore.Field{
|
||||
zap.String("name", e.cfg.Name),
|
||||
zap.String("data-dir", e.cfg.Dir),
|
||||
zap.Strings("advertise-peer-urls", e.cfg.getAPURLs()),
|
||||
zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
|
||||
}
|
||||
lg := e.GetLogger()
|
||||
if lg != nil {
|
||||
lg.Info("closing etcd server", fields...)
|
||||
}
|
||||
defer func() {
|
||||
if lg != nil {
|
||||
lg.Info("closed etcd server", fields...)
|
||||
lg.Sync()
|
||||
}
|
||||
}()
|
||||
|
||||
e.closeOnce.Do(func() { close(e.stopc) })
|
||||
|
||||
// close client requests with request timeout
|
||||
|
@ -315,11 +334,6 @@ func (e *Etcd) Close() {
|
|||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
lg := e.GetLogger()
|
||||
if lg != nil {
|
||||
lg.Sync()
|
||||
}
|
||||
}
|
||||
|
||||
func stopServers(ctx context.Context, ss *servers) {
|
||||
|
|
|
@ -63,14 +63,14 @@ func startEtcdOrProxyV2() {
|
|||
if err != nil {
|
||||
lg := cfg.ec.GetLogger()
|
||||
if lg != nil {
|
||||
lg.Error("failed to verify flags", zap.Error(err))
|
||||
lg.Warn("failed to verify flags", zap.Error(err))
|
||||
} else {
|
||||
plog.Errorf("error verifying flags, %v. See 'etcd --help'.", err)
|
||||
}
|
||||
switch err {
|
||||
case embed.ErrUnsetAdvertiseClientURLsFlag:
|
||||
if lg != nil {
|
||||
lg.Error("advertise client URLs are not set", zap.Error(err))
|
||||
lg.Warn("advertise client URLs are not set", zap.Error(err))
|
||||
} else {
|
||||
plog.Errorf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
|
||||
}
|
||||
|
@ -143,7 +143,11 @@ func startEtcdOrProxyV2() {
|
|||
which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
|
||||
if which != dirEmpty {
|
||||
if lg != nil {
|
||||
|
||||
lg.Info(
|
||||
"server has been already initialized",
|
||||
zap.String("data-dir", cfg.ec.Dir),
|
||||
zap.String("dir-type", string(which)),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
|
||||
}
|
||||
|
@ -153,7 +157,14 @@ func startEtcdOrProxyV2() {
|
|||
case dirProxy:
|
||||
err = startProxy(cfg)
|
||||
default:
|
||||
plog.Panicf("unhandled dir type %v", which)
|
||||
if lg != nil {
|
||||
lg.Panic(
|
||||
"unknown directory type",
|
||||
zap.String("dir-type", string(which)),
|
||||
)
|
||||
} else {
|
||||
plog.Panicf("unhandled dir type %v", which)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
shouldProxy := cfg.isProxy()
|
||||
|
@ -162,12 +173,20 @@ func startEtcdOrProxyV2() {
|
|||
if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == discovery.ErrFullCluster {
|
||||
if cfg.shouldFallbackToProxy() {
|
||||
if lg != nil {
|
||||
|
||||
lg.Warn(
|
||||
"discovery cluster is full, falling back to proxy",
|
||||
zap.String("fallback-proxy", fallbackFlagProxy),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy)
|
||||
}
|
||||
shouldProxy = true
|
||||
}
|
||||
} else if err != nil {
|
||||
if lg != nil {
|
||||
lg.Warn("failed to start etcd", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
if shouldProxy {
|
||||
|
@ -180,13 +199,13 @@ func startEtcdOrProxyV2() {
|
|||
switch derr.Err {
|
||||
case discovery.ErrDuplicateID:
|
||||
if lg != nil {
|
||||
lg.Error(
|
||||
lg.Warn(
|
||||
"member has been registered with discovery service",
|
||||
zap.String("name", cfg.ec.Name),
|
||||
zap.String("discovery-token", cfg.ec.Durl),
|
||||
zap.Error(derr.Err),
|
||||
)
|
||||
lg.Error(
|
||||
lg.Warn(
|
||||
"but could not find valid cluster configuration",
|
||||
zap.String("data-dir", cfg.ec.Dir),
|
||||
)
|
||||
|
@ -198,9 +217,10 @@ func startEtcdOrProxyV2() {
|
|||
plog.Infof("Please check the given data dir path if the previous bootstrap succeeded")
|
||||
plog.Infof("or use a new discovery token if the previous bootstrap failed.")
|
||||
}
|
||||
|
||||
case discovery.ErrDuplicateName:
|
||||
if lg != nil {
|
||||
lg.Error(
|
||||
lg.Warn(
|
||||
"member with duplicated name has already been registered",
|
||||
zap.String("discovery-token", cfg.ec.Durl),
|
||||
zap.Error(derr.Err),
|
||||
|
@ -212,9 +232,10 @@ func startEtcdOrProxyV2() {
|
|||
plog.Errorf("please check (cURL) the discovery token for more information.")
|
||||
plog.Errorf("please do not reuse the discovery token and generate a new one to bootstrap the cluster.")
|
||||
}
|
||||
|
||||
default:
|
||||
if lg != nil {
|
||||
lg.Error(
|
||||
lg.Warn(
|
||||
"failed to bootstrap; discovery token was already used",
|
||||
zap.String("discovery-token", cfg.ec.Durl),
|
||||
zap.Error(err),
|
||||
|
@ -231,7 +252,7 @@ func startEtcdOrProxyV2() {
|
|||
|
||||
if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") {
|
||||
if lg != nil {
|
||||
lg.Error("failed to start", zap.Error(err))
|
||||
lg.Warn("failed to start", zap.Error(err))
|
||||
} else {
|
||||
plog.Infof("%v", err)
|
||||
}
|
||||
|
@ -320,7 +341,12 @@ func startProxy(cfg *config) error {
|
|||
clientTLSInfo.InsecureSkipVerify = cfg.ec.ClientAutoTLS
|
||||
cfg.ec.PeerTLSInfo.InsecureSkipVerify = cfg.ec.PeerAutoTLS
|
||||
|
||||
pt, err := transport.NewTimeoutTransport(clientTLSInfo, time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond)
|
||||
pt, err := transport.NewTimeoutTransport(
|
||||
clientTLSInfo,
|
||||
time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond,
|
||||
time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond,
|
||||
time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -333,7 +359,12 @@ func startProxy(cfg *config) error {
|
|||
plog.Fatalf("could not get certs (%v)", err)
|
||||
}
|
||||
}
|
||||
tr, err := transport.NewTimeoutTransport(cfg.ec.PeerTLSInfo, time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond)
|
||||
tr, err := transport.NewTimeoutTransport(
|
||||
cfg.ec.PeerTLSInfo,
|
||||
time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond,
|
||||
time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond,
|
||||
time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -385,6 +416,7 @@ func startProxy(cfg *config) error {
|
|||
} else {
|
||||
plog.Infof("proxy: using peer urls %v from cluster file %q", peerURLs, clusterfile)
|
||||
}
|
||||
|
||||
case os.IsNotExist(err):
|
||||
var urlsmap types.URLsMap
|
||||
urlsmap, _, err = cfg.ec.PeerURLsMapAndToken("proxy")
|
||||
|
@ -394,7 +426,7 @@ func startProxy(cfg *config) error {
|
|||
|
||||
if cfg.ec.Durl != "" {
|
||||
var s string
|
||||
s, err = discovery.GetCluster(cfg.ec.Durl, cfg.ec.Dproxy)
|
||||
s, err = discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -408,6 +440,7 @@ func startProxy(cfg *config) error {
|
|||
} else {
|
||||
plog.Infof("proxy: using peer urls %v ", peerURLs)
|
||||
}
|
||||
|
||||
default:
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -21,11 +21,10 @@ import (
|
|||
"os"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/coreos/etcd/proxy/tcpproxy"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -91,7 +90,14 @@ func stripSchema(eps []string) []string {
|
|||
}
|
||||
|
||||
func startGateway(cmd *cobra.Command, args []string) {
|
||||
srvs := discoverEndpoints(gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery)
|
||||
var lg *zap.Logger
|
||||
lg, err := zap.NewProduction()
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
srvs := discoverEndpoints(lg, gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery)
|
||||
if len(srvs.Endpoints) == 0 {
|
||||
// no endpoints discovered, fall back to provided endpoints
|
||||
srvs.Endpoints = gatewayEndpoints
|
||||
|
@ -116,13 +122,6 @@ func startGateway(cmd *cobra.Command, args []string) {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
var lg *zap.Logger
|
||||
lg, err := zap.NewProduction()
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
var l net.Listener
|
||||
l, err = net.Listen("tcp", gatewayListenAddr)
|
||||
if err != nil {
|
||||
|
|
|
@ -238,7 +238,7 @@ func checkArgs() {
|
|||
}
|
||||
|
||||
func mustNewClient(lg *zap.Logger) *clientv3.Client {
|
||||
srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
|
||||
srvs := discoverEndpoints(lg, grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
|
||||
eps := srvs.Endpoints
|
||||
if len(eps) == 0 {
|
||||
eps = grpcProxyEndpoints
|
||||
|
|
|
@ -20,9 +20,11 @@ import (
|
|||
|
||||
"github.com/coreos/etcd/pkg/srv"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func discoverEndpoints(dns string, ca string, insecure bool) (s srv.SRVClients) {
|
||||
func discoverEndpoints(lg *zap.Logger, dns string, ca string, insecure bool) (s srv.SRVClients) {
|
||||
if dns == "" {
|
||||
return s
|
||||
}
|
||||
|
@ -32,7 +34,17 @@ func discoverEndpoints(dns string, ca string, insecure bool) (s srv.SRVClients)
|
|||
os.Exit(1)
|
||||
}
|
||||
endpoints := srvs.Endpoints
|
||||
plog.Infof("discovered the cluster %s from %s", endpoints, dns)
|
||||
|
||||
if lg != nil {
|
||||
lg.Info(
|
||||
"discovered cluster from SRV",
|
||||
zap.String("srv-server", dns),
|
||||
zap.Strings("endpoints", endpoints),
|
||||
)
|
||||
} else {
|
||||
plog.Infof("discovered the cluster %s from %s", endpoints, dns)
|
||||
}
|
||||
|
||||
if insecure {
|
||||
return *srvs
|
||||
}
|
||||
|
@ -41,12 +53,41 @@ func discoverEndpoints(dns string, ca string, insecure bool) (s srv.SRVClients)
|
|||
TrustedCAFile: ca,
|
||||
ServerName: dns,
|
||||
}
|
||||
plog.Infof("validating discovered endpoints %v", endpoints)
|
||||
|
||||
if lg != nil {
|
||||
lg.Info(
|
||||
"validating discovered SRV endpoints",
|
||||
zap.String("srv-server", dns),
|
||||
zap.Strings("endpoints", endpoints),
|
||||
)
|
||||
} else {
|
||||
plog.Infof("validating discovered endpoints %v", endpoints)
|
||||
}
|
||||
|
||||
endpoints, err = transport.ValidateSecureEndpoints(tlsInfo, endpoints)
|
||||
if err != nil {
|
||||
plog.Warningf("%v", err)
|
||||
if lg != nil {
|
||||
lg.Warn(
|
||||
"failed to validate discovered endpoints",
|
||||
zap.String("srv-server", dns),
|
||||
zap.Strings("endpoints", endpoints),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
plog.Warningf("%v", err)
|
||||
}
|
||||
} else {
|
||||
if lg != nil {
|
||||
lg.Info(
|
||||
"using validated discovered SRV endpoints",
|
||||
zap.String("srv-server", dns),
|
||||
zap.Strings("endpoints", endpoints),
|
||||
)
|
||||
}
|
||||
}
|
||||
if lg == nil {
|
||||
plog.Infof("using discovered endpoints %v", endpoints)
|
||||
}
|
||||
plog.Infof("using discovered endpoints %v", endpoints)
|
||||
|
||||
// map endpoints back to SRVClients struct with SRV data
|
||||
eps := make(map[string]struct{})
|
||||
|
|
|
@ -369,7 +369,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
|||
}
|
||||
if cfg.ShouldDiscover() {
|
||||
var str string
|
||||
str, err = discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
|
||||
str, err = discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
|
||||
if err != nil {
|
||||
return nil, &DiscoveryError{Op: "join", Err: err}
|
||||
}
|
||||
|
@ -562,7 +562,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
|||
}
|
||||
srv.authStore = auth.NewAuthStore(srv.be, tp)
|
||||
if num := cfg.AutoCompactionRetention; num != 0 {
|
||||
srv.compactor, err = compactor.New(cfg.AutoCompactionMode, num, srv.kv, srv)
|
||||
srv.compactor, err = compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -30,8 +30,8 @@ import (
|
|||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/etcdserver/v2error"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue