"Fossies" - the Fresh Open Source Software Archive

Member "senlin-8.0.0/senlin/engine/node.py" (16 Oct 2019, 17329 Bytes) of package /linux/misc/openstack/senlin-8.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "node.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 7.0.0_vs_8.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 # not use this file except in compliance with the License. You may obtain
    3 # a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 # License for the specific language governing permissions and limitations
   11 # under the License.
   12 
   13 from oslo_log import log as logging
   14 from oslo_serialization import jsonutils
   15 from oslo_utils import timeutils
   16 import six
   17 
   18 from senlin.common import consts
   19 from senlin.common import exception as exc
   20 from senlin.common import utils
   21 from senlin.objects import node as no
   22 from senlin.profiles import base as pb
   23 
   24 LOG = logging.getLogger(__name__)
   25 
   26 
   27 class Node(object):
   28     """A node is an object that can belong to at most one single cluster.
   29 
   30     All operations are performed without further checking because the
   31     checkings are supposed to be done before/after/during an action is
   32     executed.
   33     """
   34 
   35     def __init__(self, name, profile_id, cluster_id=None, context=None,
   36                  **kwargs):
   37         self.id = kwargs.get('id', None)
   38         if name:
   39             self.name = name
   40         else:
   41             self.name = 'node-' + utils.random_name(8)
   42 
   43         # This is a safe guard to ensure that we have orphan node's cluster
   44         # correctly set to an empty string
   45         if cluster_id is None:
   46             cluster_id = ''
   47 
   48         self.physical_id = kwargs.get('physical_id', None)
   49         self.profile_id = profile_id
   50         self.user = kwargs.get('user', '')
   51         self.project = kwargs.get('project', '')
   52         self.domain = kwargs.get('domain', '')
   53         self.cluster_id = cluster_id
   54         self.index = kwargs.get('index', -1)
   55         self.role = kwargs.get('role', '')
   56 
   57         self.init_at = kwargs.get('init_at', None)
   58         self.created_at = kwargs.get('created_at', None)
   59         self.updated_at = kwargs.get('updated_at', None)
   60 
   61         self.status = kwargs.get('status', consts.NS_INIT)
   62         self.status_reason = kwargs.get('status_reason', 'Initializing')
   63         self.data = kwargs.get('data', {})
   64         self.metadata = kwargs.get('metadata', {})
   65         self.dependents = kwargs.get('dependents', {})
   66         self.rt = {}
   67 
   68         if context is not None:
   69             if self.user == '':
   70                 self.user = context.user_id
   71             if self.project == '':
   72                 self.project = context.project_id
   73             if self.domain == '':
   74                 self.domain = context.domain_id
   75             self._load_runtime_data(context)
   76 
   77     def _load_runtime_data(self, context):
   78         profile = None
   79         try:
   80             profile = pb.Profile.load(context, profile_id=self.profile_id,
   81                                       project_safe=False)
   82         except exc.ResourceNotFound:
   83             LOG.debug('Profile not found: %s', self.profile_id)
   84 
   85         self.rt = {'profile': profile}
   86 
   87     def store(self, context):
   88         """Store the node into database table.
   89 
   90         The invocation of object API could be a node_create or a node_update,
   91         depending on whether node has an ID assigned.
   92 
   93         @param context: Request context for node creation.
   94         @return: UUID of node created.
   95         """
   96         values = {
   97             'name': self.name,
   98             'physical_id': self.physical_id,
   99             'cluster_id': self.cluster_id,
  100             'profile_id': self.profile_id,
  101             'user': self.user,
  102             'project': self.project,
  103             'domain': self.domain,
  104             'index': self.index,
  105             'role': self.role,
  106             'init_at': self.init_at,
  107             'created_at': self.created_at,
  108             'updated_at': self.updated_at,
  109             'status': self.status,
  110             'status_reason': self.status_reason,
  111             'meta_data': self.metadata,
  112             'data': self.data,
  113             'dependents': self.dependents,
  114         }
  115 
  116         if self.id:
  117             no.Node.update(context, self.id, values)
  118         else:
  119             init_at = timeutils.utcnow(True)
  120             self.init_at = init_at
  121             values['init_at'] = init_at
  122             node = no.Node.create(context, values)
  123             self.id = node.id
  124 
  125         self._load_runtime_data(context)
  126         return self.id
  127 
  128     @classmethod
  129     def _from_object(cls, context, obj):
  130         """Construct a node from node object.
  131 
  132         @param context: the context used for DB operations;
  133         @param obj: a node object that contains all fields;
  134         """
  135         kwargs = {
  136             'id': obj.id,
  137             'physical_id': obj.physical_id,
  138             'user': obj.user,
  139             'project': obj.project,
  140             'domain': obj.domain,
  141             'index': obj.index,
  142             'role': obj.role,
  143             'init_at': obj.init_at,
  144             'created_at': obj.created_at,
  145             'updated_at': obj.updated_at,
  146             'status': obj.status,
  147             'status_reason': obj.status_reason,
  148             'data': obj.data,
  149             'metadata': obj.metadata,
  150             'dependents': obj.dependents,
  151         }
  152 
  153         return cls(obj.name, obj.profile_id, obj.cluster_id,
  154                    context=context, **kwargs)
  155 
  156     @classmethod
  157     def load(cls, context, node_id=None, db_node=None, project_safe=True):
  158         """Retrieve a node from database."""
  159         if db_node is None:
  160             db_node = no.Node.get(context, node_id, project_safe=project_safe)
  161             if db_node is None:
  162                 raise exc.ResourceNotFound(type='node', id=node_id)
  163 
  164         return cls._from_object(context, db_node)
  165 
  166     @classmethod
  167     def load_all(cls, context, cluster_id=None, limit=None, marker=None,
  168                  sort=None, filters=None, project_safe=True):
  169         """Retrieve all nodes of from database."""
  170         objs = no.Node.get_all(context, cluster_id=cluster_id,
  171                                filters=filters, sort=sort,
  172                                limit=limit, marker=marker,
  173                                project_safe=project_safe)
  174 
  175         for obj in objs:
  176             node = cls._from_object(context, obj)
  177             yield node
  178 
  179     def set_status(self, context, status, reason=None, **params):
  180         """Set status of the node.
  181 
  182         :param context: The request context.
  183         :param status: New status for the node.
  184         :param reason: The reason that leads the node to its current status.
  185         :param kwargs params: Other properties that need an update.
  186         :returns: ``None``.
  187         """
  188         values = {}
  189         now = timeutils.utcnow(True)
  190         if status == consts.NS_ACTIVE and self.status == consts.NS_CREATING:
  191             self.created_at = values['created_at'] = now
  192         if status not in [consts.NS_CREATING, consts.NS_UPDATING,
  193                           consts.NS_RECOVERING, consts.NS_OPERATING]:
  194             self.updated_at = values['updated_at'] = now
  195 
  196         self.status = status
  197         values['status'] = status
  198         if reason:
  199             self.status_reason = reason
  200             values['status_reason'] = reason
  201         for p, v in params.items():
  202             setattr(self, p, v)
  203             values[p] = v
  204         no.Node.update(context, self.id, values)
  205 
  206     def get_details(self, context):
  207         if not self.physical_id:
  208             return {}
  209         return pb.Profile.get_details(context, self)
  210 
  211     def do_create(self, context):
  212         if self.status != consts.NS_INIT:
  213             LOG.error('Node is in status "%s"', self.status)
  214             self.set_status(context, consts.NS_ERROR,
  215                             'Node must be in INIT status')
  216             return False, 'Node must be in INIT status'
  217 
  218         self.set_status(context, consts.NS_CREATING, 'Creation in progress')
  219         try:
  220             physical_id = pb.Profile.create_object(context, self)
  221         except exc.EResourceCreation as ex:
  222             physical_id = ex.resource_id
  223             self.set_status(context, consts.NS_ERROR, six.text_type(ex),
  224                             physical_id=physical_id)
  225             return False, str(ex)
  226 
  227         self.set_status(context, consts.NS_ACTIVE, 'Creation succeeded',
  228                         physical_id=physical_id)
  229         return True, None
  230 
  231     def do_delete(self, context):
  232         self.set_status(context, consts.NS_DELETING, 'Deletion in progress')
  233         try:
  234             # The following operation always return True unless exception
  235             # is thrown
  236             pb.Profile.delete_object(context, self)
  237         except exc.EResourceDeletion as ex:
  238             self.set_status(context, consts.NS_ERROR, six.text_type(ex))
  239             return False
  240 
  241         no.Node.delete(context, self.id)
  242         return True
  243 
  244     def do_update(self, context, params):
  245         """Update a node's property.
  246 
  247         This function is supposed to be invoked from a NODE_UPDATE action.
  248         :param dict params: parameters in a dictionary that may contain keys
  249                             like 'new_profile_id', 'name', 'role', 'metadata'.
  250         """
  251         if not self.physical_id:
  252             return False
  253 
  254         self.set_status(context, consts.NS_UPDATING, 'Update in progress')
  255 
  256         new_profile_id = params.pop('new_profile_id', None)
  257         res = True
  258         if new_profile_id:
  259             try:
  260                 res = pb.Profile.update_object(context, self, new_profile_id,
  261                                                **params)
  262             except exc.EResourceUpdate as ex:
  263                 self.set_status(context, consts.NS_ERROR, six.text_type(ex))
  264                 return False
  265 
  266         # update was not successful
  267         if not res:
  268             return False
  269 
  270         props = dict([(k, v) for k, v in params.items()
  271                       if k in ('name', 'role', 'metadata')])
  272         if new_profile_id:
  273             props['profile_id'] = new_profile_id
  274             self.rt['profile'] = pb.Profile.load(context,
  275                                                  profile_id=new_profile_id)
  276 
  277         self.set_status(context, consts.NS_ACTIVE, 'Update succeeded', **props)
  278 
  279         return True
  280 
  281     def do_join(self, context, cluster_id):
  282         if self.cluster_id == cluster_id:
  283             return True
  284 
  285         try:
  286             res = pb.Profile.join_cluster(context, self, cluster_id)
  287         except exc.EResourceUpdate as ex:
  288             LOG.error('Node join cluster faild: %s.', ex)
  289             return False
  290 
  291         if not res:
  292             return False
  293         timestamp = timeutils.utcnow(True)
  294         db_node = no.Node.migrate(context, self.id, cluster_id, timestamp)
  295         self.cluster_id = cluster_id
  296         self.updated_at = timestamp
  297         self.index = db_node.index
  298         return True
  299 
  300     def do_leave(self, context):
  301         if self.cluster_id == '':
  302             return True
  303 
  304         try:
  305             res = pb.Profile.leave_cluster(context, self)
  306         except exc.EResourceDeletion as ex:
  307             LOG.error('Node leave cluster faild: %s.', ex)
  308             return False
  309 
  310         if not res:
  311             return False
  312         timestamp = timeutils.utcnow(True)
  313         no.Node.migrate(context, self.id, None, timestamp)
  314         self.cluster_id = ''
  315         self.updated_at = timestamp
  316         self.index = -1
  317         return True
  318 
  319     def do_check(self, context):
  320         if not self.physical_id:
  321             return False
  322 
  323         try:
  324             res = pb.Profile.check_object(context, self)
  325         except exc.EServerNotFound as ex:
  326             self.set_status(context, consts.NS_ERROR, six.text_type(ex),
  327                             physical_id=None)
  328             return True
  329         except exc.EResourceOperation as ex:
  330             self.set_status(context, consts.NS_ERROR, six.text_type(ex))
  331             return False
  332 
  333         # Physical object is ACTIVE but for some reason the node status in
  334         # senlin was WARNING. We only update the status_reason
  335         if res:
  336             if self.status == consts.NS_WARNING:
  337                 msg = ("Check: Physical object is ACTIVE but the node status "
  338                        "was WARNING. %s") % self.status_reason
  339                 self.set_status(context, consts.NS_WARNING, msg)
  340                 return True
  341 
  342             self.set_status(context, consts.NS_ACTIVE,
  343                             "Check: Node is ACTIVE.")
  344         else:
  345             self.set_status(context, consts.NS_ERROR,
  346                             "Check: Node is not ACTIVE.")
  347 
  348         return True
  349 
  350     def do_healthcheck(self, context):
  351         """health check a node.
  352 
  353         This function is supposed to be invoked from the health manager to
  354         check the health of a given node
  355         :param context: The request context of the action.
  356         :returns: True if node is healthy. False otherwise.
  357         """
  358 
  359         return pb.Profile.healthcheck_object(context, self)
  360 
  361     def do_recover(self, context, action):
  362         """recover a node.
  363 
  364         This function is supposed to be invoked from a NODE_RECOVER action.
  365         :param context: The request context of the action.
  366         :param dict options: A map containing the recovery actions (with
  367             parameters if any) and fencing settings.
  368         """
  369         options = action.inputs
  370 
  371         operation = options.get('operation', None)
  372 
  373         if (not self.physical_id and operation and
  374                 (operation.upper() == consts.RECOVER_REBOOT or
  375                  operation.upper() == consts.RECOVER_REBUILD)):
  376             # physical id is required for REBOOT or REBUILD operations
  377             LOG.warning('Recovery failed because node has no physical id'
  378                         ' was provided for reboot or rebuild operation.')
  379             return False
  380 
  381         if options.get('check', False):
  382             res = False
  383             try:
  384                 res = pb.Profile.check_object(context, self)
  385             except exc.EResourceOperation:
  386                 pass
  387 
  388             if res:
  389                 self.set_status(context, consts.NS_ACTIVE,
  390                                 reason="Recover: Node is ACTIVE.")
  391                 return True
  392 
  393         self.set_status(context, consts.NS_RECOVERING,
  394                         reason='Recovery in progress')
  395 
  396         try:
  397             physical_id, status = pb.Profile.recover_object(context,
  398                                                             self, **options)
  399         except exc.EResourceOperation as ex:
  400             physical_id = ex.resource_id
  401             self.set_status(context, consts.NS_ERROR, reason=six.text_type(ex),
  402                             physical_id=physical_id)
  403             return False
  404 
  405         if not status:
  406             self.set_status(context, consts.NS_ERROR, reason='Recovery failed')
  407             return False
  408 
  409         params = {}
  410         if physical_id and self.physical_id != physical_id:
  411             self.data['recovery'] = consts.RECOVER_RECREATE
  412             params['data'] = self.data
  413             params['physical_id'] = physical_id
  414         self.set_status(context, consts.NS_ACTIVE,
  415                         reason='Recovery succeeded', **params)
  416 
  417         return True
  418 
  419     def do_operation(self, context, **inputs):
  420         """Perform an operation on a node.
  421 
  422         :param context: The request context.
  423         :param dict inputs: The operation and parameters if any.
  424         :returns: A boolean indicating whether the operation was a success.
  425         """
  426         if not self.physical_id:
  427             return False
  428 
  429         op = inputs['operation']
  430         params = inputs.get('params', {})
  431         self.set_status(context, consts.NS_OPERATING,
  432                         reason="Operation '%s' in progress" % op)
  433 
  434         try:
  435             profile = self.rt['profile']
  436             method = getattr(profile, 'handle_%s' % op)
  437             method(self, **params)
  438         except exc.EResourceOperation as ex:
  439             LOG.error('Node operation %s failed: %s.', op, ex)
  440             self.set_status(context, consts.NS_ERROR, reason=six.text_type(ex))
  441             return False
  442 
  443         self.set_status(context, consts.NS_ACTIVE,
  444                         reason="Operation '%s' succeeded" % op)
  445         return True
  446 
  447     def run_workflow(self, **options):
  448         if not self.physical_id:
  449             return False
  450 
  451         workflow_name = options.pop('workflow_name')
  452         inputs = options.pop('inputs')
  453         definition = inputs.pop('definition', None)
  454         params = {
  455             'cluster_id': self.cluster_id,
  456             'node_id': self.physical_id,
  457         }
  458         params.update(inputs)
  459 
  460         try:
  461             profile = self.rt['profile']
  462             wfc = profile.workflow(self)
  463             workflow = wfc.workflow_find(workflow_name)
  464             if workflow is None:
  465                 wfc.workflow_create(definition, scope="private")
  466             else:
  467                 definition = workflow.definition
  468             inputs_str = jsonutils.dumps(params)
  469             wfc.execution_create(workflow_name, str(inputs_str))
  470         except exc.InternalError as ex:
  471             raise exc.EResourceOperation(op='executing', type='workflow',
  472                                          id=workflow_name,
  473                                          message=six.text_type(ex))
  474         return True