"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "test/units/plugins/connection/test_ssh.py" between
ansible-2.7.6.tar.gz and ansible-2.7.7.tar.gz

About: Ansible is a platform for configuring and managing computers combining multi-node software deployment, ad hoc task execution, and configuration management.

test_ssh.py  (ansible-2.7.6):test_ssh.py  (ansible-2.7.7)
skipping to change at line 27 skipping to change at line 27
# along with Ansible. If not, see <http://www.gnu.org/licenses/>. # along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish # Make coding more python3-ish
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
from io import StringIO from io import StringIO
import pytest import pytest
from ansible import constants as C from ansible import constants as C
from ansible.errors import AnsibleAuthenticationFailure
from ansible.compat.selectors import SelectorKey, EVENT_READ from ansible.compat.selectors import SelectorKey, EVENT_READ
from ansible.compat.tests import unittest from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock, PropertyMock from ansible.compat.tests.mock import patch, MagicMock, PropertyMock
from ansible.errors import AnsibleError, AnsibleConnectionFailure, AnsibleFileNo tFound from ansible.errors import AnsibleError, AnsibleConnectionFailure, AnsibleFileNo tFound
from ansible.module_utils.six.moves import shlex_quote from ansible.module_utils.six.moves import shlex_quote
from ansible.module_utils._text import to_bytes from ansible.module_utils._text import to_bytes
from ansible.playbook.play_context import PlayContext from ansible.playbook.play_context import PlayContext
from ansible.plugins.connection import ssh from ansible.plugins.connection import ssh
from ansible.plugins.loader import connection_loader from ansible.plugins.loader import connection_loader
skipping to change at line 498 skipping to change at line 499
return_code, b_stdout, b_stderr = self.conn._run("ssh", "") return_code, b_stdout, b_stderr = self.conn._run("ssh", "")
assert return_code == 0 assert return_code == 0
assert b_stdout == b'some data' assert b_stdout == b'some data'
assert b_stderr == b'' assert b_stderr == b''
assert self.mock_selector.register.called is True assert self.mock_selector.register.called is True
assert self.mock_selector.register.call_count == 2 assert self.mock_selector.register.call_count == 2
assert self.conn._send_initial_data.called is False assert self.conn._send_initial_data.called is False
@pytest.mark.usefixtures('mock_run_env') @pytest.mark.usefixtures('mock_run_env')
class TestSSHConnectionRetries(object): class TestSSHConnectionRetries(object):
def test_incorrect_password(self, monkeypatch):
monkeypatch.setattr(C, 'HOST_KEY_CHECKING', False)
monkeypatch.setattr(C, 'ANSIBLE_SSH_RETRIES', 5)
monkeypatch.setattr('time.sleep', lambda x: None)
self.mock_popen_res.stdout.read.side_effect = [b'']
self.mock_popen_res.stderr.read.side_effect = [b'Permission denied, plea
se try again.\r\n']
type(self.mock_popen_res).returncode = PropertyMock(side_effect=[5] * 4)
self.mock_selector.select.side_effect = [
[(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None),
EVENT_READ)],
[(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None),
EVENT_READ)],
[],
]
self.mock_selector.get_map.side_effect = lambda: True
self.conn._build_command = MagicMock()
self.conn._build_command.return_value = [b'sshpass', b'-d41', b'ssh', b'
-C']
self.conn.get_option = MagicMock()
self.conn.get_option.return_value = True
exception_info = pytest.raises(AnsibleAuthenticationFailure, self.conn.e
xec_command, 'sshpass', 'some data')
assert exception_info.value.message == ('Invalid/incorrect username/pass
word. Skipping remaining 5 retries to prevent account lockout: '
'Permission denied, please try a
gain.')
assert self.mock_popen.call_count == 1
def test_retry_then_success(self, monkeypatch): def test_retry_then_success(self, monkeypatch):
monkeypatch.setattr(C, 'HOST_KEY_CHECKING', False) monkeypatch.setattr(C, 'HOST_KEY_CHECKING', False)
monkeypatch.setattr(C, 'ANSIBLE_SSH_RETRIES', 3) monkeypatch.setattr(C, 'ANSIBLE_SSH_RETRIES', 3)
monkeypatch.setattr('time.sleep', lambda x: None) monkeypatch.setattr('time.sleep', lambda x: None)
self.mock_popen_res.stdout.read.side_effect = [b"", b"my_stdout\n", b"se cond_line"] self.mock_popen_res.stdout.read.side_effect = [b"", b"my_stdout\n", b"se cond_line"]
self.mock_popen_res.stderr.read.side_effect = [b"", b"my_stderr"] self.mock_popen_res.stderr.read.side_effect = [b"", b"my_stderr"]
type(self.mock_popen_res).returncode = PropertyMock(side_effect=[255] * 3 + [0] * 4) type(self.mock_popen_res).returncode = PropertyMock(side_effect=[255] * 3 + [0] * 4)
 End of changes. 2 change blocks. 
0 lines changed or deleted 35 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)