diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index d00cc6645..ae47aaeae 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -38,7 +38,7 @@ func (h *handler) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Range: r}) if err != nil { - err = togRPCError(err) + return nil, togRPCError(err) } return resp.(*pb.RangeResponse), err @@ -51,7 +51,7 @@ func (h *handler) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, e resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Put: r}) if err != nil { - err = togRPCError(err) + return nil, togRPCError(err) } return resp.(*pb.PutResponse), err @@ -64,7 +64,7 @@ func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*p resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{DeleteRange: r}) if err != nil { - err = togRPCError(err) + return nil, togRPCError(err) } return resp.(*pb.DeleteRangeResponse), err @@ -77,7 +77,7 @@ func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, e resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Txn: r}) if err != nil { - err = togRPCError(err) + return nil, togRPCError(err) } return resp.(*pb.TxnResponse), err @@ -86,7 +86,7 @@ func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, e func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { resp, err := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Compaction: r}) if err != nil { - err = togRPCError(err) + return nil, togRPCError(err) } return resp.(*pb.CompactionResponse), nil diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index c90d4e5db..0a0bc415a 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -27,6 +27,11 @@ type V3DemoServer interface { V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) } +type applyResult struct { + resp proto.Message + err error +} + func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { r.ID = s.reqIDGen.Next() @@ -40,8 +45,8 @@ func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (pr select { case x := <-ch: - resp := x.(proto.Message) - return resp, nil + result := x.(*applyResult) + return result.resp, result.err case <-ctx.Done(): s.w.Trigger(r.ID, nil) // GC wait return &pb.EmptyResponse{}, ctx.Err() @@ -51,54 +56,58 @@ func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) (pr } func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} { + ar := &applyResult{} + switch { case r.Range != nil: - return applyRange(s.kv, r.Range) + ar.resp, ar.err = applyRange(s.kv, r.Range) case r.Put != nil: - return applyPut(s.kv, r.Put) + ar.resp, ar.err = applyPut(s.kv, r.Put) case r.DeleteRange != nil: - return applyDeleteRange(s.kv, r.DeleteRange) + ar.resp, ar.err = applyDeleteRange(s.kv, r.DeleteRange) case r.Txn != nil: - return applyTxn(s.kv, r.Txn) + ar.resp, ar.err = applyTxn(s.kv, r.Txn) case r.Compaction != nil: - return applyCompaction(s.kv, r.Compaction) + ar.resp, ar.err = applyCompaction(s.kv, r.Compaction) default: panic("not implemented") } + + return ar } -func applyPut(kv dstorage.KV, p *pb.PutRequest) *pb.PutResponse { +func applyPut(kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, error) { resp := &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} rev := kv.Put(p.Key, p.Value) resp.Header.Revision = rev - return resp + return resp, nil } -func applyRange(kv dstorage.KV, r *pb.RangeRequest) *pb.RangeResponse { +func applyRange(kv dstorage.KV, r *pb.RangeRequest) (*pb.RangeResponse, error) { resp := &pb.RangeResponse{} resp.Header = &pb.ResponseHeader{} kvs, rev, err := kv.Range(r.Key, r.RangeEnd, r.Limit, 0) if err != nil { - panic("not handled error") + return nil, err } resp.Header.Revision = rev for i := range kvs { resp.Kvs = append(resp.Kvs, &kvs[i]) } - return resp + return resp, nil } -func applyDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) *pb.DeleteRangeResponse { +func applyDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { resp := &pb.DeleteRangeResponse{} resp.Header = &pb.ResponseHeader{} _, rev := kv.DeleteRange(dr.Key, dr.RangeEnd) resp.Header.Revision = rev - return resp + return resp, nil } -func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse { +func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) (*pb.TxnResponse, error) { var revision int64 ok := true @@ -108,6 +117,8 @@ func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse { } } + // TODO: check potential errors before actually applying anything + var reqs []*pb.RequestUnion if ok { reqs = rt.Success @@ -127,29 +138,41 @@ func applyTxn(kv dstorage.KV, rt *pb.TxnRequest) *pb.TxnResponse { txnResp.Header.Revision = revision txnResp.Responses = resps txnResp.Succeeded = ok - return txnResp + return txnResp, nil } -func applyCompaction(kv dstorage.KV, compaction *pb.CompactionRequest) *pb.CompactionResponse { +func applyCompaction(kv dstorage.KV, compaction *pb.CompactionRequest) (*pb.CompactionResponse, error) { resp := &pb.CompactionResponse{} resp.Header = &pb.ResponseHeader{} err := kv.Compact(compaction.Revision) if err != nil { - panic("handle error") + return nil, err } // get the current revision. which key to get is not important. _, resp.Header.Revision, _ = kv.Range([]byte("compaction"), nil, 1, 0) - return resp + return resp, err } func applyUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion { switch { case union.RequestRange != nil: - return &pb.ResponseUnion{ResponseRange: applyRange(kv, union.RequestRange)} + resp, err := applyRange(kv, union.RequestRange) + if err != nil { + panic("unexpected error during txn") + } + return &pb.ResponseUnion{ResponseRange: resp} case union.RequestPut != nil: - return &pb.ResponseUnion{ResponsePut: applyPut(kv, union.RequestPut)} + resp, err := applyPut(kv, union.RequestPut) + if err != nil { + panic("unexpected error during txn") + } + return &pb.ResponseUnion{ResponsePut: resp} case union.RequestDeleteRange != nil: - return &pb.ResponseUnion{ResponseDeleteRange: applyDeleteRange(kv, union.RequestDeleteRange)} + resp, err := applyDeleteRange(kv, union.RequestDeleteRange) + if err != nil { + panic("unexpected error during txn") + } + return &pb.ResponseUnion{ResponseDeleteRange: resp} default: // empty union return nil