"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "freezer/tests/unit/scheduler/test_scheduler_job.py" between
freezer-9.0.0.tar.gz and freezer-10.0.0.tar.gz

About: OpenStack Freezer is a tool to automate (incremental) data backup and restore process using OpenStack Swift and/or other media storage.
The "Wallaby" series (latest release).

test_scheduler_job.py  (freezer-9.0.0):test_scheduler_job.py  (freezer-10.0.0)
skipping to change at line 16 skipping to change at line 16
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os import os
import shutil
import tempfile import tempfile
import unittest import unittest
from freezer.scheduler import scheduler_job from freezer.scheduler import scheduler_job
from oslo_config import cfg
from unittest import mock
CONF = cfg.CONF
action = {"action": "backup", "storage": "local", action = {"action": "backup", "storage": "local",
"mode": "fs", "backup_name": "test", "mode": "fs", "backup_name": "test",
"container": "/tmp/backuped", "container": "/tmp/backuped",
"path_to_backup": "/tmp/to_backup"} "path_to_backup": "/tmp/to_backup"}
class TestSchedulerJob(unittest.TestCase): class TestSchedulerJob(unittest.TestCase):
def setUp(self): def setUp(self):
self.job = scheduler_job.Job(None, None, {"job_schedule": {}}) self.job = scheduler_job.Job(None, None, {"job_schedule": {}})
skipping to change at line 52 skipping to change at line 57
delete=False) as config_file: delete=False) as config_file:
self.job.save_action_to_file(action, config_file) self.job.save_action_to_file(action, config_file)
self.assertTrue(os.path.exists(config_file.name)) self.assertTrue(os.path.exists(config_file.name))
def test_save_action_with_bool_value_to_disk(self): def test_save_action_with_bool_value_to_disk(self):
action.update({"no_incremental": False}) action.update({"no_incremental": False})
with tempfile.NamedTemporaryFile(mode='w', with tempfile.NamedTemporaryFile(mode='w',
delete=False) as config_file: delete=False) as config_file:
self.job.save_action_to_file(action, config_file) self.job.save_action_to_file(action, config_file)
self.assertTrue(os.path.exists(config_file.name)) self.assertTrue(os.path.exists(config_file.name))
class TestSchedulerJob1(unittest.TestCase):
def setUp(self):
self.scheduler = mock.MagicMock()
self.job_schedule = {"event": "start", "status": "start",
"schedule_day": "1"}
self.jobdoc = {"job_id": "test", "job_schedule": self.job_schedule}
self.job = scheduler_job.Job(self.scheduler, None, self.jobdoc)
def test_stopstate_stop(self):
result = scheduler_job.StopState.stop(self.job, self.jobdoc)
self.assertEqual(result, '')
def test_stopstate_abort(self):
result = scheduler_job.StopState.abort(self.job, self.jobdoc)
self.assertEqual(result, '')
def test_stopstate_start(self):
result = scheduler_job.StopState.start(self.job, self.jobdoc)
self.assertEqual(result, '')
def test_stopstate_remove(self):
result = scheduler_job.StopState.remove(self.job)
self.assertEqual(result, '')
def test_scheduledstate_stop(self):
result = scheduler_job.ScheduledState.stop(self.job, self.jobdoc)
self.assertEqual(result, 'stop')
def test_scheduledstate_abort(self):
result = scheduler_job.ScheduledState.abort(self.job, self.jobdoc)
self.assertEqual(result, '')
def test_scheduledstate_start(self):
result = scheduler_job.ScheduledState.start(self.job, self.jobdoc)
self.assertEqual(result, '')
def test_scheduledstate_remove(self):
result = scheduler_job.ScheduledState.remove(self.job)
self.assertEqual(result, '')
def test_runningstate_stop(self):
result = scheduler_job.RunningState.stop(self.job, {})
self.assertEqual(result, '')
def test_runningstate_abort(self):
result = scheduler_job.RunningState.abort(self.job, self.jobdoc)
self.assertEqual(result, 'aborted')
def test_runningstate_start(self):
result = scheduler_job.RunningState.start(self.job, self.jobdoc)
self.assertEqual(result, '')
def test_runningstate_remove(self):
result = scheduler_job.RunningState.remove(self.job)
self.assertEqual(result, '')
def test_job_create(self):
jobdoc = {"job_id": "test", "job_schedule": {"status": "running"}}
result = scheduler_job.Job.create(None, None, jobdoc)
self.assertEqual(result.job_doc_status, 'running')
jobdoc = {"job_id": "test", "job_schedule": {"status": "stop"}}
result = scheduler_job.Job.create(None, None, jobdoc)
self.assertEqual(result.event, 'stop')
jobdoc = {"job_id": "test", "job_schedule": {}}
result = scheduler_job.Job.create(None, None, jobdoc)
self.assertEqual(result.event, 'start')
def test_job_remove(self):
result = self.job.remove()
self.assertIsNone(result)
def test_job_session_id(self):
self.assertEqual(self.job.session_id, '')
self.job.session_id = 'test'
self.assertEqual(self.job.session_id, 'test')
def test_job_session_tag(self):
self.assertEqual(self.job.session_tag, 0)
self.job.session_tag = 1
self.assertEqual(self.job.session_tag, 1)
def test_job_result(self):
self.assertEqual(self.job.result, '')
self.job.result = 'test'
self.assertEqual(self.job.result, 'test')
def test_job_can_be_removed(self):
result = self.job.can_be_removed()
self.assertFalse(result)
def test_save_action_to_file(self):
action = {'start': "test"}
temp = tempfile.mkdtemp()
filename = '/'.join([temp, "test.conf"])
f = mock.MagicMock()
f.name = filename
result = self.job.save_action_to_file(action, f)
self.assertIsNone(result)
shutil.rmtree(temp)
def test_job_schedule_end_date(self):
self.assertEqual(self.job.schedule_end_date, '')
def test_job_schedule_cron_fields(self):
result = self.job.schedule_cron_fields
self.assertEqual(result, {"day": "1"})
def test_get_schedule_args(self):
jobdoc1 = {"job_schedule":
{"schedule_start_date": "2020-01-10T10:10:10",
"schedule_end_date": "2020-11-10T10:10:10",
"schedule_date": "2020-09-10T10:10:10"}}
job1 = scheduler_job.Job(self.scheduler, None, jobdoc1)
result = job1.get_schedule_args()
self.assertEqual(result, {'trigger': 'date',
'run_date': "2020-09-10T10:10:10"})
jobdoc1 = {"job_schedule":
{"schedule_start_date": "2020-10-10T10:10:10",
"schedule_end_date": "2020-11-10T10:10:10",
"schedule_interval": "continuous"}}
job1 = scheduler_job.Job(self.scheduler, None, jobdoc1)
result = job1.get_schedule_args()
self.assertEqual(result.get('seconds'), 1)
jobdoc1 = {"job_schedule":
{"schedule_start_date": "2020-10-10T10:10:10",
"schedule_end_date": "2020-11-10T10:10:10",
"schedule_interval": "5 days"}}
job1 = scheduler_job.Job(self.scheduler, None, jobdoc1)
result = job1.get_schedule_args()
self.assertEqual(result.get('days'), 5)
jobdoc1 = {"job_schedule": {}}
job1 = scheduler_job.Job(self.scheduler, None, jobdoc1)
result = job1.get_schedule_args()
self.assertEqual(result.get('trigger'), 'date')
def test_job_process_event(self):
jobdoc1 = {"job_id": "test", "job_schedule": {"event": "start",
"status": "start"}}
result = self.job.process_event(jobdoc1)
self.assertIsNone(result)
jobdoc1 = {"job_id": "test", "job_schedule": {"event": "stop",
"status": "start"}}
result = self.job.process_event(jobdoc1)
self.assertIsNone(result)
jobdoc1 = {"job_id": "test", "job_schedule": {"event": "abort",
"status": "start"}}
result = self.job.process_event(jobdoc1)
self.assertIsNone(result)
jobdoc1 = {"job_id": "test", "job_schedule": {"event": "aborted",
"status": "start"}}
result = self.job.process_event(jobdoc1)
self.assertIsNone(result)
def test_job_upload_metadata(self):
metatring = '{"test": "freezer"}'
self.job.upload_metadata(metatring)
self.assertTrue(self.scheduler.upload_metadata.called)
metatring = ''
result = self.job.upload_metadata(metatring)
self.assertIsNone(result)
def test_job_contains_exec(self):
jobdoc = {'job_actions': [{'freezer_action': {'action': 'exec'}}]}
job = scheduler_job.Job(None, None, jobdoc)
result = job.contains_exec()
self.assertTrue(result)
jobdoc = {'job_actions': [{'freezer_action': {'action': 'stop'}}]}
job = scheduler_job.Job(None, None, jobdoc)
result = job.contains_exec()
self.assertFalse(result)
def test_job_update_job_schedule_doc(self):
jobdoc = {'job_actions': [{'freezer_action': {'action': 'exec'}}]}
self.job.update_job_schedule_doc(**jobdoc)
self.assertEqual(self.job.job_doc['job_schedule']['job_actions'],
[{'freezer_action': {'action': 'exec'}}])
@mock.patch('subprocess.Popen')
def test_job_execute(self, mock_process):
CONF.disable_exec = True
scheduler = mock.MagicMock()
freezer_action = {"backup_name": "freezer",
'action': 'exec',
"remove_from_date": "2020-11-10T10:10:10"}
jobdoc = {'job_id': 'test', 'job_schedule': {},
'job_actions': [{'freezer_action': freezer_action,
'max_retries_interval': 1,
'max_retries': 1}]}
job = scheduler_job.Job(scheduler, None, jobdoc)
result = job.execute()
self.assertIsNone(result)
self.assertEqual(job.result, 'fail')
CONF.disable_exec = False
process = mock.MagicMock()
process.pid = 123
process.communicate.return_value = (b'test', 0)
process.returncode = -15
mock_process.return_value = process
result = job.execute()
self.assertIsNone(result)
self.assertEqual(job.result, 'aborted')
process.communicate.return_value = ('test', 'test')
process.returncode = 1
mock_process.return_value = process
result = job.execute()
self.assertIsNone(result)
self.assertEqual(job.result, 'fail')
def test_job_finish(self):
scheduler = mock.MagicMock()
freezer_action = {"backup_name": "freezer",
'action': 'exec',
"remove_from_date": "2020-11-10T10:10:10"}
jobdoc = {'job_id': 'test', 'job_schedule': {"event": "remove"},
'job_actions': [{'freezer_action': freezer_action,
'max_retries_interval': 1,
'max_retries': 1}]}
job = scheduler_job.Job(scheduler, None, jobdoc)
result = job.finish()
self.assertIsNone(result)
self.assertEqual(job.job_doc_status, 'removed')
scheduler.is_scheduled.return_value = False
jobdoc = {'job_id': 'test', 'job_schedule': {"event": "start"},
'job_actions': [{'freezer_action': freezer_action,
'max_retries_interval': 1,
'max_retries': 1}]}
job = scheduler_job.Job(scheduler, None, jobdoc)
result = job.finish()
self.assertIsNone(result)
self.assertEqual(job.job_doc_status, 'completed')
scheduler.is_scheduled.return_value = True
jobdoc = {'job_id': 'test', 'job_schedule': {"event": "stop"},
'job_actions': [{'freezer_action': freezer_action,
'max_retries_interval': 1,
'max_retries': 1}]}
job = scheduler_job.Job(scheduler, None, jobdoc)
result = job.finish()
self.assertIsNone(result)
self.assertEqual(job.job_doc_status, 'completed')
def test_job_start_session(self):
scheduler = mock.MagicMock()
scheduler.start_session.side_effect = [Exception('error'),
{'result': 'success',
'session_tag': 1024}]
job = scheduler_job.Job(scheduler, None, self.jobdoc)
job.session_id = 'test'
result = job.start_session()
self.assertIsNone(result)
self.assertEqual(job.session_tag, 1024)
scheduler.start_session.side_effect = [Exception('error'),
Exception('error'),
Exception('error'),
Exception('error'),
Exception('error')]
job = scheduler_job.Job(scheduler, None, self.jobdoc)
job.session_id = 'test'
result = job.start_session()
self.assertIsNone(result)
self.assertEqual(job.session_tag, 1024)
def test_job_end_session(self):
scheduler = mock.MagicMock()
scheduler.end_session.side_effect = [Exception('error'),
{'result': 'success'}]
job = scheduler_job.Job(scheduler, None, self.jobdoc)
job.session_id = 'test'
result = job.end_session('test')
self.assertIsNone(result)
self.assertEqual(job.session_tag, 0)
scheduler.end_session.side_effect = [Exception('error'),
Exception('error'),
Exception('error'),
Exception('error'),
Exception('error')]
job = scheduler_job.Job(scheduler, None, self.jobdoc)
job.session_id = 'test'
result = job.end_session('test')
self.assertIsNone(result)
self.assertEqual(job.session_tag, 0)
def test_job_schedule(self):
scheduler = mock.MagicMock()
scheduler.is_scheduled.return_value = False
scheduler.add_job.side_effect = Exception('error')
job = scheduler_job.Job(scheduler, None, self.jobdoc)
result = job.schedule()
self.assertIsNone(result)
self.assertEqual(job.job_doc_status, 'completed')
def test_job_unschedule(self):
scheduler = mock.MagicMock()
scheduler.remove_job.side_effect = Exception('error')
job = scheduler_job.Job(scheduler, None, self.jobdoc)
result = job.unschedule()
self.assertIsNone(result)
def test_job_terminate_kill(self):
process = mock.MagicMock()
self.job.process = process
self.assertIsNone(self.job.terminate())
self.assertIsNone(self.job.kill())
 End of changes. 3 change blocks. 
0 lines changed or deleted 5 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)