mvcc: add and implement Keep api to index

Keep finds all revisions to be kept for a Compaction at the given rev.
release-3.3
fanmin shi 2017-07-28 15:07:11 -07:00
parent ca586147bd
commit 7b8fb3cf0a
2 changed files with 57 additions and 18 deletions

View File

@ -28,6 +28,7 @@ type index interface {
Tombstone(key []byte, rev revision) error
RangeSince(key, end []byte, rev int64) []revision
Compact(rev int64) map[revision]struct{}
Keep(rev int64) map[revision]struct{}
Equal(b index) bool
Insert(ki *keyIndex)
@ -179,6 +180,19 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
return available
}
// Keep finds all revisions to be kept for a Compaction at the given rev.
func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
ti.RLock()
defer ti.RUnlock()
ti.tree.Ascend(func(i btree.Item) bool {
keyi := i.(*keyIndex)
keyi.keep(rev, available)
return true
})
return available
}
func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
return func(i btree.Item) bool {
keyi := i.(*keyIndex)

View File

@ -187,6 +187,42 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
}
genIdx, revIndex := ki.doCompact(atRev, available)
g := ki.generations[genIdx]
if !g.isEmpty() {
// remove the previous contents.
if revIndex != -1 {
g.revs = g.revs[revIndex:]
}
// remove any tombstone
if len(g.revs) == 1 && genIdx != len(ki.generations)-1 {
delete(available, g.revs[0])
genIdx++
}
}
// remove the previous generations.
ki.generations = ki.generations[genIdx:]
}
// keep finds the revision to be kept if compact is called at given atRev.
func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
if ki.isEmpty() {
return
}
genIdx, revIndex := ki.doCompact(atRev, available)
g := ki.generations[genIdx]
if !g.isEmpty() {
// remove any tombstone
if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 {
delete(available, g.revs[revIndex])
}
}
}
func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {
// walk until reaching the first revision that has an revision smaller or equal to
// the atRev.
// add it to the available map
@ -198,30 +234,19 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
return true
}
i, g := 0, &ki.generations[0]
genIdx, g := 0, &ki.generations[0]
// find first generation includes atRev or created after atRev
for i < len(ki.generations)-1 {
for genIdx < len(ki.generations)-1 {
if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {
break
}
i++
g = &ki.generations[i]
genIdx++
g = &ki.generations[genIdx]
}
if !g.isEmpty() {
n := g.walk(f)
// remove the previous contents.
if n != -1 {
g.revs = g.revs[n:]
}
// remove any tombstone
if len(g.revs) == 1 && i != len(ki.generations)-1 {
delete(available, g.revs[0])
i++
}
}
// remove the previous generations.
ki.generations = ki.generations[i:]
revIndex = g.walk(f)
return genIdx, revIndex
}
func (ki *keyIndex) isEmpty() bool {