"Fossies" - the Fresh Open Source Software Archive

Member "barbican-12.0.0/barbican/common/utils.py" (14 Apr 2021, 7695 Bytes) of package /linux/misc/openstack/barbican-12.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "utils.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 11.0.0_vs_12.0.0.

    1 # Copyright (c) 2013-2014 Rackspace, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 """
   17 Common utilities for Barbican.
   18 """
   19 import collections
   20 import importlib
   21 import mimetypes
   22 import uuid
   23 
   24 from oslo_log import log
   25 from oslo_utils import uuidutils
   26 import pecan
   27 import re
   28 import six
   29 from six.moves.urllib import parse
   30 
   31 from barbican.common import config
   32 from barbican import i18n as u
   33 
   34 
   35 CONF = config.CONF
   36 
   37 
   38 # Current API version
   39 API_VERSION = 'v1'
   40 
   41 # Added here to remove cyclic dependency.
   42 # In barbican.model.models module SecretType.OPAQUE was imported from
   43 # barbican.plugin.interface.secret_store which introduces a cyclic dependency
   44 # if `secret_store` plugin needs to use db model classes. So moving shared
   45 # value to another common python module which is already imported in both.
   46 SECRET_TYPE_OPAQUE = "opaque"  # nosec
   47 
   48 
   49 def _do_allow_certain_content_types(func, content_types_list=[]):
   50     # Allows you to bypass pecan's content-type restrictions
   51     cfg = pecan.util._cfg(func)
   52     cfg.setdefault('content_types', {})
   53     cfg['content_types'].update((value, '')
   54                                 for value in content_types_list)
   55     return func
   56 
   57 
   58 def allow_certain_content_types(*content_types_list):
   59     def _wrapper(func):
   60         return _do_allow_certain_content_types(func, content_types_list)
   61     return _wrapper
   62 
   63 
   64 def allow_all_content_types(f):
   65     return _do_allow_certain_content_types(f, mimetypes.types_map.values())
   66 
   67 
   68 def get_base_url_from_request():
   69     """Derive base url from wsgi request if CONF.host_href is not set
   70 
   71     Use host.href as base URL if its set in barbican.conf.
   72     If its not set, then derives value from wsgi request. WSGI request uses
   73     HOST header or HTTP_X_FORWARDED_FOR header (in case of proxy) for host +
   74     port part of its url. Proxies can also set HTTP_X_FORWARDED_PROTO header
   75     for indicating http vs https.
   76 
   77     Some of unit tests does not have pecan context that's why using request
   78     attr check on pecan instance.
   79     """
   80     if not CONF.host_href and hasattr(pecan.request, 'application_url'):
   81         p_url = parse.urlsplit(pecan.request.application_url)
   82         # Pecan does not handle X_FORWARDED_PROTO yet, so we need to
   83         # handle it ourselves. see lp#1445290
   84         scheme = pecan.request.environ.get('HTTP_X_FORWARDED_PROTO', 'http')
   85         # Pecan does not handle url reconstruction according to
   86         # https://www.python.org/dev/peps/pep-0333/#url-reconstruction
   87         netloc = pecan.request.environ.get('HTTP_HOST', p_url.netloc)
   88         # FIXME: implement SERVER_NAME lookup if HTTP_HOST is not set
   89         if p_url.path:
   90             # Remove the version from the path to extract the base path
   91             base_path = re.sub(r'/v[0-9\.]+$', '', p_url.path)
   92             base_url = '%s://%s%s' % (scheme, netloc, base_path)
   93         else:
   94             base_url = '%s://%s' % (scheme, netloc)
   95         return base_url
   96     else:  # when host_href is set or flow is not within wsgi request context
   97         return CONF.host_href
   98 
   99 
  100 def hostname_for_refs(resource=None):
  101     """Return the HATEOAS-style return URI reference for this service."""
  102     base_url = get_base_url_from_request()
  103     ref = ['{base}/{version}'.format(base=base_url, version=API_VERSION)]
  104     if resource:
  105         ref.append('/' + resource)
  106     return ''.join(ref)
  107 
  108 
  109 # Return a logger instance.
  110 #   Note: Centralize access to the logger to avoid the dreaded
  111 #   'ArgsAlreadyParsedError: arguments already parsed: cannot
  112 #   register CLI option'
  113 #   error.
  114 def getLogger(name):
  115     return log.getLogger(name)
  116 
  117 
  118 def get_accepted_encodings(req):
  119     """Returns a list of client acceptable encodings sorted by q value.
  120 
  121     For details see: http://tools.ietf.org/html/rfc2616#section-14.3
  122 
  123     :param req: request object
  124     :returns: list of client acceptable encodings sorted by q value.
  125     """
  126     header = req.get_header('Accept-Encoding')
  127 
  128     return get_accepted_encodings_direct(header)
  129 
  130 
  131 def get_accepted_encodings_direct(content_encoding_header):
  132     """Returns a list of client acceptable encodings sorted by q value.
  133 
  134     For details see: http://tools.ietf.org/html/rfc2616#section-14.3
  135 
  136     :param req: request object
  137     :returns: list of client acceptable encodings sorted by q value.
  138     """
  139     if content_encoding_header is None:
  140         return None
  141 
  142     Encoding = collections.namedtuple('Encoding', ['coding', 'quality'])
  143 
  144     encodings = list()
  145     for enc in content_encoding_header.split(','):
  146         if ';' in enc:
  147             coding, qvalue = enc.split(';')
  148             try:
  149                 qvalue = qvalue.split('=')[1]
  150                 quality = float(qvalue.strip())
  151             except ValueError:
  152                 # can't convert quality to float
  153                 return None
  154             if quality > 1.0 or quality < 0.0:
  155                 # quality is outside valid range
  156                 return None
  157             if quality > 0.0:
  158                 encodings.append(Encoding(coding.strip(), quality))
  159         else:
  160             encodings.append(Encoding(enc.strip(), 1))
  161 
  162     # Sort the encodings by quality
  163     encodings = sorted(encodings, key=lambda e: e.quality, reverse=True)
  164 
  165     return [encoding.coding for encoding in encodings]
  166 
  167 
  168 def generate_fullname_for(instance):
  169     """Produce a fully qualified class name for the specified instance.
  170 
  171     :param instance: The instance to generate information from.
  172     :return: A string providing the package.module information for the
  173     instance.
  174     :raises: ValueError if the given instance is null
  175     """
  176     if not instance:
  177         raise ValueError(u._("Cannot generate a fullname for a null instance"))
  178 
  179     module = type(instance).__module__
  180     class_name = type(instance).__name__
  181 
  182     if module is None or module == six.moves.builtins.__name__:
  183         return class_name
  184     return "{module}.{class_name}".format(module=module, class_name=class_name)
  185 
  186 
  187 def get_class_for(module_name, class_name):
  188     """Create a Python class from its text-specified components."""
  189     # Load the module via name, raising ImportError if module cannot be
  190     # loaded.
  191     python_module = importlib.import_module(module_name)
  192 
  193     # Load and return the resolved Python class, raising AttributeError if
  194     # class cannot be found.
  195     return getattr(python_module, class_name)
  196 
  197 
  198 def generate_uuid():
  199     return uuidutils.generate_uuid()
  200 
  201 
  202 def is_multiple_backends_enabled():
  203     try:
  204         secretstore_conf = config.get_module_config('secretstore')
  205     except KeyError:
  206         # Ensure module is initialized
  207         from barbican.plugin.interface import secret_store  # noqa: F401
  208         secretstore_conf = config.get_module_config('secretstore')
  209     return secretstore_conf.secretstore.enable_multiple_secret_stores
  210 
  211 
  212 def validate_id_is_uuid(input_id, version=4):
  213     """Validates provided id is uuid4 format value.
  214 
  215     Returns true when provided id is a valid version 4 uuid otherwise
  216     returns False.
  217     This validation is to be used only for ids which are generated by barbican
  218     (e.g. not for keystone project_id)
  219     """
  220 
  221     try:
  222         value = uuid.UUID(input_id, version=version)
  223     except Exception:
  224         return False
  225     return str(value) == input_id