"Fossies" - the Fresh Open Source Software Archive

Member "cloudkitty-13.0.0/cloudkitty/collector/prometheus.py" (14 Oct 2020, 7206 Bytes) of package /linux/misc/openstack/cloudkitty-13.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "prometheus.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 12.1.0_vs_13.0.0.

    1 # Copyright 2018 Objectif Libre
    2 #
    3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 #    not use this file except in compliance with the License. You may obtain
    5 #    a copy of the License at
    6 #
    7 #         http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 #    Unless required by applicable law or agreed to in writing, software
   10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 #    License for the specific language governing permissions and limitations
   13 #    under the License.
   14 #
   15 from decimal import Decimal
   16 from decimal import localcontext
   17 from decimal import ROUND_HALF_UP
   18 
   19 from oslo_config import cfg
   20 from oslo_log import log
   21 from voluptuous import In
   22 from voluptuous import Optional
   23 from voluptuous import Required
   24 from voluptuous import Schema
   25 
   26 from cloudkitty import collector
   27 from cloudkitty.collector.exceptions import CollectError
   28 from cloudkitty.common.prometheus_client import PrometheusClient
   29 from cloudkitty.common.prometheus_client import PrometheusResponseError
   30 from cloudkitty import dataframe
   31 from cloudkitty import utils as ck_utils
   32 from cloudkitty.utils import tz as tzutils
   33 
   34 
   35 LOG = log.getLogger(__name__)
   36 
   37 PROMETHEUS_COLLECTOR_OPTS = 'collector_prometheus'
   38 collector_prometheus_opts = [
   39     cfg.StrOpt(
   40         'prometheus_url',
   41         default='',
   42         help='Prometheus service URL',
   43     ),
   44     cfg.StrOpt(
   45         'prometheus_user',
   46         help='Prometheus user (for basic auth only)',
   47     ),
   48     cfg.StrOpt(
   49         'prometheus_password',
   50         help='Prometheus user password (for basic auth only)',
   51         secret=True,
   52     ),
   53     cfg.StrOpt(
   54         'cafile',
   55         help='Custom certificate authority file path',
   56     ),
   57     cfg.BoolOpt(
   58         'insecure',
   59         default=False,
   60         help='Explicitly trust untrusted HTTPS responses',
   61     ),
   62 ]
   63 cfg.CONF.register_opts(collector_prometheus_opts, PROMETHEUS_COLLECTOR_OPTS)
   64 
   65 CONF = cfg.CONF
   66 
   67 PROMETHEUS_EXTRA_SCHEMA = {
   68     Required('extra_args', default={}): {
   69         Required('aggregation_method', default='max'):
   70             In([
   71                 'avg', 'count', 'max',
   72                 'min', 'stddev', 'stdvar',
   73                 'sum'
   74             ]),
   75         Optional('query_function'):
   76             In([
   77                 'abs', 'ceil', 'exp',
   78                 'floor', 'ln', 'log2',
   79                 'log10', 'round', 'sqrt'
   80             ]),
   81         Optional('range_function'):
   82             In([
   83                 'changes', 'delta', 'deriv',
   84                 'idelta', 'irange', 'irate',
   85                 'rate'
   86             ])
   87     }
   88 }
   89 
   90 
   91 class PrometheusCollector(collector.BaseCollector):
   92     collector_name = 'prometheus'
   93 
   94     def __init__(self, **kwargs):
   95         super(PrometheusCollector, self).__init__(**kwargs)
   96         url = CONF.collector_prometheus.prometheus_url
   97 
   98         user = CONF.collector_prometheus.prometheus_user
   99         password = CONF.collector_prometheus.prometheus_password
  100 
  101         verify = True
  102         if CONF.collector_prometheus.cafile:
  103             verify = CONF.collector_prometheus.cafile
  104         elif CONF.collector_prometheus.insecure:
  105             verify = False
  106 
  107         self._conn = PrometheusClient(
  108             url,
  109             auth=(user, password) if user and password else None,
  110             verify=verify,
  111         )
  112 
  113     @staticmethod
  114     def check_configuration(conf):
  115         conf = collector.BaseCollector.check_configuration(conf)
  116         metric_schema = Schema(collector.METRIC_BASE_SCHEMA).extend(
  117             PROMETHEUS_EXTRA_SCHEMA,
  118         )
  119 
  120         output = {}
  121         for metric_name, metric in conf.items():
  122             output[metric_name] = metric_schema(metric)
  123 
  124         return output
  125 
  126     def _format_data(self, metric_name, scope_key, scope_id, start, end, data):
  127         """Formats Prometheus data format to Cloudkitty data format.
  128 
  129         Returns metadata, groupby, qty
  130         """
  131         metadata = {}
  132         for meta in self.conf[metric_name]['metadata']:
  133             metadata[meta] = data['metric'].get(meta, '')
  134 
  135         groupby = {scope_key: scope_id}
  136         for meta in self.conf[metric_name]['groupby']:
  137             groupby[meta] = data['metric'].get(meta, '')
  138 
  139         with localcontext() as ctx:
  140             ctx.prec = 9
  141             ctx.rounding = ROUND_HALF_UP
  142 
  143             qty = ck_utils.convert_unit(
  144                 +Decimal(data['value'][1]),
  145                 self.conf[metric_name]['factor'],
  146                 self.conf[metric_name]['offset'],
  147             )
  148             qty = ck_utils.mutate(qty, self.conf[metric_name]['mutate'])
  149 
  150         return metadata, groupby, qty
  151 
  152     def fetch_all(self, metric_name, start, end, scope_id, q_filter=None):
  153         """Returns metrics to be valorized."""
  154         scope_key = CONF.collect.scope_key
  155         method = self.conf[metric_name]['extra_args']['aggregation_method']
  156         query_function = self.conf[metric_name]['extra_args'].get(
  157             'query_function')
  158         range_function = self.conf[metric_name]['extra_args'].get(
  159             'range_function')
  160         groupby = self.conf[metric_name].get('groupby', [])
  161         metadata = self.conf[metric_name].get('metadata', [])
  162         period = tzutils.diff_seconds(end, start)
  163         time = end
  164 
  165         # The metric with the period
  166         query = '{0}{{{1}="{2}"}}[{3}s]'.format(
  167             metric_name,
  168             scope_key,
  169             scope_id,
  170             period
  171         )
  172         # Applying the aggregation_method or the range_function on
  173         # a Range Vector
  174         if range_function is not None:
  175             query = "{0}({1})".format(
  176                 range_function,
  177                 query
  178             )
  179         else:
  180             query = "{0}_over_time({1})".format(
  181                 method,
  182                 query
  183             )
  184         # Applying the query_function
  185         if query_function is not None:
  186             query = "{0}({1})".format(
  187                 query_function,
  188                 query
  189             )
  190         # Applying the aggregation_method on a Instant Vector
  191         query = "{0}({1})".format(
  192             method,
  193             query
  194         )
  195         # Filter by groupby and metadata
  196         query = "{0} by ({1})".format(
  197             query,
  198             ', '.join(groupby + metadata)
  199         )
  200 
  201         try:
  202             res = self._conn.get_instant(
  203                 query,
  204                 time.isoformat(),
  205             )
  206         except PrometheusResponseError as e:
  207             raise CollectError(*e.args)
  208 
  209         # If the query returns an empty dataset,
  210         # return an empty list
  211         if not res['data']['result']:
  212             return []
  213 
  214         formatted_resources = []
  215 
  216         for item in res['data']['result']:
  217             metadata, groupby, qty = self._format_data(
  218                 metric_name,
  219                 scope_key,
  220                 scope_id,
  221                 start,
  222                 end,
  223                 item,
  224             )
  225 
  226             formatted_resources.append(dataframe.DataPoint(
  227                 self.conf[metric_name]['unit'],
  228                 qty,
  229                 0,
  230                 groupby,
  231                 metadata,
  232             ))
  233 
  234         return formatted_resources