diff --git a/storage/key_index.go b/storage/key_index.go
new file mode 100644
index 000000000..a73eb3d44
--- /dev/null
+++ b/storage/key_index.go
@@ -0,0 +1,211 @@
+package storage
+
+import (
+	"bytes"
+	"errors"
+	"log"
+
+	"github.com/google/btree"
+)
+
+var (
+	// ErrIndexNotFound is returned by keyIndex.get when the key has no
+	// version visible at the requested index.
+	ErrIndexNotFound = errors.New("index: not found")
+)
+
+// keyIndex stores the indexes of a key in the backend.
+// Each keyIndex has at least one key generation.
+// Each generation might have several key versions.
+// A tombstone on a key appends a tombstone version at the end
+// of the current generation and creates a new empty generation.
+// Each version of a key has an index pointing to the backend.
+//
+// For example: put(1);put(2);tombstone(3);put(4);tombstone(5) on key "foo"
+// generates a keyIndex:
+// key:     "foo"
+// index: 5
+// generations:
+//    {empty}
+//    {4, 5(t)}
+//    {1, 2, 3(t)}
+//
+// Compacting a keyIndex removes the versions with an index smaller than or
+// equal to the given index, except the largest one. If a generation becomes
+// empty during compaction, it is removed. If all the generations get
+// removed, the keyIndex should be removed.
+//
+// For example:
+// compact(2) on the previous example
+// generations:
+//    {empty}
+//    {4, 5(t)}
+//    {2, 3(t)}
+//
+// compact(4)
+// generations:
+//    {empty}
+//    {4, 5(t)}
+//
+// compact(5):
+// generations:
+//    {empty}
+//    {5(t)}
+//
+// compact(6):
+// generations:
+//    {empty} -> key SHOULD be removed.
+type keyIndex struct {
+	key         []byte
+	index       uint64       // the last index put into the keyIndex
+	generations []generation // ordered from the oldest to the newest
+}
+
+// put appends index as a new version to the newest generation of the
+// keyIndex, creating the first generation if there is none yet.
+// It panics if index is smaller than the last index put.
+func (ki *keyIndex) put(index uint64) {
+	if index < ki.index {
+		log.Panicf("store.keyindex: put with unexpected smaller index [%d / %d]", index, ki.index)
+	}
+	if len(ki.generations) == 0 {
+		ki.generations = append(ki.generations, generation{})
+	}
+	g := &ki.generations[len(ki.generations)-1]
+	g.cont = append(g.cont, index)
+	g.ver++
+	ki.index = index
+}
+
+// tombstone puts an index, pointing to a tombstone, to the keyIndex.
+// It also creates a new empty generation in the keyIndex.
+// It panics if the keyIndex is empty.
+func (ki *keyIndex) tombstone(index uint64) {
+	if ki.isEmpty() {
+		log.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
+	}
+	ki.put(index)
+	ki.generations = append(ki.generations, generation{})
+}
+
+// get gets the index of the key that satisfies the given atIndex: the
+// largest index that is lower or equal to atIndex within the generation
+// atIndex belongs to.
+// It returns ErrIndexNotFound if there is no such index, and panics if the
+// keyIndex is empty.
+func (ki *keyIndex) get(atIndex uint64) (index uint64, err error) {
+	if ki.isEmpty() {
+		log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
+	}
+	g := ki.findGeneration(atIndex)
+	// g is nil only if the keyIndex has no generation at all.
+	if g == nil || g.isEmpty() {
+		return 0, ErrIndexNotFound
+	}
+
+	// walk from the newest version until reaching the first one that has
+	// an index lower or equal to atIndex.
+	_, n := g.walk(func(idx, _ uint64) bool { return idx > atIndex })
+	if n == -1 {
+		return 0, ErrIndexNotFound
+	}
+	return g.cont[n], nil
+}
+
+// compact compacts a keyIndex by removing the versions with smaller or equal
+// index than the given atIndex except the largest one.
+// The generations older than the one atIndex belongs to are removed.
+// The indexes reached while walking the remaining versions are added to
+// available, which must not be nil.
+// It panics if the keyIndex is empty.
+func (ki *keyIndex) compact(atIndex uint64, available map[uint64]struct{}) {
+	if ki.isEmpty() {
+		log.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
+	}
+	g := ki.findGeneration(atIndex)
+	if g == nil {
+		// the keyIndex has no generation at all; nothing to compact.
+		return
+	}
+
+	// walk until reaching the first content that has an index smaller or equal to
+	// the atIndex.
+	// add all the reached indexes into available map.
+	f := func(index, _ uint64) bool {
+		available[index] = struct{}{}
+		return index > atIndex
+	}
+
+	// the generations newer than g are kept as they are.
+	i := len(ki.generations) - 1
+	for ; i >= 0; i-- {
+		wg := &ki.generations[i]
+		if wg == g {
+			break
+		}
+		wg.walk(f)
+	}
+
+	// remove the previous contents.
+	if _, n := g.walk(f); n != -1 {
+		g.cont = g.cont[n:]
+	}
+	// remove the previous generations.
+	ki.generations = ki.generations[i:]
+}
+
+// isEmpty reports whether the keyIndex consists of a single generation
+// that has no version.
+func (ki *keyIndex) isEmpty() bool {
+	return len(ki.generations) == 1 && ki.generations[0].isEmpty()
+}
+
+// findGeneration finds out the generation of the keyIndex that the
+// given index belongs to. It returns nil if the keyIndex has no generation.
+func (ki *keyIndex) findGeneration(index uint64) *generation {
+	g := len(ki.generations) - 1
+
+	// If the last index of the previous (older) generation is smaller than
+	// the given index, the index cannot be in that older generation.
+	for g > 0 {
+		older := ki.generations[g-1].cont
+		if len(older) == 0 || older[len(older)-1] < index {
+			break
+		}
+		g--
+	}
+	if g < 0 {
+		return nil
+	}
+	return &ki.generations[g]
+}
+
+// Less implements btree.Item. keyIndexes are ordered by their keys.
+func (ki *keyIndex) Less(b btree.Item) bool {
+	return bytes.Compare(ki.key, b.(*keyIndex).key) < 0
+}
+
+// generation contains the versions of a key put since the previous
+// tombstone, ending with the tombstone if there is one.
+type generation struct {
+	ver  uint64   // number of versions put into the generation so far
+	cont []uint64 // indexes of the versions, from the oldest to the newest
+}
+
+// isEmpty reports whether the generation has no version.
+func (g *generation) isEmpty() bool { return len(g.cont) == 0 }
+
+// walk calls f on the versions of the generation, from the newest to the
+// oldest, passing the index and the version number of each one, until f
+// returns false. It returns the version number and the position in cont of
+// the version f stopped at, or (0, -1) if f never returned false.
+func (g *generation) walk(f func(index, ver uint64) bool) (uint64, int) {
+	ver := g.ver
+	for i := len(g.cont) - 1; i >= 0; i-- {
+		if !f(g.cont[i], ver) {
+			return ver, i
+		}
+		ver--
+	}
+	return 0, -1
+}
diff --git a/storage/key_index_test.go b/storage/key_index_test.go
new file mode 100644
index 000000000..fe997d194
--- /dev/null
+++ b/storage/key_index_test.go
@@ -0,0 +1,366 @@
+package storage
+
+import (
+	"reflect"
+	"testing"
+)
+
+func TestKeyIndexGet(t *testing.T) {
+	// key:     "foo"
+	// index: 12
+	// generations:
+	//    {empty}
+	//    {8[1], 10[2], 12(t)[3]}
+	//    {4[2], 6(t)[3]}
+	ki := newTestKeyIndex()
+	ki.compact(4, make(map[uint64]struct{}))
+
+	tests := []struct {
+		index uint64
+
+		windex uint64
+		werr   error
+	}{
+		// expected not exist on an index that is greater than the last tombstone
+		{13, 0, ErrIndexNotFound},
+		{14, 0, ErrIndexNotFound},
+
+		// get on generation 2
+		{12, 12, nil},
+		{11, 10, nil},
+		{10, 10, nil},
+		{9, 8, nil},
+		{8, 8, nil},
+		{7, 0, ErrIndexNotFound},
+
+		// get on generation 1
+		{6, 6, nil},
+		{5, 4, nil},
+		{4, 4, nil},
+	}
+
+	for i, tt := range tests {
+		index, err := ki.get(tt.index)
+		if err != tt.werr {
+			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
+		}
+		if index != tt.windex {
+			t.Errorf("#%d: index = %d, want %d", i, index, tt.windex)
+		}
+	}
+}
+
+func TestKeyIndexPut(t *testing.T) {
+	ki := &keyIndex{key: []byte("foo")}
+	ki.put(5)
+
+	wki := &keyIndex{
+		key:         []byte("foo"),
+		index:       5,
+		generations: []generation{{ver: 1, cont: []uint64{5}}},
+	}
+	if !reflect.DeepEqual(ki, wki) {
+		t.Errorf("ki = %+v, want %+v", ki, wki)
+	}
+
+	ki.put(7)
+
+	wki = &keyIndex{
+		key:         []byte("foo"),
+		index:       7,
+		generations: []generation{{ver: 2, cont: []uint64{5, 7}}},
+	}
+	if !reflect.DeepEqual(ki, wki) {
+		t.Errorf("ki = %+v, want %+v", ki, wki)
+	}
+}
+
+func TestKeyIndexTombstone(t *testing.T) {
+	ki := &keyIndex{key: []byte("foo")}
+	ki.put(5)
+
+	ki.tombstone(7)
+
+	wki := &keyIndex{
+		key:         []byte("foo"),
+		index:       7,
+		generations: []generation{{ver: 2, cont: []uint64{5, 7}}, {}},
+	}
+	if !reflect.DeepEqual(ki, wki) {
+		t.Errorf("ki = %+v, want %+v", ki, wki)
+	}
+
+	ki.put(8)
+	ki.put(9)
+	ki.tombstone(15)
+
+	wki = &keyIndex{
+		key:         []byte("foo"),
+		index:       15,
+		generations: []generation{{ver: 2, cont: []uint64{5, 7}}, {ver: 3, cont: []uint64{8, 9, 15}}, {}},
+	}
+	if !reflect.DeepEqual(ki, wki) {
+		t.Errorf("ki = %+v, want %+v", ki, wki)
+	}
+}
+
+func TestKeyIndexCompact(t *testing.T) {
+	tests := []struct {
+		compact uint64
+
+		wki *keyIndex
+		wam map[uint64]struct{}
+	}{
+		{
+			1,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{ver: 3, cont: []uint64{2, 4, 6}},
+					{ver: 3, cont: []uint64{8, 10, 12}},
+					{},
+				},
+			},
+			map[uint64]struct{}{
+				2: {}, 4: {}, 6: {},
+				8: {}, 10: {}, 12: {},
+			},
+		},
+		{
+			2,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{ver: 3, cont: []uint64{2, 4, 6}},
+					{ver: 3, cont: []uint64{8, 10, 12}},
+					{},
+				},
+			},
+			map[uint64]struct{}{
+				2: {}, 4: {}, 6: {},
+				8: {}, 10: {}, 12: {},
+			},
+		},
+		{
+			3,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{ver: 3, cont: []uint64{2, 4, 6}},
+					{ver: 3, cont: []uint64{8, 10, 12}},
+					{},
+				},
+			},
+			map[uint64]struct{}{
+				2: {}, 4: {}, 6: {},
+				8: {}, 10: {}, 12: {},
+			},
+		},
+		{
+			4,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{ver: 3, cont: []uint64{4, 6}},
+					{ver: 3, cont: []uint64{8, 10, 12}},
+					{},
+				},
+			},
+			map[uint64]struct{}{
+				4: {}, 6: {},
+				8: {}, 10: {}, 12: {},
+			},
+		},
+		{
+			5,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{ver: 3, cont: []uint64{4, 6}},
+					{ver: 3, cont: []uint64{8, 10, 12}},
+					{},
+				},
+			},
+			map[uint64]struct{}{
+				4: {}, 6: {},
+				8: {}, 10: {}, 12: {},
+			},
+		},
+		{
+			6,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{ver: 3, cont: []uint64{6}},
+					{ver: 3, cont: []uint64{8, 10, 12}},
+					{},
+				},
+			},
+			map[uint64]struct{}{
+				6: {},
+				8: {}, 10: {}, 12: {},
+			},
+		},
+		{
+			7,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{ver: 3, cont: []uint64{8, 10, 12}},
+					{},
+				},
+			},
+			map[uint64]struct{}{
+				8: {}, 10: {}, 12: {},
+			},
+		},
+		{
+			8,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{ver: 3, cont: []uint64{8, 10, 12}},
+					{},
+				},
+			},
+			map[uint64]struct{}{
+				8: {}, 10: {}, 12: {},
+			},
+		},
+		{
+			9,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{ver: 3, cont: []uint64{8, 10, 12}},
+					{},
+				},
+			},
+			map[uint64]struct{}{
+				8: {}, 10: {}, 12: {},
+			},
+		},
+		{
+			10,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{ver: 3, cont: []uint64{10, 12}},
+					{},
+				},
+			},
+			map[uint64]struct{}{
+				10: {}, 12: {},
+			},
+		},
+		{
+			11,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{ver: 3, cont: []uint64{10, 12}},
+					{},
+				},
+			},
+			map[uint64]struct{}{
+				10: {}, 12: {},
+			},
+		},
+		{
+			12,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{ver: 3, cont: []uint64{12}},
+					{},
+				},
+			},
+			map[uint64]struct{}{
+				12: {},
+			},
+		},
+		{
+			13,
+			&keyIndex{
+				key:   []byte("foo"),
+				index: 12,
+				generations: []generation{
+					{},
+				},
+			},
+			map[uint64]struct{}{},
+		},
+	}
+
+	// Continuous Compaction
+	ki := newTestKeyIndex()
+	for i, tt := range tests {
+		am := make(map[uint64]struct{})
+		ki.compact(tt.compact, am)
+		if !reflect.DeepEqual(ki, tt.wki) {
+			t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
+		}
+		if !reflect.DeepEqual(am, tt.wam) {
+			t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
+		}
+	}
+
+	// Jump Compaction: start over and skip some of the compaction points.
+	ki = newTestKeyIndex()
+	for i, tt := range tests {
+		if (i%2 == 0 && i < 6) || (i%2 == 1 && i > 6) {
+			am := make(map[uint64]struct{})
+			ki.compact(tt.compact, am)
+			if !reflect.DeepEqual(ki, tt.wki) {
+				t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
+			}
+			if !reflect.DeepEqual(am, tt.wam) {
+				t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
+			}
+		}
+	}
+
+	// OnceCompaction
+	for i, tt := range tests {
+		ki := newTestKeyIndex()
+		am := make(map[uint64]struct{})
+		ki.compact(tt.compact, am)
+		if !reflect.DeepEqual(ki, tt.wki) {
+			t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
+		}
+		if !reflect.DeepEqual(am, tt.wam) {
+			t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
+		}
+	}
+}
+
+// newTestKeyIndex returns the keyIndex fixture the tests above start from.
+func newTestKeyIndex() *keyIndex {
+	// key:     "foo"
+	// index: 12
+	// generations:
+	//    {empty}
+	//    {8[1], 10[2], 12(t)[3]}
+	//    {2[1], 4[2], 6(t)[3]}
+
+	ki := &keyIndex{key: []byte("foo")}
+	ki.put(2)
+	ki.put(4)
+	ki.tombstone(6)
+	ki.put(8)
+	ki.put(10)
+	ki.tombstone(12)
+	return ki
+}