"Fossies" - the Fresh Open Source Software Archive

Member "cloudkitty-13.0.0/cloudkitty/orchestrator.py" (14 Oct 2020, 15122 Bytes) of package /linux/misc/openstack/cloudkitty-13.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "orchestrator.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 12.1.0_vs_13.0.0.

# Copyright 2014 Objectif Libre
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
#
from datetime import timedelta
import decimal
import functools
import hashlib
import multiprocessing
import random
import sys
import time

import cotyledon
import futurist
from futurist import waiters
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_utils import uuidutils
from stevedore import driver
from tooz import coordination

from cloudkitty import collector
from cloudkitty import config  # noqa
from cloudkitty import dataframe
from cloudkitty import extension_manager
from cloudkitty import messaging
from cloudkitty import storage
from cloudkitty import storage_state as state
from cloudkitty import utils as ck_utils
from cloudkitty.utils import tz as tzutils


LOG = logging.getLogger(__name__)

CONF = cfg.CONF

orchestrator_opts = [
    cfg.StrOpt(
        'coordination_url',
        secret=True,
        help='Coordination driver URL',
        default='file:///var/lib/cloudkitty/locks'),
    cfg.IntOpt(
        'max_workers',
        default=multiprocessing.cpu_count(),
        sample_default=4,
        min=1,
        help='Maximum number of workers to run. Defaults to the number '
             'of available CPUs'),
    cfg.IntOpt('max_threads',
               # NOTE(peschk_l): This is the futurist default
               default=multiprocessing.cpu_count() * 5,
               sample_default=20,
               min=1,
               deprecated_name='max_greenthreads',
               advanced=True,
               help='Maximum number of threads to use per worker. Defaults '
               'to 5 times the number of available CPUs'),
]

CONF.register_opts(orchestrator_opts, group='orchestrator')

CONF.import_opt('backend', 'cloudkitty.fetcher', 'fetcher')

FETCHERS_NAMESPACE = 'cloudkitty.fetchers'
PROCESSORS_NAMESPACE = 'cloudkitty.rating.processors'
COLLECTORS_NAMESPACE = 'cloudkitty.collector.backends'
STORAGES_NAMESPACE = 'cloudkitty.storage.backends'


def get_lock(coord, tenant_id):
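    """Build the coordination lock protecting a given scope.

    The lock name is a SHA-256 digest of the scope/tenant id combined with
    the configured collector, fetcher backend and scope key, so that all
    workers sharing the same configuration contend for the same lock.
    """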
    name = hashlib.sha256(
        ("cloudkitty-"
         + str(tenant_id + '-')
         + str(CONF.collect.collector + '-')
         + str(CONF.fetcher.backend + '-')
         + str(CONF.collect.scope_key)).encode('ascii')).hexdigest()
    return name, coord.get_lock(name.encode('ascii'))


class RatingEndpoint(object):
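    """RPC endpoint for rating module management.

    Exposes quote computation as well as module reload, enable and disable
    commands on the 'rating' messaging namespace.
    """
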
    target = oslo_messaging.Target(namespace='rating',
                                   version='1.0')

    def __init__(self, orchestrator):
        self._global_reload = False
        self._pending_reload = []
        self._module_state = {}
        self._orchestrator = orchestrator

    def get_reload_list(self):
        lock = lockutils.lock('module-reload')
        with lock:
            reload_list = self._pending_reload
            self._pending_reload = []
            return reload_list

    def get_module_state(self):
        lock = lockutils.lock('module-state')
        with lock:
            module_list = self._module_state
            self._module_state = {}
            return module_list

    def quote(self, ctxt, res_data):
        LOG.debug('Received quote from RPC.')
        worker = APIWorker()
        return str(worker.quote(res_data))

    def reload_modules(self, ctxt):
        LOG.info('Received reload modules command.')
        lock = lockutils.lock('module-reload')
        with lock:
            self._global_reload = True

    def reload_module(self, ctxt, name):
        LOG.info('Received reload command for module %s.', name)
        lock = lockutils.lock('module-reload')
        with lock:
            if name not in self._pending_reload:
                self._pending_reload.append(name)

    def enable_module(self, ctxt, name):
        LOG.info('Received enable command for module %s.', name)
        lock = lockutils.lock('module-state')
        with lock:
            self._module_state[name] = True

    def disable_module(self, ctxt, name):
        LOG.info('Received disable command for module %s.', name)
        lock = lockutils.lock('module-state')
        with lock:
            self._module_state[name] = False
            if name in self._pending_reload:
                self._pending_reload.remove(name)


class ScopeEndpoint(object):
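    """RPC endpoint handling scope state reset commands.

    For each scope, it acquires the per-scope coordination lock, deletes
    the stored dataframes from the reset point onwards and rewinds the
    scope's state accordingly.
    """
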
    target = oslo_messaging.Target(version='1.0')

    def __init__(self):
        self._coord = coordination.get_coordinator(
            CONF.orchestrator.coordination_url,
            uuidutils.generate_uuid().encode('ascii'))
        self._state = state.StateManager()
        self._storage = storage.get_storage()
        self._coord.start(start_heart=True)

    def reset_state(self, ctxt, res_data):
        LOG.info('Received state reset command. {}'.format(res_data))
        random.shuffle(res_data['scopes'])
        for scope in res_data['scopes']:
            lock_name, lock = get_lock(self._coord, scope['scope_id'])
            LOG.debug(
                '[ScopeEndpoint] Trying to acquire lock "{}" ...'.format(
                    lock_name,
                )
            )
            if lock.acquire(blocking=True):
                LOG.debug(
                    '[ScopeEndpoint] Acquired lock "{}".'.format(
                        lock_name,
                    )
                )
                state_dt = tzutils.dt_from_iso(res_data['state'])
                try:
                    self._storage.delete(begin=state_dt, end=None, filters={
                        scope['scope_key']: scope['scope_id'],
                    })
                    self._state.set_state(
                        scope['scope_id'],
                        state_dt,
                        fetcher=scope['fetcher'],
                        collector=scope['collector'],
                        scope_key=scope['scope_key'],
                    )
                finally:
                    lock.release()
                    LOG.debug(
                        '[ScopeEndpoint] Released lock "{}".'.format(
                            lock_name,
                        )
                    )


class BaseWorker(object):
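    """Common base class for workers.

    Loads the enabled rating processors for the given tenant and sorts
    them by decreasing priority.
    """
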
    def __init__(self, tenant_id=None):
        self._tenant_id = tenant_id

        # Rating processors
        self._processors = []
        self._load_rating_processors()

    def _load_rating_processors(self):
        self._processors = []
        processors = extension_manager.EnabledExtensionManager(
            PROCESSORS_NAMESPACE,
            invoke_kwds={'tenant_id': self._tenant_id})

        for processor in processors:
            self._processors.append(processor)
        self._processors.sort(key=lambda x: x.obj.priority, reverse=True)


class APIWorker(BaseWorker):
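    """Worker used by the API to compute quotes.

    Runs the rating processors on the provided resource description and
    returns the total price.
    """
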
    def __init__(self, tenant_id=None):
        super(APIWorker, self).__init__(tenant_id)

    def quote(self, res_data):
        for processor in self._processors:
            processor.obj.quote(res_data)

        price = decimal.Decimal(0)
        for res in res_data:
            for res_usage in res['usage'].values():
                for data in res_usage:
                    price += data.get('rating', {}).get('price',
                                                        decimal.Decimal(0))
        return price


def _check_state(obj, period, tenant_id):
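    """Return the next timestamp to process for a scope, if any.

    Thin wrapper around ck_utils.check_time_state(), using the scope's
    stored state, the collection period and the configured number of
    wait periods.
    """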
    timestamp = obj._state.get_state(tenant_id)
    return ck_utils.check_time_state(timestamp,
                                     period,
                                     CONF.collect.wait_periods)


class Worker(BaseWorker):
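    """Processing worker bound to a single scope/tenant.

    Collects metric data period by period, runs the rating processors on
    the resulting dataframes, pushes them to storage and advances the
    scope's state until it is up to date.
    """
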
    def __init__(self, collector, storage, tenant_id, worker_id):
        self._collector = collector
        self._storage = storage
        self._period = CONF.collect.period
        self._wait_time = CONF.collect.wait_periods * self._period
        self._tenant_id = tenant_id
        self._worker_id = worker_id
        self._log_prefix = '[scope: {scope}, worker: {worker}] '.format(
            scope=self._tenant_id, worker=self._worker_id)
        self._conf = ck_utils.load_conf(CONF.collect.metrics_conf)
        self._state = state.StateManager()
        self._check_state = functools.partial(
            _check_state, self, self._period, self._tenant_id)

        super(Worker, self).__init__(self._tenant_id)

    def _collect(self, metric, start_timestamp):
        next_timestamp = tzutils.add_delta(
            start_timestamp, timedelta(seconds=self._period))

        name, data = self._collector.retrieve(
            metric,
            start_timestamp,
            next_timestamp,
            self._tenant_id,
        )
        if not data:
            raise collector.NoDataCollected

        return name, data

    def _do_collection(self, metrics, timestamp):
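        """Collect every metric for the given timestamp in parallel.

        Collection is dispatched to a thread pool; metrics for which no
        data was collected are dropped from the result, and any unexpected
        collection error terminates the worker process.
        """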

        def _get_result(metric):
            try:
                return self._collect(metric, timestamp)
            except collector.NoDataCollected:
                LOG.info(
                    self._log_prefix + 'No data collected '
                    'for metric {metric} at timestamp {ts}'.format(
                        metric=metric, ts=timestamp))
                return metric, None
            except Exception as e:
                LOG.exception(
                    self._log_prefix + 'Error while collecting'
                    ' metric {metric} at timestamp {ts}: {e}. Exiting.'.format(
                        metric=metric, ts=timestamp, e=e))
                # FIXME(peschk_l): here we just exit, and the
                # collection will be retried during the next collect
                # cycle. In the future, we should implement a retrying
                # system in workers
                sys.exit(1)

        with futurist.ThreadPoolExecutor(
                max_workers=CONF.orchestrator.max_threads) as tpool:
            futs = [tpool.submit(_get_result, metric) for metric in metrics]
            LOG.debug(self._log_prefix +
                      'Collecting {} metrics.'.format(len(metrics)))
            results = [r.result() for r in waiters.wait_for_all(futs).done]
            LOG.debug(self._log_prefix + 'Collecting {} metrics took {}s '
                      'total, with {}s average'.format(
                          tpool.statistics.executed,
                          tpool.statistics.runtime,
                          tpool.statistics.average_runtime))
        return dict(filter(lambda x: x[1] is not None, results))

    def run(self):
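        """Process this scope period by period until it is up to date."""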
        while True:
            timestamp = self._check_state()
            if not timestamp:
                break

            metrics = list(self._conf['metrics'].keys())

            # Collection
            usage_data = self._do_collection(metrics, timestamp)

            frame = dataframe.DataFrame(
                start=timestamp,
                end=tzutils.add_delta(timestamp,
                                      timedelta(seconds=self._period)),
                usage=usage_data,
            )
            # Rating
            for processor in self._processors:
                frame = processor.obj.process(frame)

            # Writing
            self._storage.push([frame], self._tenant_id)
            self._state.set_state(self._tenant_id, timestamp)


class Orchestrator(cotyledon.Service):
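    """Main processing service.

    Fetches the list of scopes/tenants, and for every scope whose lock can
    be acquired and whose state is not up to date, spawns a Worker to
    collect, rate and store the corresponding data.
    """
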
    def __init__(self, worker_id):
        self._worker_id = worker_id
        super(Orchestrator, self).__init__(self._worker_id)

        self.fetcher = driver.DriverManager(
            FETCHERS_NAMESPACE,
            CONF.fetcher.backend,
            invoke_on_load=True,
        ).driver

        self.collector = collector.get_collector()
        self.storage = storage.get_storage()
        self._state = state.StateManager()

        # RPC
        self.server = None
        self._rating_endpoint = RatingEndpoint(self)
        self._scope_endpoint = ScopeEndpoint()
        self._init_messaging()

        # DLM
        self.coord = coordination.get_coordinator(
            CONF.orchestrator.coordination_url,
            uuidutils.generate_uuid().encode('ascii'))
        self.coord.start(start_heart=True)
        self._check_state = functools.partial(
            _check_state, self, CONF.collect.period)

    def _init_messaging(self):
        target = oslo_messaging.Target(topic='cloudkitty',
                                       server=CONF.host,
                                       version='1.0')
        endpoints = [
            self._rating_endpoint,
            self._scope_endpoint,
        ]
        self.server = messaging.get_server(target, endpoints)
        self.server.start()

    def process_messages(self):
        # TODO(sheeprine): Code kept to handle threading and asynchronous
        # reloading
        # pending_reload = self._rating_endpoint.get_reload_list()
        # pending_states = self._rating_endpoint.get_module_state()
        pass

    def run(self):
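        """Main loop: dispatch a Worker for every processable scope."""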
        LOG.debug('Started worker {}.'.format(self._worker_id))
        while True:
            self.tenants = self.fetcher.get_tenants()
            random.shuffle(self.tenants)
            LOG.info('[Worker: {w}] Tenants loaded for fetcher {f}'.format(
                w=self._worker_id, f=self.fetcher.name))

            for tenant_id in self.tenants:

                lock_name, lock = get_lock(self.coord, tenant_id)
                LOG.debug(
                    '[Worker: {w}] Trying to acquire lock "{lck}" ...'.format(
                        w=self._worker_id, lck=lock_name)
                )
                if lock.acquire(blocking=False):
                    LOG.debug(
                        '[Worker: {w}] Acquired lock "{lck}".'.format(
                            w=self._worker_id, lck=lock_name)
                    )
                    state = self._check_state(tenant_id)
                    if state:
                        worker = Worker(
                            self.collector,
                            self.storage,
                            tenant_id,
                            self._worker_id,
                        )
                        worker.run()

                    lock.release()

            # FIXME(sheeprine): We may cause a drift here
            time.sleep(CONF.collect.period)

    def terminate(self):
        LOG.debug('Terminating worker {}...'.format(self._worker_id))
        self.coord.stop()
        LOG.debug('Terminated worker {}.'.format(self._worker_id))


class OrchestratorServiceManager(cotyledon.ServiceManager):
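    """cotyledon service manager spawning the configured number of workers."""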

    def __init__(self):
        super(OrchestratorServiceManager, self).__init__()
        self.service_id = self.add(Orchestrator,
                                   workers=CONF.orchestrator.max_workers)