storage: wait for compact goroutine to exit before close backend
If backend is closed, the operations on backend in compact goroutine will panic. So this PR waits for compact goroutine to exit before close backend. This fixes the TestWorkflow failure too.release-2.1
parent
148394f66f
commit
7cba42fb73
|
@ -44,6 +44,9 @@ func TestWorkflow(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Errorf("#%d: range error (%v)", err)
|
||||
}
|
||||
if len(kvs) != len(wkvs) {
|
||||
t.Fatalf("#%d: len(kvs) = %d, want %d", i, len(kvs), len(wkvs))
|
||||
}
|
||||
for j, kv := range kvs {
|
||||
if !reflect.DeepEqual(kv.Key, wkvs[j].k) {
|
||||
t.Errorf("#%d: keys[%d] = %s, want %s", i, j, kv.Key, wkvs[j].k)
|
||||
|
|
|
@ -39,6 +39,9 @@ type store struct {
|
|||
|
||||
tmu sync.Mutex // protect the tnxID field
|
||||
tnxID int64 // tracks the current tnxID to verify tnx operations
|
||||
|
||||
wg sync.WaitGroup
|
||||
stopc chan struct{}
|
||||
}
|
||||
|
||||
func newStore(path string) *store {
|
||||
|
@ -47,6 +50,7 @@ func newStore(path string) *store {
|
|||
kvindex: newTreeIndex(),
|
||||
currentRev: reversion{},
|
||||
compactMainRev: -1,
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
|
||||
tx := s.b.BatchTx()
|
||||
|
@ -161,6 +165,7 @@ func (s *store) Compact(rev int64) error {
|
|||
|
||||
keep := s.kvindex.Compact(rev)
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.scheduleCompaction(rev, keep)
|
||||
return nil
|
||||
}
|
||||
|
@ -226,6 +231,9 @@ func (s *store) Restore() error {
|
|||
}
|
||||
|
||||
func (s *store) Close() error {
|
||||
close(s.stopc)
|
||||
s.wg.Wait()
|
||||
s.b.ForceCommit()
|
||||
return s.b.Close()
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
)
|
||||
|
||||
func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]struct{}) {
|
||||
defer s.wg.Done()
|
||||
end := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
|
||||
|
||||
|
@ -37,6 +38,10 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]stru
|
|||
revToBytes(reversion{main: rev.main, sub: rev.sub + 1}, last)
|
||||
tx.Unlock()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
select {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
case <-s.stopc:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue