bugfix: fix deadlock on store.mu when store.Compact is invoked during store.Restore

release-3.4
WizardCXY 2019-01-18 14:13:32 +08:00
parent a00bff7848
commit 6e8913b004
2 changed files with 83 additions and 48 deletions

View File

@ -139,6 +139,8 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI
tx.Unlock()
s.b.ForceCommit()
s.mu.Lock()
defer s.mu.Unlock()
if err := s.restore(); err != nil {
// TODO: return the error instead of panic here?
panic("failed to recover store from backend")
@ -225,25 +227,20 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
return hash, currentRev, compactRev, err
}
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock()
func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
s.revMu.Lock()
if rev <= s.compactMainRev {
ch := make(chan struct{})
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
s.fifoSched.Schedule(f)
s.mu.Unlock()
s.revMu.Unlock()
return ch, ErrCompacted
}
if rev > s.currentRev {
s.mu.Unlock()
s.revMu.Unlock()
return nil, ErrFutureRev
}
start := time.Now()
s.compactMainRev = rev
rbytes := newRevBytes()
@ -256,8 +253,13 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
// ensure that desired compaction is persisted
s.b.ForceCommit()
s.mu.Unlock()
s.revMu.Unlock()
return nil, nil
}
func (s *store) compact(rev int64) (<-chan struct{}, error) {
start := time.Now()
keep := s.kvindex.Compact(rev)
ch := make(chan struct{})
var j = func(ctx context.Context) {
@ -278,6 +280,29 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
return ch, nil
}
// compactLockfree performs a compaction at rev without acquiring s.mu.
// It is intended for callers that already hold s.mu (restore calls it while
// Restore holds the lock via defer), avoiding the self-deadlock that calling
// Compact — which locks s.mu itself — would cause.
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
	ch, err := s.updateCompactRev(rev)
	if err != nil {
		return ch, err
	}
	return s.compact(rev)
}
// Compact compacts the store up to rev. The compaction revision is updated
// while holding s.mu; updateCompactRev reports ErrCompacted or ErrFutureRev
// for a rev that is already compacted or beyond the current revision, and on
// success the remaining compaction work proceeds in s.compact without the lock.
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
	// s.mu guards only the compact-revision update; release it before the
	// heavier compaction work below.
	s.mu.Lock()
	ch, err := s.updateCompactRev(rev)
	s.mu.Unlock()

	if err != nil {
		return ch, err
	}
	return s.compact(rev)
}
// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}
@ -415,7 +440,7 @@ func (s *store) restore() error {
tx.Unlock()
if scheduledCompact != 0 {
s.Compact(scheduledCompact)
s.compactLockfree(scheduledCompact)
if s.lg != nil {
s.lg.Info(

View File

@ -466,51 +466,61 @@ func TestRestoreDelete(t *testing.T) {
}
func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)
tests := []string{"recreate", "restore"}
for _, test := range tests {
b, tmpPath := backend.NewDefaultTmpBackend()
s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)
s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
// write scheduled compaction, but not do compaction
rbytes := newRevBytes()
revToBytes(revision{main: 2}, rbytes)
tx := s0.b.BatchTx()
tx.Lock()
tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
tx.Unlock()
s0.Close()
s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
// wait for scheduled compaction to be finished
time.Sleep(100 * time.Millisecond)
if _, err := s1.Range([]byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
}
// check the key in backend is deleted
revbytes := newRevBytes()
revToBytes(revision{main: 1}, revbytes)
// The disk compaction is done asynchronously and requires more time on slow disk.
// try 5 times for CI with slow IO.
for i := 0; i < 5; i++ {
tx = s1.b.BatchTx()
// write scheduled compaction, but not do compaction
rbytes := newRevBytes()
revToBytes(revision{main: 2}, rbytes)
tx := s0.b.BatchTx()
tx.Lock()
ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
tx.Unlock()
if len(ks) != 0 {
time.Sleep(100 * time.Millisecond)
continue
}
return
}
t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
s0.Close()
var s *store
switch test {
case "recreate":
s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
case "restore":
s0.Restore(b)
s = s0
}
// wait for scheduled compaction to be finished
time.Sleep(100 * time.Millisecond)
if _, err := s.Range([]byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
}
// check the key in backend is deleted
revbytes := newRevBytes()
revToBytes(revision{main: 1}, revbytes)
// The disk compaction is done asynchronously and requires more time on slow disk.
// try 5 times for CI with slow IO.
for i := 0; i < 5; i++ {
tx = s.b.BatchTx()
tx.Lock()
ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
tx.Unlock()
if len(ks) != 0 {
time.Sleep(100 * time.Millisecond)
continue
}
return
}
t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
}
}
type hashKVResult struct {