Merge pull request #3524 from xiang90/grpc_error

etcdserver: use gRPC error instead of error message in header
release-2.3
Xiang Li 2015-09-14 16:38:44 -07:00
commit e0d8923f7b
4 changed files with 147 additions and 55 deletions

View File

@ -0,0 +1,27 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v3rpc
import (
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes"
"github.com/coreos/etcd/storage"
)
var (
ErrEmptyKey = grpc.Errorf(codes.InvalidArgument, "key is not provided")
ErrCompacted = grpc.Errorf(codes.OutOfRange, storage.ErrCompacted.Error())
ErrFutureRev = grpc.Errorf(codes.OutOfRange, storage.ErrFutureRev.Error())
)

View File

@ -16,8 +16,11 @@ package v3rpc
import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes"
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/storage"
)
type handler struct {
@ -29,25 +32,127 @@ func New(s etcdserver.V3DemoServer) pb.EtcdServer {
}
func (h *handler) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if err := checkRangeRequest(r); err != nil {
return nil, err
}
resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Range: r})
if err != nil {
err = togRPCError(err)
}
return resp.(*pb.RangeResponse), err
}
func (h *handler) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
if err := checkPutRequest(r); err != nil {
return nil, err
}
resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Put: r})
if err != nil {
err = togRPCError(err)
}
return resp.(*pb.PutResponse), err
}
func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
if err := checkDeleteRequest(r); err != nil {
return nil, err
}
resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{DeleteRange: r})
if err != nil {
err = togRPCError(err)
}
return resp.(*pb.DeleteRangeResponse), err
}
func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
if err := checkTxnRequest(r); err != nil {
return nil, err
}
resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Txn: r})
if err != nil {
err = togRPCError(err)
}
return resp.(*pb.TxnResponse), err
}
func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
panic("not implemented")
}
func checkRangeRequest(r *pb.RangeRequest) error {
if len(r.Key) == 0 {
return ErrEmptyKey
}
return nil
}
func checkPutRequest(r *pb.PutRequest) error {
if len(r.Key) == 0 {
return ErrEmptyKey
}
return nil
}
func checkDeleteRequest(r *pb.DeleteRangeRequest) error {
if len(r.Key) == 0 {
return ErrEmptyKey
}
return nil
}
func checkTxnRequest(r *pb.TxnRequest) error {
for _, c := range r.Compare {
if len(c.Key) == 0 {
return ErrEmptyKey
}
}
for _, u := range r.Success {
if err := checkRequestUnion(u); err != nil {
return err
}
}
for _, u := range r.Failure {
if err := checkRequestUnion(u); err != nil {
return err
}
}
return nil
}
func checkRequestUnion(u *pb.RequestUnion) error {
// TODO: ensure only one of the field is set.
switch {
case u.RequestRange != nil:
return checkRangeRequest(u.RequestRange)
case u.RequestPut != nil:
return checkPutRequest(u.RequestPut)
case u.RequestDeleteRange != nil:
return checkDeleteRequest(u.RequestDeleteRange)
default:
// empty union
return nil
}
}
func togRPCError(err error) error {
switch err {
case storage.ErrCompacted:
return ErrCompacted
case storage.ErrFutureRev:
return ErrFutureRev
// TODO: handle error from raft and timeout
default:
return grpc.Errorf(codes.Unknown, err.Error())
}
}

View File

@ -70,14 +70,12 @@ func (x Compare_CompareTarget) String() string {
}
type ResponseHeader struct {
// an error type message?
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
ClusterId uint64 `protobuf:"varint,2,opt,name=cluster_id,proto3" json:"cluster_id,omitempty"`
MemberId uint64 `protobuf:"varint,3,opt,name=member_id,proto3" json:"member_id,omitempty"`
ClusterId uint64 `protobuf:"varint,1,opt,name=cluster_id,proto3" json:"cluster_id,omitempty"`
MemberId uint64 `protobuf:"varint,2,opt,name=member_id,proto3" json:"member_id,omitempty"`
// revision of the store when the request was applied.
Revision int64 `protobuf:"varint,4,opt,name=revision,proto3" json:"revision,omitempty"`
Revision int64 `protobuf:"varint,3,opt,name=revision,proto3" json:"revision,omitempty"`
// term of raft when the request was applied.
RaftTerm uint64 `protobuf:"varint,5,opt,name=raft_term,proto3" json:"raft_term,omitempty"`
RaftTerm uint64 `protobuf:"varint,4,opt,name=raft_term,proto3" json:"raft_term,omitempty"`
}
func (m *ResponseHeader) Reset() { *m = ResponseHeader{} }
@ -568,29 +566,23 @@ func (m *ResponseHeader) MarshalTo(data []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.Error) > 0 {
data[i] = 0xa
i++
i = encodeVarintRpc(data, i, uint64(len(m.Error)))
i += copy(data[i:], m.Error)
}
if m.ClusterId != 0 {
data[i] = 0x10
data[i] = 0x8
i++
i = encodeVarintRpc(data, i, uint64(m.ClusterId))
}
if m.MemberId != 0 {
data[i] = 0x18
data[i] = 0x10
i++
i = encodeVarintRpc(data, i, uint64(m.MemberId))
}
if m.Revision != 0 {
data[i] = 0x20
data[i] = 0x18
i++
i = encodeVarintRpc(data, i, uint64(m.Revision))
}
if m.RaftTerm != 0 {
data[i] = 0x28
data[i] = 0x20
i++
i = encodeVarintRpc(data, i, uint64(m.RaftTerm))
}
@ -1155,10 +1147,6 @@ func encodeVarintRpc(data []byte, offset int, v uint64) int {
func (m *ResponseHeader) Size() (n int) {
var l int
_ = l
l = len(m.Error)
if l > 0 {
n += 1 + l + sovRpc(uint64(l))
}
if m.ClusterId != 0 {
n += 1 + sovRpc(uint64(m.ClusterId))
}
@ -1437,32 +1425,6 @@ func (m *ResponseHeader) Unmarshal(data []byte) error {
wireType := int(wire & 0x7)
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthRpc
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Error = string(data[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ClusterId", wireType)
}
@ -1478,7 +1440,7 @@ func (m *ResponseHeader) Unmarshal(data []byte) error {
break
}
}
case 3:
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field MemberId", wireType)
}
@ -1494,7 +1456,7 @@ func (m *ResponseHeader) Unmarshal(data []byte) error {
break
}
}
case 4:
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Revision", wireType)
}
@ -1510,7 +1472,7 @@ func (m *ResponseHeader) Unmarshal(data []byte) error {
break
}
}
case 5:
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field RaftTerm", wireType)
}

View File

@ -33,14 +33,12 @@ service etcd {
}
message ResponseHeader {
// an error type message?
string error = 1;
uint64 cluster_id = 2;
uint64 member_id = 3;
uint64 cluster_id = 1;
uint64 member_id = 2;
// revision of the store when the request was applied.
int64 revision = 4;
int64 revision = 3;
// term of raft when the request was applied.
uint64 raft_term = 5;
uint64 raft_term = 4;
}
message RangeRequest {