etcdserver: update gRPC, proto interface

release-2.3
Gyu-Ho Lee 2016-01-26 17:41:19 -08:00
parent 1c4c45cc7a
commit ad15bdcb07
6 changed files with 1902 additions and 677 deletions

View File

@ -187,17 +187,24 @@ func checkTxnRequest(r *pb.TxnRequest) error {
func checkRequestUnion(u *pb.RequestUnion) error {
// TODO: ensure only one of the field is set.
switch {
case u.RequestRange != nil:
return checkRangeRequest(u.RequestRange)
case u.RequestPut != nil:
return checkPutRequest(u.RequestPut)
case u.RequestDeleteRange != nil:
return checkDeleteRequest(u.RequestDeleteRange)
switch uv := u.Request.(type) {
case *pb.RequestUnion_RequestRange:
if uv.RequestRange != nil {
return checkRangeRequest(uv.RequestRange)
}
case *pb.RequestUnion_RequestPut:
if uv.RequestPut != nil {
return checkPutRequest(uv.RequestPut)
}
case *pb.RequestUnion_RequestDeleteRange:
if uv.RequestDeleteRange != nil {
return checkDeleteRequest(uv.RequestDeleteRange)
}
default:
// empty union
return nil
}
return nil
}
func togRPCError(err error) error {

View File

@ -92,29 +92,33 @@ func (sws *serverWatchStream) recvLoop() error {
return err
}
switch {
case req.CreateRequest != nil:
creq := req.CreateRequest
var prefix bool
toWatch := creq.Key
if len(creq.Key) == 0 {
toWatch = creq.Prefix
prefix = true
}
id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: int64(id),
Created: true,
}
case req.CancelRequest != nil:
id := req.CancelRequest.WatchId
err := sws.watchStream.Cancel(storage.WatchID(id))
if err == nil {
switch uv := req.RequestUnion.(type) {
case *pb.WatchRequest_CreateRequest:
if uv.CreateRequest != nil {
creq := uv.CreateRequest
var prefix bool
toWatch := creq.Key
if len(creq.Key) == 0 {
toWatch = creq.Prefix
prefix = true
}
id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: id,
Canceled: true,
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: int64(id),
Created: true,
}
}
case *pb.WatchRequest_CancelRequest:
if uv.CancelRequest != nil {
id := uv.CancelRequest.WatchId
err := sws.watchStream.Cancel(storage.WatchID(id))
if err == nil {
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: id,
Canceled: true,
}
}
}
// TODO: do we need to return error back to client?

View File

@ -13,38 +13,76 @@
It has these top-level messages:
Request
Metadata
InternalRaftRequest
EmptyResponse
ResponseHeader
RangeRequest
RangeResponse
PutRequest
PutResponse
DeleteRangeRequest
DeleteRangeResponse
RequestUnion
ResponseUnion
Compare
TxnRequest
TxnResponse
CompactionRequest
CompactionResponse
WatchRequest
WatchCreateRequest
WatchCancelRequest
WatchResponse
LeaseCreateRequest
LeaseCreateResponse
LeaseRevokeRequest
LeaseRevokeResponse
LeaseKeepAliveRequest
LeaseKeepAliveResponse
Member
AddMemberRequest
AddMemberResponse
RemoveMemberRequest
RemoveMemberResponse
UpdateMemberRequest
UpdateMemberResponse
ListMemberRequest
ListMemberResponse
*/
package etcdserverpb
import proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
import (
"fmt"
proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
)
import math "math"
// discarding unused import gogoproto "github.com/coreos/etcd/Godeps/_workspace/src/gogoproto"
import io "io"
import fmt "fmt"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type Request struct {
ID uint64 `protobuf:"varint,1,opt" json:"ID"`
Method string `protobuf:"bytes,2,opt" json:"Method"`
Path string `protobuf:"bytes,3,opt" json:"Path"`
Val string `protobuf:"bytes,4,opt" json:"Val"`
Dir bool `protobuf:"varint,5,opt" json:"Dir"`
PrevValue string `protobuf:"bytes,6,opt" json:"PrevValue"`
PrevIndex uint64 `protobuf:"varint,7,opt" json:"PrevIndex"`
PrevExist *bool `protobuf:"varint,8,opt" json:"PrevExist,omitempty"`
Expiration int64 `protobuf:"varint,9,opt" json:"Expiration"`
Wait bool `protobuf:"varint,10,opt" json:"Wait"`
Since uint64 `protobuf:"varint,11,opt" json:"Since"`
Recursive bool `protobuf:"varint,12,opt" json:"Recursive"`
Sorted bool `protobuf:"varint,13,opt" json:"Sorted"`
Quorum bool `protobuf:"varint,14,opt" json:"Quorum"`
Time int64 `protobuf:"varint,15,opt" json:"Time"`
Stream bool `protobuf:"varint,16,opt" json:"Stream"`
ID uint64 `protobuf:"varint,1,opt,name=ID" json:"ID"`
Method string `protobuf:"bytes,2,opt,name=Method" json:"Method"`
Path string `protobuf:"bytes,3,opt,name=Path" json:"Path"`
Val string `protobuf:"bytes,4,opt,name=Val" json:"Val"`
Dir bool `protobuf:"varint,5,opt,name=Dir" json:"Dir"`
PrevValue string `protobuf:"bytes,6,opt,name=PrevValue" json:"PrevValue"`
PrevIndex uint64 `protobuf:"varint,7,opt,name=PrevIndex" json:"PrevIndex"`
PrevExist *bool `protobuf:"varint,8,opt,name=PrevExist" json:"PrevExist,omitempty"`
Expiration int64 `protobuf:"varint,9,opt,name=Expiration" json:"Expiration"`
Wait bool `protobuf:"varint,10,opt,name=Wait" json:"Wait"`
Since uint64 `protobuf:"varint,11,opt,name=Since" json:"Since"`
Recursive bool `protobuf:"varint,12,opt,name=Recursive" json:"Recursive"`
Sorted bool `protobuf:"varint,13,opt,name=Sorted" json:"Sorted"`
Quorum bool `protobuf:"varint,14,opt,name=Quorum" json:"Quorum"`
Time int64 `protobuf:"varint,15,opt,name=Time" json:"Time"`
Stream bool `protobuf:"varint,16,opt,name=Stream" json:"Stream"`
XXX_unrecognized []byte `json:"-"`
}
@ -53,8 +91,8 @@ func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
type Metadata struct {
NodeID uint64 `protobuf:"varint,1,opt" json:"NodeID"`
ClusterID uint64 `protobuf:"varint,2,opt" json:"ClusterID"`
NodeID uint64 `protobuf:"varint,1,opt,name=NodeID" json:"NodeID"`
ClusterID uint64 `protobuf:"varint,2,opt,name=ClusterID" json:"ClusterID"`
XXX_unrecognized []byte `json:"-"`
}
@ -62,6 +100,10 @@ func (m *Metadata) Reset() { *m = Metadata{} }
func (m *Metadata) String() string { return proto.CompactTextString(m) }
func (*Metadata) ProtoMessage() {}
func init() {
proto.RegisterType((*Request)(nil), "etcdserverpb.Request")
proto.RegisterType((*Metadata)(nil), "etcdserverpb.Metadata")
}
func (m *Request) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
@ -287,8 +329,12 @@ func (m *Request) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -301,6 +347,12 @@ func (m *Request) Unmarshal(data []byte) error {
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Request: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Request: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
@ -308,6 +360,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
m.ID = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -324,6 +379,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -350,6 +408,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -376,6 +437,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -402,6 +466,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -419,6 +486,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -445,6 +515,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
m.PrevIndex = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -461,6 +534,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -479,6 +555,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
m.Expiration = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -495,6 +574,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -512,6 +594,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
m.Since = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -528,6 +613,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -545,6 +633,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -562,6 +653,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -579,6 +673,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
m.Time = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -595,6 +692,9 @@ func (m *Request) Unmarshal(data []byte) error {
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -607,15 +707,7 @@ func (m *Request) Unmarshal(data []byte) error {
}
m.Stream = bool(v != 0)
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
iNdEx -= sizeOfWire
iNdEx = preIndex
skippy, err := skipEtcdserver(data[iNdEx:])
if err != nil {
return err
@ -631,14 +723,21 @@ func (m *Request) Unmarshal(data []byte) error {
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Metadata) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -651,6 +750,12 @@ func (m *Metadata) Unmarshal(data []byte) error {
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Metadata: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Metadata: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
@ -658,6 +763,9 @@ func (m *Metadata) Unmarshal(data []byte) error {
}
m.NodeID = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -674,6 +782,9 @@ func (m *Metadata) Unmarshal(data []byte) error {
}
m.ClusterID = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -685,15 +796,7 @@ func (m *Metadata) Unmarshal(data []byte) error {
}
}
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
iNdEx -= sizeOfWire
iNdEx = preIndex
skippy, err := skipEtcdserver(data[iNdEx:])
if err != nil {
return err
@ -709,6 +812,9 @@ func (m *Metadata) Unmarshal(data []byte) error {
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipEtcdserver(data []byte) (n int, err error) {
@ -717,6 +823,9 @@ func skipEtcdserver(data []byte) (n int, err error) {
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
@ -730,7 +839,10 @@ func skipEtcdserver(data []byte) (n int, err error) {
wireType := int(wire & 0x7)
switch wireType {
case 0:
for {
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
@ -746,6 +858,9 @@ func skipEtcdserver(data []byte) (n int, err error) {
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
@ -766,6 +881,9 @@ func skipEtcdserver(data []byte) (n int, err error) {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEtcdserver
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
@ -801,4 +919,5 @@ func skipEtcdserver(data []byte) (n int, err error) {
var (
ErrInvalidLengthEtcdserver = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowEtcdserver = fmt.Errorf("proto: integer overflow")
)

View File

@ -4,20 +4,25 @@
package etcdserverpb
import proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
import (
"fmt"
// discarding unused import gogoproto "github.com/coreos/etcd/Godeps/_workspace/src/gogoproto"
proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
)
import math "math"
import io "io"
import fmt "fmt"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// An InternalRaftRequest is the union of all requests which can be
// sent via raft.
type InternalRaftRequest struct {
ID uint64 `protobuf:"varint,1,opt,proto3" json:"ID,omitempty"`
ID uint64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"`
V2 *Request `protobuf:"bytes,2,opt,name=v2" json:"v2,omitempty"`
Range *RangeRequest `protobuf:"bytes,3,opt,name=range" json:"range,omitempty"`
Put *PutRequest `protobuf:"bytes,4,opt,name=put" json:"put,omitempty"`
@ -39,6 +44,10 @@ func (m *EmptyResponse) Reset() { *m = EmptyResponse{} }
func (m *EmptyResponse) String() string { return proto.CompactTextString(m) }
func (*EmptyResponse) ProtoMessage() {}
func init() {
proto.RegisterType((*InternalRaftRequest)(nil), "etcdserverpb.InternalRaftRequest")
proto.RegisterType((*EmptyResponse)(nil), "etcdserverpb.EmptyResponse")
}
func (m *InternalRaftRequest) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
@ -251,8 +260,12 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -265,6 +278,12 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: InternalRaftRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: InternalRaftRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
@ -272,6 +291,9 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
}
m.ID = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -288,6 +310,9 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -318,6 +343,9 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -348,6 +376,9 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -378,6 +409,9 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -408,6 +442,9 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -438,6 +475,9 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -468,6 +508,9 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -498,6 +541,9 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -523,15 +569,7 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
}
iNdEx = postIndex
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
iNdEx -= sizeOfWire
iNdEx = preIndex
skippy, err := skipRaftInternal(data[iNdEx:])
if err != nil {
return err
@ -546,14 +584,21 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error {
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *EmptyResponse) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
@ -565,17 +610,16 @@ func (m *EmptyResponse) Unmarshal(data []byte) error {
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: EmptyResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: EmptyResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
iNdEx -= sizeOfWire
iNdEx = preIndex
skippy, err := skipRaftInternal(data[iNdEx:])
if err != nil {
return err
@ -590,6 +634,9 @@ func (m *EmptyResponse) Unmarshal(data []byte) error {
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipRaftInternal(data []byte) (n int, err error) {
@ -598,6 +645,9 @@ func skipRaftInternal(data []byte) (n int, err error) {
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
@ -611,7 +661,10 @@ func skipRaftInternal(data []byte) (n int, err error) {
wireType := int(wire & 0x7)
switch wireType {
case 0:
for {
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
@ -627,6 +680,9 @@ func skipRaftInternal(data []byte) (n int, err error) {
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
@ -647,6 +703,9 @@ func skipRaftInternal(data []byte) (n int, err error) {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowRaftInternal
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
@ -682,4 +741,5 @@ func skipRaftInternal(data []byte) (n int, err error) {
var (
ErrInvalidLengthRaftInternal = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowRaftInternal = fmt.Errorf("proto: integer overflow")
)

File diff suppressed because it is too large Load Diff

View File

@ -369,7 +369,11 @@ func applyDeleteRange(txnID int64, kv dstorage.KV, dr *pb.DeleteRangeRequest) (*
func checkRequestLeases(le lease.Lessor, reqs []*pb.RequestUnion) error {
for _, requ := range reqs {
preq := requ.RequestPut
tv, ok := requ.Request.(*pb.RequestUnion_RequestPut)
if !ok {
continue
}
preq := tv.RequestPut
if preq == nil || lease.LeaseID(preq.Lease) == lease.NoLease {
continue
}
@ -382,7 +386,11 @@ func checkRequestLeases(le lease.Lessor, reqs []*pb.RequestUnion) error {
func checkRequestRange(kv dstorage.KV, reqs []*pb.RequestUnion) error {
for _, requ := range reqs {
greq := requ.RequestRange
tv, ok := requ.Request.(*pb.RequestUnion_RequestRange)
if !ok {
continue
}
greq := tv.RequestRange
if greq == nil || greq.Revision == 0 {
continue
}
@ -461,29 +469,36 @@ func applyCompaction(kv dstorage.KV, compaction *pb.CompactionRequest) (*pb.Comp
}
func applyUnion(txnID int64, kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion {
switch {
case union.RequestRange != nil:
resp, err := applyRange(txnID, kv, union.RequestRange)
if err != nil {
panic("unexpected error during txn")
switch tv := union.Request.(type) {
case *pb.RequestUnion_RequestRange:
if tv.RequestRange != nil {
resp, err := applyRange(txnID, kv, tv.RequestRange)
if err != nil {
panic("unexpected error during txn")
}
return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponseRange{ResponseRange: resp}}
}
return &pb.ResponseUnion{ResponseRange: resp}
case union.RequestPut != nil:
resp, err := applyPut(txnID, kv, nil, union.RequestPut)
if err != nil {
panic("unexpected error during txn")
case *pb.RequestUnion_RequestPut:
if tv.RequestPut != nil {
resp, err := applyPut(txnID, kv, nil, tv.RequestPut)
if err != nil {
panic("unexpected error during txn")
}
return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponsePut{ResponsePut: resp}}
}
return &pb.ResponseUnion{ResponsePut: resp}
case union.RequestDeleteRange != nil:
resp, err := applyDeleteRange(txnID, kv, union.RequestDeleteRange)
if err != nil {
panic("unexpected error during txn")
case *pb.RequestUnion_RequestDeleteRange:
if tv.RequestDeleteRange != nil {
resp, err := applyDeleteRange(txnID, kv, tv.RequestDeleteRange)
if err != nil {
panic("unexpected error during txn")
}
return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponseDeleteRange{ResponseDeleteRange: resp}}
}
return &pb.ResponseUnion{ResponseDeleteRange: resp}
default:
// empty union
return nil
}
return nil
}
// applyCompare applies the compare request.
@ -515,13 +530,26 @@ func applyCompare(kv dstorage.KV, c *pb.Compare) (int64, bool) {
var result int
switch c.Target {
case pb.Compare_VALUE:
result = bytes.Compare(ckv.Value, c.Value)
tv, _ := c.TargetUnion.(*pb.Compare_Value)
if tv != nil {
result = bytes.Compare(ckv.Value, tv.Value)
}
case pb.Compare_CREATE:
result = compareInt64(ckv.CreateRevision, c.CreateRevision)
tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision)
if tv != nil {
result = compareInt64(ckv.CreateRevision, tv.CreateRevision)
}
case pb.Compare_MOD:
result = compareInt64(ckv.ModRevision, c.ModRevision)
tv, _ := c.TargetUnion.(*pb.Compare_ModRevision)
if tv != nil {
result = compareInt64(ckv.ModRevision, tv.ModRevision)
}
case pb.Compare_VERSION:
result = compareInt64(ckv.Version, c.Version)
tv, _ := c.TargetUnion.(*pb.Compare_Version)
if tv != nil {
result = compareInt64(ckv.Version, tv.Version)
}
}
switch c.Result {