"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "manila/tests/share/drivers/test_lvm.py" between
manila-8.1.3.tar.gz and manila-8.1.4.tar.gz

About: OpenStack Manila provides “Shared Filesystems as a service”.
The "Stein" series (maintained release).

test_lvm.py  (manila-8.1.3):test_lvm.py  (manila-8.1.4)
skipping to change at line 266 skipping to change at line 266
def test_deallocate_container_error(self): def test_deallocate_container_error(self):
def _fake_exec(*args, **kwargs): def _fake_exec(*args, **kwargs):
raise exception.ProcessExecutionError(stderr="error") raise exception.ProcessExecutionError(stderr="error")
self.mock_object(self._driver, '_try_execute', _fake_exec) self.mock_object(self._driver, '_try_execute', _fake_exec)
self.assertRaises(exception.ProcessExecutionError, self.assertRaises(exception.ProcessExecutionError,
self._driver._deallocate_container, self._driver._deallocate_container,
self.share['name']) self.share['name'])
def test_deallocate_container_not_found_error(self): @ddt.data(
'Logical volume "fake/fake-volume" not found\n',
'Failed to find logical volume "fake/fake-volume"\n')
def test_deallocate_container_not_found_error(self, error_msg):
def _fake_exec(*args, **kwargs): def _fake_exec(*args, **kwargs):
raise exception.ProcessExecutionError(stderr="not found") raise exception.ProcessExecutionError(stderr=error_msg)
self.mock_object(self._driver, '_try_execute', _fake_exec) self.mock_object(self._driver, '_try_execute', _fake_exec)
self._driver._deallocate_container(self.share['name']) self._driver._deallocate_container(self.share['name'])
@mock.patch.object(lvm.LVMShareDriver, '_update_share_stats', mock.Mock()) @mock.patch.object(lvm.LVMShareDriver, '_update_share_stats', mock.Mock())
def test_get_share_stats(self): def test_get_share_stats(self):
with mock.patch.object(self._driver, '_stats', mock.Mock) as stats: with mock.patch.object(self._driver, '_stats', mock.Mock) as stats:
self.assertEqual(stats, self._driver.get_share_stats()) self.assertEqual(stats, self._driver.get_share_stats())
self.assertFalse(self._driver._update_share_stats.called) self.assertFalse(self._driver._update_share_stats.called)
@mock.patch.object(lvm.LVMShareDriver, '_update_share_stats', mock.Mock()) @mock.patch.object(lvm.LVMShareDriver, '_update_share_stats', mock.Mock())
def test_get_share_stats_refresh(self): def test_get_share_stats_refresh(self):
with mock.patch.object(self._driver, '_stats', mock.Mock) as stats: with mock.patch.object(self._driver, '_stats', mock.Mock) as stats:
self.assertEqual(stats, self.assertEqual(stats,
self._driver.get_share_stats(refresh=True)) self._driver.get_share_stats(refresh=True))
self._driver._update_share_stats.assert_called_once_with() self._driver._update_share_stats.assert_called_once_with()
def test__unmount_device_not_mounted(self):
def exec_runner(*ignore_args, **ignore_kwargs):
umount_msg = (
"umount: /opt/stack/data/manila/mnt/share-fake-share: not "
"mounted.\n"
)
raise exception.ProcessExecutionError(stderr=umount_msg)
self._os.path.exists.return_value = True
mount_path = self._get_mount_path(self.share)
expected_exec = "umount -f %s" % (mount_path)
fake_utils.fake_execute_set_repliers([(expected_exec, exec_runner)])
self._driver._unmount_device(self.share, raise_if_missing=False)
self._os.path.exists.assert_called_with(mount_path)
def test__unmount_device_is_busy_error(self): def test__unmount_device_is_busy_error(self):
def exec_runner(*ignore_args, **ignore_kwargs): def exec_runner(*ignore_args, **ignore_kwargs):
raise exception.ProcessExecutionError(stderr='device is busy') raise exception.ProcessExecutionError(stderr='device is busy')
self._os.path.exists.return_value = True self._os.path.exists.return_value = True
mount_path = self._get_mount_path(self.share) mount_path = self._get_mount_path(self.share)
expected_exec = [ expected_exec = [
"umount -f %s" % (mount_path), "umount -f %s" % (mount_path),
] ]
fake_utils.fake_execute_set_repliers([(expected_exec[0], exec_runner)]) fake_utils.fake_execute_set_repliers([(expected_exec[0], exec_runner)])
skipping to change at line 485 skipping to change at line 504
run_as_root=True) run_as_root=True)
self._os.path.exists.assert_called_with(mount_path) self._os.path.exists.assert_called_with(mount_path)
def test_extend_share(self): def test_extend_share(self):
local_path = self._driver._get_local_path(self.share) local_path = self._driver._get_local_path(self.share)
self.mock_object(self._driver, '_extend_container') self.mock_object(self._driver, '_extend_container')
self.mock_object(self._driver, '_execute') self.mock_object(self._driver, '_execute')
self._driver.extend_share(self.share, 3) self._driver.extend_share(self.share, 3)
self._driver._extend_container.assert_called_once_with(self.share, self._driver._extend_container.assert_called_once_with(self.share,
local_path, 3) local_path, 3)
self._driver._execute.assert_called_once_with('resize2fs', local_path,
run_as_root=True)
def test_ssh_exec_as_root(self): def test_ssh_exec_as_root(self):
command = ['fake_command'] command = ['fake_command']
self.mock_object(self._driver, '_execute') self.mock_object(self._driver, '_execute')
self._driver._ssh_exec_as_root('fake_server', command) self._driver._ssh_exec_as_root('fake_server', command)
self._driver._execute.assert_called_once_with('fake_command', self._driver._execute.assert_called_once_with('fake_command',
check_exit_code=True) check_exit_code=True)
def test_ssh_exec_as_root_with_sudo(self): def test_ssh_exec_as_root_with_sudo(self):
command = ['sudo', 'fake_command'] command = ['sudo', 'fake_command']
skipping to change at line 509 skipping to change at line 526
self._driver._execute.assert_called_once_with( self._driver._execute.assert_called_once_with(
'fake_command', run_as_root=True, check_exit_code=True) 'fake_command', run_as_root=True, check_exit_code=True)
def test_extend_container(self): def test_extend_container(self):
self.mock_object(self._driver, '_try_execute') self.mock_object(self._driver, '_try_execute')
self._driver._extend_container(self.share, 'device_name', 3) self._driver._extend_container(self.share, 'device_name', 3)
self._driver._try_execute.assert_called_once_with( self._driver._try_execute.assert_called_once_with(
'lvextend', 'lvextend',
'-L', '-L',
'3G', '3G',
'-n', '-r',
'device_name', 'device_name',
run_as_root=True) run_as_root=True)
def test_get_share_server_pools(self): def test_get_share_server_pools(self):
expected_result = [{ expected_result = [{
'pool_name': 'lvm-single-pool', 'pool_name': 'lvm-single-pool',
'total_capacity_gb': 33, 'total_capacity_gb': 33,
'free_capacity_gb': 22, 'free_capacity_gb': 22,
'reserved_percentage': 0, 'reserved_percentage': 0,
}, ] }, ]
 End of changes. 5 change blocks. 
5 lines changed or deleted 22 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)