"Fossies" - the Fresh Open Source Software Archive

Member "roundup-2.0.0/test/rest_common.py" (29 Jun 2020, 150125 Bytes) of package /linux/www/roundup-2.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file.

    1 import pytest
    2 import unittest
    3 import os
    4 import shutil
    5 import errno
    6 import cgi
    7 
    8 from time import sleep
    9 from datetime import datetime, timedelta
   10 
   11 try:
   12     from datetime import timezone
   13     myutc = timezone.utc
   14 except ImportError:
   15     # python 2
   16     from datetime import tzinfo
   17     ZERO = timedelta(0)
   18     class UTC(tzinfo):
   19         """UTC"""
   20         def utcoffset(self, dt):
   21             return ZERO
   22         
   23         def tzname(self, dt):
   24             return "UTC"
   25         
   26         def dst(self, dt):
   27             return ZERO
   28 
   29     myutc = UTC()
   30 
   31 from roundup.cgi.exceptions import *
   32 from roundup.hyperdb import HyperdbValueError
   33 from roundup.exceptions import *
   34 from roundup import password, hyperdb
   35 from roundup.rest import RestfulInstance, calculate_etag
   36 from roundup.backends import list_backends
   37 from roundup.cgi import client
   38 from roundup.anypy.strings import b2s, s2b, us2u
   39 import random
   40 
   41 from roundup.backends.sessions_dbm import OneTimeKeys
   42 from roundup.anypy.dbm_ import anydbm, whichdb
   43 
   44 from .db_test_base import setupTracker
   45 
   46 from .mocknull import MockNull
   47 
   48 from io import BytesIO
   49 import json
   50 
   51 from copy import copy
   52 
   53 try:
   54     import jwt
   55     skip_jwt = lambda func, *args, **kwargs: func
   56 except ImportError:
   57     from .pytest_patcher import mark_class
   58     jwt=None 
   59     skip_jwt = mark_class(pytest.mark.skip(
   60         reason='Skipping JWT tests: jwt library not available'))
   61     
   62 NEEDS_INSTANCE = 1
   63 
   64 
   65 class TestCase():
   66 
   67     backend = None
   68     url_pfx = 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/'
   69 
   70     def setUp(self):
   71         self.dirname = '_test_rest'
   72         # set up and open a tracker
   73         # Set optimize=True as code under test (Client.main()::determine_user)
   74         # will close and re-open the database on user changes. This wipes
   75         # out additions to the schema needed for testing.
   76         self.instance = setupTracker(self.dirname, self.backend, optimize=True)
   77 
   78         # open the database
   79         self.db = self.instance.open('admin')
   80 
   81         # Create the Otk db.
   82         # This allows a test later on to open the existing db and
   83         # set a class attribute to test the open retry loop
   84         # just as though this process was using a pre-existing db
   85         # rather then the new one we create.
   86         otk = OneTimeKeys(self.db)
   87         otk.set('key', key="value")
   88 
   89         # Get user id (user4 maybe). Used later to get data from db.
   90         self.joeid = self.db.user.create(
   91             username='joe',
   92             password=password.Password('random'),
   93             address='random@home.org',
   94             realname='Joe Random',
   95             roles='User'
   96         )
   97 
   98         self.db.user.set('1', address="admin@admin.com")
   99         self.db.user.set('2', address="anon@admin.com")
  100         self.db.commit()
  101         self.db.close()
  102         self.db = self.instance.open('joe')
  103         # Allow joe to retire
  104         p = self.db.security.addPermission(name='Retire', klass='issue')
  105         self.db.security.addPermissionToRole('User', p)
  106 
  107         # add set of roles for testing jwt's.
  108         self.db.security.addRole(name="User:email",
  109                         description="allow email by jwt")
  110         # allow the jwt to access everybody's email addresses.
  111         # this makes it easier to differentiate between User and
  112         # User:email roles by accessing the /rest/data/user
  113         # endpoint
  114         jwt_perms = self.db.security.addPermission(name='View',
  115                        klass='user',
  116                        properties=('id', 'realname', 'address', 'username'),
  117                        description="Allow jwt access to email",
  118                        props_only=False)
  119         self.db.security.addPermissionToRole("User:email", jwt_perms)
  120         self.db.security.addPermissionToRole("User:email", "Rest Access")
  121 
  122         # add set of roles for testing jwt's.
  123         # this is like the user:email role, but it missing access to the rest endpoint.
  124         self.db.security.addRole(name="User:emailnorest",
  125                         description="allow email by jwt")
  126         jwt_perms = self.db.security.addPermission(name='View',
  127                        klass='user',
  128                        properties=('id', 'realname', 'address', 'username'),
  129                        description="Allow jwt access to email but forget to allow rest",
  130                        props_only=False)
  131         self.db.security.addPermissionToRole("User:emailnorest", jwt_perms)
  132 
  133 
  134         if jwt:
  135             # must be 32 chars in length minimum (I think this is at least
  136             # 256 bits of data)
  137 
  138             secret = "TestingTheJwtSecretTestingTheJwtSecret"
  139             self.db.config['WEB_JWT_SECRET'] = secret
  140 
  141             # generate all timestamps in UTC.
  142             base_datetime = datetime(1970,1,1, tzinfo=myutc)
  143 
  144             # A UTC timestamp for now.
  145             dt = datetime.now(myutc)
  146             now_ts = int((dt - base_datetime).total_seconds())
  147 
  148             # one good for a minute
  149             dt = dt + timedelta(seconds=60)
  150             plus1min_ts = int((dt - base_datetime).total_seconds())
  151 
  152             # one that expired a minute ago
  153             dt = dt - timedelta(seconds=120)
  154             expired_ts = int((dt - base_datetime).total_seconds())
  155 
  156             # claims match what cgi/client.py::determine_user
  157             # is looking for
  158             claim= { 'sub': self.db.getuid(),
  159                      'iss': self.db.config.TRACKER_WEB,
  160                      'aud': self.db.config.TRACKER_WEB,
  161                      'roles': [ 'User' ],
  162                      'iat': now_ts,
  163                      'exp': plus1min_ts,
  164             }
  165             
  166             self.jwt = {}
  167             self.claim = {}
  168             # generate invalid claim with expired timestamp
  169             self.claim['expired'] = copy(claim)
  170             self.claim['expired']['exp'] = expired_ts
  171             self.jwt['expired'] = b2s(jwt.encode(self.claim['expired'], secret,
  172                                              algorithm='HS256'))
  173          
  174             # generate valid claim with user role
  175             self.claim['user'] = copy(claim)
  176             self.claim['user']['exp'] = plus1min_ts
  177             self.jwt['user'] = b2s(jwt.encode(self.claim['user'], secret,
  178                                           algorithm='HS256'))
  179             # generate invalid claim bad issuer
  180             self.claim['badiss'] = copy(claim)
  181             self.claim['badiss']['iss'] = "http://someissuer/bugs"
  182             self.jwt['badiss'] = b2s(jwt.encode(self.claim['badiss'], secret,
  183                                           algorithm='HS256'))
  184             # generate invalid claim bad aud(ience)
  185             self.claim['badaud'] = copy(claim)
  186             self.claim['badaud']['aud'] = "http://someaudience/bugs"
  187             self.jwt['badaud'] = b2s(jwt.encode(self.claim['badaud'], secret,
  188                                           algorithm='HS256'))            
  189             # generate invalid claim bad sub(ject)
  190             self.claim['badsub'] = copy(claim)
  191             self.claim['badsub']['sub'] = str("99")
  192             self.jwt['badsub'] = b2s(jwt.encode(self.claim['badsub'], secret,
  193                                           algorithm='HS256'))            
  194             # generate invalid claim bad roles
  195             self.claim['badroles'] = copy(claim)
  196             self.claim['badroles']['roles'] = [ "badrole1", "badrole2" ]
  197             self.jwt['badroles'] = b2s(jwt.encode(self.claim['badroles'], secret,
  198                                           algorithm='HS256'))            
  199             # generate valid claim with limited user:email role
  200             self.claim['user:email'] = copy(claim)
  201             self.claim['user:email']['roles'] = [ "user:email" ]
  202             self.jwt['user:email'] = b2s(jwt.encode(self.claim['user:email'], secret,
  203                                           algorithm='HS256'))
  204 
  205             # generate valid claim with limited user:emailnorest role
  206             self.claim['user:emailnorest'] = copy(claim)
  207             self.claim['user:emailnorest']['roles'] = [ "user:emailnorest" ]
  208             self.jwt['user:emailnorest'] = b2s(jwt.encode(self.claim['user:emailnorest'], secret,
  209                                           algorithm='HS256'))
  210 
  211         self.db.tx_Source = 'web'
  212 
  213         self.db.issue.addprop(tx_Source=hyperdb.String())
  214         self.db.issue.addprop(anint=hyperdb.Integer())
  215         self.db.issue.addprop(afloat=hyperdb.Number())
  216         self.db.issue.addprop(abool=hyperdb.Boolean())
  217         self.db.issue.addprop(requireme=hyperdb.String(required=True))
  218         self.db.msg.addprop(tx_Source=hyperdb.String())
  219 
  220         self.db.post_init()
  221 
  222         thisdir = os.path.dirname(__file__)
  223         vars = {}
  224         with open(os.path.join(thisdir, "tx_Source_detector.py")) as f:
  225             code = compile(f.read(), "tx_Source_detector.py", "exec")
  226             exec(code, vars)
  227         vars['init'](self.db)
  228 
  229         env = {
  230             'PATH_INFO': 'http://localhost/rounduptest/rest/',
  231             'HTTP_HOST': 'localhost',
  232             'TRACKER_NAME': 'rounduptest'
  233         }
  234         self.dummy_client = client.Client(self.instance, MockNull(), env, [], None)
  235         self.dummy_client.request.headers.get = self.get_header
  236         self.empty_form = cgi.FieldStorage()
  237         self.terse_form = cgi.FieldStorage()
  238         self.terse_form.list = [
  239             cgi.MiniFieldStorage('@verbose', '0'),
  240         ]
  241 
  242         self.server = RestfulInstance(self.dummy_client, self.db)
  243 
  244         self.db.Otk = self.db.getOTKManager()
  245 
  246         self.db.config['WEB_SECRET_KEY'] = "XyzzykrnKm45Sd"
  247 
  248     def tearDown(self):
  249         self.db.close()
  250         try:
  251             shutil.rmtree(self.dirname)
  252         except OSError as error:
  253             if error.errno not in (errno.ENOENT, errno.ESRCH):
  254                 raise
  255 
  256     def get_header (self, header, not_found=None):
  257         try:
  258             return self.headers[header.lower()]
  259         except (AttributeError, KeyError, TypeError):
  260             return not_found
  261 
  262     def create_stati(self):
  263         try:
  264             self.db.status.create(name='open', order='9')
  265         except ValueError:
  266             pass
  267         try:
  268             self.db.status.create(name='closed', order='91')
  269         except ValueError:
  270             pass
  271         try:
  272             self.db.priority.create(name='normal')
  273         except ValueError:
  274             pass
  275         try:
  276             self.db.priority.create(name='critical')
  277         except ValueError:
  278             pass
  279 
  280     def create_sampledata(self):
  281         """ Create sample data common to some test cases
  282         """
  283         self.create_stati()
  284         self.db.issue.create(
  285             title='foo1',
  286             status=self.db.status.lookup('open'),
  287             priority=self.db.priority.lookup('normal'),
  288             nosy = [ "1", "2" ]
  289         )
  290         issue_open_norm = self.db.issue.create(
  291             title='foo2',
  292             status=self.db.status.lookup('open'),
  293             priority=self.db.priority.lookup('normal'),
  294             assignedto = "3"
  295         )
  296         issue_open_crit = self.db.issue.create(
  297             title='foo5',
  298             status=self.db.status.lookup('open'),
  299             priority=self.db.priority.lookup('critical')
  300         )
  301 
  302     def testGet(self):
  303         """
  304         Retrieve all three users
  305         obtain data for 'joe'
  306         """
  307         # Retrieve all three users.
  308         results = self.server.get_collection('user', self.empty_form)
  309         self.assertEqual(self.dummy_client.response_code, 200)
  310         self.assertEqual(len(results['data']['collection']), 3)
  311         self.assertEqual(results['data']['@total_size'], 3)
  312         print(self.dummy_client.additional_headers["X-Count-Total"])
  313         self.assertEqual(
  314             self.dummy_client.additional_headers["X-Count-Total"],
  315             "3"
  316         )
  317 
  318         # Obtain data for 'joe'.
  319         results = self.server.get_element('user', self.joeid, self.empty_form)
  320         results = results['data']
  321         self.assertEqual(self.dummy_client.response_code, 200)
  322         self.assertEqual(results['attributes']['username'], 'joe')
  323         self.assertEqual(results['attributes']['realname'], 'Joe Random')
  324 
  325         # Obtain data for 'joe' via username lookup.
  326         results = self.server.get_element('user', 'joe', self.empty_form)
  327         results = results['data']
  328         self.assertEqual(self.dummy_client.response_code, 200)
  329         self.assertEqual(results['attributes']['username'], 'joe')
  330         self.assertEqual(results['attributes']['realname'], 'Joe Random')
  331 
  332         # Obtain data for 'joe' via username lookup (long form).
  333         key = 'username=joe'
  334         results = self.server.get_element('user', key, self.empty_form)
  335         results = results['data']
  336         self.assertEqual(self.dummy_client.response_code, 200)
  337         self.assertEqual(results['attributes']['username'], 'joe')
  338         self.assertEqual(results['attributes']['realname'], 'Joe Random')
  339 
  340         # Obtain data for 'joe'.
  341         results = self.server.get_attribute(
  342             'user', self.joeid, 'username', self.empty_form
  343         )
  344         self.assertEqual(self.dummy_client.response_code, 200)
  345         self.assertEqual(results['data']['data'], 'joe')
  346 
  347     def testGetTransitive(self):
  348         """
  349         Retrieve all issues with an 'o' in status
  350         sort by status.name (not order)
  351         """
  352         base_path = self.db.config['TRACKER_WEB'] + 'rest/data/'
  353         #self.maxDiff=None
  354         self.create_sampledata()
  355         self.db.issue.set('2', status=self.db.status.lookup('closed'))
  356         self.db.issue.set('3', status=self.db.status.lookup('chatting'))
  357         expected={'data':
  358                    {'@total_size': 2,
  359                     'collection': [
  360                       { 'id': '2',
  361                         'link': base_path + 'issue/2',
  362                         'status':
  363                           { 'id': '10',
  364                             'link': base_path + 'status/10'
  365                           }
  366                       },
  367                       { 'id': '1',
  368                         'link': base_path + 'issue/1',
  369                         'status':
  370                           { 'id': '9',
  371                             'link': base_path + 'status/9'
  372                           }
  373                       },
  374                     ]}
  375                  }
  376         form = cgi.FieldStorage()
  377         form.list = [
  378             cgi.MiniFieldStorage('status.name', 'o'),
  379             cgi.MiniFieldStorage('@fields', 'status'),
  380             cgi.MiniFieldStorage('@sort', 'status.name'),
  381         ]
  382         results = self.server.get_collection('issue', form)
  383         self.assertDictEqual(expected, results)
  384 
  385     def testGetExactMatch(self):
  386         """ Retrieve all issues with an exact title
  387         """
  388         base_path = self.db.config['TRACKER_WEB'] + 'rest/data/'
  389         #self.maxDiff=None
  390         self.create_sampledata()
  391         self.db.issue.set('2', title='This is an exact match')
  392         self.db.issue.set('3', title='This is an exact match')
  393         self.db.issue.set('1', title='This is AN exact match')
  394         expected={'data':
  395                    {'@total_size': 2,
  396                     'collection': [
  397                       { 'id': '2',
  398                         'link': base_path + 'issue/2',
  399                       },
  400                       { 'id': '3',
  401                         'link': base_path + 'issue/3',
  402                       },
  403                     ]}
  404                  }
  405         form = cgi.FieldStorage()
  406         form.list = [
  407             cgi.MiniFieldStorage('title:', 'This is an exact match'),
  408             cgi.MiniFieldStorage('@sort', 'status.name'),
  409         ]
  410         results = self.server.get_collection('issue', form)
  411         self.assertDictEqual(expected, results)
  412 
  413     def testOutputFormat(self):
  414         """ test of @fields and @verbose implementation """
  415 
  416         self.maxDiff = 4000
  417         self.create_sampledata()
  418         base_path = self.db.config['TRACKER_WEB'] + 'rest/data/issue/'
  419 
  420 
  421         # Check formating for issues status=open; @fields and verbose tests
  422         form = cgi.FieldStorage()
  423         form.list = [
  424             cgi.MiniFieldStorage('status', 'open'),
  425             cgi.MiniFieldStorage('@fields', 'nosy,status,creator'),
  426             cgi.MiniFieldStorage('@verbose', '2')
  427         ]
  428 
  429         expected={'data':
  430                    {'@total_size': 3,
  431                     'collection': [ {
  432                          'creator': {'id': '3',
  433                                      'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user/3',
  434                                      'username': 'joe'},
  435                         'status': {'id': '9',
  436                                     'name': 'open',
  437                                     'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/status/9'},
  438                          'id': '1',
  439                          'nosy': [
  440                              {'username': 'admin',
  441                               'id': '1',
  442                               'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user/1'},
  443                              {'username': 'anonymous',
  444                               'id': '2',
  445                               'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user/2'}
  446                          ],
  447                          'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/1',
  448                          'title': 'foo1' },
  449                         { 'creator': {'id': '3',
  450                                       'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user/3',
  451                                       'username': 'joe'},
  452                         'status': {
  453                             'id': '9',
  454                             'name': 'open',
  455                             'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/status/9' },
  456                          'id': '2',
  457                          'nosy': [
  458                              {'username': 'joe',
  459                               'id': '3',
  460                               'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user/3'}
  461                          ],
  462                          'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/2',
  463                          'title': 'foo2'},
  464                         {'creator': {'id': '3',
  465                                      'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user/3',
  466                                      'username': 'joe'},
  467                             'status': {
  468                             'id': '9',
  469                             'name': 'open',
  470                             'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/status/9'},
  471                          'id': '3',
  472                          'nosy': [],
  473                          'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/3',
  474                          'title': 'foo5'}
  475                     ]}}
  476 
  477         results = self.server.get_collection('issue', form)
  478         self.assertDictEqual(expected, results)
  479 
  480         # Check formating for issues status=open; @fields and verbose tests
  481         form = cgi.FieldStorage()
  482         form.list = [
  483             cgi.MiniFieldStorage('status', 'open')
  484             # default cgi.MiniFieldStorage('@verbose', '1')
  485         ]
  486 
  487         expected={'data':
  488                    {'@total_size': 3,
  489                     'collection': [
  490                         {'id': '1',
  491                          'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/1',},
  492                         { 'id': '2',
  493                          
  494                          'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/2'},
  495                         {'id': '3',
  496                          'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/3'} ]}}
  497                     
  498 
  499         results = self.server.get_collection('issue', form)
  500         self.assertDictEqual(expected, results)
  501 
  502         # Generate failure case, unknown field.
  503         form = cgi.FieldStorage()
  504         form.list = [
  505             cgi.MiniFieldStorage('status', 'open'),
  506             cgi.MiniFieldStorage('@fields', 'title,foo')
  507         ]
  508 
  509         expected={'error': {
  510             'msg': UsageError("Failed to find property 'foo' "
  511                               "for class issue.",),
  512             'status': 400}}
  513 
  514         results = self.server.get_collection('issue', form)
  515         # I tried assertDictEqual but seems it can't handle
  516         # the exception value of 'msg'. So I am using repr to check.
  517         self.assertEqual(repr(sorted(expected['error'])),
  518                          repr(sorted(results['error']))
  519         )
  520 
  521         # Check formating for issues status=open; @fields and verbose tests
  522         form = cgi.FieldStorage()
  523         form.list = [
  524             cgi.MiniFieldStorage('status', 'open'),
  525             cgi.MiniFieldStorage('@fields', 'nosy,status,assignedto'),
  526             cgi.MiniFieldStorage('@verbose', '0')
  527         ]
  528 
  529         expected={'data': {
  530             '@total_size': 3,
  531             'collection': [
  532                 {'assignedto': None,
  533                  'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/1',
  534                  'status': '9',
  535                  'nosy': ['1', '2'],
  536                  'id': '1'},
  537                 {'assignedto': '3',
  538                  'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/2',
  539                  'status': '9',
  540                  'nosy': ['3'],
  541                  'id': '2'},
  542                 {'assignedto': None,
  543                  'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/3',
  544                  'status': '9',
  545                  'nosy': [],
  546                  'id': '3'}]}}
  547 
  548         results = self.server.get_collection('issue', form)
  549         print(results)
  550         self.assertDictEqual(expected, results)
  551 
  552         # check users
  553         form = cgi.FieldStorage()
  554         form.list = [
  555             cgi.MiniFieldStorage('@fields', 'username,queries,password'),
  556             cgi.MiniFieldStorage('@verbose', '0')
  557         ]
  558         # note this is done as user joe, so we only get queries
  559         # and password for joe.
  560         expected = {'data': {'collection': [
  561             {'id': '1',
  562              'username': 'admin',
  563              'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user/1'},
  564             {'id': '2',
  565              'username': 'anonymous',
  566              'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user/2'},
  567             {'password': '[password hidden scheme PBKDF2]',
  568              'id': '3',
  569              'queries': [],
  570              'username': 'joe',
  571              'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user/3'}],
  572                 '@total_size': 3}}
  573 
  574         results = self.server.get_collection('user', form)
  575         self.assertDictEqual(expected, results)
  576 
  577         ## Start testing get_element
  578         form = cgi.FieldStorage()
  579         form.list = [
  580             cgi.MiniFieldStorage('@fields', 'queries,password,creator'),
  581             cgi.MiniFieldStorage('@verbose', '2')
  582         ]
  583         expected = {'data': {
  584             'id': '3',
  585             'type': 'user',
  586             '@etag': '',
  587             'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user/3',
  588             'attributes': {
  589                 'creator': {'id': '1',
  590                             'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user/1',
  591                             'username': 'admin'},
  592                 'password': '[password hidden scheme PBKDF2]',
  593                 'queries': [],
  594                 'username': 'joe'
  595             }
  596         }}
  597 
  598         results = self.server.get_element('user', self.joeid, form)
  599         results['data']['@etag'] = '' # etag depends on date, set to empty
  600         self.assertDictEqual(expected,results)
  601 
  602         form = cgi.FieldStorage()
  603         form.list = [
  604             cgi.MiniFieldStorage('@fields', 'status:priority'),
  605             cgi.MiniFieldStorage('@verbose', '1')
  606         ]
  607         expected = {'data': {
  608             'type': 'issue',
  609             'id': '3',
  610             'attributes': {
  611                 'status': {
  612                     'id': '9', 
  613                     'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/status/9'},
  614                 'priority': {
  615                     'id': '1',
  616                     'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/priority/1'}},
  617             '@etag': '',
  618             'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/3'}}
  619 
  620         results = self.server.get_element('issue', "3", form)
  621         results['data']['@etag'] = '' # etag depends on date, set to empty
  622         self.assertDictEqual(expected,results)
  623 
  624         form = cgi.FieldStorage()
  625         form.list = [
  626             cgi.MiniFieldStorage('@fields', 'status,priority'),
  627             cgi.MiniFieldStorage('@verbose', '0')
  628         ]
  629         expected = {'data': {
  630             'type': 'issue',
  631             'id': '3',
  632             'attributes': {
  633                 'status': '9', 
  634                 'priority': '1'},
  635             '@etag': '',
  636             'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/3'}}
  637 
  638         results = self.server.get_element('issue', "3", form)
  639         results['data']['@etag'] = '' # etag depends on date, set to empty
  640         self.assertDictEqual(expected,results)
  641 
  642     def testSorting(self):
  643         self.maxDiff = 4000
  644         self.create_sampledata()
  645         self.db.issue.set('1', status='7')
  646         self.db.issue.set('2', status='2')
  647         self.db.issue.set('3', status='2')
  648         self.db.commit()
  649         base_path = self.db.config['TRACKER_WEB'] + 'rest/data/issue/'
  650         # change some data for sorting on later
  651         form = cgi.FieldStorage()
  652         form.list = [
  653             cgi.MiniFieldStorage('@fields', 'status'),
  654             cgi.MiniFieldStorage('@sort', 'status,-id'),
  655             cgi.MiniFieldStorage('@verbose', '0')
  656         ]
  657 
  658         # status is sorted by orderprop (property 'order')
  659         # which provides the same ordering as the status ID
  660         expected={'data': {
  661             '@total_size': 3,
  662             'collection': [
  663                 {'link': base_path + '3', 'status': '2', 'id': '3'},
  664                 {'link': base_path + '2', 'status': '2', 'id': '2'},
  665                 {'link': base_path + '1', 'status': '7', 'id': '1'}]}}
  666 
  667         results = self.server.get_collection('issue', form)
  668         self.assertDictEqual(expected, results)
  669 
  670     def testTransitiveField(self):
  671         """ Test a transitive property in @fields """
  672         base_path = self.db.config['TRACKER_WEB'] + 'rest/data/'
  673         # create sample data
  674         self.create_stati()
  675         self.db.issue.create(
  676             title='foo4',
  677             status=self.db.status.lookup('closed'),
  678             priority=self.db.priority.lookup('critical')
  679         )
  680         # Retrieve all issue @fields=status.name
  681         form = cgi.FieldStorage()
  682         form.list = [
  683             cgi.MiniFieldStorage('@fields', 'status.name')
  684         ]
  685         results = self.server.get_collection('issue', form)
  686         self.assertEqual(self.dummy_client.response_code, 200)
  687 
  688         exp = [
  689             {'link': base_path + 'issue/1', 'id': '1', 'status.name': 'closed'}]
  690         self.assertEqual(results['data']['collection'], exp)
  691 
  692     def testFilter(self):
  693         """
  694         Retrieve all three users
  695         obtain data for 'joe'
  696         """
  697         # create sample data
  698         self.create_stati()
  699         self.db.issue.create(
  700             title='foo4',
  701             status=self.db.status.lookup('closed'),
  702             priority=self.db.priority.lookup('critical')
  703         )
  704         self.db.issue.create(
  705             title='foo1',
  706             status=self.db.status.lookup('open'),
  707             priority=self.db.priority.lookup('normal')
  708         )
  709         issue_open_norm = self.db.issue.create(
  710             title='foo2 normal',
  711             status=self.db.status.lookup('open'),
  712             priority=self.db.priority.lookup('normal')
  713         )
  714         issue_closed_norm = self.db.issue.create(
  715             title='foo3 closed normal',
  716             status=self.db.status.lookup('closed'),
  717             priority=self.db.priority.lookup('normal')
  718         )
  719         issue_closed_crit = self.db.issue.create(
  720             title='foo4 closed',
  721             status=self.db.status.lookup('closed'),
  722             priority=self.db.priority.lookup('critical')
  723         )
  724         issue_open_crit = self.db.issue.create(
  725             title='foo5',
  726             status=self.db.status.lookup('open'),
  727             priority=self.db.priority.lookup('critical')
  728         )
  729         base_path = self.db.config['TRACKER_WEB'] + 'rest/data/issue/'
  730 
  731         # Retrieve all issue status=open
  732         form = cgi.FieldStorage()
  733         form.list = [
  734             cgi.MiniFieldStorage('status', 'open')
  735         ]
  736         results = self.server.get_collection('issue', form)
  737         self.assertEqual(self.dummy_client.response_code, 200)
  738         self.assertIn(get_obj(base_path, issue_open_norm),
  739                       results['data']['collection'])
  740         self.assertIn(get_obj(base_path, issue_open_crit),
  741                       results['data']['collection'])
  742         self.assertNotIn(
  743             get_obj(base_path, issue_closed_norm),
  744             results['data']['collection']
  745         )
  746 
  747         # Retrieve all issue status=closed and priority=critical
  748         form = cgi.FieldStorage()
  749         form.list = [
  750             cgi.MiniFieldStorage('status', 'closed'),
  751             cgi.MiniFieldStorage('priority', 'critical')
  752         ]
  753         results = self.server.get_collection('issue', form)
  754         self.assertEqual(self.dummy_client.response_code, 200)
  755         self.assertIn(get_obj(base_path, issue_closed_crit),
  756                       results['data']['collection'])
  757         self.assertNotIn(get_obj(base_path, issue_open_crit),
  758                          results['data']['collection'])
  759         self.assertNotIn(
  760             get_obj(base_path, issue_closed_norm),
  761             results['data']['collection']
  762         )
  763 
  764         # Retrieve all issue status=closed and priority=normal,critical
  765         form = cgi.FieldStorage()
  766         form.list = [
  767             cgi.MiniFieldStorage('status', 'closed'),
  768             cgi.MiniFieldStorage('priority', 'normal,critical')
  769         ]
  770         results = self.server.get_collection('issue', form)
  771         self.assertEqual(self.dummy_client.response_code, 200)
  772         self.assertIn(get_obj(base_path, issue_closed_crit),
  773                       results['data']['collection'])
  774         self.assertIn(get_obj(base_path, issue_closed_norm),
  775                       results['data']['collection'])
  776         self.assertNotIn(get_obj(base_path, issue_open_crit),
  777                          results['data']['collection'])
  778         self.assertNotIn(get_obj(base_path, issue_open_norm),
  779                          results['data']['collection'])
  780 
  781         # Retrieve all issue status=closed and priority=normal,critical
  782         # using duplicate priority key's.
  783         form = cgi.FieldStorage()
  784         form.list = [
  785             cgi.MiniFieldStorage('status', 'closed'),
  786             cgi.MiniFieldStorage('priority', 'normal'),
  787             cgi.MiniFieldStorage('priority', 'critical')
  788         ]
  789         results = self.server.get_collection('issue', form)
  790         self.assertEqual(self.dummy_client.response_code, 200)
  791         self.assertIn(get_obj(base_path, issue_closed_crit),
  792                       results['data']['collection'])
  793         self.assertIn(get_obj(base_path, issue_closed_norm),
  794                       results['data']['collection'])
  795         self.assertNotIn(get_obj(base_path, issue_open_crit),
  796                          results['data']['collection'])
  797         self.assertNotIn(get_obj(base_path, issue_open_norm),
  798                          results['data']['collection'])
  799 
  800         # Retrieve all issues with title containing
  801         # closed, normal and 3 using duplicate title filterkeys
  802         form = cgi.FieldStorage()
  803         form.list = [
  804             cgi.MiniFieldStorage('title', 'closed'),
  805             cgi.MiniFieldStorage('title', 'normal'),
  806             cgi.MiniFieldStorage('title', '3')
  807         ]
  808         results = self.server.get_collection('issue', form)
  809         self.assertEqual(self.dummy_client.response_code, 200)
  810         self.assertNotIn(get_obj(base_path, issue_closed_crit),
  811                       results['data']['collection'])
  812         self.assertIn(get_obj(base_path, issue_closed_norm),
  813                       results['data']['collection'])
  814         self.assertNotIn(get_obj(base_path, issue_open_crit),
  815                          results['data']['collection'])
  816         self.assertNotIn(get_obj(base_path, issue_open_norm),
  817                          results['data']['collection'])
  818         self.assertEqual(len(results['data']['collection']), 1)
  819 
  820         # Retrieve all issues (no hits) with title containing
  821         # closed, normal and foo3 in this order using title filter
  822         form = cgi.FieldStorage()
  823         form.list = [
  824             cgi.MiniFieldStorage('title', 'closed normal foo3')
  825         ]
  826         results = self.server.get_collection('issue', form)
  827         self.assertEqual(self.dummy_client.response_code, 200)
  828         self.assertNotIn(get_obj(base_path, issue_closed_crit),
  829                       results['data']['collection'])
  830         self.assertNotIn(get_obj(base_path, issue_closed_norm),
  831                       results['data']['collection'])
  832         self.assertNotIn(get_obj(base_path, issue_open_crit),
  833                          results['data']['collection'])
  834         self.assertNotIn(get_obj(base_path, issue_open_norm),
  835                          results['data']['collection'])
  836         self.assertEqual(len(results['data']['collection']), 0)
  837 
  838         # Retrieve all issues with title containing
  839         # foo3, closed and normal in this order using title filter
  840         form = cgi.FieldStorage()
  841         form.list = [
  842             cgi.MiniFieldStorage('title', 'foo3 closed normal')
  843         ]
  844         results = self.server.get_collection('issue', form)
  845         self.assertEqual(self.dummy_client.response_code, 200)
  846         self.assertNotIn(get_obj(base_path, issue_closed_crit),
  847                       results['data']['collection'])
  848         self.assertIn(get_obj(base_path, issue_closed_norm),
  849                       results['data']['collection'])
  850         self.assertNotIn(get_obj(base_path, issue_open_crit),
  851                          results['data']['collection'])
  852         self.assertNotIn(get_obj(base_path, issue_open_norm),
  853                          results['data']['collection'])
  854         self.assertEqual(len(results['data']['collection']), 1)
  855 
  856         # Retrieve all issues with word closed in title
  857         form = cgi.FieldStorage()
  858         form.list = [
  859             cgi.MiniFieldStorage('title', 'closed'),
  860         ]
  861         results = self.server.get_collection('issue', form)
  862         self.assertEqual(self.dummy_client.response_code, 200)
  863         self.assertIn(get_obj(base_path, issue_closed_crit),
  864                       results['data']['collection'])
  865         self.assertIn(get_obj(base_path, issue_closed_norm),
  866                       results['data']['collection'])
  867         self.assertNotIn(get_obj(base_path, issue_open_crit),
  868                          results['data']['collection'])
  869         self.assertNotIn(get_obj(base_path, issue_open_norm),
  870                          results['data']['collection'])
  871         self.assertEqual(len(results['data']['collection']), 2)
  872 
  873     def testPagination(self):
  874         """
  875         Test pagination. page_size is required and is an integer
  876         starting at 1. page_index is optional and is an integer
  877         starting at 1. Verify that pagination links are present
  878         if paging, @total_size and X-Count-Total header match
  879         number of items.        
  880         """
  881         # create sample data
  882         for i in range(0, random.randint(8,15)):
  883             self.db.issue.create(title='foo' + str(i))
  884 
  885         # Retrieving all the issues
  886         results = self.server.get_collection('issue', self.empty_form)
  887         self.assertEqual(self.dummy_client.response_code, 200)
  888         total_length = len(results['data']['collection'])
  889         # Verify no pagination links if paging not used
  890         self.assertFalse('@links' in results['data'])
  891         self.assertEqual(results['data']['@total_size'], total_length)
  892         self.assertEqual(
  893             self.dummy_client.additional_headers["X-Count-Total"],
  894             str(total_length)
  895         )
  896 
  897 
  898         # Pagination will be 45% of the total result
  899         # So 2 full pages and 1 partial page.
  900         page_size = total_length * 45 // 100
  901         page_one_expected = page_size
  902         page_two_expected = page_size
  903         page_three_expected = total_length - (2*page_one_expected)
  904         base_url="http://tracker.example/cgi-bin/roundup.cgi/" \
  905                  "bugs/rest/data/issue"
  906 
  907         # Retrieve page 1
  908         form = cgi.FieldStorage()
  909         form.list = [
  910             cgi.MiniFieldStorage('@page_size', page_size),
  911             cgi.MiniFieldStorage('@page_index', 1)
  912         ]
  913         results = self.server.get_collection('issue', form)
  914         self.assertEqual(self.dummy_client.response_code, 200)
  915         self.assertEqual(len(results['data']['collection']),
  916                          page_one_expected)
  917         self.assertTrue('@links' in results['data'])
  918         self.assertTrue('self' in results['data']['@links'])
  919         self.assertTrue('next' in results['data']['@links'])
  920         self.assertFalse('prev' in results['data']['@links'])
  921         self.assertEqual(results['data']['@links']['self'][0]['uri'],
  922                          "%s?@page_index=1&@page_size=%s"%(base_url,page_size))
  923         self.assertEqual(results['data']['@links']['next'][0]['uri'],
  924                          "%s?@page_index=2&@page_size=%s"%(base_url,page_size))
  925 
  926         page_one_results = results # save this for later
  927 
  928         # Retrieve page 2
  929         form = cgi.FieldStorage()
  930         form.list = [
  931             cgi.MiniFieldStorage('@page_size', page_size),
  932             cgi.MiniFieldStorage('@page_index', 2)
  933         ]
  934         results = self.server.get_collection('issue', form)
  935         self.assertEqual(self.dummy_client.response_code, 200)
  936         self.assertEqual(len(results['data']['collection']), page_two_expected)
  937         self.assertTrue('@links' in results['data'])
  938         self.assertTrue('self' in results['data']['@links'])
  939         self.assertTrue('next' in results['data']['@links'])
  940         self.assertTrue('prev' in results['data']['@links'])
  941         self.assertEqual(results['data']['@links']['self'][0]['uri'],
  942                          "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue?@page_index=2&@page_size=%s"%page_size)
  943         self.assertEqual(results['data']['@links']['next'][0]['uri'],
  944                          "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue?@page_index=3&@page_size=%s"%page_size)
  945         self.assertEqual(results['data']['@links']['prev'][0]['uri'],
  946                          "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue?@page_index=1&@page_size=%s"%page_size)
  947         self.assertEqual(results['data']['@links']['self'][0]['rel'],
  948                          'self')
  949         self.assertEqual(results['data']['@links']['next'][0]['rel'],
  950                          'next')
  951         self.assertEqual(results['data']['@links']['prev'][0]['rel'],
  952                          'prev')
  953 
  954         # Retrieve page 3
  955         form = cgi.FieldStorage()
  956         form.list = [
  957             cgi.MiniFieldStorage('@page_size', page_size),
  958             cgi.MiniFieldStorage('@page_index', 3)
  959         ]
  960         results = self.server.get_collection('issue', form)
  961         self.assertEqual(self.dummy_client.response_code, 200)
  962         self.assertEqual(len(results['data']['collection']), page_three_expected)
  963         self.assertTrue('@links' in results['data'])
  964         self.assertTrue('self' in results['data']['@links'])
  965         self.assertFalse('next' in results['data']['@links'])
  966         self.assertTrue('prev' in results['data']['@links'])
  967         self.assertEqual(results['data']['@links']['self'][0]['uri'],
  968                          "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue?@page_index=3&@page_size=%s"%page_size)
  969         self.assertEqual(results['data']['@links']['prev'][0]['uri'],
  970                          "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue?@page_index=2&@page_size=%s"%page_size)
  971 
  972         # Verify that page_index is optional
  973         # Should start at page 1
  974         form = cgi.FieldStorage()
  975         form.list = [
  976             cgi.MiniFieldStorage('@page_size', page_size),
  977         ]
  978         results = self.server.get_collection('issue', form)
  979         self.assertEqual(self.dummy_client.response_code, 200)
  980         self.assertEqual(len(results['data']['collection']), page_size)
  981         self.assertTrue('@links' in results['data'])
  982         self.assertTrue('self' in results['data']['@links'])
  983         self.assertTrue('next' in results['data']['@links'])
  984         self.assertFalse('prev' in results['data']['@links'])
  985         self.assertEqual(page_one_results, results)
  986 
  987         # FIXME add tests for out of range once we decide what response
  988         # is needed to:
  989         #   page_size < 0
  990         #   page_index < 0
  991 
  992     def testRestRateLimit(self):
  993 
  994         self.db.config['WEB_API_CALLS_PER_INTERVAL'] = 20
  995         self.db.config['WEB_API_INTERVAL_IN_SEC'] = 60
  996 
  997         # Otk code never passes through the
  998         # retry loop. Not sure why but I can force it
  999         # through the loop by setting the internal _db_type
 1000         # setting once the db is created by the previous command.
 1001         try:
 1002             self.db.Otk._db_type = whichdb("%s/%s"%(self.db.Otk.dir, self.db.Otk.name))
 1003         except AttributeError:
 1004             # if dir attribute doesn't exist the primary db is not
 1005             # sqlite or anydbm. So don't need to exercise code.
 1006             pass
 1007         
 1008         print("Now realtime start:", datetime.utcnow())
 1009         # don't set an accept header; json should be the default
 1010         # use up all our allowed api calls
 1011         for i in range(20):
 1012             # i is 0 ... 19
 1013             self.client_error_message = []
 1014             results = self.server.dispatch('GET',
 1015                             "/rest/data/user/%s/realname"%self.joeid,
 1016                             self.empty_form)
 1017  
 1018             # is successful
 1019             self.assertEqual(self.server.client.response_code, 200)
 1020             # does not have Retry-After header as we have
 1021             # suceeded with this query
 1022             self.assertFalse("Retry-After" in
 1023                              self.server.client.additional_headers) 
 1024             # remaining count is correct
 1025             self.assertEqual(
 1026                 self.server.client.additional_headers["X-RateLimit-Remaining"],
 1027                 str(self.db.config['WEB_API_CALLS_PER_INTERVAL'] -1 - i)
 1028                 )
 1029 
 1030         # trip limit
 1031         self.server.client.additional_headers.clear()
 1032         results = self.server.dispatch('GET',
 1033                      "/rest/data/user/%s/realname"%self.joeid,
 1034                             self.empty_form)
 1035         print(results)
 1036         self.assertEqual(self.server.client.response_code, 429)
 1037 
 1038         self.assertEqual(
 1039             self.server.client.additional_headers["X-RateLimit-Limit"],
 1040             str(self.db.config['WEB_API_CALLS_PER_INTERVAL']))
 1041         self.assertEqual(
 1042             self.server.client.additional_headers["X-RateLimit-Limit-Period"],
 1043             str(self.db.config['WEB_API_INTERVAL_IN_SEC']))
 1044         self.assertEqual(
 1045             self.server.client.additional_headers["X-RateLimit-Remaining"],
 1046             '0')
 1047         # value will be almost 60. Allow 1-2 seconds for all 20 rounds.
 1048         self.assertAlmostEqual(
 1049             float(self.server.client.additional_headers["X-RateLimit-Reset"]),
 1050             59, delta=1)
 1051         self.assertEqual(
 1052             str(self.server.client.additional_headers["Retry-After"]),
 1053             "3")  # check as string
 1054 
 1055         print("Reset:", self.server.client.additional_headers["X-RateLimit-Reset"])
 1056         print("Now realtime pre-sleep:", datetime.utcnow())
 1057         sleep(3.1) # sleep as requested so we can do another login
 1058         print("Now realtime post-sleep:", datetime.utcnow())
 1059 
 1060         # this should succeed
 1061         self.server.client.additional_headers.clear()
 1062         results = self.server.dispatch('GET',
 1063                      "/rest/data/user/%s/realname"%self.joeid,
 1064                             self.empty_form)
 1065         print(results)
 1066         print("Reset:", self.server.client.additional_headers["X-RateLimit-Reset-date"])
 1067         print("Now realtime:", datetime.utcnow())
 1068         print("Now ts header:", self.server.client.additional_headers["Now"])
 1069         print("Now date header:", self.server.client.additional_headers["Now-date"])
 1070 
 1071         self.assertEqual(self.server.client.response_code, 200)
 1072 
 1073         self.assertEqual(
 1074             self.server.client.additional_headers["X-RateLimit-Limit"],
 1075             str(self.db.config['WEB_API_CALLS_PER_INTERVAL']))
 1076         self.assertEqual(
 1077             self.server.client.additional_headers["X-RateLimit-Limit-Period"],
 1078             str(self.db.config['WEB_API_INTERVAL_IN_SEC']))
 1079         self.assertEqual(
 1080             self.server.client.additional_headers["X-RateLimit-Remaining"],
 1081             '0')
 1082         self.assertFalse("Retry-After" in
 1083                          self.server.client.additional_headers) 
 1084         # we still need to wait a minute for everything to clear
 1085         self.assertAlmostEqual(
 1086             float(self.server.client.additional_headers["X-RateLimit-Reset"]),
 1087             59, delta=1)
 1088 
 1089         # and make sure we need to wait another three seconds
 1090         # as we consumed the last api call
 1091         results = self.server.dispatch('GET',
 1092                      "/rest/data/user/%s/realname"%self.joeid,
 1093                             self.empty_form)
 1094 
 1095         self.assertEqual(self.server.client.response_code, 429)
 1096         self.assertEqual(
 1097             str(self.server.client.additional_headers["Retry-After"]),
 1098             "3")  # check as string
 1099 
 1100         json_dict = json.loads(b2s(results))
 1101         self.assertEqual(json_dict['error']['msg'],
 1102                          "Api rate limits exceeded. Please wait: 3 seconds.")
 1103 
 1104         # reset rest params
 1105         self.db.config['WEB_API_CALLS_PER_INTERVAL'] = 0
 1106         self.db.config['WEB_API_INTERVAL_IN_SEC'] = 3600
 1107             
 1108     def testEtagGeneration(self):
 1109         ''' Make sure etag generation is stable
 1110         
 1111             This mocks date.Date() when creating the target to be
 1112             etagged. Differing dates make this test impossible.
 1113         '''
 1114         from roundup import date
 1115 
 1116         originalDate = date.Date
 1117 
 1118         dummy=date.Date('2000-06-26.00:34:02.0')
 1119 
 1120         # is a closure the best way to return a static Date object??
 1121         def dummyDate(adate=None):
 1122             def dummyClosure(adate=None, translator=None):
 1123                 return dummy
 1124             return dummyClosure
 1125 
 1126         date.Date = dummyDate()
 1127         try:
 1128             newuser = self.db.user.create(
 1129                 username='john',
 1130                 password=password.Password('random1', scheme='plaintext'),
 1131                 address='random1@home.org',
 1132                 realname='JohnRandom',
 1133                 roles='User,Admin'
 1134             )
 1135 
 1136             # verify etag matches what we calculated in the past
 1137             node = self.db.user.getnode(newuser)
 1138             etag = calculate_etag(node, self.db.config['WEB_SECRET_KEY'])
 1139             items = node.items(protected=True) # include every item
 1140             print(repr(sorted(items)))
 1141             print(etag)
 1142             self.assertEqual(etag, '"0433784660a141e8262835171e70fd2f"')
 1143 
 1144             # modify key and verify we have a different etag
 1145             etag = calculate_etag(node, self.db.config['WEB_SECRET_KEY'] + "a")
 1146             items = node.items(protected=True) # include every item
 1147             print(repr(sorted(items)))
 1148             print(etag)
 1149             self.assertNotEqual(etag, '"0433784660a141e8262835171e70fd2f"')
 1150 
 1151             # change data and verify we have a different etag
 1152             node.username="Paul"
 1153             etag = calculate_etag(node, self.db.config['WEB_SECRET_KEY'])
 1154             items = node.items(protected=True) # include every item
 1155             print(repr(sorted(items)))
 1156             print(etag)
 1157             self.assertEqual(etag, '"8abeacd284d58655c620d60389e29d4d"')
 1158         finally:
 1159             date.Date = originalDate
 1160         
 1161     def testEtagProcessing(self):
 1162         '''
 1163         Etags can come from two places:
 1164            If-Match http header
 1165            @etags value posted in the form
 1166 
 1167         Both will be checked if availble. If either one
 1168         fails, the etag check will fail.
 1169 
 1170         Run over header only, etag in form only, both,
 1171         each one broke and no etag. Use the put command
 1172         to trigger the etag checking code.
 1173         '''
 1174         for mode in ('header', 'etag', 'both',
 1175                      'brokenheader', 'brokenetag', 'none'):
 1176             try:
 1177                 # clean up any old header
 1178                 del(self.headers)
 1179             except AttributeError:
 1180                 pass
 1181 
 1182             form = cgi.FieldStorage()
 1183             etag = calculate_etag(self.db.user.getnode(self.joeid),
 1184                                   self.db.config['WEB_SECRET_KEY'])
 1185             form.list = [
 1186                 cgi.MiniFieldStorage('data', 'Joe Doe Doe'),
 1187             ]
 1188 
 1189             if mode == 'header':
 1190                 print("Mode = %s"%mode)
 1191                 self.headers = {'if-match': etag}
 1192             elif mode == 'etag':
 1193                 print("Mode = %s"%mode)
 1194                 form.list.append(cgi.MiniFieldStorage('@etag', etag))
 1195             elif mode == 'both':
 1196                 print("Mode = %s"%mode)
 1197                 self.headers = {'etag': etag}
 1198                 form.list.append(cgi.MiniFieldStorage('@etag', etag))
 1199             elif mode == 'brokenheader':
 1200                 print("Mode = %s"%mode)
 1201                 self.headers = {'if-match': 'bad'}
 1202                 form.list.append(cgi.MiniFieldStorage('@etag', etag))
 1203             elif mode == 'brokenetag':
 1204                 print("Mode = %s"%mode)
 1205                 self.headers = {'if-match': etag}
 1206                 form.list.append(cgi.MiniFieldStorage('@etag', 'bad'))
 1207             elif mode == 'none':
 1208                 print( "Mode = %s"%mode)
 1209             else:
 1210                 self.fail("unknown mode found")
 1211 
 1212             results = self.server.put_attribute(
 1213                 'user', self.joeid, 'realname', form
 1214             )
 1215             if mode not in ('brokenheader', 'brokenetag', 'none'):
 1216                 self.assertEqual(self.dummy_client.response_code, 200)
 1217             else:
 1218                 self.assertEqual(self.dummy_client.response_code, 412)
 1219 
 1220     def testBinaryFieldStorage(self):
 1221         ''' attempt to exercise all paths in the BinaryFieldStorage
 1222             class
 1223         '''
 1224 
 1225         expected={ "data": {
 1226                       "link": "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/1", 
 1227                       "id": "1"
 1228                    }
 1229                 }
 1230 
 1231         body=b'{ "title": "Joe Doe has problems", \
 1232                  "nosy": [ "1", "3" ], \
 1233                  "assignedto": "2", \
 1234                  "abool": true, \
 1235                  "afloat": 2.3, \
 1236                  "anint": 567890 \
 1237         }'
 1238         env = { "CONTENT_TYPE": "application/json",
 1239                 "CONTENT_LENGTH": len(body),
 1240                 "REQUEST_METHOD": "POST"
 1241         }
 1242         headers={"accept": "application/json; version=1",
 1243                  "content-type": env['CONTENT_TYPE'],
 1244                  "content-length": env['CONTENT_LENGTH'],
 1245         }
 1246         self.headers=headers
 1247         # we need to generate a FieldStorage the looks like
 1248         #  FieldStorage(None, None, 'string') rather than
 1249         #  FieldStorage(None, None, [])
 1250         body_file=BytesIO(body)  # FieldStorage needs a file
 1251         form = client.BinaryFieldStorage(body_file,
 1252                                 headers=headers,
 1253                                 environ=env)
 1254         self.server.client.request.headers.get=self.get_header
 1255         results = self.server.dispatch(env["REQUEST_METHOD"],
 1256                             "/rest/data/issue",
 1257                             form)
 1258         json_dict = json.loads(b2s(results))
 1259         self.assertEqual(json_dict,expected)
 1260 
 1261     def testDispatchPost(self):
 1262         """
 1263         run POST through rest dispatch(). This also tests
 1264         sending json payload through code as dispatch is the
 1265         code that changes json payload into something we can
 1266         process.
 1267         """
 1268 
 1269         # TEST #0
 1270         # POST: issue make joe assignee and admin and demo as
 1271         # nosy
 1272         # simulate: /rest/data/issue
 1273         body=b'{ "title": "Joe Doe has problems", \
 1274                  "nosy": [ "1", "3" ], \
 1275                  "assignedto": "2", \
 1276                  "abool": true, \
 1277                  "afloat": 2.3, \
 1278                  "anint": 567890 \
 1279         }'
 1280         env = { "CONTENT_TYPE": "application/json",
 1281                 "CONTENT_LENGTH": len(body),
 1282                 "REQUEST_METHOD": "POST"
 1283         }
 1284         headers={"accept": "application/json; version=1",
 1285                  "content-type": env['CONTENT_TYPE'],
 1286                  "content-length": env['CONTENT_LENGTH'],
 1287         }
 1288         self.headers=headers
 1289         # we need to generate a FieldStorage the looks like
 1290         #  FieldStorage(None, None, 'string') rather than
 1291         #  FieldStorage(None, None, [])
 1292         body_file=BytesIO(body)  # FieldStorage needs a file
 1293         form = client.BinaryFieldStorage(body_file,
 1294                                 headers=headers,
 1295                                 environ=env)
 1296         self.server.client.request.headers.get=self.get_header
 1297         results = self.server.dispatch(env["REQUEST_METHOD"],
 1298                             "/rest/data/issue",
 1299                             form)
 1300 
 1301         print(results)
 1302         self.assertEqual(self.server.client.response_code, 201)
 1303         json_dict = json.loads(b2s(results))
 1304         self.assertEqual(json_dict['data']['link'],
 1305                          "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/1")
 1306         self.assertEqual(json_dict['data']['id'], "1")
 1307         results = self.server.dispatch('GET',
 1308                             "/rest/data/issue/1", self.empty_form)
 1309         print(results)
 1310         json_dict = json.loads(b2s(results))
 1311         self.assertEqual(json_dict['data']['link'],
 1312           "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue/1")
 1313         self.assertEqual(json_dict['data']['attributes']['abool'], True)
 1314         self.assertEqual(json_dict['data']['attributes']['afloat'], 2.3)
 1315         self.assertEqual(json_dict['data']['attributes']['anint'], 567890)
 1316         self.assertEqual(len(json_dict['data']['attributes']['nosy']), 3)
 1317         self.assertEqual(json_dict['data']['attributes']\
 1318                           ['assignedto']['link'],
 1319            "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user/2")
 1320 
 1321 
 1322     def testStatsGen(self):
 1323         # check stats being returned by put and get ops
 1324         # using dispatch which parses the @stats query param
 1325 
 1326         # find correct py2/py3 list comparison ignoring order
 1327         try:
 1328             list_test = self.assertCountEqual  # py3
 1329         except AttributeError:
 1330             list_test = self.assertItemsEqual  # py2.7+
 1331 
 1332         # get stats
 1333         form = cgi.FieldStorage()
 1334         form.list = [
 1335             cgi.MiniFieldStorage('@stats', 'True'),
 1336         ]
 1337         results = self.server.dispatch('GET',
 1338                  "/rest/data/user/1/realname",
 1339                                  form)
 1340         self.assertEqual(self.dummy_client.response_code, 200)
 1341         json_dict = json.loads(b2s(results))
 1342 
 1343         # check that @stats are defined
 1344         self.assertTrue( '@stats' in json_dict['data'] )
 1345         # check that the keys are present
 1346         # not validating values as that changes
 1347         valid_fields= [ us2u('elapsed'),
 1348                         us2u('cache_hits'),
 1349                         us2u('cache_misses'),
 1350                         us2u('get_items'),
 1351                         us2u('filtering') ]
 1352         list_test(valid_fields,json_dict['data']['@stats'].keys())
 1353 
 1354         # Make sure false value works to suppress @stats
 1355         form = cgi.FieldStorage()
 1356         form.list = [
 1357             cgi.MiniFieldStorage('@stats', 'False'),
 1358         ]
 1359         results = self.server.dispatch('GET',
 1360                  "/rest/data/user/1/realname",
 1361                                  form)
 1362         self.assertEqual(self.dummy_client.response_code, 200)
 1363         json_dict = json.loads(b2s(results))
 1364         print(results)
 1365         # check that @stats are not defined
 1366         self.assertTrue( '@stats' not in json_dict['data'] )
 1367 
 1368         # Make sure non-true value works to suppress @stats
 1369         # false will always work
 1370         form = cgi.FieldStorage()
 1371         form.list = [
 1372             cgi.MiniFieldStorage('@stats', 'random'),
 1373         ]
 1374         results = self.server.dispatch('GET',
 1375                  "/rest/data/user/1/realname",
 1376                                  form)
 1377         self.assertEqual(self.dummy_client.response_code, 200)
 1378         json_dict = json.loads(b2s(results))
 1379         print(results)
 1380         # check that @stats are not defined
 1381         self.assertTrue( '@stats' not in json_dict['data'] )
 1382 
 1383         # if @stats is not defined there should be no stats
 1384         results = self.server.dispatch('GET',
 1385                  "/rest/data/user/1/realname",
 1386                                  self.empty_form)
 1387         self.assertEqual(self.dummy_client.response_code, 200)
 1388         json_dict = json.loads(b2s(results))
 1389 
 1390         # check that @stats are not defined
 1391         self.assertTrue( '@stats' not in json_dict['data'] )
 1392 
 1393 
 1394 
 1395         # change admin's realname via a normal web form
 1396         # This generates a FieldStorage that looks like:
 1397         #  FieldStorage(None, None, [])
 1398         # use etag from header
 1399         #
 1400         # Also use GET on the uri via the dispatch to retrieve
 1401         # the results from the db.
 1402         etag = calculate_etag(self.db.user.getnode('1'),
 1403                               self.db.config['WEB_SECRET_KEY'])
 1404         headers={"if-match": etag,
 1405                  "accept": "application/vnd.json.test-v1+json",
 1406         }
 1407         form = cgi.FieldStorage()
 1408         form.list = [
 1409             cgi.MiniFieldStorage('data', 'Joe Doe'),
 1410             cgi.MiniFieldStorage('@apiver', '1'),
 1411             cgi.MiniFieldStorage('@stats', 'true'),
 1412         ]
 1413         self.headers = headers
 1414         self.server.client.request.headers.get = self.get_header
 1415         self.db.setCurrentUser('admin') # must be admin to change user
 1416         results = self.server.dispatch('PUT',
 1417                             "/rest/data/user/1/realname",
 1418                             form)
 1419         self.assertEqual(self.dummy_client.response_code, 200)
 1420         json_dict = json.loads(b2s(results))
 1421         list_test(valid_fields,json_dict['data']['@stats'].keys())
 1422 
 1423     def testDispatch(self):
 1424         """
 1425         run changes through rest dispatch(). This also tests
 1426         sending json payload through code as dispatch is the
 1427         code that changes json payload into something we can
 1428         process.
 1429         """
 1430         # TEST #1
 1431         # PUT: joe's 'realname' using json data.
 1432         # simulate: /rest/data/user/<id>/realname
 1433         # use etag in header
 1434         etag = calculate_etag(self.db.user.getnode(self.joeid),
 1435                               self.db.config['WEB_SECRET_KEY'])
 1436         body=b'{ "data": "Joe Doe 1" }'
 1437         env = { "CONTENT_TYPE": "application/json",
 1438                 "CONTENT_LENGTH": len(body),
 1439                 "REQUEST_METHOD": "PUT"
 1440         }
 1441         headers={"accept": "application/json; version=1",
 1442                  "content-type": env['CONTENT_TYPE'],
 1443                  "content-length": env['CONTENT_LENGTH'],
 1444                  "if-match": etag
 1445         }
 1446         self.headers=headers
 1447         # we need to generate a FieldStorage the looks like
 1448         #  FieldStorage(None, None, 'string') rather than
 1449         #  FieldStorage(None, None, [])
 1450         body_file=BytesIO(body)  # FieldStorage needs a file
 1451         form = client.BinaryFieldStorage(body_file,
 1452                                 headers=headers,
 1453                                 environ=env)
 1454         self.server.client.request.headers.get=self.get_header
 1455         results = self.server.dispatch('PUT',
 1456                             "/rest/data/user/%s/realname"%self.joeid,
 1457                             form)
 1458 
 1459         self.assertEqual(self.server.client.response_code, 200)
 1460         results = self.server.get_element('user', self.joeid, self.empty_form)
 1461         self.assertEqual(self.dummy_client.response_code, 200)
 1462         self.assertEqual(results['data']['attributes']['realname'],
 1463                          'Joe Doe 1')
 1464 
 1465 
 1466         # substitute the version with an unacceptable version
 1467         # and verify it returns 400 code.
 1468         self.headers["accept"] = "application/json; version=1.1"
 1469         body_file=BytesIO(body)  # FieldStorage needs a file
 1470         form = client.BinaryFieldStorage(body_file,
 1471                                 headers=headers,
 1472                                 environ=env)
 1473         self.server.client.request.headers.get=self.get_header
 1474         results = self.server.dispatch('PUT',
 1475                             "/rest/data/user/%s/realname"%self.joeid,
 1476                             form)
 1477         self.assertEqual(self.server.client.response_code, 400)
 1478         del(self.headers)
 1479 
 1480         # TEST #2
 1481         # Set joe's 'realname' using json data.
 1482         # simulate: /rest/data/user/<id>/realname
 1483         # use etag in payload
 1484         etag = calculate_etag(self.db.user.getnode(self.joeid),
 1485                               self.db.config['WEB_SECRET_KEY'])
 1486         etagb = etag.strip ('"')
 1487         body=s2b('{ "@etag": "\\"%s\\"", "data": "Joe Doe 2" }'%etagb)
 1488         env = { "CONTENT_TYPE": "application/json",
 1489                 "CONTENT_LENGTH": len(body),
 1490                 "REQUEST_METHOD": "PUT",
 1491         }
 1492         self.headers=None  # have FieldStorage get len from env.
 1493         body_file=BytesIO(body)  # FieldStorage needs a file
 1494         form = client.BinaryFieldStorage(body_file,
 1495                                 headers=None,
 1496                                 environ=env)
 1497         self.server.client.request.headers.get=self.get_header
 1498 
 1499         headers={"accept": "application/json",
 1500                  "content-type": env['CONTENT_TYPE'],
 1501                  "if-match": etag
 1502         }
 1503         self.headers=headers # set for dispatch
 1504 
 1505         results = self.server.dispatch('PUT',
 1506                             "/rest/data/user/%s/realname"%self.joeid,
 1507                             form)
 1508 
 1509         self.assertEqual(self.server.client.response_code, 200)
 1510         results = self.server.get_element('user', self.joeid, self.empty_form)
 1511         self.assertEqual(self.dummy_client.response_code, 200)
 1512         self.assertEqual(results['data']['attributes']['realname'],
 1513                          'Joe Doe 2')
 1514         del(self.headers)
 1515 
 1516         # TEST #3
 1517         # change Joe's realname via a normal web form
 1518         # This generates a FieldStorage that looks like:
 1519         #  FieldStorage(None, None, [])
 1520         # use etag from header
 1521         #
 1522         # Also use GET on the uri via the dispatch to retrieve
 1523         # the results from the db.
 1524         etag = calculate_etag(self.db.user.getnode(self.joeid),
 1525                               self.db.config['WEB_SECRET_KEY'])
 1526         headers={"if-match": etag,
 1527                  "accept": "application/vnd.json.test-v1+json",
 1528         }
 1529         form = cgi.FieldStorage()
 1530         form.list = [
 1531             cgi.MiniFieldStorage('data', 'Joe Doe'),
 1532             cgi.MiniFieldStorage('@apiver', '1'),
 1533         ]
 1534         self.headers = headers
 1535         self.server.client.request.headers.get = self.get_header
 1536         results = self.server.dispatch('PUT',
 1537                             "/rest/data/user/%s/realname"%self.joeid,
 1538                             form)
 1539         self.assertEqual(self.dummy_client.response_code, 200)
 1540         results = self.server.dispatch('GET',
 1541                             "/rest/data/user/%s/realname"%self.joeid,
 1542                                        self.empty_form)
 1543         self.assertEqual(self.dummy_client.response_code, 200)
 1544         json_dict = json.loads(b2s(results))
 1545 
 1546         self.assertEqual(json_dict['data']['data'], 'Joe Doe')
 1547         self.assertEqual(json_dict['data']['link'],
 1548                          "http://tracker.example/cgi-bin/"
 1549                          "roundup.cgi/bugs/rest/data/user/3/realname") 
 1550         self.assertIn(json_dict['data']['type'], ("<class 'str'>",
 1551                                                   "<type 'str'>"))
 1552         self.assertEqual(json_dict['data']["id"], "3")
 1553         del(self.headers)
 1554 
 1555 
 1556         # TEST #4
 1557         # PATCH: joe's email address with json
 1558         # save address so we can use it later
 1559         stored_results = self.server.get_element('user', self.joeid,
 1560                                                  self.empty_form)
 1561         self.assertEqual(self.dummy_client.response_code, 200)
 1562 
 1563         etag = calculate_etag(self.db.user.getnode(self.joeid),
 1564                               self.db.config['WEB_SECRET_KEY'])
 1565         etagb = etag.strip ('"')
 1566         body=s2b('{ "address": "demo2@example.com", "@etag": "\\"%s\\""}'%etagb)
 1567         env = { "CONTENT_TYPE": "application/json",
 1568                 "CONTENT_LENGTH": len(body),
 1569                 "REQUEST_METHOD": "PATCH"
 1570         }
 1571         headers={"accept": "application/json",
 1572                  "content-type": env['CONTENT_TYPE'],
 1573                  "content-length": len(body)
 1574         }
 1575         self.headers=headers
 1576         body_file=BytesIO(body)  # FieldStorage needs a file
 1577         form = client.BinaryFieldStorage(body_file,
 1578                                 headers=headers,
 1579                                 environ=env)
 1580         self.server.client.request.headers.get=self.get_header
 1581         results = self.server.dispatch('PATCH',
 1582                             "/rest/data/user/%s"%self.joeid,
 1583                             form)
 1584 
 1585         self.assertEqual(self.server.client.response_code, 200)
 1586         results = self.server.get_element('user', self.joeid, self.empty_form)
 1587         self.assertEqual(self.dummy_client.response_code, 200)
 1588         self.assertEqual(results['data']['attributes']['address'],
 1589                          'demo2@example.com')
 1590 
 1591         # and set it back reusing env and headers from last test
 1592         etag = calculate_etag(self.db.user.getnode(self.joeid),
 1593                               self.db.config['WEB_SECRET_KEY'])
 1594         etagb = etag.strip ('"')
 1595         body=s2b('{ "address": "%s", "@etag": "\\"%s\\""}'%(
 1596             stored_results['data']['attributes']['address'],
 1597             etagb))
 1598         # reuse env and headers from prior test.
 1599         body_file=BytesIO(body)  # FieldStorage needs a file
 1600         form = client.BinaryFieldStorage(body_file,
 1601                                 headers=headers,
 1602                                 environ=env)
 1603         self.server.client.request.headers.get=self.get_header
 1604         results = self.server.dispatch('PATCH',
 1605                             "/rest/data/user/%s"%self.joeid,
 1606                             form)
 1607 
 1608         self.assertEqual(self.server.client.response_code, 200)
 1609         results = self.server.get_element('user', self.joeid, self.empty_form)
 1610         self.assertEqual(self.dummy_client.response_code, 200)
 1611         self.assertEqual(results['data']['attributes']['address'],
 1612                          'random@home.org')
 1613         del(self.headers)
 1614 
 1615         # TEST #5
 1616         # POST: create new issue
 1617         # no etag needed
 1618         etag = "not needed"
 1619         body=b'{ "title": "foo bar", "priority": "critical" }'
 1620         env = { "CONTENT_TYPE": "application/json",
 1621                 "CONTENT_LENGTH": len(body),
 1622                 "REQUEST_METHOD": "POST"
 1623         }
 1624         headers={"accept": "application/json",
 1625                  "content-type": env['CONTENT_TYPE'],
 1626                  "content-length": len(body)
 1627         }
 1628         self.headers=headers
 1629         body_file=BytesIO(body)  # FieldStorage needs a file
 1630         form = client.BinaryFieldStorage(body_file,
 1631                                 headers=headers,
 1632                                 environ=env)
 1633         self.server.client.request.headers.get=self.get_header
 1634         results = self.server.dispatch('POST',
 1635                             "/rest/data/issue",
 1636                             form)
 1637 
 1638         self.assertEqual(self.server.client.response_code, 201)
 1639         json_dict = json.loads(b2s(results))
 1640         issue_id=json_dict['data']['id']
 1641         results = self.server.get_element('issue',
 1642                             str(issue_id), # must be a string not unicode
 1643                             self.empty_form)
 1644         self.assertEqual(self.dummy_client.response_code, 200)
 1645         self.assertEqual(results['data']['attributes']['title'],
 1646                          'foo bar')
 1647 
 1648         # TEST #6
 1649         # POST: an invalid class
 1650         # no etag needed
 1651         etag = "not needed"
 1652         body=b'{ "title": "foo bar", "priority": "critical" }'
 1653         env = { "CONTENT_TYPE": "application/json",
 1654                 "CONTENT_LENGTH": len(body),
 1655                 "REQUEST_METHOD": "POST"
 1656         }
 1657         headers={"accept": "application/json",
 1658                  "content-type": env['CONTENT_TYPE'],
 1659                  "content-length": len(body)
 1660         }
 1661         self.headers=headers
 1662         body_file=BytesIO(body)  # FieldStorage needs a file
 1663         form = client.BinaryFieldStorage(body_file,
 1664                                 headers=headers,
 1665                                 environ=env)
 1666         self.server.client.request.headers.get=self.get_header
 1667         results = self.server.dispatch('POST',
 1668                             "/rest/data/nonissue",
 1669                             form)
 1670 
 1671         self.assertEqual(self.server.client.response_code, 404)
 1672         json_dict = json.loads(b2s(results))
 1673         status=json_dict['error']['status']
 1674         msg=json_dict['error']['msg']
 1675         self.assertEqual(status, 404)
 1676         self.assertEqual(msg, 'Class nonissue not found')
 1677 
 1678         # TEST #7
 1679         # POST: status without key field of name
 1680         # also test that version spec in accept header is accepted
 1681         # no etag needed
 1682         etag = "not needed"
 1683         body=b'{ "order": 5 }'
 1684         env = { "CONTENT_TYPE": "application/json",
 1685                 "CONTENT_LENGTH": len(body),
 1686                 "REQUEST_METHOD": "POST"
 1687         }
 1688         headers={"accept": "application/json; version=1",
 1689                  "content-type": env['CONTENT_TYPE'],
 1690                  "content-length": len(body)
 1691         }
 1692         self.headers=headers
 1693         body_file=BytesIO(body)  # FieldStorage needs a file
 1694         form = client.BinaryFieldStorage(body_file,
 1695                                 headers=headers,
 1696                                 environ=env)
 1697         self.server.client.request.headers.get=self.get_header
 1698         self.db.setCurrentUser('admin') # must be admin to create status
 1699         results = self.server.dispatch('POST',
 1700                             "/rest/data/status",
 1701                             form)
 1702 
 1703         self.assertEqual(self.server.client.response_code, 400)
 1704         json_dict = json.loads(b2s(results))
 1705         status=json_dict['error']['status']
 1706         msg=json_dict['error']['msg']
 1707         self.assertEqual(status, 400)
 1708         self.assertEqual(msg, "Must provide the 'name' property.")
 1709 
 1710 
 1711         # TEST #8
 1712         # DELETE: delete issue 1 also test return type by extension
 1713         #         test bogus extension as well.
 1714         etag = calculate_etag(self.db.issue.getnode("1"),
 1715                               self.db.config['WEB_SECRET_KEY'])
 1716         etagb = etag.strip ('"')
 1717         env = {"CONTENT_TYPE": "application/json",
 1718                "CONTENT_LEN": 0,
 1719                "REQUEST_METHOD": "DELETE" }
 1720         # use text/plain header and request json output by appending
 1721         # .json to the url.
 1722         headers={"accept": "text/plain",
 1723                  "content-type": env['CONTENT_TYPE'],
 1724                  "if-match": '"%s"'%etagb,
 1725                  "content-length": 0,
 1726         }
 1727         self.headers=headers
 1728         body_file=BytesIO(b'')  # FieldStorage needs a file
 1729         form = client.BinaryFieldStorage(body_file,
 1730                                 headers=headers,
 1731                                 environ=env)
 1732         self.server.client.request.headers.get=self.get_header
 1733         self.db.setCurrentUser('admin') # must be admin to delete issue
 1734         results = self.server.dispatch('DELETE',
 1735                             "/rest/data/issue/1.json",
 1736                             form)
 1737         self.assertEqual(self.server.client.response_code, 200)
 1738         print(results)
 1739         json_dict = json.loads(b2s(results))
 1740         status=json_dict['data']['status']
 1741         self.assertEqual(status, 'ok')
 1742 
 1743         results = self.server.dispatch('GET',
 1744                             "/rest/data/issuetitle:=asdf.jon",
 1745                             form)
 1746         self.assertEqual(self.server.client.response_code, 406)
 1747         print(results)
 1748         try:  # only verify local copy not system installed copy
 1749             from roundup.dicttoxml import dicttoxml
 1750             includexml = ', application/xml'
 1751         except ImportError:
 1752             includexml = ''
 1753         
 1754         response="Requested content type 'jon' is not available.\n" \
 1755                  "Acceptable types: */*, application/json%s\n"%includexml
 1756         self.assertEqual(b2s(results), response)
 1757 
 1758         # TEST #9
 1759         # GET: test that version can be set with accept:
 1760         #    ... ; version=z
 1761         # or
 1762         #    application/vnd.x.y-vz+json
 1763         # or
 1764         #    @apiver
 1765         # simulate: /rest/data/issue
 1766         form = cgi.FieldStorage()
 1767         form.list = [
 1768             cgi.MiniFieldStorage('@apiver', 'L'),
 1769         ]
 1770         headers={"accept": "application/json; notversion=z" }
 1771         self.headers=headers
 1772         self.server.client.request.headers.get=self.get_header
 1773         results = self.server.dispatch('GET',
 1774                             "/rest/data/issue/1", form)
 1775         print("9a: " + b2s(results))
 1776         json_dict = json.loads(b2s(results))
 1777         self.assertEqual(json_dict['error']['status'], 400)
 1778         self.assertEqual(json_dict['error']['msg'],
 1779               "Unrecognized version: L. See /rest without "
 1780               "specifying version for supported versions.")
 1781 
 1782         headers={"accept": "application/json; version=z" }
 1783         self.headers=headers
 1784         self.server.client.request.headers.get=self.get_header
 1785         results = self.server.dispatch('GET',
 1786                             "/rest/data/issue/1", form)
 1787         print("9b: " + b2s(results))
 1788         json_dict = json.loads(b2s(results))
 1789         self.assertEqual(json_dict['error']['status'], 400)
 1790         self.assertEqual(json_dict['error']['msg'],
 1791               "Unrecognized version: z. See /rest without "
 1792               "specifying version for supported versions.")
 1793 
 1794         headers={"accept": "application/vnd.roundup.test-vz+json" }
 1795         self.headers=headers
 1796         self.server.client.request.headers.get=self.get_header
 1797         results = self.server.dispatch('GET',
 1798                             "/rest/data/issue/1", self.empty_form)
 1799         print("9c:" + b2s(results))
 1800         self.assertEqual(self.server.client.response_code, 400)
 1801         json_dict = json.loads(b2s(results))
 1802         self.assertEqual(json_dict['error']['status'], 400)
 1803         self.assertEqual(json_dict['error']['msg'],
 1804               "Unrecognized version: z. See /rest without "
 1805               "specifying version for supported versions.")
 1806 
 1807         # verify that version priority is correct; should be version=...
 1808         headers={"accept": "application/vnd.roundup.test-vz+json; version=a"
 1809         }
 1810         self.headers=headers
 1811         self.server.client.request.headers.get=self.get_header
 1812         results = self.server.dispatch('GET',
 1813                             "/rest/data/issue/1", self.empty_form)
 1814         print("9d: " + b2s(results))
 1815         self.assertEqual(self.server.client.response_code, 400)
 1816         json_dict = json.loads(b2s(results))
 1817         self.assertEqual(json_dict['error']['status'], 400)
 1818         self.assertEqual(json_dict['error']['msg'],
 1819               "Unrecognized version: a. See /rest without "
 1820               "specifying version for supported versions.")
 1821 
 1822         # TEST #10
 1823         # check /rest and /rest/summary and /rest/notthere
 1824         expected_rest = {
 1825           "data": {
 1826               "supported_versions": [
 1827                   1
 1828               ],
 1829               "default_version": 1,
 1830               "links": [
 1831                   {
 1832                       "rel": "summary",
 1833                       "uri": "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/summary"
 1834                   },
 1835                   {
 1836                       "rel": "self",
 1837                       "uri": "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest"
 1838                   },
 1839                   {
 1840                       "rel": "data",
 1841                       "uri": "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data"
 1842                   }
 1843               ]
 1844           }
 1845         }
 1846 
 1847         self.headers={}
 1848         results = self.server.dispatch('GET',
 1849                             "/rest", self.empty_form)
 1850         print("10a: " + b2s(results))
 1851         self.assertEqual(self.server.client.response_code, 200)
 1852         results_dict = json.loads(b2s(results))
 1853         self.assertEqual(results_dict, expected_rest)
 1854 
 1855 
 1856         results = self.server.dispatch('GET',
 1857                             "/rest/", self.empty_form)
 1858         print("10b: " + b2s(results))
 1859         self.assertEqual(self.server.client.response_code, 200)
 1860         results_dict = json.loads(b2s(results))
 1861         self.assertEqual(results_dict, expected_rest)
 1862 
 1863         results = self.server.dispatch('GET',
 1864                             "/rest/summary", self.empty_form)
 1865         print("10c: " + b2s(results))
 1866         self.assertEqual(self.server.client.response_code, 200)
 1867 
 1868         results = self.server.dispatch('GET',
 1869                             "/rest/summary/", self.empty_form)
 1870         print("10d: " + b2s(results))
 1871         self.assertEqual(self.server.client.response_code, 200)
 1872 
 1873         expected_data = {
 1874             "data": {
 1875                 "issue": {
 1876                     "link": "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/issue"
 1877                 },
 1878                 "priority": {
 1879                     "link": "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/priority"
 1880                 },
 1881                 "user": {
 1882                     "link": "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/user"
 1883                 },
 1884                 "query": {
 1885                     "link": "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/query"
 1886                 },
 1887                 "status": {
 1888                     "link": "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/status"
 1889                 },
 1890                 "keyword": {
 1891                     "link": "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/keyword"
 1892                 },
 1893                 "msg": {
 1894                     "link": "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/msg"
 1895                 },
 1896                 "file": {
 1897                     "link": "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/file"
 1898                 }
 1899             }
 1900         }
 1901 
 1902         results = self.server.dispatch('GET',
 1903                             "/rest/data", self.empty_form)
 1904         print("10e: " + b2s(results))
 1905         self.assertEqual(self.server.client.response_code, 200)
 1906         results_dict = json.loads(b2s(results))
 1907         self.assertEqual(results_dict, expected_data)
 1908 
 1909         results = self.server.dispatch('GET',
 1910                             "/rest/data/", self.empty_form)
 1911         print("10f: " + b2s(results))
 1912         self.assertEqual(self.server.client.response_code, 200)
 1913         results_dict = json.loads(b2s(results))
 1914         self.assertEqual(results_dict, expected_data)
 1915 
 1916         results = self.server.dispatch('GET',
 1917                             "/rest/notthere", self.empty_form)
 1918         self.assertEqual(self.server.client.response_code, 404)
 1919 
 1920         results = self.server.dispatch('GET',
 1921                             "/rest/notthere/", self.empty_form)
 1922         self.assertEqual(self.server.client.response_code, 404)
 1923 
 1924         del(self.headers)
 1925 
 1926     def testAcceptHeaderParsing(self):
 1927         # TEST #1
 1928         # json highest priority
 1929         self.server.client.request.headers.get=self.get_header
 1930         headers={"accept": "application/json; version=1,"
 1931                            "application/xml; q=0.5; version=2,"
 1932                            "text/plain; q=0.75; version=2"
 1933         }
 1934         self.headers=headers
 1935         results = self.server.dispatch('GET',
 1936                                        "/rest/data/status/1",
 1937                                        self.empty_form)
 1938         print(results)
 1939         self.assertEqual(self.server.client.response_code, 200)
 1940         self.assertEqual(self.server.client.additional_headers['Content-Type'],
 1941                          "application/json")
 1942 
 1943         # TEST #2
 1944         # text highest priority
 1945         headers={"accept": "application/json; q=0.5; version=1,"
 1946                            "application/xml; q=0.25; version=2,"
 1947                            "text/plain; q=1.0; version=3"
 1948         }
 1949         self.headers=headers
 1950         results = self.server.dispatch('GET',
 1951                                        "/rest/data/status/1",
 1952                                        self.empty_form)
 1953         print(results)
 1954         self.assertEqual(self.server.client.response_code, 200)
 1955         self.assertEqual(self.server.client.additional_headers['Content-Type'],
 1956                          "application/json")
 1957 
 1958         # TEST #3
 1959         # no acceptable type
 1960         headers={"accept": "text/plain; q=1.0; version=2"
 1961         }
 1962         self.headers=headers
 1963         results = self.server.dispatch('GET',
 1964                                        "/rest/data/status/1",
 1965                                        self.empty_form)
 1966         print(results)
 1967         self.assertEqual(self.server.client.response_code, 406)
 1968         self.assertEqual(self.server.client.additional_headers['Content-Type'],
 1969                          "application/json")
 1970 
 1971         # TEST #4
 1972         # no accept header, should use default json
 1973         headers={}
 1974         self.headers=headers
 1975         results = self.server.dispatch('GET',
 1976                                        "/rest/data/status/1",
 1977                                        self.empty_form)
 1978         print(results)
 1979         self.assertEqual(self.server.client.response_code, 200)
 1980         self.assertEqual(self.server.client.additional_headers['Content-Type'],
 1981                          "application/json")
 1982 
 1983         # TEST #5
 1984         # wildcard accept header, should use default json
 1985         headers={ "accept": "*/*"}
 1986         self.headers=headers
 1987         results = self.server.dispatch('GET',
 1988                                        "/rest/data/status/1",
 1989                                        self.empty_form)
 1990         print(results)
 1991         self.assertEqual(self.server.client.response_code, 200)
 1992         self.assertEqual(self.server.client.additional_headers['Content-Type'],
 1993                          "application/json")
 1994 
 1995         # TEST #6
 1996         # invalid q factor if not ignored/demoted
 1997         # application/json is selected with invalid version
 1998         # and errors.
 1999         # this ends up choosing */* which triggers json.
 2000         self.server.client.request.headers.get=self.get_header
 2001         headers={"accept": "application/json; q=1.5; version=99,"
 2002                            "*/*; q=0.9; version=1,"
 2003                            "text/plain; q=3.75; version=2"
 2004         }
 2005         self.headers=headers
 2006         results = self.server.dispatch('GET',
 2007                                        "/rest/data/status/1",
 2008                                        self.empty_form)
 2009         print(results)
 2010         self.assertEqual(self.server.client.response_code, 200)
 2011         self.assertEqual(self.server.client.additional_headers['Content-Type'],
 2012                          "application/json")
 2013 
 2014 
 2015         '''
 2016         # only works if dicttoxml.py is installed.
 2017         #   not installed for testing
 2018         # TEST #7
 2019         # xml wins
 2020         headers={"accept": "application/json; q=0.5; version=2,"
 2021                            "application/xml; q=0.75; version=1,"
 2022                            "text/plain; q=1.0; version=2"
 2023         }
 2024         self.headers=headers
 2025         results = self.server.dispatch('GET',
 2026                                        "/rest/data/status/1",
 2027                                        self.empty_form)
 2028         print(results)
 2029         self.assertEqual(self.server.client.response_code, 200)
 2030         self.assertEqual(self.server.client.additional_headers['Content-Type'],
 2031                          "application/xml")
 2032         '''
 2033 
 2034     def testMethodOverride(self):
 2035         # TEST #1
 2036         # Use GET, PUT, PATCH to tunnel DELETE expect error
 2037         
 2038         body=b'{ "order": 5 }'
 2039         env = { "CONTENT_TYPE": "application/json",
 2040                 "CONTENT_LENGTH": len(body),
 2041                 "REQUEST_METHOD": "POST"
 2042         }
 2043         body_file=BytesIO(body)  # FieldStorage needs a file
 2044         self.server.client.request.headers.get=self.get_header
 2045         for method in ( "GET", "PUT", "PATCH" ):
 2046             headers={"accept": "application/json; version=1",
 2047                      "content-type": env['CONTENT_TYPE'],
 2048                      "content-length": len(body),
 2049                      "x-http-method-override": "DElETE",
 2050                  }
 2051             self.headers=headers
 2052             form = client.BinaryFieldStorage(body_file,
 2053                                          headers=headers,
 2054                                          environ=env)
 2055             self.db.setCurrentUser('admin') # must be admin to create status
 2056             results = self.server.dispatch(method,
 2057                                            "/rest/data/status",
 2058                                            form)
 2059 
 2060             self.assertEqual(self.server.client.response_code, 400)
 2061             json_dict = json.loads(b2s(results))
 2062             status=json_dict['error']['status']
 2063             msg=json_dict['error']['msg']
 2064             self.assertEqual(status, 400)
 2065             self.assertEqual(msg, "X-HTTP-Method-Override: DElETE must be "
 2066                              "used with POST method not %s."%method)
 2067 
 2068         # TEST #2
 2069         # DELETE: delete issue 1 via post tunnel
 2070         self.assertFalse(self.db.status.is_retired("1"))
 2071         etag = calculate_etag(self.db.status.getnode("1"),
 2072                               self.db.config['WEB_SECRET_KEY'])
 2073         etagb = etag.strip ('"')
 2074         headers={"accept": "application/json; q=1.0, application/xml; q=0.75",
 2075                  "if-match": '"%s"'%etagb,
 2076                  "content-length": 0,
 2077                  "x-http-method-override": "DElETE"
 2078         }
 2079         self.headers=headers
 2080         body_file=BytesIO(b'')  # FieldStorage needs a file
 2081         form = client.BinaryFieldStorage(body_file,
 2082                                 headers=headers,
 2083                                 environ=env)
 2084         self.server.client.request.headers.get=self.get_header
 2085         self.db.setCurrentUser('admin') # must be admin to delete issue
 2086         results = self.server.dispatch('POST',
 2087                             "/rest/data/status/1",
 2088                             form)
 2089         print(results)
 2090         self.assertEqual(self.server.client.response_code, 200)
 2091         json_dict = json.loads(b2s(results))
 2092         status=json_dict['data']['status']
 2093         self.assertEqual(status, 'ok')
 2094         self.assertTrue(self.db.status.is_retired("1"))
 2095         
 2096 
 2097     def testPostPOE(self):
 2098         ''' test post once exactly: get POE url, create issue
 2099             using POE url. Use dispatch entry point.
 2100         '''
 2101         import time
 2102         # setup environment
 2103         etag = "not needed"
 2104         empty_body=b''
 2105         env = { "CONTENT_TYPE": "application/json",
 2106                 "CONTENT_LENGTH": len(empty_body),
 2107                 "REQUEST_METHOD": "POST"
 2108         }
 2109         headers={"accept": "application/json",
 2110                  "content-type": env['CONTENT_TYPE'],
 2111                  "content-length": len(empty_body)
 2112         }
 2113         self.headers=headers
 2114         # use empty_body to test code path for missing/empty json
 2115         body_file=BytesIO(empty_body)  # FieldStorage needs a file
 2116         form = client.BinaryFieldStorage(body_file,
 2117                                 headers=headers,
 2118                                 environ=env)
 2119 
 2120         ## Obtain the POE url.
 2121         self.server.client.request.headers.get=self.get_header
 2122         results = self.server.dispatch('POST',
 2123                             "/rest/data/issue/@poe",
 2124                             form)
 2125 
 2126         self.assertEqual(self.server.client.response_code, 200)
 2127         json_dict = json.loads(b2s(results))
 2128         url=json_dict['data']['link']
 2129 
 2130         # strip tracker web prefix leaving leading /.
 2131         url = url[len(self.db.config['TRACKER_WEB'])-1:]
 2132 
 2133         ## create an issue using poe url.
 2134         body=b'{ "title": "foo bar", "priority": "critical" }'
 2135         env = { "CONTENT_TYPE": "application/json",
 2136                 "CONTENT_LENGTH": len(body),
 2137                 "REQUEST_METHOD": "POST"
 2138         }
 2139         headers={"accept": "application/json",
 2140                  "content-type": env['CONTENT_TYPE'],
 2141                  "content-length": len(body)
 2142         }
 2143         self.headers=headers
 2144         body_file=BytesIO(body)  # FieldStorage needs a file
 2145         form = client.BinaryFieldStorage(body_file,
 2146                                 headers=headers,
 2147                                 environ=env)
 2148         self.server.client.request.headers.get=self.get_header
 2149         results = self.server.dispatch('POST',
 2150                             url,
 2151                             form)
 2152 
 2153         self.assertEqual(self.server.client.response_code, 201)
 2154         json_dict = json.loads(b2s(results))
 2155         issue_id=json_dict['data']['id']
 2156         results = self.server.get_element('issue',
 2157                             str(issue_id), # must be a string not unicode
 2158                             self.empty_form)
 2159         self.assertEqual(self.dummy_client.response_code, 200)
 2160         self.assertEqual(results['data']['attributes']['title'],
 2161                          'foo bar')
 2162 
 2163         ## Reuse POE url. It will fail.
 2164         self.server.client.request.headers.get=self.get_header
 2165         results = self.server.dispatch('POST',
 2166                             url,
 2167                             form)
 2168         # get the last component stripping the trailing /
 2169         poe=url[url.rindex('/')+1:]
 2170         self.assertEqual(self.server.client.response_code, 400)
 2171         results = json.loads(b2s(results))
 2172         self.assertEqual(results['error']['status'], 400)
 2173         self.assertEqual(results['error']['msg'],
 2174                          "POE token \'%s\' not valid."%poe)
 2175 
 2176         ## Try using GET on POE url. Should fail with method not
 2177         ## allowed (405)
 2178         self.server.client.request.headers.get=self.get_header
 2179         results = self.server.dispatch('GET',
 2180                             "/rest/data/issue/@poe",
 2181                             form)
 2182         self.assertEqual(self.server.client.response_code, 405)
 2183 
 2184 
 2185         ## Try creating generic POE url.
 2186         body_poe=b'{"generic": "null", "lifetime": "100" }'
 2187         body_file=BytesIO(body_poe)  # FieldStorage needs a file
 2188         form = client.BinaryFieldStorage(body_file,
 2189                                 headers=headers,
 2190                                 environ=env)
 2191         self.server.client.request.headers.get=self.get_header
 2192         results = self.server.dispatch('POST',
 2193                             "/rest/data/issue/@poe",
 2194                             form)
 2195         json_dict = json.loads(b2s(results))
 2196         url=json_dict['data']['link']
 2197 
 2198         # strip tracker web prefix leaving leading /.
 2199         url = url[len(self.db.config['TRACKER_WEB'])-1:]
 2200         url = url.replace('/issue/', '/keyword/')
 2201 
 2202         body_keyword=b'{"name": "keyword"}'
 2203         body_file=BytesIO(body_keyword)  # FieldStorage needs a file
 2204         form = client.BinaryFieldStorage(body_file,
 2205                                 headers=headers,
 2206                                 environ=env)
 2207         results = self.server.dispatch('POST',
 2208                             url,
 2209                             form)
 2210         self.assertEqual(self.server.client.response_code, 201)
 2211         json_dict = json.loads(b2s(results))
 2212         url=json_dict['data']['link']
 2213         id=json_dict['data']['id']
 2214         self.assertEqual(id, "1")
 2215         self.assertEqual(url, "http://tracker.example/cgi-bin/roundup.cgi/bugs/rest/data/keyword/1")
 2216 
 2217         ## Create issue POE url and try to use for keyword.
 2218         ## This should fail.
 2219         body_poe=b'{"lifetime": "100" }'
 2220         body_file=BytesIO(body_poe)  # FieldStorage needs a file
 2221         form = client.BinaryFieldStorage(body_file,
 2222                                 headers=headers,
 2223                                 environ=env)
 2224         self.server.client.request.headers.get=self.get_header
 2225         results = self.server.dispatch('POST',
 2226                             "/rest/data/issue/@poe",
 2227                             form)
 2228         json_dict = json.loads(b2s(results))
 2229         url=json_dict['data']['link']
 2230 
 2231         # strip tracker web prefix leaving leading /.
 2232         url = url[len(self.db.config['TRACKER_WEB'])-1:]
 2233         url = url.replace('/issue/', '/keyword/')
 2234 
 2235         body_keyword=b'{"name": "keyword"}'
 2236         body_file=BytesIO(body_keyword)  # FieldStorage needs a file
 2237         form = client.BinaryFieldStorage(body_file,
 2238                                 headers=headers,
 2239                                 environ=env)
 2240         results = self.server.dispatch('POST',
 2241                             url,
 2242                             form)
 2243         poe=url[url.rindex('/')+1:]
 2244         self.assertEqual(self.server.client.response_code, 400)
 2245         json_dict = json.loads(b2s(results))
 2246         stat=json_dict['error']['status']
 2247         msg=json_dict['error']['msg']
 2248         self.assertEqual(stat, 400)
 2249         self.assertEqual(msg, "POE token '%s' not valid for keyword, was generated for class issue"%poe)
 2250 
 2251 
 2252         ## Create POE with 10 minute lifetime and verify
 2253         ## expires is within 10 minutes.
 2254         body_poe=b'{"lifetime": "30" }'
 2255         body_file=BytesIO(body_poe)  # FieldStorage needs a file
 2256         form = client.BinaryFieldStorage(body_file,
 2257                                 headers=headers,
 2258                                 environ=env)
 2259         self.server.client.request.headers.get=self.get_header
 2260         results = self.server.dispatch('POST',
 2261                             "/rest/data/issue/@poe",
 2262                             form)
 2263         json_dict = json.loads(b2s(results))
 2264         expires=int(json_dict['data']['expires'])
 2265         # allow up to 3 seconds between time stamp creation
 2266         # done under dispatch and this point.
 2267         expected=int(time.time() + 30)
 2268         print("expected=%d, expires=%d"%(expected,expires))
 2269         self.assertTrue((expected - expires) < 3 and (expected - expires) >= 0)
 2270 
 2271 
 2272         ## Use a token created above as joe by a different user.
 2273         self.db.setCurrentUser('admin')
 2274         url=json_dict['data']['link']
 2275         # strip tracker web prefix leaving leading /.
 2276         url = url[len(self.db.config['TRACKER_WEB'])-1:]
 2277         body_file=BytesIO(body_keyword)  # FieldStorage needs a file
 2278         form = client.BinaryFieldStorage(body_file,
 2279                                          headers=headers,
 2280                                          environ=env)
 2281         results = self.server.dispatch('POST',
 2282                                        url,
 2283                                        form)
 2284         print(results)
 2285         self.assertEqual(self.server.client.response_code, 400)
 2286         json_dict = json.loads(b2s(results))
 2287         # get the last component stripping the trailing /
 2288         poe=url[url.rindex('/')+1:]
 2289         self.assertEqual(json_dict['error']['msg'],
 2290                          "POE token '%s' not valid."%poe)
 2291 
 2292         ## Create POE with bogus lifetime
 2293         body_poe=b'{"lifetime": "10.2" }'
 2294         body_file=BytesIO(body_poe)  # FieldStorage needs a file
 2295         form = client.BinaryFieldStorage(body_file,
 2296                                 headers=headers,
 2297                                 environ=env)
 2298         self.server.client.request.headers.get=self.get_header
 2299         results = self.server.dispatch('POST',
 2300                             "/rest/data/issue/@poe",
 2301                             form)
 2302         self.assertEqual(self.server.client.response_code, 400)
 2303         print(results)
 2304         json_dict = json.loads(b2s(results))
 2305         self.assertEqual(json_dict['error']['msg'],
 2306                          "Value \'lifetime\' must be an integer specify "
 2307                          "lifetime in seconds. Got 10.2.")
 2308 
 2309         ## Create POE with lifetime > 1 hour
 2310         body_poe=b'{"lifetime": "3700" }'
 2311         body_file=BytesIO(body_poe)  # FieldStorage needs a file
 2312         form = client.BinaryFieldStorage(body_file,
 2313                                 headers=headers,
 2314                                 environ=env)
 2315         self.server.client.request.headers.get=self.get_header
 2316         results = self.server.dispatch('POST',
 2317                             "/rest/data/issue/@poe",
 2318                             form)
 2319         self.assertEqual(self.server.client.response_code, 400)
 2320         print(results)
 2321         json_dict = json.loads(b2s(results))
 2322         self.assertEqual(json_dict['error']['msg'],
 2323                          "Value 'lifetime' must be between 1 second and 1 "
 2324                          "hour (3600 seconds). Got 3700.")
 2325 
 2326         ## Create POE with lifetime < 1 second
 2327         body_poe=b'{"lifetime": "-1" }'
 2328         body_file=BytesIO(body_poe)  # FieldStorage needs a file
 2329         form = client.BinaryFieldStorage(body_file,
 2330                                 headers=headers,
 2331                                 environ=env)
 2332         self.server.client.request.headers.get=self.get_header
 2333         results = self.server.dispatch('POST',
 2334                             "/rest/data/issue/@poe",
 2335                             form)
 2336         self.assertEqual(self.server.client.response_code, 400)
 2337         print(results)
 2338         json_dict = json.loads(b2s(results))
 2339         self.assertEqual(json_dict['error']['msg'],
 2340                          "Value 'lifetime' must be between 1 second and 1 "
 2341                          "hour (3600 seconds). Got -1.")
 2342         del(self.headers)
 2343 
 2344     def testPutElement(self):
 2345         """
 2346         Change joe's 'realname'
 2347         Check if we can't change admin's detail
 2348         """
 2349         # fail to change Joe's realname via attribute uri
 2350         # no etag
 2351         form = cgi.FieldStorage()
 2352         form.list = [
 2353             cgi.MiniFieldStorage('data', 'Joe Doe Doe')
 2354         ]
 2355         results = self.server.put_attribute(
 2356             'user', self.joeid, 'realname', form
 2357         )
 2358         self.assertEqual(self.dummy_client.response_code, 412)
 2359         results = self.server.get_attribute(
 2360             'user', self.joeid, 'realname', self.empty_form
 2361         )
 2362         self.assertEqual(self.dummy_client.response_code, 200)
 2363         self.assertEqual(results['data']['data'], 'Joe Random')
 2364 
 2365         # change Joe's realname via attribute uri - etag in header
 2366         form = cgi.FieldStorage()
 2367         etag = calculate_etag(self.db.user.getnode(self.joeid),
 2368                               self.db.config['WEB_SECRET_KEY'])
 2369         form.list = [
 2370             cgi.MiniFieldStorage('data', 'Joe Doe Doe'),
 2371         ]
 2372 
 2373         self.headers = {'if-match': etag } # use etag in header
 2374         results = self.server.put_attribute(
 2375             'user', self.joeid, 'realname', form
 2376         )
 2377         self.assertEqual(self.dummy_client.response_code, 200)
 2378         results = self.server.get_attribute(
 2379             'user', self.joeid, 'realname', self.empty_form
 2380         )
 2381         self.assertEqual(self.dummy_client.response_code, 200)
 2382         self.assertEqual(results['data']['data'], 'Joe Doe Doe')
 2383         del(self.headers)
 2384 
 2385         # Reset joe's 'realname'. etag in body
 2386         # Also try to set protected items. The protected items should
 2387         # be ignored on put_element to make it easy to get the item
 2388         # with all fields, change one field and put the result without
 2389         # having to filter out protected items.
 2390         form = cgi.FieldStorage()
 2391         etag = calculate_etag(self.db.user.getnode(self.joeid),
 2392                               self.db.config['WEB_SECRET_KEY'])
 2393         form.list = [
 2394             cgi.MiniFieldStorage('creator', '3'),
 2395             cgi.MiniFieldStorage('realname', 'Joe Doe'),
 2396             cgi.MiniFieldStorage('@etag', etag)
 2397         ]
 2398         results = self.server.put_element('user', self.joeid, form)
 2399         self.assertEqual(self.dummy_client.response_code, 200)
 2400         results = self.server.get_element('user', self.joeid, self.empty_form)
 2401         self.assertEqual(self.dummy_client.response_code, 200)
 2402         self.assertEqual(results['data']['attributes']['realname'], 'Joe Doe')
 2403 
 2404         # We are joe, so check we can't change admin's details
 2405         results = self.server.put_element('user', '1', form)
 2406         self.assertEqual(self.dummy_client.response_code, 403)
 2407         self.assertEqual(results['error']['status'], 403)
 2408 
 2409         # Try to reset joe's 'realname' and add a broken prop.
 2410         # This should result in no change to the name and
 2411         # a 400 UsageError stating prop does not exist.
 2412         form = cgi.FieldStorage()
 2413         etag = calculate_etag(self.db.user.getnode(self.joeid),
 2414                               self.db.config['WEB_SECRET_KEY'])
 2415         form.list = [
 2416             cgi.MiniFieldStorage('JustKidding', '3'),
 2417             cgi.MiniFieldStorage('realname', 'Joe Doe'),
 2418             cgi.MiniFieldStorage('@etag', etag)
 2419         ]
 2420         results = self.server.put_element('user', self.joeid, form)
 2421         expected= {'error': {'status': 400,
 2422                              'msg': UsageError('Property JustKidding not '
 2423                                                'found in class user')}}
 2424         self.assertEqual(results['error']['status'],
 2425                          expected['error']['status'])
 2426         self.assertEqual(type(results['error']['msg']),
 2427                          type(expected['error']['msg']))
 2428         self.assertEqual(self.dummy_client.response_code, 400)
 2429         results = self.server.get_element('user', self.joeid, self.empty_form)
 2430         self.assertEqual(self.dummy_client.response_code, 200)
 2431         self.assertEqual(results['data']['attributes']['realname'], 'Joe Doe')
 2432 
 2433     def testPutAttribute(self):
 2434         # put protected property
 2435         # make sure we don't have permission issues
 2436         self.db.setCurrentUser('admin')
 2437         form = cgi.FieldStorage()
 2438         etag = calculate_etag(self.db.user.getnode(self.joeid),
 2439                               self.db.config['WEB_SECRET_KEY'])
 2440         form.list = [
 2441             cgi.MiniFieldStorage('data', '3'),
 2442             cgi.MiniFieldStorage('@etag', etag)
 2443         ]
 2444         results = self.server.put_attribute(
 2445             'user', self.joeid, 'creator', form
 2446         )
 2447         expected= {'error': {'status': 405, 'msg': 
 2448                              AttributeError('\'"creator", "actor", "creation" and '
 2449                                         '"activity" are reserved\'')}}
 2450         print(results)
 2451         self.assertEqual(results['error']['status'],
 2452                          expected['error']['status'])
 2453         self.assertEqual(type(results['error']['msg']),
 2454                          type(expected['error']['msg']))
 2455         self.assertEqual(self.dummy_client.response_code, 405)
 2456 
 2457         # put invalid property
 2458         # make sure we don't have permission issues
 2459         self.db.setCurrentUser('admin')
 2460         form = cgi.FieldStorage()
 2461         etag = calculate_etag(self.db.user.getnode(self.joeid),
 2462                               self.db.config['WEB_SECRET_KEY'])
 2463         form.list = [
 2464             cgi.MiniFieldStorage('data', '3'),
 2465             cgi.MiniFieldStorage('@etag', etag)
 2466         ]
 2467         results = self.server.put_attribute(
 2468             'user', self.joeid, 'youMustBeKiddingMe', form
 2469         )
 2470         expected= {'error': {'status': 400,
 2471                              'msg': UsageError("'youMustBeKiddingMe' "
 2472                                       "is not a property of user")}}
 2473         print(results)
 2474         self.assertEqual(results['error']['status'],
 2475                          expected['error']['status'])
 2476         self.assertEqual(type(results['error']['msg']),
 2477                          type(expected['error']['msg']))
 2478         self.assertEqual(self.dummy_client.response_code, 400)
 2479 
 2480     def testPost(self):
 2481         """
 2482         Post a new issue with title: foo
 2483         Verify the information of the created issue
 2484         """
 2485         form = cgi.FieldStorage()
 2486         form.list = [
 2487             cgi.MiniFieldStorage('title', 'foo')
 2488         ]
 2489         results = self.server.post_collection('issue', form)
 2490         self.assertEqual(self.dummy_client.response_code, 201)
 2491         issueid = results['data']['id']
 2492         results = self.server.get_element('issue', issueid, self.empty_form)
 2493         self.assertEqual(self.dummy_client.response_code, 200)
 2494         self.assertEqual(results['data']['attributes']['title'], 'foo')
 2495         self.assertEqual(self.db.issue.get(issueid, "tx_Source"), 'web')
 2496 
 2497     def testPostFile(self):
 2498         """
 2499         Post a new file with content: hello\r\nthere
 2500         Verify the information of the created file
 2501         """
 2502         form = cgi.FieldStorage()
 2503         form.list = [
 2504             cgi.MiniFieldStorage('content', 'hello\r\nthere')
 2505         ]
 2506         results = self.server.post_collection('file', form)
 2507         self.assertEqual(self.dummy_client.response_code, 201)
 2508         fileid = results['data']['id']
 2509         results = self.server.get_element('file', fileid, self.empty_form)
 2510         results = results['data']
 2511         self.assertEqual(self.dummy_client.response_code, 200)
 2512         self.assertEqual(results['attributes']['content'],
 2513             {'link': 'http://tracker.example/cgi-bin/roundup.cgi/bugs/file1/'})
 2514 
 2515         # File content is only shown with verbose=3
 2516         form = cgi.FieldStorage()
 2517         form.list = [
 2518             cgi.MiniFieldStorage('@verbose', '3')
 2519         ]
 2520         results = self.server.get_element('file', fileid, form)
 2521         results = results['data']
 2522         self.assertEqual(self.dummy_client.response_code, 200)
 2523         self.assertEqual(results['attributes']['content'], 'hello\r\nthere')
 2524 
 2525     def testAuthDeniedPut(self):
 2526         """
 2527         Test unauthorized PUT request
 2528         """
 2529         # Wrong permissions (caught by roundup security module).
 2530         form = cgi.FieldStorage()
 2531         form.list = [
 2532             cgi.MiniFieldStorage('realname', 'someone')
 2533         ]
 2534         results = self.server.put_element('user', '1', form)
 2535         self.assertEqual(self.dummy_client.response_code, 403)
 2536         self.assertEqual(results['error']['status'], 403)
 2537 
 2538     def testAuthDeniedPost(self):
 2539         """
 2540         Test unauthorized POST request
 2541         """
 2542         form = cgi.FieldStorage()
 2543         form.list = [
 2544             cgi.MiniFieldStorage('username', 'blah')
 2545         ]
 2546         results = self.server.post_collection('user', form)
 2547         self.assertEqual(self.dummy_client.response_code, 403)
 2548         self.assertEqual(results['error']['status'], 403)
 2549 
 2550     def testAuthAllowedPut(self):
 2551         """
 2552         Test authorized PUT request
 2553         """
 2554         self.db.setCurrentUser('admin')
 2555         form = cgi.FieldStorage()
 2556         form.list = [
 2557             cgi.MiniFieldStorage('realname', 'someone')
 2558         ]
 2559         try:
 2560             self.server.put_element('user', '2', form)
 2561         except Unauthorised as err:
 2562             self.fail('raised %s' % err)
 2563         finally:
 2564             self.db.setCurrentUser('joe')
 2565 
 2566     def testAuthAllowedPost(self):
 2567         """
 2568         Test authorized POST request
 2569         """
 2570         self.db.setCurrentUser('admin')
 2571         form = cgi.FieldStorage()
 2572         form.list = [
 2573             cgi.MiniFieldStorage('username', 'blah')
 2574         ]
 2575         try:
 2576             self.server.post_collection('user', form)
 2577         except Unauthorised as err:
 2578             self.fail('raised %s' % err)
 2579         finally:
 2580             self.db.setCurrentUser('joe')
 2581 
 2582     def testDeleteAttributeUri(self):
 2583         """
 2584         Test Delete an attribute
 2585         """
 2586         self.maxDiff = 4000
 2587         # create a new issue with userid 1 in the nosy list
 2588         issue_id = self.db.issue.create(title='foo', nosy=['1'])
 2589 
 2590         # No etag, so this should return 412 - Precondition Failed
 2591         # With no changes
 2592         results = self.server.delete_attribute(
 2593             'issue', issue_id, 'nosy', self.empty_form
 2594         )
 2595         self.assertEqual(self.dummy_client.response_code, 412)
 2596         results = self.server.get_element('issue', issue_id, self.empty_form)
 2597         results = results['data']
 2598         self.assertEqual(self.dummy_client.response_code, 200)
 2599         self.assertEqual(len(results['attributes']['nosy']), 1)
 2600         self.assertListEqual(results['attributes']['nosy'],
 2601             [{'id': '1', 'link': self.url_pfx + 'user/1'}])
 2602 
 2603         form = cgi.FieldStorage()
 2604         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2605                               self.db.config['WEB_SECRET_KEY'])
 2606         form.list.append(cgi.MiniFieldStorage('@etag', etag))
 2607         # remove the title and nosy
 2608         results = self.server.delete_attribute(
 2609             'issue', issue_id, 'title', form
 2610         )
 2611         self.assertEqual(self.dummy_client.response_code, 200)
 2612 
 2613         del(form.list[-1])
 2614         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2615                               self.db.config['WEB_SECRET_KEY'])
 2616         form.list.append(cgi.MiniFieldStorage('@etag', etag))
 2617         results = self.server.delete_attribute(
 2618             'issue', issue_id, 'nosy', form
 2619         )
 2620         self.assertEqual(self.dummy_client.response_code, 200)
 2621 
 2622         # verify the result
 2623         results = self.server.get_element('issue', issue_id, self.terse_form)
 2624         results = results['data']
 2625         self.assertEqual(self.dummy_client.response_code, 200)
 2626         self.assertEqual(len(results['attributes']['nosy']), 0)
 2627         self.assertListEqual(results['attributes']['nosy'], [])
 2628         self.assertEqual(results['attributes']['title'], None)
 2629 
 2630         # delete protected property
 2631         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2632                               self.db.config['WEB_SECRET_KEY'])
 2633         form.list.append(cgi.MiniFieldStorage('@etag', etag))
 2634         results = self.server.delete_attribute(
 2635             'issue', issue_id, 'creator', form
 2636         )
 2637         expected= {'error': {
 2638             'status': 405, 
 2639             'msg': AttributeError("Attribute 'creator' can not be updated for class issue.")
 2640         }}
 2641 
 2642         self.assertEqual(results['error']['status'],
 2643                          expected['error']['status'])
 2644         self.assertEqual(type(results['error']['msg']),
 2645                          type(expected['error']['msg']))
 2646         self.assertEqual(self.dummy_client.response_code, 405)
 2647 
 2648         # delete required property
 2649         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2650                               self.db.config['WEB_SECRET_KEY'])
 2651         form.list.append(cgi.MiniFieldStorage('@etag', etag))
 2652         results = self.server.delete_attribute(
 2653             'issue', issue_id, 'requireme', form
 2654         )
 2655         expected= {'error': {'status': 400,
 2656                     'msg': UsageError("Attribute 'requireme' is "
 2657                         "required by class issue and can not be deleted.")}}
 2658         print(results)
 2659         self.assertEqual(results['error']['status'],
 2660                          expected['error']['status'])
 2661         self.assertEqual(type(results['error']['msg']),
 2662                          type(expected['error']['msg']))
 2663         self.assertEqual(str(results['error']['msg']),
 2664                          str(expected['error']['msg']))
 2665         self.assertEqual(self.dummy_client.response_code, 400)
 2666 
 2667         # delete bogus property
 2668         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2669                               self.db.config['WEB_SECRET_KEY'])
 2670         form.list.append(cgi.MiniFieldStorage('@etag', etag))
 2671         results = self.server.delete_attribute(
 2672             'issue', issue_id, 'nosuchprop', form
 2673         )
 2674         expected= {'error': {'status': 400,
 2675                     'msg': UsageError("Attribute 'nosuchprop' not valid "
 2676                                       "for class issue.")}}
 2677         print(results)
 2678         self.assertEqual(results['error']['status'],
 2679                          expected['error']['status'])
 2680         self.assertEqual(type(results['error']['msg']),
 2681                          type(expected['error']['msg']))
 2682         self.assertEqual(str(results['error']['msg']),
 2683                          str(expected['error']['msg']))
 2684         self.assertEqual(self.dummy_client.response_code, 400)
 2685 
 2686     def testPatchAdd(self):
 2687         """
 2688         Test Patch op 'Add'
 2689         """
 2690         # create a new issue with userid 1 in the nosy list
 2691         issue_id = self.db.issue.create(title='foo', nosy=['1'])
 2692 
 2693         # fail to add userid 2 to the nosy list
 2694         # no etag
 2695         form = cgi.FieldStorage()
 2696         form.list = [
 2697             cgi.MiniFieldStorage('@op', 'add'),
 2698             cgi.MiniFieldStorage('nosy', '2')
 2699         ]
 2700         results = self.server.patch_element('issue', issue_id, form)
 2701         self.assertEqual(self.dummy_client.response_code, 412)
 2702 
 2703         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2704                               self.db.config['WEB_SECRET_KEY'])
 2705         form = cgi.FieldStorage()
 2706         form.list = [
 2707             cgi.MiniFieldStorage('@op', 'add'),
 2708             cgi.MiniFieldStorage('nosy', '2'),
 2709             cgi.MiniFieldStorage('@etag', etag)
 2710         ]
 2711         results = self.server.patch_element('issue', issue_id, form)
 2712         self.assertEqual(self.dummy_client.response_code, 200)
 2713 
 2714         # verify the result
 2715         results = self.server.get_element('issue', issue_id, self.terse_form)
 2716         results = results['data']
 2717         self.assertEqual(self.dummy_client.response_code, 200)
 2718         self.assertEqual(len(results['attributes']['nosy']), 2)
 2719         self.assertListEqual(results['attributes']['nosy'], ['1', '2'])
 2720 
 2721         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2722                               self.db.config['WEB_SECRET_KEY'])
 2723         form = cgi.FieldStorage()
 2724         form.list = [
 2725             cgi.MiniFieldStorage('@op', 'add'),
 2726             cgi.MiniFieldStorage('data', '3'),
 2727             cgi.MiniFieldStorage('@etag', etag)
 2728         ]
 2729         results = self.server.patch_attribute('issue', issue_id, 'nosy', form)
 2730         self.assertEqual(self.dummy_client.response_code, 200)
 2731 
 2732         # verify the result
 2733         results = self.server.get_element('issue', issue_id, self.terse_form)
 2734         results = results['data']
 2735         self.assertEqual(self.dummy_client.response_code, 200)
 2736         self.assertEqual(len(results['attributes']['nosy']), 3)
 2737         self.assertListEqual(results['attributes']['nosy'], ['1', '2', '3'])
 2738 
 2739         # patch with no new_val/data
 2740         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2741                               self.db.config['WEB_SECRET_KEY'])
 2742         form = cgi.FieldStorage()
 2743         form.list = [
 2744             cgi.MiniFieldStorage('@op', 'add'),
 2745             cgi.MiniFieldStorage('data', ''),
 2746             cgi.MiniFieldStorage('@etag', etag)
 2747         ]
 2748         results = self.server.patch_attribute('issue', issue_id, 'nosy', form)
 2749         self.assertEqual(self.dummy_client.response_code, 200)
 2750 
 2751         # verify the result
 2752         results = self.server.get_element('issue', issue_id, self.terse_form)
 2753         results = results['data']
 2754         self.assertEqual(self.dummy_client.response_code, 200)
 2755         self.assertEqual(len(results['attributes']['nosy']), 3)
 2756         self.assertListEqual(results['attributes']['nosy'], ['1', '2', '3'])
 2757 
 2758         # patch invalid property
 2759         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2760                               self.db.config['WEB_SECRET_KEY'])
 2761         form = cgi.FieldStorage()
 2762         form.list = [
 2763             cgi.MiniFieldStorage('@op', 'add'),
 2764             cgi.MiniFieldStorage('data', '3'),
 2765             cgi.MiniFieldStorage('@etag', etag)
 2766         ]
 2767         results = self.server.patch_attribute('issue', issue_id, 'notGoingToWork', form)
 2768         self.assertEqual(self.dummy_client.response_code, 400)
 2769         print(results)
 2770         expected={'error': {'status': 400,
 2771                             'msg': UsageError(
 2772                                 HyperdbValueError(
 2773                             "'notGoingToWork' is not a property of issue",),)}}
 2774         self.assertEqual(results['error']['status'],
 2775                          expected['error']['status'])
 2776         self.assertEqual(type(results['error']['msg']),
 2777                          type(expected['error']['msg']))
 2778         self.assertEqual(str(results['error']['msg']),
 2779                          str(expected['error']['msg']))
 2780 
 2781     def testPatchReplace(self):
 2782         """
 2783         Test Patch op 'Replace'
 2784         """
 2785         # create a new issue with userid 1 in the nosy list and status = 1
 2786         issue_id = self.db.issue.create(title='foo', nosy=['1'], status='1')
 2787 
 2788         # fail to replace userid 2 to the nosy list and status = 3
 2789         # no etag.
 2790         form = cgi.FieldStorage()
 2791         form.list = [
 2792             cgi.MiniFieldStorage('@op', 'replace'),
 2793             cgi.MiniFieldStorage('nosy', '2'),
 2794             cgi.MiniFieldStorage('status', '3')
 2795         ]
 2796         results = self.server.patch_element('issue', issue_id, form)
 2797         self.assertEqual(self.dummy_client.response_code, 412)
 2798         results = self.server.get_element('issue', issue_id, self.terse_form)
 2799         results = results['data']
 2800         self.assertEqual(self.dummy_client.response_code, 200)
 2801         self.assertEqual(results['attributes']['status'], '1')
 2802         self.assertEqual(len(results['attributes']['nosy']), 1)
 2803         self.assertListEqual(results['attributes']['nosy'], ['1'])
 2804 
 2805         # replace userid 2 to the nosy list and status = 3
 2806         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2807                               self.db.config['WEB_SECRET_KEY'])
 2808         form = cgi.FieldStorage()
 2809         form.list = [
 2810             cgi.MiniFieldStorage('@op', 'replace'),
 2811             cgi.MiniFieldStorage('nosy', '2'),
 2812             cgi.MiniFieldStorage('status', '3'),
 2813             cgi.MiniFieldStorage('@etag', etag)
 2814         ]
 2815         results = self.server.patch_element('issue', issue_id, form)
 2816         self.assertEqual(self.dummy_client.response_code, 200)
 2817         # verify the result
 2818         results = self.server.get_element('issue', issue_id, self.terse_form)
 2819         results = results['data']
 2820         self.assertEqual(self.dummy_client.response_code, 200)
 2821         self.assertEqual(results['attributes']['status'], '3')
 2822         self.assertEqual(len(results['attributes']['nosy']), 1)
 2823         self.assertListEqual(results['attributes']['nosy'], ['2'])
 2824 
 2825         # replace status = 2 using status attribute
 2826         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2827                               self.db.config['WEB_SECRET_KEY'])
 2828         form = cgi.FieldStorage()
 2829         form.list = [
 2830             cgi.MiniFieldStorage('@op', 'replace'),
 2831             cgi.MiniFieldStorage('data', '2'),
 2832             cgi.MiniFieldStorage('@etag', etag)
 2833         ]
 2834         results = self.server.patch_attribute('issue', issue_id, 'status',
 2835                                               form)
 2836         self.assertEqual(self.dummy_client.response_code, 200)
 2837         # verify the result
 2838         results = self.server.get_element('issue', issue_id, self.terse_form)
 2839         results = results['data']
 2840         self.assertEqual(self.dummy_client.response_code, 200)
 2841         self.assertEqual(results['attributes']['status'], '2')
 2842 
 2843         # try to set a protected prop. It should fail.
 2844         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2845                               self.db.config['WEB_SECRET_KEY'])
 2846         form = cgi.FieldStorage()
 2847         form.list = [
 2848             cgi.MiniFieldStorage('@op', 'replace'),
 2849             cgi.MiniFieldStorage('creator', '2'),
 2850             cgi.MiniFieldStorage('@etag', etag)
 2851         ]
 2852         results = self.server.patch_element('issue', issue_id, form)
 2853         expected= {'error': {'status': 400,
 2854                              'msg': KeyError('"creator", "actor", "creation" and "activity" are reserved',)}}
 2855         print(results)
 2856         self.assertEqual(results['error']['status'],
 2857                          expected['error']['status'])
 2858         self.assertEqual(type(results['error']['msg']),
 2859                          type(expected['error']['msg']))
 2860         self.assertEqual(str(results['error']['msg']),
 2861                          str(expected['error']['msg']))
 2862         self.assertEqual(self.dummy_client.response_code, 400)
 2863 
 2864         # try to set a protected prop using patch_attribute. It should
 2865         # fail with a 405 bad/unsupported method.
 2866         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2867                               self.db.config['WEB_SECRET_KEY'])
 2868         form = cgi.FieldStorage()
 2869         form.list = [
 2870             cgi.MiniFieldStorage('@op', 'replace'),
 2871             cgi.MiniFieldStorage('data', '2'),
 2872             cgi.MiniFieldStorage('@etag', etag)
 2873         ]
 2874         results = self.server.patch_attribute('issue', issue_id, 'creator', 
 2875                                               form)
 2876         expected= {'error': {'status': 405,
 2877                              'msg': AttributeError("Attribute 'creator' can not be updated for class issue.",)}}
 2878         print(results)
 2879         self.assertEqual(results['error']['status'],
 2880                          expected['error']['status'])
 2881         self.assertEqual(type(results['error']['msg']),
 2882                          type(expected['error']['msg']))
 2883         self.assertEqual(str(results['error']['msg']),
 2884                          str(expected['error']['msg']))
 2885         self.assertEqual(self.dummy_client.response_code, 405)
 2886 
 2887     def testPatchRemoveAll(self):
 2888         """
 2889         Test Patch Action 'Remove'
 2890         """
 2891         # create a new issue with userid 1 and 2 in the nosy list
 2892         issue_id = self.db.issue.create(title='foo', nosy=['1', '2'])
 2893 
 2894         # fail to remove the nosy list and the title
 2895         # no etag
 2896         form = cgi.FieldStorage()
 2897         form.list = [
 2898             cgi.MiniFieldStorage('@op', 'remove'),
 2899             cgi.MiniFieldStorage('nosy', ''),
 2900             cgi.MiniFieldStorage('title', '')
 2901         ]
 2902         results = self.server.patch_element('issue', issue_id, form)
 2903         self.assertEqual(self.dummy_client.response_code, 412)
 2904         results = self.server.get_element('issue', issue_id, self.terse_form)
 2905         results = results['data']
 2906         self.assertEqual(self.dummy_client.response_code, 200)
 2907         self.assertEqual(results['attributes']['title'], 'foo')
 2908         self.assertEqual(len(results['attributes']['nosy']), 2)
 2909         self.assertEqual(results['attributes']['nosy'], ['1', '2'])
 2910 
 2911         # remove the nosy list and the title
 2912         form = cgi.FieldStorage()
 2913         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2914                               self.db.config['WEB_SECRET_KEY'])
 2915         form.list = [
 2916             cgi.MiniFieldStorage('@op', 'remove'),
 2917             cgi.MiniFieldStorage('nosy', ''),
 2918             cgi.MiniFieldStorage('title', ''),
 2919             cgi.MiniFieldStorage('@etag', etag)
 2920         ]
 2921         results = self.server.patch_element('issue', issue_id, form)
 2922         self.assertEqual(self.dummy_client.response_code, 200)
 2923 
 2924         # verify the result
 2925         results = self.server.get_element('issue', issue_id, self.terse_form)
 2926         results = results['data']
 2927         self.assertEqual(self.dummy_client.response_code, 200)
 2928         self.assertEqual(results['attributes']['title'], None)
 2929         self.assertEqual(len(results['attributes']['nosy']), 0)
 2930         self.assertEqual(results['attributes']['nosy'], [])
 2931 
 2932         # try to remove a protected prop. It should fail.
 2933         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2934                               self.db.config['WEB_SECRET_KEY'])
 2935         form = cgi.FieldStorage()
 2936         form.list = [
 2937             cgi.MiniFieldStorage('@op', 'remove'),
 2938             cgi.MiniFieldStorage('creator', '2'),
 2939             cgi.MiniFieldStorage('@etag', etag)
 2940         ]
 2941         results = self.server.patch_element('issue', issue_id, form)
 2942         expected= {'error': {'status': 400,
 2943                              'msg': KeyError('"creator", "actor", "creation" and "activity" are reserved',)}}
 2944         print(results)
 2945         self.assertEqual(results['error']['status'],
 2946                          expected['error']['status'])
 2947         self.assertEqual(type(results['error']['msg']),
 2948                          type(expected['error']['msg']))
 2949         self.assertEqual(str(results['error']['msg']),
 2950                          str(expected['error']['msg']))
 2951         self.assertEqual(self.dummy_client.response_code, 400)
 2952 
 2953         # try to remove a required prop. it should fail
 2954         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2955                               self.db.config['WEB_SECRET_KEY'])
 2956         form.list = [
 2957             cgi.MiniFieldStorage('@op', 'remove'),
 2958             cgi.MiniFieldStorage('requireme', ''),
 2959             cgi.MiniFieldStorage('@etag', etag)
 2960         ]
 2961         results = self.server.patch_element('issue', issue_id, form)
 2962         expected= {'error': {'status': 400,
 2963                              'msg': UsageError("Attribute 'requireme' is required by class issue and can not be removed.")
 2964                              }}
 2965         print(results)
 2966         self.assertEqual(results['error']['status'],
 2967                          expected['error']['status'])
 2968         self.assertEqual(type(results['error']['msg']),
 2969                          type(expected['error']['msg']))
 2970         self.assertEqual(str(results['error']['msg']),
 2971                          str(expected['error']['msg']))
 2972         self.assertEqual(self.dummy_client.response_code, 400)
 2973 
 2974     def testPatchAction(self):
 2975         """
 2976         Test Patch Action 'Action'
 2977         """
 2978         # create a new issue with userid 1 and 2 in the nosy list
 2979         issue_id = self.db.issue.create(title='foo')
 2980 
 2981         # fail to execute action retire
 2982         # no etag
 2983         form = cgi.FieldStorage()
 2984         form.list = [
 2985             cgi.MiniFieldStorage('@op', 'action'),
 2986             cgi.MiniFieldStorage('@action_name', 'retire')
 2987         ]
 2988         results = self.server.patch_element('issue', issue_id, form)
 2989         self.assertEqual(self.dummy_client.response_code, 412)
 2990         self.assertFalse(self.db.issue.is_retired(issue_id))
 2991 
 2992         # execute action retire
 2993         form = cgi.FieldStorage()
 2994         etag = calculate_etag(self.db.issue.getnode(issue_id),
 2995                               self.db.config['WEB_SECRET_KEY'])
 2996         form.list = [
 2997             cgi.MiniFieldStorage('@op', 'action'),
 2998             cgi.MiniFieldStorage('@action_name', 'retire'),
 2999             cgi.MiniFieldStorage('@etag', etag)
 3000         ]
 3001         results = self.server.patch_element('issue', issue_id, form)
 3002         self.assertEqual(self.dummy_client.response_code, 200)
 3003 
 3004         # verify the result
 3005         self.assertTrue(self.db.issue.is_retired(issue_id))
 3006 
 3007         # execute action restore
 3008         form = cgi.FieldStorage()
 3009         etag = calculate_etag(self.db.issue.getnode(issue_id),
 3010                               self.db.config['WEB_SECRET_KEY'])
 3011         form.list = [
 3012             cgi.MiniFieldStorage('@op', 'action'),
 3013             cgi.MiniFieldStorage('@action_name', 'restore'),
 3014             cgi.MiniFieldStorage('@etag', etag)
 3015         ]
 3016         results = self.server.patch_element('issue', issue_id, form)
 3017         self.assertEqual(self.dummy_client.response_code, 200)
 3018 
 3019         # verify the result
 3020         self.assertTrue(not self.db.issue.is_retired(issue_id))
 3021 
 3022     def testPatchRemove(self):
 3023         """
 3024         Test Patch Action 'Remove' only some element from a list
 3025         """
 3026         # create a new issue with userid 1, 2, 3 in the nosy list
 3027         issue_id = self.db.issue.create(title='foo', nosy=['1', '2', '3'])
 3028 
 3029         # fail to remove the nosy list and the title
 3030         # no etag
 3031         form = cgi.FieldStorage()
 3032         form.list = [
 3033             cgi.MiniFieldStorage('@op', 'remove'),
 3034             cgi.MiniFieldStorage('nosy', '1, 2'),
 3035         ]
 3036         results = self.server.patch_element('issue', issue_id, form)
 3037         self.assertEqual(self.dummy_client.response_code, 412)
 3038         results = self.server.get_element('issue', issue_id, self.terse_form)
 3039         results = results['data']
 3040         self.assertEqual(self.dummy_client.response_code, 200)
 3041         self.assertEqual(len(results['attributes']['nosy']), 3)
 3042         self.assertEqual(results['attributes']['nosy'], ['1', '2', '3'])
 3043 
 3044         # remove 1 and 2 from the nosy list
 3045         form = cgi.FieldStorage()
 3046         etag = calculate_etag(self.db.issue.getnode(issue_id),
 3047                               self.db.config['WEB_SECRET_KEY'])
 3048         form.list = [
 3049             cgi.MiniFieldStorage('@op', 'remove'),
 3050             cgi.MiniFieldStorage('nosy', '1, 2'),
 3051             cgi.MiniFieldStorage('@etag', etag)
 3052         ]
 3053         results = self.server.patch_element('issue', issue_id, form)
 3054         self.assertEqual(self.dummy_client.response_code, 200)
 3055 
 3056         # verify the result
 3057         results = self.server.get_element('issue', issue_id, self.terse_form)
 3058         results = results['data']
 3059         self.assertEqual(self.dummy_client.response_code, 200)
 3060         self.assertEqual(len(results['attributes']['nosy']), 1)
 3061         self.assertEqual(results['attributes']['nosy'], ['3'])
 3062 
 3063         # delete last element: 3
 3064         etag = calculate_etag(self.db.issue.getnode(issue_id),
 3065                               self.db.config['WEB_SECRET_KEY'])
 3066         form = cgi.FieldStorage()
 3067         form.list = [
 3068             cgi.MiniFieldStorage('@op', 'remove'),
 3069             cgi.MiniFieldStorage('data', '3'),
 3070             cgi.MiniFieldStorage('@etag', etag)
 3071         ]
 3072         results = self.server.patch_attribute('issue', issue_id, 'nosy', form)
 3073         self.assertEqual(self.dummy_client.response_code, 200)
 3074 
 3075         # verify the result
 3076         results = self.server.get_element('issue', issue_id, self.terse_form)
 3077         results = results['data']
 3078         self.assertEqual(self.dummy_client.response_code, 200)
 3079         self.assertEqual(len(results['attributes']['nosy']), 0)
 3080         self.assertListEqual(results['attributes']['nosy'], [])
 3081 
 3082     @skip_jwt
 3083     def test_expired_jwt(self):
 3084         # self.dummy_client.main() closes database, so
 3085         # we need a new test with setup called for each test
 3086         out = []
 3087         def wh(s):
 3088             out.append(s)
 3089 
 3090         secret = self.db.config.WEB_JWT_SECRET
 3091 
 3092         # verify library and tokens are correct
 3093         self.assertRaises(jwt.exceptions.InvalidTokenError,
 3094                           jwt.decode, self.jwt['expired'],
 3095                           secret,  algorithms=['HS256'],
 3096                           audience=self.db.config.TRACKER_WEB,
 3097                           issuer=self.db.config.TRACKER_WEB)
 3098 
 3099         result = jwt.decode(self.jwt['user'],
 3100                             secret,  algorithms=['HS256'],
 3101                             audience=self.db.config.TRACKER_WEB,
 3102                             issuer=self.db.config.TRACKER_WEB)
 3103         self.assertEqual(self.claim['user'],result)
 3104 
 3105         result = jwt.decode(self.jwt['user:email'],
 3106                             secret,  algorithms=['HS256'],
 3107                             audience=self.db.config.TRACKER_WEB,
 3108                             issuer=self.db.config.TRACKER_WEB)
 3109         self.assertEqual(self.claim['user:email'],result)
 3110 
 3111         # set environment for all jwt tests
 3112         env = {
 3113             'PATH_INFO': 'rest/data/user',
 3114             'HTTP_HOST': 'localhost',
 3115             'TRACKER_NAME': 'rounduptest',
 3116             "REQUEST_METHOD": "GET"
 3117         }
 3118         self.dummy_client = client.Client(self.instance, MockNull(), env,
 3119                                           [], None)
 3120         self.dummy_client.db = self.db
 3121         self.dummy_client.request.headers.get = self.get_header
 3122         self.empty_form = cgi.FieldStorage()
 3123         self.terse_form = cgi.FieldStorage()
 3124         self.terse_form.list = [
 3125             cgi.MiniFieldStorage('@verbose', '0'),
 3126         ]
 3127         self.dummy_client.form = cgi.FieldStorage()
 3128         self.dummy_client.form.list = [
 3129             cgi.MiniFieldStorage('@fields', 'username,address'),
 3130         ]
 3131         # accumulate json output for further analysis
 3132         self.dummy_client.write = wh
 3133 
 3134         # set up for expired token first
 3135         env['HTTP_AUTHORIZATION'] = 'bearer %s'%self.jwt['expired']
 3136         self.dummy_client.main()
 3137 
 3138         # this will be the admin still as auth failed
 3139         self.assertEqual('1', self.db.getuid())
 3140         self.assertEqual(out[0], b'Invalid Login - Signature has expired')
 3141         del(out[0])
 3142 
 3143 
 3144     @skip_jwt
 3145     def test_user_jwt(self):
 3146         # self.dummy_client.main() closes database, so
 3147         # we need a new test with setup called for each test
 3148         out = []
 3149         def wh(s):
 3150             out.append(s)
 3151 
 3152         secret = self.db.config.WEB_JWT_SECRET
 3153 
 3154         # verify library and tokens are correct
 3155         self.assertRaises(jwt.exceptions.InvalidTokenError,
 3156                           jwt.decode, self.jwt['expired'],
 3157                           secret,  algorithms=['HS256'],
 3158                           audience=self.db.config.TRACKER_WEB,
 3159                           issuer=self.db.config.TRACKER_WEB)
 3160 
 3161         result = jwt.decode(self.jwt['user'],
 3162                             secret,  algorithms=['HS256'],
 3163                             audience=self.db.config.TRACKER_WEB,
 3164                             issuer=self.db.config.TRACKER_WEB)
 3165         self.assertEqual(self.claim['user'],result)
 3166 
 3167         result = jwt.decode(self.jwt['user:email'],
 3168                             secret,  algorithms=['HS256'],
 3169                             audience=self.db.config.TRACKER_WEB,
 3170                             issuer=self.db.config.TRACKER_WEB)
 3171         self.assertEqual(self.claim['user:email'],result)
 3172 
 3173         # set environment for all jwt tests
 3174         env = {
 3175             'PATH_INFO': 'rest/data/user',
 3176             'HTTP_HOST': 'localhost',
 3177             'TRACKER_NAME': 'rounduptest',
 3178             "REQUEST_METHOD": "GET"
 3179         }
 3180         self.dummy_client = client.Client(self.instance, MockNull(), env,
 3181                                           [], None)
 3182         self.dummy_client.db = self.db
 3183         self.dummy_client.request.headers.get = self.get_header
 3184         self.empty_form = cgi.FieldStorage()
 3185         self.terse_form = cgi.FieldStorage()
 3186         self.terse_form.list = [
 3187             cgi.MiniFieldStorage('@verbose', '0'),
 3188         ]
 3189         self.dummy_client.form = cgi.FieldStorage()
 3190         self.dummy_client.form.list = [
 3191             cgi.MiniFieldStorage('@fields', 'username,address'),
 3192         ]
 3193         # accumulate json output for further analysis
 3194         self.dummy_client.write = wh
 3195 
 3196         # set up for standard user role token
 3197         env['HTTP_AUTHORIZATION'] = 'bearer %s'%self.jwt['user']
 3198         self.dummy_client.main()
 3199         print(out[0])
 3200         json_dict = json.loads(b2s(out[0]))
 3201         print(json_dict)
 3202         # user will be joe id 3 as auth works
 3203         self.assertTrue('3', self.db.getuid())
 3204         # there should be three items in the collection admin, anon, and joe
 3205         self.assertEqual(3, len(json_dict['data']['collection']))
 3206         # since this token has no access to email addresses, only joe
 3207         # should have email addresses. Order is by id by default.
 3208         self.assertFalse('address' in json_dict['data']['collection'][0])
 3209         self.assertFalse('address' in json_dict['data']['collection'][1])
 3210         self.assertTrue('address' in json_dict['data']['collection'][2])
 3211         del(out[0])
 3212         self.db.setCurrentUser('admin')
 3213 
 3214     @skip_jwt
 3215     def test_user_email_jwt(self):
 3216         '''tests "Rest Access" permission present case'''
 3217         # self.dummy_client.main() closes database, so
 3218         # we need a new test with setup called for each test
 3219         out = []
 3220         def wh(s):
 3221             out.append(s)
 3222 
 3223         secret = self.db.config.WEB_JWT_SECRET
 3224 
 3225         # verify library and tokens are correct
 3226         self.assertRaises(jwt.exceptions.InvalidTokenError,
 3227                           jwt.decode, self.jwt['expired'],
 3228                           secret,  algorithms=['HS256'],
 3229                           audience=self.db.config.TRACKER_WEB,
 3230                           issuer=self.db.config.TRACKER_WEB)
 3231 
 3232         result = jwt.decode(self.jwt['user'],
 3233                             secret,  algorithms=['HS256'],
 3234                             audience=self.db.config.TRACKER_WEB,
 3235                             issuer=self.db.config.TRACKER_WEB)
 3236         self.assertEqual(self.claim['user'],result)
 3237 
 3238         result = jwt.decode(self.jwt['user:email'],
 3239                             secret,  algorithms=['HS256'],
 3240                             audience=self.db.config.TRACKER_WEB,
 3241                             issuer=self.db.config.TRACKER_WEB)
 3242         self.assertEqual(self.claim['user:email'],result)
 3243 
 3244         # set environment for all jwt tests
 3245         env = {
 3246             'PATH_INFO': 'rest/data/user',
 3247             'HTTP_HOST': 'localhost',
 3248             'TRACKER_NAME': 'rounduptest',
 3249             "REQUEST_METHOD": "GET"
 3250         }
 3251         self.dummy_client = client.Client(self.instance, MockNull(), env,
 3252                                           [], None)
 3253         self.dummy_client.db = self.db
 3254         self.dummy_client.request.headers.get = self.get_header
 3255         self.empty_form = cgi.FieldStorage()
 3256         self.terse_form = cgi.FieldStorage()
 3257         self.terse_form.list = [
 3258             cgi.MiniFieldStorage('@verbose', '0'),
 3259         ]
 3260         self.dummy_client.form = cgi.FieldStorage()
 3261         self.dummy_client.form.list = [
 3262             cgi.MiniFieldStorage('@fields', 'username,address'),
 3263         ]
 3264         # accumulate json output for further analysis
 3265         self.dummy_client.write = wh
 3266 
 3267         # set up for limited user:email role token
 3268         env['HTTP_AUTHORIZATION'] = 'bearer %s'%self.jwt['user:email']
 3269         self.dummy_client.main()
 3270         json_dict = json.loads(b2s(out[0]))
 3271         print(json_dict)
 3272         # user will be joe id 3 as auth works
 3273         self.assertTrue('3', self.db.getuid())
 3274         # there should be three items in the collection admin, anon, and joe
 3275         self.assertEqual(3, len(json_dict['data']['collection']))
 3276         # However this token has access to email addresses, so all three
 3277         # should have email addresses. Order is by id by default.
 3278         self.assertTrue('address' in json_dict['data']['collection'][0])
 3279         self.assertTrue('address' in json_dict['data']['collection'][1])
 3280         self.assertTrue('address' in json_dict['data']['collection'][2])
 3281 
 3282     @skip_jwt
 3283     def test_user_emailnorest_jwt(self):
 3284         '''tests "Rest Access" permission missing case'''
 3285         # self.dummy_client.main() closes database, so
 3286         # we need a new test with setup called for each test
 3287         out = []
 3288         def wh(s):
 3289             out.append(s)
 3290 
 3291         secret = self.db.config.WEB_JWT_SECRET
 3292 
 3293         # verify library and tokens are correct
 3294         self.assertRaises(jwt.exceptions.InvalidTokenError,
 3295                           jwt.decode, self.jwt['expired'],
 3296                           secret,  algorithms=['HS256'],
 3297                           audience=self.db.config.TRACKER_WEB,
 3298                           issuer=self.db.config.TRACKER_WEB)
 3299 
 3300         result = jwt.decode(self.jwt['user'],
 3301                             secret,  algorithms=['HS256'],
 3302                             audience=self.db.config.TRACKER_WEB,
 3303                             issuer=self.db.config.TRACKER_WEB)
 3304         self.assertEqual(self.claim['user'],result)
 3305 
 3306         result = jwt.decode(self.jwt['user:email'],
 3307                             secret,  algorithms=['HS256'],
 3308                             audience=self.db.config.TRACKER_WEB,
 3309                             issuer=self.db.config.TRACKER_WEB)
 3310         self.assertEqual(self.claim['user:email'],result)
 3311 
 3312         # set environment for all jwt tests
 3313         env = {
 3314             'PATH_INFO': 'rest/data/user',
 3315             'HTTP_HOST': 'localhost',
 3316             'TRACKER_NAME': 'rounduptest',
 3317             "REQUEST_METHOD": "GET"
 3318         }
 3319         self.dummy_client = client.Client(self.instance, MockNull(), env,
 3320                                           [], None)
 3321         self.dummy_client.db = self.db
 3322         self.dummy_client.request.headers.get = self.get_header
 3323         self.empty_form = cgi.FieldStorage()
 3324         self.terse_form = cgi.FieldStorage()
 3325         self.terse_form.list = [
 3326             cgi.MiniFieldStorage('@verbose', '0'),
 3327         ]
 3328         self.dummy_client.form = cgi.FieldStorage()
 3329         self.dummy_client.form.list = [
 3330             cgi.MiniFieldStorage('@fields', 'username,address'),
 3331         ]
 3332         # accumulate json output for further analysis
 3333         self.dummy_client.write = wh
 3334 
 3335         # set up for limited user:email role token
 3336         env['HTTP_AUTHORIZATION'] = 'bearer %s'%self.jwt['user:emailnorest']
 3337         self.dummy_client.main()
 3338         json_dict = json.loads(b2s(out[0]))
 3339         # user will be joe id 3 as auth works
 3340         self.assertTrue('1', self.db.getuid())
 3341         { "error": { "status": 403, "msg": "Forbidden." } }
 3342         self.assertTrue('error' in json_dict)
 3343         self.assertTrue(json_dict['error']['status'], 403)
 3344         self.assertTrue(json_dict['error']['msg'], "Forbidden.")
 3345 
 3346     @skip_jwt
 3347     def test_disabled_jwt(self):
 3348         # self.dummy_client.main() closes database, so
 3349         # we need a new test with setup called for each test
 3350         out = []
 3351         def wh(s):
 3352             out.append(s)
 3353 
 3354         # set environment for all jwt tests
 3355         env = {
 3356             'PATH_INFO': 'rest/data/user',
 3357             'HTTP_HOST': 'localhost',
 3358             'TRACKER_NAME': 'rounduptest',
 3359             "REQUEST_METHOD": "GET"
 3360         }
 3361         self.dummy_client = client.Client(self.instance, MockNull(), env,
 3362                                           [], None)
 3363         self.dummy_client.db = self.db
 3364         self.dummy_client.request.headers.get = self.get_header
 3365         self.empty_form = cgi.FieldStorage()
 3366         self.terse_form = cgi.FieldStorage()
 3367         self.terse_form.list = [
 3368             cgi.MiniFieldStorage('@verbose', '0'),
 3369         ]
 3370         self.dummy_client.form = cgi.FieldStorage()
 3371         self.dummy_client.form.list = [
 3372             cgi.MiniFieldStorage('@fields', 'username,address'),
 3373         ]
 3374         # accumulate json output for further analysis
 3375         self.dummy_client.write = wh
 3376         # disable jwt validation by making secret too short
 3377         # use the default value for this in configure.py.
 3378         self.db.config['WEB_JWT_SECRET'] = "disabled"
 3379         env['HTTP_AUTHORIZATION'] = 'bearer %s'%self.jwt['user']
 3380         self.dummy_client.main()
 3381         # user will be 1 as there is no auth
 3382         self.assertTrue('1', self.db.getuid())
 3383         self.assertEqual(out[0], b'Invalid Login - Support for jwt disabled by admin.')
 3384 
 3385     @skip_jwt
 3386     def test_bad_issue_jwt(self):
 3387         # self.dummy_client.main() closes database, so
 3388         # we need a new test with setup called for each test
 3389         out = []
 3390         def wh(s):
 3391             out.append(s)
 3392 
 3393         # set environment for all jwt tests
 3394         env = {
 3395             'PATH_INFO': 'rest/data/user',
 3396             'HTTP_HOST': 'localhost',
 3397             'TRACKER_NAME': 'rounduptest',
 3398             "REQUEST_METHOD": "GET"
 3399         }
 3400         self.dummy_client = client.Client(self.instance, MockNull(), env,
 3401                                           [], None)
 3402         self.dummy_client.db = self.db
 3403         self.dummy_client.request.headers.get = self.get_header
 3404         self.empty_form = cgi.FieldStorage()
 3405         self.terse_form = cgi.FieldStorage()
 3406         self.terse_form.list = [
 3407             cgi.MiniFieldStorage('@verbose', '0'),
 3408         ]
 3409         self.dummy_client.form = cgi.FieldStorage()
 3410         self.dummy_client.form.list = [
 3411             cgi.MiniFieldStorage('@fields', 'username,address'),
 3412         ]
 3413         # accumulate json output for further analysis
 3414         self.dummy_client.write = wh
 3415         env['HTTP_AUTHORIZATION'] = 'bearer %s'%self.jwt['badiss']
 3416         self.dummy_client.main()
 3417         # user will be 1 as there is no auth
 3418         self.assertTrue('1', self.db.getuid())
 3419         self.assertEqual(out[0], b'Invalid Login - Invalid issuer')
 3420 
 3421     @skip_jwt
 3422     def test_bad_audience_jwt(self):
 3423         # self.dummy_client.main() closes database, so
 3424         # we need a new test with setup called for each test
 3425         out = []
 3426         def wh(s):
 3427             out.append(s)
 3428 
 3429         # set environment for all jwt tests
 3430         env = {
 3431             'PATH_INFO': 'rest/data/user',
 3432             'HTTP_HOST': 'localhost',
 3433             'TRACKER_NAME': 'rounduptest',
 3434             "REQUEST_METHOD": "GET"
 3435         }
 3436         self.dummy_client = client.Client(self.instance, MockNull(), env,
 3437                                           [], None)
 3438         self.dummy_client.db = self.db
 3439         self.dummy_client.request.headers.get = self.get_header
 3440         self.empty_form = cgi.FieldStorage()
 3441         self.terse_form = cgi.FieldStorage()
 3442         self.terse_form.list = [
 3443             cgi.MiniFieldStorage('@verbose', '0'),
 3444         ]
 3445         self.dummy_client.form = cgi.FieldStorage()
 3446         self.dummy_client.form.list = [
 3447             cgi.MiniFieldStorage('@fields', 'username,address'),
 3448         ]
 3449         # accumulate json output for further analysis
 3450         self.dummy_client.write = wh
 3451         env['HTTP_AUTHORIZATION'] = 'bearer %s'%self.jwt['badaud']
 3452         self.dummy_client.main()
 3453         # user will be 1 as there is no auth
 3454         self.assertTrue('1', self.db.getuid())
 3455         self.assertEqual(out[0], b'Invalid Login - Invalid audience')
 3456 
 3457     @skip_jwt
 3458     def test_bad_roles_jwt(self):
 3459         # self.dummy_client.main() closes database, so
 3460         # we need a new test with setup called for each test
 3461         out = []
 3462         def wh(s):
 3463             out.append(s)
 3464 
 3465         # set environment for all jwt tests
 3466         env = {
 3467             'PATH_INFO': 'rest/data/user',
 3468             'HTTP_HOST': 'localhost',
 3469             'TRACKER_NAME': 'rounduptest',
 3470             "REQUEST_METHOD": "GET"
 3471         }
 3472         self.dummy_client = client.Client(self.instance, MockNull(), env,
 3473                                           [], None)
 3474         self.dummy_client.db = self.db
 3475         self.dummy_client.request.headers.get = self.get_header
 3476         self.empty_form = cgi.FieldStorage()
 3477         self.terse_form = cgi.FieldStorage()
 3478         self.terse_form.list = [
 3479             cgi.MiniFieldStorage('@verbose', '0'),
 3480         ]
 3481         self.dummy_client.form = cgi.FieldStorage()
 3482         self.dummy_client.form.list = [
 3483             cgi.MiniFieldStorage('@fields', 'username,address'),
 3484         ]
 3485         # accumulate json output for further analysis
 3486         self.dummy_client.write = wh
 3487         env['HTTP_AUTHORIZATION'] = 'bearer %s'%self.jwt['badroles']
 3488         self.dummy_client.main()
 3489         # user will be 1 as there is no auth
 3490         self.assertTrue('1', self.db.getuid())
 3491         self.assertEqual(out[0], b'Invalid Login - Token roles are invalid.')
 3492 
 3493     @skip_jwt
 3494     def test_bad_subject_jwt(self):
 3495         # self.dummy_client.main() closes database, so
 3496         # we need a new test with setup called for each test
 3497         out = []
 3498         def wh(s):
 3499             out.append(s)
 3500 
 3501         # set environment for all jwt tests
 3502         env = {
 3503             'PATH_INFO': 'rest/data/user',
 3504             'HTTP_HOST': 'localhost',
 3505             'TRACKER_NAME': 'rounduptest',
 3506             "REQUEST_METHOD": "GET"
 3507         }
 3508         self.dummy_client = client.Client(self.instance, MockNull(), env,
 3509                                           [], None)
 3510         self.dummy_client.db = self.db
 3511         self.dummy_client.request.headers.get = self.get_header
 3512         self.empty_form = cgi.FieldStorage()
 3513         self.terse_form = cgi.FieldStorage()
 3514         self.terse_form.list = [
 3515             cgi.MiniFieldStorage('@verbose', '0'),
 3516         ]
 3517         self.dummy_client.form = cgi.FieldStorage()
 3518         self.dummy_client.form.list = [
 3519             cgi.MiniFieldStorage('@fields', 'username,address'),
 3520         ]
 3521         # accumulate json output for further analysis
 3522         self.dummy_client.write = wh
 3523         env['HTTP_AUTHORIZATION'] = 'bearer %s'%self.jwt['badsub']
 3524         self.dummy_client.main()
 3525         # user will be 1 as there is no auth
 3526         self.assertTrue('1', self.db.getuid())
 3527         self.assertEqual(out[0], b'Invalid Login - Token subject is invalid.')
 3528 
 3529 def get_obj(path, id):
 3530     return {
 3531         'id': id,
 3532         'link': path + id
 3533     }
 3534 
 3535 if __name__ == '__main__':
 3536     runner = unittest.TextTestRunner()
 3537     unittest.main(testRunner=runner)