"Fossies" - the Fresh Open Source Software Archive

Member "masakari-9.0.0/masakari/tests/unit/api/openstack/test_wsgi.py" (13 May 2020, 38013 Bytes) of package /linux/misc/openstack/masakari-9.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_wsgi.py": 8.0.0_vs_9.0.0.

    1 #    Copyright 2016 NTT DATA
    2 #    All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 import inspect
   17 from unittest import mock
   18 
   19 import six
   20 from six.moves import http_client as http
   21 import testscenarios
   22 import webob
   23 
   24 from oslo_serialization import jsonutils
   25 
   26 from masakari.api import api_version_request as api_version
   27 from masakari.api.openstack import extensions
   28 from masakari.api.openstack import wsgi
   29 from masakari.api import versioned_method
   30 from masakari import exception
   31 from masakari import test
   32 from masakari.tests.unit.api.openstack import fakes
   33 from masakari.tests.unit import matchers
   34 
   35 
   36 class MicroversionedTest(testscenarios.WithScenarios, test.NoDBTestCase):
   37 
   38     header_name = 'OpenStack-API-Version'
   39 
   40     def _make_microversion_header(self, value):
   41         return {self.header_name: 'instance-ha %s' % value}
   42 
   43 
   44 class RequestTest(MicroversionedTest):
   45 
   46     def test_content_type_missing(self):
   47         request = wsgi.Request.blank('/tests/123', method='POST')
   48         request.body = b"<body />"
   49         self.assertIsNone(request.get_content_type())
   50 
   51     def test_content_type_unsupported(self):
   52         request = wsgi.Request.blank('/tests/123', method='POST')
   53         request.headers["Content-Type"] = "text/html"
   54         request.body = b"asdf<br />"
   55         self.assertRaises(exception.InvalidContentType,
   56                           request.get_content_type)
   57 
   58     def test_content_type_with_charset(self):
   59         request = wsgi.Request.blank('/tests/123')
   60         request.headers["Content-Type"] = "application/json; charset=UTF-8"
   61         result = request.get_content_type()
   62         self.assertEqual(result, "application/json")
   63 
   64     def test_content_type_accept_default(self):
   65         request = wsgi.Request.blank('/tests/123.unsupported')
   66         request.headers["Accept"] = "application/unsupported1"
   67         result = request.best_match_content_type()
   68         self.assertEqual(result, "application/json")
   69 
   70     def test_from_request(self):
   71         self.stub_out('masakari.i18n.get_available_languages',
   72                       fakes.fake_get_available_languages)
   73 
   74         request = wsgi.Request.blank('/')
   75         accepted = 'bogus;q=1, en-gb;q=0.7,en-us,en;q=0.5,*;q=0.7'
   76         request.headers = {'Accept-Language': accepted}
   77         self.assertEqual(request.best_match_language(), 'en_US')
   78 
   79     def test_asterisk(self):
   80         # asterisk should match first available if there
   81         # are not any other available matches
   82         self.stub_out('masakari.i18n.get_available_languages',
   83                       fakes.fake_get_available_languages)
   84 
   85         request = wsgi.Request.blank('/')
   86         accepted = '*,es;q=0.5'
   87         request.headers = {'Accept-Language': accepted}
   88         self.assertEqual(request.best_match_language(), 'en_GB')
   89 
   90     def test_prefix(self):
   91         self.stub_out('masakari.i18n.get_available_languages',
   92                       fakes.fake_get_available_languages)
   93 
   94         request = wsgi.Request.blank('/')
   95         accepted = 'zh'
   96         request.headers = {'Accept-Language': accepted}
   97         self.assertEqual(request.best_match_language(), 'zh_CN')
   98 
   99     def test_secondary(self):
  100         self.stub_out('masakari.i18n.get_available_languages',
  101                       fakes.fake_get_available_languages)
  102 
  103         request = wsgi.Request.blank('/')
  104         accepted = 'nn,en-gb;q=0.5'
  105         request.headers = {'Accept-Language': accepted}
  106         self.assertEqual(request.best_match_language(), 'en_GB')
  107 
  108     def test_none_found(self):
  109         self.stub_out('masakari.i18n.get_available_languages',
  110                       fakes.fake_get_available_languages)
  111 
  112         request = wsgi.Request.blank('/')
  113         accepted = 'nb-no'
  114         request.headers = {'Accept-Language': accepted}
  115         self.assertIsNone(request.best_match_language())
  116 
  117     def test_no_lang_header(self):
  118         self.stub_out('masakari.i18n.get_available_languages',
  119                       fakes.fake_get_available_languages)
  120 
  121         request = wsgi.Request.blank('/')
  122         accepted = ''
  123         request.headers = {'Accept-Language': accepted}
  124         self.assertIsNone(request.best_match_language())
  125 
  126     def test_api_version_request_header_none(self):
  127         request = wsgi.Request.blank('/')
  128         request.set_api_version_request()
  129         self.assertEqual(api_version.APIVersionRequest(
  130             api_version.DEFAULT_API_VERSION), request.api_version_request)
  131 
  132     @mock.patch("masakari.api.api_version_request.max_api_version")
  133     def test_api_version_request_header(self, mock_maxver):
  134         mock_maxver.return_value = api_version.APIVersionRequest("1.0")
  135 
  136         request = wsgi.Request.blank('/')
  137         request.headers = self._make_microversion_header('1.0')
  138         request.set_api_version_request()
  139         self.assertEqual(api_version.APIVersionRequest("1.0"),
  140                          request.api_version_request)
  141 
  142     def test_api_version_request_header_invalid(self):
  143         request = wsgi.Request.blank('/')
  144         request.headers = self._make_microversion_header('1.1.1')
  145         self.assertRaises(exception.InvalidAPIVersionString,
  146                           request.set_api_version_request)
  147 
  148 
  149 class ActionDispatcherTest(test.NoDBTestCase):
  150     def test_dispatch(self):
  151         serializer = wsgi.ActionDispatcher()
  152         serializer.create = lambda x: 'pants'
  153         self.assertEqual(serializer.dispatch({}, action='create'), 'pants')
  154 
  155     def test_dispatch_action_None(self):
  156         serializer = wsgi.ActionDispatcher()
  157         serializer.create = lambda x: 'pants'
  158         serializer.default = lambda x: 'trousers'
  159         self.assertEqual(serializer.dispatch({}, action=None), 'trousers')
  160 
  161     def test_dispatch_default(self):
  162         serializer = wsgi.ActionDispatcher()
  163         serializer.create = lambda x: 'pants'
  164         serializer.default = lambda x: 'trousers'
  165         self.assertEqual(serializer.dispatch({}, action='update'), 'trousers')
  166 
  167 
  168 class JSONDictSerializerTest(test.NoDBTestCase):
  169     def test_json(self):
  170         input_dict = dict(segments=dict(a=(2, 3)))
  171         expected_json = '{"segments":{"a":[2,3]}}'
  172         serializer = wsgi.JSONDictSerializer()
  173         result = serializer.serialize(input_dict)
  174         result = result.replace('\n', '').replace(' ', '')
  175         self.assertEqual(result, expected_json)
  176 
  177 
  178 class JSONDeserializerTest(test.NoDBTestCase):
  179     def test_json(self):
  180         data = """{"a": {
  181                 "a1": "1",
  182                 "a2": "2",
  183                 "bs": ["1", "2", "3", {"c": {"c1": "1"}}],
  184                 "d": {"e": "1"},
  185                 "f": "1"}}"""
  186         as_dict = {
  187             'body': {
  188                 'a': {
  189                     'a1': '1',
  190                     'a2': '2',
  191                     'bs': ['1', '2', '3', {'c': {'c1': '1'}}],
  192                     'd': {'e': '1'},
  193                     'f': '1',
  194                 },
  195             },
  196         }
  197         deserializer = wsgi.JSONDeserializer()
  198         self.assertEqual(deserializer.deserialize(data), as_dict)
  199 
  200     def test_json_valid_utf8(self):
  201         data = b"""{"segment": {"recovery_method": "auto",
  202                 "name": "\xe6\xa6\x82\xe5\xbf\xb5",
  203                 "service_type": "COMPUTE_HOST"
  204                 }} """
  205         as_dict = {
  206             'body': {
  207                 u'segment': {u'recovery_method': 'auto',
  208                              u'name': u'\u6982\u5ff5',
  209                              u'service_type': u'COMPUTE_HOST'
  210                              }
  211             }
  212         }
  213         deserializer = wsgi.JSONDeserializer()
  214         self.assertEqual(deserializer.deserialize(data), as_dict)
  215 
  216     def test_json_invalid_utf8(self):
  217         """Send invalid utf-8 to JSONDeserializer."""
  218         data = b"""{"segment": {
  219                 "name": "\xf0\x28\x8c\x28",
  220                 "recovery_method": "auto",
  221                 "description": "compute hosts with shared storage enabled."
  222                 "service_type": "COMPUTE_HOST"}} """
  223         deserializer = wsgi.JSONDeserializer()
  224         self.assertRaises(exception.MalformedRequestBody,
  225                           deserializer.deserialize, data)
  226 
  227 
  228 class ResourceTest(MicroversionedTest):
  229 
  230     def get_req_id_header_name(self, request):
  231         return 'x-openstack-request-id'
  232 
  233     def test_resource_call_with_method_get(self):
  234         class Controller(object):
  235             def index(self, req):
  236                 return 'success'
  237 
  238         app = fakes.TestRouter(Controller())
  239         # the default method is GET
  240         req = webob.Request.blank('/tests')
  241         response = req.get_response(app)
  242         self.assertEqual(b'success', response.body)
  243         self.assertEqual(response.status_int, http.OK)
  244         req.body = b'{"body": {"key": "value"}}'
  245         response = req.get_response(app)
  246         self.assertEqual(b'success', response.body)
  247         self.assertEqual(response.status_int, http.OK)
  248         req.content_type = 'application/json'
  249         response = req.get_response(app)
  250         self.assertEqual(b'success', response.body)
  251         self.assertEqual(response.status_int, http.OK)
  252 
  253     def test_resource_call_with_method_post(self):
  254         class Controller(object):
  255             @extensions.expected_errors(http.BAD_REQUEST)
  256             def create(self, req, body):
  257                 if expected_body != body:
  258                     msg = "The request body invalid"
  259                     raise webob.exc.HTTPBadRequest(explanation=msg)
  260                 return "success"
  261         # verify the method: POST
  262         app = fakes.TestRouter(Controller())
  263         req = webob.Request.blank('/tests', method="POST",
  264                                   content_type='application/json')
  265         req.body = b'{"body": {"key": "value"}}'
  266         expected_body = {'body': {
  267             "key": "value"
  268             }
  269         }
  270         response = req.get_response(app)
  271         self.assertEqual(response.status_int, http.OK)
  272         self.assertEqual(b'success', response.body)
  273         # verify without body
  274         expected_body = None
  275         req.body = None
  276         response = req.get_response(app)
  277         self.assertEqual(response.status_int, http.OK)
  278         self.assertEqual(b'success', response.body)
  279         # the body is validated in the controller
  280         expected_body = {'body': None}
  281         response = req.get_response(app)
  282         expected_unsupported_type_body = {'badRequest':
  283             {'message': 'The request body invalid', 'code': http.BAD_REQUEST}}
  284         self.assertEqual(response.status_int, http.BAD_REQUEST)
  285         self.assertEqual(expected_unsupported_type_body,
  286                          jsonutils.loads(response.body))
  287 
  288     def test_resource_call_with_method_put(self):
  289         class Controller(object):
  290             def update(self, req, id, body):
  291                 if expected_body != body:
  292                     msg = "The request body invalid"
  293                     raise webob.exc.HTTPBadRequest(explanation=msg)
  294                 return "success"
  295         # verify the method: PUT
  296         app = fakes.TestRouter(Controller())
  297         req = webob.Request.blank('/tests/test_id', method="PUT",
  298                                   content_type='application/json')
  299         req.body = b'{"body": {"key": "value"}}'
  300         expected_body = {'body': {
  301             "key": "value"
  302             }
  303         }
  304         response = req.get_response(app)
  305         self.assertEqual(b'success', response.body)
  306         self.assertEqual(response.status_int, http.OK)
  307         req.body = None
  308         expected_body = None
  309         response = req.get_response(app)
  310         self.assertEqual(response.status_int, http.OK)
  311         # verify no content_type is contained in the request
  312         req = webob.Request.blank('/tests/test_id', method="PUT",
  313                                   content_type='application/xml')
  314         req.content_type = 'application/xml'
  315         req.body = b'{"body": {"key": "value"}}'
  316         response = req.get_response(app)
  317         expected_unsupported_type_body = {'badMediaType':
  318             {'message': 'Unsupported Content-Type',
  319              'code': http.UNSUPPORTED_MEDIA_TYPE}}
  320         self.assertEqual(response.status_int, http.UNSUPPORTED_MEDIA_TYPE)
  321         self.assertEqual(expected_unsupported_type_body,
  322                          jsonutils.loads(response.body))
  323 
  324     def test_resource_call_with_method_delete(self):
  325         class Controller(object):
  326             def delete(self, req, id):
  327                 return "success"
  328 
  329         # verify the method: DELETE
  330         app = fakes.TestRouter(Controller())
  331         req = webob.Request.blank('/tests/test_id', method="DELETE")
  332         response = req.get_response(app)
  333         self.assertEqual(response.status_int, http.OK)
  334         self.assertEqual(b'success', response.body)
  335         # ignore the body
  336         req.body = b'{"body": {"key": "value"}}'
  337         response = req.get_response(app)
  338         self.assertEqual(response.status_int, http.OK)
  339         self.assertEqual(b'success', response.body)
  340 
  341     def test_resource_not_authorized(self):
  342         class Controller(object):
  343             def index(self, req):
  344                 raise exception.Forbidden()
  345 
  346         req = webob.Request.blank('/tests')
  347         app = fakes.TestRouter(Controller())
  348         response = req.get_response(app)
  349         self.assertEqual(response.status_int, http.FORBIDDEN)
  350 
  351     def test_dispatch(self):
  352         class Controller(object):
  353             def index(self, req, pants=None):
  354                 return pants
  355 
  356         controller = Controller()
  357         resource = wsgi.Resource(controller)
  358         method, extensions = resource.get_method(None, 'index', None, '')
  359         actual = resource.dispatch(method, None, {'pants': 'off'})
  360         expected = 'off'
  361         self.assertEqual(actual, expected)
  362 
  363     def test_get_method_unknown_controller_method(self):
  364         class Controller(object):
  365             def index(self, req, pants=None):
  366                 return pants
  367 
  368         controller = Controller()
  369         resource = wsgi.Resource(controller)
  370         self.assertRaises(AttributeError, resource.get_method,
  371                           None, 'create', None, '')
  372 
  373     def test_get_method_action_json(self):
  374         class Controller(wsgi.Controller):
  375             @wsgi.action('fooAction')
  376             def _action_foo(self, req, id, body):
  377                 return body
  378 
  379         controller = Controller()
  380         resource = wsgi.Resource(controller)
  381         method, extensions = resource.get_method(None, 'action',
  382                                                  'application/json',
  383                                                  '{"fooAction": true}')
  384         self.assertEqual(controller._action_foo, method)
  385 
  386     def test_get_method_action_bad_body(self):
  387         class Controller(wsgi.Controller):
  388             @wsgi.action('fooAction')
  389             def _action_foo(self, req, id, body):
  390                 return body
  391 
  392         controller = Controller()
  393         resource = wsgi.Resource(controller)
  394         self.assertRaises(exception.MalformedRequestBody, resource.get_method,
  395                           None, 'action', 'application/json', '{}')
  396 
  397     def test_get_method_unknown_controller_action(self):
  398         class Controller(wsgi.Controller):
  399             @wsgi.action('fooAction')
  400             def _action_foo(self, req, id, body):
  401                 return body
  402 
  403         controller = Controller()
  404         resource = wsgi.Resource(controller)
  405         self.assertRaises(KeyError, resource.get_method,
  406                           None, 'action', 'application/json',
  407                           '{"barAction": true}')
  408 
  409     def test_get_method_action_method(self):
  410         class Controller(object):
  411             def action(self, req, pants=None):
  412                 return pants
  413 
  414         controller = Controller()
  415         resource = wsgi.Resource(controller)
  416         method, extensions = resource.get_method(None, 'action',
  417                                                  'application/xml',
  418                                                  '<fooAction>true</fooAction')
  419         self.assertEqual(controller.action, method)
  420 
  421     def test_get_action_args(self):
  422         class Controller(object):
  423             def index(self, req, pants=None):
  424                 return pants
  425 
  426         controller = Controller()
  427         resource = wsgi.Resource(controller)
  428 
  429         env = {
  430             'wsgiorg.routing_args': [None, {
  431                 'controller': None,
  432                 'format': None,
  433                 'action': 'update',
  434                 'id': 12,
  435             }],
  436         }
  437 
  438         expected = {'action': 'update', 'id': 12}
  439 
  440         self.assertEqual(resource.get_action_args(env), expected)
  441 
  442     def test_get_body_bad_content(self):
  443         class Controller(object):
  444             def index(self, req, pants=None):
  445                 return pants
  446 
  447         controller = Controller()
  448         resource = wsgi.Resource(controller)
  449 
  450         request = wsgi.Request.blank('/', method='POST')
  451         request.headers['Content-Type'] = 'application/none'
  452         request.body = b'foo'
  453 
  454         self.assertRaises(exception.InvalidContentType,
  455                           resource.get_body, request)
  456 
  457     def test_get_body_no_content_type(self):
  458         class Controller(object):
  459             def index(self, req, pants=None):
  460                 return pants
  461 
  462         controller = Controller()
  463         resource = wsgi.Resource(controller)
  464 
  465         request = wsgi.Request.blank('/', method='POST')
  466         request.body = b'foo'
  467 
  468         content_type, body = resource.get_body(request)
  469         self.assertIsNone(content_type)
  470         self.assertEqual(b'foo', body)
  471 
  472     def test_get_body_no_content_body(self):
  473         class Controller(object):
  474             def index(self, req, pants=None):
  475                 return pants
  476 
  477         controller = Controller()
  478         resource = wsgi.Resource(controller)
  479 
  480         request = wsgi.Request.blank('/', method='POST')
  481         request.headers['Content-Type'] = 'application/json'
  482         request.body = b''
  483 
  484         content_type, body = resource.get_body(request)
  485         self.assertEqual('application/json', content_type)
  486         self.assertEqual(b'', body)
  487 
  488     def test_get_body(self):
  489         class Controller(object):
  490             def index(self, req, pants=None):
  491                 return pants
  492 
  493         controller = Controller()
  494         resource = wsgi.Resource(controller)
  495 
  496         request = wsgi.Request.blank('/', method='POST')
  497         request.headers['Content-Type'] = 'application/json'
  498         request.body = b'foo'
  499 
  500         content_type, body = resource.get_body(request)
  501         self.assertEqual(content_type, 'application/json')
  502         self.assertEqual(b'foo', body)
  503 
  504     def test_get_dict_response_body(self):
  505 
  506         class Controller(wsgi.Controller):
  507             def index(self, req):
  508                 return {'foo': 'bar'}
  509 
  510         req = fakes.HTTPRequest.blank('/tests')
  511         app = fakes.TestRouter(Controller())
  512         response = req.get_response(app)
  513         self.assertIn('masakari.context', req.environ)
  514         self.assertEqual(b'{"foo": "bar"}', response.body)
  515         self.assertEqual(response.status_int, http.OK)
  516 
  517     def test_str_response_body(self):
  518 
  519         class Controller(wsgi.Controller):
  520             def index(self, req):
  521                 return 'foo'
  522 
  523         req = fakes.HTTPRequest.blank('/tests')
  524         app = fakes.TestRouter(Controller())
  525         response = req.get_response(app)
  526         expected_header = self.get_req_id_header_name(req)
  527         self.assertFalse(hasattr(response.headers, expected_header))
  528         self.assertEqual(b'foo', response.body)
  529         self.assertEqual(response.status_int, http.OK)
  530 
  531     def test_get_no_response_body(self):
  532 
  533         class Controller(object):
  534             def index(self, req):
  535                 pass
  536 
  537         req = fakes.HTTPRequest.blank('/tests')
  538         app = fakes.TestRouter(Controller())
  539         response = req.get_response(app)
  540         self.assertIn('masakari.context', req.environ)
  541         self.assertEqual(b'', response.body)
  542         self.assertEqual(response.status_int, http.OK)
  543 
  544     def test_deserialize_default(self):
  545         class Controller(object):
  546             def index(self, req, pants=None):
  547                 return pants
  548 
  549         controller = Controller()
  550         resource = wsgi.Resource(controller)
  551 
  552         obj = resource.deserialize('["foo"]')
  553         self.assertEqual(obj, {'body': ['foo']})
  554 
  555     def test_register_actions(self):
  556         class Controller(object):
  557             def index(self, req, pants=None):
  558                 return pants
  559 
  560         class ControllerExtended(wsgi.Controller):
  561             @wsgi.action('fooAction')
  562             def _action_foo(self, req, id, body):
  563                 return body
  564 
  565             @wsgi.action('barAction')
  566             def _action_bar(self, req, id, body):
  567                 return body
  568 
  569         controller = Controller()
  570         resource = wsgi.Resource(controller)
  571         self.assertEqual({}, resource.wsgi_actions)
  572 
  573         extended = ControllerExtended()
  574         resource.register_actions(extended)
  575         self.assertEqual({'fooAction': extended._action_foo,
  576                           'barAction': extended._action_bar
  577                           }, resource.wsgi_actions)
  578 
  579     def test_register_extensions(self):
  580         class Controller(object):
  581             def index(self, req, pants=None):
  582                 return pants
  583 
  584         class ControllerExtended(wsgi.Controller):
  585             @wsgi.extends
  586             def index(self, req, resp_obj, pants=None):
  587                 return None
  588 
  589             @wsgi.extends(action='fooAction')
  590             def _action_foo(self, req, resp, id, body):
  591                 return None
  592 
  593         controller = Controller()
  594         resource = wsgi.Resource(controller)
  595         self.assertEqual({}, resource.wsgi_extensions)
  596         self.assertEqual({}, resource.wsgi_action_extensions)
  597 
  598         extended = ControllerExtended()
  599         resource.register_extensions(extended)
  600         self.assertEqual({'index': [extended.index]}, resource.wsgi_extensions)
  601         self.assertEqual({'fooAction': [extended._action_foo]},
  602                          resource.wsgi_action_extensions)
  603 
  604     def test_get_method_extensions(self):
  605         class Controller(object):
  606             def index(self, req, pants=None):
  607                 return pants
  608 
  609         class ControllerExtended(wsgi.Controller):
  610             @wsgi.extends
  611             def index(self, req, resp_obj, pants=None):
  612                 return None
  613 
  614         controller = Controller()
  615         extended = ControllerExtended()
  616         resource = wsgi.Resource(controller)
  617         resource.register_extensions(extended)
  618         method, extensions = resource.get_method(None, 'index', None, '')
  619         self.assertEqual(method, controller.index)
  620         self.assertEqual(extensions, [extended.index])
  621 
  622     def test_get_method_action_extensions(self):
  623         class Controller(wsgi.Controller):
  624             def index(self, req, pants=None):
  625                 return pants
  626 
  627             @wsgi.action('fooAction')
  628             def _action_foo(self, req, id, body):
  629                 return body
  630 
  631         class ControllerExtended(wsgi.Controller):
  632             @wsgi.extends(action='fooAction')
  633             def _action_foo(self, req, resp_obj, id, body):
  634                 return None
  635 
  636         controller = Controller()
  637         extended = ControllerExtended()
  638         resource = wsgi.Resource(controller)
  639         resource.register_extensions(extended)
  640         method, extensions = resource.get_method(None, 'action',
  641                                                  'application/json',
  642                                                  '{"fooAction": true}')
  643         self.assertEqual(method, controller._action_foo)
  644         self.assertEqual(extensions, [extended._action_foo])
  645 
  646     def test_get_method_action_whitelist_extensions(self):
  647         class Controller(wsgi.Controller):
  648             def index(self, req, pants=None):
  649                 return pants
  650 
  651         class ControllerExtended(wsgi.Controller):
  652             @wsgi.action('create')
  653             def _create(self, req, body):
  654                 pass
  655 
  656             @wsgi.action('delete')
  657             def _delete(self, req, id):
  658                 pass
  659 
  660         controller = Controller()
  661         extended = ControllerExtended()
  662         resource = wsgi.Resource(controller)
  663         resource.register_actions(extended)
  664 
  665         method, extensions = resource.get_method(None, 'create',
  666                                                  'application/json',
  667                                                  '{"create": true}')
  668         self.assertEqual(method, extended._create)
  669         self.assertEqual(extensions, [])
  670 
  671         method, extensions = resource.get_method(None, 'delete', None, None)
  672         self.assertEqual(method, extended._delete)
  673         self.assertEqual(extensions, [])
  674 
  675     def test_pre_process_extensions_regular(self):
  676         class Controller(object):
  677             def index(self, req, pants=None):
  678                 return pants
  679 
  680         controller = Controller()
  681         resource = wsgi.Resource(controller)
  682 
  683         called = []
  684 
  685         def extension1(req, resp_obj):
  686             called.append(1)
  687             return None
  688 
  689         def extension2(req, resp_obj):
  690             called.append(2)
  691             return None
  692 
  693         extensions = [extension1, extension2]
  694         response, post = resource.pre_process_extensions(extensions, None, {})
  695         self.assertEqual(called, [])
  696         self.assertIsNone(response)
  697         self.assertEqual(list(post), [extension2, extension1])
  698 
  699     def test_pre_process_extensions_generator(self):
  700         class Controller(object):
  701             def index(self, req, pants=None):
  702                 return pants
  703 
  704         controller = Controller()
  705         resource = wsgi.Resource(controller)
  706 
  707         called = []
  708 
  709         def extension1(req):
  710             called.append('pre1')
  711             yield
  712             called.append('post1')
  713 
  714         def extension2(req):
  715             called.append('pre2')
  716             yield
  717             called.append('post2')
  718 
  719         extensions = [extension1, extension2]
  720         response, post = resource.pre_process_extensions(extensions, None, {})
  721         post = list(post)
  722         self.assertEqual(called, ['pre1', 'pre2'])
  723         self.assertIsNone(response)
  724         self.assertEqual(len(post), 2)
  725         self.assertTrue(inspect.isgenerator(post[0]))
  726         self.assertTrue(inspect.isgenerator(post[1]))
  727 
  728         for gen in post:
  729             try:
  730                 gen.send(None)
  731             except StopIteration:
  732                 continue
  733 
  734         self.assertEqual(called, ['pre1', 'pre2', 'post2', 'post1'])
  735 
  736     def test_pre_process_extensions_generator_response(self):
  737         class Controller(object):
  738             def index(self, req, pants=None):
  739                 return pants
  740 
  741         controller = Controller()
  742         resource = wsgi.Resource(controller)
  743 
  744         called = []
  745 
  746         def extension1(req):
  747             called.append('pre1')
  748             yield 'foo'
  749 
  750         def extension2(req):
  751             called.append('pre2')
  752 
  753         extensions = [extension1, extension2]
  754         response, post = resource.pre_process_extensions(extensions, None, {})
  755         self.assertEqual(called, ['pre1'])
  756         self.assertEqual(response, 'foo')
  757         self.assertEqual(post, [])
  758 
  759     def test_post_process_extensions_regular(self):
  760         class Controller(object):
  761             def index(self, req, pants=None):
  762                 return pants
  763 
  764         controller = Controller()
  765         resource = wsgi.Resource(controller)
  766 
  767         called = []
  768 
  769         def extension1(req, resp_obj):
  770             called.append(1)
  771             return None
  772 
  773         def extension2(req, resp_obj):
  774             called.append(2)
  775             return None
  776 
  777         response = resource.post_process_extensions([extension2, extension1],
  778                                                     None, None, {})
  779         self.assertEqual(called, [2, 1])
  780         self.assertIsNone(response)
  781 
  782     def test_post_process_extensions_regular_response(self):
  783         class Controller(object):
  784             def index(self, req, pants=None):
  785                 return pants
  786 
  787         controller = Controller()
  788         resource = wsgi.Resource(controller)
  789 
  790         called = []
  791 
  792         def extension1(req, resp_obj):
  793             called.append(1)
  794             return None
  795 
  796         def extension2(req, resp_obj):
  797             called.append(2)
  798             return 'foo'
  799 
  800         response = resource.post_process_extensions([extension2, extension1],
  801                                                     None, None, {})
  802         self.assertEqual(called, [2])
  803         self.assertEqual(response, 'foo')
  804 
  805     def test_post_process_extensions_generator(self):
  806         class Controller(object):
  807             def index(self, req, pants=None):
  808                 return pants
  809 
  810         controller = Controller()
  811         resource = wsgi.Resource(controller)
  812 
  813         called = []
  814 
  815         def extension1(req):
  816             yield
  817             called.append(1)
  818 
  819         def extension2(req):
  820             yield
  821             called.append(2)
  822 
  823         ext1 = extension1(None)
  824         next(ext1)
  825         ext2 = extension2(None)
  826         next(ext2)
  827 
  828         response = resource.post_process_extensions([ext2, ext1],
  829                                                     None, None, {})
  830 
  831         self.assertEqual(called, [2, 1])
  832         self.assertIsNone(response)
  833 
  834     def test_post_process_extensions_generator_response(self):
  835         class Controller(object):
  836             def index(self, req, pants=None):
  837                 return pants
  838 
  839         controller = Controller()
  840         resource = wsgi.Resource(controller)
  841 
  842         called = []
  843 
  844         def extension1(req):
  845             yield
  846             called.append(1)
  847 
  848         def extension2(req):
  849             yield
  850             called.append(2)
  851             yield 'foo'
  852 
  853         ext1 = extension1(None)
  854         next(ext1)
  855         ext2 = extension2(None)
  856         next(ext2)
  857 
  858         response = resource.post_process_extensions([ext2, ext1],
  859                                                     None, None, {})
  860 
  861         self.assertEqual(called, [2])
  862         self.assertEqual(response, 'foo')
  863 
  864     def test_resource_exception_handler_type_error(self):
  865         # A TypeError should be translated to a Fault/HTTP 400.
  866         def foo(a,):
  867             return a
  868 
  869         try:
  870             with wsgi.ResourceExceptionHandler():
  871                 foo()  # generate a TypeError
  872             self.fail("Should have raised a Fault (HTTP 400)")
  873         except wsgi.Fault as fault:
  874             self.assertEqual(http.BAD_REQUEST, fault.status_int)
  875 
  876     def test_resource_headers_py2_are_utf8(self):
  877         resp = webob.Response(status_int=http.ACCEPTED)
  878         resp.headers['x-header1'] = 1
  879         resp.headers['x-header2'] = u'header2'
  880         resp.headers['x-header3'] = u'header3'
  881 
  882         class Controller(object):
  883             def index(self, req):
  884                 return resp
  885 
  886         req = webob.Request.blank('/tests')
  887         app = fakes.TestRouter(Controller())
  888         response = req.get_response(app)
  889 
  890         if six.PY2:
  891             for val in response.headers.values():
  892                 # All headers must be utf8
  893                 self.assertThat(val, matchers.EncodedByUTF8())
  894             self.assertEqual(b'1', response.headers['x-header1'])
  895             self.assertEqual(b'header2', response.headers['x-header2'])
  896             self.assertEqual(b'header3', response.headers['x-header3'])
  897         else:
  898             self.assertEqual('1', response.headers['x-header1'])
  899             self.assertEqual(u'header2', response.headers['x-header2'])
  900             self.assertEqual(u'header3', response.headers['x-header3'])
  901 
  902     def test_resource_valid_utf8_body(self):
  903         class Controller(object):
  904             def update(self, req, id, body):
  905                 return body
  906 
  907         req = webob.Request.blank('/tests/test_id', method="PUT")
  908         body = b""" {"name": "\xe6\xa6\x82\xe5\xbf\xb5" } """
  909         expected_body = b'{"name": "\\u6982\\u5ff5"}'
  910         req.body = body
  911         req.headers['Content-Type'] = 'application/json'
  912         app = fakes.TestRouter(Controller())
  913         response = req.get_response(app)
  914         self.assertEqual(response.body, expected_body)
  915         self.assertEqual(response.status_int, http.OK)
  916 
  917     def test_resource_invalid_utf8(self):
  918         class Controller(object):
  919             def update(self, req, id, body):
  920                 return body
  921 
  922         req = webob.Request.blank('/tests/test_id', method="PUT")
  923         body = b""" {"name": "\xf0\x28\x8c\x28" } """
  924         req.body = body
  925         req.headers['Content-Type'] = 'application/json'
  926         app = fakes.TestRouter(Controller())
  927         self.assertRaises(UnicodeDecodeError, req.get_response, app)
  928 
  929 
  930 class ResponseObjectTest(test.NoDBTestCase):
  931     def test_default_code(self):
  932         robj = wsgi.ResponseObject({})
  933         self.assertEqual(robj.code, http.OK)
  934 
  935     def test_modified_code(self):
  936         robj = wsgi.ResponseObject({})
  937         robj._default_code = http.ACCEPTED
  938         self.assertEqual(robj.code, http.ACCEPTED)
  939 
  940     def test_override_default_code(self):
  941         robj = wsgi.ResponseObject({}, code=http.NOT_FOUND)
  942         self.assertEqual(robj.code, http.NOT_FOUND)
  943 
  944     def test_override_modified_code(self):
  945         robj = wsgi.ResponseObject({}, code=http.NOT_FOUND)
  946         robj._default_code = http.ACCEPTED
  947         self.assertEqual(robj.code, http.NOT_FOUND)
  948 
  949     def test_set_header(self):
  950         robj = wsgi.ResponseObject({})
  951         robj['Header'] = 'foo'
  952         self.assertEqual(robj.headers, {'header': 'foo'})
  953 
  954     def test_get_header(self):
  955         robj = wsgi.ResponseObject({})
  956         robj['Header'] = 'foo'
  957         self.assertEqual(robj['hEADER'], 'foo')
  958 
  959     def test_del_header(self):
  960         robj = wsgi.ResponseObject({})
  961         robj['Header'] = 'foo'
  962         del robj['hEADER']
  963         self.assertNotIn('header', robj.headers)
  964 
  965     def test_header_isolation(self):
  966         robj = wsgi.ResponseObject({})
  967         robj['Header'] = 'foo'
  968         hdrs = robj.headers
  969         hdrs['hEADER'] = 'bar'
  970         self.assertEqual(robj['hEADER'], 'foo')
  971 
  972 
  973 class ValidBodyTest(test.NoDBTestCase):
  974 
  975     def setUp(self):
  976         super(ValidBodyTest, self).setUp()
  977         self.controller = wsgi.Controller()
  978 
  979     def test_is_valid_body(self):
  980         body = {'foo': {}}
  981         self.assertTrue(self.controller.is_valid_body(body, 'foo'))
  982 
  983     def test_is_valid_body_none(self):
  984         wsgi.Resource(controller=None)
  985         self.assertFalse(self.controller.is_valid_body(None, 'foo'))
  986 
  987     def test_is_valid_body_empty(self):
  988         wsgi.Resource(controller=None)
  989         self.assertFalse(self.controller.is_valid_body({}, 'foo'))
  990 
  991     def test_is_valid_body_no_entity(self):
  992         wsgi.Resource(controller=None)
  993         body = {'bar': {}}
  994         self.assertFalse(self.controller.is_valid_body(body, 'foo'))
  995 
  996     def test_is_valid_body_malformed_entity(self):
  997         wsgi.Resource(controller=None)
  998         body = {'foo': 'bar'}
  999         self.assertFalse(self.controller.is_valid_body(body, 'foo'))
 1000 
 1001 
 1002 class TestController(test.NoDBTestCase):
 1003     def test_check_for_versions_intersection_negative(self):
 1004         func_list = [
 1005             versioned_method.VersionedMethod('foo', (
 1006                 api_version.APIVersionRequest('2.1'
 1007                                               )
 1008             ), api_version.APIVersionRequest('2.4'), None),
 1009             versioned_method.VersionedMethod('foo', (
 1010                 api_version.APIVersionRequest('2.11'
 1011                                               )
 1012             ), (api_version.APIVersionRequest('3.1')), None),
 1013             versioned_method.VersionedMethod('foo', (
 1014                 api_version.APIVersionRequest('2.8')
 1015             ), api_version.APIVersionRequest('2.9'), None),
 1016         ]
 1017 
 1018         result = (
 1019             wsgi.Controller.check_for_versions_intersection(func_list=func_list
 1020                                                             ))
 1021         self.assertFalse(result)
 1022 
 1023         func_list = [
 1024             versioned_method.VersionedMethod('foo', (
 1025                 api_version.APIVersionRequest('2.12'
 1026                                               )
 1027             ), api_version.APIVersionRequest('2.14'), None),
 1028             versioned_method.VersionedMethod('foo', (
 1029                 api_version.APIVersionRequest('3.0'
 1030                                               )
 1031             ), api_version.APIVersionRequest('3.4'), None)]
 1032 
 1033         result = (
 1034             wsgi.Controller.check_for_versions_intersection(func_list=func_list
 1035                                                             ))
 1036         self.assertFalse(result)
 1037 
 1038     def test_check_for_versions_intersection_positive(self):
 1039         func_list = [
 1040             versioned_method.VersionedMethod('foo', (
 1041                 api_version.APIVersionRequest('2.1'
 1042                                               )
 1043             ), api_version.APIVersionRequest('2.4'), None),
 1044             versioned_method.VersionedMethod('foo', (
 1045                 api_version.APIVersionRequest('2.3'
 1046                                               )
 1047             ), api_version.APIVersionRequest('3.0'), None),
 1048             versioned_method.VersionedMethod('foo', (
 1049                 api_version.APIVersionRequest('2.8'
 1050                                               )
 1051             ), api_version.APIVersionRequest('2.9'), None), ]
 1052 
 1053         result = (
 1054             wsgi.Controller.check_for_versions_intersection(func_list=func_list
 1055                                                             ))
 1056         self.assertTrue(result)