diff --git a/proxy/grpcproxy/kv.go b/proxy/grpcproxy/kv.go index c68f1b1c5..09b95458b 100644 --- a/proxy/grpcproxy/kv.go +++ b/proxy/grpcproxy/kv.go @@ -99,28 +99,13 @@ func (p *kvProxy) txnToCache(reqs []*pb.RequestOp, resps []*pb.ResponseOp) { } func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { - txn := p.kv.Txn(ctx) - cmps := make([]clientv3.Cmp, len(r.Compare)) - thenops := make([]clientv3.Op, len(r.Success)) - elseops := make([]clientv3.Op, len(r.Failure)) - - for i := range r.Compare { - cmps[i] = (clientv3.Cmp)(*r.Compare[i]) - } - - for i := range r.Success { - thenops[i] = requestOpToOp(r.Success[i]) - } - - for i := range r.Failure { - elseops[i] = requestOpToOp(r.Failure[i]) - } - - resp, err := txn.If(cmps...).Then(thenops...).Else(elseops...).Commit() - + op := TxnRequestToOp(r) + opResp, err := p.kv.Do(ctx, op) if err != nil { return nil, err } + resp := opResp.Txn() + // txn may claim an outdated key is updated; be safe and invalidate for _, cmp := range r.Compare { p.cache.Invalidate(cmp.Key, cmp.RangeEnd) @@ -167,6 +152,10 @@ func requestOpToOp(union *pb.RequestOp) clientv3.Op { if tv.RequestDeleteRange != nil { return DelRequestToOp(tv.RequestDeleteRange) } + case *pb.RequestOp_RequestTxn: + if tv.RequestTxn != nil { + return TxnRequestToOp(tv.RequestTxn) + } } panic("unknown request") } @@ -219,3 +208,19 @@ func DelRequestToOp(r *pb.DeleteRangeRequest) clientv3.Op { } return clientv3.OpDelete(string(r.Key), opts...) } + +func TxnRequestToOp(r *pb.TxnRequest) clientv3.Op { + cmps := make([]clientv3.Cmp, len(r.Compare)) + thenops := make([]clientv3.Op, len(r.Success)) + elseops := make([]clientv3.Op, len(r.Failure)) + for i := range r.Compare { + cmps[i] = (clientv3.Cmp)(*r.Compare[i]) + } + for i := range r.Success { + thenops[i] = requestOpToOp(r.Success[i]) + } + for i := range r.Failure { + elseops[i] = requestOpToOp(r.Failure[i]) + } + return clientv3.OpTxn(cmps, thenops, elseops) +}