etcd/mvcc/backend/batch_tx.go

271 lines
6.8 KiB
Go
Raw Normal View History

2016-05-13 06:50:33 +03:00
// Copyright 2015 The etcd Authors
2015-09-15 23:54:11 +03:00
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package backend
import (
"bytes"
"fmt"
"math"
"sync"
2015-10-06 02:15:44 +03:00
"sync/atomic"
"time"
2016-03-23 03:10:28 +03:00
"github.com/boltdb/bolt"
)
type BatchTx interface {
ReadTx
UnsafeCreateBucket(name []byte)
UnsafePut(bucketName []byte, key []byte, value []byte)
UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
UnsafeDelete(bucketName []byte, key []byte)
// Commit commits a previous tx and begins a new writable one.
2015-05-21 03:32:53 +03:00
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
CommitAndStop()
}
type batchTx struct {
2015-05-21 03:32:53 +03:00
sync.Mutex
tx *bolt.Tx
backend *backend
pending int
}
func (t *batchTx) UnsafeCreateBucket(name []byte) {
_, err := t.tx.CreateBucket(name)
if err != nil && err != bolt.ErrBucketExists {
2016-05-21 08:30:50 +03:00
plog.Fatalf("cannot create bucket %s (%v)", name, err)
}
t.pending++
}
2016-02-21 16:05:03 +03:00
// UnsafePut stores value under key in the named bucket by delegating to
// unsafePut with seq=false (no fill-percent adjustment).
//
// UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.unsafePut(bucketName, key, value, false)
}
// UnsafeSeqPut stores value under key in the named bucket by delegating to
// unsafePut with seq=true, which raises the bucket fill percent for
// mostly append-only workloads.
//
// UnsafeSeqPut must be called holding the lock on the tx.
func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
t.unsafePut(bucketName, key, value, true)
}
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
2016-05-21 08:30:50 +03:00
plog.Fatalf("bucket %s does not exist", bucketName)
}
if seq {
// it is useful to increase fill percent when the workloads are mostly append-only.
// this can delay the page split and reduce space usage.
bucket.FillPercent = 0.9
}
if err := bucket.Put(key, value); err != nil {
2016-05-21 08:30:50 +03:00
plog.Fatalf("cannot put key into bucket (%v)", err)
}
t.pending++
}
2016-02-21 16:05:03 +03:00
// UnsafeRange reads from the current transaction via unsafeRange and returns
// the matching keys and values. Any error reported by unsafeRange is fatal.
//
// UnsafeRange must be called holding the lock on the tx.
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	keys, vals, rangeErr := unsafeRange(t.tx, bucketName, key, endKey, limit)
	if rangeErr != nil {
		plog.Fatal(rangeErr)
	}
	return keys, vals
}
func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte, err error) {
bucket := tx.Bucket(bucketName)
if bucket == nil {
return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
}
if len(endKey) == 0 {
if v := bucket.Get(key); v != nil {
return append(keys, key), append(vs, v), nil
}
return nil, nil, nil
}
if limit <= 0 {
limit = math.MaxInt64
}
c := bucket.Cursor()
for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
vs = append(vs, cv)
2015-05-31 18:59:31 +03:00
keys = append(keys, ck)
if limit == int64(len(keys)) {
break
}
}
return keys, vs, nil
}
2016-02-21 16:05:03 +03:00
// UnsafeDelete must be called holding the lock on the tx.
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
2016-05-21 08:30:50 +03:00
plog.Fatalf("bucket %s does not exist", bucketName)
}
err := bucket.Delete(key)
if err != nil {
2016-05-21 08:30:50 +03:00
plog.Fatalf("cannot delete key from bucket (%v)", err)
}
t.pending++
}
2015-05-21 03:32:53 +03:00
// UnsafeForEach passes the current transaction, bucketName and visitor to
// unsafeForEach and returns its result.
//
// UnsafeForEach must be called holding the lock on the tx.
func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
return unsafeForEach(t.tx, bucketName, visitor)
}
// unsafeForEach runs visitor over the named bucket of tx using the bucket's
// ForEach and returns its result. A bucket that does not exist is treated as
// empty: visitor is never called and nil is returned.
func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
	b := tx.Bucket(bucket)
	if b == nil {
		return nil
	}
	return b.ForEach(visitor)
}
// Commit commits a previous tx and begins a new writable one.
2015-05-21 03:32:53 +03:00
func (t *batchTx) Commit() {
t.Lock()
defer t.Unlock()
t.commit(false)
2015-05-22 23:58:26 +03:00
}
2015-05-21 03:32:53 +03:00
// CommitAndStop commits the previous tx and does not create a new one.
// It takes the tx lock for the duration of the call and runs commit(true).
func (t *batchTx) CommitAndStop() {
t.Lock()
defer t.Unlock()
t.commit(true)
}
// Unlock releases the tx mutex. Just before releasing it, if the number of
// pending mutations has reached the backend's batch limit, the batch is
// committed and a new writable tx is begun.
func (t *batchTx) Unlock() {
	if limit := t.backend.batchLimit; t.pending >= limit {
		t.commit(false)
	}
	t.Mutex.Unlock()
}
func (t *batchTx) commit(stop bool) {
2015-05-21 03:32:53 +03:00
// commit the last tx
if t.tx != nil {
if t.pending == 0 && !stop {
t.backend.mu.RLock()
defer t.backend.mu.RUnlock()
// batchTx.commit(true) calls *bolt.Tx.Commit, which
// initializes *bolt.Tx.db and *bolt.Tx.meta as nil,
// and subsequent *bolt.Tx.Size() call panics.
//
// This nil pointer reference panic happens when:
// 1. batchTx.commit(false) from newBatchTx
// 2. batchTx.commit(true) from stopping backend
// 3. batchTx.commit(false) from inflight mvcc Hash call
//
// Check if db is nil to prevent this panic
if t.tx.DB() != nil {
atomic.StoreInt64(&t.backend.size, t.tx.Size())
}
return
}
start := time.Now()
// gofail: var beforeCommit struct{}
err := t.tx.Commit()
// gofail: var afterCommit struct{}
commitDurations.Observe(time.Since(start).Seconds())
2016-01-14 08:57:40 +03:00
atomic.AddInt64(&t.backend.commits, 1)
t.pending = 0
2015-05-21 03:32:53 +03:00
if err != nil {
2016-05-21 08:30:50 +03:00
plog.Fatalf("cannot commit tx (%s)", err)
2015-05-21 03:32:53 +03:00
}
}
if !stop {
t.tx = t.backend.begin(true)
}
}
// batchTxBuffered embeds batchTx and adds a write buffer. Puts are recorded
// in buf as well as in the underlying transaction, and Unlock writes buf back
// into the backend's read transaction buffer.
type batchTxBuffered struct {
batchTx
// buf records the puts made through this tx.
buf txWriteBuffer
}
2015-05-21 03:32:53 +03:00
// newBatchTxBuffered builds a batchTxBuffered bound to backend with an empty
// write buffer whose seq flag is set, then calls Commit once so the first
// transactions are begun before the value is returned.
func newBatchTxBuffered(backend *backend) *batchTxBuffered {
	buffered := &batchTxBuffered{}
	buffered.batchTx.backend = backend
	buffered.buf = txWriteBuffer{
		txBuffer: txBuffer{make(map[string]*bucketBuffer)},
		seq:      true,
	}
	buffered.Commit()
	return buffered
}
// Unlock releases the tx lock. If any mutations are pending, the write buffer
// is first written back into the backend's read transaction buffer (under the
// read tx lock), and the batch is committed once pending has reached the
// backend's batch limit. The lock itself is released via batchTx.Unlock.
func (t *batchTxBuffered) Unlock() {
	if t.pending == 0 {
		t.batchTx.Unlock()
		return
	}

	t.backend.readTx.mu.Lock()
	t.buf.writeback(&t.backend.readTx.buf)
	t.backend.readTx.mu.Unlock()

	if limit := t.backend.batchLimit; t.pending >= limit {
		t.commit(false)
	}
	t.batchTx.Unlock()
}
// Commit takes the tx lock and runs the buffered commit(false), which commits
// the batch and begins new transactions. The deferred Unlock is
// (*batchTxBuffered).Unlock.
func (t *batchTxBuffered) Commit() {
t.Lock()
defer t.Unlock()
t.commit(false)
}
// CommitAndStop takes the tx lock and runs the buffered commit(true), which
// commits the batch without beginning new transactions. The deferred Unlock
// is (*batchTxBuffered).Unlock.
func (t *batchTxBuffered) CommitAndStop() {
t.Lock()
defer t.Unlock()
t.commit(true)
}
func (t *batchTxBuffered) commit(stop bool) {
// all read txs must be closed to acquire boltdb commit rwlock
t.backend.readTx.mu.Lock()
defer t.backend.readTx.mu.Unlock()
if t.backend.readTx.tx != nil {
if err := t.backend.readTx.tx.Rollback(); err != nil {
plog.Fatalf("cannot rollback tx (%s)", err)
}
t.backend.readTx.buf.reset()
t.backend.readTx.tx = nil
}
t.batchTx.commit(stop)
if !stop {
t.backend.readTx.tx = t.backend.begin(false)
2015-05-21 03:32:53 +03:00
}
}
// UnsafePut writes the key/value through the embedded batchTx and then
// records the same put in the write buffer.
func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.batchTx.UnsafePut(bucketName, key, value)
t.buf.put(bucketName, key, value)
}
func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
t.batchTx.UnsafeSeqPut(bucketName, key, value)
t.buf.putSeq(bucketName, key, value)
2015-05-21 03:32:53 +03:00
}