diff --git a/etcdserver/api/v3rpc/error.go b/etcdserver/api/v3rpc/error.go new file mode 100644 index 000000000..5ad2269c1 --- /dev/null +++ b/etcdserver/api/v3rpc/error.go @@ -0,0 +1,27 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3rpc + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes" + "github.com/coreos/etcd/storage" +) + +var ( + ErrEmptyKey = grpc.Errorf(codes.InvalidArgument, "key is not provided") + ErrCompacted = grpc.Errorf(codes.OutOfRange, storage.ErrCompacted.Error()) + ErrFutureRev = grpc.Errorf(codes.OutOfRange, storage.ErrFutureRev.Error()) +) diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index 57f83a5ea..513d25db2 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -16,8 +16,11 @@ package v3rpc import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes" "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/storage" ) type handler struct { @@ -29,25 +32,127 @@ func New(s etcdserver.V3DemoServer) pb.EtcdServer { } func (h *handler) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { + if err := checkRangeRequest(r); err != nil { + return nil, err + } + resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Range: r}) + if err != nil { + err = togRPCError(err) + } + return resp.(*pb.RangeResponse), err } func (h *handler) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { + if err := checkPutRequest(r); err != nil { + return nil, err + } + resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Put: r}) + if err != nil { + err = togRPCError(err) + } + return resp.(*pb.PutResponse), err } func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { + if err := checkDeleteRequest(r); err != nil { + return nil, err + } + resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{DeleteRange: r}) + if err != nil { + err = togRPCError(err) + } + return resp.(*pb.DeleteRangeResponse), err } func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { + if err := checkTxnRequest(r); err != nil { + return nil, err + } + resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Txn: r}) + if err != nil { + err = togRPCError(err) + } + return resp.(*pb.TxnResponse), err } func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { panic("not implemented") } + +func checkRangeRequest(r *pb.RangeRequest) error { + if len(r.Key) == 0 { + return ErrEmptyKey + } + return nil +} + +func checkPutRequest(r *pb.PutRequest) error { + if len(r.Key) == 0 { + return ErrEmptyKey + } + return nil +} + +func checkDeleteRequest(r *pb.DeleteRangeRequest) error { + if len(r.Key) == 0 { + return ErrEmptyKey + } + return nil +} + +func checkTxnRequest(r *pb.TxnRequest) error { + for _, c := range r.Compare { + if len(c.Key) == 0 { + return ErrEmptyKey + } + } + + for _, u := range r.Success { + if err := checkRequestUnion(u); err != nil { + return err + } + } + + for _, u := range r.Failure { + if err := checkRequestUnion(u); err != nil { + return err + } + } + + return nil +} + +func checkRequestUnion(u *pb.RequestUnion) error { + // TODO: ensure only one of the field is set. + switch { + case u.RequestRange != nil: + return checkRangeRequest(u.RequestRange) + case u.RequestPut != nil: + return checkPutRequest(u.RequestPut) + case u.RequestDeleteRange != nil: + return checkDeleteRequest(u.RequestDeleteRange) + default: + // empty union + return nil + } +} + +func togRPCError(err error) error { + switch err { + case storage.ErrCompacted: + return ErrCompacted + case storage.ErrFutureRev: + return ErrFutureRev + // TODO: handle error from raft and timeout + default: + return grpc.Errorf(codes.Unknown, err.Error()) + } +} diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index dcdc6ee99..2055a2a97 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -70,14 +70,12 @@ func (x Compare_CompareTarget) String() string { } type ResponseHeader struct { - // an error type message? - Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` - ClusterId uint64 `protobuf:"varint,2,opt,name=cluster_id,proto3" json:"cluster_id,omitempty"` - MemberId uint64 `protobuf:"varint,3,opt,name=member_id,proto3" json:"member_id,omitempty"` + ClusterId uint64 `protobuf:"varint,1,opt,name=cluster_id,proto3" json:"cluster_id,omitempty"` + MemberId uint64 `protobuf:"varint,2,opt,name=member_id,proto3" json:"member_id,omitempty"` // revision of the store when the request was applied. - Revision int64 `protobuf:"varint,4,opt,name=revision,proto3" json:"revision,omitempty"` + Revision int64 `protobuf:"varint,3,opt,name=revision,proto3" json:"revision,omitempty"` // term of raft when the request was applied. - RaftTerm uint64 `protobuf:"varint,5,opt,name=raft_term,proto3" json:"raft_term,omitempty"` + RaftTerm uint64 `protobuf:"varint,4,opt,name=raft_term,proto3" json:"raft_term,omitempty"` } func (m *ResponseHeader) Reset() { *m = ResponseHeader{} } @@ -568,29 +566,23 @@ func (m *ResponseHeader) MarshalTo(data []byte) (int, error) { _ = i var l int _ = l - if len(m.Error) > 0 { - data[i] = 0xa - i++ - i = encodeVarintRpc(data, i, uint64(len(m.Error))) - i += copy(data[i:], m.Error) - } if m.ClusterId != 0 { - data[i] = 0x10 + data[i] = 0x8 i++ i = encodeVarintRpc(data, i, uint64(m.ClusterId)) } if m.MemberId != 0 { - data[i] = 0x18 + data[i] = 0x10 i++ i = encodeVarintRpc(data, i, uint64(m.MemberId)) } if m.Revision != 0 { - data[i] = 0x20 + data[i] = 0x18 i++ i = encodeVarintRpc(data, i, uint64(m.Revision)) } if m.RaftTerm != 0 { - data[i] = 0x28 + data[i] = 0x20 i++ i = encodeVarintRpc(data, i, uint64(m.RaftTerm)) } @@ -1155,10 +1147,6 @@ func encodeVarintRpc(data []byte, offset int, v uint64) int { func (m *ResponseHeader) Size() (n int) { var l int _ = l - l = len(m.Error) - if l > 0 { - n += 1 + l + sovRpc(uint64(l)) - } if m.ClusterId != 0 { n += 1 + sovRpc(uint64(m.ClusterId)) } @@ -1437,32 +1425,6 @@ func (m *ResponseHeader) Unmarshal(data []byte) error { wireType := int(wire & 0x7) switch fieldNum { case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthRpc - } - postIndex := iNdEx + intStringLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Error = string(data[iNdEx:postIndex]) - iNdEx = postIndex - case 2: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field ClusterId", wireType) } @@ -1478,7 +1440,7 @@ func (m *ResponseHeader) Unmarshal(data []byte) error { break } } - case 3: + case 2: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field MemberId", wireType) } @@ -1494,7 +1456,7 @@ func (m *ResponseHeader) Unmarshal(data []byte) error { break } } - case 4: + case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Revision", wireType) } @@ -1510,7 +1472,7 @@ func (m *ResponseHeader) Unmarshal(data []byte) error { break } } - case 5: + case 4: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field RaftTerm", wireType) } diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index 566d3ab33..c91d1299d 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -33,14 +33,12 @@ service etcd { } message ResponseHeader { - // an error type message? - string error = 1; - uint64 cluster_id = 2; - uint64 member_id = 3; + uint64 cluster_id = 1; + uint64 member_id = 2; // revision of the store when the request was applied. - int64 revision = 4; + int64 revision = 3; // term of raft when the request was applied. - uint64 raft_term = 5; + uint64 raft_term = 4; } message RangeRequest {