diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index dd8f1edb6..8d20db4b9 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -65,6 +65,8 @@ type Backend interface { // Since the backend can manage free space in a non-byte unit such as // number of pages, the returned value can be not exactly accurate in bytes. SizeInUse() int64 + // OpenReadTxN returns the number of currently open read transactions in the backend. + OpenReadTxN() int64 Defrag() error ForceCommit() Close() error @@ -89,6 +91,8 @@ type backend struct { sizeInUse int64 // commits counts number of commits since start commits int64 + // openReadTxN is the number of currently open read transactions in the backend + openReadTxN int64 mu sync.RWMutex db *bolt.DB @@ -198,6 +202,7 @@ func (b *backend) ConcurrentReadTx() ReadTx { defer b.readTx.RUnlock() // prevent boltdb read Tx from been rolled back until store read Tx is done. b.readTx.txWg.Add(1) + // TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval. return &concurrentReadTx{ buf: b.readTx.buf.unsafeCopy(), tx: b.readTx.tx, @@ -513,6 +518,7 @@ func (b *backend) begin(write bool) *bolt.Tx { db := tx.DB() atomic.StoreInt64(&b.size, size) atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize))) + atomic.StoreInt64(&b.openReadTxN, int64(db.Stats().OpenTxN)) return tx } @@ -529,6 +535,10 @@ func (b *backend) unsafeBegin(write bool) *bolt.Tx { return tx } +func (b *backend) OpenReadTxN() int64 { + return atomic.LoadInt64(&b.openReadTxN) +} + // NewTmpBackend creates a backend implementation for testing. func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) { dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test") diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 187f94029..bc6b895ed 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -354,6 +354,9 @@ func (s *store) restore() error { reportDbTotalSizeInUseInBytesMu.Lock() reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) } reportDbTotalSizeInUseInBytesMu.Unlock() + reportDbOpenReadTxNMu.Lock() + reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) } + reportDbOpenReadTxNMu.Unlock() min, max := newRevBytes(), newRevBytes() revToBytes(revision{main: 1}, min) diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index 7a5a6c4f4..27d72b63d 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -793,6 +793,7 @@ func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil } func (b *fakeBackend) Size() int64 { return 0 } func (b *fakeBackend) SizeInUse() int64 { return 0 } +func (b *fakeBackend) OpenReadTxN() int64 { return 0 } func (b *fakeBackend) Snapshot() backend.Snapshot { return nil } func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } diff --git a/mvcc/metrics.go b/mvcc/metrics.go index 9163cc7c6..4f3c49aef 100644 --- a/mvcc/metrics.go +++ b/mvcc/metrics.go @@ -194,6 +194,23 @@ var ( reportDbTotalSizeInUseInBytesMu sync.RWMutex reportDbTotalSizeInUseInBytes = func() float64 { return 0 } + dbOpenReadTxN = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "mvcc", + Name: "db_open_read_transactions", + Help: "The number of currently open read transactions", + }, + + func() float64 { + reportDbOpenReadTxNMu.RLock() + defer reportDbOpenReadTxNMu.RUnlock() + return reportDbOpenReadTxN() + }, + ) + // overridden by mvcc initialization + reportDbOpenReadTxNMu sync.RWMutex + reportDbOpenReadTxN = func() float64 { return 0 } + hashSec = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "etcd", Subsystem: "mvcc", @@ -237,6 +254,7 @@ func init() { prometheus.MustRegister(dbTotalSize) prometheus.MustRegister(dbTotalSizeDebugging) prometheus.MustRegister(dbTotalSizeInUse) + prometheus.MustRegister(dbOpenReadTxN) prometheus.MustRegister(hashSec) prometheus.MustRegister(hashRevSec) }