mvcc: add more structured logging

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
release-3.4
Gyuho Lee 2018-05-04 13:15:51 -07:00
parent a32db53765
commit 03ef9745a9
11 changed files with 287 additions and 85 deletions

View File

@ -196,7 +196,11 @@ func (b *backend) Snapshot() Snapshot {
defer b.mu.RUnlock()
tx, err := b.db.Begin(false)
if err != nil {
plog.Fatalf("cannot begin tx (%s)", err)
if b.lg != nil {
b.lg.Fatal("failed to begin tx", zap.Error(err))
} else {
plog.Fatalf("cannot begin tx (%s)", err)
}
}
stopc, donec := make(chan struct{}), make(chan struct{})

View File

@ -22,6 +22,7 @@ import (
"time"
bolt "github.com/coreos/bbolt"
"go.uber.org/zap"
)
type BatchTx interface {
@ -47,7 +48,15 @@ type batchTx struct {
func (t *batchTx) UnsafeCreateBucket(name []byte) {
_, err := t.tx.CreateBucket(name)
if err != nil && err != bolt.ErrBucketExists {
plog.Fatalf("cannot create bucket %s (%v)", name, err)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to create a bucket",
zap.String("bucket-name", string(name)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot create bucket %s (%v)", name, err)
}
}
t.pending++
}
@ -65,7 +74,14 @@ func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
plog.Fatalf("bucket %s does not exist", bucketName)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
} else {
plog.Fatalf("bucket %s does not exist", bucketName)
}
}
if seq {
// it is useful to increase fill percent when the workloads are mostly append-only.
@ -73,7 +89,15 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
bucket.FillPercent = 0.9
}
if err := bucket.Put(key, value); err != nil {
plog.Fatalf("cannot put key into bucket (%v)", err)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to write to a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot put key into bucket (%v)", err)
}
}
t.pending++
}
@ -82,7 +106,14 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
plog.Fatalf("bucket %s does not exist", bucketName)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
} else {
plog.Fatalf("bucket %s does not exist", bucketName)
}
}
return unsafeRange(bucket.Cursor(), key, endKey, limit)
}
@ -113,11 +144,26 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
plog.Fatalf("bucket %s does not exist", bucketName)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
} else {
plog.Fatalf("bucket %s does not exist", bucketName)
}
}
err := bucket.Delete(key)
if err != nil {
plog.Fatalf("cannot delete key from bucket (%v)", err)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to delete a key",
zap.String("bucket-name", string(bucketName)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot delete key from bucket (%v)", err)
}
}
t.pending++
}
@ -177,7 +223,14 @@ func (t *batchTx) commit(stop bool) {
t.pending = 0
if err != nil {
plog.Fatalf("cannot commit tx (%s)", err)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to commit tx",
zap.Error(err),
)
} else {
plog.Fatalf("cannot commit tx (%s)", err)
}
}
}
if !stop {
@ -236,7 +289,14 @@ func (t *batchTxBuffered) commit(stop bool) {
func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.readTx.tx != nil {
if err := t.backend.readTx.tx.Rollback(); err != nil {
plog.Fatalf("cannot rollback tx (%s)", err)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to rollback tx",
zap.Error(err),
)
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
}
t.backend.readTx.reset()
}

View File

@ -57,12 +57,12 @@ func (ti *treeIndex) Put(key []byte, rev revision) {
defer ti.Unlock()
item := ti.tree.Get(keyi)
if item == nil {
keyi.put(rev.main, rev.sub)
keyi.put(ti.lg, rev.main, rev.sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi := item.(*keyIndex)
okeyi.put(rev.main, rev.sub)
okeyi.put(ti.lg, rev.main, rev.sub)
}
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
@ -72,7 +72,7 @@ func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, v
if keyi = ti.keyIndex(keyi); keyi == nil {
return revision{}, revision{}, 0, ErrRevisionNotFound
}
return keyi.get(atRev)
return keyi.get(ti.lg, atRev)
}
func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
@ -112,7 +112,7 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
return []revision{rev}
}
ti.visit(key, end, func(ki *keyIndex) {
if rev, _, _, err := ki.get(atRev); err == nil {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev)
}
})
@ -128,7 +128,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
return [][]byte{key}, []revision{rev}
}
ti.visit(key, end, func(ki *keyIndex) {
if rev, _, _, err := ki.get(atRev); err == nil {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev)
keys = append(keys, ki.key)
}
@ -147,7 +147,7 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
}
ki := item.(*keyIndex)
return ki.tombstone(rev.main, rev.sub)
return ki.tombstone(ti.lg, rev.main, rev.sub)
}
// RangeSince returns all revisions from key(including) to end(excluding)
@ -165,7 +165,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
return nil
}
keyi = item.(*keyIndex)
return keyi.since(rev)
return keyi.since(ti.lg, rev)
}
endi := &keyIndex{key: end}
@ -175,7 +175,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
return false
}
curKeyi := item.(*keyIndex)
revs = append(revs, curKeyi.since(rev)...)
revs = append(revs, curKeyi.since(ti.lg, rev)...)
return true
})
sort.Sort(revisions(revs))
@ -199,7 +199,7 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
//Lock is needed here to prevent modification to the keyIndex while
//compaction is going on or revision added to empty before deletion
ti.Lock()
keyi.compact(rev, available)
keyi.compact(ti.lg, rev, available)
if keyi.isEmpty() {
item := ti.tree.Delete(keyi)
if item == nil {

View File

@ -284,10 +284,10 @@ func restore(ti *treeIndex, key []byte, created, modified revision, ver int64) {
defer ti.Unlock()
item := ti.tree.Get(keyi)
if item == nil {
keyi.restore(created, modified, ver)
keyi.restore(ti.lg, created, modified, ver)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi := item.(*keyIndex)
okeyi.put(modified.main, modified.sub)
okeyi.put(ti.lg, modified.main, modified.sub)
}

View File

@ -20,6 +20,7 @@ import (
"fmt"
"github.com/google/btree"
"go.uber.org/zap"
)
var (
@ -73,11 +74,21 @@ type keyIndex struct {
}
// put puts a revision to the keyIndex.
func (ki *keyIndex) put(main int64, sub int64) {
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
rev := revision{main: main, sub: sub}
if !rev.GreaterThan(ki.modified) {
plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
if lg != nil {
lg.Panic(
"'put' with an unexpected smaller revision",
zap.Int64("given-revision-main", rev.main),
zap.Int64("given-revision-sub", rev.sub),
zap.Int64("modified-revision-main", ki.modified.main),
zap.Int64("modified-revision-sub", ki.modified.sub),
)
} else {
plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
}
}
if len(ki.generations) == 0 {
ki.generations = append(ki.generations, generation{})
@ -92,9 +103,16 @@ func (ki *keyIndex) put(main int64, sub int64) {
ki.modified = rev
}
func (ki *keyIndex) restore(created, modified revision, ver int64) {
func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {
if len(ki.generations) != 0 {
plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")
if lg != nil {
lg.Panic(
"'restore' got an unexpected non-empty generations",
zap.Int("generations-size", len(ki.generations)),
)
} else {
plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")
}
}
ki.modified = modified
@ -106,14 +124,21 @@ func (ki *keyIndex) restore(created, modified revision, ver int64) {
// tombstone puts a revision, pointing to a tombstone, to the keyIndex.
// It also creates a new empty generation in the keyIndex.
// It returns ErrRevisionNotFound when tombstone on an empty generation.
func (ki *keyIndex) tombstone(main int64, sub int64) error {
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
if ki.isEmpty() {
plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
if lg != nil {
lg.Panic(
"'tombstone' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
}
}
if ki.generations[len(ki.generations)-1].isEmpty() {
return ErrRevisionNotFound
}
ki.put(main, sub)
ki.put(lg, main, sub)
ki.generations = append(ki.generations, generation{})
keysGauge.Dec()
return nil
@ -121,9 +146,16 @@ func (ki *keyIndex) tombstone(main int64, sub int64) error {
// get gets the modified, created revision and version of the key that satisfies the given atRev.
// Rev must be higher than or equal to the given atRev.
func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err error) {
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
if ki.isEmpty() {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
if lg != nil {
lg.Panic(
"'get' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
}
g := ki.findGeneration(atRev)
if g.isEmpty() {
@ -141,9 +173,16 @@ func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err
// since returns revisions since the given rev. Only the revision with the
// largest sub revision will be returned if multiple revisions have the same
// main revision.
func (ki *keyIndex) since(rev int64) []revision {
func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
if ki.isEmpty() {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
if lg != nil {
lg.Panic(
"'since' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
}
since := revision{rev, 0}
var gi int
@ -182,9 +221,16 @@ func (ki *keyIndex) since(rev int64) []revision {
// revision than the given atRev except the largest one (If the largest one is
// a tombstone, it will not be kept).
// If a generation becomes empty during compaction, it will be removed.
func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
if ki.isEmpty() {
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
if lg != nil {
lg.Panic(
"'compact' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
}
}
genIdx, revIndex := ki.doCompact(atRev, available)

View File

@ -17,6 +17,8 @@ package mvcc
import (
"reflect"
"testing"
"go.uber.org/zap"
)
func TestKeyIndexGet(t *testing.T) {
@ -28,7 +30,7 @@ func TestKeyIndexGet(t *testing.T) {
// {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]}
// {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]}
ki := newTestKeyIndex()
ki.compact(4, make(map[revision]struct{}))
ki.compact(zap.NewExample(), 4, make(map[revision]struct{}))
tests := []struct {
rev int64
@ -68,7 +70,7 @@ func TestKeyIndexGet(t *testing.T) {
}
for i, tt := range tests {
mod, creat, ver, err := ki.get(tt.rev)
mod, creat, ver, err := ki.get(zap.NewExample(), tt.rev)
if err != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
}
@ -86,7 +88,7 @@ func TestKeyIndexGet(t *testing.T) {
func TestKeyIndexSince(t *testing.T) {
ki := newTestKeyIndex()
ki.compact(4, make(map[revision]struct{}))
ki.compact(zap.NewExample(), 4, make(map[revision]struct{}))
allRevs := []revision{{4, 0}, {6, 0}, {8, 0}, {10, 0}, {12, 0}, {14, 1}, {16, 0}}
tests := []struct {
@ -115,7 +117,7 @@ func TestKeyIndexSince(t *testing.T) {
}
for i, tt := range tests {
revs := ki.since(tt.rev)
revs := ki.since(zap.NewExample(), tt.rev)
if !reflect.DeepEqual(revs, tt.wrevs) {
t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs)
}
@ -124,7 +126,7 @@ func TestKeyIndexSince(t *testing.T) {
func TestKeyIndexPut(t *testing.T) {
ki := &keyIndex{key: []byte("foo")}
ki.put(5, 0)
ki.put(zap.NewExample(), 5, 0)
wki := &keyIndex{
key: []byte("foo"),
@ -135,7 +137,7 @@ func TestKeyIndexPut(t *testing.T) {
t.Errorf("ki = %+v, want %+v", ki, wki)
}
ki.put(7, 0)
ki.put(zap.NewExample(), 7, 0)
wki = &keyIndex{
key: []byte("foo"),
@ -149,7 +151,7 @@ func TestKeyIndexPut(t *testing.T) {
func TestKeyIndexRestore(t *testing.T) {
ki := &keyIndex{key: []byte("foo")}
ki.restore(revision{5, 0}, revision{7, 0}, 2)
ki.restore(zap.NewExample(), revision{5, 0}, revision{7, 0}, 2)
wki := &keyIndex{
key: []byte("foo"),
@ -163,9 +165,9 @@ func TestKeyIndexRestore(t *testing.T) {
func TestKeyIndexTombstone(t *testing.T) {
ki := &keyIndex{key: []byte("foo")}
ki.put(5, 0)
ki.put(zap.NewExample(), 5, 0)
err := ki.tombstone(7, 0)
err := ki.tombstone(zap.NewExample(), 7, 0)
if err != nil {
t.Errorf("unexpected tombstone error: %v", err)
}
@ -179,9 +181,9 @@ func TestKeyIndexTombstone(t *testing.T) {
t.Errorf("ki = %+v, want %+v", ki, wki)
}
ki.put(8, 0)
ki.put(9, 0)
err = ki.tombstone(15, 0)
ki.put(zap.NewExample(), 8, 0)
ki.put(zap.NewExample(), 9, 0)
err = ki.tombstone(zap.NewExample(), 15, 0)
if err != nil {
t.Errorf("unexpected tombstone error: %v", err)
}
@ -199,7 +201,7 @@ func TestKeyIndexTombstone(t *testing.T) {
t.Errorf("ki = %+v, want %+v", ki, wki)
}
err = ki.tombstone(16, 0)
err = ki.tombstone(zap.NewExample(), 16, 0)
if err != ErrRevisionNotFound {
t.Errorf("tombstone error = %v, want %v", err, ErrRevisionNotFound)
}
@ -454,7 +456,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
}
am = make(map[revision]struct{})
ki.compact(tt.compact, am)
ki.compact(zap.NewExample(), tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
}
@ -477,7 +479,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
}
am = make(map[revision]struct{})
ki.compact(tt.compact, am)
ki.compact(zap.NewExample(), tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
}
@ -500,7 +502,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) {
t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
}
am = make(map[revision]struct{})
ki.compact(tt.compact, am)
ki.compact(zap.NewExample(), tt.compact, am)
if !reflect.DeepEqual(ki, tt.wki) {
t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
}
@ -530,10 +532,10 @@ func cloneGeneration(g *generation) *generation {
// test that compact on version that higher than last modified version works well
func TestKeyIndexCompactOnFurtherRev(t *testing.T) {
ki := &keyIndex{key: []byte("foo")}
ki.put(1, 0)
ki.put(2, 0)
ki.put(zap.NewExample(), 1, 0)
ki.put(zap.NewExample(), 2, 0)
am := make(map[revision]struct{})
ki.compact(3, am)
ki.compact(zap.NewExample(), 3, am)
wki := &keyIndex{
key: []byte("foo"),
@ -685,14 +687,14 @@ func newTestKeyIndex() *keyIndex {
// {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]}
ki := &keyIndex{key: []byte("foo")}
ki.put(2, 0)
ki.put(4, 0)
ki.tombstone(6, 0)
ki.put(8, 0)
ki.put(10, 0)
ki.tombstone(12, 0)
ki.put(14, 0)
ki.put(14, 1)
ki.tombstone(16, 0)
ki.put(zap.NewExample(), 2, 0)
ki.put(zap.NewExample(), 4, 0)
ki.tombstone(zap.NewExample(), 6, 0)
ki.put(zap.NewExample(), 8, 0)
ki.put(zap.NewExample(), 10, 0)
ki.tombstone(zap.NewExample(), 12, 0)
ki.put(zap.NewExample(), 14, 0)
ki.put(zap.NewExample(), 14, 1)
ki.tombstone(zap.NewExample(), 16, 0)
return ki
}

View File

@ -351,7 +351,7 @@ func (s *store) restore() error {
// index keys concurrently as they're loaded in from tx
keysGauge.Set(0)
rkvc, revc := restoreIntoIndex(s.kvindex)
rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
for {
keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
if len(keys) == 0 {
@ -359,7 +359,7 @@ func (s *store) restore() error {
}
// rkvc blocks if the total pending keys exceeds the restore
// chunk size to keep keys from consuming too much memory.
restoreChunk(rkvc, keys, vals, keyToLease)
restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
if len(keys) < restoreChunkKeys {
// partial set implies final set
break
@ -426,7 +426,7 @@ type revKeyValue struct {
kstr string
}
func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
go func() {
currentRev := int64(1)
@ -457,12 +457,12 @@ func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
currentRev = rev.main
if ok {
if isTombstone(rkv.key) {
ki.tombstone(rev.main, rev.sub)
ki.tombstone(lg, rev.main, rev.sub)
continue
}
ki.put(rev.main, rev.sub)
ki.put(lg, rev.main, rev.sub)
} else if !isTombstone(rkv.key) {
ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
idx.Insert(ki)
kiCache[rkv.kstr] = ki
}
@ -471,11 +471,15 @@ func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
return rkvc, revc
}
func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
for i, key := range keys {
rkv := revKeyValue{key: key}
if err := rkv.kv.Unmarshal(vals[i]); err != nil {
plog.Fatalf("cannot unmarshal event: %v", err)
if lg != nil {
lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
} else {
plog.Fatalf("cannot unmarshal event: %v", err)
}
}
rkv.kstr = string(rkv.kv.Key)
if isTombstone(key) {
@ -525,9 +529,17 @@ func (s *store) ConsistentIndex() uint64 {
}
// appendMarkTombstone appends tombstone mark to normal revision bytes.
func appendMarkTombstone(b []byte) []byte {
func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
if len(b) != revBytesLen {
plog.Panicf("cannot append mark to non normal revision bytes")
if lg != nil {
lg.Panic(
"cannot append tombstone mark to non-normal revision bytes",
zap.Int("expected-revision-bytes-size", revBytesLen),
zap.Int("given-revision-bytes-size", len(b)),
)
} else {
plog.Panicf("cannot append mark to non normal revision bytes")
}
}
return append(b, markTombstone)
}

View File

@ -31,6 +31,7 @@ import (
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/schedule"
"github.com/coreos/etcd/pkg/testutil"
"go.uber.org/zap"
)
@ -672,7 +673,7 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte {
bytes := newRevBytes()
revToBytes(rev, bytes)
if tombstone {
bytes = appendMarkTombstone(bytes)
bytes = appendMarkTombstone(zap.NewExample(), bytes)
}
return bytes
}
@ -696,6 +697,7 @@ func newFakeStore() *store {
compactMainRev: -1,
fifoSched: schedule.NewFIFOScheduler(),
stopc: make(chan struct{}),
lg: zap.NewExample(),
}
s.ReadView, s.WriteView = &readView{s}, &writeView{s}
return s

View File

@ -18,6 +18,7 @@ import (
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)
type storeTxnRead struct {
@ -139,10 +140,25 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
revToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
if len(vs) != 1 {
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
if tr.s.lg != nil {
tr.s.lg.Fatal(
"range failed to find revision pair",
zap.Int64("revision-main", revpair.main),
zap.Int64("revision-sub", revpair.sub),
)
} else {
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
}
}
if err := kvs[i].Unmarshal(vs[0]); err != nil {
plog.Fatalf("cannot unmarshal event: %v", err)
if tr.s.lg != nil {
tr.s.lg.Fatal(
"failed to unmarshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot unmarshal event: %v", err)
}
}
}
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
@ -177,7 +193,14 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
@ -190,7 +213,14 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
}
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
plog.Errorf("unexpected error from lease detach: %v", err)
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to detach old lease from a key",
zap.Error(err),
)
} else {
plog.Errorf("unexpected error from lease detach: %v", err)
}
}
}
if leaseID != lease.NoLease {
@ -223,19 +253,40 @@ func (tw *storeTxnWrite) delete(key []byte, rev revision) {
ibytes := newRevBytes()
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ibytes = appendMarkTombstone(ibytes)
if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil {
ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
} else {
// TODO: remove this in v3.5
ibytes = appendMarkTombstone(nil, ibytes)
}
kv := mvccpb.KeyValue{Key: key}
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev)
if err != nil {
plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to tombstone an existing key",
zap.String("key", string(key)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
}
}
tw.changes = append(tw.changes, kv)
@ -245,7 +296,14 @@ func (tw *storeTxnWrite) delete(key []byte, rev revision) {
if leaseID != lease.NoLease {
err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
if err != nil {
plog.Errorf("cannot detach %v", err)
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to detach old lease from a key",
zap.Error(err),
)
} else {
plog.Errorf("cannot detach %v", err)
}
}
}
}

View File

@ -16,6 +16,7 @@ package mvcc
import (
"encoding/binary"
"fmt"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
@ -47,7 +48,7 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
d, err := kv.Marshal()
if err != nil {
plog.Fatalf("cannot marshal event: %v", err)
panic(fmt.Errorf("cannot marshal event: %v", err))
}
be.BatchTx().Lock()

View File

@ -347,7 +347,13 @@ func (s *watchableStore) syncWatchers() int {
tx := s.store.b.ReadTx()
tx.Lock()
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
evs := kvsToEvents(wg, revs, vs)
var evs []mvccpb.Event
if s.store != nil && s.store.lg != nil {
evs = kvsToEvents(s.store.lg, wg, revs, vs)
} else {
// TODO: remove this in v3.5
evs = kvsToEvents(nil, wg, revs, vs)
}
tx.Unlock()
var victims watcherBatch
@ -399,11 +405,15 @@ func (s *watchableStore) syncWatchers() int {
}
// kvsToEvents gets all events for the watchers from all key-value pairs
func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
for i, v := range vals {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(v); err != nil {
plog.Panicf("cannot unmarshal event: %v", err)
if lg != nil {
lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
} else {
plog.Panicf("cannot unmarshal event: %v", err)
}
}
if !wg.contains(string(kv.Key)) {
@ -427,7 +437,14 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
var victim watcherBatch
for w, eb := range newWatcherBatch(&s.synced, evs) {
if eb.revs != 1 {
plog.Panicf("unexpected multiple revisions in notification")
if s.store != nil && s.store.lg != nil {
s.store.lg.Panic(
"unexpected multiple revisions in watch notification",
zap.Int("number-of-revisions", eb.revs),
)
} else {
plog.Panicf("unexpected multiple revisions in notification")
}
}
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))