"Fossies" - the Fresh Open Source Software Archive

Member "swift-2.21.0/test/functional/test_tempurl.py" (25 Mar 2019, 32056 Bytes) of package /linux/misc/openstack/swift-2.21.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_tempurl.py": 2.19.1_vs_2.21.0.

    1 #!/usr/bin/python -u
    2 # Copyright (c) 2010-2016 OpenStack Foundation
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License");
    5 # you may not use this file except in compliance with the License.
    6 # You may obtain a copy of the License at
    7 #
    8 #    http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS,
   12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   13 # implied.
   14 # See the License for the specific language governing permissions and
   15 # limitations under the License.
   16 
   17 import base64
   18 import functools
   19 import hmac
   20 import hashlib
   21 import json
   22 from copy import deepcopy
   23 from six.moves import urllib
   24 from time import time, strftime, gmtime
   25 
   26 import test.functional as tf
   27 from swift.common.middleware import tempurl
   28 from test.functional import cluster_info
   29 from test.functional.tests import Utils, Base, Base2, BaseEnv
   30 from test.functional import requires_acls, SkipTest
   31 from test.functional.swift_test_client import Account, Connection, \
   32     ResponseError
   33 
   34 
   35 def setUpModule():
   36     tf.setup_package()
   37 
   38 
   39 def tearDownModule():
   40     tf.teardown_package()
   41 
   42 
   43 class TestTempurlBaseEnv(BaseEnv):
   44     original_account_meta = None
   45 
   46     @classmethod
   47     def setUp(cls):
   48         super(TestTempurlBaseEnv, cls).setUp()
   49         cls.original_account_meta = cls.account.info()
   50 
   51     @classmethod
   52     def tearDown(cls):
   53         if cls.original_account_meta:
   54             # restore any tempurl keys that the tests may have overwritten
   55             cls.account.update_metadata(
   56                 dict((k, cls.original_account_meta.get(k, ''))
   57                      for k in ('temp-url-key', 'temp-url-key-2',)))
   58 
   59 
   60 class TestTempurlEnv(TestTempurlBaseEnv):
   61     tempurl_enabled = None  # tri-state: None initially, then True/False
   62 
   63     @classmethod
   64     def setUp(cls):
   65         if cls.tempurl_enabled is None:
   66             cls.tempurl_enabled = 'tempurl' in cluster_info
   67             if not cls.tempurl_enabled:
   68                 return
   69 
   70         super(TestTempurlEnv, cls).setUp()
   71 
   72         cls.tempurl_key = Utils.create_name()
   73         cls.tempurl_key2 = Utils.create_name()
   74 
   75         cls.account.update_metadata({
   76             'temp-url-key': cls.tempurl_key,
   77             'temp-url-key-2': cls.tempurl_key2
   78         })
   79 
   80         cls.container = cls.account.container(Utils.create_name())
   81         if not cls.container.create():
   82             raise ResponseError(cls.conn.response)
   83 
   84         cls.obj = cls.container.file(Utils.create_name())
   85         cls.obj.write("obj contents")
   86         cls.other_obj = cls.container.file(Utils.create_name())
   87         cls.other_obj.write("other obj contents")
   88 
   89 
   90 class TestTempurl(Base):
   91     env = TestTempurlEnv
   92     digest_name = 'sha1'
   93 
   94     def setUp(self):
   95         super(TestTempurl, self).setUp()
   96         if self.env.tempurl_enabled is False:
   97             raise SkipTest("TempURL not enabled")
   98         elif self.env.tempurl_enabled is not True:
   99             # just some sanity checking
  100             raise Exception(
  101                 "Expected tempurl_enabled to be True/False, got %r" %
  102                 (self.env.tempurl_enabled,))
  103 
  104         if self.digest_name not in cluster_info['tempurl'].get(
  105                 'allowed_digests', ['sha1']):
  106             raise SkipTest("tempurl does not support %s signatures" %
  107                            self.digest_name)
  108 
  109         self.digest = getattr(hashlib, self.digest_name)
  110         self.expires = int(time()) + 86400
  111         self.expires_8601 = strftime(
  112             tempurl.EXPIRES_ISO8601_FORMAT, gmtime(self.expires))
  113         self.obj_tempurl_parms = self.tempurl_parms(
  114             'GET', self.expires, self.env.conn.make_path(self.env.obj.path),
  115             self.env.tempurl_key)
  116 
  117     def tempurl_parms(self, method, expires, path, key):
  118         sig = hmac.new(
  119             key,
  120             '%s\n%s\n%s' % (method, expires, urllib.parse.unquote(path)),
  121             self.digest).hexdigest()
  122         return {'temp_url_sig': sig, 'temp_url_expires': str(expires)}
  123 
  124     def test_GET(self):
  125         for e in (str(self.expires), self.expires_8601):
  126             self.obj_tempurl_parms['temp_url_expires'] = e
  127 
  128             contents = self.env.obj.read(
  129                 parms=self.obj_tempurl_parms,
  130                 cfg={'no_auth_token': True})
  131             self.assertEqual(contents, "obj contents")
  132 
  133             # GET tempurls also allow HEAD requests
  134             self.assertTrue(self.env.obj.info(parms=self.obj_tempurl_parms,
  135                                               cfg={'no_auth_token': True}))
  136 
  137     def test_GET_with_key_2(self):
  138         expires = int(time()) + 86400
  139         parms = self.tempurl_parms(
  140             'GET', expires, self.env.conn.make_path(self.env.obj.path),
  141             self.env.tempurl_key2)
  142 
  143         contents = self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
  144         self.assertEqual(contents, "obj contents")
  145 
  146     def test_GET_DLO_inside_container(self):
  147         seg1 = self.env.container.file(
  148             "get-dlo-inside-seg1" + Utils.create_name())
  149         seg2 = self.env.container.file(
  150             "get-dlo-inside-seg2" + Utils.create_name())
  151         seg1.write("one fish two fish ")
  152         seg2.write("red fish blue fish")
  153 
  154         manifest = self.env.container.file("manifest" + Utils.create_name())
  155         manifest.write(
  156             '',
  157             hdrs={"X-Object-Manifest": "%s/get-dlo-inside-seg" %
  158                   (self.env.container.name,)})
  159 
  160         expires = int(time()) + 86400
  161         parms = self.tempurl_parms(
  162             'GET', expires, self.env.conn.make_path(manifest.path),
  163             self.env.tempurl_key)
  164 
  165         contents = manifest.read(parms=parms, cfg={'no_auth_token': True})
  166         self.assertEqual(contents, "one fish two fish red fish blue fish")
  167 
  168     def test_GET_DLO_outside_container(self):
  169         seg1 = self.env.container.file(
  170             "get-dlo-outside-seg1" + Utils.create_name())
  171         seg2 = self.env.container.file(
  172             "get-dlo-outside-seg2" + Utils.create_name())
  173         seg1.write("one fish two fish ")
  174         seg2.write("red fish blue fish")
  175 
  176         container2 = self.env.account.container(Utils.create_name())
  177         container2.create()
  178 
  179         manifest = container2.file("manifest" + Utils.create_name())
  180         manifest.write(
  181             '',
  182             hdrs={"X-Object-Manifest": "%s/get-dlo-outside-seg" %
  183                   (self.env.container.name,)})
  184 
  185         expires = int(time()) + 86400
  186         parms = self.tempurl_parms(
  187             'GET', expires, self.env.conn.make_path(manifest.path),
  188             self.env.tempurl_key)
  189 
  190         # cross container tempurl works fine for account tempurl key
  191         contents = manifest.read(parms=parms, cfg={'no_auth_token': True})
  192         self.assertEqual(contents, "one fish two fish red fish blue fish")
  193         self.assert_status([200])
  194 
  195     def test_PUT(self):
  196         new_obj = self.env.container.file(Utils.create_name())
  197 
  198         expires = int(time()) + 86400
  199         expires_8601 = strftime(
  200             tempurl.EXPIRES_ISO8601_FORMAT, gmtime(expires))
  201 
  202         put_parms = self.tempurl_parms(
  203             'PUT', expires, self.env.conn.make_path(new_obj.path),
  204             self.env.tempurl_key)
  205         for e in (str(expires), expires_8601):
  206             put_parms['temp_url_expires'] = e
  207 
  208             new_obj.write('new obj contents',
  209                           parms=put_parms, cfg={'no_auth_token': True})
  210             self.assertEqual(new_obj.read(), "new obj contents")
  211 
  212             # PUT tempurls also allow HEAD requests
  213             self.assertTrue(new_obj.info(parms=put_parms,
  214                                          cfg={'no_auth_token': True}))
  215 
  216     def test_PUT_manifest_access(self):
  217         new_obj = self.env.container.file(Utils.create_name())
  218 
  219         # give out a signature which allows a PUT to new_obj
  220         expires = int(time()) + 86400
  221         put_parms = self.tempurl_parms(
  222             'PUT', expires, self.env.conn.make_path(new_obj.path),
  223             self.env.tempurl_key)
  224 
  225         # try to create manifest pointing to some random container
  226         try:
  227             new_obj.write('', {
  228                 'x-object-manifest': '%s/foo' % 'some_random_container'
  229             }, parms=put_parms, cfg={'no_auth_token': True})
  230         except ResponseError as e:
  231             self.assertEqual(e.status, 400)
  232         else:
  233             self.fail('request did not error')
  234 
  235         # create some other container
  236         other_container = self.env.account.container(Utils.create_name())
  237         if not other_container.create():
  238             raise ResponseError(self.conn.response)
  239 
  240         # try to create manifest pointing to new container
  241         try:
  242             new_obj.write('', {
  243                 'x-object-manifest': '%s/foo' % other_container
  244             }, parms=put_parms, cfg={'no_auth_token': True})
  245         except ResponseError as e:
  246             self.assertEqual(e.status, 400)
  247         else:
  248             self.fail('request did not error')
  249 
  250         # try again using a tempurl POST to an already created object
  251         new_obj.write('', {}, parms=put_parms, cfg={'no_auth_token': True})
  252         expires = int(time()) + 86400
  253         post_parms = self.tempurl_parms(
  254             'POST', expires, self.env.conn.make_path(new_obj.path),
  255             self.env.tempurl_key)
  256         try:
  257             new_obj.post({'x-object-manifest': '%s/foo' % other_container},
  258                          parms=post_parms, cfg={'no_auth_token': True})
  259         except ResponseError as e:
  260             self.assertEqual(e.status, 400)
  261         else:
  262             self.fail('request did not error')
  263 
  264     def test_HEAD(self):
  265         expires = int(time()) + 86400
  266         head_parms = self.tempurl_parms(
  267             'HEAD', expires, self.env.conn.make_path(self.env.obj.path),
  268             self.env.tempurl_key)
  269 
  270         self.assertTrue(self.env.obj.info(parms=head_parms,
  271                                           cfg={'no_auth_token': True}))
  272 
  273         # HEAD tempurls don't allow PUT or GET requests, despite the fact that
  274         # PUT and GET tempurls both allow HEAD requests
  275         self.assertRaises(ResponseError, self.env.other_obj.read,
  276                           cfg={'no_auth_token': True},
  277                           parms=head_parms)
  278         self.assert_status([401])
  279 
  280         self.assertRaises(ResponseError, self.env.other_obj.write,
  281                           'new contents',
  282                           cfg={'no_auth_token': True},
  283                           parms=head_parms)
  284         self.assert_status([401])
  285 
  286     def test_different_object(self):
  287         contents = self.env.obj.read(
  288             parms=self.obj_tempurl_parms,
  289             cfg={'no_auth_token': True})
  290         self.assertEqual(contents, "obj contents")
  291 
  292         self.assertRaises(ResponseError, self.env.other_obj.read,
  293                           cfg={'no_auth_token': True},
  294                           parms=self.obj_tempurl_parms)
  295         self.assert_status([401])
  296 
  297     def test_changing_sig(self):
  298         contents = self.env.obj.read(
  299             parms=self.obj_tempurl_parms,
  300             cfg={'no_auth_token': True})
  301         self.assertEqual(contents, "obj contents")
  302 
  303         parms = self.obj_tempurl_parms.copy()
  304         if parms['temp_url_sig'][0] == 'a':
  305             parms['temp_url_sig'] = 'b' + parms['temp_url_sig'][1:]
  306         else:
  307             parms['temp_url_sig'] = 'a' + parms['temp_url_sig'][1:]
  308 
  309         self.assertRaises(ResponseError, self.env.obj.read,
  310                           cfg={'no_auth_token': True},
  311                           parms=parms)
  312         self.assert_status([401])
  313 
  314     def test_changing_expires(self):
  315         contents = self.env.obj.read(
  316             parms=self.obj_tempurl_parms,
  317             cfg={'no_auth_token': True})
  318         self.assertEqual(contents, "obj contents")
  319 
  320         parms = self.obj_tempurl_parms.copy()
  321         if parms['temp_url_expires'][-1] == '0':
  322             parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '1'
  323         else:
  324             parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '0'
  325 
  326         self.assertRaises(ResponseError, self.env.obj.read,
  327                           cfg={'no_auth_token': True},
  328                           parms=parms)
  329         self.assert_status([401])
  330 
  331 
  332 class TestTempURLPrefix(TestTempurl):
  333     def tempurl_parms(self, method, expires, path, key,
  334                       prefix=None):
  335         path_parts = urllib.parse.unquote(path).split('/')
  336 
  337         if prefix is None:
  338             # Choose the first 4 chars of object name as prefix.
  339             prefix = path_parts[4][0:4]
  340         sig = hmac.new(
  341             key,
  342             '%s\n%s\nprefix:%s' % (method, expires,
  343                                    '/'.join(path_parts[0:4]) + '/' + prefix),
  344             self.digest).hexdigest()
  345         return {
  346             'temp_url_sig': sig, 'temp_url_expires': str(expires),
  347             'temp_url_prefix': prefix}
  348 
  349     def test_empty_prefix(self):
  350         parms = self.tempurl_parms(
  351             'GET', self.expires,
  352             self.env.conn.make_path(self.env.obj.path),
  353             self.env.tempurl_key, '')
  354 
  355         contents = self.env.obj.read(
  356             parms=parms,
  357             cfg={'no_auth_token': True})
  358         self.assertEqual(contents, "obj contents")
  359 
  360     def test_no_prefix_match(self):
  361         prefix = 'b' if self.env.obj.name[0] == 'a' else 'a'
  362 
  363         parms = self.tempurl_parms(
  364             'GET', self.expires,
  365             self.env.conn.make_path(self.env.obj.path),
  366             self.env.tempurl_key, prefix)
  367 
  368         self.assertRaises(ResponseError, self.env.obj.read,
  369                           cfg={'no_auth_token': True},
  370                           parms=parms)
  371         self.assert_status([401])
  372 
  373     def test_object_url_with_prefix(self):
  374         parms = super(TestTempURLPrefix, self).tempurl_parms(
  375             'GET', self.expires,
  376             self.env.conn.make_path(self.env.obj.path),
  377             self.env.tempurl_key)
  378         parms['temp_url_prefix'] = self.env.obj.name
  379 
  380         self.assertRaises(ResponseError, self.env.obj.read,
  381                           cfg={'no_auth_token': True},
  382                           parms=parms)
  383         self.assert_status([401])
  384 
  385     def test_missing_query_parm(self):
  386         del self.obj_tempurl_parms['temp_url_prefix']
  387 
  388         self.assertRaises(ResponseError, self.env.obj.read,
  389                           cfg={'no_auth_token': True},
  390                           parms=self.obj_tempurl_parms)
  391         self.assert_status([401])
  392 
  393 
  394 class TestTempurlUTF8(Base2, TestTempurl):
  395     pass
  396 
  397 
  398 class TestContainerTempurlEnv(BaseEnv):
  399     tempurl_enabled = None  # tri-state: None initially, then True/False
  400 
  401     @classmethod
  402     def setUp(cls):
  403         if cls.tempurl_enabled is None:
  404             cls.tempurl_enabled = 'tempurl' in cluster_info
  405             if not cls.tempurl_enabled:
  406                 return
  407 
  408         super(TestContainerTempurlEnv, cls).setUp()
  409 
  410         cls.tempurl_key = Utils.create_name()
  411         cls.tempurl_key2 = Utils.create_name()
  412 
  413         if not tf.skip2:
  414             # creating another account and connection
  415             # for ACL tests
  416             config2 = deepcopy(tf.config)
  417             config2['account'] = tf.config['account2']
  418             config2['username'] = tf.config['username2']
  419             config2['password'] = tf.config['password2']
  420             cls.conn2 = Connection(config2)
  421             cls.conn2.authenticate()
  422             cls.account2 = Account(
  423                 cls.conn2, config2.get('account', config2['username']))
  424             cls.account2 = cls.conn2.get_account()
  425 
  426         cls.container = cls.account.container(Utils.create_name())
  427         if not tf.skip2:
  428             if not cls.container.create({
  429                     'x-container-meta-temp-url-key': cls.tempurl_key,
  430                     'x-container-meta-temp-url-key-2': cls.tempurl_key2,
  431                     'x-container-read': cls.account2.name}):
  432                 raise ResponseError(cls.conn.response)
  433         else:
  434             if not cls.container.create({
  435                     'x-container-meta-temp-url-key': cls.tempurl_key,
  436                     'x-container-meta-temp-url-key-2': cls.tempurl_key2}):
  437                 raise ResponseError(cls.conn.response)
  438 
  439         cls.obj = cls.container.file(Utils.create_name())
  440         cls.obj.write("obj contents")
  441         cls.other_obj = cls.container.file(Utils.create_name())
  442         cls.other_obj.write("other obj contents")
  443 
  444 
  445 class TestContainerTempurl(Base):
  446     env = TestContainerTempurlEnv
  447     digest_name = 'sha1'
  448 
  449     def setUp(self):
  450         super(TestContainerTempurl, self).setUp()
  451         if self.env.tempurl_enabled is False:
  452             raise SkipTest("TempURL not enabled")
  453         elif self.env.tempurl_enabled is not True:
  454             # just some sanity checking
  455             raise Exception(
  456                 "Expected tempurl_enabled to be True/False, got %r" %
  457                 (self.env.tempurl_enabled,))
  458 
  459         if self.digest_name not in cluster_info['tempurl'].get(
  460                 'allowed_digests', ['sha1']):
  461             raise SkipTest("tempurl does not support %s signatures" %
  462                            self.digest_name)
  463 
  464         self.digest = getattr(hashlib, self.digest_name)
  465         expires = int(time()) + 86400
  466         sig = self.tempurl_sig(
  467             'GET', expires, self.env.conn.make_path(self.env.obj.path),
  468             self.env.tempurl_key)
  469         self.obj_tempurl_parms = {'temp_url_sig': sig,
  470                                   'temp_url_expires': str(expires)}
  471 
  472     def tempurl_sig(self, method, expires, path, key):
  473         return hmac.new(
  474             key,
  475             '%s\n%s\n%s' % (method, expires, urllib.parse.unquote(path)),
  476             self.digest).hexdigest()
  477 
  478     def test_GET(self):
  479         contents = self.env.obj.read(
  480             parms=self.obj_tempurl_parms,
  481             cfg={'no_auth_token': True})
  482         self.assertEqual(contents, "obj contents")
  483 
  484         # GET tempurls also allow HEAD requests
  485         self.assertTrue(self.env.obj.info(parms=self.obj_tempurl_parms,
  486                                           cfg={'no_auth_token': True}))
  487 
  488     def test_GET_with_key_2(self):
  489         expires = int(time()) + 86400
  490         sig = self.tempurl_sig(
  491             'GET', expires, self.env.conn.make_path(self.env.obj.path),
  492             self.env.tempurl_key2)
  493         parms = {'temp_url_sig': sig,
  494                  'temp_url_expires': str(expires)}
  495 
  496         contents = self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
  497         self.assertEqual(contents, "obj contents")
  498 
  499     def test_PUT(self):
  500         new_obj = self.env.container.file(Utils.create_name())
  501 
  502         expires = int(time()) + 86400
  503         sig = self.tempurl_sig(
  504             'PUT', expires, self.env.conn.make_path(new_obj.path),
  505             self.env.tempurl_key)
  506         put_parms = {'temp_url_sig': sig,
  507                      'temp_url_expires': str(expires)}
  508 
  509         new_obj.write('new obj contents',
  510                       parms=put_parms, cfg={'no_auth_token': True})
  511         self.assertEqual(new_obj.read(), "new obj contents")
  512 
  513         # PUT tempurls also allow HEAD requests
  514         self.assertTrue(new_obj.info(parms=put_parms,
  515                                      cfg={'no_auth_token': True}))
  516 
  517     def test_HEAD(self):
  518         expires = int(time()) + 86400
  519         sig = self.tempurl_sig(
  520             'HEAD', expires, self.env.conn.make_path(self.env.obj.path),
  521             self.env.tempurl_key)
  522         head_parms = {'temp_url_sig': sig,
  523                       'temp_url_expires': str(expires)}
  524 
  525         self.assertTrue(self.env.obj.info(parms=head_parms,
  526                                           cfg={'no_auth_token': True}))
  527         # HEAD tempurls don't allow PUT or GET requests, despite the fact that
  528         # PUT and GET tempurls both allow HEAD requests
  529         self.assertRaises(ResponseError, self.env.other_obj.read,
  530                           cfg={'no_auth_token': True},
  531                           parms=self.obj_tempurl_parms)
  532         self.assert_status([401])
  533 
  534         self.assertRaises(ResponseError, self.env.other_obj.write,
  535                           'new contents',
  536                           cfg={'no_auth_token': True},
  537                           parms=self.obj_tempurl_parms)
  538         self.assert_status([401])
  539 
  540     def test_different_object(self):
  541         contents = self.env.obj.read(
  542             parms=self.obj_tempurl_parms,
  543             cfg={'no_auth_token': True})
  544         self.assertEqual(contents, "obj contents")
  545 
  546         self.assertRaises(ResponseError, self.env.other_obj.read,
  547                           cfg={'no_auth_token': True},
  548                           parms=self.obj_tempurl_parms)
  549         self.assert_status([401])
  550 
  551     def test_changing_sig(self):
  552         contents = self.env.obj.read(
  553             parms=self.obj_tempurl_parms,
  554             cfg={'no_auth_token': True})
  555         self.assertEqual(contents, "obj contents")
  556 
  557         parms = self.obj_tempurl_parms.copy()
  558         if parms['temp_url_sig'][0] == 'a':
  559             parms['temp_url_sig'] = 'b' + parms['temp_url_sig'][1:]
  560         else:
  561             parms['temp_url_sig'] = 'a' + parms['temp_url_sig'][1:]
  562 
  563         self.assertRaises(ResponseError, self.env.obj.read,
  564                           cfg={'no_auth_token': True},
  565                           parms=parms)
  566         self.assert_status([401])
  567 
  568     def test_changing_expires(self):
  569         contents = self.env.obj.read(
  570             parms=self.obj_tempurl_parms,
  571             cfg={'no_auth_token': True})
  572         self.assertEqual(contents, "obj contents")
  573 
  574         parms = self.obj_tempurl_parms.copy()
  575         if parms['temp_url_expires'][-1] == '0':
  576             parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '1'
  577         else:
  578             parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '0'
  579 
  580         self.assertRaises(ResponseError, self.env.obj.read,
  581                           cfg={'no_auth_token': True},
  582                           parms=parms)
  583         self.assert_status([401])
  584 
  585     @requires_acls
  586     def test_tempurl_keys_visible_to_account_owner(self):
  587         metadata = self.env.container.info()
  588         self.assertEqual(metadata.get('tempurl_key'), self.env.tempurl_key)
  589         self.assertEqual(metadata.get('tempurl_key2'), self.env.tempurl_key2)
  590 
  591     @requires_acls
  592     def test_tempurl_keys_hidden_from_acl_readonly(self):
  593         if tf.skip2:
  594             raise SkipTest('Account2 not set')
  595         metadata = self.env.container.info(cfg={
  596             'use_token': self.env.conn2.storage_token})
  597 
  598         self.assertNotIn(
  599             'tempurl_key', metadata,
  600             'Container TempURL key found, should not be visible '
  601             'to readonly ACLs')
  602         self.assertNotIn(
  603             'tempurl_key2', metadata,
  604             'Container TempURL key-2 found, should not be visible '
  605             'to readonly ACLs')
  606 
  607     def test_GET_DLO_inside_container(self):
  608         seg1 = self.env.container.file(
  609             "get-dlo-inside-seg1" + Utils.create_name())
  610         seg2 = self.env.container.file(
  611             "get-dlo-inside-seg2" + Utils.create_name())
  612         seg1.write("one fish two fish ")
  613         seg2.write("red fish blue fish")
  614 
  615         manifest = self.env.container.file("manifest" + Utils.create_name())
  616         manifest.write(
  617             '',
  618             hdrs={"X-Object-Manifest": "%s/get-dlo-inside-seg" %
  619                   (self.env.container.name,)})
  620 
  621         expires = int(time()) + 86400
  622         sig = self.tempurl_sig(
  623             'GET', expires, self.env.conn.make_path(manifest.path),
  624             self.env.tempurl_key)
  625         parms = {'temp_url_sig': sig,
  626                  'temp_url_expires': str(expires)}
  627 
  628         contents = manifest.read(parms=parms, cfg={'no_auth_token': True})
  629         self.assertEqual(contents, "one fish two fish red fish blue fish")
  630 
  631     def test_GET_DLO_outside_container(self):
  632         container2 = self.env.account.container(Utils.create_name())
  633         container2.create()
  634         seg1 = container2.file(
  635             "get-dlo-outside-seg1" + Utils.create_name())
  636         seg2 = container2.file(
  637             "get-dlo-outside-seg2" + Utils.create_name())
  638         seg1.write("one fish two fish ")
  639         seg2.write("red fish blue fish")
  640 
  641         manifest = self.env.container.file("manifest" + Utils.create_name())
  642         manifest.write(
  643             '',
  644             hdrs={"X-Object-Manifest": "%s/get-dlo-outside-seg" %
  645                   (container2.name,)})
  646 
  647         expires = int(time()) + 86400
  648         sig = self.tempurl_sig(
  649             'GET', expires, self.env.conn.make_path(manifest.path),
  650             self.env.tempurl_key)
  651         parms = {'temp_url_sig': sig,
  652                  'temp_url_expires': str(expires)}
  653 
  654         # cross container tempurl does not work for container tempurl key
  655         try:
  656             manifest.read(parms=parms, cfg={'no_auth_token': True})
  657         except ResponseError as e:
  658             self.assertEqual(e.status, 401)
  659         else:
  660             self.fail('request did not error')
  661         try:
  662             manifest.info(parms=parms, cfg={'no_auth_token': True})
  663         except ResponseError as e:
  664             self.assertEqual(e.status, 401)
  665         else:
  666             self.fail('request did not error')
  667 
  668 
  669 class TestContainerTempurlUTF8(Base2, TestContainerTempurl):
  670     pass
  671 
  672 
  673 class TestSloTempurlEnv(TestTempurlBaseEnv):
  674     enabled = None  # tri-state: None initially, then True/False
  675 
  676     @classmethod
  677     def setUp(cls):
  678         super(TestSloTempurlEnv, cls).setUp()
  679         if cls.enabled is None:
  680             cls.enabled = 'tempurl' in cluster_info and 'slo' in cluster_info
  681 
  682         cls.tempurl_key = Utils.create_name()
  683 
  684         cls.account.update_metadata({'temp-url-key': cls.tempurl_key})
  685 
  686         cls.manifest_container = cls.account.container(Utils.create_name())
  687         cls.segments_container = cls.account.container(Utils.create_name())
  688         if not cls.manifest_container.create():
  689             raise ResponseError(cls.conn.response)
  690         if not cls.segments_container.create():
  691             raise ResponseError(cls.conn.response)
  692 
  693         seg1 = cls.segments_container.file(Utils.create_name())
  694         seg1.write('1' * 1024 * 1024)
  695 
  696         seg2 = cls.segments_container.file(Utils.create_name())
  697         seg2.write('2' * 1024 * 1024)
  698 
  699         cls.manifest_data = [{'size_bytes': 1024 * 1024,
  700                               'etag': seg1.md5,
  701                               'path': '/%s/%s' % (cls.segments_container.name,
  702                                                   seg1.name)},
  703                              {'size_bytes': 1024 * 1024,
  704                               'etag': seg2.md5,
  705                               'path': '/%s/%s' % (cls.segments_container.name,
  706                                                   seg2.name)}]
  707 
  708         cls.manifest = cls.manifest_container.file(Utils.create_name())
  709         cls.manifest.write(
  710             json.dumps(cls.manifest_data),
  711             parms={'multipart-manifest': 'put'})
  712 
  713 
  714 class TestSloTempurl(Base):
  715     env = TestSloTempurlEnv
  716     digest_name = 'sha1'
  717 
  718     def setUp(self):
  719         super(TestSloTempurl, self).setUp()
  720         if self.env.enabled is False:
  721             raise SkipTest("TempURL and SLO not both enabled")
  722         elif self.env.enabled is not True:
  723             # just some sanity checking
  724             raise Exception(
  725                 "Expected enabled to be True/False, got %r" %
  726                 (self.env.enabled,))
  727 
  728         if self.digest_name not in cluster_info['tempurl'].get(
  729                 'allowed_digests', ['sha1']):
  730             raise SkipTest("tempurl does not support %s signatures" %
  731                            self.digest_name)
  732         self.digest = getattr(hashlib, self.digest_name)
  733 
  734     def tempurl_sig(self, method, expires, path, key):
  735         return hmac.new(
  736             key,
  737             '%s\n%s\n%s' % (method, expires, urllib.parse.unquote(path)),
  738             self.digest).hexdigest()
  739 
  740     def test_GET(self):
  741         expires = int(time()) + 86400
  742         sig = self.tempurl_sig(
  743             'GET', expires, self.env.conn.make_path(self.env.manifest.path),
  744             self.env.tempurl_key)
  745         parms = {'temp_url_sig': sig, 'temp_url_expires': str(expires)}
  746 
  747         contents = self.env.manifest.read(
  748             parms=parms,
  749             cfg={'no_auth_token': True})
  750         self.assertEqual(len(contents), 2 * 1024 * 1024)
  751 
  752         # GET tempurls also allow HEAD requests
  753         self.assertTrue(self.env.manifest.info(
  754             parms=parms, cfg={'no_auth_token': True}))
  755 
  756 
  757 class TestSloTempurlUTF8(Base2, TestSloTempurl):
  758     pass
  759 
  760 
  761 def requires_digest(digest):
  762     def decorator(func):
  763         @functools.wraps(func)
  764         def wrapper(*args, **kwargs):
  765             if digest not in cluster_info['tempurl'].get(
  766                     'allowed_digests', ['sha1']):
  767                 raise SkipTest("tempurl does not support %s signatures" %
  768                                digest)
  769             return func(*args, **kwargs)
  770         return wrapper
  771     return decorator
  772 
  773 
  774 class TestTempurlAlgorithms(Base):
  775     env = TestTempurlEnv
  776 
  777     def get_sig(self, expires, digest, encoding):
  778         path = self.env.conn.make_path(self.env.obj.path)
  779 
  780         sig = hmac.new(
  781             self.env.tempurl_key,
  782             '%s\n%s\n%s' % ('GET', expires,
  783                             urllib.parse.unquote(path)),
  784             getattr(hashlib, digest))
  785 
  786         if encoding == 'hex':
  787             return sig.hexdigest()
  788         elif encoding == 'base64':
  789             return digest + ':' + base64.b64encode(sig.digest())
  790         elif encoding == 'base64-no-padding':
  791             return digest + ':' + base64.b64encode(sig.digest()).strip('=')
  792         elif encoding == 'url-safe-base64':
  793             return digest + ':' + base64.urlsafe_b64encode(sig.digest())
  794         else:
  795             raise ValueError('Unrecognized encoding: %r' % encoding)
  796 
  797     def _do_test(self, digest, encoding, expect_failure=False):
  798         expires = int(time()) + 86400
  799         sig = self.get_sig(expires, digest, encoding)
  800 
  801         if encoding == 'url-safe-base64':
  802             # Make sure that we're actually testing url-safe-ness
  803             while '-' not in sig and '_' not in sig:
  804                 expires += 1
  805                 sig = self.get_sig(expires, digest, encoding)
  806 
  807         parms = {'temp_url_sig': sig, 'temp_url_expires': str(expires)}
  808 
  809         if expect_failure:
  810             with self.assertRaises(ResponseError):
  811                 self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
  812             self.assert_status([401])
  813 
  814             # ditto for HEADs
  815             with self.assertRaises(ResponseError):
  816                 self.env.obj.info(parms=parms, cfg={'no_auth_token': True})
  817             self.assert_status([401])
  818         else:
  819             contents = self.env.obj.read(
  820                 parms=parms,
  821                 cfg={'no_auth_token': True})
  822             self.assertEqual(contents, "obj contents")
  823 
  824             # GET tempurls also allow HEAD requests
  825             self.assertTrue(self.env.obj.info(
  826                 parms=parms, cfg={'no_auth_token': True}))
  827 
  828     @requires_digest('sha1')
  829     def test_sha1(self):
  830         self._do_test('sha1', 'hex')
  831         self._do_test('sha1', 'base64')
  832         self._do_test('sha1', 'base64-no-padding')
  833         self._do_test('sha1', 'url-safe-base64')
  834 
  835     @requires_digest('sha256')
  836     def test_sha256(self):
  837         # apparently Cloud Files supports hex-encoded SHA-256
  838         # let's not break that just for the sake of being different
  839         self._do_test('sha256', 'hex')
  840         self._do_test('sha256', 'base64')
  841         self._do_test('sha256', 'base64-no-padding')
  842         self._do_test('sha256', 'url-safe-base64')
  843 
  844     @requires_digest('sha512')
  845     def test_sha512(self):
  846         # 128 chars seems awfully long for a signature -- let's require base64
  847         self._do_test('sha512', 'hex', expect_failure=True)
  848         self._do_test('sha512', 'base64')
  849         self._do_test('sha512', 'base64-no-padding')
  850         self._do_test('sha512', 'url-safe-base64')