clientv3:get AuthToken gracefully without extra connection. (#12165)
* etcdserver: check authinfo if it is not InternalAuthenticateRequest. * credentials: let GetRequestMetadata() return nil when authToken isn't initialized. * clientv3: get AuthToken gracefully without extra connection.release-3.5
parent
74fea11ddc
commit
8050881aaf
|
@ -56,6 +56,9 @@ const (
|
|||
type UserAddOptions authpb.UserAddOptions
|
||||
|
||||
type Auth interface {
|
||||
// Authenticate login and get token
|
||||
Authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error)
|
||||
|
||||
// AuthEnable enables auth of an etcd cluster.
|
||||
AuthEnable(ctx context.Context) (*AuthEnableResponse, error)
|
||||
|
||||
|
@ -129,6 +132,11 @@ func NewAuthFromAuthClient(remote pb.AuthClient, c *Client) Auth {
|
|||
return api
|
||||
}
|
||||
|
||||
func (auth *authClient) Authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
|
||||
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...)
|
||||
return (*AuthenticateResponse)(resp), toErr(ctx, err)
|
||||
}
|
||||
|
||||
func (auth *authClient) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
|
||||
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...)
|
||||
return (*AuthEnableResponse)(resp), toErr(ctx, err)
|
||||
|
@ -226,34 +234,3 @@ func StrToPermissionType(s string) (PermissionType, error) {
|
|||
}
|
||||
return PermissionType(-1), fmt.Errorf("invalid permission type: %s", s)
|
||||
}
|
||||
|
||||
type authenticator struct {
|
||||
conn *grpc.ClientConn // conn in-use
|
||||
remote pb.AuthClient
|
||||
callOpts []grpc.CallOption
|
||||
}
|
||||
|
||||
func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
|
||||
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...)
|
||||
return (*AuthenticateResponse)(resp), toErr(ctx, err)
|
||||
}
|
||||
|
||||
func (auth *authenticator) close() {
|
||||
auth.conn.Close()
|
||||
}
|
||||
|
||||
func newAuthenticator(ctx context.Context, target string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
|
||||
conn, err := grpc.DialContext(ctx, target, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
api := &authenticator{
|
||||
conn: conn,
|
||||
remote: pb.NewAuthClient(conn),
|
||||
}
|
||||
if c != nil {
|
||||
api.callOpts = c.callOpts
|
||||
}
|
||||
return api, nil
|
||||
}
|
||||
|
|
|
@ -274,41 +274,19 @@ func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
|
|||
func (c *Client) getToken(ctx context.Context) error {
|
||||
var err error // return last error in a case of fail
|
||||
|
||||
eps := c.Endpoints()
|
||||
for _, ep := range eps {
|
||||
var auth *authenticator
|
||||
// use dial options without dopts to avoid reusing the client balancer
|
||||
var dOpts []grpc.DialOption
|
||||
_, host, _ := endpoint.ParseEndpoint(ep)
|
||||
target := c.resolverGroup.Target(host)
|
||||
creds := c.dialWithBalancerCreds(ep)
|
||||
dOpts, err = c.dialSetupOpts(creds, c.cfg.DialOptions...)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to configure auth dialer: %v", err)
|
||||
continue
|
||||
}
|
||||
dOpts = append(dOpts, grpc.WithBalancerName(roundRobinBalancerName))
|
||||
auth, err = newAuthenticator(ctx, target, dOpts, c)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
defer auth.close()
|
||||
|
||||
var resp *AuthenticateResponse
|
||||
resp, err = auth.authenticate(ctx, c.Username, c.Password)
|
||||
if err != nil {
|
||||
// return err without retrying other endpoints
|
||||
if err == rpctypes.ErrAuthNotEnabled {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
c.authTokenBundle.UpdateAuthToken(resp.Token)
|
||||
if c.Username == "" || c.Password == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
resp, err := c.Auth.Authenticate(ctx, c.Username, c.Password)
|
||||
if err != nil {
|
||||
if err == rpctypes.ErrAuthNotEnabled {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
c.authTokenBundle.UpdateAuthToken(resp.Token)
|
||||
return nil
|
||||
}
|
||||
|
||||
// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
|
||||
|
@ -329,31 +307,7 @@ func (c *Client) dial(target string, creds grpccredentials.TransportCredentials,
|
|||
|
||||
if c.Username != "" && c.Password != "" {
|
||||
c.authTokenBundle = credentials.NewBundle(credentials.Config{})
|
||||
|
||||
ctx, cancel := c.ctx, func() {}
|
||||
if c.cfg.DialTimeout > 0 {
|
||||
ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout)
|
||||
}
|
||||
|
||||
err = c.getToken(ctx)
|
||||
if err != nil {
|
||||
// TODO: Consider retrying transient errors like:
|
||||
// "error":"rpc error: code = Unavailable desc = etcdserver: leader changed"
|
||||
|
||||
// Ignore rpctypes.ErrAuthNotEnabled error.
|
||||
if toErr(ctx, err) != rpctypes.ErrAuthNotEnabled {
|
||||
// This logic originates from 62d7bae496 and is not clear why we cannot just return err
|
||||
// without looking into parent's context.
|
||||
if err == ctx.Err() && ctx.Err() != c.ctx.Err() {
|
||||
err = context.DeadlineExceeded
|
||||
}
|
||||
cancel()
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
|
||||
}
|
||||
cancel()
|
||||
opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
|
||||
}
|
||||
|
||||
opts = append(opts, c.cfg.DialOptions...)
|
||||
|
@ -496,6 +450,19 @@ func newClient(cfg *Config) (*Client, error) {
|
|||
client.Auth = NewAuth(client)
|
||||
client.Maintenance = NewMaintenance(client)
|
||||
|
||||
//get token with established connection
|
||||
ctx, cancel = client.ctx, func() {}
|
||||
if client.cfg.DialTimeout > 0 {
|
||||
ctx, cancel = context.WithTimeout(ctx, client.cfg.DialTimeout)
|
||||
}
|
||||
err = client.getToken(ctx)
|
||||
if err != nil {
|
||||
client.Close()
|
||||
cancel()
|
||||
return nil, err
|
||||
}
|
||||
cancel()
|
||||
|
||||
if cfg.RejectOldCluster {
|
||||
if err := client.checkVersion(); err != nil {
|
||||
client.Close()
|
||||
|
|
|
@ -111,6 +111,9 @@ func (rc *perRPCCredential) GetRequestMetadata(ctx context.Context, s ...string)
|
|||
rc.authTokenMu.RLock()
|
||||
authToken := rc.authToken
|
||||
rc.authTokenMu.RUnlock()
|
||||
if authToken == "" {
|
||||
return nil, nil
|
||||
}
|
||||
return map[string]string{rpctypes.TokenFieldNameGRPC: authToken}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -82,7 +82,19 @@ func NewMaintenance(c *Client) Maintenance {
|
|||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to dial endpoint %s with maintenance client: %v", endpoint, err)
|
||||
}
|
||||
cancel := func() { conn.Close() }
|
||||
|
||||
//get token with established connection
|
||||
dctx := c.ctx
|
||||
cancel := func() {}
|
||||
if c.cfg.DialTimeout > 0 {
|
||||
dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
|
||||
}
|
||||
err = c.getToken(dctx)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to getToken from endpoint %s with maintenance client: %v", endpoint, err)
|
||||
}
|
||||
cancel = func() { conn.Close() }
|
||||
return RetryMaintenanceClient(c, conn), cancel, nil
|
||||
},
|
||||
remote: RetryMaintenanceClient(c, c.conn),
|
||||
|
|
|
@ -647,13 +647,16 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
|
|||
ID: s.reqIDGen.Next(),
|
||||
}
|
||||
|
||||
authInfo, err := s.AuthInfoFromCtx(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if authInfo != nil {
|
||||
r.Header.Username = authInfo.Username
|
||||
r.Header.AuthRevision = authInfo.Revision
|
||||
// check authinfo if it is not InternalAuthenticateRequest
|
||||
if r.Authenticate == nil {
|
||||
authInfo, err := s.AuthInfoFromCtx(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if authInfo != nil {
|
||||
r.Header.Username = authInfo.Username
|
||||
r.Header.AuthRevision = authInfo.Revision
|
||||
}
|
||||
}
|
||||
|
||||
data, err := r.Marshal()
|
||||
|
|
Loading…
Reference in New Issue