From 03ef9745a992b4a5d33d867826784aabf98391ed Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 4 May 2018 13:15:51 -0700 Subject: [PATCH] mvcc: add more structured logging Signed-off-by: Gyuho Lee --- mvcc/backend/backend.go | 6 +++- mvcc/backend/batch_tx.go | 76 +++++++++++++++++++++++++++++++++++----- mvcc/index.go | 18 +++++----- mvcc/index_test.go | 4 +-- mvcc/key_index.go | 72 ++++++++++++++++++++++++++++++------- mvcc/key_index_test.go | 58 +++++++++++++++--------------- mvcc/kvstore.go | 32 +++++++++++------ mvcc/kvstore_test.go | 4 ++- mvcc/kvstore_txn.go | 74 +++++++++++++++++++++++++++++++++----- mvcc/util.go | 3 +- mvcc/watchable_store.go | 25 ++++++++++--- 11 files changed, 287 insertions(+), 85 deletions(-) diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index 3438e6780..d98ce11a3 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -196,7 +196,11 @@ func (b *backend) Snapshot() Snapshot { defer b.mu.RUnlock() tx, err := b.db.Begin(false) if err != nil { - plog.Fatalf("cannot begin tx (%s)", err) + if b.lg != nil { + b.lg.Fatal("failed to begin tx", zap.Error(err)) + } else { + plog.Fatalf("cannot begin tx (%s)", err) + } } stopc, donec := make(chan struct{}), make(chan struct{}) diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go index ceb0b1739..d2a9e455b 100644 --- a/mvcc/backend/batch_tx.go +++ b/mvcc/backend/batch_tx.go @@ -22,6 +22,7 @@ import ( "time" bolt "github.com/coreos/bbolt" + "go.uber.org/zap" ) type BatchTx interface { @@ -47,7 +48,15 @@ type batchTx struct { func (t *batchTx) UnsafeCreateBucket(name []byte) { _, err := t.tx.CreateBucket(name) if err != nil && err != bolt.ErrBucketExists { - plog.Fatalf("cannot create bucket %s (%v)", name, err) + if t.backend.lg != nil { + t.backend.lg.Fatal( + "failed to create a bucket", + zap.String("bucket-name", string(name)), + zap.Error(err), + ) + } else { + plog.Fatalf("cannot create bucket %s (%v)", name, err) + } } t.pending++ } @@ -65,7 +74,14 @@ func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) { func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) { bucket := t.tx.Bucket(bucketName) if bucket == nil { - plog.Fatalf("bucket %s does not exist", bucketName) + if t.backend.lg != nil { + t.backend.lg.Fatal( + "failed to find a bucket", + zap.String("bucket-name", string(bucketName)), + ) + } else { + plog.Fatalf("bucket %s does not exist", bucketName) + } } if seq { // it is useful to increase fill percent when the workloads are mostly append-only. @@ -73,7 +89,15 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo bucket.FillPercent = 0.9 } if err := bucket.Put(key, value); err != nil { - plog.Fatalf("cannot put key into bucket (%v)", err) + if t.backend.lg != nil { + t.backend.lg.Fatal( + "failed to write to a bucket", + zap.String("bucket-name", string(bucketName)), + zap.Error(err), + ) + } else { + plog.Fatalf("cannot put key into bucket (%v)", err) + } } t.pending++ } @@ -82,7 +106,14 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { bucket := t.tx.Bucket(bucketName) if bucket == nil { - plog.Fatalf("bucket %s does not exist", bucketName) + if t.backend.lg != nil { + t.backend.lg.Fatal( + "failed to find a bucket", + zap.String("bucket-name", string(bucketName)), + ) + } else { + plog.Fatalf("bucket %s does not exist", bucketName) + } } return unsafeRange(bucket.Cursor(), key, endKey, limit) } @@ -113,11 +144,26 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { bucket := t.tx.Bucket(bucketName) if bucket == nil { - plog.Fatalf("bucket %s does not exist", bucketName) + if t.backend.lg != nil { + t.backend.lg.Fatal( + "failed to find a bucket", + zap.String("bucket-name", string(bucketName)), + ) + } else { + plog.Fatalf("bucket %s does not exist", bucketName) + } } err := bucket.Delete(key) if err != nil { - plog.Fatalf("cannot delete key from bucket (%v)", err) + if t.backend.lg != nil { + t.backend.lg.Fatal( + "failed to delete a key", + zap.String("bucket-name", string(bucketName)), + zap.Error(err), + ) + } else { + plog.Fatalf("cannot delete key from bucket (%v)", err) + } } t.pending++ } @@ -177,7 +223,14 @@ func (t *batchTx) commit(stop bool) { t.pending = 0 if err != nil { - plog.Fatalf("cannot commit tx (%s)", err) + if t.backend.lg != nil { + t.backend.lg.Fatal( + "failed to commit tx", + zap.Error(err), + ) + } else { + plog.Fatalf("cannot commit tx (%s)", err) + } } } if !stop { @@ -236,7 +289,14 @@ func (t *batchTxBuffered) commit(stop bool) { func (t *batchTxBuffered) unsafeCommit(stop bool) { if t.backend.readTx.tx != nil { if err := t.backend.readTx.tx.Rollback(); err != nil { - plog.Fatalf("cannot rollback tx (%s)", err) + if t.backend.lg != nil { + t.backend.lg.Fatal( + "failed to rollback tx", + zap.Error(err), + ) + } else { + plog.Fatalf("cannot rollback tx (%s)", err) + } } t.backend.readTx.reset() } diff --git a/mvcc/index.go b/mvcc/index.go index 16e223be2..f8cc6df88 100644 --- a/mvcc/index.go +++ b/mvcc/index.go @@ -57,12 +57,12 @@ func (ti *treeIndex) Put(key []byte, rev revision) { defer ti.Unlock() item := ti.tree.Get(keyi) if item == nil { - keyi.put(rev.main, rev.sub) + keyi.put(ti.lg, rev.main, rev.sub) ti.tree.ReplaceOrInsert(keyi) return } okeyi := item.(*keyIndex) - okeyi.put(rev.main, rev.sub) + okeyi.put(ti.lg, rev.main, rev.sub) } func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) { @@ -72,7 +72,7 @@ func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, v if keyi = ti.keyIndex(keyi); keyi == nil { return revision{}, revision{}, 0, ErrRevisionNotFound } - return keyi.get(atRev) + return keyi.get(ti.lg, atRev) } func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex { @@ -112,7 +112,7 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) { return []revision{rev} } ti.visit(key, end, func(ki *keyIndex) { - if rev, _, _, err := ki.get(atRev); err == nil { + if rev, _, _, err := ki.get(ti.lg, atRev); err == nil { revs = append(revs, rev) } }) @@ -128,7 +128,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs [] return [][]byte{key}, []revision{rev} } ti.visit(key, end, func(ki *keyIndex) { - if rev, _, _, err := ki.get(atRev); err == nil { + if rev, _, _, err := ki.get(ti.lg, atRev); err == nil { revs = append(revs, rev) keys = append(keys, ki.key) } @@ -147,7 +147,7 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error { } ki := item.(*keyIndex) - return ki.tombstone(rev.main, rev.sub) + return ki.tombstone(ti.lg, rev.main, rev.sub) } // RangeSince returns all revisions from key(including) to end(excluding) @@ -165,7 +165,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision { return nil } keyi = item.(*keyIndex) - return keyi.since(rev) + return keyi.since(ti.lg, rev) } endi := &keyIndex{key: end} @@ -175,7 +175,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision { return false } curKeyi := item.(*keyIndex) - revs = append(revs, curKeyi.since(rev)...) + revs = append(revs, curKeyi.since(ti.lg, rev)...) return true }) sort.Sort(revisions(revs)) @@ -199,7 +199,7 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { //Lock is needed here to prevent modification to the keyIndex while //compaction is going on or revision added to empty before deletion ti.Lock() - keyi.compact(rev, available) + keyi.compact(ti.lg, rev, available) if keyi.isEmpty() { item := ti.tree.Delete(keyi) if item == nil { diff --git a/mvcc/index_test.go b/mvcc/index_test.go index 0016874e4..f9431ae31 100644 --- a/mvcc/index_test.go +++ b/mvcc/index_test.go @@ -284,10 +284,10 @@ func restore(ti *treeIndex, key []byte, created, modified revision, ver int64) { defer ti.Unlock() item := ti.tree.Get(keyi) if item == nil { - keyi.restore(created, modified, ver) + keyi.restore(ti.lg, created, modified, ver) ti.tree.ReplaceOrInsert(keyi) return } okeyi := item.(*keyIndex) - okeyi.put(modified.main, modified.sub) + okeyi.put(ti.lg, modified.main, modified.sub) } diff --git a/mvcc/key_index.go b/mvcc/key_index.go index 805922bfc..2b0844e3c 100644 --- a/mvcc/key_index.go +++ b/mvcc/key_index.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/google/btree" + "go.uber.org/zap" ) var ( @@ -73,11 +74,21 @@ type keyIndex struct { } // put puts a revision to the keyIndex. -func (ki *keyIndex) put(main int64, sub int64) { +func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) { rev := revision{main: main, sub: sub} if !rev.GreaterThan(ki.modified) { - plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified) + if lg != nil { + lg.Panic( + "'put' with an unexpected smaller revision", + zap.Int64("given-revision-main", rev.main), + zap.Int64("given-revision-sub", rev.sub), + zap.Int64("modified-revision-main", ki.modified.main), + zap.Int64("modified-revision-sub", ki.modified.sub), + ) + } else { + plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified) + } } if len(ki.generations) == 0 { ki.generations = append(ki.generations, generation{}) @@ -92,9 +103,16 @@ func (ki *keyIndex) put(main int64, sub int64) { ki.modified = rev } -func (ki *keyIndex) restore(created, modified revision, ver int64) { +func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) { if len(ki.generations) != 0 { - plog.Panicf("store.keyindex: cannot restore non-empty keyIndex") + if lg != nil { + lg.Panic( + "'restore' got an unexpected non-empty generations", + zap.Int("generations-size", len(ki.generations)), + ) + } else { + plog.Panicf("store.keyindex: cannot restore non-empty keyIndex") + } } ki.modified = modified @@ -106,14 +124,21 @@ func (ki *keyIndex) restore(created, modified revision, ver int64) { // tombstone puts a revision, pointing to a tombstone, to the keyIndex. // It also creates a new empty generation in the keyIndex. // It returns ErrRevisionNotFound when tombstone on an empty generation. -func (ki *keyIndex) tombstone(main int64, sub int64) error { +func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error { if ki.isEmpty() { - plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key)) + if lg != nil { + lg.Panic( + "'tombstone' got an unexpected empty keyIndex", + zap.String("key", string(ki.key)), + ) + } else { + plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key)) + } } if ki.generations[len(ki.generations)-1].isEmpty() { return ErrRevisionNotFound } - ki.put(main, sub) + ki.put(lg, main, sub) ki.generations = append(ki.generations, generation{}) keysGauge.Dec() return nil @@ -121,9 +146,16 @@ func (ki *keyIndex) tombstone(main int64, sub int64) error { // get gets the modified, created revision and version of the key that satisfies the given atRev. // Rev must be higher than or equal to the given atRev. -func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err error) { +func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) { if ki.isEmpty() { - plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) + if lg != nil { + lg.Panic( + "'get' got an unexpected empty keyIndex", + zap.String("key", string(ki.key)), + ) + } else { + plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) + } } g := ki.findGeneration(atRev) if g.isEmpty() { @@ -141,9 +173,16 @@ func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err // since returns revisions since the given rev. Only the revision with the // largest sub revision will be returned if multiple revisions have the same // main revision. -func (ki *keyIndex) since(rev int64) []revision { +func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision { if ki.isEmpty() { - plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) + if lg != nil { + lg.Panic( + "'since' got an unexpected empty keyIndex", + zap.String("key", string(ki.key)), + ) + } else { + plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) + } } since := revision{rev, 0} var gi int @@ -182,9 +221,16 @@ func (ki *keyIndex) since(rev int64) []revision { // revision than the given atRev except the largest one (If the largest one is // a tombstone, it will not be kept). // If a generation becomes empty during compaction, it will be removed. -func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) { +func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) { if ki.isEmpty() { - plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key)) + if lg != nil { + lg.Panic( + "'compact' got an unexpected empty keyIndex", + zap.String("key", string(ki.key)), + ) + } else { + plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key)) + } } genIdx, revIndex := ki.doCompact(atRev, available) diff --git a/mvcc/key_index_test.go b/mvcc/key_index_test.go index 57e6a9cd7..9e7da6ad9 100644 --- a/mvcc/key_index_test.go +++ b/mvcc/key_index_test.go @@ -17,6 +17,8 @@ package mvcc import ( "reflect" "testing" + + "go.uber.org/zap" ) func TestKeyIndexGet(t *testing.T) { @@ -28,7 +30,7 @@ func TestKeyIndexGet(t *testing.T) { // {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]} // {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]} ki := newTestKeyIndex() - ki.compact(4, make(map[revision]struct{})) + ki.compact(zap.NewExample(), 4, make(map[revision]struct{})) tests := []struct { rev int64 @@ -68,7 +70,7 @@ func TestKeyIndexGet(t *testing.T) { } for i, tt := range tests { - mod, creat, ver, err := ki.get(tt.rev) + mod, creat, ver, err := ki.get(zap.NewExample(), tt.rev) if err != tt.werr { t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) } @@ -86,7 +88,7 @@ func TestKeyIndexGet(t *testing.T) { func TestKeyIndexSince(t *testing.T) { ki := newTestKeyIndex() - ki.compact(4, make(map[revision]struct{})) + ki.compact(zap.NewExample(), 4, make(map[revision]struct{})) allRevs := []revision{{4, 0}, {6, 0}, {8, 0}, {10, 0}, {12, 0}, {14, 1}, {16, 0}} tests := []struct { @@ -115,7 +117,7 @@ func TestKeyIndexSince(t *testing.T) { } for i, tt := range tests { - revs := ki.since(tt.rev) + revs := ki.since(zap.NewExample(), tt.rev) if !reflect.DeepEqual(revs, tt.wrevs) { t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs) } @@ -124,7 +126,7 @@ func TestKeyIndexSince(t *testing.T) { func TestKeyIndexPut(t *testing.T) { ki := &keyIndex{key: []byte("foo")} - ki.put(5, 0) + ki.put(zap.NewExample(), 5, 0) wki := &keyIndex{ key: []byte("foo"), @@ -135,7 +137,7 @@ func TestKeyIndexPut(t *testing.T) { t.Errorf("ki = %+v, want %+v", ki, wki) } - ki.put(7, 0) + ki.put(zap.NewExample(), 7, 0) wki = &keyIndex{ key: []byte("foo"), @@ -149,7 +151,7 @@ func TestKeyIndexPut(t *testing.T) { func TestKeyIndexRestore(t *testing.T) { ki := &keyIndex{key: []byte("foo")} - ki.restore(revision{5, 0}, revision{7, 0}, 2) + ki.restore(zap.NewExample(), revision{5, 0}, revision{7, 0}, 2) wki := &keyIndex{ key: []byte("foo"), @@ -163,9 +165,9 @@ func TestKeyIndexRestore(t *testing.T) { func TestKeyIndexTombstone(t *testing.T) { ki := &keyIndex{key: []byte("foo")} - ki.put(5, 0) + ki.put(zap.NewExample(), 5, 0) - err := ki.tombstone(7, 0) + err := ki.tombstone(zap.NewExample(), 7, 0) if err != nil { t.Errorf("unexpected tombstone error: %v", err) } @@ -179,9 +181,9 @@ func TestKeyIndexTombstone(t *testing.T) { t.Errorf("ki = %+v, want %+v", ki, wki) } - ki.put(8, 0) - ki.put(9, 0) - err = ki.tombstone(15, 0) + ki.put(zap.NewExample(), 8, 0) + ki.put(zap.NewExample(), 9, 0) + err = ki.tombstone(zap.NewExample(), 15, 0) if err != nil { t.Errorf("unexpected tombstone error: %v", err) } @@ -199,7 +201,7 @@ func TestKeyIndexTombstone(t *testing.T) { t.Errorf("ki = %+v, want %+v", ki, wki) } - err = ki.tombstone(16, 0) + err = ki.tombstone(zap.NewExample(), 16, 0) if err != ErrRevisionNotFound { t.Errorf("tombstone error = %v, want %v", err, ErrRevisionNotFound) } @@ -454,7 +456,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) } am = make(map[revision]struct{}) - ki.compact(tt.compact, am) + ki.compact(zap.NewExample(), tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) } @@ -477,7 +479,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) } am = make(map[revision]struct{}) - ki.compact(tt.compact, am) + ki.compact(zap.NewExample(), tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) } @@ -500,7 +502,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) } am = make(map[revision]struct{}) - ki.compact(tt.compact, am) + ki.compact(zap.NewExample(), tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) } @@ -530,10 +532,10 @@ func cloneGeneration(g *generation) *generation { // test that compact on version that higher than last modified version works well func TestKeyIndexCompactOnFurtherRev(t *testing.T) { ki := &keyIndex{key: []byte("foo")} - ki.put(1, 0) - ki.put(2, 0) + ki.put(zap.NewExample(), 1, 0) + ki.put(zap.NewExample(), 2, 0) am := make(map[revision]struct{}) - ki.compact(3, am) + ki.compact(zap.NewExample(), 3, am) wki := &keyIndex{ key: []byte("foo"), @@ -685,14 +687,14 @@ func newTestKeyIndex() *keyIndex { // {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]} ki := &keyIndex{key: []byte("foo")} - ki.put(2, 0) - ki.put(4, 0) - ki.tombstone(6, 0) - ki.put(8, 0) - ki.put(10, 0) - ki.tombstone(12, 0) - ki.put(14, 0) - ki.put(14, 1) - ki.tombstone(16, 0) + ki.put(zap.NewExample(), 2, 0) + ki.put(zap.NewExample(), 4, 0) + ki.tombstone(zap.NewExample(), 6, 0) + ki.put(zap.NewExample(), 8, 0) + ki.put(zap.NewExample(), 10, 0) + ki.tombstone(zap.NewExample(), 12, 0) + ki.put(zap.NewExample(), 14, 0) + ki.put(zap.NewExample(), 14, 1) + ki.tombstone(zap.NewExample(), 16, 0) return ki } diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 1b46ce4b1..592f46d21 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -351,7 +351,7 @@ func (s *store) restore() error { // index keys concurrently as they're loaded in from tx keysGauge.Set(0) - rkvc, revc := restoreIntoIndex(s.kvindex) + rkvc, revc := restoreIntoIndex(s.lg, s.kvindex) for { keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys)) if len(keys) == 0 { @@ -359,7 +359,7 @@ func (s *store) restore() error { } // rkvc blocks if the total pending keys exceeds the restore // chunk size to keep keys from consuming too much memory. - restoreChunk(rkvc, keys, vals, keyToLease) + restoreChunk(s.lg, rkvc, keys, vals, keyToLease) if len(keys) < restoreChunkKeys { // partial set implies final set break @@ -426,7 +426,7 @@ type revKeyValue struct { kstr string } -func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) { +func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) { rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1) go func() { currentRev := int64(1) @@ -457,12 +457,12 @@ func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) { currentRev = rev.main if ok { if isTombstone(rkv.key) { - ki.tombstone(rev.main, rev.sub) + ki.tombstone(lg, rev.main, rev.sub) continue } - ki.put(rev.main, rev.sub) + ki.put(lg, rev.main, rev.sub) } else if !isTombstone(rkv.key) { - ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version) + ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version) idx.Insert(ki) kiCache[rkv.kstr] = ki } @@ -471,11 +471,15 @@ func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) { return rkvc, revc } -func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) { +func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) { for i, key := range keys { rkv := revKeyValue{key: key} if err := rkv.kv.Unmarshal(vals[i]); err != nil { - plog.Fatalf("cannot unmarshal event: %v", err) + if lg != nil { + lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) + } else { + plog.Fatalf("cannot unmarshal event: %v", err) + } } rkv.kstr = string(rkv.kv.Key) if isTombstone(key) { @@ -525,9 +529,17 @@ func (s *store) ConsistentIndex() uint64 { } // appendMarkTombstone appends tombstone mark to normal revision bytes. -func appendMarkTombstone(b []byte) []byte { +func appendMarkTombstone(lg *zap.Logger, b []byte) []byte { if len(b) != revBytesLen { - plog.Panicf("cannot append mark to non normal revision bytes") + if lg != nil { + lg.Panic( + "cannot append tombstone mark to non-normal revision bytes", + zap.Int("expected-revision-bytes-size", revBytesLen), + zap.Int("given-revision-bytes-size", len(b)), + ) + } else { + plog.Panicf("cannot append mark to non normal revision bytes") + } } return append(b, markTombstone) } diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index 157247606..f4d0fdfe3 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -31,6 +31,7 @@ import ( "github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/pkg/schedule" "github.com/coreos/etcd/pkg/testutil" + "go.uber.org/zap" ) @@ -672,7 +673,7 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte { bytes := newRevBytes() revToBytes(rev, bytes) if tombstone { - bytes = appendMarkTombstone(bytes) + bytes = appendMarkTombstone(zap.NewExample(), bytes) } return bytes } @@ -696,6 +697,7 @@ func newFakeStore() *store { compactMainRev: -1, fifoSched: schedule.NewFIFOScheduler(), stopc: make(chan struct{}), + lg: zap.NewExample(), } s.ReadView, s.WriteView = &readView{s}, &writeView{s} return s diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go index d568d8a2e..3e598cfc2 100644 --- a/mvcc/kvstore_txn.go +++ b/mvcc/kvstore_txn.go @@ -18,6 +18,7 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" + "go.uber.org/zap" ) type storeTxnRead struct { @@ -139,10 +140,25 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions revToBytes(revpair, revBytes) _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0) if len(vs) != 1 { - plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub) + if tr.s.lg != nil { + tr.s.lg.Fatal( + "range failed to find revision pair", + zap.Int64("revision-main", revpair.main), + zap.Int64("revision-sub", revpair.sub), + ) + } else { + plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub) + } } if err := kvs[i].Unmarshal(vs[0]); err != nil { - plog.Fatalf("cannot unmarshal event: %v", err) + if tr.s.lg != nil { + tr.s.lg.Fatal( + "failed to unmarshal mvccpb.KeyValue", + zap.Error(err), + ) + } else { + plog.Fatalf("cannot unmarshal event: %v", err) + } } } return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil @@ -177,7 +193,14 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { d, err := kv.Marshal() if err != nil { - plog.Fatalf("cannot marshal event: %v", err) + if tw.storeTxnRead.s.lg != nil { + tw.storeTxnRead.s.lg.Fatal( + "failed to marshal mvccpb.KeyValue", + zap.Error(err), + ) + } else { + plog.Fatalf("cannot marshal event: %v", err) + } } tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) @@ -190,7 +213,14 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { } err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) if err != nil { - plog.Errorf("unexpected error from lease detach: %v", err) + if tw.storeTxnRead.s.lg != nil { + tw.storeTxnRead.s.lg.Fatal( + "failed to detach old lease from a key", + zap.Error(err), + ) + } else { + plog.Errorf("unexpected error from lease detach: %v", err) + } } } if leaseID != lease.NoLease { @@ -223,19 +253,40 @@ func (tw *storeTxnWrite) delete(key []byte, rev revision) { ibytes := newRevBytes() idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))} revToBytes(idxRev, ibytes) - ibytes = appendMarkTombstone(ibytes) + + if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil { + ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes) + } else { + // TODO: remove this in v3.5 + ibytes = appendMarkTombstone(nil, ibytes) + } kv := mvccpb.KeyValue{Key: key} d, err := kv.Marshal() if err != nil { - plog.Fatalf("cannot marshal event: %v", err) + if tw.storeTxnRead.s.lg != nil { + tw.storeTxnRead.s.lg.Fatal( + "failed to marshal mvccpb.KeyValue", + zap.Error(err), + ) + } else { + plog.Fatalf("cannot marshal event: %v", err) + } } tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) err = tw.s.kvindex.Tombstone(key, idxRev) if err != nil { - plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err) + if tw.storeTxnRead.s.lg != nil { + tw.storeTxnRead.s.lg.Fatal( + "failed to tombstone an existing key", + zap.String("key", string(key)), + zap.Error(err), + ) + } else { + plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err) + } } tw.changes = append(tw.changes, kv) @@ -245,7 +296,14 @@ func (tw *storeTxnWrite) delete(key []byte, rev revision) { if leaseID != lease.NoLease { err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item}) if err != nil { - plog.Errorf("cannot detach %v", err) + if tw.storeTxnRead.s.lg != nil { + tw.storeTxnRead.s.lg.Fatal( + "failed to detach old lease from a key", + zap.Error(err), + ) + } else { + plog.Errorf("cannot detach %v", err) + } } } } diff --git a/mvcc/util.go b/mvcc/util.go index 8a0df0bfc..aeb2ea8cb 100644 --- a/mvcc/util.go +++ b/mvcc/util.go @@ -16,6 +16,7 @@ package mvcc import ( "encoding/binary" + "fmt" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" @@ -47,7 +48,7 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) { d, err := kv.Marshal() if err != nil { - plog.Fatalf("cannot marshal event: %v", err) + panic(fmt.Errorf("cannot marshal event: %v", err)) } be.BatchTx().Lock() diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 1c70d6ade..edbb7f9d0 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -347,7 +347,13 @@ func (s *watchableStore) syncWatchers() int { tx := s.store.b.ReadTx() tx.Lock() revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) - evs := kvsToEvents(wg, revs, vs) + var evs []mvccpb.Event + if s.store != nil && s.store.lg != nil { + evs = kvsToEvents(s.store.lg, wg, revs, vs) + } else { + // TODO: remove this in v3.5 + evs = kvsToEvents(nil, wg, revs, vs) + } tx.Unlock() var victims watcherBatch @@ -399,11 +405,15 @@ func (s *watchableStore) syncWatchers() int { } // kvsToEvents gets all events for the watchers from all key-value pairs -func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) { +func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) { for i, v := range vals { var kv mvccpb.KeyValue if err := kv.Unmarshal(v); err != nil { - plog.Panicf("cannot unmarshal event: %v", err) + if lg != nil { + lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) + } else { + plog.Panicf("cannot unmarshal event: %v", err) + } } if !wg.contains(string(kv.Key)) { @@ -427,7 +437,14 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) { var victim watcherBatch for w, eb := range newWatcherBatch(&s.synced, evs) { if eb.revs != 1 { - plog.Panicf("unexpected multiple revisions in notification") + if s.store != nil && s.store.lg != nil { + s.store.lg.Panic( + "unexpected multiple revisions in watch notification", + zap.Int("number-of-revisions", eb.revs), + ) + } else { + plog.Panicf("unexpected multiple revisions in notification") + } } if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) { pendingEventsGauge.Add(float64(len(eb.evs)))