mvcc: push down RangeOptions.limit argv into index tree

release-3.5
tangcong 2020-06-08 14:09:33 +08:00
parent 49f91d629a
commit 26c930f27d
4 changed files with 30 additions and 15 deletions

View File

@ -25,8 +25,8 @@ import (
type index interface {
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []revision)
Revisions(key, end []byte, atRev int64) []revision
CountRevisions(key, end []byte, atRev int64) int
Revisions(key, end []byte, atRev int64, limit int) []revision
CountRevisions(key, end []byte, atRev int64, limit int) int
Put(key []byte, rev revision)
Tombstone(key []byte, rev revision) error
RangeSince(key, end []byte, rev int64) []revision
@ -89,7 +89,7 @@ func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
return nil
}
func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex) bool) {
keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
ti.RLock()
@ -99,12 +99,14 @@ func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
if len(endi.key) > 0 && !item.Less(endi) {
return false
}
f(item.(*keyIndex))
if !f(item.(*keyIndex)) {
return false
}
return true
})
}
func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision) {
if end == nil {
rev, _, _, err := ti.Get(key, atRev)
if err != nil {
@ -112,15 +114,19 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
}
return []revision{rev}
}
ti.visit(key, end, func(ki *keyIndex) {
ti.visit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev)
if len(revs) == limit {
return false
}
}
return true
})
return revs
}
func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64, limit int) int {
if end == nil {
_, _, _, err := ti.Get(key, atRev)
if err != nil {
@ -129,10 +135,14 @@ func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
return 1
}
total := 0
ti.visit(key, end, func(ki *keyIndex) {
ti.visit(key, end, func(ki *keyIndex) bool {
if _, _, _, err := ki.get(ti.lg, atRev); err == nil {
total++
if total == limit {
return false
}
}
return true
})
return total
}
@ -145,11 +155,12 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
}
return [][]byte{key}, []revision{rev}
}
ti.visit(key, end, func(ki *keyIndex) {
ti.visit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev)
keys = append(keys, ki.key)
}
return true
})
return keys, revs
}

View File

@ -242,8 +242,12 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) {
if r.Rev != wrev {
t.Errorf("#%d: rev = %d, want %d", i, r.Rev, wrev)
}
if r.Count != len(kvs) {
t.Errorf("#%d: count = %d, want %d", i, r.Count, len(kvs))
if tt.limit <= 0 || int(tt.limit) > len(kvs) {
if r.Count != len(kvs) {
t.Errorf("#%d: count = %d, want %d", i, r.Count, len(kvs))
}
} else if r.Count != int(tt.limit) {
t.Errorf("#%d: count = %d, want %d", i, r.Count, tt.limit)
}
}
}

View File

@ -936,12 +936,12 @@ type fakeIndex struct {
indexCompactRespc chan map[revision]struct{}
}
func (i *fakeIndex) Revisions(key, end []byte, atRev int64) []revision {
func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) []revision {
_, rev := i.Range(key, end, atRev)
return rev
}
func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64) int {
func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64, limit int) int {
_, rev := i.Range(key, end, atRev)
return len(rev)
}

View File

@ -126,11 +126,11 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
}
if ro.Count {
total := tr.s.kvindex.CountRevisions(key, end, rev)
total := tr.s.kvindex.CountRevisions(key, end, rev, int(ro.Limit))
tr.trace.Step("count revisions from in-memory index tree")
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
}
revpairs := tr.s.kvindex.Revisions(key, end, rev)
revpairs := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
tr.trace.Step("range keys from in-memory index tree")
if len(revpairs) == 0 {
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil