"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "freezer/tests/unit/scheduler/test_utils.py" between
freezer-9.0.0.tar.gz and freezer-10.0.0.tar.gz

About: OpenStack Freezer is a tool to automate (incremental) data backup and restore process using OpenStack Swift and/or other media storage.
The "Wallaby" series (latest release).

test_utils.py  (freezer-9.0.0):test_utils.py  (freezer-10.0.0)
skipping to change at line 37 skipping to change at line 37
def setUp(self): def setUp(self):
self.client = mock.Mock() self.client = mock.Mock()
self.client.clients.create = mock.Mock(return_value='test') self.client.clients.create = mock.Mock(return_value='test')
self.client.jobs.list = mock.Mock(return_value=job_list) self.client.jobs.list = mock.Mock(return_value=job_list)
self.client.client_id = "test" self.client.client_id = "test"
def test_do_register(self): def test_do_register(self):
ret = utils.do_register(self.client, args=None) ret = utils.do_register(self.client, args=None)
self.assertEqual(0, ret) self.assertEqual(0, ret)
def test_del_register_error(self):
self.client.clients.delete = mock.Mock(side_effect=Exception(
'delete client error: bad request'))
with self.assertRaises(Exception) as cm: # noqa
utils.del_register(self.client)
the_exception = cm.exception
self.assertIn('delete client error', str(the_exception))
def test_find_config_files(self): def test_find_config_files(self):
temp = tempfile.NamedTemporaryFile('wb', delete=True, temp = tempfile.NamedTemporaryFile('wb', delete=True,
suffix='.conf') suffix='.conf')
ret = utils.find_config_files(temp.name) ret = utils.find_config_files(temp.name)
self.assertEqual([temp.name], ret) self.assertEqual([temp.name], ret)
temp.close() temp.close()
self.assertFalse(os.path.exists(temp.name)) self.assertFalse(os.path.exists(temp.name))
def test_find_config_files_path(self): def test_find_config_files_path(self):
temp = tempfile.NamedTemporaryFile('wb', delete=True, temp = tempfile.NamedTemporaryFile('wb', delete=True,
suffix='.conf') suffix='.conf')
temp_path = os.path.dirname(temp.name) temp_path = os.path.dirname(temp.name)
ret = utils.find_config_files(temp_path) ret = utils.find_config_files(temp_path)
self.assertEqual([temp.name], ret) self.assertEqual([temp.name], ret)
temp.close() temp.close()
self.assertFalse(os.path.exists(temp.name)) self.assertFalse(os.path.exists(temp.name))
@patch('oslo_serialization.jsonutils.load')
def test_load_doc_from_json_file(self, mock_load):
os.mknod("/tmp/test_freezer.conf")
mock_load.side_effect = Exception('error')
try:
utils.load_doc_from_json_file("/tmp/test_freezer.conf")
except Exception as e:
self.assertIn("Unable to load conf file", str(e))
os.remove("/tmp/test_freezer.conf")
def test_get_jobs_from_disk(self): def test_get_jobs_from_disk(self):
temp = tempfile.mkdtemp() temp = tempfile.mkdtemp()
file = '/'.join([temp, "test.conf"]) file = '/'.join([temp, "test.conf"])
data = b'{"job_id": "test"}' data = b'{"job_id": "test"}'
with open(file, 'wb') as f: with open(file, 'wb') as f:
f.write(data) f.write(data)
ret = utils.get_jobs_from_disk(temp) ret = utils.get_jobs_from_disk(temp)
self.assertEqual(job_list, ret) self.assertEqual(job_list, ret)
shutil.rmtree(temp) shutil.rmtree(temp)
self.assertFalse(os.path.exists(file)) self.assertFalse(os.path.exists(file))
skipping to change at line 77 skipping to change at line 95
tmpdir = tempfile.mkdtemp() tmpdir = tempfile.mkdtemp()
utils.save_jobs_to_disk(job_doc_list, tmpdir) utils.save_jobs_to_disk(job_doc_list, tmpdir)
file = '/'.join([tmpdir, "job_test.conf"]) file = '/'.join([tmpdir, "job_test.conf"])
self.assertTrue(os.path.exists(file)) self.assertTrue(os.path.exists(file))
shutil.rmtree(tmpdir) shutil.rmtree(tmpdir)
def test_get_active_jobs_from_api(self): def test_get_active_jobs_from_api(self):
ret = utils.get_active_jobs_from_api(self.client) ret = utils.get_active_jobs_from_api(self.client)
self.assertEqual(job_list, ret) self.assertEqual(job_list, ret)
@patch('os.kill')
@patch('psutil.Process')
def test_terminate_subprocess1(self, mock_process, mock_oskill):
mock_pro = mock.MagicMock()
mock_pro.name.startswith.return_value = False
mock_process.return_value = mock_pro
result = utils.terminate_subprocess(35, 'test')
self.assertIsNone(result)
mock_pro.name.startswith.return_value = True
mock_oskill.side_effect = Exception("error")
result = utils.terminate_subprocess(35, 'test')
self.assertIsNone(result)
@patch('psutil.Process') @patch('psutil.Process')
def test_terminate_subprocess(self, mock_process_constructor): def test_terminate_subprocess(self, mock_process_constructor):
mock_pro = mock_process_constructor.return_value mock_pro = mock_process_constructor.return_value
seffect = mock.Mock( seffect = mock.Mock(
side_effect=Exception('Process 35 does not exists anymore')) side_effect=Exception('Process 35 does not exists anymore'))
mock_pro.raiseError.side_effect = seffect mock_pro.raiseError.side_effect = seffect
with self.assertRaises(Exception) as cm: # noqa with self.assertRaises(Exception) as cm: # noqa
utils.terminate_subprocess(35, "test") utils.terminate_subprocess(35, "test")
the_exception = cm.exception the_exception = cm.exception
self.assertIn('does not exists anymore', self.assertIn('does not exists anymore',
 End of changes. 3 change blocks. 
0 lines changed or deleted 31 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)