diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go
index 3bf96afda..0d3cd87ec 100644
--- a/mvcc/backend/backend.go
+++ b/mvcc/backend/backend.go
@@ -336,8 +336,8 @@ func (b *backend) defrag() error {
 	defer b.mu.Unlock()
 
 	// block concurrent read requests while resetting tx
-	b.readTx.mu.Lock()
-	defer b.readTx.mu.Unlock()
+	b.readTx.Lock()
+	defer b.readTx.Unlock()
 
 	b.batchTx.unsafeCommit(true)
 
diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go
index 8da0f1f51..77d0648b8 100644
--- a/mvcc/backend/batch_tx.go
+++ b/mvcc/backend/batch_tx.go
@@ -45,6 +45,29 @@ type batchTx struct {
 	pending int
 }
 
+func (t *batchTx) Lock() {
+	t.Mutex.Lock()
+}
+
+func (t *batchTx) Unlock() {
+	if t.pending >= t.backend.batchLimit {
+		t.commit(false)
+	}
+	t.Mutex.Unlock()
+}
+
+// BatchTx interface embeds ReadTx interface. But RLock() and RUnlock() do not
+// have appropriate semantics in BatchTx interface. Therefore should not be called.
+// TODO: might want to decouple ReadTx and BatchTx
+
+func (t *batchTx) RLock() {
+	panic("unexpected RLock")
+}
+
+func (t *batchTx) RUnlock() {
+	panic("unexpected RUnlock")
+}
+
 func (t *batchTx) UnsafeCreateBucket(name []byte) {
 	_, err := t.tx.CreateBucket(name)
 	if err != nil && err != bolt.ErrBucketExists {
@@ -194,13 +217,6 @@ func (t *batchTx) CommitAndStop() {
 	t.Unlock()
 }
 
-func (t *batchTx) Unlock() {
-	if t.pending >= t.backend.batchLimit {
-		t.commit(false)
-	}
-	t.Mutex.Unlock()
-}
-
 func (t *batchTx) safePending() int {
 	t.Mutex.Lock()
 	defer t.Mutex.Unlock()
@@ -259,9 +275,9 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered {
 
 func (t *batchTxBuffered) Unlock() {
 	if t.pending != 0 {
-		t.backend.readTx.mu.Lock()
+		t.backend.readTx.Lock() // blocks txReadBuffer for writing.
 		t.buf.writeback(&t.backend.readTx.buf)
-		t.backend.readTx.mu.Unlock()
+		t.backend.readTx.Unlock()
 		if t.pending >= t.backend.batchLimit {
 			t.commit(false)
 		}
@@ -283,9 +299,9 @@ func (t *batchTxBuffered) CommitAndStop() {
 
 func (t *batchTxBuffered) commit(stop bool) {
 	// all read txs must be closed to acquire boltdb commit rwlock
-	t.backend.readTx.mu.Lock()
+	t.backend.readTx.Lock()
 	t.unsafeCommit(stop)
-	t.backend.readTx.mu.Unlock()
+	t.backend.readTx.Unlock()
 }
 
 func (t *batchTxBuffered) unsafeCommit(stop bool) {
diff --git a/mvcc/backend/read_tx.go b/mvcc/backend/read_tx.go
index 5960aec33..7b8d855eb 100644
--- a/mvcc/backend/read_tx.go
+++ b/mvcc/backend/read_tx.go
@@ -30,6 +30,8 @@ var safeRangeBucket = []byte("key")
 type ReadTx interface {
 	Lock()
 	Unlock()
+	RLock()
+	RUnlock()
 
 	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
 	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
@@ -46,8 +48,10 @@ type readTx struct {
 	buckets map[string]*bolt.Bucket
 }
 
-func (rt *readTx) Lock()   { rt.mu.RLock() }
-func (rt *readTx) Unlock() { rt.mu.RUnlock() }
+func (rt *readTx) Lock()    { rt.mu.Lock() }
+func (rt *readTx) Unlock()  { rt.mu.Unlock() }
+func (rt *readTx) RLock()   { rt.mu.RLock() }
+func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
 
 func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
 	if endKey == nil {
diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go
index 4ebc27490..187f94029 100644
--- a/mvcc/kvstore.go
+++ b/mvcc/kvstore.go
@@ -196,8 +196,8 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
 	keep := s.kvindex.Keep(rev)
 
 	tx := s.b.ReadTx()
-	tx.Lock()
-	defer tx.Unlock()
+	tx.RLock()
+	defer tx.RUnlock()
 	s.mu.RUnlock()
 
 	upper := revision{main: rev + 1}
diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go
index ef48464d6..911839699 100644
--- a/mvcc/kvstore_test.go
+++ b/mvcc/kvstore_test.go
@@ -725,6 +725,8 @@ type fakeBatchTx struct {
 
 func (b *fakeBatchTx) Lock()                          {}
 func (b *fakeBatchTx) Unlock()                        {}
+func (b *fakeBatchTx) RLock()                         {}
+func (b *fakeBatchTx) RUnlock()                       {}
 func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
 func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
 	b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go
index e56c679c9..088ea7341 100644
--- a/mvcc/kvstore_txn.go
+++ b/mvcc/kvstore_txn.go
@@ -33,7 +33,11 @@ func (s *store) Read() TxnRead {
 	s.mu.RLock()
 	tx := s.b.ReadTx()
 	s.revMu.RLock()
-	tx.Lock()
+	// tx.RLock() blocks txReadBuffer for reading, which could potentially block the following two operations:
+	// A) writeback from txWriteBuffer to txReadBuffer at the end of a write transaction (TxnWrite).
+	// B) starting of a new backend batch transaction, where the pending changes need to be committed to boltdb
+	// and txReadBuffer needs to be reset.
+	tx.RLock()
 	firstRev, rev := s.compactMainRev, s.currentRev
 	s.revMu.RUnlock()
 	return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
@@ -47,7 +51,7 @@ func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult,
 }
 
 func (tr *storeTxnRead) End() {
-	tr.tx.Unlock()
+	tr.tx.RUnlock()
 	tr.s.mu.RUnlock()
 }
 
diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go
index 7ac0df6ba..46a9af5ed 100644
--- a/mvcc/watchable_store.go
+++ b/mvcc/watchable_store.go
@@ -346,7 +346,7 @@ func (s *watchableStore) syncWatchers() int {
 	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
 	// values are actual key-value pairs in backend.
 	tx := s.store.b.ReadTx()
-	tx.Lock()
+	tx.RLock()
 	revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
 	var evs []mvccpb.Event
 	if s.store != nil && s.store.lg != nil {
@@ -355,7 +355,7 @@ func (s *watchableStore) syncWatchers() int {
 		// TODO: remove this in v3.5
 		evs = kvsToEvents(nil, wg, revs, vs)
 	}
-	tx.Unlock()
+	tx.RUnlock()
 
 	var victims watcherBatch
 	wb := newWatcherBatch(wg, evs)
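For illustration only, below is a minimal, self-contained Go sketch (not etcd source; the readTx type, buf field, and writeback helper here are hypothetical stand-ins) of the locking split this diff introduces: read-only callers of the read transaction take a shared RLock, while the paths that mutate or reset the read buffer (writeback at the end of a write transaction, backend commit, defrag) keep the exclusive Lock.

// readwrite_sketch.go - illustrative only, assuming a simplified read buffer.
package main

import (
	"fmt"
	"sync"
)

type readTx struct {
	mu  sync.RWMutex
	buf map[string]string // stands in for txReadBuffer
}

// Readers take the shared lock, so many reads can proceed concurrently.
func (rt *readTx) RLock()   { rt.mu.RLock() }
func (rt *readTx) RUnlock() { rt.mu.RUnlock() }

// Writers (writeback after a write txn, or resetting the buffer before a
// backend commit or defrag) take the exclusive lock.
func (rt *readTx) Lock()   { rt.mu.Lock() }
func (rt *readTx) Unlock() { rt.mu.Unlock() }

func (rt *readTx) get(k string) (string, bool) {
	v, ok := rt.buf[k]
	return v, ok
}

// writeback copies pending writes into the read buffer under the exclusive
// lock, mirroring the role of batchTxBuffered.Unlock in the diff above.
func writeback(rt *readTx, pending map[string]string) {
	rt.Lock()
	defer rt.Unlock()
	for k, v := range pending {
		rt.buf[k] = v
	}
}

func main() {
	rt := &readTx{buf: map[string]string{"foo": "bar"}}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Concurrent readers only need the shared lock.
			rt.RLock()
			_, _ = rt.get("foo")
			rt.RUnlock()
		}()
	}

	// A writer briefly excludes all readers while it updates the buffer.
	writeback(rt, map[string]string{"foo": "baz"})
	wg.Wait()

	rt.RLock()
	v, _ := rt.get("foo")
	rt.RUnlock()
	fmt.Println(v)
}

The point of the split is that read transactions no longer serialize one another on the read buffer; only a writeback, commit, or defrag takes the exclusive lock and briefly blocks readers.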