"Fossies" - the Fresh Open Source Software Archive

Member "monasca-api-3.1.0/monasca_api/common/repositories/cassandra/metrics_repository.py" (27 Sep 2019, 39850 Bytes) of package /linux/misc/openstack/monasca-api-3.1.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "metrics_repository.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 3.0.0_vs_3.1.0.

# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2017-2018 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import binascii
from collections import namedtuple
from datetime import datetime
from datetime import timedelta
import itertools
import six
from six.moves import urllib

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.cluster import DCAwareRoundRobinPolicy
from cassandra.cluster import TokenAwarePolicy
from cassandra.query import FETCH_SIZE_UNSET
from cassandra.query import SimpleStatement
from monasca_api.common.rest import utils as rest_utils
from oslo_config import cfg
from oslo_log import log
from oslo_utils import encodeutils
from oslo_utils import timeutils

from monasca_api.common.repositories import exceptions
from monasca_api.common.repositories import metrics_repository


CONF = cfg.CONF
LOG = log.getLogger(__name__)

LIMIT_CLAUSE = 'limit %s'
ALLOW_FILTERING = 'allow filtering'

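# The CQL templates below use positional %s slots; the query builders fill
# every slot, in order, with either a condition fragment or an empty string,
# so each template always receives its full tuple of values.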
MEASUREMENT_LIST_CQL = ('select time_stamp, value, value_meta '
                        'from measurements where %s %s %s %s')
METRIC_ID_EQ = 'metric_id = %s'
METRIC_ID_IN = 'metric_id in %s'
OFFSET_TIME_GT = "and time_stamp > %s"
START_TIME_GE = "and time_stamp >= %s"
END_TIME_LE = "and time_stamp <= %s"

METRIC_LIST_CQL = ('select metric_name, dimensions, metric_id '
                   'from metrics where %s %s %s %s %s %s %s %s %s %s')
REGION_EQ = 'region = %s'
TENANT_EQ = 'and tenant_id = %s'
METRIC_NAME_EQ = 'and metric_name = %s'
DIMENSIONS_CONTAINS = 'and dimensions contains %s '
DIMENSIONS_NAME_CONTAINS = 'and dimension_names contains %s '
CREATED_TIME_LE = "and created_at <= %s"
UPDATED_TIME_GE = "and updated_at >= %s"
DIMENSIONS_GT = 'and dimensions > %s'

DIMENSION_VALUE_BY_METRIC_CQL = ('select dimension_value as value from metrics_dimensions '
                                 'where region = ? and tenant_id = ? and metric_name = ? '
                                 'and dimension_name = ? group by dimension_value')

DIMENSION_VALUE_CQL = ('select value from dimensions '
                       'where region = ? and tenant_id = ? and name = ? '
                       'group by value order by value')

DIMENSION_NAME_BY_METRIC_CQL = ('select dimension_name as name from metrics_dimensions where '
                                'region = ? and tenant_id = ? and metric_name = ? '
                                'group by dimension_name order by dimension_name')

DIMENSION_NAME_CQL = ('select name from dimensions where region = ? and tenant_id = ? '
                      'group by name allow filtering')

METRIC_NAME_BY_DIMENSION_CQL = ('select metric_name from dimensions_metrics where region = ? and '
                                'tenant_id = ? and dimension_name = ? and dimension_value = ? '
                                'group by metric_name order by metric_name')

METRIC_NAME_BY_DIMENSION_OFFSET_CQL = (
    'select metric_name from dimensions_metrics where region = ? and '
    'tenant_id = ? and dimension_name = ? and dimension_value = ? and '
    'metric_name >= ? '
    'group by metric_name order by metric_name')

METRIC_NAME_CQL = ('select distinct region, tenant_id, metric_name from metrics_dimensions '
                   'where region = ? and tenant_id = ? allow filtering')

METRIC_NAME_OFFSET_CQL = ('select distinct region, tenant_id, metric_name from metrics_dimensions '
                          'where region = ? and tenant_id = ? and metric_name >= ? allow filtering')

METRIC_BY_ID_CQL = ('select region, tenant_id, metric_name, dimensions from measurements '
                    'where metric_id = ? limit 1')

Metric = namedtuple('metric', 'id name dimensions')

ALARM_HISTORY_CQL = (
    'select tenant_id, alarm_id, time_stamp, metric, new_state, old_state, reason, reason_data, '
    'sub_alarms from alarm_state_history where %s %s %s %s %s')

ALARM_ID_EQ = 'and alarm_id = %s'

ALARM_ID_IN = 'and alarm_id in %s'

ALARM_TENANT_ID_EQ = 'tenant_id = %s'


class MetricsRepository(metrics_repository.AbstractMetricsRepository):
    def __init__(self):

        try:
            self.conf = cfg.CONF

            if self.conf.cassandra.user:
                auth_provider = PlainTextAuthProvider(username=self.conf.cassandra.user,
                                                      password=self.conf.cassandra.password)
            else:
                auth_provider = None

            self.cluster = Cluster(self.conf.cassandra.contact_points,
                                   port=self.conf.cassandra.port,
                                   auth_provider=auth_provider,
                                   connect_timeout=self.conf.cassandra.connection_timeout,
                                   load_balancing_policy=TokenAwarePolicy(
                                       DCAwareRoundRobinPolicy(
                                           local_dc=self.conf.cassandra.local_data_center))
                                   )
            self.session = self.cluster.connect(self.conf.cassandra.keyspace)

            self.dim_val_by_metric_stmt = self.session.prepare(DIMENSION_VALUE_BY_METRIC_CQL)

            self.dim_val_stmt = self.session.prepare(DIMENSION_VALUE_CQL)

            self.dim_name_by_metric_stmt = self.session.prepare(DIMENSION_NAME_BY_METRIC_CQL)

            self.dim_name_stmt = self.session.prepare(DIMENSION_NAME_CQL)

            self.metric_name_by_dimension_stmt = self.session.prepare(METRIC_NAME_BY_DIMENSION_CQL)

            self.metric_name_by_dimension_offset_stmt = self.session.prepare(
                METRIC_NAME_BY_DIMENSION_OFFSET_CQL)

            self.metric_name_stmt = self.session.prepare(METRIC_NAME_CQL)

            self.metric_name_offset_stmt = self.session.prepare(METRIC_NAME_OFFSET_CQL)

            self.metric_by_id_stmt = self.session.prepare(METRIC_BY_ID_CQL)

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

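        # Unix epoch reference; alarm_history uses it to derive millisecond ids.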
        self.epoch = datetime.utcfromtimestamp(0)

    def list_dimension_values(self, tenant_id, region, metric_name,
                              dimension_name, start_timestamp=None,
                              end_timestamp=None):

        if start_timestamp or end_timestamp:
            # NOTE(brtknr): For more details, see story
            # https://storyboard.openstack.org/#!/story/2006204
            LOG.info("Scoping by timestamp not implemented for cassandra.")

        try:
            if metric_name:
                rows = self.session.execute(
                    self.dim_val_by_metric_stmt,
                    [region, tenant_id, metric_name, dimension_name])
            else:
                rows = self.session.execute(
                    self.dim_val_stmt,
                    [region, tenant_id, dimension_name])

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

        json_dim_value_list = []

        if not rows:
            return json_dim_value_list

        for row in rows:
            json_dim_value_list.append({u'dimension_value': row.value})

        json_dim_value_list.sort(key=lambda x: x[u'dimension_value'])

        return json_dim_value_list

    def list_dimension_names(self, tenant_id, region, metric_name,
                             start_timestamp=None, end_timestamp=None):

        if start_timestamp or end_timestamp:
            # NOTE(brtknr): For more details, see story
            # https://storyboard.openstack.org/#!/story/2006204
            LOG.info("Scoping by timestamp not implemented for cassandra.")

        try:
            if metric_name:
                rows = self.session.execute(
                    self.dim_name_by_metric_stmt,
                    [region, tenant_id, metric_name])
                ordered = True
            else:
                rows = self.session.execute(
                    self.dim_name_stmt,
                    [region, tenant_id])
                ordered = False

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

        if not rows:
            return []

        json_dim_name_list = [{u'dimension_name': row.name} for row in rows]

        if not ordered:
            json_dim_name_list.sort(key=lambda x: x[u'dimension_name'])

        return json_dim_name_list

    def list_metrics(self, tenant_id, region, name, dimensions, offset, limit, start_time=None,
                     end_time=None):

        offset_name = None
        offset_dimensions = []
        names = []
        metric_list = []
        offset_futures = []
        non_offset_futures = []

        try:
            if offset:
                offset_metric = self._get_metric_by_id(offset)
                if offset_metric:
                    offset_name = offset_metric.name
                    offset_dimensions = offset_metric.dimensions

            if not name:
                names = self._list_metric_names(tenant_id, region, dimensions, offset=offset_name)
                if names:
                    names = [elem['name'] for elem in names]
            else:
                names.append(name)

            if not names:
                return metric_list

            for name in names:
                if name == offset_name:
                    futures = self._list_metrics_by_name(
                        tenant_id,
                        region,
                        name,
                        dimensions,
                        offset_dimensions,
                        limit,
                        start_time=None,
                        end_time=None)
                    if offset_dimensions and dimensions:
                        offset_futures.extend(futures)
                    else:
                        non_offset_futures.extend(futures)
                else:
                    non_offset_futures.extend(
                        self._list_metrics_by_name(tenant_id, region, name, dimensions, None, limit,
                                                   start_time=None, end_time=None))

            # manually filter out metrics by the offset dimension
            for future in offset_futures:
                rows = future.result()
                for row in rows:
                    if offset_dimensions >= row.dimensions:
                        continue

                    metric_list.append(self._process_metric_row(row))

            for future in non_offset_futures:
                metric_list.extend((self._process_metric_row(row) for row in future.result()))

            return metric_list

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

    @staticmethod
    def _process_metric_row(row):
        dim_map = {}
        for d in row.dimensions:
            pair = d.split('\t')
            dim_map[pair[0]] = pair[1]

        if row.metric_id is None:
            LOG.error(
                'Metric is missing metric_id, using metric_id=None'
                ' name: {}, dimensions: {}'.format(
                    row.metric_name, row.dimensions))
            return {'id': None,
                    'name': row.metric_name,
                    'dimensions': dim_map}

        metric = {'id': binascii.hexlify(bytearray(row.metric_id)),
                  'name': row.metric_name,
                  'dimensions': dim_map}

        return metric

    def _list_metrics_by_name(
            self,
            tenant_id,
            region,
            name,
            dimensions,
            dimension_offset,
            limit,
            start_time=None,
            end_time=None):

        or_dimensions = []
        sub_dimensions = {}
        futures = []

        if not dimensions:
            query = self._build_metrics_by_name_query(
                tenant_id,
                region,
                name,
                dimensions,
                None,
                start_time,
                end_time,
                dimension_offset,
                limit)
            futures.append(self.session.execute_async(query[0], query[1]))
            return futures

        wildcard_dimensions = []
        for dim_name, dim_value in dimensions.items():
            if not dim_value:
                wildcard_dimensions.append(dim_name)

            elif '|' in dim_value:

                def f(val):
                    return {dim_name: val}

                or_dimensions.append(list(map(f, sorted(dim_value.split('|')))))

            else:
                sub_dimensions[dim_name] = dim_value

        if or_dimensions:
            or_dims_list = list(itertools.product(*or_dimensions))

            for or_dims_tuple in or_dims_list:
                extracted_dimensions = sub_dimensions.copy()

                for dims in iter(or_dims_tuple):
                    for k, v in dims.items():
                        extracted_dimensions[k] = v

                query = self._build_metrics_by_name_query(
                    tenant_id,
                    region,
                    name,
                    extracted_dimensions,
                    wildcard_dimensions,
                    start_time,
                    end_time,
                    dimension_offset,
                    limit)

                futures.append(self.session.execute_async(query[0], query[1]))

        else:
            query = self._build_metrics_by_name_query(
                tenant_id,
                region,
                name,
                sub_dimensions,
                wildcard_dimensions,
                start_time,
                end_time,
                dimension_offset,
                limit)
            futures.append(self.session.execute_async(query[0], query[1]))

        return futures

    def _get_metric_by_id(self, metric_id):

        rows = self.session.execute(self.metric_by_id_stmt, [bytearray.fromhex(metric_id)])

        if rows:
            return Metric(id=metric_id, name=rows[0].metric_name, dimensions=rows[0].dimensions)

        return None

    def _build_metrics_by_name_query(
            self,
            tenant_id,
            region,
            name,
            dimensions,
            wildcard_dimensions,
            start_time,
            end_time,
            dim_offset,
            limit):

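        # Fill the ten %s slots of METRIC_LIST_CQL in a fixed order; a branch
        # that does not apply contributes an empty string so the tuple length
        # always matches the template.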
        conditions = [REGION_EQ, TENANT_EQ]
        params = [region, tenant_id.encode('utf8')]

        if name:
            conditions.append(METRIC_NAME_EQ)
            params.append(name)
        else:
            conditions.append('')

        if dimensions:
            conditions.append(DIMENSIONS_CONTAINS * len(dimensions))
            params.extend(
                [self._create_dimension_value_entry(dim_name, dim_value)
                 for dim_name, dim_value in dimensions.items()])
        else:
            conditions.append('')

        if wildcard_dimensions:
            conditions.append(DIMENSIONS_NAME_CONTAINS * len(wildcard_dimensions))
            params.extend(wildcard_dimensions)
        else:
            conditions.append('')

        if dim_offset and not dimensions:
            # Cassandra does not allow combining 'contains' and '>' on a collection column
            conditions.append(DIMENSIONS_GT)
            params.append(dim_offset)
        else:
            conditions.append('')

        if start_time:
            conditions.append(UPDATED_TIME_GE % start_time)
        else:
            conditions.append('')

        if end_time:
            conditions.append(CREATED_TIME_LE % end_time)
        else:
            conditions.append('')

        if limit:
            conditions.append(LIMIT_CLAUSE)
            params.append(limit)
        else:
            conditions.append('')

        if (not name) or dimensions or wildcard_dimensions or start_time or end_time:
            conditions.append(ALLOW_FILTERING)
        else:
            conditions.append('')

        return METRIC_LIST_CQL % tuple(conditions), params

    @staticmethod
    def _create_dimension_value_entry(name, value):
        return '%s\t%s' % (name, value)

    def list_metric_names(self, tenant_id, region, dimensions):
        return self._list_metric_names(tenant_id, region, dimensions)

    def _list_metric_names(self, tenant_id, region, dimensions, offset=None):

        or_dimensions = []
        single_dimensions = {}

        if dimensions:
            for key, value in dimensions.items():
                if not value:
                    continue

                elif '|' in value:
                    def f(val):
                        return {key: val}

                    or_dimensions.append(list(map(f, sorted(value.split('|')))))

                else:
                    single_dimensions[key] = value

        if or_dimensions:

            names = []
            or_dims_list = list(itertools.product(*or_dimensions))

            for or_dims_tuple in or_dims_list:
                extracted_dimensions = single_dimensions.copy()

                for dims in iter(or_dims_tuple):
                    for k, v in dims.items():
                        extracted_dimensions[k] = v

                names.extend(
                    self._list_metric_names_single_dimension_value(
                        tenant_id, region, extracted_dimensions, offset))

            names.sort(key=lambda x: x[u'name'])
            return names

        else:
            names = self._list_metric_names_single_dimension_value(
                tenant_id, region, single_dimensions, offset)
            names.sort(key=lambda x: x[u'name'])
            return names

    def _list_metric_names_single_dimension_value(self, tenant_id, region, dimensions, offset=None):

        try:
            futures = []
            if dimensions:
                for name, value in dimensions.items():
                    if offset:
                        futures.append(
                            self.session.execute_async(
                                self.metric_name_by_dimension_offset_stmt, [
                                    region, tenant_id, name, value, offset]))
                    else:
                        futures.append(
                            self.session.execute_async(
                                self.metric_name_by_dimension_stmt, [
                                    region, tenant_id, name, value]))

            else:
                if offset:
                    futures.append(
                        self.session.execute_async(
                            self.metric_name_offset_stmt, [
                                region, tenant_id, offset]))
                else:
                    futures.append(
                        self.session.execute_async(
                            self.metric_name_stmt, [
                                region, tenant_id]))

            names_list = []

            for future in futures:
                rows = future.result()
                tmp = set()
                for row in rows:
                    tmp.add(row.metric_name)

                names_list.append(tmp)

            return [{u'name': v} for v in set.intersection(*names_list)]

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

    def measurement_list(self, tenant_id, region, name, dimensions,
                         start_timestamp, end_timestamp, offset, limit,
                         merge_metrics_flag, group_by):

        metrics = self.list_metrics(tenant_id, region, name, dimensions, None, None)

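        # A paging offset of the form '<metric_id>_<timestamp>' pins both the
        # metric and the time position; a bare value is treated as a timestamp.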
        if offset:
            tmp = offset.split("_")
            if len(tmp) > 1:
                offset_id = tmp[0]
                offset_timestamp = tmp[1]
            else:
                offset_id = None
                offset_timestamp = offset
        else:
            offset_timestamp = None
            offset_id = None

        if not metrics:
            return None
        elif len(metrics) > 1:
            if not merge_metrics_flag and not group_by:
                raise exceptions.MultipleMetricsException(self.MULTIPLE_METRICS_MESSAGE)

        try:
            if len(metrics) > 1 and not group_by:
                # offset is controlled only by offset_timestamp when the group by option
                # is not enabled
                count, series_list = self._query_merge_measurements(metrics,
                                                                    dimensions,
                                                                    start_timestamp,
                                                                    end_timestamp,
                                                                    offset_timestamp,
                                                                    limit)
                return series_list

            if group_by:
                if not isinstance(group_by, list):
                    group_by = group_by.split(',')
                elif len(group_by) == 1:
                    group_by = group_by[0].split(',')

            if len(metrics) == 1 or group_by[0].startswith('*'):
                if offset_id:
                    for index, metric in enumerate(metrics):
                        if metric['id'] == offset_id:
                            if index > 0:
                                metrics[0:index] = []
                            break

                count, series_list = self._query_measurements(metrics,
                                                              start_timestamp,
                                                              end_timestamp,
                                                              offset_timestamp,
                                                              limit)

                return series_list

            grouped_metrics = self._group_metrics(metrics, group_by, dimensions)

            if not grouped_metrics or len(grouped_metrics) == 0:
                return None

            if offset_id:
                found_offset = False
                for outer_index, sublist in enumerate(grouped_metrics):
                    for inner_index, metric in enumerate(sublist):
                        if metric['id'] == offset_id:
                            found_offset = True
                            if inner_index > 0:
                                sublist[0:inner_index] = []
                            break
                    if found_offset:
                        if outer_index > 0:
                            grouped_metrics[0:outer_index] = []
                        break

            remaining = limit
            series_list = []
            for sublist in grouped_metrics:
                sub_count, results = self._query_merge_measurements(sublist,
                                                                    sublist[0]['dimensions'],
                                                                    start_timestamp,
                                                                    end_timestamp,
                                                                    offset_timestamp,
                                                                    remaining)

                series_list.extend(results)

                if remaining:
                    remaining -= sub_count
                    if remaining <= 0:
                        break

                # offset_timestamp is used only in the first group, reset to None for
                # subsequent groups
                if offset_timestamp:
                    offset_timestamp = None

            return series_list

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

    def _query_merge_measurements(self, metrics, dimensions, start_timestamp, end_timestamp,
                                  offset_timestamp, limit):
        results = []
        for metric in metrics:
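            # Heuristic page size: cap at the overall limit, but fetch at least
            # 1000 rows (or a per-metric share of the limit) per round trip.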
            if limit and len(metrics) > 1:
                fetch_size = min(limit, max(1000, limit // len(metrics) + 2))
            else:
                fetch_size = None
            query = self._build_measurement_query(metric['id'],
                                                  start_timestamp,
                                                  end_timestamp,
                                                  offset_timestamp,
                                                  limit,
                                                  fetch_size)
            results.append((metric, iter(self.session.execute_async(query[0], query[1]).result())))

        return self._merge_series(results, dimensions, limit)

    def _query_measurements(self, metrics, start_timestamp, end_timestamp,
                            offset_timestamp, limit):
        results = []
        for index, metric in enumerate(metrics):
            if index == 0:
                query = self._build_measurement_query(metric['id'],
                                                      start_timestamp,
                                                      end_timestamp,
                                                      offset_timestamp,
                                                      limit)
            else:
                if limit:
                    fetch_size = min(self.session.default_fetch_size,
                                     max(1000, limit // min(index, 4)))
                else:
                    fetch_size = self.session.default_fetch_size
                query = self._build_measurement_query(metric['id'],
                                                      start_timestamp,
                                                      end_timestamp,
                                                      None,
                                                      limit,
                                                      fetch_size)

            results.append([metric,
                            iter(self.session.execute_async(query[0], query[1]).result())])

        series_list = []
        count = 0
        for result in results:
            measurements = []
            row = next(result[1], None)
            while row:
                measurements.append(
                    [self._isotime_msec(row.time_stamp), row.value,
                     rest_utils.from_json(row.value_meta) if row.value_meta else {}])
                count += 1
                if limit and count >= limit:
                    break

                row = next(result[1], None)

            series_list.append({'name': result[0]['name'],
                                'id': result[0]['id'],
                                'columns': ['timestamp', 'value', 'value_meta'],
                                'measurements': measurements,
                                'dimensions': result[0]['dimensions']})
            if limit and count >= limit:
                break

        return count, series_list

    @staticmethod
    def _build_measurement_query(metric_id, start_timestamp,
                                 end_timestamp, offset_timestamp,
                                 limit=None, fetch_size=FETCH_SIZE_UNSET):
        conditions = [METRIC_ID_EQ]
        decode_metric_id = metric_id if six.PY2 else metric_id.decode('utf-8')
        params = [bytearray.fromhex(decode_metric_id)]

        if offset_timestamp:
            conditions.append(OFFSET_TIME_GT)
            params.append(offset_timestamp)
        elif start_timestamp:
            conditions.append(START_TIME_GE)
            params.append(int(start_timestamp * 1000))
        else:
            conditions.append('')

        if end_timestamp:
            conditions.append(END_TIME_LE)
            params.append(int(end_timestamp * 1000))
        else:
            conditions.append('')

        if limit:
            conditions.append(LIMIT_CLAUSE)
            params.append(limit)
        else:
            conditions.append('')

        return SimpleStatement(MEASUREMENT_LIST_CQL %
                               tuple(conditions), fetch_size=fetch_size), params

    def _merge_series(self, series, dimensions, limit):
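        # Ascending k-way merge: keep the head row of each per-metric iterator
        # in top_batch sorted newest-first, then repeatedly emit the last (i.e.
        # oldest) entry and refill that slot from the iterator it came from.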
        series_list = []

        if not series:
            return series_list

        measurements = []
        top_batch = []
        num_series = len(series)
        for i in range(0, num_series):
            row = next(series[i][1], None)
            if row:
                top_batch.append([i,
                                  row.time_stamp,
                                  row.value,
                                  rest_utils.from_json(row.value_meta) if row.value_meta else {}])
            else:
                num_series -= 1

        top_batch.sort(key=lambda m: m[1], reverse=True)

        count = 0
        while (not limit or count < limit) and top_batch:
            measurements.append([self._isotime_msec(top_batch[num_series - 1][1]),
                                 top_batch[num_series - 1][2],
                                 top_batch[num_series - 1][3]])
            count += 1
            row = next(series[top_batch[num_series - 1][0]][1], None)
            if row:
                top_batch[num_series - 1] = \
                    [top_batch[num_series - 1][0], row.time_stamp,
                     row.value, rest_utils.from_json(row.value_meta) if row.value_meta else {}]

                top_batch.sort(key=lambda m: m[1], reverse=True)
            else:
                num_series -= 1
                top_batch.pop()

        series_list.append({'name': series[0][0]['name'],
                            'id': series[0][0]['id'],
                            'columns': ['timestamp', 'value', 'value_meta'],
                            'measurements': measurements,
                            'dimensions': dimensions})

        return count, series_list

    @staticmethod
    def _group_metrics(metrics, group_by, search_by):

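        # Build a stable 'name=value&...' key from the group_by dimensions and
        # bucket metrics by it; metrics sharing a key are merged into one series.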
        grouped_metrics = {}
        for metric in metrics:
            key = ''
            display_dimensions = dict(search_by.items())
            for name in group_by:
                # '_' ensures the key with a missing dimension is sorted lower
                value = metric['dimensions'].get(name, '_')
                if value != '_':
                    display_dimensions[name] = value
                key = key + '='.join((urllib.parse.quote_plus(name),
                                      urllib.parse.quote_plus(value))) + '&'

            metric['dimensions'] = display_dimensions

            if key in grouped_metrics:
                grouped_metrics[key].append(metric)
            else:
                grouped_metrics[key] = [metric]

        # dict views have no sort() under Python 3; sort the items explicitly
        grouped_metrics = sorted(grouped_metrics.items(), key=lambda k: k[0])
        return [x[1] for x in grouped_metrics]

    @staticmethod
    def _isotime_msec(timestamp):
        """Stringify datetime in ISO 8601 format + millisecond.
        """
        st = timestamp.isoformat()
        if '.' in st:
            st = st[:23] + u'Z'
        else:
            st += u'.000Z'
        return st

    def metrics_statistics(self, tenant_id, region, name, dimensions,
                           start_timestamp, end_timestamp, statistics,
                           period, offset, limit, merge_metrics_flag,
                           group_by):

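        # Aggregate the raw measurement series into fixed, period-aligned
        # buckets, emitting one row per non-empty bucket with the requested
        # statistics columns.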
        if not period:
            period = 300
        else:
            period = int(period)

        series_list = self.measurement_list(tenant_id, region, name, dimensions,
                                            start_timestamp, end_timestamp,
                                            offset, None, merge_metrics_flag, group_by)

        json_statistics_list = []

        if not series_list:
            return json_statistics_list

        statistics = [stat.lower() for stat in statistics]

        columns = [u'timestamp']

        columns.extend([x for x in ['avg', 'min', 'max', 'count', 'sum'] if x in statistics])

        start_time = datetime.utcfromtimestamp(start_timestamp)
        if end_timestamp:
            end_time = datetime.utcfromtimestamp(end_timestamp)
        else:
            end_time = datetime.utcnow()

        for series in series_list:

            if limit <= 0:
                break

            measurements = series['measurements']

            if not measurements:
                continue

            first_measure = measurements[0]
            first_measure_start_time = MetricsRepository._parse_time_string(first_measure[0])

            # Skip empty intervals at the beginning: find the start of the first
            # stat period that actually contains data.
            stat_start_time = start_time + timedelta(
                seconds=((first_measure_start_time - start_time).seconds // period) * period)

            stats_list = []
            stats_count = 0
            stats_sum = 0
            stats_min = stats_max = first_measure[1]

            for measurement in series['measurements']:

                time_stamp = MetricsRepository._parse_time_string(measurement[0])
                value = measurement[1]

                if (time_stamp - stat_start_time).seconds >= period:

                    stat = MetricsRepository._create_stat(statistics, stat_start_time, stats_count,
                                                          stats_sum, stats_min, stats_max)

                    stats_list.append(stat)
                    limit -= 1
                    if limit <= 0:
                        break

                    # initialize the new stat period
                    stats_sum = value
                    stats_count = 1
                    stats_min = value
                    stats_max = value
                    stat_start_time += timedelta(seconds=period)

                else:
                    stats_min = min(stats_min, value)
                    stats_max = max(stats_max, value)
                    stats_count += 1
                    stats_sum += value

            if stats_count:
                stat = MetricsRepository._create_stat(
                    statistics, stat_start_time, stats_count, stats_sum, stats_min, stats_max)
                stats_list.append(stat)
                limit -= 1

            stats_end_time = stat_start_time + timedelta(seconds=period) - timedelta(milliseconds=1)
            if stats_end_time > end_time:
                stats_end_time = end_time

            statistic = {u'name': encodeutils.safe_decode(name, 'utf-8'),
                         u'id': series['id'],
                         u'dimensions': series['dimensions'],
                         u'columns': columns,
                         u'statistics': stats_list,
                         u'end_time': self._isotime_msec(stats_end_time)}

            json_statistics_list.append(statistic)

        return json_statistics_list

    @staticmethod
    def _create_stat(
            statistics,
            timestamp,
            stat_count=None,
            stat_sum=None,
            stat_min=None,
            stat_max=None):

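        # With no samples in the period, emit zeros for every requested column.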
        stat = [MetricsRepository._isotime_msec(timestamp)]

        if not stat_count:
            stat.extend([0] * len(statistics))

        else:
            if 'avg' in statistics:
                stat.append(stat_sum / stat_count)

            if 'min' in statistics:
                stat.append(stat_min)

            if 'max' in statistics:
                stat.append(stat_max)

            if 'count' in statistics:
                stat.append(stat_count)

            if 'sum' in statistics:
                stat.append(stat_sum)

        return stat

    @staticmethod
    def _parse_time_string(timestamp):
        dt = timeutils.parse_isotime(timestamp)
        dt = timeutils.normalize_time(dt)
        return dt

    def alarm_history(self, tenant_id, alarm_id_list,
                      offset, limit, start_timestamp=None,
                      end_timestamp=None):

        try:

            json_alarm_history_list = []

            if not alarm_id_list:
                return json_alarm_history_list

            conditions = [ALARM_TENANT_ID_EQ]
            params = [tenant_id.encode('utf8')]
            if len(alarm_id_list) == 1:
                conditions.append(ALARM_ID_EQ)
                params.append(alarm_id_list[0])
            else:
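                # expand to 'alarm_id in (%s,%s,...)' with one placeholder per id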
                conditions.append(
                    ' and alarm_id in ({}) '.format(
                        ','.join(
                            ['%s'] *
                            len(alarm_id_list))))
                for alarm_id in alarm_id_list:
                    params.append(alarm_id)

            if offset:
                conditions.append(OFFSET_TIME_GT)
                params.append(offset)

            elif start_timestamp:
                conditions.append(START_TIME_GE)
                params.append(int(start_timestamp * 1000))
            else:
                conditions.append('')

            if end_timestamp:
                conditions.append(END_TIME_LE)
                params.append(int(end_timestamp * 1000))
            else:
                conditions.append('')

            if limit:
                conditions.append(LIMIT_CLAUSE)
                params.append(limit + 1)
            else:
                conditions.append('')

            rows = self.session.execute(ALARM_HISTORY_CQL % tuple(conditions), params)

            if not rows:
                return json_alarm_history_list

            sorted_rows = sorted(rows, key=lambda row: row.time_stamp)

            for (tenant_id, alarm_id, time_stamp, metrics, new_state, old_state, reason,
                 reason_data, sub_alarms) in sorted_rows:

                alarm = {u'timestamp': self._isotime_msec(time_stamp),
                         u'alarm_id': alarm_id,
                         u'metrics': rest_utils.from_json(metrics),
                         u'new_state': new_state,
                         u'old_state': old_state,
                         u'reason': reason,
                         u'reason_data': reason_data,
                         u'sub_alarms': rest_utils.from_json(sub_alarms),
                         u'id': str(int((time_stamp - self.epoch).total_seconds() * 1000))}

                if alarm[u'sub_alarms']:

                    for sub_alarm in alarm[u'sub_alarms']:
                        sub_expr = sub_alarm['sub_alarm_expression']
                        metric_def = sub_expr['metric_definition']
                        sub_expr['metric_name'] = metric_def['name']
                        sub_expr['dimensions'] = metric_def['dimensions']
                        del sub_expr['metric_definition']

                json_alarm_history_list.append(alarm)

            return json_alarm_history_list

        except Exception as ex:
            LOG.exception(ex)
            raise exceptions.RepositoryException(ex)

    @staticmethod
    def check_status():
        try:
            cluster = Cluster(
                CONF.cassandra.contact_points
            )
            session = cluster.connect(CONF.cassandra.keyspace)
            session.shutdown()
        except Exception as ex:
            LOG.exception(str(ex))
            return False, str(ex)
        return True, 'OK'