Merge pull request #4886 from heyitsanthony/move-hash

v3rpc: move Hash RPC to Maintenance service
release-3.0
Anthony Romano 2016-03-28 19:35:03 -07:00
commit c53380cd2a
10 changed files with 158 additions and 139 deletions
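
Caller impact: the Hash RPC is removed from the KV service and exposed on the Maintenance service instead, so clients must switch from the generated KV client to the Maintenance client. A minimal caller-side sketch in Go (the endpoint address and connection setup are placeholders, not part of this change; the Hash call itself mirrors the updated tests below):

package main

import (
	"fmt"
	"time"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

func main() {
	// Placeholder endpoint; any etcd v3 gRPC address works here.
	conn, err := grpc.Dial("localhost:2378", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Before this commit: pb.NewKVClient(conn).Hash(ctx, &pb.HashRequest{})
	// After: Hash lives on the Maintenance service.
	m := pb.NewMaintenanceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	resp, err := m.Hash(ctx, &pb.HashRequest{})
	cancel()
	if err != nil {
		panic(err)
	}
	fmt.Printf("hash=%d at revision=%d\n", resp.Hash, resp.Header.Revision)
}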

View File

@@ -0,0 +1,43 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v3rpc
import (
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
type header struct {
clusterID int64
memberID int64
raftTimer etcdserver.RaftTimer
rev func() int64
}
func newHeader(s *etcdserver.EtcdServer) header {
return header{
clusterID: int64(s.Cluster().ID()),
memberID: int64(s.ID()),
raftTimer: s,
rev: func() int64 { return s.KV().Rev() },
}
}
// fill populates pb.ResponseHeader using etcdserver information, except Revision
func (h *header) fill(rh *pb.ResponseHeader) {
rh.ClusterId = uint64(h.clusterID)
rh.MemberId = uint64(h.memberID)
rh.RaftTerm = h.raftTimer.Term()
}
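
The new header helper centralizes the response-header stamping that each v3rpc service previously duplicated. The pattern, as the conversions below apply it: embed a header, build it once with newHeader, and call fill on every response. A sketch with a hypothetical fooServer (not part of this change, shown only to illustrate the pattern):

type fooServer struct {
	hdr header
}

func NewFooServer(s *etcdserver.EtcdServer) *fooServer {
	return &fooServer{hdr: newHeader(s)}
}

func (s *fooServer) Do(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
	resp := &pb.RangeResponse{Header: &pb.ResponseHeader{}}
	// fill stamps ClusterId, MemberId, and RaftTerm; Revision is left to
	// the handler (e.g., maintenanceServer.Hash sets it via hdr.rev()).
	s.hdr.fill(resp.Header)
	return resp, nil
}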

View File

@@ -38,20 +38,12 @@ var (
)
type kvServer struct {
clusterID int64
memberID int64
raftTimer etcdserver.RaftTimer
kv etcdserver.RaftKV
hdr header
kv etcdserver.RaftKV
}
func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return &kvServer{
clusterID: int64(s.Cluster().ID()),
memberID: int64(s.ID()),
raftTimer: s,
kv: s,
}
return &kvServer{hdr: newHeader(s), kv: s}
}
func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
@@ -67,7 +59,7 @@ func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResp
if resp.Header == nil {
plog.Panic("unexpected nil resp.Header")
}
s.fillInHeader(resp.Header)
s.hdr.fill(resp.Header)
return resp, err
}
@@ -84,7 +76,7 @@ func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse,
if resp.Header == nil {
plog.Panic("unexpected nil resp.Header")
}
s.fillInHeader(resp.Header)
s.hdr.fill(resp.Header)
return resp, err
}
@@ -101,7 +93,7 @@ func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*
if resp.Header == nil {
plog.Panic("unexpected nil resp.Header")
}
s.fillInHeader(resp.Header)
s.hdr.fill(resp.Header)
return resp, err
}
@@ -118,7 +110,7 @@ func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse,
if resp.Header == nil {
plog.Panic("unexpected nil resp.Header")
}
s.fillInHeader(resp.Header)
s.hdr.fill(resp.Header)
return resp, err
}
@@ -131,26 +123,10 @@ func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.Co
if resp.Header == nil {
plog.Panic("unexpected nil resp.Header")
}
s.fillInHeader(resp.Header)
s.hdr.fill(resp.Header)
return resp, nil
}
func (s *kvServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
resp, err := s.kv.Hash(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
s.fillInHeader(resp.Header)
return resp, nil
}
// fillInHeader populates pb.ResponseHeader from kvServer, except Revision.
func (s *kvServer) fillInHeader(h *pb.ResponseHeader) {
h.ClusterId = uint64(s.clusterID)
h.MemberId = uint64(s.memberID)
h.RaftTerm = s.raftTimer.Term()
}
func checkRangeRequest(r *pb.RangeRequest) error {
if len(r.Key) == 0 {
return rpctypes.ErrEmptyKey

View File

@@ -30,12 +30,13 @@ type Alarmer interface {
}
type maintenanceServer struct {
bg BackendGetter
a Alarmer
bg BackendGetter
a Alarmer
hdr header
}
func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
return &maintenanceServer{bg: s, a: s}
return &maintenanceServer{bg: s, a: s, hdr: newHeader(s)}
}
func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
@@ -49,6 +50,16 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
return &pb.DefragmentResponse{}, nil
}
func (s *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
h, err := s.bg.Backend().Hash()
if err != nil {
return nil, togRPCError(err)
}
resp := &pb.HashResponse{Header: &pb.ResponseHeader{Revision: s.hdr.rev()}, Hash: h}
s.hdr.fill(resp.Header)
return resp, nil
}
func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
plog.Warningf("alarming %+v", ar)
return ms.a.Alarm(ctx, ar)

View File

@@ -94,7 +94,7 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
err error
)
if txnID != noTxn {
rev, err = a.s.getKV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
if err != nil {
return nil, err
}
@@ -105,7 +105,7 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
return nil, lease.ErrLeaseNotFound
}
}
rev = a.s.getKV().Put(p.Key, p.Value, leaseID)
rev = a.s.KV().Put(p.Key, p.Value, leaseID)
}
resp.Header.Revision = rev
return resp, nil
@@ -126,12 +126,12 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (
}
if txnID != noTxn {
n, rev, err = a.s.getKV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
if err != nil {
return nil, err
}
} else {
n, rev = a.s.getKV().DeleteRange(dr.Key, dr.RangeEnd)
n, rev = a.s.KV().DeleteRange(dr.Key, dr.RangeEnd)
}
resp.Deleted = n
@@ -164,12 +164,12 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
}
if txnID != noTxn {
kvs, rev, err = a.s.getKV().TxnRange(txnID, r.Key, r.RangeEnd, limit, r.Revision)
kvs, rev, err = a.s.KV().TxnRange(txnID, r.Key, r.RangeEnd, limit, r.Revision)
if err != nil {
return nil, err
}
} else {
kvs, rev, err = a.s.getKV().Range(r.Key, r.RangeEnd, limit, r.Revision)
kvs, rev, err = a.s.KV().Range(r.Key, r.RangeEnd, limit, r.Revision)
if err != nil {
return nil, err
}
@@ -235,9 +235,9 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
// When executing the operations of txn, we need to hold the txn lock.
// So the reader will not see any intermediate results.
txnID := a.s.getKV().TxnBegin()
txnID := a.s.KV().TxnBegin()
defer func() {
err := a.s.getKV().TxnEnd(txnID)
err := a.s.KV().TxnEnd(txnID)
if err != nil {
panic(fmt.Sprint("unexpected error when closing txn", txnID))
}
@@ -264,7 +264,7 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
// It returns the revision at which the comparison happens. If the comparison
// succeeds, it returns true. Otherwise it returns false.
func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
ckvs, rev, err := a.s.getKV().Range(c.Key, nil, 1, 0)
ckvs, rev, err := a.s.KV().Range(c.Key, nil, 1, 0)
if err != nil {
if err == dstorage.ErrTxnIDMismatch {
panic("unexpected txn ID mismatch error")
@@ -365,12 +365,12 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestUnion) *pb.R
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, error) {
resp := &pb.CompactionResponse{}
resp.Header = &pb.ResponseHeader{}
err := a.s.getKV().Compact(compaction.Revision)
err := a.s.KV().Compact(compaction.Revision)
if err != nil {
return nil, err
}
// get the current revision. Which key to get is not important.
_, resp.Header.Revision, _ = a.s.getKV().Range([]byte("compaction"), nil, 1, 0)
_, resp.Header.Revision, _ = a.s.KV().Range([]byte("compaction"), nil, 1, 0)
return resp, err
}
@@ -559,10 +559,10 @@ func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestUnion) error {
continue
}
if greq.Revision > a.s.getKV().Rev() {
if greq.Revision > a.s.KV().Rev() {
return dstorage.ErrFutureRev
}
if greq.Revision < a.s.getKV().FirstRev() {
if greq.Revision < a.s.KV().FirstRev() {
return dstorage.ErrCompacted
}
}

View File

@@ -1657,10 +1657,6 @@ type KVClient interface {
// Compact compacts the event history in etcd. User should compact the
// event history periodically, or it will grow infinitely.
Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error)
// Hash returns the hash of local KV state for consistency checking purpose.
// This is designed for testing purpose. Do not use this in production when there
// are ongoing transactions.
Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error)
}
type kVClient struct {
@@ -1716,15 +1712,6 @@ func (c *kVClient) Compact(ctx context.Context, in *CompactionRequest, opts ...g
return out, nil
}
func (c *kVClient) Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) {
out := new(HashResponse)
err := grpc.Invoke(ctx, "/etcdserverpb.KV/Hash", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for KV service
type KVServer interface {
@@ -1746,10 +1733,6 @@ type KVServer interface {
// Compact compacts the event history in etcd. User should compact the
// event history periodically, or it will grow infinitely.
Compact(context.Context, *CompactionRequest) (*CompactionResponse, error)
// Hash returns the hash of local KV state for consistency checking purpose.
// This is designed for testing purpose. Do not use this in production when there
// are ongoing transactions.
Hash(context.Context, *HashRequest) (*HashResponse, error)
}
func RegisterKVServer(s *grpc.Server, srv KVServer) {
@@ -1816,18 +1799,6 @@ func _KV_Compact_Handler(srv interface{}, ctx context.Context, dec func(interfac
return out, nil
}
func _KV_Hash_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(HashRequest)
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(KVServer).Hash(ctx, in)
if err != nil {
return nil, err
}
return out, nil
}
var _KV_serviceDesc = grpc.ServiceDesc{
ServiceName: "etcdserverpb.KV",
HandlerType: (*KVServer)(nil),
@@ -1852,10 +1823,6 @@ var _KV_serviceDesc = grpc.ServiceDesc{
MethodName: "Compact",
Handler: _KV_Compact_Handler,
},
{
MethodName: "Hash",
Handler: _KV_Hash_Handler,
},
},
Streams: []grpc.StreamDesc{},
}
@@ -2274,10 +2241,13 @@ var _Cluster_serviceDesc = grpc.ServiceDesc{
// Client API for Maintenance service
type MaintenanceClient interface {
// TODO: move Hash from kv to Maintenance
Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error)
// Alarm activates, deactivates, and queries alarms regarding cluster health.
Alarm(ctx context.Context, in *AlarmRequest, opts ...grpc.CallOption) (*AlarmResponse, error)
Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error)
// Hash returns the hash of the local KV state for consistency checking purposes.
// This is designed for testing; do not use this in production when there
// are ongoing transactions.
Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error)
}
type maintenanceClient struct {
@@ -2288,15 +2258,6 @@ func NewMaintenanceClient(cc *grpc.ClientConn) MaintenanceClient {
return &maintenanceClient{cc}
}
func (c *maintenanceClient) Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error) {
out := new(DefragmentResponse)
err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Defragment", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *maintenanceClient) Alarm(ctx context.Context, in *AlarmRequest, opts ...grpc.CallOption) (*AlarmResponse, error) {
out := new(AlarmResponse)
err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Alarm", in, out, c.cc, opts...)
@@ -2306,31 +2267,40 @@ func (c *maintenanceClient) Alarm(ctx context.Context, in *AlarmRequest, opts ..
return out, nil
}
// Server API for Maintenance service
type MaintenanceServer interface {
// TODO: move Hash from kv to Maintenance
Defragment(context.Context, *DefragmentRequest) (*DefragmentResponse, error)
// Alarm activates, deactivates, and queries alarms regarding cluster health.
Alarm(context.Context, *AlarmRequest) (*AlarmResponse, error)
}
func RegisterMaintenanceServer(s *grpc.Server, srv MaintenanceServer) {
s.RegisterService(&_Maintenance_serviceDesc, srv)
}
func _Maintenance_Defragment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(DefragmentRequest)
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(MaintenanceServer).Defragment(ctx, in)
func (c *maintenanceClient) Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error) {
out := new(DefragmentResponse)
err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Defragment", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *maintenanceClient) Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) {
out := new(HashResponse)
err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Hash", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Maintenance service
type MaintenanceServer interface {
// Alarm activates, deactivates, and queries alarms regarding cluster health.
Alarm(context.Context, *AlarmRequest) (*AlarmResponse, error)
Defragment(context.Context, *DefragmentRequest) (*DefragmentResponse, error)
// Hash returns the hash of the local KV state for consistency checking purposes.
// This is designed for testing; do not use this in production when there
// are ongoing transactions.
Hash(context.Context, *HashRequest) (*HashResponse, error)
}
func RegisterMaintenanceServer(s *grpc.Server, srv MaintenanceServer) {
s.RegisterService(&_Maintenance_serviceDesc, srv)
}
func _Maintenance_Alarm_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(AlarmRequest)
if err := dec(in); err != nil {
@@ -2343,17 +2313,45 @@ func _Maintenance_Alarm_Handler(srv interface{}, ctx context.Context, dec func(i
return out, nil
}
func _Maintenance_Defragment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(DefragmentRequest)
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(MaintenanceServer).Defragment(ctx, in)
if err != nil {
return nil, err
}
return out, nil
}
func _Maintenance_Hash_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(HashRequest)
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(MaintenanceServer).Hash(ctx, in)
if err != nil {
return nil, err
}
return out, nil
}
var _Maintenance_serviceDesc = grpc.ServiceDesc{
ServiceName: "etcdserverpb.Maintenance",
HandlerType: (*MaintenanceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Alarm",
Handler: _Maintenance_Alarm_Handler,
},
{
MethodName: "Defragment",
Handler: _Maintenance_Defragment_Handler,
},
{
MethodName: "Alarm",
Handler: _Maintenance_Alarm_Handler,
MethodName: "Hash",
Handler: _Maintenance_Hash_Handler,
},
},
Streams: []grpc.StreamDesc{},

View File

@@ -30,11 +30,6 @@ service KV {
// Compact compacts the event history in etcd. User should compact the
// event history periodically, or it will grow infinitely.
rpc Compact(CompactionRequest) returns (CompactionResponse) {}
// Hash returns the hash of local KV state for consistency checking purpose.
// This is designed for testing purpose. Do not use this in production when there
// are ongoing transactions.
rpc Hash(HashRequest) returns (HashResponse) {}
}
service Watch {
@@ -77,11 +72,15 @@ service Cluster {
}
service Maintenance {
// TODO: move Hash from kv to Maintenance
rpc Defragment(DefragmentRequest) returns (DefragmentResponse) {}
// Alarm activates, deactivates, and queries alarms regarding cluster health.
rpc Alarm(AlarmRequest) returns (AlarmResponse) {}
rpc Defragment(DefragmentRequest) returns (DefragmentResponse) {}
// Hash returns the hash of the local KV state for consistency checking purposes.
// This is designed for testing; do not use this in production when there
// are ongoing transactions.
rpc Hash(HashRequest) returns (HashResponse) {}
}
service Auth {

View File

@@ -1195,7 +1195,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}
// commit v3 storage because WAL files before the snapshot index
// could be removed after SaveSnap.
s.getKV().Commit()
s.KV().Commit()
// SaveSnap saves the snapshot and releases the locked wal files
// to the snapshot index.
if err = s.r.storage.SaveSnap(snap); err != nil {
@@ -1334,7 +1334,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
}
}
func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV { return s.kv }
func (s *EtcdServer) KV() dstorage.ConsistentWatchableKV { return s.kv }
func (s *EtcdServer) Backend() backend.Backend {
s.bemu.Lock()
defer s.bemu.Unlock()

View File

@@ -38,7 +38,6 @@ type RaftKV interface {
DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error)
}
type Lessor interface {
@@ -109,14 +108,6 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
return resp, result.err
}
func (s *EtcdServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
h, err := s.be.Hash()
if err != nil {
return nil, err
}
return &pb.HashResponse{Header: &pb.ResponseHeader{Revision: s.kv.Rev()}, Hash: h}, nil
}
func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
// no id given? choose one
for r.ID == int64(lease.NoLease) {
@@ -225,6 +216,4 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern
}
// Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() dstorage.Watchable {
return s.getKV()
}
func (s *EtcdServer) Watchable() dstorage.Watchable { return s.KV() }

View File

@@ -440,7 +440,10 @@ func TestV3Hash(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
kvc := toGRPC(clus.RandClient()).KV
cli := clus.RandClient()
kvc := toGRPC(cli).KV
m := toGRPC(cli).Maintenance
preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
for i := 0; i < 3; i++ {
@@ -450,7 +453,7 @@ }
}
}
resp, err := kvc.Hash(context.Background(), &pb.HashRequest{})
resp, err := m.Hash(context.Background(), &pb.HashRequest{})
if err != nil || resp.Hash == 0 {
t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
}

View File

@@ -303,9 +303,9 @@ func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error)
if err != nil {
return nil, nil, err
}
kvc := pb.NewKVClient(conn)
m := pb.NewMaintenanceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := kvc.Hash(ctx, &pb.HashRequest{})
resp, err := m.Hash(ctx, &pb.HashRequest{})
cancel()
conn.Close()
if err != nil {