storage: initial snapshot and restore

Snapshot takes an io.Writer and writes the entire backend data to
the given writer. Snapshot writes a consistent view and does not
block other storage operations.

Restore restores the in-memory states (index and book keeping) of
the storage from the backend data.
release-2.1
Xiang Li 2015-06-08 09:26:56 -07:00
parent 05b55d9d75
commit ba9a46aa02
7 changed files with 192 additions and 7 deletions

View File

@ -1,6 +1,7 @@
package backend
import (
"io"
"log"
"time"
@ -9,6 +10,7 @@ import (
type Backend interface {
BatchTx() BatchTx
Snapshot(w io.Writer) (n int64, err error)
ForceCommit()
Close() error
}
@ -60,6 +62,14 @@ func (b *backend) ForceCommit() {
b.batchTx.Commit()
}
func (b *backend) Snapshot(w io.Writer) (n int64, err error) {
b.db.View(func(tx *bolt.Tx) error {
n, err = tx.WriteTo(w)
return nil
})
return n, err
}
func (b *backend) run() {
defer close(b.donec)
@ -70,6 +80,7 @@ func (b *backend) run() {
select {
case <-time.After(b.batchInterval):
case <-b.stopc:
b.batchTx.Commit()
return
}
b.batchTx.Commit()

View File

@ -13,6 +13,7 @@ type index interface {
Put(key []byte, rev reversion)
Tombstone(key []byte, rev reversion) error
Compact(rev int64) map[reversion]struct{}
Equal(b index) bool
}
type treeIndex struct {
@ -130,3 +131,25 @@ func compactIndex(rev int64, available map[reversion]struct{}, emptyki *[]*keyIn
return true
}
}
func (a *treeIndex) Equal(bi index) bool {
b := bi.(*treeIndex)
if a.tree.Len() != b.tree.Len() {
return false
}
equal := true
a.tree.Ascend(func(item btree.Item) bool {
aki := item.(*keyIndex)
bki := b.tree.Get(item).(*keyIndex)
if !aki.equal(bki) {
equal = false
return false
}
return true
})
return equal
}

View File

@ -187,6 +187,25 @@ func (a *keyIndex) Less(b btree.Item) bool {
return bytes.Compare(a.key, b.(*keyIndex).key) == -1
}
func (a *keyIndex) equal(b *keyIndex) bool {
if !bytes.Equal(a.key, b.key) {
return false
}
if a.rev != b.rev {
return false
}
if len(a.generations) != len(b.generations) {
return false
}
for i := range a.generations {
ag, bg := a.generations[i], b.generations[i]
if !ag.equal(bg) {
return false
}
}
return true
}
func (ki *keyIndex) String() string {
var s string
for _, g := range ki.generations {
@ -221,3 +240,20 @@ func (g *generation) walk(f func(rev reversion) bool) int {
func (g *generation) String() string {
return fmt.Sprintf("g: ver[%d], revs %#v\n", g.ver, g.revs)
}
func (a generation) equal(b generation) bool {
if a.ver != b.ver {
return false
}
if len(a.revs) != len(b.revs) {
return false
}
for i := range a.revs {
ar, br := a.revs[i], b.revs[i]
if ar != br {
return false
}
}
return true
}

View File

@ -1,6 +1,10 @@
package storage
import "github.com/coreos/etcd/storage/storagepb"
import (
"io"
"github.com/coreos/etcd/storage/storagepb"
)
type KV interface {
// Range gets the keys in the range at rangeRev.
@ -35,4 +39,10 @@ type KV interface {
TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err error)
Compact(rev int64) error
// Write a snapshot to the given io writer
Snapshot(w io.Writer) (int64, error)
Restore() error
Close() error
}

View File

@ -2,7 +2,9 @@ package storage
import (
"errors"
"io"
"log"
"math"
"math/rand"
"sync"
"time"
@ -37,7 +39,7 @@ type store struct {
tnxID int64 // tracks the current tnxID to verify tnx operations
}
func newStore(path string) KV {
func newStore(path string) *store {
s := &store{
b: backend.New(path, batchInterval, batchLimit),
kvindex: newTreeIndex(),
@ -146,7 +148,7 @@ func (s *store) Compact(rev int64) error {
s.compactMainRev = rev
rbytes := make([]byte, 8+1+8)
rbytes := newRevBytes()
revToBytes(reversion{main: rev}, rbytes)
tx := s.b.BatchTx()
@ -160,6 +162,80 @@ func (s *store) Compact(rev int64) error {
return nil
}
func (s *store) Snapshot(w io.Writer) (int64, error) {
s.b.ForceCommit()
return s.b.Snapshot(w)
}
func (s *store) Restore() error {
s.mu.Lock()
defer s.mu.Unlock()
min, max := newRevBytes(), newRevBytes()
revToBytes(reversion{}, min)
revToBytes(reversion{main: math.MaxInt64, sub: math.MaxInt64}, max)
// restore index
tx := s.b.BatchTx()
tx.Lock()
_, finishedCompactBytes := tx.UnsafeRange(keyBucketName, finishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
log.Printf("storage: restore compact to %d", s.compactMainRev)
}
// TODO: limit N to reduce max memory usage
keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
for i, key := range keys {
e := &storagepb.Event{}
if err := e.Unmarshal(vals[i]); err != nil {
log.Fatalf("storage: cannot unmarshal event: %v", err)
}
rev := bytesToRev(key)
// restore index
switch e.Type {
case storagepb.PUT:
s.kvindex.Put(e.Kv.Key, rev)
case storagepb.DELETE:
s.kvindex.Tombstone(e.Kv.Key, rev)
default:
log.Panicf("storage: unexpected event type %s", e.Type)
}
// update reversion
s.currentRev = rev
}
_, scheduledCompactBytes := tx.UnsafeRange(keyBucketName, scheduledCompactKeyName, nil, 0)
if len(scheduledCompactBytes) != 0 {
scheduledCompact := bytesToRev(finishedCompactBytes[0]).main
if scheduledCompact > s.compactMainRev {
log.Printf("storage: resume scheduled compaction at %d", scheduledCompact)
go s.Compact(scheduledCompact)
}
}
tx.Unlock()
return nil
}
func (s *store) Close() error {
return s.b.Close()
}
func (a *store) Equal(b *store) bool {
if a.currentRev != b.currentRev {
return false
}
if a.compactMainRev != b.compactMainRev {
return false
}
return a.kvindex.Equal(b.kvindex)
}
// range is a keyword in Go, add Keys suffix.
func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
if rangeRev <= 0 {
@ -186,7 +262,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
tx.Lock()
defer tx.Unlock()
for _, revpair := range revpairs {
revbytes := make([]byte, 8+1+8)
revbytes := newRevBytes()
revToBytes(revpair, revbytes)
_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
@ -206,7 +282,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
}
func (s *store) put(key, value []byte, rev int64) {
ibytes := make([]byte, 8+1+8)
ibytes := newRevBytes()
revToBytes(reversion{main: rev, sub: s.currentRev.sub}, ibytes)
event := storagepb.Event{
@ -266,7 +342,7 @@ func (s *store) delete(key []byte, mainrev int64) bool {
tx.Lock()
defer tx.Unlock()
revbytes := make([]byte, 8+1+8)
revbytes := newRevBytes()
revToBytes(rev, revbytes)
_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
@ -282,7 +358,7 @@ func (s *store) delete(key []byte, mainrev int64) bool {
return false
}
ibytes := make([]byte, 8+1+8)
ibytes := newRevBytes()
revToBytes(reversion{main: mainrev, sub: s.currentRev.sub}, ibytes)
event := storagepb.Event{

View File

@ -320,6 +320,31 @@ func TestCompaction(t *testing.T) {
}
}
// TODO: test more complicated cases:
// with unfinished compaction
// with removed keys
func TestRestore(t *testing.T) {
s0 := newStore("test")
defer os.Remove("test")
s0.Put([]byte("foo"), []byte("bar"))
s0.Put([]byte("foo1"), []byte("bar1"))
s0.Put([]byte("foo2"), []byte("bar2"))
s0.Put([]byte("foo"), []byte("bar11"))
s0.Put([]byte("foo1"), []byte("bar12"))
s0.Put([]byte("foo2"), []byte("bar13"))
s0.Put([]byte("foo1"), []byte("bar14"))
s0.Close()
s1 := newStore("test")
s1.Restore()
if !s0.Equal(s1) {
t.Errorf("not equal!")
}
}
func BenchmarkStorePut(b *testing.B) {
s := newStore("test")
defer os.Remove("test")

View File

@ -7,6 +7,10 @@ type reversion struct {
sub int64
}
func newRevBytes() []byte {
return make([]byte, 8+1+8)
}
func revToBytes(rev reversion, bytes []byte) {
binary.BigEndian.PutUint64(bytes, uint64(rev.main))
bytes[8] = '_'