etcd/mvcc/backend/batch_tx.go

193 lines
4.8 KiB
Go
Raw Normal View History

2016-05-13 06:50:33 +03:00
// Copyright 2015 The etcd Authors
2015-09-15 23:54:11 +03:00
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package backend
import (
"bytes"
"log"
"sync"
2015-10-06 02:15:44 +03:00
"sync/atomic"
"time"
2016-03-23 03:10:28 +03:00
"github.com/boltdb/bolt"
)
// BatchTx is the operation set exposed by a batched backend transaction.
//
// Lock and Unlock bracket a group of operations. The Unsafe-prefixed
// methods of the batchTx implementation in this file do not acquire the
// lock themselves.
type BatchTx interface {
	Lock()
	Unlock()

	// UnsafeCreateBucket creates the bucket called name.
	UnsafeCreateBucket(name []byte)
	// UnsafePut stores value under key in the bucket called bucketName.
	UnsafePut(bucketName, key, value []byte)
	// UnsafeSeqPut is the variant of UnsafePut intended for mostly
	// append-only workloads.
	UnsafeSeqPut(bucketName, key, value []byte)
	// UnsafeRange returns the keys and values found for key (or for the
	// range bounded by endKey when endKey is non-empty), capped by limit
	// when limit is positive.
	UnsafeRange(bucketName, key, endKey []byte, limit int64) (keys, vals [][]byte)
	// UnsafeDelete removes key from the bucket called bucketName.
	UnsafeDelete(bucketName, key []byte)
	// UnsafeForEach calls visitor for each key/value pair of the bucket
	// called bucketName.
	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error

	// Commit commits the current transaction and begins a new one.
	Commit()
	// CommitAndStop commits the current transaction without beginning a
	// new one.
	CommitAndStop()
}
type batchTx struct {
2015-05-21 03:32:53 +03:00
sync.Mutex
tx *bolt.Tx
backend *backend
pending int
}
// newBatchTx returns a batchTx attached to backend. It calls Commit once
// before returning, which begins the first writable bolt transaction
// because no transaction exists yet.
func newBatchTx(backend *backend) *batchTx {
	btx := new(batchTx)
	btx.backend = backend
	btx.Commit()
	return btx
}
func (t *batchTx) UnsafeCreateBucket(name []byte) {
_, err := t.tx.CreateBucket(name)
if err != nil && err != bolt.ErrBucketExists {
2016-05-03 20:59:57 +03:00
log.Fatalf("mvcc: cannot create bucket %s (%v)", name, err)
}
t.pending++
}
2016-02-21 16:05:03 +03:00
// UnsafePut stores value under key in the bucket called bucketName.
// The caller must hold the lock on the tx.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
	const sequential = false
	t.unsafePut(bucketName, key, value, sequential)
}
// UnsafeSeqPut stores value under key in the bucket called bucketName,
// using the sequential-write mode of unsafePut.
// The caller must hold the lock on the tx.
func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
	const sequential = true
	t.unsafePut(bucketName, key, value, sequential)
}
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
2016-05-03 20:59:57 +03:00
log.Fatalf("mvcc: bucket %s does not exist", bucketName)
}
if seq {
// it is useful to increase fill percent when the workloads are mostly append-only.
// this can delay the page split and reduce space usage.
bucket.FillPercent = 0.9
}
if err := bucket.Put(key, value); err != nil {
2016-04-25 22:32:58 +03:00
log.Fatalf("mvcc: cannot put key into bucket (%v)", err)
}
t.pending++
}
2016-02-21 16:05:03 +03:00
// UnsafeRange must be called holding the lock on the tx.
2015-05-31 18:59:31 +03:00
func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
2016-05-03 20:59:57 +03:00
log.Fatalf("mvcc: bucket %s does not exist", bucketName)
}
if len(endKey) == 0 {
if v := bucket.Get(key); v == nil {
2015-05-31 18:59:31 +03:00
return keys, vs
} else {
2015-05-31 18:59:31 +03:00
return append(keys, key), append(vs, v)
}
}
c := bucket.Cursor()
for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
vs = append(vs, cv)
2015-05-31 18:59:31 +03:00
keys = append(keys, ck)
if limit > 0 && limit == int64(len(keys)) {
break
}
}
2015-05-31 18:59:31 +03:00
return keys, vs
}
2016-02-21 16:05:03 +03:00
// UnsafeDelete must be called holding the lock on the tx.
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
2016-05-03 20:59:57 +03:00
log.Fatalf("mvcc: bucket %s does not exist", bucketName)
}
err := bucket.Delete(key)
if err != nil {
2016-04-25 22:32:58 +03:00
log.Fatalf("mvcc: cannot delete key from bucket (%v)", err)
}
t.pending++
}
2015-05-21 03:32:53 +03:00
// UnsafeForEach runs visitor over the bucket called bucketName via the
// bucket's ForEach and returns its result. If the bucket does not exist
// there is nothing to visit and nil is returned.
// The caller must hold the lock on the tx.
func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
	if b := t.tx.Bucket(bucketName); b != nil {
		return b.ForEach(visitor)
	}
	return nil
}
// Commit commits a previous tx and begins a new writable one.
2015-05-21 03:32:53 +03:00
func (t *batchTx) Commit() {
t.Lock()
defer t.Unlock()
t.commit(false)
2015-05-22 23:58:26 +03:00
}
2015-05-21 03:32:53 +03:00
// CommitAndStop takes the tx lock and commits the previous transaction
// without creating a new one.
func (t *batchTx) CommitAndStop() {
	t.Lock()
	defer t.Unlock()

	const stop = true
	t.commit(stop)
}
// Unlock releases the tx lock. Before releasing it, if the number of
// pending operations has reached the backend's batch limit, it commits
// and resets the pending counter.
func (t *batchTx) Unlock() {
	if reachedLimit := t.pending >= t.backend.batchLimit; reachedLimit {
		t.commit(false)
		t.pending = 0
	}
	t.Mutex.Unlock()
}
func (t *batchTx) commit(stop bool) {
2015-05-21 03:32:53 +03:00
var err error
// commit the last tx
if t.tx != nil {
if t.pending == 0 && !stop {
t.backend.mu.RLock()
defer t.backend.mu.RUnlock()
2016-04-20 19:49:28 +03:00
atomic.StoreInt64(&t.backend.size, t.tx.Size())
return
}
start := time.Now()
2015-05-21 03:32:53 +03:00
err = t.tx.Commit()
commitDurations.Observe(time.Since(start).Seconds())
2016-01-14 08:57:40 +03:00
atomic.AddInt64(&t.backend.commits, 1)
t.pending = 0
2015-05-21 03:32:53 +03:00
if err != nil {
2016-04-25 22:32:58 +03:00
log.Fatalf("mvcc: cannot commit tx (%s)", err)
2015-05-21 03:32:53 +03:00
}
}
if stop {
return
}
2016-03-02 23:00:32 +03:00
t.backend.mu.RLock()
defer t.backend.mu.RUnlock()
2015-05-21 03:32:53 +03:00
// begin a new tx
t.tx, err = t.backend.db.Begin(true)
if err != nil {
2016-04-25 22:32:58 +03:00
log.Fatalf("mvcc: cannot begin tx (%s)", err)
2015-05-21 03:32:53 +03:00
}
2015-10-06 02:15:44 +03:00
atomic.StoreInt64(&t.backend.size, t.tx.Size())
2015-05-21 03:32:53 +03:00
}