"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/storage/utils.py" (13 May 2020, 8111 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source-code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "utils.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2013 Rackspace, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
    4 # use this file except in compliance with the License.  You may obtain a copy
    5 # of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations under
   13 # the License.
   14 
   15 import copy
   16 import hashlib
   17 import json
   18 
   19 from oslo_config import cfg
   20 from oslo_log import log
   21 from osprofiler import profiler
   22 import six
   23 from stevedore import driver
   24 
   25 from zaqar.common import errors
   26 from zaqar.common import utils
   27 from zaqar.storage import configuration
   28 
   29 LOG = log.getLogger(__name__)
   30 
   31 
   32 def dynamic_conf(uri, options, conf=None):
   33     """Given metadata, yields a dynamic configuration.
   34 
   35     :param uri: pool location
   36     :type uri: six.text_type
   37     :param options: additional pool metadata
   38     :type options: dict
   39     :param conf: Optional conf object to copy
   40     :type conf: `oslo_config.cfg.ConfigOpts`
   41     :returns: Configuration object suitable for constructing storage
   42               drivers
   43     :rtype: oslo_config.cfg.ConfigOpts
   44     """
   45     storage_type = six.moves.urllib_parse.urlparse(uri).scheme
   46 
   47     # NOTE(cpp-cabrera): parse storage-specific opts:
   48     # 'drivers:storage:{type}'
   49     options['uri'] = uri
   50     storage_opts = utils.dict_to_conf(options)
   51     storage_group = u'drivers:message_store:%s' % storage_type
   52 
   53     # NOTE(cpp-cabrera): register those options!
   54     if conf is None:
   55         conf = cfg.ConfigOpts()
   56     else:
   57         conf_wrap = configuration.Configuration(conf)
   58         conf = copy.copy(conf_wrap)
   59 
   60     if storage_group not in conf:
   61         conf.register_opts(storage_opts, group=storage_group)
   62 
   63     if 'drivers' not in conf:
   64         # NOTE(cpp-cabrera): parse general opts: 'drivers'
   65         driver_opts = utils.dict_to_conf({'message_store': storage_type})
   66         conf.register_opts(driver_opts, group=u'drivers')
   67 
   68     conf.set_override('message_store', storage_type, 'drivers')
   69 
   70     for opt in options:
   71         if opt in conf[storage_group]:
   72             conf.set_override(opt, options[opt], group=storage_group)
   73     return conf
   74 
   75 
   76 def load_storage_impl(uri, control_mode=False, default_store=None):
   77     """Loads a storage driver implementation and returns it.
   78 
   79     :param uri: The connection uri to parse and load a driver for.
   80     :param control_mode: (Default False). Determines which
   81         driver type to load; if False, the data driver is
   82         loaded. If True, the control driver is loaded.
   83     :param default_store: The default store to load if no scheme
   84         is parsed.
   85     """
   86 
   87     mode = 'control' if control_mode else 'data'
   88     driver_type = 'zaqar.{0}.storage'.format(mode)
   89     storage_type = six.moves.urllib_parse.urlparse(uri).scheme or default_store
   90 
   91     try:
   92         mgr = driver.DriverManager(driver_type, storage_type,
   93                                    invoke_on_load=False)
   94 
   95         return mgr.driver
   96 
   97     except Exception as exc:
   98         LOG.exception('Error loading storage driver')
   99         raise errors.InvalidDriver(exc)
  100 
  101 
  102 def load_storage_driver(conf, cache, storage_type=None,
  103                         control_mode=False, control_driver=None):
  104     """Loads a storage driver and returns it.
  105 
  106     The driver's initializer will be passed conf and cache as
  107     its positional args.
  108 
  109     :param conf: Configuration instance to use for loading the
  110         driver. Must include a 'drivers' group.
  111     :param cache: Cache instance that the driver can (optionally)
  112         use to reduce latency for some operations.
  113     :param storage_type: The storage_type to load. If None, then
  114         the `drivers` option will be used.
  115     :param control_mode: (Default False). Determines which
  116         driver type to load; if False, the data driver is
  117         loaded. If True, the control driver is loaded.
  118     :param control_driver: (Default None). The control driver
  119         instance to pass to the storage driver. Needed to access
  120         the queue controller, mainly.
  121     """
  122     if control_mode:
  123         mode = 'control'
  124         storage_type = storage_type or conf['drivers'].management_store
  125     else:
  126         mode = 'data'
  127         storage_type = storage_type or conf['drivers'].message_store
  128 
  129     driver_type = 'zaqar.{0}.storage'.format(mode)
  130 
  131     _invoke_args = (conf, cache)
  132     if control_driver is not None:
  133         _invoke_args = (conf, cache, control_driver)
  134 
  135     try:
  136         mgr = driver.DriverManager(driver_type,
  137                                    storage_type,
  138                                    invoke_on_load=True,
  139                                    invoke_args=_invoke_args)
  140 
  141         if conf.profiler.enabled:
  142             if ((mode == "control" and conf.profiler.trace_management_store) or
  143                     (mode == "data" and conf.profiler.trace_message_store)):
  144                 trace_name = '{0}_{1}_driver'.format(storage_type, mode)
  145                 return profiler.trace_cls(trace_name,
  146                                           trace_private=True)(mgr.driver)
  147         else:
  148             return mgr.driver
  149 
  150     except Exception as exc:
  151         LOG.exception('Failed to load "%s" driver for "%s"',
  152                       driver_type, storage_type)
  153         raise errors.InvalidDriver(exc)
  154 
  155 
  156 def keyify(key, iterable):
  157     """Make an iterator from an iterable of dicts compared with a key.
  158 
  159     :param key: A key exists for all dict inside the iterable object
  160     :param iterable: The input iterable object
  161     """
  162 
  163     class Keyed(object):
  164         def __init__(self, obj):
  165             self.obj = obj
  166 
  167         def __eq__(self, other):
  168             return self.obj[key] == other.obj[key]
  169 
  170         def __ne__(self, other):
  171             return self.obj[key] != other.obj[key]
  172 
  173         def __lt__(self, other):
  174             return self.obj[key] < other.obj[key]
  175 
  176         def __le__(self, other):
  177             return self.obj[key] <= other.obj[key]
  178 
  179         def __gt__(self, other):
  180             return self.obj[key] > other.obj[key]
  181 
  182         def __ge__(self, other):
  183             return self.obj[key] >= other.obj[key]
  184 
  185     for item in iterable:
  186         yield Keyed(item)
  187 
  188 
  189 def can_connect(uri, conf=None):
  190     """Given a URI, verifies whether it's possible to connect to it.
  191 
  192     :param uri: connection string to a storage endpoint
  193     :type uri: six.text_type
  194     :returns: True if can connect else False
  195     :rtype: bool
  196     """
  197     # NOTE(cabrera): create a mock configuration containing only
  198     # the URI field. This should be sufficient to initialize a
  199     # storage driver.
  200     conf = dynamic_conf(uri, {}, conf=conf)
  201     storage_type = six.moves.urllib_parse.urlparse(uri).scheme
  202 
  203     try:
  204         ctrl = load_storage_driver(conf, None,
  205                                    storage_type=conf.drivers.management_store,
  206                                    control_mode=True)
  207         driver = load_storage_driver(conf, None,
  208                                      storage_type=storage_type,
  209                                      control_driver=ctrl)
  210         return driver.is_alive()
  211     except Exception as exc:
  212         LOG.debug('Can\'t connect to: %s \n%s', (uri, exc))
  213         return False
  214 
  215 
  216 def get_checksum(body, algorithm='MD5'):
  217     """According to the algorithm to get the message body checksum.
  218 
  219     :param body: The message body.
  220     :type body: six.text_type
  221     :param algorithm: The algorithm type, default is MD5.
  222     :type algorithm: six.text_type
  223     :returns: The message body checksum.
  224     :rtype: six.text_type
  225     """
  226 
  227     checksum = '%s:' % algorithm
  228 
  229     if body is None:
  230         return ''
  231     else:
  232         checksum_body = json.dumps(body).encode('utf-8')
  233     # TODO(yangzhenyu): We may support other algorithms in future
  234     # versions, including SHA1, SHA256, SHA512, and so on.
  235     if algorithm == 'MD5':
  236         md5 = hashlib.md5()
  237         md5.update(checksum_body)
  238         checksum += md5.hexdigest()
  239 
  240     return checksum