From 8e821cdc707fe4b775f6673930ee26e5932288c1 Mon Sep 17 00:00:00 2001 From: Hitoshi Mitake Date: Tue, 17 May 2016 15:20:50 +0900 Subject: [PATCH] *: do permission check in raft log apply phase This commit lets etcdserver check permission during its log applying phase. With this change, permission checking of operations is supported. Currently, put and range are supported. In addition, multi key permission check of range isn't supported yet. --- auth/simple_token.go | 11 +- auth/store.go | 100 ++++++- etcdserver/api/v3rpc/rpctypes/error.go | 1 + etcdserver/apply.go | 13 +- etcdserver/errors.go | 1 + etcdserver/etcdserverpb/etcdserver.pb.go | 1 + etcdserver/etcdserverpb/raft_internal.pb.go | 281 +++++++++++++++++--- etcdserver/etcdserverpb/raft_internal.proto | 8 + etcdserver/server.go | 10 +- etcdserver/v3_server.go | 35 ++- 10 files changed, 412 insertions(+), 49 deletions(-) diff --git a/auth/simple_token.go b/auth/simple_token.go index 33e245027..b2579116b 100644 --- a/auth/simple_token.go +++ b/auth/simple_token.go @@ -20,6 +20,7 @@ package auth import ( "crypto/rand" "math/big" + "sync" ) const ( @@ -28,7 +29,8 @@ const ( ) var ( - simpleTokens map[string]string // token -> user ID + simpleTokensMu sync.RWMutex + simpleTokens map[string]string // token -> username ) func init() { @@ -50,7 +52,7 @@ func genSimpleToken() (string, error) { return string(ret), nil } -func genSimpleTokenForUser(userID string) (string, error) { +func genSimpleTokenForUser(username string) (string, error) { var token string var err error @@ -66,6 +68,9 @@ func genSimpleTokenForUser(userID string) (string, error) { } } - simpleTokens[token] = userID + simpleTokensMu.Lock() + simpleTokens[token] = username + simpleTokensMu.Unlock() + return token, nil } diff --git a/auth/store.go b/auth/store.go index be1c9a61c..20b60adee 100644 --- a/auth/store.go +++ b/auth/store.go @@ -19,6 +19,7 @@ import ( "errors" "sort" "strings" + "sync" "github.com/coreos/etcd/auth/authpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -76,10 +77,21 @@ type AuthStore interface { // RoleGrant grants a permission to a role RoleGrant(r *pb.AuthRoleGrantRequest) (*pb.AuthRoleGrantResponse, error) + + // UsernameFromToken gets a username from the given Token + UsernameFromToken(token string) (string, bool) + + // IsPutPermitted checks put permission of the user + IsPutPermitted(header *pb.RequestHeader, key string) bool + + // IsRangePermitted checks range permission of the user + IsRangePermitted(header *pb.RequestHeader, key string) bool } type authStore struct { - be backend.Backend + be backend.Backend + enabled bool + enabledMu sync.RWMutex } func (as *authStore) AuthEnable() { @@ -92,6 +104,10 @@ func (as *authStore) AuthEnable() { tx.Unlock() b.ForceCommit() + as.enabledMu.Lock() + as.enabled = true + as.enabledMu.Unlock() + plog.Noticef("Authentication enabled") } @@ -105,6 +121,10 @@ func (as *authStore) AuthDisable() { tx.Unlock() b.ForceCommit() + as.enabledMu.Lock() + as.enabled = false + as.enabledMu.Unlock() + plog.Noticef("Authentication disabled") } @@ -299,6 +319,13 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, return &pb.AuthRoleAddResponse{}, nil } +func (as *authStore) UsernameFromToken(token string) (string, bool) { + simpleTokensMu.RLock() + defer simpleTokensMu.RUnlock() + t, ok := simpleTokens[token] + return t, ok +} + type permSlice []*authpb.Permission func (perms permSlice) Len() int { @@ -361,6 +388,77 @@ func (as *authStore) RoleGrant(r *pb.AuthRoleGrantRequest) (*pb.AuthRoleGrantRes return &pb.AuthRoleGrantResponse{}, nil } +func (as *authStore) isOpPermitted(userName string, key string, write bool, read bool) bool { + // TODO(mitake): this function would be costly so we need a caching mechanism + if !as.isAuthEnabled() { + return true + } + + tx := as.be.BatchTx() + tx.Lock() + defer tx.Unlock() + + _, vs := tx.UnsafeRange(authUsersBucketName, []byte(userName), nil, 0) + if len(vs) != 1 { + plog.Errorf("invalid user name %s for permission checking", userName) + return false + } + + user := &authpb.User{} + err := user.Unmarshal(vs[0]) + if err != nil { + plog.Errorf("failed to unmarshal user struct (name: %s): %s", userName, err) + return false + } + + for _, roleName := range user.Roles { + _, vs := tx.UnsafeRange(authRolesBucketName, []byte(roleName), nil, 0) + if len(vs) != 1 { + plog.Errorf("invalid role name %s for permission checking", roleName) + return false + } + + role := &authpb.Role{} + err := role.Unmarshal(vs[0]) + if err != nil { + plog.Errorf("failed to unmarshal a role %s: %s", roleName, err) + return false + } + + for _, perm := range role.KeyPermission { + if bytes.Equal(perm.Key, []byte(key)) { + if perm.PermType == authpb.READWRITE { + return true + } + + if write && !read && perm.PermType == authpb.WRITE { + return true + } + + if read && !write && perm.PermType == authpb.READ { + return true + } + } + } + } + + return false +} + +func (as *authStore) IsPutPermitted(header *pb.RequestHeader, key string) bool { + return as.isOpPermitted(header.Username, key, true, false) +} + +func (as *authStore) IsRangePermitted(header *pb.RequestHeader, key string) bool { + return as.isOpPermitted(header.Username, key, false, true) +} + +func (as *authStore) isAuthEnabled() bool { + as.enabledMu.RLock() + defer as.enabledMu.RUnlock() + return as.enabled +} + func NewAuthStore(be backend.Backend) *authStore { tx := be.BatchTx() tx.Lock() diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 16909287f..67e0134f0 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -43,6 +43,7 @@ var ( ErrGRPCRoleAlreadyExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role name already exists") ErrGRPCRoleNotFound = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role name not found") ErrGRPCAuthFailed = grpc.Errorf(codes.InvalidArgument, "etcdserver: authentication failed, invalid user ID or password") + ErrGRPCPermissionDenied = grpc.Errorf(codes.FailedPrecondition, "etcdserver: permission denied") ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader") ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable") diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 53bed5343..2a5a315eb 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -19,6 +19,7 @@ import ( "fmt" "sort" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc" @@ -72,9 +73,17 @@ func (s *EtcdServer) applyV3Request(r *pb.InternalRaftRequest) *applyResult { ar := &applyResult{} switch { case r.Range != nil: - ar.resp, ar.err = s.applyV3.Range(noTxn, r.Range) + if s.AuthStore().IsRangePermitted(r.Header, string(r.Range.Key)) { + ar.resp, ar.err = s.applyV3.Range(noTxn, r.Range) + } else { + ar.err = rpctypes.ErrGRPCPermissionDenied + } case r.Put != nil: - ar.resp, ar.err = s.applyV3.Put(noTxn, r.Put) + if s.AuthStore().IsPutPermitted(r.Header, string(r.Put.Key)) { + ar.resp, ar.err = s.applyV3.Put(noTxn, r.Put) + } else { + ar.err = rpctypes.ErrGRPCPermissionDenied + } case r.DeleteRange != nil: ar.resp, ar.err = s.applyV3.DeleteRange(noTxn, r.DeleteRange) case r.Txn != nil: diff --git a/etcdserver/errors.go b/etcdserver/errors.go index 0c00512a2..1b9789fc8 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -30,6 +30,7 @@ var ( ErrNoLeader = errors.New("etcdserver: no leader") ErrRequestTooLarge = errors.New("etcdserver: request is too large") ErrNoSpace = errors.New("etcdserver: no space") + ErrInvalidAuthToken = errors.New("etcdserver: invalid auth token") ) type DiscoveryError struct { diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index 872997b61..af5c25c6e 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -13,6 +13,7 @@ It has these top-level messages: Request Metadata + RequestHeader InternalRaftRequest EmptyResponse ResponseHeader diff --git a/etcdserver/etcdserverpb/raft_internal.pb.go b/etcdserver/etcdserverpb/raft_internal.pb.go index 84a56ab14..d46ffe9d2 100644 --- a/etcdserver/etcdserverpb/raft_internal.pb.go +++ b/etcdserver/etcdserverpb/raft_internal.pb.go @@ -19,9 +19,21 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf +type RequestHeader struct { + ID uint64 `protobuf:"varint,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"` + // username is a username that is associated with an auth token of gRPC connection + Username string `protobuf:"bytes,2,opt,name=username,proto3" json:"username,omitempty"` +} + +func (m *RequestHeader) Reset() { *m = RequestHeader{} } +func (m *RequestHeader) String() string { return proto.CompactTextString(m) } +func (*RequestHeader) ProtoMessage() {} +func (*RequestHeader) Descriptor() ([]byte, []int) { return fileDescriptorRaftInternal, []int{0} } + // An InternalRaftRequest is the union of all requests which can be // sent via raft. type InternalRaftRequest struct { + Header *RequestHeader `protobuf:"bytes,100,opt,name=header" json:"header,omitempty"` ID uint64 `protobuf:"varint,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"` V2 *Request `protobuf:"bytes,2,opt,name=v2" json:"v2,omitempty"` Range *RangeRequest `protobuf:"bytes,3,opt,name=range" json:"range,omitempty"` @@ -46,7 +58,7 @@ type InternalRaftRequest struct { func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequest{} } func (m *InternalRaftRequest) String() string { return proto.CompactTextString(m) } func (*InternalRaftRequest) ProtoMessage() {} -func (*InternalRaftRequest) Descriptor() ([]byte, []int) { return fileDescriptorRaftInternal, []int{0} } +func (*InternalRaftRequest) Descriptor() ([]byte, []int) { return fileDescriptorRaftInternal, []int{1} } type EmptyResponse struct { } @@ -54,12 +66,42 @@ type EmptyResponse struct { func (m *EmptyResponse) Reset() { *m = EmptyResponse{} } func (m *EmptyResponse) String() string { return proto.CompactTextString(m) } func (*EmptyResponse) ProtoMessage() {} -func (*EmptyResponse) Descriptor() ([]byte, []int) { return fileDescriptorRaftInternal, []int{1} } +func (*EmptyResponse) Descriptor() ([]byte, []int) { return fileDescriptorRaftInternal, []int{2} } func init() { + proto.RegisterType((*RequestHeader)(nil), "etcdserverpb.RequestHeader") proto.RegisterType((*InternalRaftRequest)(nil), "etcdserverpb.InternalRaftRequest") proto.RegisterType((*EmptyResponse)(nil), "etcdserverpb.EmptyResponse") } +func (m *RequestHeader) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *RequestHeader) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.ID != 0 { + data[i] = 0x8 + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.ID)) + } + if len(m.Username) > 0 { + data[i] = 0x12 + i++ + i = encodeVarintRaftInternal(data, i, uint64(len(m.Username))) + i += copy(data[i:], m.Username) + } + return i, nil +} + func (m *InternalRaftRequest) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -268,6 +310,18 @@ func (m *InternalRaftRequest) MarshalTo(data []byte) (int, error) { } i += n18 } + if m.Header != nil { + data[i] = 0xa2 + i++ + data[i] = 0x6 + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.Header.Size())) + n19, err := m.Header.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n19 + } return i, nil } @@ -316,6 +370,19 @@ func encodeVarintRaftInternal(data []byte, offset int, v uint64) int { data[offset] = uint8(v) return offset + 1 } +func (m *RequestHeader) Size() (n int) { + var l int + _ = l + if m.ID != 0 { + n += 1 + sovRaftInternal(uint64(m.ID)) + } + l = len(m.Username) + if l > 0 { + n += 1 + l + sovRaftInternal(uint64(l)) + } + return n +} + func (m *InternalRaftRequest) Size() (n int) { var l int _ = l @@ -394,6 +461,10 @@ func (m *InternalRaftRequest) Size() (n int) { l = m.Alarm.Size() n += 2 + l + sovRaftInternal(uint64(l)) } + if m.Header != nil { + l = m.Header.Size() + n += 2 + l + sovRaftInternal(uint64(l)) + } return n } @@ -416,6 +487,104 @@ func sovRaftInternal(x uint64) (n int) { func sozRaftInternal(x uint64) (n int) { return sovRaftInternal(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (m *RequestHeader) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaftInternal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: RequestHeader: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: RequestHeader: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType) + } + m.ID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaftInternal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.ID |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Username", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaftInternal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRaftInternal + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Username = string(data[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRaftInternal(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRaftInternal + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *InternalRaftRequest) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 @@ -1058,6 +1227,39 @@ func (m *InternalRaftRequest) Unmarshal(data []byte) error { return err } iNdEx = postIndex + case 100: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Header", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaftInternal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRaftInternal + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Header == nil { + m.Header = &RequestHeader{} + } + if err := m.Header.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRaftInternal(data[iNdEx:]) @@ -1235,40 +1437,43 @@ var ( ) var fileDescriptorRaftInternal = []byte{ - // 550 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x94, 0xdf, 0x6e, 0xd3, 0x30, - 0x14, 0xc6, 0x69, 0xf7, 0x07, 0xea, 0xfe, 0xc5, 0x05, 0x64, 0x7a, 0x31, 0x46, 0x11, 0x12, 0x02, - 0xa9, 0xa0, 0xf1, 0x00, 0x50, 0xda, 0x0a, 0x0d, 0x81, 0x34, 0x45, 0x70, 0x1d, 0xb9, 0xc9, 0x59, - 0x57, 0x91, 0x26, 0xc1, 0x71, 0xcb, 0x78, 0x39, 0xae, 0x77, 0xc9, 0x23, 0x00, 0x4f, 0x32, 0xfb, - 0x38, 0x71, 0x9a, 0xcd, 0xbd, 0x88, 0x94, 0x7c, 0xe7, 0x3b, 0xbf, 0xf3, 0xd9, 0xb1, 0x4c, 0xfa, - 0x82, 0x9f, 0x4b, 0x7f, 0x19, 0x4b, 0x10, 0x31, 0x8f, 0x46, 0xa9, 0x48, 0x64, 0x42, 0x5b, 0x20, - 0x83, 0x30, 0x03, 0xb1, 0x01, 0x91, 0xce, 0x07, 0x0f, 0x16, 0xc9, 0x22, 0xc1, 0xc2, 0x6b, 0xfd, - 0x66, 0x3c, 0x83, 0x5e, 0xe9, 0xc9, 0x95, 0x86, 0x48, 0x03, 0xf3, 0x3a, 0xfc, 0xdd, 0x20, 0xfd, - 0xd3, 0x9c, 0xe9, 0xa9, 0x01, 0x1e, 0xfc, 0x58, 0x43, 0x26, 0x69, 0x87, 0xd4, 0x4f, 0xa7, 0xac, - 0x76, 0x5c, 0x7b, 0xb1, 0xef, 0xd5, 0x97, 0x53, 0xfa, 0x9c, 0xd4, 0x37, 0x27, 0xac, 0xae, 0xbe, - 0x9b, 0x27, 0x0f, 0x47, 0xdb, 0x53, 0x47, 0x79, 0x8b, 0xa7, 0x0c, 0xf4, 0x0d, 0x39, 0x10, 0x3c, - 0x5e, 0x00, 0xdb, 0x43, 0xe7, 0xe0, 0x86, 0x53, 0x97, 0x0a, 0xbb, 0x31, 0xd2, 0x97, 0x64, 0x2f, - 0x5d, 0x4b, 0xb6, 0x8f, 0x7e, 0x56, 0xf5, 0x9f, 0xad, 0x8b, 0x3c, 0x9e, 0x36, 0xd1, 0x09, 0x69, - 0x85, 0x10, 0x81, 0x04, 0xdf, 0x0c, 0x39, 0xc0, 0xa6, 0xe3, 0x6a, 0xd3, 0x14, 0x1d, 0x95, 0x51, - 0xcd, 0xb0, 0xd4, 0xf4, 0x40, 0x79, 0x19, 0xb3, 0x43, 0xd7, 0xc0, 0xaf, 0x97, 0xb1, 0x1d, 0xa8, - 0x4c, 0xf4, 0x1d, 0x21, 0x41, 0xb2, 0x4a, 0x79, 0x20, 0x97, 0x49, 0xcc, 0xee, 0x62, 0xcb, 0x93, - 0x6a, 0xcb, 0xc4, 0xd6, 0x8b, 0xce, 0xad, 0x16, 0xfa, 0x9e, 0x34, 0x23, 0xe0, 0x19, 0xf8, 0x0b, - 0x95, 0x58, 0xb2, 0x7b, 0x2e, 0xc2, 0x67, 0x6d, 0xf8, 0xa8, 0xeb, 0x96, 0x10, 0x59, 0x49, 0xaf, - 0xd9, 0x10, 0x04, 0x6c, 0x92, 0xef, 0xc0, 0x1a, 0xae, 0x35, 0x23, 0xc2, 0x43, 0x83, 0x5d, 0x73, - 0x54, 0x6a, 0x3a, 0x06, 0x5f, 0xcb, 0x0b, 0x1f, 0x62, 0x3e, 0x8f, 0x80, 0x11, 0x57, 0x8c, 0xb1, - 0x32, 0xcc, 0xb0, 0x6e, 0x63, 0x70, 0x2b, 0xe9, 0x18, 0x48, 0x08, 0x97, 0x19, 0x22, 0x9a, 0xae, - 0x18, 0x1a, 0x31, 0x35, 0x06, 0x1b, 0x83, 0x97, 0x1a, 0x9d, 0x92, 0x36, 0x42, 0xd6, 0xaa, 0xc3, - 0xe7, 0x61, 0xc8, 0x5a, 0xbb, 0x28, 0xdf, 0xd4, 0xd7, 0x38, 0x0c, 0x2b, 0x94, 0x5c, 0xa3, 0x5f, - 0x48, 0xaf, 0xa4, 0x98, 0x3f, 0xcb, 0xda, 0x08, 0x7a, 0xe6, 0x06, 0xe5, 0x27, 0x22, 0x67, 0x75, - 0x78, 0x45, 0xa6, 0xe7, 0xe4, 0x71, 0x89, 0x0b, 0x2e, 0xf4, 0x19, 0xf1, 0x53, 0x9e, 0x65, 0x3f, - 0x13, 0x11, 0xb2, 0x0e, 0x72, 0x5f, 0xb9, 0xb9, 0x13, 0x34, 0x9f, 0xe5, 0xde, 0x82, 0xff, 0x88, - 0x3b, 0xcb, 0xf4, 0x13, 0xe9, 0x96, 0x73, 0xcc, 0x71, 0xe8, 0x22, 0x7d, 0xe8, 0xa6, 0x57, 0x4e, - 0x44, 0x9b, 0x6f, 0xab, 0x76, 0x23, 0x45, 0x12, 0x01, 0x6e, 0x64, 0x6f, 0xd7, 0x46, 0x7a, 0xca, - 0x71, 0x73, 0x23, 0x73, 0xcd, 0x26, 0x42, 0x8a, 0x49, 0x74, 0x7f, 0x57, 0x22, 0xdd, 0x73, 0x3b, - 0x91, 0x55, 0xe9, 0xcc, 0x9c, 0x0f, 0x88, 0xe5, 0x32, 0xe0, 0xea, 0x87, 0x50, 0x04, 0x3d, 0xbd, - 0x0d, 0x2a, 0x1c, 0x05, 0xa7, 0xd2, 0xa6, 0xef, 0x0f, 0x1e, 0x71, 0xb1, 0x62, 0x7d, 0xd7, 0xfd, - 0x31, 0xd6, 0x25, 0x7b, 0x7f, 0xa0, 0x71, 0xd8, 0x25, 0xed, 0xd9, 0x2a, 0x95, 0xbf, 0x3c, 0xc8, - 0xd2, 0x24, 0xce, 0xe0, 0x43, 0xef, 0xea, 0xdf, 0xd1, 0x9d, 0xab, 0xff, 0x47, 0xb5, 0x3f, 0xea, - 0xf9, 0xab, 0x9e, 0xf9, 0x21, 0x5e, 0x75, 0x6f, 0xaf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xe8, 0x6d, - 0xd8, 0x5b, 0x42, 0x05, 0x00, 0x00, + // 593 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x94, 0xcf, 0x6e, 0xd3, 0x40, + 0x10, 0xc6, 0x69, 0xda, 0x86, 0x66, 0xf3, 0x97, 0x0d, 0xa0, 0x25, 0x48, 0xa5, 0x04, 0x21, 0x21, + 0x90, 0x02, 0x6a, 0x8f, 0x1c, 0x20, 0x24, 0x11, 0x14, 0x81, 0x54, 0x59, 0x70, 0xb6, 0x36, 0xf6, + 0x34, 0x89, 0x70, 0x6c, 0xb3, 0xde, 0x84, 0xf2, 0x86, 0x3d, 0xf2, 0x08, 0xc0, 0x2b, 0xf0, 0x02, + 0x78, 0x67, 0xed, 0x75, 0xdc, 0x6c, 0x0e, 0x96, 0xec, 0x99, 0x6f, 0x7e, 0xf3, 0xed, 0x7a, 0x34, + 0xa4, 0x2b, 0xf8, 0xa5, 0x74, 0x17, 0xa1, 0x04, 0x11, 0xf2, 0x60, 0x10, 0x8b, 0x48, 0x46, 0xb4, + 0x01, 0xd2, 0xf3, 0x13, 0x10, 0x6b, 0x10, 0xf1, 0xb4, 0x77, 0x77, 0x16, 0xcd, 0x22, 0x4c, 0xbc, + 0x54, 0x6f, 0x5a, 0xd3, 0xeb, 0x14, 0x9a, 0x2c, 0x52, 0x13, 0xb1, 0xa7, 0x5f, 0xfb, 0xaf, 0x49, + 0xd3, 0x81, 0xef, 0x2b, 0x48, 0xe4, 0x07, 0xe0, 0x3e, 0x08, 0xda, 0x22, 0x95, 0xf3, 0x31, 0xdb, + 0x3b, 0xd9, 0x7b, 0x76, 0xe0, 0x54, 0x16, 0x63, 0xda, 0x23, 0x47, 0xab, 0x44, 0xb5, 0x5c, 0x02, + 0xab, 0xa4, 0xd1, 0x9a, 0x63, 0xbe, 0xfb, 0xff, 0x6a, 0xa4, 0x7b, 0x9e, 0x19, 0x72, 0x52, 0x77, + 0x19, 0x69, 0x8b, 0xf1, 0x94, 0x54, 0xd6, 0xa7, 0x58, 0x5d, 0x3f, 0xbd, 0x37, 0xd8, 0xb4, 0x3c, + 0xc8, 0x4a, 0x9c, 0x54, 0x40, 0x5f, 0x91, 0x43, 0xc1, 0xc3, 0x19, 0xb0, 0x7d, 0x54, 0xf6, 0x6e, + 0x28, 0x55, 0x2a, 0x97, 0x6b, 0x21, 0x7d, 0x4e, 0xf6, 0xe3, 0x95, 0x64, 0x07, 0xa8, 0x67, 0x65, + 0xfd, 0xc5, 0x2a, 0xf7, 0xe3, 0x28, 0x11, 0x1d, 0x91, 0x86, 0x0f, 0x01, 0x48, 0x70, 0x75, 0x93, + 0x43, 0x2c, 0x3a, 0x29, 0x17, 0x8d, 0x51, 0x51, 0x6a, 0x55, 0xf7, 0x8b, 0x98, 0x6a, 0x28, 0xaf, + 0x42, 0x56, 0xb5, 0x35, 0xfc, 0x72, 0x15, 0x9a, 0x86, 0xa9, 0x88, 0xbe, 0x21, 0xc4, 0x8b, 0x96, + 0x31, 0xf7, 0xe4, 0x22, 0x0a, 0xd9, 0x6d, 0x2c, 0x79, 0x54, 0x2e, 0x19, 0x99, 0x7c, 0x5e, 0xb9, + 0x51, 0x42, 0xdf, 0x92, 0x7a, 0x00, 0x3c, 0x01, 0x77, 0x96, 0x3a, 0x96, 0xec, 0xc8, 0x46, 0xf8, + 0xa4, 0x04, 0xef, 0x55, 0xde, 0x10, 0x02, 0x13, 0x52, 0x67, 0xd6, 0x04, 0x01, 0xeb, 0xe8, 0x1b, + 0xb0, 0x9a, 0xed, 0xcc, 0x88, 0x70, 0x50, 0x60, 0xce, 0x1c, 0x14, 0x31, 0x65, 0x83, 0xaf, 0xe4, + 0xdc, 0x85, 0x90, 0x4f, 0x03, 0x60, 0xc4, 0x66, 0x63, 0x98, 0x0a, 0x26, 0x98, 0x37, 0x36, 0xb8, + 0x09, 0x29, 0x1b, 0x48, 0xf0, 0x17, 0x09, 0x22, 0xea, 0x36, 0x1b, 0x0a, 0x31, 0xd6, 0x02, 0x63, + 0x83, 0x17, 0x31, 0x3a, 0x26, 0x4d, 0x84, 0xa8, 0xe9, 0x73, 0xb9, 0xef, 0xb3, 0xc6, 0x2e, 0xca, + 0xd7, 0xf4, 0x6b, 0xe8, 0xfb, 0x25, 0x4a, 0x16, 0xa3, 0x9f, 0x49, 0xa7, 0xa0, 0xe8, 0x3f, 0xcb, + 0x9a, 0x08, 0x7a, 0x62, 0x07, 0x65, 0x13, 0x91, 0xb1, 0x5a, 0xbc, 0x14, 0xa6, 0x97, 0xe4, 0x41, + 0x81, 0xf3, 0xe6, 0x6a, 0x46, 0xdc, 0x98, 0x27, 0xc9, 0x8f, 0x48, 0xf8, 0xac, 0x85, 0xdc, 0x17, + 0x76, 0xee, 0x08, 0xc5, 0x17, 0x99, 0x36, 0xe7, 0xdf, 0xe7, 0xd6, 0x34, 0xfd, 0x48, 0xda, 0x45, + 0x1f, 0x3d, 0x0e, 0x6d, 0xa4, 0xf7, 0xed, 0xf4, 0xd2, 0x44, 0x34, 0xf9, 0x66, 0xd4, 0x5c, 0xa4, + 0x88, 0x02, 0xc0, 0x8b, 0xec, 0xec, 0xba, 0x48, 0x27, 0x55, 0xdc, 0xbc, 0xc8, 0x2c, 0x66, 0x1c, + 0x21, 0x45, 0x3b, 0xba, 0xb3, 0xcb, 0x91, 0xaa, 0xd9, 0x76, 0x64, 0xa2, 0x74, 0xa2, 0xe7, 0x03, + 0x42, 0xb9, 0xf0, 0x78, 0xfa, 0x43, 0x28, 0x82, 0x1e, 0x6f, 0x83, 0x72, 0x45, 0xce, 0x29, 0x95, + 0xa9, 0xfd, 0xc1, 0x03, 0x2e, 0x96, 0xac, 0x6b, 0xdb, 0x1f, 0x43, 0x95, 0x32, 0xfb, 0x03, 0x85, + 0xf4, 0x8c, 0x54, 0xe7, 0xb8, 0xf6, 0x98, 0x8f, 0x25, 0x0f, 0xad, 0xcb, 0x49, 0x6f, 0x46, 0x27, + 0x93, 0xf6, 0xdb, 0xa4, 0x39, 0x59, 0xc6, 0xf2, 0xa7, 0x03, 0x49, 0x1c, 0x85, 0x09, 0xbc, 0xeb, + 0x5c, 0xff, 0x39, 0xbe, 0x75, 0xfd, 0xf7, 0x78, 0xef, 0x57, 0xfa, 0xfc, 0x4e, 0x9f, 0x69, 0x15, + 0x97, 0xeb, 0xd9, 0xff, 0x00, 0x00, 0x00, 0xff, 0xff, 0xb1, 0xad, 0x4e, 0x86, 0xb4, 0x05, 0x00, + 0x00, } diff --git a/etcdserver/etcdserverpb/raft_internal.proto b/etcdserver/etcdserverpb/raft_internal.proto index 042279efe..df0872808 100644 --- a/etcdserver/etcdserverpb/raft_internal.proto +++ b/etcdserver/etcdserverpb/raft_internal.proto @@ -10,10 +10,18 @@ option (gogoproto.sizer_all) = true; option (gogoproto.unmarshaler_all) = true; option (gogoproto.goproto_getters_all) = false; +message RequestHeader { + uint64 ID = 1; + // username is a username that is associated with an auth token of gRPC connection + string username = 2; +} + // An InternalRaftRequest is the union of all requests which can be // sent via raft. message InternalRaftRequest { + RequestHeader header = 100; uint64 ID = 1; + Request v2 = 2; RangeRequest range = 3; diff --git a/etcdserver/server.go b/etcdserver/server.go index 056b6f643..5d3e75927 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1029,11 +1029,17 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if e.Index <= s.consistIndex.ConsistentIndex() { return } + + id := raftReq.ID + if id == 0 { + id = raftReq.Header.ID + } + // set the consistent index of current executing entry s.consistIndex.setConsistentIndex(e.Index) ar := s.applyV3Request(&raftReq) if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 { - s.w.Trigger(raftReq.ID, ar) + s.w.Trigger(id, ar) return } @@ -1046,7 +1052,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { } r := pb.InternalRaftRequest{Alarm: a} s.processInternalRaftRequest(context.TODO(), r) - s.w.Trigger(raftReq.ID, ar) + s.w.Trigger(id, ar) }() } diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 08e5da483..962bf088c 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -22,6 +22,7 @@ import ( "github.com/coreos/etcd/lease/leasehttp" "github.com/coreos/etcd/mvcc" "golang.org/x/net/context" + "google.golang.org/grpc/metadata" ) const ( @@ -332,8 +333,32 @@ func (s *EtcdServer) RoleGrant(ctx context.Context, r *pb.AuthRoleGrantRequest) return result.resp.(*pb.AuthRoleGrantResponse), nil } +func (s *EtcdServer) usernameFromCtx(ctx context.Context) (string, error) { + md, mdexist := metadata.FromContext(ctx) + if mdexist { + token, texist := md["token"] + if texist { + username, uexist := s.AuthStore().UsernameFromToken(token[0]) + if !uexist { + plog.Warningf("invalid auth token: %s", token[0]) + return "", ErrInvalidAuthToken + } + return username, nil + } + } + + return "", nil +} + func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) { - r.ID = s.reqIDGen.Next() + r.Header = &pb.RequestHeader{ + ID: s.reqIDGen.Next(), + } + username, err := s.usernameFromCtx(ctx) + if err != nil { + return nil, err + } + r.Header.Username = username data, err := r.Marshal() if err != nil { @@ -344,7 +369,11 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern return nil, ErrRequestTooLarge } - ch := s.w.Register(r.ID) + id := r.ID + if id == 0 { + id = r.Header.ID + } + ch := s.w.Register(id) cctx, cancel := context.WithTimeout(ctx, maxV3RequestTimeout) defer cancel() @@ -355,7 +384,7 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern case x := <-ch: return x.(*applyResult), nil case <-cctx.Done(): - s.w.Trigger(r.ID, nil) // GC wait + s.w.Trigger(id, nil) // GC wait return nil, cctx.Err() case <-s.done: return nil, ErrStopped