"Fossies" - the Fresh Open Source Software Archive

Member "senlin-8.0.0/senlin/engine/actions/base.py" (16 Oct 2019, 26333 Bytes) of package /linux/misc/openstack/senlin-8.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "base.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 7.0.0_vs_8.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 # not use this file except in compliance with the License. You may obtain
    3 # a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 # License for the specific language governing permissions and limitations
   11 # under the License.
   12 
   13 import eventlet
   14 import six
   15 import time
   16 
   17 from oslo_config import cfg
   18 from oslo_log import log as logging
   19 from oslo_utils import timeutils
   20 
   21 from senlin.common import consts
   22 from senlin.common import context as req_context
   23 from senlin.common import exception
   24 from senlin.common import utils
   25 from senlin.engine import dispatcher
   26 from senlin.engine import event as EVENT
   27 from senlin.objects import action as ao
   28 from senlin.objects import cluster_lock as cl
   29 from senlin.objects import cluster_policy as cpo
   30 from senlin.objects import dependency as dobj
   31 from senlin.objects import node_lock as nl
   32 from senlin.policies import base as policy_mod
   33 
   34 wallclock = time.time
   35 LOG = logging.getLogger(__name__)
   36 
   37 
   38 class Action(object):
   39     """An action can be performed on a cluster or a node of a cluster."""
   40 
   41     RETURNS = (
   42         RES_OK, RES_ERROR, RES_RETRY, RES_CANCEL, RES_TIMEOUT,
   43         RES_LIFECYCLE_COMPLETE, RES_LIFECYCLE_HOOK_TIMEOUT,
   44     ) = (
   45         'OK', 'ERROR', 'RETRY', 'CANCEL', 'TIMEOUT', 'LIFECYCLE_COMPLETE',
   46         'LIFECYCLE_HOOK_TIMEOUT'
   47     )
   48 
   49     # Action status definitions:
   50     #  INIT:      Not ready to be executed because fields are being modified,
   51     #             or dependency with other actions are being analyzed.
   52     #  READY:     Initialized and ready to be executed by a worker.
   53     #  RUNNING:   Being executed by a worker thread.
   54     #  SUCCEEDED: Completed with success.
   55     #  FAILED:    Completed with failure.
   56     #  CANCELLED: Action cancelled because worker thread was cancelled.
   57     STATUSES = (
   58         INIT, WAITING, READY, RUNNING, SUSPENDED,
   59         SUCCEEDED, FAILED, CANCELLED, WAITING_LIFECYCLE_COMPLETION
   60     ) = (
   61         'INIT', 'WAITING', 'READY', 'RUNNING', 'SUSPENDED',
   62         'SUCCEEDED', 'FAILED', 'CANCELLED', 'WAITING_LIFECYCLE_COMPLETION'
   63     )
   64 
   65     # Signal commands
   66     COMMANDS = (
   67         SIG_CANCEL, SIG_SUSPEND, SIG_RESUME,
   68     ) = (
   69         'CANCEL', 'SUSPEND', 'RESUME',
   70     )
   71 
   72     def __new__(cls, target, action, ctx, **kwargs):
   73         if (cls != Action):
   74             return super(Action, cls).__new__(cls)
   75 
   76         target_type = action.split('_')[0]
   77         if target_type == 'CLUSTER':
   78             from senlin.engine.actions import cluster_action
   79             ActionClass = cluster_action.ClusterAction
   80         elif target_type == 'NODE':
   81             from senlin.engine.actions import node_action
   82             ActionClass = node_action.NodeAction
   83         else:
   84             from senlin.engine.actions import custom_action
   85             ActionClass = custom_action.CustomAction
   86 
   87         return super(Action, cls).__new__(ActionClass)
   88 
   89     def __init__(self, target, action, ctx, **kwargs):
   90         # context will be persisted into database so that any worker thread
   91         # can pick the action up and execute it on behalf of the initiator
   92 
   93         self.id = kwargs.get('id', None)
   94         self.name = kwargs.get('name', '')
   95 
   96         self.context = ctx
   97         self.user = ctx.user_id
   98         self.project = ctx.project_id
   99         self.domain = ctx.domain_id
  100 
  101         self.action = action
  102         self.target = target
  103 
  104         # Why this action is fired, it can be a UUID of another action
  105         self.cause = kwargs.get('cause', '')
  106 
  107         # Owner can be an UUID format ID for the worker that is currently
  108         # working on the action.  It also serves as a lock.
  109         self.owner = kwargs.get('owner', None)
  110 
  111         # An action may need to be executed repeatitively, interval is the
  112         # time in seconds between two consecutive execution.
  113         # A value of -1 indicates that this action is only to be executed once
  114         self.interval = kwargs.get('interval', -1)
  115 
  116         # Start time can be an absolute time or a time relative to another
  117         # action. E.g.
  118         #   - '2014-12-18 08:41:39.908569'
  119         #   - 'AFTER: 57292917-af90-4c45-9457-34777d939d4d'
  120         #   - 'WHEN: 0265f93b-b1d7-421f-b5ad-cb83de2f559d'
  121         self.start_time = kwargs.get('start_time', None)
  122         self.end_time = kwargs.get('end_time', None)
  123 
  124         # Timeout is a placeholder in case some actions may linger too long
  125         self.timeout = kwargs.get('timeout', cfg.CONF.default_action_timeout)
  126 
  127         # Return code, useful when action is not automatically deleted
  128         # after execution
  129         self.status = kwargs.get('status', self.INIT)
  130         self.status_reason = kwargs.get('status_reason', '')
  131 
  132         # All parameters are passed in using keyword arguments which is
  133         # a dictionary stored as JSON in DB
  134         self.inputs = kwargs.get('inputs', {})
  135         self.outputs = kwargs.get('outputs', {})
  136 
  137         self.created_at = kwargs.get('created_at', None)
  138         self.updated_at = kwargs.get('updated_at', None)
  139 
  140         self.data = kwargs.get('data', {})
  141 
  142     def store(self, ctx):
  143         """Store the action record into database table.
  144 
  145         :param ctx: An instance of the request context.
  146         :return: The ID of the stored object.
  147         """
  148 
  149         timestamp = timeutils.utcnow(True)
  150 
  151         values = {
  152             'name': self.name,
  153             'context': self.context.to_dict(),
  154             'target': self.target,
  155             'action': self.action,
  156             'cause': self.cause,
  157             'owner': self.owner,
  158             'interval': self.interval,
  159             'start_time': self.start_time,
  160             'end_time': self.end_time,
  161             'timeout': self.timeout,
  162             'status': self.status,
  163             'status_reason': self.status_reason,
  164             'inputs': self.inputs,
  165             'outputs': self.outputs,
  166             'created_at': self.created_at,
  167             'updated_at': self.updated_at,
  168             'data': self.data,
  169             'user': self.user,
  170             'project': self.project,
  171             'domain': self.domain,
  172         }
  173 
  174         if self.id:
  175             self.updated_at = timestamp
  176             values['updated_at'] = timestamp
  177             ao.Action.update(ctx, self.id, values)
  178         else:
  179             self.created_at = timestamp
  180             values['created_at'] = timestamp
  181             action = ao.Action.create(ctx, values)
  182             self.id = action.id
  183 
  184         return self.id
  185 
  186     @classmethod
  187     def _from_object(cls, obj):
  188         """Construct an action from database object.
  189 
  190         :param obj: a DB action object that contains all fields.
  191         :return: An `Action` object deserialized from the DB action object.
  192         """
  193         ctx = req_context.RequestContext.from_dict(obj.context)
  194         kwargs = {
  195             'id': obj.id,
  196             'name': obj.name,
  197             'cause': obj.cause,
  198             'owner': obj.owner,
  199             'interval': obj.interval,
  200             'start_time': obj.start_time,
  201             'end_time': obj.end_time,
  202             'timeout': obj.timeout,
  203             'status': obj.status,
  204             'status_reason': obj.status_reason,
  205             'inputs': obj.inputs or {},
  206             'outputs': obj.outputs or {},
  207             'created_at': obj.created_at,
  208             'updated_at': obj.updated_at,
  209             'data': obj.data,
  210         }
  211 
  212         target_type = obj.action.split('_')[0]
  213         if target_type == 'CLUSTER':
  214             from senlin.engine.actions import cluster_action
  215             ActionClass = cluster_action.ClusterAction
  216         elif target_type == 'NODE':
  217             from senlin.engine.actions import node_action
  218             ActionClass = node_action.NodeAction
  219         else:
  220             from senlin.engine.actions import custom_action
  221             ActionClass = custom_action.CustomAction
  222 
  223         return ActionClass(obj.target, obj.action, ctx, **kwargs)
  224 
  225     @classmethod
  226     def load(cls, ctx, action_id=None, db_action=None, project_safe=True):
  227         """Retrieve an action from database.
  228 
  229         :param ctx: Instance of request context.
  230         :param action_id: An UUID for the action to deserialize.
  231         :param db_action: An action object for the action to deserialize.
  232         :return: A `Action` object instance.
  233         """
  234         if db_action is None:
  235             db_action = ao.Action.get(ctx, action_id,
  236                                       project_safe=project_safe)
  237             if db_action is None:
  238                 raise exception.ResourceNotFound(type='action', id=action_id)
  239 
  240         return cls._from_object(db_action)
  241 
  242     @classmethod
  243     def create(cls, ctx, target, action, force=False, **kwargs):
  244         """Create an action object.
  245 
  246         :param ctx: The requesting context.
  247         :param target: The ID of the target cluster/node.
  248         :param action: Name of the action.
  249         :param force: Skip checking locks/conflicts
  250         :param dict kwargs: Other keyword arguments for the action.
  251         :return: ID of the action created.
  252         """
  253         if not force:
  254             cls._check_action_lock(target, action)
  255             cls._check_conflicting_actions(ctx, target, action)
  256 
  257         params = {
  258             'user_id': ctx.user_id,
  259             'project_id': ctx.project_id,
  260             'domain_id': ctx.domain_id,
  261             'is_admin': ctx.is_admin,
  262             'request_id': ctx.request_id,
  263             'trusts': ctx.trusts,
  264         }
  265         c = req_context.RequestContext.from_dict(params)
  266 
  267         if action in consts.CLUSTER_SCALE_ACTIONS:
  268             Action.validate_scaling_action(c, target, action)
  269 
  270         obj = cls(target, action, c, **kwargs)
  271         return obj.store(ctx)
  272 
  273     @staticmethod
  274     def _check_action_lock(target, action):
  275         if action in consts.LOCK_BYPASS_ACTIONS:
  276             return
  277         elif (action in list(consts.CLUSTER_ACTION_NAMES) and
  278                 cl.ClusterLock.is_locked(target)):
  279             raise exception.ResourceIsLocked(
  280                 action=action, type='cluster', id=target)
  281         elif (action in list(consts.NODE_ACTION_NAMES) and
  282               nl.NodeLock.is_locked(target)):
  283             raise exception.ResourceIsLocked(
  284                 action=action, type='node', id=target)
  285 
  286     @staticmethod
  287     def _check_conflicting_actions(ctx, target, action):
  288         conflict_actions = ao.Action.get_all_active_by_target(ctx, target)
  289         # Ignore conflicting actions on deletes.
  290         if not conflict_actions or action in consts.CONFLICT_BYPASS_ACTIONS:
  291             return
  292         else:
  293             action_ids = [a['id'] for a in conflict_actions]
  294             raise exception.ActionConflict(
  295                 type=action, target=target, actions=",".join(action_ids))
  296 
  297     @classmethod
  298     def delete(cls, ctx, action_id):
  299         """Delete an action from database.
  300 
  301         :param ctx: An instance of the request context.
  302         :param action_id: The UUID of the target action to be deleted.
  303         :return: Nothing.
  304         """
  305         ao.Action.delete(ctx, action_id)
  306 
  307     def signal(self, cmd):
  308         """Send a signal to the action.
  309 
  310         :param cmd: One of the command word defined in self.COMMANDS.
  311         :returns: None
  312         """
  313         if cmd not in self.COMMANDS:
  314             return
  315 
  316         if cmd == self.SIG_CANCEL:
  317             expected = (self.INIT, self.WAITING, self.READY, self.RUNNING,
  318                         self.WAITING_LIFECYCLE_COMPLETION)
  319         elif cmd == self.SIG_SUSPEND:
  320             expected = (self.RUNNING)
  321         else:  # SIG_RESUME
  322             expected = (self.SUSPENDED)
  323 
  324         if self.status not in expected:
  325             LOG.info("Action (%(id)s) is in status (%(actual)s) while "
  326                      "expected status must be one of (%(expected)s).",
  327                      dict(id=self.id[:8], expected=expected,
  328                           actual=self.status))
  329             return
  330 
  331         ao.Action.signal(self.context, self.id, cmd)
  332 
  333     def signal_cancel(self):
  334         """Signal the action and any depended actions to cancel.
  335 
  336         If the action or any depended actions are in status
  337         'WAITING_LIFECYCLE_COMPLETION' or 'INIT' update the status to cancelled
  338          directly.
  339 
  340         :raises: `ActionImmutable` if the action is in an unchangeable state
  341         """
  342         expected = (self.INIT, self.WAITING, self.READY, self.RUNNING,
  343                     self.WAITING_LIFECYCLE_COMPLETION)
  344 
  345         if self.status not in expected:
  346             raise exception.ActionImmutable(id=self.id[:8], expected=expected,
  347                                             actual=self.status)
  348 
  349         ao.Action.signal(self.context, self.id, self.SIG_CANCEL)
  350 
  351         if self.status in (self.WAITING_LIFECYCLE_COMPLETION, self.INIT):
  352             self.set_status(self.RES_CANCEL, 'Action execution cancelled')
  353 
  354         depended = dobj.Dependency.get_depended(self.context, self.id)
  355         if not depended:
  356             return
  357 
  358         for child in depended:
  359             # Try to cancel all dependant actions
  360             action = self.load(self.context, action_id=child)
  361             if not action.is_cancelled():
  362                 ao.Action.signal(self.context, child, self.SIG_CANCEL)
  363             # If the action is in WAITING_LIFECYCLE_COMPLETION or INIT update
  364             # the status to CANCELLED immediately.
  365             if action.status in (action.WAITING_LIFECYCLE_COMPLETION,
  366                                  action.INIT):
  367                 action.set_status(action.RES_CANCEL,
  368                                   'Action execution cancelled')
  369 
  370     def force_cancel(self):
  371         """Force the action and any depended actions to cancel.
  372 
  373         If the action or any depended actions are in status 'INIT', 'WAITING',
  374         'READY', 'RUNNING', or 'WAITING_LIFECYCLE_COMPLETION' immediately
  375         update their status to cancelled. This should only be used if an action
  376         is stuck/dead and has no expectation of ever completing.
  377 
  378         :raises: `ActionImmutable` if the action is in an unchangeable state
  379         """
  380         expected = (self.INIT, self.WAITING, self.READY, self.RUNNING,
  381                     self.WAITING_LIFECYCLE_COMPLETION)
  382         if self.status not in expected:
  383             raise exception.ActionImmutable(id=self.id[:8], expected=expected,
  384                                             actual=self.status)
  385         LOG.debug('Forcing action %s to cancel.', self.id)
  386         self.set_status(self.RES_CANCEL, 'Action execution force cancelled')
  387 
  388         depended = dobj.Dependency.get_depended(self.context, self.id)
  389         if not depended:
  390             return
  391 
  392         for child in depended:
  393             # Force cancel all dependant actions
  394             action = self.load(self.context, action_id=child)
  395             if action.status in (action.INIT, action.WAITING, action.READY,
  396                                  action.RUNNING,
  397                                  action.WAITING_LIFECYCLE_COMPLETION):
  398                 LOG.debug('Forcing action %s to cancel.', action.id)
  399                 action.set_status(action.RES_CANCEL,
  400                                   'Action execution force cancelled')
  401 
  402     def execute(self, **kwargs):
  403         """Execute the action.
  404 
  405         In theory, the action encapsulates all information needed for
  406         execution.  'kwargs' may specify additional parameters.
  407         :param kwargs: additional parameters that may override the default
  408                        properties stored in the action record.
  409         """
  410         raise NotImplementedError
  411 
  412     def set_status(self, result, reason=None):
  413         """Set action status based on return value from execute."""
  414 
  415         timestamp = wallclock()
  416 
  417         if result == self.RES_OK:
  418             status = self.SUCCEEDED
  419             ao.Action.mark_succeeded(self.context, self.id, timestamp)
  420 
  421         elif result == self.RES_ERROR:
  422             status = self.FAILED
  423             ao.Action.mark_failed(self.context, self.id, timestamp,
  424                                   reason or 'ERROR')
  425 
  426         elif result == self.RES_TIMEOUT:
  427             status = self.FAILED
  428             ao.Action.mark_failed(self.context, self.id, timestamp,
  429                                   reason or 'TIMEOUT')
  430 
  431         elif result == self.RES_CANCEL:
  432             status = self.CANCELLED
  433             ao.Action.mark_cancelled(self.context, self.id, timestamp)
  434 
  435         else:  # result == self.RES_RETRY:
  436             retries = self.data.get('retries', 0)
  437             # Action failed at the moment, but can be retried
  438             # retries time is configurable
  439             if retries < cfg.CONF.lock_retry_times:
  440                 status = self.READY
  441                 retries += 1
  442 
  443                 self.data.update({'retries': retries})
  444                 ao.Action.abandon(self.context, self.id, {'data': self.data})
  445                 # sleep for a while
  446                 eventlet.sleep(cfg.CONF.lock_retry_interval)
  447                 dispatcher.start_action(self.id)
  448             else:
  449                 status = self.RES_ERROR
  450                 if not reason:
  451                     reason = ('Exceeded maximum number of retries (%d)'
  452                               '') % cfg.CONF.lock_retry_times
  453                 ao.Action.mark_failed(self.context, self.id, timestamp, reason)
  454 
  455         if status == self.SUCCEEDED:
  456             EVENT.info(self, consts.PHASE_END, reason or 'SUCCEEDED')
  457         elif status == self.READY:
  458             EVENT.warning(self, consts.PHASE_ERROR, reason or 'RETRY')
  459         else:
  460             EVENT.error(self, consts.PHASE_ERROR, reason or 'ERROR')
  461 
  462         self.status = status
  463         self.status_reason = reason
  464 
  465     def get_status(self):
  466         timestamp = wallclock()
  467         status = ao.Action.check_status(self.context, self.id, timestamp)
  468         self.status = status
  469         return status
  470 
  471     def is_timeout(self, timeout=None):
  472         if timeout is None:
  473             timeout = self.timeout
  474         if self.start_time is None:
  475             return False
  476         time_elapse = wallclock() - self.start_time
  477         return time_elapse > timeout
  478 
  479     def _check_signal(self):
  480         # Check timeout first, if true, return timeout message
  481         if self.timeout is not None and self.is_timeout():
  482             EVENT.debug(self, consts.PHASE_ERROR, 'TIMEOUT')
  483             return self.RES_TIMEOUT
  484 
  485         result = ao.Action.signal_query(self.context, self.id)
  486         return result
  487 
  488     def is_cancelled(self):
  489         return self._check_signal() == self.SIG_CANCEL
  490 
  491     def is_suspended(self):
  492         return self._check_signal() == self.SIG_SUSPEND
  493 
  494     def is_resumed(self):
  495         return self._check_signal() == self.SIG_RESUME
  496 
  497     def _check_result(self, name):
  498         """Check policy status and generate event.
  499 
  500         :param name: Name of policy checked
  501         :return: True if the policy checking can be continued, or False if the
  502                  policy checking should be aborted.
  503         """
  504         reason = self.data['reason']
  505         if self.data['status'] == policy_mod.CHECK_OK:
  506             return True
  507 
  508         self.data['reason'] = ("Failed policy '%(name)s': %(reason)s"
  509                                ) % {'name': name, 'reason': reason}
  510         return False
  511 
  512     def policy_check(self, cluster_id, target):
  513         """Check all policies attached to cluster and give result.
  514 
  515         :param cluster_id: The ID of the cluster to which the policy is
  516             attached.
  517         :param target: A tuple of ('when', action_name)
  518         :return: A dictionary that contains the check result.
  519         """
  520 
  521         if target not in ['BEFORE', 'AFTER']:
  522             return
  523 
  524         bindings = cpo.ClusterPolicy.get_all(self.context, cluster_id,
  525                                              sort='priority',
  526                                              filters={'enabled': True})
  527         # default values
  528         self.data['status'] = policy_mod.CHECK_OK
  529         self.data['reason'] = 'Completed policy checking.'
  530 
  531         for pb in bindings:
  532             policy = policy_mod.Policy.load(self.context, pb.policy_id)
  533 
  534             # add last_op as input for the policy so that it can be used
  535             # during pre_op
  536             self.inputs['last_op'] = pb.last_op
  537 
  538             if not policy.need_check(target, self):
  539                 continue
  540 
  541             if target == 'BEFORE':
  542                 method = getattr(policy, 'pre_op', None)
  543             else:  # target == 'AFTER'
  544                 method = getattr(policy, 'post_op', None)
  545 
  546             if method is not None:
  547                 method(cluster_id, self)
  548 
  549             res = self._check_result(policy.name)
  550             if res is False:
  551                 return
  552         return
  553 
  554     @staticmethod
  555     def validate_scaling_action(ctx, cluster_id, action):
  556         """Validate scaling action against actions table and policy cooldown.
  557 
  558         :param ctx: An instance of the request context.
  559         :param cluster_id: ID of the cluster the scaling action is targeting.
  560         :param action: Scaling action being validated.
  561         :return: None
  562         :raises: An exception of ``ActionCooldown`` when the action being
  563         validated is still in cooldown based off the policy or
  564         ``ActionConflict`` when a scaling action is already in the action
  565         table.
  566         """
  567         # Check for conflicting actions in the actions table.
  568         conflicting_actions = Action._get_conflicting_scaling_actions(
  569             ctx, cluster_id)
  570         if conflicting_actions:
  571             action_ids = [a.get('id', None) for a in conflicting_actions]
  572             LOG.info("Unable to process %(action)s for cluster %(cluster_id)s "
  573                      "the action conflicts with %(conflicts)s",
  574                      {'action': action,
  575                       'cluster_id': cluster_id,
  576                       'conflicts': action_ids})
  577             raise exception.ActionConflict(
  578                 type=action,
  579                 target=cluster_id,
  580                 actions=",".join(action_ids))
  581 
  582         # Check to see if action cooldown should be observed.
  583         bindings = cpo.ClusterPolicy.get_all(ctx, cluster_id,
  584                                              sort='priority',
  585                                              filters={'enabled': True})
  586         for pb in bindings:
  587             policy = policy_mod.Policy.load(ctx, pb.policy_id)
  588             if getattr(policy, 'cooldown', None) and policy.event == action:
  589                 if pb.last_op and not timeutils.is_older_than(
  590                         pb.last_op, policy.cooldown):
  591                     LOG.info("Unable to process %(action)s for cluster "
  592                              "%(cluster_id)s the actions policy %(policy)s "
  593                              "cooldown still in progress",
  594                              {'action': action,
  595                               'cluster_id': cluster_id,
  596                               'policy': pb.policy_id})
  597                     raise exception.ActionCooldown(
  598                         type=action,
  599                         cluster=cluster_id,
  600                         policy_id=pb.policy_id)
  601         return
  602 
  603     @staticmethod
  604     def _get_conflicting_scaling_actions(ctx, cluster_id):
  605         """Check actions table for conflicting scaling actions.
  606 
  607         :param ctx: An instance of the request context.
  608         :param cluster_id: ID of the cluster the scaling action is targeting.
  609         :return: A list of conflicting actions.
  610         """
  611         scaling_actions = ao.Action.action_list_active_scaling(
  612             ctx, cluster_id)
  613         if scaling_actions:
  614             return [a.to_dict() for a in scaling_actions]
  615         else:
  616             return None
  617 
  618     def to_dict(self):
  619         if self.id:
  620             dep_on = dobj.Dependency.get_depended(self.context, self.id)
  621             dep_by = dobj.Dependency.get_dependents(self.context, self.id)
  622         else:
  623             dep_on = []
  624             dep_by = []
  625         action_dict = {
  626             'id': self.id,
  627             'name': self.name,
  628             'action': self.action,
  629             'target': self.target,
  630             'cause': self.cause,
  631             'owner': self.owner,
  632             'interval': self.interval,
  633             'start_time': self.start_time,
  634             'end_time': self.end_time,
  635             'timeout': self.timeout,
  636             'status': self.status,
  637             'status_reason': self.status_reason,
  638             'inputs': self.inputs,
  639             'outputs': self.outputs,
  640             'depends_on': dep_on,
  641             'depended_by': dep_by,
  642             'created_at': utils.isotime(self.created_at),
  643             'updated_at': utils.isotime(self.updated_at),
  644             'data': self.data,
  645             'user': self.user,
  646             'project': self.project,
  647         }
  648         return action_dict
  649 
  650 
  651 def ActionProc(ctx, action_id):
  652     """Action process."""
  653 
  654     # Step 1: materialize the action object
  655     action = Action.load(ctx, action_id=action_id, project_safe=False)
  656     if action is None:
  657         LOG.error('Action "%s" could not be found.', action_id)
  658         return False
  659 
  660     if action.is_cancelled():
  661         reason = '%(action)s [%(id)s] cancelled' % {
  662             'action': action.action, 'id': action.id[:8]}
  663         action.set_status(action.RES_CANCEL, reason)
  664         LOG.info(reason)
  665         return True
  666 
  667     EVENT.info(action, consts.PHASE_START, action_id[:8])
  668 
  669     reason = 'Action completed'
  670     success = True
  671     try:
  672         # Step 2: execute the action
  673         result, reason = action.execute()
  674         if result == action.RES_RETRY:
  675             success = False
  676     except Exception as ex:
  677         # We catch exception here to make sure the following logics are
  678         # executed.
  679         result = action.RES_ERROR
  680         reason = six.text_type(ex)
  681         LOG.exception('Unexpected exception occurred during action '
  682                       '%(action)s (%(id)s) execution: %(reason)s',
  683                       {'action': action.action, 'id': action.id,
  684                        'reason': reason})
  685         success = False
  686     finally:
  687         # NOTE: locks on action is eventually released here by status update
  688         action.set_status(result, reason)
  689 
  690     return success