Merge pull request #7872 from heyitsanthony/break-boltdb-lock-readtx
backend: don't hold boltdb read txn lock on cursor scanningrelease-3.3
commit
3e1eb1a2e7
|
@ -45,6 +45,13 @@ type batchTx struct {
|
||||||
pending int
|
pending int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var nopLock sync.Locker = &nopLocker{}
|
||||||
|
|
||||||
|
type nopLocker struct{}
|
||||||
|
|
||||||
|
func (*nopLocker) Lock() {}
|
||||||
|
func (*nopLocker) Unlock() {}
|
||||||
|
|
||||||
func (t *batchTx) UnsafeCreateBucket(name []byte) {
|
func (t *batchTx) UnsafeCreateBucket(name []byte) {
|
||||||
_, err := t.tx.CreateBucket(name)
|
_, err := t.tx.CreateBucket(name)
|
||||||
if err != nil && err != bolt.ErrBucketExists {
|
if err != nil && err != bolt.ErrBucketExists {
|
||||||
|
@ -81,28 +88,34 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
|
||||||
|
|
||||||
// UnsafeRange must be called holding the lock on the tx.
|
// UnsafeRange must be called holding the lock on the tx.
|
||||||
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
|
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
|
||||||
k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit)
|
// nop lock since a write txn should already hold a lock over t.tx
|
||||||
|
k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit, nopLock)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Fatal(err)
|
plog.Fatal(err)
|
||||||
}
|
}
|
||||||
return k, v
|
return k, v
|
||||||
}
|
}
|
||||||
|
|
||||||
func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte, err error) {
|
func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64, l sync.Locker) (keys [][]byte, vs [][]byte, err error) {
|
||||||
|
l.Lock()
|
||||||
bucket := tx.Bucket(bucketName)
|
bucket := tx.Bucket(bucketName)
|
||||||
if bucket == nil {
|
if bucket == nil {
|
||||||
|
l.Unlock()
|
||||||
return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
|
return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
|
||||||
}
|
}
|
||||||
if len(endKey) == 0 {
|
if len(endKey) == 0 {
|
||||||
if v := bucket.Get(key); v != nil {
|
v := bucket.Get(key)
|
||||||
|
l.Unlock()
|
||||||
|
if v != nil {
|
||||||
return append(keys, key), append(vs, v), nil
|
return append(keys, key), append(vs, v), nil
|
||||||
}
|
}
|
||||||
return nil, nil, nil
|
return nil, nil, nil
|
||||||
}
|
}
|
||||||
|
c := bucket.Cursor()
|
||||||
|
l.Unlock()
|
||||||
if limit <= 0 {
|
if limit <= 0 {
|
||||||
limit = math.MaxInt64
|
limit = math.MaxInt64
|
||||||
}
|
}
|
||||||
c := bucket.Cursor()
|
|
||||||
for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
|
for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
|
||||||
vs = append(vs, cv)
|
vs = append(vs, cv)
|
||||||
keys = append(keys, ck)
|
keys = append(keys, ck)
|
||||||
|
|
|
@ -63,10 +63,8 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]
|
||||||
if int64(len(keys)) == limit {
|
if int64(len(keys)) == limit {
|
||||||
return keys, vals
|
return keys, vals
|
||||||
}
|
}
|
||||||
rt.txmu.Lock()
|
|
||||||
// ignore error since bucket may have been created in this batch
|
// ignore error since bucket may have been created in this batch
|
||||||
k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys)))
|
k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys)), &rt.txmu)
|
||||||
rt.txmu.Unlock()
|
|
||||||
return append(k2, keys...), append(v2, vals...)
|
return append(k2, keys...), append(v2, vals...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue