"Fossies" - the Fresh Open Source Software Archive

Member "masakari-9.0.0/masakari/engine/manager.py" (13 May 2020, 17517 Bytes) of package /linux/misc/openstack/masakari-9.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "manager.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 8.0.0_vs_9.0.0.

    1 # Copyright 2016 NTT DATA
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 """Handles all processes relating to notifications.
   17 
   18 The :py:class:`MasakariManager` class is a
   19 :py:class:`masakari.manager.Manager` that handles RPC calls relating to
   20 notifications. It is responsible for processing notifications and executing
   21 workflows.
   22 
   23 """
   24 import traceback
   25 
   26 from oslo_log import log as logging
   27 import oslo_messaging as messaging
   28 from oslo_service import periodic_task
   29 from oslo_utils import timeutils
   30 
   31 import masakari.conf
   32 from masakari.engine import driver
   33 from masakari.engine import instance_events as virt_events
   34 from masakari.engine import rpcapi
   35 from masakari.engine import utils as engine_utils
   36 from masakari import exception
   37 from masakari.i18n import _
   38 from masakari import manager
   39 from masakari import objects
   40 from masakari.objects import fields
   41 from masakari import utils
   42 
   43 CONF = masakari.conf.CONF
   44 
   45 LOG = logging.getLogger(__name__)
   46 
   47 
   48 def update_host_method(context, host_name, reserved=False):
   49     reserved_host = objects.Host.get_by_name(context, host_name)
   50     reserved_host.reserved = reserved
   51     reserved_host.save()
   52 
   53 
   54 class MasakariManager(manager.Manager):
   55     """Manages the running notifications"""
   56     RPC_API_VERSION = rpcapi.EngineAPI.RPC_API_VERSION
   57     target = messaging.Target(version=RPC_API_VERSION)
   58 
   59     def __init__(self, masakari_driver=None, *args, **kwargs):
   60         """Load configuration options"""
   61         LOG.debug("Initializing Masakari Manager.")
   62         super(MasakariManager, self).__init__(service_name="engine",
   63                                              *args, **kwargs)
   64 
   65         self.driver = driver.load_masakari_driver(masakari_driver)
   66 
   67     def _handle_notification_type_process(self, context, notification):
   68         notification_status = fields.NotificationStatus.FINISHED
   69         notification_event = notification.payload.get('event')
   70         process_name = notification.payload.get('process_name')
   71         exception_info = None
   72 
   73         if notification_event.upper() == 'STARTED':
   74             LOG.info("Notification type '%(type)s' received for host "
   75                      "'%(host_uuid)s': '%(process_name)s' has been "
   76                      "%(event)s.",
   77                      {'type': notification.type,
   78                       'host_uuid': notification.source_host_uuid,
   79                       'process_name': process_name,
   80                       'event': notification_event})
   81         elif notification_event.upper() == 'STOPPED':
   82             host_obj = objects.Host.get_by_uuid(
   83                 context, notification.source_host_uuid)
   84             host_name = host_obj.name
   85 
   86             # Mark host on_maintenance mode as True
   87             update_data = {
   88                 'on_maintenance': True,
   89             }
   90             host_obj.update(update_data)
   91             host_obj.save()
   92 
   93             try:
   94                 self.driver.execute_process_failure(
   95                     context, process_name, host_name,
   96                     notification.notification_uuid)
   97             except exception.SkipProcessRecoveryException as e:
   98                 notification_status = fields.NotificationStatus.FINISHED
   99             except (exception.MasakariException,
  100                     exception.ProcessRecoveryFailureException) as e:
  101                 notification_status = fields.NotificationStatus.ERROR
  102                 LOG.error("Failed to process notification '%(uuid)s'."
  103                           " Reason: %(error)s",
  104                           {"uuid": notification.notification_uuid,
  105                            "error": e.message})
  106                 exception_info = e
  107         else:
  108             LOG.warning("Invalid event: %(event)s received for "
  109                         "notification type: %(notification_type)s",
  110                         {'event': notification_event,
  111                          'notification_type': notification.type})
  112             notification_status = fields.NotificationStatus.IGNORED
  113 
  114         if exception_info:
  115             tb = traceback.format_exc()
  116             engine_utils.notify_about_notification_update(context,
  117                 notification,
  118                 action=fields.EventNotificationAction.NOTIFICATION_PROCESS,
  119                 phase=fields.EventNotificationPhase.ERROR,
  120                 exception=str(exception_info),
  121                 tb=tb)
  122         else:
  123             engine_utils.notify_about_notification_update(context,
  124                 notification,
  125                 action=fields.EventNotificationAction.NOTIFICATION_PROCESS,
  126                 phase=fields.EventNotificationPhase.END)
  127 
  128         return notification_status
  129 
  130     def _handle_notification_type_instance(self, context, notification):
  131         if not virt_events.is_valid_event(notification.payload):
  132             LOG.info("Notification '%(uuid)s' received with payload "
  133                      "%(payload)s is ignored.",
  134                      {"uuid": notification.notification_uuid,
  135                       "payload": notification.payload})
  136             return fields.NotificationStatus.IGNORED
  137 
  138         notification_status = fields.NotificationStatus.FINISHED
  139         exception_info = None
  140         try:
  141             self.driver.execute_instance_failure(
  142                 context, notification.payload.get('instance_uuid'),
  143                 notification.notification_uuid)
  144         except exception.IgnoreInstanceRecoveryException as e:
  145             notification_status = fields.NotificationStatus.IGNORED
  146             exception_info = e
  147         except exception.SkipInstanceRecoveryException as e:
  148             notification_status = fields.NotificationStatus.FINISHED
  149         except (exception.MasakariException,
  150                 exception.InstanceRecoveryFailureException) as e:
  151             notification_status = fields.NotificationStatus.ERROR
  152             LOG.error("Failed to process notification '%(uuid)s'."
  153                       " Reason: %(error)s",
  154                       {"uuid": notification.notification_uuid,
  155                        "error": e.message})
  156             exception_info = e
  157 
  158         if exception_info:
  159             tb = traceback.format_exc()
  160             engine_utils.notify_about_notification_update(context,
  161                 notification,
  162                 action=fields.EventNotificationAction.NOTIFICATION_PROCESS,
  163                 phase=fields.EventNotificationPhase.ERROR,
  164                 exception=str(exception_info),
  165                 tb=tb)
  166         else:
  167             engine_utils.notify_about_notification_update(context,
  168                 notification,
  169                 action=fields.EventNotificationAction.NOTIFICATION_PROCESS,
  170                 phase=fields.EventNotificationPhase.END)
  171 
  172         return notification_status
  173 
  174     def _handle_notification_type_host(self, context, notification):
  175         host_status = notification.payload.get('host_status')
  176         notification_status = fields.NotificationStatus.FINISHED
  177         notification_event = notification.payload.get('event')
  178         exception_info = None
  179 
  180         if host_status.upper() != fields.HostStatusType.NORMAL:
  181             # NOTE(shilpasd): Avoid host recovery for host_status other than
  182             # 'NORMAL' otherwise it could lead to unsafe evacuation of
  183             # instances running on the failed source host.
  184             LOG.warning("Notification '%(uuid)s' ignored as host_status"
  185                         "is '%(host_status)s'",
  186                         {'uuid': notification.notification_uuid,
  187                          'host_status': host_status.upper()})
  188             notification_status = fields.NotificationStatus.IGNORED
  189         elif notification_event.upper() == 'STARTED':
  190             LOG.info("Notification type '%(type)s' received for host "
  191                      "'%(host_uuid)s' has been %(event)s.",
  192                      {'type': notification.type,
  193                       'host_uuid': notification.source_host_uuid,
  194                       'event': notification_event})
  195         elif notification_event.upper() == 'STOPPED':
  196             host_obj = objects.Host.get_by_uuid(
  197                 context, notification.source_host_uuid)
  198             host_name = host_obj.name
  199             recovery_method = host_obj.failover_segment.recovery_method
  200 
  201             # Mark host on_maintenance mode as True
  202             update_data = {
  203                 'on_maintenance': True,
  204             }
  205 
  206             # Set reserved flag to False if this host is reserved
  207             if host_obj.reserved:
  208                 update_data['reserved'] = False
  209 
  210             host_obj.update(update_data)
  211             host_obj.save()
  212 
  213             reserved_host_list = None
  214 
  215             if not recovery_method == (
  216                     fields.FailoverSegmentRecoveryMethod.AUTO):
  217                 reserved_host_object_list = objects.HostList.get_all(
  218                     context, filters={
  219                         'failover_segment_id': host_obj.failover_segment_id,
  220                         'reserved': True,
  221                         'on_maintenance': False
  222                         })
  223                 # Create list of host name from reserved_host_object_list
  224                 reserved_host_list = [host.name for host in
  225                                       reserved_host_object_list]
  226 
  227             try:
  228                 self.driver.execute_host_failure(
  229                     context, host_name, recovery_method,
  230                     notification.notification_uuid,
  231                     update_host_method=update_host_method,
  232                     reserved_host_list=reserved_host_list)
  233             except exception.SkipHostRecoveryException as e:
  234                 notification_status = fields.NotificationStatus.FINISHED
  235             except (exception.HostRecoveryFailureException,
  236                     exception.ReservedHostsUnavailable,
  237                     exception.MasakariException) as e:
  238                 notification_status = fields.NotificationStatus.ERROR
  239                 LOG.error("Failed to process notification '%(uuid)s'."
  240                           " Reason: %(error)s",
  241                           {"uuid": notification.notification_uuid,
  242                            "error": e.message})
  243                 exception_info = e
  244         else:
  245             LOG.warning("Invalid event: %(event)s received for "
  246                         "notification type: %(type)s",
  247                         {'event': notification_event,
  248                          'type': notification.type})
  249             notification_status = fields.NotificationStatus.IGNORED
  250 
  251         if exception_info:
  252             tb = traceback.format_exc()
  253             engine_utils.notify_about_notification_update(context,
  254                 notification,
  255                 action=fields.EventNotificationAction.NOTIFICATION_PROCESS,
  256                 phase=fields.EventNotificationPhase.ERROR,
  257                 exception=str(exception_info),
  258                 tb=tb)
  259         else:
  260             engine_utils.notify_about_notification_update(context,
  261                 notification,
  262                 action=fields.EventNotificationAction.NOTIFICATION_PROCESS,
  263                 phase=fields.EventNotificationPhase.END)
  264 
  265         return notification_status
  266 
  267     def _process_notification(self, context, notification):
  268         @utils.synchronized(notification.source_host_uuid, blocking=True)
  269         def do_process_notification(notification):
  270             LOG.info('Processing notification %(notification_uuid)s of '
  271                      'type: %(type)s',
  272                      {'notification_uuid': notification.notification_uuid,
  273                       'type': notification.type})
  274 
  275             # Get notification from db
  276             notification_db = objects.Notification.get_by_uuid(context,
  277                                         notification.notification_uuid)
  278 
  279             # NOTE(tpatil): To fix bug 1773132, process notification only
  280             # if the notification status is New and the current notification
  281             # from DB status is Not New to avoid recovering from failure twice
  282             if (notification.status == fields.NotificationStatus.NEW and
  283                     notification_db.status != fields.NotificationStatus.NEW):
  284                 LOG.warning("Processing of notification is skipped to avoid "
  285                             "recovering from failure twice. "
  286                             "Notification received is '%(uuid)s' "
  287                             "and it's status is '%(new_status)s' and the "
  288                             "current status of same notification in db "
  289                             "is '%(old_status)s'",
  290                             {"uuid": notification.notification_uuid,
  291                             "new_status": notification.status,
  292                             "old_status": notification_db.status})
  293                 return
  294 
  295             update_data = {
  296                 'status': fields.NotificationStatus.RUNNING,
  297             }
  298             notification.update(update_data)
  299             notification.save()
  300 
  301             if notification.type == fields.NotificationType.PROCESS:
  302                 notification_status = self._handle_notification_type_process(
  303                     context, notification)
  304             elif notification.type == fields.NotificationType.VM:
  305                 notification_status = self._handle_notification_type_instance(
  306                     context, notification)
  307             elif notification.type == fields.NotificationType.COMPUTE_HOST:
  308                 notification_status = self._handle_notification_type_host(
  309                     context, notification)
  310 
  311             LOG.info("Notification %(notification_uuid)s exits with "
  312                      "status: %(status)s.",
  313                      {'notification_uuid': notification.notification_uuid,
  314                       'status': notification_status})
  315 
  316             update_data = {
  317                 'status': notification_status
  318             }
  319             notification.update(update_data)
  320             notification.save()
  321 
  322         engine_utils.notify_about_notification_update(context,
  323             notification,
  324             action=fields.EventNotificationAction.NOTIFICATION_PROCESS,
  325             phase=fields.EventNotificationPhase.START)
  326 
  327         do_process_notification(notification)
  328 
  329     def process_notification(self, context, notification=None):
  330         """Processes the notification"""
  331         self._process_notification(context, notification)
  332 
  333     @periodic_task.periodic_task(
  334         spacing=CONF.process_unfinished_notifications_interval)
  335     def _process_unfinished_notifications(self, context):
  336         filters = {
  337             'status': [fields.NotificationStatus.ERROR,
  338                        fields.NotificationStatus.NEW]
  339         }
  340         notifications_list = objects.NotificationList.get_all(context,
  341                                                               filters=filters)
  342 
  343         for notification in notifications_list:
  344             if (notification.status == fields.NotificationStatus.ERROR or
  345                     (notification.status == fields.NotificationStatus.NEW and
  346                 timeutils.is_older_than(
  347                     notification.generated_time,
  348                     CONF.retry_notification_new_status_interval))):
  349                 self._process_notification(context, notification)
  350 
  351             # get updated notification from db after workflow execution
  352             notification_db = objects.Notification.get_by_uuid(
  353                 context, notification.notification_uuid)
  354 
  355             if notification_db.status == fields.NotificationStatus.ERROR:
  356                 # update notification status as failed
  357                 notification_status = fields.NotificationStatus.FAILED
  358                 update_data = {
  359                     'status': notification_status
  360                 }
  361 
  362                 notification_db.update(update_data)
  363                 notification_db.save()
  364                 LOG.error(
  365                     "Periodic task 'process_unfinished_notifications': "
  366                     "Notification %(notification_uuid)s exits with "
  367                     "status: %(status)s.",
  368                     {'notification_uuid': notification.notification_uuid,
  369                      'status': notification_status})
  370 
  371     def get_notification_recovery_workflow_details(self, context,
  372                                                    notification):
  373         """Retrieve recovery workflow details of the notification"""
  374         try:
  375             host_obj = objects.Host.get_by_uuid(
  376                 context, notification.source_host_uuid)
  377             recovery_method = host_obj.failover_segment.recovery_method
  378 
  379             progress_details = (
  380                 self.driver.get_notification_recovery_workflow_details(
  381                     context, recovery_method, notification))
  382             notification['recovery_workflow_details'] = progress_details
  383         except Exception:
  384             msg = (_('Failed to fetch notification recovery workflow details '
  385                      'for %s') % notification.notification_uuid)
  386             LOG.exception(msg)
  387             raise exception.MasakariException(msg)
  388 
  389         return notification