api_image_import.py (glance-20.0.0) | : | api_image_import.py (glance-20.0.1) | ||
---|---|---|---|---|
skipping to change at line 75 | skipping to change at line 75 | |||
* Any provided Task object name to be included | * Any provided Task object name to be included | |||
in to the flow. | in to the flow. | |||
""")), | """)), | |||
] | ] | |||
CONF.register_opts(api_import_opts, group='image_import_opts') | CONF.register_opts(api_import_opts, group='image_import_opts') | |||
# TODO(jokke): We should refactor the task implementations so that we do not | # TODO(jokke): We should refactor the task implementations so that we do not | |||
# need to duplicate what we have already for example in base_import.py. | # need to duplicate what we have already for example in base_import.py. | |||
class _NoStoresSucceeded(exception.GlanceException): | ||||
def __init__(self, message): | ||||
super(_NoStoresSucceeded, self).__init__(message) | ||||
class _DeleteFromFS(task.Task): | class _DeleteFromFS(task.Task): | |||
def __init__(self, task_id, task_type): | def __init__(self, task_id, task_type): | |||
self.task_id = task_id | self.task_id = task_id | |||
self.task_type = task_type | self.task_type = task_type | |||
super(_DeleteFromFS, self).__init__( | super(_DeleteFromFS, self).__init__( | |||
name='%s-DeleteFromFS-%s' % (task_type, task_id)) | name='%s-DeleteFromFS-%s' % (task_type, task_id)) | |||
def execute(self, file_path): | def execute(self, file_path): | |||
"""Remove file from the backend | """Remove file from the backend | |||
:param file_path: path to the file being deleted | :param file_path: path to the file being deleted | |||
""" | """ | |||
if CONF.enabled_backends: | if CONF.enabled_backends: | |||
store_api.delete(file_path, 'os_glance_staging_store') | try: | |||
store_api.delete(file_path, 'os_glance_staging_store') | ||||
except store_api.exceptions.NotFound as e: | ||||
LOG.error(_("After upload to backend, deletion of staged " | ||||
"image data from %(fn)s has failed because " | ||||
"%(em)s"), {'fn': file_path, | ||||
'em': e.message}) | ||||
else: | else: | |||
# TODO(abhishekk): After removal of backend module from | # TODO(abhishekk): After removal of backend module from | |||
# glance_store need to change this to use multi_backend | # glance_store need to change this to use multi_backend | |||
# module. | # module. | |||
file_path = file_path[7:] | file_path = file_path[7:] | |||
if os.path.exists(file_path): | if os.path.exists(file_path): | |||
try: | try: | |||
LOG.debug(_("After upload to the backend, deleting staged " | LOG.debug(_("After upload to the backend, deleting staged " | |||
"image data from %(fn)s"), {'fn': file_path}) | "image data from %(fn)s"), {'fn': file_path}) | |||
os.unlink(file_path) | os.unlink(file_path) | |||
skipping to change at line 314 | skipping to change at line 325 | |||
LOG.warning(msg) | LOG.warning(msg) | |||
else: | else: | |||
if len(image.locations) == 0: | if len(image.locations) == 0: | |||
image.checksum = None | image.checksum = None | |||
image.os_hash_algo = None | image.os_hash_algo = None | |||
image.os_hash_value = None | image.os_hash_value = None | |||
image.size = None | image.size = None | |||
self.image_repo.save(image) | self.image_repo.save(image) | |||
break | break | |||
class _SaveImage(task.Task): | class _VerifyImageState(task.Task): | |||
def __init__(self, task_id, task_type, image_repo, image_id, | def __init__(self, task_id, task_type, image_repo, image_id, | |||
import_method): | import_method): | |||
self.task_id = task_id | self.task_id = task_id | |||
self.task_type = task_type | self.task_type = task_type | |||
self.image_repo = image_repo | self.image_repo = image_repo | |||
self.image_id = image_id | self.image_id = image_id | |||
self.import_method = import_method | self.import_method = import_method | |||
super(_SaveImage, self).__init__( | super(_VerifyImageState, self).__init__( | |||
name='%s-SaveImage-%s' % (task_type, task_id)) | name='%s-VerifyImageState-%s' % (task_type, task_id)) | |||
def execute(self): | def execute(self): | |||
"""Transition image status to active | """Verify we have active image | |||
:param image_id: Glance Image ID | :param image_id: Glance Image ID | |||
""" | """ | |||
new_image = self.image_repo.get(self.image_id) | new_image = self.image_repo.get(self.image_id) | |||
if (self.import_method != 'copy-image' | if new_image.status != 'active': | |||
and new_image.status == 'importing'): | raise _NoStoresSucceeded(_('None of the uploads finished!')) | |||
# NOTE(flaper87): THIS IS WRONG! | ||||
# we should be doing atomic updates to avoid | def revert(self, result, **kwargs): | |||
# race conditions. This happens in other places | """Set back to queued if this wasn't copy-image job.""" | |||
# too. | if self.import_method != 'copy-image': | |||
new_image.status = 'active' | new_image = self.image_repo.get(self.image_id) | |||
self.image_repo.save(new_image) | new_image.status = 'queued' | |||
self.image_repo.save_image(new_image) | ||||
class _CompleteTask(task.Task): | class _CompleteTask(task.Task): | |||
def __init__(self, task_id, task_type, task_repo, image_id): | def __init__(self, task_id, task_type, task_repo, image_id): | |||
self.task_id = task_id | self.task_id = task_id | |||
self.task_type = task_type | self.task_type = task_type | |||
self.task_repo = task_repo | self.task_repo = task_repo | |||
self.image_id = image_id | self.image_id = image_id | |||
super(_CompleteTask, self).__init__( | super(_CompleteTask, self).__init__( | |||
name='%s-CompleteTask-%s' % (task_type, task_id)) | name='%s-CompleteTask-%s' % (task_type, task_id)) | |||
skipping to change at line 429 | skipping to change at line 441 | |||
if CONF.enabled_backends: | if CONF.enabled_backends: | |||
separator, staging_dir = store_utils.get_dir_separator() | separator, staging_dir = store_utils.get_dir_separator() | |||
file_uri = separator.join((staging_dir, str(image_id))) | file_uri = separator.join((staging_dir, str(image_id))) | |||
else: | else: | |||
file_uri = separator.join((CONF.node_staging_uri, str(image_id))) | file_uri = separator.join((CONF.node_staging_uri, str(image_id))) | |||
else: | else: | |||
file_uri = uri | file_uri = uri | |||
flow.add(_VerifyStaging(task_id, task_type, task_repo, file_uri)) | flow.add(_VerifyStaging(task_id, task_type, task_repo, file_uri)) | |||
for plugin in import_plugins.get_import_plugins(**kwargs): | # Note(jokke): The plugins were designed to act on the image data or | |||
flow.add(plugin) | # metadata during the import process before the image goes active. It | |||
# does not make sense to try to execute them during 'copy-image'. | ||||
if import_method != 'copy-image': | ||||
for plugin in import_plugins.get_import_plugins(**kwargs): | ||||
flow.add(plugin) | ||||
else: | ||||
LOG.debug("Skipping plugins on 'copy-image' job.") | ||||
for idx, store in enumerate(stores, 1): | for idx, store in enumerate(stores, 1): | |||
set_active = (not all_stores_must_succeed) or (idx == len(stores)) | set_active = (not all_stores_must_succeed) or (idx == len(stores)) | |||
if import_method == 'copy-image': | if import_method == 'copy-image': | |||
set_active = False | set_active = False | |||
task_name = task_type + "-" + (store or "") | task_name = task_type + "-" + (store or "") | |||
import_task = lf.Flow(task_name) | import_task = lf.Flow(task_name) | |||
import_to_store = _ImportToStore(task_id, | import_to_store = _ImportToStore(task_id, | |||
task_name, | task_name, | |||
image_repo, | image_repo, | |||
skipping to change at line 452 | skipping to change at line 470 | |||
image_id, | image_id, | |||
store, | store, | |||
all_stores_must_succeed, | all_stores_must_succeed, | |||
set_active) | set_active) | |||
import_task.add(import_to_store) | import_task.add(import_to_store) | |||
flow.add(import_task) | flow.add(import_task) | |||
delete_task = lf.Flow(task_type).add(_DeleteFromFS(task_id, task_type)) | delete_task = lf.Flow(task_type).add(_DeleteFromFS(task_id, task_type)) | |||
flow.add(delete_task) | flow.add(delete_task) | |||
save_task = _SaveImage(task_id, | verify_task = _VerifyImageState(task_id, | |||
task_type, | task_type, | |||
image_repo, | image_repo, | |||
image_id, | image_id, | |||
import_method) | import_method) | |||
flow.add(save_task) | flow.add(verify_task) | |||
complete_task = _CompleteTask(task_id, | complete_task = _CompleteTask(task_id, | |||
task_type, | task_type, | |||
task_repo, | task_repo, | |||
image_id) | image_id) | |||
flow.add(complete_task) | flow.add(complete_task) | |||
image = image_repo.get(image_id) | image = image_repo.get(image_id) | |||
from_state = image.status | from_state = image.status | |||
if import_method != 'copy-image': | if import_method != 'copy-image': | |||
End of changes. 8 change blocks. | ||||
21 lines changed or deleted | 39 lines changed or added |