// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"go.etcd.io/etcd/lease"
	"go.etcd.io/etcd/mvcc/backend"
	"go.etcd.io/etcd/mvcc/mvccpb"
	"go.etcd.io/etcd/pkg/schedule"
	"go.etcd.io/etcd/pkg/traceutil"

	"github.com/coreos/pkg/capnslog"
	"go.uber.org/zap"
)

var (
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	consistentIndexKeyName  = []byte("consistent_index")
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	ErrCompacted = errors.New("mvcc: required revision has been compacted")
	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
	ErrCanceled  = errors.New("mvcc: watcher is canceled")
	ErrClosed    = errors.New("mvcc: closed")

	plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc")
)

const (
	// markedRevBytesLen is the byte length of a marked revision.
	// The first `revBytesLen` bytes represent a normal revision. The last
	// byte is the mark.
	markedRevBytesLen      = revBytesLen + 1
	markBytePosition       = markedRevBytesLen - 1
	markTombstone     byte = 't'
)

var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000

// ConsistentIndexGetter is an interface that wraps the ConsistentIndex method.
// A consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
	// ConsistentIndex returns the consistent index of the currently executing entry.
	ConsistentIndex() uint64
}

type StoreConfig struct {
	CompactionBatchLimit int
}

type store struct {
	ReadView
	WriteView

	// consistentIndex caches the "consistent_index" key's value. Accessed
	// through atomics, so it must be 64-bit aligned.
	consistentIndex uint64

	cfg StoreConfig

	// mu is read-locked for txns and write-locked for non-txn store changes.
	mu sync.RWMutex

	ig ConsistentIndexGetter

	b       backend.Backend
	kvindex index

	le lease.Lessor

	// revMu protects currentRev and compactMainRev.
	// Locked at the end of a write txn and released after the write txn unlocks.
	// Locked before locking the read txn and released after locking it.
	revMu sync.RWMutex
	// currentRev is the revision of the last completed transaction.
	currentRev int64
	// compactMainRev is the main revision of the last compaction.
	compactMainRev int64

	// bytesBuf8 is a byte slice of length 8
	// to avoid a repetitive allocation in saveIndex.
	bytesBuf8 []byte

	fifoSched schedule.Scheduler

	stopc chan struct{}

	lg *zap.Logger
}

// NewStore returns a new store. It is useful to create a store inside
// the mvcc package. It should only be used for testing externally.
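//
// A minimal usage sketch (the backend path is hypothetical; passing nil for
// the lessor and consistent-index getter is only suitable for a throwaway
// test store):
//
//	b := backend.NewDefaultBackend("test.db")
//	s := NewStore(zap.NewExample(), b, nil, nil, StoreConfig{})
//	defer s.Close()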
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
	if cfg.CompactionBatchLimit == 0 {
		cfg.CompactionBatchLimit = defaultCompactBatchLimit
	}
	s := &store{
		cfg:     cfg,
		b:       b,
		ig:      ig,
		kvindex: newTreeIndex(lg),

		le: le,

		currentRev:     1,
		compactMainRev: -1,

		bytesBuf8: make([]byte, 8),
		fifoSched: schedule.NewFIFOScheduler(),

		stopc: make(chan struct{}),

		lg: lg,
	}
	s.ReadView = &readView{s}
	s.WriteView = &writeView{s}
	if s.le != nil {
		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
	}

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket(keyBucketName)
	tx.UnsafeCreateBucket(metaBucketName)
	tx.Unlock()
	s.b.ForceCommit()

	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.restore(); err != nil {
		// TODO: return the error instead of panicking here?
		panic("failed to recover store from backend")
	}

	return s
}

func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
	if ctx == nil || ctx.Err() != nil {
		s.mu.Lock()
		select {
		case <-s.stopc:
		default:
			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
			s.fifoSched.Schedule(f)
		}
		s.mu.Unlock()
		return
	}
	close(ch)
}

func (s *store) Hash() (hash uint32, revision int64, err error) {
	// TODO: hash and revision could be inconsistent; one possible fix is to add
	// s.revMu.RLock() at the beginning of the function, but that is costly.
	start := time.Now()

	s.b.ForceCommit()
	h, err := s.b.Hash(DefaultIgnores)

	hashSec.Observe(time.Since(start).Seconds())
	return h, s.currentRev, err
}

func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
	start := time.Now()

	s.mu.RLock()
	s.revMu.RLock()
	compactRev, currentRev = s.compactMainRev, s.currentRev
	s.revMu.RUnlock()

	if rev > 0 && rev <= compactRev {
		s.mu.RUnlock()
		return 0, 0, compactRev, ErrCompacted
	} else if rev > 0 && rev > currentRev {
		s.mu.RUnlock()
		return 0, currentRev, 0, ErrFutureRev
	}

	if rev == 0 {
		rev = currentRev
	}
	keep := s.kvindex.Keep(rev)

	tx := s.b.ReadTx()
	tx.RLock()
	defer tx.RUnlock()
	s.mu.RUnlock()

	upper := revision{main: rev + 1}
	lower := revision{main: compactRev + 1}
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	h.Write(keyBucketName)
	err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
		kr := bytesToRev(k)
		if !upper.GreaterThan(kr) {
			return nil
		}
		// skip revisions that are scheduled for deletion
		// due to compacting; don't skip if there isn't one.
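		// Revisions at or below compactRev are hashed only when the in-memory
		// index still keeps them (keep holds the latest live revision of each
		// key at `rev`); anything else may be removed by the compactor at any
		// moment, so including it would make the hash non-deterministic.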
		if lower.GreaterThan(kr) && len(keep) > 0 {
			if _, ok := keep[kr]; !ok {
				return nil
			}
		}
		h.Write(k)
		h.Write(v)
		return nil
	})
	hash = h.Sum32()

	hashRevSec.Observe(time.Since(start).Seconds())
	return hash, currentRev, compactRev, err
}

func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
	s.revMu.Lock()
	if rev <= s.compactMainRev {
		ch := make(chan struct{})
		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
		s.fifoSched.Schedule(f)
		s.revMu.Unlock()
		return ch, ErrCompacted
	}
	if rev > s.currentRev {
		s.revMu.Unlock()
		return nil, ErrFutureRev
	}

	s.compactMainRev = rev

	rbytes := newRevBytes()
	revToBytes(revision{main: rev}, rbytes)

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
	tx.Unlock()
	// ensure that desired compaction is persisted
	s.b.ForceCommit()

	s.revMu.Unlock()

	return nil, nil
}

func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
	start := time.Now()
	keep := s.kvindex.Compact(rev)
	trace.Step("compact in-memory index tree")
	ch := make(chan struct{})
	var j = func(ctx context.Context) {
		if ctx.Err() != nil {
			s.compactBarrier(ctx, ch)
			return
		}
		if !s.scheduleCompaction(rev, keep) {
			s.compactBarrier(nil, ch)
			return
		}
		close(ch)
	}

	s.fifoSched.Schedule(j)
	indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
	trace.Step("schedule compaction")
	return ch, nil
}

func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
	ch, err := s.updateCompactRev(rev)
	if err != nil {
		return ch, err
	}

	return s.compact(traceutil.TODO(), rev)
}

func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
	s.mu.Lock()

	ch, err := s.updateCompactRev(rev)
	trace.Step("check and update compact revision")
	if err != nil {
		s.mu.Unlock()
		return ch, err
	}
	s.mu.Unlock()

	return s.compact(trace, rev)
}

// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}

func init() {
	DefaultIgnores = map[backend.IgnoreKey]struct{}{
		// consistent index might be changed due to v2 internal sync, which
		// is not controllable by the user.
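		// Excluding it keeps Hash() comparable across members whose applied
		// indexes differ.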
		{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
	}
}

func (s *store) Commit() {
	s.mu.Lock()
	defer s.mu.Unlock()

	tx := s.b.BatchTx()
	tx.Lock()
	s.saveIndex(tx)
	tx.Unlock()
	s.b.ForceCommit()
}

func (s *store) Restore(b backend.Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	close(s.stopc)
	s.fifoSched.Stop()

	atomic.StoreUint64(&s.consistentIndex, 0)
	s.b = b
	s.kvindex = newTreeIndex(s.lg)
	s.currentRev = 1
	s.compactMainRev = -1
	s.fifoSched = schedule.NewFIFOScheduler()
	s.stopc = make(chan struct{})

	return s.restore()
}

func (s *store) restore() error {
	s.setupMetricsReporter()

	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: 1}, min)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

	keyToLease := make(map[string]lease.LeaseID)

	// restore index
	tx := s.b.BatchTx()
	tx.Lock()

	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main

		if s.lg != nil {
			s.lg.Info(
				"restored last compact revision",
				zap.String("meta-bucket-name", string(metaBucketName)),
				zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
				zap.Int64("restored-compact-revision", s.compactMainRev),
			)
		} else {
			plog.Printf("restore compact to %d", s.compactMainRev)
		}
	}
	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	scheduledCompact := int64(0)
	if len(scheduledCompactBytes) != 0 {
		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
	}

	// index keys concurrently as they're loaded in from tx
	keysGauge.Set(0)
	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
	for {
		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
		if len(keys) == 0 {
			break
		}
		// rkvc blocks if the total pending keys exceeds the restore
		// chunk size to keep keys from consuming too much memory.
		restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
		if len(keys) < restoreChunkKeys {
			// partial set implies final set
			break
		}
		// next set begins after where this one ended
		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
		newMin.sub++
		revToBytes(newMin, min)
	}
	close(rkvc)
	s.currentRev = <-revc

	// keys in the range [compacted revision - N, compacted revision] might all be
	// deleted due to compaction. In that case the correct revision is the compaction
	// revision, not the largest revision we have seen.
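	// For example, if the store was compacted at revision 100 and every key
	// revision at or below 100 was purged, the scan above could report a
	// highest revision smaller than 100; clamping to compactMainRev prevents
	// handing out revisions that were already compacted away.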
	if s.currentRev < s.compactMainRev {
		s.currentRev = s.compactMainRev
	}
	if scheduledCompact <= s.compactMainRev {
		scheduledCompact = 0
	}

	for key, lid := range keyToLease {
		if s.le == nil {
			panic("no lessor to attach lease")
		}
		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
		if err != nil {
			if s.lg != nil {
				s.lg.Warn(
					"failed to attach a lease",
					zap.String("lease-id", fmt.Sprintf("%016x", lid)),
					zap.Error(err),
				)
			} else {
				plog.Errorf("unexpected Attach error: %v", err)
			}
		}
	}

	tx.Unlock()

	if scheduledCompact != 0 {
		s.compactLockfree(scheduledCompact)

		if s.lg != nil {
			s.lg.Info(
				"resume scheduled compaction",
				zap.String("meta-bucket-name", string(metaBucketName)),
				zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
				zap.Int64("scheduled-compact-revision", scheduledCompact),
			)
		} else {
			plog.Printf("resume scheduled compaction at %d", scheduledCompact)
		}
	}

	return nil
}

type revKeyValue struct {
	key  []byte
	kv   mvccpb.KeyValue
	kstr string
}

func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
	go func() {
		currentRev := int64(1)
		defer func() { revc <- currentRev }()
		// restore the tree index from streaming the unordered index.
		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
		for rkv := range rkvc {
			ki, ok := kiCache[rkv.kstr]
			// purge kiCache if it has grown large but the key is still missing
			if !ok && len(kiCache) >= restoreChunkKeys {
				i := 10
				for k := range kiCache {
					delete(kiCache, k)
					if i--; i == 0 {
						break
					}
				}
			}
			// cache miss, fetch from tree index if there
			if !ok {
				ki = &keyIndex{key: rkv.kv.Key}
				if idxKey := idx.KeyIndex(ki); idxKey != nil {
					kiCache[rkv.kstr], ki = idxKey, idxKey
					ok = true
				}
			}
			rev := bytesToRev(rkv.key)
			currentRev = rev.main
			if ok {
				if isTombstone(rkv.key) {
					ki.tombstone(lg, rev.main, rev.sub)
					continue
				}
				ki.put(lg, rev.main, rev.sub)
			} else if !isTombstone(rkv.key) {
				ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
				idx.Insert(ki)
				kiCache[rkv.kstr] = ki
			}
		}
	}()
	return rkvc, revc
}

func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
	for i, key := range keys {
		rkv := revKeyValue{key: key}
		if err := rkv.kv.Unmarshal(vals[i]); err != nil {
			if lg != nil {
				lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
			} else {
				plog.Fatalf("cannot unmarshal event: %v", err)
			}
		}
		rkv.kstr = string(rkv.kv.Key)
		if isTombstone(key) {
			delete(keyToLease, rkv.kstr)
		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
			keyToLease[rkv.kstr] = lid
		} else {
			delete(keyToLease, rkv.kstr)
		}
		kvc <- rkv
	}
}

func (s *store) Close() error {
	close(s.stopc)
	s.fifoSched.Stop()
	return nil
}

func (s *store) saveIndex(tx backend.BatchTx) {
	if s.ig == nil {
		return
	}
	bs := s.bytesBuf8
	ci := s.ig.ConsistentIndex()
	binary.BigEndian.PutUint64(bs, ci)
	// put the index into the underlying backend
	// tx has been locked in TxnBegin, so there is no need to lock it again
	tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
	atomic.StoreUint64(&s.consistentIndex, ci)
}

func (s *store) ConsistentIndex() uint64 {
	if ci := atomic.LoadUint64(&s.consistentIndex); ci > 0 {
		return ci
	}
	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
	if len(vs) == 0 {
		return 0
	}
	v := binary.BigEndian.Uint64(vs[0])
	atomic.StoreUint64(&s.consistentIndex, v)
	return v
}

func (s *store) setupMetricsReporter() {
	b := s.b
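	// The report* gauges are package-level closures guarded by their own
	// mutexes; rebinding them here points the metrics at the backend captured
	// above. restore() re-runs this setup after Restore installs a new backend.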
	reportDbTotalSizeInBytesMu.Lock()
	reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
	reportDbTotalSizeInBytesMu.Unlock()

	reportDbTotalSizeInBytesDebugMu.Lock()
	reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) }
	reportDbTotalSizeInBytesDebugMu.Unlock()

	reportDbTotalSizeInUseInBytesMu.Lock()
	reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
	reportDbTotalSizeInUseInBytesMu.Unlock()

	reportDbOpenReadTxNMu.Lock()
	reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
	reportDbOpenReadTxNMu.Unlock()

	reportCurrentRevMu.Lock()
	reportCurrentRev = func() float64 {
		s.revMu.RLock()
		defer s.revMu.RUnlock()
		return float64(s.currentRev)
	}
	reportCurrentRevMu.Unlock()

	reportCompactRevMu.Lock()
	reportCompactRev = func() float64 {
		s.revMu.RLock()
		defer s.revMu.RUnlock()
		return float64(s.compactMainRev)
	}
	reportCompactRevMu.Unlock()
}

// appendMarkTombstone appends a tombstone mark to normal revision bytes.
func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
	if len(b) != revBytesLen {
		if lg != nil {
			lg.Panic(
				"cannot append tombstone mark to non-normal revision bytes",
				zap.Int("expected-revision-bytes-size", revBytesLen),
				zap.Int("given-revision-bytes-size", len(b)),
			)
		} else {
			plog.Panicf("cannot append mark to non normal revision bytes")
		}
	}
	return append(b, markTombstone)
}

// isTombstone checks whether the given revision bytes are marked as a tombstone.
func isTombstone(b []byte) bool {
	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}
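// For illustration, a normal revision encodes to revBytesLen bytes (8-byte
// main revision, one separator byte, 8-byte sub revision; see revision.go),
// so a marked tombstone is one byte longer and ends in 't':
//
//	rb := newRevBytes()
//	revToBytes(revision{main: 5}, rb)
//	rb = appendMarkTombstone(lg, rb) // lg is a *zap.Logger; may be nil
//	isTombstone(rb)                  // true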