Merge pull request #10523 from jingyih/fully_concurrent_reads
mvcc: fully concurrent readrelease-3.4
commit
2c5162af5c
|
@ -16,6 +16,7 @@ package integration
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
@ -103,22 +104,39 @@ func testMetricDbSizeDefrag(t *testing.T, name string) {
|
|||
t.Fatal(kerr)
|
||||
}
|
||||
|
||||
// Put to move PendingPages to FreePages
|
||||
if _, err = kvc.Put(context.TODO(), putreq); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
validateAfterCompactionInUse := func() error {
|
||||
// Put to move PendingPages to FreePages
|
||||
if _, err = kvc.Put(context.TODO(), putreq); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
afterCompactionInUse, err := clus.Members[0].Metric("etcd_mvcc_db_total_size_in_use_in_bytes")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
afterCompactionInUse, err := clus.Members[0].Metric("etcd_mvcc_db_total_size_in_use_in_bytes")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
aciu, err := strconv.Atoi(afterCompactionInUse)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if biu <= aciu {
|
||||
return fmt.Errorf("expected less than %d, got %d after compaction", biu, aciu)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
aciu, err := strconv.Atoi(afterCompactionInUse)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if biu <= aciu {
|
||||
t.Fatalf("expected less than %d, got %d after compaction", biu, aciu)
|
||||
|
||||
// backend rollbacks read transaction asynchronously (PR #10523),
|
||||
// which causes the result to be flaky. Retry 3 times.
|
||||
maxRetry, retry := 3, 0
|
||||
for {
|
||||
err := validateAfterCompactionInUse()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
retry++
|
||||
if retry >= maxRetry {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// defrag should give freed space back to fs
|
||||
|
|
|
@ -49,8 +49,11 @@ var (
|
|||
)
|
||||
|
||||
type Backend interface {
|
||||
// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
|
||||
ReadTx() ReadTx
|
||||
BatchTx() BatchTx
|
||||
// ConcurrentReadTx returns a non-blocking read transaction.
|
||||
ConcurrentReadTx() ReadTx
|
||||
|
||||
Snapshot() Snapshot
|
||||
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
|
||||
|
@ -63,6 +66,8 @@ type Backend interface {
|
|||
// Since the backend can manage free space in a non-byte unit such as
|
||||
// number of pages, the returned value can be not exactly accurate in bytes.
|
||||
SizeInUse() int64
|
||||
// OpenReadTxN returns the number of currently open read transactions in the backend.
|
||||
OpenReadTxN() int64
|
||||
Defrag() error
|
||||
ForceCommit()
|
||||
Close() error
|
||||
|
@ -87,6 +92,8 @@ type backend struct {
|
|||
sizeInUse int64
|
||||
// commits counts number of commits since start
|
||||
commits int64
|
||||
// openReadTxN is the number of currently open read transactions in the backend
|
||||
openReadTxN int64
|
||||
|
||||
mu sync.RWMutex
|
||||
db *bolt.DB
|
||||
|
@ -166,6 +173,7 @@ func newBackend(bcfg BackendConfig) *backend {
|
|||
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
|
||||
},
|
||||
buckets: make(map[string]*bolt.Bucket),
|
||||
txWg: new(sync.WaitGroup),
|
||||
},
|
||||
|
||||
stopc: make(chan struct{}),
|
||||
|
@ -187,6 +195,24 @@ func (b *backend) BatchTx() BatchTx {
|
|||
|
||||
func (b *backend) ReadTx() ReadTx { return b.readTx }
|
||||
|
||||
// ConcurrentReadTx creates and returns a new ReadTx, which:
|
||||
// A) creates and keeps a copy of backend.readTx.txReadBuffer,
|
||||
// B) references the boltdb read Tx (and its bucket cache) of current batch interval.
|
||||
func (b *backend) ConcurrentReadTx() ReadTx {
|
||||
b.readTx.RLock()
|
||||
defer b.readTx.RUnlock()
|
||||
// prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
|
||||
b.readTx.txWg.Add(1)
|
||||
// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
|
||||
return &concurrentReadTx{
|
||||
buf: b.readTx.buf.unsafeCopy(),
|
||||
tx: b.readTx.tx,
|
||||
txMu: &b.readTx.txMu,
|
||||
buckets: b.readTx.buckets,
|
||||
txWg: b.readTx.txWg,
|
||||
}
|
||||
}
|
||||
|
||||
// ForceCommit forces the current batching tx to commit.
|
||||
func (b *backend) ForceCommit() {
|
||||
b.batchTx.Commit()
|
||||
|
@ -491,8 +517,10 @@ func (b *backend) begin(write bool) *bolt.Tx {
|
|||
|
||||
size := tx.Size()
|
||||
db := tx.DB()
|
||||
stats := db.Stats()
|
||||
atomic.StoreInt64(&b.size, size)
|
||||
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
|
||||
atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
|
||||
atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))
|
||||
|
||||
return tx
|
||||
}
|
||||
|
@ -509,6 +537,10 @@ func (b *backend) unsafeBegin(write bool) *bolt.Tx {
|
|||
return tx
|
||||
}
|
||||
|
||||
func (b *backend) OpenReadTxN() int64 {
|
||||
return atomic.LoadInt64(&b.openReadTxN)
|
||||
}
|
||||
|
||||
// NewTmpBackend creates a backend implementation for testing.
|
||||
func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
|
||||
dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
|
||||
|
|
|
@ -250,6 +250,35 @@ func TestBackendWriteback(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestConcurrentReadTx ensures that current read transaction can see all prior writes stored in read buffer
|
||||
func TestConcurrentReadTx(t *testing.T) {
|
||||
b, tmpPath := NewTmpBackend(time.Hour, 10000)
|
||||
defer cleanup(b, tmpPath)
|
||||
|
||||
wtx1 := b.BatchTx()
|
||||
wtx1.Lock()
|
||||
wtx1.UnsafeCreateBucket([]byte("key"))
|
||||
wtx1.UnsafePut([]byte("key"), []byte("abc"), []byte("ABC"))
|
||||
wtx1.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1"))
|
||||
wtx1.Unlock()
|
||||
|
||||
wtx2 := b.BatchTx()
|
||||
wtx2.Lock()
|
||||
wtx2.UnsafePut([]byte("key"), []byte("def"), []byte("DEF"))
|
||||
wtx2.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2"))
|
||||
wtx2.Unlock()
|
||||
|
||||
rtx := b.ConcurrentReadTx()
|
||||
rtx.RLock() // no-op
|
||||
k, v := rtx.UnsafeRange([]byte("key"), []byte("abc"), []byte("\xff"), 0)
|
||||
rtx.RUnlock()
|
||||
wKey := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")}
|
||||
wVal := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")}
|
||||
if !reflect.DeepEqual(wKey, k) || !reflect.DeepEqual(wVal, v) {
|
||||
t.Errorf("want k=%+v, v=%+v; got k=%+v, v=%+v", wKey, wVal, k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBackendWritebackForEach checks that partially written / buffered
|
||||
// data is visited in the same order as fully committed data.
|
||||
func TestBackendWritebackForEach(t *testing.T) {
|
||||
|
|
|
@ -306,13 +306,18 @@ func (t *batchTxBuffered) commit(stop bool) {
|
|||
|
||||
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
||||
if t.backend.readTx.tx != nil {
|
||||
if err := t.backend.readTx.tx.Rollback(); err != nil {
|
||||
if t.backend.lg != nil {
|
||||
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
|
||||
} else {
|
||||
plog.Fatalf("cannot rollback tx (%s)", err)
|
||||
// wait all store read transactions using the current boltdb tx to finish,
|
||||
// then close the boltdb tx
|
||||
go func(tx *bolt.Tx, wg *sync.WaitGroup) {
|
||||
wg.Wait()
|
||||
if err := tx.Rollback(); err != nil {
|
||||
if t.backend.lg != nil {
|
||||
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
|
||||
} else {
|
||||
plog.Fatalf("cannot rollback tx (%s)", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}(t.backend.readTx.tx, t.backend.readTx.txWg)
|
||||
t.backend.readTx.reset()
|
||||
}
|
||||
|
||||
|
|
|
@ -42,10 +42,13 @@ type readTx struct {
|
|||
mu sync.RWMutex
|
||||
buf txReadBuffer
|
||||
|
||||
// txmu protects accesses to buckets and tx on Range requests.
|
||||
txmu sync.RWMutex
|
||||
// TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
|
||||
// txMu protects accesses to buckets and tx on Range requests.
|
||||
txMu sync.RWMutex
|
||||
tx *bolt.Tx
|
||||
buckets map[string]*bolt.Bucket
|
||||
// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
|
||||
txWg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func (rt *readTx) Lock() { rt.mu.Lock() }
|
||||
|
@ -71,23 +74,23 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]
|
|||
|
||||
// find/cache bucket
|
||||
bn := string(bucketName)
|
||||
rt.txmu.RLock()
|
||||
rt.txMu.RLock()
|
||||
bucket, ok := rt.buckets[bn]
|
||||
rt.txmu.RUnlock()
|
||||
rt.txMu.RUnlock()
|
||||
if !ok {
|
||||
rt.txmu.Lock()
|
||||
rt.txMu.Lock()
|
||||
bucket = rt.tx.Bucket(bucketName)
|
||||
rt.buckets[bn] = bucket
|
||||
rt.txmu.Unlock()
|
||||
rt.txMu.Unlock()
|
||||
}
|
||||
|
||||
// ignore missing bucket since may have been created in this batch
|
||||
if bucket == nil {
|
||||
return keys, vals
|
||||
}
|
||||
rt.txmu.Lock()
|
||||
rt.txMu.Lock()
|
||||
c := bucket.Cursor()
|
||||
rt.txmu.Unlock()
|
||||
rt.txMu.Unlock()
|
||||
|
||||
k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
|
||||
return append(k2, keys...), append(v2, vals...)
|
||||
|
@ -108,9 +111,9 @@ func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err
|
|||
if err := rt.buf.ForEach(bucketName, getDups); err != nil {
|
||||
return err
|
||||
}
|
||||
rt.txmu.Lock()
|
||||
rt.txMu.Lock()
|
||||
err := unsafeForEach(rt.tx, bucketName, visitNoDup)
|
||||
rt.txmu.Unlock()
|
||||
rt.txMu.Unlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -121,4 +124,87 @@ func (rt *readTx) reset() {
|
|||
rt.buf.reset()
|
||||
rt.buckets = make(map[string]*bolt.Bucket)
|
||||
rt.tx = nil
|
||||
rt.txWg = new(sync.WaitGroup)
|
||||
}
|
||||
|
||||
// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
|
||||
type concurrentReadTx struct {
|
||||
buf txReadBuffer
|
||||
txMu *sync.RWMutex
|
||||
tx *bolt.Tx
|
||||
buckets map[string]*bolt.Bucket
|
||||
txWg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func (rt *concurrentReadTx) Lock() {}
|
||||
func (rt *concurrentReadTx) Unlock() {}
|
||||
|
||||
// RLock is no-op. concurrentReadTx does not need to be locked after it is created.
|
||||
func (rt *concurrentReadTx) RLock() {}
|
||||
|
||||
// RUnlock signals the end of concurrentReadTx.
|
||||
func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
|
||||
|
||||
func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
|
||||
dups := make(map[string]struct{})
|
||||
getDups := func(k, v []byte) error {
|
||||
dups[string(k)] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
visitNoDup := func(k, v []byte) error {
|
||||
if _, ok := dups[string(k)]; ok {
|
||||
return nil
|
||||
}
|
||||
return visitor(k, v)
|
||||
}
|
||||
if err := rt.buf.ForEach(bucketName, getDups); err != nil {
|
||||
return err
|
||||
}
|
||||
rt.txMu.Lock()
|
||||
err := unsafeForEach(rt.tx, bucketName, visitNoDup)
|
||||
rt.txMu.Unlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return rt.buf.ForEach(bucketName, visitor)
|
||||
}
|
||||
|
||||
func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
|
||||
if endKey == nil {
|
||||
// forbid duplicates for single keys
|
||||
limit = 1
|
||||
}
|
||||
if limit <= 0 {
|
||||
limit = math.MaxInt64
|
||||
}
|
||||
if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
|
||||
panic("do not use unsafeRange on non-keys bucket")
|
||||
}
|
||||
keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
|
||||
if int64(len(keys)) == limit {
|
||||
return keys, vals
|
||||
}
|
||||
|
||||
// find/cache bucket
|
||||
bn := string(bucketName)
|
||||
rt.txMu.RLock()
|
||||
bucket, ok := rt.buckets[bn]
|
||||
rt.txMu.RUnlock()
|
||||
if !ok {
|
||||
rt.txMu.Lock()
|
||||
bucket = rt.tx.Bucket(bucketName)
|
||||
rt.buckets[bn] = bucket
|
||||
rt.txMu.Unlock()
|
||||
}
|
||||
|
||||
// ignore missing bucket since may have been created in this batch
|
||||
if bucket == nil {
|
||||
return keys, vals
|
||||
}
|
||||
rt.txMu.Lock()
|
||||
c := bucket.Cursor()
|
||||
rt.txMu.Unlock()
|
||||
|
||||
k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
|
||||
return append(k2, keys...), append(v2, vals...)
|
||||
}
|
||||
|
|
|
@ -88,6 +88,19 @@ func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) er
|
|||
return nil
|
||||
}
|
||||
|
||||
// unsafeCopy returns a copy of txReadBuffer, caller should acquire backend.readTx.RLock()
|
||||
func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
|
||||
txrCopy := txReadBuffer{
|
||||
txBuffer: txBuffer{
|
||||
buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
|
||||
},
|
||||
}
|
||||
for bucketName, bucket := range txr.txBuffer.buckets {
|
||||
txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
|
||||
}
|
||||
return txrCopy
|
||||
}
|
||||
|
||||
type kv struct {
|
||||
key []byte
|
||||
val []byte
|
||||
|
@ -179,3 +192,12 @@ func (bb *bucketBuffer) Less(i, j int) bool {
|
|||
return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
|
||||
}
|
||||
func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }
|
||||
|
||||
func (bb *bucketBuffer) Copy() *bucketBuffer {
|
||||
bbCopy := bucketBuffer{
|
||||
buf: make([]kv, len(bb.buf)),
|
||||
used: bb.used,
|
||||
}
|
||||
copy(bbCopy.buf, bb.buf)
|
||||
return &bbCopy
|
||||
}
|
||||
|
|
|
@ -354,6 +354,9 @@ func (s *store) restore() error {
|
|||
reportDbTotalSizeInUseInBytesMu.Lock()
|
||||
reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
|
||||
reportDbTotalSizeInUseInBytesMu.Unlock()
|
||||
reportDbOpenReadTxNMu.Lock()
|
||||
reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
|
||||
reportDbOpenReadTxNMu.Unlock()
|
||||
|
||||
min, max := newRevBytes(), newRevBytes()
|
||||
revToBytes(revision{main: 1}, min)
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package mvcc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
@ -22,6 +23,8 @@ import (
|
|||
mrand "math/rand"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -645,30 +648,173 @@ func TestTxnPut(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestTxnBlockBackendForceCommit(t *testing.T) {
|
||||
// TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation
|
||||
func TestConcurrentReadNotBlockingWrite(t *testing.T) {
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
|
||||
defer os.Remove(tmpPath)
|
||||
|
||||
txn := s.Read()
|
||||
// write something to read later
|
||||
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
||||
|
||||
// readTx simulates a long read request
|
||||
readTx1 := s.Read()
|
||||
|
||||
// write should not be blocked by reads
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
s.b.ForceCommit()
|
||||
s.Put([]byte("foo"), []byte("newBar"), lease.NoLease) // this is a write Txn
|
||||
done <- struct{}{}
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
t.Fatalf("failed to block ForceCommit")
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("write should not be blocked by read")
|
||||
}
|
||||
|
||||
txn.End()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(5 * time.Second): // wait 5 seconds for CI with slow IO
|
||||
testutil.FatalStack(t, "failed to execute ForceCommit")
|
||||
// readTx2 simulates a short read request
|
||||
readTx2 := s.Read()
|
||||
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
|
||||
ret, err := readTx2.Range([]byte("foo"), nil, ro)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to range: %v", err)
|
||||
}
|
||||
// readTx2 should see the result of new write
|
||||
w := mvccpb.KeyValue{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("newBar"),
|
||||
CreateRevision: 2,
|
||||
ModRevision: 3,
|
||||
Version: 2,
|
||||
}
|
||||
if !reflect.DeepEqual(ret.KVs[0], w) {
|
||||
t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w)
|
||||
}
|
||||
readTx2.End()
|
||||
|
||||
ret, err = readTx1.Range([]byte("foo"), nil, ro)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to range: %v", err)
|
||||
}
|
||||
// readTx1 should not see the result of new write
|
||||
w = mvccpb.KeyValue{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
CreateRevision: 2,
|
||||
ModRevision: 2,
|
||||
Version: 1,
|
||||
}
|
||||
if !reflect.DeepEqual(ret.KVs[0], w) {
|
||||
t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w)
|
||||
}
|
||||
readTx1.End()
|
||||
}
|
||||
|
||||
// TestConcurrentReadTxAndWrite creates random concurrent Reads and Writes, and ensures Reads always see latest Writes
|
||||
func TestConcurrentReadTxAndWrite(t *testing.T) {
|
||||
var (
|
||||
numOfReads = 100
|
||||
numOfWrites = 100
|
||||
maxNumOfPutsPerWrite = 10
|
||||
committedKVs kvs // committedKVs records the key-value pairs written by the finished Write Txns
|
||||
mu sync.Mutex // mu protectes committedKVs
|
||||
)
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
|
||||
defer os.Remove(tmpPath)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numOfWrites)
|
||||
for i := 0; i < numOfWrites; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time
|
||||
|
||||
tx := s.Write()
|
||||
numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1
|
||||
var pendingKvs kvs
|
||||
for j := 0; j < numOfPuts; j++ {
|
||||
k := []byte(strconv.Itoa(mrand.Int()))
|
||||
v := []byte(strconv.Itoa(mrand.Int()))
|
||||
tx.Put(k, v, lease.NoLease)
|
||||
pendingKvs = append(pendingKvs, kv{k, v})
|
||||
}
|
||||
// reads should not see above Puts until write is finished
|
||||
mu.Lock()
|
||||
committedKVs = merge(committedKVs, pendingKvs) // update shared data structure
|
||||
tx.End()
|
||||
mu.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Add(numOfReads)
|
||||
for i := 0; i < numOfReads; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time
|
||||
|
||||
mu.Lock()
|
||||
wKVs := make(kvs, len(committedKVs))
|
||||
copy(wKVs, committedKVs)
|
||||
tx := s.Read()
|
||||
mu.Unlock()
|
||||
// get all keys in backend store, and compare with wKVs
|
||||
ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
|
||||
tx.End()
|
||||
if err != nil {
|
||||
t.Errorf("failed to range keys: %v", err)
|
||||
return
|
||||
}
|
||||
if len(wKVs) == 0 && len(ret.KVs) == 0 { // no committed KVs yet
|
||||
return
|
||||
}
|
||||
var result kvs
|
||||
for _, keyValue := range ret.KVs {
|
||||
result = append(result, kv{keyValue.Key, keyValue.Value})
|
||||
}
|
||||
if !reflect.DeepEqual(wKVs, result) {
|
||||
t.Errorf("unexpected range result") // too many key value pairs, skip printing them
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// wait until go routines finish or timeout
|
||||
doneC := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(doneC)
|
||||
}()
|
||||
select {
|
||||
case <-doneC:
|
||||
case <-time.After(5 * time.Minute):
|
||||
testutil.FatalStack(t, "timeout")
|
||||
}
|
||||
}
|
||||
|
||||
type kv struct {
|
||||
key []byte
|
||||
val []byte
|
||||
}
|
||||
|
||||
type kvs []kv
|
||||
|
||||
func (kvs kvs) Len() int { return len(kvs) }
|
||||
func (kvs kvs) Less(i, j int) bool { return bytes.Compare(kvs[i].key, kvs[j].key) < 0 }
|
||||
func (kvs kvs) Swap(i, j int) { kvs[i], kvs[j] = kvs[j], kvs[i] }
|
||||
|
||||
func merge(dst, src kvs) kvs {
|
||||
dst = append(dst, src...)
|
||||
sort.Stable(dst)
|
||||
// remove duplicates, using only the newest value
|
||||
// ref: tx_buffer.go
|
||||
widx := 0
|
||||
for ridx := 1; ridx < len(dst); ridx++ {
|
||||
if !bytes.Equal(dst[widx].key, dst[ridx].key) {
|
||||
widx++
|
||||
}
|
||||
dst[widx] = dst[ridx]
|
||||
}
|
||||
return dst[:widx+1]
|
||||
}
|
||||
|
||||
// TODO: test attach key to lessor
|
||||
|
@ -754,9 +900,11 @@ type fakeBackend struct {
|
|||
|
||||
func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
|
||||
func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
|
||||
func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx { return b.tx }
|
||||
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
|
||||
func (b *fakeBackend) Size() int64 { return 0 }
|
||||
func (b *fakeBackend) SizeInUse() int64 { return 0 }
|
||||
func (b *fakeBackend) OpenReadTxN() int64 { return 0 }
|
||||
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
|
||||
func (b *fakeBackend) ForceCommit() {}
|
||||
func (b *fakeBackend) Defrag() error { return nil }
|
||||
|
|
|
@ -31,13 +31,11 @@ type storeTxnRead struct {
|
|||
|
||||
func (s *store) Read() TxnRead {
|
||||
s.mu.RLock()
|
||||
tx := s.b.ReadTx()
|
||||
s.revMu.RLock()
|
||||
// tx.RLock() blocks txReadBuffer for reading, which could potentially block the following two operations:
|
||||
// A) writeback from txWriteBuffer to txReadBuffer at the end of a write transaction (TxnWrite).
|
||||
// B) starting of a new backend batch transaction, where the pending changes need to be committed to boltdb
|
||||
// and txReadBuffer needs to be reset.
|
||||
tx.RLock()
|
||||
// backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
|
||||
// ConcurrentReadTx is created, it will not block write transaction.
|
||||
tx := s.b.ConcurrentReadTx()
|
||||
tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
|
||||
firstRev, rev := s.compactMainRev, s.currentRev
|
||||
s.revMu.RUnlock()
|
||||
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
|
||||
|
@ -51,7 +49,7 @@ func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult,
|
|||
}
|
||||
|
||||
func (tr *storeTxnRead) End() {
|
||||
tr.tx.RUnlock()
|
||||
tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx.
|
||||
tr.s.mu.RUnlock()
|
||||
}
|
||||
|
||||
|
|
|
@ -194,6 +194,23 @@ var (
|
|||
reportDbTotalSizeInUseInBytesMu sync.RWMutex
|
||||
reportDbTotalSizeInUseInBytes = func() float64 { return 0 }
|
||||
|
||||
dbOpenReadTxN = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "mvcc",
|
||||
Name: "db_open_read_transactions",
|
||||
Help: "The number of currently open read transactions",
|
||||
},
|
||||
|
||||
func() float64 {
|
||||
reportDbOpenReadTxNMu.RLock()
|
||||
defer reportDbOpenReadTxNMu.RUnlock()
|
||||
return reportDbOpenReadTxN()
|
||||
},
|
||||
)
|
||||
// overridden by mvcc initialization
|
||||
reportDbOpenReadTxNMu sync.RWMutex
|
||||
reportDbOpenReadTxN = func() float64 { return 0 }
|
||||
|
||||
hashSec = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "mvcc",
|
||||
|
@ -237,6 +254,7 @@ func init() {
|
|||
prometheus.MustRegister(dbTotalSize)
|
||||
prometheus.MustRegister(dbTotalSizeDebugging)
|
||||
prometheus.MustRegister(dbTotalSizeInUse)
|
||||
prometheus.MustRegister(dbOpenReadTxN)
|
||||
prometheus.MustRegister(hashSec)
|
||||
prometheus.MustRegister(hashRevSec)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue