"Fossies" - the Fresh Open Source Software Archive

Member "glance-24.1.0/glance/async_/flows/base_import.py" (8 Jun 2022, 20847 Bytes) of package /linux/misc/openstack/glance-24.1.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "base_import.py" see the Fossies "Dox" file reference documentation.

    1 # Copyright 2015 OpenStack Foundation
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 import json
   17 import os
   18 
   19 import glance_store as store_api
   20 from glance_store import backend
   21 from oslo_concurrency import processutils as putils
   22 from oslo_config import cfg
   23 from oslo_log import log as logging
   24 from oslo_utils import encodeutils
   25 from oslo_utils import excutils
   26 from stevedore import named
   27 from taskflow.patterns import linear_flow as lf
   28 from taskflow import retry
   29 from taskflow import task
   30 from taskflow.types import failure
   31 
   32 from glance.async_ import utils
   33 from glance.common import exception
   34 from glance.common.scripts.image_import import main as image_import
   35 from glance.common.scripts import utils as script_utils
   36 from glance.i18n import _, _LE, _LI
   37 
   38 
   39 LOG = logging.getLogger(__name__)
   40 
   41 
   42 CONF = cfg.CONF
   43 
   44 
   45 class _CreateImage(task.Task):
   46 
   47     default_provides = 'image_id'
   48 
   49     def __init__(self, task_id, task_type, task_repo, image_repo,
   50                  image_factory):
   51         self.task_id = task_id
   52         self.task_type = task_type
   53         self.task_repo = task_repo
   54         self.image_repo = image_repo
   55         self.image_factory = image_factory
   56         super(_CreateImage, self).__init__(
   57             name='%s-CreateImage-%s' % (task_type, task_id))
   58 
   59     def execute(self):
   60         task = script_utils.get_task(self.task_repo, self.task_id)
   61         if task is None:
   62             return
   63         task_input = script_utils.unpack_task_input(task)
   64         image = image_import.create_image(
   65             self.image_repo, self.image_factory,
   66             task_input.get('image_properties'), self.task_id)
   67 
   68         LOG.debug("Task %(task_id)s created image %(image_id)s",
   69                   {'task_id': task.task_id, 'image_id': image.image_id})
   70         return image.image_id
   71 
   72     def revert(self, *args, **kwargs):
   73         # TODO(NiallBunting): Deleting the image like this could be considered
   74         # a brute force way of reverting images. It may be worth checking if
   75         # data has been written.
   76         result = kwargs.get('result', None)
   77         if result is not None:
   78             if kwargs.get('flow_failures', None) is not None:
   79                 image = self.image_repo.get(result)
   80                 LOG.debug("Deleting image whilst reverting.")
   81                 image.delete()
   82                 self.image_repo.remove(image)
   83 
   84 
   85 class _ImportToFS(task.Task):
   86 
   87     default_provides = 'file_path'
   88 
   89     def __init__(self, task_id, task_type, task_repo, uri):
   90         self.task_id = task_id
   91         self.task_type = task_type
   92         self.task_repo = task_repo
   93         self.uri = uri
   94         super(_ImportToFS, self).__init__(
   95             name='%s-ImportToFS-%s' % (task_type, task_id))
   96 
   97         # NOTE(abhishekk): Use reserved 'os_glance_tasks_store' for tasks,
   98         # the else part will be removed once old way of configuring store
   99         # is deprecated.
  100         if CONF.enabled_backends:
  101             self.store = store_api.get_store_from_store_identifier(
  102                 'os_glance_tasks_store')
  103         else:
  104             if CONF.task.work_dir is None:
  105                 msg = (_("%(task_id)s of %(task_type)s not configured "
  106                          "properly. Missing work dir: %(work_dir)s") %
  107                        {'task_id': self.task_id,
  108                         'task_type': self.task_type,
  109                         'work_dir': CONF.task.work_dir})
  110                 raise exception.BadTaskConfiguration(msg)
  111 
  112             self.store = self._build_store()
  113 
  114     def _build_store(self):
  115         # NOTE(flaper87): Due to the nice glance_store api (#sarcasm), we're
  116         # forced to build our own config object, register the required options
  117         # (and by required I mean *ALL* of them, even the ones we don't want),
  118         # and create our own store instance by calling a private function.
  119         # This is certainly unfortunate but it's the best we can do until the
  120         # glance_store refactor is done. A good thing is that glance_store is
  121         # under our team's management and it gates on Glance so changes to
  122         # this API will (should?) break task's tests.
  123         conf = cfg.ConfigOpts()
  124         backend.register_opts(conf)
  125         conf.set_override('filesystem_store_datadir',
  126                           CONF.task.work_dir,
  127                           group='glance_store')
  128 
  129         # NOTE(flaper87): Do not even try to judge me for this... :(
  130         # With the glance_store refactor, this code will change, until
  131         # that happens, we don't have a better option and this is the
  132         # least worst one, IMHO.
  133         store = backend._load_store(conf, 'file')
  134 
  135         if store is None:
  136             msg = (_("%(task_id)s of %(task_type)s not configured "
  137                      "properly. Could not load the filesystem store") %
  138                    {'task_id': self.task_id, 'task_type': self.task_type})
  139             raise exception.BadTaskConfiguration(msg)
  140 
  141         store.configure()
  142         return store
  143 
  144     def execute(self, image_id):
  145         """Create temp file into store and return path to it
  146 
  147         :param image_id: Glance Image ID
  148         """
  149         # NOTE(flaper87): We've decided to use a separate `work_dir` for
  150         # this task - and tasks coming after this one - as a way to expect
  151         # users to configure a local store for pre-import works on the image
  152         # to happen.
  153         #
  154         # While using any path should be "technically" fine, it's not what
  155         # we recommend as the best solution. For more details on this, please
  156         # refer to the comment in the `_ImportToStore.execute` method.
  157         data = script_utils.get_image_data_iter(self.uri)
  158 
  159         path = self.store.add(image_id, data, 0, context=None)[0]
  160 
  161         try:
  162             # NOTE(flaper87): Consider moving this code to a common
  163             # place that other tasks can consume as well.
  164             stdout, stderr = putils.trycmd('qemu-img', 'info',
  165                                            '--output=json', path,
  166                                            prlimit=utils.QEMU_IMG_PROC_LIMITS,
  167                                            log_errors=putils.LOG_ALL_ERRORS)
  168         except OSError as exc:
  169             with excutils.save_and_reraise_exception():
  170                 exc_message = encodeutils.exception_to_unicode(exc)
  171                 msg = _LE('Failed to execute security checks on the image '
  172                           '%(task_id)s: %(exc)s')
  173                 LOG.error(msg, {'task_id': self.task_id, 'exc': exc_message})
  174 
  175         metadata = json.loads(stdout)
  176 
  177         backing_file = metadata.get('backing-filename')
  178         if backing_file is not None:
  179             msg = _("File %(path)s has invalid backing file "
  180                     "%(bfile)s, aborting.") % {'path': path,
  181                                                'bfile': backing_file}
  182             raise RuntimeError(msg)
  183 
  184         return path
  185 
  186     def revert(self, image_id, result, **kwargs):
  187         if isinstance(result, failure.Failure):
  188             LOG.exception(_LE('Task: %(task_id)s failed to import image '
  189                               '%(image_id)s to the filesystem.'),
  190                           {'task_id': self.task_id, 'image_id': image_id})
  191             return
  192 
  193         if os.path.exists(result.split("file://")[-1]):
  194             if CONF.enabled_backends:
  195                 store_api.delete(result, 'os_glance_tasks_store')
  196             else:
  197                 store_api.delete_from_backend(result)
  198 
  199 
  200 class _DeleteFromFS(task.Task):
  201 
  202     def __init__(self, task_id, task_type):
  203         self.task_id = task_id
  204         self.task_type = task_type
  205         super(_DeleteFromFS, self).__init__(
  206             name='%s-DeleteFromFS-%s' % (task_type, task_id))
  207 
  208     def execute(self, file_path):
  209         """Remove file from the backend
  210 
  211         :param file_path: path to the file being deleted
  212         """
  213         if CONF.enabled_backends:
  214             store_api.delete(file_path, 'os_glance_tasks_store')
  215         else:
  216             store_api.delete_from_backend(file_path)
  217 
  218 
  219 class _ImportToStore(task.Task):
  220 
  221     def __init__(self, task_id, task_type, image_repo, uri, backend):
  222         self.task_id = task_id
  223         self.task_type = task_type
  224         self.image_repo = image_repo
  225         self.uri = uri
  226         self.backend = backend
  227         super(_ImportToStore, self).__init__(
  228             name='%s-ImportToStore-%s' % (task_type, task_id))
  229 
  230     def execute(self, image_id, file_path=None):
  231         """Bringing the introspected image to back end store
  232 
  233         :param image_id: Glance Image ID
  234         :param file_path: path to the image file
  235         """
  236         # NOTE(flaper87): There are a couple of interesting bits in the
  237         # interaction between this task and the `_ImportToFS` one. I'll try
  238         # to cover them in this comment.
  239         #
  240         # NOTE(flaper87):
  241         # `_ImportToFS` downloads the image to a dedicated `work_dir` which
  242         # needs to be configured in advance (please refer to the config option
  243         # docs for more info). The motivation behind this is also explained in
  244         # the `_ImportToFS.execute` method.
  245         #
  246         # Due to the fact that we have an `_ImportToFS` task which downloads
  247         # the image data already, we need to be as smart as we can in this task
  248         # to avoid downloading the data several times and reducing the copy or
  249         # write times. There are several scenarios where the interaction
  250         # between this task and `_ImportToFS` could be improved. All these
  251         # scenarios assume the `_ImportToFS` task has been executed before
  252         # and/or in a more abstract scenario, that `file_path` is being
  253         # provided.
  254         #
  255         # Scenario 1: FS Store is Remote, introspection enabled,
  256         # conversion disabled
  257         #
  258         # In this scenario, the user would benefit from having the scratch path
  259         # being the same path as the fs store. Only one write would happen and
  260         # an extra read will happen in order to introspect the image. Note that
  261         # this read is just for the image headers and not the entire file.
  262         #
  263         # Scenario 2: FS Store is remote, introspection enabled,
  264         # conversion enabled
  265         #
  266         # In this scenario, the user would benefit from having a *local* store
  267         # into which the image can be converted. This will require downloading
  268         # the image locally, converting it and then copying the converted image
  269         # to the remote store.
  270         #
  271         # Scenario 3: FS Store is local, introspection enabled,
  272         # conversion disabled
  273         # Scenario 4: FS Store is local, introspection enabled,
  274         # conversion enabled
  275         #
  276         # In both these scenarios the user shouldn't care if the FS
  277         # store path and the work dir are the same, therefore probably
  278         # benefit, about the scratch path and the FS store being the
  279         # same from a performance perspective. Space wise, regardless
  280         # of the scenario, the user will have to account for it in
  281         # advance.
  282         #
  283         # Lets get to it and identify the different scenarios in the
  284         # implementation
  285         image = self.image_repo.get(image_id)
  286         image.status = 'saving'
  287         self.image_repo.save(image)
  288 
  289         # NOTE(flaper87): Let's dance... and fall
  290         #
  291         # Unfortunately, because of the way our domain layers work and
  292         # the checks done in the FS store, we can't simply rename the file
  293         # and set the location. To do that, we'd have to duplicate the logic
  294         # of every and each of the domain factories (quota, location, etc)
  295         # and we'd also need to hack the FS store to prevent it from raising
  296         # a "duplication path" error. I'd rather have this task copying the
  297         # image bits one more time than duplicating all that logic.
  298         #
  299         # Since I don't think this should be the definitive solution, I'm
  300         # leaving the code below as a reference for what should happen here
  301         # once the FS store and domain code will be able to handle this case.
  302         #
  303         # if file_path is None:
  304         #    image_import.set_image_data(image, self.uri, None)
  305         #    return
  306 
  307         # NOTE(flaper87): Don't assume the image was stored in the
  308         # work_dir. Think in the case this path was provided by another task.
  309         # Also, lets try to neither assume things nor create "logic"
  310         # dependencies between this task and `_ImportToFS`
  311         #
  312         # base_path = os.path.dirname(file_path.split("file://")[-1])
  313 
  314         # NOTE(flaper87): Hopefully just scenarios #3 and #4. I say
  315         # hopefully because nothing prevents the user to use the same
  316         # FS store path as a work dir
  317         #
  318         # image_path = os.path.join(base_path, image_id)
  319         #
  320         # if (base_path == CONF.glance_store.filesystem_store_datadir or
  321         #      base_path in CONF.glance_store.filesystem_store_datadirs):
  322         #     os.rename(file_path, image_path)
  323         #
  324         # image_import.set_image_data(image, image_path, None)
  325         try:
  326             image_import.set_image_data(image,
  327                                         file_path or self.uri, self.task_id,
  328                                         backend=self.backend)
  329         except IOError as e:
  330             msg = (_('Uploading the image failed due to: %(exc)s') %
  331                    {'exc': encodeutils.exception_to_unicode(e)})
  332             LOG.error(msg)
  333             raise exception.UploadException(message=msg)
  334         # NOTE(flaper87): We need to save the image again after the locations
  335         # have been set in the image.
  336         self.image_repo.save(image)
  337 
  338 
  339 class _SaveImage(task.Task):
  340 
  341     def __init__(self, task_id, task_type, image_repo):
  342         self.task_id = task_id
  343         self.task_type = task_type
  344         self.image_repo = image_repo
  345         super(_SaveImage, self).__init__(
  346             name='%s-SaveImage-%s' % (task_type, task_id))
  347 
  348     def execute(self, image_id):
  349         """Transition image status to active
  350 
  351         :param image_id: Glance Image ID
  352         """
  353         new_image = self.image_repo.get(image_id)
  354         if new_image.status == 'saving':
  355             # NOTE(flaper87): THIS IS WRONG!
  356             # we should be doing atomic updates to avoid
  357             # race conditions. This happens in other places
  358             # too.
  359             new_image.status = 'active'
  360         self.image_repo.save(new_image)
  361 
  362 
  363 class _CompleteTask(task.Task):
  364 
  365     def __init__(self, task_id, task_type, task_repo):
  366         self.task_id = task_id
  367         self.task_type = task_type
  368         self.task_repo = task_repo
  369         super(_CompleteTask, self).__init__(
  370             name='%s-CompleteTask-%s' % (task_type, task_id))
  371 
  372     def execute(self, image_id):
  373         """Finishing the task flow
  374 
  375         :param image_id: Glance Image ID
  376         """
  377         task = script_utils.get_task(self.task_repo, self.task_id)
  378         if task is None:
  379             return
  380         try:
  381             task.succeed({'image_id': image_id})
  382         except Exception as e:
  383             # Note: The message string contains Error in it to indicate
  384             # in the task.message that it's a error message for the user.
  385 
  386             # TODO(nikhil): need to bring back save_and_reraise_exception when
  387             # necessary
  388             log_msg = _LE("Task ID %(task_id)s failed. Error: %(exc_type)s: "
  389                           "%(e)s")
  390             LOG.exception(log_msg, {'exc_type': str(type(e)),
  391                                     'e': encodeutils.exception_to_unicode(e),
  392                                     'task_id': task.task_id})
  393 
  394             err_msg = _("Error: %(exc_type)s: %(e)s")
  395             task.fail(err_msg % {'exc_type': str(type(e)),
  396                                  'e': encodeutils.exception_to_unicode(e)})
  397         finally:
  398             self.task_repo.save(task)
  399 
  400         LOG.info(_LI("%(task_id)s of %(task_type)s completed"),
  401                  {'task_id': self.task_id, 'task_type': self.task_type})
  402 
  403 
  404 def _get_import_flows(**kwargs):
  405     # NOTE(flaper87): Until we have a better infrastructure to enable
  406     # and disable tasks plugins, hard-code the tasks we know exist,
  407     # instead of loading everything from the namespace. This guarantees
  408     # both, the load order of these plugins and the fact that no random
  409     # plugins will be added/loaded until we feel comfortable with this.
  410     # Future patches will keep using NamedExtensionManager but they'll
  411     # rely on a config option to control this process.
  412     extensions = named.NamedExtensionManager('glance.flows.import',
  413                                              names=['ovf_process',
  414                                                     'convert',
  415                                                     'introspect'],
  416                                              name_order=True,
  417                                              invoke_on_load=True,
  418                                              invoke_kwds=kwargs)
  419 
  420     for ext in extensions.extensions:
  421         yield ext.obj
  422 
  423 
  424 def get_flow(**kwargs):
  425     """Return task flow
  426 
  427     :param task_id: Task ID
  428     :param task_type: Type of the task
  429     :param task_repo: Task repo
  430     :param image_repo: Image repository used
  431     :param image_factory: Glance Image Factory
  432     :param uri: uri for the image file
  433     """
  434     task_id = kwargs.get('task_id')
  435     task_type = kwargs.get('task_type')
  436     task_repo = kwargs.get('task_repo')
  437     image_repo = kwargs.get('image_repo')
  438     image_factory = kwargs.get('image_factory')
  439     uri = kwargs.get('uri')
  440     backend = kwargs.get('backend')
  441 
  442     flow = lf.Flow(task_type, retry=retry.AlwaysRevert()).add(
  443         _CreateImage(task_id, task_type, task_repo, image_repo, image_factory))
  444 
  445     import_to_store = _ImportToStore(task_id, task_type, image_repo, uri,
  446                                      backend)
  447 
  448     try:
  449         # NOTE(flaper87): ImportToLocal and DeleteFromLocal shouldn't be here.
  450         # Ideally, we should have the different import flows doing this for us
  451         # and this function should clean up duplicated tasks. For example, say
  452         # 2 flows need to have a local copy of the image - ImportToLocal - in
  453         # order to be able to complete the task - i.e Introspect-. In that
  454         # case, the introspect.get_flow call should add both, ImportToLocal and
  455         # DeleteFromLocal, to the flow and this function will reduce the
  456         # duplicated calls to those tasks by creating a linear flow that
  457         # ensures those are called before the other tasks.  For now, I'm
  458         # keeping them here, though.
  459         limbo = lf.Flow(task_type).add(_ImportToFS(task_id,
  460                                                    task_type,
  461                                                    task_repo,
  462                                                    uri))
  463 
  464         for subflow in _get_import_flows(**kwargs):
  465             limbo.add(subflow)
  466 
  467         # NOTE(flaper87): We have hard-coded 2 tasks,
  468         # if there aren't more than 2, it means that
  469         # no subtask has been registered.
  470         if len(limbo) > 1:
  471             flow.add(limbo)
  472 
  473             # NOTE(flaper87): Until this implementation gets smarter,
  474             # make sure ImportToStore is called *after* the imported
  475             # flow stages. If not, the image will be set to saving state
  476             # invalidating tasks like Introspection or Convert.
  477             flow.add(import_to_store)
  478 
  479             # NOTE(flaper87): Since this is an "optional" task but required
  480             # when `limbo` is executed, we're adding it in its own subflow
  481             # to isolate it from the rest of the flow.
  482             delete_flow = lf.Flow(task_type).add(_DeleteFromFS(task_id,
  483                                                                task_type))
  484             flow.add(delete_flow)
  485         else:
  486             flow.add(import_to_store)
  487     except exception.BadTaskConfiguration as exc:
  488         # NOTE(flaper87): If something goes wrong with the load of
  489         # import tasks, make sure we go on.
  490         LOG.error(_LE('Bad task configuration: %s'), exc.message)
  491         flow.add(import_to_store)
  492 
  493     flow.add(
  494         _SaveImage(task_id, task_type, image_repo),
  495         _CompleteTask(task_id, task_type, task_repo)
  496     )
  497     return flow