2016-05-13 06:50:33 +03:00
// Copyright 2015 The etcd Authors
2015-09-15 23:54:11 +03:00
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2015-05-15 06:25:52 +03:00
package backend
import (
2015-09-23 21:01:27 +03:00
"fmt"
"hash/crc32"
2015-06-08 19:26:56 +03:00
"io"
2016-01-06 06:45:18 +03:00
"io/ioutil"
"os"
2017-03-16 05:31:10 +03:00
"path/filepath"
2016-03-02 23:00:32 +03:00
"sync"
2015-10-06 02:15:44 +03:00
"sync/atomic"
2015-05-15 06:25:52 +03:00
"time"
2017-07-06 00:32:13 +03:00
bolt "github.com/coreos/bbolt"
2016-05-21 08:30:50 +03:00
"github.com/coreos/pkg/capnslog"
2015-05-15 06:25:52 +03:00
)
2016-01-06 06:45:18 +03:00
var (
defaultBatchLimit = 10000
defaultBatchInterval = 100 * time . Millisecond
2016-02-05 21:57:51 +03:00
2016-03-02 23:00:32 +03:00
defragLimit = 10000
2017-03-17 06:17:27 +03:00
// initialMmapSize is the initial size of the mmapped region. Setting this larger than
2016-02-05 21:57:51 +03:00
// the potential max db size can prevent writer from blocking reader.
// This only works for linux.
2017-03-17 06:17:27 +03:00
initialMmapSize = uint64 ( 10 * 1024 * 1024 * 1024 )
2016-05-21 08:30:50 +03:00
2016-10-03 12:03:22 +03:00
plog = capnslog . NewPackageLogger ( "github.com/coreos/etcd" , "mvcc/backend" )
2017-05-05 01:00:04 +03:00
2017-05-11 20:51:19 +03:00
// minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning.
minSnapshotWarningTimeout = time . Duration ( 30 * time . Second )
2016-01-06 06:45:18 +03:00
)
2015-05-15 06:25:52 +03:00
type Backend interface {
2017-01-05 13:07:50 +03:00
ReadTx ( ) ReadTx
2015-05-15 06:25:52 +03:00
BatchTx ( ) BatchTx
2017-01-05 13:07:50 +03:00
2015-09-17 02:26:46 +03:00
Snapshot ( ) Snapshot
2016-06-28 18:49:44 +03:00
Hash ( ignores map [ IgnoreKey ] struct { } ) ( uint32 , error )
2015-10-06 02:15:44 +03:00
// Size returns the current size of the backend.
Size ( ) int64
2016-03-02 23:00:32 +03:00
Defrag ( ) error
2015-05-15 06:25:52 +03:00
ForceCommit ( )
Close ( ) error
}
2015-09-17 02:26:46 +03:00
// Snapshot is a point-in-time view of the backend that can be streamed to a
// writer and must be closed when no longer needed.
type Snapshot interface {
	// Size reports how many bytes the snapshot holds.
	Size() int64
	// WriteTo streams the snapshot into w and reports the bytes written.
	WriteTo(w io.Writer) (int64, error)
	// Close releases the snapshot.
	Close() error
}
2015-05-15 06:25:52 +03:00
type backend struct {
2016-03-09 22:17:27 +03:00
// size and commits are used with atomic operations so they must be
// 64-bit aligned, otherwise 32-bit tests will crash
// size is the number of bytes in the backend
size int64
// commits counts number of commits since start
commits int64
2016-03-02 23:00:32 +03:00
mu sync . RWMutex
2015-05-15 06:25:52 +03:00
db * bolt . DB
batchInterval time . Duration
batchLimit int
2017-01-05 13:07:50 +03:00
batchTx * batchTxBuffered
readTx * readTx
2016-01-14 08:57:40 +03:00
2015-08-28 08:59:14 +03:00
stopc chan struct { }
donec chan struct { }
2015-05-15 06:25:52 +03:00
}
2017-03-17 06:17:27 +03:00
// BackendConfig holds the parameters used to open a backend.
type BackendConfig struct {
	Path          string        // file path to the backend file
	BatchInterval time.Duration // maximum time before flushing the BatchTx
	BatchLimit    int           // maximum puts before flushing the BatchTx
	MmapSize      uint64        // number of bytes to mmap for the backend
}
// DefaultBackendConfig returns a BackendConfig populated with the package
// defaults for batching and mmap size. Path is left empty for the caller.
func DefaultBackendConfig() BackendConfig {
	var cfg BackendConfig
	cfg.BatchInterval = defaultBatchInterval
	cfg.BatchLimit = defaultBatchLimit
	cfg.MmapSize = initialMmapSize
	return cfg
}
func New ( bcfg BackendConfig ) Backend {
return newBackend ( bcfg )
2015-08-29 08:03:32 +03:00
}
2016-01-06 06:45:18 +03:00
func NewDefaultBackend ( path string ) Backend {
2017-03-17 06:17:27 +03:00
bcfg := DefaultBackendConfig ( )
bcfg . Path = path
return newBackend ( bcfg )
2016-01-06 06:45:18 +03:00
}
2017-03-17 06:17:27 +03:00
func newBackend ( bcfg BackendConfig ) * backend {
bopts := & bolt . Options { }
if boltOpenOptions != nil {
* bopts = * boltOpenOptions
}
2017-05-13 00:29:04 +03:00
bopts . InitialMmapSize = bcfg . mmapSize ( )
2017-03-17 06:17:27 +03:00
db , err := bolt . Open ( bcfg . Path , 0600 , bopts )
2015-05-15 06:25:52 +03:00
if err != nil {
2017-03-17 06:17:27 +03:00
plog . Panicf ( "cannot open database at %s (%v)" , bcfg . Path , err )
2015-05-15 06:25:52 +03:00
}
2017-01-05 13:07:50 +03:00
// In future, may want to make buffering optional for low-concurrency systems
// or dynamically swap between buffered/non-buffered depending on workload.
2015-05-15 06:25:52 +03:00
b := & backend {
db : db ,
2017-03-17 06:17:27 +03:00
batchInterval : bcfg . BatchInterval ,
batchLimit : bcfg . BatchLimit ,
2015-05-15 06:25:52 +03:00
2017-01-05 13:07:50 +03:00
readTx : & readTx { buf : txReadBuffer {
txBuffer : txBuffer { make ( map [ string ] * bucketBuffer ) } } ,
} ,
2015-08-28 08:59:14 +03:00
stopc : make ( chan struct { } ) ,
donec : make ( chan struct { } ) ,
2015-05-15 06:25:52 +03:00
}
2017-01-05 13:07:50 +03:00
b . batchTx = newBatchTxBuffered ( b )
2015-05-15 06:25:52 +03:00
go b . run ( )
return b
}
2015-07-24 18:16:27 +03:00
// BatchTx returns the current batch tx in coalescer. The tx can be used for read and
2015-05-15 06:25:52 +03:00
// write operations. The write result can be retrieved within the same tx immediately.
// The write result is isolated with other txs until the current one get committed.
func ( b * backend ) BatchTx ( ) BatchTx {
return b . batchTx
}
2017-01-05 13:07:50 +03:00
// ReadTx returns the backend's shared read transaction.
func (b *backend) ReadTx() ReadTx {
	tx := b.readTx
	return tx
}
2016-02-21 16:05:03 +03:00
// ForceCommit forces the current batching tx to commit.
2015-05-15 06:25:52 +03:00
func ( b * backend ) ForceCommit ( ) {
2015-05-21 03:32:53 +03:00
b . batchTx . Commit ( )
2015-05-15 06:25:52 +03:00
}
2015-09-17 02:26:46 +03:00
func ( b * backend ) Snapshot ( ) Snapshot {
2017-05-11 20:51:19 +03:00
b . batchTx . Commit ( )
b . mu . RLock ( )
defer b . mu . RUnlock ( )
tx , err := b . db . Begin ( false )
if err != nil {
plog . Fatalf ( "cannot begin tx (%s)" , err )
}
2017-05-05 01:00:04 +03:00
stopc , donec := make ( chan struct { } ) , make ( chan struct { } )
2017-05-11 20:51:19 +03:00
dbBytes := tx . Size ( )
2017-05-05 01:00:04 +03:00
go func ( ) {
defer close ( donec )
2017-05-11 20:51:19 +03:00
// sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
// assuming a min tcp throughput of 100MB/s.
var sendRateBytes int64 = 100 * 1024 * 1014
warningTimeout := time . Duration ( int64 ( ( float64 ( dbBytes ) / float64 ( sendRateBytes ) ) * float64 ( time . Second ) ) )
if warningTimeout < minSnapshotWarningTimeout {
warningTimeout = minSnapshotWarningTimeout
}
2017-05-05 01:00:04 +03:00
start := time . Now ( )
2017-05-11 20:51:19 +03:00
ticker := time . NewTicker ( warningTimeout )
2017-05-05 01:00:04 +03:00
defer ticker . Stop ( )
for {
select {
case <- ticker . C :
2017-05-11 20:51:19 +03:00
plog . Warningf ( "snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]" , time . Since ( start ) . Seconds ( ) , float64 ( dbBytes ) / float64 ( 1024 * 1014 ) , start )
2017-05-05 01:00:04 +03:00
case <- stopc :
2017-05-06 02:34:22 +03:00
snapshotDurations . Observe ( time . Since ( start ) . Seconds ( ) )
2017-05-05 01:00:04 +03:00
return
}
}
} ( )
return & snapshot { tx , stopc , donec }
2015-06-08 19:26:56 +03:00
}
2016-06-28 18:49:44 +03:00
// IgnoreKey names one key inside one bucket that Hash leaves out of its
// checksum.
type IgnoreKey struct {
	Bucket, Key string
}
func ( b * backend ) Hash ( ignores map [ IgnoreKey ] struct { } ) ( uint32 , error ) {
2015-09-23 21:01:27 +03:00
h := crc32 . New ( crc32 . MakeTable ( crc32 . Castagnoli ) )
2016-03-02 23:00:32 +03:00
b . mu . RLock ( )
defer b . mu . RUnlock ( )
2015-09-23 21:01:27 +03:00
err := b . db . View ( func ( tx * bolt . Tx ) error {
c := tx . Cursor ( )
for next , _ := c . First ( ) ; next != nil ; next , _ = c . Next ( ) {
b := tx . Bucket ( next )
if b == nil {
return fmt . Errorf ( "cannot get hash of bucket %s" , string ( next ) )
}
h . Write ( next )
b . ForEach ( func ( k , v [ ] byte ) error {
2016-06-28 18:49:44 +03:00
bk := IgnoreKey { Bucket : string ( next ) , Key : string ( k ) }
if _ , ok := ignores [ bk ] ; ! ok {
h . Write ( k )
h . Write ( v )
}
2015-09-23 21:01:27 +03:00
return nil
} )
}
return nil
} )
if err != nil {
return 0 , err
}
return h . Sum32 ( ) , nil
}
2015-10-06 02:15:44 +03:00
// Size reports the database size in bytes, as last recorded when a
// transaction was begun or the database was defragmented.
func (b *backend) Size() int64 {
	n := atomic.LoadInt64(&b.size)
	return n
}
2015-05-15 06:25:52 +03:00
func ( b * backend ) run ( ) {
defer close ( b . donec )
2016-06-23 16:49:41 +03:00
t := time . NewTimer ( b . batchInterval )
defer t . Stop ( )
2015-05-15 06:25:52 +03:00
for {
select {
2016-06-23 16:49:41 +03:00
case <- t . C :
2015-05-15 06:25:52 +03:00
case <- b . stopc :
2015-08-25 20:57:23 +03:00
b . batchTx . CommitAndStop ( )
2015-05-15 06:25:52 +03:00
return
}
2015-05-21 03:32:53 +03:00
b . batchTx . Commit ( )
2016-06-23 16:49:41 +03:00
t . Reset ( b . batchInterval )
2015-05-15 06:25:52 +03:00
}
}
// Close signals the commit loop to stop, waits for it to exit after its final
// commit, and then closes the underlying database. Calling Close twice panics
// because stopc would be closed twice.
func (b *backend) Close() error {
	close(b.stopc)
	<-b.donec
	err := b.db.Close()
	return err
}
2015-09-17 02:26:46 +03:00
2016-01-14 08:57:40 +03:00
// Commits returns the total number of commits since start.
func (b *backend) Commits() int64 {
	n := atomic.LoadInt64(&b.commits)
	return n
}
2016-03-02 23:00:32 +03:00
func ( b * backend ) Defrag ( ) error {
2016-04-21 18:48:33 +03:00
err := b . defrag ( )
if err != nil {
return err
}
// commit to update metadata like db.size
b . batchTx . Commit ( )
return nil
}
func ( b * backend ) defrag ( ) error {
2016-03-02 23:00:32 +03:00
// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
b . batchTx . Lock ( )
defer b . batchTx . Unlock ( )
// lock database after lock tx to avoid deadlock.
b . mu . Lock ( )
defer b . mu . Unlock ( )
2017-03-23 19:46:34 +03:00
// block concurrent read requests while resetting tx
b . readTx . mu . Lock ( )
defer b . readTx . mu . Unlock ( )
b . batchTx . unsafeCommit ( true )
2016-03-02 23:00:32 +03:00
b . batchTx . tx = nil
tmpdb , err := bolt . Open ( b . db . Path ( ) + ".tmp" , 0600 , boltOpenOptions )
if err != nil {
return err
}
err = defragdb ( b . db , tmpdb , defragLimit )
if err != nil {
tmpdb . Close ( )
os . RemoveAll ( tmpdb . Path ( ) )
return err
}
dbp := b . db . Path ( )
tdbp := tmpdb . Path ( )
err = b . db . Close ( )
if err != nil {
2016-05-21 08:30:50 +03:00
plog . Fatalf ( "cannot close database (%s)" , err )
2016-03-02 23:00:32 +03:00
}
err = tmpdb . Close ( )
if err != nil {
2016-05-21 08:30:50 +03:00
plog . Fatalf ( "cannot close database (%s)" , err )
2016-03-02 23:00:32 +03:00
}
err = os . Rename ( tdbp , dbp )
if err != nil {
2016-05-21 08:30:50 +03:00
plog . Fatalf ( "cannot rename database (%s)" , err )
2016-03-02 23:00:32 +03:00
}
b . db , err = bolt . Open ( dbp , 0600 , boltOpenOptions )
if err != nil {
2016-05-21 08:30:50 +03:00
plog . Panicf ( "cannot open database at %s (%v)" , dbp , err )
2016-03-02 23:00:32 +03:00
}
b . batchTx . tx , err = b . db . Begin ( true )
if err != nil {
2016-05-21 08:30:50 +03:00
plog . Fatalf ( "cannot begin tx (%s)" , err )
2016-03-02 23:00:32 +03:00
}
2017-03-23 19:46:34 +03:00
b . readTx . buf . reset ( )
b . readTx . tx = b . unsafeBegin ( false )
atomic . StoreInt64 ( & b . size , b . readTx . tx . Size ( ) )
2016-03-02 23:00:32 +03:00
return nil
}
func defragdb ( odb , tmpdb * bolt . DB , limit int ) error {
// open a tx on tmpdb for writes
tmptx , err := tmpdb . Begin ( true )
if err != nil {
return err
}
// open a tx on old db for read
tx , err := odb . Begin ( false )
if err != nil {
return err
}
defer tx . Rollback ( )
c := tx . Cursor ( )
count := 0
for next , _ := c . First ( ) ; next != nil ; next , _ = c . Next ( ) {
b := tx . Bucket ( next )
if b == nil {
return fmt . Errorf ( "backend: cannot defrag bucket %s" , string ( next ) )
}
tmpb , berr := tmptx . CreateBucketIfNotExists ( next )
2017-03-21 18:06:03 +03:00
tmpb . FillPercent = 0.9 // for seq write in for each
2016-03-02 23:00:32 +03:00
if berr != nil {
return berr
}
b . ForEach ( func ( k , v [ ] byte ) error {
count ++
if count > limit {
err = tmptx . Commit ( )
if err != nil {
return err
}
tmptx , err = tmpdb . Begin ( true )
if err != nil {
return err
}
tmpb = tmptx . Bucket ( next )
2017-03-21 18:06:03 +03:00
tmpb . FillPercent = 0.9 // for seq write in for each
2016-04-03 03:25:05 +03:00
count = 0
2016-03-02 23:00:32 +03:00
}
2016-04-08 21:36:41 +03:00
return tmpb . Put ( k , v )
2016-03-02 23:00:32 +03:00
} )
}
return tmptx . Commit ( )
}
2017-01-05 13:07:50 +03:00
func ( b * backend ) begin ( write bool ) * bolt . Tx {
b . mu . RLock ( )
2017-03-23 19:46:34 +03:00
tx := b . unsafeBegin ( write )
b . mu . RUnlock ( )
atomic . StoreInt64 ( & b . size , tx . Size ( ) )
return tx
}
func ( b * backend ) unsafeBegin ( write bool ) * bolt . Tx {
2017-01-05 13:07:50 +03:00
tx , err := b . db . Begin ( write )
if err != nil {
plog . Fatalf ( "cannot begin tx (%s)" , err )
}
return tx
}
2016-01-06 06:45:18 +03:00
// NewTmpBackend creates a backend implementation for testing.
func NewTmpBackend ( batchInterval time . Duration , batchLimit int ) ( * backend , string ) {
dir , err := ioutil . TempDir ( os . TempDir ( ) , "etcd_backend_test" )
if err != nil {
2016-05-21 08:30:50 +03:00
plog . Fatal ( err )
2016-01-06 06:45:18 +03:00
}
2017-03-16 05:31:10 +03:00
tmpPath := filepath . Join ( dir , "database" )
2017-03-17 06:17:27 +03:00
bcfg := DefaultBackendConfig ( )
bcfg . Path , bcfg . BatchInterval , bcfg . BatchLimit = tmpPath , batchInterval , batchLimit
return newBackend ( bcfg ) , tmpPath
2016-01-06 06:45:18 +03:00
}
// NewDefaultTmpBackend is NewTmpBackend with the package default batch
// interval and batch limit.
func NewDefaultTmpBackend() (*backend, string) {
	be, path := NewTmpBackend(defaultBatchInterval, defaultBatchLimit)
	return be, path
}
2015-09-17 02:26:46 +03:00
type snapshot struct {
* bolt . Tx
2017-05-05 01:00:04 +03:00
stopc chan struct { }
donec chan struct { }
2015-09-17 02:26:46 +03:00
}
2017-05-05 01:00:04 +03:00
// Close stops the monitoring goroutine and waits for it to finish, which
// includes recording the snapshot duration. It then rolls back the underlying
// read transaction.
func (s *snapshot) Close() error {
	close(s.stopc)
	// wait until the monitor has observed stopc and exited
	<-s.donec
	err := s.Tx.Rollback()
	return err
}