"Fossies" - the Fresh Open Source Software Archive

Member "freezer-10.0.0/freezer/tests/unit/scheduler/test_scheduler_job.py" (14 Apr 2021, 15418 Bytes) of package /linux/misc/openstack/freezer-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_scheduler_job.py": 9.0.0_vs_10.0.0.

    1 # (c) Copyright 2014,2015 Hewlett-Packard Development Company, L.P.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   12 # See the License for the specific language governing permissions and
   13 # limitations under the License.
   14 
   15 import os
   16 import shutil
   17 import tempfile
   18 import unittest
   19 
   20 from freezer.scheduler import scheduler_job
   21 from oslo_config import cfg
   22 from unittest import mock
   23 
   24 CONF = cfg.CONF
   25 
   26 
   27 action = {"action": "backup", "storage": "local",
   28           "mode": "fs", "backup_name": "test",
   29           "container": "/tmp/backuped",
   30           "path_to_backup": "/tmp/to_backup"}
   31 
   32 
   33 class TestSchedulerJob(unittest.TestCase):
   34     def setUp(self):
   35         self.job = scheduler_job.Job(None, None, {"job_schedule": {}})
   36 
   37     def test(self):
   38         scheduler_job.RunningState.stop(self.job, {})
   39 
   40     def test_save_action_to_disk(self):
   41         with tempfile.NamedTemporaryFile(mode='w',
   42                                          delete=False) as config_file:
   43             self.job.save_action_to_file(action, config_file)
   44             self.assertTrue(os.path.exists(config_file.name))
   45 
   46     def test_save_action_with_none_value_to_disk(self):
   47         action.update({"log_file": None})
   48         with tempfile.NamedTemporaryFile(mode='w',
   49                                          delete=False) as config_file:
   50             self.job.save_action_to_file(action, config_file)
   51             self.assertTrue(os.path.exists(config_file.name))
   52 
   53     def test_save_action_with_bool_value_to_disk(self):
   54         action.update({"no_incremental": False})
   55         with tempfile.NamedTemporaryFile(mode='w',
   56                                          delete=False) as config_file:
   57             self.job.save_action_to_file(action, config_file)
   58             self.assertTrue(os.path.exists(config_file.name))
   59 
   60 
   61 class TestSchedulerJob1(unittest.TestCase):
   62     def setUp(self):
   63         self.scheduler = mock.MagicMock()
   64         self.job_schedule = {"event": "start", "status": "start",
   65                              "schedule_day": "1"}
   66         self.jobdoc = {"job_id": "test", "job_schedule": self.job_schedule}
   67         self.job = scheduler_job.Job(self.scheduler, None, self.jobdoc)
   68 
   69     def test_stopstate_stop(self):
   70         result = scheduler_job.StopState.stop(self.job, self.jobdoc)
   71         self.assertEqual(result, '')
   72 
   73     def test_stopstate_abort(self):
   74         result = scheduler_job.StopState.abort(self.job, self.jobdoc)
   75         self.assertEqual(result, '')
   76 
   77     def test_stopstate_start(self):
   78         result = scheduler_job.StopState.start(self.job, self.jobdoc)
   79         self.assertEqual(result, '')
   80 
   81     def test_stopstate_remove(self):
   82         result = scheduler_job.StopState.remove(self.job)
   83         self.assertEqual(result, '')
   84 
   85     def test_scheduledstate_stop(self):
   86         result = scheduler_job.ScheduledState.stop(self.job, self.jobdoc)
   87         self.assertEqual(result, 'stop')
   88 
   89     def test_scheduledstate_abort(self):
   90         result = scheduler_job.ScheduledState.abort(self.job, self.jobdoc)
   91         self.assertEqual(result, '')
   92 
   93     def test_scheduledstate_start(self):
   94         result = scheduler_job.ScheduledState.start(self.job, self.jobdoc)
   95         self.assertEqual(result, '')
   96 
   97     def test_scheduledstate_remove(self):
   98         result = scheduler_job.ScheduledState.remove(self.job)
   99         self.assertEqual(result, '')
  100 
  101     def test_runningstate_stop(self):
  102         result = scheduler_job.RunningState.stop(self.job, {})
  103         self.assertEqual(result, '')
  104 
  105     def test_runningstate_abort(self):
  106         result = scheduler_job.RunningState.abort(self.job, self.jobdoc)
  107         self.assertEqual(result, 'aborted')
  108 
  109     def test_runningstate_start(self):
  110         result = scheduler_job.RunningState.start(self.job, self.jobdoc)
  111         self.assertEqual(result, '')
  112 
  113     def test_runningstate_remove(self):
  114         result = scheduler_job.RunningState.remove(self.job)
  115         self.assertEqual(result, '')
  116 
  117     def test_job_create(self):
  118         jobdoc = {"job_id": "test", "job_schedule": {"status": "running"}}
  119         result = scheduler_job.Job.create(None, None, jobdoc)
  120         self.assertEqual(result.job_doc_status, 'running')
  121         jobdoc = {"job_id": "test", "job_schedule": {"status": "stop"}}
  122         result = scheduler_job.Job.create(None, None, jobdoc)
  123         self.assertEqual(result.event, 'stop')
  124         jobdoc = {"job_id": "test", "job_schedule": {}}
  125         result = scheduler_job.Job.create(None, None, jobdoc)
  126         self.assertEqual(result.event, 'start')
  127 
  128     def test_job_remove(self):
  129         result = self.job.remove()
  130         self.assertIsNone(result)
  131 
  132     def test_job_session_id(self):
  133         self.assertEqual(self.job.session_id, '')
  134         self.job.session_id = 'test'
  135         self.assertEqual(self.job.session_id, 'test')
  136 
  137     def test_job_session_tag(self):
  138         self.assertEqual(self.job.session_tag, 0)
  139         self.job.session_tag = 1
  140         self.assertEqual(self.job.session_tag, 1)
  141 
  142     def test_job_result(self):
  143         self.assertEqual(self.job.result, '')
  144         self.job.result = 'test'
  145         self.assertEqual(self.job.result, 'test')
  146 
  147     def test_job_can_be_removed(self):
  148         result = self.job.can_be_removed()
  149         self.assertFalse(result)
  150 
  151     def test_save_action_to_file(self):
  152         action = {'start': "test"}
  153         temp = tempfile.mkdtemp()
  154         filename = '/'.join([temp, "test.conf"])
  155         f = mock.MagicMock()
  156         f.name = filename
  157         result = self.job.save_action_to_file(action, f)
  158         self.assertIsNone(result)
  159         shutil.rmtree(temp)
  160 
  161     def test_job_schedule_end_date(self):
  162         self.assertEqual(self.job.schedule_end_date, '')
  163 
  164     def test_job_schedule_cron_fields(self):
  165         result = self.job.schedule_cron_fields
  166         self.assertEqual(result, {"day": "1"})
  167 
  168     def test_get_schedule_args(self):
  169         jobdoc1 = {"job_schedule":
  170                    {"schedule_start_date": "2020-01-10T10:10:10",
  171                     "schedule_end_date": "2020-11-10T10:10:10",
  172                     "schedule_date": "2020-09-10T10:10:10"}}
  173         job1 = scheduler_job.Job(self.scheduler, None, jobdoc1)
  174         result = job1.get_schedule_args()
  175         self.assertEqual(result, {'trigger': 'date',
  176                                   'run_date': "2020-09-10T10:10:10"})
  177         jobdoc1 = {"job_schedule":
  178                    {"schedule_start_date": "2020-10-10T10:10:10",
  179                     "schedule_end_date": "2020-11-10T10:10:10",
  180                     "schedule_interval": "continuous"}}
  181         job1 = scheduler_job.Job(self.scheduler, None, jobdoc1)
  182         result = job1.get_schedule_args()
  183         self.assertEqual(result.get('seconds'), 1)
  184         jobdoc1 = {"job_schedule":
  185                    {"schedule_start_date": "2020-10-10T10:10:10",
  186                     "schedule_end_date": "2020-11-10T10:10:10",
  187                     "schedule_interval": "5 days"}}
  188         job1 = scheduler_job.Job(self.scheduler, None, jobdoc1)
  189         result = job1.get_schedule_args()
  190         self.assertEqual(result.get('days'), 5)
  191         jobdoc1 = {"job_schedule": {}}
  192         job1 = scheduler_job.Job(self.scheduler, None, jobdoc1)
  193         result = job1.get_schedule_args()
  194         self.assertEqual(result.get('trigger'), 'date')
  195 
  196     def test_job_process_event(self):
  197         jobdoc1 = {"job_id": "test", "job_schedule": {"event": "start",
  198                                                       "status": "start"}}
  199         result = self.job.process_event(jobdoc1)
  200         self.assertIsNone(result)
  201         jobdoc1 = {"job_id": "test", "job_schedule": {"event": "stop",
  202                                                       "status": "start"}}
  203         result = self.job.process_event(jobdoc1)
  204         self.assertIsNone(result)
  205         jobdoc1 = {"job_id": "test", "job_schedule": {"event": "abort",
  206                                                       "status": "start"}}
  207         result = self.job.process_event(jobdoc1)
  208         self.assertIsNone(result)
  209         jobdoc1 = {"job_id": "test", "job_schedule": {"event": "aborted",
  210                                                       "status": "start"}}
  211         result = self.job.process_event(jobdoc1)
  212         self.assertIsNone(result)
  213 
  214     def test_job_upload_metadata(self):
  215         metatring = '{"test": "freezer"}'
  216         self.job.upload_metadata(metatring)
  217         self.assertTrue(self.scheduler.upload_metadata.called)
  218         metatring = ''
  219         result = self.job.upload_metadata(metatring)
  220         self.assertIsNone(result)
  221 
  222     def test_job_contains_exec(self):
  223         jobdoc = {'job_actions': [{'freezer_action': {'action': 'exec'}}]}
  224         job = scheduler_job.Job(None, None, jobdoc)
  225         result = job.contains_exec()
  226         self.assertTrue(result)
  227         jobdoc = {'job_actions': [{'freezer_action': {'action': 'stop'}}]}
  228         job = scheduler_job.Job(None, None, jobdoc)
  229         result = job.contains_exec()
  230         self.assertFalse(result)
  231 
  232     def test_job_update_job_schedule_doc(self):
  233         jobdoc = {'job_actions': [{'freezer_action': {'action': 'exec'}}]}
  234         self.job.update_job_schedule_doc(**jobdoc)
  235         self.assertEqual(self.job.job_doc['job_schedule']['job_actions'],
  236                          [{'freezer_action': {'action': 'exec'}}])
  237 
  238     @mock.patch('subprocess.Popen')
  239     def test_job_execute(self, mock_process):
  240         CONF.disable_exec = True
  241         scheduler = mock.MagicMock()
  242         freezer_action = {"backup_name": "freezer",
  243                           'action': 'exec',
  244                           "remove_from_date": "2020-11-10T10:10:10"}
  245         jobdoc = {'job_id': 'test', 'job_schedule': {},
  246                   'job_actions': [{'freezer_action': freezer_action,
  247                                    'max_retries_interval': 1,
  248                                    'max_retries': 1}]}
  249         job = scheduler_job.Job(scheduler, None, jobdoc)
  250         result = job.execute()
  251         self.assertIsNone(result)
  252         self.assertEqual(job.result, 'fail')
  253 
  254         CONF.disable_exec = False
  255         process = mock.MagicMock()
  256         process.pid = 123
  257         process.communicate.return_value = (b'test', 0)
  258         process.returncode = -15
  259         mock_process.return_value = process
  260         result = job.execute()
  261         self.assertIsNone(result)
  262         self.assertEqual(job.result, 'aborted')
  263 
  264         process.communicate.return_value = ('test', 'test')
  265         process.returncode = 1
  266         mock_process.return_value = process
  267         result = job.execute()
  268         self.assertIsNone(result)
  269         self.assertEqual(job.result, 'fail')
  270 
  271     def test_job_finish(self):
  272         scheduler = mock.MagicMock()
  273         freezer_action = {"backup_name": "freezer",
  274                           'action': 'exec',
  275                           "remove_from_date": "2020-11-10T10:10:10"}
  276         jobdoc = {'job_id': 'test', 'job_schedule': {"event": "remove"},
  277                   'job_actions': [{'freezer_action': freezer_action,
  278                                    'max_retries_interval': 1,
  279                                    'max_retries': 1}]}
  280         job = scheduler_job.Job(scheduler, None, jobdoc)
  281         result = job.finish()
  282         self.assertIsNone(result)
  283         self.assertEqual(job.job_doc_status, 'removed')
  284 
  285         scheduler.is_scheduled.return_value = False
  286         jobdoc = {'job_id': 'test', 'job_schedule': {"event": "start"},
  287                   'job_actions': [{'freezer_action': freezer_action,
  288                                    'max_retries_interval': 1,
  289                                    'max_retries': 1}]}
  290         job = scheduler_job.Job(scheduler, None, jobdoc)
  291         result = job.finish()
  292         self.assertIsNone(result)
  293         self.assertEqual(job.job_doc_status, 'completed')
  294 
  295         scheduler.is_scheduled.return_value = True
  296         jobdoc = {'job_id': 'test', 'job_schedule': {"event": "stop"},
  297                   'job_actions': [{'freezer_action': freezer_action,
  298                                    'max_retries_interval': 1,
  299                                    'max_retries': 1}]}
  300         job = scheduler_job.Job(scheduler, None, jobdoc)
  301         result = job.finish()
  302         self.assertIsNone(result)
  303         self.assertEqual(job.job_doc_status, 'completed')
  304 
  305     def test_job_start_session(self):
  306         scheduler = mock.MagicMock()
  307         scheduler.start_session.side_effect = [Exception('error'),
  308                                                {'result': 'success',
  309                                                 'session_tag': 1024}]
  310         job = scheduler_job.Job(scheduler, None, self.jobdoc)
  311         job.session_id = 'test'
  312         result = job.start_session()
  313         self.assertIsNone(result)
  314         self.assertEqual(job.session_tag, 1024)
  315 
  316         scheduler.start_session.side_effect = [Exception('error'),
  317                                                Exception('error'),
  318                                                Exception('error'),
  319                                                Exception('error'),
  320                                                Exception('error')]
  321         job = scheduler_job.Job(scheduler, None, self.jobdoc)
  322         job.session_id = 'test'
  323         result = job.start_session()
  324         self.assertIsNone(result)
  325         self.assertEqual(job.session_tag, 1024)
  326 
  327     def test_job_end_session(self):
  328         scheduler = mock.MagicMock()
  329         scheduler.end_session.side_effect = [Exception('error'),
  330                                              {'result': 'success'}]
  331         job = scheduler_job.Job(scheduler, None, self.jobdoc)
  332         job.session_id = 'test'
  333         result = job.end_session('test')
  334         self.assertIsNone(result)
  335         self.assertEqual(job.session_tag, 0)
  336 
  337         scheduler.end_session.side_effect = [Exception('error'),
  338                                              Exception('error'),
  339                                              Exception('error'),
  340                                              Exception('error'),
  341                                              Exception('error')]
  342         job = scheduler_job.Job(scheduler, None, self.jobdoc)
  343         job.session_id = 'test'
  344         result = job.end_session('test')
  345         self.assertIsNone(result)
  346         self.assertEqual(job.session_tag, 0)
  347 
  348     def test_job_schedule(self):
  349         scheduler = mock.MagicMock()
  350         scheduler.is_scheduled.return_value = False
  351         scheduler.add_job.side_effect = Exception('error')
  352         job = scheduler_job.Job(scheduler, None, self.jobdoc)
  353         result = job.schedule()
  354         self.assertIsNone(result)
  355         self.assertEqual(job.job_doc_status, 'completed')
  356 
  357     def test_job_unschedule(self):
  358         scheduler = mock.MagicMock()
  359         scheduler.remove_job.side_effect = Exception('error')
  360         job = scheduler_job.Job(scheduler, None, self.jobdoc)
  361         result = job.unschedule()
  362         self.assertIsNone(result)
  363 
  364     def test_job_terminate_kill(self):
  365         process = mock.MagicMock()
  366         self.job.process = process
  367         self.assertIsNone(self.job.terminate())
  368         self.assertIsNone(self.job.kill())