"Fossies" - the Fresh Open Source Software Archive

Member "mailman-3.3.7/src/mailman/rest/addresses.py" (10 Nov 2022, 11841 Bytes) of package /linux/misc/mailman-3.3.7.tar.bz2:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "addresses.py" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 3.3.5_vs_3.3.6.

    1 # Copyright (C) 2011-2022 by the Free Software Foundation, Inc.
    2 #
    3 # This file is part of GNU Mailman.
    4 #
    5 # GNU Mailman is free software: you can redistribute it and/or modify it under
    6 # the terms of the GNU General Public License as published by the Free
    7 # Software Foundation, either version 3 of the License, or (at your option)
    8 # any later version.
    9 #
   10 # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
   11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
   12 # FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
   13 # more details.
   14 #
   15 # You should have received a copy of the GNU General Public License along with
   16 # GNU Mailman.  If not, see <https://www.gnu.org/licenses/>.
   17 
   18 """REST for addresses."""
   19 
   20 from mailman.interfaces.address import (
   21     AddressAlreadyLinkedError,
   22     ExistingAddressError,
   23     InvalidEmailAddressError,
   24 )
   25 from mailman.interfaces.user import UnverifiedAddressError
   26 from mailman.interfaces.usermanager import IUserManager
   27 from mailman.rest.helpers import (
   28     bad_request,
   29     BadRequest,
   30     child,
   31     CollectionMixin,
   32     created,
   33     etag,
   34     no_content,
   35     not_found,
   36     NotFound,
   37     okay,
   38 )
   39 from mailman.rest.members import MemberCollection
   40 from mailman.rest.preferences import Preferences
   41 from mailman.rest.validator import email_validator, Validator
   42 from mailman.utilities.datetime import now
   43 from operator import attrgetter
   44 from public import public
   45 from zope.component import getUtility
   46 
   47 
   48 class _AddressBase(CollectionMixin):
   49     """Shared base class for address representations."""
   50 
   51     def _resource_as_dict(self, address):
   52         """See `CollectionMixin`."""
   53         # The canonical url for an address is its lower-cased version,
   54         # although it can be looked up with either its original or lower-cased
   55         # email address.
   56         representation = dict(
   57             email=address.email,
   58             original_email=address.original_email,
   59             registered_on=address.registered_on,
   60             self_link=self.api.path_to('addresses/{}'.format(address.email)),
   61             )
   62         # Add optional attributes.  These can be None or the empty string.
   63         if address.display_name:
   64             representation['display_name'] = address.display_name
   65         if address.verified_on:
   66             representation['verified_on'] = address.verified_on
   67         if address.user:
   68             uid = self.api.from_uuid(address.user.user_id)
   69             representation['user'] = self.api.path_to('users/{}'.format(uid))
   70         return representation
   71 
   72     def _get_collection(self, request):
   73         """See `CollectionMixin`."""
   74         return sorted(getUtility(IUserManager).addresses,
   75                       key=attrgetter('original_email'))
   76 
   77 
   78 @public
   79 class AllAddresses(_AddressBase):
   80     """The addresses."""
   81 
   82     def on_get(self, request, response):
   83         """/addresses"""
   84         resource = self._make_collection(request)
   85         okay(response, etag(resource))
   86 
   87 
   88 class _VerifyResource:
   89     """A helper resource for verify/unverify POSTS."""
   90 
   91     def __init__(self, address, action):
   92         self._address = address
   93         self._action = action
   94         assert action in ('verify', 'unverify')
   95 
   96     def on_post(self, request, response):
   97         # We don't care about the POST data, just do the action.
   98         if self._action == 'verify' and self._address.verified_on is None:
   99             self._address.verified_on = now()
  100         elif self._action == 'unverify':
  101             self._address.verified_on = None
  102         no_content(response)
  103 
  104 
  105 class PreferredAddress(_AddressBase):
  106     """Preferred address of a user."""
  107 
  108     def __init__(self, user):
  109         """Get or Set a user's preferred address.
  110 
  111         :param user: A valid user.
  112         :type user: IUser
  113         """
  114         self._user = user
  115         self._address = user.preferred_address
  116 
  117     def on_get(self, request, response):
  118         """Return the preferred address."""
  119         if self._address is None:
  120             not_found(response)
  121         else:
  122             okay(response, self._resource_as_json(self._address))
  123 
  124     def on_delete(self, request, response):
  125         if self._address is None:
  126             not_found(response)
  127         else:
  128             del self._user.preferred_address
  129             no_content(response)
  130 
  131     def on_post(self, request, response):
  132         user_manager = getUtility(IUserManager)
  133         validator = Validator(email=email_validator)
  134         try:
  135             data = validator(request)
  136         except ValueError as error:
  137             bad_request(response, str(error))
  138             return
  139         addr = user_manager.get_address(data['email'])
  140         if addr is None:
  141             bad_request(response,
  142                         'Address does not exist: {}'.format(data['email']))
  143             return
  144         try:
  145             self._user.preferred_address = addr
  146             created(response, self._resource_as_dict(addr)['self_link'])
  147         except (UnverifiedAddressError, AddressAlreadyLinkedError) as e:
  148             bad_request(response, str(e))
  149 
  150 
  151 @public
  152 class AnAddress(_AddressBase):
  153     """An address."""
  154 
  155     def __init__(self, email):
  156         """Get an address by either its original or lower-cased email.
  157 
  158         :param email: The email address of the `IAddress`.
  159         :type email: string
  160         """
  161         self._address = getUtility(IUserManager).get_address(email)
  162 
  163     def on_get(self, request, response):
  164         """Return a single address."""
  165         if self._address is None:
  166             not_found(response)
  167         else:
  168             okay(response, self._resource_as_json(self._address))
  169 
  170     def on_patch(self, request, response):
  171         """Patch an existing Address."""
  172         if self._address is None:
  173             not_found(response)
  174         else:
  175             # The only attribute of a address that can be PATCH'd is the
  176             # display_name. To change the verified_on, use the /verify
  177             # endpoint.
  178             validator = Validator(display_name=str)
  179             try:
  180                 data = validator(request)
  181             except ValueError as error:
  182                 bad_request(response, str(error))
  183                 return
  184             display_name = data.pop('display_name')
  185             self._address.display_name = display_name
  186             no_content(response)
  187 
  188     def on_delete(self, request, response):
  189         if self._address is None:
  190             not_found(response)
  191         else:
  192             getUtility(IUserManager).delete_address(self._address)
  193             no_content(response)
  194 
  195     @child()
  196     def memberships(self, context, segments):
  197         """/addresses/<email>/memberships"""
  198         if len(segments) != 0:
  199             return NotFound(), []
  200         if self._address is None:
  201             return NotFound(), []
  202         return AddressMemberships(self._address)
  203 
  204     @child()
  205     def preferences(self, context, segments):
  206         """/addresses/<email>/preferences"""
  207         if len(segments) != 0:
  208             return NotFound(), []
  209         if self._address is None:
  210             return NotFound(), []
  211         child = Preferences(
  212             self._address.preferences,
  213             'addresses/{}'.format(self._address.email))
  214         return child, []
  215 
  216     @child()
  217     def verify(self, context, segments):
  218         """/addresses/<email>/verify"""
  219         if len(segments) != 0:
  220             return BadRequest(), []
  221         if self._address is None:
  222             return NotFound(), []
  223         child = _VerifyResource(self._address, 'verify')
  224         return child, []
  225 
  226     @child()
  227     def unverify(self, context, segments):
  228         """/addresses/<email>/verify"""
  229         if len(segments) != 0:
  230             return BadRequest(), []
  231         if self._address is None:
  232             return NotFound(), []
  233         child = _VerifyResource(self._address, 'unverify')
  234         return child, []
  235 
  236     @child()
  237     def user(self, context, segments):
  238         """/addresses/<email>/user"""
  239         if self._address is None:
  240             return NotFound(), []
  241         # Avoid circular imports.
  242         from mailman.rest.users import AddressUser
  243         return AddressUser(self._address)
  244 
  245 
  246 @public
  247 class UserAddresses(_AddressBase):
  248     """The addresses of a user."""
  249 
  250     def __init__(self, user):
  251         super().__init__()
  252         self._user = user
  253 
  254     def _get_collection(self, request):
  255         """See `CollectionMixin`."""
  256         return sorted(self._user.addresses,
  257                       key=attrgetter('original_email'))
  258 
  259     def on_get(self, request, response):
  260         """/addresses"""
  261         assert self._user is not None
  262         okay(response, etag(self._make_collection(request)))
  263 
  264     def on_post(self, request, response):
  265         """POST to /addresses
  266 
  267         Add a new address to the user record.
  268         """
  269         assert self._user is not None
  270         user_manager = getUtility(IUserManager)
  271         validator = Validator(email=str,
  272                               display_name=str,
  273                               absorb_existing=bool,
  274                               _optional=('display_name', 'absorb_existing'))
  275         try:
  276             data = validator(request)
  277         except ValueError as error:
  278             bad_request(response, str(error))
  279             return
  280         absorb_existing = data.pop('absorb_existing', False)
  281         try:
  282             address = user_manager.create_address(**data)
  283         except InvalidEmailAddressError:
  284             bad_request(response, b'Invalid email address')
  285             return
  286         except ExistingAddressError:
  287             # If the address is not linked to any user, link it to the current
  288             # user and return it.  Since we're linking to an existing address,
  289             # ignore any given display_name attribute.
  290             address = user_manager.get_address(data['email'])
  291             if address.user is None:
  292                 address.user = self._user
  293                 location = self.api.path_to(
  294                     'addresses/{}'.format(address.email))
  295                 created(response, location)
  296                 return
  297             elif not absorb_existing:
  298                 bad_request(response, 'Address belongs to other user')
  299                 return
  300             else:
  301                 # The address exists and is linked but we can merge the users.
  302                 address = user_manager.get_address(data['email'])
  303                 self._user.absorb(address.user)
  304         else:
  305             # Link the address to the current user and return it.
  306             address.user = self._user
  307         location = self.api.path_to('addresses/{}'.format(address.email))
  308         created(response, location)
  309 
  310 
  311 def membership_key(member):
  312     # Sort first by mailing list, then by address, then by role.
  313     return member.list_id, member.address.email, member.role.value
  314 
  315 
  316 @public
  317 class AddressMemberships(MemberCollection):
  318     """All the memberships of a particular email address."""
  319 
  320     def __init__(self, address):
  321         super().__init__()
  322         self._address = address
  323 
  324     def _get_collection(self, request):
  325         """See `CollectionMixin`."""
  326         # XXX Improve this by implementing a .memberships attribute on
  327         # IAddress, similar to the way IUser does it.
  328         #
  329         # Start by getting the IUser that controls this address.  For now, if
  330         # the address is not controlled by a user, return the empty set.
  331         # Later when we address the XXX comment, it will return some
  332         # memberships.  But really, it should not be legal to subscribe an
  333         # address to a mailing list that isn't controlled by a user -- maybe!
  334         user = getUtility(IUserManager).get_user(self._address.email)
  335         if user is None:
  336             return []
  337         return sorted((member for member in user.memberships.members
  338                        if member.address == self._address),
  339                       key=membership_key)