"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "test/unit/container/test_backend.py" between
swift-2.19.1.tar.gz and swift-2.21.0.tar.gz

About: OpenStack Swift is software for creating redundant, scalable object storage using clusters of commodity servers to store terabytes or even petabytes of accessible data (now supporting storage policies).
These versions belong to the "Stein" series (the latest release series).

test_backend.py  (swift-2.19.1):test_backend.py  (swift-2.21.0)
skipping to change at line 17 skipping to change at line 17
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
""" Tests for swift.container.backend """ """ Tests for swift.container.backend """
import base64
import errno import errno
import os import os
import hashlib import hashlib
import inspect import inspect
import unittest import unittest
from time import sleep, time from time import sleep, time
from uuid import uuid4 from uuid import uuid4
import random import random
from collections import defaultdict from collections import defaultdict
from contextlib import contextmanager from contextlib import contextmanager
import sqlite3 import sqlite3
import pickle import pickle
import json import json
import six
from swift.common.exceptions import LockTimeout from swift.common.exceptions import LockTimeout
from swift.container.backend import ContainerBroker, \ from swift.container.backend import ContainerBroker, \
update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \ update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \
COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES
from swift.common.db import DatabaseAlreadyExists, GreenDBConnection from swift.common.db import DatabaseAlreadyExists, GreenDBConnection
from swift.common.utils import Timestamp, encode_timestamps, hash_path, \ from swift.common.utils import Timestamp, encode_timestamps, hash_path, \
ShardRange, make_db_file_path ShardRange, make_db_file_path
from swift.common.storage_policy import POLICIES from swift.common.storage_policy import POLICIES
import mock import mock
skipping to change at line 130 skipping to change at line 133
raise Exception('OMG') raise Exception('OMG')
except Exception: except Exception:
pass pass
self.assertTrue(broker.conn is None) self.assertTrue(broker.conn is None)
@with_tempdir @with_tempdir
def test_is_deleted(self, tempdir): def test_is_deleted(self, tempdir):
# Test ContainerBroker.is_deleted() and get_info_is_deleted() # Test ContainerBroker.is_deleted() and get_info_is_deleted()
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
db_path = os.path.join( db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', 'container.db') tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker(db_path, account='a', container='c') broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
self.assertFalse(broker.is_deleted()) self.assertFalse(broker.is_deleted())
broker.delete_db(next(ts_iter).internal) broker.delete_db(next(ts_iter).internal)
self.assertTrue(broker.is_deleted()) self.assertTrue(broker.is_deleted())
def check_object_counted(broker_to_test, broker_with_object): def check_object_counted(broker_to_test, broker_with_object):
obj = {'name': 'o', 'created_at': next(ts_iter).internal, obj = {'name': 'o', 'created_at': next(ts_iter).internal,
'size': 0, 'content_type': 'text/plain', 'etag': EMPTY_ETAG, 'size': 0, 'content_type': 'text/plain', 'etag': EMPTY_ETAG,
skipping to change at line 237 skipping to change at line 240
own_sr = broker.get_own_shard_range() own_sr = broker.get_own_shard_range()
own_sr.update_meta(3, 4, meta_timestamp=next(ts_iter)) own_sr.update_meta(3, 4, meta_timestamp=next(ts_iter))
broker.merge_shard_ranges([own_sr]) broker.merge_shard_ranges([own_sr])
self.assertTrue(broker.is_deleted()) self.assertTrue(broker.is_deleted())
@with_tempdir @with_tempdir
def test_empty(self, tempdir): def test_empty(self, tempdir):
# Test ContainerBroker.empty # Test ContainerBroker.empty
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
db_path = os.path.join( db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', 'container.db') tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker(db_path, account='a', container='c') broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
self.assertTrue(broker.is_root_container()) self.assertTrue(broker.is_root_container())
def check_object_counted(broker_to_test, broker_with_object): def check_object_counted(broker_to_test, broker_with_object):
obj = {'name': 'o', 'created_at': next(ts_iter).internal, obj = {'name': 'o', 'created_at': next(ts_iter).internal,
'size': 0, 'content_type': 'text/plain', 'etag': EMPTY_ETAG, 'size': 0, 'content_type': 'text/plain', 'etag': EMPTY_ETAG,
'deleted': 0} 'deleted': 0}
broker_with_object.merge_items([dict(obj)]) broker_with_object.merge_items([dict(obj)])
self.assertFalse(broker_to_test.empty()) self.assertFalse(broker_to_test.empty())
skipping to change at line 342 skipping to change at line 345
own_sr.update_meta(3, 4, meta_timestamp=next(ts_iter)) own_sr.update_meta(3, 4, meta_timestamp=next(ts_iter))
broker.merge_shard_ranges([own_sr]) broker.merge_shard_ranges([own_sr])
self.assertTrue(broker.empty()) self.assertTrue(broker.empty())
@with_tempdir @with_tempdir
def test_empty_shard_container(self, tempdir): def test_empty_shard_container(self, tempdir):
# Test ContainerBroker.empty for a shard container where shard range # Test ContainerBroker.empty for a shard container where shard range
# usage should not be considered # usage should not be considered
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
db_path = os.path.join( db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', 'container.db') tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker(db_path, account='.shards_a', container='cc') broker = ContainerBroker(db_path, account='.shards_a', container='cc')
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
broker.set_sharding_sysmeta('Root', 'a/c') broker.set_sharding_sysmeta('Root', 'a/c')
self.assertFalse(broker.is_root_container()) self.assertFalse(broker.is_root_container())
def check_object_counted(broker_to_test, broker_with_object): def check_object_counted(broker_to_test, broker_with_object):
obj = {'name': 'o', 'created_at': next(ts_iter).internal, obj = {'name': 'o', 'created_at': next(ts_iter).internal,
'size': 0, 'content_type': 'text/plain', 'etag': EMPTY_ETAG, 'size': 0, 'content_type': 'text/plain', 'etag': EMPTY_ETAG,
'deleted': 0} 'deleted': 0}
broker_with_object.merge_items([dict(obj)]) broker_with_object.merge_items([dict(obj)])
skipping to change at line 480 skipping to change at line 483
'd41d8cd98f00b204e9800998ecf8427e') 'd41d8cd98f00b204e9800998ecf8427e')
broker.put_object('z', Timestamp.now().internal, 0, 'text/plain', broker.put_object('z', Timestamp.now().internal, 0, 'text/plain',
'd41d8cd98f00b204e9800998ecf8427e') 'd41d8cd98f00b204e9800998ecf8427e')
# Test before deletion # Test before deletion
broker.reclaim(Timestamp.now().internal, time()) broker.reclaim(Timestamp.now().internal, time())
broker.delete_db(Timestamp.now().internal) broker.delete_db(Timestamp.now().internal)
@with_tempdir @with_tempdir
def test_reclaim_deadlock(self, tempdir): def test_reclaim_deadlock(self, tempdir):
db_path = os.path.join( db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', '%s.db' % uuid4()) tempdir, 'containers', 'part', 'suffix', 'hash', '%s.db' % uuid4())
broker = ContainerBroker(db_path, account='a', container='c') broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(Timestamp(100).internal, 0) broker.initialize(Timestamp(100).internal, 0)
# there's some magic count here that causes the failure, something # there's some magic count here that causes the failure, something
# about the size of object records and sqlite page size maybe? # about the size of object records and sqlite page size maybe?
count = 23000 count = 23000
for i in range(count): for i in range(count):
obj_name = 'o%d' % i obj_name = 'o%d' % i
ts = Timestamp(200).internal ts = Timestamp(200).internal
broker.delete_object(obj_name, ts) broker.delete_object(obj_name, ts)
broker._commit_puts() broker._commit_puts()
skipping to change at line 509 skipping to change at line 512
# check all objects were reclaimed # check all objects were reclaimed
with broker.get() as conn: with broker.get() as conn:
self.assertEqual(conn.execute( self.assertEqual(conn.execute(
"SELECT count(*) FROM object" "SELECT count(*) FROM object"
).fetchone()[0], 0) ).fetchone()[0], 0)
@with_tempdir @with_tempdir
def test_reclaim_shard_ranges(self, tempdir): def test_reclaim_shard_ranges(self, tempdir):
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
db_path = os.path.join( db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', '%s.db' % uuid4()) tempdir, 'containers', 'part', 'suffix', 'hash', '%s.db' % uuid4())
broker = ContainerBroker(db_path, account='a', container='c') broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
older = next(ts_iter) older = next(ts_iter)
same = next(ts_iter) same = next(ts_iter)
newer = next(ts_iter) newer = next(ts_iter)
shard_ranges = [ shard_ranges = [
ShardRange('.shards_a/older_deleted', older.internal, '', 'a', ShardRange('.shards_a/older_deleted', older.internal, '', 'a',
deleted=True), deleted=True),
ShardRange('.shards_a/same_deleted', same.internal, 'a', 'b', ShardRange('.shards_a/same_deleted', same.internal, 'a', 'b',
deleted=True), deleted=True),
skipping to change at line 1417 skipping to change at line 1420
# obviously this should return the fresh_db_path # obviously this should return the fresh_db_path
os.unlink(db_path) os.unlink(db_path)
broker.reload_db_files() broker.reload_db_files()
check_sharded_db_files(broker) check_sharded_db_files(broker)
broker = ContainerBroker(db_path, account=acct, container=cont) broker = ContainerBroker(db_path, account=acct, container=cont)
check_sharded_db_files(broker) check_sharded_db_files(broker)
@with_tempdir @with_tempdir
def test_sharding_initiated_and_required(self, tempdir): def test_sharding_initiated_and_required(self, tempdir):
db_path = os.path.join( db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', '%s.db' % uuid4()) tempdir, 'containers', 'part', 'suffix', 'hash', '%s.db' % uuid4())
broker = ContainerBroker(db_path, account='a', container='c') broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(Timestamp.now().internal, 0) broker.initialize(Timestamp.now().internal, 0)
# no shard ranges # no shard ranges
self.assertIs(False, broker.sharding_initiated()) self.assertIs(False, broker.sharding_initiated())
self.assertIs(False, broker.sharding_required()) self.assertIs(False, broker.sharding_required())
# only own shard range # only own shard range
own_sr = broker.get_own_shard_range() own_sr = broker.get_own_shard_range()
for state in ShardRange.STATES: for state in ShardRange.STATES:
own_sr.update_state(state, state_timestamp=Timestamp.now()) own_sr.update_state(state, state_timestamp=Timestamp.now())
broker.merge_shard_ranges(own_sr) broker.merge_shard_ranges(own_sr)
skipping to change at line 1859 skipping to change at line 1862
self.assertEqual(info['object_count'], 0) self.assertEqual(info['object_count'], 0)
self.assertEqual(info['bytes_used'], 0) self.assertEqual(info['bytes_used'], 0)
info = broker.get_info() info = broker.get_info()
self.assertEqual(info['x_container_sync_point1'], -1) self.assertEqual(info['x_container_sync_point1'], -1)
self.assertEqual(info['x_container_sync_point2'], -1) self.assertEqual(info['x_container_sync_point2'], -1)
@with_tempdir @with_tempdir
def test_get_info_sharding_states(self, tempdir): def test_get_info_sharding_states(self, tempdir):
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
db_path = os.path.join(tempdir, 'part', 'suffix', 'hash', 'hash.db') db_path = os.path.join(
tempdir, 'containers', 'part', 'suffix', 'hash', 'hash.db')
broker = ContainerBroker( broker = ContainerBroker(
db_path, account='myaccount', container='mycontainer') db_path, account='myaccount', container='mycontainer')
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
broker.put_object('o1', next(ts_iter).internal, 123, 'text/plain', broker.put_object('o1', next(ts_iter).internal, 123, 'text/plain',
'fake etag') 'fake etag')
sr = ShardRange('.shards_a/c', next(ts_iter)) sr = ShardRange('.shards_a/c', next(ts_iter))
broker.merge_shard_ranges(sr) broker.merge_shard_ranges(sr)
def check_info(expected): def check_info(expected):
errors = [] errors = []
skipping to change at line 2018 skipping to change at line 2022
broker.delete_object('o2', Timestamp.now().internal) broker.delete_object('o2', Timestamp.now().internal)
info = broker.get_info() info = broker.get_info()
self.assertEqual(info['object_count'], 0) self.assertEqual(info['object_count'], 0)
self.assertEqual(info['bytes_used'], 0) self.assertEqual(info['bytes_used'], 0)
self.assertEqual(info['reported_object_count'], 2) self.assertEqual(info['reported_object_count'], 2)
self.assertEqual(info['reported_bytes_used'], 1123) self.assertEqual(info['reported_bytes_used'], 1123)
@with_tempdir @with_tempdir
def test_get_replication_info(self, tempdir): def test_get_replication_info(self, tempdir):
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
db_path = os.path.join(tempdir, 'part', 'suffix', 'hash', 'hash.db') db_path = os.path.join(
tempdir, 'containers', 'part', 'suffix', 'hash', 'hash.db')
broker = ContainerBroker( broker = ContainerBroker(
db_path, account='myaccount', container='mycontainer') db_path, account='myaccount', container='mycontainer')
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
metadata = {'blah': ['val', next(ts_iter).internal]} metadata = {'blah': ['val', next(ts_iter).internal]}
broker.update_metadata(metadata) broker.update_metadata(metadata)
expected = broker.get_info() expected = broker.get_info()
expected['metadata'] = json.dumps(metadata) expected['metadata'] = json.dumps(metadata)
expected.pop('object_count') expected.pop('object_count')
expected['count'] = 0 expected['count'] = 0
expected['max_row'] = -1 expected['max_row'] = -1
skipping to change at line 2067 skipping to change at line 2072
EMPTY_ETAG, 1, 0)) EMPTY_ETAG, 1, 0))
object_names = [o[0] for o in objects] object_names = [o[0] for o in objects]
def get_rows(broker): def get_rows(broker):
with broker.get() as conn: with broker.get() as conn:
cursor = conn.execute("SELECT * FROM object") cursor = conn.execute("SELECT * FROM object")
return [r[1] for r in cursor] return [r[1] for r in cursor]
def do_setup(): def do_setup():
db_path = os.path.join( db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', '%s.db' % uuid4()) tempdir, 'containers', 'part', 'suffix',
'hash', '%s.db' % uuid4())
broker = ContainerBroker(db_path, account='a', container='c') broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(Timestamp.now().internal, 0) broker.initialize(Timestamp.now().internal, 0)
for obj in objects: for obj in objects:
# ensure row order matches put order # ensure row order matches put order
broker.put_object(*obj) broker.put_object(*obj)
broker._commit_puts() broker._commit_puts()
self.assertEqual(3, broker.get_max_row()) # sanity check self.assertEqual(3, broker.get_max_row()) # sanity check
self.assertEqual(object_names, get_rows(broker)) # sanity check self.assertEqual(object_names, get_rows(broker)) # sanity check
return broker return broker
skipping to change at line 2876 skipping to change at line 2882
listing = broker.list_objects_iter(25, None, None, '0:1:', ':') listing = broker.list_objects_iter(25, None, None, '0:1:', ':')
self.assertEqual(len(listing), 2) self.assertEqual(len(listing), 2)
self.assertEqual( self.assertEqual(
[row[0] for row in listing], [row[0] for row in listing],
['0:1:', '0:1:0']) ['0:1:', '0:1:0'])
listing = broker.list_objects_iter(25, None, None, 'b:', ':') listing = broker.list_objects_iter(25, None, None, 'b:', ':')
self.assertEqual(len(listing), 2) self.assertEqual(len(listing), 2)
self.assertEqual([row[0] for row in listing], ['b:a', 'b:b']) self.assertEqual([row[0] for row in listing], ['b:a', 'b:b'])
def test_chexor(self): def test_chexor(self):
def md5_str(s):
if not isinstance(s, bytes):
s = s.encode('utf8')
return hashlib.md5(s).hexdigest()
broker = ContainerBroker(':memory:', account='a', container='c') broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(Timestamp('1').internal, 0) broker.initialize(Timestamp('1').internal, 0)
broker.put_object('a', Timestamp(1).internal, 0, broker.put_object('a', Timestamp(1).internal, 0,
'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e')
broker.put_object('b', Timestamp(2).internal, 0, broker.put_object('b', Timestamp(2).internal, 0,
'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e')
hasha = hashlib.md5('%s-%s' % ('a', Timestamp(1).internal)).digest() hasha = md5_str('%s-%s' % ('a', Timestamp(1).internal))
hashb = hashlib.md5('%s-%s' % ('b', Timestamp(2).internal)).digest() hashb = md5_str('%s-%s' % ('b', Timestamp(2).internal))
hashc = ''.join( hashc = '%032x' % (int(hasha, 16) ^ int(hashb, 16))
('%02x' % (ord(a) ^ ord(b)) for a, b in zip(hasha, hashb)))
self.assertEqual(broker.get_info()['hash'], hashc) self.assertEqual(broker.get_info()['hash'], hashc)
broker.put_object('b', Timestamp(3).internal, 0, broker.put_object('b', Timestamp(3).internal, 0,
'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e')
hashb = hashlib.md5('%s-%s' % ('b', Timestamp(3).internal)).digest() hashb = md5_str('%s-%s' % ('b', Timestamp(3).internal))
hashc = ''.join( hashc = '%032x' % (int(hasha, 16) ^ int(hashb, 16))
('%02x' % (ord(a) ^ ord(b)) for a, b in zip(hasha, hashb)))
self.assertEqual(broker.get_info()['hash'], hashc) self.assertEqual(broker.get_info()['hash'], hashc)
def test_newid(self): def test_newid(self):
# test DatabaseBroker.newid # test DatabaseBroker.newid
broker = ContainerBroker(':memory:', account='a', container='c') broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(Timestamp('1').internal, 0) broker.initialize(Timestamp('1').internal, 0)
id = broker.get_info()['id'] id = broker.get_info()['id']
broker.newid('someid') broker.newid('someid')
self.assertNotEqual(id, broker.get_info()['id']) self.assertNotEqual(id, broker.get_info()['id'])
skipping to change at line 2967 skipping to change at line 2976
broker.put_object('b', next(ts).internal, 0, 'text/plain', broker.put_object('b', next(ts).internal, 0, 'text/plain',
EMPTY_ETAG) EMPTY_ETAG)
with mock.patch('swift.container.backend.tpool') as mock_tpool: with mock.patch('swift.container.backend.tpool') as mock_tpool:
broker.get_info() broker.get_info()
mock_tpool.execute.assert_called_once() mock_tpool.execute.assert_called_once()
def test_merge_items_overwrite_unicode(self): def test_merge_items_overwrite_unicode(self):
# test DatabaseBroker.merge_items # test DatabaseBroker.merge_items
snowman = u'\N{SNOWMAN}'.encode('utf-8') snowman = u'\N{SNOWMAN}'
if six.PY2:
snowman = snowman.encode('utf-8')
broker1 = ContainerBroker(':memory:', account='a', container='c') broker1 = ContainerBroker(':memory:', account='a', container='c')
broker1.initialize(Timestamp('1').internal, 0) broker1.initialize(Timestamp('1').internal, 0)
id = broker1.get_info()['id'] id = broker1.get_info()['id']
broker2 = ContainerBroker(':memory:', account='a', container='c') broker2 = ContainerBroker(':memory:', account='a', container='c')
broker2.initialize(Timestamp('1').internal, 0) broker2.initialize(Timestamp('1').internal, 0)
broker1.put_object(snowman, Timestamp(2).internal, 0, broker1.put_object(snowman, Timestamp(2).internal, 0,
'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e')
broker1.put_object('b', Timestamp(3).internal, 0, broker1.put_object('b', Timestamp(3).internal, 0,
'text/plain', 'd41d8cd98f00b204e9800998ecf8427e') 'text/plain', 'd41d8cd98f00b204e9800998ecf8427e')
broker2.merge_items(json.loads(json.dumps(broker1.get_items_since( broker2.merge_items(json.loads(json.dumps(broker1.get_items_since(
skipping to change at line 3150 skipping to change at line 3161
# first init an acct DB without the policy_stat table present # first init an acct DB without the policy_stat table present
broker = ContainerBroker(db_path, account='a', container='c') broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(next(ts).internal, 1) broker.initialize(next(ts).internal, 1)
# manually make some pending entries lacking storage_policy_index # manually make some pending entries lacking storage_policy_index
with open(broker.pending_file, 'a+b') as fp: with open(broker.pending_file, 'a+b') as fp:
for i in range(10): for i in range(10):
name, timestamp, size, content_type, etag, deleted = ( name, timestamp, size, content_type, etag, deleted = (
'o%s' % i, next(ts).internal, 0, 'c', 'e', 0) 'o%s' % i, next(ts).internal, 0, 'c', 'e', 0)
fp.write(':') fp.write(b':')
fp.write(pickle.dumps( fp.write(base64.b64encode(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted), (name, timestamp, size, content_type, etag, deleted),
protocol=2).encode('base64')) protocol=2)))
fp.flush() fp.flush()
# use put_object to append some more entries with different # use put_object to append some more entries with different
# values for storage_policy_index # values for storage_policy_index
for i in range(10, 30): for i in range(10, 30):
name = 'o%s' % i name = 'o%s' % i
if i < 20: if i < 20:
size = 1 size = 1
storage_policy_index = 0 storage_policy_index = 0
else: else:
skipping to change at line 3197 skipping to change at line 3208
broker = ContainerBroker(db_path, account='a', container='c', broker = ContainerBroker(db_path, account='a', container='c',
stale_reads_ok=False) stale_reads_ok=False)
broker.initialize(next(ts).internal, 1) broker.initialize(next(ts).internal, 1)
# manually make some pending entries # manually make some pending entries
with open(broker.pending_file, 'a+b') as fp: with open(broker.pending_file, 'a+b') as fp:
for i in range(10): for i in range(10):
name, timestamp, size, content_type, etag, deleted = ( name, timestamp, size, content_type, etag, deleted = (
'o%s' % i, next(ts).internal, 0, 'c', 'e', 0) 'o%s' % i, next(ts).internal, 0, 'c', 'e', 0)
fp.write(':') fp.write(b':')
fp.write(pickle.dumps( fp.write(base64.b64encode(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted), (name, timestamp, size, content_type, etag, deleted),
protocol=2).encode('base64')) protocol=2)))
fp.flush() fp.flush()
broker._commit_puts = mock_commit_puts broker._commit_puts = mock_commit_puts
with self.assertRaises(sqlite3.OperationalError) as exc_context: with self.assertRaises(sqlite3.OperationalError) as exc_context:
broker.get_info() broker.get_info()
self.assertIn('unable to open database file', self.assertIn('unable to open database file',
str(exc_context.exception)) str(exc_context.exception))
@with_tempdir @with_tempdir
def test_get_info_stale_read_ok(self, tempdir): def test_get_info_stale_read_ok(self, tempdir):
skipping to change at line 3226 skipping to change at line 3237
broker = ContainerBroker(db_path, account='a', container='c', broker = ContainerBroker(db_path, account='a', container='c',
stale_reads_ok=True) stale_reads_ok=True)
broker.initialize(next(ts).internal, 1) broker.initialize(next(ts).internal, 1)
# manually make some pending entries # manually make some pending entries
with open(broker.pending_file, 'a+b') as fp: with open(broker.pending_file, 'a+b') as fp:
for i in range(10): for i in range(10):
name, timestamp, size, content_type, etag, deleted = ( name, timestamp, size, content_type, etag, deleted = (
'o%s' % i, next(ts).internal, 0, 'c', 'e', 0) 'o%s' % i, next(ts).internal, 0, 'c', 'e', 0)
fp.write(':') fp.write(b':')
fp.write(pickle.dumps( fp.write(base64.b64encode(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted), (name, timestamp, size, content_type, etag, deleted),
protocol=2).encode('base64')) protocol=2)))
fp.flush() fp.flush()
broker._commit_puts = mock_commit_puts broker._commit_puts = mock_commit_puts
broker.get_info() broker.get_info()
@with_tempdir @with_tempdir
def test_create_broker(self, tempdir): def test_create_broker(self, tempdir):
broker = ContainerBroker.create_broker(tempdir, 0, 'a', 'c') broker = ContainerBroker.create_broker(tempdir, 0, 'a', 'c')
hsh = hash_path('a', 'c') hsh = hash_path('a', 'c')
expected_path = os.path.join( expected_path = os.path.join(
skipping to change at line 3730 skipping to change at line 3741
str(state), str(state + 1), str(state), str(state + 1),
2 * state, 2 * state + 1, 2, 2 * state, 2 * state + 1, 2,
state=state)) state=state))
for state in ShardRange.STATES) for state in ShardRange.STATES)
def make_broker(a, c): def make_broker(a, c):
db_path = os.path.join(tempdir, '%s.db' % uuid4()) db_path = os.path.join(tempdir, '%s.db' % uuid4())
broker = ContainerBroker(db_path, account=a, container=c) broker = ContainerBroker(db_path, account=a, container=c)
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
broker.set_sharding_sysmeta('Root', 'a/c') broker.set_sharding_sysmeta('Root', 'a/c')
broker.merge_shard_ranges(shard_range_by_state.values()) broker.merge_shard_ranges(list(shard_range_by_state.values()))
return broker return broker
# make broker appear to be a root container # make broker appear to be a root container
broker = make_broker('a', 'c') broker = make_broker('a', 'c')
self.assertTrue(broker.is_root_container()) self.assertTrue(broker.is_root_container())
included_states = (ShardRange.ACTIVE, ShardRange.SHARDING, included_states = (ShardRange.ACTIVE, ShardRange.SHARDING,
ShardRange.SHRINKING) ShardRange.SHRINKING)
included = [shard_range_by_state[state] for state in included_states] included = [shard_range_by_state[state] for state in included_states]
expected = { expected = {
'object_count': sum([sr.object_count for sr in included]), 'object_count': sum([sr.object_count for sr in included]),
skipping to change at line 3945 skipping to change at line 3956
self.assertFalse(last_found) self.assertFalse(last_found)
self.assertFalse(ranges) self.assertFalse(ranges)
lines = broker.logger.get_lines_for_level('error') lines = broker.logger.get_lines_for_level('error')
self.assertIn('Problem finding shard upper', lines[0]) self.assertIn('Problem finding shard upper', lines[0])
self.assertFalse(lines[1:]) self.assertFalse(lines[1:])
@with_tempdir @with_tempdir
def test_set_db_states(self, tempdir): def test_set_db_states(self, tempdir):
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
db_path = os.path.join( db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', 'container.db') tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker(db_path, account='a', container='c') broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
# load up the broker with some objects # load up the broker with some objects
objects = [{'name': 'obj_%d' % i, objects = [{'name': 'obj_%d' % i,
'created_at': next(ts_iter).normal, 'created_at': next(ts_iter).normal,
'content_type': 'text/plain', 'content_type': 'text/plain',
'etag': 'etag_%d' % i, 'etag': 'etag_%d' % i,
'size': 1024 * i, 'size': 1024 * i,
'deleted': 0, 'deleted': 0,
skipping to change at line 3991 skipping to change at line 4002
object_count=len(objects[i:i + 2]), object_count=len(objects[i:i + 2]),
bytes_used=sum(obj['size'] for obj in objects[i:i + 2]), bytes_used=sum(obj['size'] for obj in objects[i:i + 2]),
meta_timestamp=next(ts_iter)) for i in range(0, 6, 2)] meta_timestamp=next(ts_iter)) for i in range(0, 6, 2)]
deleted_range = ShardRange('.shards_a/shard_range_z', next(ts_iter), deleted_range = ShardRange('.shards_a/shard_range_z', next(ts_iter),
'z', '', state=ShardRange.SHARDED, 'z', '', state=ShardRange.SHARDED,
deleted=1) deleted=1)
own_sr = ShardRange(name='a/c', timestamp=next(ts_iter), own_sr = ShardRange(name='a/c', timestamp=next(ts_iter),
state=ShardRange.ACTIVE) state=ShardRange.ACTIVE)
broker.merge_shard_ranges([own_sr] + shard_ranges + [deleted_range]) broker.merge_shard_ranges([own_sr] + shard_ranges + [deleted_range])
ts_epoch = next(ts_iter) ts_epoch = next(ts_iter)
new_db_path = os.path.join(tempdir, 'part', 'suffix', 'hash', new_db_path = os.path.join(tempdir, 'containers', 'part', 'suffix',
'container_%s.db' % ts_epoch.normal) 'hash', 'container_%s.db' % ts_epoch.normal)
def check_broker_properties(broker): def check_broker_properties(broker):
# these broker properties should remain unchanged as state changes # these broker properties should remain unchanged as state changes
self.assertEqual(broker.get_max_row(), 5) self.assertEqual(broker.get_max_row(), 5)
all_metadata = broker.metadata all_metadata = broker.metadata
original_meta = dict((k, all_metadata[k]) for k in meta) original_meta = dict((k, all_metadata[k]) for k in meta)
self.assertEqual(original_meta, meta) self.assertEqual(original_meta, meta)
self.assertEqual(broker.get_syncs(True)[0], incoming_sync) self.assertEqual(broker.get_syncs(True)[0], incoming_sync)
self.assertEqual(broker.get_syncs(False)[0], outgoing_sync) self.assertEqual(broker.get_syncs(False)[0], outgoing_sync)
self.assertEqual(shard_ranges + [own_sr, deleted_range], self.assertEqual(shard_ranges + [own_sr, deleted_range],
skipping to change at line 4138 skipping to change at line 4149
self.assertTrue(broker.is_deleted()) self.assertTrue(broker.is_deleted())
self.assertEqual(SHARDED, broker.get_db_state()) self.assertEqual(SHARDED, broker.get_db_state())
do_revive_shard_delete(shard_ranges) do_revive_shard_delete(shard_ranges)
do_revive_shard_delete(shard_ranges) do_revive_shard_delete(shard_ranges)
@with_tempdir @with_tempdir
def test_set_sharding_state_errors(self, tempdir): def test_set_sharding_state_errors(self, tempdir):
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
db_path = os.path.join( db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', 'container.db') tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker(db_path, account='a', container='c', broker = ContainerBroker(db_path, account='a', container='c',
logger=FakeLogger()) logger=FakeLogger())
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
broker.enable_sharding(next(ts_iter)) broker.enable_sharding(next(ts_iter))
orig_execute = GreenDBConnection.execute orig_execute = GreenDBConnection.execute
trigger = 'INSERT into object' trigger = 'INSERT into object'
def mock_execute(conn, *args, **kwargs): def mock_execute(conn, *args, **kwargs):
if trigger in args[0]: if trigger in args[0]:
skipping to change at line 4174 skipping to change at line 4185
res = broker.set_sharding_state() res = broker.set_sharding_state()
self.assertFalse(res) self.assertFalse(res)
lines = broker.logger.get_lines_for_level('error') lines = broker.logger.get_lines_for_level('error')
self.assertIn('Failed to set matching', lines[0]) self.assertIn('Failed to set matching', lines[0])
self.assertFalse(lines[1:]) self.assertFalse(lines[1:])
@with_tempdir @with_tempdir
def test_set_sharded_state_errors(self, tempdir): def test_set_sharded_state_errors(self, tempdir):
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
retiring_db_path = os.path.join( retiring_db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', 'container.db') tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker(retiring_db_path, account='a', container='c', broker = ContainerBroker(retiring_db_path, account='a', container='c',
logger=FakeLogger()) logger=FakeLogger())
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
pre_epoch = next(ts_iter) pre_epoch = next(ts_iter)
broker.enable_sharding(next(ts_iter)) broker.enable_sharding(next(ts_iter))
self.assertTrue(broker.set_sharding_state()) self.assertTrue(broker.set_sharding_state())
# unlink fails # unlink fails
with mock.patch('os.unlink', side_effect=OSError(errno.EPERM)): with mock.patch('os.unlink', side_effect=OSError(errno.EPERM)):
self.assertFalse(broker.set_sharded_state()) self.assertFalse(broker.set_sharded_state())
lines = broker.logger.get_lines_for_level('error') lines = broker.logger.get_lines_for_level('error')
skipping to change at line 4219 skipping to change at line 4230
lines = broker.logger.get_lines_for_level('warning') lines = broker.logger.get_lines_for_level('warning')
self.assertIn('Refusing to delete', lines[0]) self.assertIn('Refusing to delete', lines[0])
self.assertFalse(lines[1:]) self.assertFalse(lines[1:])
self.assertFalse(broker.logger.get_lines_for_level('error')) self.assertFalse(broker.logger.get_lines_for_level('error'))
self.assertTrue(os.path.exists(broker.db_file)) self.assertTrue(os.path.exists(broker.db_file))
@with_tempdir @with_tempdir
def test_get_brokers(self, tempdir): def test_get_brokers(self, tempdir):
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
retiring_db_path = os.path.join( retiring_db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', 'container.db') tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker(retiring_db_path, account='a', container='c', broker = ContainerBroker(retiring_db_path, account='a', container='c',
logger=FakeLogger()) logger=FakeLogger())
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
brokers = broker.get_brokers() brokers = broker.get_brokers()
self.assertEqual(retiring_db_path, brokers[0].db_file) self.assertEqual(retiring_db_path, brokers[0].db_file)
self.assertFalse(brokers[0].skip_commits) self.assertFalse(brokers[0].skip_commits)
self.assertFalse(brokers[1:]) self.assertFalse(brokers[1:])
broker.enable_sharding(next(ts_iter)) broker.enable_sharding(next(ts_iter))
self.assertTrue(broker.set_sharding_state()) self.assertTrue(broker.set_sharding_state())
skipping to change at line 4274 skipping to change at line 4285
self.assertFalse(brokers[2:]) self.assertFalse(brokers[2:])
lines = broker.logger.get_lines_for_level('warning') lines = broker.logger.get_lines_for_level('warning')
self.assertIn('Unexpected db files', lines[0]) self.assertIn('Unexpected db files', lines[0])
self.assertFalse(lines[1:]) self.assertFalse(lines[1:])
@with_tempdir @with_tempdir
def test_merge_shard_ranges(self, tempdir): def test_merge_shard_ranges(self, tempdir):
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
ts = [next(ts_iter) for _ in range(13)] ts = [next(ts_iter) for _ in range(13)]
db_path = os.path.join( db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', 'container.db') tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker( broker = ContainerBroker(
db_path, account='a', container='c') db_path, account='a', container='c')
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
# sanity check # sanity check
self.assertFalse(broker.get_shard_ranges(include_deleted=True)) self.assertFalse(broker.get_shard_ranges(include_deleted=True))
broker.merge_shard_ranges(None) broker.merge_shard_ranges(None)
self.assertFalse(broker.get_shard_ranges(include_deleted=True)) self.assertFalse(broker.get_shard_ranges(include_deleted=True))
skipping to change at line 4372 skipping to change at line 4383
sr_c_10_10_deleted = ShardRange('a/c_c', ts[10], lower='b', upper='c', sr_c_10_10_deleted = ShardRange('a/c_c', ts[10], lower='b', upper='c',
object_count=0, deleted=1) object_count=0, deleted=1)
broker.merge_shard_ranges([sr_c_10_10_deleted, sr_c_9_12]) broker.merge_shard_ranges([sr_c_10_10_deleted, sr_c_9_12])
self._assert_shard_ranges( self._assert_shard_ranges(
broker, [sr_b_2_2_deleted, sr_c_10_10_deleted]) broker, [sr_b_2_2_deleted, sr_c_10_10_deleted])
@with_tempdir @with_tempdir
def test_merge_shard_ranges_state(self, tempdir): def test_merge_shard_ranges_state(self, tempdir):
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
db_path = os.path.join( db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', 'container.db') tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker(db_path, account='a', container='c') broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
expected_shard_ranges = [] expected_shard_ranges = []
def do_test(orig_state, orig_timestamp, test_state, test_timestamp, def do_test(orig_state, orig_timestamp, test_state, test_timestamp,
expected_state, expected_timestamp): expected_state, expected_timestamp):
index = len(expected_shard_ranges) index = len(expected_shard_ranges)
sr = ShardRange('a/%s' % index, orig_timestamp, '%03d' % index, sr = ShardRange('a/%s' % index, orig_timestamp, '%03d' % index,
'%03d' % (index + 1), state=orig_state) '%03d' % (index + 1), state=orig_state)
broker.merge_shard_ranges([sr]) broker.merge_shard_ranges([sr])
skipping to change at line 4418 skipping to change at line 4429
for test_state in ShardRange.STATES: for test_state in ShardRange.STATES:
ts = next(ts_iter) ts = next(ts_iter)
ts_newer = next(ts_iter) ts_newer = next(ts_iter)
do_test(orig_state, ts, test_state, ts_newer, test_state, do_test(orig_state, ts, test_state, ts_newer, test_state,
ts_newer) ts_newer)
def _check_object_stats_when_sharded(self, a, c, root_a, root_c, tempdir): def _check_object_stats_when_sharded(self, a, c, root_a, root_c, tempdir):
# common setup and assertions for root and shard containers # common setup and assertions for root and shard containers
ts_iter = make_timestamp_iter() ts_iter = make_timestamp_iter()
db_path = os.path.join( db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', 'container.db') tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker( broker = ContainerBroker(
db_path, account=a, container=c) db_path, account=a, container=c)
broker.initialize(next(ts_iter).internal, 0) broker.initialize(next(ts_iter).internal, 0)
broker.set_sharding_sysmeta('Root', '%s/%s' % (root_a, root_c)) broker.set_sharding_sysmeta('Root', '%s/%s' % (root_a, root_c))
broker.merge_items([{'name': 'obj', 'size': 14, 'etag': 'blah', broker.merge_items([{'name': 'obj', 'size': 14, 'etag': 'blah',
'content_type': 'text/plain', 'deleted': 0, 'content_type': 'text/plain', 'deleted': 0,
'created_at': Timestamp.now().internal}]) 'created_at': Timestamp.now().internal}])
self.assertEqual(1, broker.get_info()['object_count']) self.assertEqual(1, broker.get_info()['object_count'])
self.assertEqual(14, broker.get_info()['bytes_used']) self.assertEqual(14, broker.get_info()['bytes_used'])
 End of changes. 30 change blocks. 
36 lines changed or deleted 47 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)