Merge pull request #10506 from jingyih/improve_etcd_backend_readability
mvcc/backend: rename Lock() to RLock() in ReadTx interfacerelease-3.4
commit
b08e6db0e8
|
@ -336,8 +336,8 @@ func (b *backend) defrag() error {
|
||||||
defer b.mu.Unlock()
|
defer b.mu.Unlock()
|
||||||
|
|
||||||
// block concurrent read requests while resetting tx
|
// block concurrent read requests while resetting tx
|
||||||
b.readTx.mu.Lock()
|
b.readTx.Lock()
|
||||||
defer b.readTx.mu.Unlock()
|
defer b.readTx.Unlock()
|
||||||
|
|
||||||
b.batchTx.unsafeCommit(true)
|
b.batchTx.unsafeCommit(true)
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,29 @@ type batchTx struct {
|
||||||
pending int
|
pending int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *batchTx) Lock() {
|
||||||
|
t.Mutex.Lock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *batchTx) Unlock() {
|
||||||
|
if t.pending >= t.backend.batchLimit {
|
||||||
|
t.commit(false)
|
||||||
|
}
|
||||||
|
t.Mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchTx interface embeds ReadTx interface. But RLock() and RUnlock() do not
|
||||||
|
// have appropriate semantics in BatchTx interface. Therefore should not be called.
|
||||||
|
// TODO: might want to decouple ReadTx and BatchTx
|
||||||
|
|
||||||
|
func (t *batchTx) RLock() {
|
||||||
|
panic("unexpected RLock")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *batchTx) RUnlock() {
|
||||||
|
panic("unexpected RUnlock")
|
||||||
|
}
|
||||||
|
|
||||||
func (t *batchTx) UnsafeCreateBucket(name []byte) {
|
func (t *batchTx) UnsafeCreateBucket(name []byte) {
|
||||||
_, err := t.tx.CreateBucket(name)
|
_, err := t.tx.CreateBucket(name)
|
||||||
if err != nil && err != bolt.ErrBucketExists {
|
if err != nil && err != bolt.ErrBucketExists {
|
||||||
|
@ -194,13 +217,6 @@ func (t *batchTx) CommitAndStop() {
|
||||||
t.Unlock()
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTx) Unlock() {
|
|
||||||
if t.pending >= t.backend.batchLimit {
|
|
||||||
t.commit(false)
|
|
||||||
}
|
|
||||||
t.Mutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *batchTx) safePending() int {
|
func (t *batchTx) safePending() int {
|
||||||
t.Mutex.Lock()
|
t.Mutex.Lock()
|
||||||
defer t.Mutex.Unlock()
|
defer t.Mutex.Unlock()
|
||||||
|
@ -259,9 +275,9 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered {
|
||||||
|
|
||||||
func (t *batchTxBuffered) Unlock() {
|
func (t *batchTxBuffered) Unlock() {
|
||||||
if t.pending != 0 {
|
if t.pending != 0 {
|
||||||
t.backend.readTx.mu.Lock()
|
t.backend.readTx.Lock() // blocks txReadBuffer for writing.
|
||||||
t.buf.writeback(&t.backend.readTx.buf)
|
t.buf.writeback(&t.backend.readTx.buf)
|
||||||
t.backend.readTx.mu.Unlock()
|
t.backend.readTx.Unlock()
|
||||||
if t.pending >= t.backend.batchLimit {
|
if t.pending >= t.backend.batchLimit {
|
||||||
t.commit(false)
|
t.commit(false)
|
||||||
}
|
}
|
||||||
|
@ -283,9 +299,9 @@ func (t *batchTxBuffered) CommitAndStop() {
|
||||||
|
|
||||||
func (t *batchTxBuffered) commit(stop bool) {
|
func (t *batchTxBuffered) commit(stop bool) {
|
||||||
// all read txs must be closed to acquire boltdb commit rwlock
|
// all read txs must be closed to acquire boltdb commit rwlock
|
||||||
t.backend.readTx.mu.Lock()
|
t.backend.readTx.Lock()
|
||||||
t.unsafeCommit(stop)
|
t.unsafeCommit(stop)
|
||||||
t.backend.readTx.mu.Unlock()
|
t.backend.readTx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
||||||
|
|
|
@ -30,6 +30,8 @@ var safeRangeBucket = []byte("key")
|
||||||
type ReadTx interface {
|
type ReadTx interface {
|
||||||
Lock()
|
Lock()
|
||||||
Unlock()
|
Unlock()
|
||||||
|
RLock()
|
||||||
|
RUnlock()
|
||||||
|
|
||||||
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
|
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
|
||||||
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
|
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
|
||||||
|
@ -46,8 +48,10 @@ type readTx struct {
|
||||||
buckets map[string]*bolt.Bucket
|
buckets map[string]*bolt.Bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rt *readTx) Lock() { rt.mu.RLock() }
|
func (rt *readTx) Lock() { rt.mu.Lock() }
|
||||||
func (rt *readTx) Unlock() { rt.mu.RUnlock() }
|
func (rt *readTx) Unlock() { rt.mu.Unlock() }
|
||||||
|
func (rt *readTx) RLock() { rt.mu.RLock() }
|
||||||
|
func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
|
||||||
|
|
||||||
func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
|
func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
|
||||||
if endKey == nil {
|
if endKey == nil {
|
||||||
|
|
|
@ -196,8 +196,8 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
|
||||||
keep := s.kvindex.Keep(rev)
|
keep := s.kvindex.Keep(rev)
|
||||||
|
|
||||||
tx := s.b.ReadTx()
|
tx := s.b.ReadTx()
|
||||||
tx.Lock()
|
tx.RLock()
|
||||||
defer tx.Unlock()
|
defer tx.RUnlock()
|
||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
|
|
||||||
upper := revision{main: rev + 1}
|
upper := revision{main: rev + 1}
|
||||||
|
|
|
@ -725,6 +725,8 @@ type fakeBatchTx struct {
|
||||||
|
|
||||||
func (b *fakeBatchTx) Lock() {}
|
func (b *fakeBatchTx) Lock() {}
|
||||||
func (b *fakeBatchTx) Unlock() {}
|
func (b *fakeBatchTx) Unlock() {}
|
||||||
|
func (b *fakeBatchTx) RLock() {}
|
||||||
|
func (b *fakeBatchTx) RUnlock() {}
|
||||||
func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
|
func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
|
||||||
func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
|
func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
|
||||||
b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
|
b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
|
||||||
|
|
|
@ -33,7 +33,11 @@ func (s *store) Read() TxnRead {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
tx := s.b.ReadTx()
|
tx := s.b.ReadTx()
|
||||||
s.revMu.RLock()
|
s.revMu.RLock()
|
||||||
tx.Lock()
|
// tx.RLock() blocks txReadBuffer for reading, which could potentially block the following two operations:
|
||||||
|
// A) writeback from txWriteBuffer to txReadBuffer at the end of a write transaction (TxnWrite).
|
||||||
|
// B) starting of a new backend batch transaction, where the pending changes need to be committed to boltdb
|
||||||
|
// and txReadBuffer needs to be reset.
|
||||||
|
tx.RLock()
|
||||||
firstRev, rev := s.compactMainRev, s.currentRev
|
firstRev, rev := s.compactMainRev, s.currentRev
|
||||||
s.revMu.RUnlock()
|
s.revMu.RUnlock()
|
||||||
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
|
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
|
||||||
|
@ -47,7 +51,7 @@ func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tr *storeTxnRead) End() {
|
func (tr *storeTxnRead) End() {
|
||||||
tr.tx.Unlock()
|
tr.tx.RUnlock()
|
||||||
tr.s.mu.RUnlock()
|
tr.s.mu.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -346,7 +346,7 @@ func (s *watchableStore) syncWatchers() int {
|
||||||
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
|
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
|
||||||
// values are actual key-value pairs in backend.
|
// values are actual key-value pairs in backend.
|
||||||
tx := s.store.b.ReadTx()
|
tx := s.store.b.ReadTx()
|
||||||
tx.Lock()
|
tx.RLock()
|
||||||
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
|
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
|
||||||
var evs []mvccpb.Event
|
var evs []mvccpb.Event
|
||||||
if s.store != nil && s.store.lg != nil {
|
if s.store != nil && s.store.lg != nil {
|
||||||
|
@ -355,7 +355,7 @@ func (s *watchableStore) syncWatchers() int {
|
||||||
// TODO: remove this in v3.5
|
// TODO: remove this in v3.5
|
||||||
evs = kvsToEvents(nil, wg, revs, vs)
|
evs = kvsToEvents(nil, wg, revs, vs)
|
||||||
}
|
}
|
||||||
tx.Unlock()
|
tx.RUnlock()
|
||||||
|
|
||||||
var victims watcherBatch
|
var victims watcherBatch
|
||||||
wb := newWatcherBatch(wg, evs)
|
wb := newWatcherBatch(wg, evs)
|
||||||
|
|
Loading…
Reference in New Issue