diff --git a/alarm/alarms.go b/alarm/alarms.go
new file mode 100644
index 000000000..f94fabc3d
--- /dev/null
+++ b/alarm/alarms.go
@@ -0,0 +1,144 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package alarm manages health status alarms in etcd.
+package alarm
+
+import (
+	"sync"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/storage/backend"
+	"github.com/coreos/pkg/capnslog"
+)
+
+var (
+	alarmBucketName = []byte("alarm")
+	plog            = capnslog.NewPackageLogger("github.com/coreos/etcd", "alarm")
+)
+
+type BackendGetter interface {
+	Backend() backend.Backend
+}
+
+type alarmSet map[types.ID]*pb.AlarmMember
+
+// AlarmStore persists alarms to the backend.
+type AlarmStore struct {
+	mu    sync.Mutex
+	types map[pb.AlarmType]alarmSet
+
+	bg BackendGetter
+}
+
+func NewAlarmStore(bg BackendGetter) (*AlarmStore, error) {
+	ret := &AlarmStore{types: make(map[pb.AlarmType]alarmSet), bg: bg}
+	err := ret.restore()
+	return ret, err
+}
+
+func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+
+	newAlarm := &pb.AlarmMember{MemberID: uint64(id), Alarm: at}
+	if m := a.addToMap(newAlarm); m != newAlarm {
+		return m
+	}
+
+	v, err := newAlarm.Marshal()
+	if err != nil {
+		plog.Panicf("failed to marshal alarm member")
+	}
+
+	b := a.bg.Backend()
+	b.BatchTx().Lock()
+	// the marshaled member doubles as the bucket key; the value is unused
+	b.BatchTx().UnsafePut(alarmBucketName, v, nil)
+	b.BatchTx().Unlock()
+
+	return newAlarm
+}
+
+func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+
+	t := a.types[at]
+	if t == nil {
+		t = make(alarmSet)
+		a.types[at] = t
+	}
+	m := t[id]
+	if m == nil {
+		return nil
+	}
+
+	delete(t, id)
+
+	v, err := m.Marshal()
+	if err != nil {
+		plog.Panicf("failed to marshal alarm member")
+	}
+
+	b := a.bg.Backend()
+	b.BatchTx().Lock()
+	b.BatchTx().UnsafeDelete(alarmBucketName, v)
+	b.BatchTx().Unlock()
+
+	return m
+}
+
+func (a *AlarmStore) Get(at pb.AlarmType) (ret []*pb.AlarmMember) {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+	for _, m := range a.types[at] {
+		ret = append(ret, m)
+	}
+	return ret
+}
+
+func (a *AlarmStore) restore() error {
+	b := a.bg.Backend()
+	tx := b.BatchTx()
+
+	tx.Lock()
+	tx.UnsafeCreateBucket(alarmBucketName)
+	err := tx.UnsafeForEach(alarmBucketName, func(k, v []byte) error {
+		var m pb.AlarmMember
+		// alarms are stored in the bucket key, so unmarshal k rather than v
+		if err := m.Unmarshal(k); err != nil {
+			return err
+		}
+		a.addToMap(&m)
+		return nil
+	})
+	tx.Unlock()
+
+	b.ForceCommit()
+	return err
+}
+
+func (a *AlarmStore) addToMap(newAlarm *pb.AlarmMember) *pb.AlarmMember {
+	t := a.types[newAlarm.Alarm]
+	if t == nil {
+		t = make(alarmSet)
+		a.types[newAlarm.Alarm] = t
+	}
+	m := t[types.ID(newAlarm.MemberID)]
+	if m != nil {
+		return m
+	}
+	t[types.ID(newAlarm.MemberID)] = newAlarm
+	return newAlarm
+}
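A detail worth noting in `Activate` and `restore` above: the marshaled `AlarmMember` is the bucket *key* and the value is left nil, so deactivation is a delete of the same byte string and recovery iterates and unmarshals keys. A minimal sketch of that layout (illustrative only; `persistAlarm` is hypothetical and assumes `tx` is a locked `backend.BatchTx`, reusing the imports above):

```go
// persistAlarm mirrors AlarmStore.Activate's write: the marshaled member is
// the key under the "alarm" bucket, the value is unused (nil).
func persistAlarm(tx backend.BatchTx, m *pb.AlarmMember) error {
	k, err := m.Marshal()
	if err != nil {
		return err
	}
	tx.UnsafePut([]byte("alarm"), k, nil) // Deactivate later calls UnsafeDelete with the same k
	return nil
}
```
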
diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go
index 52a09ee0d..7515363ab 100644
--- a/etcdserver/api/v3rpc/grpc.go
+++ b/etcdserver/api/v3rpc/grpc.go
@@ -30,9 +30,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
 	}
 	grpcServer := grpc.NewServer(opts...)
 
-	pb.RegisterKVServer(grpcServer, NewKVServer(s))
+	pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
 	pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
-	pb.RegisterLeaseServer(grpcServer, NewLeaseServer(s))
+	pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
 	pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
 	pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
 	pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go
index e802744d9..5c6c07bf2 100644
--- a/etcdserver/api/v3rpc/key.go
+++ b/etcdserver/api/v3rpc/key.go
@@ -297,6 +297,8 @@ func togRPCError(err error) error {
 	// TODO: handle error from raft and timeout
 	case etcdserver.ErrRequestTooLarge:
 		return rpctypes.ErrRequestTooLarge
+	case etcdserver.ErrNoSpace:
+		return rpctypes.ErrNoSpace
 	default:
 		return grpc.Errorf(codes.Internal, err.Error())
 	}
diff --git a/etcdserver/api/v3rpc/maintenance.go b/etcdserver/api/v3rpc/maintenance.go
index 6fa54113a..8c3664ffa 100644
--- a/etcdserver/api/v3rpc/maintenance.go
+++ b/etcdserver/api/v3rpc/maintenance.go
@@ -25,12 +25,17 @@ type BackendGetter interface {
 	Backend() backend.Backend
 }
 
+type Alarmer interface {
+	Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
+}
+
 type maintenanceServer struct {
 	bg BackendGetter
+	a  Alarmer
 }
 
 func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
-	return &maintenanceServer{bg: s}
+	return &maintenanceServer{bg: s, a: s}
 }
 
 func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
@@ -43,3 +48,8 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
 	plog.Noticef("finished defragmenting the storage backend")
 	return &pb.DefragmentResponse{}, nil
 }
+
+func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
+	plog.Warningf("alarming %+v", ar)
+	return ms.a.Alarm(ctx, ar)
+}
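The maintenance handler only logs and delegates: `NewMaintenanceServer` passes the same `*EtcdServer` for both fields, since within this diff it already exposes `Backend()` (server.go) and gains `Alarm()` (v3demo_server.go). Two illustrative lines, not part of the patch:

```go
// Compile-time checks that *EtcdServer satisfies both dependencies of
// maintenanceServer, which is why s can be passed in twice above.
var (
	_ BackendGetter = (*etcdserver.EtcdServer)(nil)
	_ Alarmer       = (*etcdserver.EtcdServer)(nil)
)

// nospaceQuery builds the request a client of the new Alarm RPC could send
// to list the members that currently have the NOSPACE alarm raised.
func nospaceQuery() *pb.AlarmRequest {
	return &pb.AlarmRequest{Action: pb.AlarmRequest_GET, Alarm: pb.AlarmType_NOSPACE}
}
```
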
diff --git a/etcdserver/api/v3rpc/quota.go b/etcdserver/api/v3rpc/quota.go
new file mode 100644
index 000000000..ba2288d89
--- /dev/null
+++ b/etcdserver/api/v3rpc/quota.go
@@ -0,0 +1,89 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3rpc
+
+import (
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/types"
+	"golang.org/x/net/context"
+)
+
+type quotaKVServer struct {
+	pb.KVServer
+	qa quotaAlarmer
+}
+
+type quotaAlarmer struct {
+	q  etcdserver.Quota
+	a  Alarmer
+	id types.ID
+}
+
+// check whether the request satisfies the quota. If there is not enough
+// space, ignore the request and raise the free-space alarm; the alarm
+// request goes through consensus so every member records it.
+func (qa *quotaAlarmer) check(ctx context.Context, r interface{}) error {
+	if qa.q.Available(r) {
+		return nil
+	}
+	req := &pb.AlarmRequest{
+		MemberID: int64(qa.id),
+		Action:   pb.AlarmRequest_ACTIVATE,
+		Alarm:    pb.AlarmType_NOSPACE,
+	}
+	qa.a.Alarm(ctx, req)
+	return rpctypes.ErrNoSpace
+}
+
+func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
+	return &quotaKVServer{
+		NewKVServer(s),
+		quotaAlarmer{etcdserver.NewBackendQuota(s), s, s.ID()},
+	}
+}
+
+func (s *quotaKVServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
+	if err := s.qa.check(ctx, r); err != nil {
+		return nil, err
+	}
+	return s.KVServer.Put(ctx, r)
+}
+
+func (s *quotaKVServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
+	if err := s.qa.check(ctx, r); err != nil {
+		return nil, err
+	}
+	return s.KVServer.Txn(ctx, r)
+}
+
+type quotaLeaseServer struct {
+	pb.LeaseServer
+	qa quotaAlarmer
+}
+
+func (s *quotaLeaseServer) LeaseCreate(ctx context.Context, cr *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
+	if err := s.qa.check(ctx, cr); err != nil {
+		return nil, err
+	}
+	return s.LeaseServer.LeaseCreate(ctx, cr)
+}
+
+func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
+	return &quotaLeaseServer{
+		NewLeaseServer(s),
+		quotaAlarmer{etcdserver.NewBackendQuota(s), s, s.ID()},
+	}
+}
diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go
index 00ab1b07d..fb5bb22a9 100644
--- a/etcdserver/api/v3rpc/rpctypes/error.go
+++ b/etcdserver/api/v3rpc/rpctypes/error.go
@@ -25,6 +25,7 @@ var (
 	ErrDuplicateKey = grpc.Errorf(codes.InvalidArgument, "etcdserver: duplicate key given in txn request")
 	ErrCompacted    = grpc.Errorf(codes.OutOfRange, "etcdserver: storage: required revision has been compacted")
 	ErrFutureRev    = grpc.Errorf(codes.OutOfRange, "etcdserver: storage: required revision is a future revision")
+	ErrNoSpace      = grpc.Errorf(codes.ResourceExhausted, "etcdserver: storage: database space exceeded")
 
 	ErrLeaseNotFound = grpc.Errorf(codes.NotFound, "etcdserver: requested lease not found")
 	ErrLeaseExist    = grpc.Errorf(codes.FailedPrecondition, "etcdserver: lease already exists")
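The quota servers are thin decorators: they embed the underlying `pb.KVServer`/`pb.LeaseServer` and only gate the write paths. `check` consumes just the `Available` side of the `etcdserver.Quota` interface, which is defined in `etcdserver/quota.go` further down in this diff. A toy implementation (purely illustrative; `fixedQuota` is not in the patch) shows the contract `check` relies on:

```go
// fixedQuota is a hypothetical Quota with a fixed byte budget; the real
// backendQuota below measures the bolt backend's size instead.
type fixedQuota struct{ remaining int64 }

func (q *fixedQuota) Available(req interface{}) bool { return int64(q.Cost(req)) <= q.remaining }

func (q *fixedQuota) Cost(req interface{}) int {
	// charge only puts; everything else is free in this toy model
	if p, ok := req.(*pb.PutRequest); ok {
		return len(p.Key) + len(p.Value)
	}
	return 0
}

func (q *fixedQuota) Remaining() int64 { return q.remaining }
```
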
diff --git a/etcdserver/apply.go b/etcdserver/apply.go
new file mode 100644
index 000000000..4b4f0f28c
--- /dev/null
+++ b/etcdserver/apply.go
@@ -0,0 +1,587 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import (
+	"bytes"
+	"fmt"
+	"sort"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/pkg/types"
+	dstorage "github.com/coreos/etcd/storage"
+	"github.com/coreos/etcd/storage/storagepb"
+	"github.com/gogo/protobuf/proto"
+)
+
+const (
+	// noTxn is an invalid txn ID.
+	// To apply with independent Range, Put, Delete, you can pass noTxn
+	// to apply functions instead of a valid txn ID.
+	noTxn = -1
+)
+
+type applyResult struct {
+	resp proto.Message
+	err  error
+}
+
+// applierV3 is the interface for processing V3 raft messages
+type applierV3 interface {
+	Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error)
+	Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error)
+	DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
+	Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
+	Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, error)
+	LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error)
+	LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
+	Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error)
+	AuthEnable() (*pb.AuthEnableResponse, error)
+	UserAdd(ua *pb.UserAddRequest) (*pb.UserAddResponse, error)
+}
+
+type applierV3backend struct {
+	s *EtcdServer
+}
+
+func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) *applyResult {
+	ar := &applyResult{}
+
+	switch {
+	case r.Range != nil:
+		ar.resp, ar.err = s.applyV3.Range(noTxn, r.Range)
+	case r.Put != nil:
+		ar.resp, ar.err = s.applyV3.Put(noTxn, r.Put)
+	case r.DeleteRange != nil:
+		ar.resp, ar.err = s.applyV3.DeleteRange(noTxn, r.DeleteRange)
+	case r.Txn != nil:
+		ar.resp, ar.err = s.applyV3.Txn(r.Txn)
+	case r.Compaction != nil:
+		ar.resp, ar.err = s.applyV3.Compaction(r.Compaction)
+	case r.LeaseCreate != nil:
+		ar.resp, ar.err = s.applyV3.LeaseCreate(r.LeaseCreate)
+	case r.LeaseRevoke != nil:
+		ar.resp, ar.err = s.applyV3.LeaseRevoke(r.LeaseRevoke)
+	case r.Alarm != nil:
+		ar.resp, ar.err = s.applyV3.Alarm(r.Alarm)
+	case r.AuthEnable != nil:
+		ar.resp, ar.err = s.applyV3.AuthEnable()
+	case r.UserAdd != nil:
+		ar.resp, ar.err = s.applyV3.UserAdd(r.UserAdd)
+	default:
+		panic("not implemented")
+	}
+	return ar
+}
+
+func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
+	resp := &pb.PutResponse{}
+	resp.Header = &pb.ResponseHeader{}
+	var (
+		rev int64
+		err error
+	)
+	if txnID != noTxn {
+		rev, err = a.s.getKV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		leaseID := lease.LeaseID(p.Lease)
+		if leaseID != lease.NoLease {
+			if l := a.s.lessor.Lookup(leaseID); l == nil {
+				return nil, lease.ErrLeaseNotFound
+			}
+		}
+		rev = a.s.getKV().Put(p.Key, p.Value, leaseID)
+	}
+	resp.Header.Revision = rev
+	return resp, nil
+}
+
+func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
+	resp := &pb.DeleteRangeResponse{}
+	resp.Header = &pb.ResponseHeader{}
+
+	var (
+		n   int64
+		rev int64
+		err error
+	)
+
+	if isGteRange(dr.RangeEnd) {
+		dr.RangeEnd = []byte{}
+	}
+
+	if txnID != noTxn {
+		n, rev, err = a.s.getKV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		n, rev = a.s.getKV().DeleteRange(dr.Key, dr.RangeEnd)
+	}
+
+	resp.Deleted = n
+	resp.Header.Revision = rev
+	return resp, nil
+}
+
+func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+	resp := &pb.RangeResponse{}
+	resp.Header = &pb.ResponseHeader{}
+
+	var (
+		kvs []storagepb.KeyValue
+		rev int64
+		err error
+	)
+
+	if isGteRange(r.RangeEnd) {
+		r.RangeEnd = []byte{}
+	}
+
+	limit := r.Limit
+	if r.SortOrder != pb.RangeRequest_NONE {
+		// fetch everything; sort and truncate afterwards
+		limit = 0
+	}
+	if limit > 0 {
+		// fetch one extra for 'more' flag
+		limit = limit + 1
+	}
+
+	if txnID != noTxn {
+		kvs, rev, err = a.s.getKV().TxnRange(txnID, r.Key, r.RangeEnd, limit, r.Revision)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		kvs, rev, err = a.s.getKV().Range(r.Key, r.RangeEnd, limit, r.Revision)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	if r.SortOrder != pb.RangeRequest_NONE {
+		var sorter sort.Interface
+		switch {
+		case r.SortTarget == pb.RangeRequest_KEY:
+			sorter = &kvSortByKey{&kvSort{kvs}}
+		case r.SortTarget == pb.RangeRequest_VERSION:
+			sorter = &kvSortByVersion{&kvSort{kvs}}
+		case r.SortTarget == pb.RangeRequest_CREATE:
+			sorter = &kvSortByCreate{&kvSort{kvs}}
+		case r.SortTarget == pb.RangeRequest_MOD:
+			sorter = &kvSortByMod{&kvSort{kvs}}
+		case r.SortTarget == pb.RangeRequest_VALUE:
+			sorter = &kvSortByValue{&kvSort{kvs}}
+		}
+		switch {
+		case r.SortOrder == pb.RangeRequest_ASCEND:
+			sort.Sort(sorter)
+		case r.SortOrder == pb.RangeRequest_DESCEND:
+			sort.Sort(sort.Reverse(sorter))
+		}
+	}
+
+	if r.Limit > 0 && len(kvs) > int(r.Limit) {
+		kvs = kvs[:r.Limit]
+		resp.More = true
+	}
+
+	resp.Header.Revision = rev
+	for i := range kvs {
+		resp.Kvs = append(resp.Kvs, &kvs[i])
+	}
+	return resp, nil
+}
+
+func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
+	var revision int64
+
+	ok := true
+	for _, c := range rt.Compare {
+		if revision, ok = a.applyCompare(c); !ok {
+			break
+		}
+	}
+
+	var reqs []*pb.RequestUnion
+	if ok {
+		reqs = rt.Success
+	} else {
+		reqs = rt.Failure
+	}
+
+	if err := a.checkRequestLeases(reqs); err != nil {
+		return nil, err
+	}
+	if err := a.checkRequestRange(reqs); err != nil {
+		return nil, err
+	}
+
+	// When executing the operations of txn, we need to hold the txn lock.
+	// So the reader will not see any intermediate results.
+	txnID := a.s.getKV().TxnBegin()
+	defer func() {
+		err := a.s.getKV().TxnEnd(txnID)
+		if err != nil {
+			panic(fmt.Sprint("unexpected error when closing txn", txnID))
+		}
+	}()
+
+	resps := make([]*pb.ResponseUnion, len(reqs))
+	for i := range reqs {
+		resps[i] = a.applyUnion(txnID, reqs[i])
+	}
+
+	if len(resps) != 0 {
+		revision += 1
+	}
+
+	txnResp := &pb.TxnResponse{}
+	txnResp.Header = &pb.ResponseHeader{}
+	txnResp.Header.Revision = revision
+	txnResp.Responses = resps
+	txnResp.Succeeded = ok
+	return txnResp, nil
+}
+
+// applyCompare applies the compare request.
+// It returns the revision at which the comparison happens. If the comparison
+// succeeds, it returns true. Otherwise it returns false.
+func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
+	ckvs, rev, err := a.s.getKV().Range(c.Key, nil, 1, 0)
+	if err != nil {
+		if err == dstorage.ErrTxnIDMismatch {
+			panic("unexpected txn ID mismatch error")
+		}
+		return rev, false
+	}
+	var ckv storagepb.KeyValue
+	if len(ckvs) != 0 {
+		ckv = ckvs[0]
+	} else {
+		// Use the zero value of ckv normally. However...
+		if c.Target == pb.Compare_VALUE {
+			// Always fail if we're comparing a value on a key that doesn't exist.
+ // We can treat non-existence as the empty set explicitly, such that + // even a key with a value of length 0 bytes is still a real key + // that was written that way + return rev, false + } + } + + // -1 is less, 0 is equal, 1 is greater + var result int + switch c.Target { + case pb.Compare_VALUE: + tv, _ := c.TargetUnion.(*pb.Compare_Value) + if tv != nil { + result = bytes.Compare(ckv.Value, tv.Value) + } + case pb.Compare_CREATE: + tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision) + if tv != nil { + result = compareInt64(ckv.CreateRevision, tv.CreateRevision) + } + + case pb.Compare_MOD: + tv, _ := c.TargetUnion.(*pb.Compare_ModRevision) + if tv != nil { + result = compareInt64(ckv.ModRevision, tv.ModRevision) + } + case pb.Compare_VERSION: + tv, _ := c.TargetUnion.(*pb.Compare_Version) + if tv != nil { + result = compareInt64(ckv.Version, tv.Version) + } + } + + switch c.Result { + case pb.Compare_EQUAL: + if result != 0 { + return rev, false + } + case pb.Compare_GREATER: + if result != 1 { + return rev, false + } + case pb.Compare_LESS: + if result != -1 { + return rev, false + } + } + return rev, true +} + +func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestUnion) *pb.ResponseUnion { + switch tv := union.Request.(type) { + case *pb.RequestUnion_RequestRange: + if tv.RequestRange != nil { + resp, err := a.Range(txnID, tv.RequestRange) + if err != nil { + panic("unexpected error during txn") + } + return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponseRange{ResponseRange: resp}} + } + case *pb.RequestUnion_RequestPut: + if tv.RequestPut != nil { + resp, err := a.Put(txnID, tv.RequestPut) + if err != nil { + panic("unexpected error during txn") + } + return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponsePut{ResponsePut: resp}} + } + case *pb.RequestUnion_RequestDeleteRange: + if tv.RequestDeleteRange != nil { + resp, err := a.DeleteRange(txnID, tv.RequestDeleteRange) + if err != nil { + panic("unexpected error during txn") + } + return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponseDeleteRange{ResponseDeleteRange: resp}} + } + default: + // empty union + return nil + } + return nil + +} + +func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, error) { + resp := &pb.CompactionResponse{} + resp.Header = &pb.ResponseHeader{} + err := a.s.getKV().Compact(compaction.Revision) + if err != nil { + return nil, err + } + // get the current revision. which key to get is not important. 
+	_, resp.Header.Revision, _ = a.s.getKV().Range([]byte("compaction"), nil, 1, 0)
+	return resp, err
+}
+
+func (a *applierV3backend) LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
+	l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL)
+	resp := &pb.LeaseCreateResponse{}
+	if err == nil {
+		resp.ID = int64(l.ID)
+		resp.TTL = l.TTL
+	}
+	return resp, err
+}
+
+func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
+	err := a.s.lessor.Revoke(lease.LeaseID(lc.ID))
+	return &pb.LeaseRevokeResponse{}, err
+}
+
+func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
+	resp := &pb.AlarmResponse{}
+	switch ar.Action {
+	case pb.AlarmRequest_GET:
+		resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
+	case pb.AlarmRequest_ACTIVATE:
+		m := a.s.alarmStore.Activate(types.ID(ar.MemberID), ar.Alarm)
+		if m == nil {
+			break
+		}
+		resp.Alarms = append(resp.Alarms, m)
+		switch m.Alarm {
+		case pb.AlarmType_NOSPACE:
+			if len(a.s.alarmStore.Get(m.Alarm)) == 1 {
+				a.s.applyV3 = newApplierV3Capped(a)
+			}
+		default:
+			plog.Warningf("unimplemented alarm activation (%+v)", m)
+		}
+	case pb.AlarmRequest_DEACTIVATE:
+		m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
+		if m == nil {
+			break
+		}
+		resp.Alarms = append(resp.Alarms, m)
+		if m.Alarm == pb.AlarmType_NOSPACE && len(a.s.alarmStore.Get(ar.Alarm)) == 0 {
+			a.s.applyV3 = newQuotaApplierV3(a.s, &applierV3backend{a.s})
+		}
+	default:
+		return nil, nil
+	}
+	return resp, nil
+}
+
+type applierV3Capped struct {
+	applierV3
+	q backendQuota
+}
+
+// newApplierV3Capped creates an applyV3 that will reject Puts and transactions
+// with Puts so that the number of keys in the store is capped.
+func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
+
+func (a *applierV3Capped) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
+	return nil, ErrNoSpace
+}
+
+func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
+	if a.q.Cost(r) > 0 {
+		return nil, ErrNoSpace
+	}
+	return a.applierV3.Txn(r)
+}
+
+func (a *applierV3Capped) LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
+	return nil, ErrNoSpace
+}
+
+func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) {
+	a.s.AuthStore().AuthEnable()
+	return &pb.AuthEnableResponse{}, nil
+}
+
+func (a *applierV3backend) UserAdd(r *pb.UserAddRequest) (*pb.UserAddResponse, error) {
+	return a.s.AuthStore().UserAdd(r)
+}
+
+type quotaApplierV3 struct {
+	applierV3
+	q Quota
+}
+
+func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
+	return &quotaApplierV3{app, NewBackendQuota(s)}
+}
+
+func (a *quotaApplierV3) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
+	ok := a.q.Available(p)
+	resp, err := a.applierV3.Put(txnID, p)
+	if err == nil && !ok {
+		err = ErrNoSpace
+	}
+	return resp, err
+}
+
+func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
+	ok := a.q.Available(rt)
+	resp, err := a.applierV3.Txn(rt)
+	if err == nil && !ok {
+		err = ErrNoSpace
+	}
+	return resp, err
+}
+
+func (a *quotaApplierV3) LeaseCreate(lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
+	ok := a.q.Available(lc)
+	resp, err := a.applierV3.LeaseCreate(lc)
+	if err == nil && !ok {
+		err = ErrNoSpace
+	}
+	return resp, err
+}
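Taken together, `Alarm`, `applierV3Capped`, and `quotaApplierV3` implement a small state machine over the applier chain. A sketch of the transitions (the assignments mirror the code in `Alarm` above and `restoreAlarms` in server.go below; the wrapper function is hypothetical):

```go
// applierTransitions sketches the three states; s is an *EtcdServer and the
// constructors are the ones introduced by this patch.
func applierTransitions(s *EtcdServer) {
	// normal operation: writes are metered against the backend quota
	s.applyV3 = newQuotaApplierV3(s, &applierV3backend{s})

	// first NOSPACE activation: cap the applier so Puts, space-consuming
	// Txns, and LeaseCreates fail with ErrNoSpace
	s.applyV3 = newApplierV3Capped(s.applyV3)

	// last NOSPACE deactivation: restore the quota-checked applier
	s.applyV3 = newQuotaApplierV3(s, &applierV3backend{s})
}
```
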
+
+type kvSort struct{ kvs []storagepb.KeyValue }
+
+func (s *kvSort) Swap(i, j int) {
+	t := s.kvs[i]
+	s.kvs[i] = s.kvs[j]
+	s.kvs[j] = t
+}
+func (s *kvSort) Len() int { return len(s.kvs) }
+
+type kvSortByKey struct{ *kvSort }
+
+func (s *kvSortByKey) Less(i, j int) bool {
+	return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
+}
+
+type kvSortByVersion struct{ *kvSort }
+
+func (s *kvSortByVersion) Less(i, j int) bool {
+	return (s.kvs[i].Version - s.kvs[j].Version) < 0
+}
+
+type kvSortByCreate struct{ *kvSort }
+
+func (s *kvSortByCreate) Less(i, j int) bool {
+	return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
+}
+
+type kvSortByMod struct{ *kvSort }
+
+func (s *kvSortByMod) Less(i, j int) bool {
+	return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
+}
+
+type kvSortByValue struct{ *kvSort }
+
+func (s *kvSortByValue) Less(i, j int) bool {
+	return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
+}
+
+func (a *applierV3backend) checkRequestLeases(reqs []*pb.RequestUnion) error {
+	for _, requ := range reqs {
+		tv, ok := requ.Request.(*pb.RequestUnion_RequestPut)
+		if !ok {
+			continue
+		}
+		preq := tv.RequestPut
+		if preq == nil || lease.LeaseID(preq.Lease) == lease.NoLease {
+			continue
+		}
+		if l := a.s.lessor.Lookup(lease.LeaseID(preq.Lease)); l == nil {
+			return lease.ErrLeaseNotFound
+		}
+	}
+	return nil
+}
+
+func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestUnion) error {
+	for _, requ := range reqs {
+		tv, ok := requ.Request.(*pb.RequestUnion_RequestRange)
+		if !ok {
+			continue
+		}
+		greq := tv.RequestRange
+		if greq == nil || greq.Revision == 0 {
+			continue
+		}
+
+		if greq.Revision > a.s.getKV().Rev() {
+			return dstorage.ErrFutureRev
+		}
+		if greq.Revision < a.s.getKV().FirstRev() {
+			return dstorage.ErrCompacted
+		}
+	}
+	return nil
+}
+
+func compareInt64(a, b int64) int {
+	switch {
+	case a < b:
+		return -1
+	case a > b:
+		return 1
+	default:
+		return 0
+	}
+}
+
+// isGteRange determines if the range end is a >= range. This works around grpc
+// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
+func isGteRange(rangeEnd []byte) bool { + return len(rangeEnd) == 1 && rangeEnd[0] == 0 +} diff --git a/etcdserver/errors.go b/etcdserver/errors.go index eef57ccf3..07657f011 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -35,6 +35,7 @@ var ( ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") ErrNoLeader = errors.New("etcdserver: no leader") ErrRequestTooLarge = errors.New("etcdserver: request is too large") + ErrNoSpace = errors.New("etcdserver: no space") ) func isKeyNotFound(err error) bool { diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index 0044a7a57..e5519e944 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -52,6 +52,9 @@ MemberListResponse DefragmentRequest DefragmentResponse + AlarmRequest + AlarmMember + AlarmResponse AuthEnableRequest AuthDisableRequest AuthenticateRequest diff --git a/etcdserver/etcdserverpb/raft_internal.pb.go b/etcdserver/etcdserverpb/raft_internal.pb.go index d269791d5..83a6ef89c 100644 --- a/etcdserver/etcdserverpb/raft_internal.pb.go +++ b/etcdserver/etcdserverpb/raft_internal.pb.go @@ -33,6 +33,7 @@ type InternalRaftRequest struct { LeaseRevoke *LeaseRevokeRequest `protobuf:"bytes,9,opt,name=lease_revoke" json:"lease_revoke,omitempty"` AuthEnable *AuthEnableRequest `protobuf:"bytes,10,opt,name=auth_enable" json:"auth_enable,omitempty"` UserAdd *UserAddRequest `protobuf:"bytes,11,opt,name=user_add" json:"user_add,omitempty"` + Alarm *AlarmRequest `protobuf:"bytes,12,opt,name=alarm" json:"alarm,omitempty"` } func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequest{} } @@ -170,6 +171,16 @@ func (m *InternalRaftRequest) MarshalTo(data []byte) (int, error) { } i += n10 } + if m.Alarm != nil { + data[i] = 0x62 + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.Alarm.Size())) + n11, err := m.Alarm.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n11 + } return i, nil } @@ -264,6 +275,10 @@ func (m *InternalRaftRequest) Size() (n int) { l = m.UserAdd.Size() n += 1 + l + sovRaftInternal(uint64(l)) } + if m.Alarm != nil { + l = m.Alarm.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } return n } @@ -664,6 +679,39 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error { return err } iNdEx = postIndex + case 12: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Alarm", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaftInternal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRaftInternal + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Alarm == nil { + m.Alarm = &AlarmRequest{} + } + if err := m.Alarm.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRaftInternal(data[iNdEx:]) diff --git a/etcdserver/etcdserverpb/raft_internal.proto b/etcdserver/etcdserverpb/raft_internal.proto index 01e0e7c56..dd7ac0586 100644 --- a/etcdserver/etcdserverpb/raft_internal.proto +++ b/etcdserver/etcdserverpb/raft_internal.proto @@ -27,6 +27,8 @@ message InternalRaftRequest { AuthEnableRequest auth_enable = 10; UserAddRequest user_add = 11; + + AlarmRequest alarm = 12; } message EmptyResponse { diff --git 
a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index f2e4abd84..d9e54728c 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -26,6 +26,26 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf +type AlarmType int32 + +const ( + AlarmType_NONE AlarmType = 0 + AlarmType_NOSPACE AlarmType = 1 +) + +var AlarmType_name = map[int32]string{ + 0: "NONE", + 1: "NOSPACE", +} +var AlarmType_value = map[string]int32{ + "NONE": 0, + "NOSPACE": 1, +} + +func (x AlarmType) String() string { + return proto.EnumName(AlarmType_name, int32(x)) +} + type RangeRequest_SortOrder int32 const ( @@ -127,6 +147,29 @@ func (x Compare_CompareTarget) String() string { return proto.EnumName(Compare_CompareTarget_name, int32(x)) } +type AlarmRequest_AlarmAction int32 + +const ( + AlarmRequest_GET AlarmRequest_AlarmAction = 0 + AlarmRequest_ACTIVATE AlarmRequest_AlarmAction = 1 + AlarmRequest_DEACTIVATE AlarmRequest_AlarmAction = 2 +) + +var AlarmRequest_AlarmAction_name = map[int32]string{ + 0: "GET", + 1: "ACTIVATE", + 2: "DEACTIVATE", +} +var AlarmRequest_AlarmAction_value = map[string]int32{ + "GET": 0, + "ACTIVATE": 1, + "DEACTIVATE": 2, +} + +func (x AlarmRequest_AlarmAction) String() string { + return proto.EnumName(AlarmRequest_AlarmAction_name, int32(x)) +} + type ResponseHeader struct { ClusterId uint64 `protobuf:"varint,1,opt,name=cluster_id,proto3" json:"cluster_id,omitempty"` MemberId uint64 `protobuf:"varint,2,opt,name=member_id,proto3" json:"member_id,omitempty"` @@ -1159,6 +1202,49 @@ func (m *DefragmentResponse) GetHeader() *ResponseHeader { return nil } +type AlarmRequest struct { + Action AlarmRequest_AlarmAction `protobuf:"varint,1,opt,name=action,proto3,enum=etcdserverpb.AlarmRequest_AlarmAction" json:"action,omitempty"` + // MemberID is the member raising the alarm request + MemberID int64 `protobuf:"varint,2,opt,name=memberID,proto3" json:"memberID,omitempty"` + Alarm AlarmType `protobuf:"varint,3,opt,name=alarm,proto3,enum=etcdserverpb.AlarmType" json:"alarm,omitempty"` +} + +func (m *AlarmRequest) Reset() { *m = AlarmRequest{} } +func (m *AlarmRequest) String() string { return proto.CompactTextString(m) } +func (*AlarmRequest) ProtoMessage() {} + +type AlarmMember struct { + MemberID uint64 `protobuf:"varint,1,opt,name=memberID,proto3" json:"memberID,omitempty"` + Alarm AlarmType `protobuf:"varint,2,opt,name=alarm,proto3,enum=etcdserverpb.AlarmType" json:"alarm,omitempty"` +} + +func (m *AlarmMember) Reset() { *m = AlarmMember{} } +func (m *AlarmMember) String() string { return proto.CompactTextString(m) } +func (*AlarmMember) ProtoMessage() {} + +type AlarmResponse struct { + Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` + Alarms []*AlarmMember `protobuf:"bytes,2,rep,name=alarms" json:"alarms,omitempty"` +} + +func (m *AlarmResponse) Reset() { *m = AlarmResponse{} } +func (m *AlarmResponse) String() string { return proto.CompactTextString(m) } +func (*AlarmResponse) ProtoMessage() {} + +func (m *AlarmResponse) GetHeader() *ResponseHeader { + if m != nil { + return m.Header + } + return nil +} + +func (m *AlarmResponse) GetAlarms() []*AlarmMember { + if m != nil { + return m.Alarms + } + return nil +} + type AuthEnableRequest struct { } @@ -1507,6 +1593,9 @@ func init() { proto.RegisterType((*MemberListResponse)(nil), "etcdserverpb.MemberListResponse") proto.RegisterType((*DefragmentRequest)(nil), "etcdserverpb.DefragmentRequest") proto.RegisterType((*DefragmentResponse)(nil), 
"etcdserverpb.DefragmentResponse") + proto.RegisterType((*AlarmRequest)(nil), "etcdserverpb.AlarmRequest") + proto.RegisterType((*AlarmMember)(nil), "etcdserverpb.AlarmMember") + proto.RegisterType((*AlarmResponse)(nil), "etcdserverpb.AlarmResponse") proto.RegisterType((*AuthEnableRequest)(nil), "etcdserverpb.AuthEnableRequest") proto.RegisterType((*AuthDisableRequest)(nil), "etcdserverpb.AuthDisableRequest") proto.RegisterType((*AuthenticateRequest)(nil), "etcdserverpb.AuthenticateRequest") @@ -1535,10 +1624,12 @@ func init() { proto.RegisterType((*RoleDeleteResponse)(nil), "etcdserverpb.RoleDeleteResponse") proto.RegisterType((*RoleGrantResponse)(nil), "etcdserverpb.RoleGrantResponse") proto.RegisterType((*RoleRevokeResponse)(nil), "etcdserverpb.RoleRevokeResponse") + proto.RegisterEnum("etcdserverpb.AlarmType", AlarmType_name, AlarmType_value) proto.RegisterEnum("etcdserverpb.RangeRequest_SortOrder", RangeRequest_SortOrder_name, RangeRequest_SortOrder_value) proto.RegisterEnum("etcdserverpb.RangeRequest_SortTarget", RangeRequest_SortTarget_name, RangeRequest_SortTarget_value) proto.RegisterEnum("etcdserverpb.Compare_CompareResult", Compare_CompareResult_name, Compare_CompareResult_value) proto.RegisterEnum("etcdserverpb.Compare_CompareTarget", Compare_CompareTarget_name, Compare_CompareTarget_value) + proto.RegisterEnum("etcdserverpb.AlarmRequest_AlarmAction", AlarmRequest_AlarmAction_name, AlarmRequest_AlarmAction_value) } // Reference imports to suppress errors if they are not otherwise used. @@ -2185,6 +2276,8 @@ var _Cluster_serviceDesc = grpc.ServiceDesc{ type MaintenanceClient interface { // TODO: move Hash from kv to Maintenance Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error) + // Alarm activates, deactivates, and queries alarms regarding cluster health. + Alarm(ctx context.Context, in *AlarmRequest, opts ...grpc.CallOption) (*AlarmResponse, error) } type maintenanceClient struct { @@ -2204,11 +2297,22 @@ func (c *maintenanceClient) Defragment(ctx context.Context, in *DefragmentReques return out, nil } +func (c *maintenanceClient) Alarm(ctx context.Context, in *AlarmRequest, opts ...grpc.CallOption) (*AlarmResponse, error) { + out := new(AlarmResponse) + err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Alarm", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // Server API for Maintenance service type MaintenanceServer interface { // TODO: move Hash from kv to Maintenance Defragment(context.Context, *DefragmentRequest) (*DefragmentResponse, error) + // Alarm activates, deactivates, and queries alarms regarding cluster health. 
+ Alarm(context.Context, *AlarmRequest) (*AlarmResponse, error) } func RegisterMaintenanceServer(s *grpc.Server, srv MaintenanceServer) { @@ -2227,6 +2331,18 @@ func _Maintenance_Defragment_Handler(srv interface{}, ctx context.Context, dec f return out, nil } +func _Maintenance_Alarm_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { + in := new(AlarmRequest) + if err := dec(in); err != nil { + return nil, err + } + out, err := srv.(MaintenanceServer).Alarm(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + var _Maintenance_serviceDesc = grpc.ServiceDesc{ ServiceName: "etcdserverpb.Maintenance", HandlerType: (*MaintenanceServer)(nil), @@ -2235,6 +2351,10 @@ var _Maintenance_serviceDesc = grpc.ServiceDesc{ MethodName: "Defragment", Handler: _Maintenance_Defragment_Handler, }, + { + MethodName: "Alarm", + Handler: _Maintenance_Alarm_Handler, + }, }, Streams: []grpc.StreamDesc{}, } @@ -4115,6 +4235,107 @@ func (m *DefragmentResponse) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *AlarmRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *AlarmRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Action != 0 { + data[i] = 0x8 + i++ + i = encodeVarintRpc(data, i, uint64(m.Action)) + } + if m.MemberID != 0 { + data[i] = 0x10 + i++ + i = encodeVarintRpc(data, i, uint64(m.MemberID)) + } + if m.Alarm != 0 { + data[i] = 0x18 + i++ + i = encodeVarintRpc(data, i, uint64(m.Alarm)) + } + return i, nil +} + +func (m *AlarmMember) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *AlarmMember) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.MemberID != 0 { + data[i] = 0x8 + i++ + i = encodeVarintRpc(data, i, uint64(m.MemberID)) + } + if m.Alarm != 0 { + data[i] = 0x10 + i++ + i = encodeVarintRpc(data, i, uint64(m.Alarm)) + } + return i, nil +} + +func (m *AlarmResponse) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *AlarmResponse) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Header != nil { + data[i] = 0xa + i++ + i = encodeVarintRpc(data, i, uint64(m.Header.Size())) + n29, err := m.Header.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n29 + } + if len(m.Alarms) > 0 { + for _, msg := range m.Alarms { + data[i] = 0x12 + i++ + i = encodeVarintRpc(data, i, uint64(msg.Size())) + n, err := msg.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + func (m *AuthEnableRequest) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -4398,11 +4619,11 @@ func (m *AuthEnableResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n29, err := m.Header.MarshalTo(data[i:]) + n30, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n29 + i += n30 } return i, nil } @@ -4426,11 +4647,11 @@ func (m *AuthDisableResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) 
- n30, err := m.Header.MarshalTo(data[i:]) + n31, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n30 + i += n31 } return i, nil } @@ -4454,11 +4675,11 @@ func (m *AuthenticateResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n31, err := m.Header.MarshalTo(data[i:]) + n32, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n31 + i += n32 } return i, nil } @@ -4482,11 +4703,11 @@ func (m *UserAddResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n32, err := m.Header.MarshalTo(data[i:]) + n33, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n32 + i += n33 } return i, nil } @@ -4510,11 +4731,11 @@ func (m *UserGetResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n33, err := m.Header.MarshalTo(data[i:]) + n34, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n33 + i += n34 } return i, nil } @@ -4538,11 +4759,11 @@ func (m *UserDeleteResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n34, err := m.Header.MarshalTo(data[i:]) + n35, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n34 + i += n35 } return i, nil } @@ -4566,11 +4787,11 @@ func (m *UserChangePasswordResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n35, err := m.Header.MarshalTo(data[i:]) + n36, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n35 + i += n36 } return i, nil } @@ -4594,11 +4815,11 @@ func (m *UserGrantResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n36, err := m.Header.MarshalTo(data[i:]) + n37, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n36 + i += n37 } return i, nil } @@ -4622,11 +4843,11 @@ func (m *UserRevokeResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n37, err := m.Header.MarshalTo(data[i:]) + n38, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n37 + i += n38 } return i, nil } @@ -4650,11 +4871,11 @@ func (m *RoleAddResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n38, err := m.Header.MarshalTo(data[i:]) + n39, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n38 + i += n39 } return i, nil } @@ -4678,11 +4899,11 @@ func (m *RoleGetResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n39, err := m.Header.MarshalTo(data[i:]) + n40, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n39 + i += n40 } return i, nil } @@ -4706,11 +4927,11 @@ func (m *RoleDeleteResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n40, err := m.Header.MarshalTo(data[i:]) + n41, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n40 + i += n41 } return i, nil } @@ -4734,11 +4955,11 @@ func (m *RoleGrantResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n41, err := 
m.Header.MarshalTo(data[i:]) + n42, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n41 + i += n42 } return i, nil } @@ -4762,11 +4983,11 @@ func (m *RoleRevokeResponse) MarshalTo(data []byte) (int, error) { data[i] = 0xa i++ i = encodeVarintRpc(data, i, uint64(m.Header.Size())) - n42, err := m.Header.MarshalTo(data[i:]) + n43, err := m.Header.MarshalTo(data[i:]) if err != nil { return 0, err } - i += n42 + i += n43 } return i, nil } @@ -5431,6 +5652,49 @@ func (m *DefragmentResponse) Size() (n int) { return n } +func (m *AlarmRequest) Size() (n int) { + var l int + _ = l + if m.Action != 0 { + n += 1 + sovRpc(uint64(m.Action)) + } + if m.MemberID != 0 { + n += 1 + sovRpc(uint64(m.MemberID)) + } + if m.Alarm != 0 { + n += 1 + sovRpc(uint64(m.Alarm)) + } + return n +} + +func (m *AlarmMember) Size() (n int) { + var l int + _ = l + if m.MemberID != 0 { + n += 1 + sovRpc(uint64(m.MemberID)) + } + if m.Alarm != 0 { + n += 1 + sovRpc(uint64(m.Alarm)) + } + return n +} + +func (m *AlarmResponse) Size() (n int) { + var l int + _ = l + if m.Header != nil { + l = m.Header.Size() + n += 1 + l + sovRpc(uint64(l)) + } + if len(m.Alarms) > 0 { + for _, e := range m.Alarms { + l = e.Size() + n += 1 + l + sovRpc(uint64(l)) + } + } + return n +} + func (m *AuthEnableRequest) Size() (n int) { var l int _ = l @@ -9761,6 +10025,315 @@ func (m *DefragmentResponse) Unmarshal(data []byte) error { } return nil } +func (m *AlarmRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AlarmRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AlarmRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Action", wireType) + } + m.Action = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Action |= (AlarmRequest_AlarmAction(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MemberID", wireType) + } + m.MemberID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.MemberID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Alarm", wireType) + } + m.Alarm = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Alarm |= (AlarmType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return 
nil +} +func (m *AlarmMember) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AlarmMember: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AlarmMember: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MemberID", wireType) + } + m.MemberID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.MemberID |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Alarm", wireType) + } + m.Alarm = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Alarm |= (AlarmType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *AlarmResponse) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AlarmResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AlarmResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Header == nil { + m.Header = &ResponseHeader{} + } + if err := m.Header.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Alarms", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex > l { + 
return io.ErrUnexpectedEOF + } + m.Alarms = append(m.Alarms, &AlarmMember{}) + if err := m.Alarms[len(m.Alarms)-1].Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *AuthEnableRequest) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index 444b004a7..cbed75a9b 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -79,6 +79,9 @@ service Cluster { service Maintenance { // TODO: move Hash from kv to Maintenance rpc Defragment(DefragmentRequest) returns (DefragmentResponse) {} + + // Alarm activates, deactivates, and queries alarms regarding cluster health. + rpc Alarm(AlarmRequest) returns (AlarmResponse) {} } service Auth { @@ -438,6 +441,33 @@ message DefragmentResponse { ResponseHeader header = 1; } +enum AlarmType { + NONE = 0; // default, used to query if any alarm is active + NOSPACE = 1; +} + +message AlarmRequest { + enum AlarmAction { + GET = 0; + ACTIVATE = 1; + DEACTIVATE = 2; + } + AlarmAction action = 1; + // MemberID is the member raising the alarm request + int64 memberID = 2; + AlarmType alarm = 3; +} + +message AlarmMember { + uint64 memberID = 1; + AlarmType alarm = 2; +} + +message AlarmResponse { + ResponseHeader header = 1; + repeated AlarmMember alarms = 2; +} + message AuthEnableRequest { } diff --git a/etcdserver/quota.go b/etcdserver/quota.go new file mode 100644 index 000000000..4e20e851b --- /dev/null +++ b/etcdserver/quota.go @@ -0,0 +1,95 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/storage/backend" +) + +// Quota represents an arbitrary quota against arbitrary requests. Each request +// costs some charge; if there is not enough remaining charge, then there are +// too few resources available within the quota to apply the request. +type Quota interface { + // Available judges whether the given request fits within the quota. + Available(req interface{}) bool + // Cost computes the charge against the quota for a given request. + Cost(req interface{}) int + // Remaining is the amount of charge left for the quota. 
+	Remaining() int64
+}
+
+type backendQuota struct {
+	s               *EtcdServer
+	maxBackendBytes int64
+}
+
+const (
+	// leaseOverhead is an estimate for the cost of storing a lease
+	leaseOverhead = 64
+	// kvOverhead is an estimate for the cost of storing a key's metadata
+	kvOverhead = 256
+)
+
+func NewBackendQuota(s *EtcdServer) Quota {
+	return &backendQuota{s, backend.InitialMmapSize}
+}
+
+func (b *backendQuota) Available(v interface{}) bool {
+	// TODO: maybe optimize backend.Size()
+	return b.s.Backend().Size()+int64(b.Cost(v)) < b.maxBackendBytes
+}
+
+func (b *backendQuota) Cost(v interface{}) int {
+	switch r := v.(type) {
+	case *pb.PutRequest:
+		return costPut(r)
+	case *pb.TxnRequest:
+		return costTxn(r)
+	case *pb.LeaseCreateRequest:
+		return leaseOverhead
+	default:
+		panic("unexpected cost")
+	}
+}
+
+func costPut(r *pb.PutRequest) int { return kvOverhead + len(r.Key) + len(r.Value) }
+
+func costTxnReq(u *pb.RequestUnion) int {
+	r := u.GetRequestPut()
+	if r == nil {
+		return 0
+	}
+	return costPut(r)
+}
+
+func costTxn(r *pb.TxnRequest) int {
+	sizeSuccess := 0
+	for _, u := range r.Success {
+		sizeSuccess += costTxnReq(u)
+	}
+	sizeFailure := 0
+	for _, u := range r.Failure {
+		sizeFailure += costTxnReq(u)
+	}
+	// a txn is charged at the more expensive of its two branches
+	if sizeFailure > sizeSuccess {
+		return sizeFailure
+	}
+	return sizeSuccess
+}
+
+func (b *backendQuota) Remaining() int64 {
+	return b.maxBackendBytes - b.s.Backend().Size()
+}
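To make the cost model concrete, a worked example under the constants above (a sketch only; `costExample` is hypothetical, assumes an `fmt` import, and the overheads are the code's estimates rather than exact accounting):

```go
// costExample shows the arithmetic behind Cost for a put and a txn.
func costExample(s *EtcdServer) {
	q := NewBackendQuota(s)

	// kvOverhead + len(key) + len(value) = 256 + 8 + 100 = 364
	put := &pb.PutRequest{Key: []byte("test-key"), Value: make([]byte, 100)}

	// a txn is charged for the more expensive branch: max(364, 0) = 364
	txn := &pb.TxnRequest{
		Success: []*pb.RequestUnion{{Request: &pb.RequestUnion_RequestPut{RequestPut: put}}},
	}

	fmt.Println(q.Cost(put), q.Cost(txn)) // 364 364
}
```
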
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 8e14170e4..881df64d1 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -27,6 +27,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/coreos/etcd/alarm"
 	"github.com/coreos/etcd/auth"
 	"github.com/coreos/etcd/compactor"
 	"github.com/coreos/etcd/discovery"
@@ -172,11 +173,13 @@ type EtcdServer struct {
 
 	store store.Store
 
-	kv        dstorage.ConsistentWatchableKV
-	lessor    lease.Lessor
-	bemu      sync.Mutex
-	be        backend.Backend
-	authStore auth.AuthStore
+	applyV3    applierV3
+	kv         dstorage.ConsistentWatchableKV
+	lessor     lease.Lessor
+	bemu       sync.Mutex
+	be         backend.Backend
+	authStore  auth.AuthStore
+	alarmStore *alarm.AlarmStore
 
 	stats  *stats.ServerStats
 	lstats *stats.LeaderStats
@@ -374,6 +377,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		srv.compactor.Run()
 	}
 
+	if err := srv.restoreAlarms(); err != nil {
+		return nil, err
+	}
+
 	// TODO: move transport initialization near the definition of remote
 	tr := &rafthttp.Transport{
 		TLSInfo: cfg.PeerTLSInfo,
@@ -613,6 +620,10 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		s.lessor.Recover(newbe, s.kv)
 	}
 
+	if err := s.restoreAlarms(); err != nil {
+		plog.Panicf("restore alarms error: %v", err)
+	}
+
 	if s.authStore != nil {
 		s.authStore.Recover(newbe)
 	}
@@ -1005,14 +1016,26 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 			var r pb.Request
 			pbutil.MustUnmarshal(&r, e.Data)
 			s.w.Trigger(r.ID, s.applyRequest(r))
+		} else if raftReq.V2 != nil {
+			req := raftReq.V2
+			s.w.Trigger(req.ID, s.applyRequest(*req))
 		} else {
-			switch {
-			case raftReq.V2 != nil:
-				req := raftReq.V2
-				s.w.Trigger(req.ID, s.applyRequest(*req))
-			default:
-				s.w.Trigger(raftReq.ID, s.applyV3Request(&raftReq))
+			ar := s.applyV3Request(&raftReq)
+			if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
+				s.w.Trigger(raftReq.ID, ar)
+				break
 			}
+			plog.Errorf("applying raft message exceeded backend quota")
+			go func() {
+				a := &pb.AlarmRequest{
+					MemberID: int64(s.ID()),
+					Action:   pb.AlarmRequest_ACTIVATE,
+					Alarm:    pb.AlarmType_NOSPACE,
+				}
+				r := pb.InternalRaftRequest{Alarm: a}
+				s.processInternalRaftRequest(context.TODO(), r)
+				s.w.Trigger(raftReq.ID, ar)
+			}()
 		}
 	case raftpb.EntryConfChange:
 		var cc raftpb.ConfChange
@@ -1319,3 +1342,17 @@ func (s *EtcdServer) Backend() backend.Backend {
 }
 
 func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
+
+func (s *EtcdServer) restoreAlarms() error {
+	s.applyV3 = newQuotaApplierV3(s, &applierV3backend{s})
+
+	as, err := alarm.NewAlarmStore(s)
+	if err != nil {
+		return err
+	}
+	s.alarmStore = as
+	if len(as.Get(pb.AlarmType_NOSPACE)) > 0 {
+		s.applyV3 = newApplierV3Capped(s.applyV3)
+	}
+	return nil
+}
diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go
index 34866b7fb..395ed749d 100644
--- a/etcdserver/v3demo_server.go
+++ b/etcdserver/v3demo_server.go
@@ -15,17 +15,12 @@ package etcdserver
 
 import (
-	"bytes"
-	"fmt"
-	"sort"
 	"time"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/lease/leasehttp"
 	dstorage "github.com/coreos/etcd/storage"
-	"github.com/coreos/etcd/storage/storagepb"
-	"github.com/gogo/protobuf/proto"
 	"golang.org/x/net/context"
 )
 
@@ -64,7 +59,7 @@ type Authenticator interface {
 func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
 	if r.Serializable {
-		return applyRange(noTxn, s.kv, r)
+		return s.applyV3.Range(noTxn, r)
 	}
 
 	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
@@ -178,6 +173,14 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
 	return ttl, err
 }
 
+func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
+	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Alarm: r})
+	if err != nil {
+		return nil, err
+	}
+	return result.resp.(*pb.AlarmResponse), result.err
+}
+
 func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
 	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthEnable: r})
 	if err != nil {
@@ -194,11 +197,6 @@ func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.UserAddRequest) (*pb.Use
 	return result.resp.(*pb.UserAddResponse), result.err
 }
 
-type applyResult struct {
-	resp proto.Message
-	err  error
-}
-
 func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
 	r.ID = s.reqIDGen.Next()
 
@@ -230,448 +228,3 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern
 func (s *EtcdServer) Watchable() dstorage.Watchable { return s.getKV() }
-
-const (
-	// noTxn is an invalid txn ID.
-	// To apply with independent Range, Put, Delete, you can pass noTxn
-	// to apply functions instead of a valid txn ID.
- noTxn = -1 -) - -func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) interface{} { - kv := s.getKV() - le := s.lessor - - ar := &applyResult{} - - switch { - case r.Range != nil: - ar.resp, ar.err = applyRange(noTxn, kv, r.Range) - case r.Put != nil: - ar.resp, ar.err = applyPut(noTxn, kv, le, r.Put) - case r.DeleteRange != nil: - ar.resp, ar.err = applyDeleteRange(noTxn, kv, r.DeleteRange) - case r.Txn != nil: - ar.resp, ar.err = applyTxn(kv, le, r.Txn) - case r.Compaction != nil: - ar.resp, ar.err = applyCompaction(kv, r.Compaction) - case r.LeaseCreate != nil: - ar.resp, ar.err = applyLeaseCreate(le, r.LeaseCreate) - case r.LeaseRevoke != nil: - ar.resp, ar.err = applyLeaseRevoke(le, r.LeaseRevoke) - case r.AuthEnable != nil: - ar.resp, ar.err = applyAuthEnable(s) - case r.UserAdd != nil: - ar.resp, ar.err = applyUserAdd(s, r.UserAdd) - default: - panic("not implemented") - } - - return ar -} - -func applyPut(txnID int64, kv dstorage.KV, le lease.Lessor, p *pb.PutRequest) (*pb.PutResponse, error) { - resp := &pb.PutResponse{} - resp.Header = &pb.ResponseHeader{} - var ( - rev int64 - err error - ) - if txnID != noTxn { - rev, err = kv.TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease)) - if err != nil { - return nil, err - } - } else { - leaseID := lease.LeaseID(p.Lease) - if leaseID != lease.NoLease { - if l := le.Lookup(leaseID); l == nil { - return nil, lease.ErrLeaseNotFound - } - } - rev = kv.Put(p.Key, p.Value, leaseID) - } - resp.Header.Revision = rev - return resp, nil -} - -type kvSort struct{ kvs []storagepb.KeyValue } - -func (s *kvSort) Swap(i, j int) { - t := s.kvs[i] - s.kvs[i] = s.kvs[j] - s.kvs[j] = t -} -func (s *kvSort) Len() int { return len(s.kvs) } - -type kvSortByKey struct{ *kvSort } - -func (s *kvSortByKey) Less(i, j int) bool { - return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0 -} - -type kvSortByVersion struct{ *kvSort } - -func (s *kvSortByVersion) Less(i, j int) bool { - return (s.kvs[i].Version - s.kvs[j].Version) < 0 -} - -type kvSortByCreate struct{ *kvSort } - -func (s *kvSortByCreate) Less(i, j int) bool { - return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0 -} - -type kvSortByMod struct{ *kvSort } - -func (s *kvSortByMod) Less(i, j int) bool { - return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0 -} - -type kvSortByValue struct{ *kvSort } - -func (s *kvSortByValue) Less(i, j int) bool { - return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0 -} - -func applyRange(txnID int64, kv dstorage.KV, r *pb.RangeRequest) (*pb.RangeResponse, error) { - resp := &pb.RangeResponse{} - resp.Header = &pb.ResponseHeader{} - - var ( - kvs []storagepb.KeyValue - rev int64 - err error - ) - - if isGteRange(r.RangeEnd) { - r.RangeEnd = []byte{} - } - - limit := r.Limit - if r.SortOrder != pb.RangeRequest_NONE { - // fetch everything; sort and truncate afterwards - limit = 0 - } - if limit > 0 { - // fetch one extra for 'more' flag - limit = limit + 1 - } - - if txnID != noTxn { - kvs, rev, err = kv.TxnRange(txnID, r.Key, r.RangeEnd, limit, r.Revision) - if err != nil { - return nil, err - } - } else { - kvs, rev, err = kv.Range(r.Key, r.RangeEnd, limit, r.Revision) - if err != nil { - return nil, err - } - } - - if r.SortOrder != pb.RangeRequest_NONE { - var sorter sort.Interface - switch { - case r.SortTarget == pb.RangeRequest_KEY: - sorter = &kvSortByKey{&kvSort{kvs}} - case r.SortTarget == pb.RangeRequest_VERSION: - sorter = &kvSortByVersion{&kvSort{kvs}} - case r.SortTarget == pb.RangeRequest_CREATE: - sorter = 
&kvSortByCreate{&kvSort{kvs}} - case r.SortTarget == pb.RangeRequest_MOD: - sorter = &kvSortByMod{&kvSort{kvs}} - case r.SortTarget == pb.RangeRequest_VALUE: - sorter = &kvSortByValue{&kvSort{kvs}} - } - switch { - case r.SortOrder == pb.RangeRequest_ASCEND: - sort.Sort(sorter) - case r.SortOrder == pb.RangeRequest_DESCEND: - sort.Sort(sort.Reverse(sorter)) - } - } - - if r.Limit > 0 && len(kvs) > int(r.Limit) { - kvs = kvs[:r.Limit] - resp.More = true - } - - resp.Header.Revision = rev - for i := range kvs { - resp.Kvs = append(resp.Kvs, &kvs[i]) - } - return resp, nil -} - -func applyDeleteRange(txnID int64, kv dstorage.KV, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { - resp := &pb.DeleteRangeResponse{} - resp.Header = &pb.ResponseHeader{} - - var ( - n int64 - rev int64 - err error - ) - - if isGteRange(dr.RangeEnd) { - dr.RangeEnd = []byte{} - } - - if txnID != noTxn { - n, rev, err = kv.TxnDeleteRange(txnID, dr.Key, dr.RangeEnd) - if err != nil { - return nil, err - } - } else { - n, rev = kv.DeleteRange(dr.Key, dr.RangeEnd) - } - - resp.Deleted = n - resp.Header.Revision = rev - return resp, nil -} - -func checkRequestLeases(le lease.Lessor, reqs []*pb.RequestUnion) error { - for _, requ := range reqs { - tv, ok := requ.Request.(*pb.RequestUnion_RequestPut) - if !ok { - continue - } - preq := tv.RequestPut - if preq == nil || lease.LeaseID(preq.Lease) == lease.NoLease { - continue - } - if l := le.Lookup(lease.LeaseID(preq.Lease)); l == nil { - return lease.ErrLeaseNotFound - } - } - return nil -} - -func checkRequestRange(kv dstorage.KV, reqs []*pb.RequestUnion) error { - for _, requ := range reqs { - tv, ok := requ.Request.(*pb.RequestUnion_RequestRange) - if !ok { - continue - } - greq := tv.RequestRange - if greq == nil || greq.Revision == 0 { - continue - } - - if greq.Revision > kv.Rev() { - return dstorage.ErrFutureRev - } - if greq.Revision < kv.FirstRev() { - return dstorage.ErrCompacted - } - } - return nil -} - -func applyTxn(kv dstorage.KV, le lease.Lessor, rt *pb.TxnRequest) (*pb.TxnResponse, error) { - var revision int64 - - ok := true - for _, c := range rt.Compare { - if revision, ok = applyCompare(kv, c); !ok { - break - } - } - - var reqs []*pb.RequestUnion - if ok { - reqs = rt.Success - } else { - reqs = rt.Failure - } - - if err := checkRequestLeases(le, reqs); err != nil { - return nil, err - } - if err := checkRequestRange(kv, reqs); err != nil { - return nil, err - } - - // When executing the operations of txn, we need to hold the txn lock. - // So the reader will not see any intermediate results. - txnID := kv.TxnBegin() - defer func() { - err := kv.TxnEnd(txnID) - if err != nil { - panic(fmt.Sprint("unexpected error when closing txn", txnID)) - } - }() - - resps := make([]*pb.ResponseUnion, len(reqs)) - for i := range reqs { - resps[i] = applyUnion(txnID, kv, reqs[i]) - } - - if len(resps) != 0 { - revision += 1 - } - - txnResp := &pb.TxnResponse{} - txnResp.Header = &pb.ResponseHeader{} - txnResp.Header.Revision = revision - txnResp.Responses = resps - txnResp.Succeeded = ok - return txnResp, nil -} - -func applyCompaction(kv dstorage.KV, compaction *pb.CompactionRequest) (*pb.CompactionResponse, error) { - resp := &pb.CompactionResponse{} - resp.Header = &pb.ResponseHeader{} - err := kv.Compact(compaction.Revision) - if err != nil { - return nil, err - } - // get the current revision. which key to get is not important. 
- _, resp.Header.Revision, _ = kv.Range([]byte("compaction"), nil, 1, 0) - return resp, err -} - -func applyUnion(txnID int64, kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion { - switch tv := union.Request.(type) { - case *pb.RequestUnion_RequestRange: - if tv.RequestRange != nil { - resp, err := applyRange(txnID, kv, tv.RequestRange) - if err != nil { - panic("unexpected error during txn") - } - return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponseRange{ResponseRange: resp}} - } - case *pb.RequestUnion_RequestPut: - if tv.RequestPut != nil { - resp, err := applyPut(txnID, kv, nil, tv.RequestPut) - if err != nil { - panic("unexpected error during txn") - } - return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponsePut{ResponsePut: resp}} - } - case *pb.RequestUnion_RequestDeleteRange: - if tv.RequestDeleteRange != nil { - resp, err := applyDeleteRange(txnID, kv, tv.RequestDeleteRange) - if err != nil { - panic("unexpected error during txn") - } - return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponseDeleteRange{ResponseDeleteRange: resp}} - } - default: - // empty union - return nil - } - return nil -} - -// applyCompare applies the compare request. -// It returns the revision at which the comparison happens. If the comparison -// succeeds, the it returns true. Otherwise it returns false. -func applyCompare(kv dstorage.KV, c *pb.Compare) (int64, bool) { - ckvs, rev, err := kv.Range(c.Key, nil, 1, 0) - if err != nil { - if err == dstorage.ErrTxnIDMismatch { - panic("unexpected txn ID mismatch error") - } - return rev, false - } - var ckv storagepb.KeyValue - if len(ckvs) != 0 { - ckv = ckvs[0] - } else { - // Use the zero value of ckv normally. However... - if c.Target == pb.Compare_VALUE { - // Always fail if we're comparing a value on a key that doesn't exist. 
- // We can treat non-existence as the empty set explicitly, such that - // even a key with a value of length 0 bytes is still a real key - // that was written that way - return rev, false - } - } - - // -1 is less, 0 is equal, 1 is greater - var result int - switch c.Target { - case pb.Compare_VALUE: - tv, _ := c.TargetUnion.(*pb.Compare_Value) - if tv != nil { - result = bytes.Compare(ckv.Value, tv.Value) - } - case pb.Compare_CREATE: - tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision) - if tv != nil { - result = compareInt64(ckv.CreateRevision, tv.CreateRevision) - } - - case pb.Compare_MOD: - tv, _ := c.TargetUnion.(*pb.Compare_ModRevision) - if tv != nil { - result = compareInt64(ckv.ModRevision, tv.ModRevision) - } - case pb.Compare_VERSION: - tv, _ := c.TargetUnion.(*pb.Compare_Version) - if tv != nil { - result = compareInt64(ckv.Version, tv.Version) - } - } - - switch c.Result { - case pb.Compare_EQUAL: - if result != 0 { - return rev, false - } - case pb.Compare_GREATER: - if result != 1 { - return rev, false - } - case pb.Compare_LESS: - if result != -1 { - return rev, false - } - } - return rev, true -} - -func applyLeaseCreate(le lease.Lessor, lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { - l, err := le.Grant(lease.LeaseID(lc.ID), lc.TTL) - resp := &pb.LeaseCreateResponse{} - if err == nil { - resp.ID = int64(l.ID) - resp.TTL = l.TTL - } - return resp, err -} - -func applyLeaseRevoke(le lease.Lessor, lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { - err := le.Revoke(lease.LeaseID(lc.ID)) - - return &pb.LeaseRevokeResponse{}, err -} - -func compareInt64(a, b int64) int { - switch { - case a < b: - return -1 - case a > b: - return 1 - default: - return 0 - } -} - -// isGteRange determines if the range end is a >= range. This works around grpc -// sending empty byte strings as nil; >= is encoded in the range end as '\0'. -func isGteRange(rangeEnd []byte) bool { - return len(rangeEnd) == 1 && rangeEnd[0] == 0 -} - -func applyAuthEnable(s *EtcdServer) (*pb.AuthEnableResponse, error) { - s.AuthStore().AuthEnable() - return &pb.AuthEnableResponse{}, nil -} - -func applyUserAdd(s *EtcdServer, r *pb.UserAddRequest) (*pb.UserAddResponse, error) { - return s.AuthStore().UserAdd(r) -} diff --git a/integration/cluster.go b/integration/cluster.go index 373679e3a..bbf394375 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -757,6 +757,8 @@ type grpcAPI struct { Lease pb.LeaseClient // Watch is the watch API for the client's connection. Watch pb.WatchClient + // Maintenance is the maintenance API for the client's connection. 
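+ // The alarm tests below use it to raise and clear alarms.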
+ Maintenance pb.MaintenanceClient } func toGRPC(c *clientv3.Client) grpcAPI { @@ -765,5 +767,6 @@ func toGRPC(c *clientv3.Client) grpcAPI { pb.NewKVClient(c.ActiveConnection()), pb.NewLeaseClient(c.ActiveConnection()), pb.NewWatchClient(c.ActiveConnection()), + pb.NewMaintenanceClient(c.ActiveConnection()), } } diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 3cc2cbf5c..57de58507 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/storage/backend" "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -455,6 +456,142 @@ func TestV3Hash(t *testing.T) { } } +// TestV3StorageQuotaAPI tests that the V3 server respects quotas at the API layer +func TestV3StorageQuotaAPI(t *testing.T) { + oldSize := backend.InitialMmapSize + defer func() { + backend.InitialMmapSize = oldSize + testutil.AfterTest(t) + }() + + backend.InitialMmapSize = 64 * 1024 + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + kvc := toGRPC(clus.RandClient()).KV + + key := []byte("abc") + + // test small put that fits in quota + smallbuf := make([]byte, 512) + if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil { + t.Fatal(err) + } + + // test big put + bigbuf := make([]byte, 64*1024) + _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf}) + if err == nil || err != rpctypes.ErrNoSpace { + t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrNoSpace) + } + + // test big txn + puttxn := &pb.RequestUnion{ + Request: &pb.RequestUnion_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: key, + Value: bigbuf, + }, + }, + } + txnreq := &pb.TxnRequest{} + txnreq.Success = append(txnreq.Success, puttxn) + _, txnerr := kvc.Txn(context.TODO(), txnreq) + if txnerr == nil || txnerr != rpctypes.ErrNoSpace { + t.Fatalf("big txn got %v, expected %v", txnerr, rpctypes.ErrNoSpace) + } +} + +// TestV3StorageQuotaApply tests that the V3 server respects quotas during apply +func TestV3StorageQuotaApply(t *testing.T) { + oldSize := backend.InitialMmapSize + defer func() { + backend.InitialMmapSize = oldSize + testutil.AfterTest(t) + }() + + clus := NewClusterV3(t, &ClusterConfig{Size: 2}) + defer clus.Terminate(t) + kvc0 := toGRPC(clus.Client(0)).KV + kvc1 := toGRPC(clus.Client(1)).KV + + // force a node to have a different quota + backend.InitialMmapSize = 64 * 1024 + clus.Members[0].Stop(t) + clus.Members[0].Restart(t) + clus.waitLeader(t, clus.Members) + + key := []byte("abc") + + // test small put still works + smallbuf := make([]byte, 1024) + _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}) + if serr != nil { + t.Fatal(serr) + } + + // test big put + bigbuf := make([]byte, 64*1024) + _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf}) + if err != nil { + t.Fatal(err) + } + + // small quota machine should reject put + // first, synchronize with the cluster via quorum get + kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}) + if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { + t.Fatalf("past-quota instance should reject put") + } + + // large quota machine should reject put; the NOSPACE alarm is cluster-wide + if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { + t.Fatalf("past-quota instance should reject 
put") + } + + // reset large quota node to ensure alarm persisted + backend.InitialMmapSize = oldSize + clus.Members[1].Stop(t) + clus.Members[1].Restart(t) + clus.waitLeader(t, clus.Members) + + if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { + t.Fatalf("alarmed instance should reject put after reset") + } +} + +// TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through. +func TestV3AlarmDeactivate(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + kvc := toGRPC(clus.RandClient()).KV + mt := toGRPC(clus.RandClient()).Maintenance + + alarmReq := &pb.AlarmRequest{ + MemberID: 123, + Action: pb.AlarmRequest_ACTIVATE, + Alarm: pb.AlarmType_NOSPACE, + } + if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil { + t.Fatal(err) + } + + key := []byte("abc") + smallbuf := make([]byte, 512) + _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}) + if err == nil && err != rpctypes.ErrNoSpace { + t.Fatalf("put got %v, expected %v", err, rpctypes.ErrNoSpace) + } + + alarmReq.Action = pb.AlarmRequest_DEACTIVATE + if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil { + t.Fatal(err) + } + + if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil { + t.Fatal(err) + } +} + func TestV3RangeRequest(t *testing.T) { defer testutil.AfterTest(t) tests := []struct { diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index 387137124..d59833cd1 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -31,6 +31,7 @@ type BatchTx interface { UnsafeSeqPut(bucketName []byte, key []byte, value []byte) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) UnsafeDelete(bucketName []byte, key []byte) + UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error Commit() CommitAndStop() } @@ -122,6 +123,11 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { t.pending++ } +// UnsafeForEach must be called holding the lock on the tx. +func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { + return t.tx.Bucket(bucketName).ForEach(visitor) +} + // Commit commits a previous tx and begins a new writable one. 
func (t *batchTx) Commit() { t.Lock() diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 4b9660cde..6a7905a0f 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -584,6 +584,9 @@ func (b *fakeBatchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit i func (b *fakeBatchTx) UnsafeDelete(bucketName []byte, key []byte) { b.Recorder.Record(testutil.Action{Name: "delete", Params: []interface{}{bucketName, key}}) } +func (b *fakeBatchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { + return nil +} func (b *fakeBatchTx) Commit() {} func (b *fakeBatchTx) CommitAndStop() {} diff --git a/test b/test index 8882667f7..60551b6ed 100755 --- a/test +++ b/test @@ -21,7 +21,7 @@ TESTABLE_AND_FORMATTABLE="client clientv3 discovery error etcdctl/ctlv2 etcdctl/ # TODO: add it to race testing when the issue is resolved # https://github.com/golang/go/issues/9946 NO_RACE_TESTABLE="rafthttp" -FORMATTABLE="$TESTABLE_AND_FORMATTABLE $NO_RACE_TESTABLE *.go etcdctl/ integration clientv3/integration e2e" +FORMATTABLE="$TESTABLE_AND_FORMATTABLE $NO_RACE_TESTABLE *.go etcdctl/ integration clientv3/integration e2e alarm" # user has not provided PKG override if [ -z "$PKG" ]; then
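The quota arithmetic in this patch is easy to check by hand. Below is a minimal, self-contained Go sketch (illustrative only; the constant and helper names mirror, rather than import, the patch's backendQuota cost model): a put is estimated at kvOverhead + len(key) + len(value), and a request is admitted only while the current backend size plus that estimate stays under the quota. With the 64*1024-byte quota the tests install through backend.InitialMmapSize, a 512-byte value fits but a 64KB value is rejected with ErrNoSpace.

package main

import "fmt"

const (
	kvOverhead = 256       // per-key metadata estimate, as in the patch
	quotaBytes = 64 * 1024 // the InitialMmapSize the quota tests shrink the backend to
)

// costPut mirrors the patch's costPut: metadata overhead plus key and value sizes.
func costPut(key, value []byte) int { return kvOverhead + len(key) + len(value) }

func main() {
	used := int64(0) // assume an empty backend for illustration

	small := costPut([]byte("abc"), make([]byte, 512))
	big := costPut([]byte("abc"), make([]byte, 64*1024))

	fmt.Println(used+int64(small) < quotaBytes) // true: small put is admitted
	fmt.Println(used+int64(big) < quotaBytes)   // false: big put would exceed the quota
}

A txn is costed the same way, taking the more expensive of its success and failure branches, which is why the big-txn case in TestV3StorageQuotaAPI is rejected exactly like the big put.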