"Fossies" - the Fresh Open Source Software Archive

Member "cinder-17.1.0/cinder/volume/drivers/rbd.py" (8 Mar 2021, 91538 Bytes) of package /linux/misc/openstack/cinder-17.1.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "rbd.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 17.0.1_vs_17.1.0.

    1 #    Copyright 2013 OpenStack Foundation
    2 #
    3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 #    not use this file except in compliance with the License. You may obtain
    5 #    a copy of the License at
    6 #
    7 #         http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 #    Unless required by applicable law or agreed to in writing, software
   10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 #    License for the specific language governing permissions and limitations
   13 #    under the License.
   14 """RADOS Block Device Driver"""
   15 
   16 import binascii
   17 import errno
   18 import json
   19 import math
   20 import os
   21 import tempfile
   22 
   23 from castellan import key_manager
   24 from eventlet import tpool
   25 from os_brick.initiator import linuxrbd
   26 from oslo_config import cfg
   27 from oslo_log import log as logging
   28 from oslo_service import loopingcall
   29 from oslo_utils import encodeutils
   30 from oslo_utils import excutils
   31 from oslo_utils import fileutils
   32 from oslo_utils import units
   33 try:
   34     import rados
   35     import rbd
   36 except ImportError:
   37     rados = None
   38     rbd = None
   39 import six
   40 from six.moves import urllib
   41 
   42 from cinder import exception
   43 from cinder.i18n import _
   44 from cinder.image import image_utils
   45 from cinder import interface
   46 from cinder import objects
   47 from cinder.objects import fields
   48 from cinder import utils
   49 from cinder.volume import configuration
   50 from cinder.volume import driver
   51 from cinder.volume import volume_utils
   52 
   53 LOG = logging.getLogger(__name__)
   54 
   55 RBD_OPTS = [
   56     cfg.StrOpt('rbd_cluster_name',
   57                default='ceph',
    58                help='The name of the Ceph cluster'),
   59     cfg.StrOpt('rbd_pool',
   60                default='rbd',
   61                help='The RADOS pool where rbd volumes are stored'),
   62     cfg.StrOpt('rbd_user',
   63                help='The RADOS client name for accessing rbd volumes '
   64                     '- only set when using cephx authentication'),
   65     cfg.StrOpt('rbd_ceph_conf',
   66                default='',  # default determined by librados
   67                help='Path to the ceph configuration file'),
   68     cfg.BoolOpt('rbd_flatten_volume_from_snapshot',
   69                 default=False,
   70                 help='Flatten volumes created from snapshots to remove '
   71                      'dependency from volume to snapshot'),
   72     cfg.StrOpt('rbd_secret_uuid',
   73                help='The libvirt uuid of the secret for the rbd_user '
   74                     'volumes'),
   75     cfg.IntOpt('rbd_max_clone_depth',
   76                default=5,
   77                help='Maximum number of nested volume clones that are '
   78                     'taken before a flatten occurs. Set to 0 to disable '
   79                     'cloning. Note: lowering this value will not affect '
   80                     'existing volumes whose clone depth exceeds the new '
   81                     'value.'),
   82     cfg.IntOpt('rbd_store_chunk_size', default=4,
   83                help='Volumes will be chunked into objects of this size '
   84                     '(in megabytes).'),
   85     cfg.IntOpt('rados_connect_timeout', default=-1,
   86                help='Timeout value (in seconds) used when connecting to '
   87                     'ceph cluster. If value < 0, no timeout is set and '
   88                     'default librados value is used.'),
   89     cfg.IntOpt('rados_connection_retries', default=3,
   90                help='Number of retries if connection to ceph cluster '
   91                     'failed.'),
   92     cfg.IntOpt('rados_connection_interval', default=5,
   93                help='Interval value (in seconds) between connection '
   94                     'retries to ceph cluster.'),
   95     cfg.IntOpt('replication_connect_timeout', default=5,
   96                help='Timeout value (in seconds) used when connecting to '
   97                     'ceph cluster to do a demotion/promotion of volumes. '
   98                     'If value < 0, no timeout is set and default librados '
   99                     'value is used.'),
  100     cfg.BoolOpt('report_dynamic_total_capacity', default=True,
   101                 help='Set to True for the driver to report total capacity as a '
  102                      'dynamic value (used + current free) and to False to '
  103                      'report a static value (quota max bytes if defined and '
  104                      'global size of cluster if not).'),
  105     cfg.BoolOpt('rbd_exclusive_cinder_pool', default=False,
  106                 help="Set to True if the pool is used exclusively by Cinder. "
   107                      "On exclusive use the driver won't query images' provisioned "
  108                      "size as they will match the value calculated by the "
  109                      "Cinder core code for allocated_capacity_gb. This "
  110                      "reduces the load on the Ceph cluster as well as on the "
  111                      "volume service."),
  112     cfg.BoolOpt('enable_deferred_deletion', default=False,
  113                 help='Enable deferred deletion. Upon deletion, volumes are '
  114                      'tagged for deletion but will only be removed '
  115                      'asynchronously at a later time.'),
  116     cfg.IntOpt('deferred_deletion_delay', default=0,
  117                help='Time delay in seconds before a volume is eligible '
  118                     'for permanent removal after being tagged for deferred '
  119                     'deletion.'),
  120     cfg.IntOpt('deferred_deletion_purge_interval', default=60,
  121                help='Number of seconds between runs of the periodic task '
  122                     'to purge volumes tagged for deletion.'),
  123 ]
  124 
  125 CONF = cfg.CONF
  126 CONF.register_opts(RBD_OPTS, group=configuration.SHARED_CONF_GROUP)
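
# NOTE: illustrative example only. These options are consumed per backend
# section of cinder.conf; the backend name, pool, user and secret UUID below
# are hypothetical values, not defaults:
#
#   [ceph-rbd]
#   volume_driver = cinder.volume.drivers.rbd.RBDDriver
#   volume_backend_name = ceph-rbd
#   rbd_cluster_name = ceph
#   rbd_pool = volumes
#   rbd_user = cinder
#   rbd_ceph_conf = /etc/ceph/ceph.conf
#   rbd_secret_uuid = 457eb676-33da-42ec-9a8c-9293d545c337
#   rbd_max_clone_depth = 5
#   rados_connect_timeout = -1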
  127 
  128 EXTRA_SPECS_REPL_ENABLED = "replication_enabled"
  129 EXTRA_SPECS_MULTIATTACH = "multiattach"
  130 
  131 
  132 # RBD
  133 class RBDDriverException(exception.VolumeDriverException):
  134     message = _("RBD Cinder driver failure: %(reason)s")
  135 
  136 
  137 class RBDVolumeProxy(object):
  138     """Context manager for dealing with an existing rbd volume.
  139 
  140     This handles connecting to rados and opening an ioctx automatically, and
  141     otherwise acts like a librbd Image object.
  142 
   143     This may also reuse an external connection (client and ioctx args), but
   144     note that the caller is then responsible for opening/closing the connection.
   145     In that case the `pool`, `remote` and `timeout` args are ignored.
  146 
  147     The underlying librados client and ioctx can be accessed as the attributes
  148     'client' and 'ioctx'.
  149     """
  150     def __init__(self, driver, name, pool=None, snapshot=None,
  151                  read_only=False, remote=None, timeout=None,
  152                  client=None, ioctx=None):
  153         self._close_conn = not (client and ioctx)
  154         rados_client, rados_ioctx = driver._connect_to_rados(
  155             pool, remote, timeout) if self._close_conn else (client, ioctx)
  156 
  157         if snapshot is not None:
  158             snapshot = utils.convert_str(snapshot)
  159         try:
  160             self.volume = driver.rbd.Image(rados_ioctx,
  161                                            utils.convert_str(name),
  162                                            snapshot=snapshot,
  163                                            read_only=read_only)
  164             self.volume = tpool.Proxy(self.volume)
  165         except driver.rbd.Error:
  166             if self._close_conn:
  167                 driver._disconnect_from_rados(rados_client, rados_ioctx)
  168             raise
  169         self.driver = driver
  170         self.client = rados_client
  171         self.ioctx = rados_ioctx
  172 
  173     def __enter__(self):
  174         return self
  175 
  176     def __exit__(self, type_, value, traceback):
  177         try:
  178             self.volume.close()
  179         finally:
  180             if self._close_conn:
  181                 self.driver._disconnect_from_rados(self.client, self.ioctx)
  182 
  183     def __getattr__(self, attrib):
  184         return getattr(self.volume, attrib)
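
# NOTE: illustrative example only. Typical in-driver use of the proxy, assuming
# 'drv' is an RBDDriver instance and 'volume-xyz' is a hypothetical image name:
#
#   with RBDVolumeProxy(drv, 'volume-xyz', read_only=True) as vol:
#       size_bytes = vol.size()    # attribute access is proxied to librbd Image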
  185 
  186 
  187 class RADOSClient(object):
  188     """Context manager to simplify error handling for connecting to ceph."""
  189     def __init__(self, driver, pool=None):
  190         self.driver = driver
  191         self.cluster, self.ioctx = driver._connect_to_rados(pool)
  192 
  193     def __enter__(self):
  194         return self
  195 
  196     def __exit__(self, type_, value, traceback):
  197         self.driver._disconnect_from_rados(self.cluster, self.ioctx)
  198 
  199     @property
  200     def features(self):
  201         features = self.cluster.conf_get('rbd_default_features')
  202         if ((features is None) or (int(features) == 0)):
  203             features = self.driver.RBD_FEATURE_LAYERING
  204         return int(features)
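
# NOTE: illustrative example only. RADOSClient pairs _connect_to_rados with
# _disconnect_from_rados so callers only deal with the ioctx, e.g. (with a
# hypothetical driver instance 'drv'):
#
#   with RADOSClient(drv) as client:
#       names = drv.RBDProxy().list(client.ioctx)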
  205 
  206 
  207 @interface.volumedriver
  208 class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
  209                 driver.ManageableVD, driver.ManageableSnapshotsVD,
  210                 driver.BaseVD):
  211     """Implements RADOS block device (RBD) volume commands."""
  212 
  213     VERSION = '1.2.0'
  214 
  215     # ThirdPartySystems wiki page
  216     CI_WIKI_NAME = "Cinder_Jenkins"
  217 
  218     SUPPORTS_ACTIVE_ACTIVE = True
  219 
  220     SYSCONFDIR = '/etc/ceph/'
  221 
  222     RBD_FEATURE_LAYERING = 1
  223     RBD_FEATURE_EXCLUSIVE_LOCK = 4
  224     RBD_FEATURE_OBJECT_MAP = 8
  225     RBD_FEATURE_FAST_DIFF = 16
  226     RBD_FEATURE_JOURNALING = 64
  227 
  228     def __init__(self, active_backend_id=None, *args, **kwargs):
  229         super(RBDDriver, self).__init__(*args, **kwargs)
  230         self.configuration.append_config_values(RBD_OPTS)
  231         self._stats = {}
  232         # allow overrides for testing
  233         self.rados = kwargs.get('rados', rados)
  234         self.rbd = kwargs.get('rbd', rbd)
  235 
  236         # All string args used with librbd must be None or utf-8 otherwise
  237         # librbd will break.
  238         for attr in ['rbd_cluster_name', 'rbd_user',
  239                      'rbd_ceph_conf', 'rbd_pool']:
  240             val = getattr(self.configuration, attr)
  241             if val is not None:
  242                 setattr(self.configuration, attr, utils.convert_str(val))
  243 
  244         self._backend_name = (self.configuration.volume_backend_name or
  245                               self.__class__.__name__)
  246         self._active_backend_id = active_backend_id
  247         self._active_config = {}
  248         self._is_replication_enabled = False
  249         self._replication_targets = []
  250         self._target_names = []
  251         self._clone_v2_api_checked = False
  252 
  253         if self.rbd is not None:
  254             self.RBD_FEATURE_LAYERING = self.rbd.RBD_FEATURE_LAYERING
  255             self.RBD_FEATURE_EXCLUSIVE_LOCK = \
  256                 self.rbd.RBD_FEATURE_EXCLUSIVE_LOCK
  257             self.RBD_FEATURE_OBJECT_MAP = self.rbd.RBD_FEATURE_OBJECT_MAP
  258             self.RBD_FEATURE_FAST_DIFF = self.rbd.RBD_FEATURE_FAST_DIFF
  259             self.RBD_FEATURE_JOURNALING = self.rbd.RBD_FEATURE_JOURNALING
  260 
  261         self.MULTIATTACH_EXCLUSIONS = (
  262             self.RBD_FEATURE_JOURNALING |
  263             self.RBD_FEATURE_FAST_DIFF |
  264             self.RBD_FEATURE_OBJECT_MAP |
  265             self.RBD_FEATURE_EXCLUSIVE_LOCK)
  266 
  267         self._set_keyring_attributes()
  268 
  269     def _set_keyring_attributes(self):
  270         # The rbd_keyring_conf option is not available for OpenStack usage
  271         # for security reasons (OSSN-0085) and in OpenStack we use
  272         # rbd_secret_uuid or make sure that the keyring files are present on
  273         # the hosts (where os-brick will look for them).
  274         # For cinderlib usage this option is necessary (no security issue, as
  275         # in those cases the contents of the connection are not available to
  276         # users). By using getattr Oslo-conf won't read the option from the
  277         # file even if it's there (because we have removed the conf option
  278         # definition), but cinderlib will find it because it sets the option
  279         # directly as an attribute.
  280         self.keyring_file = getattr(self.configuration, 'rbd_keyring_conf',
  281                                     None)
  282 
  283         self.keyring_data = None
  284         try:
  285             if self.keyring_file and os.path.isfile(self.keyring_file):
  286                 with open(self.keyring_file, 'r') as k_file:
  287                     self.keyring_data = k_file.read()
  288         except IOError:
  289             LOG.debug('Cannot read RBD keyring file: %s.', self.keyring_file)
  290 
  291     @classmethod
  292     def get_driver_options(cls):
  293         additional_opts = cls._get_oslo_driver_opts(
  294             'replication_device', 'reserved_percentage',
  295             'max_over_subscription_ratio', 'volume_dd_blocksize')
  296         return RBD_OPTS + additional_opts
  297 
  298     def _show_msg_check_clone_v2_api(self, volume_name):
  299         if not self._clone_v2_api_checked:
  300             self._clone_v2_api_checked = True
  301             with RBDVolumeProxy(self, volume_name) as volume:
  302                 try:
  303                     if (volume.volume.op_features() &
  304                             self.rbd.RBD_OPERATION_FEATURE_CLONE_PARENT):
  305                         LOG.info('Using v2 Clone API')
  306                         return
  307                 except AttributeError:
  308                     pass
  309                 LOG.warning('Not using v2 clone API, please upgrade to'
  310                             ' mimic+ and set the OSD minimum client'
  311                             ' compat version to mimic for better'
   312                             ' performance and fewer deletion issues')
  313 
  314     def _get_target_config(self, target_id):
  315         """Get a replication target from known replication targets."""
  316         for target in self._replication_targets:
  317             if target['name'] == target_id:
  318                 return target
  319         if not target_id or target_id == 'default':
  320             return {
  321                 'name': self.configuration.rbd_cluster_name,
  322                 'conf': self.configuration.rbd_ceph_conf,
  323                 'user': self.configuration.rbd_user,
  324                 'secret_uuid': self.configuration.rbd_secret_uuid
  325             }
  326         raise exception.InvalidReplicationTarget(
  327             reason=_('RBD: Unknown failover target host %s.') % target_id)
  328 
  329     def do_setup(self, context):
  330         """Performs initialization steps that could raise exceptions."""
  331         self._do_setup_replication()
  332         self._active_config = self._get_target_config(self._active_backend_id)
  333 
  334     def _do_setup_replication(self):
  335         replication_devices = self.configuration.safe_get(
  336             'replication_device')
  337         if replication_devices:
  338             self._parse_replication_configs(replication_devices)
  339             self._is_replication_enabled = True
  340             self._target_names.append('default')
  341 
  342     def _parse_replication_configs(self, replication_devices):
  343         for replication_device in replication_devices:
  344             if 'backend_id' not in replication_device:
  345                 msg = _('Missing backend_id in replication_device '
  346                         'configuration.')
  347                 raise exception.InvalidConfigurationValue(msg)
  348 
  349             name = replication_device['backend_id']
  350             conf = replication_device.get('conf',
  351                                           self.SYSCONFDIR + name + '.conf')
  352             user = replication_device.get(
  353                 'user', self.configuration.rbd_user or 'cinder')
  354             secret_uuid = replication_device.get(
  355                 'secret_uuid', self.configuration.rbd_secret_uuid)
  356             # Pool has to be the same in all clusters
  357             replication_target = {'name': name,
  358                                   'conf': utils.convert_str(conf),
  359                                   'user': utils.convert_str(user),
  360                                   'secret_uuid': secret_uuid}
  361             LOG.info('Adding replication target: %s.', name)
  362             self._replication_targets.append(replication_target)
  363             self._target_names.append(name)
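
    # NOTE: illustrative example only. The replication targets parsed above come
    # from 'replication_device' entries in the backend section of cinder.conf,
    # e.g. (hypothetical secondary cluster named 'secondary'):
    #
    #   replication_device = backend_id:secondary,conf:/etc/ceph/secondary.conf,user:cinder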
  364 
  365     def _get_config_tuple(self, remote=None):
  366         if not remote:
  367             remote = self._active_config
  368         return (remote.get('name'), remote.get('conf'), remote.get('user'),
  369                 remote.get('secret_uuid', None))
  370 
  371     def _trash_purge(self):
  372         LOG.info("Purging trash for backend '%s'", self._backend_name)
  373         with RADOSClient(self) as client:
  374             for vol in self.RBDProxy().trash_list(client.ioctx):
  375                 try:
  376                     self.RBDProxy().trash_remove(client.ioctx, vol.get('id'))
  377                     LOG.info("Deleted %s from trash for backend '%s'",
  378                              vol.get('name'),
  379                              self._backend_name)
  380                 except Exception as e:
  381                     # NOTE(arne_wiebalck): trash_remove raises EPERM in case
  382                     # the volume's deferral time has not expired yet, so we
  383                     # want to explicitly handle this "normal" situation.
  384                     # All other exceptions, e.g. ImageBusy, are not re-raised
  385                     # so that the periodic purge retries on the next iteration
  386                     # and leaves ERRORs in the logs in case the deletion fails
  387                     # repeatedly.
  388                     if e.errno == errno.EPERM:
  389                         LOG.debug("%s has not expired yet on backend '%s'",
  390                                   vol.get('name'),
  391                                   self._backend_name)
  392                     else:
  393                         LOG.exception("Error deleting %s from trash "
  394                                       "backend '%s'",
  395                                       vol.get('name'),
  396                                       self._backend_name)
  397 
  398     def _start_periodic_tasks(self):
  399         if self.configuration.enable_deferred_deletion:
  400             LOG.info("Starting periodic trash purge for backend '%s'",
  401                      self._backend_name)
  402             deferred_deletion_ptask = loopingcall.FixedIntervalLoopingCall(
  403                 self._trash_purge)
  404             deferred_deletion_ptask.start(
  405                 interval=self.configuration.deferred_deletion_purge_interval)
  406 
  407     def check_for_setup_error(self):
  408         """Returns an error if prerequisites aren't met."""
  409         if rados is None:
  410             msg = _('rados and rbd python libraries not found')
  411             raise exception.VolumeBackendAPIException(data=msg)
  412 
  413         for attr in ['rbd_cluster_name', 'rbd_pool']:
  414             val = getattr(self.configuration, attr)
  415             if not val:
  416                 raise exception.InvalidConfigurationValue(option=attr,
  417                                                           value=val)
  418         # NOTE: Checking connection to ceph
  419         # RADOSClient __init__ method invokes _connect_to_rados
  420         # so no need to check for self.rados.Error here.
  421         with RADOSClient(self):
  422             pass
  423 
  424         # NOTE(arne_wiebalck): If deferred deletion is enabled, check if the
  425         # local Ceph client has support for the trash API.
  426         if self.configuration.enable_deferred_deletion:
  427             if not hasattr(self.RBDProxy(), 'trash_list'):
  428                 msg = _("Deferred deletion is enabled, but the local Ceph "
  429                         "client has no support for the trash API. Support "
  430                         "for this feature started with v12.2.0 Luminous.")
  431                 LOG.error(msg)
  432                 raise exception.VolumeBackendAPIException(data=msg)
  433 
  434         # If the keyring is defined (cinderlib usage), then the contents are
  435         # necessary.
  436         if self.keyring_file and not self.keyring_data:
  437             msg = _('No keyring data found')
  438             LOG.error(msg)
  439             raise exception.InvalidConfigurationValue(
  440                 option='rbd_keyring_conf', value=self.keyring_file)
  441 
  442         self._start_periodic_tasks()
  443 
  444     def RBDProxy(self):
  445         return tpool.Proxy(self.rbd.RBD())
  446 
  447     def _ceph_args(self):
  448         args = []
  449 
  450         name, conf, user, secret_uuid = self._get_config_tuple()
  451 
  452         if user:
  453             args.extend(['--id', user])
  454         if name:
  455             args.extend(['--cluster', name])
  456         if conf:
  457             args.extend(['--conf', conf])
  458 
  459         return args
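
    # NOTE: illustrative example only. With a hypothetical configuration of
    # rbd_user=cinder, rbd_cluster_name=ceph and rbd_ceph_conf=/etc/ceph/ceph.conf,
    # _ceph_args() returns:
    #
    #   ['--id', 'cinder', '--cluster', 'ceph', '--conf', '/etc/ceph/ceph.conf']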
  460 
  461     def _connect_to_rados(self, pool=None, remote=None, timeout=None):
  462         @utils.retry(exception.VolumeBackendAPIException,
  463                      self.configuration.rados_connection_interval,
  464                      self.configuration.rados_connection_retries)
  465         def _do_conn(pool, remote, timeout):
  466             name, conf, user, secret_uuid = self._get_config_tuple(remote)
  467 
  468             if pool is not None:
  469                 pool = utils.convert_str(pool)
  470             else:
  471                 pool = self.configuration.rbd_pool
  472 
  473             if timeout is None:
  474                 timeout = self.configuration.rados_connect_timeout
  475 
  476             LOG.debug("connecting to %(user)s@%(name)s (conf=%(conf)s, "
  477                       "timeout=%(timeout)s).",
  478                       {'user': user, 'name': name, 'conf': conf,
  479                        'timeout': timeout})
  480 
  481             client = self.rados.Rados(rados_id=user,
  482                                       clustername=name,
  483                                       conffile=conf)
  484 
  485             try:
  486                 if timeout >= 0:
  487                     timeout = six.text_type(timeout)
  488                     client.conf_set('rados_osd_op_timeout', timeout)
  489                     client.conf_set('rados_mon_op_timeout', timeout)
  490                     client.conf_set('client_mount_timeout', timeout)
  491 
  492                 client.connect()
  493                 ioctx = client.open_ioctx(pool)
  494                 return client, ioctx
  495             except self.rados.Error:
  496                 msg = _("Error connecting to ceph cluster.")
  497                 LOG.exception(msg)
  498                 client.shutdown()
  499                 raise exception.VolumeBackendAPIException(data=msg)
  500 
  501         return _do_conn(pool, remote, timeout)
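
    # NOTE: connection attempts above are governed by the
    # rados_connection_interval and rados_connection_retries options (5 seconds
    # and 3 by default), so transient failures are retried before the
    # VolumeBackendAPIException is propagated to the caller.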
  502 
  503     def _disconnect_from_rados(self, client, ioctx):
  504         # closing an ioctx cannot raise an exception
  505         ioctx.close()
  506         client.shutdown()
  507 
  508     def _get_backup_snaps(self, rbd_image):
  509         """Get list of any backup snapshots that exist on this volume.
  510 
   511         There should only ever be one, but accept all of them since they need
   512         to be deleted before the volume can be.
  513         """
  514         # NOTE(dosaboy): we do the import here otherwise we get import conflict
  515         # issues between the rbd driver and the ceph backup driver. These
  516         # issues only seem to occur when NOT using them together and are
  517         # triggered when the ceph backup driver imports the rbd volume driver.
  518         from cinder.backup.drivers import ceph
  519         return ceph.CephBackupDriver.get_backup_snaps(rbd_image)
  520 
  521     def _get_mon_addrs(self):
  522         args = ['ceph', 'mon', 'dump', '--format=json']
  523         args.extend(self._ceph_args())
  524         out, _ = self._execute(*args)
  525         lines = out.split('\n')
  526         if lines[0].startswith('dumped monmap epoch'):
  527             lines = lines[1:]
  528         monmap = json.loads('\n'.join(lines))
  529         addrs = [mon['addr'] for mon in monmap['mons']]
  530         hosts = []
  531         ports = []
  532         for addr in addrs:
  533             host_port = addr[:addr.rindex('/')]
  534             host, port = host_port.rsplit(':', 1)
  535             hosts.append(host.strip('[]'))
  536             ports.append(port)
  537         return hosts, ports
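
    # NOTE: illustrative example only. A hypothetical monitor address of
    # '192.168.1.10:6789/0' in the monmap is truncated at the trailing '/',
    # then split on the last ':', yielding host '192.168.1.10' and port '6789';
    # bracketed IPv6 hosts such as '[fd00::1]:6789/0' have the brackets
    # stripped as well.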
  538 
  539     def _get_usage_info(self):
  540         """Calculate provisioned volume space in GiB.
  541 
   542         The stats report should send the provisioned size of volumes (snapshots
   543         must not be included) and not the physical size of those volumes.
  544 
  545         We must include all volumes, not only Cinder created volumes, because
  546         Cinder created volumes are reported by the Cinder core code as
  547         allocated_capacity_gb.
  548         """
  549         total_provisioned = 0
  550         with RADOSClient(self) as client:
  551             for t in self.RBDProxy().list(client.ioctx):
  552                 try:
  553                     with RBDVolumeProxy(self, t, read_only=True,
  554                                         client=client.cluster,
  555                                         ioctx=client.ioctx) as v:
  556                         size = v.size()
  557                 except (self.rbd.ImageNotFound, self.rbd.OSError):
  558                     LOG.debug("Image %s is not found.", t)
  559                 else:
  560                     total_provisioned += size
  561 
  562         total_provisioned = math.ceil(float(total_provisioned) / units.Gi)
  563         return total_provisioned
  564 
  565     def _get_pool_stats(self):
  566         """Gets pool free and total capacity in GiB.
  567 
   568         Calculate free and total capacity of the pool based on the pool's
   569         defined quota and pool stats.
   570 
   571         Returns a tuple with (free, total) where each is either 'unknown' or a
   572         real number rounded to 2 decimal places.
  573         """
  574         pool_name = self.configuration.rbd_pool
  575 
  576         with RADOSClient(self) as client:
  577             ret, df_outbuf, __ = client.cluster.mon_command(
  578                 '{"prefix":"df", "format":"json"}', b'')
  579             if ret:
  580                 LOG.warning('Unable to get rados pool stats.')
  581                 return 'unknown', 'unknown'
  582 
  583             ret, quota_outbuf, __ = client.cluster.mon_command(
  584                 '{"prefix":"osd pool get-quota", "pool": "%s",'
  585                 ' "format":"json"}' % pool_name, b'')
  586             if ret:
  587                 LOG.warning('Unable to get rados pool quotas.')
  588                 return 'unknown', 'unknown'
  589 
  590         df_outbuf = encodeutils.safe_decode(df_outbuf)
  591         df_data = json.loads(df_outbuf)
  592         pool_stats = [pool for pool in df_data['pools']
  593                       if pool['name'] == pool_name][0]['stats']
  594 
  595         quota_outbuf = encodeutils.safe_decode(quota_outbuf)
  596         bytes_quota = json.loads(quota_outbuf)['quota_max_bytes']
  597         # With quota the total is the quota limit and free is quota - used
  598         if bytes_quota:
  599             total_capacity = bytes_quota
  600             free_capacity = max(min(total_capacity - pool_stats['bytes_used'],
  601                                     pool_stats['max_avail']),
  602                                 0)
   603         # Without quota, free is the pool's max available and total is global size
  604         else:
  605             total_capacity = df_data['stats']['total_bytes']
  606             free_capacity = pool_stats['max_avail']
  607 
  608         # If we want dynamic total capacity (default behavior)
  609         if self.configuration.safe_get('report_dynamic_total_capacity'):
  610             total_capacity = free_capacity + pool_stats['bytes_used']
  611 
  612         free_capacity = round((float(free_capacity) / units.Gi), 2)
  613         total_capacity = round((float(total_capacity) / units.Gi), 2)
  614 
  615         return free_capacity, total_capacity
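
    # NOTE: illustrative example only, with hypothetical numbers. For a pool
    # quota of 100 GiB with 30 GiB used and 500 GiB max_avail, total capacity
    # is the 100 GiB quota and free capacity is min(100 - 30, 500) = 70 GiB.
    # With report_dynamic_total_capacity enabled (the default), total capacity
    # is then recomputed as used + free = 30 + 70 = 100 GiB.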
  616 
  617     def _update_volume_stats(self):
  618         location_info = '%s:%s:%s:%s:%s' % (
  619             self.configuration.rbd_cluster_name,
  620             self.configuration.rbd_ceph_conf,
  621             self._get_fsid(),
  622             self.configuration.rbd_user,
  623             self.configuration.rbd_pool)
  624 
  625         stats = {
  626             'vendor_name': 'Open Source',
  627             'driver_version': self.VERSION,
  628             'storage_protocol': 'ceph',
  629             'total_capacity_gb': 'unknown',
  630             'free_capacity_gb': 'unknown',
  631             'reserved_percentage': (
  632                 self.configuration.safe_get('reserved_percentage')),
  633             'multiattach': True,
  634             'thin_provisioning_support': True,
  635             'max_over_subscription_ratio': (
  636                 self.configuration.safe_get('max_over_subscription_ratio')),
  637             'location_info': location_info,
  638             'backend_state': 'down'
  639         }
  640 
  641         backend_name = self.configuration.safe_get('volume_backend_name')
  642         stats['volume_backend_name'] = backend_name or 'RBD'
  643 
  644         stats['replication_enabled'] = self._is_replication_enabled
  645         if self._is_replication_enabled:
  646             stats['replication_targets'] = self._target_names
  647 
  648         try:
  649             free_capacity, total_capacity = self._get_pool_stats()
  650             stats['free_capacity_gb'] = free_capacity
  651             stats['total_capacity_gb'] = total_capacity
  652 
   653             # For exclusive pools let the scheduler set provisioned_capacity_gb
   654             # to allocated_capacity_gb; for non-exclusive pools query the value.
  655             if not self.configuration.safe_get('rbd_exclusive_cinder_pool'):
  656                 total_gbi = self._get_usage_info()
  657                 stats['provisioned_capacity_gb'] = total_gbi
  658 
  659             stats['backend_state'] = 'up'
  660         except self.rados.Error:
  661             # just log and return unknown capacities and let scheduler set
  662             # provisioned_capacity_gb = allocated_capacity_gb
  663             LOG.exception('error refreshing volume stats')
  664         self._stats = stats
  665 
  666     def _get_clone_depth(self, client, volume_name, depth=0):
  667         """Returns the number of ancestral clones of the given volume."""
  668         parent_volume = self.rbd.Image(client.ioctx, volume_name)
  669         try:
  670             _pool, parent, _snap = self._get_clone_info(parent_volume,
  671                                                         volume_name)
  672         finally:
  673             parent_volume.close()
  674 
  675         if not parent:
  676             return depth
  677 
  678         return self._get_clone_depth(client, parent, depth + 1)
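
    # NOTE: illustrative example only. For a hypothetical chain where
    # 'volume-c' is a clone of 'volume-b', which is a clone of 'volume-a',
    # _get_clone_depth(client, 'volume-c') walks the parent chain twice and
    # returns 2; an image with no parent returns 0.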
  679 
  680     def _extend_if_required(self, volume, src_vref):
  681         """Extends a volume if required
  682 
   683         In case the src_vref size is smaller than the size of the requested
   684         new volume, call _resize().
  685         """
  686         if volume.size != src_vref.size:
  687             LOG.debug("resize volume '%(dst_vol)s' from %(src_size)d to "
  688                       "%(dst_size)d",
  689                       {'dst_vol': volume.name, 'src_size': src_vref.size,
  690                        'dst_size': volume.size})
  691             self._resize(volume)
  692 
  693     def create_cloned_volume(self, volume, src_vref):
  694         """Create a cloned volume from another volume.
  695 
  696         Since we are cloning from a volume and not a snapshot, we must first
  697         create a snapshot of the source volume.
  698 
  699         The user has the option to limit how long a volume's clone chain can be
  700         by setting rbd_max_clone_depth. If a clone is made of another clone
  701         and that clone has rbd_max_clone_depth clones behind it, the dest
  702         volume will be flattened.
  703         """
  704         src_name = utils.convert_str(src_vref.name)
  705         dest_name = utils.convert_str(volume.name)
  706         clone_snap = "%s.clone_snap" % dest_name
  707 
  708         # Do full copy if requested
  709         if self.configuration.rbd_max_clone_depth <= 0:
  710             with RBDVolumeProxy(self, src_name, read_only=True) as vol:
  711                 vol.copy(vol.ioctx, dest_name)
  712                 self._extend_if_required(volume, src_vref)
  713             return
  714 
  715         # Otherwise do COW clone.
  716         with RADOSClient(self) as client:
  717             src_volume = self.rbd.Image(client.ioctx, src_name)
  718             LOG.debug("creating snapshot='%s'", clone_snap)
  719             try:
  720                 # Create new snapshot of source volume
  721                 src_volume.create_snap(clone_snap)
  722                 src_volume.protect_snap(clone_snap)
  723                 # Now clone source volume snapshot
  724                 LOG.debug("cloning '%(src_vol)s@%(src_snap)s' to "
  725                           "'%(dest)s'",
  726                           {'src_vol': src_name, 'src_snap': clone_snap,
  727                            'dest': dest_name})
  728                 self.RBDProxy().clone(client.ioctx, src_name, clone_snap,
  729                                       client.ioctx, dest_name,
  730                                       features=client.features)
  731             except Exception as e:
  732                 src_volume.unprotect_snap(clone_snap)
  733                 src_volume.remove_snap(clone_snap)
  734                 src_volume.close()
  735                 msg = (_("Failed to clone '%(src_vol)s@%(src_snap)s' to "
  736                          "'%(dest)s', error: %(error)s") %
  737                        {'src_vol': src_name,
  738                         'src_snap': clone_snap,
  739                         'dest': dest_name,
  740                         'error': e})
  741                 LOG.exception(msg)
  742                 raise exception.VolumeBackendAPIException(data=msg)
  743 
  744             depth = self._get_clone_depth(client, src_name)
   745             # If the dest volume is a clone and rbd_max_clone_depth has been
   746             # reached, flatten the dest after cloning. (A depth of 0 never gets
   747             # here: a full copy is done above instead of a COW clone.)
  748             if depth >= self.configuration.rbd_max_clone_depth:
  749                 LOG.info("maximum clone depth (%d) has been reached - "
  750                          "flattening dest volume",
  751                          self.configuration.rbd_max_clone_depth)
  752 
  753                 # Flatten destination volume
  754                 try:
  755                     with RBDVolumeProxy(self, dest_name, client=client,
  756                                         ioctx=client.ioctx) as dest_volume:
  757                         LOG.debug("flattening dest volume %s", dest_name)
  758                         dest_volume.flatten()
  759                 except Exception as e:
  760                     msg = (_("Failed to flatten volume %(volume)s with "
  761                              "error: %(error)s.") %
  762                            {'volume': dest_name,
  763                             'error': e})
  764                     LOG.exception(msg)
  765                     src_volume.close()
  766                     raise exception.VolumeBackendAPIException(data=msg)
  767 
  768                 try:
  769                     # remove temporary snap
  770                     LOG.debug("remove temporary snap %s", clone_snap)
  771                     src_volume.unprotect_snap(clone_snap)
  772                     src_volume.remove_snap(clone_snap)
  773                 except Exception as e:
  774                     msg = (_("Failed to remove temporary snap "
  775                              "%(snap_name)s, error: %(error)s") %
  776                            {'snap_name': clone_snap,
  777                             'error': e})
  778                     LOG.exception(msg)
  779                     src_volume.close()
  780                     raise exception.VolumeBackendAPIException(data=msg)
  781 
  782             try:
  783                 volume_update = self._setup_volume(volume)
  784             except Exception:
  785                 self.RBDProxy().remove(client.ioctx, dest_name)
  786                 src_volume.unprotect_snap(clone_snap)
  787                 src_volume.remove_snap(clone_snap)
  788                 err_msg = (_('Failed to enable image replication'))
  789                 raise exception.ReplicationError(reason=err_msg,
  790                                                  volume_id=volume.id)
  791             finally:
  792                 src_volume.close()
  793 
  794             self._extend_if_required(volume, src_vref)
  795 
  796         LOG.debug("clone created successfully")
  797         return volume_update
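
    # NOTE: illustrative example only. Cloning a hypothetical 'volume-a' to
    # 'volume-b' with rbd_max_clone_depth > 0 roughly amounts to:
    #
    #   volume-a: create_snap('volume-b.clone_snap'); protect_snap(...)
    #   RBD().clone(ioctx, 'volume-a', 'volume-b.clone_snap', ioctx, 'volume-b')
    #   flatten 'volume-b' only if the clone depth limit was reached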
  798 
  799     def _enable_replication(self, volume):
  800         """Enable replication for a volume.
  801 
  802         Returns required volume update.
  803         """
  804         vol_name = utils.convert_str(volume.name)
  805         with RBDVolumeProxy(self, vol_name) as image:
  806             had_exclusive_lock = (image.features() &
  807                                   self.RBD_FEATURE_EXCLUSIVE_LOCK)
  808             had_journaling = image.features() & self.RBD_FEATURE_JOURNALING
  809             if not had_exclusive_lock:
  810                 image.update_features(self.RBD_FEATURE_EXCLUSIVE_LOCK,
  811                                       True)
  812             if not had_journaling:
  813                 image.update_features(self.RBD_FEATURE_JOURNALING, True)
  814             image.mirror_image_enable()
  815 
  816         driver_data = self._dumps({
  817             'had_journaling': bool(had_journaling),
  818             'had_exclusive_lock': bool(had_exclusive_lock)
  819         })
  820         return {'replication_status': fields.ReplicationStatus.ENABLED,
  821                 'replication_driver_data': driver_data}
  822 
  823     def _enable_multiattach(self, volume):
  824         vol_name = utils.convert_str(volume.name)
  825         with RBDVolumeProxy(self, vol_name) as image:
  826             image_features = image.features()
  827             change_features = self.MULTIATTACH_EXCLUSIONS & image_features
  828             image.update_features(change_features, False)
  829 
  830         return {'provider_location':
  831                 self._dumps({'saved_features': image_features})}
  832 
  833     def _disable_multiattach(self, volume):
  834         vol_name = utils.convert_str(volume.name)
  835         with RBDVolumeProxy(self, vol_name) as image:
  836             try:
  837                 provider_location = json.loads(volume.provider_location)
  838                 image_features = provider_location['saved_features']
  839                 change_features = self.MULTIATTACH_EXCLUSIONS & image_features
  840                 image.update_features(change_features, True)
  841             except IndexError:
  842                 msg = "Could not find saved image features."
  843                 raise RBDDriverException(reason=msg)
  844             except self.rbd.InvalidArgument:
  845                 msg = "Failed to restore image features."
  846                 raise RBDDriverException(reason=msg)
  847 
  848         return {'provider_location': None}
  849 
  850     def _is_replicated_type(self, volume_type):
  851         try:
  852             extra_specs = volume_type.extra_specs
  853             LOG.debug('extra_specs: %s', extra_specs)
  854             return extra_specs.get(EXTRA_SPECS_REPL_ENABLED) == "<is> True"
  855         except Exception:
  856             LOG.debug('Unable to retrieve extra specs info')
  857             return False
  858 
  859     def _is_multiattach_type(self, volume_type):
  860         try:
  861             extra_specs = volume_type.extra_specs
  862             LOG.debug('extra_specs: %s', extra_specs)
  863             return extra_specs.get(EXTRA_SPECS_MULTIATTACH) == "<is> True"
  864         except Exception:
  865             LOG.debug('Unable to retrieve extra specs info')
  866             return False
  867 
  868     def _setup_volume(self, volume, volume_type=None):
  869 
  870         if volume_type:
  871             had_replication = self._is_replicated_type(volume.volume_type)
  872             had_multiattach = self._is_multiattach_type(volume.volume_type)
  873         else:
  874             had_replication = False
  875             had_multiattach = False
  876             volume_type = volume.volume_type
  877 
  878         want_replication = self._is_replicated_type(volume_type)
  879         want_multiattach = self._is_multiattach_type(volume_type)
  880 
  881         if want_replication and want_multiattach:
  882             msg = _('Replication and Multiattach are mutually exclusive.')
  883             raise RBDDriverException(reason=msg)
  884 
  885         volume_update = dict()
  886 
  887         if want_replication:
  888             if had_multiattach:
  889                 volume_update.update(self._disable_multiattach(volume))
  890             if not had_replication:
  891                 try:
  892                     volume_update.update(self._enable_replication(volume))
  893                 except Exception:
  894                     err_msg = (_('Failed to enable image replication'))
  895                     raise exception.ReplicationError(reason=err_msg,
  896                                                      volume_id=volume.id)
  897         elif had_replication:
  898             try:
  899                 volume_update.update(self._disable_replication(volume))
  900             except Exception:
  901                 err_msg = (_('Failed to disable image replication'))
  902                 raise exception.ReplicationError(reason=err_msg,
  903                                                  volume_id=volume.id)
  904         elif self._is_replication_enabled:
  905             volume_update.update({'replication_status':
  906                                   fields.ReplicationStatus.DISABLED})
  907 
  908         if want_multiattach:
  909             volume_update.update(self._enable_multiattach(volume))
  910         elif had_multiattach:
  911             volume_update.update(self._disable_multiattach(volume))
  912 
  913         return volume_update
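
    # NOTE: a volume type with extra spec replication_enabled='<is> True'
    # triggers _enable_replication() above, while multiattach='<is> True'
    # triggers _enable_multiattach(); the two are rejected together since
    # replication needs the exclusive-lock and journaling features that
    # _enable_multiattach() turns off.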
  914 
  915     def _create_encrypted_volume(self, volume, context):
  916         """Create an encrypted volume.
  917 
  918         This works by creating an encrypted image locally,
  919         and then uploading it to the volume.
  920         """
  921 
  922         encryption = volume_utils.check_encryption_provider(self.db,
  923                                                             volume,
  924                                                             context)
  925 
  926         # Fetch the key associated with the volume and decode the passphrase
  927         keymgr = key_manager.API(CONF)
  928         key = keymgr.get(context, encryption['encryption_key_id'])
  929         passphrase = binascii.hexlify(key.get_encoded()).decode('utf-8')
  930 
  931         # create a file
  932         tmp_dir = volume_utils.image_conversion_dir()
  933 
  934         with tempfile.NamedTemporaryFile(dir=tmp_dir) as tmp_image:
  935             with tempfile.NamedTemporaryFile(dir=tmp_dir) as tmp_key:
  936                 with open(tmp_key.name, 'w') as f:
  937                     f.write(passphrase)
  938 
  939                 cipher_spec = image_utils.decode_cipher(encryption['cipher'],
  940                                                         encryption['key_size'])
  941 
  942                 create_cmd = (
  943                     'qemu-img', 'create', '-f', 'luks',
  944                     '-o', 'cipher-alg=%(cipher_alg)s,'
  945                     'cipher-mode=%(cipher_mode)s,'
  946                     'ivgen-alg=%(ivgen_alg)s' % cipher_spec,
  947                     '--object', 'secret,id=luks_sec,'
  948                     'format=raw,file=%(passfile)s' % {'passfile':
  949                                                       tmp_key.name},
  950                     '-o', 'key-secret=luks_sec',
  951                     tmp_image.name,
  952                     '%sM' % (volume.size * 1024))
  953                 self._execute(*create_cmd)
  954 
  955             # Copy image into RBD
  956             chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
  957             order = int(math.log(chunk_size, 2))
  958 
  959             cmd = ['rbd', 'import',
  960                    '--pool', self.configuration.rbd_pool,
  961                    '--order', order,
  962                    tmp_image.name, volume.name]
  963             cmd.extend(self._ceph_args())
  964             self._execute(*cmd)
  965 
  966     def create_volume(self, volume):
  967         """Creates a logical volume."""
  968 
  969         if volume.encryption_key_id:
  970             return self._create_encrypted_volume(volume, volume.obj_context)
  971 
  972         size = int(volume.size) * units.Gi
  973 
  974         LOG.debug("creating volume '%s'", volume.name)
  975 
  976         chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
  977         order = int(math.log(chunk_size, 2))
  978         vol_name = utils.convert_str(volume.name)
  979 
  980         with RADOSClient(self) as client:
  981             self.RBDProxy().create(client.ioctx,
  982                                    vol_name,
  983                                    size,
  984                                    order,
  985                                    old_format=False,
  986                                    features=client.features)
  987 
  988         try:
  989             volume_update = self._setup_volume(volume)
  990         except Exception:
  991             with excutils.save_and_reraise_exception():
  992                 LOG.error('Error creating rbd image %(vol)s.',
  993                           {'vol': vol_name})
  994                 self.RBDProxy().remove(client.ioctx, vol_name)
  995 
  996         return volume_update
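
    # NOTE: illustrative example only. With the default rbd_store_chunk_size of
    # 4 (MiB), chunk_size is 4 * units.Mi = 4194304 bytes and
    # order = int(math.log(4194304, 2)) = 22, i.e. RBD objects of 2**22 bytes.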
  997 
  998     def _flatten(self, pool, volume_name):
  999         LOG.debug('flattening %(pool)s/%(img)s',
 1000                   dict(pool=pool, img=volume_name))
 1001         with RBDVolumeProxy(self, volume_name, pool) as vol:
 1002             vol.flatten()
 1003 
 1004     def _clone(self, volume, src_pool, src_image, src_snap):
 1005         LOG.debug('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s',
 1006                   dict(pool=src_pool, img=src_image, snap=src_snap,
 1007                        dst=volume.name))
 1008 
 1009         chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
 1010         order = int(math.log(chunk_size, 2))
 1011         vol_name = utils.convert_str(volume.name)
 1012 
 1013         with RADOSClient(self, src_pool) as src_client:
 1014             with RADOSClient(self) as dest_client:
 1015                 self.RBDProxy().clone(src_client.ioctx,
 1016                                       utils.convert_str(src_image),
 1017                                       utils.convert_str(src_snap),
 1018                                       dest_client.ioctx,
 1019                                       vol_name,
 1020                                       features=src_client.features,
 1021                                       order=order)
 1022 
 1023             try:
 1024                 volume_update = self._setup_volume(volume)
 1025             except Exception:
 1026                 self.RBDProxy().remove(dest_client.ioctx, vol_name)
 1027                 err_msg = (_('Failed to enable image replication'))
 1028                 raise exception.ReplicationError(reason=err_msg,
 1029                                                  volume_id=volume.id)
 1030             return volume_update or {}
 1031 
 1032     def _resize(self, volume, **kwargs):
 1033         size = kwargs.get('size', None)
 1034         if not size:
 1035             size = int(volume.size) * units.Gi
 1036 
 1037         with RBDVolumeProxy(self, volume.name) as vol:
 1038             vol.resize(size)
 1039 
 1040     def create_volume_from_snapshot(self, volume, snapshot):
 1041         """Creates a volume from a snapshot."""
 1042         volume_update = self._clone(volume, self.configuration.rbd_pool,
 1043                                     snapshot.volume_name, snapshot.name)
 1044         if self.configuration.rbd_flatten_volume_from_snapshot:
 1045             self._flatten(self.configuration.rbd_pool, volume.name)
 1046         if int(volume.size):
 1047             self._resize(volume)
 1048 
 1049         self._show_msg_check_clone_v2_api(snapshot.volume_name)
 1050         return volume_update
 1051 
 1052     def _delete_backup_snaps(self, rbd_image):
 1053         backup_snaps = self._get_backup_snaps(rbd_image)
 1054         if backup_snaps:
 1055             for snap in backup_snaps:
 1056                 rbd_image.remove_snap(snap['name'])
 1057         else:
 1058             LOG.debug("volume has no backup snaps")
 1059 
 1060     def _get_clone_info(self, volume, volume_name, snap=None):
 1061         """If volume is a clone, return its parent info.
 1062 
 1063         Returns a tuple of (pool, parent, snap). A snapshot may optionally be
  1064         provided for the case where a cloned volume has been flattened but its
  1065         snapshot still depends on the parent.
 1066         """
 1067         try:
 1068             if snap:
 1069                 volume.set_snap(snap)
 1070             pool, parent, parent_snap = tuple(volume.parent_info())
 1071             if snap:
 1072                 volume.set_snap(None)
 1073             # Strip the tag off the end of the volume name since it will not be
 1074             # in the snap name.
 1075             if volume_name.endswith('.deleted'):
 1076                 volume_name = volume_name[:-len('.deleted')]
 1077             # Now check the snap name matches.
 1078             if parent_snap == "%s.clone_snap" % volume_name:
 1079                 return pool, parent, parent_snap
 1080         except self.rbd.ImageNotFound:
 1081             LOG.debug("Volume %s is not a clone.", volume_name)
 1082             volume.set_snap(None)
 1083 
 1084         return (None, None, None)
 1085 
 1086     def _get_children_info(self, volume, snap):
 1087         """List children for the given snapshot of a volume(image).
 1088 
 1089         Returns a list of (pool, image).
 1090         """
 1091 
 1092         children_list = []
 1093 
 1094         if snap:
 1095             volume.set_snap(snap)
 1096             children_list = volume.list_children()
 1097             volume.set_snap(None)
 1098 
 1099         return children_list
 1100 
 1101     def _delete_clone_parent_refs(self, client, parent_name, parent_snap):
 1102         """Walk back up the clone chain and delete references.
 1103 
  1104         Deletes references, i.e. parent volumes and snapshots already deleted in Cinder.
 1105         """
 1106         parent_rbd = self.rbd.Image(client.ioctx, parent_name)
 1107         parent_has_snaps = False
 1108         try:
 1109             # Check for grandparent
 1110             _pool, g_parent, g_parent_snap = self._get_clone_info(parent_rbd,
 1111                                                                   parent_name,
 1112                                                                   parent_snap)
 1113 
 1114             LOG.debug("deleting parent snapshot %s", parent_snap)
 1115             parent_rbd.unprotect_snap(parent_snap)
 1116             parent_rbd.remove_snap(parent_snap)
 1117 
 1118             parent_has_snaps = bool(list(parent_rbd.list_snaps()))
 1119         finally:
 1120             parent_rbd.close()
 1121 
 1122         # If parent has been deleted in Cinder, delete the silent reference and
 1123         # keep walking up the chain if it is itself a clone.
 1124         if (not parent_has_snaps) and parent_name.endswith('.deleted'):
 1125             LOG.debug("deleting parent %s", parent_name)
 1126             if self.configuration.enable_deferred_deletion:
 1127                 LOG.debug("moving volume %s to trash", parent_name)
 1128                 delay = self.configuration.deferred_deletion_delay
 1129                 self.RBDProxy().trash_move(client.ioctx,
 1130                                            parent_name,
 1131                                            delay)
 1132             else:
 1133                 self.RBDProxy().remove(client.ioctx, parent_name)
 1134 
 1135             # Now move up to grandparent if there is one
 1136             if g_parent:
 1137                 self._delete_clone_parent_refs(client, g_parent, g_parent_snap)
 1138 
 1139     def delete_volume(self, volume):
 1140         """Deletes a logical volume."""
 1141         # NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are
 1142         #                utf-8 otherwise librbd will barf.
 1143         volume_name = utils.convert_str(volume.name)
 1144         with RADOSClient(self) as client:
 1145             try:
 1146                 rbd_image = self.rbd.Image(client.ioctx, volume_name)
 1147             except self.rbd.ImageNotFound:
 1148                 LOG.info("volume %s no longer exists in backend",
 1149                          volume_name)
 1150                 return
 1151 
 1152             clone_snap = None
 1153             parent = None
 1154 
 1155             # Ensure any backup snapshots are deleted
 1156             self._delete_backup_snaps(rbd_image)
 1157 
 1158             # If the volume has non-clone snapshots this delete is expected to
 1159             # raise VolumeIsBusy so do so straight away.
 1160             try:
 1161                 snaps = rbd_image.list_snaps()
 1162                 for snap in snaps:
 1163                     if snap['name'].endswith('.clone_snap'):
 1164                         LOG.debug("volume has clone snapshot(s)")
 1165                         # We grab one of these and use it when fetching parent
 1166                         # info in case the volume has been flattened.
 1167                         clone_snap = snap['name']
 1168                         break
 1169 
 1170                     raise exception.VolumeIsBusy(volume_name=volume_name)
 1171 
 1172                 # Determine if this volume is itself a clone
 1173                 _pool, parent, parent_snap = self._get_clone_info(rbd_image,
 1174                                                                   volume_name,
 1175                                                                   clone_snap)
 1176             finally:
 1177                 rbd_image.close()
 1178 
 1179             @utils.retry(self.rbd.ImageBusy,
 1180                          self.configuration.rados_connection_interval,
 1181                          self.configuration.rados_connection_retries)
 1182             def _try_remove_volume(client, volume_name):
 1183                 if self.configuration.enable_deferred_deletion:
 1184                     LOG.debug("moving volume %s to trash", volume_name)
 1185                     delay = self.configuration.deferred_deletion_delay
 1186                     self.RBDProxy().trash_move(client.ioctx,
 1187                                                volume_name,
 1188                                                delay)
 1189                 else:
 1190                     self.RBDProxy().remove(client.ioctx, volume_name)
 1191 
 1192             if clone_snap is None:
 1193                 LOG.debug("deleting rbd volume %s", volume_name)
 1194                 try:
 1195                     _try_remove_volume(client, volume_name)
 1196                 except self.rbd.ImageBusy:
 1197                     msg = (_("ImageBusy error raised while deleting rbd "
 1198                              "volume. This may have been caused by a "
 1199                              "connection from a client that has crashed and, "
 1200                              "if so, may be resolved by retrying the delete "
 1201                              "after 30 seconds has elapsed."))
 1202                     LOG.warning(msg)
 1203                     # Now raise this so that the volume stays available and the
 1204                     # deletion can be retried.
 1205                     raise exception.VolumeIsBusy(msg, volume_name=volume_name)
 1206                 except self.rbd.ImageNotFound:
 1207                     LOG.info("RBD volume %s not found, allowing delete "
 1208                              "operation to proceed.", volume_name)
 1209                     return
 1210 
 1211                 # If it is a clone, walk back up the parent chain deleting
 1212                 # references.
 1213                 if parent:
 1214                     LOG.debug("volume is a clone so cleaning references")
 1215                     self._delete_clone_parent_refs(client, parent, parent_snap)
 1216             else:
 1217                 # If the volume has copy-on-write clones we will not be able to
 1218                 # delete it. Instead we will keep it as a silent volume which
 1219                 # will be deleted when its snapshots and clones are deleted.
 1220                 new_name = "%s.deleted" % (volume_name)
 1221                 self.RBDProxy().rename(client.ioctx, volume_name, new_name)
 1222 
 1223     def create_snapshot(self, snapshot):
 1224         """Creates an rbd snapshot."""
 1225         with RBDVolumeProxy(self, snapshot.volume_name) as volume:
 1226             snap = utils.convert_str(snapshot.name)
 1227             volume.create_snap(snap)
 1228             volume.protect_snap(snap)
 1229 
 1230     def delete_snapshot(self, snapshot):
 1231         """Deletes an rbd snapshot."""
 1232         # NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are
 1233         #                utf-8 otherwise librbd will barf.
 1234         volume_name = utils.convert_str(snapshot.volume_name)
 1235         snap_name = utils.convert_str(snapshot.name)
 1236 
 1237         with RBDVolumeProxy(self, volume_name) as volume:
 1238             try:
 1239                 volume.unprotect_snap(snap_name)
 1240             except self.rbd.InvalidArgument:
 1241                 LOG.info(
 1242                     "InvalidArgument: Unable to unprotect snapshot %s.",
 1243                     snap_name)
 1244             except self.rbd.ImageNotFound:
 1245                 LOG.info(
 1246                     "ImageNotFound: Unable to unprotect snapshot %s.",
 1247                     snap_name)
 1248             except self.rbd.ImageBusy:
 1249                 children_list = self._get_children_info(volume, snap_name)
 1250 
 1251                 if children_list:
 1252                     for (pool, image) in children_list:
 1253                         LOG.info('Image %(pool)s/%(image)s is dependent '
 1254                                  'on the snapshot %(snap)s.',
 1255                                  {'pool': pool,
 1256                                   'image': image,
 1257                                   'snap': snap_name})
 1258 
 1259                 raise exception.SnapshotIsBusy(snapshot_name=snap_name)
 1260             try:
 1261                 volume.remove_snap(snap_name)
 1262             except self.rbd.ImageNotFound:
 1263                 LOG.info("Snapshot %s does not exist in backend.",
 1264                          snap_name)
 1265 
 1266     def snapshot_revert_use_temp_snapshot(self):
 1267         """Disable the use of a temporary snapshot on revert."""
 1268         return False
 1269 
 1270     def revert_to_snapshot(self, context, volume, snapshot):
 1271         """Revert a volume to a given snapshot."""
 1272         # NOTE(rosmaita): The Ceph documentation notes that this operation is
 1273         # inefficient on the backend for large volumes, and that the preferred
 1274         # method of returning to a pre-existing state in Ceph is to clone from
 1275         # a snapshot.
 1276         # So why don't we do something like that here?
 1277         # (a) an end user can do the more efficient operation on their own if
 1278         #     they value speed over the convenience of reverting their existing
 1279         #     volume
 1280         # (b) revert-to-snapshot is properly a backend operation, and should
 1281         #     be handled by the backend -- trying to "fake it" in this driver
 1282         #     is both dishonest and likely to cause subtle bugs
 1283         # (c) the Ceph project undergoes continual improvement.  It may be
 1284         #     the case that there are things an operator can do on the Ceph
 1285         #     side (for example, use BlueStore for the Ceph backend storage)
 1286         #     to improve the efficiency of this operation.
 1287         # Thus, a motivated operator reading this is encouraged to consult
 1288         # the Ceph documentation.
 1289         with RBDVolumeProxy(self, volume.name) as image:
 1290             image.rollback_to_snap(snapshot.name)
 1291 
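          # Illustration (not part of the driver): the more efficient path
          # mentioned in (a) above is to create a new volume from the snapshot,
          # which this driver implements as an RBD clone rather than a full
          # data rollback. A hypothetical client-side example (CLI flags
          # assumed):
          #
          #   openstack volume create --snapshot <snapshot-id> \
          #       --size <size-gb> <new-volume-name>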
 1292     def _disable_replication(self, volume):
 1293         """Disable replication on the given volume."""
 1294         vol_name = utils.convert_str(volume.name)
 1295         with RBDVolumeProxy(self, vol_name) as image:
 1296             image.mirror_image_disable(False)
 1297             driver_data = json.loads(volume.replication_driver_data)
 1298             # If 'journaling' and/or 'exclusive-lock' were enabled by
 1299             # '_enable_replication', they are disabled here. Features that
 1300             # were already enabled before replication was set up are left
 1301             # as they were.
 1302             if not driver_data['had_journaling']:
 1303                 image.update_features(self.RBD_FEATURE_JOURNALING, False)
 1304             if not driver_data['had_exclusive_lock']:
 1305                 image.update_features(self.RBD_FEATURE_EXCLUSIVE_LOCK, False)
 1306         return {'replication_status': fields.ReplicationStatus.DISABLED,
 1307                 'replication_driver_data': None}
 1308 
 1309     def retype(self, context, volume, new_type, diff, host):
 1310         """Retype from one volume type to another on the same backend."""
 1311         return True, self._setup_volume(volume, new_type)
 1312 
 1313     def _dumps(self, obj):
 1314         return json.dumps(obj, separators=(',', ':'), sort_keys=True)
 1315 
 1316     def _exec_on_volume(self, volume_name, remote, operation, *args, **kwargs):
 1317         @utils.retry(rbd.ImageBusy,
 1318                      self.configuration.rados_connection_interval,
 1319                      self.configuration.rados_connection_retries)
 1320         def _do_exec():
 1321             timeout = self.configuration.replication_connect_timeout
 1322             with RBDVolumeProxy(self, volume_name, self.configuration.rbd_pool,
 1323                                 remote=remote, timeout=timeout) as rbd_image:
 1324                 return getattr(rbd_image, operation)(*args, **kwargs)
 1325         return _do_exec()
 1326 
 1327     def _failover_volume(self, volume, remote, is_demoted, replication_status):
 1328         """Process failover for a volume.
 1329 
 1330         There are two cases that will return different update values for
 1331         the volume:
 1332 
 1333         - Volume has replication enabled and failover succeeded: Set
 1334           replication status to failed-over.
 1335         - Volume has replication enabled and failover fails: Set status to
 1336           error, replication status to failover-error, and store previous
 1337           status in previous_status field.
 1338         """
 1339         # Failover is allowed when the volume has replication enabled or has
 1340         # already failed over, because we may want to do a second failover.
 1341         vol_name = utils.convert_str(volume.name)
 1342         try:
 1343             self._exec_on_volume(vol_name, remote,
 1344                                  'mirror_image_promote', not is_demoted)
 1345 
 1346             return {'volume_id': volume.id,
 1347                     'updates': {'replication_status': replication_status}}
 1348         except Exception as e:
 1349             replication_status = fields.ReplicationStatus.FAILOVER_ERROR
 1350             LOG.error('Failed to failover volume %(volume)s with '
 1351                       'error: %(error)s.',
 1352                       {'volume': volume.name, 'error': e})
 1353 
 1354         # Failover failed
 1355         error_result = {
 1356             'volume_id': volume.id,
 1357             'updates': {
 1358                 'status': 'error',
 1359                 'previous_status': volume.status,
 1360                 'replication_status': replication_status
 1361             }
 1362         }
 1363 
 1364         return error_result
 1365 
 1366     def _demote_volumes(self, volumes, until_failure=True):
 1367         """Try to demote volumes on the current primary cluster."""
 1368         result = []
 1369         try_demoting = True
 1370         for volume in volumes:
 1371             demoted = False
 1372             if try_demoting:
 1373                 vol_name = utils.convert_str(volume.name)
 1374                 try:
 1375                     self._exec_on_volume(vol_name, self._active_config,
 1376                                          'mirror_image_demote')
 1377                     demoted = True
 1378                 except Exception as e:
 1379                     LOG.debug('Failed to demote %(volume)s with error: '
 1380                               '%(error)s.',
 1381                               {'volume': volume.name, 'error': e})
 1382                     try_demoting = not until_failure
 1383             result.append(demoted)
 1384         return result
 1385 
 1386     def _get_failover_target_config(self, secondary_id=None):
 1387         if not secondary_id:
 1388             # In auto mode exclude failback and active
 1389             candidates = set(self._target_names).difference(
 1390                 ('default', self._active_backend_id))
 1391             if not candidates:
 1392                 raise exception.InvalidReplicationTarget(
 1393                     reason=_('RBD: No available failover target host.'))
 1394             secondary_id = candidates.pop()
 1395         return secondary_id, self._get_target_config(secondary_id)
 1396 
 1397     def failover(self, context, volumes, secondary_id=None, groups=None):
 1398         """Failover replicated volumes."""
 1399         LOG.info('RBD driver failover started.')
 1400         if not self._is_replication_enabled:
 1401             raise exception.UnableToFailOver(
 1402                 reason=_('RBD: Replication is not enabled.'))
 1403 
 1404         if secondary_id == 'default':
 1405             replication_status = fields.ReplicationStatus.ENABLED
 1406         else:
 1407             replication_status = fields.ReplicationStatus.FAILED_OVER
 1408 
 1409         secondary_id, remote = self._get_failover_target_config(secondary_id)
 1410 
 1411         # Try to demote the volumes first
 1412         demotion_results = self._demote_volumes(volumes)
 1413 
 1414         # Do the failover, taking into account whether each volume was demoted
 1415         updates = [self._failover_volume(volume, remote, is_demoted,
 1416                                          replication_status)
 1417                    for volume, is_demoted in zip(volumes, demotion_results)]
 1418 
 1419         LOG.info('RBD driver failover completed.')
 1420         return secondary_id, updates, []
 1421 
 1422     def failover_completed(self, context, secondary_id=None):
 1423         """Failover to replication target."""
 1424         LOG.info('RBD driver failover completion started.')
 1425         secondary_id, remote = self._get_failover_target_config(secondary_id)
 1426 
 1427         self._active_backend_id = secondary_id
 1428         self._active_config = remote
 1429         LOG.info('RBD driver failover completion completed.')
 1430 
 1431     def failover_host(self, context, volumes, secondary_id=None, groups=None):
 1432         """Failover to replication target.
 1433 
 1434         This function combines calls to failover() and failover_completed() to
 1435         perform failover when Active/Active is not enabled.
 1436         """
 1437         active_backend_id, volume_update_list, group_update_list = (
 1438             self.failover(context, volumes, secondary_id, groups))
 1439         self.failover_completed(context, secondary_id)
 1440         return active_backend_id, volume_update_list, group_update_list
 1441 
 1442     def ensure_export(self, context, volume):
 1443         """Synchronously recreates an export for a logical volume."""
 1444         pass
 1445 
 1446     def create_export(self, context, volume, connector):
 1447         """Exports the volume."""
 1448         pass
 1449 
 1450     def remove_export(self, context, volume):
 1451         """Removes an export for a logical volume."""
 1452         pass
 1453 
 1454     def initialize_connection(self, volume, connector):
 1455         hosts, ports = self._get_mon_addrs()
 1456         name, conf, user, secret_uuid = self._get_config_tuple()
 1457         data = {
 1458             'driver_volume_type': 'rbd',
 1459             'data': {
 1460                 'name': '%s/%s' % (self.configuration.rbd_pool,
 1461                                    volume.name),
 1462                 'hosts': hosts,
 1463                 'ports': ports,
 1464                 'cluster_name': name,
 1465                 'auth_enabled': (user is not None),
 1466                 'auth_username': user,
 1467                 'secret_type': 'ceph',
 1468                 'secret_uuid': secret_uuid,
 1469                 'volume_id': volume.id,
 1470                 "discard": True,
 1471             }
 1472         }
 1473         if self.keyring_data:
 1474             data['data']['keyring'] = self.keyring_data
 1475         LOG.debug('connection data: %s', data)
 1476         return data
 1477 
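          # Illustration (not part of the driver): initialize_connection()
          # returns a dict shaped roughly as follows (values hypothetical):
          #
          #   {'driver_volume_type': 'rbd',
          #    'data': {'name': 'volumes/volume-<uuid>',
          #             'hosts': ['192.0.2.10', '192.0.2.11'],
          #             'ports': ['6789', '6789'],
          #             'cluster_name': 'ceph',
          #             'auth_enabled': True,
          #             'auth_username': 'cinder',
          #             'secret_type': 'ceph',
          #             'secret_uuid': '<libvirt secret uuid>',
          #             'volume_id': '<volume uuid>',
          #             'discard': True}}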
 1478     def terminate_connection(self, volume, connector, **kwargs):
 1479         pass
 1480 
 1481     def _parse_location(self, location):
 1482         prefix = 'rbd://'
 1483         if not location.startswith(prefix):
 1484             reason = _('Not stored in rbd')
 1485             raise exception.ImageUnacceptable(image_id=location, reason=reason)
 1486         pieces = [urllib.parse.unquote(loc)
 1487                   for loc in location[len(prefix):].split('/')]
 1488         if any(map(lambda p: p == '', pieces)):
 1489             reason = _('Blank components')
 1490             raise exception.ImageUnacceptable(image_id=location, reason=reason)
 1491         if len(pieces) != 4:
 1492             reason = _('Not an rbd snapshot')
 1493             raise exception.ImageUnacceptable(image_id=location, reason=reason)
 1494         return pieces
 1495 
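          # Illustration (not part of the driver): a cloneable Glance location
          # for this backend is a URL of the form (components hypothetical)
          #
          #   rbd://<fsid>/<pool>/<image-id>/<snapshot-name>
          #
          # which _parse_location() returns as [fsid, pool, image, snapshot],
          # the order in which _is_cloneable() unpacks it.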
 1496     def _get_fsid(self):
 1497         with RADOSClient(self) as client:
 1498             # Librados's get_fsid returns bytes on py3 instead of str
 1499             # as it does on py2. This causes problems for the cinder
 1500             # rbd driver, as we rely on the get_fsid return value being
 1501             # a string rather than bytes. Decoding the bytes to str
 1502             # fixes these issues.
 1503             # The encodeutils.safe_decode workaround CAN BE REMOVED
 1504             # once librados's fix has been in a stable release for some
 1505             # time.
 1506             #
 1507             # More information:
 1508             # https://bugs.launchpad.net/glance-store/+bug/1816721
 1509             # https://bugs.launchpad.net/cinder/+bug/1816468
 1510             # https://tracker.ceph.com/issues/38381
 1511             return encodeutils.safe_decode(client.cluster.get_fsid())
 1512 
 1513     def _is_cloneable(self, image_location, image_meta):
 1514         try:
 1515             fsid, pool, image, snapshot = self._parse_location(image_location)
 1516         except exception.ImageUnacceptable as e:
 1517             LOG.debug('not cloneable: %s.', e)
 1518             return False
 1519 
 1520         if self._get_fsid() != fsid:
 1521             LOG.debug('%s is in a different ceph cluster.', image_location)
 1522             return False
 1523 
 1524         if image_meta['disk_format'] != 'raw':
 1525             LOG.debug("rbd image clone requires image format to be "
 1526                       "'raw' but image %(image)s is '%(format)s'",
 1527                       {"image": image_location,
 1528                        "format": image_meta['disk_format']})
 1529             return False
 1530 
 1531         # check that we can read the image
 1532         try:
 1533             with RBDVolumeProxy(self, image,
 1534                                 pool=pool,
 1535                                 snapshot=snapshot,
 1536                                 read_only=True):
 1537                 return True
 1538         except self.rbd.Error as e:
 1539             LOG.debug('Unable to open image %(loc)s: %(err)s.',
 1540                       dict(loc=image_location, err=e))
 1541             return False
 1542 
 1543     def clone_image(self, context, volume,
 1544                     image_location, image_meta,
 1545                     image_service):
 1546         if image_location:
 1547             # Note: image_location[0] is glance image direct_url.
 1548             # image_location[1] contains the list of all locations (including
 1549             # direct_url) or None if show_multiple_locations is False in
 1550             # glance configuration.
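                  # Illustration (not part of the driver): a hypothetical
                  # image_location value, showing only the 'url' key that is
                  # consumed below:
                  #
                  #   ('rbd://<fsid>/images/<image-id>/snap',
                  #    [{'url': 'rbd://<fsid>/images/<image-id>/snap'}])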
 1551             if image_location[1]:
 1552                 url_locations = [location['url'] for
 1553                                  location in image_location[1]]
 1554             else:
 1555                 url_locations = [image_location[0]]
 1556 
 1557             # iterate all locations to look for a cloneable one.
 1558             for url_location in url_locations:
 1559                 if url_location and self._is_cloneable(
 1560                         url_location, image_meta):
 1561                     _prefix, pool, image, snapshot = \
 1562                         self._parse_location(url_location)
 1563                     volume_update = self._clone(volume, pool, image, snapshot)
 1564                     volume_update['provider_location'] = None
 1565                     self._resize(volume)
 1566                     return volume_update, True
 1567         return ({}, False)
 1568 
 1569     def copy_image_to_encrypted_volume(self, context, volume, image_service,
 1570                                        image_id):
 1571         self._copy_image_to_volume(context, volume, image_service, image_id,
 1572                                    encrypted=True)
 1573 
 1574     def copy_image_to_volume(self, context, volume, image_service, image_id):
 1575         self._copy_image_to_volume(context, volume, image_service, image_id)
 1576 
 1577     def _encrypt_image(self, context, volume, tmp_dir, src_image_path):
 1578         encryption = volume_utils.check_encryption_provider(
 1579             self.db,
 1580             volume,
 1581             context)
 1582 
 1583         # Fetch the key associated with the volume and decode the passphrase
 1584         keymgr = key_manager.API(CONF)
 1585         key = keymgr.get(context, encryption['encryption_key_id'])
 1586         passphrase = binascii.hexlify(key.get_encoded()).decode('utf-8')
 1587 
 1588         # Decode the dm-crypt style cipher spec into something qemu-img can use
 1589         cipher_spec = image_utils.decode_cipher(encryption['cipher'],
 1590                                                 encryption['key_size'])
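              # Illustration (not part of the driver): for a typical dm-crypt
              # spec such as cipher 'aes-xts-plain64' with a 256 bit key, the
              # decoded cipher_spec is expected to look roughly like
              #   {'cipher_alg': 'aes-256', 'cipher_mode': 'xts',
              #    'ivgen_alg': 'plain64'}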
 1591 
 1592         tmp_dir = volume_utils.image_conversion_dir()
 1593 
 1594         with tempfile.NamedTemporaryFile(prefix='luks_',
 1595                                          dir=tmp_dir) as pass_file:
 1596             with open(pass_file.name, 'w') as f:
 1597                 f.write(passphrase)
 1598 
 1599             # Convert the raw image to luks
 1600             dest_image_path = src_image_path + '.luks'
 1601             try:
 1602                 image_utils.convert_image(src_image_path, dest_image_path,
 1603                                           'luks', src_format='raw',
 1604                                           cipher_spec=cipher_spec,
 1605                                           passphrase_file=pass_file.name)
 1606 
 1607                 # Replace the original image with the now encrypted image
 1608                 os.rename(dest_image_path, src_image_path)
 1609             finally:
 1610                 fileutils.delete_if_exists(dest_image_path)
 1611 
 1612     def _copy_image_to_volume(self, context, volume, image_service, image_id,
 1613                               encrypted=False):
 1614 
 1615         tmp_dir = volume_utils.image_conversion_dir()
 1616 
 1617         with tempfile.NamedTemporaryFile(dir=tmp_dir) as tmp:
 1618             image_utils.fetch_to_raw(context, image_service, image_id,
 1619                                      tmp.name,
 1620                                      self.configuration.volume_dd_blocksize,
 1621                                      size=volume.size)
 1622 
 1623             if encrypted:
 1624                 self._encrypt_image(context, volume, tmp_dir, tmp.name)
 1625 
 1626             @utils.retry(exception.VolumeIsBusy,
 1627                          self.configuration.rados_connection_interval,
 1628                          self.configuration.rados_connection_retries)
 1629             def _delete_volume(volume):
 1630                 self.delete_volume(volume)
 1631 
 1632             _delete_volume(volume)
 1633 
 1634             chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
 1635             order = int(math.log(chunk_size, 2))
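                  # For example, an rbd_store_chunk_size of 4 (MiB) gives
                  # chunk_size = 4 * 2**20 = 2**22 bytes, so order = 22,
                  # i.e. RBD objects of 4 MiB.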
 1636             # keep using the command line import instead of librbd since it
 1637             # detects zeroes to preserve sparseness in the image
 1638             args = ['rbd', 'import',
 1639                     '--pool', self.configuration.rbd_pool,
 1640                     '--order', order,
 1641                     tmp.name, volume.name,
 1642                     '--new-format']
 1643             args.extend(self._ceph_args())
 1644             self._try_execute(*args)
 1645         self._resize(volume)
 1646         # We may need to re-enable replication because we have deleted the
 1647         # original image and created a new one using the command line import.
 1648         try:
 1649             self._setup_volume(volume)
 1650         except Exception:
 1651             err_msg = (_('Failed to enable image replication'))
 1652             raise exception.ReplicationError(reason=err_msg,
 1653                                              volume_id=volume.id)
 1654 
 1655     def copy_volume_to_image(self, context, volume, image_service, image_meta):
 1656         tmp_dir = volume_utils.image_conversion_dir()
 1657         tmp_file = os.path.join(tmp_dir,
 1658                                 volume.name + '-' + image_meta['id'])
 1659         with fileutils.remove_path_on_error(tmp_file):
 1660             args = ['rbd', 'export',
 1661                     '--pool', self.configuration.rbd_pool,
 1662                     volume.name, tmp_file]
 1663             args.extend(self._ceph_args())
 1664             self._try_execute(*args)
 1665             volume_utils.upload_volume(context, image_service,
 1666                                        image_meta, tmp_file,
 1667                                        volume)
 1668         os.unlink(tmp_file)
 1669 
 1670     def extend_volume(self, volume, new_size):
 1671         """Extend an existing volume."""
 1672         old_size = volume.size
 1673 
 1674         try:
 1675             size = int(new_size) * units.Gi
 1676             self._resize(volume, size=size)
 1677         except Exception:
 1678             msg = _('Failed to Extend Volume '
 1679                     '%(volname)s') % {'volname': volume.name}
 1680             LOG.error(msg)
 1681             raise exception.VolumeBackendAPIException(data=msg)
 1682 
 1683         LOG.debug("Extend volume from %(old_size)s GB to %(new_size)s GB.",
 1684                   {'old_size': old_size, 'new_size': new_size})
 1685 
 1686     def manage_existing(self, volume, existing_ref):
 1687         """Manages an existing image.
 1688 
 1689         Renames the image name to match the expected name for the volume.
 1690         Error checking done by manage_existing_get_size is not repeated.
 1691 
 1692         :param volume:
 1693             volume ref info to be set
 1694         :param existing_ref:
 1695             existing_ref is a dictionary of the form:
 1696             {'source-name': <name of rbd image>}
 1697         """
 1698         # Raise an exception if we didn't find a suitable rbd image.
 1699         with RADOSClient(self) as client:
 1700             rbd_name = existing_ref['source-name']
 1701             self.RBDProxy().rename(client.ioctx,
 1702                                    utils.convert_str(rbd_name),
 1703                                    utils.convert_str(volume.name))
 1704 
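          # Illustration (not part of the driver): a hypothetical manage
          # request for an existing image named 'my-rbd-image' would pass
          #
          #   existing_ref = {'source-name': 'my-rbd-image'}
          #
          # and manage_existing() simply renames that rbd image to the Cinder
          # volume name (typically 'volume-<uuid>') in the configured pool.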
 1705     def manage_existing_get_size(self, volume, existing_ref):
 1706         """Return size of an existing image for manage_existing.
 1707 
 1708         :param volume:
 1709             volume ref info to be set
 1710         :param existing_ref:
 1711             existing_ref is a dictionary of the form:
 1712             {'source-name': <name of rbd image>}
 1713         """
 1714 
 1715         # Check that the reference is valid
 1716         if 'source-name' not in existing_ref:
 1717             reason = _('Reference must contain source-name element.')
 1718             raise exception.ManageExistingInvalidReference(
 1719                 existing_ref=existing_ref, reason=reason)
 1720 
 1721         rbd_name = utils.convert_str(existing_ref['source-name'])
 1722 
 1723         with RADOSClient(self) as client:
 1724             # Raise an exception if we didn't find a suitable rbd image.
 1725             try:
 1726                 rbd_image = self.rbd.Image(client.ioctx, rbd_name)
 1727             except self.rbd.ImageNotFound:
 1728                 kwargs = {'existing_ref': rbd_name,
 1729                           'reason': 'Specified rbd image does not exist.'}
 1730                 raise exception.ManageExistingInvalidReference(**kwargs)
 1731 
 1732             image_size = rbd_image.size()
 1733             rbd_image.close()
 1734 
 1735             # RBD image size is returned in bytes.  Attempt to parse
 1736             # size as a float and round up to the next integer.
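                  # For example, an image of 1610612736 bytes (1.5 GiB) is
                  # reported as 2.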
 1737             try:
 1738                 convert_size = int(math.ceil(float(image_size) / units.Gi))
 1739                 return convert_size
 1740             except ValueError:
 1741                 exception_message = (_("Failed to manage existing volume "
 1742                                        "%(name)s, because reported size "
 1743                                        "%(size)s was not a floating-point"
 1744                                        " number.")
 1745                                      % {'name': rbd_name,
 1746                                         'size': image_size})
 1747                 raise exception.VolumeBackendAPIException(
 1748                     data=exception_message)
 1749 
 1750     def _get_image_status(self, image_name):
 1751         args = ['rbd', 'status',
 1752                 '--pool', self.configuration.rbd_pool,
 1753                 '--format=json',
 1754                 image_name]
 1755         args.extend(self._ceph_args())
 1756         out, _ = self._execute(*args)
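              # Illustration (not part of the driver): the parsed output is
              # expected to contain a 'watchers' list (used by
              # get_manageable_volumes), hypothetically along the lines of
              #   {'watchers': [{'address': '192.0.2.20:0/123',
              #                  'client': 4151, 'cookie': 1}]}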
 1757         return json.loads(out)
 1758 
 1759     def get_manageable_volumes(self, cinder_volumes, marker, limit, offset,
 1760                                sort_keys, sort_dirs):
 1761         manageable_volumes = []
 1762         cinder_ids = [resource['id'] for resource in cinder_volumes]
 1763 
 1764         with RADOSClient(self) as client:
 1765             for image_name in self.RBDProxy().list(client.ioctx):
 1766                 image_id = volume_utils.extract_id_from_volume_name(image_name)
 1767                 with RBDVolumeProxy(self, image_name, read_only=True,
 1768                                     client=client.cluster,
 1769                                     ioctx=client.ioctx) as image:
 1770                     try:
 1771                         image_info = {
 1772                             'reference': {'source-name': image_name},
 1773                             'size': int(math.ceil(
 1774                                 float(image.size()) / units.Gi)),
 1775                             'cinder_id': None,
 1776                             'extra_info': None
 1777                         }
 1778                         if image_id in cinder_ids:
 1779                             image_info['cinder_id'] = image_id
 1780                             image_info['safe_to_manage'] = False
 1781                             image_info['reason_not_safe'] = 'already managed'
 1782                         elif len(self._get_image_status(
 1783                                 image_name)['watchers']) > 0:
 1784                             # If the image has one or more watchers it is
 1785                             # considered to be in use by client(s).
 1786                             image_info['safe_to_manage'] = False
 1787                             image_info['reason_not_safe'] = 'volume in use'
 1788                         elif image_name.endswith('.deleted'):
 1789                             # A parent of a cloned volume that is marked as
 1790                             # deleted should not be manageable.
 1791                             image_info['safe_to_manage'] = False
 1792                             image_info['reason_not_safe'] = (
 1793                                 'volume marked as deleted')
 1794                         else:
 1795                             image_info['safe_to_manage'] = True
 1796                             image_info['reason_not_safe'] = None
 1797                         manageable_volumes.append(image_info)
 1798                     except self.rbd.ImageNotFound:
 1799                         LOG.debug("Image %s is not found.", image_name)
 1800 
 1801         return volume_utils.paginate_entries_list(
 1802             manageable_volumes, marker, limit, offset, sort_keys, sort_dirs)
 1803 
 1804     def unmanage(self, volume):
 1805         pass
 1806 
 1807     def update_migrated_volume(self, ctxt, volume, new_volume,
 1808                                original_volume_status):
 1809         """Return model update from RBD for migrated volume.
 1810 
 1811         This method should rename the back-end volume name(id) on the
 1812         destination host back to its original name(id) on the source host.
 1813 
 1814         :param ctxt: The context used to run the method update_migrated_volume
 1815         :param volume: The original volume that was migrated to this backend
 1816         :param new_volume: The migration volume object that was created on
 1817                            this backend as part of the migration process
 1818         :param original_volume_status: The status of the original volume
 1819         :returns: model_update to update DB with any needed changes
 1820         """
 1821         name_id = None
 1822         provider_location = None
 1823 
 1824         if original_volume_status == 'in-use':
 1825             # The back-end will not be renamed.
 1826             name_id = new_volume['_name_id'] or new_volume['id']
 1827             provider_location = new_volume['provider_location']
 1828             return {'_name_id': name_id,
 1829                     'provider_location': provider_location}
 1830 
 1831         existing_name = CONF.volume_name_template % new_volume.id
 1832         wanted_name = CONF.volume_name_template % volume.id
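              # For example, with a volume_name_template of 'volume-%s' this
              # renames the backend image 'volume-<new id>' back to
              # 'volume-<original id>'.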
 1833         with RADOSClient(self) as client:
 1834             try:
 1835                 self.RBDProxy().rename(client.ioctx,
 1836                                        utils.convert_str(existing_name),
 1837                                        utils.convert_str(wanted_name))
 1838             except (self.rbd.ImageNotFound, self.rbd.ImageExists):
 1839                 LOG.error('Unable to rename the logical volume '
 1840                           'for volume %s.', volume.id)
 1841                 # If the rename fails, _name_id should be set to the new
 1842                 # volume id and provider_location should be set to the
 1843                 # one from the new volume as well.
 1844                 name_id = new_volume._name_id or new_volume.id
 1845                 provider_location = new_volume['provider_location']
 1846         return {'_name_id': name_id,
 1847                 'provider_location': provider_location}
 1848 
 1849     def migrate_volume(self, context, volume, host):
 1850 
 1851         refuse_to_migrate = (False, None)
 1852 
 1853         if volume.status not in ('available', 'retyping', 'maintenance',
 1854                                  'in-use'):
 1855             LOG.debug('Only available or in-use volumes can be migrated using '
 1856                       'backend assisted migration. Falling back to generic '
 1857                       'migration.')
 1858             return refuse_to_migrate
 1859 
 1860         if (host['capabilities']['storage_protocol'] != 'ceph'):
 1861             LOG.debug('Source and destination drivers need to be RBD '
 1862                       'to use backend assisted migration. Falling back to '
 1863                       'generic migration.')
 1864             return refuse_to_migrate
 1865 
 1866         loc_info = host['capabilities'].get('location_info')
 1867 
 1868         LOG.debug('Attempting RBD assisted volume migration. volume: %(id)s, '
 1869                   'host: %(host)s, status=%(status)s.',
 1870                   {'id': volume.id, 'host': host, 'status': volume.status})
 1871 
 1872         if not loc_info:
 1873             LOG.debug('Could not find location_info in capabilities reported '
 1874                       'by the destination driver. Falling back to generic '
 1875                       'migration.')
 1876             return refuse_to_migrate
 1877 
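              # Illustration (not part of the driver): location_info is
              # expected to be a colon-separated string of the form
              #   <cluster name>:<ceph conf path>:<fsid>:<user>:<pool>
              # e.g. (hypothetical) 'ceph:/etc/ceph/ceph.conf:<fsid>:cinder:volumes'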
 1878         try:
 1879             (rbd_cluster_name, rbd_ceph_conf, rbd_fsid, rbd_user, rbd_pool) = (
 1880                 utils.convert_str(loc_info).split(':'))
 1881         except ValueError:
 1882             LOG.error('Location info needed for backend enabled volume '
 1883                       'migration not in correct format: %s. Falling back to '
 1884                       'generic volume migration.', loc_info)
 1885             return refuse_to_migrate
 1886 
 1887         with linuxrbd.RBDClient(rbd_user, rbd_pool, conffile=rbd_ceph_conf,
 1888                                 rbd_cluster_name=rbd_cluster_name) as target:
 1889             if (rbd_fsid != self._get_fsid()) or \
 1890                     (rbd_fsid != encodeutils.safe_decode(
 1891                         target.client.get_fsid())):
 1892                 LOG.info('Migration between clusters is not supported. '
 1893                          'Falling back to generic migration.')
 1894                 return refuse_to_migrate
 1895 
 1896             if rbd_pool == self.configuration.rbd_pool:
 1897                 LOG.debug('Migration in the same pool, just need to update '
 1898                           "volume's host value to destination host.")
 1899                 return (True, None)
 1900 
 1901             if volume.status == 'in-use':
 1902                 LOG.debug('Migrating in-use volumes between different pools. '
 1903                           'Falling back to generic migration.')
 1904                 return refuse_to_migrate
 1905 
 1906             with RBDVolumeProxy(self, volume.name, read_only=True) as source:
 1907                 try:
 1908                     source.copy(target.ioctx, volume.name)
 1909                 except Exception:
 1910                     with excutils.save_and_reraise_exception():
 1911                         LOG.error('Error copying rbd image %(vol)s to target '
 1912                                   'pool %(pool)s.',
 1913                                   {'vol': volume.name, 'pool': rbd_pool})
 1914                         self.RBDProxy().remove(target.ioctx, volume.name)
 1915 
 1916         try:
 1917             # If the source fails to delete for some reason, we want to leave
 1918             # the target volume in place, since deleting it might cause a
 1919             # loss of data.
 1920             self.delete_volume(volume)
 1921         except Exception:
 1922             reason = 'Failed to delete migration source volume %s' % volume.id
 1923             raise exception.VolumeMigrationFailed(reason=reason)
 1924 
 1925         LOG.info('Successful RBD assisted volume migration.')
 1926 
 1927         return (True, None)
 1928 
 1929     def manage_existing_snapshot_get_size(self, snapshot, existing_ref):
 1930         """Return size of an existing snapshot for manage_existing_snapshot.
 1931 
 1932         :param snapshot:
 1933             snapshot ref info to be set
 1934         :param existing_ref:
 1935             existing_ref is a dictionary of the form:
 1936             {'source-name': <name of snapshot>}
 1937         """
 1938         # Check that the reference is valid
 1939         if not isinstance(existing_ref, dict):
 1940             existing_ref = {"source-name": existing_ref}
 1941         if 'source-name' not in existing_ref:
 1942             reason = _('Reference must contain source-name element.')
 1943             raise exception.ManageExistingInvalidReference(
 1944                 existing_ref=existing_ref, reason=reason)
 1945 
 1946         volume_name = utils.convert_str(snapshot.volume_name)
 1947         snapshot_name = utils.convert_str(existing_ref['source-name'])
 1948 
 1949         with RADOSClient(self) as client:
 1950             # Raise an exception if we didn't find a suitable rbd image.
 1951             try:
 1952                 rbd_snapshot = self.rbd.Image(client.ioctx, volume_name,
 1953                                               snapshot=snapshot_name)
 1954             except self.rbd.ImageNotFound:
 1955                 kwargs = {'existing_ref': snapshot_name,
 1956                           'reason': 'Specified snapshot does not exist.'}
 1957                 raise exception.ManageExistingInvalidReference(**kwargs)
 1958 
 1959             snapshot_size = rbd_snapshot.size()
 1960             rbd_snapshot.close()
 1961 
 1962             # RBD image size is returned in bytes.  Attempt to parse
 1963             # size as a float and round up to the next integer.
 1964             try:
 1965                 convert_size = int(math.ceil(float(snapshot_size) / units.Gi))
 1966                 return convert_size
 1967             except ValueError:
 1968                 exception_message = (_("Failed to manage existing snapshot "
 1969                                        "%(name)s, because reported size "
 1970                                        "%(size)s was not a floating-point"
 1971                                        " number.")
 1972                                      % {'name': snapshot_name,
 1973                                         'size': snapshot_size})
 1974                 raise exception.VolumeBackendAPIException(
 1975                     data=exception_message)
 1976 
 1977     def manage_existing_snapshot(self, snapshot, existing_ref):
 1978         """Manages an existing snapshot.
 1979 
 1980         Renames the snapshot to match the expected name. Error checking
 1981         done by manage_existing_snapshot_get_size is not repeated.
 1982 
 1983         :param snapshot:
 1984             snapshot ref info to be set
 1985         :param existing_ref:
 1986             existing_ref is a dictionary of the form:
 1987             {'source-name': <name of rbd snapshot>}
 1988         """
 1989         if not isinstance(existing_ref, dict):
 1990             existing_ref = {"source-name": existing_ref}
 1991         volume_name = utils.convert_str(snapshot.volume_name)
 1992         with RBDVolumeProxy(self, volume_name) as volume:
 1993             snapshot_name = existing_ref['source-name']
 1994             volume.rename_snap(utils.convert_str(snapshot_name),
 1995                                utils.convert_str(snapshot.name))
 1996             if not volume.is_protected_snap(snapshot.name):
 1997                 volume.protect_snap(snapshot.name)
 1998 
 1999     def get_manageable_snapshots(self, cinder_snapshots, marker, limit, offset,
 2000                                  sort_keys, sort_dirs):
 2001         """List manageable snapshots on RBD backend."""
 2002         manageable_snapshots = []
 2003         cinder_snapshot_ids = [resource['id'] for resource in cinder_snapshots]
 2004 
 2005         with RADOSClient(self) as client:
 2006             for image_name in self.RBDProxy().list(client.ioctx):
 2007                 with RBDVolumeProxy(self, image_name, read_only=True,
 2008                                     client=client.cluster,
 2009                                     ioctx=client.ioctx) as image:
 2010                     try:
 2011                         for snapshot in image.list_snaps():
 2012                             snapshot_id = (
 2013                                 volume_utils.extract_id_from_snapshot_name(
 2014                                     snapshot['name']))
 2015                             snapshot_info = {
 2016                                 'reference': {'source-name': snapshot['name']},
 2017                                 'size': int(math.ceil(
 2018                                     float(snapshot['size']) / units.Gi)),
 2019                                 'cinder_id': None,
 2020                                 'extra_info': None,
 2021                                 'safe_to_manage': False,
 2022                                 'reason_not_safe': None,
 2023                                 'source_reference': {'source-name': image_name}
 2024                             }
 2025 
 2026                             if snapshot_id in cinder_snapshot_ids:
 2027                                 # Exclude snapshots already managed.
 2028                                 snapshot_info['reason_not_safe'] = (
 2029                                     'already managed')
 2030                                 snapshot_info['cinder_id'] = snapshot_id
 2031                             elif snapshot['name'].endswith('.clone_snap'):
 2032                                 # Exclude clone snapshot.
 2033                                 snapshot_info['reason_not_safe'] = (
 2034                                     'used for clone snap')
 2035                             elif (snapshot['name'].startswith('backup')
 2036                                   and '.snap.' in snapshot['name']):
 2037                                 # Exclude intermediate snapshots created by the
 2038                                 # Ceph backup driver.
 2039                                 snapshot_info['reason_not_safe'] = (
 2040                                     'used for volume backup')
 2041                             else:
 2042                                 snapshot_info['safe_to_manage'] = True
 2043                             manageable_snapshots.append(snapshot_info)
 2044                     except self.rbd.ImageNotFound:
 2045                         LOG.debug("Image %s is not found.", image_name)
 2046 
 2047         return volume_utils.paginate_entries_list(
 2048             manageable_snapshots, marker, limit, offset, sort_keys, sort_dirs)
 2049 
 2050     def unmanage_snapshot(self, snapshot):
 2051         """Removes the specified snapshot from Cinder management."""
 2052         with RBDVolumeProxy(self, snapshot.volume_name) as volume:
 2053             volume.set_snap(snapshot.name)
 2054             children = volume.list_children()
 2055             volume.set_snap(None)
 2056             if not children and volume.is_protected_snap(snapshot.name):
 2057                 volume.unprotect_snap(snapshot.name)
 2058 
 2059     def get_backup_device(self, context, backup):
 2060         """Get a backup device from an existing volume.
 2061 
 2062         To support incremental Ceph-to-Ceph backups we don't clone the
 2063         volume.
 2064         """
 2065 
 2066         if 'backup.drivers.ceph' not in backup.service or backup.snapshot_id:
 2067             return super(RBDDriver, self).get_backup_device(context, backup)
 2068 
 2069         volume = objects.Volume.get_by_id(context, backup.volume_id)
 2070         return (volume, False)