retry_interceptor.go (etcd-3.5.5) | : | retry_interceptor.go (etcd-3.5.6) | ||
---|---|---|---|---|
skipping to change at line 77 | skipping to change at line 77 | |||
) | ) | |||
if isContextError(lastErr) { | if isContextError(lastErr) { | |||
if ctx.Err() != nil { | if ctx.Err() != nil { | |||
// its the context deadline or cancellati on. | // its the context deadline or cancellati on. | |||
return lastErr | return lastErr | |||
} | } | |||
// its the callCtx deadline or cancellation, in w hich case try again. | // its the callCtx deadline or cancellation, in w hich case try again. | |||
continue | continue | |||
} | } | |||
if c.shouldRefreshToken(lastErr, callOpts) { | if c.shouldRefreshToken(lastErr, callOpts) { | |||
// clear auth token before refreshing it. | gterr := c.refreshToken(ctx) | |||
// call c.Auth.Authenticate with an invalid token | ||||
will always fail the auth check on the server-side, | ||||
// if the server has not apply the patch of pr #1 | ||||
2165 (https://github.com/etcd-io/etcd/pull/12165) | ||||
// and a rpctypes.ErrInvalidAuthToken will recurs | ||||
ively call c.getToken until system run out of resource. | ||||
c.authTokenBundle.UpdateAuthToken("") | ||||
gterr := c.getToken(ctx) | ||||
if gterr != nil { | if gterr != nil { | |||
c.GetLogger().Warn( | c.GetLogger().Warn( | |||
"retrying of unary invoker failed to fetch new auth token", | "retrying of unary invoker failed to fetch new auth token", | |||
zap.String("target", cc.Target()) , | zap.String("target", cc.Target()) , | |||
zap.Error(gterr), | zap.Error(gterr), | |||
) | ) | |||
return gterr // lastErr must be invalid a uth token | return gterr // lastErr must be invalid a uth token | |||
} | } | |||
continue | continue | |||
} | } | |||
skipping to change at line 164 | skipping to change at line 158 | |||
if rpctypes.Error(err) == rpctypes.ErrUserEmpty { | if rpctypes.Error(err) == rpctypes.ErrUserEmpty { | |||
// refresh the token when username, password is present but the s erver returns ErrUserEmpty | // refresh the token when username, password is present but the s erver returns ErrUserEmpty | |||
// which is possible when the client token is cleared somehow | // which is possible when the client token is cleared somehow | |||
return c.authTokenBundle != nil // equal to c.Username != "" && c .Password != "" | return c.authTokenBundle != nil // equal to c.Username != "" && c .Password != "" | |||
} | } | |||
return callOpts.retryAuth && | return callOpts.retryAuth && | |||
(rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes. Error(err) == rpctypes.ErrAuthOldRevision) | (rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes. Error(err) == rpctypes.ErrAuthOldRevision) | |||
} | } | |||
func (c *Client) refreshToken(ctx context.Context) error { | ||||
if c.authTokenBundle == nil { | ||||
// c.authTokenBundle will be initialized only when | ||||
// c.Username != "" && c.Password != "". | ||||
// | ||||
// When users use the TLS CommonName based authentication, the | ||||
// authTokenBundle is always nil. But it's possible for the clien | ||||
ts | ||||
// to get `rpctypes.ErrAuthOldRevision` response when the clients | ||||
// concurrently modify auth data (e.g, addUser, deleteUser etc.). | ||||
// In this case, there is no need to refresh the token; instead t | ||||
he | ||||
// clients just need to retry the operations (e.g. Put, Delete et | ||||
c). | ||||
return nil | ||||
} | ||||
// clear auth token before refreshing it. | ||||
c.authTokenBundle.UpdateAuthToken("") | ||||
return c.getToken(ctx) | ||||
} | ||||
// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a | // type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a | |||
// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish | // proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish | |||
// a new ClientStream according to the retry policy. | // a new ClientStream according to the retry policy. | |||
type serverStreamingRetryingStream struct { | type serverStreamingRetryingStream struct { | |||
grpc.ClientStream | grpc.ClientStream | |||
client *Client | client *Client | |||
bufferedSends []interface{} // single message that the client can sen | bufferedSends []interface{} // single message that the client can sen | |||
receivedGood bool // indicates whether any prior receives were successful | receivedGood bool // indicates whether any prior receives were successful | |||
wasClosedSend bool // indicates that CloseSend was closed | wasClosedSend bool // indicates that CloseSend was closed | |||
ctx context.Context | ctx context.Context | |||
skipping to change at line 262 | skipping to change at line 274 | |||
return false, err | return false, err | |||
} | } | |||
if isContextError(err) { | if isContextError(err) { | |||
if s.ctx.Err() != nil { | if s.ctx.Err() != nil { | |||
return false, err | return false, err | |||
} | } | |||
// its the callCtx deadline or cancellation, in which case try ag ain. | // its the callCtx deadline or cancellation, in which case try ag ain. | |||
return true, err | return true, err | |||
} | } | |||
if s.client.shouldRefreshToken(err, s.callOpts) { | if s.client.shouldRefreshToken(err, s.callOpts) { | |||
// clear auth token to avoid failure when call getToken | gterr := s.client.refreshToken(s.ctx) | |||
s.client.authTokenBundle.UpdateAuthToken("") | ||||
gterr := s.client.getToken(s.ctx) | ||||
if gterr != nil { | if gterr != nil { | |||
s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gterr)) | s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gterr)) | |||
return false, err // return the original error for simpli city | return false, err // return the original error for simpli city | |||
} | } | |||
return true, err | return true, err | |||
} | } | |||
return isSafeRetry(s.client.lg, err, s.callOpts), err | return isSafeRetry(s.client.lg, err, s.callOpts), err | |||
} | } | |||
End of changes. 3 change blocks. | ||||
14 lines changed or deleted | 23 lines changed or added |