From 6e8913b00487554953fc683bfbb81f9d79a3e962 Mon Sep 17 00:00:00 2001
From: WizardCXY
Date: Fri, 18 Jan 2019 14:13:32 +0800
Subject: [PATCH] bugfix:dead lock on store.mu when store.Compact in store.Restore happens

---
 mvcc/kvstore.go      | 41 ++++++++++++++++----
 mvcc/kvstore_test.go | 90 ++++++++++++++++++++++++--------------------
 2 files changed, 83 insertions(+), 48 deletions(-)

diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go
index 183e720e2..4ebc27490 100644
--- a/mvcc/kvstore.go
+++ b/mvcc/kvstore.go
@@ -139,6 +139,8 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI
 	tx.Unlock()
 	s.b.ForceCommit()
 
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	if err := s.restore(); err != nil {
 		// TODO: return the error instead of panic here?
 		panic("failed to recover store from backend")
@@ -225,25 +227,20 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
 	return hash, currentRev, compactRev, err
 }
 
-func (s *store) Compact(rev int64) (<-chan struct{}, error) {
-	s.mu.Lock()
+func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
 	s.revMu.Lock()
 	if rev <= s.compactMainRev {
 		ch := make(chan struct{})
 		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
 		s.fifoSched.Schedule(f)
-		s.mu.Unlock()
 		s.revMu.Unlock()
 		return ch, ErrCompacted
 	}
 	if rev > s.currentRev {
-		s.mu.Unlock()
 		s.revMu.Unlock()
 		return nil, ErrFutureRev
 	}
 
-	start := time.Now()
-
 	s.compactMainRev = rev
 
 	rbytes := newRevBytes()
@@ -256,8 +253,13 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 	// ensure that desired compaction is persisted
 	s.b.ForceCommit()
 
-	s.mu.Unlock()
 	s.revMu.Unlock()
+
+	return nil, nil
+}
+
+func (s *store) compact(rev int64) (<-chan struct{}, error) {
+	start := time.Now()
 	keep := s.kvindex.Compact(rev)
 	ch := make(chan struct{})
 	var j = func(ctx context.Context) {
@@ -278,6 +280,29 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 	return ch, nil
 }
 
+func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
+	ch, err := s.updateCompactRev(rev)
+	if nil != err {
+		return ch, err
+	}
+
+	return s.compact(rev)
+}
+
+func (s *store) Compact(rev int64) (<-chan struct{}, error) {
+	s.mu.Lock()
+
+	ch, err := s.updateCompactRev(rev)
+
+	if err != nil {
+		s.mu.Unlock()
+		return ch, err
+	}
+	s.mu.Unlock()
+
+	return s.compact(rev)
+}
+
 // DefaultIgnores is a map of keys to ignore in hash checking.
 var DefaultIgnores map[backend.IgnoreKey]struct{}
 
@@ -415,7 +440,7 @@ func (s *store) restore() error {
 	tx.Unlock()
 
 	if scheduledCompact != 0 {
-		s.Compact(scheduledCompact)
+		s.compactLockfree(scheduledCompact)
 
 		if s.lg != nil {
 			s.lg.Info(
diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go
index 267c01b44..ef48464d6 100644
--- a/mvcc/kvstore_test.go
+++ b/mvcc/kvstore_test.go
@@ -466,51 +466,61 @@ func TestRestoreDelete(t *testing.T) {
 }
 
 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
-	b, tmpPath := backend.NewDefaultTmpBackend()
-	s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
-	defer os.Remove(tmpPath)
+	tests := []string{"recreate", "restore"}
+	for _, test := range tests {
+		b, tmpPath := backend.NewDefaultTmpBackend()
+		s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		defer os.Remove(tmpPath)
 
-	s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
-	s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
-	s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
+		s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
+		s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
+		s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
 
-	// write scheduled compaction, but not do compaction
-	rbytes := newRevBytes()
-	revToBytes(revision{main: 2}, rbytes)
-	tx := s0.b.BatchTx()
-	tx.Lock()
-	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
-	tx.Unlock()
-
-	s0.Close()
-
-	s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
-
-	// wait for scheduled compaction to be finished
-	time.Sleep(100 * time.Millisecond)
-
-	if _, err := s1.Range([]byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
-		t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
-	}
-	// check the key in backend is deleted
-	revbytes := newRevBytes()
-	revToBytes(revision{main: 1}, revbytes)
-
-	// The disk compaction is done asynchronously and requires more time on slow disk.
-	// try 5 times for CI with slow IO.
-	for i := 0; i < 5; i++ {
-		tx = s1.b.BatchTx()
+		// write scheduled compaction, but not do compaction
+		rbytes := newRevBytes()
+		revToBytes(revision{main: 2}, rbytes)
+		tx := s0.b.BatchTx()
 		tx.Lock()
-		ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
+		tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
 		tx.Unlock()
-		if len(ks) != 0 {
-			time.Sleep(100 * time.Millisecond)
-			continue
-		}
-		return
-	}
-
-	t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
+
+		s0.Close()
+
+		var s *store
+		switch test {
+		case "recreate":
+			s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		case "restore":
+			s0.Restore(b)
+			s = s0
+		}
+
+		// wait for scheduled compaction to be finished
+		time.Sleep(100 * time.Millisecond)
+
+		if _, err := s.Range([]byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
+			t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
+		}
+		// check the key in backend is deleted
+		revbytes := newRevBytes()
+		revToBytes(revision{main: 1}, revbytes)
+
+		// The disk compaction is done asynchronously and requires more time on slow disk.
+		// try 5 times for CI with slow IO.
+		for i := 0; i < 5; i++ {
+			tx = s.b.BatchTx()
+			tx.Lock()
+			ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
+			tx.Unlock()
+			if len(ks) != 0 {
+				time.Sleep(100 * time.Millisecond)
+				continue
+			}
+			return
+		}
+
+		t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
+	}
 }
 
 type hashKVResult struct {
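
Why the pre-patch code could deadlock, sketched below as a minimal self-contained Go program (the store, Restore, and Compact names only mirror the shape of the mvcc types; this is an illustration of the locking pattern, not the etcd implementation): store.Restore acquires s.mu and then calls s.restore(), and before this change s.restore() resumed an unfinished scheduled compaction through s.Compact, which tries to acquire s.mu again. Go's sync.Mutex is not reentrant, so the second Lock blocks the same goroutine forever. The patch sidesteps this by splitting Compact into updateCompactRev and compact, neither of which touches s.mu, and by having restore() call the lock-free path (compactLockfree) while the exported Compact keeps taking s.mu around the revision update.

package main

import (
	"fmt"
	"sync"
	"time"
)

// store only mirrors the shape of the mvcc store for illustration.
type store struct {
	mu sync.Mutex
}

// Compact locks mu, like the pre-patch mvcc Compact did.
func (s *store) Compact() {
	s.mu.Lock() // second Lock on the same, already-held mutex
	defer s.mu.Unlock()
}

// Restore locks mu and then reaches Compact, mimicking
// store.Restore -> restore() -> Compact before the patch.
// sync.Mutex is not reentrant, so this goroutine blocks forever.
func (s *store) Restore() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.Compact()
}

func main() {
	s := &store{}
	done := make(chan struct{})
	go func() {
		s.Restore()
		close(done)
	}()
	select {
	case <-done:
		fmt.Println("Restore returned") // only reachable with the lock-free compaction path
	case <-time.After(time.Second):
		fmt.Println("Restore is stuck: mu was locked twice by the same goroutine")
	}
}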