From 7cba42fb733bb058d5c25e9a27ed345d9cc9f8e1 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 18 Jun 2015 11:05:31 -0700 Subject: [PATCH] storage: wait for compact goroutine to exit before close backend If backend is closed, the operations on backend in compact goroutine will panic. So this PR waits for compact goroutine to exit before close backend. This fixes the TestWorkflow failure too. --- storage/kv_test.go | 3 +++ storage/kvstore.go | 8 ++++++++ storage/kvstore_compaction.go | 7 ++++++- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/storage/kv_test.go b/storage/kv_test.go index 45c65389e..b39934d2f 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -44,6 +44,9 @@ func TestWorkflow(t *testing.T) { if err != nil { t.Errorf("#%d: range error (%v)", err) } + if len(kvs) != len(wkvs) { + t.Fatalf("#%d: len(kvs) = %d, want %d", i, len(kvs), len(wkvs)) + } for j, kv := range kvs { if !reflect.DeepEqual(kv.Key, wkvs[j].k) { t.Errorf("#%d: keys[%d] = %s, want %s", i, j, kv.Key, wkvs[j].k) diff --git a/storage/kvstore.go b/storage/kvstore.go index 83f2b135b..2c457482b 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -39,6 +39,9 @@ type store struct { tmu sync.Mutex // protect the tnxID field tnxID int64 // tracks the current tnxID to verify tnx operations + + wg sync.WaitGroup + stopc chan struct{} } func newStore(path string) *store { @@ -47,6 +50,7 @@ func newStore(path string) *store { kvindex: newTreeIndex(), currentRev: reversion{}, compactMainRev: -1, + stopc: make(chan struct{}), } tx := s.b.BatchTx() @@ -161,6 +165,7 @@ func (s *store) Compact(rev int64) error { keep := s.kvindex.Compact(rev) + s.wg.Add(1) go s.scheduleCompaction(rev, keep) return nil } @@ -226,6 +231,9 @@ func (s *store) Restore() error { } func (s *store) Close() error { + close(s.stopc) + s.wg.Wait() + s.b.ForceCommit() return s.b.Close() } diff --git a/storage/kvstore_compaction.go b/storage/kvstore_compaction.go index 466f58321..b6b1ba80f 100644 --- a/storage/kvstore_compaction.go +++ b/storage/kvstore_compaction.go @@ -6,6 +6,7 @@ import ( ) func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]struct{}) { + defer s.wg.Done() end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(compactMainRev+1)) @@ -37,6 +38,10 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]stru revToBytes(reversion{main: rev.main, sub: rev.sub + 1}, last) tx.Unlock() - time.Sleep(100 * time.Millisecond) + select { + case <-time.After(100 * time.Millisecond): + case <-s.stopc: + return + } } }