"Fossies" - the Fresh Open Source Software Archive

Member "swift-2.21.0/test/functional/s3api/test_bucket.py" (25 Mar 2019, 23421 Bytes) of package /linux/misc/openstack/swift-2.21.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_bucket.py": 2.19.1_vs_2.21.0.

    1 # Copyright (c) 2015 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 import unittest2
   17 import os
   18 
   19 import test.functional as tf
   20 from swift.common.utils import config_true_value
   21 from swift.common.middleware.s3api.etree import fromstring, tostring, Element, \
   22     SubElement
   23 from test.functional.s3api import S3ApiBase
   24 from test.functional.s3api.s3_test_client import Connection
   25 from test.functional.s3api.utils import get_error_code
   26 
   27 
def setUpModule():
    # unittest module-level fixture: hand off to the functional-test
    # package's own setup routine.
    tf.setup_package()
   30 
   31 
def tearDownModule():
    # unittest module-level fixture: hand off to the functional-test
    # package's own teardown routine.
    tf.teardown_package()
   34 
   35 
   36 class TestS3ApiBucket(S3ApiBase):
    def setUp(self):
        # No bucket-specific fixtures are created here; this only defers to
        # the next setUp in the MRO.
        super(TestS3ApiBucket, self).setUp()
   39 
   40     def _gen_location_xml(self, location):
   41         elem = Element('CreateBucketConfiguration')
   42         SubElement(elem, 'LocationConstraint').text = location
   43         return tostring(elem)
   44 
   45     def test_bucket(self):
   46         bucket = 'bucket'
   47         max_bucket_listing = tf.cluster_info['s3api'].get(
   48             'max_bucket_listing', 1000)
   49 
   50         # PUT Bucket
   51         status, headers, body = self.conn.make_request('PUT', bucket)
   52         self.assertEqual(status, 200)
   53 
   54         self.assertCommonResponseHeaders(headers)
   55         self.assertIn(headers['location'], (
   56             '/' + bucket,  # swob won't touch it...
   57             # but webob (which we get because of auth_token) *does*
   58             'http://%s%s/%s' % (
   59                 self.conn.host,
   60                 '' if self.conn.port == 80 else ':%d' % self.conn.port,
   61                 bucket),
   62             # This is all based on the Host header the client provided,
   63             # and boto will double-up ports for sig v4. See
   64             #   - https://github.com/boto/boto/issues/2623
   65             #   - https://github.com/boto/boto/issues/3716
   66             # with proposed fixes at
   67             #   - https://github.com/boto/boto/pull/3513
   68             #   - https://github.com/boto/boto/pull/3676
   69             'http://%s%s:%d/%s' % (
   70                 self.conn.host,
   71                 '' if self.conn.port == 80 else ':%d' % self.conn.port,
   72                 self.conn.port,
   73                 bucket),
   74         ))
   75         self.assertEqual(headers['content-length'], '0')
   76 
   77         # GET Bucket(Without Object)
   78         status, headers, body = self.conn.make_request('GET', bucket)
   79         self.assertEqual(status, 200)
   80 
   81         self.assertCommonResponseHeaders(headers)
   82         self.assertIsNotNone(headers['content-type'])
   83         self.assertEqual(headers['content-length'], str(len(body)))
   84         # TODO; requires consideration
   85         # self.assertEqual(headers['transfer-encoding'], 'chunked')
   86 
   87         elem = fromstring(body, 'ListBucketResult')
   88         self.assertEqual(elem.find('Name').text, bucket)
   89         self.assertIsNone(elem.find('Prefix').text)
   90         self.assertIsNone(elem.find('Marker').text)
   91         self.assertEqual(
   92             elem.find('MaxKeys').text, str(max_bucket_listing))
   93         self.assertEqual(elem.find('IsTruncated').text, 'false')
   94         objects = elem.findall('./Contents')
   95         self.assertEqual(list(objects), [])
   96 
   97         # GET Bucket(With Object)
   98         req_objects = ('object', 'object2')
   99         for obj in req_objects:
  100             self.conn.make_request('PUT', bucket, obj)
  101         status, headers, body = self.conn.make_request('GET', bucket)
  102         self.assertEqual(status, 200)
  103 
  104         elem = fromstring(body, 'ListBucketResult')
  105         self.assertEqual(elem.find('Name').text, bucket)
  106         self.assertIsNone(elem.find('Prefix').text)
  107         self.assertIsNone(elem.find('Marker').text)
  108         self.assertEqual(elem.find('MaxKeys').text,
  109                          str(max_bucket_listing))
  110         self.assertEqual(elem.find('IsTruncated').text, 'false')
  111         resp_objects = elem.findall('./Contents')
  112         self.assertEqual(len(list(resp_objects)), 2)
  113         for o in resp_objects:
  114             self.assertIn(o.find('Key').text, req_objects)
  115             self.assertIsNotNone(o.find('LastModified').text)
  116             self.assertRegexpMatches(
  117                 o.find('LastModified').text,
  118                 r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$')
  119             self.assertIsNotNone(o.find('ETag').text)
  120             self.assertIsNotNone(o.find('Size').text)
  121             self.assertIsNotNone(o.find('StorageClass').text)
  122             self.assertEqual(o.find('Owner/ID').text, self.conn.user_id)
  123             self.assertEqual(o.find('Owner/DisplayName').text,
  124                              self.conn.user_id)
  125 
  126         # HEAD Bucket
  127         status, headers, body = self.conn.make_request('HEAD', bucket)
  128         self.assertEqual(status, 200)
  129 
  130         self.assertCommonResponseHeaders(headers)
  131         self.assertIsNotNone(headers['content-type'])
  132         self.assertEqual(headers['content-length'], str(len(body)))
  133         # TODO; requires consideration
  134         # self.assertEqual(headers['transfer-encoding'], 'chunked')
  135 
  136         # DELETE Bucket
  137         for obj in req_objects:
  138             self.conn.make_request('DELETE', bucket, obj)
  139         status, headers, body = self.conn.make_request('DELETE', bucket)
  140         self.assertEqual(status, 204)
  141 
  142         self.assertCommonResponseHeaders(headers)
  143 
  144     def test_put_bucket_error(self):
  145         status, headers, body = \
  146             self.conn.make_request('PUT', 'bucket+invalid')
  147         self.assertEqual(get_error_code(body), 'InvalidBucketName')
  148 
  149         auth_error_conn = Connection(aws_secret_key='invalid')
  150         status, headers, body = auth_error_conn.make_request('PUT', 'bucket')
  151         self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')
  152 
  153         self.conn.make_request('PUT', 'bucket')
  154         status, headers, body = self.conn.make_request('PUT', 'bucket')
  155         self.assertEqual(status, 409)
  156         self.assertEqual(get_error_code(body), 'BucketAlreadyOwnedByYou')
  157 
  158     def test_put_bucket_error_key2(self):
  159         if config_true_value(tf.cluster_info['s3api'].get('s3_acl')):
  160             if 's3_access_key2' not in tf.config or \
  161                     's3_secret_key2' not in tf.config:
  162                 raise tf.SkipTest(
  163                     'Cannot test for BucketAlreadyExists with second user; '
  164                     'need s3_access_key2 and s3_secret_key2 configured')
  165 
  166             self.conn.make_request('PUT', 'bucket')
  167 
  168             # Other users of the same account get the same 409 error
  169             conn2 = Connection(tf.config['s3_access_key2'],
  170                                tf.config['s3_secret_key2'],
  171                                tf.config['s3_access_key2'])
  172             status, headers, body = conn2.make_request('PUT', 'bucket')
  173             self.assertEqual(status, 409)
  174             self.assertEqual(get_error_code(body), 'BucketAlreadyExists')
  175 
  176     def test_put_bucket_error_key3(self):
  177         if 's3_access_key3' not in tf.config or \
  178                 's3_secret_key3' not in tf.config:
  179             raise tf.SkipTest('Cannot test for AccessDenied; need '
  180                               's3_access_key3 and s3_secret_key3 configured')
  181 
  182         self.conn.make_request('PUT', 'bucket')
  183         # If the user can't create buckets, they shouldn't even know
  184         # whether the bucket exists.
  185         conn3 = Connection(tf.config['s3_access_key3'],
  186                            tf.config['s3_secret_key3'],
  187                            tf.config['s3_access_key3'])
  188         status, headers, body = conn3.make_request('PUT', 'bucket')
  189         self.assertEqual(status, 403)
  190         self.assertEqual(get_error_code(body), 'AccessDenied')
  191 
  192     def test_put_bucket_with_LocationConstraint(self):
  193         bucket = 'bucket'
  194         xml = self._gen_location_xml(self.conn.conn.auth_region_name)
  195         status, headers, body = \
  196             self.conn.make_request('PUT', bucket, body=xml)
  197         self.assertEqual(status, 200)
  198 
  199     def test_get_bucket_error(self):
  200         self.conn.make_request('PUT', 'bucket')
  201 
  202         status, headers, body = \
  203             self.conn.make_request('GET', 'bucket+invalid')
  204         self.assertEqual(get_error_code(body), 'InvalidBucketName')
  205 
  206         auth_error_conn = Connection(aws_secret_key='invalid')
  207         status, headers, body = auth_error_conn.make_request('GET', 'bucket')
  208         self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')
  209 
  210         status, headers, body = self.conn.make_request('GET', 'nothing')
  211         self.assertEqual(get_error_code(body), 'NoSuchBucket')
  212 
  213     def _prepare_test_get_bucket(self, bucket, objects):
  214         self.conn.make_request('PUT', bucket)
  215         for obj in objects:
  216             self.conn.make_request('PUT', bucket, obj)
  217 
  218     def test_get_bucket_with_delimiter(self):
  219         bucket = 'bucket'
  220         put_objects = ('object', 'object2', 'subdir/object', 'subdir2/object',
  221                        'dir/subdir/object')
  222         self._prepare_test_get_bucket(bucket, put_objects)
  223 
  224         delimiter = '/'
  225         query = 'delimiter=%s' % delimiter
  226         expect_objects = ('object', 'object2')
  227         expect_prefixes = ('dir/', 'subdir/', 'subdir2/')
  228         status, headers, body = \
  229             self.conn.make_request('GET', bucket, query=query)
  230         self.assertEqual(status, 200)
  231         elem = fromstring(body, 'ListBucketResult')
  232         self.assertEqual(elem.find('Delimiter').text, delimiter)
  233         resp_objects = elem.findall('./Contents')
  234         self.assertEqual(len(list(resp_objects)), len(expect_objects))
  235         for i, o in enumerate(resp_objects):
  236             self.assertEqual(o.find('Key').text, expect_objects[i])
  237             self.assertIsNotNone(o.find('LastModified').text)
  238             self.assertRegexpMatches(
  239                 o.find('LastModified').text,
  240                 r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$')
  241             self.assertIsNotNone(o.find('ETag').text)
  242             self.assertIsNotNone(o.find('Size').text)
  243             self.assertEqual(o.find('StorageClass').text, 'STANDARD')
  244             self.assertEqual(o.find('Owner/ID').text, self.conn.user_id)
  245             self.assertEqual(o.find('Owner/DisplayName').text,
  246                              self.conn.user_id)
  247         resp_prefixes = elem.findall('CommonPrefixes')
  248         self.assertEqual(len(resp_prefixes), len(expect_prefixes))
  249         for i, p in enumerate(resp_prefixes):
  250             self.assertEqual(p.find('./Prefix').text, expect_prefixes[i])
  251 
  252     def test_get_bucket_with_encoding_type(self):
  253         bucket = 'bucket'
  254         put_objects = ('object', 'object2')
  255         self._prepare_test_get_bucket(bucket, put_objects)
  256 
  257         encoding_type = 'url'
  258         query = 'encoding-type=%s' % encoding_type
  259         status, headers, body = \
  260             self.conn.make_request('GET', bucket, query=query)
  261         self.assertEqual(status, 200)
  262         elem = fromstring(body, 'ListBucketResult')
  263         self.assertEqual(elem.find('EncodingType').text, encoding_type)
  264 
  265     def test_get_bucket_with_marker(self):
  266         bucket = 'bucket'
  267         put_objects = ('object', 'object2', 'subdir/object', 'subdir2/object',
  268                        'dir/subdir/object')
  269         self._prepare_test_get_bucket(bucket, put_objects)
  270 
  271         marker = 'object'
  272         query = 'marker=%s' % marker
  273         expect_objects = ('object2', 'subdir/object', 'subdir2/object')
  274         status, headers, body = \
  275             self.conn.make_request('GET', bucket, query=query)
  276         self.assertEqual(status, 200)
  277         elem = fromstring(body, 'ListBucketResult')
  278         self.assertEqual(elem.find('Marker').text, marker)
  279         resp_objects = elem.findall('./Contents')
  280         self.assertEqual(len(list(resp_objects)), len(expect_objects))
  281         for i, o in enumerate(resp_objects):
  282             self.assertEqual(o.find('Key').text, expect_objects[i])
  283             self.assertIsNotNone(o.find('LastModified').text)
  284             self.assertRegexpMatches(
  285                 o.find('LastModified').text,
  286                 r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$')
  287             self.assertIsNotNone(o.find('ETag').text)
  288             self.assertIsNotNone(o.find('Size').text)
  289             self.assertEqual(o.find('StorageClass').text, 'STANDARD')
  290             self.assertEqual(o.find('Owner/ID').text, self.conn.user_id)
  291             self.assertEqual(o.find('Owner/DisplayName').text,
  292                              self.conn.user_id)
  293 
  294     def test_get_bucket_with_max_keys(self):
  295         bucket = 'bucket'
  296         put_objects = ('object', 'object2', 'subdir/object', 'subdir2/object',
  297                        'dir/subdir/object')
  298         self._prepare_test_get_bucket(bucket, put_objects)
  299 
  300         max_keys = '2'
  301         query = 'max-keys=%s' % max_keys
  302         expect_objects = ('dir/subdir/object', 'object')
  303         status, headers, body = \
  304             self.conn.make_request('GET', bucket, query=query)
  305         self.assertEqual(status, 200)
  306         elem = fromstring(body, 'ListBucketResult')
  307         self.assertEqual(elem.find('MaxKeys').text, max_keys)
  308         resp_objects = elem.findall('./Contents')
  309         self.assertEqual(len(list(resp_objects)), len(expect_objects))
  310         for i, o in enumerate(resp_objects):
  311             self.assertEqual(o.find('Key').text, expect_objects[i])
  312             self.assertIsNotNone(o.find('LastModified').text)
  313             self.assertRegexpMatches(
  314                 o.find('LastModified').text,
  315                 r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$')
  316             self.assertIsNotNone(o.find('ETag').text)
  317             self.assertIsNotNone(o.find('Size').text)
  318             self.assertEqual(o.find('StorageClass').text, 'STANDARD')
  319             self.assertEqual(o.find('Owner/ID').text, self.conn.user_id)
  320             self.assertEqual(o.find('Owner/DisplayName').text,
  321                              self.conn.user_id)
  322 
  323     def test_get_bucket_with_prefix(self):
  324         bucket = 'bucket'
  325         req_objects = ('object', 'object2', 'subdir/object', 'subdir2/object',
  326                        'dir/subdir/object')
  327         self._prepare_test_get_bucket(bucket, req_objects)
  328 
  329         prefix = 'object'
  330         query = 'prefix=%s' % prefix
  331         expect_objects = ('object', 'object2')
  332         status, headers, body = \
  333             self.conn.make_request('GET', bucket, query=query)
  334         self.assertEqual(status, 200)
  335         elem = fromstring(body, 'ListBucketResult')
  336         self.assertEqual(elem.find('Prefix').text, prefix)
  337         resp_objects = elem.findall('./Contents')
  338         self.assertEqual(len(list(resp_objects)), len(expect_objects))
  339         for i, o in enumerate(resp_objects):
  340             self.assertEqual(o.find('Key').text, expect_objects[i])
  341             self.assertIsNotNone(o.find('LastModified').text)
  342             self.assertRegexpMatches(
  343                 o.find('LastModified').text,
  344                 r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$')
  345             self.assertIsNotNone(o.find('ETag').text)
  346             self.assertIsNotNone(o.find('Size').text)
  347             self.assertEqual(o.find('StorageClass').text, 'STANDARD')
  348             self.assertEqual(o.find('Owner/ID').text, self.conn.user_id)
  349             self.assertEqual(o.find('Owner/DisplayName').text,
  350                              self.conn.user_id)
  351 
  352     def test_get_bucket_v2_with_start_after(self):
  353         bucket = 'bucket'
  354         put_objects = ('object', 'object2', 'subdir/object', 'subdir2/object',
  355                        'dir/subdir/object')
  356         self._prepare_test_get_bucket(bucket, put_objects)
  357 
  358         marker = 'object'
  359         query = 'list-type=2&start-after=%s' % marker
  360         expect_objects = ('object2', 'subdir/object', 'subdir2/object')
  361         status, headers, body = \
  362             self.conn.make_request('GET', bucket, query=query)
  363         self.assertEqual(status, 200)
  364         elem = fromstring(body, 'ListBucketResult')
  365         self.assertEqual(elem.find('StartAfter').text, marker)
  366         resp_objects = elem.findall('./Contents')
  367         self.assertEqual(len(list(resp_objects)), len(expect_objects))
  368         for i, o in enumerate(resp_objects):
  369             self.assertEqual(o.find('Key').text, expect_objects[i])
  370             self.assertTrue(o.find('LastModified').text is not None)
  371             self.assertRegexpMatches(
  372                 o.find('LastModified').text,
  373                 r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$')
  374             self.assertTrue(o.find('ETag').text is not None)
  375             self.assertTrue(o.find('Size').text is not None)
  376             self.assertEqual(o.find('StorageClass').text, 'STANDARD')
  377             self.assertIsNone(o.find('Owner/ID'))
  378             self.assertIsNone(o.find('Owner/DisplayName'))
  379 
  380     def test_get_bucket_v2_with_fetch_owner(self):
  381         bucket = 'bucket'
  382         put_objects = ('object', 'object2', 'subdir/object', 'subdir2/object',
  383                        'dir/subdir/object')
  384         self._prepare_test_get_bucket(bucket, put_objects)
  385 
  386         query = 'list-type=2&fetch-owner=true'
  387         expect_objects = ('dir/subdir/object', 'object', 'object2',
  388                           'subdir/object', 'subdir2/object')
  389         status, headers, body = \
  390             self.conn.make_request('GET', bucket, query=query)
  391         self.assertEqual(status, 200)
  392         elem = fromstring(body, 'ListBucketResult')
  393         self.assertEqual(elem.find('KeyCount').text, '5')
  394         resp_objects = elem.findall('./Contents')
  395         self.assertEqual(len(list(resp_objects)), len(expect_objects))
  396         for i, o in enumerate(resp_objects):
  397             self.assertEqual(o.find('Key').text, expect_objects[i])
  398             self.assertTrue(o.find('LastModified').text is not None)
  399             self.assertRegexpMatches(
  400                 o.find('LastModified').text,
  401                 r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$')
  402             self.assertTrue(o.find('ETag').text is not None)
  403             self.assertTrue(o.find('Size').text is not None)
  404             self.assertEqual(o.find('StorageClass').text, 'STANDARD')
  405             self.assertTrue(o.find('Owner/ID').text, self.conn.user_id)
  406             self.assertTrue(o.find('Owner/DisplayName').text,
  407                             self.conn.user_id)
  408 
  409     def test_get_bucket_v2_with_continuation_token_and_delimiter(self):
  410         bucket = 'bucket'
  411         put_objects = ('object', u'object2-\u062a', 'subdir/object',
  412                        u'subdir2-\u062a/object', 'dir/subdir/object',
  413                        'x', 'y', 'z')
  414         self._prepare_test_get_bucket(bucket, put_objects)
  415 
  416         expected = [{'objects': ['object', u'object2-\u062a'],
  417                      'subdirs': ['dir/']},
  418                     {'objects': ['x'],
  419                      'subdirs': ['subdir/', u'subdir2-\u062a/']},
  420                     {'objects': ['y', 'z'],
  421                      'subdirs': []}]
  422 
  423         continuation_token = ''
  424         query = 'list-type=2&max-keys=3&delimiter=/&continuation-token=%s'
  425 
  426         for i in range(len(expected)):
  427             status, headers, body = self.conn.make_request(
  428                 'GET', bucket, query=query % continuation_token)
  429             self.assertEqual(status, 200)
  430             elem = fromstring(body, 'ListBucketResult')
  431             self.assertEqual(elem.find('MaxKeys').text, '3')
  432             self.assertEqual(
  433                 elem.find('KeyCount').text,
  434                 str(len(expected[i]['objects']) + len(expected[i]['subdirs'])))
  435             expect_truncated = 'true' if i < len(expected) - 1 else 'false'
  436             self.assertEqual(elem.find('IsTruncated').text, expect_truncated)
  437             next_cont_token_elem = elem.find('NextContinuationToken')
  438             if expect_truncated == 'true':
  439                 self.assertIsNotNone(next_cont_token_elem)
  440                 continuation_token = next_cont_token_elem.text
  441             resp_objects = elem.findall('./Contents')
  442             self.assertEqual(
  443                 len(list(resp_objects)), len(expected[i]['objects']))
  444             for j, o in enumerate(resp_objects):
  445                 self.assertEqual(o.find('Key').text,
  446                                  expected[i]['objects'][j].encode('utf-8'))
  447                 self.assertTrue(o.find('LastModified').text is not None)
  448                 self.assertRegexpMatches(
  449                     o.find('LastModified').text,
  450                     r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$')
  451                 self.assertTrue(o.find('ETag').text is not None)
  452                 self.assertTrue(o.find('Size').text is not None)
  453                 self.assertEqual(o.find('StorageClass').text, 'STANDARD')
  454                 self.assertIsNone(o.find('Owner/ID'))
  455                 self.assertIsNone(o.find('Owner/DisplayName'))
  456             resp_subdirs = elem.findall('./CommonPrefixes')
  457             self.assertEqual(
  458                 len(list(resp_subdirs)), len(expected[i]['subdirs']))
  459             for j, o in enumerate(resp_subdirs):
  460                 self.assertEqual(
  461                     o.find('Prefix').text,
  462                     expected[i]['subdirs'][j].encode('utf-8'))
  463 
  464     def test_head_bucket_error(self):
  465         self.conn.make_request('PUT', 'bucket')
  466 
  467         status, headers, body = \
  468             self.conn.make_request('HEAD', 'bucket+invalid')
  469         self.assertEqual(status, 400)
  470         self.assertEqual(body, '')  # sanity
  471 
  472         auth_error_conn = Connection(aws_secret_key='invalid')
  473         status, headers, body = \
  474             auth_error_conn.make_request('HEAD', 'bucket')
  475         self.assertEqual(status, 403)
  476         self.assertEqual(body, '')  # sanity
  477 
  478         status, headers, body = self.conn.make_request('HEAD', 'nothing')
  479         self.assertEqual(status, 404)
  480         self.assertEqual(body, '')  # sanity
  481 
  482     def test_delete_bucket_error(self):
  483         status, headers, body = \
  484             self.conn.make_request('DELETE', 'bucket+invalid')
  485         self.assertEqual(get_error_code(body), 'InvalidBucketName')
  486 
  487         auth_error_conn = Connection(aws_secret_key='invalid')
  488         status, headers, body = \
  489             auth_error_conn.make_request('DELETE', 'bucket')
  490         self.assertEqual(get_error_code(body), 'SignatureDoesNotMatch')
  491 
  492         status, headers, body = self.conn.make_request('DELETE', 'bucket')
  493         self.assertEqual(get_error_code(body), 'NoSuchBucket')
  494 
  495     def test_bucket_invalid_method_error(self):
  496         # non existed verb in the controller
  497         status, headers, body = \
  498             self.conn.make_request('GETPUT', 'bucket')
  499         self.assertEqual(get_error_code(body), 'MethodNotAllowed')
  500         # the method exists in the controller but deny as MethodNotAllowed
  501         status, headers, body = \
  502             self.conn.make_request('_delete_segments_bucket', 'bucket')
  503         self.assertEqual(get_error_code(body), 'MethodNotAllowed')
  504 
  505 
  506 class TestS3ApiBucketSigV4(TestS3ApiBucket):
  507     @classmethod
  508     def setUpClass(cls):
  509         os.environ['S3_USE_SIGV4'] = "True"
  510 
  511     @classmethod
  512     def tearDownClass(cls):
  513         del os.environ['S3_USE_SIGV4']
  514 
  515     def setUp(self):
  516         super(TestS3ApiBucket, self).setUp()
  517 
  518 
# Allow running this module directly as a script via unittest2's runner.
if __name__ == '__main__':
    unittest2.main()