etcdserver: add >= support for v3 delete range

release-2.3
Anthony Romano 2016-02-18 11:16:11 -08:00
parent 71288597da
commit 4fc89678b2
2 changed files with 60 additions and 22 deletions

View File

@ -229,35 +229,65 @@ func TestKVDeleteRange(t *testing.T) {
kv := clientv3.NewKV(clus.RandClient())
ctx := context.TODO()
keySet := []string{"a", "b", "c", "c", "c", "d", "e", "f"}
for i, key := range keySet {
if _, err := kv.Put(ctx, key, ""); err != nil {
t.Fatalf("#%d: couldn't put %q (%v)", i, key, err)
}
}
tests := []struct {
key, end string
delRev int64
key string
opts []clientv3.OpOption
wkeys []string
}{
{"a", "b", int64(len(keySet) + 2)}, // delete [a, b)
{"d", "f", int64(len(keySet) + 3)}, // delete [d, f)
// [a, c)
{
key: "a",
opts: []clientv3.OpOption{clientv3.WithRange("c")},
wkeys: []string{"c", "c/abc", "d"},
},
// >= c
{
key: "c",
opts: []clientv3.OpOption{clientv3.WithFromKey()},
wkeys: []string{"a", "b"},
},
// c*
{
key: "c",
opts: []clientv3.OpOption{clientv3.WithPrefix()},
wkeys: []string{"a", "b", "d"},
},
// *
{
key: "\x00",
opts: []clientv3.OpOption{clientv3.WithFromKey()},
wkeys: []string{},
},
}
for i, tt := range tests {
dresp, err := kv.Delete(ctx, tt.key, clientv3.WithRange(tt.end))
keySet := []string{"a", "b", "c", "c/abc", "d"}
for j, key := range keySet {
if _, err := kv.Put(ctx, key, ""); err != nil {
t.Fatalf("#%d: couldn't put %q (%v)", j, key, err)
}
}
_, err := kv.Delete(ctx, tt.key, tt.opts...)
if err != nil {
t.Fatalf("#%d: couldn't delete range (%v)", i, err)
}
if dresp.Header.Revision != tt.delRev {
t.Fatalf("#%d: dresp.Header.Revision got %d, want %d", i, dresp.Header.Revision, tt.delRev)
}
resp, err := kv.Get(ctx, tt.key, clientv3.WithRange(tt.end))
resp, err := kv.Get(ctx, "a", clientv3.WithFromKey())
if err != nil {
t.Fatalf("#%d: couldn't get key (%v)", i, err)
t.Fatalf("#%d: couldn't get keys (%v)", i, err)
}
if len(resp.Kvs) > 0 {
t.Fatalf("#%d: resp.Kvs expected none, but got %+v", i, resp.Kvs)
keys := []string{}
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
}
if !reflect.DeepEqual(tt.wkeys, keys) {
t.Errorf("#%d: resp.Kvs got %v, expected %v", i, keys, tt.wkeys)
}
}
}

View File

@ -319,9 +319,7 @@ func applyRange(txnID int64, kv dstorage.KV, r *pb.RangeRequest) (*pb.RangeRespo
err error
)
// grpc sends empty byte strings as nils, so use a '\0' to indicate
// wanting a >= query
if len(r.RangeEnd) == 1 && r.RangeEnd[0] == 0 {
if isGteRange(r.RangeEnd) {
r.RangeEnd = []byte{}
}
@ -390,6 +388,10 @@ func applyDeleteRange(txnID int64, kv dstorage.KV, dr *pb.DeleteRangeRequest) (*
err error
)
if isGteRange(dr.RangeEnd) {
dr.RangeEnd = []byte{}
}
if txnID != noTxn {
_, rev, err = kv.TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
if err != nil {
@ -631,3 +633,9 @@ func compareInt64(a, b int64) int {
return 0
}
}
// isGteRange determines if the range end is a >= range. This works around grpc
// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
func isGteRange(rangeEnd []byte) bool {
return len(rangeEnd) == 1 && rangeEnd[0] == 0
}