"Fossies" - the Fresh Open Source Software Archive

Member "glance-20.0.1/glance/async_/flows/api_image_import.py" (12 Aug 2020, 20817 Bytes) of package /linux/misc/openstack/glance-20.0.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "api_image_import.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 20.0.0_vs_20.0.1.

    1 # Copyright 2015 OpenStack Foundation
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 import os
   16 
   17 import glance_store as store_api
   18 from glance_store import backend
   19 from glance_store import exceptions as store_exceptions
   20 from oslo_config import cfg
   21 from oslo_log import log as logging
   22 from oslo_utils import encodeutils
   23 import six
   24 from taskflow.patterns import linear_flow as lf
   25 from taskflow import retry
   26 from taskflow import task
   27 
   28 import glance.async_.flows._internal_plugins as internal_plugins
   29 import glance.async_.flows.plugins as import_plugins
   30 from glance.common import exception
   31 from glance.common.scripts.image_import import main as image_import
   32 from glance.common.scripts import utils as script_utils
   33 from glance.common import store_utils
   34 from glance.i18n import _, _LE, _LI
   35 
   36 
   37 LOG = logging.getLogger(__name__)
   38 
   39 
   40 CONF = cfg.CONF
   41 
   42 
   43 api_import_opts = [
   44     cfg.ListOpt('image_import_plugins',
   45                 item_type=cfg.types.String(quotes=True),
   46                 bounds=True,
   47                 sample_default='[no_op]',
   48                 default=[],
   49                 help=_("""
   50 Image import plugins to be enabled for task processing.
   51 
   52 Provide list of strings reflecting to the task Objects
   53 that should be included to the Image Import flow. The
   54 task objects needs to be defined in the 'glance/async/
   55 flows/plugins/*' and may be implemented by OpenStack
   56 Glance project team, deployer or 3rd party.
   57 
   58 By default no plugins are enabled and to take advantage
   59 of the plugin model the list of plugins must be set
   60 explicitly in the glance-image-import.conf file.
   61 
   62 The allowed values for this option is comma separated
   63 list of object names in between ``[`` and ``]``.
   64 
   65 Possible values:
   66     * no_op (only logs debug level message that the
   67       plugin has been executed)
   68     * Any provided Task object name to be included
   69       in to the flow.
   70 """)),
   71 ]
   72 
   73 CONF.register_opts(api_import_opts, group='image_import_opts')
   74 
   75 # TODO(jokke): We should refactor the task implementations so that we do not
   76 # need to duplicate what we have already for example in base_import.py.
   77 
   78 
   79 class _NoStoresSucceeded(exception.GlanceException):
   80 
   81     def __init__(self, message):
   82         super(_NoStoresSucceeded, self).__init__(message)
   83 
   84 
   85 class _DeleteFromFS(task.Task):
   86 
   87     def __init__(self, task_id, task_type):
   88         self.task_id = task_id
   89         self.task_type = task_type
   90         super(_DeleteFromFS, self).__init__(
   91             name='%s-DeleteFromFS-%s' % (task_type, task_id))
   92 
   93     def execute(self, file_path):
   94         """Remove file from the backend
   95 
   96         :param file_path: path to the file being deleted
   97         """
   98         if CONF.enabled_backends:
   99             try:
  100                 store_api.delete(file_path, 'os_glance_staging_store')
  101             except store_api.exceptions.NotFound as e:
  102                 LOG.error(_("After upload to backend, deletion of staged "
  103                             "image data from %(fn)s has failed because "
  104                             "%(em)s"), {'fn': file_path,
  105                                         'em': e.message})
  106         else:
  107             # TODO(abhishekk): After removal of backend module from
  108             # glance_store need to change this to use multi_backend
  109             # module.
  110             file_path = file_path[7:]
  111             if os.path.exists(file_path):
  112                 try:
  113                     LOG.debug(_("After upload to the backend, deleting staged "
  114                                 "image data from %(fn)s"), {'fn': file_path})
  115                     os.unlink(file_path)
  116                 except OSError as e:
  117                     LOG.error(_("After upload to backend, deletion of staged "
  118                                 "image data from %(fn)s has failed because "
  119                                 "[Errno %(en)d]"), {'fn': file_path,
  120                                                     'en': e.errno})
  121             else:
  122                 LOG.warning(_("After upload to backend, deletion of staged "
  123                               "image data has failed because "
  124                               "it cannot be found at %(fn)s"), {
  125                     'fn': file_path})
  126 
  127 
  128 class _VerifyStaging(task.Task):
  129 
  130     # NOTE(jokke): This could be also for example "staging_path" but to
  131     # keep this compatible with other flows  we want to stay consistent
  132     # with base_import
  133     default_provides = 'file_path'
  134 
  135     def __init__(self, task_id, task_type, task_repo, uri):
  136         self.task_id = task_id
  137         self.task_type = task_type
  138         self.task_repo = task_repo
  139         self.uri = uri
  140         super(_VerifyStaging, self).__init__(
  141             name='%s-ConfigureStaging-%s' % (task_type, task_id))
  142 
  143         # NOTE(jokke): If we want to use other than 'file' store in the
  144         # future, this is one thing that needs to change.
  145         try:
  146             uri.index('file:///', 0)
  147         except ValueError:
  148             msg = (_("%(task_id)s of %(task_type)s not configured "
  149                      "properly. Value of node_staging_uri must be "
  150                      " in format 'file://<absolute-path>'") %
  151                    {'task_id': self.task_id,
  152                     'task_type': self.task_type})
  153             raise exception.BadTaskConfiguration(msg)
  154 
  155         if not CONF.enabled_backends:
  156             # NOTE(jokke): We really don't need the store for anything but
  157             # verifying that we actually can build the store will allow us to
  158             # fail the flow early with clear message why that happens.
  159             self._build_store()
  160 
  161     def _build_store(self):
  162         # TODO(abhishekk): After removal of backend module from glance_store
  163         # need to change this to use multi_backend module.
  164         # NOTE(jokke): If we want to use some other store for staging, we can
  165         # implement the logic more general here. For now this should do.
  166         # NOTE(flaper87): Due to the nice glance_store api (#sarcasm), we're
  167         # forced to build our own config object, register the required options
  168         # (and by required I mean *ALL* of them, even the ones we don't want),
  169         # and create our own store instance by calling a private function.
  170         # This is certainly unfortunate but it's the best we can do until the
  171         # glance_store refactor is done. A good thing is that glance_store is
  172         # under our team's management and it gates on Glance so changes to
  173         # this API will (should?) break task's tests.
  174         conf = cfg.ConfigOpts()
  175         try:
  176             backend.register_opts(conf)
  177         except cfg.DuplicateOptError:
  178             pass
  179         conf.set_override('filesystem_store_datadir',
  180                           CONF.node_staging_uri[7:],
  181                           group='glance_store')
  182 
  183         # NOTE(flaper87): Do not even try to judge me for this... :(
  184         # With the glance_store refactor, this code will change, until
  185         # that happens, we don't have a better option and this is the
  186         # least worst one, IMHO.
  187         store = backend._load_store(conf, 'file')
  188 
  189         try:
  190             store.configure()
  191         except AttributeError:
  192             msg = (_("%(task_id)s of %(task_type)s not configured "
  193                      "properly. Could not load the filesystem store") %
  194                    {'task_id': self.task_id, 'task_type': self.task_type})
  195             raise exception.BadTaskConfiguration(msg)
  196 
  197     def execute(self):
  198         """Test the backend store and return the 'file_path'"""
  199         return self.uri
  200 
  201 
  202 class _ImportToStore(task.Task):
  203 
  204     def __init__(self, task_id, task_type, image_repo, uri, image_id, backend,
  205                  all_stores_must_succeed, set_active):
  206         self.task_id = task_id
  207         self.task_type = task_type
  208         self.image_repo = image_repo
  209         self.uri = uri
  210         self.image_id = image_id
  211         self.backend = backend
  212         self.all_stores_must_succeed = all_stores_must_succeed
  213         self.set_active = set_active
  214         super(_ImportToStore, self).__init__(
  215             name='%s-ImportToStore-%s' % (task_type, task_id))
  216 
  217     def execute(self, file_path=None):
  218         """Bringing the imported image to back end store
  219 
  220         :param image_id: Glance Image ID
  221         :param file_path: path to the image file
  222         """
  223         # NOTE(flaper87): Let's dance... and fall
  224         #
  225         # Unfortunatelly, because of the way our domain layers work and
  226         # the checks done in the FS store, we can't simply rename the file
  227         # and set the location. To do that, we'd have to duplicate the logic
  228         # of every and each of the domain factories (quota, location, etc)
  229         # and we'd also need to hack the FS store to prevent it from raising
  230         # a "duplication path" error. I'd rather have this task copying the
  231         # image bits one more time than duplicating all that logic.
  232         #
  233         # Since I don't think this should be the definitive solution, I'm
  234         # leaving the code below as a reference for what should happen here
  235         # once the FS store and domain code will be able to handle this case.
  236         #
  237         # if file_path is None:
  238         #    image_import.set_image_data(image, self.uri, None)
  239         #    return
  240 
  241         # NOTE(flaper87): Don't assume the image was stored in the
  242         # work_dir. Think in the case this path was provided by another task.
  243         # Also, lets try to neither assume things nor create "logic"
  244         # dependencies between this task and `_ImportToFS`
  245         #
  246         # base_path = os.path.dirname(file_path.split("file://")[-1])
  247 
  248         # NOTE(flaper87): Hopefully just scenarios #3 and #4. I say
  249         # hopefully because nothing prevents the user to use the same
  250         # FS store path as a work dir
  251         #
  252         # image_path = os.path.join(base_path, image_id)
  253         #
  254         # if (base_path == CONF.glance_store.filesystem_store_datadir or
  255         #      base_path in CONF.glance_store.filesystem_store_datadirs):
  256         #     os.rename(file_path, image_path)
  257         #
  258         # image_import.set_image_data(image, image_path, None)
  259 
  260         # NOTE(jokke): The different options here are kind of pointless as we
  261         # will need the file path anyways for our delete workflow for now.
  262         # For future proofing keeping this as is.
  263         image = self.image_repo.get(self.image_id)
  264         if image.status == "deleted":
  265             raise exception.ImportTaskError("Image has been deleted, aborting"
  266                                             " import.")
  267         try:
  268             image_import.set_image_data(image, file_path or self.uri,
  269                                         self.task_id, backend=self.backend,
  270                                         set_active=self.set_active)
  271         # NOTE(yebinama): set_image_data catches Exception and raises from
  272         # them. Can't be more specific on exceptions catched.
  273         except Exception:
  274             if self.all_stores_must_succeed:
  275                 raise
  276             msg = (_("%(task_id)s of %(task_type)s failed but since "
  277                      "all_stores_must_succeed is set to false, continue.") %
  278                    {'task_id': self.task_id, 'task_type': self.task_type})
  279             LOG.warning(msg)
  280             if self.backend is not None:
  281                 failed_import = image.extra_properties.get(
  282                     'os_glance_failed_import', '').split(',')
  283                 failed_import.append(self.backend)
  284                 image.extra_properties['os_glance_failed_import'] = ','.join(
  285                     failed_import).lstrip(',')
  286         if self.backend is not None:
  287             importing = image.extra_properties.get(
  288                 'os_glance_importing_to_stores', '').split(',')
  289             try:
  290                 importing.remove(self.backend)
  291                 image.extra_properties[
  292                     'os_glance_importing_to_stores'] = ','.join(
  293                     importing).lstrip(',')
  294             except ValueError:
  295                 LOG.debug("Store %s not found in property "
  296                           "os_glance_importing_to_stores.", self.backend)
  297         # NOTE(flaper87): We need to save the image again after
  298         # the locations have been set in the image.
  299         self.image_repo.save(image)
  300 
  301     def revert(self, result, **kwargs):
  302         """
  303         Remove location from image in case of failure
  304 
  305         :param result: taskflow result object
  306         """
  307         image = self.image_repo.get(self.image_id)
  308         for i, location in enumerate(image.locations):
  309             if location.get('metadata', {}).get('store') == self.backend:
  310                 try:
  311                     image.locations.pop(i)
  312                 except (store_exceptions.NotFound,
  313                         store_exceptions.Forbidden):
  314                     msg = (_("Error deleting from store %{store}s when "
  315                              "reverting.") % {'store': self.backend})
  316                     LOG.warning(msg)
  317                 # NOTE(yebinama): Some store drivers doesn't document which
  318                 # exceptions they throw.
  319                 except Exception:
  320                     msg = (_("Unexpected exception when deleting from store"
  321                              "%{store}s.") % {'store': self.backend})
  322                     LOG.warning(msg)
  323                 else:
  324                     if len(image.locations) == 0:
  325                         image.checksum = None
  326                         image.os_hash_algo = None
  327                         image.os_hash_value = None
  328                         image.size = None
  329                     self.image_repo.save(image)
  330                 break
  331 
  332 
  333 class _VerifyImageState(task.Task):
  334 
  335     def __init__(self, task_id, task_type, image_repo, image_id,
  336                  import_method):
  337         self.task_id = task_id
  338         self.task_type = task_type
  339         self.image_repo = image_repo
  340         self.image_id = image_id
  341         self.import_method = import_method
  342         super(_VerifyImageState, self).__init__(
  343             name='%s-VerifyImageState-%s' % (task_type, task_id))
  344 
  345     def execute(self):
  346         """Verify we have active image
  347 
  348         :param image_id: Glance Image ID
  349         """
  350         new_image = self.image_repo.get(self.image_id)
  351         if new_image.status != 'active':
  352             raise _NoStoresSucceeded(_('None of the uploads finished!'))
  353 
  354     def revert(self, result, **kwargs):
  355         """Set back to queued if this wasn't copy-image job."""
  356         if self.import_method != 'copy-image':
  357             new_image = self.image_repo.get(self.image_id)
  358             new_image.status = 'queued'
  359             self.image_repo.save_image(new_image)
  360 
  361 
  362 class _CompleteTask(task.Task):
  363 
  364     def __init__(self, task_id, task_type, task_repo, image_id):
  365         self.task_id = task_id
  366         self.task_type = task_type
  367         self.task_repo = task_repo
  368         self.image_id = image_id
  369         super(_CompleteTask, self).__init__(
  370             name='%s-CompleteTask-%s' % (task_type, task_id))
  371 
  372     def execute(self):
  373         """Finishing the task flow
  374 
  375         :param image_id: Glance Image ID
  376         """
  377         task = script_utils.get_task(self.task_repo, self.task_id)
  378         if task is None:
  379             return
  380         try:
  381             task.succeed({'image_id': self.image_id})
  382         except Exception as e:
  383             # Note: The message string contains Error in it to indicate
  384             # in the task.message that it's a error message for the user.
  385 
  386             # TODO(nikhil): need to bring back save_and_reraise_exception when
  387             # necessary
  388             log_msg = _LE("Task ID %(task_id)s failed. Error: %(exc_type)s: "
  389                           "%(e)s")
  390             LOG.exception(log_msg, {'exc_type': six.text_type(type(e)),
  391                                     'e': encodeutils.exception_to_unicode(e),
  392                                     'task_id': task.task_id})
  393 
  394             err_msg = _("Error: %(exc_type)s: %(e)s")
  395             task.fail(err_msg % {'exc_type': six.text_type(type(e)),
  396                                  'e': encodeutils.exception_to_unicode(e)})
  397         finally:
  398             self.task_repo.save(task)
  399 
  400         LOG.info(_LI("%(task_id)s of %(task_type)s completed"),
  401                  {'task_id': self.task_id, 'task_type': self.task_type})
  402 
  403 
  404 def get_flow(**kwargs):
  405     """Return task flow
  406 
  407     :param task_id: Task ID
  408     :param task_type: Type of the task
  409     :param task_repo: Task repo
  410     :param image_repo: Image repository used
  411     :param image_id: ID of the Image to be processed
  412     :param uri: uri for the image file
  413     """
  414     task_id = kwargs.get('task_id')
  415     task_type = kwargs.get('task_type')
  416     task_repo = kwargs.get('task_repo')
  417     image_repo = kwargs.get('image_repo')
  418     image_id = kwargs.get('image_id')
  419     import_method = kwargs.get('import_req')['method']['name']
  420     uri = kwargs.get('import_req')['method'].get('uri')
  421     stores = kwargs.get('backend', [None])
  422     all_stores_must_succeed = kwargs.get('import_req').get(
  423         'all_stores_must_succeed', True)
  424 
  425     separator = ''
  426     if not CONF.enabled_backends and not CONF.node_staging_uri.endswith('/'):
  427         separator = '/'
  428 
  429     if not uri and import_method in ['glance-direct', 'copy-image']:
  430         if CONF.enabled_backends:
  431             separator, staging_dir = store_utils.get_dir_separator()
  432             uri = separator.join((staging_dir, str(image_id)))
  433         else:
  434             uri = separator.join((CONF.node_staging_uri, str(image_id)))
  435 
  436     flow = lf.Flow(task_type, retry=retry.AlwaysRevert())
  437 
  438     if import_method in ['web-download', 'copy-image']:
  439         internal_plugin = internal_plugins.get_import_plugin(**kwargs)
  440         flow.add(internal_plugin)
  441         if CONF.enabled_backends:
  442             separator, staging_dir = store_utils.get_dir_separator()
  443             file_uri = separator.join((staging_dir, str(image_id)))
  444         else:
  445             file_uri = separator.join((CONF.node_staging_uri, str(image_id)))
  446     else:
  447         file_uri = uri
  448 
  449     flow.add(_VerifyStaging(task_id, task_type, task_repo, file_uri))
  450 
  451     # Note(jokke): The plugins were designed to act on the image data or
  452     # metadata during the import process before the image goes active. It
  453     # does not make sense to try to execute them during 'copy-image'.
  454     if import_method != 'copy-image':
  455         for plugin in import_plugins.get_import_plugins(**kwargs):
  456             flow.add(plugin)
  457     else:
  458         LOG.debug("Skipping plugins on 'copy-image' job.")
  459 
  460     for idx, store in enumerate(stores, 1):
  461         set_active = (not all_stores_must_succeed) or (idx == len(stores))
  462         if import_method == 'copy-image':
  463             set_active = False
  464         task_name = task_type + "-" + (store or "")
  465         import_task = lf.Flow(task_name)
  466         import_to_store = _ImportToStore(task_id,
  467                                          task_name,
  468                                          image_repo,
  469                                          file_uri,
  470                                          image_id,
  471                                          store,
  472                                          all_stores_must_succeed,
  473                                          set_active)
  474         import_task.add(import_to_store)
  475         flow.add(import_task)
  476 
  477     delete_task = lf.Flow(task_type).add(_DeleteFromFS(task_id, task_type))
  478     flow.add(delete_task)
  479 
  480     verify_task = _VerifyImageState(task_id,
  481                                     task_type,
  482                                     image_repo,
  483                                     image_id,
  484                                     import_method)
  485     flow.add(verify_task)
  486 
  487     complete_task = _CompleteTask(task_id,
  488                                   task_type,
  489                                   task_repo,
  490                                   image_id)
  491     flow.add(complete_task)
  492 
  493     image = image_repo.get(image_id)
  494     from_state = image.status
  495     if import_method != 'copy-image':
  496         image.status = 'importing'
  497 
  498     image.extra_properties[
  499         'os_glance_importing_to_stores'] = ','.join((store for store in
  500                                                      stores if
  501                                                      store is not None))
  502     image.extra_properties['os_glance_failed_import'] = ''
  503     image_repo.save(image, from_state=from_state)
  504 
  505     return flow