"Fossies" - the Fresh Open Source Software Archive

Member "swift-2.21.0/test/unit/common/test_db_replicator.py" (25 Mar 2019, 98357 Bytes) of package /linux/misc/openstack/swift-2.21.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_db_replicator.py": 2.19.1_vs_2.21.0.

    1 # Copyright (c) 2010-2012 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 from __future__ import print_function
   17 import unittest
   18 from contextlib import contextmanager
   19 
   20 import eventlet
   21 import os
   22 import logging
   23 import errno
   24 import math
   25 import time
   26 from shutil import rmtree, copy
   27 from tempfile import mkdtemp, NamedTemporaryFile
   28 import json
   29 
   30 import mock
   31 from mock import patch, call
   32 import six
   33 from six.moves import reload_module
   34 
   35 from swift.container.backend import DATADIR
   36 from swift.common import db_replicator
   37 from swift.common.utils import (normalize_timestamp, hash_path,
   38                                 storage_directory, Timestamp)
   39 from swift.common.exceptions import DriveNotMounted
   40 from swift.common.swob import HTTPException
   41 
   42 from test import unit
   43 from test.unit import FakeLogger, attach_fake_replication_rpc
   44 from test.unit.common.test_db import ExampleBroker
   45 
   46 
# Account/container names used by the fake brokers below; they contain
# spaces, presumably to exercise quoting of odd names — confirm intent.
TEST_ACCOUNT_NAME = 'a c t'
TEST_CONTAINER_NAME = 'c o n'
   49 
   50 
   51 def teardown_module():
   52     "clean up my monkey patching"
   53     reload_module(db_replicator)
   54 
   55 
   56 @contextmanager
   57 def lock_parent_directory(filename):
   58     yield True
   59 
   60 
   61 class FakeRing(object):
   62     class Ring(object):
   63         devs = []
   64 
   65         def __init__(self, path, reload_time=15, ring_name=None):
   66             pass
   67 
   68         def get_part(self, account, container=None, obj=None):
   69             return 0
   70 
   71         def get_part_nodes(self, part):
   72             return []
   73 
   74         def get_more_nodes(self, *args):
   75             return []
   76 
   77 
   78 class FakeRingWithSingleNode(object):
   79     class Ring(object):
   80         devs = [dict(
   81             id=1, weight=10.0, zone=1, ip='1.1.1.1', port=6200, device='sdb',
   82             meta='', replication_ip='1.1.1.1', replication_port=6200, region=1
   83         )]
   84 
   85         def __init__(self, path, reload_time=15, ring_name=None):
   86             pass
   87 
   88         def get_part(self, account, container=None, obj=None):
   89             return 0
   90 
   91         def get_part_nodes(self, part):
   92             return self.devs
   93 
   94         def get_more_nodes(self, *args):
   95             return (d for d in self.devs)
   96 
   97 
   98 class FakeRingWithNodes(object):
   99     class Ring(object):
  100         devs = [dict(
  101             id=1, weight=10.0, zone=1, ip='1.1.1.1', port=6200, device='sdb',
  102             meta='', replication_ip='1.1.1.1', replication_port=6200, region=1
  103         ), dict(
  104             id=2, weight=10.0, zone=2, ip='1.1.1.2', port=6200, device='sdb',
  105             meta='', replication_ip='1.1.1.2', replication_port=6200, region=2
  106         ), dict(
  107             id=3, weight=10.0, zone=3, ip='1.1.1.3', port=6200, device='sdb',
  108             meta='', replication_ip='1.1.1.3', replication_port=6200, region=1
  109         ), dict(
  110             id=4, weight=10.0, zone=4, ip='1.1.1.4', port=6200, device='sdb',
  111             meta='', replication_ip='1.1.1.4', replication_port=6200, region=2
  112         ), dict(
  113             id=5, weight=10.0, zone=5, ip='1.1.1.5', port=6200, device='sdb',
  114             meta='', replication_ip='1.1.1.5', replication_port=6200, region=1
  115         ), dict(
  116             id=6, weight=10.0, zone=6, ip='1.1.1.6', port=6200, device='sdb',
  117             meta='', replication_ip='1.1.1.6', replication_port=6200, region=2
  118         )]
  119 
  120         def __init__(self, path, reload_time=15, ring_name=None):
  121             pass
  122 
  123         def get_part(self, account, container=None, obj=None):
  124             return 0
  125 
  126         def get_part_nodes(self, part):
  127             return self.devs[:3]
  128 
  129         def get_more_nodes(self, *args):
  130             return (d for d in self.devs[3:])
  131 
  132 
  133 class FakeProcess(object):
  134     def __init__(self, *codes):
  135         self.codes = iter(codes)
  136         self.args = None
  137         self.kwargs = None
  138 
  139     def __call__(self, *args, **kwargs):
  140         self.args = args
  141         self.kwargs = kwargs
  142 
  143         class Failure(object):
  144             def communicate(innerself):
  145                 next_item = next(self.codes)
  146                 if isinstance(next_item, int):
  147                     innerself.returncode = next_item
  148                     return next_item
  149                 raise next_item
  150         return Failure()
  151 
  152 
  153 @contextmanager
  154 def _mock_process(*args):
  155     orig_process = db_replicator.subprocess.Popen
  156     db_replicator.subprocess.Popen = FakeProcess(*args)
  157     yield db_replicator.subprocess.Popen
  158     db_replicator.subprocess.Popen = orig_process
  159 
  160 
  161 class ReplHttp(object):
  162     def __init__(self, response=None, set_status=200):
  163         if isinstance(response, six.text_type):
  164             response = response.encode('ascii')
  165         self.response = response
  166         self.set_status = set_status
  167     replicated = False
  168     host = 'localhost'
  169     node = {
  170         'ip': '127.0.0.1',
  171         'port': '6000',
  172         'device': 'sdb',
  173     }
  174 
  175     def replicate(self, *args):
  176         self.replicated = True
  177 
  178         class Response(object):
  179             status = self.set_status
  180             data = self.response
  181 
  182             def read(innerself):
  183                 return self.response
  184         return Response()
  185 
  186 
  187 class ChangingMtimesOs(object):
  188     def __init__(self):
  189         self.mtime = 0
  190 
  191     def __call__(self, *args, **kwargs):
  192         self.mtime += 1
  193         return self.mtime
  194 
  195 
  196 class FakeBroker(object):
  197     db_file = __file__
  198     get_repl_missing_table = False
  199     stub_replication_info = None
  200     db_type = 'container'
  201     db_contains_type = 'object'
  202     info = {'account': TEST_ACCOUNT_NAME, 'container': TEST_CONTAINER_NAME}
  203 
  204     def __init__(self, *args, **kwargs):
  205         self.locked = False
  206         self.metadata = {}
  207         return None
  208 
  209     @contextmanager
  210     def lock(self):
  211         self.locked = True
  212         yield True
  213         self.locked = False
  214 
  215     def get_sync(self, *args, **kwargs):
  216         return 5
  217 
  218     def get_syncs(self):
  219         return []
  220 
  221     def get_items_since(self, point, *args):
  222         if point == 0:
  223             return [{'ROWID': 1}]
  224         if point == -1:
  225             return [{'ROWID': 1}, {'ROWID': 2}]
  226         return []
  227 
  228     def merge_syncs(self, *args, **kwargs):
  229         self.args = args
  230 
  231     def merge_items(self, *args):
  232         self.args = args
  233 
  234     def get_replication_info(self):
  235         if self.get_repl_missing_table:
  236             raise Exception('no such table')
  237         info = dict(self.info)
  238         info.update({
  239             'hash': 12345,
  240             'delete_timestamp': 0,
  241             'put_timestamp': 1,
  242             'created_at': 1,
  243             'count': 0,
  244             'max_row': 99,
  245             'id': 'ID',
  246             'metadata': {}
  247         })
  248         if self.stub_replication_info:
  249             info.update(self.stub_replication_info)
  250         return info
  251 
  252     def get_max_row(self, table=None):
  253         return self.get_replication_info()['max_row']
  254 
  255     def is_reclaimable(self, now, reclaim_age):
  256         info = self.get_replication_info()
  257         return info['count'] == 0 and (
  258             (now - reclaim_age) >
  259             info['delete_timestamp'] >
  260             info['put_timestamp'])
  261 
  262     def get_other_replication_items(self):
  263         return None
  264 
  265     def reclaim(self, item_timestamp, sync_timestamp):
  266         pass
  267 
  268     def newid(self, remote_d):
  269         pass
  270 
  271     def update_metadata(self, metadata):
  272         self.metadata = metadata
  273 
  274     def merge_timestamps(self, created_at, put_timestamp, delete_timestamp):
  275         self.created_at = created_at
  276         self.put_timestamp = put_timestamp
  277         self.delete_timestamp = delete_timestamp
  278 
  279     def get_brokers(self):
  280         return [self]
  281 
  282 
class FakeAccountBroker(FakeBroker):
    """FakeBroker variant posing as an account database (containers as
    its contained type, account-only info dict)."""
    db_type = 'account'
    db_contains_type = 'container'
    info = {'account': TEST_ACCOUNT_NAME}
  287 
  288 
class TestReplicator(db_replicator.Replicator):
    """Concrete db_replicator.Replicator for tests: container server
    type wired to FakeBroker instead of a real database broker."""
    server_type = 'container'
    ring_file = 'container.ring.gz'
    brokerclass = FakeBroker
    datadir = DATADIR
    default_port = 1000
  295 
  296 
  297 class TestDBReplicator(unittest.TestCase):
    def setUp(self):
        """Install the empty FakeRing and a fresh recon cache dir."""
        # module-level monkey patch; teardown_module() reloads the module
        db_replicator.ring = FakeRing()
        self.delete_db_calls = []
        self._patchers = []
        # recon cache path
        self.recon_cache = mkdtemp()
        # NOTE(review): mkdtemp already created the dir; the rmtree+mkdir
        # recreates it with default (umask) perms instead of mkdtemp's
        # 0700 — presumably deliberate, confirm before simplifying.
        rmtree(self.recon_cache, ignore_errors=1)
        os.mkdir(self.recon_cache)
        self.logger = unit.debug_logger('test-replicator')
  307 
  308     def tearDown(self):
  309         for patcher in self._patchers:
  310             patcher.stop()
  311         rmtree(self.recon_cache, ignore_errors=1)
  312 
  313     def _patch(self, patching_fn, *args, **kwargs):
  314         patcher = patching_fn(*args, **kwargs)
  315         patched_thing = patcher.start()
  316         self._patchers.append(patcher)
  317         return patched_thing
  318 
    def stub_delete_db(self, broker):
        # Test double for Replicator.delete_db: records that a delete was
        # requested.  NOTE(review): the recorded path is hard-coded, not
        # derived from *broker* — tests only count/compare entries.
        self.delete_db_calls.append('/path/to/file')
        return True
  322 
    def test_creation(self):
        """Config values are parsed into native types with defaults."""
        # later config should be extended to assert more config options
        replicator = TestReplicator({'node_timeout': '3.5'})
        # string config value coerced to float
        self.assertEqual(replicator.node_timeout, 3.5)
        # unset option falls back to its default
        self.assertEqual(replicator.databases_per_second, 50)
  328 
    def test_repl_connection(self):
        """ReplConnection.replicate() returns the response on success
        and None when the underlying request raises."""
        node = {'replication_ip': '127.0.0.1', 'replication_port': 80,
                'device': 'sdb1'}
        conn = db_replicator.ReplConnection(node, '1234567890', 'abcdefg',
                                            logging.getLogger())

        def req(method, path, body, headers):
            # replicate() must issue a REPLICATE request with a JSON body
            self.assertEqual(method, 'REPLICATE')
            self.assertEqual(headers['Content-Type'], 'application/json')

        class Resp(object):
            def read(self):
                return 'data'
        resp = Resp()
        conn.request = req
        conn.getresponse = lambda *args: resp
        self.assertEqual(conn.replicate(1, 2, 3), resp)

        def other_req(method, path, body, headers):
            raise Exception('blah')
        conn.request = other_req
        # request errors are swallowed and surfaced as a None response
        self.assertIsNone(conn.replicate(1, 2, 3))
  351 
  352     def test_rsync_file(self):
  353         replicator = TestReplicator({})
  354         with _mock_process(-1):
  355             self.assertEqual(
  356                 False,
  357                 replicator._rsync_file('/some/file', 'remote:/some/file'))
  358         with _mock_process(0):
  359             self.assertEqual(
  360                 True,
  361                 replicator._rsync_file('/some/file', 'remote:/some/file'))
  362 
    def test_rsync_file_popen_args(self):
        """Default _rsync_file() invocation builds the full rsync argv,
        including --whole-file and both timeouts."""
        replicator = TestReplicator({})
        with _mock_process(0) as process:
            replicator._rsync_file('/some/file', 'remote:/some_file')
            # timeouts are rounded up to whole seconds for the rsync CLI
            exp_args = ([
                'rsync', '--quiet', '--no-motd',
                '--timeout=%s' % int(math.ceil(replicator.node_timeout)),
                '--contimeout=%s' % int(math.ceil(replicator.conn_timeout)),
                '--whole-file', '/some/file', 'remote:/some_file'],)
            self.assertEqual(exp_args, process.args)
  373 
    def test_rsync_file_popen_args_whole_file_false(self):
        """whole_file=False must drop --whole-file from the rsync argv."""
        replicator = TestReplicator({})
        with _mock_process(0) as process:
            replicator._rsync_file('/some/file', 'remote:/some_file', False)
            exp_args = ([
                'rsync', '--quiet', '--no-motd',
                '--timeout=%s' % int(math.ceil(replicator.node_timeout)),
                '--contimeout=%s' % int(math.ceil(replicator.conn_timeout)),
                '/some/file', 'remote:/some_file'],)
            self.assertEqual(exp_args, process.args)
  384 
    def test_rsync_file_popen_args_different_region_and_rsync_compress(self):
        """--compress appears only when rsync_compress is on AND the
        destination is in a different region."""
        replicator = TestReplicator({})
        for rsync_compress in (False, True):
            replicator.rsync_compress = rsync_compress
            for different_region in (False, True):
                with _mock_process(0) as process:
                    replicator._rsync_file('/some/file', 'remote:/some_file',
                                           False, different_region)
                    if rsync_compress and different_region:
                        # --compress arg should be passed to rsync binary
                        # only when rsync_compress option is enabled
                        # AND destination node is in a different
                        # region
                        self.assertTrue('--compress' in process.args[0])
                    else:
                        self.assertFalse('--compress' in process.args[0])
  401 
    def test_rsync_db(self):
        """Smoke test: _rsync_db() runs cleanly with _rsync_file stubbed
        to always succeed."""
        replicator = TestReplicator({})
        replicator._rsync_file = lambda *args, **kwargs: True
        fake_device = {'replication_ip': '127.0.0.1', 'device': 'sda1'}
        replicator._rsync_db(FakeBroker(), fake_device, ReplHttp(), 'abcd')
  407 
    def test_rsync_db_rsync_file_call(self):
        """_rsync_db() hands _rsync_file() the broker's db file and a
        remote path built from the node's replication ip and device."""
        fake_device = {'ip': '127.0.0.1', 'port': '0',
                       'replication_ip': '127.0.0.1', 'replication_port': '0',
                       'device': 'sda1'}

        class MyTestReplicator(TestReplicator):
            def __init__(self, db_file, remote_file):
                super(MyTestReplicator, self).__init__({})
                self.db_file = db_file
                self.remote_file = remote_file

            def _rsync_file(self_, db_file, remote_file, whole_file=True,
                            different_region=False):
                # assert against the expected paths captured in __init__
                self.assertEqual(self_.db_file, db_file)
                self.assertEqual(self_.remote_file, remote_file)
                self_._rsync_file_called = True
                return False

        broker = FakeBroker()
        remote_file = '127.0.0.1::container/sda1/tmp/abcd'
        replicator = MyTestReplicator(broker.db_file, remote_file)
        replicator._rsync_db(broker, fake_device, ReplHttp(), 'abcd')
        self.assertTrue(replicator._rsync_file_called)
  431 
  432     def test_rsync_db_rsync_file_failure(self):
  433         class MyTestReplicator(TestReplicator):
  434             def __init__(self):
  435                 super(MyTestReplicator, self).__init__({})
  436                 self._rsync_file_called = False
  437 
  438             def _rsync_file(self_, *args, **kwargs):
  439                 self.assertEqual(
  440                     False, self_._rsync_file_called,
  441                     '_sync_file() should only be called once')
  442                 self_._rsync_file_called = True
  443                 return False
  444 
  445         with patch('os.path.exists', lambda *args: True):
  446             replicator = MyTestReplicator()
  447             fake_device = {'ip': '127.0.0.1', 'replication_ip': '127.0.0.1',
  448                            'device': 'sda1'}
  449             replicator._rsync_db(FakeBroker(), fake_device, ReplHttp(), 'abcd')
  450             self.assertEqual(True, replicator._rsync_file_called)
  451 
    def test_rsync_db_change_after_sync(self):
        """If the db changed during the first (unlocked, whole-file)
        rsync, _rsync_db() does a second incremental pass while holding
        the broker lock."""
        class MyTestReplicator(TestReplicator):
            def __init__(self, broker):
                super(MyTestReplicator, self).__init__({})
                self.broker = broker
                self._rsync_file_call_count = 0

            def _rsync_file(self_, db_file, remote_file, whole_file=True,
                            different_region=False):
                self_._rsync_file_call_count += 1
                if self_._rsync_file_call_count == 1:
                    # first pass: full copy, taken without the lock
                    self.assertEqual(True, whole_file)
                    self.assertEqual(False, self_.broker.locked)
                elif self_._rsync_file_call_count == 2:
                    # second pass: incremental, with the broker locked
                    self.assertEqual(False, whole_file)
                    self.assertEqual(True, self_.broker.locked)
                else:
                    raise RuntimeError('_rsync_file() called too many times')
                return True

        # with journal file
        with patch('os.path.exists', lambda *args: True):
            broker = FakeBroker()
            replicator = MyTestReplicator(broker)
            fake_device = {'ip': '127.0.0.1', 'replication_ip': '127.0.0.1',
                           'device': 'sda1'}
            replicator._rsync_db(broker, fake_device, ReplHttp(), 'abcd')
            self.assertEqual(2, replicator._rsync_file_call_count)

        # with new mtime
        with patch('os.path.exists', lambda *args: False):
            with patch('os.path.getmtime', ChangingMtimesOs()):
                broker = FakeBroker()
                replicator = MyTestReplicator(broker)
                fake_device = {'ip': '127.0.0.1',
                               'replication_ip': '127.0.0.1',
                               'device': 'sda1'}
                replicator._rsync_db(broker, fake_device, ReplHttp(), 'abcd')
                self.assertEqual(2, replicator._rsync_file_call_count)
  491 
  492     def test_in_sync(self):
  493         replicator = TestReplicator({})
  494         self.assertEqual(replicator._in_sync(
  495             {'id': 'a', 'point': 0, 'max_row': 0, 'hash': 'b'},
  496             {'id': 'a', 'point': -1, 'max_row': 0, 'hash': 'b'},
  497             FakeBroker(), -1), True)
  498         self.assertEqual(replicator._in_sync(
  499             {'id': 'a', 'point': -1, 'max_row': 0, 'hash': 'b'},
  500             {'id': 'a', 'point': -1, 'max_row': 10, 'hash': 'b'},
  501             FakeBroker(), -1), True)
  502         self.assertEqual(bool(replicator._in_sync(
  503             {'id': 'a', 'point': -1, 'max_row': 0, 'hash': 'c'},
  504             {'id': 'a', 'point': -1, 'max_row': 10, 'hash': 'd'},
  505             FakeBroker(), -1)), False)
  506 
    def test_run_once_no_local_device_in_ring(self):
        """run_once() logs an error and bails when this node's ip/port
        is not present in the ring."""
        logger = unit.debug_logger('test-replicator')
        replicator = TestReplicator({'recon_cache_path': self.recon_cache},
                                    logger=logger)
        with patch('swift.common.db_replicator.whataremyips',
                   return_value=['127.0.0.1']):
            replicator.run_once()
        expected = [
            "Can't find itself 127.0.0.1 with port 1000 "
            "in ring file, not replicating",
        ]
        self.assertEqual(expected, logger.get_lines_for_level('error'))
  519 
    def test_run_once_with_local_device_in_ring(self):
        """run_once() logs no errors when the local device is found in
        the ring."""
        logger = unit.debug_logger('test-replicator')
        base = 'swift.common.db_replicator.'
        with patch(base + 'whataremyips', return_value=['1.1.1.1']), \
                patch(base + 'ring', FakeRingWithNodes()):
            replicator = TestReplicator({'bind_port': 6200,
                                         'recon_cache_path': self.recon_cache},
                                        logger=logger)
            replicator.run_once()
        self.assertFalse(logger.get_lines_for_level('error'))
  530 
    def test_run_once_no_ips(self):
        """run_once() logs an error when the host reports no IPs."""
        replicator = TestReplicator({}, logger=unit.FakeLogger())
        self._patch(patch.object, db_replicator, 'whataremyips',
                    lambda *a, **kw: [])

        replicator.run_once()

        self.assertEqual(
            replicator.logger.log_dict['error'],
            [(('ERROR Failed to get my own IPs?',), {})])
  541 
    def test_run_once_node_is_not_mounted(self):
        """A check_drive() failure is logged as a warning and the device
        is skipped rather than aborting the run."""
        db_replicator.ring = FakeRingWithSingleNode()
        # If a bind_ip is specified, it's plumbed into whataremyips() and
        # returned by itself.
        conf = {'mount_check': 'true', 'bind_ip': '1.1.1.1',
                'bind_port': 6200}
        replicator = TestReplicator(conf, logger=unit.FakeLogger())
        self.assertEqual(replicator.mount_check, True)
        self.assertEqual(replicator.port, 6200)

        err = ValueError('Boom!')

        def mock_check_drive(root, device, mount_check):
            # run_once() must check the single ring device with mounting on
            self.assertEqual(root, replicator.root)
            self.assertEqual(device, replicator.ring.devs[0]['device'])
            self.assertEqual(mount_check, True)
            raise err

        self._patch(patch.object, db_replicator, 'check_drive',
                    mock_check_drive)
        replicator.run_once()

        self.assertEqual(
            replicator.logger.log_dict['warning'],
            [(('Skipping: %s', (err, )), {})])
  567 
    def test_run_once_node_is_mounted(self):
        """Happy path: with a mounted device, run_once() prunes the tmp
        dir and spawns replication for each datadir entry."""
        db_replicator.ring = FakeRingWithSingleNode()
        conf = {'mount_check': 'true', 'bind_port': 6200}
        replicator = TestReplicator(conf, logger=unit.FakeLogger())
        self.assertEqual(replicator.mount_check, True)
        self.assertEqual(replicator.port, 6200)

        def mock_unlink_older_than(path, mtime):
            # stale files under <root>/<device>/tmp get reaped
            self.assertEqual(path,
                             os.path.join(replicator.root,
                                          replicator.ring.devs[0]['device'],
                                          'tmp'))
            self.assertTrue(time.time() - replicator.reclaim_age >= mtime)

        def mock_spawn_n(fn, part, object_file, node_id):
            # arguments come straight from the stubbed roundrobin_datadirs
            self.assertEqual('123', part)
            self.assertEqual('/srv/node/sda/c.db', object_file)
            self.assertEqual(1, node_id)

        self._patch(patch.object, db_replicator, 'whataremyips',
                    lambda *a, **kw: ['1.1.1.1'])
        self._patch(patch.object, db_replicator, 'unlink_older_than',
                    mock_unlink_older_than)
        self._patch(patch.object, db_replicator, 'roundrobin_datadirs',
                    lambda *args: [('123', '/srv/node/sda/c.db', 1)])
        self._patch(patch.object, replicator.cpool, 'spawn_n', mock_spawn_n)

        with patch('swift.common.db_replicator.os',
                   new=mock.MagicMock(wraps=os)) as mock_os, \
                unit.mock_check_drive(ismount=True) as mocks:
            mock_os.path.isdir.return_value = True
            replicator.run_once()
            mock_os.path.isdir.assert_called_with(
                os.path.join(replicator.root,
                             replicator.ring.devs[0]['device'],
                             replicator.datadir))
            self.assertEqual([
                mock.call(os.path.join(
                    replicator.root,
                    replicator.ring.devs[0]['device'])),
            ], mocks['ismount'].call_args_list)
  609 
  610     def test_usync(self):
  611         fake_http = ReplHttp()
  612         replicator = TestReplicator({})
  613         replicator._usync_db(0, FakeBroker(), fake_http, '12345', '67890')
  614 
  615     def test_usync_http_error_above_300(self):
  616         fake_http = ReplHttp(set_status=301)
  617         replicator = TestReplicator({})
  618         self.assertFalse(
  619             replicator._usync_db(0, FakeBroker(), fake_http, '12345', '67890'))
  620 
  621     def test_usync_http_error_below_200(self):
  622         fake_http = ReplHttp(set_status=101)
  623         replicator = TestReplicator({})
  624         self.assertFalse(
  625             replicator._usync_db(0, FakeBroker(), fake_http, '12345', '67890'))
  626 
    @mock.patch('swift.common.db_replicator.dump_recon_cache')
    @mock.patch('swift.common.db_replicator.time.time', return_value=1234.5678)
    def test_stats(self, mock_time, mock_recon_cache):
        """_report_stats() logs a four-line summary and dumps the same
        numbers to the recon cache — checked for both zeroed stats and a
        populated stats dict."""
        logger = unit.debug_logger('test-replicator')
        replicator = TestReplicator({}, logger=logger)
        replicator._zero_stats()
        # 'start' is stamped with the (mocked) current time
        self.assertEqual(replicator.stats['start'], mock_time.return_value)
        replicator._report_stats()
        self.assertEqual(logger.get_lines_for_level('info'), [
            'Attempted to replicate 0 dbs in 0.00000 seconds (0.00000/s)',
            'Removed 0 dbs',
            '0 successes, 0 failures',
            'diff:0 diff_capped:0 empty:0 hashmatch:0 no_change:0 '
            'remote_merge:0 rsync:0 ts_repl:0',
        ])
        self.assertEqual(1, len(mock_recon_cache.mock_calls))
        self.assertEqual(mock_recon_cache.mock_calls[0][1][0], {
            'replication_time': 0.0,
            'replication_last': mock_time.return_value,
            'replication_stats': replicator.stats,
        })

        mock_recon_cache.reset_mock()
        logger.clear()
        replicator.stats.update({
            'attempted': 30,
            'success': 25,
            'remove': 9,
            'failure': 1,

            'diff': 5,
            'diff_capped': 4,
            'empty': 7,
            'hashmatch': 8,
            'no_change': 6,
            'remote_merge': 2,
            'rsync': 3,
            'ts_repl': 10,
        })
        # advance the mocked clock so the elapsed time and rate are non-zero
        mock_time.return_value += 246.813576
        replicator._report_stats()
        self.maxDiff = None
        self.assertEqual(logger.get_lines_for_level('info'), [
            'Attempted to replicate 30 dbs in 246.81358 seconds (0.12155/s)',
            'Removed 9 dbs',
            '25 successes, 1 failures',
            'diff:5 diff_capped:4 empty:7 hashmatch:8 no_change:6 '
            'remote_merge:2 rsync:3 ts_repl:10',
        ])
        self.assertEqual(1, len(mock_recon_cache.mock_calls))
        self.assertEqual(mock_recon_cache.mock_calls[0][1][0], {
            'replication_time': 246.813576,
            'replication_last': mock_time.return_value,
            'replication_stats': replicator.stats,
        })
  682 
  683     def test_replicate_object(self):
  684         # verify return values from replicate_object
  685         db_replicator.ring = FakeRingWithNodes()
  686         db_path = '/path/to/file'
  687         replicator = TestReplicator({}, logger=FakeLogger())
  688         info = FakeBroker().get_replication_info()
  689         # make remote appear to be in sync
  690         rinfo = {'point': info['max_row'], 'id': 'remote_id'}
  691 
  692         class FakeResponse(object):
  693             def __init__(self, status, rinfo):
  694                 self._status = status
  695                 self.data = json.dumps(rinfo).encode('ascii')
  696 
  697             @property
  698             def status(self):
  699                 if isinstance(self._status, (Exception, eventlet.Timeout)):
  700                     raise self._status
  701                 return self._status
  702 
  703         # all requests fail
  704         replicate = 'swift.common.db_replicator.ReplConnection.replicate'
  705         with mock.patch(replicate) as fake_replicate:
  706             fake_replicate.side_effect = [
  707                 FakeResponse(500, None),
  708                 FakeResponse(500, None),
  709                 FakeResponse(500, None)]
  710             with mock.patch.object(replicator, 'delete_db') as mock_delete:
  711                 res = replicator._replicate_object('0', db_path, 'node_id')
  712         self.assertRaises(StopIteration, next, fake_replicate.side_effect)
  713         self.assertEqual((False, [False, False, False]), res)
  714         self.assertEqual(0, mock_delete.call_count)
  715         self.assertFalse(replicator.logger.get_lines_for_level('error'))
  716         self.assertFalse(replicator.logger.get_lines_for_level('warning'))
  717         replicator.logger.clear()
  718 
  719         with mock.patch(replicate) as fake_replicate:
  720             fake_replicate.side_effect = [
  721                 FakeResponse(Exception('ugh'), None),
  722                 FakeResponse(eventlet.Timeout(), None),
  723                 FakeResponse(200, rinfo)]
  724             with mock.patch.object(replicator, 'delete_db') as mock_delete:
  725                 res = replicator._replicate_object('0', db_path, 'node_id')
  726         self.assertRaises(StopIteration, next, fake_replicate.side_effect)
  727         self.assertEqual((False, [False, False, True]), res)
  728         self.assertEqual(0, mock_delete.call_count)
  729         lines = replicator.logger.get_lines_for_level('error')
  730         self.assertIn('ERROR syncing', lines[0])
  731         self.assertIn('ERROR syncing', lines[1])
  732         self.assertFalse(lines[2:])
  733         self.assertFalse(replicator.logger.get_lines_for_level('warning'))
  734         replicator.logger.clear()
  735 
  736         # partial success
  737         with mock.patch(replicate) as fake_replicate:
  738             fake_replicate.side_effect = [
  739                 FakeResponse(200, rinfo),
  740                 FakeResponse(200, rinfo),
  741                 FakeResponse(500, None)]
  742             with mock.patch.object(replicator, 'delete_db') as mock_delete:
  743                 res = replicator._replicate_object('0', db_path, 'node_id')
  744         self.assertRaises(StopIteration, next, fake_replicate.side_effect)
  745         self.assertEqual((False, [True, True, False]), res)
  746         self.assertEqual(0, mock_delete.call_count)
  747         self.assertFalse(replicator.logger.get_lines_for_level('error'))
  748         self.assertFalse(replicator.logger.get_lines_for_level('warning'))
  749         replicator.logger.clear()
  750 
  751         # 507 triggers additional requests
  752         with mock.patch(replicate) as fake_replicate:
  753             fake_replicate.side_effect = [
  754                 FakeResponse(200, rinfo),
  755                 FakeResponse(200, rinfo),
  756                 FakeResponse(507, None),
  757                 FakeResponse(507, None),
  758                 FakeResponse(200, rinfo)]
  759             with mock.patch.object(replicator, 'delete_db') as mock_delete:
  760                 res = replicator._replicate_object('0', db_path, 'node_id')
  761         self.assertRaises(StopIteration, next, fake_replicate.side_effect)
  762         self.assertEqual((False, [True, True, False, False, True]), res)
  763         self.assertEqual(0, mock_delete.call_count)
  764         lines = replicator.logger.get_lines_for_level('error')
  765         self.assertIn('Remote drive not mounted', lines[0])
  766         self.assertIn('Remote drive not mounted', lines[1])
  767         self.assertFalse(lines[2:])
  768         self.assertFalse(replicator.logger.get_lines_for_level('warning'))
  769         replicator.logger.clear()
  770 
  771         # all requests succeed; node id == 'node_id' causes node to be
  772         # considered a handoff so expect the db to be deleted
  773         with mock.patch(replicate) as fake_replicate:
  774             fake_replicate.side_effect = [
  775                 FakeResponse(200, rinfo),
  776                 FakeResponse(200, rinfo),
  777                 FakeResponse(200, rinfo)]
  778             with mock.patch.object(replicator, 'delete_db') as mock_delete:
  779                 res = replicator._replicate_object('0', db_path, 'node_id')
  780         self.assertRaises(StopIteration, next, fake_replicate.side_effect)
  781         self.assertEqual((True, [True, True, True]), res)
  782         self.assertEqual(1, mock_delete.call_count)
  783         self.assertFalse(replicator.logger.get_lines_for_level('error'))
  784         self.assertFalse(replicator.logger.get_lines_for_level('warning'))
  785 
  786     def test_replicate_object_quarantine(self):
  787         replicator = TestReplicator({})
  788         self._patch(patch.object, replicator.brokerclass, 'db_file',
  789                     '/a/b/c/d/e/hey')
  790         self._patch(patch.object, replicator.brokerclass,
  791                     'get_repl_missing_table', True)
  792 
  793         def mock_renamer(was, new, fsync=False, cause_colision=False):
  794             if cause_colision and '-' not in new:
  795                 raise OSError(errno.EEXIST, "File already exists")
  796             self.assertEqual('/a/b/c/d/e', was)
  797             if '-' in new:
  798                 self.assertTrue(
  799                     new.startswith('/a/quarantined/containers/e-'))
  800             else:
  801                 self.assertEqual('/a/quarantined/containers/e', new)
  802 
  803         def mock_renamer_error(was, new, fsync):
  804             return mock_renamer(was, new, fsync, cause_colision=True)
  805         with patch.object(db_replicator, 'renamer', mock_renamer):
  806             replicator._replicate_object('0', 'file', 'node_id')
  807         # try the double quarantine
  808         with patch.object(db_replicator, 'renamer', mock_renamer_error):
  809             replicator._replicate_object('0', 'file', 'node_id')
  810 
  811     def test_replicate_object_delete_because_deleted(self):
  812         replicator = TestReplicator({})
  813         try:
  814             replicator.delete_db = self.stub_delete_db
  815             replicator.brokerclass.stub_replication_info = {
  816                 'delete_timestamp': 2, 'put_timestamp': 1}
  817             replicator._replicate_object('0', '/path/to/file', 'node_id')
  818         finally:
  819             replicator.brokerclass.stub_replication_info = None
  820         self.assertEqual(['/path/to/file'], self.delete_db_calls)
  821 
    def test_replicate_object_delete_because_not_shouldbehere(self):
        """A db on a node that is not one of the partition's primaries is
        deleted once replication to all peers succeeds."""
        replicator = TestReplicator({})
        replicator.ring = FakeRingWithNodes().Ring('path')
        replicator.brokerclass = FakeAccountBroker
        replicator._repl_to_node = lambda *args: True
        replicator.delete_db = self.stub_delete_db
        # wrap the real cleanup so we can assert how it was called while
        # still letting it run (and hence reach our stubbed delete_db)
        orig_cleanup = replicator.cleanup_post_replicate
        with mock.patch.object(replicator, 'cleanup_post_replicate',
                               side_effect=orig_cleanup) as mock_cleanup:
            replicator._replicate_object('0', '/path/to/file', 'node_id')
        mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3)
        self.assertIsInstance(mock_cleanup.call_args[0][0],
                              replicator.brokerclass)
        self.assertEqual(['/path/to/file'], self.delete_db_calls)
        self.assertEqual(0, replicator.stats['failure'])
  837 
    def test_replicate_object_delete_delegated_to_cleanup_post_replicate(self):
        """_replicate_object() never deletes a handoff db itself: removal
        and the success/failure accounting are delegated entirely to
        cleanup_post_replicate()'s return value.
        """
        replicator = TestReplicator({})
        replicator.ring = FakeRingWithNodes().Ring('path')
        replicator.brokerclass = FakeAccountBroker
        replicator._repl_to_node = lambda *args: True
        replicator.delete_db = self.stub_delete_db

        # cleanup succeeds
        with mock.patch.object(replicator, 'cleanup_post_replicate',
                               return_value=True) as mock_cleanup:
            replicator._replicate_object('0', '/path/to/file', 'node_id')
        mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3)
        self.assertIsInstance(mock_cleanup.call_args[0][0],
                              replicator.brokerclass)
        # delete_db was never called directly by _replicate_object()
        self.assertFalse(self.delete_db_calls)
        self.assertEqual(0, replicator.stats['failure'])
        self.assertEqual(3, replicator.stats['success'])

        # cleanup fails: every replica is accounted as a failure
        replicator._zero_stats()
        with mock.patch.object(replicator, 'cleanup_post_replicate',
                               return_value=False) as mock_cleanup:
            replicator._replicate_object('0', '/path/to/file', 'node_id')
        mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3)
        self.assertIsInstance(mock_cleanup.call_args[0][0],
                              replicator.brokerclass)
        self.assertFalse(self.delete_db_calls)
        self.assertEqual(3, replicator.stats['failure'])
        self.assertEqual(0, replicator.stats['success'])

        # shouldbehere True - cleanup not required
        replicator._zero_stats()
        primary_node_id = replicator.ring.get_part_nodes('0')[0]['id']
        with mock.patch.object(replicator, 'cleanup_post_replicate',
                               return_value=True) as mock_cleanup:
            replicator._replicate_object('0', '/path/to/file', primary_node_id)
        mock_cleanup.assert_not_called()
        self.assertFalse(self.delete_db_calls)
        self.assertEqual(0, replicator.stats['failure'])
        self.assertEqual(2, replicator.stats['success'])
  878 
    def test_cleanup_post_replicate(self):
        """cleanup_post_replicate() deletes the db only when every peer
        succeeded and no rows arrived since the replication pass started;
        otherwise it just logs why the db was kept.
        """
        replicator = TestReplicator({}, logger=self.logger)
        replicator.ring = FakeRingWithNodes().Ring('path')
        broker = FakeBroker()
        replicator._repl_to_node = lambda *args: True
        info = broker.get_replication_info()

        # no peer succeeded -> keep the db
        with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
            res = replicator.cleanup_post_replicate(
                broker, info, [False] * 3)
        mock_delete_db.assert_not_called()
        self.assertTrue(res)
        self.assertEqual(['Not deleting db %s (0/3 success)' % broker.db_file],
                         replicator.logger.get_lines_for_level('debug'))
        replicator.logger.clear()

        # partial success -> keep the db
        with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
            res = replicator.cleanup_post_replicate(
                broker, info, [True, False, True])
        mock_delete_db.assert_not_called()
        self.assertTrue(res)
        self.assertEqual(['Not deleting db %s (2/3 success)' % broker.db_file],
                         replicator.logger.get_lines_for_level('debug'))
        replicator.logger.clear()

        # broker now reports a higher max_row than was replicated -> keep
        broker.stub_replication_info = {'max_row': 101}
        with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
            res = replicator.cleanup_post_replicate(
                broker, info, [True] * 3)
        mock_delete_db.assert_not_called()
        self.assertTrue(res)
        self.assertEqual(['Not deleting db %s (2 new rows)' % broker.db_file],
                         replicator.logger.get_lines_for_level('debug'))
        replicator.logger.clear()

        # max_row moving backwards is anomalous -> keep the db, log error
        broker.stub_replication_info = {'max_row': 98}
        with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
            res = replicator.cleanup_post_replicate(
                broker, info, [True] * 3)
        mock_delete_db.assert_not_called()
        self.assertTrue(res)
        broker.stub_replication_info = None
        self.assertEqual(['Not deleting db %s (negative max_row_delta: -1)' %
                          broker.db_file],
                         replicator.logger.get_lines_for_level('error'))
        replicator.logger.clear()

        # full success, no new rows -> delete
        with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
            res = replicator.cleanup_post_replicate(
                broker, info, [True] * 3)
        mock_delete_db.assert_called_once_with(broker)
        self.assertTrue(res)
        self.assertEqual(['Successfully deleted db %s' % broker.db_file],
                         replicator.logger.get_lines_for_level('debug'))
        replicator.logger.clear()

        # delete attempted but failed -> report failure to the caller
        with mock.patch.object(replicator, 'delete_db',
                               return_value=False) as mock_delete_db:
            res = replicator.cleanup_post_replicate(
                broker, info, [True] * 3)
        mock_delete_db.assert_called_once_with(broker)
        self.assertFalse(res)
        self.assertEqual(['Failed to delete db %s' % broker.db_file],
                         replicator.logger.get_lines_for_level('debug'))
        replicator.logger.clear()
  944 
    def test_replicate_object_with_exception(self):
        """Each DriveNotMounted raised by a peer extends replication to one
        extra node; generic exceptions do not trigger extra attempts.
        """
        replicator = TestReplicator({})
        replicator.ring = FakeRingWithNodes().Ring('path')
        replicator.brokerclass = FakeAccountBroker
        replicator.delete_db = self.stub_delete_db
        replicator._repl_to_node = mock.Mock(side_effect=Exception())
        replicator._replicate_object('0', '/path/to/file',
                                     replicator.ring.devs[0]['id'])
        # generic failures: only the two other primaries are attempted
        self.assertEqual(2, replicator._repl_to_node.call_count)
        # with one DriveNotMounted exception called on +1 more replica
        replicator._repl_to_node = mock.Mock(side_effect=[DriveNotMounted()])
        replicator._replicate_object('0', '/path/to/file',
                                     replicator.ring.devs[0]['id'])
        self.assertEqual(3, replicator._repl_to_node.call_count)
        # called on +1 more replica and self when *first* handoff
        replicator._repl_to_node = mock.Mock(side_effect=[DriveNotMounted()])
        replicator._replicate_object('0', '/path/to/file',
                                     replicator.ring.devs[3]['id'])
        self.assertEqual(4, replicator._repl_to_node.call_count)
        # even if it's the last handoff it works to keep 3 replicas
        # 2 primaries + 1 handoff
        replicator._repl_to_node = mock.Mock(side_effect=[DriveNotMounted()])
        replicator._replicate_object('0', '/path/to/file',
                                     replicator.ring.devs[-1]['id'])
        self.assertEqual(4, replicator._repl_to_node.call_count)
        # with two DriveNotMounted exceptions called on +2 more replica keeping
        # durability
        replicator._repl_to_node = mock.Mock(
            side_effect=[DriveNotMounted()] * 2)
        replicator._replicate_object('0', '/path/to/file',
                                     replicator.ring.devs[0]['id'])
        self.assertEqual(4, replicator._repl_to_node.call_count)
  977 
  978     def test_replicate_object_with_exception_run_out_of_nodes(self):
  979         replicator = TestReplicator({})
  980         replicator.ring = FakeRingWithNodes().Ring('path')
  981         replicator.brokerclass = FakeAccountBroker
  982         replicator.delete_db = self.stub_delete_db
  983         # all other devices are not mounted
  984         replicator._repl_to_node = mock.Mock(side_effect=DriveNotMounted())
  985         replicator._replicate_object('0', '/path/to/file',
  986                                      replicator.ring.devs[0]['id'])
  987         self.assertEqual(5, replicator._repl_to_node.call_count)
  988 
  989     def test_replicate_account_out_of_place(self):
  990         replicator = TestReplicator({}, logger=unit.FakeLogger())
  991         replicator.ring = FakeRingWithNodes().Ring('path')
  992         replicator.brokerclass = FakeAccountBroker
  993         replicator._repl_to_node = lambda *args: True
  994         replicator.delete_db = self.stub_delete_db
  995         # Correct node_id, wrong part
  996         part = replicator.ring.get_part(TEST_ACCOUNT_NAME) + 1
  997         node_id = replicator.ring.get_part_nodes(part)[0]['id']
  998         replicator._replicate_object(str(part), '/path/to/file', node_id)
  999         self.assertEqual(['/path/to/file'], self.delete_db_calls)
 1000         error_msgs = replicator.logger.get_lines_for_level('error')
 1001         expected = 'Found /path/to/file for /a%20c%20t when it should be ' \
 1002             'on partition 0; will replicate out and remove.'
 1003         self.assertEqual(error_msgs, [expected])
 1004 
 1005     def test_replicate_container_out_of_place(self):
 1006         replicator = TestReplicator({}, logger=unit.FakeLogger())
 1007         replicator.ring = FakeRingWithNodes().Ring('path')
 1008         replicator._repl_to_node = lambda *args: True
 1009         replicator.delete_db = self.stub_delete_db
 1010         # Correct node_id, wrong part
 1011         part = replicator.ring.get_part(
 1012             TEST_ACCOUNT_NAME, TEST_CONTAINER_NAME) + 1
 1013         node_id = replicator.ring.get_part_nodes(part)[0]['id']
 1014         replicator._replicate_object(str(part), '/path/to/file', node_id)
 1015         self.assertEqual(['/path/to/file'], self.delete_db_calls)
 1016         self.assertEqual(
 1017             replicator.logger.log_dict['error'],
 1018             [(('Found /path/to/file for /a%20c%20t/c%20o%20n when it should '
 1019                'be on partition 0; will replicate out and remove.',), {})])
 1020 
 1021     def test_replicate_container_out_of_place_no_node(self):
 1022         replicator = TestReplicator({}, logger=unit.FakeLogger())
 1023         replicator.ring = FakeRingWithSingleNode().Ring('path')
 1024         replicator._repl_to_node = lambda *args: True
 1025 
 1026         replicator.delete_db = self.stub_delete_db
 1027         # Correct node_id, wrong part
 1028         part = replicator.ring.get_part(
 1029             TEST_ACCOUNT_NAME, TEST_CONTAINER_NAME) + 1
 1030         node_id = replicator.ring.get_part_nodes(part)[0]['id']
 1031         replicator._replicate_object(str(part), '/path/to/file', node_id)
 1032         self.assertEqual(['/path/to/file'], self.delete_db_calls)
 1033 
 1034         self.delete_db_calls = []
 1035 
 1036         # No nodes this time
 1037         replicator.ring.get_part_nodes = lambda *args: []
 1038         replicator.delete_db = self.stub_delete_db
 1039         # Correct node_id, wrong part
 1040         part = replicator.ring.get_part(
 1041             TEST_ACCOUNT_NAME, TEST_CONTAINER_NAME) + 1
 1042         replicator._replicate_object(str(part), '/path/to/file', node_id)
 1043         self.assertEqual([], self.delete_db_calls)
 1044 
 1045     def test_replicate_object_different_region(self):
 1046         db_replicator.ring = FakeRingWithNodes()
 1047         replicator = TestReplicator({})
 1048         replicator._repl_to_node = mock.Mock()
 1049         # For node_id = 1, one replica in same region(1) and other is in a
 1050         # different region(2). Refer: FakeRingWithNodes
 1051         replicator._replicate_object('0', '/path/to/file', 1)
 1052         # different_region was set True and passed to _repl_to_node()
 1053         self.assertEqual(replicator._repl_to_node.call_args_list[0][0][-1],
 1054                          True)
 1055         # different_region was set False and passed to _repl_to_node()
 1056         self.assertEqual(replicator._repl_to_node.call_args_list[1][0][-1],
 1057                          False)
 1058 
    def test_delete_db(self):
        """delete_db() removes the db's hash directory, prunes the suffix
        directory once it is empty, and bumps the remove stat/metric.
        """
        db_replicator.lock_parent_directory = lock_parent_directory
        replicator = TestReplicator({}, logger=unit.FakeLogger())
        replicator._zero_stats()
        replicator.extract_device = lambda _: 'some_device'

        temp_dir = mkdtemp()
        try:
            # lay out <suffix>/<hash>/<db file> twice under one suffix dir
            temp_suf_dir = os.path.join(temp_dir, '16e')
            os.mkdir(temp_suf_dir)
            temp_hash_dir = os.path.join(temp_suf_dir,
                                         '166e33924a08ede4204871468c11e16e')
            os.mkdir(temp_hash_dir)
            temp_file = NamedTemporaryFile(dir=temp_hash_dir, delete=False)
            temp_hash_dir2 = os.path.join(temp_suf_dir,
                                          '266e33924a08ede4204871468c11e16e')
            os.mkdir(temp_hash_dir2)
            temp_file2 = NamedTemporaryFile(dir=temp_hash_dir2, delete=False)

            # sanity-checks
            self.assertTrue(os.path.exists(temp_dir))
            self.assertTrue(os.path.exists(temp_suf_dir))
            self.assertTrue(os.path.exists(temp_hash_dir))
            self.assertTrue(os.path.exists(temp_file.name))
            self.assertTrue(os.path.exists(temp_hash_dir2))
            self.assertTrue(os.path.exists(temp_file2.name))
            self.assertEqual(0, replicator.stats['remove'])

            # first delete: hash dir goes, suffix dir survives (its
            # sibling hash dir is still populated)
            temp_file.db_file = temp_file.name
            replicator.delete_db(temp_file)

            self.assertTrue(os.path.exists(temp_dir))
            self.assertTrue(os.path.exists(temp_suf_dir))
            self.assertFalse(os.path.exists(temp_hash_dir))
            self.assertFalse(os.path.exists(temp_file.name))
            self.assertTrue(os.path.exists(temp_hash_dir2))
            self.assertTrue(os.path.exists(temp_file2.name))
            self.assertEqual([(('removes.some_device',), {})],
                             replicator.logger.log_dict['increment'])
            self.assertEqual(1, replicator.stats['remove'])

            # second delete: the now-empty suffix dir is pruned too
            temp_file2.db_file = temp_file2.name
            replicator.delete_db(temp_file2)

            self.assertTrue(os.path.exists(temp_dir))
            self.assertFalse(os.path.exists(temp_suf_dir))
            self.assertFalse(os.path.exists(temp_hash_dir))
            self.assertFalse(os.path.exists(temp_file.name))
            self.assertFalse(os.path.exists(temp_hash_dir2))
            self.assertFalse(os.path.exists(temp_file2.name))
            self.assertEqual([(('removes.some_device',), {})] * 2,
                             replicator.logger.log_dict['increment'])
            self.assertEqual(2, replicator.stats['remove'])
        finally:
            rmtree(temp_dir)
 1114 
 1115     def test_extract_device(self):
 1116         replicator = TestReplicator({'devices': '/some/root'})
 1117         self.assertEqual('some_device', replicator.extract_device(
 1118             '/some/root/some_device/deeper/and/deeper'))
 1119         self.assertEqual('UNKNOWN', replicator.extract_device(
 1120             '/some/foo/some_device/deeper/and/deeper'))
 1121 
 1122     def test_dispatch_no_arg_pop(self):
 1123         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1124                                           mount_check=False)
 1125         with unit.mock_check_drive(isdir=True):
 1126             response = rpc.dispatch(('a',), 'arg')
 1127         self.assertEqual(b'Invalid object type', response.body)
 1128         self.assertEqual(400, response.status_int)
 1129 
 1130     def test_dispatch_drive_not_mounted(self):
 1131         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1132                                           mount_check=True)
 1133 
 1134         with unit.mock_check_drive() as mocks:
 1135             response = rpc.dispatch(('drive', 'part', 'hash'), ['method'])
 1136         self.assertEqual([mock.call(os.path.join('/drive'))],
 1137                          mocks['ismount'].call_args_list)
 1138 
 1139         self.assertEqual('507 drive is not mounted', response.status)
 1140         self.assertEqual(507, response.status_int)
 1141 
 1142     def test_dispatch_unexpected_operation_db_does_not_exist(self):
 1143         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1144                                           mount_check=False)
 1145 
 1146         def mock_mkdirs(path):
 1147             self.assertEqual('/drive/tmp', path)
 1148 
 1149         self._patch(patch.object, db_replicator, 'mkdirs', mock_mkdirs)
 1150 
 1151         with patch('swift.common.db_replicator.os',
 1152                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1153                 unit.mock_check_drive(isdir=True):
 1154             mock_os.path.exists.return_value = False
 1155             response = rpc.dispatch(('drive', 'part', 'hash'), ['unexpected'])
 1156 
 1157         self.assertEqual('404 Not Found', response.status)
 1158         self.assertEqual(404, response.status_int)
 1159 
 1160     def test_dispatch_operation_unexpected(self):
 1161         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1162                                           mount_check=False)
 1163 
 1164         self._patch(patch.object, db_replicator, 'mkdirs', lambda *args: True)
 1165 
 1166         def unexpected_method(broker, args):
 1167             self.assertEqual(FakeBroker, broker.__class__)
 1168             self.assertEqual(['arg1', 'arg2'], args)
 1169             return 'unexpected-called'
 1170 
 1171         rpc.unexpected = unexpected_method
 1172 
 1173         with patch('swift.common.db_replicator.os',
 1174                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1175                 unit.mock_check_drive(isdir=True):
 1176             mock_os.path.exists.return_value = True
 1177             response = rpc.dispatch(('drive', 'part', 'hash'),
 1178                                     ['unexpected', 'arg1', 'arg2'])
 1179             mock_os.path.exists.assert_called_with('/part/ash/hash/hash.db')
 1180 
 1181         self.assertEqual('unexpected-called', response)
 1182 
 1183     def test_dispatch_operation_rsync_then_merge(self):
 1184         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1185                                           mount_check=False)
 1186 
 1187         self._patch(patch.object, db_replicator, 'renamer', lambda *args: True)
 1188 
 1189         with patch('swift.common.db_replicator.os',
 1190                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1191                 unit.mock_check_drive(isdir=True):
 1192             mock_os.path.exists.return_value = True
 1193             response = rpc.dispatch(('drive', 'part', 'hash'),
 1194                                     ['rsync_then_merge', 'arg1', 'arg2'])
 1195             expected_calls = [call('/part/ash/hash/hash.db'),
 1196                               call('/drive/tmp/arg1'),
 1197                               call(FakeBroker.db_file),
 1198                               call('/drive/tmp/arg1')]
 1199             self.assertEqual(mock_os.path.exists.call_args_list,
 1200                              expected_calls)
 1201             self.assertEqual('204 No Content', response.status)
 1202             self.assertEqual(204, response.status_int)
 1203 
 1204     def test_dispatch_operation_complete_rsync(self):
 1205         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1206                                           mount_check=False)
 1207 
 1208         self._patch(patch.object, db_replicator, 'renamer', lambda *args: True)
 1209 
 1210         with patch('swift.common.db_replicator.os',
 1211                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1212                 unit.mock_check_drive(isdir=True):
 1213             mock_os.path.exists.side_effect = [False, True]
 1214             response = rpc.dispatch(('drive', 'part', 'hash'),
 1215                                     ['complete_rsync', 'arg1'])
 1216             expected_calls = [call('/part/ash/hash/hash.db'),
 1217                               call('/drive/tmp/arg1')]
 1218             self.assertEqual(mock_os.path.exists.call_args_list,
 1219                              expected_calls)
 1220             self.assertEqual('204 No Content', response.status)
 1221             self.assertEqual(204, response.status_int)
 1222 
 1223         with patch('swift.common.db_replicator.os',
 1224                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1225                 unit.mock_check_drive(isdir=True):
 1226             mock_os.path.exists.side_effect = [False, True]
 1227             response = rpc.dispatch(('drive', 'part', 'hash'),
 1228                                     ['complete_rsync', 'arg1', 'arg2'])
 1229             expected_calls = [call('/part/ash/hash/arg2'),
 1230                               call('/drive/tmp/arg1')]
 1231             self.assertEqual(mock_os.path.exists.call_args_list,
 1232                              expected_calls)
 1233             self.assertEqual('204 No Content', response.status)
 1234             self.assertEqual(204, response.status_int)
 1235 
 1236     def test_rsync_then_merge_db_does_not_exist(self):
 1237         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1238                                           mount_check=False)
 1239 
 1240         with patch('swift.common.db_replicator.os',
 1241                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1242                 unit.mock_check_drive(isdir=True):
 1243             mock_os.path.exists.return_value = False
 1244             response = rpc.rsync_then_merge('drive', '/data/db.db',
 1245                                             ('arg1', 'arg2'))
 1246             mock_os.path.exists.assert_called_with('/data/db.db')
 1247             self.assertEqual('404 Not Found', response.status)
 1248             self.assertEqual(404, response.status_int)
 1249 
 1250     def test_rsync_then_merge_old_does_not_exist(self):
 1251         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1252                                           mount_check=False)
 1253 
 1254         with patch('swift.common.db_replicator.os',
 1255                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1256                 unit.mock_check_drive(isdir=True):
 1257             mock_os.path.exists.side_effect = [True, False]
 1258             response = rpc.rsync_then_merge('drive', '/data/db.db',
 1259                                             ('arg1', 'arg2'))
 1260             expected_calls = [call('/data/db.db'), call('/drive/tmp/arg1')]
 1261             self.assertEqual(mock_os.path.exists.call_args_list,
 1262                              expected_calls)
 1263             self.assertEqual('404 Not Found', response.status)
 1264             self.assertEqual(404, response.status_int)
 1265 
 1266     def test_rsync_then_merge_with_objects(self):
 1267         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1268                                           mount_check=False)
 1269 
 1270         def mock_renamer(old, new):
 1271             self.assertEqual('/drive/tmp/arg1', old)
 1272             # FakeBroker uses module filename as db_file!
 1273             self.assertEqual(__file__, new)
 1274 
 1275         self._patch(patch.object, db_replicator, 'renamer', mock_renamer)
 1276 
 1277         with patch('swift.common.db_replicator.os',
 1278                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1279                 unit.mock_check_drive(isdir=True):
 1280             mock_os.path.exists.return_value = True
 1281             response = rpc.rsync_then_merge('drive', '/data/db.db',
 1282                                             ['arg1', 'arg2'])
 1283             self.assertEqual('204 No Content', response.status)
 1284             self.assertEqual(204, response.status_int)
 1285 
 1286     def test_complete_rsync_db_exists(self):
 1287         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1288                                           mount_check=False)
 1289 
 1290         with patch('swift.common.db_replicator.os',
 1291                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1292                 unit.mock_check_drive(isdir=True):
 1293             mock_os.path.exists.return_value = True
 1294             response = rpc.complete_rsync('drive', '/data/db.db', ['arg1'])
 1295             mock_os.path.exists.assert_called_with('/data/db.db')
 1296             self.assertEqual('404 Not Found', response.status)
 1297             self.assertEqual(404, response.status_int)
 1298 
 1299         with patch('swift.common.db_replicator.os',
 1300                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1301                 unit.mock_check_drive(isdir=True):
 1302             mock_os.path.exists.return_value = True
 1303             response = rpc.complete_rsync('drive', '/data/db.db',
 1304                                           ['arg1', 'arg2'])
 1305             mock_os.path.exists.assert_called_with('/data/arg2')
 1306             self.assertEqual('404 Not Found', response.status)
 1307             self.assertEqual(404, response.status_int)
 1308 
 1309     def test_complete_rsync_old_file_does_not_exist(self):
 1310         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1311                                           mount_check=False)
 1312 
 1313         with patch('swift.common.db_replicator.os',
 1314                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1315                 unit.mock_check_drive(isdir=True):
 1316             mock_os.path.exists.return_value = False
 1317             response = rpc.complete_rsync('drive', '/data/db.db',
 1318                                           ['arg1'])
 1319             expected_calls = [call('/data/db.db'), call('/drive/tmp/arg1')]
 1320             self.assertEqual(expected_calls,
 1321                              mock_os.path.exists.call_args_list)
 1322             self.assertEqual('404 Not Found', response.status)
 1323             self.assertEqual(404, response.status_int)
 1324 
 1325         with patch('swift.common.db_replicator.os',
 1326                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1327                 unit.mock_check_drive(isdir=True):
 1328             mock_os.path.exists.return_value = False
 1329             response = rpc.complete_rsync('drive', '/data/db.db',
 1330                                           ['arg1', 'arg2'])
 1331             expected_calls = [call('/data/arg2'), call('/drive/tmp/arg1')]
 1332             self.assertEqual(expected_calls,
 1333                              mock_os.path.exists.call_args_list)
 1334             self.assertEqual('404 Not Found', response.status)
 1335             self.assertEqual(404, response.status_int)
 1336 
 1337     def test_complete_rsync_rename(self):
 1338         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1339                                           mount_check=False)
 1340 
 1341         def mock_renamer(old, new):
 1342             renamer_calls.append((old, new))
 1343 
 1344         self._patch(patch.object, db_replicator, 'renamer', mock_renamer)
 1345 
 1346         renamer_calls = []
 1347         with patch('swift.common.db_replicator.os',
 1348                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1349                 unit.mock_check_drive(isdir=True):
 1350             mock_os.path.exists.side_effect = [False, True]
 1351             response = rpc.complete_rsync('drive', '/data/db.db',
 1352                                           ['arg1'])
 1353         self.assertEqual('204 No Content', response.status)
 1354         self.assertEqual(204, response.status_int)
 1355         self.assertEqual(('/drive/tmp/arg1', '/data/db.db'), renamer_calls[0])
 1356         self.assertFalse(renamer_calls[1:])
 1357 
 1358         renamer_calls = []
 1359         with patch('swift.common.db_replicator.os',
 1360                    new=mock.MagicMock(wraps=os)) as mock_os, \
 1361                 unit.mock_check_drive(isdir=True):
 1362             mock_os.path.exists.side_effect = [False, True]
 1363             response = rpc.complete_rsync('drive', '/data/db.db',
 1364                                           ['arg1', 'arg2'])
 1365         self.assertEqual('204 No Content', response.status)
 1366         self.assertEqual(204, response.status_int)
 1367         self.assertEqual(('/drive/tmp/arg1', '/data/arg2'), renamer_calls[0])
 1368         self.assertFalse(renamer_calls[1:])
 1369 
 1370     def test_replicator_sync_with_broker_replication_missing_table(self):
 1371         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1372                                           mount_check=False)
 1373         rpc.logger = unit.debug_logger()
 1374         broker = FakeBroker()
 1375         broker.get_repl_missing_table = True
 1376 
 1377         called = []
 1378 
 1379         def mock_quarantine_db(object_file, server_type):
 1380             called.append(True)
 1381             self.assertEqual(broker.db_file, object_file)
 1382             self.assertEqual(broker.db_type, server_type)
 1383 
 1384         self._patch(patch.object, db_replicator, 'quarantine_db',
 1385                     mock_quarantine_db)
 1386 
 1387         with unit.mock_check_drive(isdir=True):
 1388             response = rpc.sync(broker, ('remote_sync', 'hash_', 'id_',
 1389                                          'created_at', 'put_timestamp',
 1390                                          'delete_timestamp', 'metadata'))
 1391 
 1392         self.assertEqual('404 Not Found', response.status)
 1393         self.assertEqual(404, response.status_int)
 1394         self.assertEqual(called, [True])
 1395         errors = rpc.logger.get_lines_for_level('error')
 1396         self.assertEqual(errors,
 1397                          ["Unable to decode remote metadata 'metadata'",
 1398                           "Quarantining DB %s" % broker])
 1399 
 1400     def test_replicator_sync(self):
 1401         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1402                                           mount_check=False)
 1403         broker = FakeBroker()
 1404 
 1405         with unit.mock_check_drive(isdir=True):
 1406             response = rpc.sync(broker, (
 1407                 broker.get_sync() + 1, 12345, 'id_',
 1408                 'created_at', 'put_timestamp', 'delete_timestamp',
 1409                 '{"meta1": "data1", "meta2": "data2"}'))
 1410 
 1411         self.assertEqual({'meta1': 'data1', 'meta2': 'data2'},
 1412                          broker.metadata)
 1413         self.assertEqual('created_at', broker.created_at)
 1414         self.assertEqual('put_timestamp', broker.put_timestamp)
 1415         self.assertEqual('delete_timestamp', broker.delete_timestamp)
 1416 
 1417         self.assertEqual('200 OK', response.status)
 1418         self.assertEqual(200, response.status_int)
 1419 
 1420     def test_rsync_then_merge(self):
 1421         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1422                                           mount_check=False)
 1423         with unit.mock_check_drive(isdir=True):
 1424             rpc.rsync_then_merge('sda1', '/srv/swift/blah', ('a', 'b'))
 1425 
 1426     def test_merge_items(self):
 1427         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1428                                           mount_check=False)
 1429         fake_broker = FakeBroker()
 1430         args = ('a', 'b')
 1431         with unit.mock_check_drive(isdir=True):
 1432             rpc.merge_items(fake_broker, args)
 1433         self.assertEqual(fake_broker.args, args)
 1434 
 1435     def test_merge_syncs(self):
 1436         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1437                                           mount_check=False)
 1438         fake_broker = FakeBroker()
 1439         args = ('a', 'b')
 1440         with unit.mock_check_drive(isdir=True):
 1441             rpc.merge_syncs(fake_broker, args)
 1442         self.assertEqual(fake_broker.args, (args[0],))
 1443 
 1444     def test_complete_rsync_with_bad_input(self):
 1445         drive = '/some/root'
 1446         db_file = __file__
 1447         args = ['old_file']
 1448         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1449                                           mount_check=False)
 1450         with unit.mock_check_drive(isdir=True):
 1451             resp = rpc.complete_rsync(drive, db_file, args)
 1452         self.assertTrue(isinstance(resp, HTTPException))
 1453         self.assertEqual(404, resp.status_int)
 1454         with unit.mock_check_drive(isdir=True):
 1455             resp = rpc.complete_rsync(drive, 'new_db_file', args)
 1456         self.assertTrue(isinstance(resp, HTTPException))
 1457         self.assertEqual(404, resp.status_int)
 1458 
 1459     def test_complete_rsync(self):
 1460         drive = mkdtemp()
 1461         args = ['old_file']
 1462         rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
 1463                                           mount_check=False)
 1464         os.mkdir('%s/tmp' % drive)
 1465         old_file = '%s/tmp/old_file' % drive
 1466         new_file = '%s/new_db_file' % drive
 1467         try:
 1468             fp = open(old_file, 'w')
 1469             fp.write('void')
 1470             fp.close
 1471             resp = rpc.complete_rsync(drive, new_file, args)
 1472             self.assertEqual(204, resp.status_int)
 1473         finally:
 1474             rmtree(drive)
 1475 
    @unit.with_tempdir
    def test_empty_suffix_and_hash_dirs_get_cleanedup(self, tempdir):
        """roundrobin_datadirs prunes empty partition/suffix/hash dirs as
        it walks, removing at most one directory level per pass.

        :param tempdir: scratch directory supplied by the with_tempdir
                        decorator
        """
        datadir = os.path.join(tempdir, 'containers')
        db_path = ('450/afd/7089ab48d955ab0851fc51cc17a34afd/'
                   '7089ab48d955ab0851fc51cc17a34afd.db')
        random_file = ('1060/xyz/1234ab48d955ab0851fc51cc17a34xyz/'
                       '1234ab48d955ab0851fc51cc17a34xyz.abc')

        # trailing "/" indicates empty dir
        paths = [
            # empty part dir
            '240/',
            # empty suffix dir
            '18/aba/',
            # empty hashdir
            '1054/27e/d41d8cd98f00b204e9800998ecf8427e/',
            # database
            db_path,
            # non database file
            random_file,
        ]
        for path in paths:
            path = os.path.join(datadir, path)
            os.makedirs(os.path.dirname(path))
            if os.path.basename(path):
                # our setup requires "directories" to end in "/" (i.e. basename
                # is ''); otherwise, create an empty file
                open(path, 'w')
        # sanity
        self.assertEqual({'240', '18', '1054', '1060', '450'},
                         set(os.listdir(datadir)))
        for path in paths:
            dirpath = os.path.join(datadir, os.path.dirname(path))
            self.assertTrue(os.path.isdir(dirpath))

        node_id = 1
        results = list(db_replicator.roundrobin_datadirs(
            [(datadir, node_id, lambda p: True)]))
        # only the real database file is ever yielded
        expected = [
            ('450', os.path.join(datadir, db_path), node_id),
        ]
        self.assertEqual(results, expected)

        # all the empty leaf dirs are cleaned up
        for path in paths:
            if os.path.basename(path):
                check = self.assertTrue
            else:
                check = self.assertFalse
            dirpath = os.path.join(datadir, os.path.dirname(path))
            isdir = os.path.isdir(dirpath)
            check(isdir, '%r is%s a directory!' % (
                dirpath, '' if isdir else ' not'))

        # despite the leaves cleaned up it takes a few loops to finish it off
        self.assertEqual({'18', '1054', '1060', '450'},
                         set(os.listdir(datadir)))

        results = list(db_replicator.roundrobin_datadirs(
            [(datadir, node_id, lambda p: True)]))
        self.assertEqual(results, expected)
        self.assertEqual({'1054', '1060', '450'},
                         set(os.listdir(datadir)))

        results = list(db_replicator.roundrobin_datadirs(
            [(datadir, node_id, lambda p: True)]))
        self.assertEqual(results, expected)
        # non db file in '1060' dir is not deleted and exception is handled
        self.assertEqual({'1060', '450'},
                         set(os.listdir(datadir)))
 1546 
    def test_roundrobin_datadirs(self):
        """Walk two faked device trees and verify roundrobin_datadirs
        interleaves the devices, only descends into real directories,
        yields only correctly-placed .db files, and rmdir()s empty
        partition directories.
        """
        # recorders for every filesystem call the walker makes
        listdir_calls = []
        isdir_calls = []
        exists_calls = []
        shuffle_calls = []
        rmdir_calls = []

        def _listdir(path):
            # fake directory listing for both sda and sdb trees
            listdir_calls.append(path)
            if not path.startswith('/srv/node/sda/containers') and \
                    not path.startswith('/srv/node/sdb/containers'):
                return []
            path = path[len('/srv/node/sdx/containers'):]
            if path == '':
                return ['123', '456', '789', '9999', "-5", "not-a-partition"]
                # 456 will pretend to be a file
                # 9999 will be an empty partition with no contents
                # -5 and not-a-partition were created by something outside
                #   Swift
            elif path == '/123':
                return ['abc', 'def.db']  # def.db will pretend to be a file
            elif path == '/123/abc':
                # 11111111111111111111111111111abc will pretend to be a file
                return ['00000000000000000000000000000abc',
                        '11111111111111111111111111111abc']
            elif path == '/123/abc/00000000000000000000000000000abc':
                return ['00000000000000000000000000000abc.db',
                        # This other.db isn't in the right place, so should be
                        # ignored later.
                        '000000000000000000000000000other.db',
                        'weird1']  # weird1 will pretend to be a dir, if asked
            elif path == '/789':
                return ['ghi', 'jkl']  # jkl will pretend to be a file
            elif path == '/789/ghi':
                # 33333333333333333333333333333ghi will pretend to be a file
                return ['22222222222222222222222222222ghi',
                        '33333333333333333333333333333ghi']
            elif path == '/789/ghi/22222222222222222222222222222ghi':
                return ['22222222222222222222222222222ghi.db',
                        'weird2']  # weird2 will pretend to be a dir, if asked
            elif path == '9999':
                return []
            elif path == 'not-a-partition':
                raise Exception("shouldn't look in not-a-partition")
            elif path == '-5':
                raise Exception("shouldn't look in -5")
            return []

        def _isdir(path):
            # only the whitelisted relative paths count as directories
            isdir_calls.append(path)
            if not path.startswith('/srv/node/sda/containers') and \
                    not path.startswith('/srv/node/sdb/containers'):
                return False
            path = path[len('/srv/node/sdx/containers'):]
            if path in ('/123', '/123/abc',
                        '/123/abc/00000000000000000000000000000abc',
                        '/123/abc/00000000000000000000000000000abc/weird1',
                        '/789', '/789/ghi',
                        '/789/ghi/22222222222222222222222222222ghi',
                        '/789/ghi/22222222222222222222222222222ghi/weird2',
                        '/9999'):
                return True
            return False

        def _exists(arg):
            exists_calls.append(arg)
            return True

        def _shuffle(arg):
            # record but don't actually shuffle, keeping the walk
            # deterministic for the assertions below
            shuffle_calls.append(arg)

        def _rmdir(arg):
            rmdir_calls.append(arg)

        base = 'swift.common.db_replicator.'
        with mock.patch(base + 'os.listdir', _listdir), \
                mock.patch(base + 'os.path.isdir', _isdir), \
                mock.patch(base + 'os.path.exists', _exists), \
                mock.patch(base + 'random.shuffle', _shuffle), \
                mock.patch(base + 'os.rmdir', _rmdir):

            datadirs = [('/srv/node/sda/containers', 1, lambda p: True),
                        ('/srv/node/sdb/containers', 2, lambda p: True)]
            results = list(db_replicator.roundrobin_datadirs(datadirs))
            # The results show that the .db files are returned, the devices
            # interleaved.
            self.assertEqual(results, [
                ('123', '/srv/node/sda/containers/123/abc/'
                        '00000000000000000000000000000abc/'
                        '00000000000000000000000000000abc.db', 1),
                ('123', '/srv/node/sdb/containers/123/abc/'
                        '00000000000000000000000000000abc/'
                        '00000000000000000000000000000abc.db', 2),
                ('789', '/srv/node/sda/containers/789/ghi/'
                        '22222222222222222222222222222ghi/'
                        '22222222222222222222222222222ghi.db', 1),
                ('789', '/srv/node/sdb/containers/789/ghi/'
                        '22222222222222222222222222222ghi/'
                        '22222222222222222222222222222ghi.db', 2)])
            # The listdir calls show that we only listdir the dirs
            self.assertEqual(listdir_calls, [
                '/srv/node/sda/containers',
                '/srv/node/sda/containers/123',
                '/srv/node/sda/containers/123/abc',
                '/srv/node/sdb/containers',
                '/srv/node/sdb/containers/123',
                '/srv/node/sdb/containers/123/abc',
                '/srv/node/sda/containers/789',
                '/srv/node/sda/containers/789/ghi',
                '/srv/node/sdb/containers/789',
                '/srv/node/sdb/containers/789/ghi',
                '/srv/node/sda/containers/9999',
                '/srv/node/sdb/containers/9999'])
            # The isdir calls show that we did ask about the things pretending
            # to be files at various levels.
            self.assertEqual(isdir_calls, [
                '/srv/node/sda/containers/123',
                '/srv/node/sda/containers/123/abc',
                ('/srv/node/sda/containers/123/abc/'
                 '00000000000000000000000000000abc'),
                '/srv/node/sdb/containers/123',
                '/srv/node/sdb/containers/123/abc',
                ('/srv/node/sdb/containers/123/abc/'
                 '00000000000000000000000000000abc'),
                ('/srv/node/sda/containers/123/abc/'
                 '11111111111111111111111111111abc'),
                '/srv/node/sda/containers/123/def.db',
                '/srv/node/sda/containers/456',
                '/srv/node/sda/containers/789',
                '/srv/node/sda/containers/789/ghi',
                ('/srv/node/sda/containers/789/ghi/'
                 '22222222222222222222222222222ghi'),
                ('/srv/node/sdb/containers/123/abc/'
                 '11111111111111111111111111111abc'),
                '/srv/node/sdb/containers/123/def.db',
                '/srv/node/sdb/containers/456',
                '/srv/node/sdb/containers/789',
                '/srv/node/sdb/containers/789/ghi',
                ('/srv/node/sdb/containers/789/ghi/'
                 '22222222222222222222222222222ghi'),
                ('/srv/node/sda/containers/789/ghi/'
                 '33333333333333333333333333333ghi'),
                '/srv/node/sda/containers/789/jkl',
                '/srv/node/sda/containers/9999',
                ('/srv/node/sdb/containers/789/ghi/'
                 '33333333333333333333333333333ghi'),
                '/srv/node/sdb/containers/789/jkl',
                '/srv/node/sdb/containers/9999'])
            # The exists calls are the .db files we looked for as we walked the
            # structure.
            self.assertEqual(exists_calls, [
                ('/srv/node/sda/containers/123/abc/'
                 '00000000000000000000000000000abc/'
                 '00000000000000000000000000000abc.db'),
                ('/srv/node/sdb/containers/123/abc/'
                 '00000000000000000000000000000abc/'
                 '00000000000000000000000000000abc.db'),
                ('/srv/node/sda/containers/789/ghi/'
                 '22222222222222222222222222222ghi/'
                 '22222222222222222222222222222ghi.db'),
                ('/srv/node/sdb/containers/789/ghi/'
                 '22222222222222222222222222222ghi/'
                 '22222222222222222222222222222ghi.db')])
            # Shows that we called shuffle twice, once for each device.
            self.assertEqual(
                shuffle_calls, [['123', '456', '789', '9999'],
                                ['123', '456', '789', '9999']])

            # Shows that we called removed the two empty partition directories.
            self.assertEqual(
                rmdir_calls, ['/srv/node/sda/containers/9999',
                              '/srv/node/sdb/containers/9999'])
 1719 
 1720     @mock.patch("swift.common.db_replicator.ReplConnection", mock.Mock())
 1721     def test_http_connect(self):
 1722         node = "node"
 1723         partition = "partition"
 1724         db_file = __file__
 1725         replicator = TestReplicator({})
 1726         replicator._http_connect(node, partition, db_file)
 1727         expected_hsh = os.path.basename(db_file).split('.', 1)[0]
 1728         expected_hsh = expected_hsh.split('_', 1)[0]
 1729         db_replicator.ReplConnection.assert_has_calls([
 1730             mock.call(node, partition, expected_hsh, replicator.logger)])
 1731 
 1732 
class TestHandoffsOnly(unittest.TestCase):
    """Tests for the replicator's handoffs_only mode and for the
    run_once(devices=..., partitions=...) override parameters, using a
    small on-disk fixture of container db files."""

    class FakeRing3Nodes(object):
        # Ring double: three nodes with two disks each; primaries for a
        # partition are found by walking the device list from the
        # partition number.
        _replicas = 3

        # Three nodes, two disks each
        devs = [
            dict(id=0, region=1, zone=1,
                 meta='', weight=500.0, ip='10.0.0.1', port=6201,
                 replication_ip='10.0.0.1', replication_port=6201,
                 device='sdp'),
            dict(id=1, region=1, zone=1,
                 meta='', weight=500.0, ip='10.0.0.1', port=6201,
                 replication_ip='10.0.0.1', replication_port=6201,
                 device='sdq'),

            dict(id=2, region=1, zone=1,
                 meta='', weight=500.0, ip='10.0.0.2', port=6201,
                 replication_ip='10.0.0.2', replication_port=6201,
                 device='sdp'),
            dict(id=3, region=1, zone=1,
                 meta='', weight=500.0, ip='10.0.0.2', port=6201,
                 replication_ip='10.0.0.2', replication_port=6201,
                 device='sdq'),

            dict(id=4, region=1, zone=1,
                 meta='', weight=500.0, ip='10.0.0.3', port=6201,
                 replication_ip='10.0.0.3', replication_port=6201,
                 device='sdp'),
            dict(id=5, region=1, zone=1,
                 meta='', weight=500.0, ip='10.0.0.3', port=6201,
                 replication_ip='10.0.0.3', replication_port=6201,
                 device='sdq'),
        ]

        def __init__(self, *a, **kw):
            pass

        def get_part(self, account, container=None, obj=None):
            # every name maps to partition 0
            return 0

        def get_part_nodes(self, part):
            # the _replicas primaries, starting at the partition number
            nodes = []
            for offset in range(self._replicas):
                i = (part + offset) % len(self.devs)
                nodes.append(self.devs[i])
            return nodes

        def get_more_nodes(self, part):
            # handoff nodes: the remaining devices after the primaries
            for offset in range(self._replicas, len(self.devs)):
                i = (part + offset) % len(self.devs)
                yield self.devs[i]

    def _make_fake_db(self, disk, partition, db_hash):
        """Create an empty container db file at the canonical on-disk
        location <root>/<disk>/containers/<partition>/<suffix>/<hash>/.

        :param disk: device name, e.g. 'sdp'
        :param partition: partition number
        :param db_hash: 32-char hash; its last 3 chars are the suffix dir
        """
        directories = [
            os.path.join(self.root, disk),
            os.path.join(self.root, disk, 'containers'),
            os.path.join(self.root, disk, 'containers', str(partition)),
            os.path.join(self.root, disk, 'containers', str(partition),
                         db_hash[-3:]),
            os.path.join(self.root, disk, 'containers', str(partition),
                         db_hash[-3:], db_hash)]

        for d in directories:
            try:
                os.mkdir(d)
            except OSError as err:
                # fine if a parent test already made it
                if err.errno != errno.EEXIST:
                    raise
        file_path = os.path.join(directories[-1], db_hash + ".db")
        with open(file_path, 'w'):
            pass

    def setUp(self):
        self.root = mkdtemp()

        # object disks; they're just here to make sure they don't trip us up
        os.mkdir(os.path.join(self.root, 'sdc'))
        os.mkdir(os.path.join(self.root, 'sdc', 'objects'))
        os.mkdir(os.path.join(self.root, 'sdd'))
        os.mkdir(os.path.join(self.root, 'sdd', 'objects'))

        # part 0 belongs on sdp
        self._make_fake_db('sdp', 0, '010101013cf2b7979af9eaa71cb67220')

        # part 1 does not belong on sdp
        self._make_fake_db('sdp', 1, 'abababab2b5368158355e799323b498d')

        # part 1 belongs on sdq
        self._make_fake_db('sdq', 1, '02020202e30f696a3cfa63d434a3c94e')

        # part 2 does not belong on sdq
        self._make_fake_db('sdq', 2, 'bcbcbcbc15d3835053d568c57e2c83b5')

    def tearDown(self):
        rmtree(self.root, ignore_errors=True)

    def test_scary_warnings(self):
        """handoffs_only mode must warn loudly at pass start and end."""
        logger = unit.FakeLogger()
        replicator = TestReplicator({
            'handoffs_only': 'yes',
            'devices': self.root,
            'bind_port': 6201,
            'mount_check': 'no',
        }, logger=logger)

        with patch.object(db_replicator, 'whataremyips',
                          return_value=['10.0.0.1']), \
                patch.object(replicator, '_replicate_object'), \
                patch.object(replicator, 'ring', self.FakeRing3Nodes()):
            replicator.run_once()

        self.assertEqual(
            logger.get_lines_for_level('warning'),
            [('Starting replication pass with handoffs_only enabled. This '
              'mode is not intended for normal operation; use '
              'handoffs_only with care.'),
             ('Finished replication pass with handoffs_only enabled. '
              'If handoffs_only is no longer required, disable it.')])

    def test_skips_primary_partitions(self):
        """In handoffs_only mode only misplaced (handoff) dbs replicate."""
        replicator = TestReplicator({
            'handoffs_only': 'yes',
            'devices': self.root,
            'bind_port': 6201,
            'mount_check': 'no',
        })

        with patch.object(db_replicator, 'whataremyips',
                          return_value=['10.0.0.1']), \
                patch.object(replicator, '_replicate_object') as mock_repl, \
                patch.object(replicator, 'ring', self.FakeRing3Nodes()):
            replicator.run_once()

        # only the two dbs that do NOT belong on their disk are replicated
        self.assertEqual(sorted(mock_repl.mock_calls), [
            mock.call('1', os.path.join(
                self.root, 'sdp', 'containers', '1', '98d',
                'abababab2b5368158355e799323b498d',
                'abababab2b5368158355e799323b498d.db'), 0),
            mock.call('2', os.path.join(
                self.root, 'sdq', 'containers', '2', '3b5',
                'bcbcbcbc15d3835053d568c57e2c83b5',
                'bcbcbcbc15d3835053d568c57e2c83b5.db'), 1)])

    def test_override_partitions(self):
        """run_once(partitions=...) replicates only the listed parts."""
        replicator = TestReplicator({
            'devices': self.root,
            'bind_port': 6201,
            'mount_check': 'no',
        })

        with patch.object(db_replicator, 'whataremyips',
                          return_value=['10.0.0.1']), \
                patch.object(replicator, '_replicate_object') as mock_repl, \
                patch.object(replicator, 'ring', self.FakeRing3Nodes()):
            replicator.run_once(partitions="0,2")

        # partition 1 dbs are skipped
        self.assertEqual(sorted(mock_repl.mock_calls), [
            mock.call('0', os.path.join(
                self.root, 'sdp', 'containers', '0', '220',
                '010101013cf2b7979af9eaa71cb67220',
                '010101013cf2b7979af9eaa71cb67220.db'), 0),
            mock.call('2', os.path.join(
                self.root, 'sdq', 'containers', '2', '3b5',
                'bcbcbcbc15d3835053d568c57e2c83b5',
                'bcbcbcbc15d3835053d568c57e2c83b5.db'), 1)])

    def test_override_devices(self):
        """run_once(devices=...) replicates only the listed devices."""
        replicator = TestReplicator({
            'devices': self.root,
            'bind_port': 6201,
            'mount_check': 'no',
        })

        with patch.object(db_replicator, 'whataremyips',
                          return_value=['10.0.0.1']), \
                patch.object(replicator, '_replicate_object') as mock_repl, \
                patch.object(replicator, 'ring', self.FakeRing3Nodes()):
            replicator.run_once(devices="sdp")

        # only sdp's dbs are replicated
        self.assertEqual(sorted(mock_repl.mock_calls), [
            mock.call('0', os.path.join(
                self.root, 'sdp', 'containers', '0', '220',
                '010101013cf2b7979af9eaa71cb67220',
                '010101013cf2b7979af9eaa71cb67220.db'), 0),
            mock.call('1', os.path.join(
                self.root, 'sdp', 'containers', '1', '98d',
                'abababab2b5368158355e799323b498d',
                'abababab2b5368158355e799323b498d.db'), 0)])

    def test_override_devices_and_partitions(self):
        """Device and partition overrides combine (intersection)."""
        replicator = TestReplicator({
            'devices': self.root,
            'bind_port': 6201,
            'mount_check': 'no',
        })

        with patch.object(db_replicator, 'whataremyips',
                          return_value=['10.0.0.1']), \
                patch.object(replicator, '_replicate_object') as mock_repl, \
                patch.object(replicator, 'ring', self.FakeRing3Nodes()):
            replicator.run_once(partitions="0,2", devices="sdp")

        self.assertEqual(sorted(mock_repl.mock_calls), [
            mock.call('0', os.path.join(
                self.root, 'sdp', 'containers', '0', '220',
                '010101013cf2b7979af9eaa71cb67220',
                '010101013cf2b7979af9eaa71cb67220.db'), 0)])
 1940 
 1941 
class TestReplToNode(unittest.TestCase):
    """Exercise Replicator._repl_to_node: the per-node choice between
    incremental usync and whole-file rsync replication, plus handling of
    the various remote HTTP response codes.
    """

    def setUp(self):
        # Stub every I/O-heavy collaborator (ring, rsync, usync, HTTP)
        # so _repl_to_node can be driven entirely in memory.
        db_replicator.ring = FakeRing()
        self.delete_db_calls = []
        self.broker = FakeBroker()
        self.replicator = TestReplicator({'per_diff': 10})
        self.fake_node = {'ip': '127.0.0.1', 'device': 'sda1', 'port': 1000}
        # Local replication info: 20 rows, never synced before (point -1).
        self.fake_info = {'id': 'a', 'point': -1, 'max_row': 20, 'hash': 'b',
                          'created_at': 100, 'put_timestamp': 0,
                          'delete_timestamp': 0, 'count': 0,
                          'metadata': json.dumps({
                              'Test': ('Value', normalize_timestamp(1))})}
        self.replicator.logger = mock.Mock()
        self.replicator._rsync_db = mock.Mock(return_value=True)
        self.replicator._usync_db = mock.Mock(return_value=True)
        self.http = ReplHttp('{"id": 3, "point": -1}')
        # Late-binding lambda: tests reassign self.http before calling
        # _repl_to_node and the replacement response object is used.
        self.replicator._http_connect = lambda *args: self.http

    def test_repl_to_node_usync_success(self):
        # Remote is behind but close enough (presumably >= 50% of local
        # rows — confirm against _repl_to_node) that usync is chosen.
        rinfo = {"id": 3, "point": -1, "max_row": 10, "hash": "c"}
        self.http = ReplHttp(json.dumps(rinfo))
        local_sync = self.broker.get_sync()
        self.assertEqual(self.replicator._repl_to_node(
            self.fake_node, self.broker, '0', self.fake_info), True)
        self.replicator._usync_db.assert_has_calls([
            mock.call(max(rinfo['point'], local_sync), self.broker,
                      self.http, rinfo['id'], self.fake_info['id'])
        ])

    def test_repl_to_node_rsync_success(self):
        # Remote max_row 9 vs local 20: far enough behind that the
        # rsync_then_merge path is taken instead of usync.
        rinfo = {"id": 3, "point": -1, "max_row": 9, "hash": "c"}
        self.http = ReplHttp(json.dumps(rinfo))
        self.broker.get_sync()
        self.assertEqual(self.replicator._repl_to_node(
            self.fake_node, self.broker, '0', self.fake_info), True)
        # NOTE(review): mock.call.increment(...) asserted against the
        # `increment` child mock only matches because recorded calls have
        # an empty name, which _Call equality treats as a wildcard;
        # mock.call('remote_merges') would be the precise spelling.
        self.replicator.logger.increment.assert_has_calls([
            mock.call.increment('remote_merges')
        ])
        self.replicator._rsync_db.assert_has_calls([
            mock.call(self.broker, self.fake_node, self.http,
                      self.fake_info['id'],
                      replicate_method='rsync_then_merge',
                      replicate_timeout=(self.fake_info['count'] / 2000),
                      different_region=False)
        ])

    def test_repl_to_node_already_in_sync(self):
        # Remote matches local max_row and hash: neither sync strategy
        # should be invoked at all.
        rinfo = {"id": 3, "point": -1, "max_row": 20, "hash": "b"}
        self.http = ReplHttp(json.dumps(rinfo))
        self.broker.get_sync()
        self.assertEqual(self.replicator._repl_to_node(
            self.fake_node, self.broker, '0', self.fake_info), True)
        self.assertEqual(self.replicator._rsync_db.call_count, 0)
        self.assertEqual(self.replicator._usync_db.call_count, 0)

    def test_repl_to_node_metadata_update(self):
        # Metadata returned by the remote in the sync response is merged
        # into the local broker's metadata.
        now = Timestamp(time.time()).internal
        rmetadata = {"X-Container-Sysmeta-Test": ("XYZ", now)}
        rinfo = {"id": 3, "point": -1, "max_row": 20, "hash": "b",
                 "metadata": json.dumps(rmetadata)}
        self.http = ReplHttp(json.dumps(rinfo))
        self.broker.get_sync()
        self.assertEqual(self.replicator._repl_to_node(
            self.fake_node, self.broker, '0', self.fake_info), True)
        metadata = self.broker.metadata
        self.assertIn("X-Container-Sysmeta-Test", metadata)
        self.assertEqual("XYZ", metadata["X-Container-Sysmeta-Test"][0])
        self.assertEqual(now, metadata["X-Container-Sysmeta-Test"][1])

    def test_repl_to_node_not_found(self):
        # A 404 means the remote has no copy: fall back to a full rsync
        # and count the event under 'rsyncs'.
        self.http = ReplHttp('{"id": 3, "point": -1}', set_status=404)
        self.assertEqual(self.replicator._repl_to_node(
            self.fake_node, self.broker, '0', self.fake_info, False), True)
        self.replicator.logger.increment.assert_has_calls([
            mock.call.increment('rsyncs')
        ])
        self.replicator._rsync_db.assert_has_calls([
            mock.call(self.broker, self.fake_node, self.http,
                      self.fake_info['id'], different_region=False)
        ])

    def test_repl_to_node_drive_not_mounted(self):
        # 507 Insufficient Storage surfaces as DriveNotMounted so the
        # caller can retry against a handoff node.
        self.http = ReplHttp('{"id": 3, "point": -1}', set_status=507)

        self.assertRaises(DriveNotMounted, self.replicator._repl_to_node,
                          self.fake_node, FakeBroker(), '0', self.fake_info)

    def test_repl_to_node_300_status(self):
        # Any other unexpected status (here 300) is treated as failure.
        self.http = ReplHttp('{"id": 3, "point": -1}', set_status=300)

        self.assertFalse(self.replicator._repl_to_node(
            self.fake_node, FakeBroker(), '0', self.fake_info))

    def test_repl_to_node_not_response(self):
        # replicate() returning no response object at all -> failure.
        self.http = mock.Mock(replicate=mock.Mock(return_value=None))
        self.assertEqual(self.replicator._repl_to_node(
            self.fake_node, FakeBroker(), '0', self.fake_info), False)

    def test_repl_to_node_small_container_always_usync(self):
        # Tests that a small container that is > 50% out of sync will
        # still use usync.
        rinfo = {"id": 3, "point": -1, "hash": "c"}

        # Turn per_diff back to swift's default.
        self.replicator.per_diff = 1000
        # (remote rows, local rows) pairs; all row-count gaps are small
        # relative to per_diff, so usync is expected every time.
        for r, l in ((5, 20), (40, 100), (450, 1000), (550, 1500)):
            rinfo['max_row'] = r
            self.fake_info['max_row'] = l
            # Fresh mock per iteration so each call list starts empty.
            self.replicator._usync_db = mock.Mock(return_value=True)
            self.http = ReplHttp(json.dumps(rinfo))
            local_sync = self.broker.get_sync()
            self.assertEqual(self.replicator._repl_to_node(
                self.fake_node, self.broker, '0', self.fake_info), True)
            self.replicator._usync_db.assert_has_calls([
                mock.call(max(rinfo['point'], local_sync), self.broker,
                          self.http, rinfo['id'], self.fake_info['id'])
            ])
 2059 
 2060 
 2061 class ExampleReplicator(db_replicator.Replicator):
 2062     server_type = 'fake'
 2063     brokerclass = ExampleBroker
 2064     datadir = 'fake'
 2065     default_port = 1000
 2066 
 2067 
class TestReplicatorSync(unittest.TestCase):
    """Run a replicator daemon against real broker databases under a temp
    root, with the replication RPC wired to an in-process ReplicatorRpc
    (no sockets) and rsync replaced by a local file copy.

    Subclasses for the account/container servers override the class
    attributes below.
    """

    # override in subclass
    backend = ExampleReplicator.brokerclass
    datadir = ExampleReplicator.datadir
    replicator_daemon = ExampleReplicator
    replicator_rpc = db_replicator.ReplicatorRpc

    def setUp(self):
        # Temp root holding per-device directories; monkey-patch the
        # module-level ReplConnection and Ring so daemons talk to the
        # local rpc object and FakeRing (restored in tearDown).
        self.root = mkdtemp()
        self.rpc = self.replicator_rpc(
            self.root, self.datadir, self.backend, mount_check=False,
            logger=unit.debug_logger())
        FakeReplConnection = attach_fake_replication_rpc(self.rpc)
        self._orig_ReplConnection = db_replicator.ReplConnection
        db_replicator.ReplConnection = FakeReplConnection
        self._orig_Ring = db_replicator.ring.Ring
        self._ring = unit.FakeRing()
        db_replicator.ring.Ring = lambda *args, **kwargs: self._get_ring()
        self.logger = unit.debug_logger()

    def tearDown(self):
        # Undo the module-level monkey patching and remove the temp tree.
        db_replicator.ReplConnection = self._orig_ReplConnection
        db_replicator.ring.Ring = self._orig_Ring
        rmtree(self.root)

    def _get_ring(self):
        # Indirection so tests can swap self._ring after setUp.
        return self._ring

    def _get_broker(self, account, container=None, node_index=0):
        # Build a broker whose db path is where the ring places this
        # account/container on the node_index'th primary node.
        hash_ = hash_path(account, container)
        part, nodes = self._ring.get_nodes(account, container)
        drive = nodes[node_index]['device']
        db_path = os.path.join(self.root, drive,
                               storage_directory(self.datadir, part, hash_),
                               hash_ + '.db')
        return self.backend(db_path, account=account, container=container)

    def _get_broker_part_node(self, broker):
        # Reverse of _get_broker: recover (partition, ring node) from the
        # broker's on-disk path by matching its device component.
        part, nodes = self._ring.get_nodes(broker.account, broker.container)
        storage_dir = broker.db_file[len(self.root):].lstrip(os.path.sep)
        broker_device = storage_dir.split(os.path.sep, 1)[0]
        for node in nodes:
            if node['device'] == broker_device:
                return part, node

    def _get_daemon(self, node, conf_updates):
        # Build a replicator daemon configured as if running on `node`.
        conf = {
            'devices': self.root,
            'recon_cache_path': self.root,
            'mount_check': 'false',
            'bind_port': node['replication_port'],
        }
        if conf_updates:
            conf.update(conf_updates)
        return self.replicator_daemon(conf, logger=self.logger)

    def _install_fake_rsync_file(self, daemon, captured_calls=None):
        # Replace the daemon's rsync with a local copy() so tests never
        # shell out; optionally records each call for inspection.
        def _rsync_file(db_file, remote_file, **kwargs):
            if captured_calls is not None:
                captured_calls.append((db_file, remote_file, kwargs))
            remote_server, remote_path = remote_file.split('/', 1)
            dest_path = os.path.join(self.root, remote_path)
            copy(db_file, dest_path)
            return True
        daemon._rsync_file = _rsync_file

    def _run_once(self, node, conf_updates=None, daemon=None):
        # One replication pass as `node`, with IP discovery and drive
        # checks stubbed to match the daemon's mount_check setting.
        daemon = daemon or self._get_daemon(node, conf_updates)
        self._install_fake_rsync_file(daemon)
        with mock.patch('swift.common.db_replicator.whataremyips',
                        new=lambda *a, **kw: [node['replication_ip']]), \
                unit.mock_check_drive(isdir=not daemon.mount_check,
                                      ismount=daemon.mount_check):
            daemon.run_once()
        return daemon

    def test_local_ids(self):
        # sda/sdb/sdd exist on disk; sdc does not, so the daemon bound to
        # sdc should discover no local device ids.
        for drive in ('sda', 'sdb', 'sdd'):
            os.makedirs(os.path.join(self.root, drive, self.datadir))
        for node in self._ring.devs:
            daemon = self._run_once(node)
            if node['device'] == 'sdc':
                self.assertEqual(daemon._local_device_ids, set())
            else:
                self.assertEqual(daemon._local_device_ids,
                                 set([node['id']]))

    def test_clean_up_after_deleted_brokers(self):
        broker = self._get_broker('a', 'c', node_index=0)
        part, node = self._get_broker_part_node(broker)
        part = str(part)
        daemon = self._run_once(node)
        # create a super old broker and delete it!
        forever_ago = time.time() - daemon.reclaim_age
        put_timestamp = normalize_timestamp(forever_ago - 2)
        delete_timestamp = normalize_timestamp(forever_ago - 1)
        broker.initialize(put_timestamp)
        broker.delete_db(delete_timestamp)
        # if we have a container broker make sure it's reported
        if hasattr(broker, 'reported'):
            info = broker.get_info()
            broker.reported(info['put_timestamp'],
                            info['delete_timestamp'],
                            info['object_count'],
                            info['bytes_used'])
        info = broker.get_replication_info()
        self.assertTrue(daemon.report_up_to_date(info))
        # we have a part dir
        part_root = os.path.join(self.root, node['device'], self.datadir)
        parts = os.listdir(part_root)
        self.assertEqual([part], parts)
        # with a single suffix
        suff = os.listdir(os.path.join(part_root, part))
        self.assertEqual(1, len(suff))
        # running replicator will remove the deleted db
        daemon = self._run_once(node, daemon=daemon)
        self.assertEqual(1, daemon.stats['remove'])
        # we still have a part dir (but it's empty)
        suff = os.listdir(os.path.join(part_root, part))
        self.assertEqual(0, len(suff))
        # run it again and there's nothing to do...
        daemon = self._run_once(node, daemon=daemon)
        self.assertEqual(0, daemon.stats['attempted'])
        # but empty part dir is cleaned up!
        parts = os.listdir(part_root)
        self.assertEqual(0, len(parts))

    def test_rsync_then_merge(self):
        # setup current db (and broker)
        broker = self._get_broker('a', 'c', node_index=0)
        part, node = self._get_broker_part_node(broker)
        part = str(part)
        put_timestamp = normalize_timestamp(time.time())
        broker.initialize(put_timestamp)
        put_metadata = {'example-meta': ['bah', put_timestamp]}
        broker.update_metadata(put_metadata)

        # sanity (re-open, and the db keeps the metadata)
        broker = self._get_broker('a', 'c', node_index=0)
        self.assertEqual(put_metadata, broker.metadata)

        # create rsynced db in tmp dir
        obj_hash = hash_path('a', 'c')
        rsynced_db_broker = self.backend(
            os.path.join(self.root, node['device'], 'tmp', obj_hash + '.db'),
            account='a', container='b')
        rsynced_db_broker.initialize(put_timestamp)

        # dispatch an rsync_then_merge request against the rsynced copy
        rpc = db_replicator.ReplicatorRpc(
            self.root, self.datadir, self.backend, False)
        response = rpc.dispatch((node['device'], part, obj_hash),
                                ['rsync_then_merge', obj_hash + '.db', 'arg2'])
        # sanity
        self.assertEqual('204 No Content', response.status)
        self.assertEqual(204, response.status_int)

        # re-open the db
        broker = self._get_broker('a', 'c', node_index=0)
        # keep the metadata in existing db
        self.assertEqual(put_metadata, broker.metadata)

    def test_replicator_sync(self):
        # setup current db (and broker)
        broker = self._get_broker('a', 'c', node_index=0)
        part, node = self._get_broker_part_node(broker)
        part = str(part)
        put_timestamp = normalize_timestamp(time.time())
        broker.initialize(put_timestamp)
        put_metadata = {'example-meta': ['bah', put_timestamp]}
        sync_local_metadata = {
            "meta1": ["data1", put_timestamp],
            "meta2": ["data2", put_timestamp]}
        broker.update_metadata(put_metadata)

        # sanity (re-open, and the db keeps the metadata)
        broker = self._get_broker('a', 'c', node_index=0)
        self.assertEqual(put_metadata, broker.metadata)

        # perform a sync RPC carrying the remote's metadata
        rpc = db_replicator.ReplicatorRpc(
            self.root, self.datadir, ExampleBroker, False)
        response = rpc.sync(
            broker, (broker.get_sync('id_') + 1, 12345, 'id_',
                     put_timestamp, put_timestamp, '0',
                     json.dumps(sync_local_metadata)))
        # sanity
        self.assertEqual('200 OK', response.status)
        self.assertEqual(200, response.status_int)

        # re-open the db
        broker = self._get_broker('a', 'c', node_index=0)
        # keep the both metadata in existing db and local db
        expected = put_metadata.copy()
        expected.update(sync_local_metadata)
        self.assertEqual(expected, broker.metadata)
 2265 
 2266 
if __name__ == '__main__':
    # Allow running this test module directly via the unittest CLI.
    unittest.main()