storage: save created index and modified index
parent
97709b202d
commit
4581064060
|
@ -8,7 +8,7 @@ import (
|
|||
)
|
||||
|
||||
type index interface {
|
||||
Get(key []byte, atRev int64) (rev reversion, err error)
|
||||
Get(key []byte, atRev int64) (rev, created reversion, err error)
|
||||
Range(key, end []byte, atRev int64) ([][]byte, []reversion)
|
||||
Put(key []byte, rev reversion)
|
||||
Tombstone(key []byte, rev reversion) error
|
||||
|
@ -42,14 +42,14 @@ func (ti *treeIndex) Put(key []byte, rev reversion) {
|
|||
okeyi.put(rev.main, rev.sub)
|
||||
}
|
||||
|
||||
func (ti *treeIndex) Get(key []byte, atRev int64) (rev reversion, err error) {
|
||||
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created reversion, err error) {
|
||||
keyi := &keyIndex{key: key}
|
||||
|
||||
ti.RLock()
|
||||
defer ti.RUnlock()
|
||||
item := ti.tree.Get(keyi)
|
||||
if item == nil {
|
||||
return reversion{}, ErrReversionNotFound
|
||||
return reversion{}, reversion{}, ErrReversionNotFound
|
||||
}
|
||||
|
||||
keyi = item.(*keyIndex)
|
||||
|
@ -58,7 +58,7 @@ func (ti *treeIndex) Get(key []byte, atRev int64) (rev reversion, err error) {
|
|||
|
||||
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []reversion) {
|
||||
if end == nil {
|
||||
rev, err := ti.Get(key, atRev)
|
||||
rev, _, err := ti.Get(key, atRev)
|
||||
if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
|
|||
return false
|
||||
}
|
||||
curKeyi := item.(*keyIndex)
|
||||
rev, err := curKeyi.get(atRev)
|
||||
rev, _, err := curKeyi.get(atRev)
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -126,7 +126,7 @@ func TestContinuousCompact(t *testing.T) {
|
|||
|
||||
func verify(t *testing.T, index index, tests []T) {
|
||||
for i, tt := range tests {
|
||||
h, err := index.Get(tt.key, tt.rev)
|
||||
h, _, err := index.Get(tt.key, tt.rev)
|
||||
if err != tt.werr {
|
||||
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
|
||||
}
|
||||
|
|
|
@ -55,43 +55,48 @@ var (
|
|||
// {empty} -> key SHOULD be removed.
|
||||
type keyIndex struct {
|
||||
key []byte
|
||||
rev int64
|
||||
modified reversion // the main rev of the last modification
|
||||
generations []generation
|
||||
}
|
||||
|
||||
// put puts a reversion to the keyIndex.
|
||||
func (ki *keyIndex) put(rev int64, subrev int64) {
|
||||
if rev < ki.rev {
|
||||
log.Panicf("store.keyindex: put with unexpected smaller reversion [%d / %d]", rev, ki.rev)
|
||||
func (ki *keyIndex) put(main int64, sub int64) {
|
||||
rev := reversion{main: main, sub: sub}
|
||||
|
||||
if !rev.GreaterThan(ki.modified) {
|
||||
log.Panicf("store.keyindex: put with unexpected smaller reversion [%v / %v]", rev, ki.modified)
|
||||
}
|
||||
if len(ki.generations) == 0 {
|
||||
ki.generations = append(ki.generations, generation{})
|
||||
}
|
||||
g := &ki.generations[len(ki.generations)-1]
|
||||
g.revs = append(g.revs, reversion{rev, subrev})
|
||||
if len(g.revs) == 0 {
|
||||
g.created = rev
|
||||
}
|
||||
g.revs = append(g.revs, rev)
|
||||
g.ver++
|
||||
ki.rev = rev
|
||||
ki.modified = rev
|
||||
}
|
||||
|
||||
// tombstone puts a reversion, pointing to a tombstone, to the keyIndex.
|
||||
// It also creates a new empty generation in the keyIndex.
|
||||
func (ki *keyIndex) tombstone(rev int64, subrev int64) {
|
||||
func (ki *keyIndex) tombstone(main int64, sub int64) {
|
||||
if ki.isEmpty() {
|
||||
log.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
|
||||
}
|
||||
ki.put(rev, subrev)
|
||||
ki.put(main, sub)
|
||||
ki.generations = append(ki.generations, generation{})
|
||||
}
|
||||
|
||||
// get gets the reversion of the key that satisfies the given atRev.
|
||||
// get gets the modified and created reversion of the key that satisfies the given atRev.
|
||||
// Rev must be higher than or equal to the given atRev.
|
||||
func (ki *keyIndex) get(atRev int64) (rev reversion, err error) {
|
||||
func (ki *keyIndex) get(atRev int64) (modified, created reversion, err error) {
|
||||
if ki.isEmpty() {
|
||||
log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
|
||||
}
|
||||
g := ki.findGeneration(atRev)
|
||||
if g.isEmpty() {
|
||||
return reversion{}, ErrReversionNotFound
|
||||
return reversion{}, reversion{}, ErrReversionNotFound
|
||||
}
|
||||
|
||||
f := func(rev reversion) bool {
|
||||
|
@ -103,10 +108,10 @@ func (ki *keyIndex) get(atRev int64) (rev reversion, err error) {
|
|||
|
||||
n := g.walk(f)
|
||||
if n != -1 {
|
||||
return g.revs[n], nil
|
||||
return g.revs[n], g.created, nil
|
||||
}
|
||||
|
||||
return reversion{}, ErrReversionNotFound
|
||||
return reversion{}, reversion{}, ErrReversionNotFound
|
||||
}
|
||||
|
||||
// compact compacts a keyIndex by removing the versions with smaller or equal
|
||||
|
@ -191,7 +196,7 @@ func (a *keyIndex) equal(b *keyIndex) bool {
|
|||
if !bytes.Equal(a.key, b.key) {
|
||||
return false
|
||||
}
|
||||
if a.rev != b.rev {
|
||||
if a.modified != b.modified {
|
||||
return false
|
||||
}
|
||||
if len(a.generations) != len(b.generations) {
|
||||
|
@ -215,8 +220,9 @@ func (ki *keyIndex) String() string {
|
|||
}
|
||||
|
||||
type generation struct {
|
||||
ver int64
|
||||
revs []reversion
|
||||
ver int64
|
||||
created reversion // when the generation is created (put in first reversion).
|
||||
revs []reversion
|
||||
}
|
||||
|
||||
func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 }
|
||||
|
@ -238,7 +244,7 @@ func (g *generation) walk(f func(rev reversion) bool) int {
|
|||
}
|
||||
|
||||
func (g *generation) String() string {
|
||||
return fmt.Sprintf("g: ver[%d], revs %#v\n", g.ver, g.revs)
|
||||
return fmt.Sprintf("g: created[%d] ver[%d], revs %#v\n", g.created, g.ver, g.revs)
|
||||
}
|
||||
|
||||
func (a generation) equal(b generation) bool {
|
||||
|
|
|
@ -39,7 +39,7 @@ func TestKeyIndexGet(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
rev, err := ki.get(tt.rev)
|
||||
rev, _, err := ki.get(tt.rev)
|
||||
if err != tt.werr {
|
||||
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
|
||||
}
|
||||
|
@ -55,8 +55,8 @@ func TestKeyIndexPut(t *testing.T) {
|
|||
|
||||
wki := &keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 5,
|
||||
generations: []generation{{ver: 1, revs: []reversion{{main: 5}}}},
|
||||
modified: reversion{5, 0},
|
||||
generations: []generation{{created: reversion{5, 0}, ver: 1, revs: []reversion{{main: 5}}}},
|
||||
}
|
||||
if !reflect.DeepEqual(ki, wki) {
|
||||
t.Errorf("ki = %+v, want %+v", ki, wki)
|
||||
|
@ -66,8 +66,8 @@ func TestKeyIndexPut(t *testing.T) {
|
|||
|
||||
wki = &keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 7,
|
||||
generations: []generation{{ver: 2, revs: []reversion{{main: 5}, {main: 7}}}},
|
||||
modified: reversion{7, 0},
|
||||
generations: []generation{{created: reversion{5, 0}, ver: 2, revs: []reversion{{main: 5}, {main: 7}}}},
|
||||
}
|
||||
if !reflect.DeepEqual(ki, wki) {
|
||||
t.Errorf("ki = %+v, want %+v", ki, wki)
|
||||
|
@ -82,8 +82,8 @@ func TestKeyIndexTombstone(t *testing.T) {
|
|||
|
||||
wki := &keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 7,
|
||||
generations: []generation{{ver: 2, revs: []reversion{{main: 5}, {main: 7}}}, {}},
|
||||
modified: reversion{7, 0},
|
||||
generations: []generation{{created: reversion{5, 0}, ver: 2, revs: []reversion{{main: 5}, {main: 7}}}, {}},
|
||||
}
|
||||
if !reflect.DeepEqual(ki, wki) {
|
||||
t.Errorf("ki = %+v, want %+v", ki, wki)
|
||||
|
@ -94,11 +94,11 @@ func TestKeyIndexTombstone(t *testing.T) {
|
|||
ki.tombstone(15, 0)
|
||||
|
||||
wki = &keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 15,
|
||||
key: []byte("foo"),
|
||||
modified: reversion{15, 0},
|
||||
generations: []generation{
|
||||
{ver: 2, revs: []reversion{{main: 5}, {main: 7}}},
|
||||
{ver: 3, revs: []reversion{{main: 8}, {main: 9}, {main: 15}}},
|
||||
{created: reversion{5, 0}, ver: 2, revs: []reversion{{main: 5}, {main: 7}}},
|
||||
{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 9}, {main: 15}}},
|
||||
{},
|
||||
},
|
||||
}
|
||||
|
@ -117,11 +117,11 @@ func TestKeyIndexCompact(t *testing.T) {
|
|||
{
|
||||
1,
|
||||
&keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 12,
|
||||
key: []byte("foo"),
|
||||
modified: reversion{12, 0},
|
||||
generations: []generation{
|
||||
{ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}},
|
||||
{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{created: reversion{2, 0}, ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}},
|
||||
{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{},
|
||||
},
|
||||
},
|
||||
|
@ -130,11 +130,11 @@ func TestKeyIndexCompact(t *testing.T) {
|
|||
{
|
||||
2,
|
||||
&keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 12,
|
||||
key: []byte("foo"),
|
||||
modified: reversion{12, 0},
|
||||
generations: []generation{
|
||||
{ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}},
|
||||
{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{created: reversion{2, 0}, ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}},
|
||||
{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{},
|
||||
},
|
||||
},
|
||||
|
@ -145,11 +145,11 @@ func TestKeyIndexCompact(t *testing.T) {
|
|||
{
|
||||
3,
|
||||
&keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 12,
|
||||
key: []byte("foo"),
|
||||
modified: reversion{12, 0},
|
||||
generations: []generation{
|
||||
{ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}},
|
||||
{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{created: reversion{2, 0}, ver: 3, revs: []reversion{{main: 2}, {main: 4}, {main: 6}}},
|
||||
{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{},
|
||||
},
|
||||
},
|
||||
|
@ -160,11 +160,11 @@ func TestKeyIndexCompact(t *testing.T) {
|
|||
{
|
||||
4,
|
||||
&keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 12,
|
||||
key: []byte("foo"),
|
||||
modified: reversion{12, 0},
|
||||
generations: []generation{
|
||||
{ver: 3, revs: []reversion{{main: 4}, {main: 6}}},
|
||||
{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{created: reversion{2, 0}, ver: 3, revs: []reversion{{main: 4}, {main: 6}}},
|
||||
{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{},
|
||||
},
|
||||
},
|
||||
|
@ -175,11 +175,11 @@ func TestKeyIndexCompact(t *testing.T) {
|
|||
{
|
||||
5,
|
||||
&keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 12,
|
||||
key: []byte("foo"),
|
||||
modified: reversion{12, 0},
|
||||
generations: []generation{
|
||||
{ver: 3, revs: []reversion{{main: 4}, {main: 6}}},
|
||||
{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{created: reversion{2, 0}, ver: 3, revs: []reversion{{main: 4}, {main: 6}}},
|
||||
{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{},
|
||||
},
|
||||
},
|
||||
|
@ -190,10 +190,10 @@ func TestKeyIndexCompact(t *testing.T) {
|
|||
{
|
||||
6,
|
||||
&keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 12,
|
||||
key: []byte("foo"),
|
||||
modified: reversion{12, 0},
|
||||
generations: []generation{
|
||||
{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{},
|
||||
},
|
||||
},
|
||||
|
@ -202,10 +202,10 @@ func TestKeyIndexCompact(t *testing.T) {
|
|||
{
|
||||
7,
|
||||
&keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 12,
|
||||
key: []byte("foo"),
|
||||
modified: reversion{12, 0},
|
||||
generations: []generation{
|
||||
{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{},
|
||||
},
|
||||
},
|
||||
|
@ -214,10 +214,10 @@ func TestKeyIndexCompact(t *testing.T) {
|
|||
{
|
||||
8,
|
||||
&keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 12,
|
||||
key: []byte("foo"),
|
||||
modified: reversion{12, 0},
|
||||
generations: []generation{
|
||||
{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{},
|
||||
},
|
||||
},
|
||||
|
@ -228,10 +228,10 @@ func TestKeyIndexCompact(t *testing.T) {
|
|||
{
|
||||
9,
|
||||
&keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 12,
|
||||
key: []byte("foo"),
|
||||
modified: reversion{12, 0},
|
||||
generations: []generation{
|
||||
{ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 8}, {main: 10}, {main: 12}}},
|
||||
{},
|
||||
},
|
||||
},
|
||||
|
@ -242,10 +242,10 @@ func TestKeyIndexCompact(t *testing.T) {
|
|||
{
|
||||
10,
|
||||
&keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 12,
|
||||
key: []byte("foo"),
|
||||
modified: reversion{12, 0},
|
||||
generations: []generation{
|
||||
{ver: 3, revs: []reversion{{main: 10}, {main: 12}}},
|
||||
{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 10}, {main: 12}}},
|
||||
{},
|
||||
},
|
||||
},
|
||||
|
@ -256,10 +256,10 @@ func TestKeyIndexCompact(t *testing.T) {
|
|||
{
|
||||
11,
|
||||
&keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 12,
|
||||
key: []byte("foo"),
|
||||
modified: reversion{12, 0},
|
||||
generations: []generation{
|
||||
{ver: 3, revs: []reversion{{main: 10}, {main: 12}}},
|
||||
{created: reversion{8, 0}, ver: 3, revs: []reversion{{main: 10}, {main: 12}}},
|
||||
{},
|
||||
},
|
||||
},
|
||||
|
@ -271,7 +271,7 @@ func TestKeyIndexCompact(t *testing.T) {
|
|||
12,
|
||||
&keyIndex{
|
||||
key: []byte("foo"),
|
||||
rev: 12,
|
||||
modified: reversion{12, 0},
|
||||
generations: []generation{{}},
|
||||
},
|
||||
map[reversion]struct{}{},
|
||||
|
|
|
@ -282,14 +282,24 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
|
|||
}
|
||||
|
||||
func (s *store) put(key, value []byte, rev int64) {
|
||||
c := rev
|
||||
|
||||
// if the key exists before, use its previous created
|
||||
_, created, err := s.kvindex.Get(key, rev)
|
||||
if err == nil {
|
||||
c = created.main
|
||||
}
|
||||
|
||||
ibytes := newRevBytes()
|
||||
revToBytes(reversion{main: rev, sub: s.currentRev.sub}, ibytes)
|
||||
|
||||
event := storagepb.Event{
|
||||
Type: storagepb.PUT,
|
||||
Kv: storagepb.KeyValue{
|
||||
Key: key,
|
||||
Value: value,
|
||||
Key: key,
|
||||
Value: value,
|
||||
CreateIndex: c,
|
||||
ModIndex: rev,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -332,7 +342,7 @@ func (s *store) delete(key []byte, mainrev int64) bool {
|
|||
if s.currentRev.sub > 0 {
|
||||
grev += 1
|
||||
}
|
||||
rev, err := s.kvindex.Get(key, grev)
|
||||
rev, _, err := s.kvindex.Get(key, grev)
|
||||
if err != nil {
|
||||
// key not exist
|
||||
return false
|
||||
|
|
|
@ -7,6 +7,16 @@ type reversion struct {
|
|||
sub int64
|
||||
}
|
||||
|
||||
func (a reversion) GreaterThan(b reversion) bool {
|
||||
if a.main > b.main {
|
||||
return true
|
||||
}
|
||||
if a.main < b.main {
|
||||
return false
|
||||
}
|
||||
return a.sub > b.sub
|
||||
}
|
||||
|
||||
func newRevBytes() []byte {
|
||||
return make([]byte, 8+1+8)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue