diff --git a/storage/index_test.go b/storage/index_test.go index bfe18c24c..5acf51836 100644 --- a/storage/index_test.go +++ b/storage/index_test.go @@ -136,6 +136,61 @@ func TestIndexTombstone(t *testing.T) { } } +func TestIndexRangeEvents(t *testing.T) { + allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2"), []byte("foo2"), []byte("foo1"), []byte("foo")} + allRevs := []revision{{main: 1}, {main: 2}, {main: 3}, {main: 4}, {main: 5}, {main: 6}} + + index := newTreeIndex() + for i := range allKeys { + index.Put(allKeys[i], allRevs[i]) + } + + atRev := int64(1) + tests := []struct { + key, end []byte + wrevs []revision + }{ + // single key that not found + { + []byte("bar"), nil, nil, + }, + // single key that found + { + []byte("foo"), nil, []revision{{main: 1}, {main: 6}}, + }, + // range keys, return first member + { + []byte("foo"), []byte("foo1"), []revision{{main: 1}, {main: 6}}, + }, + // range keys, return first two members + { + []byte("foo"), []byte("foo2"), []revision{{main: 1}, {main: 2}, {main: 5}, {main: 6}}, + }, + // range keys, return all members + { + []byte("foo"), []byte("fop"), allRevs, + }, + // range keys, return last two members + { + []byte("foo1"), []byte("fop"), []revision{{main: 2}, {main: 3}, {main: 4}, {main: 5}}, + }, + // range keys, return last member + { + []byte("foo2"), []byte("fop"), []revision{{main: 3}, {main: 4}}, + }, + // range keys, return nothing + { + []byte("foo3"), []byte("fop"), nil, + }, + } + for i, tt := range tests { + revs := index.RangeEvents(tt.key, tt.end, atRev) + if !reflect.DeepEqual(revs, tt.wrevs) { + t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs) + } + } +} + func TestIndexCompact(t *testing.T) { maxRev := int64(20) tests := []struct { diff --git a/storage/key_index.go b/storage/key_index.go index 095aa9b96..2463a9810 100644 --- a/storage/key_index.go +++ b/storage/key_index.go @@ -157,7 +157,11 @@ func (ki *keyIndex) since(rev int64) []revision { var gi int // find the generations to start checking for gi = len(ki.generations) - 1; gi > 0; gi-- { - if since.GreaterThan(ki.generations[gi].created) { + g := ki.generations[gi] + if g.isEmpty() { + continue + } + if since.GreaterThan(g.created) { break } } @@ -173,6 +177,7 @@ func (ki *keyIndex) since(rev int64) []revision { // replace the revision with a new one that has higher sub value, // because the original one should not be seen by external revs[len(revs)-1] = r + continue } revs = append(revs, r) last = r.main diff --git a/storage/key_index_test.go b/storage/key_index_test.go index dfc43c7ff..01d903315 100644 --- a/storage/key_index_test.go +++ b/storage/key_index_test.go @@ -20,50 +20,104 @@ import ( ) func TestKeyIndexGet(t *testing.T) { - // key: "foo" - // rev: 12 + // key: "foo" + // rev: 16 // generations: // {empty} - // {8[1], 10[2], 12(t)[3]} - // {4[2], 6(t)[3]} + // {{14, 0}[1], {14, 1}[2], {16, 0}(t)[3]} + // {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]} + // {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]} ki := newTestKeyIndex() ki.compact(4, make(map[revision]struct{})) tests := []struct { rev int64 - wrev int64 - werr error + wmod revision + wcreat revision + wver int64 + werr error }{ - {13, 0, ErrRevisionNotFound}, - {12, 0, ErrRevisionNotFound}, + {17, revision{}, revision{}, 0, ErrRevisionNotFound}, + {16, revision{}, revision{}, 0, ErrRevisionNotFound}, + + // get on generation 3 + {15, revision{14, 1}, revision{14, 0}, 2, nil}, + {14, revision{14, 1}, revision{14, 0}, 2, nil}, + + {13, revision{}, revision{}, 0, ErrRevisionNotFound}, + {12, revision{}, revision{}, 0, ErrRevisionNotFound}, // get on generation 2 - {11, 10, nil}, - {10, 10, nil}, - {9, 8, nil}, - {8, 8, nil}, + {11, revision{10, 0}, revision{8, 0}, 2, nil}, + {10, revision{10, 0}, revision{8, 0}, 2, nil}, + {9, revision{8, 0}, revision{8, 0}, 1, nil}, + {8, revision{8, 0}, revision{8, 0}, 1, nil}, - {7, 0, ErrRevisionNotFound}, - {6, 0, ErrRevisionNotFound}, + {7, revision{}, revision{}, 0, ErrRevisionNotFound}, + {6, revision{}, revision{}, 0, ErrRevisionNotFound}, // get on generation 1 - {5, 4, nil}, - {4, 4, nil}, + {5, revision{4, 0}, revision{2, 0}, 2, nil}, + {4, revision{4, 0}, revision{2, 0}, 2, nil}, - {3, 0, ErrRevisionNotFound}, - {2, 0, ErrRevisionNotFound}, - {1, 0, ErrRevisionNotFound}, - {0, 0, ErrRevisionNotFound}, + {3, revision{}, revision{}, 0, ErrRevisionNotFound}, + {2, revision{}, revision{}, 0, ErrRevisionNotFound}, + {1, revision{}, revision{}, 0, ErrRevisionNotFound}, + {0, revision{}, revision{}, 0, ErrRevisionNotFound}, } for i, tt := range tests { - rev, _, _, err := ki.get(tt.rev) + mod, creat, ver, err := ki.get(tt.rev) if err != tt.werr { t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) } - if rev.main != tt.wrev { - t.Errorf("#%d: rev = %d, want %d", i, rev.main, tt.rev) + if mod != tt.wmod { + t.Errorf("#%d: modified = %+v, want %+v", i, mod, tt.wmod) + } + if creat != tt.wcreat { + t.Errorf("#%d: created = %+v, want %+v", i, creat, tt.wcreat) + } + if ver != tt.wver { + t.Errorf("#%d: version = %d, want %d", i, ver, tt.wver) + } + } +} + +func TestKeyIndexSince(t *testing.T) { + ki := newTestKeyIndex() + ki.compact(4, make(map[revision]struct{})) + + allRevs := []revision{{4, 0}, {6, 0}, {8, 0}, {10, 0}, {12, 0}, {14, 1}, {16, 0}} + tests := []struct { + rev int64 + + wrevs []revision + }{ + {17, nil}, + {16, allRevs[6:]}, + {15, allRevs[6:]}, + {14, allRevs[5:]}, + {13, allRevs[5:]}, + {12, allRevs[4:]}, + {11, allRevs[4:]}, + {10, allRevs[3:]}, + {9, allRevs[3:]}, + {8, allRevs[2:]}, + {7, allRevs[2:]}, + {6, allRevs[1:]}, + {5, allRevs[1:]}, + {4, allRevs}, + {3, allRevs}, + {2, allRevs}, + {1, allRevs}, + {0, allRevs}, + } + + for i, tt := range tests { + revs := ki.since(tt.rev) + if !reflect.DeepEqual(revs, tt.wrevs) { + t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs) } } } @@ -162,10 +216,11 @@ func TestKeyIndexCompact(t *testing.T) { 1, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}}, {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -175,10 +230,11 @@ func TestKeyIndexCompact(t *testing.T) { 2, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}}, {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -190,10 +246,11 @@ func TestKeyIndexCompact(t *testing.T) { 3, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}}, {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -205,10 +262,11 @@ func TestKeyIndexCompact(t *testing.T) { 4, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}}, {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -220,10 +278,11 @@ func TestKeyIndexCompact(t *testing.T) { 5, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}}, {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -235,9 +294,10 @@ func TestKeyIndexCompact(t *testing.T) { 6, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -247,9 +307,10 @@ func TestKeyIndexCompact(t *testing.T) { 7, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -259,9 +320,10 @@ func TestKeyIndexCompact(t *testing.T) { 8, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -273,9 +335,10 @@ func TestKeyIndexCompact(t *testing.T) { 9, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -287,9 +350,10 @@ func TestKeyIndexCompact(t *testing.T) { 10, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -301,9 +365,10 @@ func TestKeyIndexCompact(t *testing.T) { 11, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -314,9 +379,63 @@ func TestKeyIndexCompact(t *testing.T) { { 12, &keyIndex{ - key: []byte("foo"), - modified: revision{12, 0}, - generations: []generation{{}}, + key: []byte("foo"), + modified: revision{16, 0}, + generations: []generation{ + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {}, + }, + }, + map[revision]struct{}{}, + }, + { + 13, + &keyIndex{ + key: []byte("foo"), + modified: revision{16, 0}, + generations: []generation{ + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {}, + }, + }, + map[revision]struct{}{}, + }, + { + 14, + &keyIndex{ + key: []byte("foo"), + modified: revision{16, 0}, + generations: []generation{ + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14, sub: 1}, {main: 16}}}, + {}, + }, + }, + map[revision]struct{}{ + revision{main: 14, sub: 1}: {}, + }, + }, + { + 15, + &keyIndex{ + key: []byte("foo"), + modified: revision{16, 0}, + generations: []generation{ + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14, sub: 1}, {main: 16}}}, + {}, + }, + }, + map[revision]struct{}{ + revision{main: 14, sub: 1}: {}, + }, + }, + { + 16, + &keyIndex{ + key: []byte("foo"), + modified: revision{16, 0}, + generations: []generation{ + {}, + }, }, map[revision]struct{}{}, }, @@ -513,12 +632,13 @@ func TestGenerationWalk(t *testing.T) { } func newTestKeyIndex() *keyIndex { - // key: "foo" - // rev: 12 + // key: "foo" + // rev: 16 // generations: // {empty} - // {8[1], 10[2], 12(t)[3]} - // {2[1], 4[2], 6(t)[3]} + // {{14, 0}[1], {14, 1}[2], {16, 0}(t)[3]} + // {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]} + // {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]} ki := &keyIndex{key: []byte("foo")} ki.put(2, 0) @@ -527,5 +647,8 @@ func newTestKeyIndex() *keyIndex { ki.put(8, 0) ki.put(10, 0) ki.tombstone(12, 0) + ki.put(14, 0) + ki.put(14, 1) + ki.tombstone(16, 0) return ki } diff --git a/storage/kvstore.go b/storage/kvstore.go index 57b0dd6fd..d9afdec9c 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -193,20 +193,34 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err return n, rev, nil } -// RangeEvents gets the events from key to end at or after rangeRev. -// If rangeRev <=0, rangeEvents returns events from the beginning of the history. +// RangeEvents gets the events from key to end in [startRev, endRev). // If `end` is nil, the request only observes the events on key. // If `end` is not nil, it observes the events on key range [key, range_end). // Limit limits the number of events returned. -// If the required rev is compacted, ErrCompacted will be returned. +// If startRev <=0, rangeEvents returns events from the beginning of uncompacted history. +// If endRev <=0, it indicates there is no end revision. +// +// If the required start rev is compacted, ErrCompacted will be returned. +// If the required start rev has not happened, ErrFutureRev will be returned. +// +// RangeEvents returns events that satisfy the requirement (0 <= n <= limit). +// If events in the revision range have not all happened, it returns immeidately +// what is available. +// It also returns nextRev which indicates the start revision used for the following +// RangeEvents call. The nextRev could be smaller than the given endRev if the store +// has not progressed so far or it hits the event limit. +// // TODO: return byte slices instead of events to avoid meaningless encode and decode. func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs []storagepb.Event, nextRev int64, err error) { s.mu.Lock() defer s.mu.Unlock() - if startRev <= s.compactMainRev { + if startRev > 0 && startRev <= s.compactMainRev { return nil, 0, ErrCompacted } + if startRev > s.currentRev.main { + return nil, 0, ErrFutureRev + } revs := s.kvindex.RangeEvents(key, end, startRev) if len(revs) == 0 { @@ -218,7 +232,7 @@ func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs defer tx.Unlock() // fetch events from the backend using revisions for _, rev := range revs { - if rev.main >= endRev { + if endRev > 0 && rev.main >= endRev { return evs, rev.main, nil } revbytes := newRevBytes() diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 41ab9a0d3..7116fff06 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -258,6 +258,68 @@ func TestStoreDeleteRange(t *testing.T) { } } +func TestStoreRangeEvents(t *testing.T) { + ev := storagepb.Event{ + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 1, + ModRevision: 2, + Version: 1, + }, + } + evb, err := ev.Marshal() + if err != nil { + t.Fatal(err) + } + currev := revision{2, 0} + + tests := []struct { + idxr indexRangeEventsResp + r rangeResp + }{ + { + indexRangeEventsResp{[]revision{{2, 0}}}, + rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + }, + { + indexRangeEventsResp{[]revision{{2, 0}, {3, 0}}}, + rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + }, + } + for i, tt := range tests { + s, b, index := newFakeStore() + s.currentRev = currev + index.indexRangeEventsRespc <- tt.idxr + b.tx.rangeRespc <- tt.r + + evs, _, err := s.RangeEvents([]byte("foo"), []byte("goo"), 1, 1, 4) + if err != nil { + t.Errorf("#%d: err = %v, want nil", i, err) + } + if w := []storagepb.Event{ev}; !reflect.DeepEqual(evs, w) { + t.Errorf("#%d: evs = %+v, want %+v", i, evs, w) + } + + wact := []testutil.Action{ + {"rangeEvents", []interface{}{[]byte("foo"), []byte("goo"), int64(1)}}, + } + if g := index.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) + } + wact = []testutil.Action{ + {"range", []interface{}{keyBucketName, newTestBytes(tt.idxr.revs[0]), []byte(nil), int64(0)}}, + } + if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) + } + if s.currentRev != currev { + t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev) + } + } +} + func TestStoreCompact(t *testing.T) { s, b, index := newFakeStore() s.currentRev = revision{3, 0} @@ -346,6 +408,209 @@ func TestStoreRestore(t *testing.T) { } } +// tests end parameter works well +func TestStoreRangeEventsEnd(t *testing.T) { + s := newStore(tmpPath) + defer cleanup(s, tmpPath) + + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo1"), []byte("bar1")) + s.Put([]byte("foo2"), []byte("bar2")) + evs := []storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + }, + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1}, + }, + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1}, + }, + } + + tests := []struct { + key, end []byte + wevs []storagepb.Event + }{ + // get no keys + { + []byte("doo"), []byte("foo"), + nil, + }, + // get no keys when key == end + { + []byte("foo"), []byte("foo"), + nil, + }, + // get no keys when ranging single key + { + []byte("doo"), nil, + nil, + }, + // get all keys + { + []byte("foo"), []byte("foo3"), + evs, + }, + // get partial keys + { + []byte("foo"), []byte("foo1"), + evs[:1], + }, + // get single key + { + []byte("foo"), nil, + evs[:1], + }, + } + + for i, tt := range tests { + evs, rev, err := s.RangeEvents(tt.key, tt.end, 0, 1, 100) + if err != nil { + t.Fatal(err) + } + if rev != 4 { + t.Errorf("#%d: rev = %d, want %d", i, rev, 4) + } + if !reflect.DeepEqual(evs, tt.wevs) { + t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs) + } + } +} + +func TestStoreRangeEventsRev(t *testing.T) { + s := newStore(tmpPath) + defer cleanup(s, tmpPath) + + s.Put([]byte("foo"), []byte("bar")) + s.DeleteRange([]byte("foo"), nil) + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("unrelated"), []byte("unrelated")) + evs := []storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + }, + { + Type: storagepb.DELETE, + Kv: &storagepb.KeyValue{Key: []byte("foo")}, + }, + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, + }, + } + + tests := []struct { + start int64 + end int64 + wevs []storagepb.Event + wnext int64 + }{ + {1, 1, nil, 1}, + {1, 2, evs[:1], 2}, + {1, 3, evs[:2], 3}, + {1, 4, evs, 5}, + {1, 5, evs, 5}, + {1, 10, evs, 5}, + {3, 4, evs[2:], 5}, + {0, 10, evs, 5}, + {1, 0, evs, 5}, + {0, 0, evs, 5}, + } + + for i, tt := range tests { + evs, next, err := s.RangeEvents([]byte("foo"), nil, 0, tt.start, tt.end) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(evs, tt.wevs) { + t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs) + } + if next != tt.wnext { + t.Errorf("#%d: next = %d, want %d", i, next, tt.wnext) + } + } +} + +func TestStoreRangeEventsBad(t *testing.T) { + s := newStore(tmpPath) + defer cleanup(s, tmpPath) + + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo"), []byte("bar1")) + s.Put([]byte("foo"), []byte("bar2")) + if err := s.Compact(3); err != nil { + t.Fatalf("compact error (%v)", err) + } + + tests := []struct { + rev int64 + werr error + }{ + {1, ErrCompacted}, + {2, ErrCompacted}, + {3, ErrCompacted}, + {4, ErrFutureRev}, + {10, ErrFutureRev}, + } + for i, tt := range tests { + _, _, err := s.RangeEvents([]byte("foo"), nil, 0, tt.rev, 100) + if err != tt.werr { + t.Errorf("#%d: error = %v, want %v", i, err, tt.werr) + } + } +} + +func TestStoreRangeEventsLimit(t *testing.T) { + s := newStore(tmpPath) + defer cleanup(s, tmpPath) + + s.Put([]byte("foo"), []byte("bar")) + s.DeleteRange([]byte("foo"), nil) + s.Put([]byte("foo"), []byte("bar")) + evs := []storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + }, + { + Type: storagepb.DELETE, + Kv: &storagepb.KeyValue{Key: []byte("foo")}, + }, + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, + }, + } + + tests := []struct { + limit int64 + wevs []storagepb.Event + }{ + // no limit + {-1, evs}, + // no limit + {0, evs}, + {1, evs[:1]}, + {2, evs[:2]}, + {3, evs}, + {100, evs}, + } + for i, tt := range tests { + evs, _, err := s.RangeEvents([]byte("foo"), nil, tt.limit, 1, 100) + if err != nil { + t.Fatalf("#%d: range error (%v)", i, err) + } + if !reflect.DeepEqual(evs, tt.wevs) { + t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs) + } + } +} + func TestRestoreContinueUnfinishedCompaction(t *testing.T) { s0 := newStore(tmpPath) defer os.Remove(tmpPath) @@ -412,9 +677,10 @@ func newTestBytes(rev revision) []byte { func newFakeStore() (*store, *fakeBackend, *fakeIndex) { b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}} index := &fakeIndex{ - indexGetRespc: make(chan indexGetResp, 1), - indexRangeRespc: make(chan indexRangeResp, 1), - indexCompactRespc: make(chan map[revision]struct{}, 1), + indexGetRespc: make(chan indexGetResp, 1), + indexRangeRespc: make(chan indexRangeResp, 1), + indexRangeEventsRespc: make(chan indexRangeEventsResp, 1), + indexCompactRespc: make(chan map[revision]struct{}, 1), } return &store{ b: b, @@ -472,11 +738,16 @@ type indexRangeResp struct { revs []revision } +type indexRangeEventsResp struct { + revs []revision +} + type fakeIndex struct { testutil.Recorder - indexGetRespc chan indexGetResp - indexRangeRespc chan indexRangeResp - indexCompactRespc chan map[revision]struct{} + indexGetRespc chan indexGetResp + indexRangeRespc chan indexRangeResp + indexRangeEventsRespc chan indexRangeEventsResp + indexCompactRespc chan map[revision]struct{} } func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) { @@ -500,7 +771,9 @@ func (i *fakeIndex) Tombstone(key []byte, rev revision) error { return nil } func (i *fakeIndex) RangeEvents(key, end []byte, rev int64) []revision { - return nil + i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}}) + r := <-i.indexRangeEventsRespc + return r.revs } func (i *fakeIndex) Compact(rev int64) map[revision]struct{} { i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})