"Fossies" - the Fresh Open Source Software Archive

Member "freezer-10.0.0/freezer/scheduler/freezer_scheduler.py" (14 Apr 2021, 9605 Bytes) of package /linux/misc/openstack/freezer-10.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "freezer_scheduler.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 #!/usr/bin/env python
    2 #
    3 # Copyright 2015 Hewlett-Packard
    4 #
    5 # Licensed under the Apache License, Version 2.0 (the "License");
    6 # you may not use this file except in compliance with the License.
    7 # You may obtain a copy of the License at
    8 #
    9 #     http://www.apache.org/licenses/LICENSE-2.0
   10 #
   11 # Unless required by applicable law or agreed to in writing, software
   12 # distributed under the License is distributed on an "AS IS" BASIS,
   13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14 # See the License for the specific language governing permissions and
   15 # limitations under the License.
   16 
   17 
   18 from distutils import spawn
   19 import sys
   20 import threading
   21 import time
   22 
   23 from apscheduler.schedulers import background
   24 from freezerclient import utils as client_utils
   25 from oslo_config import cfg
   26 from oslo_log import log
   27 
   28 from freezer.scheduler import arguments
   29 from freezer.scheduler import scheduler_job
   30 from freezer.scheduler import utils
   31 from freezer.utils import utils as freezer_utils
   32 from freezer.utils import winutils
   33 
   34 if winutils.is_windows():
   35     from freezer.scheduler import win_daemon
   36 else:
   37     from freezer.scheduler import daemon as linux_daemon
   38 
   39 CONF = cfg.CONF
   40 LOG = log.getLogger(__name__)
   41 
   42 
   43 class FreezerScheduler(object):
   44     def __init__(self, apiclient, interval, job_path, concurrent_jobs=1):
   45         # config_manager
   46         self.client = apiclient
   47         self.freezerc_executable = spawn.find_executable('freezer-agent')
   48         if self.freezerc_executable is None:
   49             # Needed in the case of a non-activated virtualenv
   50             self.freezerc_executable = spawn.find_executable(
   51                 'freezer-agent', path=':'.join(sys.path))
   52         LOG.debug('Freezer-agent found at {0}'
   53                   .format(self.freezerc_executable))
   54         self.job_path = job_path
   55         self._client = None
   56         self.lock = threading.Lock()
   57         job_defaults = {
   58             'coalesce': True,
   59             'max_instances': 1
   60         }
   61         executors = {
   62             'default': {'type': 'threadpool', 'max_workers': 1},
   63             'threadpool': {'type': 'threadpool',
   64                            'max_workers': concurrent_jobs}
   65         }
   66         self.scheduler = background.BackgroundScheduler(
   67             job_defaults=job_defaults,
   68             executors=executors)
   69 
   70         if self.client:
   71             self.scheduler.add_job(self.poll, 'interval',
   72                                    seconds=interval, id='api_poll',
   73                                    executor='default')
   74 
   75         self.add_job = self.scheduler.add_job
   76         self.remove_job = self.scheduler.remove_job
   77         self.jobs = {}
   78 
   79     def get_jobs(self):
   80         if self.client:
   81             job_doc_list = utils.get_active_jobs_from_api(self.client)
   82             try:
   83                 utils.save_jobs_to_disk(job_doc_list, self.job_path)
   84             except Exception as e:
   85                 LOG.error('Unable to save jobs to {0}. '
   86                           '{1}'.format(self.job_path, e))
   87             return job_doc_list
   88         else:
   89             return utils.get_jobs_from_disk(self.job_path)
   90 
   91     def start_session(self, session_id, job_id, session_tag):
   92         if self.client:
   93             return self.client.sessions.start_session(session_id,
   94                                                       job_id,
   95                                                       session_tag)
   96         else:
   97             raise Exception("Unable to start session: api not in use.")
   98 
   99     def end_session(self, session_id, job_id, session_tag, result):
  100         if self.client:
  101             return self.client.sessions.end_session(session_id,
  102                                                     job_id,
  103                                                     session_tag,
  104                                                     result)
  105         else:
  106             raise Exception("Unable to end session: api not in use.")
  107 
  108     def upload_metadata(self, metadata_doc):
  109         if self.client:
  110             self.client.backups.create(metadata_doc)
  111 
  112     def start(self):
  113         utils.do_register(self.client)
  114         self.poll()
  115         self.scheduler.start()
  116         try:
  117             while True:
  118                 # Due to the new Background scheduler nature, we need to keep
  119                 # the main thread alive.
  120                 time.sleep(1)
  121         except (KeyboardInterrupt, SystemExit):
  122             # Not strictly necessary if daemonic mode is enabled but
  123             # should be done if possible
  124             self.scheduler.shutdown(wait=False)
  125 
  126     def update_job(self, job_id, job_doc):
  127         if self.client:
  128             try:
  129                 return self.client.jobs.update(job_id, job_doc)
  130             except Exception as e:
  131                 LOG.error("Job update error: {0}".format(e))
  132 
  133     def update_job_schedule(self, job_id, job_schedule):
  134         """
  135         Pushes to the API the updates the job_schedule information
  136         of the job_doc
  137 
  138         :param job_id: id of the job to modify
  139         :param job_schedule: dict containing the job_scheduler information
  140         :return: None
  141         """
  142         doc = {'job_schedule': job_schedule}
  143         self.update_job(job_id, doc)
  144 
  145     def update_job_status(self, job_id, status):
  146         doc = {'job_schedule': {'status': status}}
  147         self.update_job(job_id, doc)
  148 
  149     def is_scheduled(self, job_id):
  150         return self.scheduler.get_job(job_id) is not None
  151 
  152     def create_job(self, job_doc):
  153         job = scheduler_job.Job.create(self, self.freezerc_executable, job_doc)
  154         self.jobs[job.id] = job
  155         LOG.info("Created job {0}".format(job.id))
  156         return job
  157 
  158     def poll(self):
  159         try:
  160             work_job_doc_list = self.get_jobs()
  161         except Exception as e:
  162             LOG.error("Unable to get jobs: {0}".format(e))
  163             return
  164 
  165         work_job_id_list = []
  166 
  167         # create job if necessary, then let it process its events
  168         for job_doc in work_job_doc_list:
  169             job_id = job_doc['job_id']
  170             work_job_id_list.append(job_id)
  171             job = self.jobs.get(job_id, None) or self.create_job(job_doc)
  172 
  173             # check for abort status
  174             if job_doc['job_schedule']['event'] == 'abort':
  175                 pid = int(job_doc['job_schedule']['current_pid'])
  176                 utils.terminate_subprocess(pid, 'freezer-agent')
  177 
  178             job.process_event(job_doc)
  179 
  180         # request removal of any job that has been removed in the api
  181         for job_id, job in self.jobs.items():
  182             if job_id not in work_job_id_list:
  183                 job.remove()
  184 
  185         remove_list = [job_id for job_id, job in self.jobs.items()
  186                        if job.can_be_removed()]
  187 
  188         for k in remove_list:
  189             self.jobs.pop(k)
  190 
  191     def stop(self):
  192         sys.exit()
  193 
  194     def reload(self):
  195         LOG.warning("reload not supported")
  196 
  197 
  198 def main():
  199     possible_actions = ['start', 'stop', 'restart', 'status', 'reload']
  200 
  201     arguments.parse_args(possible_actions)
  202     arguments.setup_logging()
  203 
  204     if CONF.action not in possible_actions:
  205         CONF.print_help()
  206         return 65  # os.EX_DATAERR
  207 
  208     apiclient = None
  209     if CONF.no_api is False:
  210         try:
  211             opts = client_utils.Namespace({})
  212             opts.opts = CONF
  213             if CONF.enable_v1_api:
  214                 apiclient = client_utils.get_client_instance(opts=opts,
  215                                                              api_version='1')
  216             else:
  217                 apiclient = client_utils.get_client_instance(opts=opts,
  218                                                              api_version='2')
  219 
  220             if CONF.client_id:
  221                 apiclient.client_id = CONF.client_id
  222         except Exception as e:
  223             LOG.error(e)
  224             print(e)
  225             sys.exit(1)
  226     else:
  227         if winutils.is_windows():
  228             print("--no-api mode is not available on windows")
  229             return 69  # os.EX_UNAVAILABLE
  230 
  231     freezer_utils.create_dir(CONF.jobs_dir, do_log=False)
  232     freezer_scheduler = FreezerScheduler(apiclient=apiclient,
  233                                          interval=int(CONF.interval),
  234                                          job_path=CONF.jobs_dir,
  235                                          concurrent_jobs=CONF.concurrent_jobs)
  236 
  237     if CONF.no_daemon:
  238         print('Freezer Scheduler running in no-daemon mode')
  239         LOG.debug('Freezer Scheduler running in no-daemon mode')
  240         if winutils.is_windows():
  241             daemon = win_daemon.NoDaemon(daemonizable=freezer_scheduler)
  242         else:
  243             daemon = linux_daemon.NoDaemon(daemonizable=freezer_scheduler)
  244     else:
  245         if winutils.is_windows():
  246             daemon = win_daemon.Daemon(daemonizable=freezer_scheduler,
  247                                        interval=int(CONF.interval),
  248                                        job_path=CONF.jobs_dir,
  249                                        insecure=CONF.insecure,
  250                                        concurrent_jobs=CONF.concurrent_jobs)
  251         else:
  252             daemon = linux_daemon.Daemon(daemonizable=freezer_scheduler)
  253 
  254     if CONF.action == 'start':
  255         daemon.start()
  256     elif CONF.action == 'stop':
  257         daemon.stop()
  258     elif CONF.action == 'restart':
  259         daemon.restart()
  260     elif CONF.action == 'reload':
  261         daemon.reload()
  262     elif CONF.action == 'status':
  263         daemon.status()
  264 
  265     # os.RETURN_CODES are only available to posix like systems, on windows
  266     # we need to translate the code to an actual number which is the equivalent
  267     return 0  # os.EX_OK
  268 
  269 
  270 if __name__ == '__main__':
  271     sys.exit(main())