grpcproxy: support nested txns

release-3.3
Anthony Romano 2017-06-14 18:23:54 -07:00
parent f400010028
commit f465e3ea8a
1 changed files with 24 additions and 19 deletions

View File

@ -99,28 +99,13 @@ func (p *kvProxy) txnToCache(reqs []*pb.RequestOp, resps []*pb.ResponseOp) {
}
func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
txn := p.kv.Txn(ctx)
cmps := make([]clientv3.Cmp, len(r.Compare))
thenops := make([]clientv3.Op, len(r.Success))
elseops := make([]clientv3.Op, len(r.Failure))
for i := range r.Compare {
cmps[i] = (clientv3.Cmp)(*r.Compare[i])
}
for i := range r.Success {
thenops[i] = requestOpToOp(r.Success[i])
}
for i := range r.Failure {
elseops[i] = requestOpToOp(r.Failure[i])
}
resp, err := txn.If(cmps...).Then(thenops...).Else(elseops...).Commit()
op := TxnRequestToOp(r)
opResp, err := p.kv.Do(ctx, op)
if err != nil {
return nil, err
}
resp := opResp.Txn()
// txn may claim an outdated key is updated; be safe and invalidate
for _, cmp := range r.Compare {
p.cache.Invalidate(cmp.Key, cmp.RangeEnd)
@ -167,6 +152,10 @@ func requestOpToOp(union *pb.RequestOp) clientv3.Op {
if tv.RequestDeleteRange != nil {
return DelRequestToOp(tv.RequestDeleteRange)
}
case *pb.RequestOp_RequestTxn:
if tv.RequestTxn != nil {
return TxnRequestToOp(tv.RequestTxn)
}
}
panic("unknown request")
}
@ -219,3 +208,19 @@ func DelRequestToOp(r *pb.DeleteRangeRequest) clientv3.Op {
}
return clientv3.OpDelete(string(r.Key), opts...)
}
func TxnRequestToOp(r *pb.TxnRequest) clientv3.Op {
cmps := make([]clientv3.Cmp, len(r.Compare))
thenops := make([]clientv3.Op, len(r.Success))
elseops := make([]clientv3.Op, len(r.Failure))
for i := range r.Compare {
cmps[i] = (clientv3.Cmp)(*r.Compare[i])
}
for i := range r.Success {
thenops[i] = requestOpToOp(r.Success[i])
}
for i := range r.Failure {
elseops[i] = requestOpToOp(r.Failure[i])
}
return clientv3.OpTxn(cmps, thenops, elseops)
}