From d417b36e5d1ead7fbfac32a09d15c4ca19b71f54 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 30 May 2015 22:56:33 -0700 Subject: [PATCH 1/2] storage: refactor key_index --- storage/index.go | 61 +++++----- storage/index_test.go | 95 +++++++++------ storage/key_index.go | 199 +++++++++++++++++-------------- storage/key_index_test.go | 238 +++++++++++++++++--------------------- storage/kv.go | 22 ++-- storage/kvstore.go | 192 +++++++++++++++--------------- storage/kvstore_test.go | 88 +++++++------- 7 files changed, 452 insertions(+), 443 deletions(-) diff --git a/storage/index.go b/storage/index.go index 8af0d17db..af8a40d27 100644 --- a/storage/index.go +++ b/storage/index.go @@ -8,16 +8,11 @@ import ( ) type index interface { - Get(key []byte, atIndex uint64) (index uint64, err error) - Range(key, end []byte, atIndex uint64) []kipair - Put(key []byte, index uint64) - Tombstone(key []byte, index uint64) error - Compact(index uint64) map[uint64]struct{} -} - -type kipair struct { - index uint64 - key []byte + Get(key []byte, atRev int64) (rev reversion, err error) + Range(key, end []byte, atRev int64) ([][]byte, []reversion) + Put(key []byte, rev reversion) + Tombstone(key []byte, rev reversion) error + Compact(rev int64) map[reversion]struct{} } type treeIndex struct { @@ -31,47 +26,46 @@ func newTreeIndex() index { } } -func (ti *treeIndex) Put(key []byte, index uint64) { +func (ti *treeIndex) Put(key []byte, rev reversion) { keyi := &keyIndex{key: key} ti.Lock() defer ti.Unlock() item := ti.tree.Get(keyi) if item == nil { - keyi.put(index) + keyi.put(rev.main, rev.sub) ti.tree.ReplaceOrInsert(keyi) return } okeyi := item.(*keyIndex) - okeyi.put(index) + okeyi.put(rev.main, rev.sub) } -func (ti *treeIndex) Get(key []byte, atIndex uint64) (index uint64, err error) { +func (ti *treeIndex) Get(key []byte, atRev int64) (rev reversion, err error) { keyi := &keyIndex{key: key} ti.RLock() defer ti.RUnlock() item := ti.tree.Get(keyi) if item == nil { - return 0, 
ErrIndexNotFound + return reversion{}, ErrReversionNotFound } keyi = item.(*keyIndex) - return keyi.get(atIndex) + return keyi.get(atRev) } -func (ti *treeIndex) Range(key, end []byte, atIndex uint64) []kipair { +func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []reversion) { if end == nil { - index, err := ti.Get(key, atIndex) + rev, err := ti.Get(key, atRev) if err != nil { - return nil + return nil, nil } - return []kipair{{key: key, index: index}} + return [][]byte{key}, []reversion{rev} } keyi := &keyIndex{key: key} endi := &keyIndex{key: end} - pairs := make([]kipair, 0) ti.RLock() defer ti.RUnlock() @@ -81,41 +75,42 @@ func (ti *treeIndex) Range(key, end []byte, atIndex uint64) []kipair { return false } curKeyi := item.(*keyIndex) - index, err := curKeyi.get(atIndex) + rev, err := curKeyi.get(atRev) if err != nil { return true } - pairs = append(pairs, kipair{index, curKeyi.key}) + revs = append(revs, rev) + keys = append(keys, curKeyi.key) return true }) - return pairs + return keys, revs } -func (ti *treeIndex) Tombstone(key []byte, index uint64) error { +func (ti *treeIndex) Tombstone(key []byte, rev reversion) error { keyi := &keyIndex{key: key} ti.Lock() defer ti.Unlock() item := ti.tree.Get(keyi) if item == nil { - return ErrIndexNotFound + return ErrReversionNotFound } ki := item.(*keyIndex) - ki.tombstone(index) + ki.tombstone(rev.main, rev.sub) return nil } -func (ti *treeIndex) Compact(index uint64) map[uint64]struct{} { - available := make(map[uint64]struct{}) +func (ti *treeIndex) Compact(rev int64) map[reversion]struct{} { + available := make(map[reversion]struct{}) emptyki := make([]*keyIndex, 0) - log.Printf("store.index: compact %d", index) + log.Printf("store.index: compact %d", rev) // TODO: do not hold the lock for long time? // This is probably OK. Compacting 10M keys takes O(10ms). 
ti.Lock() defer ti.Unlock() - ti.tree.Ascend(compactIndex(index, available, &emptyki)) + ti.tree.Ascend(compactIndex(rev, available, &emptyki)) for _, ki := range emptyki { item := ti.tree.Delete(ki) if item == nil { @@ -125,10 +120,10 @@ func (ti *treeIndex) Compact(index uint64) map[uint64]struct{} { return available } -func compactIndex(index uint64, available map[uint64]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool { +func compactIndex(rev int64, available map[reversion]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool { return func(i btree.Item) bool { keyi := i.(*keyIndex) - keyi.compact(index, available) + keyi.compact(rev, available) if keyi.isEmpty() { *emptyki = append(*emptyki, keyi) } diff --git a/storage/index_test.go b/storage/index_test.go index ce89d3d70..3b9ef0c6b 100644 --- a/storage/index_test.go +++ b/storage/index_test.go @@ -9,20 +9,20 @@ func TestIndexPutAndGet(t *testing.T) { index := newTestTreeIndex() tests := []T{ - {[]byte("foo"), 0, ErrIndexNotFound, 0}, + {[]byte("foo"), 0, ErrReversionNotFound, 0}, {[]byte("foo"), 1, nil, 1}, {[]byte("foo"), 3, nil, 1}, {[]byte("foo"), 5, nil, 5}, {[]byte("foo"), 6, nil, 5}, - {[]byte("foo1"), 0, ErrIndexNotFound, 0}, - {[]byte("foo1"), 1, ErrIndexNotFound, 0}, + {[]byte("foo1"), 0, ErrReversionNotFound, 0}, + {[]byte("foo1"), 1, ErrReversionNotFound, 0}, {[]byte("foo1"), 2, nil, 2}, {[]byte("foo1"), 5, nil, 2}, {[]byte("foo1"), 6, nil, 6}, - {[]byte("foo2"), 0, ErrIndexNotFound, 0}, - {[]byte("foo2"), 1, ErrIndexNotFound, 0}, + {[]byte("foo2"), 0, ErrReversionNotFound, 0}, + {[]byte("foo2"), 1, ErrReversionNotFound, 0}, {[]byte("foo2"), 3, nil, 3}, {[]byte("foo2"), 4, nil, 4}, {[]byte("foo2"), 6, nil, 4}, @@ -34,31 +34,26 @@ func TestContinuousCompact(t *testing.T) { index := newTestTreeIndex() tests := []T{ - {[]byte("foo"), 0, ErrIndexNotFound, 0}, + {[]byte("foo"), 0, ErrReversionNotFound, 0}, {[]byte("foo"), 1, nil, 1}, {[]byte("foo"), 3, nil, 1}, {[]byte("foo"), 5, nil, 5}, 
{[]byte("foo"), 6, nil, 5}, - {[]byte("foo1"), 0, ErrIndexNotFound, 0}, - {[]byte("foo1"), 1, ErrIndexNotFound, 0}, + {[]byte("foo1"), 0, ErrReversionNotFound, 0}, + {[]byte("foo1"), 1, ErrReversionNotFound, 0}, {[]byte("foo1"), 2, nil, 2}, {[]byte("foo1"), 5, nil, 2}, {[]byte("foo1"), 6, nil, 6}, - {[]byte("foo2"), 0, ErrIndexNotFound, 0}, - {[]byte("foo2"), 1, ErrIndexNotFound, 0}, + {[]byte("foo2"), 0, ErrReversionNotFound, 0}, + {[]byte("foo2"), 1, ErrReversionNotFound, 0}, {[]byte("foo2"), 3, nil, 3}, {[]byte("foo2"), 4, nil, 4}, {[]byte("foo2"), 6, nil, 4}, } - wa := map[uint64]struct{}{ - 1: struct{}{}, - 2: struct{}{}, - 3: struct{}{}, - 4: struct{}{}, - 5: struct{}{}, - 6: struct{}{}, + wa := map[reversion]struct{}{ + reversion{main: 1}: struct{}{}, } ga := index.Compact(1) if !reflect.DeepEqual(ga, wa) { @@ -66,72 +61,96 @@ func TestContinuousCompact(t *testing.T) { } verify(t, index, tests) + wa = map[reversion]struct{}{ + reversion{main: 1}: struct{}{}, + reversion{main: 2}: struct{}{}, + } ga = index.Compact(2) if !reflect.DeepEqual(ga, wa) { t.Errorf("a = %v, want %v", ga, wa) } verify(t, index, tests) + wa = map[reversion]struct{}{ + reversion{main: 1}: struct{}{}, + reversion{main: 2}: struct{}{}, + reversion{main: 3}: struct{}{}, + } ga = index.Compact(3) if !reflect.DeepEqual(ga, wa) { t.Errorf("a = %v, want %v", ga, wa) } verify(t, index, tests) + wa = map[reversion]struct{}{ + reversion{main: 1}: struct{}{}, + reversion{main: 2}: struct{}{}, + reversion{main: 4}: struct{}{}, + } ga = index.Compact(4) - delete(wa, 3) - tests[12] = T{[]byte("foo2"), 3, ErrIndexNotFound, 0} + delete(wa, reversion{main: 3}) + tests[12] = T{[]byte("foo2"), 3, ErrReversionNotFound, 0} if !reflect.DeepEqual(wa, ga) { t.Errorf("a = %v, want %v", ga, wa) } verify(t, index, tests) + wa = map[reversion]struct{}{ + reversion{main: 2}: struct{}{}, + reversion{main: 4}: struct{}{}, + reversion{main: 5}: struct{}{}, + } ga = index.Compact(5) - delete(wa, 1) + delete(wa, 
reversion{main: 1}) if !reflect.DeepEqual(ga, wa) { t.Errorf("a = %v, want %v", ga, wa) } - tests[1] = T{[]byte("foo"), 1, ErrIndexNotFound, 0} - tests[2] = T{[]byte("foo"), 3, ErrIndexNotFound, 0} + tests[1] = T{[]byte("foo"), 1, ErrReversionNotFound, 0} + tests[2] = T{[]byte("foo"), 3, ErrReversionNotFound, 0} verify(t, index, tests) + wa = map[reversion]struct{}{ + reversion{main: 4}: struct{}{}, + reversion{main: 5}: struct{}{}, + reversion{main: 6}: struct{}{}, + } ga = index.Compact(6) - delete(wa, 2) + delete(wa, reversion{main: 2}) if !reflect.DeepEqual(ga, wa) { t.Errorf("a = %v, want %v", ga, wa) } - tests[7] = T{[]byte("foo1"), 2, ErrIndexNotFound, 0} - tests[8] = T{[]byte("foo1"), 5, ErrIndexNotFound, 0} + tests[7] = T{[]byte("foo1"), 2, ErrReversionNotFound, 0} + tests[8] = T{[]byte("foo1"), 5, ErrReversionNotFound, 0} verify(t, index, tests) } func verify(t *testing.T, index index, tests []T) { for i, tt := range tests { - h, err := index.Get(tt.key, tt.index) + h, err := index.Get(tt.key, tt.rev) if err != tt.werr { t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) } - if h != tt.windex { - t.Errorf("#%d: index = %d, want %d", i, h, tt.windex) + if h.main != tt.wrev { + t.Errorf("#%d: rev = %d, want %d", i, h.main, tt.wrev) } } } type T struct { - key []byte - index uint64 + key []byte + rev int64 - werr error - windex uint64 + werr error + wrev int64 } func newTestTreeIndex() index { index := newTreeIndex() - index.Put([]byte("foo"), 1) - index.Put([]byte("foo1"), 2) - index.Put([]byte("foo2"), 3) - index.Put([]byte("foo2"), 4) - index.Put([]byte("foo"), 5) - index.Put([]byte("foo1"), 6) + index.Put([]byte("foo"), reversion{main: 1}) + index.Put([]byte("foo1"), reversion{main: 2}) + index.Put([]byte("foo2"), reversion{main: 3}) + index.Put([]byte("foo2"), reversion{main: 4}) + index.Put([]byte("foo"), reversion{main: 5}) + index.Put([]byte("foo1"), reversion{main: 6}) return index } diff --git a/storage/key_index.go b/storage/key_index.go index 
cce3f6db5..51a5df223 100644 --- a/storage/key_index.go +++ b/storage/key_index.go @@ -3,33 +3,34 @@ package storage import ( "bytes" "errors" + "fmt" "log" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/google/btree" ) var ( - ErrIndexNotFound = errors.New("index: not found") + ErrReversionNotFound = errors.New("stroage: Reversion not found") ) -// keyIndex stores the index of an key in the backend. +// keyIndex stores the reversion of an key in the backend. // Each keyIndex has at least one key generation. // Each generation might have several key versions. // Tombstone on a key appends an tombstone version at the end // of the current generation and creates a new empty generation. // Each version of a key has an index pointing to the backend. // -// For example: put(1);put(2);tombstone(3);put(4);tombstone(5) on key "foo" +// For example: put(1.0);put(2.0);tombstone(3.0);put(4.0);tombstone(5.0) on key "foo" // generate a keyIndex: // key: "foo" -// index: 5 +// rev: 5 // generations: // {empty} -// {4, 5(t)} -// {1, 2, 3(t)} +// {4.0, 5.0(t)} +// {1.0, 2.0, 3.0(t)} // // Compact a keyIndex removes the versions with smaller or equal to -// index except the largest one. If the generations becomes empty +// rev except the largest one. If the generations becomes empty // during compaction, it will be removed. if all the generations get // removed, the keyIndex Should be removed. @@ -37,115 +38,125 @@ var ( // compact(2) on the previous example // generations: // {empty} -// {4, 5(t)} -// {2, 3(t)} +// {4.0, 5.0(t)} +// {2.0, 3.0(t)} // // compact(4) // generations: // {empty} -// {4, 5(t)} +// {4.0, 5.0(t)} // // compact(5): // generations: -// {empty} -// {5(t)} +// {empty} -> key SHOULD be removed. // // compact(6): // generations: // {empty} -> key SHOULD be removed. type keyIndex struct { key []byte - index uint64 + rev int64 generations []generation } -// put puts an index to the keyIndex. 
-func (ki *keyIndex) put(index uint64) { - if index < ki.index { - log.Panicf("store.keyindex: put with unexpected smaller index [%d / %d]", index, ki.index) +// put puts a reversion to the keyIndex. +func (ki *keyIndex) put(rev int64, subrev int64) { + if rev < ki.rev { + log.Panicf("store.keyindex: put with unexpected smaller reversion [%d / %d]", rev, ki.rev) } if len(ki.generations) == 0 { ki.generations = append(ki.generations, generation{}) } g := &ki.generations[len(ki.generations)-1] - g.cont = append(g.cont, index) + g.revs = append(g.revs, reversion{rev, subrev}) g.ver++ - ki.index = index + ki.rev = rev } -// tombstone puts an index, pointing to a tombstone, to the keyIndex. +// tombstone puts a reversion, pointing to a tombstone, to the keyIndex. // It also creates a new empty generation in the keyIndex. -func (ki *keyIndex) tombstone(index uint64) { +func (ki *keyIndex) tombstone(rev int64, subrev int64) { if ki.isEmpty() { log.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key)) } - ki.put(index) + ki.put(rev, subrev) ki.generations = append(ki.generations, generation{}) } -// get gets the index of thk that satisfies the given atIndex. -// Index must be lower or equal to the given atIndex. -func (ki *keyIndex) get(atIndex uint64) (index uint64, err error) { +// get gets the reversion of the key that satisfies the given atRev. +// Rev must be higher than or equal to the given atRev. 
+func (ki *keyIndex) get(atRev int64) (rev reversion, err error) { if ki.isEmpty() { log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) } - g := ki.findGeneration(atIndex) + g := ki.findGeneration(atRev) if g.isEmpty() { - return 0, ErrIndexNotFound + return reversion{}, ErrReversionNotFound } - f := func(index, ver uint64) bool { - if index <= atIndex { + f := func(rev reversion) bool { + if rev.main <= atRev { return false } return true } - _, n := g.walk(f) + n := g.walk(f) if n != -1 { - return g.cont[n], nil + return g.revs[n], nil } - return 0, ErrIndexNotFound + + return reversion{}, ErrReversionNotFound } // compact compacts a keyIndex by removing the versions with smaller or equal -// index than the given atIndex except the largest one. +// reversion than the given atRev except the largest one (If the largest one is +// a tombstone, it will not be kept). // If a generation becomes empty during compaction, it will be removed. -func (ki *keyIndex) compact(atIndex uint64, available map[uint64]struct{}) { +func (ki *keyIndex) compact(atRev int64, available map[reversion]struct{}) { if ki.isEmpty() { log.Panic("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key)) } - // walk until reaching the first content that has an index smaller or equal to - // the atIndex. - // add all the reached indexes into available map. - f := func(index, _ uint64) bool { - available[index] = struct{}{} - if index <= atIndex { + + // walk until reaching the first reversion that has an reversion smaller or equal to + // the atReversion. 
+ // add it to the available map + f := func(rev reversion) bool { + if rev.main <= atRev { + available[rev] = struct{}{} return false } return true } - g := ki.findGeneration(atIndex) - i := len(ki.generations) - 1 - for i >= 0 { + g := ki.findGeneration(atRev) + if g == nil { + return + } + + i := 0 + for i <= len(ki.generations)-1 { wg := &ki.generations[i] if wg == g { break } - wg.walk(f) - i-- + i++ } - _, n := g.walk(f) - - // remove the previous contents. - if n != -1 { - g.cont = g.cont[n:] + if !g.isEmpty() { + n := g.walk(f) + // remove the previous contents. + if n != -1 { + g.revs = g.revs[n:] + } + // remove any tombstone + if len(g.revs) == 1 && i != len(ki.generations)-1 { + delete(available, g.revs[0]) + i++ + } } // remove the previous generations. ki.generations = ki.generations[i:] - return } @@ -155,51 +166,63 @@ func (ki *keyIndex) isEmpty() bool { // findGeneartion finds out the generation of the keyIndex that the // given index belongs to. -func (ki *keyIndex) findGeneration(index uint64) *generation { - g, youngerg := len(ki.generations)-1, len(ki.generations)-2 +func (ki *keyIndex) findGeneration(rev int64) *generation { + cg := len(ki.generations) - 1 - // If the head index of a younger generation is smaller than - // the given index, the index cannot be in the younger - // generation. 
- for youngerg >= 0 && ki.generations[youngerg].cont != nil { - yg := ki.generations[youngerg] - if yg.cont[len(yg.cont)-1] < index { - break + for cg >= 0 { + if len(ki.generations[cg].revs) == 0 { + cg-- + continue } - g-- - youngerg-- + g := ki.generations[cg] + if g.revs[0].main <= rev { + return &ki.generations[cg] + } + cg-- } - if g < 0 { - return nil - } - return &ki.generations[g] + return nil } func (a *keyIndex) Less(b btree.Item) bool { return bytes.Compare(a.key, b.(*keyIndex).key) == -1 } -type generation struct { - ver uint64 - cont []uint64 -} - -func (g *generation) isEmpty() bool { return len(g.cont) == 0 } - -// walk walks through the (index, version) pairs in the generation in ascending order. -// It passes the (index, version) to the given function. -// walk returns until: 1. it finishs walking all pairs 2. the function returns false. -// walk returns the (index, version) pair at where it stopped. If it stopped after -// finishing walking, (0, -1) will be returned. -func (g *generation) walk(f func(index, ver uint64) bool) (uint64, int) { - ver := g.ver - l := len(g.cont) - for i := range g.cont { - ok := f(g.cont[l-i-1], ver) - if !ok { - return ver, l - i - 1 - } - ver-- +func (ki *keyIndex) String() string { + var s string + for _, g := range ki.generations { + s += g.String() } - return 0, -1 + return s +} + +type generation struct { + ver int64 + revs []reversion +} + +type reversion struct { + main int64 + sub int64 +} + +func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 } + +// walk walks through the reversions in the generation in ascending order. +// It passes the revision to the given function. +// walk returns until: 1. it finishs walking all pairs 2. the function returns false. +// walk returns the position at where it stopped. If it stopped after +// finishing walking, -1 will be returned. 
+func (g *generation) walk(f func(rev reversion) bool) int { + l := len(g.revs) + for i := range g.revs { + ok := f(g.revs[l-i-1]) + if !ok { + return l - i - 1 + } + } + return -1 +} + +func (g *generation) String() string { + return fmt.Sprintf("g: ver[%d], revs %#v\n", g.ver, g.revs) } diff --git a/storage/key_index_test.go b/storage/key_index_test.go index fe997d194..9afd82199 100644 --- a/storage/key_index_test.go +++ b/storage/key_index_test.go @@ -7,23 +7,22 @@ import ( func TestKeyIndexGet(t *testing.T) { // key: "foo" - // index: 12 + // rev: 12 // generations: // {empty} // {8[1], 10[2], 12(t)[3]} // {4[2], 6(t)[3]} ki := newTestKeyIndex() - ki.compact(4, make(map[uint64]struct{})) + ki.compact(4, make(map[reversion]struct{})) tests := []struct { - index uint64 + rev int64 - windex uint64 - werr error + wrev int64 + werr error }{ - // expected not exist on an index that is greater than the last tombstone - {13, 0, ErrIndexNotFound}, - {13, 0, ErrIndexNotFound}, + {13, 12, nil}, + {13, 12, nil}, // get on generation 2 {12, 12, nil}, @@ -31,7 +30,7 @@ func TestKeyIndexGet(t *testing.T) { {10, 10, nil}, {9, 8, nil}, {8, 8, nil}, - {7, 0, ErrIndexNotFound}, + {7, 6, nil}, // get on generation 1 {6, 6, nil}, @@ -40,35 +39,35 @@ func TestKeyIndexGet(t *testing.T) { } for i, tt := range tests { - index, err := ki.get(tt.index) + rev, err := ki.get(tt.rev) if err != tt.werr { t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) } - if index != tt.windex { - t.Errorf("#%d: index = %d, want %d", i, index, tt.index) + if rev.main != tt.wrev { + t.Errorf("#%d: rev = %d, want %d", i, rev.main, tt.rev) } } } func TestKeyIndexPut(t *testing.T) { ki := &keyIndex{key: []byte("foo")} - ki.put(5) + ki.put(5, 0) wki := &keyIndex{ key: []byte("foo"), - index: 5, - generations: []generation{{ver: 1, cont: []uint64{5}}}, + rev: 5, + generations: []generation{{ver: 1, revs: []reversion{{main: 5}}}}, } if !reflect.DeepEqual(ki, wki) { t.Errorf("ki = %+v, want %+v", ki, wki) } - 
ki.put(7) + ki.put(7, 0) wki = &keyIndex{ key: []byte("foo"), - index: 7, - generations: []generation{{ver: 2, cont: []uint64{5, 7}}}, + rev: 7, + generations: []generation{{ver: 2, revs: []reversion{{main: 5}, {main: 7}}}}, } if !reflect.DeepEqual(ki, wki) { t.Errorf("ki = %+v, want %+v", ki, wki) @@ -77,27 +76,31 @@ func TestKeyIndexPut(t *testing.T) { func TestKeyIndexTombstone(t *testing.T) { ki := &keyIndex{key: []byte("foo")} - ki.put(5) + ki.put(5, 0) - ki.tombstone(7) + ki.tombstone(7, 0) wki := &keyIndex{ key: []byte("foo"), - index: 7, - generations: []generation{{ver: 2, cont: []uint64{5, 7}}, {}}, + rev: 7, + generations: []generation{{ver: 2, revs: []reversion{{main: 5}, {main: 7}}}, {}}, } if !reflect.DeepEqual(ki, wki) { t.Errorf("ki = %+v, want %+v", ki, wki) } - ki.put(8) - ki.put(9) - ki.tombstone(15) + ki.put(8, 0) + ki.put(9, 0) + ki.tombstone(15, 0) wki = &keyIndex{ - key: []byte("foo"), - index: 15, - generations: []generation{{ver: 2, cont: []uint64{5, 7}}, {ver: 3, cont: []uint64{8, 9, 15}}, {}}, + key: []byte("foo"), + rev: 15, + generations: []generation{ + {ver: 2, revs: []reversion{{main: 5}, {main: 7}}}, + {ver: 3, revs: []reversion{{main: 8}, {main: 9}, {main: 15}}}, + {}, + }, } if !reflect.DeepEqual(ki, wki) { t.Errorf("ki = %+v, want %+v", ki, wki) @@ -106,221 +109,192 @@ func TestKeyIndexTombstone(t *testing.T) { func TestKeyIndexCompact(t *testing.T) { tests := []struct { - compact uint64 + compact int64 wki *keyIndex - wam map[uint64]struct{} + wam map[reversion]struct{} }{ { 1, &keyIndex{ - key: []byte("foo"), - index: 12, + key: []byte("foo"), + rev: 12, generations: []generation{ - {ver: 3, cont: []uint64{2, 4, 6}}, - {ver: 3, cont: []uint64{8, 10, 12}}, + {ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}}, + {ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}}, {}, }, }, - map[uint64]struct{}{ - 2: struct{}{}, 4: struct{}{}, 6: struct{}{}, - 8: struct{}{}, 10: struct{}{}, 12: struct{}{}, - }, + 
map[reversion]struct{}{}, }, { 2, &keyIndex{ - key: []byte("foo"), - index: 12, + key: []byte("foo"), + rev: 12, generations: []generation{ - {ver: 3, cont: []uint64{2, 4, 6}}, - {ver: 3, cont: []uint64{8, 10, 12}}, + {ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}}, + {ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}}, {}, }, }, - map[uint64]struct{}{ - 2: struct{}{}, 4: struct{}{}, 6: struct{}{}, - 8: struct{}{}, 10: struct{}{}, 12: struct{}{}, + map[reversion]struct{}{ + reversion{main: 2}: struct{}{}, }, }, { 3, &keyIndex{ - key: []byte("foo"), - index: 12, + key: []byte("foo"), + rev: 12, generations: []generation{ - {ver: 3, cont: []uint64{2, 4, 6}}, - {ver: 3, cont: []uint64{8, 10, 12}}, + {ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}}, + {ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}}, {}, }, }, - map[uint64]struct{}{ - 2: struct{}{}, 4: struct{}{}, 6: struct{}{}, - 8: struct{}{}, 10: struct{}{}, 12: struct{}{}, + map[reversion]struct{}{ + reversion{main: 2}: struct{}{}, }, }, { 4, &keyIndex{ - key: []byte("foo"), - index: 12, + key: []byte("foo"), + rev: 12, generations: []generation{ - {ver: 3, cont: []uint64{4, 6}}, - {ver: 3, cont: []uint64{8, 10, 12}}, + {ver: 3, revs: []reversion{{main: 4}, {main: 6}}}, + {ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}}, {}, }, }, - map[uint64]struct{}{ - 4: struct{}{}, 6: struct{}{}, - 8: struct{}{}, 10: struct{}{}, 12: struct{}{}, + map[reversion]struct{}{ + reversion{main: 4}: struct{}{}, }, }, { 5, &keyIndex{ - key: []byte("foo"), - index: 12, + key: []byte("foo"), + rev: 12, generations: []generation{ - {ver: 3, cont: []uint64{4, 6}}, - {ver: 3, cont: []uint64{8, 10, 12}}, + {ver: 3, revs: []reversion{{main: 4}, {main: 6}}}, + {ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}}, {}, }, }, - map[uint64]struct{}{ - 4: struct{}{}, 6: struct{}{}, - 8: struct{}{}, 10: struct{}{}, 12: struct{}{}, + map[reversion]struct{}{ + reversion{main: 
4}: struct{}{}, }, }, { 6, &keyIndex{ - key: []byte("foo"), - index: 12, + key: []byte("foo"), + rev: 12, generations: []generation{ - {ver: 3, cont: []uint64{6}}, - {ver: 3, cont: []uint64{8, 10, 12}}, + {ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}}, {}, }, }, - map[uint64]struct{}{ - 6: struct{}{}, - 8: struct{}{}, 10: struct{}{}, 12: struct{}{}, - }, + map[reversion]struct{}{}, }, { 7, &keyIndex{ - key: []byte("foo"), - index: 12, + key: []byte("foo"), + rev: 12, generations: []generation{ - {ver: 3, cont: []uint64{8, 10, 12}}, + {ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}}, {}, }, }, - map[uint64]struct{}{ - 8: struct{}{}, 10: struct{}{}, 12: struct{}{}, - }, + map[reversion]struct{}{}, }, { 8, &keyIndex{ - key: []byte("foo"), - index: 12, + key: []byte("foo"), + rev: 12, generations: []generation{ - {ver: 3, cont: []uint64{8, 10, 12}}, + {ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}}, {}, }, }, - map[uint64]struct{}{ - 8: struct{}{}, 10: struct{}{}, 12: struct{}{}, + map[reversion]struct{}{ + reversion{main: 8}: struct{}{}, }, }, { 9, &keyIndex{ - key: []byte("foo"), - index: 12, + key: []byte("foo"), + rev: 12, generations: []generation{ - {ver: 3, cont: []uint64{8, 10, 12}}, + {ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}}, {}, }, }, - map[uint64]struct{}{ - 8: struct{}{}, 10: struct{}{}, 12: struct{}{}, + map[reversion]struct{}{ + reversion{main: 8}: struct{}{}, }, }, { 10, &keyIndex{ - key: []byte("foo"), - index: 12, + key: []byte("foo"), + rev: 12, generations: []generation{ - {ver: 3, cont: []uint64{10, 12}}, + {ver: 3, revs: []reversion{{main: 10}, {main: 12}}}, {}, }, }, - map[uint64]struct{}{ - 10: struct{}{}, 12: struct{}{}, + map[reversion]struct{}{ + reversion{main: 10}: struct{}{}, }, }, { 11, &keyIndex{ - key: []byte("foo"), - index: 12, + key: []byte("foo"), + rev: 12, generations: []generation{ - {ver: 3, cont: []uint64{10, 12}}, + {ver: 3, revs: []reversion{{main: 10}, {main: 
12}}}, {}, }, }, - map[uint64]struct{}{ - 10: struct{}{}, 12: struct{}{}, + map[reversion]struct{}{ + reversion{main: 10}: struct{}{}, }, }, { 12, &keyIndex{ - key: []byte("foo"), - index: 12, - generations: []generation{ - {ver: 3, cont: []uint64{12}}, - {}, - }, + key: []byte("foo"), + rev: 12, + generations: []generation{{}}, }, - map[uint64]struct{}{ - 12: struct{}{}, - }, - }, - { - 13, - &keyIndex{ - key: []byte("foo"), - index: 12, - generations: []generation{ - {}, - }, - }, - map[uint64]struct{}{}, + map[reversion]struct{}{}, }, } // Continous Compaction ki := newTestKeyIndex() for i, tt := range tests { - am := make(map[uint64]struct{}) + am := make(map[reversion]struct{}) ki.compact(tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) } if !reflect.DeepEqual(am, tt.wam) { - t.Errorf("#%d: am = %+v, want %+v", am, tt.wam) + t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) } } // Jump Compaction for i, tt := range tests { if (i%2 == 0 && i < 6) && (i%2 == 1 && i > 6) { - am := make(map[uint64]struct{}) + am := make(map[reversion]struct{}) ki.compact(tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) @@ -334,31 +308,31 @@ func TestKeyIndexCompact(t *testing.T) { // OnceCompaction for i, tt := range tests { ki := newTestKeyIndex() - am := make(map[uint64]struct{}) + am := make(map[reversion]struct{}) ki.compact(tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) } if !reflect.DeepEqual(am, tt.wam) { - t.Errorf("#%d: am = %+v, want %+v", am, tt.wam) + t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) } } } func newTestKeyIndex() *keyIndex { // key: "foo" - // index: 12 + // rev: 12 // generations: // {empty} // {8[1], 10[2], 12(t)[3]} // {2[1], 4[2], 6(t)[3]} ki := &keyIndex{key: []byte("foo")} - ki.put(2) - ki.put(4) - ki.tombstone(6) - ki.put(8) - ki.put(10) - ki.tombstone(12) + ki.put(2, 0) + ki.put(4, 
0) + ki.tombstone(6, 0) + ki.put(8, 0) + ki.put(10, 0) + ki.tombstone(12, 0) return ki } diff --git a/storage/kv.go b/storage/kv.go index d99934be3..949a9a4a0 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -3,33 +3,33 @@ package storage import "github.com/coreos/etcd/storage/storagepb" type KV interface { - // Range gets the keys in the range at rangeIndex. - // If rangeIndex <=0, range gets the keys at currentIndex. + // Range gets the keys in the range at rangeRev. + // If rangeRev <=0, range gets the keys at currentRev. // If `end` is nil, the request returns the key. // If `end` is not nil, it gets the keys in range [key, range_end). // Limit limits the number of keys returned. - Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) + Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64) // Put puts the given key,value into the store. - // A put also increases the index of the store, and generates one event in the event history. - Put(key, value []byte) (index int64) + // A put also increases the rev of the store, and generates one event in the event history. + Put(key, value []byte) (rev int64) // DeleteRange deletes the given range from the store. - // A deleteRange increases the index of the store if any key in the range exists. + // A deleteRange increases the rev of the store if any key in the range exists. // The number of key deleted will be returned. // It also generates one event for each key delete in the event history. // if the `end` is nil, deleteRange deletes the key. // if the `end` is not nil, deleteRange deletes the keys in range [key, range_end). - DeleteRange(key, end []byte) (n, index int64) + DeleteRange(key, end []byte) (n, rev int64) // TnxBegin begins a tnx. Only Tnx prefixed operation can be executed, others will be blocked // until tnx ends. Only one on-going tnx is allowed. // TnxBegin returns an int64 tnx ID. 
- // All tnx prefixed operations with same tnx ID will be done with the same index. + // All tnx prefixed operations with same tnx ID will be done with the same rev. TnxBegin() int64 // TnxEnd ends the on-going tnx with tnx ID. If the on-going tnx ID is not matched, error is returned. TnxEnd(tnxID int64) error - TnxRange(tnxID int64, key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64, err error) - TnxPut(tnxID int64, key, value []byte) (index int64, err error) - TnxDeleteRange(tnxID int64, key, end []byte) (n, index int64, err error) + TnxRange(tnxID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) + TnxPut(tnxID int64, key, value []byte) (rev int64, err error) + TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err error) } diff --git a/storage/kvstore.go b/storage/kvstore.go index d4237b293..3a2d183cb 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -1,7 +1,6 @@ package storage import ( - "bytes" "encoding/binary" "errors" "log" @@ -27,8 +26,7 @@ type store struct { b backend.Backend kvindex index - currentIndex uint64 - subIndex uint32 // tracks next subIndex to put into backend + currentRev reversion tmu sync.Mutex // protect the tnxID field tnxID int64 // tracks the current tnxID to verify tnx operations @@ -36,9 +34,9 @@ type store struct { func newStore(path string) KV { s := &store{ - b: backend.New(path, batchInterval, batchLimit), - kvindex: newTreeIndex(), - currentIndex: 0, + b: backend.New(path, batchInterval, batchLimit), + kvindex: newTreeIndex(), + currentRev: reversion{}, } tx := s.b.BatchTx() @@ -52,31 +50,31 @@ func newStore(path string) KV { func (s *store) Put(key, value []byte) int64 { id := s.TnxBegin() - s.put(key, value, s.currentIndex+1) + s.put(key, value, s.currentRev.main+1) s.TnxEnd(id) - return int64(s.currentIndex) + return int64(s.currentRev.main) } -func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs 
[]storagepb.KeyValue, index int64) { +func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64) { id := s.TnxBegin() - kvs, index = s.rangeKeys(key, end, limit, rangeIndex) + kvs, rev = s.rangeKeys(key, end, limit, rangeRev) s.TnxEnd(id) - return kvs, index + return kvs, rev } -func (s *store) DeleteRange(key, end []byte) (n, index int64) { +func (s *store) DeleteRange(key, end []byte) (n, rev int64) { id := s.TnxBegin() - n = s.deleteRange(key, end, s.currentIndex+1) + n = s.deleteRange(key, end, s.currentRev.main+1) s.TnxEnd(id) - return n, int64(s.currentIndex) + return n, int64(s.currentRev.main) } func (s *store) TnxBegin() int64 { s.mu.Lock() - s.subIndex = 0 + s.currentRev.sub = 0 s.tmu.Lock() defer s.tmu.Unlock() @@ -91,111 +89,94 @@ func (s *store) TnxEnd(tnxID int64) error { return ErrTnxIDMismatch } - if s.subIndex != 0 { - s.currentIndex += 1 + if s.currentRev.sub != 0 { + s.currentRev.main += 1 } - s.subIndex = 0 + s.currentRev.sub = 0 s.mu.Unlock() return nil } -func (s *store) TnxRange(tnxID int64, key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64, err error) { +func (s *store) TnxRange(tnxID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { s.tmu.Lock() defer s.tmu.Unlock() if tnxID != s.tnxID { return nil, 0, ErrTnxIDMismatch } - kvs, index = s.rangeKeys(key, end, limit, rangeIndex) - return kvs, index, nil + kvs, rev = s.rangeKeys(key, end, limit, rangeRev) + return kvs, rev, nil } -func (s *store) TnxPut(tnxID int64, key, value []byte) (index int64, err error) { +func (s *store) TnxPut(tnxID int64, key, value []byte) (rev int64, err error) { s.tmu.Lock() defer s.tmu.Unlock() if tnxID != s.tnxID { return 0, ErrTnxIDMismatch } - s.put(key, value, s.currentIndex+1) - return int64(s.currentIndex + 1), nil + s.put(key, value, s.currentRev.main+1) + return int64(s.currentRev.main + 1), nil } -func (s *store) TnxDeleteRange(tnxID 
int64, key, end []byte) (n, index int64, err error) { +func (s *store) TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err error) { s.tmu.Lock() defer s.tmu.Unlock() if tnxID != s.tnxID { return 0, 0, ErrTnxIDMismatch } - n = s.deleteRange(key, end, s.currentIndex+1) - if n != 0 || s.subIndex != 0 { - index = int64(s.currentIndex + 1) + n = s.deleteRange(key, end, s.currentRev.main+1) + if n != 0 || s.currentRev.sub != 0 { + rev = int64(s.currentRev.main + 1) } - return n, index, nil + return n, rev, nil } // range is a keyword in Go, add Keys suffix. -func (s *store) rangeKeys(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { - if rangeIndex <= 0 { - index = int64(s.currentIndex) - if s.subIndex > 0 { - index += 1 +func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64) { + if rangeRev <= 0 { + rev = int64(s.currentRev.main) + if s.currentRev.sub > 0 { + rev += 1 } } else { - index = rangeIndex + rev = rangeRev } - pairs := s.kvindex.Range(key, end, uint64(index)) - if len(pairs) == 0 { - return nil, index + _, revs := s.kvindex.Range(key, end, int64(rev)) + if len(revs) == 0 { + return nil, rev } - if limit > 0 && len(pairs) > int(limit) { - pairs = pairs[:limit] + if limit > 0 && len(revs) > int(limit) { + revs = revs[:limit] } tx := s.b.BatchTx() tx.Lock() defer tx.Unlock() + for _, rev := range revs { + revbytes := make([]byte, 8+1+8) + revToBytes(rev.main, rev.sub, revbytes) - for _, pair := range pairs { - ibytes := make([]byte, 8) - endbytes := make([]byte, 8) - binary.BigEndian.PutUint64(ibytes, pair.index) - binary.BigEndian.PutUint64(endbytes, pair.index+1) - - found := false - var kv *storagepb.KeyValue - - vs := tx.UnsafeRange(keyBucketName, ibytes, endbytes, 0) - for _, v := range vs { - var e storagepb.Event - err := e.Unmarshal(v) - if err != nil { - log.Fatalf("storage: range cannot unmarshal event: %v", err) - } - if bytes.Equal(e.Kv.Key, pair.key) { - if 
e.Type == storagepb.PUT { - kv = &e.Kv - } else { - kv = nil - } - found = true - } + vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + if len(vs) != 1 { + log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub) } - if !found { - log.Fatalf("storage: range cannot find key %s at index %d", string(pair.key), pair.index) + e := &storagepb.Event{} + if err := e.Unmarshal(vs[0]); err != nil { + log.Fatalf("storage: cannot unmarshal event: %v", err) } - if kv != nil { - kvs = append(kvs, *kv) + if e.Type == storagepb.PUT { + kvs = append(kvs, e.Kv) } } - return kvs, index + return kvs, rev } -func (s *store) put(key, value []byte, index uint64) { - ibytes := make([]byte, 8+1+4) - indexToBytes(index, s.subIndex, ibytes) +func (s *store) put(key, value []byte, rev int64) { + ibytes := make([]byte, 8+1+8) + revToBytes(rev, s.currentRev.sub, ibytes) event := storagepb.Event{ Type: storagepb.PUT, @@ -214,24 +195,24 @@ func (s *store) put(key, value []byte, index uint64) { tx.Lock() defer tx.Unlock() tx.UnsafePut(keyBucketName, ibytes, d) - s.kvindex.Put(key, index) - s.subIndex += 1 + s.kvindex.Put(key, reversion{main: rev, sub: s.currentRev.sub}) + s.currentRev.sub += 1 } -func (s *store) deleteRange(key, end []byte, index uint64) int64 { +func (s *store) deleteRange(key, end []byte, rev int64) int64 { var n int64 - rindex := index - if s.subIndex > 0 { - rindex += 1 + rrev := rev + if s.currentRev.sub > 0 { + rrev += 1 } - pairs := s.kvindex.Range(key, end, rindex) + keys, _ := s.kvindex.Range(key, end, rrev) - if len(pairs) == 0 { + if len(keys) == 0 { return 0 } - for _, pair := range pairs { - ok := s.delete(pair.key, index) + for _, key := range keys { + ok := s.delete(key, rev) if ok { n++ } @@ -239,19 +220,39 @@ func (s *store) deleteRange(key, end []byte, index uint64) int64 { return n } -func (s *store) delete(key []byte, index uint64) bool { - gindex := index - if s.subIndex > 0 { - gindex += 1 +func (s *store) delete(key []byte, mainrev 
int64) bool { + grev := mainrev + if s.currentRev.sub > 0 { + grev += 1 } - _, err := s.kvindex.Get(key, gindex) + rev, err := s.kvindex.Get(key, grev) if err != nil { // key not exist return false } - ibytes := make([]byte, 8+1+4) - indexToBytes(index, s.subIndex, ibytes) + tx := s.b.BatchTx() + tx.Lock() + defer tx.Unlock() + + revbytes := make([]byte, 8+1+8) + revToBytes(rev.main, rev.sub, revbytes) + + vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + if len(vs) != 1 { + log.Fatalf("storage: delete cannot find rev (%d,%d)", rev.main, rev.sub) + } + + e := &storagepb.Event{} + if err := e.Unmarshal(vs[0]); err != nil { + log.Fatalf("storage: cannot unmarshal event: %v", err) + } + if e.Type == storagepb.DELETE { + return false + } + + ibytes := make([]byte, 8+1+8) + revToBytes(mainrev, s.currentRev.sub, ibytes) event := storagepb.Event{ Type: storagepb.DELETE, @@ -265,20 +266,17 @@ func (s *store) delete(key []byte, index uint64) bool { log.Fatalf("storage: cannot marshal event: %v", err) } - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() tx.UnsafePut(keyBucketName, ibytes, d) - err = s.kvindex.Tombstone(key, index) + err = s.kvindex.Tombstone(key, reversion{main: mainrev, sub: s.currentRev.sub}) if err != nil { log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err) } - s.subIndex += 1 + s.currentRev.sub += 1 return true } -func indexToBytes(index uint64, subindex uint32, bytes []byte) { - binary.BigEndian.PutUint64(bytes, index) +func revToBytes(main int64, sub int64, bytes []byte) { + binary.BigEndian.PutUint64(bytes, uint64(main)) bytes[8] = '_' - binary.BigEndian.PutUint32(bytes[9:], subindex) + binary.BigEndian.PutUint64(bytes[9:], uint64(sub)) } diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 5aacd165f..d2ff0de23 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -16,9 +16,9 @@ func TestRange(t *testing.T) { tests := []struct { key, end []byte - index int64 + rev int64 - windex 
int64 + wrev int64 // TODO: change this to the actual kv wN int64 }{ @@ -41,12 +41,12 @@ func TestRange(t *testing.T) { } for i, tt := range tests { - kvs, index := s.Range(tt.key, tt.end, 0, tt.index) + kvs, rev := s.Range(tt.key, tt.end, 0, tt.rev) if len(kvs) != int(tt.wN) { t.Errorf("#%d: len(kvs) = %d, want %d", i, len(kvs), tt.wN) } - if index != tt.windex { - t.Errorf("#%d: index = %d, wang %d", i, tt.index, tt.windex) + if rev != tt.wrev { + t.Errorf("#%d: rev = %d, want %d", i, rev, tt.wrev) } } } @@ -55,8 +55,8 @@ func TestSimpleDeleteRange(t *testing.T) { tests := []struct { key, end []byte - windex int64 - wN int64 + wrev int64 + wN int64 }{ { []byte("foo"), []byte("foo1"), @@ -83,12 +83,12 @@ func TestSimpleDeleteRange(t *testing.T) { s.Put([]byte("foo1"), []byte("bar1")) s.Put([]byte("foo2"), []byte("bar2")) - n, index := s.DeleteRange(tt.key, tt.end) + n, rev := s.DeleteRange(tt.key, tt.end) if n != tt.wN { t.Errorf("#%d: n = %d, want %d", i, n, tt.wN) } - if index != tt.windex { - t.Errorf("#%d: index = %d, wang %d", i, index, tt.windex) + if rev != tt.wrev { + t.Errorf("#%d: rev = %d, want %d", i, rev, tt.wrev) } os.Remove("test") @@ -104,49 +104,49 @@ func TestRangeInSequence(t *testing.T) { s.Put([]byte("foo2"), []byte("bar2")) // remove foo - n, index := s.DeleteRange([]byte("foo"), nil) - if n != 1 || index != 4 { - t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 4) + n, rev := s.DeleteRange([]byte("foo"), nil) + if n != 1 || rev != 4 { + t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 4) } // before removal foo - kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 3) + kvs, rev := s.Range([]byte("foo"), []byte("foo3"), 0, 3) if len(kvs) != 3 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 3) } // after removal foo - kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 4) + kvs, rev = s.Range([]byte("foo"), []byte("foo3"), 0, 4) if len(kvs) != 2 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2) } // remove again -> 
expect nothing - n, index = s.DeleteRange([]byte("foo"), nil) - if n != 0 || index != 4 { - t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 4) + n, rev = s.DeleteRange([]byte("foo"), nil) + if n != 0 || rev != 4 { + t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 0, 4) } // remove foo1 - n, index = s.DeleteRange([]byte("foo"), []byte("foo2")) - if n != 1 || index != 5 { - t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 5) + n, rev = s.DeleteRange([]byte("foo"), []byte("foo2")) + if n != 1 || rev != 5 { + t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 5) } // after removal foo1 - kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 5) + kvs, rev = s.Range([]byte("foo"), []byte("foo3"), 0, 5) if len(kvs) != 1 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1) } // remove foo2 - n, index = s.DeleteRange([]byte("foo2"), []byte("foo3")) - if n != 1 || index != 6 { - t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 6) + n, rev = s.DeleteRange([]byte("foo2"), []byte("foo3")) + if n != 1 || rev != 6 { + t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 6) } // after removal foo2 - kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 6) + kvs, rev = s.Range([]byte("foo"), []byte("foo3"), 0, 6) if len(kvs) != 0 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) } @@ -163,15 +163,15 @@ func TestOneTnx(t *testing.T) { s.TnxPut(id, []byte("foo2"), []byte("bar2")) // remove foo - n, index, err := s.TnxDeleteRange(id, []byte("foo"), nil) + n, rev, err := s.TnxDeleteRange(id, []byte("foo"), nil) if err != nil { t.Fatal(err) } - if n != 1 || index != 1 { - t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) + if n != 1 || rev != 1 { + t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 1) } - kvs, index, err := s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) + kvs, rev, err := s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) if err != nil { t.Fatal(err) } @@ -180,25 +180,25 @@ func TestOneTnx(t *testing.T) { } 
// remove again -> expect nothing - n, index, err = s.TnxDeleteRange(id, []byte("foo"), nil) + n, rev, err = s.TnxDeleteRange(id, []byte("foo"), nil) if err != nil { t.Fatal(err) } - if n != 0 || index != 1 { - t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 1) + if n != 0 || rev != 1 { + t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 0, 1) } // remove foo1 - n, index, err = s.TnxDeleteRange(id, []byte("foo"), []byte("foo2")) + n, rev, err = s.TnxDeleteRange(id, []byte("foo"), []byte("foo2")) if err != nil { t.Fatal(err) } - if n != 1 || index != 1 { - t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) + if n != 1 || rev != 1 { + t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 1) } // after removal foo1 - kvs, index, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) + kvs, rev, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) if err != nil { t.Fatal(err) } @@ -207,16 +207,16 @@ func TestOneTnx(t *testing.T) { } // remove foo2 - n, index, err = s.TnxDeleteRange(id, []byte("foo2"), []byte("foo3")) + n, rev, err = s.TnxDeleteRange(id, []byte("foo2"), []byte("foo3")) if err != nil { t.Fatal(err) } - if n != 1 || index != 1 { - t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) + if n != 1 || rev != 1 { + t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 1) } // after removal foo2 - kvs, index, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) + kvs, rev, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) if err != nil { t.Fatal(err) } @@ -230,12 +230,12 @@ func TestOneTnx(t *testing.T) { } // After tnx - kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 1) + kvs, rev := s.Range([]byte("foo"), []byte("foo3"), 0, 1) if len(kvs) != 0 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) } - if index != 1 { - t.Fatalf("index = %d, want %d", index, 1) + if rev != 1 { + t.Fatalf("rev = %d, want %d", rev, 1) } } From f47ed4a364e364ff64dd9af13ce370d1d959da8f Mon Sep 17 00:00:00 2001 From: Xiang 
Li Date: Sun, 31 May 2015 08:59:31 -0700 Subject: [PATCH 2/2] storage: initial compact --- storage/backend/batch_tx.go | 13 +++-- storage/key_index.go | 7 +-- storage/kv.go | 5 +- storage/kvstore.go | 87 ++++++++++++++++++++------------ storage/kvstore_compaction.go | 42 ++++++++++++++++ storage/kvstore_test.go | 93 ++++++++++++++++++++++++++++++++--- storage/reversion.go | 21 ++++++++ 7 files changed, 217 insertions(+), 51 deletions(-) create mode 100644 storage/kvstore_compaction.go create mode 100644 storage/reversion.go diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index ca8cc4532..2ad1d0c90 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -13,7 +13,7 @@ type BatchTx interface { Unlock() UnsafeCreateBucket(name []byte) UnsafePut(bucketName []byte, key []byte, value []byte) - UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) [][]byte + UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) UnsafeDelete(bucketName []byte, key []byte) Commit() } @@ -49,28 +49,27 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { } // before calling unsafeRange, the caller MUST hold the lock on tnx. 
-func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) [][]byte { +func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) { bucket := t.tx.Bucket(bucketName) if bucket == nil { log.Fatalf("storage: bucket %s does not exist", string(bucketName)) } - var vs [][]byte - if len(endKey) == 0 { if v := bucket.Get(key); v == nil { - return vs + return keys, vs } else { - return append(vs, v) + return append(keys, key), append(vs, v) } } c := bucket.Cursor() for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() { vs = append(vs, cv) + keys = append(keys, ck) } - return vs + return keys, vs } // before calling unsafeDelete, the caller MUST hold the lock on tnx. diff --git a/storage/key_index.go b/storage/key_index.go index 51a5df223..e0235bbfd 100644 --- a/storage/key_index.go +++ b/storage/key_index.go @@ -10,7 +10,7 @@ import ( ) var ( - ErrReversionNotFound = errors.New("stroage: Reversion not found") + ErrReversionNotFound = errors.New("storage: reversion not found") ) // keyIndex stores the reversion of an key in the backend. @@ -200,11 +200,6 @@ type generation struct { revs []reversion } -type reversion struct { - main int64 - sub int64 -} - func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 } // walk walks through the reversions in the generation in ascending order. diff --git a/storage/kv.go b/storage/kv.go index 949a9a4a0..436e64465 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -8,7 +8,8 @@ type KV interface { // If `end` is nil, the request returns the key. // If `end` is not nil, it gets the keys in range [key, range_end). // Limit limits the number of keys returned. - Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64) + // If the required rev is compacted, ErrCompacted will be returned. 
+ Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) // Put puts the given key,value into the store. // A put also increases the rev of the store, and generates one event in the event history. @@ -32,4 +33,6 @@ type KV interface { TnxRange(tnxID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) TnxPut(tnxID int64, key, value []byte) (rev int64, err error) TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err error) + + Compact(rev int64) error } diff --git a/storage/kvstore.go b/storage/kvstore.go index 3a2d183cb..6d93ebd87 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -1,7 +1,6 @@ package storage import ( - "encoding/binary" "errors" "log" "math/rand" @@ -17,7 +16,11 @@ var ( batchInterval = 100 * time.Millisecond keyBucketName = []byte("key") + scheduledCompactKeyName = []byte("scheduledCompactRev") + finishedCompactKeyName = []byte("finishedCompactRev") + ErrTnxIDMismatch = errors.New("storage: tnx id mismatch") + ErrCompacted = errors.New("storage: required reversion has been compacted") ) type store struct { @@ -27,6 +30,8 @@ type store struct { kvindex index currentRev reversion + // the main reversion of the last compaction + compactMainRev int64 tmu sync.Mutex // protect the tnxID field tnxID int64 // tracks the current tnxID to verify tnx operations @@ -34,9 +39,10 @@ type store struct { func newStore(path string) KV { s := &store{ - b: backend.New(path, batchInterval, batchLimit), - kvindex: newTreeIndex(), - currentRev: reversion{}, + b: backend.New(path, batchInterval, batchLimit), + kvindex: newTreeIndex(), + currentRev: reversion{}, + compactMainRev: -1, } tx := s.b.BatchTx() @@ -56,12 +62,12 @@ func (s *store) Put(key, value []byte) int64 { return int64(s.currentRev.main) } -func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64) { +func (s *store) Range(key, end []byte, limit, rangeRev int64) 
(kvs []storagepb.KeyValue, rev int64, err error) { id := s.TnxBegin() - kvs, rev = s.rangeKeys(key, end, limit, rangeRev) + kvs, rev, err = s.rangeKeys(key, end, limit, rangeRev) s.TnxEnd(id) - return kvs, rev + return kvs, rev, err } func (s *store) DeleteRange(key, end []byte) (n, rev int64) { @@ -103,8 +109,7 @@ func (s *store) TnxRange(tnxID int64, key, end []byte, limit, rangeRev int64) (k if tnxID != s.tnxID { return nil, 0, ErrTnxIDMismatch } - kvs, rev = s.rangeKeys(key, end, limit, rangeRev) - return kvs, rev, nil + return s.rangeKeys(key, end, limit, rangeRev) } func (s *store) TnxPut(tnxID int64, key, value []byte) (rev int64, err error) { @@ -132,8 +137,31 @@ func (s *store) TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err return n, rev, nil } +func (s *store) Compact(rev int64) error { + s.mu.Lock() + defer s.mu.Unlock() + if rev <= s.compactMainRev { + return ErrCompacted + } + + s.compactMainRev = rev + + rbytes := make([]byte, 8+1+8) + revToBytes(reversion{main: rev}, rbytes) + + tx := s.b.BatchTx() + tx.Lock() + tx.UnsafePut(keyBucketName, scheduledCompactKeyName, rbytes) + tx.Unlock() + + keep := s.kvindex.Compact(rev) + + go s.scheduleCompaction(rev, keep) + return nil +} + // range is a keyword in Go, add Keys suffix. 
-func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64) { +func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { if rangeRev <= 0 { rev = int64(s.currentRev.main) if s.currentRev.sub > 0 { @@ -142,25 +170,28 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage } else { rev = rangeRev } - - _, revs := s.kvindex.Range(key, end, int64(rev)) - if len(revs) == 0 { - return nil, rev + if rev <= s.compactMainRev { + return nil, 0, ErrCompacted } - if limit > 0 && len(revs) > int(limit) { - revs = revs[:limit] + + _, revpairs := s.kvindex.Range(key, end, int64(rev)) + if len(revpairs) == 0 { + return nil, rev, nil + } + if limit > 0 && len(revpairs) > int(limit) { + revpairs = revpairs[:limit] } tx := s.b.BatchTx() tx.Lock() defer tx.Unlock() - for _, rev := range revs { + for _, revpair := range revpairs { revbytes := make([]byte, 8+1+8) - revToBytes(rev.main, rev.sub, revbytes) + revToBytes(revpair, revbytes) - vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) if len(vs) != 1 { - log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub) + log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub) } e := &storagepb.Event{} @@ -171,12 +202,12 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage kvs = append(kvs, e.Kv) } } - return kvs, rev + return kvs, rev, nil } func (s *store) put(key, value []byte, rev int64) { ibytes := make([]byte, 8+1+8) - revToBytes(rev, s.currentRev.sub, ibytes) + revToBytes(reversion{main: rev, sub: s.currentRev.sub}, ibytes) event := storagepb.Event{ Type: storagepb.PUT, @@ -236,9 +267,9 @@ func (s *store) delete(key []byte, mainrev int64) bool { defer tx.Unlock() revbytes := make([]byte, 8+1+8) - revToBytes(rev.main, rev.sub, revbytes) + revToBytes(rev, revbytes) - vs := 
tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) if len(vs) != 1 { log.Fatalf("storage: delete cannot find rev (%d,%d)", rev.main, rev.sub) } @@ -252,7 +283,7 @@ func (s *store) delete(key []byte, mainrev int64) bool { } ibytes := make([]byte, 8+1+8) - revToBytes(mainrev, s.currentRev.sub, ibytes) + revToBytes(reversion{main: mainrev, sub: s.currentRev.sub}, ibytes) event := storagepb.Event{ Type: storagepb.DELETE, @@ -274,9 +305,3 @@ func (s *store) delete(key []byte, mainrev int64) bool { s.currentRev.sub += 1 return true } - -func revToBytes(main int64, sub int64, bytes []byte) { - binary.BigEndian.PutUint64(bytes, uint64(main)) - bytes[8] = '_' - binary.BigEndian.PutUint64(bytes[9:], uint64(sub)) -} diff --git a/storage/kvstore_compaction.go b/storage/kvstore_compaction.go new file mode 100644 index 000000000..ab4d9326f --- /dev/null +++ b/storage/kvstore_compaction.go @@ -0,0 +1,42 @@ +package storage + +import ( + "encoding/binary" + "time" +) + +func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]struct{}) { + end := make([]byte, 8) + binary.BigEndian.PutUint64(end, uint64(compactMainRev+1)) + + batchsize := int64(10000) + last := make([]byte, 8+1+8) + for { + var rev reversion + + tx := s.b.BatchTx() + tx.Lock() + + keys, _ := tx.UnsafeRange(keyBucketName, last, end, batchsize) + for _, key := range keys { + rev = bytesToRev(key) + if _, ok := keep[rev]; !ok { + tx.UnsafeDelete(keyBucketName, key) + } + } + + if len(keys) == 0 { + rbytes := make([]byte, 8+1+8) + revToBytes(reversion{main: compactMainRev}, rbytes) + tx.UnsafePut(keyBucketName, finishedCompactKeyName, rbytes) + tx.Unlock() + return + } + + // update last + revToBytes(reversion{main: rev.main, sub: rev.sub + 1}, last) + tx.Unlock() + + time.Sleep(100 * time.Millisecond) + } +} diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index d2ff0de23..d3ef5ab9c 100644 --- a/storage/kvstore_test.go +++ 
b/storage/kvstore_test.go @@ -1,6 +1,7 @@ package storage import ( + "bytes" "crypto/rand" "os" "testing" @@ -41,7 +42,10 @@ func TestRange(t *testing.T) { } for i, tt := range tests { - kvs, rev := s.Range(tt.key, tt.end, 0, tt.rev) + kvs, rev, err := s.Range(tt.key, tt.end, 0, tt.rev) + if err != nil { + t.Fatal(err) + } if len(kvs) != int(tt.wN) { t.Errorf("#%d: len(kvs) = %d, want %d", i, len(kvs), tt.wN) } @@ -110,13 +114,19 @@ func TestRangeInSequence(t *testing.T) { } // before removal foo - kvs, rev := s.Range([]byte("foo"), []byte("foo3"), 0, 3) + kvs, rev, err := s.Range([]byte("foo"), []byte("foo3"), 0, 3) + if err != nil { + t.Fatal(err) + } if len(kvs) != 3 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 3) } // after removal foo - kvs, rev = s.Range([]byte("foo"), []byte("foo3"), 0, 4) + kvs, rev, err = s.Range([]byte("foo"), []byte("foo3"), 0, 4) + if err != nil { + t.Fatal(err) + } if len(kvs) != 2 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2) } @@ -134,7 +144,10 @@ func TestRangeInSequence(t *testing.T) { } // after removal foo1 - kvs, rev = s.Range([]byte("foo"), []byte("foo3"), 0, 5) + kvs, rev, err = s.Range([]byte("foo"), []byte("foo3"), 0, 5) + if err != nil { + t.Fatal(err) + } if len(kvs) != 1 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1) } @@ -146,7 +159,10 @@ func TestRangeInSequence(t *testing.T) { } // after removal foo2 - kvs, rev = s.Range([]byte("foo"), []byte("foo3"), 0, 6) + kvs, rev, err = s.Range([]byte("foo"), []byte("foo3"), 0, 6) + if err != nil { + t.Fatal(err) + } if len(kvs) != 0 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) } @@ -230,7 +246,10 @@ func TestOneTnx(t *testing.T) { } // After tnx - kvs, rev := s.Range([]byte("foo"), []byte("foo3"), 0, 1) + kvs, rev, err := s.Range([]byte("foo"), []byte("foo3"), 0, 1) + if err != nil { + t.Fatal(err) + } if len(kvs) != 0 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) } @@ -239,6 +258,68 @@ func TestOneTnx(t *testing.T) { } } +func TestCompaction(t *testing.T) { + s 
:= newStore("test") + defer os.Remove("test") + + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo1"), []byte("bar1")) + s.Put([]byte("foo2"), []byte("bar2")) + s.Put([]byte("foo"), []byte("bar11")) + s.Put([]byte("foo1"), []byte("bar12")) + s.Put([]byte("foo2"), []byte("bar13")) + s.Put([]byte("foo1"), []byte("bar14")) + s.DeleteRange([]byte("foo"), []byte("foo200")) + s.Put([]byte("foo4"), []byte("bar4")) + + err := s.Compact(4) + if err != nil { + t.Errorf("unexpected compact error %v", err) + } + + err = s.Compact(4) + if err != ErrCompacted { + t.Errorf("err = %v, want %v", err, ErrCompacted) + } + + _, _, err = s.Range([]byte("foo"), nil, 0, 4) + if err != ErrCompacted { + t.Errorf("err = %v, want %v", err, ErrCompacted) + } + + // compact should not compact the last value of foo + kvs, rev, err := s.Range([]byte("foo"), nil, 0, 5) + if err != nil { + t.Errorf("unexpected range error %v", err) + } + if !bytes.Equal(kvs[0].Value, []byte("bar11")) { + t.Errorf("value = %s, want %s", string(kvs[0].Value), "bar11") + } + if rev != 5 { + t.Errorf("rev = %d, want %d", rev, 5) + } + + // compact everything + err = s.Compact(8) + if err != nil { + t.Errorf("unexpected compact error %v", err) + } + + kvs, rev, err = s.Range([]byte("foo"), []byte("fop"), 0, 0) + if err != nil { + t.Errorf("unexpected range error %v", err) + } + if len(kvs) != 1 { + t.Errorf("len(kvs) = %d, want %d", len(kvs), 1) + } + if !bytes.Equal(kvs[0].Value, []byte("bar4")) { + t.Errorf("value = %s, want %s", string(kvs[0].Value), "bar4") + } + if rev != 9 { + t.Errorf("rev = %d, want %d", rev, 9) + } +} + func BenchmarkStorePut(b *testing.B) { s := newStore("test") defer os.Remove("test") diff --git a/storage/reversion.go b/storage/reversion.go new file mode 100644 index 000000000..581c713d0 --- /dev/null +++ b/storage/reversion.go @@ -0,0 +1,21 @@ +package storage + +import "encoding/binary" + +type reversion struct { + main int64 + sub int64 +} + +func revToBytes(rev reversion, bytes 
[]byte) { + binary.BigEndian.PutUint64(bytes, uint64(rev.main)) + bytes[8] = '_' + binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub)) +} + +func bytesToRev(bytes []byte) reversion { + return reversion{ + main: int64(binary.BigEndian.Uint64(bytes[0:8])), + sub: int64(binary.BigEndian.Uint64(bytes[9:])), + } +}