agent_client.py (ironic-17.0.2) | : | agent_client.py (ironic-17.0.3) | ||
---|---|---|---|---|
skipping to change at line 27 | skipping to change at line 27 | |||
from ironic_lib import metrics_utils | from ironic_lib import metrics_utils | |||
from oslo_log import log | from oslo_log import log | |||
from oslo_serialization import jsonutils | from oslo_serialization import jsonutils | |||
from oslo_utils import strutils | from oslo_utils import strutils | |||
import requests | import requests | |||
import tenacity | import tenacity | |||
from ironic.common import exception | from ironic.common import exception | |||
from ironic.common.i18n import _ | from ironic.common.i18n import _ | |||
from ironic.common import utils | ||||
from ironic.conf import CONF | from ironic.conf import CONF | |||
LOG = log.getLogger(__name__) | LOG = log.getLogger(__name__) | |||
METRICS = metrics_utils.get_metrics_logger(__name__) | METRICS = metrics_utils.get_metrics_logger(__name__) | |||
DEFAULT_IPA_PORTAL_PORT = 3260 | DEFAULT_IPA_PORTAL_PORT = 3260 | |||
REBOOT_COMMAND = 'run_image' | REBOOT_COMMAND = 'run_image' | |||
skipping to change at line 54 | skipping to change at line 55 | |||
if error is None: | if error is None: | |||
LOG.error('Agent returned invalid response: missing command_error in ' | LOG.error('Agent returned invalid response: missing command_error in ' | |||
'%s', command) | '%s', command) | |||
return _('Invalid agent response') | return _('Invalid agent response') | |||
if isinstance(error, dict): | if isinstance(error, dict): | |||
return error.get('details') or error.get('message') or str(error) | return error.get('details') or error.get('message') or str(error) | |||
else: | else: | |||
return error | return error | |||
def _sanitize_for_logging(var): | ||||
if not var: | ||||
return var | ||||
elif isinstance(var, str): | ||||
return strutils.mask_password(var) | ||||
else: | ||||
return utils.remove_large_keys(strutils.mask_dict_password(var)) | ||||
class AgentClient(object): | class AgentClient(object): | |||
"""Client for interacting with nodes via a REST API.""" | """Client for interacting with nodes via a REST API.""" | |||
@METRICS.timer('AgentClient.__init__') | @METRICS.timer('AgentClient.__init__') | |||
def __init__(self): | def __init__(self): | |||
self.session = requests.Session() | self.session = requests.Session() | |||
self.session.headers.update({'Content-Type': 'application/json'}) | self.session.headers.update({'Content-Type': 'application/json'}) | |||
def _get_command_url(self, node): | def _get_command_url(self, node): | |||
"""Get URL endpoint for agent command request""" | """Get URL endpoint for agent command request""" | |||
agent_url = node.driver_internal_info.get('agent_url') | agent_url = node.driver_internal_info.get('agent_url') | |||
skipping to change at line 171 | skipping to change at line 180 | |||
assert not (wait and poll) | assert not (wait and poll) | |||
url = self._get_command_url(node) | url = self._get_command_url(node) | |||
body = self._get_command_body(method, params) | body = self._get_command_body(method, params) | |||
request_params = { | request_params = { | |||
'wait': str(wait).lower() | 'wait': str(wait).lower() | |||
} | } | |||
agent_token = node.driver_internal_info.get('agent_secret_token') | agent_token = node.driver_internal_info.get('agent_secret_token') | |||
if agent_token: | if agent_token: | |||
request_params['agent_token'] = agent_token | request_params['agent_token'] = agent_token | |||
LOG.debug('Executing agent command %(method)s for node %(node)s', | LOG.debug('Executing agent command %(method)s for node %(node)s ' | |||
{'node': node.uuid, 'method': method}) | 'with params %(params)s', | |||
{'node': node.uuid, 'method': method, | ||||
'params': _sanitize_for_logging(request_params)}) | ||||
try: | try: | |||
response = self.session.post( | response = self.session.post( | |||
url, params=request_params, data=body, | url, params=request_params, data=body, | |||
verify=self._get_verify(node), | verify=self._get_verify(node), | |||
timeout=CONF.agent.command_timeout) | timeout=CONF.agent.command_timeout) | |||
except (requests.ConnectionError, requests.Timeout) as e: | except (requests.ConnectionError, requests.Timeout) as e: | |||
msg = (_('Failed to connect to the agent running on node %(node)s ' | result = self._handle_timeout_on_command_execution(node, method, | |||
'for invoking command %(method)s. Error: %(error)s') % | params, e) | |||
{'node': node.uuid, 'method': method, 'error': e}) | response = None | |||
LOG.error(msg) | ||||
raise exception.AgentConnectionFailed(reason=msg) | ||||
except requests.RequestException as e: | except requests.RequestException as e: | |||
msg = (_('Error invoking agent command %(method)s for node ' | msg = (_('Error invoking agent command %(method)s for node ' | |||
'%(node)s. Error: %(error)s') % | '%(node)s. Error: %(error)s') % | |||
{'method': method, 'node': node.uuid, 'error': e}) | {'method': method, 'node': node.uuid, 'error': e}) | |||
LOG.error(msg) | LOG.error(msg) | |||
raise exception.IronicException(msg) | raise exception.IronicException(msg) | |||
# TODO(russellhaering): real error handling | if response is not None: | |||
try: | # TODO(russellhaering): real error handling | |||
result = response.json() | try: | |||
except ValueError: | result = response.json() | |||
msg = _( | except ValueError: | |||
'Unable to decode response as JSON.\n' | msg = _( | |||
'Request URL: %(url)s\nRequest body: "%(body)s"\n' | 'Unable to decode response as JSON.\n' | |||
'Response status code: %(code)s\n' | 'Request URL: %(url)s\nRequest body: "%(body)s"\n' | |||
'Response: "%(response)s"' | 'Response status code: %(code)s\n' | |||
) % ({'response': response.text, 'body': body, 'url': url, | 'Response: "%(response)s"' | |||
'code': response.status_code}) | ) % ({'response': response.text, 'body': body, 'url': url, | |||
LOG.error(msg) | 'code': response.status_code}) | |||
raise exception.IronicException(msg) | LOG.error(msg) | |||
raise exception.IronicException(msg) | ||||
error = result.get('command_error') | error = result.get('command_error') | |||
LOG.debug('Agent command %(method)s for node %(node)s returned ' | LOG.debug('Agent command %(method)s for node %(node)s returned ' | |||
'result %(res)s, error %(error)s, HTTP status code %(code)d', | 'result %(res)s, error %(error)s, HTTP status code %(code)s', | |||
{'node': node.uuid, 'method': method, | {'node': node.uuid, 'method': method, | |||
'res': result.get('command_result'), | 'res': _sanitize_for_logging(result.get('command_result')), | |||
'error': error, | 'error': error, | |||
'code': response.status_code}) | 'code': response.status_code if response is not None | |||
if response.status_code >= http_client.BAD_REQUEST: | else 'unknown'}) | |||
if (response is not None | ||||
and response.status_code >= http_client.BAD_REQUEST): | ||||
faultstring = result.get('faultstring') | faultstring = result.get('faultstring') | |||
if 'agent_token' in faultstring: | if 'agent_token' in faultstring: | |||
LOG.error('Agent command %(method)s for node %(node)s ' | LOG.error('Agent command %(method)s for node %(node)s ' | |||
'failed. Expected 2xx HTTP status code, got ' | 'failed. Expected 2xx HTTP status code, got ' | |||
'%(code)d. Error suggests an older ramdisk ' | '%(code)d. Error suggests an older ramdisk ' | |||
'which does not support ``agent_token``. ' | 'which does not support ``agent_token``. ' | |||
'This is a fatal error.', | 'This is a fatal error.', | |||
{'method': method, 'node': node.uuid, | {'method': method, 'node': node.uuid, | |||
'code': response.status_code}) | 'code': response.status_code}) | |||
else: | else: | |||
skipping to change at line 313 | skipping to change at line 325 | |||
_get = tenacity.retry( | _get = tenacity.retry( | |||
retry=tenacity.retry_if_exception_type( | retry=tenacity.retry_if_exception_type( | |||
exception.AgentConnectionFailed), | exception.AgentConnectionFailed), | |||
stop=tenacity.stop_after_attempt( | stop=tenacity.stop_after_attempt( | |||
CONF.agent.max_command_attempts), | CONF.agent.max_command_attempts), | |||
reraise=True)(_get) | reraise=True)(_get) | |||
result = _get().json()['commands'] | result = _get().json()['commands'] | |||
status = '; '.join('%(cmd)s: result "%(res)s", error "%(err)s"' % | status = '; '.join('%(cmd)s: result "%(res)s", error "%(err)s"' % | |||
{'cmd': r.get('command_name'), | {'cmd': r.get('command_name'), | |||
'res': r.get('command_result'), | 'res': _sanitize_for_logging( | |||
r.get('command_result')), | ||||
'err': r.get('command_error')} | 'err': r.get('command_error')} | |||
for r in result) | for r in result) | |||
LOG.debug('Status of agent commands for node %(node)s: %(status)s', | LOG.debug('Status of agent commands for node %(node)s: %(status)s', | |||
{'node': node.uuid, 'status': status}) | {'node': node.uuid, 'status': status}) | |||
return result | return result | |||
def _status_if_last_command_matches(self, node, method, params): | ||||
"""Return the status of the given command if it's the last running.""" | ||||
try: | ||||
method = method.split('.', 1)[1] | ||||
except IndexError: | ||||
pass | ||||
commands = self.get_commands_status(node) | ||||
if not commands: | ||||
return None | ||||
# TODO(dtantsur): a more reliable way to detect repeated execution | ||||
# would be to pass a sort of request ID to the agent. | ||||
command = commands[-1] | ||||
if command['command_name'] != method: | ||||
LOG.debug('Command %(cmd)s is not currently executing, the last ' | ||||
'command is %(curr)s', | ||||
{'cmd': method, 'curr': command['command_name']}) | ||||
return None | ||||
if command['command_status'] != 'RUNNING': | ||||
LOG.debug('Command %(cmd)s is not currently executing, its status ' | ||||
'is %(curr)s', | ||||
{'cmd': method, 'curr': command['command_status']}) | ||||
return None | ||||
return command | ||||
def _handle_timeout_on_command_execution(self, node, method, params, | ||||
error): | ||||
result = None | ||||
# NOTE(dtantsur): it is possible, especially with eventlet+TLS, that | ||||
# agent receives a command but fails to return the result to Ironic. | ||||
# To avoid a failure, check if the last command is the one we're trying | ||||
# to execute. | ||||
try: | ||||
result = self._status_if_last_command_matches(node, method, params) | ||||
except Exception as e: | ||||
msg = (_('Failed to connect to the agent running on node ' | ||||
'%(node)s for checking the last command status ' | ||||
'after failing to invoke command %(method)s. ' | ||||
'Error: %(error)s') % | ||||
{'node': node.uuid, 'method': method, 'error': e}) | ||||
LOG.error(msg) | ||||
if result is None: | ||||
msg = (_('Failed to connect to the agent running on node %(node)s ' | ||||
'for invoking command %(method)s. Error: %(error)s') % | ||||
{'node': node.uuid, 'method': method, 'error': error}) | ||||
LOG.error(msg) | ||||
raise exception.AgentConnectionFailed(reason=msg) | ||||
return result | ||||
def get_last_command_status(self, node, method): | def get_last_command_status(self, node, method): | |||
"""Get the last status for the given command. | """Get the last status for the given command. | |||
:param node: A Node object. | :param node: A Node object. | |||
:param method: Command name. | :param method: Command name. | |||
:returns: A dict containing command status from agent or None | :returns: A dict containing command status from agent or None | |||
if the command was not found. | if the command was not found. | |||
""" | """ | |||
try: | try: | |||
method = method.split('.', 1)[1] | method = method.split('.', 1)[1] | |||
skipping to change at line 594 | skipping to change at line 662 | |||
:: | :: | |||
{ | { | |||
'deploy_result': <the result of execution, step specific>, | 'deploy_result': <the result of execution, step specific>, | |||
'deploy_step': <the deploy step issued to agent> | 'deploy_step': <the deploy step issued to agent> | |||
} | } | |||
""" | """ | |||
params = { | params = { | |||
'step': step, | 'step': step, | |||
'node': node.as_dict(secure=True), | 'node': node.as_dict(secure=True, mask_configdrive=False), | |||
'ports': [port.as_dict() for port in ports], | 'ports': [port.as_dict() for port in ports], | |||
'deploy_version': node.driver_internal_info.get( | 'deploy_version': node.driver_internal_info.get( | |||
'hardware_manager_version') | 'hardware_manager_version') | |||
} | } | |||
return self._command(node=node, | return self._command(node=node, | |||
method='deploy.execute_deploy_step', | method='deploy.execute_deploy_step', | |||
params=params) | params=params) | |||
@METRICS.timer('AgentClient.get_partition_uuids') | @METRICS.timer('AgentClient.get_partition_uuids') | |||
def get_partition_uuids(self, node): | def get_partition_uuids(self, node): | |||
End of changes. 11 change blocks. | ||||
26 lines changed or deleted | 94 lines changed or added |