etcd/etcdserver/v3_server.go

307 lines
10 KiB
Go
Raw Normal View History

2015-08-08 15:58:29 +03:00
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import (
"time"
2015-08-14 21:45:31 +03:00
2015-08-08 15:58:29 +03:00
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
2016-01-05 21:16:50 +03:00
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasehttp"
2016-04-25 22:32:58 +03:00
"github.com/coreos/etcd/mvcc"
2016-03-23 03:10:28 +03:00
"golang.org/x/net/context"
2015-08-08 15:58:29 +03:00
)
2016-02-08 23:44:25 +03:00
const (
// the max request size that raft accepts.
// TODO: make this a flag? But we probably do not want to
// accept large request which might block raft stream. User
// specify a large value might end up with shooting in the foot.
maxRequestBytes = 1.5 * 1024 * 1024
// max timeout for waiting a v3 request to go through raft.
maxV3RequestTimeout = 5 * time.Second
2016-02-08 23:44:25 +03:00
)
2015-11-04 22:23:33 +03:00
type RaftKV interface {
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}
type Lessor interface {
// LeaseGrant sends LeaseGrant request to raft and apply it after committed.
LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
2016-01-10 00:26:45 +03:00
// LeaseRevoke sends LeaseRevoke request to raft and apply it after committed.
LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
2016-01-10 00:26:45 +03:00
// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
// is returned.
LeaseRenew(id lease.LeaseID) (int64, error)
}
type Authenticator interface {
AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error)
Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error)
UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
UserGrant(ctx context.Context, r *pb.AuthUserGrantRequest) (*pb.AuthUserGrantResponse, error)
2016-04-01 05:07:43 +03:00
RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
RoleGrant(ctx context.Context, r *pb.AuthRoleGrantRequest) (*pb.AuthRoleGrantResponse, error)
}
2015-11-04 22:23:33 +03:00
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
2016-02-12 23:04:06 +03:00
if r.Serializable {
return s.applyV3.Range(noTxn, r)
2016-02-12 23:04:06 +03:00
}
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
2015-11-04 22:23:33 +03:00
if err != nil {
return nil, err
}
return result.resp.(*pb.RangeResponse), result.err
}
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Put: r})
2015-11-04 22:23:33 +03:00
if err != nil {
return nil, err
}
return result.resp.(*pb.PutResponse), result.err
}
func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
2015-11-04 22:23:33 +03:00
if err != nil {
return nil, err
}
return result.resp.(*pb.DeleteRangeResponse), result.err
}
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
if isTxnSerializable(r) {
return s.applyV3.Txn(r)
}
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Txn: r})
2015-11-04 22:23:33 +03:00
if err != nil {
return nil, err
}
return result.resp.(*pb.TxnResponse), result.err
}
func isTxnSerializable(r *pb.TxnRequest) bool {
for _, u := range r.Success {
if r := u.GetRequestRange(); r == nil || !r.Serializable {
return false
}
}
for _, u := range r.Failure {
if r := u.GetRequestRange(); r == nil || !r.Serializable {
return false
}
}
return true
}
2015-11-04 22:23:33 +03:00
func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Compaction: r})
if r.Physical && result.physc != nil {
<-result.physc
// The compaction is done deleting keys; the hash is now settled
// but the data is not necessarily committed. If there's a crash,
// the hash may revert to a hash prior to compaction completing
// if the compaction resumes. Force the finished compaction to
// commit so it won't resume following a crash.
s.be.ForceCommit()
}
if err != nil {
return nil, err
}
resp := result.resp.(*pb.CompactionResponse)
if resp == nil {
resp = &pb.CompactionResponse{}
}
if resp.Header == nil {
resp.Header = &pb.ResponseHeader{}
}
resp.Header.Revision = s.kv.Rev()
return resp, result.err
2015-08-08 20:14:42 +03:00
}
func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
// no id given? choose one
for r.ID == int64(lease.NoLease) {
// only use positive int64 id's
r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
}
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseGrant: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.LeaseGrantResponse), result.err
}
func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.LeaseRevokeResponse), result.err
}
2016-01-10 00:26:45 +03:00
func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
ttl, err := s.lessor.Renew(id)
if err == nil {
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
}
// renewals don't go through raft; forward to leader manually
leader := s.cluster.Member(s.Leader())
for i := 0; i < 5 && leader == nil; i++ {
// wait an election
dur := time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond
select {
case <-time.After(dur):
leader = s.cluster.Member(s.Leader())
case <-s.done:
return -1, ErrStopped
}
}
if leader == nil || len(leader.PeerURLs) == 0 {
return -1, ErrNoLeader
}
for _, url := range leader.PeerURLs {
lurl := url + "/leases"
ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.cfg.peerDialTimeout())
if err == nil {
break
}
}
return ttl, err
2016-01-10 00:26:45 +03:00
}
func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Alarm: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.AlarmResponse), result.err
}
func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthEnable: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.AuthEnableResponse), result.err
}
func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Authenticate: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.AuthenticateResponse), result.err
}
func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.AuthUserAddResponse), result.err
}
func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.AuthUserDeleteResponse), result.err
}
func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.AuthUserChangePasswordResponse), result.err
}
func (s *EtcdServer) UserGrant(ctx context.Context, r *pb.AuthUserGrantRequest) (*pb.AuthUserGrantResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserGrant: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.AuthUserGrantResponse), result.err
}
2016-04-01 05:07:43 +03:00
func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.AuthRoleAddResponse), result.err
}
func (s *EtcdServer) RoleGrant(ctx context.Context, r *pb.AuthRoleGrantRequest) (*pb.AuthRoleGrantResponse, error) {
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrant: r})
if err != nil {
return nil, err
}
return result.resp.(*pb.AuthRoleGrantResponse), result.err
}
func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
r.ID = s.reqIDGen.Next()
data, err := r.Marshal()
if err != nil {
2015-11-04 22:23:33 +03:00
return nil, err
}
2016-02-08 23:44:25 +03:00
if len(data) > maxRequestBytes {
return nil, ErrRequestTooLarge
}
ch := s.w.Register(r.ID)
cctx, cancel := context.WithTimeout(ctx, maxV3RequestTimeout)
defer cancel()
s.r.Propose(cctx, data)
select {
case x := <-ch:
2015-11-04 22:23:33 +03:00
return x.(*applyResult), nil
case <-cctx.Done():
s.w.Trigger(r.ID, nil) // GC wait
return nil, cctx.Err()
case <-s.done:
2015-11-04 22:23:33 +03:00
return nil, ErrStopped
}
}
2016-01-08 11:21:19 +03:00
// Watchable returns a watchable interface attached to the etcdserver.
2016-04-25 22:32:58 +03:00
func (s *EtcdServer) Watchable() mvcc.Watchable { return s.KV() }