etcdserver: trace compaction request; add return parameter 'trace' to applierV3.Compaction()

mvcc: trace compaction request; add input parameter 'trace' to KV.Compact()
release-3.5
yoyinzyc 2019-10-01 17:18:26 -07:00
parent 3a3eb24c69
commit 57aa68af5a
13 changed files with 92 additions and 55 deletions

View File

@ -55,7 +55,7 @@ type applierV3 interface {
Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)
LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
@ -129,7 +129,7 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
case r.Txn != nil:
ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
case r.Compaction != nil:
ar.resp, ar.physc, ar.err = a.s.applyV3.Compaction(r.Compaction)
ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
case r.LeaseGrant != nil:
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
case r.LeaseRevoke != nil:
@ -182,7 +182,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
trace = traceutil.New("put",
a.s.getLogger(),
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "value", Value: string(p.Value)},
traceutil.Field{Key: "req_size", Value: proto.Size(p)},
)
val, leaseID := p.Value, lease.LeaseID(p.Lease)
if txn == nil {
@ -197,12 +197,13 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
var rr *mvcc.RangeResult
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
trace.StepBegin()
trace.DisableStep()
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, nil, err
}
trace.StepEnd("get previous kv pair")
trace.EnableStep()
trace.Step("get previous kv pair")
}
if p.IgnoreValue || p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 {
@ -223,6 +224,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
}
resp.Header.Revision = txn.Put(p.Key, val, leaseID)
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
return resp, trace, nil
}
@ -568,17 +570,22 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
return txns
}
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
resp := &pb.CompactionResponse{}
resp.Header = &pb.ResponseHeader{}
ch, err := a.s.KV().Compact(compaction.Revision)
trace := traceutil.New("compact",
a.s.getLogger(),
traceutil.Field{Key: "revision", Value: compaction.Revision},
)
ch, err := a.s.KV().Compact(trace, compaction.Revision)
if err != nil {
return nil, ch, err
return nil, ch, nil, err
}
// get the current revision. which key to get is not important.
rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{})
resp.Header.Revision = rr.Rev
return resp, ch, err
return resp, ch, trace, err
}
func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {

View File

@ -399,8 +399,8 @@ func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
return nil, ErrCorrupt
}
func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
return nil, nil, ErrCorrupt
func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
return nil, nil, nil, ErrCorrupt
}
func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {

View File

@ -39,8 +39,7 @@ const (
// However, if the committed entries are very heavy to apply, the gap might grow.
// We should stop accepting new proposals if the gap growing to a certain point.
maxGapBetweenApplyAndCommitIndex = 5000
rangeTraceThreshold = 100 * time.Millisecond
putTraceThreshold = 100 * time.Millisecond
traceThreshold = 100 * time.Millisecond
)
type RaftKV interface {
@ -93,7 +92,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
traceutil.Field{Key: "range_begin", Value: string(r.Key)},
traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)},
)
ctx = context.WithValue(ctx, traceutil.CtxKey, trace)
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
var resp *pb.RangeResponse
var err error
@ -105,7 +104,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
traceutil.Field{Key: "response_revision", Value: resp.Header.Revision},
)
}
trace.LogIfLong(rangeTraceThreshold)
trace.LogIfLong(traceThreshold)
}(time.Now())
if !r.Serializable {
@ -128,7 +127,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
}
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
ctx = context.WithValue(ctx, "time", time.Now())
ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
if err != nil {
return nil, err
@ -205,7 +204,18 @@ func isTxnReadonly(r *pb.TxnRequest) bool {
}
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
startTime := time.Now()
result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
trace := traceutil.TODO()
if result != nil && result.trace != nil {
trace = result.trace
defer func() {
trace.LogIfLong(traceThreshold)
}()
applyStart := result.trace.GetStartTime()
result.trace.SetStartTime(startTime)
trace.InsertStep(0, applyStart, "process raft request")
}
if r.Physical && result != nil && result.physc != nil {
<-result.physc
// The compaction is done deleting keys; the hash is now settled
@ -214,6 +224,7 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
// if the compaction resumes. Force the finished compaction to
// commit so it won't resume following a crash.
s.be.ForceCommit()
trace.Step("physically apply compaction")
}
if err != nil {
return nil, err
@ -229,6 +240,7 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
resp.Header = &pb.ResponseHeader{}
}
resp.Header.Revision = s.kv.Rev()
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
return resp, nil
}
@ -552,10 +564,14 @@ func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftReque
if result.err != nil {
return nil, result.err
}
if startTime, ok := ctx.Value("time").(time.Time); ok && result.trace != nil {
applyStart := result.trace.ResetStartTime(startTime)
if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.trace != nil {
applyStart := result.trace.GetStartTime()
// The trace object is created in apply. Here reset the start time to trace
// the raft request time by the difference between the request start time
// and apply start time
result.trace.SetStartTime(startTime)
result.trace.InsertStep(0, applyStart, "process raft request")
result.trace.LogIfLong(putTraceThreshold)
result.trace.LogIfLong(traceThreshold)
}
return result.resp, nil
}

View File

@ -27,6 +27,7 @@ import (
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/pkg/testutil"
"go.etcd.io/etcd/pkg/traceutil"
"go.uber.org/zap"
)
@ -173,7 +174,7 @@ func TestV3CorruptAlarm(t *testing.T) {
// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
s.Put([]byte("abc"), []byte("def"), 0)
s.Put([]byte("xyz"), []byte("123"), 0)
s.Compact(5)
s.Compact(traceutil.TODO(), 5)
s.Commit()
s.Close()
be.Close()

View File

@ -115,7 +115,7 @@ type KV interface {
HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
// Compact frees all superseded keys with revisions less than rev.
Compact(rev int64) (<-chan struct{}, error)
Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)
// Commit commits outstanding txns into the underlying backend.
Commit()

View File

@ -183,7 +183,7 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) {
defer cleanup(s, b, tmpPath)
put3TestKVs(s)
if _, err := s.Compact(4); err != nil {
if _, err := s.Compact(traceutil.TODO(), 4); err != nil {
t.Fatalf("compact error (%v)", err)
}
@ -545,7 +545,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
},
}
for i, tt := range tests {
_, err := s.Compact(tt.rev)
_, err := s.Compact(traceutil.TODO(), tt.rev)
if err != nil {
t.Errorf("#%d: unexpect compact error %v", i, err)
}
@ -581,7 +581,7 @@ func TestKVCompactBad(t *testing.T) {
{100, ErrFutureRev},
}
for i, tt := range tests {
_, err := s.Compact(tt.rev)
_, err := s.Compact(traceutil.TODO(), tt.rev)
if err != tt.werr {
t.Errorf("#%d: compact error = %v, want %v", i, err, tt.werr)
}
@ -627,7 +627,7 @@ func TestKVRestore(t *testing.T) {
func(kv KV) {
kv.Put([]byte("foo"), []byte("bar0"), 1)
kv.Put([]byte("foo"), []byte("bar1"), 2)
kv.Compact(1)
kv.Compact(traceutil.TODO(), 1)
},
}
for i, tt := range tests {

View File

@ -271,9 +271,10 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
return nil, nil
}
func (s *store) compact(rev int64) (<-chan struct{}, error) {
func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
start := time.Now()
keep := s.kvindex.Compact(rev)
trace.Step("compact in-memory index tree")
ch := make(chan struct{})
var j = func(ctx context.Context) {
if ctx.Err() != nil {
@ -290,6 +291,7 @@ func (s *store) compact(rev int64) (<-chan struct{}, error) {
s.fifoSched.Schedule(j)
indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
trace.Step("schedule compaction")
return ch, nil
}
@ -299,21 +301,21 @@ func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
return ch, err
}
return s.compact(rev)
return s.compact(traceutil.TODO(), rev)
}
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock()
ch, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision")
if err != nil {
s.mu.Unlock()
return ch, err
}
s.mu.Unlock()
return s.compact(rev)
return s.compact(trace, rev)
}
// DefaultIgnores is a map of keys to ignore in hash checking.

View File

@ -22,6 +22,7 @@ import (
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/pkg/traceutil"
"go.uber.org/zap"
)
@ -109,7 +110,7 @@ func TestCompactAllAndRestore(t *testing.T) {
rev := s0.Rev()
// compact all keys
done, err := s0.Compact(rev)
done, err := s0.Compact(traceutil.TODO(), rev)
if err != nil {
t.Fatal(err)
}

View File

@ -332,7 +332,7 @@ func TestStoreCompact(t *testing.T) {
key2 := newTestKeyBytes(revision{2, 0}, false)
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil}
s.Compact(3)
s.Compact(traceutil.TODO(), 3)
s.fifoSched.WaitFinish(1)
if s.compactMainRev != 3 {
@ -583,7 +583,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
go func() {
defer wg.Done()
for i := 100; i >= 0; i-- {
_, err := s.Compact(int64(rev - 1 - i))
_, err := s.Compact(traceutil.TODO(), int64(rev-1-i))
if err != nil {
t.Error(err)
}
@ -610,7 +610,7 @@ func TestHashKVZeroRevision(t *testing.T) {
for i := 2; i <= rev; i++ {
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
}
if _, err := s.Compact(int64(rev / 2)); err != nil {
if _, err := s.Compact(traceutil.TODO(), int64(rev/2)); err != nil {
t.Fatal(err)
}

View File

@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/traceutil"
"go.uber.org/zap"
)
@ -237,7 +238,7 @@ func TestWatchCompacted(t *testing.T) {
for i := 0; i < maxRev; i++ {
s.Put(testKey, testValue, lease.NoLease)
}
_, err := s.Compact(compactRev)
_, err := s.Compact(traceutil.TODO(), compactRev)
if err != nil {
t.Fatalf("failed to compact kv (%v)", err)
}

View File

@ -25,7 +25,10 @@ import (
"go.uber.org/zap"
)
const CtxKey = "trace"
const (
TraceKey = "trace"
StartTimeKey = "startTime"
)
// Field is a kv pair to record additional details of the trace.
type Field struct {
@ -51,12 +54,12 @@ func writeFields(fields []Field) string {
}
type Trace struct {
operation string
lg *zap.Logger
fields []Field
startTime time.Time
steps []step
inStep bool
operation string
lg *zap.Logger
fields []Field
startTime time.Time
steps []step
stepDisabled bool
}
type step struct {
@ -75,16 +78,18 @@ func TODO() *Trace {
}
func Get(ctx context.Context) *Trace {
if trace, ok := ctx.Value(CtxKey).(*Trace); ok && trace != nil {
if trace, ok := ctx.Value(TraceKey).(*Trace); ok && trace != nil {
return trace
}
return TODO()
}
func (t *Trace) ResetStartTime(time time.Time) (prev time.Time) {
prev = t.startTime
func (t *Trace) GetStartTime() time.Time {
return t.startTime
}
func (t *Trace) SetStartTime(time time.Time) {
t.startTime = time
return prev
}
func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field) {
@ -96,19 +101,22 @@ func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field)
t.steps = append(t.steps, newStep)
}
}
// Step adds step to trace
func (t *Trace) Step(msg string, fields ...Field) {
if !t.inStep {
if !t.stepDisabled {
t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields})
}
}
func (t *Trace) StepBegin() {
t.inStep = true
// DisableStep sets the flag to prevent the trace from adding steps
func (t *Trace) DisableStep() {
t.stepDisabled = true
}
func (t *Trace) StepEnd(msg string, fields ...Field) {
t.inStep = false
t.Step(msg, fields...)
// EnableStep re-enable the trace to add steps
func (t *Trace) EnableStep() {
t.stepDisabled = false
}
func (t *Trace) AddField(fields ...Field) {
@ -149,7 +157,7 @@ func (t *Trace) logInfo(threshold time.Duration) (string, []zap.Field) {
for _, step := range t.steps {
stepDuration := step.time.Sub(lastStepTime)
if stepDuration > threshold {
steps = append(steps, fmt.Sprintf("trace[%d] step '%v' %s (duration: %v)",
steps = append(steps, fmt.Sprintf("trace[%d] '%v' %s (duration: %v)",
traceNum, step.msg, writeFields(step.fields), stepDuration))
}
lastStepTime = step.time

View File

@ -41,7 +41,7 @@ func TestGet(t *testing.T) {
},
{
name: "When the context has trace",
inputCtx: context.WithValue(context.Background(), CtxKey, traceForTest),
inputCtx: context.WithValue(context.Background(), TraceKey, traceForTest),
outputTrace: traceForTest,
},
}

View File

@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/pkg/report"
"go.etcd.io/etcd/pkg/traceutil"
"github.com/spf13/cobra"
)
@ -114,7 +115,7 @@ func mvccPutFunc(cmd *cobra.Command, args []string) {
for i := 0; i < mvccTotalRequests; i++ {
st := time.Now()
tw := s.Write()
tw := s.Write(traceutil.TODO())
for j := i; j < i+nrTxnOps; j++ {
tw.Put(keys[j], vals[j], lease.NoLease)
}