"Fossies" - the Fresh Open Source Software Archive

Member "barbican-12.0.0/barbican/api/controllers/secrets.py" (14 Apr 2021, 18888 Bytes) of package /linux/misc/openstack/barbican-12.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "secrets.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 11.0.0_vs_12.0.0.

    1 #  Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #  not use this file except in compliance with the License. You may obtain
    3 #  a copy of the License at
    4 #
    5 #       http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #  Unless required by applicable law or agreed to in writing, software
    8 #  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #  License for the specific language governing permissions and limitations
   11 #  under the License.
   12 
   13 from oslo_utils import timeutils
   14 import pecan
   15 from six.moves.urllib import parse
   16 
   17 from barbican import api
   18 from barbican.api import controllers
   19 from barbican.api.controllers import acls
   20 from barbican.api.controllers import consumers
   21 from barbican.api.controllers import secretmeta
   22 from barbican.common import accept
   23 from barbican.common import exception
   24 from barbican.common import hrefs
   25 from barbican.common import quota
   26 from barbican.common import resources as res
   27 from barbican.common import utils
   28 from barbican.common import validators
   29 from barbican import i18n as u
   30 from barbican.model import models
   31 from barbican.model import repositories as repo
   32 from barbican.plugin import resources as plugin
   33 from barbican.plugin import util as putil
   34 
   35 
   36 LOG = utils.getLogger(__name__)
   37 
   38 
   39 def _secret_not_found():
   40     """Throw exception indicating secret not found."""
   41     pecan.abort(404, u._('Secret not found.'))
   42 
   43 
   44 def _invalid_secret_id():
   45     """Throw exception indicating secret id is invalid."""
   46     pecan.abort(404, u._('Not Found. Provided secret id is invalid.'))
   47 
   48 
   49 def _secret_payload_not_found():
   50     """Throw exception indicating secret's payload is not found."""
   51     pecan.abort(404, u._('Not Found. Sorry but your secret has no payload.'))
   52 
   53 
   54 def _secret_already_has_data():
   55     """Throw exception that the secret already has data."""
   56     pecan.abort(409, u._("Secret already has data, cannot modify it."))
   57 
   58 
   59 def _bad_query_string_parameters():
   60     pecan.abort(400, u._("URI provided invalid query string parameters."))
   61 
   62 
   63 def _request_has_twsk_but_no_transport_key_id():
   64     """Throw exception for bad wrapping parameters.
   65 
   66     Throw exception if transport key wrapped session key has been provided,
   67     but the transport key id has not.
   68     """
   69     pecan.abort(400, u._('Transport key wrapped session key has been '
   70                          'provided to wrap secrets for retrieval, but the '
   71                          'transport key id has not been provided.'))
   72 
   73 
   74 class SecretController(controllers.ACLMixin):
   75     """Handles Secret retrieval and deletion requests."""
   76 
   77     def __init__(self, secret):
   78         LOG.debug('=== Creating SecretController ===')
   79         self.secret = secret
   80         self.consumers = consumers.SecretConsumersController(secret.id)
   81         self.consumer_repo = repo.get_secret_consumer_repository()
   82         self.transport_key_repo = repo.get_transport_key_repository()
   83 
   84     def get_acl_tuple(self, req, **kwargs):
   85         d = self.get_acl_dict_for_user(req, self.secret.secret_acls)
   86         d['project_id'] = self.secret.project.external_id
   87         d['creator_id'] = self.secret.creator_id
   88         return 'secret', d
   89 
   90     @pecan.expose()
   91     def _lookup(self, sub_resource, *remainder):
   92         if sub_resource == 'acl':
   93             return acls.SecretACLsController(self.secret), remainder
   94         elif sub_resource == 'metadata':
   95             if len(remainder) == 0 or remainder == ('',):
   96                 return secretmeta.SecretMetadataController(self.secret), \
   97                     remainder
   98             else:
   99                 request_method = pecan.request.method
  100                 allowed_methods = ['GET', 'PUT', 'DELETE']
  101 
  102                 if request_method in allowed_methods:
  103                     return secretmeta.SecretMetadatumController(self.secret), \
  104                         remainder
  105                 else:
  106                     # methods cannot be handled at controller level
  107                     pecan.abort(405)
  108         else:
  109             # only 'acl' and 'metadata' as sub-resource is supported
  110             pecan.abort(404)
  111 
  112     @pecan.expose(generic=True)
  113     def index(self, **kwargs):
  114         pecan.abort(405)  # HTTP 405 Method Not Allowed as default
  115 
  116     @index.when(method='GET')
  117     @utils.allow_all_content_types
  118     @controllers.handle_exceptions(u._('Secret retrieval'))
  119     @controllers.enforce_rbac('secret:get')
  120     def on_get(self, external_project_id, **kwargs):
  121         if controllers.is_json_request_accept(pecan.request):
  122             resp = self._on_get_secret_metadata(self.secret, **kwargs)
  123 
  124             LOG.info('Retrieved secret metadata for project: %s',
  125                      external_project_id)
  126             return resp
  127         else:
  128             LOG.warning('Decrypted secret %s requested using deprecated '
  129                         'API call.', self.secret.id)
  130             return self._on_get_secret_payload(self.secret,
  131                                                external_project_id,
  132                                                **kwargs)
  133 
  134     def _on_get_secret_metadata(self, secret, **kwargs):
  135         """GET Metadata-only for a secret."""
  136         pecan.override_template('json', 'application/json')
  137 
  138         secret_fields = putil.mime_types.augment_fields_with_content_types(
  139             secret)
  140 
  141         transport_key_id = self._get_transport_key_id_if_needed(
  142             kwargs.get('transport_key_needed'), secret)
  143 
  144         if transport_key_id:
  145             secret_fields['transport_key_id'] = transport_key_id
  146 
  147         return hrefs.convert_to_hrefs(secret_fields)
  148 
  149     def _get_transport_key_id_if_needed(self, transport_key_needed, secret):
  150         if transport_key_needed and transport_key_needed.lower() == 'true':
  151             return plugin.get_transport_key_id_for_retrieval(secret)
  152         return None
  153 
  154     def _on_get_secret_payload(self, secret, external_project_id, **kwargs):
  155         """GET actual payload containing the secret."""
  156 
  157         # With ACL support, the user token project does not have to be same as
  158         # project associated with secret. The lookup project_id needs to be
  159         # derived from the secret's data considering authorization is already
  160         # done.
  161         external_project_id = secret.project.external_id
  162         project = res.get_or_create_project(external_project_id)
  163 
  164         # default to application/octet-stream if there is no Accept header
  165         if (type(pecan.request.accept) is accept.NoHeaderType or
  166                 not pecan.request.accept.header_value):
  167             accept_header = 'application/octet-stream'
  168         else:
  169             accept_header = pecan.request.accept.header_value
  170         pecan.override_template('', accept_header)
  171 
  172         # check if payload exists before proceeding
  173         if not secret.encrypted_data and not secret.secret_store_metadata:
  174             _secret_payload_not_found()
  175 
  176         twsk = kwargs.get('trans_wrapped_session_key', None)
  177         transport_key = None
  178 
  179         if twsk:
  180             transport_key = self._get_transport_key(
  181                 kwargs.get('transport_key_id', None))
  182 
  183         return plugin.get_secret(accept_header,
  184                                  secret,
  185                                  project,
  186                                  twsk,
  187                                  transport_key)
  188 
  189     def _get_transport_key(self, transport_key_id):
  190         if transport_key_id is None:
  191             _request_has_twsk_but_no_transport_key_id()
  192 
  193         transport_key_model = self.transport_key_repo.get(
  194             entity_id=transport_key_id,
  195             suppress_exception=True)
  196 
  197         return transport_key_model.transport_key
  198 
  199     @pecan.expose()
  200     @utils.allow_all_content_types
  201     @controllers.handle_exceptions(u._('Secret payload retrieval'))
  202     @controllers.enforce_rbac('secret:decrypt')
  203     def payload(self, external_project_id, **kwargs):
  204         if pecan.request.method != 'GET':
  205             pecan.abort(405)
  206 
  207         resp = self._on_get_secret_payload(self.secret,
  208                                            external_project_id,
  209                                            **kwargs)
  210 
  211         LOG.info('Retrieved secret payload for project: %s',
  212                  external_project_id)
  213         return resp
  214 
  215     @index.when(method='PUT')
  216     @utils.allow_all_content_types
  217     @controllers.handle_exceptions(u._('Secret update'))
  218     @controllers.enforce_rbac('secret:put')
  219     @controllers.enforce_content_types(['application/octet-stream',
  220                                        'text/plain'])
  221     def on_put(self, external_project_id, **kwargs):
  222         if (not pecan.request.content_type or
  223                 pecan.request.content_type == 'application/json'):
  224             pecan.abort(
  225                 415,
  226                 u._("Content-Type of '{content_type}' is not supported for "
  227                     "PUT.").format(content_type=pecan.request.content_type)
  228             )
  229 
  230         transport_key_id = kwargs.get('transport_key_id')
  231 
  232         payload = pecan.request.body
  233         if not payload:
  234             raise exception.NoDataToProcess()
  235         if validators.secret_too_big(payload):
  236             raise exception.LimitExceeded()
  237 
  238         if self.secret.encrypted_data or self.secret.secret_store_metadata:
  239             _secret_already_has_data()
  240 
  241         project_model = res.get_or_create_project(external_project_id)
  242         content_type = pecan.request.content_type
  243         content_encoding = pecan.request.headers.get('Content-Encoding')
  244 
  245         plugin.store_secret(
  246             unencrypted_raw=payload,
  247             content_type_raw=content_type,
  248             content_encoding=content_encoding,
  249             secret_model=self.secret,
  250             project_model=project_model,
  251             transport_key_id=transport_key_id)
  252         LOG.info('Updated secret for project: %s', external_project_id)
  253 
  254     @index.when(method='DELETE')
  255     @utils.allow_all_content_types
  256     @controllers.handle_exceptions(u._('Secret deletion'))
  257     @controllers.enforce_rbac('secret:delete')
  258     def on_delete(self, external_project_id, **kwargs):
  259         secret_consumers = self.consumer_repo.get_by_secret_id(
  260             self.secret.id,
  261             suppress_exception=True
  262         )
  263 
  264         # With ACL support, the user token project does not have to be same as
  265         # project associated with secret. The lookup project_id needs to be
  266         # derived from the secret's data considering authorization is already
  267         # done.
  268         external_project_id = self.secret.project.external_id
  269         plugin.delete_secret(self.secret, external_project_id)
  270         LOG.info('Deleted secret for project: %s', external_project_id)
  271 
  272         for consumer in secret_consumers[0]:
  273             try:
  274                 self.consumer_repo.delete_entity_by_id(
  275                     consumer.id, external_project_id)
  276             except exception.NotFound:  # nosec
  277                 pass
  278 
  279 
  280 class SecretsController(controllers.ACLMixin):
  281     """Handles Secret creation requests."""
  282 
  283     def __init__(self):
  284         LOG.debug('Creating SecretsController')
  285         self.validator = validators.NewSecretValidator()
  286         self.secret_repo = repo.get_secret_repository()
  287         self.quota_enforcer = quota.QuotaEnforcer('secrets', self.secret_repo)
  288 
  289     def _is_valid_date_filter(self, date_filter):
  290         filters = date_filter.split(',')
  291         sorted_filters = dict()
  292         try:
  293             for filter in filters:
  294                 if filter.startswith('gt:'):
  295                     if sorted_filters.get('gt') or sorted_filters.get('gte'):
  296                         return False
  297                     sorted_filters['gt'] = timeutils.parse_isotime(filter[3:])
  298                 elif filter.startswith('gte:'):
  299                     if sorted_filters.get('gt') or sorted_filters.get(
  300                             'gte') or sorted_filters.get('eq'):
  301                         return False
  302                     sorted_filters['gte'] = timeutils.parse_isotime(filter[4:])
  303                 elif filter.startswith('lt:'):
  304                     if sorted_filters.get('lt') or sorted_filters.get('lte'):
  305                         return False
  306                     sorted_filters['lt'] = timeutils.parse_isotime(filter[3:])
  307                 elif filter.startswith('lte:'):
  308                     if sorted_filters.get('lt') or sorted_filters.get(
  309                             'lte') or sorted_filters.get('eq'):
  310                         return False
  311                     sorted_filters['lte'] = timeutils.parse_isotime(filter[4:])
  312                 elif sorted_filters.get('eq') or sorted_filters.get(
  313                         'gte') or sorted_filters.get('lte'):
  314                     return False
  315                 else:
  316                     sorted_filters['eq'] = timeutils.parse_isotime(filter)
  317         except ValueError:
  318             return False
  319         return True
  320 
  321     def _is_valid_sorting(self, sorting):
  322         allowed_keys = ['algorithm', 'bit_length', 'created',
  323                         'expiration', 'mode', 'name', 'secret_type', 'status',
  324                         'updated']
  325         allowed_directions = ['asc', 'desc']
  326         sorted_keys = dict()
  327         for sort in sorting.split(','):
  328             if ':' in sort:
  329                 try:
  330                     key, direction = sort.split(':')
  331                 except ValueError:
  332                     return False
  333             else:
  334                 key, direction = sort, 'asc'
  335             if key not in allowed_keys or direction not in allowed_directions:
  336                 return False
  337             if sorted_keys.get(key):
  338                 return False
  339             else:
  340                 sorted_keys[key] = direction
  341         return True
  342 
  343     @pecan.expose()
  344     def _lookup(self, secret_id, *remainder):
  345         # NOTE(jaosorior): It's worth noting that even though this section
  346         # actually does a lookup in the database regardless of the RBAC policy
  347         # check, the execution only gets here if authentication of the user was
  348         # previously successful.
  349 
  350         if not utils.validate_id_is_uuid(secret_id):
  351             _invalid_secret_id()()
  352         secret = self.secret_repo.get_secret_by_id(
  353             entity_id=secret_id, suppress_exception=True)
  354         if not secret:
  355             _secret_not_found()
  356 
  357         return SecretController(secret), remainder
  358 
  359     @pecan.expose(generic=True)
  360     def index(self, **kwargs):
  361         pecan.abort(405)  # HTTP 405 Method Not Allowed as default
  362 
  363     @index.when(method='GET', template='json')
  364     @controllers.handle_exceptions(u._('Secret(s) retrieval'))
  365     @controllers.enforce_rbac('secrets:get')
  366     def on_get(self, external_project_id, **kw):
  367         def secret_fields(field):
  368             return putil.mime_types.augment_fields_with_content_types(field)
  369 
  370         LOG.debug('Start secrets on_get '
  371                   'for project-ID %s:', external_project_id)
  372 
  373         name = kw.get('name', '')
  374         if name:
  375             name = parse.unquote_plus(name)
  376 
  377         bits = kw.get('bits', 0)
  378         try:
  379             bits = int(bits)
  380         except ValueError:
  381             # as per Github issue 171, if bits is invalid then
  382             # the default should be used.
  383             bits = 0
  384 
  385         for date_filter in 'created', 'updated', 'expiration':
  386             if kw.get(date_filter) and not self._is_valid_date_filter(
  387                     kw.get(date_filter)):
  388                 _bad_query_string_parameters()
  389         if kw.get('sort') and not self._is_valid_sorting(kw.get('sort')):
  390             _bad_query_string_parameters()
  391 
  392         ctxt = controllers._get_barbican_context(pecan.request)
  393         user_id = None
  394         if ctxt:
  395             user_id = ctxt.user
  396 
  397         result = self.secret_repo.get_secret_list(
  398             external_project_id,
  399             offset_arg=kw.get('offset', 0),
  400             limit_arg=kw.get('limit'),
  401             name=name,
  402             alg=kw.get('alg'),
  403             mode=kw.get('mode'),
  404             bits=bits,
  405             secret_type=kw.get('secret_type'),
  406             suppress_exception=True,
  407             acl_only=kw.get('acl_only'),
  408             user_id=user_id,
  409             created=kw.get('created'),
  410             updated=kw.get('updated'),
  411             expiration=kw.get('expiration'),
  412             sort=kw.get('sort')
  413         )
  414 
  415         secrets, offset, limit, total = result
  416 
  417         if not secrets:
  418             secrets_resp_overall = {'secrets': [],
  419                                     'total': total}
  420         else:
  421             secrets_resp = [
  422                 hrefs.convert_to_hrefs(secret_fields(s))
  423                 for s in secrets
  424             ]
  425             secrets_resp_overall = hrefs.add_nav_hrefs(
  426                 'secrets', offset, limit, total,
  427                 {'secrets': secrets_resp}
  428             )
  429             secrets_resp_overall.update({'total': total})
  430 
  431         LOG.info('Retrieved secret list for project: %s',
  432                  external_project_id)
  433         return secrets_resp_overall
  434 
  435     @index.when(method='POST', template='json')
  436     @controllers.handle_exceptions(u._('Secret creation'))
  437     @controllers.enforce_rbac('secrets:post')
  438     @controllers.enforce_content_types(['application/json'])
  439     def on_post(self, external_project_id, **kwargs):
  440         LOG.debug('Start on_post for project-ID %s:...',
  441                   external_project_id)
  442 
  443         data = api.load_body(pecan.request, validator=self.validator)
  444         project = res.get_or_create_project(external_project_id)
  445 
  446         self.quota_enforcer.enforce(project)
  447 
  448         transport_key_needed = data.get('transport_key_needed',
  449                                         'false').lower() == 'true'
  450         ctxt = controllers._get_barbican_context(pecan.request)
  451         if ctxt:  # in authenticated pipleline case, always use auth token user
  452             data['creator_id'] = ctxt.user
  453 
  454         secret_model = models.Secret(data)
  455 
  456         new_secret, transport_key_model = plugin.store_secret(
  457             unencrypted_raw=data.get('payload'),
  458             content_type_raw=data.get('payload_content_type',
  459                                       'application/octet-stream'),
  460             content_encoding=data.get('payload_content_encoding'),
  461             secret_model=secret_model,
  462             project_model=project,
  463             transport_key_needed=transport_key_needed,
  464             transport_key_id=data.get('transport_key_id'))
  465 
  466         url = hrefs.convert_secret_to_href(new_secret.id)
  467         LOG.debug('URI to secret is %s', url)
  468 
  469         pecan.response.status = 201
  470         pecan.response.headers['Location'] = url
  471 
  472         LOG.info('Created a secret for project: %s',
  473                  external_project_id)
  474         if transport_key_model is not None:
  475             tkey_url = hrefs.convert_transport_key_to_href(
  476                 transport_key_model.id)
  477             return {'secret_ref': url, 'transport_key_ref': tkey_url}
  478         else:
  479             return {'secret_ref': url}