"Fossies" - the Fresh Open Source Software Archive

Member "zun-4.0.0/zun/common/context.py" (16 Oct 2019, 7173 Bytes) of package /linux/misc/openstack/zun-4.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "context.py", see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 0.2.1_vs_2.1.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 # not use this file except in compliance with the License. You may obtain
    3 # a copy of the License at
    4 #
    5 #      http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 # License for the specific language governing permissions and limitations
   11 # under the License.
   12 
   13 import functools
   14 
   15 import copy
   16 from oslo_context import context
   17 from oslo_utils import timeutils
   18 import six
   19 
   20 from zun.common import exception
   21 from zun.common import policy
   22 
   23 
   24 class RequestContext(context.RequestContext):
   25     """Extends security contexts from the OpenStack common library."""
   26 
   27     def __init__(self, auth_token=None, domain_id=None,
   28                  domain_name=None, user_name=None, user_id=None,
   29                  user_domain_name=None, user_domain_id=None,
   30                  project_name=None, project_id=None, roles=None,
   31                  is_admin=None, read_only=False, show_deleted=False,
   32                  request_id=None, trust_id=None, auth_token_info=None,
   33                  all_projects=False, password=None, timestamp=None, **kwargs):
   34         """Stores several additional request parameters:
   35 
   36         :param domain_id: The ID of the domain.
   37         :param domain_name: The name of the domain.
   38         :param user_domain_id: The ID of the domain to
   39                                authenticate a user against.
   40         :param user_domain_name: The name of the domain to
   41                                  authenticate a user against.
   42 
   43         """
   44         super(RequestContext, self).__init__(auth_token=auth_token,
   45                                              user_id=user_name,
   46                                              project_id=project_name,
   47                                              is_admin=is_admin,
   48                                              read_only=read_only,
   49                                              show_deleted=show_deleted,
   50                                              request_id=request_id,
   51                                              roles=roles)
   52 
   53         self.user_name = user_name
   54         self.user_id = user_id
   55         self.project_name = project_name
   56         self.project_id = project_id
   57         self.domain_id = domain_id
   58         self.domain_name = domain_name
   59         self.user_domain_id = user_domain_id
   60         self.user_domain_name = user_domain_name
   61         self.auth_token_info = auth_token_info
   62         self.trust_id = trust_id
   63         self.all_projects = all_projects
   64         self.password = password
   65         if is_admin is None:
   66             self.is_admin = policy.check_is_admin(self)
   67         else:
   68             self.is_admin = is_admin
   69 
   70         if not timestamp:
   71             timestamp = timeutils.utcnow()
   72         if isinstance(timestamp, six.string_types):
   73             timestamp = timeutils.parse_strtime(timestamp)
   74         self.timestamp = timestamp
   75 
   76     def to_dict(self):
   77         value = super(RequestContext, self).to_dict()
   78         value.update({'auth_token': self.auth_token,
   79                       'domain_id': self.domain_id,
   80                       'domain_name': self.domain_name,
   81                       'user_domain_id': self.user_domain_id,
   82                       'user_domain_name': self.user_domain_name,
   83                       'user_name': self.user_name,
   84                       'user_id': self.user_id,
   85                       'project_name': self.project_name,
   86                       'project_id': self.project_id,
   87                       'is_admin': self.is_admin,
   88                       'read_only': self.read_only,
   89                       'roles': self.roles,
   90                       'show_deleted': self.show_deleted,
   91                       'request_id': self.request_id,
   92                       'trust_id': self.trust_id,
   93                       'auth_token_info': self.auth_token_info,
   94                       'password': self.password,
   95                       'all_projects': self.all_projects,
   96                       'timestamp': timeutils.strtime(self.timestamp) if
   97                       hasattr(self, 'timestamp') else None
   98                       })
   99         return value
  100 
  101     def to_policy_values(self):
  102         policy = super(RequestContext, self).to_policy_values()
  103         policy['is_admin'] = self.is_admin
  104         return policy
  105 
  106     @classmethod
  107     def from_dict(cls, values):
  108         return cls(**values)
  109 
  110     def elevated(self):
  111         """Return a version of this context with admin flag set."""
  112         context = copy.copy(self)
  113         # context.roles must be deepcopied to leave original roles
  114         # without changes
  115         context.roles = copy.deepcopy(self.roles)
  116         context.is_admin = True
  117 
  118         if 'admin' not in context.roles:
  119             context.roles.append('admin')
  120 
  121         return context
  122 
  123     def can(self, action, target=None, fatal=True, might_not_exist=False):
  124         """Verifies that the given action is valid on the target in this context.
  125 
  126         :param action: string representing the action to be checked.
  127         :param target: dictionary representing the object of the action
  128             for object creation this should be a dictionary representing the
  129             location of the object e.g. ``{'project_id': context.project_id}``.
  130             If None, then this default target will be considered:
  131             {'project_id': self.project_id, 'user_id': self.user_id}
  132         :param fatal: if False, will return False when an
  133             exception.NotAuthorized occurs.
  134         :param might_not_exist: If True the policy check is skipped (and the
  135             function returns True) if the specified policy does not exist.
  136             Defaults to false.
  137 
  138         :raises zun.common.exception.NotAuthorized: if verification fails and
  139             fatal is True.
  140 
  141         :return: returns a non-False value (not necessarily "True") if
  142             authorized and False if not authorized and fatal is False.
  143         """
  144         if target is None:
  145             target = {'project_id': self.project_id,
  146                       'user_id': self.user_id}
  147 
  148         try:
  149             return policy.authorize(self, action, target,
  150                                     might_not_exist=might_not_exist)
  151         except exception.NotAuthorized:
  152             if fatal:
  153                 raise
  154             return False
  155 
  156 
  157 def make_context(*args, **kwargs):
  158     return RequestContext(*args, **kwargs)
  159 
  160 
  161 def get_admin_context(show_deleted=False, all_projects=False):
  162     """Create an administrator context.
  163 
  164     :param show_deleted: if True, will show deleted items when query db
  165     """
  166     context = RequestContext(user_id=None,
  167                              project=None,
  168                              is_admin=True,
  169                              show_deleted=show_deleted,
  170                              all_projects=all_projects)
  171     return context
  172 
  173 
  174 def set_context(func):
  175     @functools.wraps(func)
  176     def handler(self, ctx):
  177         if ctx is None:
  178             ctx = get_admin_context(all_projects=True)
  179         func(self, ctx)
  180     return handler