"Fossies" - the Fresh Open Source Software Archive

Member "manila-11.0.1/manila/share/api.py" (1 Feb 2021, 119932 Bytes) of package /linux/misc/openstack/manila-11.0.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "api.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 11.0.0_vs_11.0.1.

    1 # Copyright 2010 United States Government as represented by the
    2 # Administrator of the National Aeronautics and Space Administration.
    3 # All Rights Reserved.
    4 # Copyright (c) 2015 Tom Barron.  All rights reserved.
    5 # Copyright (c) 2015 Mirantis Inc.
    6 #
    7 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    8 #    not use this file except in compliance with the License. You may obtain
    9 #    a copy of the License at
   10 #
   11 #         http://www.apache.org/licenses/LICENSE-2.0
   12 #
   13 #    Unless required by applicable law or agreed to in writing, software
   14 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   15 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   16 #    License for the specific language governing permissions and limitations
   17 #    under the License.
   18 
   19 """
   20 Handles all requests relating to shares.
   21 """
   22 
   23 from oslo_config import cfg
   24 from oslo_log import log
   25 from oslo_utils import excutils
   26 from oslo_utils import strutils
   27 from oslo_utils import timeutils
   28 import six
   29 
   30 from manila.common import constants
   31 from manila.data import rpcapi as data_rpcapi
   32 from manila.db import base
   33 from manila import exception
   34 from manila.i18n import _
   35 from manila import policy
   36 from manila import quota
   37 from manila.scheduler import rpcapi as scheduler_rpcapi
   38 from manila.share import access
   39 from manila.share import rpcapi as share_rpcapi
   40 from manila.share import share_types
   41 from manila.share import utils as share_utils
   42 from manila import utils
   43 
   44 share_api_opts = [
   45     cfg.BoolOpt('use_scheduler_creating_share_from_snapshot',
   46                 default=False,
   47                 help='If set to False, then share creation from snapshot will '
   48                      'be performed on the same host. '
   49                      'If set to True, then scheduler will be used.'
   50                      'When enabling this option make sure that filter '
   51                      'CreateShareFromSnapshot is enabled and to have hosts '
   52                      'reporting replication_domain option.'
   53                 )
   54 ]
   55 
   56 CONF = cfg.CONF
   57 CONF.register_opts(share_api_opts)
   58 
   59 LOG = log.getLogger(__name__)
   60 GB = 1048576 * 1024
   61 QUOTAS = quota.QUOTAS
   62 
   63 
   64 class API(base.Base):
   65     """API for interacting with the share manager."""
   66 
   67     def __init__(self, db_driver=None):
   68         super(API, self).__init__(db_driver)
   69         self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
   70         self.share_rpcapi = share_rpcapi.ShareAPI()
   71         self.access_helper = access.ShareInstanceAccess(self.db, None)
   72 
   73     def _get_all_availability_zones_with_subnets(self, context,
   74                                                  share_network_id):
   75         compatible_azs = []
   76         for az in self.db.availability_zone_get_all(context):
   77             if self.db.share_network_subnet_get_by_availability_zone_id(
   78                     context, share_network_id=share_network_id,
   79                     availability_zone_id=az['id']):
   80                 compatible_azs.append(az['name'])
   81         return compatible_azs
   82 
   83     def _check_if_share_quotas_exceeded(self, context, quota_exception,
   84                                         share_size, operation='create'):
   85         overs = quota_exception.kwargs['overs']
   86         usages = quota_exception.kwargs['usages']
   87         quotas = quota_exception.kwargs['quotas']
   88 
   89         def _consumed(name):
   90             return (usages[name]['reserved'] + usages[name]['in_use'])
   91 
   92         if 'gigabytes' in overs:
   93             LOG.warning("Quota exceeded for %(s_pid)s, "
   94                         "tried to %(operation)s "
   95                         "%(s_size)sG share (%(d_consumed)dG of "
   96                         "%(d_quota)dG already consumed).", {
   97                             's_pid': context.project_id,
   98                             's_size': share_size,
   99                             'd_consumed': _consumed('gigabytes'),
  100                             'd_quota': quotas['gigabytes'],
  101                             'operation': operation})
  102             raise exception.ShareSizeExceedsAvailableQuota()
  103         elif 'shares' in overs:
  104             LOG.warning("Quota exceeded for %(s_pid)s, "
  105                         "tried to %(operation)s "
  106                         "share (%(d_consumed)d shares "
  107                         "already consumed).", {
  108                             's_pid': context.project_id,
  109                             'd_consumed': _consumed('shares'),
  110                             'operation': operation})
  111             raise exception.ShareLimitExceeded(allowed=quotas['shares'])
  112 
  113     def _check_if_replica_quotas_exceeded(self, context, quota_exception,
  114                                           replica_size,
  115                                           resource_type='share_replica'):
  116         overs = quota_exception.kwargs['overs']
  117         usages = quota_exception.kwargs['usages']
  118         quotas = quota_exception.kwargs['quotas']
  119 
  120         def _consumed(name):
  121             return (usages[name]['reserved'] + usages[name]['in_use'])
  122 
  123         if 'share_replicas' in overs:
  124             LOG.warning("Quota exceeded for %(s_pid)s, "
  125                         "unable to create share-replica (%(d_consumed)d "
  126                         "of %(d_quota)d already consumed).", {
  127                             's_pid': context.project_id,
  128                             'd_consumed': _consumed('share_replicas'),
  129                             'd_quota': quotas['share_replicas']})
  130             exception_kwargs = {}
  131             if resource_type != 'share_replica':
  132                 msg = _("Failed while creating a share with replication "
  133                         "support. Maximum number of allowed share-replicas "
  134                         "is exceeded.")
  135                 exception_kwargs['message'] = msg
  136             raise exception.ShareReplicasLimitExceeded(**exception_kwargs)
  137         elif 'replica_gigabytes' in overs:
  138             LOG.warning("Quota exceeded for %(s_pid)s, "
  139                         "unable to create a share replica size of "
  140                         "%(s_size)sG (%(d_consumed)dG of "
  141                         "%(d_quota)dG already consumed).", {
  142                             's_pid': context.project_id,
  143                             's_size': replica_size,
  144                             'd_consumed': _consumed('replica_gigabytes'),
  145                             'd_quota': quotas['replica_gigabytes']})
  146             exception_kwargs = {}
  147             if resource_type != 'share_replica':
  148                 msg = _("Failed while creating a share with replication "
  149                         "support. Requested share replica exceeds allowed "
  150                         "project/user or share type gigabytes quota.")
  151                 exception_kwargs['message'] = msg
  152             raise exception.ShareReplicaSizeExceedsAvailableQuota(
  153                 **exception_kwargs)
  154 
  155     def create(self, context, share_proto, size, name, description,
  156                snapshot_id=None, availability_zone=None, metadata=None,
  157                share_network_id=None, share_type=None, is_public=False,
  158                share_group_id=None, share_group_snapshot_member=None,
  159                availability_zones=None):
  160         """Create new share."""
  161 
  162         self._check_metadata_properties(metadata)
  163 
  164         if snapshot_id is not None:
  165             snapshot = self.get_snapshot(context, snapshot_id)
  166             if snapshot['aggregate_status'] != constants.STATUS_AVAILABLE:
  167                 msg = _("status must be '%s'") % constants.STATUS_AVAILABLE
  168                 raise exception.InvalidShareSnapshot(reason=msg)
  169             if not size:
  170                 size = snapshot['size']
  171         else:
  172             snapshot = None
  173 
  174         def as_int(s):
  175             try:
  176                 return int(s)
  177             except (ValueError, TypeError):
  178                 return s
  179 
  180         # tolerate size as stringified int
  181         size = as_int(size)
  182 
  183         if not isinstance(size, int) or size <= 0:
  184             msg = (_("Share size '%s' must be an integer and greater than 0")
  185                    % size)
  186             raise exception.InvalidInput(reason=msg)
  187 
  188         if snapshot and size < snapshot['size']:
  189             msg = (_("Share size '%s' must be equal or greater "
  190                      "than snapshot size") % size)
  191             raise exception.InvalidInput(reason=msg)
  192 
  193         if snapshot is None:
  194             share_type_id = share_type['id'] if share_type else None
  195         else:
  196             source_share = self.db.share_get(context, snapshot['share_id'])
  197             source_share_az = source_share['instance']['availability_zone']
  198             if availability_zone is None:
  199                 availability_zone = source_share_az
  200             elif (availability_zone != source_share_az
  201                   and not CONF.use_scheduler_creating_share_from_snapshot):
  202                 LOG.error("The specified availability zone must be the same "
  203                           "as parent share when you have the configuration "
  204                           "option 'use_scheduler_creating_share_from_snapshot'"
  205                           " set to False.")
  206                 msg = _("The specified availability zone must be the same "
  207                         "as the parent share when creating from snapshot.")
  208                 raise exception.InvalidInput(reason=msg)
  209             if share_type is None:
  210                 # Grab the source share's share_type if no new share type
  211                 # has been provided.
  212                 share_type_id = source_share['instance']['share_type_id']
  213                 share_type = share_types.get_share_type(context, share_type_id)
  214             else:
  215                 share_type_id = share_type['id']
  216                 if share_type_id != source_share['instance']['share_type_id']:
  217                     msg = _("Invalid share type specified: the requested "
  218                             "share type must match the type of the source "
  219                             "share. If a share type is not specified when "
  220                             "requesting a new share from a snapshot, the "
  221                             "share type of the source share will be applied "
  222                             "to the new share.")
  223                     raise exception.InvalidInput(reason=msg)
  224 
  225         supported_share_protocols = (
  226             proto.upper() for proto in CONF.enabled_share_protocols)
  227         if not (share_proto and
  228                 share_proto.upper() in supported_share_protocols):
  229             msg = (_("Invalid share protocol provided: %(provided)s. "
  230                      "It is either disabled or unsupported. Available "
  231                      "protocols: %(supported)s") % dict(
  232                          provided=share_proto,
  233                          supported=CONF.enabled_share_protocols))
  234             raise exception.InvalidInput(reason=msg)
  235 
  236         deltas = {'shares': 1, 'gigabytes': size}
  237         share_type_attributes = self.get_share_attributes_from_share_type(
  238             share_type)
  239         share_type_supports_replication = share_type_attributes.get(
  240             'replication_type', None)
  241         if share_type_supports_replication:
  242             deltas.update(
  243                 {'share_replicas': 1, 'replica_gigabytes': size})
  244 
  245         try:
  246             reservations = QUOTAS.reserve(
  247                 context, share_type_id=share_type_id, **deltas)
  248         except exception.OverQuota as e:
  249             self._check_if_share_quotas_exceeded(context, e, size)
  250             if share_type_supports_replication:
  251                 self._check_if_replica_quotas_exceeded(context, e, size,
  252                                                        resource_type='share')
  253 
  254         share_group = None
  255         if share_group_id:
  256             try:
  257                 share_group = self.db.share_group_get(context, share_group_id)
  258             except exception.NotFound as e:
  259                 raise exception.InvalidParameterValue(six.text_type(e))
  260 
  261             if (not share_group_snapshot_member and
  262                     not (share_group['status'] == constants.STATUS_AVAILABLE)):
  263                 params = {
  264                     'avail': constants.STATUS_AVAILABLE,
  265                     'status': share_group['status'],
  266                 }
  267                 msg = _("Share group status must be %(avail)s, got "
  268                         "%(status)s.") % params
  269                 raise exception.InvalidShareGroup(message=msg)
  270 
  271             if share_type_id:
  272                 share_group_st_ids = [
  273                     st['share_type_id']
  274                     for st in share_group.get('share_types', [])]
  275                 if share_type_id not in share_group_st_ids:
  276                     params = {
  277                         'type': share_type_id,
  278                         'group': share_group_id,
  279                     }
  280                     msg = _("The specified share type (%(type)s) is not "
  281                             "supported by the specified share group "
  282                             "(%(group)s).") % params
  283                     raise exception.InvalidParameterValue(msg)
  284 
  285             if not share_group.get('share_network_id') == share_network_id:
  286                 params = {
  287                     'net': share_network_id,
  288                     'group': share_group_id
  289                 }
  290                 msg = _("The specified share network (%(net)s) is not "
  291                         "supported by the specified share group "
  292                         "(%(group)s).") % params
  293                 raise exception.InvalidParameterValue(msg)
  294 
  295         options = {
  296             'size': size,
  297             'user_id': context.user_id,
  298             'project_id': context.project_id,
  299             'snapshot_id': snapshot_id,
  300             'metadata': metadata,
  301             'display_name': name,
  302             'display_description': description,
  303             'share_proto': share_proto,
  304             'is_public': is_public,
  305             'share_group_id': share_group_id,
  306         }
  307         options.update(share_type_attributes)
  308 
  309         if share_group_snapshot_member:
  310             options['source_share_group_snapshot_member_id'] = (
  311                 share_group_snapshot_member['id'])
  312 
  313         # NOTE(dviroel): If a target availability zone was not provided, the
  314         # scheduler will receive a list with all availability zones that
  315         # contains a subnet within the selected share network.
  316         if share_network_id and not availability_zone:
  317             azs_with_subnet = self._get_all_availability_zones_with_subnets(
  318                 context, share_network_id)
  319             if not availability_zones:
  320                 availability_zones = azs_with_subnet
  321             else:
  322                 availability_zones = (
  323                     [az for az in availability_zones if az in azs_with_subnet])
  324             if not availability_zones:
  325                 msg = _(
  326                     "The share network is not supported within any requested "
  327                     "availability zone. Check the share type's "
  328                     "'availability_zones' extra-spec and the availability "
  329                     "zones of the share network subnets")
  330                 raise exception.InvalidInput(message=msg)
  331 
  332         try:
  333             share = self.db.share_create(context, options,
  334                                          create_share_instance=False)
  335             QUOTAS.commit(context, reservations, share_type_id=share_type_id)
  336         except Exception:
  337             with excutils.save_and_reraise_exception():
  338                 try:
  339                     self.db.share_delete(context, share['id'])
  340                 finally:
  341                     QUOTAS.rollback(
  342                         context, reservations, share_type_id=share_type_id)
  343 
  344         host = None
  345         snapshot_host = None
  346         if snapshot:
  347             snapshot_host = snapshot['share']['instance']['host']
  348             if not CONF.use_scheduler_creating_share_from_snapshot:
  349                 # Shares from snapshots with restriction - source host only.
  350                 # It is common situation for different types of backends.
  351                 host = snapshot['share']['instance']['host']
  352 
  353         if share_group and host is None:
  354             host = share_group['host']
  355 
  356         self.create_instance(
  357             context, share, share_network_id=share_network_id, host=host,
  358             availability_zone=availability_zone, share_group=share_group,
  359             share_group_snapshot_member=share_group_snapshot_member,
  360             share_type_id=share_type_id, availability_zones=availability_zones,
  361             snapshot_host=snapshot_host)
  362 
  363         # Retrieve the share with instance details
  364         share = self.db.share_get(context, share['id'])
  365 
  366         return share
  367 
  368     def get_share_attributes_from_share_type(self, share_type):
  369         """Determine share attributes from the share type.
  370 
  371         The share type can change any time after shares of that type are
  372         created, so we copy some share type attributes to the share to
  373         consistently govern the behavior of that share over its lifespan.
  374         """
  375 
  376         inferred_map = constants.ExtraSpecs.INFERRED_OPTIONAL_MAP
  377         snapshot_support_key = constants.ExtraSpecs.SNAPSHOT_SUPPORT
  378         create_share_from_snapshot_key = (
  379             constants.ExtraSpecs.CREATE_SHARE_FROM_SNAPSHOT_SUPPORT)
  380         revert_to_snapshot_key = (
  381             constants.ExtraSpecs.REVERT_TO_SNAPSHOT_SUPPORT)
  382         mount_snapshot_support_key = (
  383             constants.ExtraSpecs.MOUNT_SNAPSHOT_SUPPORT)
  384 
  385         snapshot_support_default = inferred_map.get(snapshot_support_key)
  386         create_share_from_snapshot_support_default = inferred_map.get(
  387             create_share_from_snapshot_key)
  388         revert_to_snapshot_support_default = inferred_map.get(
  389             revert_to_snapshot_key)
  390         mount_snapshot_support_default = inferred_map.get(
  391             constants.ExtraSpecs.MOUNT_SNAPSHOT_SUPPORT)
  392 
  393         if share_type:
  394             snapshot_support = share_types.parse_boolean_extra_spec(
  395                 snapshot_support_key,
  396                 share_type.get('extra_specs', {}).get(
  397                     snapshot_support_key, snapshot_support_default))
  398             create_share_from_snapshot_support = (
  399                 share_types.parse_boolean_extra_spec(
  400                     create_share_from_snapshot_key,
  401                     share_type.get('extra_specs', {}).get(
  402                         create_share_from_snapshot_key,
  403                         create_share_from_snapshot_support_default)))
  404             revert_to_snapshot_support = (
  405                 share_types.parse_boolean_extra_spec(
  406                     revert_to_snapshot_key,
  407                     share_type.get('extra_specs', {}).get(
  408                         revert_to_snapshot_key,
  409                         revert_to_snapshot_support_default)))
  410             mount_snapshot_support = share_types.parse_boolean_extra_spec(
  411                 mount_snapshot_support_key, share_type.get(
  412                     'extra_specs', {}).get(
  413                     mount_snapshot_support_key,
  414                     mount_snapshot_support_default))
  415             replication_type = share_type.get('extra_specs', {}).get(
  416                 'replication_type')
  417         else:
  418             snapshot_support = snapshot_support_default
  419             create_share_from_snapshot_support = (
  420                 create_share_from_snapshot_support_default)
  421             revert_to_snapshot_support = revert_to_snapshot_support_default
  422             mount_snapshot_support = mount_snapshot_support_default
  423             replication_type = None
  424 
  425         return {
  426             'snapshot_support': snapshot_support,
  427             'create_share_from_snapshot_support':
  428                 create_share_from_snapshot_support,
  429             'revert_to_snapshot_support': revert_to_snapshot_support,
  430             'replication_type': replication_type,
  431             'mount_snapshot_support': mount_snapshot_support,
  432         }
  433 
  434     def create_instance(self, context, share, share_network_id=None,
  435                         host=None, availability_zone=None,
  436                         share_group=None, share_group_snapshot_member=None,
  437                         share_type_id=None, availability_zones=None,
  438                         snapshot_host=None):
  439         request_spec, share_instance = (
  440             self.create_share_instance_and_get_request_spec(
  441                 context, share, availability_zone=availability_zone,
  442                 share_group=share_group, host=host,
  443                 share_network_id=share_network_id,
  444                 share_type_id=share_type_id,
  445                 availability_zones=availability_zones,
  446                 snapshot_host=snapshot_host))
  447 
  448         if share_group_snapshot_member:
  449             # Inherit properties from the share_group_snapshot_member
  450             member_share_instance = share_group_snapshot_member[
  451                 'share_instance']
  452             updates = {
  453                 'host': member_share_instance['host'],
  454                 'share_network_id': member_share_instance['share_network_id'],
  455                 'share_server_id': member_share_instance['share_server_id'],
  456             }
  457             share = self.db.share_instance_update(context,
  458                                                   share_instance['id'],
  459                                                   updates)
  460             # NOTE(ameade): Do not cast to driver if creating from share group
  461             # snapshot
  462             return
  463 
  464         if host:
  465             self.share_rpcapi.create_share_instance(
  466                 context,
  467                 share_instance,
  468                 host,
  469                 request_spec=request_spec,
  470                 filter_properties={},
  471                 snapshot_id=share['snapshot_id'],
  472             )
  473         else:
  474             # Create share instance from scratch or from snapshot could happen
  475             # on hosts other than the source host.
  476             self.scheduler_rpcapi.create_share_instance(
  477                 context, request_spec=request_spec, filter_properties={})
  478 
  479         return share_instance
  480 
  481     def create_share_instance_and_get_request_spec(
  482             self, context, share, availability_zone=None,
  483             share_group=None, host=None, share_network_id=None,
  484             share_type_id=None, cast_rules_to_readonly=False,
  485             availability_zones=None, snapshot_host=None):
  486 
  487         availability_zone_id = None
  488         if availability_zone:
  489             availability_zone_id = self.db.availability_zone_get(
  490                 context, availability_zone).id
  491 
  492         # TODO(u_glide): Add here validation that provided share network
  493         # doesn't conflict with provided availability_zone when Neutron
  494         # will have AZ support.
  495         share_instance = self.db.share_instance_create(
  496             context, share['id'],
  497             {
  498                 'share_network_id': share_network_id,
  499                 'status': constants.STATUS_CREATING,
  500                 'scheduled_at': timeutils.utcnow(),
  501                 'host': host if host else '',
  502                 'availability_zone_id': availability_zone_id,
  503                 'share_type_id': share_type_id,
  504                 'cast_rules_to_readonly': cast_rules_to_readonly,
  505             }
  506         )
  507 
  508         share_properties = {
  509             'id': share['id'],
  510             'size': share['size'],
  511             'user_id': share['user_id'],
  512             'project_id': share['project_id'],
  513             'metadata': self.db.share_metadata_get(context, share['id']),
  514             'share_server_id': share_instance['share_server_id'],
  515             'snapshot_support': share['snapshot_support'],
  516             'create_share_from_snapshot_support':
  517                 share['create_share_from_snapshot_support'],
  518             'revert_to_snapshot_support': share['revert_to_snapshot_support'],
  519             'mount_snapshot_support': share['mount_snapshot_support'],
  520             'share_proto': share['share_proto'],
  521             'share_type_id': share_type_id,
  522             'is_public': share['is_public'],
  523             'share_group_id': share['share_group_id'],
  524             'source_share_group_snapshot_member_id': share[
  525                 'source_share_group_snapshot_member_id'],
  526             'snapshot_id': share['snapshot_id'],
  527             'replication_type': share['replication_type'],
  528         }
  529         share_instance_properties = {
  530             'id': share_instance['id'],
  531             'availability_zone_id': share_instance['availability_zone_id'],
  532             'share_network_id': share_instance['share_network_id'],
  533             'share_server_id': share_instance['share_server_id'],
  534             'share_id': share_instance['share_id'],
  535             'host': share_instance['host'],
  536             'status': share_instance['status'],
  537             'replica_state': share_instance['replica_state'],
  538             'share_type_id': share_instance['share_type_id'],
  539         }
  540 
  541         share_type = None
  542         if share_instance['share_type_id']:
  543             share_type = self.db.share_type_get(
  544                 context, share_instance['share_type_id'])
  545 
  546         request_spec = {
  547             'share_properties': share_properties,
  548             'share_instance_properties': share_instance_properties,
  549             'share_proto': share['share_proto'],
  550             'share_id': share['id'],
  551             'snapshot_id': share['snapshot_id'],
  552             'snapshot_host': snapshot_host,
  553             'share_type': share_type,
  554             'share_group': share_group,
  555             'availability_zone_id': availability_zone_id,
  556             'availability_zones': availability_zones,
  557         }
  558         return request_spec, share_instance
  559 
  560     def create_share_replica(self, context, share, availability_zone=None,
  561                              share_network_id=None):
  562 
  563         if not share.get('replication_type'):
  564             msg = _("Replication not supported for share %s.")
  565             raise exception.InvalidShare(message=msg % share['id'])
  566 
  567         if share.get('share_group_id'):
  568             msg = _("Replication not supported for shares in a group.")
  569             raise exception.InvalidShare(message=msg)
  570 
  571         self._check_is_share_busy(share)
  572 
  573         active_replica = self.db.share_replicas_get_available_active_replica(
  574             context, share['id'])
  575 
  576         if not active_replica:
  577             msg = _("Share %s does not have any active replica in available "
  578                     "state.")
  579             raise exception.ReplicationException(reason=msg % share['id'])
  580 
  581         share_type = share_types.get_share_type(
  582             context, share.instance['share_type_id'])
  583         type_azs = share_type['extra_specs'].get('availability_zones', '')
  584         type_azs = [t for t in type_azs.split(',') if type_azs]
  585         if (availability_zone and type_azs and
  586                 availability_zone not in type_azs):
  587             msg = _("Share replica cannot be created since the share type "
  588                     "%(type)s is not supported within the availability zone "
  589                     "chosen %(az)s.")
  590             type_name = '%s' % (share_type['name'] or '')
  591             type_id = '(ID: %s)' % share_type['id']
  592             payload = {'type': '%s%s' % (type_name, type_id),
  593                        'az': availability_zone}
  594             raise exception.InvalidShare(message=msg % payload)
  595 
  596         try:
  597             reservations = QUOTAS.reserve(
  598                 context, share_replicas=1, replica_gigabytes=share['size'],
  599                 share_type_id=share_type['id']
  600             )
  601         except exception.OverQuota as e:
  602             self._check_if_replica_quotas_exceeded(context, e, share['size'])
  603 
  604         if share_network_id:
  605             if availability_zone:
  606                 try:
  607                     az = self.db.availability_zone_get(context,
  608                                                        availability_zone)
  609                 except exception.AvailabilityZoneNotFound:
  610                     msg = _("Share replica cannot be created because the "
  611                             "specified availability zone does not exist.")
  612                     raise exception.InvalidInput(message=msg)
  613                 if self.db.share_network_subnet_get_by_availability_zone_id(
  614                         context, share_network_id, az.get('id')) is None:
  615                     msg = _("Share replica cannot be created because the "
  616                             "share network is not available within the "
  617                             "specified availability zone.")
  618                     raise exception.InvalidShare(message=msg)
  619             else:
  620                 # NOTE(dviroel): If a target availability zone was not
  621                 # provided, the scheduler will receive a list with all
  622                 # availability zones that contains subnets within the
  623                 # selected share network.
  624                 azs_subnet = self._get_all_availability_zones_with_subnets(
  625                     context, share_network_id)
  626                 if not type_azs:
  627                     type_azs = azs_subnet
  628                 else:
  629                     type_azs = (
  630                         [az for az in type_azs if az in azs_subnet])
  631                 if not type_azs:
  632                     msg = _(
  633                         "The share network is not supported within any "
  634                         "requested  availability zone. Check the share type's "
  635                         "'availability_zones' extra-spec and the availability "
  636                         "zones of the share network subnets")
  637                     raise exception.InvalidInput(message=msg)
  638 
  639         if share['replication_type'] == constants.REPLICATION_TYPE_READABLE:
  640             cast_rules_to_readonly = True
  641         else:
  642             cast_rules_to_readonly = False
  643 
  644         try:
  645             request_spec, share_replica = (
  646                 self.create_share_instance_and_get_request_spec(
  647                     context, share, availability_zone=availability_zone,
  648                     share_network_id=share_network_id,
  649                     share_type_id=share['instance']['share_type_id'],
  650                     cast_rules_to_readonly=cast_rules_to_readonly,
  651                     availability_zones=type_azs)
  652             )
  653             QUOTAS.commit(
  654                 context, reservations, project_id=share['project_id'],
  655                 share_type_id=share_type['id'],
  656             )
  657         except Exception:
  658             with excutils.save_and_reraise_exception():
  659                 try:
  660                     self.db.share_replica_delete(
  661                         context, share_replica['id'],
  662                         need_to_update_usages=False)
  663                 finally:
  664                     QUOTAS.rollback(
  665                         context, reservations, share_type_id=share_type['id'])
  666 
  667         all_replicas = self.db.share_replicas_get_all_by_share(
  668             context, share['id'])
  669         all_hosts = [r['host'] for r in all_replicas]
  670 
  671         request_spec['active_replica_host'] = active_replica['host']
  672         request_spec['all_replica_hosts'] = ','.join(all_hosts)
  673 
  674         self.db.share_replica_update(
  675             context, share_replica['id'],
  676             {'replica_state': constants.REPLICA_STATE_OUT_OF_SYNC})
  677 
  678         existing_snapshots = (
  679             self.db.share_snapshot_get_all_for_share(
  680                 context, share_replica['share_id'])
  681         )
  682         snapshot_instance = {
  683             'status': constants.STATUS_CREATING,
  684             'progress': '0%',
  685             'share_instance_id': share_replica['id'],
  686         }
  687         for snapshot in existing_snapshots:
  688             self.db.share_snapshot_instance_create(
  689                 context, snapshot['id'], snapshot_instance)
  690 
  691         self.scheduler_rpcapi.create_share_replica(
  692             context, request_spec=request_spec, filter_properties={})
  693 
  694         return share_replica
  695 
  696     def delete_share_replica(self, context, share_replica, force=False):
  697         # Disallow deletion of ONLY active replica, *even* when this
  698         # operation is forced.
  699         replicas = self.db.share_replicas_get_all_by_share(
  700             context, share_replica['share_id'])
  701         active_replicas = list(filter(
  702             lambda x: x['replica_state'] == constants.REPLICA_STATE_ACTIVE,
  703             replicas))
  704         if (share_replica.get('replica_state') ==
  705                 constants.REPLICA_STATE_ACTIVE and len(active_replicas) == 1):
  706             msg = _("Cannot delete last active replica.")
  707             raise exception.ReplicationException(reason=msg)
  708 
  709         LOG.info("Deleting replica %s.", share_replica['id'])
  710 
  711         self.db.share_replica_update(
  712             context, share_replica['id'],
  713             {
  714                 'status': constants.STATUS_DELETING,
  715                 'terminated_at': timeutils.utcnow(),
  716             }
  717         )
  718 
  719         if not share_replica['host']:
  720             # Delete any snapshot instances created on the database
  721             replica_snapshots = (
  722                 self.db.share_snapshot_instance_get_all_with_filters(
  723                     context, {'share_instance_ids': share_replica['id']})
  724             )
  725             for snapshot in replica_snapshots:
  726                 self.db.share_snapshot_instance_delete(context, snapshot['id'])
  727 
  728             # Delete the replica from the database
  729             self.db.share_replica_delete(context, share_replica['id'])
  730         else:
  731 
  732             self.share_rpcapi.delete_share_replica(context,
  733                                                    share_replica,
  734                                                    force=force)
  735 
  736     def promote_share_replica(self, context, share_replica):
  737 
  738         if share_replica.get('status') != constants.STATUS_AVAILABLE:
  739             msg = _("Replica %(replica_id)s must be in %(status)s state to be "
  740                     "promoted.")
  741             raise exception.ReplicationException(
  742                 reason=msg % {'replica_id': share_replica['id'],
  743                               'status': constants.STATUS_AVAILABLE})
  744 
  745         replica_state = share_replica['replica_state']
  746 
  747         if (replica_state in (constants.REPLICA_STATE_OUT_OF_SYNC,
  748                               constants.STATUS_ERROR)
  749                 and not context.is_admin):
  750             msg = _("Promoting a replica with 'replica_state': %s requires "
  751                     "administrator privileges.")
  752             raise exception.AdminRequired(
  753                 message=msg % replica_state)
  754 
  755         self.db.share_replica_update(
  756             context, share_replica['id'],
  757             {'status': constants.STATUS_REPLICATION_CHANGE})
  758 
  759         self.share_rpcapi.promote_share_replica(context, share_replica)
  760 
  761         return self.db.share_replica_get(context, share_replica['id'])
  762 
  763     def update_share_replica(self, context, share_replica):
  764 
  765         if not share_replica['host']:
  766             msg = _("Share replica does not have a valid host.")
  767             raise exception.InvalidHost(reason=msg)
  768 
  769         self.share_rpcapi.update_share_replica(context, share_replica)
  770 
  771     def manage(self, context, share_data, driver_options):
  772 
  773         # Check whether there's a share already with the provided options:
  774         filters = {
  775             'export_location_path': share_data['export_location_path'],
  776             'host': share_data['host'],
  777         }
  778         share_server_id = share_data.get('share_server_id')
  779         if share_server_id:
  780             filters['share_server_id'] = share_data['share_server_id']
  781 
  782         already_managed = self.db.share_instances_get_all(context,
  783                                                           filters=filters)
  784 
  785         if already_managed:
  786             LOG.error("Found an existing share with export location %s!",
  787                       share_data['export_location_path'])
  788             msg = _("A share already exists with the export path specified.")
  789             raise exception.InvalidShare(reason=msg)
  790 
  791         share_type_id = share_data['share_type_id']
  792         share_type = share_types.get_share_type(context, share_type_id)
  793 
  794         dhss = share_types.parse_boolean_extra_spec(
  795             'driver_handles_share_servers',
  796             share_type['extra_specs']['driver_handles_share_servers'])
  797 
  798         if dhss and not share_server_id:
  799             msg = _("Share Server ID parameter is required when managing a "
  800                     "share using a share type with "
  801                     "driver_handles_share_servers extra-spec set to True.")
  802             raise exception.InvalidInput(reason=msg)
  803         if not dhss and share_server_id:
  804             msg = _("Share Server ID parameter is not expected when managing a"
  805                     " share using a share type with "
  806                     "driver_handles_share_servers extra-spec set to False.")
  807             raise exception.InvalidInput(reason=msg)
  808 
  809         if share_server_id:
  810             try:
  811                 share_server = self.db.share_server_get(
  812                     context, share_data['share_server_id'])
  813             except exception.ShareServerNotFound:
  814                 msg = _("Share Server specified was not found.")
  815                 raise exception.InvalidInput(reason=msg)
  816 
  817             if share_server['status'] != constants.STATUS_ACTIVE:
  818                 msg = _("The provided share server is not active.")
  819                 raise exception.InvalidShareServer(reason=msg)
  820             subnet = self.db.share_network_subnet_get(
  821                 context, share_server['share_network_subnet_id'])
  822             share_data['share_network_id'] = subnet['share_network_id']
  823 
  824         share_data.update({
  825             'user_id': context.user_id,
  826             'project_id': context.project_id,
  827             'status': constants.STATUS_MANAGING,
  828             'scheduled_at': timeutils.utcnow(),
  829         })
  830         share_data.update(
  831             self.get_share_attributes_from_share_type(share_type))
  832 
  833         share = self.db.share_create(context, share_data)
  834 
  835         export_location_path = share_data.pop('export_location_path')
  836         self.db.share_export_locations_update(context, share.instance['id'],
  837                                               export_location_path)
  838 
  839         request_spec = self._get_request_spec_dict(
  840             share, share_type, size=0, share_proto=share_data['share_proto'],
  841             host=share_data['host'])
  842 
  843         # NOTE(ganso): Scheduler is called to validate if share type
  844         # provided can fit in host provided. It will invoke manage upon
  845         # successful validation.
  846         self.scheduler_rpcapi.manage_share(context, share['id'],
  847                                            driver_options, request_spec)
  848 
  849         return self.db.share_get(context, share['id'])
  850 
  851     def _get_request_spec_dict(self, share, share_type, **kwargs):
  852 
  853         if share is None:
  854             share = {'instance': {}}
  855 
  856         # NOTE(dviroel): The share object can be a share instance object with
  857         # share data.
  858         share_instance = share.get('instance', share)
  859 
  860         share_properties = {
  861             'size': kwargs.get('size', share.get('size')),
  862             'user_id': kwargs.get('user_id', share.get('user_id')),
  863             'project_id': kwargs.get('project_id', share.get('project_id')),
  864             'snapshot_support': kwargs.get(
  865                 'snapshot_support',
  866                 share_type.get('extra_specs', {}).get('snapshot_support')
  867             ),
  868             'create_share_from_snapshot_support': kwargs.get(
  869                 'create_share_from_snapshot_support',
  870                 share_type.get('extra_specs', {}).get(
  871                     'create_share_from_snapshot_support')
  872             ),
  873             'revert_to_snapshot_support': kwargs.get(
  874                 'revert_to_snapshot_support',
  875                 share_type.get('extra_specs', {}).get(
  876                     'revert_to_snapshot_support')
  877             ),
  878             'mount_snapshot_support': kwargs.get(
  879                 'mount_snapshot_support',
  880                 share_type.get('extra_specs', {}).get(
  881                     'mount_snapshot_support')
  882             ),
  883             'share_proto': kwargs.get('share_proto', share.get('share_proto')),
  884             'share_type_id': share_type['id'],
  885             'is_public': kwargs.get('is_public', share.get('is_public')),
  886             'share_group_id': kwargs.get(
  887                 'share_group_id', share.get('share_group_id')),
  888             'source_share_group_snapshot_member_id': kwargs.get(
  889                 'source_share_group_snapshot_member_id',
  890                 share.get('source_share_group_snapshot_member_id')),
  891             'snapshot_id': kwargs.get('snapshot_id', share.get('snapshot_id')),
  892         }
  893         share_instance_properties = {
  894             'availability_zone_id': kwargs.get(
  895                 'availability_zone_id',
  896                 share_instance.get('availability_zone_id')),
  897             'share_network_id': kwargs.get(
  898                 'share_network_id', share_instance.get('share_network_id')),
  899             'share_server_id': kwargs.get(
  900                 'share_server_id', share_instance.get('share_server_id')),
  901             'share_id': kwargs.get('share_id', share_instance.get('share_id')),
  902             'host': kwargs.get('host', share_instance.get('host')),
  903             'status': kwargs.get('status', share_instance.get('status')),
  904         }
  905 
  906         request_spec = {
  907             'share_properties': share_properties,
  908             'share_instance_properties': share_instance_properties,
  909             'share_type': share_type,
  910             'share_id': share.get('id'),
  911         }
  912         return request_spec
  913 
  914     def unmanage(self, context, share):
  915         policy.check_policy(context, 'share', 'unmanage')
  916 
  917         self._check_is_share_busy(share)
  918 
  919         update_data = {'status': constants.STATUS_UNMANAGING,
  920                        'terminated_at': timeutils.utcnow()}
  921         share_ref = self.db.share_update(context, share['id'], update_data)
  922 
  923         self.share_rpcapi.unmanage_share(context, share_ref)
  924 
  925         # NOTE(u_glide): We should update 'updated_at' timestamp of
  926         # share server here, when manage/unmanage operations will be supported
  927         # for driver_handles_share_servers=True mode
  928 
  929     def manage_snapshot(self, context, snapshot_data, driver_options):
  930         try:
  931             share = self.db.share_get(context, snapshot_data['share_id'])
  932         except exception.NotFound:
  933             raise exception.ShareNotFound(share_id=snapshot_data['share_id'])
  934 
  935         if share['has_replicas']:
  936             msg = (_("Share %s has replicas. Snapshots of this share cannot "
  937                      "currently be managed until all replicas are removed.")
  938                    % share['id'])
  939             raise exception.InvalidShare(reason=msg)
  940 
  941         existing_snapshots = self.db.share_snapshot_get_all_for_share(
  942             context, snapshot_data['share_id'])
  943 
  944         for existing_snap in existing_snapshots:
  945             for inst in existing_snap.get('instances'):
  946                 if (snapshot_data['provider_location'] ==
  947                         inst['provider_location']):
  948                     msg = _("A share snapshot %(share_snapshot_id)s is "
  949                             "already managed for provider location "
  950                             "%(provider_location)s.") % {
  951                         'share_snapshot_id': existing_snap['id'],
  952                         'provider_location':
  953                             snapshot_data['provider_location'],
  954                     }
  955                     raise exception.ManageInvalidShareSnapshot(
  956                         reason=msg)
  957 
  958         snapshot_data.update({
  959             'user_id': context.user_id,
  960             'project_id': context.project_id,
  961             'status': constants.STATUS_MANAGING,
  962             'share_size': share['size'],
  963             'progress': '0%',
  964             'share_proto': share['share_proto']
  965         })
  966 
  967         snapshot = self.db.share_snapshot_create(context, snapshot_data)
  968 
  969         self.share_rpcapi.manage_snapshot(context, snapshot, share['host'],
  970                                           driver_options)
  971         return snapshot
  972 
  973     def unmanage_snapshot(self, context, snapshot, host):
  974         update_data = {'status': constants.STATUS_UNMANAGING,
  975                        'terminated_at': timeutils.utcnow()}
  976         snapshot_ref = self.db.share_snapshot_update(context,
  977                                                      snapshot['id'],
  978                                                      update_data)
  979 
  980         self.share_rpcapi.unmanage_snapshot(context, snapshot_ref, host)
  981 
  982     def revert_to_snapshot(self, context, share, snapshot):
  983         """Revert a share to a snapshot."""
  984 
  985         reservations = self._handle_revert_to_snapshot_quotas(
  986             context, share, snapshot)
  987 
  988         try:
  989             if share.get('has_replicas'):
  990                 self._revert_to_replicated_snapshot(
  991                     context, share, snapshot, reservations)
  992             else:
  993                 self._revert_to_snapshot(
  994                     context, share, snapshot, reservations)
  995         except Exception:
  996             with excutils.save_and_reraise_exception():
  997                 if reservations:
  998                     QUOTAS.rollback(
  999                         context, reservations,
 1000                         share_type_id=share['instance']['share_type_id'])
 1001 
 1002     def _handle_revert_to_snapshot_quotas(self, context, share, snapshot):
 1003         """Reserve extra quota if a revert will result in a larger share."""
 1004 
 1005         # Note(cknight): This value may be positive or negative.
 1006         size_increase = snapshot['size'] - share['size']
 1007         if not size_increase:
 1008             return None
 1009 
 1010         try:
 1011             return QUOTAS.reserve(
 1012                 context,
 1013                 project_id=share['project_id'],
 1014                 gigabytes=size_increase,
 1015                 user_id=share['user_id'],
 1016                 share_type_id=share['instance']['share_type_id'])
 1017         except exception.OverQuota as exc:
 1018             usages = exc.kwargs['usages']
 1019             quotas = exc.kwargs['quotas']
 1020             consumed_gb = (usages['gigabytes']['reserved'] +
 1021                            usages['gigabytes']['in_use'])
 1022 
 1023             msg = _("Quota exceeded for %(s_pid)s. Reverting share "
 1024                     "%(s_sid)s to snapshot %(s_ssid)s will increase the "
 1025                     "share's size by %(s_size)sG, "
 1026                     "(%(d_consumed)dG of %(d_quota)dG already consumed).")
 1027             msg_args = {
 1028                 's_pid': context.project_id,
 1029                 's_sid': share['id'],
 1030                 's_ssid': snapshot['id'],
 1031                 's_size': size_increase,
 1032                 'd_consumed': consumed_gb,
 1033                 'd_quota': quotas['gigabytes'],
 1034             }
 1035             message = msg % msg_args
 1036             LOG.error(message)
 1037             raise exception.ShareSizeExceedsAvailableQuota(message=message)
 1038 
 1039     def _revert_to_snapshot(self, context, share, snapshot, reservations):
 1040         """Revert a non-replicated share to a snapshot."""
 1041 
 1042         # Set status of share to 'reverting'
 1043         self.db.share_update(
 1044             context, snapshot['share_id'],
 1045             {'status': constants.STATUS_REVERTING})
 1046 
 1047         # Set status of snapshot to 'restoring'
 1048         self.db.share_snapshot_update(
 1049             context, snapshot['id'],
 1050             {'status': constants.STATUS_RESTORING})
 1051 
 1052         # Send revert API to share host
 1053         self.share_rpcapi.revert_to_snapshot(
 1054             context, share, snapshot, share['instance']['host'], reservations)
 1055 
 1056     def _revert_to_replicated_snapshot(self, context, share, snapshot,
 1057                                        reservations):
 1058         """Revert a replicated share to a snapshot."""
 1059 
 1060         # Get active replica
 1061         active_replica = self.db.share_replicas_get_available_active_replica(
 1062             context, share['id'])
 1063 
 1064         if not active_replica:
 1065             msg = _('Share %s has no active replica in available state.')
 1066             raise exception.ReplicationException(reason=msg % share['id'])
 1067 
 1068         # Get snapshot instance on active replica
 1069         snapshot_instance_filters = {
 1070             'share_instance_ids': active_replica['id'],
 1071             'snapshot_ids': snapshot['id'],
 1072         }
 1073         snapshot_instances = (
 1074             self.db.share_snapshot_instance_get_all_with_filters(
 1075                 context, snapshot_instance_filters))
 1076         active_snapshot_instance = (
 1077             snapshot_instances[0] if snapshot_instances else None)
 1078 
 1079         if not active_snapshot_instance:
 1080             msg = _('Share %(share)s has no snapshot %(snap)s associated with '
 1081                     'its active replica.')
 1082             msg_args = {'share': share['id'], 'snap': snapshot['id']}
 1083             raise exception.ReplicationException(reason=msg % msg_args)
 1084 
 1085         # Set active replica to 'reverting'
 1086         self.db.share_replica_update(
 1087             context, active_replica['id'],
 1088             {'status': constants.STATUS_REVERTING})
 1089 
 1090         # Set snapshot instance on active replica to 'restoring'
 1091         self.db.share_snapshot_instance_update(
 1092             context, active_snapshot_instance['id'],
 1093             {'status': constants.STATUS_RESTORING})
 1094 
 1095         # Send revert API to active replica host
 1096         self.share_rpcapi.revert_to_snapshot(
 1097             context, share, snapshot, active_replica['host'], reservations)
 1098 
 1099     @policy.wrap_check_policy('share')
 1100     def delete(self, context, share, force=False):
 1101         """Delete share."""
 1102         share = self.db.share_get(context, share['id'])
 1103         share_id = share['id']
 1104         statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR,
 1105                     constants.STATUS_INACTIVE)
 1106         if not (force or share['status'] in statuses):
 1107             msg = _("Share status must be one of %(statuses)s") % {
 1108                 "statuses": statuses}
 1109             raise exception.InvalidShare(reason=msg)
 1110 
 1111         # NOTE(gouthamr): If the share has more than one replica,
 1112         # it can't be deleted until the additional replicas are removed.
 1113         if share.has_replicas:
 1114             msg = _("Share %s has replicas. Remove the replicas before "
 1115                     "deleting the share.") % share_id
 1116             raise exception.Conflict(err=msg)
 1117 
 1118         snapshots = self.db.share_snapshot_get_all_for_share(context, share_id)
 1119         if snapshots:
 1120             msg = _("Share still has %d dependent snapshots.") % len(snapshots)
 1121             raise exception.InvalidShare(reason=msg)
 1122 
 1123         share_group_snapshot_members_count = (
 1124             self.db.count_share_group_snapshot_members_in_share(
 1125                 context, share_id))
 1126         if share_group_snapshot_members_count:
 1127             msg = (
 1128                 _("Share still has %d dependent share group snapshot "
 1129                   "members.") % share_group_snapshot_members_count)
 1130             raise exception.InvalidShare(reason=msg)
 1131 
 1132         self._check_is_share_busy(share)
 1133         for share_instance in share.instances:
 1134             if share_instance['host']:
 1135                 self.delete_instance(context, share_instance, force=force)
 1136             else:
 1137                 self.db.share_instance_delete(
 1138                     context, share_instance['id'], need_to_update_usages=True)
 1139 
 1140     def delete_instance(self, context, share_instance, force=False):
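              """Delete a single share instance.

              The instance is marked as deleting and its removal is delegated
              to the share manager on the instance's host via RPC.
              """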
 1141         policy.check_policy(context, 'share', 'delete')
 1142 
 1143         statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR,
 1144                     constants.STATUS_INACTIVE)
 1145         if not (force or share_instance['status'] in statuses):
 1146             msg = _("Share instance status must be one of %(statuses)s") % {
 1147                 "statuses": statuses}
 1148             raise exception.InvalidShareInstance(reason=msg)
 1149 
 1150         share_instance = self.db.share_instance_update(
 1151             context, share_instance['id'],
 1152             {'status': constants.STATUS_DELETING,
 1153              'terminated_at': timeutils.utcnow()}
 1154         )
 1155 
 1156         self.share_rpcapi.delete_share_instance(context, share_instance,
 1157                                                 force=force)
 1158 
 1159         # NOTE(u_glide): The 'updated_at' timestamp is used to track the last
 1160         # usage of a share server. This is required for automatic share server
 1161         # cleanup, because we need to know how long a share server has had no
 1162         # shares (been unused). We do this update only on share deletion,
 1163         # because a share server with shares cannot be deleted, so there is no
 1164         # need to do it on share creation or any other share operation.
 1165         if share_instance['share_server_id']:
 1166             self.db.share_server_update(
 1167                 context,
 1168                 share_instance['share_server_id'],
 1169                 {'updated_at': timeutils.utcnow()})
 1170 
 1171     def delete_share_server(self, context, server):
 1172         """Delete share server."""
 1173         policy.check_policy(context, 'share_server', 'delete', server)
 1174         shares = self.db.share_instances_get_all_by_share_server(context,
 1175                                                                  server['id'])
 1176 
 1177         if shares:
 1178             raise exception.ShareServerInUse(share_server_id=server['id'])
 1179 
 1180         share_groups = self.db.share_group_get_all_by_share_server(
 1181             context, server['id'])
 1182         if share_groups:
 1183             LOG.error("Share server '%(ssid)s' is in use by share groups.",
 1184                       {'ssid': server['id']})
 1185             raise exception.ShareServerInUse(share_server_id=server['id'])
 1186 
 1187         # NOTE(vponomaryov): The share_server status is intentionally not
 1188         # updated here.
 1189         # It will be changed in manila.share.manager after checking for a race
 1190         # condition between share creation on the server
 1191         # and server deletion.
 1192         self.share_rpcapi.delete_share_server(context, server)
 1193 
 1194     def manage_share_server(
 1195             self, context, identifier, host, share_net_subnet, driver_opts):
 1196         """Manage a share server."""
 1197 
 1198         try:
 1199             matched_servers = self.db.share_server_search_by_identifier(
 1200                 context, identifier)
 1201         except exception.ShareServerNotFound:
 1202             pass
 1203         else:
 1204             msg = _("The specified identifier %(identifier)s matches "
 1205                     "existing share servers: %(servers)s.") % {
 1206                 'identifier': identifier,
 1207                 'servers': ', '.join(s['identifier'] for s in matched_servers)
 1208             }
 1209             raise exception.InvalidInput(reason=msg)
 1210 
 1211         values = {
 1212             'host': host,
 1213             'share_network_subnet_id': share_net_subnet['id'],
 1214             'status': constants.STATUS_MANAGING,
 1215             'is_auto_deletable': False,
 1216             'identifier': identifier,
 1217         }
 1218 
 1219         server = self.db.share_server_create(context, values)
 1220 
 1221         self.share_rpcapi.manage_share_server(
 1222             context, server, identifier, driver_opts)
 1223 
 1224         return self.db.share_server_get(context, server['id'])
 1225 
 1226     def unmanage_share_server(self, context, share_server, force=False):
 1227         """Unmanage a share server."""
 1228 
 1229         shares = self.db.share_instances_get_all_by_share_server(
 1230             context, share_server['id'])
 1231 
 1232         if shares:
 1233             raise exception.ShareServerInUse(
 1234                 share_server_id=share_server['id'])
 1235 
 1236         share_groups = self.db.share_group_get_all_by_share_server(
 1237             context, share_server['id'])
 1238         if share_groups:
 1239             LOG.error("Share server '%(ssid)s' is in use by share groups.",
 1240                       {'ssid': share_server['id']})
 1241             raise exception.ShareServerInUse(
 1242                 share_server_id=share_server['id'])
 1243 
 1244         update_data = {'status': constants.STATUS_UNMANAGING,
 1245                        'terminated_at': timeutils.utcnow()}
 1246 
 1247         share_server = self.db.share_server_update(
 1248             context, share_server['id'], update_data)
 1249 
 1250         self.share_rpcapi.unmanage_share_server(
 1251             context, share_server, force=force)
 1252 
 1253     def create_snapshot(self, context, share, name, description,
 1254                         force=False):
 1255         policy.check_policy(context, 'share', 'create_snapshot', share)
 1256 
 1257         if not force and share['status'] != constants.STATUS_AVAILABLE:
 1258             msg = _("Source share status must be "
 1259                     "%s") % constants.STATUS_AVAILABLE
 1260             raise exception.InvalidShare(reason=msg)
 1261 
 1262         size = share['size']
 1263 
 1264         self._check_is_share_busy(share)
 1265 
 1266         try:
 1267             reservations = QUOTAS.reserve(
 1268                 context, snapshots=1, snapshot_gigabytes=size,
 1269                 share_type_id=share['instance']['share_type_id'])
 1270         except exception.OverQuota as e:
 1271             overs = e.kwargs['overs']
 1272             usages = e.kwargs['usages']
 1273             quotas = e.kwargs['quotas']
 1274 
 1275             def _consumed(name):
 1276                 return (usages[name]['reserved'] + usages[name]['in_use'])
 1277 
 1278             if 'snapshot_gigabytes' in overs:
 1279                 msg = ("Quota exceeded for %(s_pid)s, tried to create "
 1280                        "%(s_size)sG snapshot (%(d_consumed)dG of "
 1281                        "%(d_quota)dG already consumed).")
 1282                 LOG.warning(msg, {
 1283                     's_pid': context.project_id,
 1284                     's_size': size,
 1285                     'd_consumed': _consumed('snapshot_gigabytes'),
 1286                     'd_quota': quotas['snapshot_gigabytes']})
 1287                 raise exception.SnapshotSizeExceedsAvailableQuota()
 1288             elif 'snapshots' in overs:
 1289                 msg = ("Quota exceeded for %(s_pid)s, tried to create "
 1290                        "snapshot (%(d_consumed)d snapshots "
 1291                        "already consumed).")
 1292                 LOG.warning(msg, {'s_pid': context.project_id,
 1293                                   'd_consumed': _consumed('snapshots')})
 1294                 raise exception.SnapshotLimitExceeded(
 1295                     allowed=quotas['snapshots'])
 1296         options = {'share_id': share['id'],
 1297                    'size': share['size'],
 1298                    'user_id': context.user_id,
 1299                    'project_id': context.project_id,
 1300                    'status': constants.STATUS_CREATING,
 1301                    'progress': '0%',
 1302                    'share_size': share['size'],
 1303                    'display_name': name,
 1304                    'display_description': description,
 1305                    'share_proto': share['share_proto']}
 1306 
 1307         try:
 1308             snapshot = self.db.share_snapshot_create(context, options)
 1309             QUOTAS.commit(
 1310                 context, reservations,
 1311                 share_type_id=share['instance']['share_type_id'])
 1312         except Exception:
 1313             with excutils.save_and_reraise_exception():
 1314                 try:
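                          # Clean up the snapshot DB record before re-raising; the
                          # quota reservation is rolled back in the 'finally' below.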
 1315                     self.db.share_snapshot_destroy(context, snapshot['id'])
 1316                 finally:
 1317                     QUOTAS.rollback(
 1318                         context, reservations,
 1319                         share_type_id=share['instance']['share_type_id'])
 1320 
 1321         # If replicated share, create snapshot instances for each replica
 1322         if share.get('has_replicas'):
 1323             snapshot = self.db.share_snapshot_get(context, snapshot['id'])
 1324             share_instance_id = snapshot['instance']['share_instance_id']
 1325             replicas = self.db.share_replicas_get_all_by_share(
 1326                 context, share['id'])
 1327             replicas = [r for r in replicas if r['id'] != share_instance_id]
 1328             snapshot_instance = {
 1329                 'status': constants.STATUS_CREATING,
 1330                 'progress': '0%',
 1331             }
 1332             for replica in replicas:
 1333                 snapshot_instance.update({'share_instance_id': replica['id']})
 1334                 self.db.share_snapshot_instance_create(
 1335                     context, snapshot['id'], snapshot_instance)
 1336             self.share_rpcapi.create_replicated_snapshot(
 1337                 context, share, snapshot)
 1338 
 1339         else:
 1340             self.share_rpcapi.create_snapshot(context, share, snapshot)
 1341 
 1342         return snapshot
 1343 
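          # Illustrative use of create_snapshot() on an API() instance
          # ('share_api'); the share id below is hypothetical:
          #
          #     share_api = API()
          #     share = share_api.get(context, '<share-uuid>')
          #     snap = share_api.create_snapshot(context, share, 'snap-name',
          #                                      'nightly backup', force=False)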
 1344     def migration_start(
 1345             self, context, share, dest_host, force_host_assisted_migration,
 1346             preserve_metadata, writable, nondisruptive, preserve_snapshots,
 1347             new_share_network=None, new_share_type=None):
 1348         """Migrates share to a new host."""
 1349 
 1350         if force_host_assisted_migration and (
 1351                 preserve_metadata or writable or nondisruptive or
 1352                 preserve_snapshots):
 1353             msg = _('Invalid parameter combination. Cannot set parameters '
 1354                     '"nondisruptive", "writable", "preserve_snapshots" or '
 1355                     '"preserve_metadata" to True when enabling the '
 1356                     '"force_host_assisted_migration" option.')
 1357             LOG.error(msg)
 1358             raise exception.InvalidInput(reason=msg)
 1359 
 1360         share_instance = share.instance
 1361 
 1362         # NOTE(gouthamr): Ensure share does not have replicas.
 1363         # Currently share migrations are disallowed for replicated shares.
 1364         if share.has_replicas:
 1365             msg = _('Share %s has replicas. Remove the replicas before '
 1366                     'attempting to migrate the share.') % share['id']
 1367             LOG.error(msg)
 1368             raise exception.Conflict(err=msg)
 1369 
 1370         # TODO(ganso): We do not support migrating shares in or out of groups
 1371         # for now.
 1372         if share.get('share_group_id'):
 1373             msg = _('Share %s is a member of a group. This operation is not '
 1374                     'currently supported for shares that are members of '
 1375                     'groups.') % share['id']
 1376             LOG.error(msg)
 1377             raise exception.InvalidShare(reason=msg)
 1378 
 1379         # We only handle "available" shares for now
 1380         if share_instance['status'] != constants.STATUS_AVAILABLE:
 1381             msg = _('Share instance %(instance_id)s status must be available, '
 1382                     'but current status is: %(instance_status)s.') % {
 1383                 'instance_id': share_instance['id'],
 1384                 'instance_status': share_instance['status']}
 1385             raise exception.InvalidShare(reason=msg)
 1386 
 1387         # Access rules status must not be error
 1388         if share_instance['access_rules_status'] == constants.STATUS_ERROR:
 1389             msg = _('Share instance %(instance_id)s access rules status must '
 1390                     'not be in %(error)s when attempting to start a '
 1391                     'migration.') % {
 1392                 'instance_id': share_instance['id'],
 1393                 'error': constants.STATUS_ERROR}
 1394             raise exception.InvalidShare(reason=msg)
 1395 
 1396         self._check_is_share_busy(share)
 1397 
 1398         if force_host_assisted_migration:
 1399             # We only handle shares without snapshots for
 1400             # host-assisted migration
 1401             snaps = self.db.share_snapshot_get_all_for_share(context,
 1402                                                              share['id'])
 1403             if snaps:
 1404                 msg = _("Share %s must not have snapshots when using "
 1405                         "host-assisted migration.") % share['id']
 1406                 raise exception.Conflict(err=msg)
 1407 
 1408         dest_host_host = share_utils.extract_host(dest_host)
 1409 
 1410         # Make sure the host is in the list of available hosts
 1411         utils.validate_service_host(context, dest_host_host)
 1412 
 1413         if new_share_type:
 1414             share_type = new_share_type
 1415             new_share_type_id = new_share_type['id']
 1416             dhss = share_type['extra_specs']['driver_handles_share_servers']
 1417             dhss = strutils.bool_from_string(dhss, strict=True)
 1418             if (dhss and not new_share_network and
 1419                     not share_instance['share_network_id']):
 1420                 msg = _(
 1421                     "New share network must be provided when share type of"
 1422                     " given share %s has extra_spec "
 1423                     "'driver_handles_share_servers' as True.") % share['id']
 1424                 raise exception.InvalidInput(reason=msg)
 1425         else:
 1426             share_type = {}
 1427             share_type_id = share_instance['share_type_id']
 1428             if share_type_id:
 1429                 share_type = share_types.get_share_type(context, share_type_id)
 1430             new_share_type_id = share_instance['share_type_id']
 1431 
 1432         dhss = share_type['extra_specs']['driver_handles_share_servers']
 1433         dhss = strutils.bool_from_string(dhss, strict=True)
 1434 
 1435         if dhss:
 1436             if new_share_network:
 1437                 new_share_network_id = new_share_network['id']
 1438             else:
 1439                 new_share_network_id = share_instance['share_network_id']
 1440         else:
 1441             if new_share_network:
 1442                 msg = _(
 1443                     "New share network must not be provided when share type of"
 1444                     " given share %s has extra_spec "
 1445                     "'driver_handles_share_servers' as False.") % share['id']
 1446                 raise exception.InvalidInput(reason=msg)
 1447 
 1448             new_share_network_id = None
 1449 
 1450         # Make sure the destination is different than the source
 1451         if (new_share_network_id == share_instance['share_network_id'] and
 1452                 new_share_type_id == share_instance['share_type_id'] and
 1453                 dest_host == share_instance['host']):
 1454             msg = ("Destination host (%(dest_host)s), share network "
 1455                    "(%(dest_sn)s) and share type (%(dest_st)s) are the same "
 1456                    "as the current ones: '%(src_host)s', '%(src_sn)s' and "
 1457                    "'%(src_st)s', respectively. Nothing to be done.") % {
 1458                        'dest_host': dest_host,
 1459                        'dest_sn': new_share_network_id,
 1460                        'dest_st': new_share_type_id,
 1461                        'src_host': share_instance['host'],
 1462                        'src_sn': share_instance['share_network_id'],
 1463                        'src_st': share_instance['share_type_id'],
 1464                        }
 1465             LOG.info(msg)
 1466             self.db.share_update(
 1467                 context, share['id'],
 1468                 {'task_state': constants.TASK_STATE_MIGRATION_SUCCESS})
 1469             return 200
 1470 
 1471         service = self.db.service_get_by_args(
 1472             context, dest_host_host, 'manila-share')
 1473 
 1474         type_azs = share_type['extra_specs'].get('availability_zones', '')
 1475         type_azs = [t.strip() for t in type_azs.split(',') if type_azs]
 1476         if type_azs and service['availability_zone']['name'] not in type_azs:
 1477             msg = _("Share %(shr)s cannot be migrated to host %(dest)s "
 1478                     "because share type %(type)s is not supported within the "
 1479                     "availability zone (%(az)s) that the host is in.")
 1480             type_name = '%s' % (share_type['name'] or '')
 1481             type_id = '(ID: %s)' % share_type['id']
 1482             payload = {'type': '%s%s' % (type_name, type_id),
 1483                        'az': service['availability_zone']['name'],
 1484                        'shr': share['id'],
 1485                        'dest': dest_host}
 1486             raise exception.InvalidShare(reason=msg % payload)
 1487 
 1488         request_spec = self._get_request_spec_dict(
 1489             share,
 1490             share_type,
 1491             availability_zone_id=service['availability_zone_id'],
 1492             share_network_id=new_share_network_id)
 1493 
 1494         self.db.share_update(
 1495             context, share['id'],
 1496             {'task_state': constants.TASK_STATE_MIGRATION_STARTING})
 1497 
 1498         self.db.share_instance_update(context, share_instance['id'],
 1499                                       {'status': constants.STATUS_MIGRATING})
 1500 
 1501         self.scheduler_rpcapi.migrate_share_to_host(
 1502             context, share['id'], dest_host, force_host_assisted_migration,
 1503             preserve_metadata, writable, nondisruptive, preserve_snapshots,
 1504             new_share_network_id, new_share_type_id, request_spec)
 1505 
 1506         return 202
 1507 
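          # Illustrative call to migration_start() on an API() instance
          # ('share_api'); the destination host and flag values are hypothetical:
          #
          #     share_api.migration_start(
          #         context, share, 'hostB@backend2#pool1',
          #         force_host_assisted_migration=False, preserve_metadata=True,
          #         writable=True, nondisruptive=False, preserve_snapshots=True)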
 1508     def migration_complete(self, context, share):
 1509 
 1510         if share['task_state'] not in (
 1511                 constants.TASK_STATE_DATA_COPYING_COMPLETED,
 1512                 constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE):
 1513             msg = self._migration_validate_error_message(share)
 1514             if msg is None:
 1515                 msg = _("The first migration phase of share %s has not"
 1516                         " completed yet.") % share['id']
 1517             LOG.error(msg)
 1518             raise exception.InvalidShare(reason=msg)
 1519 
 1520         share_instance_id, new_share_instance_id = (
 1521             self.get_migrating_instances(share))
 1522 
 1523         share_instance_ref = self.db.share_instance_get(
 1524             context, share_instance_id, with_share_data=True)
 1525 
 1526         self.share_rpcapi.migration_complete(context, share_instance_ref,
 1527                                              new_share_instance_id)
 1528 
 1529     def get_migrating_instances(self, share):
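              """Return the source and destination instance ids of a migration.

              The source instance is the one in 'migrating' status; the
              destination is the one in 'migrating_to' status.
              """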
 1530 
 1531         share_instance_id = None
 1532         new_share_instance_id = None
 1533 
 1534         for instance in share.instances:
 1535             if instance['status'] == constants.STATUS_MIGRATING:
 1536                 share_instance_id = instance['id']
 1537             if instance['status'] == constants.STATUS_MIGRATING_TO:
 1538                 new_share_instance_id = instance['id']
 1539 
 1540         if None in (share_instance_id, new_share_instance_id):
 1541             msg = _("Share instances %(instance_id)s and "
 1542                     "%(new_instance_id)s are in inconsistent states; cannot"
 1543                     " continue share migration for share %(share_id)s"
 1544                     ".") % {'instance_id': share_instance_id,
 1545                             'new_instance_id': new_share_instance_id,
 1546                             'share_id': share['id']}
 1547             raise exception.ShareMigrationFailed(reason=msg)
 1548 
 1549         return share_instance_id, new_share_instance_id
 1550 
 1551     def migration_get_progress(self, context, share):
 1552 
 1553         if share['task_state'] == (
 1554                 constants.TASK_STATE_MIGRATION_DRIVER_IN_PROGRESS):
 1555 
 1556             share_instance_id, migrating_instance_id = (
 1557                 self.get_migrating_instances(share))
 1558 
 1559             share_instance_ref = self.db.share_instance_get(
 1560                 context, share_instance_id, with_share_data=True)
 1561 
 1562             service_host = share_utils.extract_host(share_instance_ref['host'])
 1563 
 1564             service = self.db.service_get_by_args(
 1565                 context, service_host, 'manila-share')
 1566 
 1567             if utils.service_is_up(service):
 1568                 try:
 1569                     result = self.share_rpcapi.migration_get_progress(
 1570                         context, share_instance_ref, migrating_instance_id)
 1571                 except exception.InvalidShare:
 1572                     # reload to get the latest task_state
 1573                     share = self.db.share_get(context, share['id'])
 1574                     result = self._migration_get_progress_state(share)
 1575                 except Exception:
 1576                     msg = _("Failed to obtain migration progress of share "
 1577                             "%s.") % share['id']
 1578                     LOG.exception(msg)
 1579                     raise exception.ShareMigrationError(reason=msg)
 1580             else:
 1581                 result = None
 1582 
 1583         elif share['task_state'] == (
 1584                 constants.TASK_STATE_DATA_COPYING_IN_PROGRESS):
 1585             data_rpc = data_rpcapi.DataAPI()
 1586             LOG.info("Sending request to get share migration information"
 1587                      " of share %s.", share['id'])
 1588 
 1589             services = self.db.service_get_all_by_topic(context, 'manila-data')
 1590 
 1591             if len(services) > 0 and utils.service_is_up(services[0]):
 1592 
 1593                 try:
 1594                     result = data_rpc.data_copy_get_progress(
 1595                         context, share['id'])
 1596                 except Exception:
 1597                     msg = _("Failed to obtain migration progress of share "
 1598                             "%s.") % share['id']
 1599                     LOG.exception(msg)
 1600                     raise exception.ShareMigrationError(reason=msg)
 1601             else:
 1602                 result = None
 1603         else:
 1604             result = self._migration_get_progress_state(share)
 1605 
 1606         if not (result and result.get('total_progress') is not None):
 1607             msg = self._migration_validate_error_message(share)
 1608             if msg is None:
 1609                 msg = _("Migration progress of share %s cannot be obtained at "
 1610                         "this moment.") % share['id']
 1611             LOG.error(msg)
 1612             raise exception.InvalidShare(reason=msg)
 1613 
 1614         return result
 1615 
 1616     def _migration_get_progress_state(self, share):
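              """Derive a coarse progress value from the share's task_state.

              Returns {'total_progress': 100} or {'total_progress': 0} for
              known task states, or None when progress cannot be inferred.
              """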
 1617 
 1618         task_state = share['task_state']
 1619         if task_state in (constants.TASK_STATE_MIGRATION_SUCCESS,
 1620                           constants.TASK_STATE_DATA_COPYING_ERROR,
 1621                           constants.TASK_STATE_MIGRATION_CANCELLED,
 1622                           constants.TASK_STATE_MIGRATION_CANCEL_IN_PROGRESS,
 1623                           constants.TASK_STATE_MIGRATION_COMPLETING,
 1624                           constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE,
 1625                           constants.TASK_STATE_DATA_COPYING_COMPLETED,
 1626                           constants.TASK_STATE_DATA_COPYING_COMPLETING,
 1627                           constants.TASK_STATE_DATA_COPYING_CANCELLED,
 1628                           constants.TASK_STATE_MIGRATION_ERROR):
 1629             return {'total_progress': 100}
 1630         elif task_state in (constants.TASK_STATE_MIGRATION_STARTING,
 1631                             constants.TASK_STATE_MIGRATION_DRIVER_STARTING,
 1632                             constants.TASK_STATE_DATA_COPYING_STARTING,
 1633                             constants.TASK_STATE_MIGRATION_IN_PROGRESS):
 1634             return {'total_progress': 0}
 1635         else:
 1636             return None
 1637 
 1638     def _migration_validate_error_message(self, resource,
 1639                                           resource_type='share'):
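              """Return a user-facing message describing the migration state.

              Returns None when the task_state does not match any of the
              conditions handled here.
              """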
 1640         task_state = resource['task_state']
 1641         if task_state == constants.TASK_STATE_MIGRATION_SUCCESS:
 1642             msg = _("Migration of %(resource_type)s %(resource_id)s has "
 1643                     "already completed.") % {
 1644                 'resource_id': resource['id'],
 1645                 'resource_type': resource_type}
 1646         elif task_state in (None, constants.TASK_STATE_MIGRATION_ERROR):
 1647             msg = _("There is no migration being performed for "
 1648                     "%(resource_type)s %(resource_id)s at this moment.") % {
 1649                 'resource_id': resource['id'],
 1650                 'resource_type': resource_type}
 1651         elif task_state == constants.TASK_STATE_MIGRATION_CANCELLED:
 1652             msg = _("Migration of %(resource_type)s %(resource_id)s was "
 1653                     "already cancelled.") % {
 1654                 'resource_id': resource['id'],
 1655                 'resource_type': resource_type}
 1656         elif task_state in (constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE,
 1657                             constants.TASK_STATE_DATA_COPYING_COMPLETED):
 1658             msg = _("Migration of %(resource_type)s %(resource_id)s has "
 1659                     "already completed first phase.") % {
 1660                 'resource_id': resource['id'],
 1661                 'resource_type': resource_type}
 1662         else:
 1663             return None
 1664         return msg
 1665 
 1666     def migration_cancel(self, context, share):
 1667 
 1668         migrating = True
 1669         if share['task_state'] in (
 1670                 constants.TASK_STATE_DATA_COPYING_COMPLETED,
 1671                 constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE,
 1672                 constants.TASK_STATE_MIGRATION_DRIVER_IN_PROGRESS):
 1673 
 1674             share_instance_id, migrating_instance_id = (
 1675                 self.get_migrating_instances(share))
 1676 
 1677             share_instance_ref = self.db.share_instance_get(
 1678                 context, share_instance_id, with_share_data=True)
 1679 
 1680             service_host = share_utils.extract_host(share_instance_ref['host'])
 1681 
 1682             service = self.db.service_get_by_args(
 1683                 context, service_host, 'manila-share')
 1684 
 1685             if utils.service_is_up(service):
 1686                 self.share_rpcapi.migration_cancel(
 1687                     context, share_instance_ref, migrating_instance_id)
 1688             else:
 1689                 migrating = False
 1690 
 1691         elif share['task_state'] == (
 1692                 constants.TASK_STATE_DATA_COPYING_IN_PROGRESS):
 1693 
 1694             data_rpc = data_rpcapi.DataAPI()
 1695             LOG.info("Sending request to cancel migration of "
 1696                      "share %s.", share['id'])
 1697 
 1698             services = self.db.service_get_all_by_topic(context, 'manila-data')
 1699 
 1700             if len(services) > 0 and utils.service_is_up(services[0]):
 1701                 try:
 1702                     data_rpc.data_copy_cancel(context, share['id'])
 1703                 except Exception:
 1704                     msg = _("Failed to cancel migration of share "
 1705                             "%s.") % share['id']
 1706                     LOG.exception(msg)
 1707                     raise exception.ShareMigrationError(reason=msg)
 1708             else:
 1709                 migrating = False
 1710 
 1711         else:
 1712             migrating = False
 1713 
 1714         if not migrating:
 1715             msg = self._migration_validate_error_message(share)
 1716             if msg is None:
 1717                 msg = _("Migration of share %s cannot be cancelled at this "
 1718                         "moment.") % share['id']
 1719             LOG.error(msg)
 1720             raise exception.InvalidShare(reason=msg)
 1721 
 1722     @policy.wrap_check_policy('share')
 1723     def delete_snapshot(self, context, snapshot, force=False):
 1724         statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR)
 1725         if not (force or snapshot['aggregate_status'] in statuses):
 1726             msg = _("Share Snapshot status must be one of %(statuses)s.") % {
 1727                 "statuses": statuses}
 1728             raise exception.InvalidShareSnapshot(reason=msg)
 1729 
 1730         share = self.db.share_get(context, snapshot['share_id'])
 1731 
 1732         snapshot_instances = (
 1733             self.db.share_snapshot_instance_get_all_with_filters(
 1734                 context, {'snapshot_ids': snapshot['id']})
 1735         )
 1736 
 1737         for snapshot_instance in snapshot_instances:
 1738             self.db.share_snapshot_instance_update(
 1739                 context, snapshot_instance['id'],
 1740                 {'status': constants.STATUS_DELETING})
 1741 
 1742         if share['has_replicas']:
 1743             self.share_rpcapi.delete_replicated_snapshot(
 1744                 context, snapshot, share['instance']['host'],
 1745                 share_id=share['id'], force=force)
 1746         else:
 1747             self.share_rpcapi.delete_snapshot(
 1748                 context, snapshot, share['instance']['host'], force=force)
 1749 
 1750     @policy.wrap_check_policy('share')
 1751     def update(self, context, share, fields):
 1752         return self.db.share_update(context, share['id'], fields)
 1753 
 1754     @policy.wrap_check_policy('share')
 1755     def snapshot_update(self, context, snapshot, fields):
 1756         return self.db.share_snapshot_update(context, snapshot['id'], fields)
 1757 
 1758     def get(self, context, share_id):
 1759         rv = self.db.share_get(context, share_id)
 1760         if not rv['is_public']:
 1761             policy.check_policy(context, 'share', 'get', rv)
 1762         return rv
 1763 
 1764     def get_all(self, context, search_opts=None, sort_key='created_at',
 1765                 sort_dir='desc'):
 1766         policy.check_policy(context, 'share', 'get_all')
 1767 
 1768         if search_opts is None:
 1769             search_opts = {}
 1770 
 1771         LOG.debug("Searching for shares by: %s", search_opts)
 1772 
 1773         # Prepare filters
 1774         filters = {}
 1775 
 1776         filter_keys = [
 1777             'display_name', 'share_group_id', 'display_name~',
 1778             'display_description', 'display_description~', 'snapshot_id',
 1779             'status', 'share_type_id', 'project_id', 'export_location_id',
 1780             'export_location_path', 'limit', 'offset', 'host',
 1781             'share_network_id']
 1782 
 1783         for key in filter_keys:
 1784             if key in search_opts:
 1785                 filters[key] = search_opts.pop(key)
 1786 
 1787         if 'metadata' in search_opts:
 1788             filters['metadata'] = search_opts.pop('metadata')
 1789             if not isinstance(filters['metadata'], dict):
 1790                 msg = _("Wrong metadata filter provided: "
 1791                         "%s.") % six.text_type(filters['metadata'])
 1792                 raise exception.InvalidInput(reason=msg)
 1793         if 'extra_specs' in search_opts:
 1794             # Verify policy for extra-specs access
 1795             policy.check_policy(context, 'share_types_extra_spec', 'index')
 1796             filters['extra_specs'] = search_opts.pop('extra_specs')
 1797             if not isinstance(filters['extra_specs'], dict):
 1798                 msg = _("Wrong extra specs filter provided: "
 1799                         "%s.") % six.text_type(filters['extra_specs'])
 1800                 raise exception.InvalidInput(reason=msg)
 1801 
 1802         if not (isinstance(sort_key, six.string_types) and sort_key):
 1803             msg = _("Wrong sort_key filter provided: "
 1804                     "'%s'.") % six.text_type(sort_key)
 1805             raise exception.InvalidInput(reason=msg)
 1806         if not (isinstance(sort_dir, six.string_types) and sort_dir):
 1807             msg = _("Wrong sort_dir filter provided: "
 1808                     "'%s'.") % six.text_type(sort_dir)
 1809             raise exception.InvalidInput(reason=msg)
 1810 
 1811         is_public = search_opts.pop('is_public', False)
 1812         is_public = strutils.bool_from_string(is_public, strict=True)
 1813 
 1814         # Get filtered list of shares
 1815         if 'host' in filters:
 1816             policy.check_policy(context, 'share', 'list_by_host')
 1817         if 'share_server_id' in search_opts:
 1818             # NOTE(vponomaryov): this is project_id independent
 1819             policy.check_policy(context, 'share', 'list_by_share_server_id')
 1820             shares = self.db.share_get_all_by_share_server(
 1821                 context, search_opts.pop('share_server_id'), filters=filters,
 1822                 sort_key=sort_key, sort_dir=sort_dir)
 1823         elif (context.is_admin and utils.is_all_tenants(search_opts)):
 1824             shares = self.db.share_get_all(
 1825                 context, filters=filters, sort_key=sort_key, sort_dir=sort_dir)
 1826         else:
 1827             shares = self.db.share_get_all_by_project(
 1828                 context, project_id=context.project_id, filters=filters,
 1829                 is_public=is_public, sort_key=sort_key, sort_dir=sort_dir)
 1830 
 1831         # NOTE(vponomaryov): we do not need 'all_tenants' opt anymore
 1832         search_opts.pop('all_tenants', None)
 1833 
 1834         if search_opts:
 1835             results = []
 1836             for s in shares:
 1837                 # values in search_opts can only be strings
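                      # Keys ending with '~' are inexact filters: the value only
                      # needs to be contained in the corresponding attribute.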
 1838                 if (all(s.get(k, None) == v or (v in (s.get(k.rstrip('~'))
 1839                         if k.endswith('~') and s.get(k.rstrip('~')) else ()))
 1840                         for k, v in search_opts.items())):
 1841                     results.append(s)
 1842             shares = results
 1843         return shares
 1844 
 1845     def get_snapshot(self, context, snapshot_id):
 1846         policy.check_policy(context, 'share_snapshot', 'get_snapshot')
 1847         return self.db.share_snapshot_get(context, snapshot_id)
 1848 
 1849     def get_all_snapshots(self, context, search_opts=None,
 1850                           sort_key='share_id', sort_dir='desc'):
 1851         policy.check_policy(context, 'share_snapshot', 'get_all_snapshots')
 1852 
 1853         search_opts = search_opts or {}
 1854         LOG.debug("Searching for snapshots by: %s", search_opts)
 1855 
 1856         # Read and remove the 'all_tenants' key if it was provided
 1857         all_tenants = search_opts.pop('all_tenants', None)
 1858 
 1859         string_args = {'sort_key': sort_key, 'sort_dir': sort_dir}
 1860         string_args.update(search_opts)
 1861         for k, v in string_args.items():
 1862             if not (isinstance(v, six.string_types) and v):
 1863                 msg = _("Wrong '%(k)s' filter provided: "
 1864                         "'%(v)s'.") % {'k': k, 'v': string_args[k]}
 1865                 raise exception.InvalidInput(reason=msg)
 1866 
 1867         if (context.is_admin and all_tenants):
 1868             snapshots = self.db.share_snapshot_get_all(
 1869                 context, filters=search_opts,
 1870                 sort_key=sort_key, sort_dir=sort_dir)
 1871         else:
 1872             snapshots = self.db.share_snapshot_get_all_by_project(
 1873                 context, context.project_id, filters=search_opts,
 1874                 sort_key=sort_key, sort_dir=sort_dir)
 1875 
 1876         # Remove key 'usage' if provided
 1877         search_opts.pop('usage', None)
 1878 
 1879         if search_opts:
 1880             results = []
 1881             not_found = object()
 1882             for snapshot in snapshots:
 1883                 if (all(snapshot.get(k, not_found) == v or
 1884                         (v in snapshot.get(k.rstrip('~'))
 1885                         if k.endswith('~') and
 1886                         snapshot.get(k.rstrip('~')) else ())
 1887                         for k, v in search_opts.items())):
 1888                     results.append(snapshot)
 1889             snapshots = results
 1890         return snapshots
 1891 
 1892     def get_latest_snapshot_for_share(self, context, share_id):
 1893         """Get the newest snapshot of a share."""
 1894         return self.db.share_snapshot_get_latest_for_share(context, share_id)
 1895 
 1896     @staticmethod
 1897     def _is_invalid_share_instance(instance):
 1898         return (instance['host'] is None
 1899                 or instance['status'] in constants.
 1900                 INVALID_SHARE_INSTANCE_STATUSES_FOR_ACCESS_RULE_UPDATES)
 1901 
 1902     def allow_access(self, ctx, share, access_type, access_to,
 1903                      access_level=None, metadata=None):
 1904         """Allow access to share."""
 1905 
 1906         # Access rule validation:
 1907         if access_level not in constants.ACCESS_LEVELS + (None, ):
 1908             msg = _("Invalid share access level: %s.") % access_level
 1909             raise exception.InvalidShareAccess(reason=msg)
 1910 
 1911         self._check_metadata_properties(metadata)
 1912         access_exists = self.db.share_access_check_for_existing_access(
 1913             ctx, share['id'], access_type, access_to)
 1914 
 1915         if access_exists:
 1916             raise exception.ShareAccessExists(access_type=access_type,
 1917                                               access=access_to)
 1918 
 1919         # Share instance validation
 1920         if any(instance for instance in share.instances
 1921                if self._is_invalid_share_instance(instance)):
 1922             msg = _("New access rules cannot be applied while the share or "
 1923                     "any of its replicas or migration copies lacks a valid "
 1924                     "host or is in an invalid state.")
 1925             raise exception.InvalidShare(message=msg)
 1926 
 1927         values = {
 1928             'share_id': share['id'],
 1929             'access_type': access_type,
 1930             'access_to': access_to,
 1931             'access_level': access_level,
 1932             'metadata': metadata,
 1933         }
 1934 
 1935         access = self.db.share_access_create(ctx, values)
 1936 
 1937         for share_instance in share.instances:
 1938             self.allow_access_to_instance(ctx, share_instance)
 1939 
 1940         return access
 1941 
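          # Illustrative use of allow_access() on an API() instance
          # ('share_api'); the CIDR and access level are hypothetical:
          #
          #     share_api.allow_access(context, share, 'ip', '203.0.113.0/24',
          #                            access_level='rw')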
 1942     def allow_access_to_instance(self, context, share_instance):
 1943         self._conditionally_transition_share_instance_access_rules_status(
 1944             context, share_instance)
 1945         self.share_rpcapi.update_access(context, share_instance)
 1946 
 1947     def _conditionally_transition_share_instance_access_rules_status(
 1948             self, context, share_instance):
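              # If the instance's access rules status is currently 'active',
              # flip it to 'syncing' to signal that a rule update is pending.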
 1949         conditionally_change = {
 1950             constants.STATUS_ACTIVE: constants.SHARE_INSTANCE_RULES_SYNCING,
 1951         }
 1952         self.access_helper.get_and_update_share_instance_access_rules_status(
 1953             context, conditionally_change=conditionally_change,
 1954             share_instance_id=share_instance['id'])
 1955 
 1956     def deny_access(self, ctx, share, access):
 1957         """Deny access to share."""
 1958 
 1959         if any(instance for instance in share.instances if
 1960                self._is_invalid_share_instance(instance)):
 1961             msg = _("Access rules cannot be denied while the share, "
 1962                     "any of its replicas or migration copies lacks a valid "
 1963                     "host or is in an invalid state.")
 1964             raise exception.InvalidShare(message=msg)
 1965 
 1966         for share_instance in share.instances:
 1967             self.deny_access_to_instance(ctx, share_instance, access)
 1968 
 1969     def deny_access_to_instance(self, context, share_instance, access):
 1970         self._conditionally_transition_share_instance_access_rules_status(
 1971             context, share_instance)
 1972         updates = {'state': constants.ACCESS_STATE_QUEUED_TO_DENY}
 1973         self.access_helper.get_and_update_share_instance_access_rule(
 1974             context, access['id'], updates=updates,
 1975             share_instance_id=share_instance['id'])
 1976 
 1977         self.share_rpcapi.update_access(context, share_instance)
 1978 
 1979     def access_get_all(self, context, share, filters=None):
 1980         """Returns all access rules for share."""
 1981         policy.check_policy(context, 'share', 'access_get_all')
 1982         rules = self.db.share_access_get_all_for_share(
 1983             context, share['id'], filters=filters)
 1984         return rules
 1985 
 1986     def access_get(self, context, access_id):
 1987         """Returns access rule with the id."""
 1988         policy.check_policy(context, 'share', 'access_get')
 1989         rule = self.db.share_access_get(context, access_id)
 1990         return rule
 1991 
 1992     @policy.wrap_check_policy('share')
 1993     def get_share_metadata(self, context, share):
 1994         """Get all metadata associated with a share."""
 1995         rv = self.db.share_metadata_get(context, share['id'])
 1996         return dict(rv.items())
 1997 
 1998     @policy.wrap_check_policy('share')
 1999     def delete_share_metadata(self, context, share, key):
 2000         """Delete the given metadata item from a share."""
 2001         self.db.share_metadata_delete(context, share['id'], key)
 2002 
 2003     def _check_is_share_busy(self, share):
 2004         """Raises an exception if share is busy with an active task."""
 2005         if share.is_busy:
 2006             msg = _("Share %(share_id)s is busy as part of an active "
 2007                     "task: %(task)s.") % {
 2008                 'share_id': share['id'],
 2009                 'task': share['task_state']
 2010             }
 2011             raise exception.ShareBusyException(reason=msg)
 2012 
 2013     def _check_metadata_properties(self, metadata=None):
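              """Validate metadata keys and values.

              Keys and values must be non-empty; keys are limited to 255
              characters and values to 1023 characters.
              """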
 2014         if not metadata:
 2015             metadata = {}
 2016 
 2017         for k, v in metadata.items():
 2018             if not k:
 2019                 msg = _("Metadata property key is blank.")
 2020                 LOG.warning(msg)
 2021                 raise exception.InvalidMetadata(message=msg)
 2022             if len(k) > 255:
 2023                 msg = _("Metadata property key is "
 2024                         "longer than 255 characters.")
 2025                 LOG.warning(msg)
 2026                 raise exception.InvalidMetadataSize(message=msg)
 2027             if not v:
 2028                 msg = _("Metadata property value is blank.")
 2029                 LOG.warning(msg)
 2030                 raise exception.InvalidMetadata(message=msg)
 2031             if len(v) > 1023:
 2032                 msg = _("Metadata property value is "
 2033                         "longer than 1023 characters.")
 2034                 LOG.warning(msg)
 2035                 raise exception.InvalidMetadataSize(message=msg)
 2036 
 2037     def update_share_access_metadata(self, context, access_id, metadata):
 2038         """Updates share access metadata."""
 2039         self._check_metadata_properties(metadata)
 2040         return self.db.share_access_metadata_update(
 2041             context, access_id, metadata)
 2042 
 2043     @policy.wrap_check_policy('share')
 2044     def update_share_metadata(self, context, share, metadata, delete=False):
 2045         """Updates or creates share metadata.
 2046 
 2047         If delete is True, metadata items that are not specified in the
 2048         `metadata` argument will be deleted.
 2049 
 2050         """
 2051         orig_meta = self.get_share_metadata(context, share)
 2052         if delete:
 2053             _metadata = metadata
 2054         else:
 2055             _metadata = orig_meta.copy()
 2056             _metadata.update(metadata)
 2057 
 2058         self._check_metadata_properties(_metadata)
 2059         self.db.share_metadata_update(context, share['id'],
 2060                                       _metadata, delete)
 2061 
 2062         return _metadata
 2063 
 2064     def get_share_network(self, context, share_net_id):
 2065         return self.db.share_network_get(context, share_net_id)
 2066 
 2067     def extend(self, context, share, new_size):
 2068         policy.check_policy(context, 'share', 'extend')
 2069 
 2070         if share['status'] != constants.STATUS_AVAILABLE:
 2071             msg_params = {
 2072                 'valid_status': constants.STATUS_AVAILABLE,
 2073                 'share_id': share['id'],
 2074                 'status': share['status'],
 2075             }
 2076             msg = _("Share %(share_id)s status must be '%(valid_status)s' "
 2077                     "to extend, but current status is: "
 2078                     "%(status)s.") % msg_params
 2079             raise exception.InvalidShare(reason=msg)
 2080 
 2081         self._check_is_share_busy(share)
 2082 
 2083         size_increase = int(new_size) - share['size']
 2084         if size_increase <= 0:
 2085             msg = (_("New size for extend must be greater "
 2086                      "than current size (current: %(size)s, "
 2087                      "extended: %(new_size)s).") % {'new_size': new_size,
 2088                                                     'size': share['size']})
 2089             raise exception.InvalidInput(reason=msg)
 2090 
 2091         replicas = self.db.share_replicas_get_all_by_share(
 2092             context, share['id'])
 2093         supports_replication = len(replicas) > 0
 2094 
 2095         deltas = {
 2096             'project_id': share['project_id'],
 2097             'gigabytes': size_increase,
 2098             'user_id': share['user_id'],
 2099             'share_type_id': share['instance']['share_type_id']
 2100         }
 2101 
 2102         # NOTE(carloss): If the share type supports replication, we must get
 2103         # all the replicas that pertain to the share and calculate the final
 2104         # size (size to increase * number of replicas), since all the replicas
 2105         # are going to be extended when the driver syncs them.
 2106         if supports_replication:
 2107             replica_gigs_to_increase = len(replicas) * size_increase
 2108             deltas.update({'replica_gigabytes': replica_gigs_to_increase})
 2109 
 2110         try:
 2111             # We pass the user_id of the share so the quota usage is updated
 2112             # for the user who created the share, because only that user's
 2113             # quota is decreased on share deletion.
 2114             reservations = QUOTAS.reserve(context, **deltas)
 2115         except exception.OverQuota as exc:
 2116             # Check if the exceeded quota was 'gigabytes'
 2117             self._check_if_share_quotas_exceeded(context, exc, share['size'],
 2118                                                  operation='extend')
 2119             # NOTE(carloss): Check if the exceeded quota is
 2120             # 'replica_gigabytes'. If so, the failure could be caused by a
 2121             # lack of quota to extend the share's replicas, so the
 2122             # '_check_if_replica_quotas_exceeded' method can't be reused here
 2123             # since the error message must be different from the default one.
 2124             if supports_replication:
 2125                 overs = exc.kwargs['overs']
 2126                 usages = exc.kwargs['usages']
 2127                 quotas = exc.kwargs['quotas']
 2128 
 2129                 def _consumed(name):
 2130                     return (usages[name]['reserved'] + usages[name]['in_use'])
 2131 
 2132                 if 'replica_gigabytes' in overs:
 2133                     LOG.warning("Replica gigabytes quota exceeded "
 2134                                 "for %(s_pid)s, tried to extend "
 2135                                 "%(s_size)sG share (%(d_consumed)dG of "
 2136                                 "%(d_quota)dG already consumed).", {
 2137                                     's_pid': context.project_id,
 2138                                     's_size': share['size'],
 2139                                     'd_consumed': _consumed(
 2140                                         'replica_gigabytes'),
 2141                                     'd_quota': quotas['replica_gigabytes']})
 2142                     msg = _("Failed while extending a share with replication "
 2143                             "support. There is no available quota to extend "
 2144                             "the share and its %(count)d replicas. Maximum "
 2145                             "number of allowed replica_gigabytes is "
 2146                             "exceeded.") % {'count': len(replicas)}
 2147                     raise exception.ShareReplicaSizeExceedsAvailableQuota(
 2148                         message=msg)
 2149 
 2150         self.update(context, share, {'status': constants.STATUS_EXTENDING})
 2151         self.share_rpcapi.extend_share(context, share, new_size, reservations)
 2152         LOG.info("Extend share request issued successfully.",
 2153                  resource=share)
 2154 
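          # Illustrative resize calls on an API() instance ('share_api');
          # the 10 GiB delta is hypothetical:
          #
          #     share_api.extend(context, share, share['size'] + 10)
          #     share_api.shrink(context, share, share['size'] - 10)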
 2155     def shrink(self, context, share, new_size):
 2156         policy.check_policy(context, 'share', 'shrink')
 2157 
 2158         status = six.text_type(share['status']).lower()
 2159         valid_statuses = (constants.STATUS_AVAILABLE,
 2160                           constants.STATUS_SHRINKING_POSSIBLE_DATA_LOSS_ERROR)
 2161 
 2162         if status not in valid_statuses:
 2163             msg_params = {
 2164                 'valid_status': ", ".join(valid_statuses),
 2165                 'share_id': share['id'],
 2166                 'status': status,
 2167             }
 2168             msg = _("Share %(share_id)s status must be in (%(valid_status)s) "
 2169                     "to shrink, but current status is: "
 2170                     "%(status)s.") % msg_params
 2171             raise exception.InvalidShare(reason=msg)
 2172 
 2173         self._check_is_share_busy(share)
 2174 
 2175         size_decrease = int(share['size']) - int(new_size)
 2176         if size_decrease <= 0 or new_size <= 0:
 2177             msg = (_("New size for shrink must be less "
 2178                      "than current size and greater than 0 (current: %(size)s,"
 2179                      " new: %(new_size)s)") % {'new_size': new_size,
 2180                                                'size': share['size']})
 2181             raise exception.InvalidInput(reason=msg)
 2182 
 2183         self.update(context, share, {'status': constants.STATUS_SHRINKING})
 2184         self.share_rpcapi.shrink_share(context, share, new_size)
 2185         LOG.info("Shrink share (id=%(id)s) request issued successfully."
 2186                  " New size: %(size)s", {'id': share['id'],
 2187                                          'size': new_size})
 2188 
 2189     def snapshot_allow_access(self, context, snapshot, access_type, access_to):
 2190         """Allow access to a share snapshot."""
 2191         access_exists = self.db.share_snapshot_check_for_existing_access(
 2192             context, snapshot['id'], access_type, access_to)
 2193 
 2194         if access_exists:
 2195             raise exception.ShareSnapshotAccessExists(access_type=access_type,
 2196                                                       access=access_to)
 2197 
 2198         values = {
 2199             'share_snapshot_id': snapshot['id'],
 2200             'access_type': access_type,
 2201             'access_to': access_to,
 2202         }
 2203 
 2204         if any((instance['status'] != constants.STATUS_AVAILABLE) or
 2205                (instance['share_instance']['host'] is None)
 2206                for instance in snapshot.instances):
 2207             msg = _("New access rules cannot be applied while the snapshot or "
 2208                     "any of its replicas or migration copies lacks a valid "
 2209                     "host or is not in %s state.") % constants.STATUS_AVAILABLE
 2210 
 2211             raise exception.InvalidShareSnapshotInstance(reason=msg)
 2212 
 2213         access = self.db.share_snapshot_access_create(context, values)
 2214 
 2215         for snapshot_instance in snapshot.instances:
 2216             self.share_rpcapi.snapshot_update_access(
 2217                 context, snapshot_instance)
 2218 
 2219         return access
 2220 
 2221     def snapshot_deny_access(self, context, snapshot, access):
 2222         """Deny access to a share snapshot."""
 2223         if any((instance['status'] != constants.STATUS_AVAILABLE) or
 2224                (instance['share_instance']['host'] is None)
 2225                for instance in snapshot.instances):
 2226             msg = _("Access rules cannot be denied while the snapshot or "
 2227                     "any of its replicas or migration copies lacks a valid "
 2228                     "host or is not in %s state.") % constants.STATUS_AVAILABLE
 2229 
 2230             raise exception.InvalidShareSnapshotInstance(reason=msg)
 2231 
 2232         for snapshot_instance in snapshot.instances:
 2233             rule = self.db.share_snapshot_instance_access_get(
 2234                 context, access['id'], snapshot_instance['id'])
 2235             self.db.share_snapshot_instance_access_update(
 2236                 context, rule['access_id'], snapshot_instance['id'],
 2237                 {'state': constants.ACCESS_STATE_QUEUED_TO_DENY})
 2238             self.share_rpcapi.snapshot_update_access(
 2239                 context, snapshot_instance)
 2240 
 2241     def snapshot_access_get_all(self, context, snapshot):
 2242         """Returns all access rules for share snapshot."""
 2243         rules = self.db.share_snapshot_access_get_all_for_share_snapshot(
 2244             context, snapshot['id'], {})
 2245         return rules
 2246 
 2247     def snapshot_access_get(self, context, access_id):
 2248         """Returns snapshot access rule with the id."""
 2249         rule = self.db.share_snapshot_access_get(context, access_id)
 2250         return rule
 2251 
 2252     def snapshot_export_locations_get(self, context, snapshot):
 2253         return self.db.share_snapshot_export_locations_get(context, snapshot)
 2254 
 2255     def snapshot_export_location_get(self, context, el_id):
 2256         return self.db.share_snapshot_instance_export_location_get(context,
 2257                                                                    el_id)
 2258 
 2259     def share_server_migration_get_destination(self, context, source_server_id,
 2260                                                status=None):
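              """Returns the destination share server of a migration.

              The lookup is done by source share server id, optionally
              filtered by status; exactly one match is expected, otherwise an
              error is raised.
              """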
 2261         filters = {'source_share_server_id': source_server_id}
 2262         if status:
 2263             filters.update({'status': status})
 2264 
 2265         dest_share_servers = self.db.share_server_get_all_with_filters(
 2266             context, filters=filters)
 2267         if not dest_share_servers:
 2268             msg = _("A destination share server wasn't found for source "
 2269                     "share server %s.") % source_server_id
 2270             raise exception.InvalidShareServer(reason=msg)
 2271         if len(dest_share_servers) > 1:
 2272             msg = _("More than one destination share server was found for "
 2273                     "source share server %s. Aborting...") % source_server_id
 2274             raise exception.InvalidShareServer(reason=msg)
 2275 
 2276         return dest_share_servers[0]
 2277 
 2278     def get_share_server_migration_request_spec_dict(
 2279             self, context, share_instances, snapshot_instances, **kwargs):
 2280         """Returns request specs for a share server and all of its shares."""
 2281 
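              # Aggregate the total share and snapshot sizes and build one
              # request spec per share instance, based on its share type, so
              # the whole server's requirements can be evaluated at once.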
 2282         shares_total_size = sum([instance.get('size', 0)
 2283                                  for instance in share_instances])
 2284         snapshots_total_size = sum([instance.get('size', 0)
 2285                                     for instance in snapshot_instances])
 2286 
 2287         shares_req_spec = []
 2288         for share_instance in share_instances:
 2289             share_type_id = share_instance['share_type_id']
 2290             share_type = share_types.get_share_type(context, share_type_id)
 2291             req_spec = self._get_request_spec_dict(share_instance,
 2292                                                    share_type,
 2293                                                    **kwargs)
 2294             shares_req_spec.append(req_spec)
 2295 
 2296         server_request_spec = {
 2297             'shares_size': shares_total_size,
 2298             'snapshots_size': snapshots_total_size,
 2299             'shares_req_spec': shares_req_spec,
 2300         }
 2301         return server_request_spec
 2302 
 2303     def _migration_initial_checks(self, context, share_server, dest_host,
 2304                                   new_share_network):
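              """Performs the sanity checks shared by server migration APIs.

              Validates the source share server, its shares and snapshots,
              the destination host and the target share network, and returns
              the data needed to continue with the migration request.
              """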
 2305         shares = self.db.share_get_all_by_share_server(
 2306             context, share_server['id'])
 2307 
 2308         if len(shares) == 0:
 2309             msg = (_("Share server %s does not have shares.")
 2310                    % share_server['id'])
 2311             raise exception.InvalidShareServer(reason=msg)
 2312 
 2313         # We only handle "active" share servers for now
 2314         if share_server['status'] != constants.STATUS_ACTIVE:
 2315             msg = _('Share server %(server_id)s status must be active, '
 2316                     'but current status is: %(server_status)s.') % {
 2317                         'server_id': share_server['id'],
 2318                         'server_status': share_server['status']}
 2319             raise exception.InvalidShareServer(reason=msg)
 2320 
 2321         share_groups_related_to_share_server = (
 2322             self.db.share_group_get_all_by_share_server(
 2323                 context, share_server['id']))
 2324 
 2325         if share_groups_related_to_share_server:
 2326             msg = _("The share server %s cannot be migrated because it is "
 2327                     "related to a share group.") % share_server['id']
 2328             raise exception.InvalidShareServer(reason=msg)
 2329 
 2330         # Same backend and same network, nothing changes
 2331         src_backend = share_utils.extract_host(share_server['host'],
 2332                                                level='backend_name')
 2333         dest_backend = share_utils.extract_host(dest_host,
 2334                                                 level='backend_name')
 2335         current_share_network_id = shares[0]['instance']['share_network_id']
 2336         if (src_backend == dest_backend and
 2337                 (new_share_network is None or
 2338                  new_share_network['id'] == current_share_network_id)):
 2339             msg = _('There is no difference between source and destination '
 2340                     'backends and between source and destination share '
 2341                     'networks. Share server migration will not proceed.')
 2342             raise exception.InvalidShareServer(reason=msg)
 2343 
 2344         filters = {'source_share_server_id': share_server['id'],
 2345                    'status': constants.STATUS_SERVER_MIGRATING_TO}
 2346         dest_share_servers = self.db.share_server_get_all_with_filters(
 2347             context, filters=filters)
 2348         if len(dest_share_servers):
 2349             msg = _("There is at least one destination share server pointing "
 2350                     "to this source share server. Clean up your environment "
 2351                     "before starting a new migration.")
 2352             raise exception.InvalidShareServer(reason=msg)
 2353 
 2354         dest_service_host = share_utils.extract_host(dest_host)
 2355         # Make sure the host is in the list of available hosts
 2356         utils.validate_service_host(context, dest_service_host)
 2357 
 2358         service = self.db.service_get_by_args(
 2359             context, dest_service_host, 'manila-share')
 2360 
 2361         # Get all share types
 2362         type_ids = set([share['instance']['share_type_id']
 2363                         for share in shares])
 2364         types = [share_types.get_share_type(context, type_id)
 2365                  for type_id in type_ids]
 2366 
 2367         # Check if share type azs are supported by the destination host
 2368         for share_type in types:
 2369             azs = share_type['extra_specs'].get('availability_zones', '')
 2370             if azs and service['availability_zone']['name'] not in azs:
 2371                 msg = _("Share server %(server)s cannot be migrated to host "
 2372                         "%(dest)s because the share type %(type)s is used by "
 2373                         "one of the shares, and this share type is not "
 2374                         "supported within the availability zone (%(az)s) that "
 2375                         "the host is in.")
 2376                 type_name = '%s' % (share_type['name'] or '')
 2377                 type_id = '(ID: %s)' % share_type['id']
 2378                 payload = {'type': '%s%s' % (type_name, type_id),
 2379                            'az': service['availability_zone']['name'],
 2380                            'server': share_server['id'],
 2381                            'dest': dest_host}
 2382                 raise exception.InvalidShareServer(reason=msg % payload)
 2383 
 2384         if new_share_network:
 2385             new_share_network_id = new_share_network['id']
 2386         else:
 2387             new_share_network_id = shares[0]['instance']['share_network_id']
 2388         # NOTE(carloss): check if the new or old share network has a subnet
 2389         # that spans the availability zone of the destination host, otherwise
 2390         # we should deny this operation.
 2391         dest_az = self.db.availability_zone_get(
 2392             context, service['availability_zone']['name'])
 2393         compatible_subnet = (
 2394             self.db.share_network_subnet_get_by_availability_zone_id(
 2395                 context, new_share_network_id, dest_az['id']))
 2396 
 2397         if not compatible_subnet:
 2398             msg = _("The share network %(network)s does not have a subnet "
 2399                     "that spans the destination host availability zone.")
 2400             payload = {'network': new_share_network_id}
 2401             raise exception.InvalidShareServer(reason=msg % payload)
 2402 
 2403         # NOTE(carloss): Refresh the list of shares, since something
 2404         # could've changed since it was initially fetched.
 2405         shares = self.db.share_get_all_by_share_server(
 2406             context, share_server['id'])
 2407         for share in shares:
 2408             if share['status'] != constants.STATUS_AVAILABLE:
 2409                 msg = _('Share %(share_id)s status must be available, '
 2410                         'but current status is: %(share_status)s.') % {
 2411                             'share_id': share['id'],
 2412                             'share_status': share['status']}
 2413                 raise exception.InvalidShareServer(reason=msg)
 2414 
 2415             if share.has_replicas:
 2416                 msg = _('Share %s has replicas. Remove the replicas of all '
 2417                         'shares in the share server before attempting to '
 2418                         'migrate it.') % share['id']
 2419                 LOG.error(msg)
 2420                 raise exception.InvalidShareServer(reason=msg)
 2421 
 2422             # NOTE(carloss): The preserve_snapshots flag is not validated
 2423             # here: even if the admin sets it to False, the driver may
 2424             # still support preserving snapshots, in which case they are
 2425             # copied anyway. The share manager is responsible for checking
 2426             # whether the driver lacks snapshot preservation support while
 2427             # the share server still contains snapshots.
 2428             share_snapshots = self.db.share_snapshot_get_all_for_share(
 2429                 context, share['id'])
 2430             all_snapshots_are_available = all(
 2431                 [snapshot['status'] == constants.STATUS_AVAILABLE
 2432                  for snapshot in share_snapshots])
 2433             if not all_snapshots_are_available:
 2434                 msg = _(
 2435                     "All snapshots must have '%(status)s' status to be "
 2436                     "migrated by the driver along with share "
 2437                     "%(resource_id)s.") % {
 2438                         'resource_id': share['id'],
 2439                         'status': constants.STATUS_AVAILABLE,
 2440                 }
 2441                 LOG.error(msg)
 2442                 raise exception.InvalidShareServer(reason=msg)
 2443 
 2444             if share.get('share_group_id'):
 2445                 msg = _('Share %s is a member of a group. This operation is '
 2446                         'not currently supported for share servers that '
 2447                         'contain shares belonging to groups.') % share['id']
 2448                 LOG.error(msg)
 2449                 raise exception.InvalidShareServer(reason=msg)
 2450 
 2451             share_instance = share['instance']
 2452             # Access rules status must not be error
 2453             if share_instance['access_rules_status'] == constants.STATUS_ERROR:
 2454                 msg = _(
 2455                     'Share instance %(instance_id)s access rules status must '
 2456                     'not be in %(error)s when attempting to start a share '
 2457                     'server migration.') % {
 2458                         'instance_id': share_instance['id'],
 2459                         'error': constants.STATUS_ERROR}
 2460                 raise exception.InvalidShareServer(reason=msg)
 2461             try:
 2462                 self._check_is_share_busy(share)
 2463             except exception.ShareBusyException as e:
 2464                 raise exception.InvalidShareServer(reason=e.msg)
 2465 
 2466         return shares, types, service, new_share_network_id
 2467 
 2468     def share_server_migration_check(self, context, share_server, dest_host,
 2469                                      writable, nondisruptive,
 2470                                      preserve_snapshots,
 2471                                      new_share_network=None):
 2472         """Checks whether a share server can be migrated to a new host."""
 2473         shares, types, service, new_share_network_id = (
 2474             self._migration_initial_checks(context, share_server, dest_host,
 2475                                            new_share_network))
 2476 
 2477         # NOTE(dviroel): The destination service is up, according to the
 2478         # validations made during the initial checks.
 2479         result = self.share_rpcapi.share_server_migration_check(
 2480             context, share_server['id'], dest_host, writable, nondisruptive,
 2481             preserve_snapshots, new_share_network_id)
 2482 
 2483         return result
 2484 
 2485     def share_server_migration_start(
 2486             self, context, share_server, dest_host, writable, nondisruptive,
 2487             preserve_snapshots, new_share_network=None):
 2488         """Migrates share server to a new host."""
 2489 
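              # After passing the same validations as the compatibility
              # check, the source server is flagged as migrating, all of its
              # share and snapshot instances leave the 'available' status,
              # and the start of the migration is delegated to the share
              # manager via RPC.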
 2490         shares, types, dest_service, new_share_network_id = (
 2491             self._migration_initial_checks(context, share_server,
 2492                                            dest_host,
 2493                                            new_share_network))
 2494 
 2495         # Updates the share server status to migration starting
 2496         self.db.share_server_update(
 2497             context, share_server['id'],
 2498             {'task_state': constants.TASK_STATE_MIGRATION_STARTING,
 2499              'status': constants.STATUS_SERVER_MIGRATING})
 2500 
 2501         share_snapshots = [
 2502             self.db.share_snapshot_get_all_for_share(context, share['id'])
 2503             for share in shares]
 2504         snapshot_instance_ids = []
 2505         for snapshot_list in share_snapshots:
 2506             for snapshot in snapshot_list:
 2507                 snapshot_instance_ids.append(snapshot['instance']['id'])
 2508         share_instance_ids = [share['instance']['id'] for share in shares]
 2509 
 2510         # Updates all shares and snapshot instances
 2511         self.db.share_and_snapshot_instances_status_update(
 2512             context, {'status': constants.STATUS_SERVER_MIGRATING},
 2513             share_instance_ids=share_instance_ids,
 2514             snapshot_instance_ids=snapshot_instance_ids,
 2515             current_expected_status=constants.STATUS_AVAILABLE
 2516         )
 2517 
 2518         # NOTE(dviroel): The destination service is up, according to the
 2519         # validations made during the initial checks.
 2520         self.share_rpcapi.share_server_migration_start(
 2521             context, share_server, dest_host, writable, nondisruptive,
 2522             preserve_snapshots, new_share_network_id)
 2523 
 2524     def share_server_migration_complete(self, context, share_server):
 2525         """Invokes 2nd phase of share server migration."""
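              # Completion can only be requested once the driver has reported
              # the first phase as done; the destination share server is then
              # looked up, its host is validated, and the second phase is
              # delegated to that host.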
 2526         if share_server['status'] != constants.STATUS_SERVER_MIGRATING:
 2527             msg = _("Share server %s is not migrating.") % share_server['id']
 2528             LOG.error(msg)
 2529             raise exception.InvalidShareServer(reason=msg)
 2530         if (share_server['task_state'] !=
 2531                 constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE):
 2532             msg = _("The first phase of the migration must finish before "
 2533                     "the completion of server %s's migration can be "
 2534                     "requested.") % share_server['id']
 2535             LOG.error(msg)
 2536             raise exception.InvalidShareServer(reason=msg)
 2537 
 2538         dest_share_server = self.share_server_migration_get_destination(
 2539             context, share_server['id'],
 2540             status=constants.STATUS_SERVER_MIGRATING_TO
 2541         )
 2542 
 2543         dest_host = share_utils.extract_host(dest_share_server['host'])
 2544         utils.validate_service_host(context, dest_host)
 2545 
 2546         self.share_rpcapi.share_server_migration_complete(
 2547             context, dest_share_server['host'], share_server,
 2548             dest_share_server)
 2549 
 2550         return {
 2551             'destination_share_server_id': dest_share_server['id']
 2552         }
 2553 
 2554     def share_server_migration_cancel(self, context, share_server):
 2555         """Attempts to cancel share server migration."""
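              # Cancellation is only possible while the driver migration is
              # in progress or right after its first phase completed; the
              # request is then sent to the host of the destination share
              # server.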
 2556         if share_server['status'] != constants.STATUS_SERVER_MIGRATING:
 2557             msg = _("Migration of share server %s cannot be cancelled "
 2558                     "because it is not being migrated.") % share_server['id']
 2559             LOG.error(msg)
 2560             raise exception.InvalidShareServer(reason=msg)
 2561 
 2562         if share_server['task_state'] in (
 2563                 constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE,
 2564                 constants.TASK_STATE_MIGRATION_DRIVER_IN_PROGRESS):
 2565 
 2566             dest_share_server = self.share_server_migration_get_destination(
 2567                 context, share_server['id'],
 2568                 status=constants.STATUS_SERVER_MIGRATING_TO
 2569             )
 2570 
 2571             dest_host = share_utils.extract_host(dest_share_server['host'])
 2572             utils.validate_service_host(context, dest_host)
 2573 
 2574             self.share_rpcapi.share_server_migration_cancel(
 2575                 context, dest_share_server['host'], share_server,
 2576                 dest_share_server)
 2577         else:
 2578             msg = self._migration_validate_error_message(
 2579                 share_server, resource_type='share_server')
 2580             if msg is None:
 2581                 msg = _("Migration of share server %s can be cancelled only "
 2582                         "after the driver has started the migration, or "
 2583                         "once the first phase of the migration has "
 2584                         "completed.") % share_server['id']
 2585             LOG.error(msg)
 2586             raise exception.InvalidShareServer(reason=msg)
 2587 
 2588     def share_server_migration_get_progress(self, context,
 2589                                             src_share_server_id):
 2590         """Retrieve migration progress for a given share server."""
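              # If the source share server no longer exists, the migration may
              # have already completed: an active destination server is looked
              # up and reported with 100% progress. Otherwise the progress is
              # obtained from the destination host (while the driver is
              # migrating) or derived from the stored task state.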
 2591         try:
 2592             share_server = self.db.share_server_get(context,
 2593                                                     src_share_server_id)
 2594         except exception.ShareServerNotFound:
 2595             msg = _('Share server %s was not found. We will search for a '
 2596                     'successful migration.') % src_share_server_id
 2597             LOG.debug(msg)
 2598             # Search for a successful migration, raise an error if not found
 2599             dest_share_server = self.share_server_migration_get_destination(
 2600                 context, src_share_server_id,
 2601                 status=constants.STATUS_ACTIVE
 2602             )
 2603             return {
 2604                 'total_progress': 100,
 2605                 'destination_share_server_id': dest_share_server['id'],
 2606                 'task_state': dest_share_server['task_state'],
 2607             }
 2608         # Source server still exists so it must be in 'server_migrating' status
 2609         if share_server['status'] != constants.STATUS_SERVER_MIGRATING:
 2610             msg = _("Migration progress of share server %s cannot be "
 2611                     "obtained. The provided share server is not being "
 2612                     "migrated.") % share_server['id']
 2613             LOG.error(msg)
 2614             raise exception.InvalidShareServer(reason=msg)
 2615 
 2616         dest_share_server = self.share_server_migration_get_destination(
 2617             context, share_server['id'],
 2618             status=constants.STATUS_SERVER_MIGRATING_TO
 2619         )
 2620 
 2621         if (share_server['task_state'] ==
 2622                 constants.TASK_STATE_MIGRATION_DRIVER_IN_PROGRESS):
 2623 
 2624             dest_host = share_utils.extract_host(dest_share_server['host'])
 2625             utils.validate_service_host(context, dest_host)
 2626 
 2627             try:
 2628                 result = (
 2629                     self.share_rpcapi.share_server_migration_get_progress(
 2630                         context, dest_share_server['host'],
 2631                         share_server, dest_share_server))
 2632             except Exception:
 2633                 msg = _("Failed to obtain migration progress of share "
 2634                         "server %s.") % share_server['id']
 2635                 LOG.exception(msg)
 2636                 raise exception.ShareServerMigrationError(reason=msg)
 2637 
 2638         else:
 2639             result = self._migration_get_progress_state(share_server)
 2640 
 2641         if not (result and result.get('total_progress') is not None):
 2642             msg = self._migration_validate_error_message(
 2643                 share_server, resource_type='share_server')
 2644             if msg is None:
 2645                 msg = _("Migration progress of share server %s cannot be "
 2646                         "obtained at this moment.") % share_server['id']
 2647             LOG.error(msg)
 2648             raise exception.InvalidShareServer(reason=msg)
 2649 
 2650         result.update({
 2651             'destination_share_server_id': dest_share_server['id'],
 2652             'task_state': dest_share_server['task_state']
 2653         })
 2654         return result