From be80d11948a97450e6769912640099fad63acb1c Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 17 Sep 2015 18:26:17 -0700 Subject: [PATCH 1/5] storage: enhance test for keyIndex.Get and keyIndex.Compact It covers the case that one key is set multiple times in one main revision now. --- storage/key_index_test.go | 167 ++++++++++++++++++++++++++++---------- 1 file changed, 126 insertions(+), 41 deletions(-) diff --git a/storage/key_index_test.go b/storage/key_index_test.go index dfc43c7ff..0d6538204 100644 --- a/storage/key_index_test.go +++ b/storage/key_index_test.go @@ -20,50 +20,66 @@ import ( ) func TestKeyIndexGet(t *testing.T) { - // key: "foo" - // rev: 12 + // key: "foo" + // rev: 16 // generations: // {empty} - // {8[1], 10[2], 12(t)[3]} - // {4[2], 6(t)[3]} + // {{14, 0}[1], {14, 1}[2], {16, 0}(t)[3]} + // {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]} + // {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]} ki := newTestKeyIndex() ki.compact(4, make(map[revision]struct{})) tests := []struct { rev int64 - wrev int64 - werr error + wmod revision + wcreat revision + wver int64 + werr error }{ - {13, 0, ErrRevisionNotFound}, - {12, 0, ErrRevisionNotFound}, + {17, revision{}, revision{}, 0, ErrRevisionNotFound}, + {16, revision{}, revision{}, 0, ErrRevisionNotFound}, + + // get on generation 3 + {15, revision{14, 1}, revision{14, 0}, 2, nil}, + {14, revision{14, 1}, revision{14, 0}, 2, nil}, + + {13, revision{}, revision{}, 0, ErrRevisionNotFound}, + {12, revision{}, revision{}, 0, ErrRevisionNotFound}, // get on generation 2 - {11, 10, nil}, - {10, 10, nil}, - {9, 8, nil}, - {8, 8, nil}, + {11, revision{10, 0}, revision{8, 0}, 2, nil}, + {10, revision{10, 0}, revision{8, 0}, 2, nil}, + {9, revision{8, 0}, revision{8, 0}, 1, nil}, + {8, revision{8, 0}, revision{8, 0}, 1, nil}, - {7, 0, ErrRevisionNotFound}, - {6, 0, ErrRevisionNotFound}, + {7, revision{}, revision{}, 0, ErrRevisionNotFound}, + {6, revision{}, revision{}, 0, ErrRevisionNotFound}, // get on generation 1 - {5, 4, nil}, - {4, 4, nil}, + {5, revision{4, 0}, revision{2, 0}, 2, nil}, + {4, revision{4, 0}, revision{2, 0}, 2, nil}, - {3, 0, ErrRevisionNotFound}, - {2, 0, ErrRevisionNotFound}, - {1, 0, ErrRevisionNotFound}, - {0, 0, ErrRevisionNotFound}, + {3, revision{}, revision{}, 0, ErrRevisionNotFound}, + {2, revision{}, revision{}, 0, ErrRevisionNotFound}, + {1, revision{}, revision{}, 0, ErrRevisionNotFound}, + {0, revision{}, revision{}, 0, ErrRevisionNotFound}, } for i, tt := range tests { - rev, _, _, err := ki.get(tt.rev) + mod, creat, ver, err := ki.get(tt.rev) if err != tt.werr { t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) } - if rev.main != tt.wrev { - t.Errorf("#%d: rev = %d, want %d", i, rev.main, tt.rev) + if mod != tt.wmod { + t.Errorf("#%d: modified = %+v, want %+v", i, mod, tt.wmod) + } + if creat != tt.wcreat { + t.Errorf("#%d: created = %+v, want %+v", i, creat, tt.wcreat) + } + if ver != tt.wver { + t.Errorf("#%d: version = %d, want %d", i, ver, tt.wver) } } } @@ -162,10 +178,11 @@ func TestKeyIndexCompact(t *testing.T) { 1, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}}, {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -175,10 +192,11 @@ func TestKeyIndexCompact(t *testing.T) { 2, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}}, {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -190,10 +208,11 @@ func TestKeyIndexCompact(t *testing.T) { 3, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}}, {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -205,10 +224,11 @@ func TestKeyIndexCompact(t *testing.T) { 4, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}}, {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -220,10 +240,11 @@ func TestKeyIndexCompact(t *testing.T) { 5, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}}, {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -235,9 +256,10 @@ func TestKeyIndexCompact(t *testing.T) { 6, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -247,9 +269,10 @@ func TestKeyIndexCompact(t *testing.T) { 7, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -259,9 +282,10 @@ func TestKeyIndexCompact(t *testing.T) { 8, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -273,9 +297,10 @@ func TestKeyIndexCompact(t *testing.T) { 9, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -287,9 +312,10 @@ func TestKeyIndexCompact(t *testing.T) { 10, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -301,9 +327,10 @@ func TestKeyIndexCompact(t *testing.T) { 11, &keyIndex{ key: []byte("foo"), - modified: revision{12, 0}, + modified: revision{16, 0}, generations: []generation{ {created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}}, + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, {}, }, }, @@ -314,9 +341,63 @@ func TestKeyIndexCompact(t *testing.T) { { 12, &keyIndex{ - key: []byte("foo"), - modified: revision{12, 0}, - generations: []generation{{}}, + key: []byte("foo"), + modified: revision{16, 0}, + generations: []generation{ + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {}, + }, + }, + map[revision]struct{}{}, + }, + { + 13, + &keyIndex{ + key: []byte("foo"), + modified: revision{16, 0}, + generations: []generation{ + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {}, + }, + }, + map[revision]struct{}{}, + }, + { + 14, + &keyIndex{ + key: []byte("foo"), + modified: revision{16, 0}, + generations: []generation{ + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14, sub: 1}, {main: 16}}}, + {}, + }, + }, + map[revision]struct{}{ + revision{main: 14, sub: 1}: {}, + }, + }, + { + 15, + &keyIndex{ + key: []byte("foo"), + modified: revision{16, 0}, + generations: []generation{ + {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14, sub: 1}, {main: 16}}}, + {}, + }, + }, + map[revision]struct{}{ + revision{main: 14, sub: 1}: {}, + }, + }, + { + 16, + &keyIndex{ + key: []byte("foo"), + modified: revision{16, 0}, + generations: []generation{ + {}, + }, }, map[revision]struct{}{}, }, @@ -513,12 +594,13 @@ func TestGenerationWalk(t *testing.T) { } func newTestKeyIndex() *keyIndex { - // key: "foo" - // rev: 12 + // key: "foo" + // rev: 16 // generations: // {empty} - // {8[1], 10[2], 12(t)[3]} - // {2[1], 4[2], 6(t)[3]} + // {{14, 0}[1], {14, 1}[2], {16, 0}(t)[3]} + // {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]} + // {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]} ki := &keyIndex{key: []byte("foo")} ki.put(2, 0) @@ -527,5 +609,8 @@ func newTestKeyIndex() *keyIndex { ki.put(8, 0) ki.put(10, 0) ki.tombstone(12, 0) + ki.put(14, 0) + ki.put(14, 1) + ki.tombstone(16, 0) return ki } From 158d6e0e03ae119ae0d64d4c07f79dbc453ad6c0 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 17 Sep 2015 18:59:42 -0700 Subject: [PATCH 2/5] storage: fix calculating generation in keyIndex.since It should skip last empty generation when the key is just tombstoned. The rev15 and rev16 in the test fails if it doesn't skip last empty generation and find previous generations. --- storage/key_index.go | 6 +++++- storage/key_index_test.go | 23 +++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/storage/key_index.go b/storage/key_index.go index 095aa9b96..961ff5b3a 100644 --- a/storage/key_index.go +++ b/storage/key_index.go @@ -157,7 +157,11 @@ func (ki *keyIndex) since(rev int64) []revision { var gi int // find the generations to start checking for gi = len(ki.generations) - 1; gi > 0; gi-- { - if since.GreaterThan(ki.generations[gi].created) { + g := ki.generations[gi] + if g.isEmpty() { + continue + } + if since.GreaterThan(g.created) { break } } diff --git a/storage/key_index_test.go b/storage/key_index_test.go index 0d6538204..c611327a1 100644 --- a/storage/key_index_test.go +++ b/storage/key_index_test.go @@ -84,6 +84,29 @@ func TestKeyIndexGet(t *testing.T) { } } +func TestKeyIndexSince(t *testing.T) { + ki := newTestKeyIndex() + ki.compact(4, make(map[revision]struct{})) + + allRevs := []revision{{4, 0}, {6, 0}, {8, 0}, {10, 0}, {12, 0}, {14, 1}, {16, 0}} + tests := []struct { + rev int64 + + wrevs []revision + }{ + {17, nil}, + {16, allRevs[6:]}, + {15, allRevs[6:]}, + } + + for i, tt := range tests { + revs := ki.since(tt.rev) + if !reflect.DeepEqual(revs, tt.wrevs) { + t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs) + } + } +} + func TestKeyIndexPut(t *testing.T) { ki := &keyIndex{key: []byte("foo")} ki.put(5, 0) From 87b5143b153c0de1626243ec6145284eba2a4fcf Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 17 Sep 2015 19:03:31 -0700 Subject: [PATCH 3/5] storage: fix missing continue in keyIndex.since It should continue to skip following operations. The test from rev14 to rev0 fails if it doesn't call continue and append all revisions of the same main rev to the list. --- storage/key_index.go | 1 + storage/key_index_test.go | 15 +++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/storage/key_index.go b/storage/key_index.go index 961ff5b3a..2463a9810 100644 --- a/storage/key_index.go +++ b/storage/key_index.go @@ -177,6 +177,7 @@ func (ki *keyIndex) since(rev int64) []revision { // replace the revision with a new one that has higher sub value, // because the original one should not be seen by external revs[len(revs)-1] = r + continue } revs = append(revs, r) last = r.main diff --git a/storage/key_index_test.go b/storage/key_index_test.go index c611327a1..01d903315 100644 --- a/storage/key_index_test.go +++ b/storage/key_index_test.go @@ -97,6 +97,21 @@ func TestKeyIndexSince(t *testing.T) { {17, nil}, {16, allRevs[6:]}, {15, allRevs[6:]}, + {14, allRevs[5:]}, + {13, allRevs[5:]}, + {12, allRevs[4:]}, + {11, allRevs[4:]}, + {10, allRevs[3:]}, + {9, allRevs[3:]}, + {8, allRevs[2:]}, + {7, allRevs[2:]}, + {6, allRevs[1:]}, + {5, allRevs[1:]}, + {4, allRevs}, + {3, allRevs}, + {2, allRevs}, + {1, allRevs}, + {0, allRevs}, } for i, tt := range tests { From 5709b66dfb594f6160e072061dc80069a11b213b Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 17 Sep 2015 22:37:02 -0700 Subject: [PATCH 4/5] storage: add unit test for index.RangeEvents --- storage/index_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/storage/index_test.go b/storage/index_test.go index bfe18c24c..5acf51836 100644 --- a/storage/index_test.go +++ b/storage/index_test.go @@ -136,6 +136,61 @@ func TestIndexTombstone(t *testing.T) { } } +func TestIndexRangeEvents(t *testing.T) { + allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2"), []byte("foo2"), []byte("foo1"), []byte("foo")} + allRevs := []revision{{main: 1}, {main: 2}, {main: 3}, {main: 4}, {main: 5}, {main: 6}} + + index := newTreeIndex() + for i := range allKeys { + index.Put(allKeys[i], allRevs[i]) + } + + atRev := int64(1) + tests := []struct { + key, end []byte + wrevs []revision + }{ + // single key that not found + { + []byte("bar"), nil, nil, + }, + // single key that found + { + []byte("foo"), nil, []revision{{main: 1}, {main: 6}}, + }, + // range keys, return first member + { + []byte("foo"), []byte("foo1"), []revision{{main: 1}, {main: 6}}, + }, + // range keys, return first two members + { + []byte("foo"), []byte("foo2"), []revision{{main: 1}, {main: 2}, {main: 5}, {main: 6}}, + }, + // range keys, return all members + { + []byte("foo"), []byte("fop"), allRevs, + }, + // range keys, return last two members + { + []byte("foo1"), []byte("fop"), []revision{{main: 2}, {main: 3}, {main: 4}, {main: 5}}, + }, + // range keys, return last member + { + []byte("foo2"), []byte("fop"), []revision{{main: 3}, {main: 4}}, + }, + // range keys, return nothing + { + []byte("foo3"), []byte("fop"), nil, + }, + } + for i, tt := range tests { + revs := index.RangeEvents(tt.key, tt.end, atRev) + if !reflect.DeepEqual(revs, tt.wrevs) { + t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs) + } + } +} + func TestIndexCompact(t *testing.T) { maxRev := int64(20) tests := []struct { From d72914c36fa170182187831df0b490d8b59bfa14 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 18 Sep 2015 10:46:09 -0700 Subject: [PATCH 5/5] storage: clarify comment for store.RangeEvents and fix related bugs Change to the function: 1. specify the meaning of startRev and endRev parameters 2. specify the meaning of returned nextRev Moreover, it adds unit tests for the function. --- storage/kvstore.go | 24 +++- storage/kvstore_test.go | 287 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 299 insertions(+), 12 deletions(-) diff --git a/storage/kvstore.go b/storage/kvstore.go index 57b0dd6fd..d9afdec9c 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -193,20 +193,34 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err return n, rev, nil } -// RangeEvents gets the events from key to end at or after rangeRev. -// If rangeRev <=0, rangeEvents returns events from the beginning of the history. +// RangeEvents gets the events from key to end in [startRev, endRev). // If `end` is nil, the request only observes the events on key. // If `end` is not nil, it observes the events on key range [key, range_end). // Limit limits the number of events returned. -// If the required rev is compacted, ErrCompacted will be returned. +// If startRev <=0, rangeEvents returns events from the beginning of uncompacted history. +// If endRev <=0, it indicates there is no end revision. +// +// If the required start rev is compacted, ErrCompacted will be returned. +// If the required start rev has not happened, ErrFutureRev will be returned. +// +// RangeEvents returns events that satisfy the requirement (0 <= n <= limit). +// If events in the revision range have not all happened, it returns immeidately +// what is available. +// It also returns nextRev which indicates the start revision used for the following +// RangeEvents call. The nextRev could be smaller than the given endRev if the store +// has not progressed so far or it hits the event limit. +// // TODO: return byte slices instead of events to avoid meaningless encode and decode. func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs []storagepb.Event, nextRev int64, err error) { s.mu.Lock() defer s.mu.Unlock() - if startRev <= s.compactMainRev { + if startRev > 0 && startRev <= s.compactMainRev { return nil, 0, ErrCompacted } + if startRev > s.currentRev.main { + return nil, 0, ErrFutureRev + } revs := s.kvindex.RangeEvents(key, end, startRev) if len(revs) == 0 { @@ -218,7 +232,7 @@ func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs defer tx.Unlock() // fetch events from the backend using revisions for _, rev := range revs { - if rev.main >= endRev { + if endRev > 0 && rev.main >= endRev { return evs, rev.main, nil } revbytes := newRevBytes() diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 41ab9a0d3..7116fff06 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -258,6 +258,68 @@ func TestStoreDeleteRange(t *testing.T) { } } +func TestStoreRangeEvents(t *testing.T) { + ev := storagepb.Event{ + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 1, + ModRevision: 2, + Version: 1, + }, + } + evb, err := ev.Marshal() + if err != nil { + t.Fatal(err) + } + currev := revision{2, 0} + + tests := []struct { + idxr indexRangeEventsResp + r rangeResp + }{ + { + indexRangeEventsResp{[]revision{{2, 0}}}, + rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + }, + { + indexRangeEventsResp{[]revision{{2, 0}, {3, 0}}}, + rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + }, + } + for i, tt := range tests { + s, b, index := newFakeStore() + s.currentRev = currev + index.indexRangeEventsRespc <- tt.idxr + b.tx.rangeRespc <- tt.r + + evs, _, err := s.RangeEvents([]byte("foo"), []byte("goo"), 1, 1, 4) + if err != nil { + t.Errorf("#%d: err = %v, want nil", i, err) + } + if w := []storagepb.Event{ev}; !reflect.DeepEqual(evs, w) { + t.Errorf("#%d: evs = %+v, want %+v", i, evs, w) + } + + wact := []testutil.Action{ + {"rangeEvents", []interface{}{[]byte("foo"), []byte("goo"), int64(1)}}, + } + if g := index.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) + } + wact = []testutil.Action{ + {"range", []interface{}{keyBucketName, newTestBytes(tt.idxr.revs[0]), []byte(nil), int64(0)}}, + } + if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) + } + if s.currentRev != currev { + t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev) + } + } +} + func TestStoreCompact(t *testing.T) { s, b, index := newFakeStore() s.currentRev = revision{3, 0} @@ -346,6 +408,209 @@ func TestStoreRestore(t *testing.T) { } } +// tests end parameter works well +func TestStoreRangeEventsEnd(t *testing.T) { + s := newStore(tmpPath) + defer cleanup(s, tmpPath) + + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo1"), []byte("bar1")) + s.Put([]byte("foo2"), []byte("bar2")) + evs := []storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + }, + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1}, + }, + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1}, + }, + } + + tests := []struct { + key, end []byte + wevs []storagepb.Event + }{ + // get no keys + { + []byte("doo"), []byte("foo"), + nil, + }, + // get no keys when key == end + { + []byte("foo"), []byte("foo"), + nil, + }, + // get no keys when ranging single key + { + []byte("doo"), nil, + nil, + }, + // get all keys + { + []byte("foo"), []byte("foo3"), + evs, + }, + // get partial keys + { + []byte("foo"), []byte("foo1"), + evs[:1], + }, + // get single key + { + []byte("foo"), nil, + evs[:1], + }, + } + + for i, tt := range tests { + evs, rev, err := s.RangeEvents(tt.key, tt.end, 0, 1, 100) + if err != nil { + t.Fatal(err) + } + if rev != 4 { + t.Errorf("#%d: rev = %d, want %d", i, rev, 4) + } + if !reflect.DeepEqual(evs, tt.wevs) { + t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs) + } + } +} + +func TestStoreRangeEventsRev(t *testing.T) { + s := newStore(tmpPath) + defer cleanup(s, tmpPath) + + s.Put([]byte("foo"), []byte("bar")) + s.DeleteRange([]byte("foo"), nil) + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("unrelated"), []byte("unrelated")) + evs := []storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + }, + { + Type: storagepb.DELETE, + Kv: &storagepb.KeyValue{Key: []byte("foo")}, + }, + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, + }, + } + + tests := []struct { + start int64 + end int64 + wevs []storagepb.Event + wnext int64 + }{ + {1, 1, nil, 1}, + {1, 2, evs[:1], 2}, + {1, 3, evs[:2], 3}, + {1, 4, evs, 5}, + {1, 5, evs, 5}, + {1, 10, evs, 5}, + {3, 4, evs[2:], 5}, + {0, 10, evs, 5}, + {1, 0, evs, 5}, + {0, 0, evs, 5}, + } + + for i, tt := range tests { + evs, next, err := s.RangeEvents([]byte("foo"), nil, 0, tt.start, tt.end) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(evs, tt.wevs) { + t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs) + } + if next != tt.wnext { + t.Errorf("#%d: next = %d, want %d", i, next, tt.wnext) + } + } +} + +func TestStoreRangeEventsBad(t *testing.T) { + s := newStore(tmpPath) + defer cleanup(s, tmpPath) + + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo"), []byte("bar1")) + s.Put([]byte("foo"), []byte("bar2")) + if err := s.Compact(3); err != nil { + t.Fatalf("compact error (%v)", err) + } + + tests := []struct { + rev int64 + werr error + }{ + {1, ErrCompacted}, + {2, ErrCompacted}, + {3, ErrCompacted}, + {4, ErrFutureRev}, + {10, ErrFutureRev}, + } + for i, tt := range tests { + _, _, err := s.RangeEvents([]byte("foo"), nil, 0, tt.rev, 100) + if err != tt.werr { + t.Errorf("#%d: error = %v, want %v", i, err, tt.werr) + } + } +} + +func TestStoreRangeEventsLimit(t *testing.T) { + s := newStore(tmpPath) + defer cleanup(s, tmpPath) + + s.Put([]byte("foo"), []byte("bar")) + s.DeleteRange([]byte("foo"), nil) + s.Put([]byte("foo"), []byte("bar")) + evs := []storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + }, + { + Type: storagepb.DELETE, + Kv: &storagepb.KeyValue{Key: []byte("foo")}, + }, + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, + }, + } + + tests := []struct { + limit int64 + wevs []storagepb.Event + }{ + // no limit + {-1, evs}, + // no limit + {0, evs}, + {1, evs[:1]}, + {2, evs[:2]}, + {3, evs}, + {100, evs}, + } + for i, tt := range tests { + evs, _, err := s.RangeEvents([]byte("foo"), nil, tt.limit, 1, 100) + if err != nil { + t.Fatalf("#%d: range error (%v)", i, err) + } + if !reflect.DeepEqual(evs, tt.wevs) { + t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs) + } + } +} + func TestRestoreContinueUnfinishedCompaction(t *testing.T) { s0 := newStore(tmpPath) defer os.Remove(tmpPath) @@ -412,9 +677,10 @@ func newTestBytes(rev revision) []byte { func newFakeStore() (*store, *fakeBackend, *fakeIndex) { b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}} index := &fakeIndex{ - indexGetRespc: make(chan indexGetResp, 1), - indexRangeRespc: make(chan indexRangeResp, 1), - indexCompactRespc: make(chan map[revision]struct{}, 1), + indexGetRespc: make(chan indexGetResp, 1), + indexRangeRespc: make(chan indexRangeResp, 1), + indexRangeEventsRespc: make(chan indexRangeEventsResp, 1), + indexCompactRespc: make(chan map[revision]struct{}, 1), } return &store{ b: b, @@ -472,11 +738,16 @@ type indexRangeResp struct { revs []revision } +type indexRangeEventsResp struct { + revs []revision +} + type fakeIndex struct { testutil.Recorder - indexGetRespc chan indexGetResp - indexRangeRespc chan indexRangeResp - indexCompactRespc chan map[revision]struct{} + indexGetRespc chan indexGetResp + indexRangeRespc chan indexRangeResp + indexRangeEventsRespc chan indexRangeEventsResp + indexCompactRespc chan map[revision]struct{} } func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) { @@ -500,7 +771,9 @@ func (i *fakeIndex) Tombstone(key []byte, rev revision) error { return nil } func (i *fakeIndex) RangeEvents(key, end []byte, rev int64) []revision { - return nil + i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}}) + r := <-i.indexRangeEventsRespc + return r.revs } func (i *fakeIndex) Compact(rev int64) map[revision]struct{} { i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})