pkg: add field to record additional detail of trace; add stepThreshold
to reduce log volume.release-3.5
parent
f4e7fc56a7
commit
3830b3ef11
|
@ -35,6 +35,7 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
warnApplyDuration = 100 * time.Millisecond
|
warnApplyDuration = 100 * time.Millisecond
|
||||||
|
rangeTraceThreshold = 100 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
type applyResult struct {
|
type applyResult struct {
|
||||||
|
@ -247,11 +248,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||||
trace, ok := ctx.Value("trace").(*traceutil.Trace)
|
trace := traceutil.Get(ctx)
|
||||||
if !ok || trace == nil {
|
|
||||||
trace = traceutil.New("Apply Range")
|
|
||||||
ctx = context.WithValue(ctx, "trace", trace)
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := &pb.RangeResponse{}
|
resp := &pb.RangeResponse{}
|
||||||
resp.Header = &pb.ResponseHeader{}
|
resp.Header = &pb.ResponseHeader{}
|
||||||
|
@ -350,8 +347,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
|
||||||
|
|
||||||
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
|
||||||
isWrite := !isTxnReadonly(rt)
|
isWrite := !isTxnReadonly(rt)
|
||||||
trace := traceutil.New("ReadOnlyTxn")
|
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(traceutil.TODO()))
|
||||||
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace))
|
|
||||||
|
|
||||||
txnPath := compareToPath(txn, rt)
|
txnPath := compareToPath(txn, rt)
|
||||||
if isWrite {
|
if isWrite {
|
||||||
|
|
|
@ -24,7 +24,6 @@ import (
|
||||||
"go.etcd.io/etcd/etcdserver/api/membership"
|
"go.etcd.io/etcd/etcdserver/api/membership"
|
||||||
"go.etcd.io/etcd/etcdserver/api/rafthttp"
|
"go.etcd.io/etcd/etcdserver/api/rafthttp"
|
||||||
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||||||
"go.etcd.io/etcd/pkg/traceutil"
|
|
||||||
"go.etcd.io/etcd/pkg/types"
|
"go.etcd.io/etcd/pkg/types"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -109,7 +108,7 @@ func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Strin
|
||||||
if !isNil(respMsg) {
|
if !isNil(respMsg) {
|
||||||
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
|
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
|
||||||
}
|
}
|
||||||
warnOfExpensiveGenericRequest(lg, nil, now, reqStringer, "", resp, err)
|
warnOfExpensiveGenericRequest(lg, now, reqStringer, "", resp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
|
func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
|
||||||
|
@ -127,18 +126,18 @@ func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnR
|
||||||
}
|
}
|
||||||
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
|
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
|
||||||
}
|
}
|
||||||
warnOfExpensiveGenericRequest(lg, nil, now, reqStringer, "read-only range ", resp, err)
|
warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, trace *traceutil.Trace, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
|
func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
|
||||||
var resp string
|
var resp string
|
||||||
if !isNil(rangeResponse) {
|
if !isNil(rangeResponse) {
|
||||||
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
|
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
|
||||||
}
|
}
|
||||||
warnOfExpensiveGenericRequest(lg, trace, now, reqStringer, "read-only range ", resp, err)
|
warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func warnOfExpensiveGenericRequest(lg *zap.Logger, trace *traceutil.Trace, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
|
func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
|
||||||
d := time.Since(now)
|
d := time.Since(now)
|
||||||
if d > warnApplyDuration {
|
if d > warnApplyDuration {
|
||||||
if lg != nil {
|
if lg != nil {
|
||||||
|
@ -160,9 +159,6 @@ func warnOfExpensiveGenericRequest(lg *zap.Logger, trace *traceutil.Trace, now t
|
||||||
}
|
}
|
||||||
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
|
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
|
||||||
}
|
}
|
||||||
if trace != nil {
|
|
||||||
trace.Log(lg)
|
|
||||||
}
|
|
||||||
slowApplies.Inc()
|
slowApplies.Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,13 +86,23 @@ type Authenticator interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||||
trace := traceutil.New("Range")
|
trace := traceutil.New("Range",
|
||||||
|
traceutil.Field{Key: "RangeBegin", Value: string(r.Key)},
|
||||||
|
traceutil.Field{Key: "RangeEnd", Value: string(r.RangeEnd)},
|
||||||
|
)
|
||||||
ctx = context.WithValue(ctx, "trace", trace)
|
ctx = context.WithValue(ctx, "trace", trace)
|
||||||
|
|
||||||
var resp *pb.RangeResponse
|
var resp *pb.RangeResponse
|
||||||
var err error
|
var err error
|
||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), trace, start, r, resp, err)
|
warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err)
|
||||||
|
if resp != nil {
|
||||||
|
trace.AddField(
|
||||||
|
traceutil.Field{Key: "ResponseCount", Value: len(resp.Kvs)},
|
||||||
|
traceutil.Field{Key: "ResponseRevision", Value: resp.Header.Revision},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
trace.LogIfLong(rangeTraceThreshold, s.getLogger())
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
if !r.Serializable {
|
if !r.Serializable {
|
||||||
|
@ -564,9 +574,8 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if trace, ok := ctx.Value("trace").(*traceutil.Trace); ok && trace != nil {
|
trace := traceutil.Get(ctx)
|
||||||
trace.Step("Authentication.")
|
trace.Step("Authentication.")
|
||||||
}
|
|
||||||
// fetch response for serialized request
|
// fetch response for serialized request
|
||||||
get()
|
get()
|
||||||
// check for stale token revision in case the auth store was updated while
|
// check for stale token revision in case the auth store was updated while
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"go.etcd.io/etcd/mvcc/backend"
|
"go.etcd.io/etcd/mvcc/backend"
|
||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
"go.etcd.io/etcd/pkg/testutil"
|
"go.etcd.io/etcd/pkg/testutil"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
|
@ -47,7 +48,7 @@ var (
|
||||||
return kv.Range(key, end, ro)
|
return kv.Range(key, end, ro)
|
||||||
}
|
}
|
||||||
txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
|
txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
|
||||||
txn := kv.Read(nil)
|
txn := kv.Read(traceutil.TODO())
|
||||||
defer txn.End()
|
defer txn.End()
|
||||||
return txn.Range(key, end, ro)
|
return txn.Range(key, end, ro)
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,24 +14,27 @@
|
||||||
|
|
||||||
package mvcc
|
package mvcc
|
||||||
|
|
||||||
import "go.etcd.io/etcd/lease"
|
import (
|
||||||
|
"go.etcd.io/etcd/lease"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
|
)
|
||||||
|
|
||||||
type readView struct{ kv KV }
|
type readView struct{ kv KV }
|
||||||
|
|
||||||
func (rv *readView) FirstRev() int64 {
|
func (rv *readView) FirstRev() int64 {
|
||||||
tr := rv.kv.Read(nil)
|
tr := rv.kv.Read(traceutil.TODO())
|
||||||
defer tr.End()
|
defer tr.End()
|
||||||
return tr.FirstRev()
|
return tr.FirstRev()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rv *readView) Rev() int64 {
|
func (rv *readView) Rev() int64 {
|
||||||
tr := rv.kv.Read(nil)
|
tr := rv.kv.Read(traceutil.TODO())
|
||||||
defer tr.End()
|
defer tr.End()
|
||||||
return tr.Rev()
|
return tr.Rev()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||||
tr := rv.kv.Read(nil)
|
tr := rv.kv.Read(traceutil.TODO())
|
||||||
defer tr.End()
|
defer tr.End()
|
||||||
return tr.Range(key, end, ro)
|
return tr.Range(key, end, ro)
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import (
|
||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
"go.etcd.io/etcd/pkg/schedule"
|
"go.etcd.io/etcd/pkg/schedule"
|
||||||
"go.etcd.io/etcd/pkg/testutil"
|
"go.etcd.io/etcd/pkg/testutil"
|
||||||
|
"go.etcd.io/etcd/pkg/traceutil"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -658,7 +659,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
|
||||||
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
||||||
|
|
||||||
// readTx simulates a long read request
|
// readTx simulates a long read request
|
||||||
readTx1 := s.Read(nil)
|
readTx1 := s.Read(traceutil.TODO())
|
||||||
|
|
||||||
// write should not be blocked by reads
|
// write should not be blocked by reads
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
@ -673,7 +674,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// readTx2 simulates a short read request
|
// readTx2 simulates a short read request
|
||||||
readTx2 := s.Read(nil)
|
readTx2 := s.Read(traceutil.TODO())
|
||||||
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
|
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
|
||||||
ret, err := readTx2.Range([]byte("foo"), nil, ro)
|
ret, err := readTx2.Range([]byte("foo"), nil, ro)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -756,7 +757,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
wKVs := make(kvs, len(committedKVs))
|
wKVs := make(kvs, len(committedKVs))
|
||||||
copy(wKVs, committedKVs)
|
copy(wKVs, committedKVs)
|
||||||
tx := s.Read(nil)
|
tx := s.Read(traceutil.TODO())
|
||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
// get all keys in backend store, and compare with wKVs
|
// get all keys in backend store, and compare with wKVs
|
||||||
ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
|
ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
|
||||||
|
|
|
@ -69,7 +69,7 @@ func (s *store) Write() TxnWrite {
|
||||||
tx := s.b.BatchTx()
|
tx := s.b.BatchTx()
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
tw := &storeTxnWrite{
|
tw := &storeTxnWrite{
|
||||||
storeTxnRead: storeTxnRead{s, tx, 0, 0, nil},
|
storeTxnRead: storeTxnRead{s, tx, 0, 0, traceutil.TODO()},
|
||||||
tx: tx,
|
tx: tx,
|
||||||
beginRev: s.currentRev,
|
beginRev: s.currentRev,
|
||||||
changes: make([]mvccpb.KeyValue, 0, 4),
|
changes: make([]mvccpb.KeyValue, 0, 4),
|
||||||
|
@ -127,9 +127,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
revpairs := tr.s.kvindex.Revisions(key, end, rev)
|
revpairs := tr.s.kvindex.Revisions(key, end, rev)
|
||||||
if tr.trace != nil {
|
|
||||||
tr.trace.Step("Range keys from in-memory index tree.")
|
tr.trace.Step("Range keys from in-memory index tree.")
|
||||||
}
|
|
||||||
if len(revpairs) == 0 {
|
if len(revpairs) == 0 {
|
||||||
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
|
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
|
||||||
}
|
}
|
||||||
|
@ -169,9 +167,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if tr.trace != nil {
|
|
||||||
tr.trace.Step("Range keys from bolt db.")
|
tr.trace.Step("Range keys from bolt db.")
|
||||||
}
|
|
||||||
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
|
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,9 @@ package traceutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/pkg/capnslog"
|
"github.com/coreos/pkg/capnslog"
|
||||||
|
@ -13,8 +15,32 @@ var (
|
||||||
plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "trace")
|
plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "trace")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Field is a kv pair to record additional details of the trace.
|
||||||
|
type Field struct {
|
||||||
|
Key string
|
||||||
|
Value interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Field) format() string {
|
||||||
|
return fmt.Sprintf("%s:%v; ", f.Key, f.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeFields(fields []Field) string {
|
||||||
|
if len(fields) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
var buf bytes.Buffer
|
||||||
|
buf.WriteString("{")
|
||||||
|
for _, f := range fields {
|
||||||
|
buf.WriteString(f.format())
|
||||||
|
}
|
||||||
|
buf.WriteString("}")
|
||||||
|
return buf.String()
|
||||||
|
}
|
||||||
|
|
||||||
type Trace struct {
|
type Trace struct {
|
||||||
operation string
|
operation string
|
||||||
|
fields []Field
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
steps []step
|
steps []step
|
||||||
}
|
}
|
||||||
|
@ -22,39 +48,88 @@ type Trace struct {
|
||||||
type step struct {
|
type step struct {
|
||||||
time time.Time
|
time time.Time
|
||||||
msg string
|
msg string
|
||||||
|
fields []Field
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(op string) *Trace {
|
func New(op string, fields ...Field) *Trace {
|
||||||
return &Trace{operation: op, startTime: time.Now()}
|
return &Trace{operation: op, startTime: time.Now(), fields: fields}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Trace) Step(msg string) {
|
// traceutil.TODO() returns a non-nil, empty Trace
|
||||||
t.steps = append(t.steps, step{time: time.Now(), msg: msg})
|
func TODO() *Trace {
|
||||||
|
return &Trace{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dump all steps in the Trace
|
func Get(ctx context.Context) *Trace {
|
||||||
|
if trace, ok := ctx.Value("trace").(*Trace); ok && trace != nil {
|
||||||
|
return trace
|
||||||
|
}
|
||||||
|
return TODO()
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetOrCreate(ctx context.Context, op string, fields ...Field) (context.Context, *Trace) {
|
||||||
|
trace, ok := ctx.Value("trace").(*Trace)
|
||||||
|
if !ok || trace == nil {
|
||||||
|
trace = New(op)
|
||||||
|
trace.fields = fields
|
||||||
|
ctx = context.WithValue(ctx, "trace", trace)
|
||||||
|
}
|
||||||
|
return ctx, trace
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Trace) Step(msg string, fields ...Field) {
|
||||||
|
t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Trace) AddField(fields ...Field) {
|
||||||
|
for _, f := range fields {
|
||||||
|
t.fields = append(t.fields, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log dumps all steps in the Trace
|
||||||
func (t *Trace) Log(lg *zap.Logger) {
|
func (t *Trace) Log(lg *zap.Logger) {
|
||||||
|
t.LogWithStepThreshold(0, lg)
|
||||||
var buf bytes.Buffer
|
|
||||||
|
|
||||||
buf.WriteString(fmt.Sprintf("The tracing of %v request:\n", t.operation))
|
|
||||||
|
|
||||||
buf.WriteString("Request started at:")
|
|
||||||
buf.WriteString(t.startTime.Format("2006-01-02 15:04:05"))
|
|
||||||
buf.WriteString(fmt.Sprintf(".%06d", t.startTime.Nanosecond()/1000))
|
|
||||||
buf.WriteString("\n")
|
|
||||||
lastStepTime := t.startTime
|
|
||||||
for i, step := range t.steps {
|
|
||||||
buf.WriteString(fmt.Sprintf("Step %d: %v Time cost: %v\n", i, step.msg, step.time.Sub(lastStepTime)))
|
|
||||||
//fmt.Println(step.msg, " costs: ", step.time.Sub(lastStepTime))
|
|
||||||
lastStepTime = step.time
|
|
||||||
}
|
}
|
||||||
buf.WriteString("Trace End\n")
|
|
||||||
|
|
||||||
s := buf.String()
|
// LogIfLong dumps logs if the duration is longer than threshold
|
||||||
|
func (t *Trace) LogIfLong(threshold time.Duration, lg *zap.Logger) {
|
||||||
|
if time.Since(t.startTime) > threshold {
|
||||||
|
stepThreshold := threshold / time.Duration(len(t.steps)+1)
|
||||||
|
t.LogWithStepThreshold(stepThreshold, lg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogWithStepThreshold only dumps step whose duration is longer than step threshold
|
||||||
|
func (t *Trace) LogWithStepThreshold(threshold time.Duration, lg *zap.Logger) {
|
||||||
|
s := t.format(threshold)
|
||||||
if lg != nil {
|
if lg != nil {
|
||||||
lg.Info(s)
|
lg.Info(s)
|
||||||
} else {
|
} else {
|
||||||
plog.Info(s)
|
plog.Info(s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *Trace) format(threshold time.Duration) string {
|
||||||
|
endTime := time.Now()
|
||||||
|
totalDuration := endTime.Sub(t.startTime)
|
||||||
|
var buf bytes.Buffer
|
||||||
|
traceNum := rand.Int31()
|
||||||
|
|
||||||
|
buf.WriteString(fmt.Sprintf("Trace[%d] \"%v\" %s (duration: %v, start: %v)\n",
|
||||||
|
traceNum, t.operation, writeFields(t.fields), totalDuration,
|
||||||
|
t.startTime.Format("2006-01-02 15:04:05.000")))
|
||||||
|
lastStepTime := t.startTime
|
||||||
|
for _, step := range t.steps {
|
||||||
|
stepDuration := step.time.Sub(lastStepTime)
|
||||||
|
if stepDuration > threshold {
|
||||||
|
buf.WriteString(fmt.Sprintf("Trace[%d] Step \"%v\" %s (duration: %v)\n",
|
||||||
|
traceNum, step.msg, writeFields(step.fields), stepDuration))
|
||||||
|
}
|
||||||
|
lastStepTime = step.time
|
||||||
|
}
|
||||||
|
buf.WriteString(fmt.Sprintf("Trace[%d] End %v\n", traceNum,
|
||||||
|
endTime.Format("2006-01-02 15:04:05.000")))
|
||||||
|
|
||||||
|
return buf.String()
|
||||||
|
}
|
||||||
|
|
|
@ -1,28 +1,325 @@
|
||||||
package traceutil
|
package traceutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestTrace(t *testing.T) {
|
func TestGet(t *testing.T) {
|
||||||
|
traceForTest := &Trace{operation: "test"}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputCtx context.Context
|
||||||
|
outputTrace *Trace
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "When the context does not have trace",
|
||||||
|
inputCtx: context.TODO(),
|
||||||
|
outputTrace: TODO(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "When the context has trace",
|
||||||
|
inputCtx: context.WithValue(context.Background(), "trace", traceForTest),
|
||||||
|
outputTrace: traceForTest,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
trace := Get(tt.inputCtx)
|
||||||
|
if trace == nil {
|
||||||
|
t.Errorf("Expected %v; Got nil\n", tt.outputTrace)
|
||||||
|
}
|
||||||
|
if trace.operation != tt.outputTrace.operation {
|
||||||
|
t.Errorf("Expected %v; Got %v\n", tt.outputTrace, trace)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetOrCreate(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
inputCtx context.Context
|
||||||
|
outputTraceOp string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "When the context does not have trace",
|
||||||
|
inputCtx: context.TODO(),
|
||||||
|
outputTraceOp: "test",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "When the context has trace",
|
||||||
|
inputCtx: context.WithValue(context.Background(), "trace", &Trace{operation: "test"}),
|
||||||
|
outputTraceOp: "test",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ctx, trace := GetOrCreate(tt.inputCtx, "test")
|
||||||
|
if trace == nil {
|
||||||
|
t.Errorf("Expected trace object; Got nil\n")
|
||||||
|
} else if trace.operation != tt.outputTraceOp {
|
||||||
|
t.Errorf("Expected %v; Got %v\n", tt.outputTraceOp, trace.operation)
|
||||||
|
}
|
||||||
|
if ctx.Value("trace") == nil {
|
||||||
|
t.Errorf("Expected context has attached trace; Got nil\n")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCreate(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
op = "Test"
|
op = "Test"
|
||||||
steps = []string{"Step1, Step2"}
|
steps = []string{"Step1, Step2"}
|
||||||
|
fields = []Field{
|
||||||
|
{"traceKey1", "traceValue1"},
|
||||||
|
{"traceKey2", "traceValue2"},
|
||||||
|
}
|
||||||
|
stepFields = []Field{
|
||||||
|
{"stepKey1", "stepValue2"},
|
||||||
|
{"stepKey2", "stepValue2"},
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
trace := New(op)
|
trace := New(op, fields[0], fields[1])
|
||||||
if trace.operation != op {
|
if trace.operation != op {
|
||||||
t.Errorf("Expected %v, got %v\n", op, trace.operation)
|
t.Errorf("Expected %v; Got %v\n", op, trace.operation)
|
||||||
|
}
|
||||||
|
for i, f := range trace.fields {
|
||||||
|
if f.Key != fields[i].Key {
|
||||||
|
t.Errorf("Expected %v; Got %v\n", fields[i].Key, f.Key)
|
||||||
|
}
|
||||||
|
if f.Value != fields[i].Value {
|
||||||
|
t.Errorf("Expected %v; Got %v\n", fields[i].Value, f.Value)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, v := range steps {
|
|
||||||
trace.Step(v)
|
|
||||||
trace.Step(v)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, v := range steps {
|
for i, v := range steps {
|
||||||
if v != trace.steps[i].msg {
|
trace.Step(v, stepFields[i])
|
||||||
t.Errorf("Expected %v, got %v\n.", v, trace.steps[i].msg)
|
}
|
||||||
|
|
||||||
|
for i, v := range trace.steps {
|
||||||
|
if steps[i] != v.msg {
|
||||||
|
t.Errorf("Expected %v, got %v\n.", steps[i], v.msg)
|
||||||
|
}
|
||||||
|
if stepFields[i].Key != v.fields[0].Key {
|
||||||
|
t.Errorf("Expected %v; Got %v\n", stepFields[i].Key, v.fields[0].Key)
|
||||||
|
}
|
||||||
|
if stepFields[i].Value != v.fields[0].Value {
|
||||||
|
t.Errorf("Expected %v; Got %v\n", stepFields[i].Value, v.fields[0].Value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLog(t *testing.T) {
|
||||||
|
test := struct {
|
||||||
|
name string
|
||||||
|
trace *Trace
|
||||||
|
expectedMsg []string
|
||||||
|
}{
|
||||||
|
name: "When dump all logs",
|
||||||
|
trace: &Trace{
|
||||||
|
operation: "Test",
|
||||||
|
startTime: time.Now().Add(-100 * time.Millisecond),
|
||||||
|
steps: []step{
|
||||||
|
{time: time.Now().Add(-80 * time.Millisecond), msg: "msg1"},
|
||||||
|
{time: time.Now().Add(-50 * time.Millisecond), msg: "msg2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedMsg: []string{
|
||||||
|
"msg1", "msg2",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
logPath := filepath.Join(os.TempDir(), fmt.Sprintf("test-log-%d", time.Now().UnixNano()))
|
||||||
|
defer os.RemoveAll(logPath)
|
||||||
|
|
||||||
|
lcfg := zap.NewProductionConfig()
|
||||||
|
lcfg.OutputPaths = []string{logPath}
|
||||||
|
lcfg.ErrorOutputPaths = []string{logPath}
|
||||||
|
lg, _ := lcfg.Build()
|
||||||
|
|
||||||
|
test.trace.Log(lg)
|
||||||
|
data, err := ioutil.ReadFile(logPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, msg := range test.expectedMsg {
|
||||||
|
if !bytes.Contains(data, []byte(msg)) {
|
||||||
|
t.Errorf("Expected to find %v in log.\n", msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTraceFormat(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
trace *Trace
|
||||||
|
fields []Field
|
||||||
|
expectedMsg []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "When trace has fields",
|
||||||
|
trace: &Trace{
|
||||||
|
operation: "Test",
|
||||||
|
startTime: time.Now().Add(-100 * time.Millisecond),
|
||||||
|
steps: []step{
|
||||||
|
{
|
||||||
|
time: time.Now().Add(-80 * time.Millisecond),
|
||||||
|
msg: "msg1",
|
||||||
|
fields: []Field{{"stepKey1", "stepValue1"}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
time: time.Now().Add(-50 * time.Millisecond),
|
||||||
|
msg: "msg2",
|
||||||
|
fields: []Field{{"stepKey2", "stepValue2"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
fields: []Field{
|
||||||
|
{"traceKey1", "traceValue1"},
|
||||||
|
{"count", 1},
|
||||||
|
},
|
||||||
|
expectedMsg: []string{
|
||||||
|
"Test",
|
||||||
|
"msg1", "msg2",
|
||||||
|
"traceKey1:traceValue1", "count:1",
|
||||||
|
"stepKey1:stepValue1", "stepKey2:stepValue2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "When trace has no field",
|
||||||
|
trace: &Trace{
|
||||||
|
operation: "Test",
|
||||||
|
startTime: time.Now().Add(-100 * time.Millisecond),
|
||||||
|
steps: []step{
|
||||||
|
{time: time.Now().Add(-80 * time.Millisecond), msg: "msg1"},
|
||||||
|
{time: time.Now().Add(-50 * time.Millisecond), msg: "msg2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
fields: []Field{},
|
||||||
|
expectedMsg: []string{
|
||||||
|
"Test",
|
||||||
|
"msg1", "msg2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
for _, f := range tt.fields {
|
||||||
|
tt.trace.AddField(f)
|
||||||
|
}
|
||||||
|
s := tt.trace.format(0)
|
||||||
|
var buf bytes.Buffer
|
||||||
|
buf.WriteString(`Trace\[(\d*)?\](.+)\(duration(.+)start(.+)\)\n`)
|
||||||
|
for range tt.trace.steps {
|
||||||
|
buf.WriteString(`Trace\[(\d*)?\](.+)Step(.+)\(duration(.+)\)\n`)
|
||||||
|
}
|
||||||
|
buf.WriteString(`Trace\[(\d*)?\](.+)End(.+)\n`)
|
||||||
|
pattern := buf.String()
|
||||||
|
|
||||||
|
r, _ := regexp.Compile(pattern)
|
||||||
|
if !r.MatchString(s) {
|
||||||
|
t.Errorf("Wrong log format.\n")
|
||||||
|
}
|
||||||
|
for _, msg := range tt.expectedMsg {
|
||||||
|
if !strings.Contains(s, msg) {
|
||||||
|
t.Errorf("Expected to find %v in log.\n", msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLogIfLong(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
threshold time.Duration
|
||||||
|
trace *Trace
|
||||||
|
expectedMsg []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "When the duration is smaller than threshold",
|
||||||
|
threshold: time.Duration(200 * time.Millisecond),
|
||||||
|
trace: &Trace{
|
||||||
|
operation: "Test",
|
||||||
|
startTime: time.Now().Add(-100 * time.Millisecond),
|
||||||
|
steps: []step{
|
||||||
|
{time: time.Now().Add(-50 * time.Millisecond), msg: "msg1"},
|
||||||
|
{time: time.Now(), msg: "msg2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedMsg: []string{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "When the duration is longer than threshold",
|
||||||
|
threshold: time.Duration(50 * time.Millisecond),
|
||||||
|
trace: &Trace{
|
||||||
|
operation: "Test",
|
||||||
|
startTime: time.Now().Add(-100 * time.Millisecond),
|
||||||
|
steps: []step{
|
||||||
|
{time: time.Now().Add(-50 * time.Millisecond), msg: "msg1"},
|
||||||
|
{time: time.Now(), msg: "msg2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedMsg: []string{
|
||||||
|
"msg1", "msg2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "When not all steps are longer than step threshold",
|
||||||
|
threshold: time.Duration(50 * time.Millisecond),
|
||||||
|
trace: &Trace{
|
||||||
|
operation: "Test",
|
||||||
|
startTime: time.Now().Add(-100 * time.Millisecond),
|
||||||
|
steps: []step{
|
||||||
|
{time: time.Now(), msg: "msg1"},
|
||||||
|
{time: time.Now(), msg: "msg2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedMsg: []string{
|
||||||
|
"msg1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
logPath := filepath.Join(os.TempDir(), fmt.Sprintf("test-log-%d", time.Now().UnixNano()))
|
||||||
|
defer os.RemoveAll(logPath)
|
||||||
|
|
||||||
|
lcfg := zap.NewProductionConfig()
|
||||||
|
lcfg.OutputPaths = []string{logPath}
|
||||||
|
lcfg.ErrorOutputPaths = []string{logPath}
|
||||||
|
lg, _ := lcfg.Build()
|
||||||
|
|
||||||
|
tt.trace.LogIfLong(tt.threshold, lg)
|
||||||
|
data, err := ioutil.ReadFile(logPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
for _, msg := range tt.expectedMsg {
|
||||||
|
if !bytes.Contains(data, []byte(msg)) {
|
||||||
|
t.Errorf("Expected to find %v in log\n", msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue