// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
	"math"
	"sync"

	bolt "go.etcd.io/bbolt"
)

// IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
// overwrites on a bucket should only fetch with limit=1, but IsSafeRangeBucket
// is known to never overwrite any key so range is safe.

type ReadTx interface {
	Lock()
	Unlock()
	RLock()
	RUnlock()

	UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
	UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}

// baseReadTx is the base type for readTx and concurrentReadTx, eliminating
// the functions the two would otherwise duplicate.
type baseReadTx struct {
	// mu protects accesses to the txReadBuffer
	mu  sync.RWMutex
	buf txReadBuffer

	// TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
	// txMu protects accesses to buckets and tx on Range requests.
	txMu    *sync.RWMutex
	tx      *bolt.Tx
	buckets map[BucketID]*bolt.Bucket
	// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
	txWg *sync.WaitGroup
}

func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
	dups := make(map[string]struct{})
	getDups := func(k, v []byte) error {
		dups[string(k)] = struct{}{}
		return nil
	}
	visitNoDup := func(k, v []byte) error {
		if _, ok := dups[string(k)]; ok {
			return nil
		}
		return visitor(k, v)
	}
	// Record every key present in the buffer, then walk the bolt bucket while
	// skipping those keys so the buffer's newer values win.
	if err := baseReadTx.buf.ForEach(bucket, getDups); err != nil {
		return err
	}
	baseReadTx.txMu.Lock()
	err := unsafeForEach(baseReadTx.tx, bucket, visitNoDup)
	baseReadTx.txMu.Unlock()
	if err != nil {
		return err
	}
	return baseReadTx.buf.ForEach(bucket, visitor)
}

func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	if endKey == nil {
		// forbid duplicates for single keys
		limit = 1
	}
	if limit <= 0 {
		limit = math.MaxInt64
	}
	if limit > 1 && !bucketType.IsSafeRangeBucket() {
		panic("do not use unsafeRange on non-keys bucket")
	}
	keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
	if int64(len(keys)) == limit {
		return keys, vals
	}

	// find/cache bucket
	bn := bucketType.ID()
	baseReadTx.txMu.RLock()
	bucket, ok := baseReadTx.buckets[bn]
	baseReadTx.txMu.RUnlock()
	lockHeld := false
	if !ok {
		baseReadTx.txMu.Lock()
		lockHeld = true
		bucket = baseReadTx.tx.Bucket(bucketType.Name())
		baseReadTx.buckets[bn] = bucket
	}

	// ignore missing bucket since may have been created in this batch
	if bucket == nil {
		if lockHeld {
			baseReadTx.txMu.Unlock()
		}
		return keys, vals
	}
	if !lockHeld {
		baseReadTx.txMu.Lock()
	}
	c := bucket.Cursor()
	baseReadTx.txMu.Unlock()

	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
	return append(k2, keys...), append(v2, vals...)
}
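
// demoBufferFirstRange is an illustrative sketch, not part of the original
// etcd source; the function name and parameters are hypothetical. Plain
// string slices stand in for the txReadBuffer and the bolt bucket to show the
// merge order UnsafeRange uses above: the buffer (holding the newest,
// not-yet-committed batch) is consulted first, only the shortfall of the
// limit is read from the backing store, and store results are prepended,
// mirroring append(k2, keys...).
func demoBufferFirstRange(bufKeys, storeKeys []string, limit int) []string {
	// Take up to limit results from the buffer.
	keys := bufKeys
	if len(keys) > limit {
		keys = keys[:limit]
	}
	// The buffer alone satisfied the request; skip the store entirely.
	if len(keys) == limit {
		return keys
	}
	// Fetch only the remainder of the limit from the backing store.
	remainder := storeKeys
	if len(remainder) > limit-len(keys) {
		remainder = remainder[:limit-len(keys)]
	}
	// Store results go first, buffer results after, as in UnsafeRange.
	return append(remainder, keys...)
}
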
type readTx struct {
	baseReadTx
}

func (rt *readTx) Lock()    { rt.mu.Lock() }
func (rt *readTx) Unlock()  { rt.mu.Unlock() }
func (rt *readTx) RLock()   { rt.mu.RLock() }
func (rt *readTx) RUnlock() { rt.mu.RUnlock() }

func (rt *readTx) reset() {
	rt.buf.reset()
	rt.buckets = make(map[BucketID]*bolt.Bucket)
	rt.tx = nil
	rt.txWg = new(sync.WaitGroup)
}

type concurrentReadTx struct {
	baseReadTx
}

// Lock is a no-op; concurrentReadTx does not need to be locked after it is created.
func (rt *concurrentReadTx) Lock() {}

// Unlock is a no-op, matching Lock.
func (rt *concurrentReadTx) Unlock() {}

// RLock is a no-op. concurrentReadTx does not need to be locked after it is created.
func (rt *concurrentReadTx) RLock() {}

// RUnlock signals the end of concurrentReadTx.
func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
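
// demoTxWgLifecycle is an illustrative sketch, not part of the original etcd
// source; the function name and the channel are hypothetical. Using only
// sync.WaitGroup, it shows in miniature the lifecycle txWg enforces above:
// each concurrent reader registers before it starts (the role of creating a
// concurrentReadTx) and signals completion via Done (the role of RUnlock),
// and the batch loop must Wait before rolling back the shared bolt read
// transaction.
func demoTxWgLifecycle() {
	var txWg sync.WaitGroup
	workDone := make(chan struct{})

	for i := 0; i < 3; i++ {
		txWg.Add(1) // reader registered against the shared tx
		go func() {
			defer txWg.Done() // mirrors concurrentReadTx.RUnlock
			<-workDone        // stand-in for reads against the shared tx
		}()
	}

	close(workDone)
	// Only after Wait returns is it safe to roll back the shared tx at the
	// end of the batch interval.
	txWg.Wait()
}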