Merge pull request #11574 from YoyinZyc/simpligy-grpc

clientv3: simplify grpc dialer usage.
release-3.5
Gyuho Lee 2020-02-03 12:41:41 -08:00 committed by GitHub
commit a698ad65f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 4 additions and 58 deletions

View File

@ -174,7 +174,8 @@ type Resolver struct {
func epsToAddrs(eps ...string) (addrs []resolver.Address) {
addrs = make([]resolver.Address, 0, len(eps))
for _, ep := range eps {
addrs = append(addrs, resolver.Address{Addr: ep})
_, host, _ := ParseEndpoint(ep)
addrs = append(addrs, resolver.Address{Addr: ep, ServerName: host})
}
return addrs
}

View File

@ -233,10 +233,6 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
dialer := endpoint.Dialer
if creds != nil {
opts = append(opts, grpc.WithTransportCredentials(creds))
// gRPC load balancer workaround. See credentials.transportCredential for details.
if credsDialer, ok := creds.(TransportCredentialsWithDialer); ok {
dialer = credsDialer.Dialer
}
} else {
opts = append(opts, grpc.WithInsecure())
}
@ -664,9 +660,3 @@ func IsConnCanceled(err error) bool {
// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
return strings.Contains(err.Error(), "grpc: the client connection is closing")
}
// TransportCredentialsWithDialer is for a gRPC load balancer workaround. See credentials.transportCredential for details.
type TransportCredentialsWithDialer interface {
grpccredentials.TransportCredentials
Dialer(ctx context.Context, dialEp string) (net.Conn, error)
}

View File

@ -22,7 +22,6 @@ import (
"net"
"sync"
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
grpccredentials "google.golang.org/grpc/credentials"
)
@ -66,46 +65,20 @@ func (b *bundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
}
// transportCredential implements "grpccredentials.TransportCredentials" interface.
// transportCredential wraps TransportCredentials to track which
// addresses are dialed for which endpoints, and then sets the authority when checking the endpoint's cert to the
// hostname or IP of the dialed endpoint.
// This is a workaround of a gRPC load balancer issue. gRPC uses the dialed target's service name as the authority when
// checking all endpoint certs, which does not work for etcd servers using their hostname or IP as the Subject Alternative Name
// in their TLS certs.
// To enable, include both WithTransportCredentials(creds) and WithContextDialer(creds.Dialer)
// when dialing.
type transportCredential struct {
gtc grpccredentials.TransportCredentials
mu sync.Mutex
// addrToEndpoint maps from the connection addresses that are dialed to the hostname or IP of the
// endpoint provided to the dialer when dialing
addrToEndpoint map[string]string
}
func newTransportCredential(cfg *tls.Config) *transportCredential {
return &transportCredential{
gtc: grpccredentials.NewTLS(cfg),
addrToEndpoint: map[string]string{},
gtc: grpccredentials.NewTLS(cfg),
}
}
func (tc *transportCredential) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
// Set the authority when checking the endpoint's cert to the hostname or IP of the dialed endpoint
tc.mu.Lock()
dialEp, ok := tc.addrToEndpoint[rawConn.RemoteAddr().String()]
tc.mu.Unlock()
if ok {
_, host, _ := endpoint.ParseEndpoint(dialEp)
authority = host
}
return tc.gtc.ClientHandshake(ctx, authority, rawConn)
}
// return true if given string is an IP.
func isIP(ep string) bool {
return net.ParseIP(ep) != nil
}
func (tc *transportCredential) ServerHandshake(rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
return tc.gtc.ServerHandshake(rawConn)
}
@ -115,15 +88,8 @@ func (tc *transportCredential) Info() grpccredentials.ProtocolInfo {
}
func (tc *transportCredential) Clone() grpccredentials.TransportCredentials {
copy := map[string]string{}
tc.mu.Lock()
for k, v := range tc.addrToEndpoint {
copy[k] = v
}
tc.mu.Unlock()
return &transportCredential{
gtc: tc.gtc.Clone(),
addrToEndpoint: copy,
gtc: tc.gtc.Clone(),
}
}
@ -131,17 +97,6 @@ func (tc *transportCredential) OverrideServerName(serverNameOverride string) err
return tc.gtc.OverrideServerName(serverNameOverride)
}
func (tc *transportCredential) Dialer(ctx context.Context, dialEp string) (net.Conn, error) {
// Keep track of which addresses are dialed for which endpoints
conn, err := endpoint.Dialer(ctx, dialEp)
if conn != nil {
tc.mu.Lock()
tc.addrToEndpoint[conn.RemoteAddr().String()] = dialEp
tc.mu.Unlock()
}
return conn, err
}
// perRPCCredential implements "grpccredentials.PerRPCCredentials" interface.
type perRPCCredential struct {
authToken string