"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "clientv3/client.go" between
etcd-3.4.1.tar.gz and etcd-3.4.2.tar.gz

About: etcd is a distributed reliable key-value store for the most critical data of a distributed system (written in "Go").

client.go  (etcd-3.4.1):client.go  (etcd-3.4.2)
skipping to change at line 233 skipping to change at line 233
if c.cfg.DialKeepAliveTime > 0 { if c.cfg.DialKeepAliveTime > 0 {
params := keepalive.ClientParameters{ params := keepalive.ClientParameters{
Time: c.cfg.DialKeepAliveTime, Time: c.cfg.DialKeepAliveTime,
Timeout: c.cfg.DialKeepAliveTimeout, Timeout: c.cfg.DialKeepAliveTimeout,
PermitWithoutStream: c.cfg.PermitWithoutStream, PermitWithoutStream: c.cfg.PermitWithoutStream,
} }
opts = append(opts, grpc.WithKeepaliveParams(params)) opts = append(opts, grpc.WithKeepaliveParams(params))
} }
opts = append(opts, dopts...) opts = append(opts, dopts...)
// Provide a net dialer that supports cancelation and timeout. dialer := endpoint.Dialer
f := func(dialEp string, t time.Duration) (net.Conn, error) {
proto, host, _ := endpoint.ParseEndpoint(dialEp)
select {
case <-c.ctx.Done():
return nil, c.ctx.Err()
default:
}
dialer := &net.Dialer{Timeout: t}
return dialer.DialContext(c.ctx, proto, host)
}
opts = append(opts, grpc.WithDialer(f))
if creds != nil { if creds != nil {
opts = append(opts, grpc.WithTransportCredentials(creds)) opts = append(opts, grpc.WithTransportCredentials(creds))
// gRPC load balancer workaround. See credentials.transportCreden
tial for details.
if credsDialer, ok := creds.(TransportCredentialsWithDialer); ok
{
dialer = credsDialer.Dialer
}
} else { } else {
opts = append(opts, grpc.WithInsecure()) opts = append(opts, grpc.WithInsecure())
} }
opts = append(opts, grpc.WithContextDialer(dialer))
// Interceptor retry and backoff. // Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with interceptor based retry, o r with // TODO: Replace all of clientv3/retry.go with interceptor based retry, o r with
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retr y-policy // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retr y-policy
// once it is available. // once it is available.
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetw een, defaultBackoffJitterFraction)) rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetw een, defaultBackoffJitterFraction))
opts = append(opts, opts = append(opts,
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams. // Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
// Streams that are safe to retry are enabled individually. // Streams that are safe to retry are enabled individually.
grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMa x(0), rrBackoff)), grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMa x(0), rrBackoff)),
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax( defaultUnaryMaxRetries), rrBackoff)), grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax( defaultUnaryMaxRetries), rrBackoff)),
) )
return opts, nil return opts, nil
} }
// Dial connects to a single endpoint using the client's config. // Dial connects to a single endpoint using the client's config.
func (c *Client) Dial(ep string) (*grpc.ClientConn, error) { func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
creds := c.directDialCreds(ep) creds, err := c.directDialCreds(ep)
if err != nil {
return nil, err
}
// Use the grpc passthrough resolver to directly dial a single endpoint. // Use the grpc passthrough resolver to directly dial a single endpoint.
// This resolver passes through the 'unix' and 'unixs' endpoints schemes used // This resolver passes through the 'unix' and 'unixs' endpoints schemes used
// by etcd without modification, allowing us to directly dial endpoints a nd // by etcd without modification, allowing us to directly dial endpoints a nd
// using the same dial functions that we use for load balancer dialing. // using the same dial functions that we use for load balancer dialing.
return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds) return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds)
} }
func (c *Client) getToken(ctx context.Context) error { func (c *Client) getToken(ctx context.Context) error {
var err error // return last error in a case of fail var err error // return last error in a case of fail
var auth *authenticator var auth *authenticator
skipping to change at line 372 skipping to change at line 368
defer cancel() // TODO: Is this right for cases where grpc.WithBl ock() is not set on the dial options? defer cancel() // TODO: Is this right for cases where grpc.WithBl ock() is not set on the dial options?
} }
conn, err := grpc.DialContext(dctx, target, opts...) conn, err := grpc.DialContext(dctx, target, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return conn, nil return conn, nil
} }
func (c *Client) directDialCreds(ep string) grpccredentials.TransportCredentials func (c *Client) directDialCreds(ep string) (grpccredentials.TransportCredential
{ s, error) {
_, hostPort, scheme := endpoint.ParseEndpoint(ep) _, host, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds creds := c.creds
if len(scheme) != 0 { if len(scheme) != 0 {
creds = c.processCreds(scheme) creds = c.processCreds(scheme)
if creds != nil { if creds != nil {
clone := creds.Clone() clone := creds.Clone()
// Set the server name must to the endpoint hostname with out port since grpc // Set the server name must to the endpoint hostname with out port since grpc
// otherwise attempts to check if x509 cert is valid for the full endpoint // otherwise attempts to check if x509 cert is valid for the full endpoint
// including the scheme and port, which fails. // including the scheme and port, which fails.
host, _ := endpoint.ParseHostPort(hostPort) overrideServerName, _, err := net.SplitHostPort(host)
clone.OverrideServerName(host) if err != nil {
// Either the host didn't have a port or the host
could not be parsed. Either way, continue with the
// original host string.
overrideServerName = host
}
clone.OverrideServerName(overrideServerName)
creds = clone creds = clone
} }
} }
return creds return creds, nil
} }
func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCrede ntials { func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCrede ntials {
_, _, scheme := endpoint.ParseEndpoint(ep) _, _, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds creds := c.creds
if len(scheme) != 0 { if len(scheme) != 0 {
creds = c.processCreds(scheme) creds = c.processCreds(scheme)
} }
return creds return creds
} }
skipping to change at line 666 skipping to change at line 667
} }
// >= gRPC v1.10.x // >= gRPC v1.10.x
if err == context.Canceled { if err == context.Canceled {
return true return true
} }
// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is clo sing")' // <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is clo sing")'
return strings.Contains(err.Error(), "grpc: the client connection is clos ing") return strings.Contains(err.Error(), "grpc: the client connection is clos ing")
} }
// TransportCredentialsWithDialer is for a gRPC load balancer workaround. See cr
edentials.transportCredential for details.
type TransportCredentialsWithDialer interface {
grpccredentials.TransportCredentials
Dialer(ctx context.Context, dialEp string) (net.Conn, error)
}
 End of changes. 8 change blocks. 
20 lines changed or deleted 24 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)