"Fossies" - the Fresh Open Source Software Archive 
Member "manila-11.0.1/manila/share/drivers/zfsonlinux/driver.py" (1 Feb 2021, 68263 Bytes) of package /linux/misc/openstack/manila-11.0.1.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively you can here
view or
download the uninterpreted source code file.
For more information about "driver.py" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
11.0.0_vs_11.0.1.
1 # Copyright 2016 Mirantis Inc.
2 # All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15
16 """
17 Module with ZFSonLinux share driver that utilizes ZFS filesystem resources
18 and exports them as shares.
19 """
20
21 import math
22 import os
23 import time
24
25 from oslo_config import cfg
26 from oslo_log import log
27 from oslo_utils import importutils
28 from oslo_utils import strutils
29 from oslo_utils import timeutils
30
31 from manila.common import constants
32 from manila import exception
33 from manila.i18n import _
34 from manila.share import configuration
35 from manila.share import driver
36 from manila.share.drivers.zfsonlinux import utils as zfs_utils
37 from manila.share.manager import share_manager_opts # noqa
38 from manila.share import share_types
39 from manila.share import utils as share_utils
40 from manila import utils
41
42
43 zfsonlinux_opts = [
44 cfg.HostAddressOpt(
45 "zfs_share_export_ip",
46 required=True,
47 help="IP to be added to user-facing export location. Required."),
48 cfg.HostAddressOpt(
49 "zfs_service_ip",
50 required=True,
51 help="IP to be added to admin-facing export location. Required."),
52 cfg.ListOpt(
53 "zfs_zpool_list",
54 required=True,
55 help="Specify list of zpools that are allowed to be used by backend. "
56 "Can contain nested datasets. Examples: "
57 "Without nested dataset: 'zpool_name'. "
58 "With nested dataset: 'zpool_name/nested_dataset_name'. "
59 "Required."),
60 cfg.ListOpt(
61 "zfs_dataset_creation_options",
62 help="Define here list of options that should be applied "
63 "for each dataset creation if needed. Example: "
64 "compression=gzip,dedup=off. "
65 "Note that, for secondary replicas option 'readonly' will be set "
66 "to 'on' and for active replicas to 'off' in any way. "
67 "Also, 'quota' will be equal to share size. Optional."),
68 cfg.StrOpt(
69 "zfs_dataset_name_prefix",
70 default='manila_share_',
71 help="Prefix to be used in each dataset name. Optional."),
72 cfg.StrOpt(
73 "zfs_dataset_snapshot_name_prefix",
74 default='manila_share_snapshot_',
75 help="Prefix to be used in each dataset snapshot name. Optional."),
76 cfg.BoolOpt(
77 "zfs_use_ssh",
78 default=False,
79 help="Remote ZFS storage hostname that should be used for SSH'ing. "
80 "Optional."),
81 cfg.StrOpt(
82 "zfs_ssh_username",
83 help="SSH user that will be used in 2 cases: "
84 "1) By manila-share service in case it is located on different "
85 "host than its ZFS storage. "
86 "2) By manila-share services with other ZFS backends that "
87 "perform replication. "
88 "It is expected that SSH'ing will be key-based, passwordless. "
89 "This user should be passwordless sudoer. Optional."),
90 cfg.StrOpt(
91 "zfs_ssh_user_password",
92 secret=True,
93 help="Password for user that is used for SSH'ing ZFS storage host. "
94 "Not used for replication operations. They require "
95 "passwordless SSH access. Optional."),
96 cfg.StrOpt(
97 "zfs_ssh_private_key_path",
98 help="Path to SSH private key that should be used for SSH'ing ZFS "
99 "storage host. Not used for replication operations. Optional."),
100 cfg.ListOpt(
101 "zfs_share_helpers",
102 required=True,
103 default=[
104 "NFS=manila.share.drivers.zfsonlinux.utils.NFSviaZFSHelper",
105 ],
106 help="Specify list of share export helpers for ZFS storage. "
107 "It should look like following: "
108 "'FOO_protocol=foo.FooClass,BAR_protocol=bar.BarClass'. "
109 "Required."),
110 cfg.StrOpt(
111 "zfs_replica_snapshot_prefix",
112 required=True,
113 default="tmp_snapshot_for_replication_",
114 help="Set snapshot prefix for usage in ZFS replication. Required."),
115 cfg.StrOpt(
116 "zfs_migration_snapshot_prefix",
117 required=True,
118 default="tmp_snapshot_for_share_migration_",
119 help="Set snapshot prefix for usage in ZFS migration. Required."),
120 ]
121
122 CONF = cfg.CONF
123 CONF.register_opts(zfsonlinux_opts)
124 LOG = log.getLogger(__name__)
125
126
127 def ensure_share_server_not_provided(f):
128
129 def wrap(self, context, *args, **kwargs):
130 server = kwargs.get(
131 "share_server", kwargs.get("destination_share_server"))
132 if server:
133 raise exception.InvalidInput(
134 reason=_("Share server handling is not available. "
135 "But 'share_server' was provided. '%s'. "
136 "Share network should not be used.") % server.get(
137 "id", server))
138 return f(self, context, *args, **kwargs)
139
140 return wrap
141
142
143 def get_backend_configuration(backend_name):
144 config_stanzas = CONF.list_all_sections()
145 if backend_name not in config_stanzas:
146 msg = _("Could not find backend stanza %(backend_name)s in "
147 "configuration which is required for share replication and "
148 "migration. Available stanzas are %(stanzas)s")
149 params = {
150 "stanzas": config_stanzas,
151 "backend_name": backend_name,
152 }
153 raise exception.BadConfigurationException(reason=msg % params)
154
155 config = configuration.Configuration(
156 driver.share_opts, config_group=backend_name)
157 config.append_config_values(zfsonlinux_opts)
158 config.append_config_values(share_manager_opts)
159 config.append_config_values(driver.ssh_opts)
160
161 return config
162
163
164 class ZFSonLinuxShareDriver(zfs_utils.ExecuteMixin, driver.ShareDriver):
165
166 def __init__(self, *args, **kwargs):
167 super(ZFSonLinuxShareDriver, self).__init__(
168 [False], *args, config_opts=[zfsonlinux_opts], **kwargs)
169 self.replica_snapshot_prefix = (
170 self.configuration.zfs_replica_snapshot_prefix)
171 self.migration_snapshot_prefix = (
172 self.configuration.zfs_migration_snapshot_prefix)
173 self.backend_name = self.configuration.safe_get(
174 'share_backend_name') or 'ZFSonLinux'
175 self.zpool_list = self._get_zpool_list()
176 self.dataset_creation_options = (
177 self.configuration.zfs_dataset_creation_options)
178 self.share_export_ip = self.configuration.zfs_share_export_ip
179 self.service_ip = self.configuration.zfs_service_ip
180 self.private_storage = kwargs.get('private_storage')
181 self._helpers = {}
182
183 # Set config based capabilities
184 self._init_common_capabilities()
185
186 self._shell_executors = {}
187
188 def _get_shell_executor_by_host(self, host):
189 backend_name = share_utils.extract_host(host, level='backend_name')
190 if backend_name in CONF.enabled_share_backends:
191 # Return executor of this host
192 return self.execute
193 elif backend_name not in self._shell_executors:
194 config = get_backend_configuration(backend_name)
195 self._shell_executors[backend_name] = (
196 zfs_utils.get_remote_shell_executor(
197 ip=config.zfs_service_ip,
198 port=22,
199 conn_timeout=config.ssh_conn_timeout,
200 login=config.zfs_ssh_username,
201 password=config.zfs_ssh_user_password,
202 privatekey=config.zfs_ssh_private_key_path,
203 max_size=10,
204 )
205 )
206 # Return executor of remote host
207 return self._shell_executors[backend_name]
208
209 def _init_common_capabilities(self):
210 self.common_capabilities = {}
211 if 'dedup=on' in self.dataset_creation_options:
212 self.common_capabilities['dedupe'] = [True]
213 elif 'dedup=off' in self.dataset_creation_options:
214 self.common_capabilities['dedupe'] = [False]
215 else:
216 self.common_capabilities['dedupe'] = [True, False]
217
218 if 'compression=off' in self.dataset_creation_options:
219 self.common_capabilities['compression'] = [False]
220 elif any('compression=' in option
221 for option in self.dataset_creation_options):
222 self.common_capabilities['compression'] = [True]
223 else:
224 self.common_capabilities['compression'] = [True, False]
225
226 # NOTE(vponomaryov): Driver uses 'quota' approach for
227 # ZFS dataset. So, we can consider it as
228 # 'always thin provisioned' because this driver never reserves
229 # space for dataset.
230 self.common_capabilities['thin_provisioning'] = [True]
231 self.common_capabilities['max_over_subscription_ratio'] = (
232 self.configuration.max_over_subscription_ratio)
233 self.common_capabilities['qos'] = [False]
234
235 def _get_zpool_list(self):
236 zpools = []
237 for zpool in self.configuration.zfs_zpool_list:
238 zpool_name = zpool.split('/')[0]
239 if zpool_name in zpools:
240 raise exception.BadConfigurationException(
241 reason=_("Using the same zpool twice is prohibited. "
242 "Duplicate is '%(zpool)s'. List of zpools: "
243 "%(zpool_list)s.") % {
244 'zpool': zpool,
245 'zpool_list': ', '.join(
246 self.configuration.zfs_zpool_list)})
247 zpools.append(zpool_name)
248 return zpools
249
250 @zfs_utils.zfs_dataset_synchronized
251 def _delete_dataset_or_snapshot_with_retry(self, name):
252 """Attempts to destroy some dataset or snapshot with retries."""
253 # NOTE(vponomaryov): it is possible to see 'dataset is busy' error
254 # under the load. So, we are ok to perform retry in this case.
255 mountpoint = self.get_zfs_option(name, 'mountpoint')
256 if '@' not in name:
257 # NOTE(vponomaryov): check that dataset has no open files.
258 start_point = time.time()
259 while time.time() - start_point < 60:
260 try:
261 out, err = self.execute('lsof', '-w', mountpoint)
262 except exception.ProcessExecutionError:
263 # NOTE(vponomaryov): lsof returns code 1 if search
264 # didn't give results.
265 break
266 LOG.debug("Cannot destroy dataset '%(name)s', it has "
267 "opened files. Will wait 2 more seconds. "
268 "Out: \n%(out)s", {
269 'name': name, 'out': out})
270 time.sleep(2)
271 else:
272 raise exception.ZFSonLinuxException(
273 msg=_("Could not destroy '%s' dataset, "
274 "because it had opened files.") % name)
275
276 @utils.retry(exception.ProcessExecutionError)
277 def _zfs_destroy_with_retry():
278 """Retry destroying dataset ten times with exponential backoff."""
279 # NOTE(bswartz): There appears to be a bug in ZFS when creating and
280 # destroying datasets concurrently where the filesystem remains
281 # mounted even though ZFS thinks it's unmounted. The most reliable
282 # workaround I've found is to force the unmount, then attempt the
283 # destroy, with short pauses around the unmount. (See bug#1546723)
284 try:
285 self.execute('sudo', 'umount', mountpoint)
286 except exception.ProcessExecutionError:
287 # Ignore failed umount, it's normal
288 pass
289 time.sleep(2)
290
291 # NOTE(vponomaryov): Now, when no file usages and mounts of dataset
292 # exist, destroy dataset.
293 self.zfs('destroy', '-f', name)
294
295 _zfs_destroy_with_retry()
296
297 def _setup_helpers(self):
298 """Setups share helper for ZFS backend."""
299 self._helpers = {}
300 helpers = self.configuration.zfs_share_helpers
301 if helpers:
302 for helper_str in helpers:
303 share_proto, __, import_str = helper_str.partition('=')
304 helper = importutils.import_class(import_str)
305 self._helpers[share_proto.upper()] = helper(
306 self.configuration)
307 else:
308 raise exception.BadConfigurationException(
309 reason=_(
310 "No share helpers selected for ZFSonLinux Driver. "
311 "Please specify using config option 'zfs_share_helpers'."))
312
313 def _get_share_helper(self, share_proto):
314 """Returns share helper specific for used share protocol."""
315 helper = self._helpers.get(share_proto)
316 if helper:
317 return helper
318 else:
319 raise exception.InvalidShare(
320 reason=_("Wrong, unsupported or disabled protocol - "
321 "'%s'.") % share_proto)
322
323 def do_setup(self, context):
324 """Perform basic setup and checks."""
325 super(ZFSonLinuxShareDriver, self).do_setup(context)
326 self._setup_helpers()
327 for ip in (self.share_export_ip, self.service_ip):
328 if not utils.is_valid_ip_address(ip, 4):
329 raise exception.BadConfigurationException(
330 reason=_("Wrong IP address provided: "
331 "%s") % self.share_export_ip)
332
333 if not self.zpool_list:
334 raise exception.BadConfigurationException(
335 reason=_("No zpools specified for usage: "
336 "%s") % self.zpool_list)
337
338 # Make pool mounts shared so that cloned namespaces receive unmounts
339 # and don't prevent us from unmounting datasets
340 for zpool in self.configuration.zfs_zpool_list:
341 self.execute('sudo', 'mount', '--make-rshared', ('/%s' % zpool))
342
343 if self.configuration.zfs_use_ssh:
344 # Check workability of SSH executor
345 self.ssh_executor('whoami')
346
347 def _get_pools_info(self):
348 """Returns info about all pools used by backend."""
349 pools = []
350 for zpool in self.zpool_list:
351 free_size = self.get_zpool_option(zpool, 'free')
352 free_size = utils.translate_string_size_to_float(free_size)
353 total_size = self.get_zpool_option(zpool, 'size')
354 total_size = utils.translate_string_size_to_float(total_size)
355 pool = {
356 'pool_name': zpool,
357 'total_capacity_gb': float(total_size),
358 'free_capacity_gb': float(free_size),
359 'reserved_percentage':
360 self.configuration.reserved_share_percentage,
361 }
362 pool.update(self.common_capabilities)
363 if self.configuration.replication_domain:
364 pool['replication_type'] = 'readable'
365 pools.append(pool)
366 return pools
367
368 def _update_share_stats(self):
369 """Retrieves share stats info."""
370 data = {
371 'share_backend_name': self.backend_name,
372 'storage_protocol': 'NFS',
373 'reserved_percentage':
374 self.configuration.reserved_share_percentage,
375 'snapshot_support': True,
376 'create_share_from_snapshot_support': True,
377 'driver_name': 'ZFS',
378 'pools': self._get_pools_info(),
379 }
380 if self.configuration.replication_domain:
381 data['replication_type'] = 'readable'
382 super(ZFSonLinuxShareDriver, self)._update_share_stats(data)
383
384 def _get_share_name(self, share_id):
385 """Returns name of dataset used for given share."""
386 prefix = self.configuration.zfs_dataset_name_prefix or ''
387 return prefix + share_id.replace('-', '_')
388
389 def _get_snapshot_name(self, snapshot_id):
390 """Returns name of dataset snapshot used for given share snapshot."""
391 prefix = self.configuration.zfs_dataset_snapshot_name_prefix or ''
392 return prefix + snapshot_id.replace('-', '_')
393
394 def _get_dataset_creation_options(self, share, is_readonly=False):
395 """Returns list of options to be used for dataset creation."""
396 options = ['quota=%sG' % share['size']]
397 extra_specs = share_types.get_extra_specs_from_share(share)
398
399 dedupe_set = False
400 dedupe = extra_specs.get('dedupe')
401 if dedupe:
402 dedupe = strutils.bool_from_string(
403 dedupe.lower().split(' ')[-1], default=dedupe)
404 if (dedupe in self.common_capabilities['dedupe']):
405 options.append('dedup=%s' % ('on' if dedupe else 'off'))
406 dedupe_set = True
407 else:
408 raise exception.ZFSonLinuxException(msg=_(
409 "Cannot use requested '%(requested)s' value of 'dedupe' "
410 "extra spec. It does not fit allowed value '%(allowed)s' "
411 "that is configured for backend.") % {
412 'requested': dedupe,
413 'allowed': self.common_capabilities['dedupe']})
414
415 compression_set = False
416 compression_type = extra_specs.get('zfsonlinux:compression')
417 if compression_type:
418 if (compression_type == 'off' and
419 False in self.common_capabilities['compression']):
420 options.append('compression=off')
421 compression_set = True
422 elif (compression_type != 'off' and
423 True in self.common_capabilities['compression']):
424 options.append('compression=%s' % compression_type)
425 compression_set = True
426 else:
427 raise exception.ZFSonLinuxException(msg=_(
428 "Cannot use value '%s' of extra spec "
429 "'zfsonlinux:compression' because compression is disabled "
430 "for this backend. Set extra spec 'compression=True' to "
431 "make scheduler pick up appropriate backend."
432 ) % compression_type)
433
434 for option in self.dataset_creation_options or []:
435 if any(v in option for v in (
436 'readonly', 'sharenfs', 'sharesmb', 'quota')):
437 continue
438 if 'dedup' in option and dedupe_set is True:
439 continue
440 if 'compression' in option and compression_set is True:
441 continue
442 options.append(option)
443 if is_readonly:
444 options.append('readonly=on')
445 else:
446 options.append('readonly=off')
447 return options
448
449 def _get_dataset_name(self, share):
450 """Returns name of dataset used for given share."""
451 pool_name = share_utils.extract_host(share['host'], level='pool')
452
453 # Pick pool with nested dataset name if set up
454 for pool in self.configuration.zfs_zpool_list:
455 pool_data = pool.split('/')
456 if (pool_name == pool_data[0] and len(pool_data) > 1):
457 pool_name = pool
458 if pool_name[-1] == '/':
459 pool_name = pool_name[0:-1]
460 break
461
462 dataset_name = self._get_share_name(share['id'])
463 full_dataset_name = '%(pool)s/%(dataset)s' % {
464 'pool': pool_name, 'dataset': dataset_name}
465
466 return full_dataset_name
467
468 @ensure_share_server_not_provided
469 def create_share(self, context, share, share_server=None):
470 """Is called to create a share."""
471 options = self._get_dataset_creation_options(share, is_readonly=False)
472 cmd = ['create']
473 for option in options:
474 cmd.extend(['-o', option])
475 dataset_name = self._get_dataset_name(share)
476 cmd.append(dataset_name)
477
478 ssh_cmd = '%(username)s@%(host)s' % {
479 'username': self.configuration.zfs_ssh_username,
480 'host': self.service_ip,
481 }
482 pool_name = share_utils.extract_host(share['host'], level='pool')
483 self.private_storage.update(
484 share['id'], {
485 'entity_type': 'share',
486 'dataset_name': dataset_name,
487 'ssh_cmd': ssh_cmd, # used with replication and migration
488 'pool_name': pool_name, # used in replication
489 'used_options': ' '.join(options),
490 }
491 )
492
493 self.zfs(*cmd)
494
495 return self._get_share_helper(
496 share['share_proto']).create_exports(dataset_name)
497
498 @ensure_share_server_not_provided
499 def delete_share(self, context, share, share_server=None):
500 """Is called to remove a share."""
501 pool_name = self.private_storage.get(share['id'], 'pool_name')
502 pool_name = pool_name or share_utils.extract_host(
503 share["host"], level="pool")
504 dataset_name = self.private_storage.get(share['id'], 'dataset_name')
505 if not dataset_name:
506 dataset_name = self._get_dataset_name(share)
507
508 out, err = self.zfs('list', '-r', pool_name)
509 data = self.parse_zfs_answer(out)
510 for datum in data:
511 if datum['NAME'] != dataset_name:
512 continue
513
514 # Delete dataset's snapshots first
515 out, err = self.zfs('list', '-r', '-t', 'snapshot', pool_name)
516 snapshots = self.parse_zfs_answer(out)
517 full_snapshot_prefix = (
518 dataset_name + '@')
519 for snap in snapshots:
520 if full_snapshot_prefix in snap['NAME']:
521 self._delete_dataset_or_snapshot_with_retry(snap['NAME'])
522
523 self._get_share_helper(
524 share['share_proto']).remove_exports(dataset_name)
525 self._delete_dataset_or_snapshot_with_retry(dataset_name)
526 break
527 else:
528 LOG.warning(
529 "Share with '%(id)s' ID and '%(name)s' NAME is "
530 "absent on backend. Nothing has been deleted.",
531 {'id': share['id'], 'name': dataset_name})
532 self.private_storage.delete(share['id'])
533
534 @ensure_share_server_not_provided
535 def create_snapshot(self, context, snapshot, share_server=None):
536 """Is called to create a snapshot."""
537 dataset_name = self.private_storage.get(
538 snapshot['share_instance_id'], 'dataset_name')
539 snapshot_tag = self._get_snapshot_name(snapshot['id'])
540 snapshot_name = dataset_name + '@' + snapshot_tag
541 self.private_storage.update(
542 snapshot['snapshot_id'], {
543 'entity_type': 'snapshot',
544 'snapshot_tag': snapshot_tag,
545 }
546 )
547 self.zfs('snapshot', snapshot_name)
548 return {"provider_location": snapshot_name}
549
550 @ensure_share_server_not_provided
551 def delete_snapshot(self, context, snapshot, share_server=None):
552 """Is called to remove a snapshot."""
553 self._delete_snapshot(context, snapshot)
554 self.private_storage.delete(snapshot['snapshot_id'])
555
556 def _get_saved_snapshot_name(self, snapshot_instance):
557 snapshot_tag = self.private_storage.get(
558 snapshot_instance['snapshot_id'], 'snapshot_tag')
559 dataset_name = self.private_storage.get(
560 snapshot_instance['share_instance_id'], 'dataset_name')
561 snapshot_name = dataset_name + '@' + snapshot_tag
562 return snapshot_name
563
564 def _delete_snapshot(self, context, snapshot):
565 snapshot_name = self._get_saved_snapshot_name(snapshot)
566 out, err = self.zfs('list', '-r', '-t', 'snapshot', snapshot_name)
567 data = self.parse_zfs_answer(out)
568 for datum in data:
569 if datum['NAME'] == snapshot_name:
570 self._delete_dataset_or_snapshot_with_retry(snapshot_name)
571 break
572 else:
573 LOG.warning(
574 "Snapshot with '%(id)s' ID and '%(name)s' NAME is "
575 "absent on backend. Nothing has been deleted.",
576 {'id': snapshot['id'], 'name': snapshot_name})
577
578 @ensure_share_server_not_provided
579 def create_share_from_snapshot(self, context, share, snapshot,
580 share_server=None, parent_share=None):
581 """Is called to create a share from snapshot."""
582 src_backend_name = share_utils.extract_host(
583 snapshot.share_instance['host'], level='backend_name'
584 )
585 src_snapshot_name = self._get_saved_snapshot_name(snapshot)
586 dataset_name = self._get_dataset_name(share)
587
588 dst_backend_ssh_cmd = '%(username)s@%(host)s' % {
589 'username': self.configuration.zfs_ssh_username,
590 'host': self.service_ip,
591 }
592
593 dst_backend_pool_name = share_utils.extract_host(share['host'],
594 level='pool')
595 options = self._get_dataset_creation_options(share, is_readonly=False)
596
597 self.private_storage.update(
598 share['id'], {
599 'entity_type': 'share',
600 'dataset_name': dataset_name,
601 'ssh_cmd': dst_backend_ssh_cmd, # used in replication
602 'pool_name': dst_backend_pool_name, # used in replication
603 'used_options': options,
604 }
605 )
606
607 # NOTE(andrebeltrami): Implementing the support for create share
608 # from snapshot in different backends in different hosts
609 src_config = get_backend_configuration(src_backend_name)
610 src_backend_ssh_cmd = '%(username)s@%(host)s' % {
611 'username': src_config.zfs_ssh_username,
612 'host': src_config.zfs_service_ip,
613 }
614 self.execute(
615 # NOTE(vponomaryov): SSH is used as workaround for 'execute'
616 # implementation restriction that does not support usage
617 # of '|'.
618 'ssh', src_backend_ssh_cmd,
619 'sudo', 'zfs', 'send', '-vD', src_snapshot_name, '|',
620 'ssh', dst_backend_ssh_cmd,
621 'sudo', 'zfs', 'receive', '-v', dataset_name,
622 )
623
624 # Apply options based on used share type that may differ from
625 # one used for original share.
626 for option in options:
627 self.zfs('set', option, dataset_name)
628
629 # Delete with retry as right after creation it may be temporary busy.
630 self.execute_with_retry(
631 'sudo', 'zfs', 'destroy',
632 dataset_name + '@' + src_snapshot_name.split('@')[-1])
633
634 return self._get_share_helper(
635 share['share_proto']).create_exports(dataset_name)
636
637 def get_pool(self, share):
638 """Return pool name where the share resides on.
639
640 :param share: The share hosted by the driver.
641 """
642 pool_name = share_utils.extract_host(share['host'], level='pool')
643 return pool_name
644
645 @ensure_share_server_not_provided
646 def ensure_share(self, context, share, share_server=None):
647 """Invoked to ensure that given share is exported."""
648 dataset_name = self.private_storage.get(share['id'], 'dataset_name')
649 if not dataset_name:
650 dataset_name = self._get_dataset_name(share)
651
652 pool_name = share_utils.extract_host(share['host'], level='pool')
653 out, err = self.zfs('list', '-r', pool_name)
654 data = self.parse_zfs_answer(out)
655 for datum in data:
656 if datum['NAME'] == dataset_name:
657 ssh_cmd = '%(username)s@%(host)s' % {
658 'username': self.configuration.zfs_ssh_username,
659 'host': self.service_ip,
660 }
661 self.private_storage.update(
662 share['id'], {'ssh_cmd': ssh_cmd})
663 sharenfs = self.get_zfs_option(dataset_name, 'sharenfs')
664 if sharenfs != 'off':
665 self.zfs('share', dataset_name)
666 export_locations = self._get_share_helper(
667 share['share_proto']).get_exports(dataset_name)
668 return export_locations
669 else:
670 raise exception.ShareResourceNotFound(share_id=share['id'])
671
672 def get_network_allocations_number(self):
673 """ZFS does not handle networking. Return 0."""
674 return 0
675
676 @ensure_share_server_not_provided
677 def extend_share(self, share, new_size, share_server=None):
678 """Extends size of existing share."""
679 dataset_name = self._get_dataset_name(share)
680 self.zfs('set', 'quota=%sG' % new_size, dataset_name)
681
682 @ensure_share_server_not_provided
683 def shrink_share(self, share, new_size, share_server=None):
684 """Shrinks size of existing share."""
685 dataset_name = self._get_dataset_name(share)
686 consumed_space = self.get_zfs_option(dataset_name, 'used')
687 consumed_space = utils.translate_string_size_to_float(consumed_space)
688 if consumed_space >= new_size:
689 raise exception.ShareShrinkingPossibleDataLoss(
690 share_id=share['id'])
691 self.zfs('set', 'quota=%sG' % new_size, dataset_name)
692
693 @ensure_share_server_not_provided
694 def update_access(self, context, share, access_rules, add_rules,
695 delete_rules, share_server=None):
696 """Updates access rules for given share."""
697 dataset_name = self._get_dataset_name(share)
698 executor = self._get_shell_executor_by_host(share['host'])
699 return self._get_share_helper(share['share_proto']).update_access(
700 dataset_name, access_rules, add_rules, delete_rules,
701 executor=executor)
702
703 def manage_existing(self, share, driver_options):
704 """Manage existing ZFS dataset as manila share.
705
706 ZFSonLinux driver accepts only one driver_option 'size'.
707 If an administrator provides this option, then such quota will be set
708 to dataset and used as share size. Otherwise, driver will set quota
709 equal to nearest bigger rounded integer of usage size.
710 Driver does not expect mountpoint to be changed (should be equal
711 to default that is "/%(dataset_name)s").
712
713 :param share: share data
714 :param driver_options: Empty dict or dict with 'size' option.
715 :return: dict with share size and its export locations.
716 """
717 old_export_location = share["export_locations"][0]["path"]
718 old_dataset_name = old_export_location.split(":/")[-1]
719
720 scheduled_pool_name = share_utils.extract_host(
721 share["host"], level="pool")
722 actual_pool_name = old_dataset_name.split("/")[0]
723
724 new_dataset_name = self._get_dataset_name(share)
725
726 # Calculate quota for managed dataset
727 quota = driver_options.get("size")
728 if not quota:
729 consumed_space = self.get_zfs_option(old_dataset_name, "used")
730 consumed_space = utils.translate_string_size_to_float(
731 consumed_space)
732 quota = int(consumed_space) + 1
733 share["size"] = int(quota)
734
735 # Save dataset-specific data in private storage
736 options = self._get_dataset_creation_options(share, is_readonly=False)
737 ssh_cmd = "%(username)s@%(host)s" % {
738 "username": self.configuration.zfs_ssh_username,
739 "host": self.service_ip,
740 }
741
742 # Perform checks on requested dataset
743 if actual_pool_name != scheduled_pool_name:
744 raise exception.ZFSonLinuxException(
745 _("Cannot manage share '%(share_id)s' "
746 "(share_instance '%(si_id)s'), because scheduled "
747 "pool '%(sch)s' and actual '%(actual)s' differ.") % {
748 "share_id": share["share_id"],
749 "si_id": share["id"],
750 "sch": scheduled_pool_name,
751 "actual": actual_pool_name})
752
753 out, err = self.zfs("list", "-r", actual_pool_name)
754 data = self.parse_zfs_answer(out)
755 for datum in data:
756 if datum["NAME"] == old_dataset_name:
757 break
758 else:
759 raise exception.ZFSonLinuxException(
760 _("Cannot manage share '%(share_id)s' "
761 "(share_instance '%(si_id)s'), because dataset "
762 "'%(dataset)s' not found in zpool '%(zpool)s'.") % {
763 "share_id": share["share_id"],
764 "si_id": share["id"],
765 "dataset": old_dataset_name,
766 "zpool": actual_pool_name})
767
768 # Unmount the dataset before attempting to rename and mount
769 try:
770 self._unmount_share_with_retry(old_dataset_name)
771 except exception.ZFSonLinuxException:
772 msg = _("Unable to unmount share before renaming and re-mounting.")
773 raise exception.ZFSonLinuxException(message=msg)
774
775 # Rename the dataset and mount with new name
776 self.zfs_with_retry("rename", old_dataset_name, new_dataset_name)
777
778 try:
779 self.zfs("mount", new_dataset_name)
780 except exception.ProcessExecutionError:
781 # Workaround for bug/1785180
782 out, err = self.zfs("mount")
783 mounted = any([new_dataset_name in mountedfs
784 for mountedfs in out.splitlines()])
785 if not mounted:
786 raise
787
788 # Apply options to dataset
789 for option in options:
790 self.zfs("set", option, new_dataset_name)
791
792 # Get new export locations of renamed dataset
793 export_locations = self._get_share_helper(
794 share["share_proto"]).get_exports(new_dataset_name)
795
796 self.private_storage.update(
797 share["id"], {
798 "entity_type": "share",
799 "dataset_name": new_dataset_name,
800 "ssh_cmd": ssh_cmd, # used in replication
801 "pool_name": actual_pool_name, # used in replication
802 "used_options": " ".join(options),
803 }
804 )
805
806 return {"size": share["size"], "export_locations": export_locations}
807
808 def unmanage(self, share):
809 """Removes the specified share from Manila management."""
810 self.private_storage.delete(share['id'])
811
812 def manage_existing_snapshot(self, snapshot_instance, driver_options):
813 """Manage existing share snapshot with manila.
814
815 :param snapshot_instance: SnapshotInstance data
816 :param driver_options: expects only one optional key 'size'.
817 :return: dict with share snapshot instance fields for update, example::
818
819 {
820
821 'size': 1,
822 'provider_location': 'path/to/some/dataset@some_snapshot_tag',
823
824 }
825
826 """
827 snapshot_size = int(driver_options.get("size", 0))
828 old_provider_location = snapshot_instance.get("provider_location")
829 old_snapshot_tag = old_provider_location.split("@")[-1]
830 new_snapshot_tag = self._get_snapshot_name(snapshot_instance["id"])
831
832 self.private_storage.update(
833 snapshot_instance["snapshot_id"], {
834 "entity_type": "snapshot",
835 "old_snapshot_tag": old_snapshot_tag,
836 "snapshot_tag": new_snapshot_tag,
837 }
838 )
839
840 try:
841 self.zfs("list", "-r", "-t", "snapshot", old_provider_location)
842 except exception.ProcessExecutionError as e:
843 raise exception.ManageInvalidShareSnapshot(reason=e.stderr)
844
845 if not snapshot_size:
846 consumed_space = self.get_zfs_option(old_provider_location, "used")
847 consumed_space = utils.translate_string_size_to_float(
848 consumed_space)
849 snapshot_size = int(math.ceil(consumed_space))
850
851 dataset_name = self.private_storage.get(
852 snapshot_instance["share_instance_id"], "dataset_name")
853 new_provider_location = dataset_name + "@" + new_snapshot_tag
854
855 self.zfs("rename", old_provider_location, new_provider_location)
856
857 return {
858 "size": snapshot_size,
859 "provider_location": new_provider_location,
860 }
861
862 def unmanage_snapshot(self, snapshot_instance):
863 """Unmanage dataset snapshot."""
864 self.private_storage.delete(snapshot_instance["snapshot_id"])
865
866 @utils.retry(exception.ZFSonLinuxException)
867 def _unmount_share_with_retry(self, share_name):
868 out, err = self.execute("sudo", "mount")
869 if "%s " % share_name not in out:
870 return
871 self.zfs_with_retry("umount", "-f", share_name)
872 out, err = self.execute("sudo", "mount")
873 if "%s " % share_name in out:
874 raise exception.ZFSonLinuxException(
875 _("Unable to unmount dataset %s"), share_name)
876
877 def _get_replication_snapshot_prefix(self, replica):
878 """Returns replica-based snapshot prefix."""
879 replication_snapshot_prefix = "%s_%s" % (
880 self.replica_snapshot_prefix, replica['id'].replace('-', '_'))
881 return replication_snapshot_prefix
882
883 def _get_replication_snapshot_tag(self, replica):
884 """Returns replica- and time-based snapshot tag."""
885 current_time = timeutils.utcnow().isoformat()
886 snapshot_tag = "%s_time_%s" % (
887 self._get_replication_snapshot_prefix(replica), current_time)
888 return snapshot_tag
889
890 def _get_active_replica(self, replica_list):
891 for replica in replica_list:
892 if replica['replica_state'] == constants.REPLICA_STATE_ACTIVE:
893 return replica
894 msg = _("Active replica not found.")
895 raise exception.ReplicationException(reason=msg)
896
897 def _get_migration_snapshot_prefix(self, share_instance):
898 """Returns migration-based snapshot prefix."""
899 migration_snapshot_prefix = "%s_%s" % (
900 self.migration_snapshot_prefix,
901 share_instance['id'].replace('-', '_'))
902 return migration_snapshot_prefix
903
904 def _get_migration_snapshot_tag(self, share_instance):
905 """Returns migration- and time-based snapshot tag."""
906 current_time = timeutils.utcnow().isoformat()
907 snapshot_tag = "%s_time_%s" % (
908 self._get_migration_snapshot_prefix(share_instance), current_time)
909 snapshot_tag = (
910 snapshot_tag.replace('-', '_').replace('.', '_').replace(':', '_'))
911 return snapshot_tag
912
913 @ensure_share_server_not_provided
914 def create_replica(self, context, replica_list, new_replica,
915 access_rules, replica_snapshots, share_server=None):
916 """Replicates the active replica to a new replica on this backend."""
917 active_replica = self._get_active_replica(replica_list)
918 src_dataset_name = self.private_storage.get(
919 active_replica['id'], 'dataset_name')
920 ssh_to_src_cmd = self.private_storage.get(
921 active_replica['id'], 'ssh_cmd')
922 dst_dataset_name = self._get_dataset_name(new_replica)
923
924 ssh_cmd = '%(username)s@%(host)s' % {
925 'username': self.configuration.zfs_ssh_username,
926 'host': self.service_ip,
927 }
928
929 snapshot_tag = self._get_replication_snapshot_tag(new_replica)
930 src_snapshot_name = (
931 '%(dataset_name)s@%(snapshot_tag)s' % {
932 'snapshot_tag': snapshot_tag,
933 'dataset_name': src_dataset_name,
934 }
935 )
936 # Save valuable data to DB
937 self.private_storage.update(active_replica['id'], {
938 'repl_snapshot_tag': snapshot_tag,
939 })
940 self.private_storage.update(new_replica['id'], {
941 'entity_type': 'replica',
942 'replica_type': 'readable',
943 'dataset_name': dst_dataset_name,
944 'ssh_cmd': ssh_cmd,
945 'pool_name': share_utils.extract_host(
946 new_replica['host'], level='pool'),
947 'repl_snapshot_tag': snapshot_tag,
948 })
949
950 # Create temporary snapshot. It will exist until following replica sync
951 # After it - new one will appear and so in loop.
952 self.execute(
953 'ssh', ssh_to_src_cmd,
954 'sudo', 'zfs', 'snapshot', src_snapshot_name,
955 )
956
957 # Send/receive temporary snapshot
958 out, err = self.execute(
959 'ssh', ssh_to_src_cmd,
960 'sudo', 'zfs', 'send', '-vDR', src_snapshot_name, '|',
961 'ssh', ssh_cmd,
962 'sudo', 'zfs', 'receive', '-v', dst_dataset_name,
963 )
964 msg = ("Info about replica '%(replica_id)s' creation is following: "
965 "\n%(out)s")
966 LOG.debug(msg, {'replica_id': new_replica['id'], 'out': out})
967
968 # Make replica readonly
969 self.zfs('set', 'readonly=on', dst_dataset_name)
970
971 # Set original share size as quota to new replica
972 self.zfs('set', 'quota=%sG' % active_replica['size'], dst_dataset_name)
973
974 # Apply access rules from original share
975 self._get_share_helper(new_replica['share_proto']).update_access(
976 dst_dataset_name, access_rules, add_rules=[], delete_rules=[],
977 make_all_ro=True)
978
979 return {
980 'export_locations': self._get_share_helper(
981 new_replica['share_proto']).create_exports(dst_dataset_name),
982 'replica_state': constants.REPLICA_STATE_IN_SYNC,
983 'access_rules_status': constants.STATUS_ACTIVE,
984 }
985
986 @ensure_share_server_not_provided
987 def delete_replica(self, context, replica_list, replica_snapshots, replica,
988 share_server=None):
989 """Deletes a replica. This is called on the destination backend."""
990 pool_name = self.private_storage.get(replica['id'], 'pool_name')
991 dataset_name = self.private_storage.get(replica['id'], 'dataset_name')
992 if not dataset_name:
993 dataset_name = self._get_dataset_name(replica)
994
995 # Delete dataset's snapshots first
996 out, err = self.zfs('list', '-r', '-t', 'snapshot', pool_name)
997 data = self.parse_zfs_answer(out)
998 for datum in data:
999 if dataset_name in datum['NAME']:
1000 self._delete_dataset_or_snapshot_with_retry(datum['NAME'])
1001
1002 # Now we delete dataset itself
1003 out, err = self.zfs('list', '-r', pool_name)
1004 data = self.parse_zfs_answer(out)
1005 for datum in data:
1006 if datum['NAME'] == dataset_name:
1007 self._get_share_helper(
1008 replica['share_proto']).remove_exports(dataset_name)
1009 self._delete_dataset_or_snapshot_with_retry(dataset_name)
1010 break
1011 else:
1012 LOG.warning(
1013 "Share replica with '%(id)s' ID and '%(name)s' NAME is "
1014 "absent on backend. Nothing has been deleted.",
1015 {'id': replica['id'], 'name': dataset_name})
1016 self.private_storage.delete(replica['id'])
1017
1018 @ensure_share_server_not_provided
1019 def update_replica_state(self, context, replica_list, replica,
1020 access_rules, replica_snapshots,
1021 share_server=None):
1022 """Syncs replica and updates its 'replica_state'."""
1023 return self._update_replica_state(
1024 context, replica_list, replica, replica_snapshots, access_rules)
1025
1026 def _update_replica_state(self, context, replica_list, replica,
1027 replica_snapshots=None, access_rules=None):
1028 active_replica = self._get_active_replica(replica_list)
1029 src_dataset_name = self.private_storage.get(
1030 active_replica['id'], 'dataset_name')
1031 ssh_to_src_cmd = self.private_storage.get(
1032 active_replica['id'], 'ssh_cmd')
1033 ssh_to_dst_cmd = self.private_storage.get(
1034 replica['id'], 'ssh_cmd')
1035 dst_dataset_name = self.private_storage.get(
1036 replica['id'], 'dataset_name')
1037
1038 # Create temporary snapshot
1039 previous_snapshot_tag = self.private_storage.get(
1040 replica['id'], 'repl_snapshot_tag')
1041 snapshot_tag = self._get_replication_snapshot_tag(replica)
1042 src_snapshot_name = src_dataset_name + '@' + snapshot_tag
1043 self.execute(
1044 'ssh', ssh_to_src_cmd,
1045 'sudo', 'zfs', 'snapshot', src_snapshot_name,
1046 )
1047
1048 # Make sure it is readonly
1049 self.zfs('set', 'readonly=on', dst_dataset_name)
1050
1051 # Send/receive diff between previous snapshot and last one
1052 out, err = self.execute(
1053 'ssh', ssh_to_src_cmd,
1054 'sudo', 'zfs', 'send', '-vDRI',
1055 previous_snapshot_tag, src_snapshot_name, '|',
1056 'ssh', ssh_to_dst_cmd,
1057 'sudo', 'zfs', 'receive', '-vF', dst_dataset_name,
1058 )
1059 msg = ("Info about last replica '%(replica_id)s' sync is following: "
1060 "\n%(out)s")
1061 LOG.debug(msg, {'replica_id': replica['id'], 'out': out})
1062
1063 # Update DB data that will be used on following replica sync
1064 self.private_storage.update(active_replica['id'], {
1065 'repl_snapshot_tag': snapshot_tag,
1066 })
1067 self.private_storage.update(
1068 replica['id'], {'repl_snapshot_tag': snapshot_tag})
1069
1070 # Destroy all snapshots on dst filesystem except referenced ones.
1071 snap_references = set()
1072 for repl in replica_list:
1073 snap_references.add(
1074 self.private_storage.get(repl['id'], 'repl_snapshot_tag'))
1075
1076 dst_pool_name = dst_dataset_name.split('/')[0]
1077 out, err = self.zfs('list', '-r', '-t', 'snapshot', dst_pool_name)
1078 data = self.parse_zfs_answer(out)
1079 for datum in data:
1080 if (dst_dataset_name in datum['NAME'] and
1081 '@' + self.replica_snapshot_prefix in datum['NAME'] and
1082 datum['NAME'].split('@')[-1] not in snap_references):
1083 self._delete_dataset_or_snapshot_with_retry(datum['NAME'])
1084
1085 # Destroy all snapshots on src filesystem except referenced ones.
1086 src_pool_name = src_snapshot_name.split('/')[0]
1087 out, err = self.execute(
1088 'ssh', ssh_to_src_cmd,
1089 'sudo', 'zfs', 'list', '-r', '-t', 'snapshot', src_pool_name,
1090 )
1091 data = self.parse_zfs_answer(out)
1092 full_src_snapshot_prefix = (
1093 src_dataset_name + '@' +
1094 self._get_replication_snapshot_prefix(replica))
1095 for datum in data:
1096 if (full_src_snapshot_prefix in datum['NAME'] and
1097 datum['NAME'].split('@')[-1] not in snap_references):
1098 self.execute_with_retry(
1099 'ssh', ssh_to_src_cmd,
1100 'sudo', 'zfs', 'destroy', '-f', datum['NAME'],
1101 )
1102
1103 if access_rules:
1104 # Apply access rules from original share
1105 # TODO(vponomaryov): we should remove somehow rules that were
1106 # deleted on active replica after creation of secondary replica.
1107 # For the moment there will be difference and it can be considered
1108 # as a bug.
1109 self._get_share_helper(replica['share_proto']).update_access(
1110 dst_dataset_name, access_rules, add_rules=[], delete_rules=[],
1111 make_all_ro=True)
1112
1113 # Return results
1114 return constants.REPLICA_STATE_IN_SYNC
1115
1116 @ensure_share_server_not_provided
1117 def promote_replica(self, context, replica_list, replica, access_rules,
1118 share_server=None):
1119 """Promotes secondary replica to active and active to secondary."""
1120 active_replica = self._get_active_replica(replica_list)
1121 src_dataset_name = self.private_storage.get(
1122 active_replica['id'], 'dataset_name')
1123 ssh_to_src_cmd = self.private_storage.get(
1124 active_replica['id'], 'ssh_cmd')
1125 dst_dataset_name = self.private_storage.get(
1126 replica['id'], 'dataset_name')
1127 replica_dict = {
1128 r['id']: {
1129 'id': r['id'],
1130 # NOTE(vponomaryov): access rules will be updated in next
1131 # 'sync' operation.
1132 'access_rules_status': constants.SHARE_INSTANCE_RULES_SYNCING,
1133 }
1134 for r in replica_list
1135 }
1136 try:
1137 # Mark currently active replica as readonly
1138 self.execute(
1139 'ssh', ssh_to_src_cmd,
1140 'set', 'readonly=on', src_dataset_name,
1141 )
1142
1143 # Create temporary snapshot of currently active replica
1144 snapshot_tag = self._get_replication_snapshot_tag(active_replica)
1145 src_snapshot_name = src_dataset_name + '@' + snapshot_tag
1146 self.execute(
1147 'ssh', ssh_to_src_cmd,
1148 'sudo', 'zfs', 'snapshot', src_snapshot_name,
1149 )
1150
1151 # Apply temporary snapshot to all replicas
1152 for repl in replica_list:
1153 if repl['replica_state'] == constants.REPLICA_STATE_ACTIVE:
1154 continue
1155 previous_snapshot_tag = self.private_storage.get(
1156 repl['id'], 'repl_snapshot_tag')
1157 dataset_name = self.private_storage.get(
1158 repl['id'], 'dataset_name')
1159 ssh_to_dst_cmd = self.private_storage.get(
1160 repl['id'], 'ssh_cmd')
1161
1162 try:
1163 # Send/receive diff between previous snapshot and last one
1164 out, err = self.execute(
1165 'ssh', ssh_to_src_cmd,
1166 'sudo', 'zfs', 'send', '-vDRI',
1167 previous_snapshot_tag, src_snapshot_name, '|',
1168 'ssh', ssh_to_dst_cmd,
1169 'sudo', 'zfs', 'receive', '-vF', dataset_name,
1170 )
1171 except exception.ProcessExecutionError as e:
1172 LOG.warning("Failed to sync replica %(id)s. %(e)s",
1173 {'id': repl['id'], 'e': e})
1174 replica_dict[repl['id']]['replica_state'] = (
1175 constants.REPLICA_STATE_OUT_OF_SYNC)
1176 continue
1177
1178 msg = ("Info about last replica '%(replica_id)s' "
1179 "sync is following: \n%(out)s")
1180 LOG.debug(msg, {'replica_id': repl['id'], 'out': out})
1181
1182 # Update latest replication snapshot for replica
1183 self.private_storage.update(
1184 repl['id'], {'repl_snapshot_tag': snapshot_tag})
1185
1186 # Update latest replication snapshot for currently active replica
1187 self.private_storage.update(
1188 active_replica['id'], {'repl_snapshot_tag': snapshot_tag})
1189
1190 replica_dict[active_replica['id']]['replica_state'] = (
1191 constants.REPLICA_STATE_IN_SYNC)
1192 except Exception as e:
1193 LOG.warning(
1194 "Failed to update currently active replica. \n%s", e)
1195
1196 replica_dict[active_replica['id']]['replica_state'] = (
1197 constants.REPLICA_STATE_OUT_OF_SYNC)
1198
1199 # Create temporary snapshot of new replica and sync it with other
1200 # secondary replicas.
1201 snapshot_tag = self._get_replication_snapshot_tag(replica)
1202 src_snapshot_name = dst_dataset_name + '@' + snapshot_tag
1203 ssh_to_src_cmd = self.private_storage.get(replica['id'], 'ssh_cmd')
1204 self.zfs('snapshot', src_snapshot_name)
1205 for repl in replica_list:
1206 if (repl['replica_state'] == constants.REPLICA_STATE_ACTIVE or
1207 repl['id'] == replica['id']):
1208 continue
1209 previous_snapshot_tag = self.private_storage.get(
1210 repl['id'], 'repl_snapshot_tag')
1211 dataset_name = self.private_storage.get(
1212 repl['id'], 'dataset_name')
1213 ssh_to_dst_cmd = self.private_storage.get(
1214 repl['id'], 'ssh_cmd')
1215
1216 try:
1217 # Send/receive diff between previous snapshot and last one
1218 out, err = self.execute(
1219 'ssh', ssh_to_src_cmd,
1220 'sudo', 'zfs', 'send', '-vDRI',
1221 previous_snapshot_tag, src_snapshot_name, '|',
1222 'ssh', ssh_to_dst_cmd,
1223 'sudo', 'zfs', 'receive', '-vF', dataset_name,
1224 )
1225 except exception.ProcessExecutionError as e:
1226 LOG.warning("Failed to sync replica %(id)s. %(e)s",
1227 {'id': repl['id'], 'e': e})
1228 replica_dict[repl['id']]['replica_state'] = (
1229 constants.REPLICA_STATE_OUT_OF_SYNC)
1230 continue
1231
1232 msg = ("Info about last replica '%(replica_id)s' "
1233 "sync is following: \n%(out)s")
1234 LOG.debug(msg, {'replica_id': repl['id'], 'out': out})
1235
1236 # Update latest replication snapshot for replica
1237 self.private_storage.update(
1238 repl['id'], {'repl_snapshot_tag': snapshot_tag})
1239
1240 # Update latest replication snapshot for new active replica
1241 self.private_storage.update(
1242 replica['id'], {'repl_snapshot_tag': snapshot_tag})
1243
1244 replica_dict[replica['id']]['replica_state'] = (
1245 constants.REPLICA_STATE_ACTIVE)
1246
1247 self._get_share_helper(replica['share_proto']).update_access(
1248 dst_dataset_name, access_rules, add_rules=[], delete_rules=[])
1249
1250 replica_dict[replica['id']]['access_rules_status'] = (
1251 constants.STATUS_ACTIVE)
1252
1253 self.zfs('set', 'readonly=off', dst_dataset_name)
1254
1255 return list(replica_dict.values())
1256
1257 @ensure_share_server_not_provided
1258 def create_replicated_snapshot(self, context, replica_list,
1259 replica_snapshots, share_server=None):
1260 """Create a snapshot and update across the replicas."""
1261 active_replica = self._get_active_replica(replica_list)
1262 src_dataset_name = self.private_storage.get(
1263 active_replica['id'], 'dataset_name')
1264 ssh_to_src_cmd = self.private_storage.get(
1265 active_replica['id'], 'ssh_cmd')
1266 replica_snapshots_dict = {
1267 si['id']: {'id': si['id']} for si in replica_snapshots}
1268
1269 active_snapshot_instance_id = [
1270 si['id'] for si in replica_snapshots
1271 if si['share_instance_id'] == active_replica['id']][0]
1272 snapshot_tag = self._get_snapshot_name(active_snapshot_instance_id)
1273 # Replication should not be dependent on manually created snapshots
1274 # so, create additional one, newer, that will be used for replication
1275 # synchronizations.
1276 repl_snapshot_tag = self._get_replication_snapshot_tag(active_replica)
1277 src_snapshot_name = src_dataset_name + '@' + repl_snapshot_tag
1278
1279 self.private_storage.update(
1280 replica_snapshots[0]['snapshot_id'], {
1281 'entity_type': 'snapshot',
1282 'snapshot_tag': snapshot_tag,
1283 }
1284 )
1285 for tag in (snapshot_tag, repl_snapshot_tag):
1286 self.execute(
1287 'ssh', ssh_to_src_cmd,
1288 'sudo', 'zfs', 'snapshot', src_dataset_name + '@' + tag,
1289 )
1290
1291 # Populate snapshot to all replicas
1292 for replica_snapshot in replica_snapshots:
1293 replica_id = replica_snapshot['share_instance_id']
1294 if replica_id == active_replica['id']:
1295 replica_snapshots_dict[replica_snapshot['id']]['status'] = (
1296 constants.STATUS_AVAILABLE)
1297 continue
1298 previous_snapshot_tag = self.private_storage.get(
1299 replica_id, 'repl_snapshot_tag')
1300 dst_dataset_name = self.private_storage.get(
1301 replica_id, 'dataset_name')
1302 ssh_to_dst_cmd = self.private_storage.get(replica_id, 'ssh_cmd')
1303
1304 try:
1305 # Send/receive diff between previous snapshot and last one
1306 out, err = self.execute(
1307 'ssh', ssh_to_src_cmd,
1308 'sudo', 'zfs', 'send', '-vDRI',
1309 previous_snapshot_tag, src_snapshot_name, '|',
1310 'ssh', ssh_to_dst_cmd,
1311 'sudo', 'zfs', 'receive', '-vF', dst_dataset_name,
1312 )
1313 except exception.ProcessExecutionError as e:
1314 LOG.warning(
1315 "Failed to sync snapshot instance %(id)s. %(e)s",
1316 {'id': replica_snapshot['id'], 'e': e})
1317 replica_snapshots_dict[replica_snapshot['id']]['status'] = (
1318 constants.STATUS_ERROR)
1319 continue
1320
1321 replica_snapshots_dict[replica_snapshot['id']]['status'] = (
1322 constants.STATUS_AVAILABLE)
1323
1324 msg = ("Info about last replica '%(replica_id)s' "
1325 "sync is following: \n%(out)s")
1326 LOG.debug(msg, {'replica_id': replica_id, 'out': out})
1327
1328 # Update latest replication snapshot for replica
1329 self.private_storage.update(
1330 replica_id, {'repl_snapshot_tag': repl_snapshot_tag})
1331
1332 # Update latest replication snapshot for currently active replica
1333 self.private_storage.update(
1334 active_replica['id'], {'repl_snapshot_tag': repl_snapshot_tag})
1335
1336 return list(replica_snapshots_dict.values())
1337
1338 @ensure_share_server_not_provided
1339 def delete_replicated_snapshot(self, context, replica_list,
1340 replica_snapshots, share_server=None):
1341 """Delete a snapshot by deleting its instances across the replicas."""
1342 active_replica = self._get_active_replica(replica_list)
1343 replica_snapshots_dict = {
1344 si['id']: {'id': si['id']} for si in replica_snapshots}
1345
1346 for replica_snapshot in replica_snapshots:
1347 replica_id = replica_snapshot['share_instance_id']
1348 snapshot_name = self._get_saved_snapshot_name(replica_snapshot)
1349 if active_replica['id'] == replica_id:
1350 self._delete_snapshot(context, replica_snapshot)
1351 replica_snapshots_dict[replica_snapshot['id']]['status'] = (
1352 constants.STATUS_DELETED)
1353 continue
1354 ssh_cmd = self.private_storage.get(replica_id, 'ssh_cmd')
1355 out, err = self.execute(
1356 'ssh', ssh_cmd,
1357 'sudo', 'zfs', 'list', '-r', '-t', 'snapshot', snapshot_name,
1358 )
1359 data = self.parse_zfs_answer(out)
1360 for datum in data:
1361 if datum['NAME'] != snapshot_name:
1362 continue
1363 self.execute_with_retry(
1364 'ssh', ssh_cmd,
1365 'sudo', 'zfs', 'destroy', '-f', datum['NAME'],
1366 )
1367
1368 self.private_storage.delete(replica_snapshot['id'])
1369 replica_snapshots_dict[replica_snapshot['id']]['status'] = (
1370 constants.STATUS_DELETED)
1371
1372 self.private_storage.delete(replica_snapshot['snapshot_id'])
1373 return list(replica_snapshots_dict.values())
1374
1375 @ensure_share_server_not_provided
1376 def update_replicated_snapshot(self, context, replica_list,
1377 share_replica, replica_snapshots,
1378 replica_snapshot, share_server=None):
1379 """Update the status of a snapshot instance that lives on a replica."""
1380
1381 self._update_replica_state(context, replica_list, share_replica)
1382
1383 snapshot_name = self._get_saved_snapshot_name(replica_snapshot)
1384
1385 out, err = self.zfs('list', '-r', '-t', 'snapshot', snapshot_name)
1386 data = self.parse_zfs_answer(out)
1387 snapshot_found = False
1388 for datum in data:
1389 if datum['NAME'] == snapshot_name:
1390 snapshot_found = True
1391 break
1392 return_dict = {'id': replica_snapshot['id']}
1393 if snapshot_found:
1394 return_dict.update({'status': constants.STATUS_AVAILABLE})
1395 else:
1396 return_dict.update({'status': constants.STATUS_ERROR})
1397
1398 return return_dict
1399
1400 @ensure_share_server_not_provided
1401 def migration_check_compatibility(
1402 self, context, source_share, destination_share,
1403 share_server=None, destination_share_server=None):
1404 """Is called to test compatibility with destination backend."""
1405 backend_name = share_utils.extract_host(
1406 destination_share['host'], level='backend_name')
1407 config = get_backend_configuration(backend_name)
1408 compatible = self.configuration.share_driver == config.share_driver
1409 return {
1410 'compatible': compatible,
1411 'writable': False,
1412 'preserve_metadata': True,
1413 'nondisruptive': True,
1414 }
1415
1416 @ensure_share_server_not_provided
1417 def migration_start(
1418 self, context, source_share, destination_share, source_snapshots,
1419 snapshot_mappings, share_server=None,
1420 destination_share_server=None):
1421 """Is called to start share migration."""
1422
1423 src_dataset_name = self.private_storage.get(
1424 source_share['id'], 'dataset_name')
1425 dst_dataset_name = self._get_dataset_name(destination_share)
1426 backend_name = share_utils.extract_host(
1427 destination_share['host'], level='backend_name')
1428 ssh_cmd = '%(username)s@%(host)s' % {
1429 'username': self.configuration.zfs_ssh_username,
1430 'host': self.configuration.zfs_service_ip,
1431 }
1432 config = get_backend_configuration(backend_name)
1433 remote_ssh_cmd = '%(username)s@%(host)s' % {
1434 'username': config.zfs_ssh_username,
1435 'host': config.zfs_service_ip,
1436 }
1437 snapshot_tag = self._get_migration_snapshot_tag(destination_share)
1438 src_snapshot_name = (
1439 '%(dataset_name)s@%(snapshot_tag)s' % {
1440 'snapshot_tag': snapshot_tag,
1441 'dataset_name': src_dataset_name,
1442 }
1443 )
1444
1445 # Save valuable data to DB
1446 self.private_storage.update(source_share['id'], {
1447 'migr_snapshot_tag': snapshot_tag,
1448 })
1449 self.private_storage.update(destination_share['id'], {
1450 'entity_type': 'share',
1451 'dataset_name': dst_dataset_name,
1452 'ssh_cmd': remote_ssh_cmd,
1453 'pool_name': share_utils.extract_host(
1454 destination_share['host'], level='pool'),
1455 'migr_snapshot_tag': snapshot_tag,
1456 })
1457
1458 # Create temporary snapshot on src host.
1459 self.execute('sudo', 'zfs', 'snapshot', src_snapshot_name)
1460
1461 # Send/receive temporary snapshot
1462 cmd = (
1463 'ssh ' + ssh_cmd + ' '
1464 'sudo zfs send -vDR ' + src_snapshot_name + ' '
1465 '| ssh ' + remote_ssh_cmd + ' '
1466 'sudo zfs receive -v ' + dst_dataset_name
1467 )
1468 filename = dst_dataset_name.replace('/', '_')
1469 with utils.tempdir() as tmpdir:
1470 tmpfilename = os.path.join(tmpdir, '%s.sh' % filename)
1471 with open(tmpfilename, "w") as migr_script:
1472 migr_script.write(cmd)
1473 self.execute('sudo', 'chmod', '755', tmpfilename)
1474 self.execute('nohup', tmpfilename, '&')
1475
1476 @ensure_share_server_not_provided
1477 def migration_continue(
1478 self, context, source_share, destination_share, source_snapshots,
1479 snapshot_mappings, share_server=None,
1480 destination_share_server=None):
1481 """Is called in source share's backend to continue migration."""
1482
1483 snapshot_tag = self.private_storage.get(
1484 destination_share['id'], 'migr_snapshot_tag')
1485
1486 out, err = self.execute('ps', 'aux')
1487 if not '@%s' % snapshot_tag in out:
1488 dst_dataset_name = self.private_storage.get(
1489 destination_share['id'], 'dataset_name')
1490 try:
1491 self.execute(
1492 'sudo', 'zfs', 'get', 'quota', dst_dataset_name,
1493 executor=self._get_shell_executor_by_host(
1494 destination_share['host']),
1495 )
1496 return True
1497 except exception.ProcessExecutionError as e:
1498 raise exception.ZFSonLinuxException(msg=_(
1499 'Migration process is absent and dst dataset '
1500 'returned following error: %s') % e)
1501
1502 @ensure_share_server_not_provided
1503 def migration_complete(
1504 self, context, source_share, destination_share, source_snapshots,
1505 snapshot_mappings, share_server=None,
1506 destination_share_server=None):
1507 """Is called to perform 2nd phase of driver migration of a given share.
1508
1509 """
1510 dst_dataset_name = self.private_storage.get(
1511 destination_share['id'], 'dataset_name')
1512 snapshot_tag = self.private_storage.get(
1513 destination_share['id'], 'migr_snapshot_tag')
1514 dst_snapshot_name = (
1515 '%(dataset_name)s@%(snapshot_tag)s' % {
1516 'snapshot_tag': snapshot_tag,
1517 'dataset_name': dst_dataset_name,
1518 }
1519 )
1520
1521 dst_executor = self._get_shell_executor_by_host(
1522 destination_share['host'])
1523
1524 # Destroy temporary migration snapshot on dst host
1525 self.execute(
1526 'sudo', 'zfs', 'destroy', dst_snapshot_name,
1527 executor=dst_executor,
1528 )
1529
1530 # Get export locations of new share instance
1531 export_locations = self._get_share_helper(
1532 destination_share['share_proto']).create_exports(
1533 dst_dataset_name,
1534 executor=dst_executor)
1535
1536 # Destroy src share and temporary migration snapshot on src (this) host
1537 self.delete_share(context, source_share)
1538
1539 return {'export_locations': export_locations}
1540
1541 @ensure_share_server_not_provided
1542 def migration_cancel(
1543 self, context, source_share, destination_share, source_snapshots,
1544 snapshot_mappings, share_server=None,
1545 destination_share_server=None):
1546 """Is called to cancel driver migration."""
1547
1548 src_dataset_name = self.private_storage.get(
1549 source_share['id'], 'dataset_name')
1550 dst_dataset_name = self.private_storage.get(
1551 destination_share['id'], 'dataset_name')
1552 ssh_cmd = self.private_storage.get(
1553 destination_share['id'], 'ssh_cmd')
1554 snapshot_tag = self.private_storage.get(
1555 destination_share['id'], 'migr_snapshot_tag')
1556
1557 # Kill migration process if exists
1558 try:
1559 out, err = self.execute('ps', 'aux')
1560 lines = out.split('\n')
1561 for line in lines:
1562 if '@%s' % snapshot_tag in line:
1563 migr_pid = [
1564 x for x in line.strip().split(' ') if x != ''][1]
1565 self.execute('sudo', 'kill', '-9', migr_pid)
1566 except exception.ProcessExecutionError as e:
1567 LOG.warning(
1568 "Caught following error trying to kill migration process: %s",
1569 e)
1570
1571 # Sleep couple of seconds before destroying updated objects
1572 time.sleep(2)
1573
1574 # Destroy snapshot on source host
1575 self._delete_dataset_or_snapshot_with_retry(
1576 src_dataset_name + '@' + snapshot_tag)
1577
1578 # Destroy dataset and its migration snapshot on destination host
1579 try:
1580 self.execute(
1581 'ssh', ssh_cmd,
1582 'sudo', 'zfs', 'destroy', '-r', dst_dataset_name,
1583 )
1584 except exception.ProcessExecutionError as e:
1585 LOG.warning(
1586 "Failed to destroy destination dataset with following error: "
1587 "%s",
1588 e)
1589
1590 LOG.debug(
1591 "Migration of share with ID '%s' has been canceled.",
1592 source_share["id"])