"Fossies" - the Fresh Open Source Software Archive

Member "viewvc-1.2.1/lib/vcauth/svnauthz/__init__.py" (26 Mar 2020, 10389 Bytes) of package /linux/misc/viewvc-1.2.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "__init__.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.1.28_vs_1.2.1.

    1 # -*-python-*-
    2 #
    3 # Copyright (C) 2006-2020 The ViewCVS Group. All Rights Reserved.
    4 #
    5 # By using this file, you agree to the terms and conditions set forth in
    6 # the LICENSE.html file which can be found at the top level of the ViewVC
    7 # distribution or at http://viewvc.org/license-1.html.
    8 #
    9 # For more information, visit http://viewvc.org/
   10 #
   11 # -----------------------------------------------------------------------
   12 # (c) 2006 Sergey Lapin <slapin@dataart.com>
   13 
   14 import vcauth
   15 import os.path
   16 import debug
   17 
   18 from ConfigParser import ConfigParser
   19 
   20 class ViewVCAuthorizer(vcauth.GenericViewVCAuthorizer):
   21   """Subversion authz authorizer module"""
   22   
   23   def __init__(self, root_lookup_func, username, params={}):
   24     self.rootpaths = { }  # {root -> { paths -> access boolean for USERNAME }}
   25     self.root_lookup_func = root_lookup_func
   26     
   27     # Get the authz file location from exactly one of our related
   28     # passed-in parameters.
   29     self.authz_file = params.get('authzfile')
   30     self.rel_authz_file = params.get('root_relative_authzfile')
   31     if not (self.authz_file or self.rel_authz_file):
   32       raise debug.ViewVCException("No authzfile configured")
   33     if self.authz_file and self.rel_authz_file:
   34       raise debug.ViewVCException("Multiple authzfile locations defined")
   35 
   36     # See if the admin wants us to do case normalization of usernames.
   37     self.force_username_case = params.get('force_username_case')
   38     if self.force_username_case == "upper":
   39       self.username = username and username.upper() or username
   40     elif self.force_username_case == "lower":
   41       self.username = username and username.lower() or username
   42     elif not self.force_username_case:
   43       self.username = username
   44     else:
   45       raise debug.ViewVCException("Invalid value for force_username_case "
   46                                   "option")
   47 
   48   def _get_authz_file(self, rootname):
   49     if self.rel_authz_file:
   50       roottype, rootpath = self.root_lookup_func(rootname)
   51       return os.path.join(rootpath, self.rel_authz_file)
   52     else:
   53       return self.authz_file
   54     
   55   def _get_paths_for_root(self, rootname):
   56     if self.rootpaths.has_key(rootname):
   57       return self.rootpaths[rootname]
   58 
   59     paths_for_root = { }
   60     
   61     # Parse the authz file, replacing ConfigParser's optionxform()
   62     # method with something that won't futz with the case of the
   63     # option names.
   64     cp = ConfigParser()
   65     cp.optionxform = lambda x: x
   66     try:
   67       cp.read(self._get_authz_file(rootname))
   68     except:
   69       raise debug.ViewVCException("Unable to parse configured authzfile file")
   70 
   71     # Ignore context; assume the authz file only has the repository URL's
   72     # basename, just like mod_dav_svn requires it.
   73     # Perhaps it's better to just use the repository directory's basename.
   74     root_parts = rootname.split('/')
   75     repo_name = root_parts[-1]
   76 
   77     # Figure out if there are any aliases for the current username
   78     aliases = []
   79     if cp.has_section('aliases'):
   80       for alias in cp.options('aliases'):
   81         entry = cp.get('aliases', alias)
   82         if entry == self.username:
   83           aliases.append(alias)
   84 
   85     # Figure out which groups USERNAME has a part of.
   86     groups = []
   87     if cp.has_section('groups'):
   88       all_groups = []
   89 
   90       def _process_group(groupname):
   91         """Inline function to handle groups within groups.
   92         
   93         For a group to be within another group in SVN, the group
   94         definitions must be in the correct order in the config file.
   95         ie. If group A is a member of group B then group A must be
   96         defined before group B in the [groups] section.
   97         
   98         Unfortunately, the ConfigParser class provides no way of
   99         finding the order in which groups were defined so, for reasons
  100         of practicality, this function lets you get away with them
  101         being defined in the wrong order.  Recursion is guarded
  102         against though."""
  103         
  104         # If we already know the user is part of this already-
  105         # processed group, return that fact.
  106         if groupname in groups:
  107           return 1
  108         # Otherwise, ensure we don't process a group twice.
  109         if groupname in all_groups:          
  110           return 0
  111         # Store the group name in a global list so it won't be processed again
  112         all_groups.append(groupname)
  113         group_member = 0
  114         groupname = groupname.strip()
  115         entries = cp.get('groups', groupname).split(',')
  116         for entry in entries:
  117           entry = entry.strip()
  118           if entry == self.username:
  119             group_member = 1
  120             break
  121           elif entry[0:1] == "@" and _process_group(entry[1:]):
  122             group_member = 1
  123             break
  124           elif entry[0:1] == "&" and entry[1:] in aliases:
  125             group_member = 1
  126             break
  127         if group_member:
  128           groups.append(groupname)
  129         return group_member
  130       
  131       # Process the groups
  132       for group in cp.options('groups'):
  133         _process_group(group)
  134 
  135     def _userspec_matches_user(userspec):
  136       # If there is an inversion character, recurse and return the
  137       # opposite result.
  138       if userspec[0:1] == '~':
  139         return not _userspec_matches_user(userspec[1:])
  140 
  141       # See if the userspec applies to our current user.
  142       return userspec == '*' \
  143              or userspec == self.username \
  144              or (self.username is not None and userspec == "$authenticated") \
  145              or (self.username is None and userspec == "$anonymous") \
  146              or (userspec[0:1] == "@" and userspec[1:] in groups) \
  147              or (userspec[0:1] == "&" and userspec[1:] in aliases)
  148       
  149     def _process_access_section(section):
  150       """Inline function for determining user access in a single
  151       config secction.  Return a two-tuple (ALLOW, DENY) containing
  152       the access determination for USERNAME in a given authz file
  153       SECTION (if any)."""
  154   
  155       # Figure if this path is explicitly allowed or denied to USERNAME.
  156       allow = deny = 0
  157       for user in cp.options(section):
  158         user = user.strip()
  159         if _userspec_matches_user(user):
  160           # See if the 'r' permission is among the ones granted to
  161           # USER.  If so, we can stop looking.  (Entry order is not
  162           # relevant -- we'll use the most permissive entry, meaning
  163           # one 'allow' is all we need.)
  164           allow = cp.get(section, user).find('r') != -1
  165           deny = not allow
  166           if allow:
  167             break
  168       return allow, deny
  169     
  170     # Read the other (non-"groups") sections, and figure out in which
  171     # repositories USERNAME or his groups have read rights.  We'll
  172     # first check groups that have no specific repository designation,
  173     # then superimpose those that have a repository designation which
  174     # matches the one we're asking about.
  175     root_sections = []
  176     for section in cp.sections():
  177 
  178       # Skip the "groups" section -- we handled that already.
  179       if section == 'groups':
  180         continue
  181       
  182       if section == 'aliases':
  183         continue
  184 
  185       # Process root-agnostic access sections; skip (but remember)
  186       # root-specific ones that match our root; ignore altogether
  187       # root-specific ones that don't match our root.  While we're at
  188       # it, go ahead and figure out the repository path we're talking
  189       # about.
  190       if section.find(':') == -1:
  191         path = section
  192       else:
  193         name, path = section.split(':', 1)
  194         if name == repo_name:
  195           root_sections.append(section)
  196         continue
  197 
  198       # Check for a specific access determination.
  199       allow, deny = _process_access_section(section)
  200           
  201       # If we got an explicit access determination for this path and this
  202       # USERNAME, record it.
  203       if allow or deny:
  204         if path != '/':
  205           path = '/' + '/'.join(filter(None, path.split('/')))
  206         paths_for_root[path] = allow
  207 
  208     # Okay.  Superimpose those root-specific values now.
  209     for section in root_sections:
  210 
  211       # Get the path again.
  212       name, path = section.split(':', 1)
  213       
  214       # Check for a specific access determination.
  215       allow, deny = _process_access_section(section)
  216                 
  217       # If we got an explicit access determination for this path and this
  218       # USERNAME, record it.
  219       if allow or deny:
  220         if path != '/':
  221           path = '/' + '/'.join(filter(None, path.split('/')))
  222         paths_for_root[path] = allow
  223 
  224     # If the root isn't readable, there's no point in caring about all
  225     # the specific paths the user can't see.  Just point the rootname
  226     # to a None paths dictionary.
  227     root_is_readable = 0
  228     for path in paths_for_root.keys():
  229       if paths_for_root[path]:
  230         root_is_readable = 1
  231         break
  232     if not root_is_readable:
  233       paths_for_root = None
  234       
  235     self.rootpaths[rootname] = paths_for_root
  236     return paths_for_root
  237 
  238   def check_root_access(self, rootname):
  239     paths = self._get_paths_for_root(rootname)
  240     return (paths is not None) and 1 or 0
  241   
  242   def check_universal_access(self, rootname):
  243     paths = self._get_paths_for_root(rootname)
  244     if not paths: # None or empty.
  245       return 0
  246 
  247     # Search the access determinations.  If there's a mix, we can't
  248     # claim a universal access determination.
  249     found_allow = 0
  250     found_deny = 0
  251     for access in paths.values():
  252       if access:
  253         found_allow = 1
  254       else:
  255         found_deny = 1
  256       if found_allow and found_deny:
  257         return None
  258 
  259     # We didn't find both allowances and denials, so we must have
  260     # found one or the other.  Denials only is a universal denial.
  261     if found_deny:
  262       return 0
  263 
  264     # ... but allowances only is only a universal allowance if read
  265     # access is granted to the root directory.
  266     if found_allow and paths.has_key('/'):
  267       return 1
  268 
  269     # Anything else is indeterminable.
  270     return None
  271     
  272   def check_path_access(self, rootname, path_parts, pathtype, rev=None):
  273     # Crawl upward from the path represented by PATH_PARTS toward to
  274     # the root of the repository, looking for an explicitly grant or
  275     # denial of access.
  276     paths = self._get_paths_for_root(rootname)
  277     if paths is None:
  278       return 0
  279     parts = path_parts[:]
  280     while parts:
  281       path = '/' + '/'.join(parts)
  282       if paths.has_key(path):
  283         return paths[path]
  284       del parts[-1]
  285     return paths.get('/', 0)