"Fossies" - the Fresh Open Source Software Archive

Member "manila-11.0.1/manila/share/drivers/zfsonlinux/driver.py" (1 Feb 2021, 68263 Bytes) of package /linux/misc/openstack/manila-11.0.1.tar.gz:


    1 # Copyright 2016 Mirantis Inc.
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 """
   17 Module with ZFSonLinux share driver that utilizes ZFS filesystem resources
   18 and exports them as shares.
   19 """
   20 
   21 import math
   22 import os
   23 import time
   24 
   25 from oslo_config import cfg
   26 from oslo_log import log
   27 from oslo_utils import importutils
   28 from oslo_utils import strutils
   29 from oslo_utils import timeutils
   30 
   31 from manila.common import constants
   32 from manila import exception
   33 from manila.i18n import _
   34 from manila.share import configuration
   35 from manila.share import driver
   36 from manila.share.drivers.zfsonlinux import utils as zfs_utils
   37 from manila.share.manager import share_manager_opts  # noqa
   38 from manila.share import share_types
   39 from manila.share import utils as share_utils
   40 from manila import utils
   41 
   42 
   43 zfsonlinux_opts = [
   44     cfg.HostAddressOpt(
   45         "zfs_share_export_ip",
   46         required=True,
   47         help="IP to be added to user-facing export location. Required."),
   48     cfg.HostAddressOpt(
   49         "zfs_service_ip",
   50         required=True,
   51         help="IP to be added to admin-facing export location. Required."),
   52     cfg.ListOpt(
   53         "zfs_zpool_list",
   54         required=True,
   55         help="Specify the list of zpools that the backend is allowed "
   56              "to use. Entries may contain nested datasets. Examples: "
   57              "without nested dataset: 'zpool_name'; "
   58              "with nested dataset: 'zpool_name/nested_dataset_name'. "
   59              "Required."),
   60     cfg.ListOpt(
   61         "zfs_dataset_creation_options",
   62         help="List of options to apply to each dataset on creation, "
   63              "if needed. Example: "
   64              "compression=gzip,dedup=off. "
   65              "Note that 'readonly' is always forced to 'on' for secondary "
   66              "replicas and to 'off' for active replicas, "
   67              "and 'quota' is always set to the share size. Optional."),
   68     cfg.StrOpt(
   69         "zfs_dataset_name_prefix",
   70         default='manila_share_',
   71         help="Prefix to be used in each dataset name. Optional."),
   72     cfg.StrOpt(
   73         "zfs_dataset_snapshot_name_prefix",
   74         default='manila_share_snapshot_',
   75         help="Prefix to be used in each dataset snapshot name. Optional."),
   76     cfg.BoolOpt(
   77         "zfs_use_ssh",
   78         default=False,
   79         help="Whether to use SSH for running ZFS commands on a remote "
   80              "ZFS storage host. Optional."),
   81     cfg.StrOpt(
   82         "zfs_ssh_username",
   83         help="SSH user that will be used in two cases: "
   84              "1) by the manila-share service when it runs on a different "
   85              "host than its ZFS storage; "
   86              "2) by manila-share services with other ZFS backends that "
   87              "perform replication. "
   88              "SSH access is expected to be key-based and passwordless. "
   89              "This user should be a passwordless sudoer. Optional."),
   90     cfg.StrOpt(
   91         "zfs_ssh_user_password",
   92         secret=True,
   93         help="Password for the user used to SSH into the ZFS storage "
   94              "host. Not used for replication operations, which require "
   95              "passwordless SSH access. Optional."),
   96     cfg.StrOpt(
   97         "zfs_ssh_private_key_path",
   98         help="Path to the SSH private key used to SSH into the ZFS "
   99              "storage host. Not used for replication operations. Optional."),
  100     cfg.ListOpt(
  101         "zfs_share_helpers",
  102         required=True,
  103         default=[
  104             "NFS=manila.share.drivers.zfsonlinux.utils.NFSviaZFSHelper",
  105         ],
  106         help="Specify the list of share export helpers for ZFS storage. "
  107              "It should look like the following: "
  108              "'FOO_protocol=foo.FooClass,BAR_protocol=bar.BarClass'. "
  109              "Required."),
  110     cfg.StrOpt(
  111         "zfs_replica_snapshot_prefix",
  112         required=True,
  113         default="tmp_snapshot_for_replication_",
  114         help="Set snapshot prefix for usage in ZFS replication. Required."),
  115     cfg.StrOpt(
  116         "zfs_migration_snapshot_prefix",
  117         required=True,
  118         default="tmp_snapshot_for_share_migration_",
  119         help="Set snapshot prefix for usage in ZFS migration. Required."),
  120 ]
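      # A minimal, hypothetical example of a manila.conf backend stanza that
      # combines the options above; the stanza name, pool, IP addresses and
      # option values are illustrative only:
      #
      #     [zfs_backend_1]
      #     share_driver = manila.share.drivers.zfsonlinux.driver.ZFSonLinuxShareDriver
      #     zfs_share_export_ip = 203.0.113.10
      #     zfs_service_ip = 192.0.2.10
      #     zfs_zpool_list = manila_pool/manila_datasets
      #     zfs_dataset_creation_options = compression=gzip,dedup=off
      #     zfs_share_helpers = NFS=manila.share.drivers.zfsonlinux.utils.NFSviaZFSHelper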
  121 
  122 CONF = cfg.CONF
  123 CONF.register_opts(zfsonlinux_opts)
  124 LOG = log.getLogger(__name__)
  125 
  126 
  127 def ensure_share_server_not_provided(f):
  128 
  129     def wrap(self, context, *args, **kwargs):
  130         server = kwargs.get(
  131             "share_server", kwargs.get("destination_share_server"))
  132         if server:
  133             raise exception.InvalidInput(
  134                 reason=_("Share server handling is not available, "
  135                          "but share server '%s' was provided. "
  136                          "A share network should not be used.") % server.get(
  137                              "id", server))
  138         return f(self, context, *args, **kwargs)
  139 
  140     return wrap
  141 
  142 
  143 def get_backend_configuration(backend_name):
  144     config_stanzas = CONF.list_all_sections()
  145     if backend_name not in config_stanzas:
  146         msg = _("Could not find backend stanza %(backend_name)s in "
  147                 "the configuration, which is required for share replication "
  148                 "and migration. Available stanzas are %(stanzas)s.")
  149         params = {
  150             "stanzas": config_stanzas,
  151             "backend_name": backend_name,
  152         }
  153         raise exception.BadConfigurationException(reason=msg % params)
  154 
  155     config = configuration.Configuration(
  156         driver.share_opts, config_group=backend_name)
  157     config.append_config_values(zfsonlinux_opts)
  158     config.append_config_values(share_manager_opts)
  159     config.append_config_values(driver.ssh_opts)
  160 
  161     return config
  162 
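      # Illustrative usage sketch (the backend name 'zfs_backend_2' is
      # hypothetical and must match a stanza in the manila configuration):
      #
      #     config = get_backend_configuration('zfs_backend_2')
      #     remote_ip = config.zfs_service_ip
      #
      # This loads the [zfs_backend_2] stanza with the driver, ZFS and SSH
      # options appended above; a missing stanza raises
      # BadConfigurationException.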
  163 
  164 class ZFSonLinuxShareDriver(zfs_utils.ExecuteMixin, driver.ShareDriver):
  165 
  166     def __init__(self, *args, **kwargs):
  167         super(ZFSonLinuxShareDriver, self).__init__(
  168             [False], *args, config_opts=[zfsonlinux_opts], **kwargs)
  169         self.replica_snapshot_prefix = (
  170             self.configuration.zfs_replica_snapshot_prefix)
  171         self.migration_snapshot_prefix = (
  172             self.configuration.zfs_migration_snapshot_prefix)
  173         self.backend_name = self.configuration.safe_get(
  174             'share_backend_name') or 'ZFSonLinux'
  175         self.zpool_list = self._get_zpool_list()
  176         self.dataset_creation_options = (
  177             self.configuration.zfs_dataset_creation_options)
  178         self.share_export_ip = self.configuration.zfs_share_export_ip
  179         self.service_ip = self.configuration.zfs_service_ip
  180         self.private_storage = kwargs.get('private_storage')
  181         self._helpers = {}
  182 
  183         # Set config-based capabilities
  184         self._init_common_capabilities()
  185 
  186         self._shell_executors = {}
  187 
  188     def _get_shell_executor_by_host(self, host):
  189         backend_name = share_utils.extract_host(host, level='backend_name')
  190         if backend_name in CONF.enabled_share_backends:
  191             # Return executor of this host
  192             return self.execute
  193         elif backend_name not in self._shell_executors:
  194             config = get_backend_configuration(backend_name)
  195             self._shell_executors[backend_name] = (
  196                 zfs_utils.get_remote_shell_executor(
  197                     ip=config.zfs_service_ip,
  198                     port=22,
  199                     conn_timeout=config.ssh_conn_timeout,
  200                     login=config.zfs_ssh_username,
  201                     password=config.zfs_ssh_user_password,
  202                     privatekey=config.zfs_ssh_private_key_path,
  203                     max_size=10,
  204                 )
  205             )
  206         # Return executor of remote host
  207         return self._shell_executors[backend_name]
  208 
  209     def _init_common_capabilities(self):
  210         self.common_capabilities = {}
  211         if 'dedup=on' in self.dataset_creation_options:
  212             self.common_capabilities['dedupe'] = [True]
  213         elif 'dedup=off' in self.dataset_creation_options:
  214             self.common_capabilities['dedupe'] = [False]
  215         else:
  216             self.common_capabilities['dedupe'] = [True, False]
  217 
  218         if 'compression=off' in self.dataset_creation_options:
  219             self.common_capabilities['compression'] = [False]
  220         elif any('compression=' in option
  221                  for option in self.dataset_creation_options):
  222             self.common_capabilities['compression'] = [True]
  223         else:
  224             self.common_capabilities['compression'] = [True, False]
  225 
  226         # NOTE(vponomaryov): The driver uses the 'quota' property for
  227         # ZFS datasets, so it can be considered
  228         # 'always thin provisioned' because it never reserves
  229         # space for a dataset.
  230         self.common_capabilities['thin_provisioning'] = [True]
  231         self.common_capabilities['max_over_subscription_ratio'] = (
  232             self.configuration.max_over_subscription_ratio)
  233         self.common_capabilities['qos'] = [False]
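              # A rough illustration of the mapping above, assuming
              # zfs_dataset_creation_options = ['compression=gzip', 'dedup=off']:
              # the advertised capabilities would be {'dedupe': [False],
              # 'compression': [True], 'thin_provisioning': [True],
              # 'qos': [False]} plus the configured max_over_subscription_ratio.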
  234 
  235     def _get_zpool_list(self):
  236         zpools = []
  237         for zpool in self.configuration.zfs_zpool_list:
  238             zpool_name = zpool.split('/')[0]
  239             if zpool_name in zpools:
  240                 raise exception.BadConfigurationException(
  241                     reason=_("Using the same zpool twice is prohibited. "
  242                              "Duplicate is '%(zpool)s'. List of zpools: "
  243                              "%(zpool_list)s.") % {
  244                                  'zpool': zpool,
  245                                  'zpool_list': ', '.join(
  246                                      self.configuration.zfs_zpool_list)})
  247             zpools.append(zpool_name)
  248         return zpools
  249 
  250     @zfs_utils.zfs_dataset_synchronized
  251     def _delete_dataset_or_snapshot_with_retry(self, name):
  252         """Attempts to destroy a dataset or snapshot, retrying on failure."""
  253         # NOTE(vponomaryov): it is possible to see 'dataset is busy' error
  254         # under the load. So, we are ok to perform retry in this case.
  255         mountpoint = self.get_zfs_option(name, 'mountpoint')
  256         if '@' not in name:
  257             # NOTE(vponomaryov): check that dataset has no open files.
  258             start_point = time.time()
  259             while time.time() - start_point < 60:
  260                 try:
  261                     out, err = self.execute('lsof', '-w', mountpoint)
  262                 except exception.ProcessExecutionError:
  263                     # NOTE(vponomaryov): lsof returns code 1 if search
  264                     # didn't give results.
  265                     break
  266                 LOG.debug("Cannot destroy dataset '%(name)s', it has "
  267                           "open files. Will wait 2 more seconds. "
  268                           "Out: \n%(out)s", {
  269                               'name': name, 'out': out})
  270                 time.sleep(2)
  271             else:
  272                 raise exception.ZFSonLinuxException(
  273                     msg=_("Could not destroy '%s' dataset, "
  274                           "because it has open files.") % name)
  275 
  276         @utils.retry(exception.ProcessExecutionError)
  277         def _zfs_destroy_with_retry():
  278             """Retry destroying dataset ten times with exponential backoff."""
  279             # NOTE(bswartz): There appears to be a bug in ZFS when creating and
  280             # destroying datasets concurrently where the filesystem remains
  281             # mounted even though ZFS thinks it's unmounted. The most reliable
  282             # workaround I've found is to force the unmount, then attempt the
  283             # destroy, with short pauses around the unmount. (See bug#1546723)
  284             try:
  285                 self.execute('sudo', 'umount', mountpoint)
  286             except exception.ProcessExecutionError:
  287                 # Ignore failed umount, it's normal
  288                 pass
  289             time.sleep(2)
  290 
  291             # NOTE(vponomaryov): Now that the dataset has no open files
  292             # and is not mounted, destroy it.
  293             self.zfs('destroy', '-f', name)
  294 
  295         _zfs_destroy_with_retry()
  296 
  297     def _setup_helpers(self):
  298         """Sets up share helpers for the ZFS backend."""
  299         self._helpers = {}
  300         helpers = self.configuration.zfs_share_helpers
  301         if helpers:
  302             for helper_str in helpers:
  303                 share_proto, __, import_str = helper_str.partition('=')
  304                 helper = importutils.import_class(import_str)
  305                 self._helpers[share_proto.upper()] = helper(
  306                     self.configuration)
  307         else:
  308             raise exception.BadConfigurationException(
  309                 reason=_(
  310                     "No share helpers selected for ZFSonLinux Driver. "
  311                     "Please specify using config option 'zfs_share_helpers'."))
  312 
  313     def _get_share_helper(self, share_proto):
  314         """Returns the share helper for the given share protocol."""
  315         helper = self._helpers.get(share_proto)
  316         if helper:
  317             return helper
  318         else:
  319             raise exception.InvalidShare(
  320                 reason=_("Wrong, unsupported or disabled protocol - "
  321                          "'%s'.") % share_proto)
  322 
  323     def do_setup(self, context):
  324         """Perform basic setup and checks."""
  325         super(ZFSonLinuxShareDriver, self).do_setup(context)
  326         self._setup_helpers()
  327         for ip in (self.share_export_ip, self.service_ip):
  328             if not utils.is_valid_ip_address(ip, 4):
  329                 raise exception.BadConfigurationException(
  330                     reason=_("Wrong IP address provided: "
  331                              "%s") % ip)
  332 
  333         if not self.zpool_list:
  334             raise exception.BadConfigurationException(
  335                 reason=_("No zpools specified for usage: "
  336                          "%s") % self.zpool_list)
  337 
  338         # Make pool mounts shared so that cloned namespaces receive unmounts
  339         # and don't prevent us from unmounting datasets
  340         for zpool in self.configuration.zfs_zpool_list:
  341             self.execute('sudo', 'mount', '--make-rshared', ('/%s' % zpool))
  342 
  343         if self.configuration.zfs_use_ssh:
  344             # Check workability of SSH executor
  345             self.ssh_executor('whoami')
  346 
  347     def _get_pools_info(self):
  348         """Returns info about all pools used by backend."""
  349         pools = []
  350         for zpool in self.zpool_list:
  351             free_size = self.get_zpool_option(zpool, 'free')
  352             free_size = utils.translate_string_size_to_float(free_size)
  353             total_size = self.get_zpool_option(zpool, 'size')
  354             total_size = utils.translate_string_size_to_float(total_size)
  355             pool = {
  356                 'pool_name': zpool,
  357                 'total_capacity_gb': float(total_size),
  358                 'free_capacity_gb': float(free_size),
  359                 'reserved_percentage':
  360                     self.configuration.reserved_share_percentage,
  361             }
  362             pool.update(self.common_capabilities)
  363             if self.configuration.replication_domain:
  364                 pool['replication_type'] = 'readable'
  365             pools.append(pool)
  366         return pools
  367 
  368     def _update_share_stats(self):
  369         """Retrieves share stats info."""
  370         data = {
  371             'share_backend_name': self.backend_name,
  372             'storage_protocol': 'NFS',
  373             'reserved_percentage':
  374                 self.configuration.reserved_share_percentage,
  375             'snapshot_support': True,
  376             'create_share_from_snapshot_support': True,
  377             'driver_name': 'ZFS',
  378             'pools': self._get_pools_info(),
  379         }
  380         if self.configuration.replication_domain:
  381             data['replication_type'] = 'readable'
  382         super(ZFSonLinuxShareDriver, self)._update_share_stats(data)
  383 
  384     def _get_share_name(self, share_id):
  385         """Returns name of dataset used for given share."""
  386         prefix = self.configuration.zfs_dataset_name_prefix or ''
  387         return prefix + share_id.replace('-', '_')
  388 
  389     def _get_snapshot_name(self, snapshot_id):
  390         """Returns name of dataset snapshot used for given share snapshot."""
  391         prefix = self.configuration.zfs_dataset_snapshot_name_prefix or ''
  392         return prefix + snapshot_id.replace('-', '_')
  393 
  394     def _get_dataset_creation_options(self, share, is_readonly=False):
  395         """Returns list of options to be used for dataset creation."""
  396         options = ['quota=%sG' % share['size']]
  397         extra_specs = share_types.get_extra_specs_from_share(share)
  398 
  399         dedupe_set = False
  400         dedupe = extra_specs.get('dedupe')
  401         if dedupe:
  402             dedupe = strutils.bool_from_string(
  403                 dedupe.lower().split(' ')[-1], default=dedupe)
  404             if (dedupe in self.common_capabilities['dedupe']):
  405                 options.append('dedup=%s' % ('on' if dedupe else 'off'))
  406                 dedupe_set = True
  407             else:
  408                 raise exception.ZFSonLinuxException(msg=_(
  409                     "Cannot use requested '%(requested)s' value of 'dedupe' "
  410                     "extra spec. It does not match the allowed values "
  411                     "'%(allowed)s' configured for this backend.") % {
  412                         'requested': dedupe,
  413                         'allowed': self.common_capabilities['dedupe']})
  414 
  415         compression_set = False
  416         compression_type = extra_specs.get('zfsonlinux:compression')
  417         if compression_type:
  418             if (compression_type == 'off' and
  419                     False in self.common_capabilities['compression']):
  420                 options.append('compression=off')
  421                 compression_set = True
  422             elif (compression_type != 'off' and
  423                     True in self.common_capabilities['compression']):
  424                 options.append('compression=%s' % compression_type)
  425                 compression_set = True
  426             else:
  427                 raise exception.ZFSonLinuxException(msg=_(
  428                     "Cannot use value '%s' of extra spec "
  429                     "'zfsonlinux:compression' because compression is disabled "
  430                     "for this backend. Set extra spec 'compression=True' to "
  431                     "make the scheduler pick an appropriate backend."
  432                 ) % compression_type)
  433 
  434         for option in self.dataset_creation_options or []:
  435             if any(v in option for v in (
  436                     'readonly', 'sharenfs', 'sharesmb', 'quota')):
  437                 continue
  438             if 'dedup' in option and dedupe_set is True:
  439                 continue
  440             if 'compression' in option and compression_set is True:
  441                 continue
  442             options.append(option)
  443         if is_readonly:
  444             options.append('readonly=on')
  445         else:
  446             options.append('readonly=off')
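              # A hedged illustration of the result, assuming a 5 GiB share whose
              # share type sets extra specs 'dedupe': '<is> True' and
              # 'zfsonlinux:compression': 'gzip', with no conflicting backend
              # defaults: the returned list would be roughly
              # ['quota=5G', 'dedup=on', 'compression=gzip', 'readonly=off'].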
  447         return options
  448 
  449     def _get_dataset_name(self, share):
  450         """Returns name of dataset used for given share."""
  451         pool_name = share_utils.extract_host(share['host'], level='pool')
  452 
  453         # Pick pool with nested dataset name if set up
  454         for pool in self.configuration.zfs_zpool_list:
  455             pool_data = pool.split('/')
  456             if (pool_name == pool_data[0] and len(pool_data) > 1):
  457                 pool_name = pool
  458                 if pool_name[-1] == '/':
  459                     pool_name = pool_name[0:-1]
  460                 break
  461 
  462         dataset_name = self._get_share_name(share['id'])
  463         full_dataset_name = '%(pool)s/%(dataset)s' % {
  464             'pool': pool_name, 'dataset': dataset_name}
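              # For example (values are illustrative): a share scheduled to pool
              # 'foo_pool' with zfs_zpool_list = ['foo_pool/manila'] and share id
              # 'abc-def' maps to dataset 'foo_pool/manila/manila_share_abc_def'.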
  465 
  466         return full_dataset_name
  467 
  468     @ensure_share_server_not_provided
  469     def create_share(self, context, share, share_server=None):
  470         """Is called to create a share."""
  471         options = self._get_dataset_creation_options(share, is_readonly=False)
  472         cmd = ['create']
  473         for option in options:
  474             cmd.extend(['-o', option])
  475         dataset_name = self._get_dataset_name(share)
  476         cmd.append(dataset_name)
  477 
  478         ssh_cmd = '%(username)s@%(host)s' % {
  479             'username': self.configuration.zfs_ssh_username,
  480             'host': self.service_ip,
  481         }
  482         pool_name = share_utils.extract_host(share['host'], level='pool')
  483         self.private_storage.update(
  484             share['id'], {
  485                 'entity_type': 'share',
  486                 'dataset_name': dataset_name,
  487                 'ssh_cmd': ssh_cmd,  # used with replication and migration
  488                 'pool_name': pool_name,  # used in replication
  489                 'used_options': ' '.join(options),
  490             }
  491         )
  492 
  493         self.zfs(*cmd)
  494 
  495         return self._get_share_helper(
  496             share['share_proto']).create_exports(dataset_name)
  497 
  498     @ensure_share_server_not_provided
  499     def delete_share(self, context, share, share_server=None):
  500         """Is called to remove a share."""
  501         pool_name = self.private_storage.get(share['id'], 'pool_name')
  502         pool_name = pool_name or share_utils.extract_host(
  503             share["host"], level="pool")
  504         dataset_name = self.private_storage.get(share['id'], 'dataset_name')
  505         if not dataset_name:
  506             dataset_name = self._get_dataset_name(share)
  507 
  508         out, err = self.zfs('list', '-r', pool_name)
  509         data = self.parse_zfs_answer(out)
  510         for datum in data:
  511             if datum['NAME'] != dataset_name:
  512                 continue
  513 
  514             # Delete dataset's snapshots first
  515             out, err = self.zfs('list', '-r', '-t', 'snapshot', pool_name)
  516             snapshots = self.parse_zfs_answer(out)
  517             full_snapshot_prefix = (
  518                 dataset_name + '@')
  519             for snap in snapshots:
  520                 if full_snapshot_prefix in snap['NAME']:
  521                     self._delete_dataset_or_snapshot_with_retry(snap['NAME'])
  522 
  523             self._get_share_helper(
  524                 share['share_proto']).remove_exports(dataset_name)
  525             self._delete_dataset_or_snapshot_with_retry(dataset_name)
  526             break
  527         else:
  528             LOG.warning(
  529                 "Share with '%(id)s' ID and '%(name)s' NAME is "
  530                 "absent on backend. Nothing has been deleted.",
  531                 {'id': share['id'], 'name': dataset_name})
  532         self.private_storage.delete(share['id'])
  533 
  534     @ensure_share_server_not_provided
  535     def create_snapshot(self, context, snapshot, share_server=None):
  536         """Is called to create a snapshot."""
  537         dataset_name = self.private_storage.get(
  538             snapshot['share_instance_id'], 'dataset_name')
  539         snapshot_tag = self._get_snapshot_name(snapshot['id'])
  540         snapshot_name = dataset_name + '@' + snapshot_tag
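              # The resulting name looks like, for example (ids illustrative):
              # 'pool1/manila_share_<share_instance_id>@manila_share_snapshot_<snapshot_id>',
              # using the default name prefixes from the config options above.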
  541         self.private_storage.update(
  542             snapshot['snapshot_id'], {
  543                 'entity_type': 'snapshot',
  544                 'snapshot_tag': snapshot_tag,
  545             }
  546         )
  547         self.zfs('snapshot', snapshot_name)
  548         return {"provider_location": snapshot_name}
  549 
  550     @ensure_share_server_not_provided
  551     def delete_snapshot(self, context, snapshot, share_server=None):
  552         """Is called to remove a snapshot."""
  553         self._delete_snapshot(context, snapshot)
  554         self.private_storage.delete(snapshot['snapshot_id'])
  555 
  556     def _get_saved_snapshot_name(self, snapshot_instance):
  557         snapshot_tag = self.private_storage.get(
  558             snapshot_instance['snapshot_id'], 'snapshot_tag')
  559         dataset_name = self.private_storage.get(
  560             snapshot_instance['share_instance_id'], 'dataset_name')
  561         snapshot_name = dataset_name + '@' + snapshot_tag
  562         return snapshot_name
  563 
  564     def _delete_snapshot(self, context, snapshot):
  565         snapshot_name = self._get_saved_snapshot_name(snapshot)
  566         out, err = self.zfs('list', '-r', '-t', 'snapshot', snapshot_name)
  567         data = self.parse_zfs_answer(out)
  568         for datum in data:
  569             if datum['NAME'] == snapshot_name:
  570                 self._delete_dataset_or_snapshot_with_retry(snapshot_name)
  571                 break
  572         else:
  573             LOG.warning(
  574                 "Snapshot with '%(id)s' ID and '%(name)s' NAME is "
  575                 "absent on backend. Nothing has been deleted.",
  576                 {'id': snapshot['id'], 'name': snapshot_name})
  577 
  578     @ensure_share_server_not_provided
  579     def create_share_from_snapshot(self, context, share, snapshot,
  580                                    share_server=None, parent_share=None):
  581         """Is called to create a share from snapshot."""
  582         src_backend_name = share_utils.extract_host(
  583             snapshot.share_instance['host'], level='backend_name'
  584         )
  585         src_snapshot_name = self._get_saved_snapshot_name(snapshot)
  586         dataset_name = self._get_dataset_name(share)
  587 
  588         dst_backend_ssh_cmd = '%(username)s@%(host)s' % {
  589             'username': self.configuration.zfs_ssh_username,
  590             'host': self.service_ip,
  591         }
  592 
  593         dst_backend_pool_name = share_utils.extract_host(share['host'],
  594                                                          level='pool')
  595         options = self._get_dataset_creation_options(share, is_readonly=False)
  596 
  597         self.private_storage.update(
  598             share['id'], {
  599                 'entity_type': 'share',
  600                 'dataset_name': dataset_name,
  601                 'ssh_cmd': dst_backend_ssh_cmd,  # used in replication
  602                 'pool_name': dst_backend_pool_name,  # used in replication
  603                 'used_options': options,
  604             }
  605         )
  606 
  607         # NOTE(andrebeltrami): Implement support for creating a share from
  608         # a snapshot that lives on a different backend on a different host.
  609         src_config = get_backend_configuration(src_backend_name)
  610         src_backend_ssh_cmd = '%(username)s@%(host)s' % {
  611             'username': src_config.zfs_ssh_username,
  612             'host': src_config.zfs_service_ip,
  613         }
  614         self.execute(
  615             # NOTE(vponomaryov): SSH is used as workaround for 'execute'
  616             # implementation restriction that does not support usage
  617             # of '|'.
  618             'ssh', src_backend_ssh_cmd,
  619             'sudo', 'zfs', 'send', '-vD', src_snapshot_name, '|',
  620             'ssh', dst_backend_ssh_cmd,
  621             'sudo', 'zfs', 'receive', '-v', dataset_name,
  622         )
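              # The call above builds, roughly, the following pipeline (all names
              # are illustrative):
              #   ssh <src_user>@<src_ip> sudo zfs send -vD <src_dataset>@<snap> |
              #       ssh <dst_user>@<dst_ip> sudo zfs receive -v <dst_dataset>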
  623 
  624         # Apply options based on the share type in use, which may differ
  625         # from the one used for the original share.
  626         for option in options:
  627             self.zfs('set', option, dataset_name)
  628 
  629         # Delete with retry, as right after creation it may be temporarily busy.
  630         self.execute_with_retry(
  631             'sudo', 'zfs', 'destroy',
  632             dataset_name + '@' + src_snapshot_name.split('@')[-1])
  633 
  634         return self._get_share_helper(
  635             share['share_proto']).create_exports(dataset_name)
  636 
  637     def get_pool(self, share):
  638         """Return the name of the pool the share resides on.
  639 
  640         :param share: The share hosted by the driver.
  641         """
  642         pool_name = share_utils.extract_host(share['host'], level='pool')
  643         return pool_name
  644 
  645     @ensure_share_server_not_provided
  646     def ensure_share(self, context, share, share_server=None):
  647         """Invoked to ensure that given share is exported."""
  648         dataset_name = self.private_storage.get(share['id'], 'dataset_name')
  649         if not dataset_name:
  650             dataset_name = self._get_dataset_name(share)
  651 
  652         pool_name = share_utils.extract_host(share['host'], level='pool')
  653         out, err = self.zfs('list', '-r', pool_name)
  654         data = self.parse_zfs_answer(out)
  655         for datum in data:
  656             if datum['NAME'] == dataset_name:
  657                 ssh_cmd = '%(username)s@%(host)s' % {
  658                     'username': self.configuration.zfs_ssh_username,
  659                     'host': self.service_ip,
  660                 }
  661                 self.private_storage.update(
  662                     share['id'], {'ssh_cmd': ssh_cmd})
  663                 sharenfs = self.get_zfs_option(dataset_name, 'sharenfs')
  664                 if sharenfs != 'off':
  665                     self.zfs('share', dataset_name)
  666                 export_locations = self._get_share_helper(
  667                     share['share_proto']).get_exports(dataset_name)
  668                 return export_locations
  669         else:
  670             raise exception.ShareResourceNotFound(share_id=share['id'])
  671 
  672     def get_network_allocations_number(self):
  673         """ZFS does not handle networking. Return 0."""
  674         return 0
  675 
  676     @ensure_share_server_not_provided
  677     def extend_share(self, share, new_size, share_server=None):
  678         """Extends size of existing share."""
  679         dataset_name = self._get_dataset_name(share)
  680         self.zfs('set', 'quota=%sG' % new_size, dataset_name)
  681 
  682     @ensure_share_server_not_provided
  683     def shrink_share(self, share, new_size, share_server=None):
  684         """Shrinks size of existing share."""
  685         dataset_name = self._get_dataset_name(share)
  686         consumed_space = self.get_zfs_option(dataset_name, 'used')
  687         consumed_space = utils.translate_string_size_to_float(consumed_space)
  688         if consumed_space >= new_size:
  689             raise exception.ShareShrinkingPossibleDataLoss(
  690                 share_id=share['id'])
  691         self.zfs('set', 'quota=%sG' % new_size, dataset_name)
  692 
  693     @ensure_share_server_not_provided
  694     def update_access(self, context, share, access_rules, add_rules,
  695                       delete_rules, share_server=None):
  696         """Updates access rules for given share."""
  697         dataset_name = self._get_dataset_name(share)
  698         executor = self._get_shell_executor_by_host(share['host'])
  699         return self._get_share_helper(share['share_proto']).update_access(
  700             dataset_name, access_rules, add_rules, delete_rules,
  701             executor=executor)
  702 
  703     def manage_existing(self, share, driver_options):
  704         """Manage existing ZFS dataset as manila share.
  705 
  706         The ZFSonLinux driver accepts only one driver_option, 'size'.
  707         If an administrator provides this option, that value is set as the
  708         dataset quota and used as the share size. Otherwise, the driver sets
  709         the quota to the next integer above the consumed space.
  710         The driver does not expect the mountpoint to have been changed (it
  711         should be the default, "/%(dataset_name)s").
  712 
  713         :param share: share data
  714         :param driver_options: Empty dict or dict with 'size' option.
  715         :return: dict with share size and its export locations.
  716         """
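              # Worked example (hypothetical values): with an empty driver_options
              # and a dataset 'used' property translating to 3.4 GiB, the quota and
              # reported share size become int(3.4) + 1 = 4 GiB.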
  717         old_export_location = share["export_locations"][0]["path"]
  718         old_dataset_name = old_export_location.split(":/")[-1]
  719 
  720         scheduled_pool_name = share_utils.extract_host(
  721             share["host"], level="pool")
  722         actual_pool_name = old_dataset_name.split("/")[0]
  723 
  724         new_dataset_name = self._get_dataset_name(share)
  725 
  726         # Calculate quota for managed dataset
  727         quota = driver_options.get("size")
  728         if not quota:
  729             consumed_space = self.get_zfs_option(old_dataset_name, "used")
  730             consumed_space = utils.translate_string_size_to_float(
  731                 consumed_space)
  732             quota = int(consumed_space) + 1
  733         share["size"] = int(quota)
  734 
  735         # Save dataset-specific data in private storage
  736         options = self._get_dataset_creation_options(share, is_readonly=False)
  737         ssh_cmd = "%(username)s@%(host)s" % {
  738             "username": self.configuration.zfs_ssh_username,
  739             "host": self.service_ip,
  740         }
  741 
  742         # Perform checks on requested dataset
  743         if actual_pool_name != scheduled_pool_name:
  744             raise exception.ZFSonLinuxException(
  745                 _("Cannot manage share '%(share_id)s' "
  746                   "(share_instance '%(si_id)s'), because scheduled "
  747                   "pool '%(sch)s' and actual '%(actual)s' differ.") % {
  748                     "share_id": share["share_id"],
  749                     "si_id": share["id"],
  750                     "sch": scheduled_pool_name,
  751                     "actual": actual_pool_name})
  752 
  753         out, err = self.zfs("list", "-r", actual_pool_name)
  754         data = self.parse_zfs_answer(out)
  755         for datum in data:
  756             if datum["NAME"] == old_dataset_name:
  757                 break
  758         else:
  759             raise exception.ZFSonLinuxException(
  760                 _("Cannot manage share '%(share_id)s' "
  761                   "(share_instance '%(si_id)s'), because dataset "
  762                   "'%(dataset)s' not found in zpool '%(zpool)s'.") % {
  763                     "share_id": share["share_id"],
  764                     "si_id": share["id"],
  765                     "dataset": old_dataset_name,
  766                     "zpool": actual_pool_name})
  767 
  768         # Unmount the dataset before attempting to rename and mount
  769         try:
  770             self._unmount_share_with_retry(old_dataset_name)
  771         except exception.ZFSonLinuxException:
  772             msg = _("Unable to unmount share before renaming and re-mounting.")
  773             raise exception.ZFSonLinuxException(message=msg)
  774 
  775         # Rename the dataset and mount with new name
  776         self.zfs_with_retry("rename", old_dataset_name, new_dataset_name)
  777 
  778         try:
  779             self.zfs("mount", new_dataset_name)
  780         except exception.ProcessExecutionError:
  781             # Workaround for bug/1785180
  782             out, err = self.zfs("mount")
  783             mounted = any([new_dataset_name in mountedfs
  784                            for mountedfs in out.splitlines()])
  785             if not mounted:
  786                 raise
  787 
  788         # Apply options to dataset
  789         for option in options:
  790             self.zfs("set", option, new_dataset_name)
  791 
  792         # Get new export locations of renamed dataset
  793         export_locations = self._get_share_helper(
  794             share["share_proto"]).get_exports(new_dataset_name)
  795 
  796         self.private_storage.update(
  797             share["id"], {
  798                 "entity_type": "share",
  799                 "dataset_name": new_dataset_name,
  800                 "ssh_cmd": ssh_cmd,  # used in replication
  801                 "pool_name": actual_pool_name,  # used in replication
  802                 "used_options": " ".join(options),
  803             }
  804         )
  805 
  806         return {"size": share["size"], "export_locations": export_locations}
  807 
  808     def unmanage(self, share):
  809         """Removes the specified share from Manila management."""
  810         self.private_storage.delete(share['id'])
  811 
  812     def manage_existing_snapshot(self, snapshot_instance, driver_options):
  813         """Manage existing share snapshot with manila.
  814 
  815         :param snapshot_instance: SnapshotInstance data
  816         :param driver_options: expects only one optional key 'size'.
  817         :return: dict with share snapshot instance fields for update, example::
  818 
  819             {
  820 
  821                 'size': 1,
  822                 'provider_location': 'path/to/some/dataset@some_snapshot_tag',
  823 
  824             }
  825 
  826         """
  827         snapshot_size = int(driver_options.get("size", 0))
  828         old_provider_location = snapshot_instance.get("provider_location")
  829         old_snapshot_tag = old_provider_location.split("@")[-1]
  830         new_snapshot_tag = self._get_snapshot_name(snapshot_instance["id"])
  831 
  832         self.private_storage.update(
  833             snapshot_instance["snapshot_id"], {
  834                 "entity_type": "snapshot",
  835                 "old_snapshot_tag": old_snapshot_tag,
  836                 "snapshot_tag": new_snapshot_tag,
  837             }
  838         )
  839 
  840         try:
  841             self.zfs("list", "-r", "-t", "snapshot", old_provider_location)
  842         except exception.ProcessExecutionError as e:
  843             raise exception.ManageInvalidShareSnapshot(reason=e.stderr)
  844 
  845         if not snapshot_size:
  846             consumed_space = self.get_zfs_option(old_provider_location, "used")
  847             consumed_space = utils.translate_string_size_to_float(
  848                 consumed_space)
  849             snapshot_size = int(math.ceil(consumed_space))
  850 
  851         dataset_name = self.private_storage.get(
  852             snapshot_instance["share_instance_id"], "dataset_name")
  853         new_provider_location = dataset_name + "@" + new_snapshot_tag
  854 
  855         self.zfs("rename", old_provider_location, new_provider_location)
  856 
  857         return {
  858             "size": snapshot_size,
  859             "provider_location": new_provider_location,
  860         }
  861 
  862     def unmanage_snapshot(self, snapshot_instance):
  863         """Unmanage dataset snapshot."""
  864         self.private_storage.delete(snapshot_instance["snapshot_id"])
  865 
  866     @utils.retry(exception.ZFSonLinuxException)
  867     def _unmount_share_with_retry(self, share_name):
  868         out, err = self.execute("sudo", "mount")
  869         if "%s " % share_name not in out:
  870             return
  871         self.zfs_with_retry("umount", "-f", share_name)
  872         out, err = self.execute("sudo", "mount")
  873         if "%s " % share_name in out:
  874             raise exception.ZFSonLinuxException(
  875                 _("Unable to unmount dataset %s") % share_name)
  876 
  877     def _get_replication_snapshot_prefix(self, replica):
  878         """Returns replica-based snapshot prefix."""
  879         replication_snapshot_prefix = "%s_%s" % (
  880             self.replica_snapshot_prefix, replica['id'].replace('-', '_'))
  881         return replication_snapshot_prefix
  882 
  883     def _get_replication_snapshot_tag(self, replica):
  884         """Returns replica- and time-based snapshot tag."""
  885         current_time = timeutils.utcnow().isoformat()
  886         snapshot_tag = "%s_time_%s" % (
  887             self._get_replication_snapshot_prefix(replica), current_time)
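              # Illustrative value (replica id and timestamp are hypothetical):
              # 'tmp_snapshot_for_replication_<replica_id>_time_2021-02-01T12:00:00.000000'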
  888         return snapshot_tag
  889 
  890     def _get_active_replica(self, replica_list):
  891         for replica in replica_list:
  892             if replica['replica_state'] == constants.REPLICA_STATE_ACTIVE:
  893                 return replica
  894         msg = _("Active replica not found.")
  895         raise exception.ReplicationException(reason=msg)
  896 
  897     def _get_migration_snapshot_prefix(self, share_instance):
  898         """Returns migration-based snapshot prefix."""
  899         migration_snapshot_prefix = "%s_%s" % (
  900             self.migration_snapshot_prefix,
  901             share_instance['id'].replace('-', '_'))
  902         return migration_snapshot_prefix
  903 
  904     def _get_migration_snapshot_tag(self, share_instance):
  905         """Returns migration- and time-based snapshot tag."""
  906         current_time = timeutils.utcnow().isoformat()
  907         snapshot_tag = "%s_time_%s" % (
  908             self._get_migration_snapshot_prefix(share_instance), current_time)
  909         snapshot_tag = (
  910             snapshot_tag.replace('-', '_').replace('.', '_').replace(':', '_'))
  911         return snapshot_tag
  912 
  913     @ensure_share_server_not_provided
  914     def create_replica(self, context, replica_list, new_replica,
  915                        access_rules, replica_snapshots, share_server=None):
  916         """Replicates the active replica to a new replica on this backend."""
  917         active_replica = self._get_active_replica(replica_list)
  918         src_dataset_name = self.private_storage.get(
  919             active_replica['id'], 'dataset_name')
  920         ssh_to_src_cmd = self.private_storage.get(
  921             active_replica['id'], 'ssh_cmd')
  922         dst_dataset_name = self._get_dataset_name(new_replica)
  923 
  924         ssh_cmd = '%(username)s@%(host)s' % {
  925             'username': self.configuration.zfs_ssh_username,
  926             'host': self.service_ip,
  927         }
  928 
  929         snapshot_tag = self._get_replication_snapshot_tag(new_replica)
  930         src_snapshot_name = (
  931             '%(dataset_name)s@%(snapshot_tag)s' % {
  932                 'snapshot_tag': snapshot_tag,
  933                 'dataset_name': src_dataset_name,
  934             }
  935         )
  936         # Save valuable data to DB
  937         self.private_storage.update(active_replica['id'], {
  938             'repl_snapshot_tag': snapshot_tag,
  939         })
  940         self.private_storage.update(new_replica['id'], {
  941             'entity_type': 'replica',
  942             'replica_type': 'readable',
  943             'dataset_name': dst_dataset_name,
  944             'ssh_cmd': ssh_cmd,
  945             'pool_name': share_utils.extract_host(
  946                 new_replica['host'], level='pool'),
  947             'repl_snapshot_tag': snapshot_tag,
  948         })
  949 
  950         # Create a temporary snapshot. It will exist until the following
  951         # replica sync, when a new one will replace it, and so on.
  952         self.execute(
  953             'ssh', ssh_to_src_cmd,
  954             'sudo', 'zfs', 'snapshot', src_snapshot_name,
  955         )
  956 
  957         # Send/receive temporary snapshot
  958         out, err = self.execute(
  959             'ssh', ssh_to_src_cmd,
  960             'sudo', 'zfs', 'send', '-vDR', src_snapshot_name, '|',
  961             'ssh', ssh_cmd,
  962             'sudo', 'zfs', 'receive', '-v', dst_dataset_name,
  963         )
  964         msg = ("Info about replica '%(replica_id)s' creation is as follows: "
  965                "\n%(out)s")
  966         LOG.debug(msg, {'replica_id': new_replica['id'], 'out': out})
  967 
  968         # Make replica readonly
  969         self.zfs('set', 'readonly=on', dst_dataset_name)
  970 
  971         # Set original share size as quota to new replica
  972         self.zfs('set', 'quota=%sG' % active_replica['size'], dst_dataset_name)
  973 
  974         # Apply access rules from original share
  975         self._get_share_helper(new_replica['share_proto']).update_access(
  976             dst_dataset_name, access_rules, add_rules=[], delete_rules=[],
  977             make_all_ro=True)
  978 
  979         return {
  980             'export_locations': self._get_share_helper(
  981                 new_replica['share_proto']).create_exports(dst_dataset_name),
  982             'replica_state': constants.REPLICA_STATE_IN_SYNC,
  983             'access_rules_status': constants.STATUS_ACTIVE,
  984         }
  985 
  986     @ensure_share_server_not_provided
  987     def delete_replica(self, context, replica_list, replica_snapshots, replica,
  988                        share_server=None):
  989         """Deletes a replica. This is called on the destination backend."""
  990         pool_name = self.private_storage.get(replica['id'], 'pool_name')
  991         dataset_name = self.private_storage.get(replica['id'], 'dataset_name')
  992         if not dataset_name:
  993             dataset_name = self._get_dataset_name(replica)
  994 
  995         # Delete dataset's snapshots first
  996         out, err = self.zfs('list', '-r', '-t', 'snapshot', pool_name)
  997         data = self.parse_zfs_answer(out)
  998         for datum in data:
  999             if dataset_name in datum['NAME']:
 1000                 self._delete_dataset_or_snapshot_with_retry(datum['NAME'])
 1001 
 1002         # Now we delete dataset itself
 1003         out, err = self.zfs('list', '-r', pool_name)
 1004         data = self.parse_zfs_answer(out)
 1005         for datum in data:
 1006             if datum['NAME'] == dataset_name:
 1007                 self._get_share_helper(
 1008                     replica['share_proto']).remove_exports(dataset_name)
 1009                 self._delete_dataset_or_snapshot_with_retry(dataset_name)
 1010                 break
 1011         else:
 1012             LOG.warning(
 1013                 "Share replica with '%(id)s' ID and '%(name)s' NAME is "
 1014                 "absent on backend. Nothing has been deleted.",
 1015                 {'id': replica['id'], 'name': dataset_name})
 1016         self.private_storage.delete(replica['id'])
 1017 
 1018     @ensure_share_server_not_provided
 1019     def update_replica_state(self, context, replica_list, replica,
 1020                              access_rules, replica_snapshots,
 1021                              share_server=None):
 1022         """Syncs replica and updates its 'replica_state'."""
 1023         return self._update_replica_state(
 1024             context, replica_list, replica, replica_snapshots, access_rules)
 1025 
 1026     def _update_replica_state(self, context, replica_list, replica,
 1027                               replica_snapshots=None, access_rules=None):
 1028         active_replica = self._get_active_replica(replica_list)
 1029         src_dataset_name = self.private_storage.get(
 1030             active_replica['id'], 'dataset_name')
 1031         ssh_to_src_cmd = self.private_storage.get(
 1032             active_replica['id'], 'ssh_cmd')
 1033         ssh_to_dst_cmd = self.private_storage.get(
 1034             replica['id'], 'ssh_cmd')
 1035         dst_dataset_name = self.private_storage.get(
 1036             replica['id'], 'dataset_name')
 1037 
 1038         # Create temporary snapshot
 1039         previous_snapshot_tag = self.private_storage.get(
 1040             replica['id'], 'repl_snapshot_tag')
 1041         snapshot_tag = self._get_replication_snapshot_tag(replica)
 1042         src_snapshot_name = src_dataset_name + '@' + snapshot_tag
 1043         self.execute(
 1044             'ssh', ssh_to_src_cmd,
 1045             'sudo', 'zfs', 'snapshot', src_snapshot_name,
 1046         )
 1047 
 1048         # Make sure it is readonly
 1049         self.zfs('set', 'readonly=on', dst_dataset_name)
 1050 
 1051         # Send/receive diff between previous snapshot and last one
 1052         out, err = self.execute(
 1053             'ssh', ssh_to_src_cmd,
 1054             'sudo', 'zfs', 'send', '-vDRI',
 1055             previous_snapshot_tag, src_snapshot_name, '|',
 1056             'ssh', ssh_to_dst_cmd,
 1057             'sudo', 'zfs', 'receive', '-vF', dst_dataset_name,
 1058         )
 1059         msg = ("Info about last replica '%(replica_id)s' sync is as follows: "
 1060                "\n%(out)s")
 1061         LOG.debug(msg, {'replica_id': replica['id'], 'out': out})
 1062 
 1063         # Update DB data that will be used on following replica sync
 1064         self.private_storage.update(active_replica['id'], {
 1065             'repl_snapshot_tag': snapshot_tag,
 1066         })
 1067         self.private_storage.update(
 1068             replica['id'], {'repl_snapshot_tag': snapshot_tag})
 1069 
 1070         # Destroy all snapshots on dst filesystem except referenced ones.
 1071         snap_references = set()
 1072         for repl in replica_list:
 1073             snap_references.add(
 1074                 self.private_storage.get(repl['id'], 'repl_snapshot_tag'))
 1075 
 1076         dst_pool_name = dst_dataset_name.split('/')[0]
 1077         out, err = self.zfs('list', '-r', '-t', 'snapshot', dst_pool_name)
 1078         data = self.parse_zfs_answer(out)
 1079         for datum in data:
 1080             if (dst_dataset_name in datum['NAME'] and
 1081                     '@' + self.replica_snapshot_prefix in datum['NAME'] and
 1082                     datum['NAME'].split('@')[-1] not in snap_references):
 1083                 self._delete_dataset_or_snapshot_with_retry(datum['NAME'])
 1084 
 1085         # Destroy all snapshots on src filesystem except referenced ones.
 1086         src_pool_name = src_snapshot_name.split('/')[0]
 1087         out, err = self.execute(
 1088             'ssh', ssh_to_src_cmd,
 1089             'sudo', 'zfs', 'list', '-r', '-t', 'snapshot', src_pool_name,
 1090         )
 1091         data = self.parse_zfs_answer(out)
 1092         full_src_snapshot_prefix = (
 1093             src_dataset_name + '@' +
 1094             self._get_replication_snapshot_prefix(replica))
 1095         for datum in data:
 1096             if (full_src_snapshot_prefix in datum['NAME'] and
 1097                     datum['NAME'].split('@')[-1] not in snap_references):
 1098                 self.execute_with_retry(
 1099                     'ssh', ssh_to_src_cmd,
 1100                     'sudo', 'zfs', 'destroy', '-f', datum['NAME'],
 1101                 )
 1102 
 1103         if access_rules:
 1104             # Apply access rules from original share
 1105             # TODO(vponomaryov): we should somehow remove rules that were
 1106             # deleted on the active replica after the secondary replica was
 1107             # created. For the moment the rule sets will differ, which can be
 1108             # considered a bug.
 1109             self._get_share_helper(replica['share_proto']).update_access(
 1110                 dst_dataset_name, access_rules, add_rules=[], delete_rules=[],
 1111                 make_all_ro=True)
 1112 
 1113         # Return results
 1114         return constants.REPLICA_STATE_IN_SYNC
 1115 
 1116     @ensure_share_server_not_provided
 1117     def promote_replica(self, context, replica_list, replica, access_rules,
 1118                         share_server=None):
 1119         """Promotes secondary replica to active and active to secondary."""
 1120         active_replica = self._get_active_replica(replica_list)
 1121         src_dataset_name = self.private_storage.get(
 1122             active_replica['id'], 'dataset_name')
 1123         ssh_to_src_cmd = self.private_storage.get(
 1124             active_replica['id'], 'ssh_cmd')
 1125         dst_dataset_name = self.private_storage.get(
 1126             replica['id'], 'dataset_name')
 1127         replica_dict = {
 1128             r['id']: {
 1129                 'id': r['id'],
 1130                 # NOTE(vponomaryov): access rules will be updated in next
 1131                 # 'sync' operation.
 1132                 'access_rules_status': constants.SHARE_INSTANCE_RULES_SYNCING,
 1133             }
 1134             for r in replica_list
 1135         }
 1136         try:
 1137             # Mark currently active replica as readonly
 1138             self.execute(
 1139                 'ssh', ssh_to_src_cmd,
 1140                 'sudo', 'zfs', 'set', 'readonly=on', src_dataset_name,
 1141             )
 1142 
 1143             # Create temporary snapshot of currently active replica
 1144             snapshot_tag = self._get_replication_snapshot_tag(active_replica)
 1145             src_snapshot_name = src_dataset_name + '@' + snapshot_tag
 1146             self.execute(
 1147                 'ssh', ssh_to_src_cmd,
 1148                 'sudo', 'zfs', 'snapshot', src_snapshot_name,
 1149             )
 1150 
 1151             # Apply temporary snapshot to all replicas
 1152             for repl in replica_list:
 1153                 if repl['replica_state'] == constants.REPLICA_STATE_ACTIVE:
 1154                     continue
 1155                 previous_snapshot_tag = self.private_storage.get(
 1156                     repl['id'], 'repl_snapshot_tag')
 1157                 dataset_name = self.private_storage.get(
 1158                     repl['id'], 'dataset_name')
 1159                 ssh_to_dst_cmd = self.private_storage.get(
 1160                     repl['id'], 'ssh_cmd')
 1161 
 1162                 try:
 1163                     # Send/receive diff between previous snapshot and last one
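                           # Flags: '-v' verbose, '-D' deduplicated stream, '-R'
                           # replication stream package, '-I' incremental from the
                           # previous snapshot; '-F' on receive forces a rollback
                           # of the target before applying the stream.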
 1164                     out, err = self.execute(
 1165                         'ssh', ssh_to_src_cmd,
 1166                         'sudo', 'zfs', 'send', '-vDRI',
 1167                         previous_snapshot_tag, src_snapshot_name, '|',
 1168                         'ssh', ssh_to_dst_cmd,
 1169                         'sudo', 'zfs', 'receive', '-vF', dataset_name,
 1170                     )
 1171                 except exception.ProcessExecutionError as e:
 1172                     LOG.warning("Failed to sync replica %(id)s. %(e)s",
 1173                                 {'id': repl['id'], 'e': e})
 1174                     replica_dict[repl['id']]['replica_state'] = (
 1175                         constants.REPLICA_STATE_OUT_OF_SYNC)
 1176                     continue
 1177 
  1178                 msg = ("Info about the last sync of replica "
  1179                        "'%(replica_id)s' is the following: \n%(out)s")
 1180                 LOG.debug(msg, {'replica_id': repl['id'], 'out': out})
 1181 
 1182                 # Update latest replication snapshot for replica
 1183                 self.private_storage.update(
 1184                     repl['id'], {'repl_snapshot_tag': snapshot_tag})
 1185 
 1186             # Update latest replication snapshot for currently active replica
 1187             self.private_storage.update(
 1188                 active_replica['id'], {'repl_snapshot_tag': snapshot_tag})
 1189 
 1190             replica_dict[active_replica['id']]['replica_state'] = (
 1191                 constants.REPLICA_STATE_IN_SYNC)
 1192         except Exception as e:
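                   # Could not snapshot/sync from the old active replica (it may
                   # be unreachable); mark it out of sync and fall back to syncing
                   # the other secondaries from the replica being promoted instead.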
 1193             LOG.warning(
 1194                 "Failed to update currently active replica. \n%s", e)
 1195 
 1196             replica_dict[active_replica['id']]['replica_state'] = (
 1197                 constants.REPLICA_STATE_OUT_OF_SYNC)
 1198 
 1199             # Create temporary snapshot of new replica and sync it with other
 1200             # secondary replicas.
 1201             snapshot_tag = self._get_replication_snapshot_tag(replica)
 1202             src_snapshot_name = dst_dataset_name + '@' + snapshot_tag
 1203             ssh_to_src_cmd = self.private_storage.get(replica['id'], 'ssh_cmd')
 1204             self.zfs('snapshot', src_snapshot_name)
 1205             for repl in replica_list:
 1206                 if (repl['replica_state'] == constants.REPLICA_STATE_ACTIVE or
 1207                         repl['id'] == replica['id']):
 1208                     continue
 1209                 previous_snapshot_tag = self.private_storage.get(
 1210                     repl['id'], 'repl_snapshot_tag')
 1211                 dataset_name = self.private_storage.get(
 1212                     repl['id'], 'dataset_name')
 1213                 ssh_to_dst_cmd = self.private_storage.get(
 1214                     repl['id'], 'ssh_cmd')
 1215 
 1216                 try:
 1217                     # Send/receive diff between previous snapshot and last one
 1218                     out, err = self.execute(
 1219                         'ssh', ssh_to_src_cmd,
 1220                         'sudo', 'zfs', 'send', '-vDRI',
 1221                         previous_snapshot_tag, src_snapshot_name, '|',
 1222                         'ssh', ssh_to_dst_cmd,
 1223                         'sudo', 'zfs', 'receive', '-vF', dataset_name,
 1224                     )
 1225                 except exception.ProcessExecutionError as e:
 1226                     LOG.warning("Failed to sync replica %(id)s. %(e)s",
 1227                                 {'id': repl['id'], 'e': e})
 1228                     replica_dict[repl['id']]['replica_state'] = (
 1229                         constants.REPLICA_STATE_OUT_OF_SYNC)
 1230                     continue
 1231 
  1232                 msg = ("Info about the last sync of replica "
  1233                        "'%(replica_id)s' is the following: \n%(out)s")
 1234                 LOG.debug(msg, {'replica_id': repl['id'], 'out': out})
 1235 
 1236                 # Update latest replication snapshot for replica
 1237                 self.private_storage.update(
 1238                     repl['id'], {'repl_snapshot_tag': snapshot_tag})
 1239 
 1240             # Update latest replication snapshot for new active replica
 1241             self.private_storage.update(
 1242                 replica['id'], {'repl_snapshot_tag': snapshot_tag})
 1243 
 1244         replica_dict[replica['id']]['replica_state'] = (
 1245             constants.REPLICA_STATE_ACTIVE)
 1246 
 1247         self._get_share_helper(replica['share_proto']).update_access(
 1248             dst_dataset_name, access_rules, add_rules=[], delete_rules=[])
 1249 
 1250         replica_dict[replica['id']]['access_rules_status'] = (
 1251             constants.STATUS_ACTIVE)
 1252 
 1253         self.zfs('set', 'readonly=off', dst_dataset_name)
 1254 
 1255         return list(replica_dict.values())
 1256 
 1257     @ensure_share_server_not_provided
 1258     def create_replicated_snapshot(self, context, replica_list,
 1259                                    replica_snapshots, share_server=None):
 1260         """Create a snapshot and update across the replicas."""
 1261         active_replica = self._get_active_replica(replica_list)
 1262         src_dataset_name = self.private_storage.get(
 1263             active_replica['id'], 'dataset_name')
 1264         ssh_to_src_cmd = self.private_storage.get(
 1265             active_replica['id'], 'ssh_cmd')
 1266         replica_snapshots_dict = {
 1267             si['id']: {'id': si['id']} for si in replica_snapshots}
 1268 
 1269         active_snapshot_instance_id = [
 1270             si['id'] for si in replica_snapshots
 1271             if si['share_instance_id'] == active_replica['id']][0]
 1272         snapshot_tag = self._get_snapshot_name(active_snapshot_instance_id)
  1273         # Replication should not depend on manually created snapshots,
  1274         # so create an additional, newer one that will be used for
  1275         # replication synchronization.
 1276         repl_snapshot_tag = self._get_replication_snapshot_tag(active_replica)
 1277         src_snapshot_name = src_dataset_name + '@' + repl_snapshot_tag
 1278 
 1279         self.private_storage.update(
 1280             replica_snapshots[0]['snapshot_id'], {
 1281                 'entity_type': 'snapshot',
 1282                 'snapshot_tag': snapshot_tag,
 1283             }
 1284         )
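               # Take both snapshots on the active replica: the user-visible one
               # and the newer replication snapshot used only for send/receive
               # syncs.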
 1285         for tag in (snapshot_tag, repl_snapshot_tag):
 1286             self.execute(
 1287                 'ssh', ssh_to_src_cmd,
 1288                 'sudo', 'zfs', 'snapshot', src_dataset_name + '@' + tag,
 1289             )
 1290 
  1291         # Propagate the snapshot to all replicas
 1292         for replica_snapshot in replica_snapshots:
 1293             replica_id = replica_snapshot['share_instance_id']
 1294             if replica_id == active_replica['id']:
 1295                 replica_snapshots_dict[replica_snapshot['id']]['status'] = (
 1296                     constants.STATUS_AVAILABLE)
 1297                 continue
 1298             previous_snapshot_tag = self.private_storage.get(
 1299                 replica_id, 'repl_snapshot_tag')
 1300             dst_dataset_name = self.private_storage.get(
 1301                 replica_id, 'dataset_name')
 1302             ssh_to_dst_cmd = self.private_storage.get(replica_id, 'ssh_cmd')
 1303 
 1304             try:
 1305                 # Send/receive diff between previous snapshot and last one
 1306                 out, err = self.execute(
 1307                     'ssh', ssh_to_src_cmd,
 1308                     'sudo', 'zfs', 'send', '-vDRI',
 1309                     previous_snapshot_tag, src_snapshot_name, '|',
 1310                     'ssh', ssh_to_dst_cmd,
 1311                     'sudo', 'zfs', 'receive', '-vF', dst_dataset_name,
 1312                 )
 1313             except exception.ProcessExecutionError as e:
 1314                 LOG.warning(
 1315                     "Failed to sync snapshot instance %(id)s. %(e)s",
 1316                     {'id': replica_snapshot['id'], 'e': e})
 1317                 replica_snapshots_dict[replica_snapshot['id']]['status'] = (
 1318                     constants.STATUS_ERROR)
 1319                 continue
 1320 
 1321             replica_snapshots_dict[replica_snapshot['id']]['status'] = (
 1322                 constants.STATUS_AVAILABLE)
 1323 
  1324             msg = ("Info about the last sync of replica "
  1325                    "'%(replica_id)s' is the following: \n%(out)s")
 1326             LOG.debug(msg, {'replica_id': replica_id, 'out': out})
 1327 
 1328             # Update latest replication snapshot for replica
 1329             self.private_storage.update(
 1330                 replica_id, {'repl_snapshot_tag': repl_snapshot_tag})
 1331 
 1332         # Update latest replication snapshot for currently active replica
 1333         self.private_storage.update(
 1334             active_replica['id'], {'repl_snapshot_tag': repl_snapshot_tag})
 1335 
 1336         return list(replica_snapshots_dict.values())
 1337 
 1338     @ensure_share_server_not_provided
 1339     def delete_replicated_snapshot(self, context, replica_list,
 1340                                    replica_snapshots, share_server=None):
 1341         """Delete a snapshot by deleting its instances across the replicas."""
 1342         active_replica = self._get_active_replica(replica_list)
 1343         replica_snapshots_dict = {
 1344             si['id']: {'id': si['id']} for si in replica_snapshots}
 1345 
 1346         for replica_snapshot in replica_snapshots:
 1347             replica_id = replica_snapshot['share_instance_id']
 1348             snapshot_name = self._get_saved_snapshot_name(replica_snapshot)
 1349             if active_replica['id'] == replica_id:
 1350                 self._delete_snapshot(context, replica_snapshot)
 1351                 replica_snapshots_dict[replica_snapshot['id']]['status'] = (
 1352                     constants.STATUS_DELETED)
 1353                 continue
 1354             ssh_cmd = self.private_storage.get(replica_id, 'ssh_cmd')
 1355             out, err = self.execute(
 1356                 'ssh', ssh_cmd,
 1357                 'sudo', 'zfs', 'list', '-r', '-t', 'snapshot', snapshot_name,
 1358             )
 1359             data = self.parse_zfs_answer(out)
 1360             for datum in data:
 1361                 if datum['NAME'] != snapshot_name:
 1362                     continue
 1363                 self.execute_with_retry(
 1364                     'ssh', ssh_cmd,
 1365                     'sudo', 'zfs', 'destroy', '-f', datum['NAME'],
 1366                 )
 1367 
 1368             self.private_storage.delete(replica_snapshot['id'])
 1369             replica_snapshots_dict[replica_snapshot['id']]['status'] = (
 1370                 constants.STATUS_DELETED)
 1371 
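               # All snapshot instances in 'replica_snapshots' belong to the same
               # snapshot, so the last loop item's 'snapshot_id' can be used here.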
 1372         self.private_storage.delete(replica_snapshot['snapshot_id'])
 1373         return list(replica_snapshots_dict.values())
 1374 
 1375     @ensure_share_server_not_provided
 1376     def update_replicated_snapshot(self, context, replica_list,
 1377                                    share_replica, replica_snapshots,
 1378                                    replica_snapshot, share_server=None):
 1379         """Update the status of a snapshot instance that lives on a replica."""
 1380 
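               # Refresh the replica state first; the replication send/receive it
               # performs is also what carries snapshots over to the secondary.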
 1381         self._update_replica_state(context, replica_list, share_replica)
 1382 
 1383         snapshot_name = self._get_saved_snapshot_name(replica_snapshot)
 1384 
 1385         out, err = self.zfs('list', '-r', '-t', 'snapshot', snapshot_name)
 1386         data = self.parse_zfs_answer(out)
 1387         snapshot_found = False
 1388         for datum in data:
 1389             if datum['NAME'] == snapshot_name:
 1390                 snapshot_found = True
 1391                 break
 1392         return_dict = {'id': replica_snapshot['id']}
 1393         if snapshot_found:
 1394             return_dict.update({'status': constants.STATUS_AVAILABLE})
 1395         else:
 1396             return_dict.update({'status': constants.STATUS_ERROR})
 1397 
 1398         return return_dict
 1399 
 1400     @ensure_share_server_not_provided
 1401     def migration_check_compatibility(
 1402             self, context, source_share, destination_share,
 1403             share_server=None, destination_share_server=None):
 1404         """Is called to test compatibility with destination backend."""
 1405         backend_name = share_utils.extract_host(
 1406             destination_share['host'], level='backend_name')
 1407         config = get_backend_configuration(backend_name)
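               # Only a destination backed by the same share driver (another
               # ZFSonLinux backend) is reported as compatible.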
 1408         compatible = self.configuration.share_driver == config.share_driver
 1409         return {
 1410             'compatible': compatible,
 1411             'writable': False,
 1412             'preserve_metadata': True,
 1413             'nondisruptive': True,
 1414         }
 1415 
 1416     @ensure_share_server_not_provided
 1417     def migration_start(
 1418             self, context, source_share, destination_share, source_snapshots,
 1419             snapshot_mappings, share_server=None,
 1420             destination_share_server=None):
 1421         """Is called to start share migration."""
 1422 
 1423         src_dataset_name = self.private_storage.get(
 1424             source_share['id'], 'dataset_name')
 1425         dst_dataset_name = self._get_dataset_name(destination_share)
 1426         backend_name = share_utils.extract_host(
 1427             destination_share['host'], level='backend_name')
 1428         ssh_cmd = '%(username)s@%(host)s' % {
 1429             'username': self.configuration.zfs_ssh_username,
 1430             'host': self.configuration.zfs_service_ip,
 1431         }
 1432         config = get_backend_configuration(backend_name)
 1433         remote_ssh_cmd = '%(username)s@%(host)s' % {
 1434             'username': config.zfs_ssh_username,
 1435             'host': config.zfs_service_ip,
 1436         }
 1437         snapshot_tag = self._get_migration_snapshot_tag(destination_share)
 1438         src_snapshot_name = (
 1439             '%(dataset_name)s@%(snapshot_tag)s' % {
 1440                 'snapshot_tag': snapshot_tag,
 1441                 'dataset_name': src_dataset_name,
 1442             }
 1443         )
 1444 
  1445         # Save data needed later to the DB
 1446         self.private_storage.update(source_share['id'], {
 1447             'migr_snapshot_tag': snapshot_tag,
 1448         })
 1449         self.private_storage.update(destination_share['id'], {
 1450             'entity_type': 'share',
 1451             'dataset_name': dst_dataset_name,
 1452             'ssh_cmd': remote_ssh_cmd,
 1453             'pool_name': share_utils.extract_host(
 1454                 destination_share['host'], level='pool'),
 1455             'migr_snapshot_tag': snapshot_tag,
 1456         })
 1457 
 1458         # Create temporary snapshot on src host.
 1459         self.execute('sudo', 'zfs', 'snapshot', src_snapshot_name)
 1460 
 1461         # Send/receive temporary snapshot
 1462         cmd = (
 1463             'ssh ' + ssh_cmd + ' '
 1464             'sudo zfs send -vDR ' + src_snapshot_name + ' '
 1465             '| ssh ' + remote_ssh_cmd + ' '
 1466             'sudo zfs receive -v ' + dst_dataset_name
 1467         )
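               # Write the send/receive pipeline to a temporary script and start
               # it detached via 'nohup' so the copy continues in the background
               # while the migration is polled through migration_continue().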
 1468         filename = dst_dataset_name.replace('/', '_')
 1469         with utils.tempdir() as tmpdir:
 1470             tmpfilename = os.path.join(tmpdir, '%s.sh' % filename)
 1471             with open(tmpfilename, "w") as migr_script:
 1472                 migr_script.write(cmd)
 1473             self.execute('sudo', 'chmod', '755', tmpfilename)
 1474             self.execute('nohup', tmpfilename, '&')
 1475 
 1476     @ensure_share_server_not_provided
 1477     def migration_continue(
 1478             self, context, source_share, destination_share, source_snapshots,
 1479             snapshot_mappings, share_server=None,
 1480             destination_share_server=None):
 1481         """Is called in source share's backend to continue migration."""
 1482 
 1483         snapshot_tag = self.private_storage.get(
 1484             destination_share['id'], 'migr_snapshot_tag')
 1485 
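               # The generated migration script contains '@<snapshot_tag>', so
               # finding that string in the process list means the copy is still
               # running; falling through (returning None) reports "not finished".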
 1486         out, err = self.execute('ps', 'aux')
  1487         if '@%s' % snapshot_tag not in out:
 1488             dst_dataset_name = self.private_storage.get(
 1489                 destination_share['id'], 'dataset_name')
 1490             try:
 1491                 self.execute(
 1492                     'sudo', 'zfs', 'get', 'quota', dst_dataset_name,
 1493                     executor=self._get_shell_executor_by_host(
 1494                         destination_share['host']),
 1495                 )
 1496                 return True
 1497             except exception.ProcessExecutionError as e:
 1498                 raise exception.ZFSonLinuxException(msg=_(
 1499                     'Migration process is absent and dst dataset '
  1500                     'returned the following error: %s') % e)
 1501 
 1502     @ensure_share_server_not_provided
 1503     def migration_complete(
 1504             self, context, source_share, destination_share, source_snapshots,
 1505             snapshot_mappings, share_server=None,
 1506             destination_share_server=None):
 1507         """Is called to perform 2nd phase of driver migration of a given share.
 1508 
 1509         """
 1510         dst_dataset_name = self.private_storage.get(
 1511             destination_share['id'], 'dataset_name')
 1512         snapshot_tag = self.private_storage.get(
 1513             destination_share['id'], 'migr_snapshot_tag')
 1514         dst_snapshot_name = (
 1515             '%(dataset_name)s@%(snapshot_tag)s' % {
 1516                 'snapshot_tag': snapshot_tag,
 1517                 'dataset_name': dst_dataset_name,
 1518             }
 1519         )
 1520 
 1521         dst_executor = self._get_shell_executor_by_host(
 1522             destination_share['host'])
 1523 
 1524         # Destroy temporary migration snapshot on dst host
 1525         self.execute(
 1526             'sudo', 'zfs', 'destroy', dst_snapshot_name,
 1527             executor=dst_executor,
 1528         )
 1529 
 1530         # Get export locations of new share instance
 1531         export_locations = self._get_share_helper(
 1532             destination_share['share_proto']).create_exports(
 1533                 dst_dataset_name,
 1534                 executor=dst_executor)
 1535 
 1536         # Destroy src share and temporary migration snapshot on src (this) host
 1537         self.delete_share(context, source_share)
 1538 
 1539         return {'export_locations': export_locations}
 1540 
 1541     @ensure_share_server_not_provided
 1542     def migration_cancel(
 1543             self, context, source_share, destination_share, source_snapshots,
 1544             snapshot_mappings, share_server=None,
 1545             destination_share_server=None):
 1546         """Is called to cancel driver migration."""
 1547 
 1548         src_dataset_name = self.private_storage.get(
 1549             source_share['id'], 'dataset_name')
 1550         dst_dataset_name = self.private_storage.get(
 1551             destination_share['id'], 'dataset_name')
 1552         ssh_cmd = self.private_storage.get(
 1553             destination_share['id'], 'ssh_cmd')
 1554         snapshot_tag = self.private_storage.get(
 1555             destination_share['id'], 'migr_snapshot_tag')
 1556 
  1557         # Kill the migration process if it exists
 1558         try:
 1559             out, err = self.execute('ps', 'aux')
 1560             lines = out.split('\n')
 1561             for line in lines:
 1562                 if '@%s' % snapshot_tag in line:
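                           # The second non-empty field of 'ps aux' output is the
                           # PID of the background migration script.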
 1563                     migr_pid = [
 1564                         x for x in line.strip().split(' ') if x != ''][1]
 1565                     self.execute('sudo', 'kill', '-9', migr_pid)
 1566         except exception.ProcessExecutionError as e:
  1567             LOG.warning(
  1568                 "Caught the following error while trying to kill the "
  1569                 "migration process: %s", e)
 1570 
  1571         # Sleep a couple of seconds before destroying updated objects
 1572         time.sleep(2)
 1573 
 1574         # Destroy snapshot on source host
 1575         self._delete_dataset_or_snapshot_with_retry(
 1576             src_dataset_name + '@' + snapshot_tag)
 1577 
 1578         # Destroy dataset and its migration snapshot on destination host
 1579         try:
 1580             self.execute(
 1581                 'ssh', ssh_cmd,
 1582                 'sudo', 'zfs', 'destroy', '-r', dst_dataset_name,
 1583             )
 1584         except exception.ProcessExecutionError as e:
  1585             LOG.warning(
  1586                 "Failed to destroy destination dataset with the following "
  1587                 "error: %s",
  1588                 e)
 1589 
 1590         LOG.debug(
 1591             "Migration of share with ID '%s' has been canceled.",
 1592             source_share["id"])