"Fossies" - the Fresh Open Source Software Archive

Member "octavia-8.0.0/octavia/controller/worker/v2/controller_worker.py" (14 Apr 2021, 49748 Bytes) of package /linux/misc/openstack/octavia-8.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "controller_worker.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 7.1.1_vs_8.0.0.

    1 # Copyright 2015 Hewlett-Packard Development Company, L.P.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 # http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 #
   15 
   16 from octavia_lib.common import constants as lib_consts
   17 from oslo_config import cfg
   18 from oslo_log import log as logging
   19 from oslo_utils import excutils
   20 from sqlalchemy.orm import exc as db_exceptions
   21 from stevedore import driver as stevedore_driver
   22 from taskflow.listeners import logging as tf_logging
   23 import tenacity
   24 
   25 from octavia.amphorae.driver_exceptions import exceptions as driver_exc
   26 from octavia.api.drivers import utils as provider_utils
   27 from octavia.common import base_taskflow
   28 from octavia.common import constants
   29 from octavia.common import exceptions
   30 from octavia.common import utils
   31 from octavia.controller.worker.v2.flows import flow_utils
   32 from octavia.controller.worker.v2 import taskflow_jobboard_driver as tsk_driver
   33 from octavia.db import api as db_apis
   34 from octavia.db import repositories as repo
   35 
   36 CONF = cfg.CONF
   37 LOG = logging.getLogger(__name__)
   38 
   39 
    40 # We do not need to log retry exception information. The warning "Could not
    41 # connect to instance" will be logged as usual.
   42 def retryMaskFilter(record):
   43     if record.exc_info is not None and isinstance(
   44             record.exc_info[1], driver_exc.AmpConnectionRetry):
   45         return False
   46     return True
   47 
   48 
   49 LOG.logger.addFilter(retryMaskFilter)
   50 
   51 
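       # Predicate used with tenacity.retry_if_result() below: it returns True
       # while the object has not yet reached PENDING_UPDATE, so the retry
       # loop keeps re-reading the object until the API has flipped its
       # provisioning_status to PENDING_UPDATE.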
   52 def _is_provisioning_status_pending_update(lb_obj):
    53     return lb_obj.provisioning_status != constants.PENDING_UPDATE
   54 
   55 
   56 class ControllerWorker(object):
   57 
   58     def __init__(self):
   59 
   60         self._amphora_repo = repo.AmphoraRepository()
   61         self._amphora_health_repo = repo.AmphoraHealthRepository()
   62         self._health_mon_repo = repo.HealthMonitorRepository()
   63         self._lb_repo = repo.LoadBalancerRepository()
   64         self._listener_repo = repo.ListenerRepository()
   65         self._member_repo = repo.MemberRepository()
   66         self._pool_repo = repo.PoolRepository()
   67         self._l7policy_repo = repo.L7PolicyRepository()
   68         self._l7rule_repo = repo.L7RuleRepository()
   69         self._flavor_repo = repo.FlavorRepository()
   70         self._az_repo = repo.AvailabilityZoneRepository()
   71 
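               # Note: with [task_flow] jobboard_enabled set, flows are
               # persisted and dispatched through a jobboard backend loaded
               # via stevedore, using the driver named by
               # jobboard_backend_driver; otherwise flows run in a local,
               # in-process taskflow engine.
               # Example (illustrative only; driver names depend on the
               # installed entry points):
               #     [task_flow]
               #     jobboard_enabled = True
               #     jobboard_backend_driver = redis_taskflow_driver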
   72         if CONF.task_flow.jobboard_enabled:
   73             persistence = tsk_driver.MysqlPersistenceDriver()
   74 
   75             self.jobboard_driver = stevedore_driver.DriverManager(
   76                 namespace='octavia.worker.jobboard_driver',
   77                 name=CONF.task_flow.jobboard_backend_driver,
   78                 invoke_args=(persistence,),
   79                 invoke_on_load=True).driver
   80         else:
   81             self.tf_engine = base_taskflow.BaseTaskFlowEngine()
   82 
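           # Re-reads the object with an incremental backoff (the
           # [haproxy_amphora] api_db_commit_retry_* options) until it reports
           # PENDING_UPDATE or the attempt limit is reached. On exhaustion
           # tenacity raises RetryError; callers catch it and fall back to the
           # result of the last attempt.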
   83     @tenacity.retry(
   84         retry=(
   85             tenacity.retry_if_result(_is_provisioning_status_pending_update) |
   86             tenacity.retry_if_exception_type()),
   87         wait=tenacity.wait_incrementing(
   88             CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
   89             CONF.haproxy_amphora.api_db_commit_retry_backoff,
   90             CONF.haproxy_amphora.api_db_commit_retry_max),
   91         stop=tenacity.stop_after_attempt(
   92             CONF.haproxy_amphora.api_db_commit_retry_attempts))
   93     def _get_db_obj_until_pending_update(self, repo, id):
   94 
   95         return repo.get(db_apis.get_session(), id=id)
   96 
   97     @property
   98     def services_controller(self):
   99         return base_taskflow.TaskFlowServiceController(self.jobboard_driver)
  100 
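           # Dispatch helper: with the jobboard enabled the flow is posted to
           # the jobboard for a conductor process to claim and run; otherwise
           # the flow is built locally via func(*args) and executed
           # synchronously in this process by the taskflow engine.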
  101     def run_flow(self, func, *args, **kwargs):
  102         if CONF.task_flow.jobboard_enabled:
  103             self.services_controller.run_poster(func, *args, **kwargs)
  104         else:
  105             tf = self.tf_engine.taskflow_load(
  106                 func(*args), **kwargs)
  107             with tf_logging.DynamicLoggingListener(tf, log=LOG):
  108                 tf.run()
  109 
  110     def create_amphora(self, availability_zone=None):
  111         """Creates an Amphora.
  112 
   113         This is used to create a spare amphora.
   114 
   115         :returns: None
  116         """
  117         try:
  118             store = {constants.BUILD_TYPE_PRIORITY:
  119                      constants.LB_CREATE_SPARES_POOL_PRIORITY,
  120                      constants.FLAVOR: None,
  121                      constants.SERVER_GROUP_ID: None,
  122                      constants.AVAILABILITY_ZONE: None}
  123             if availability_zone:
  124                 store[constants.AVAILABILITY_ZONE] = (
  125                     self._az_repo.get_availability_zone_metadata_dict(
  126                         db_apis.get_session(), availability_zone))
  127             self.run_flow(
  128                 flow_utils.get_create_amphora_flow,
  129                 store=store, wait=True)
  130         except Exception as e:
  131             LOG.error('Failed to create an amphora due to: %s', str(e))
  132 
  133     def delete_amphora(self, amphora_id):
  134         """Deletes an existing Amphora.
  135 
  136         :param amphora_id: ID of the amphora to delete
  137         :returns: None
  138         :raises AmphoraNotFound: The referenced Amphora was not found
  139         """
  140         try:
  141             amphora = self._amphora_repo.get(db_apis.get_session(),
  142                                              id=amphora_id)
  143             store = {constants.AMPHORA: amphora.to_dict()}
  144             self.run_flow(
  145                 flow_utils.get_delete_amphora_flow,
  146                 store=store)
  147         except Exception as e:
   148             LOG.error('Failed to delete an amphora %s due to: %s',
  149                       amphora_id, str(e))
  150             return
  151         LOG.info('Finished deleting amphora %s.', amphora_id)
  152 
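           # The NoResultFound retry below (shared by the other create_*
           # methods) covers the window where the API has queued the create
           # but its DB transaction is not yet visible to this worker; the
           # lookup is retried with a backoff until the row appears or the
           # attempts are exhausted.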
  153     @tenacity.retry(
  154         retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
  155         wait=tenacity.wait_incrementing(
  156             CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
  157             CONF.haproxy_amphora.api_db_commit_retry_backoff,
  158             CONF.haproxy_amphora.api_db_commit_retry_max),
  159         stop=tenacity.stop_after_attempt(
  160             CONF.haproxy_amphora.api_db_commit_retry_attempts))
  161     def create_health_monitor(self, health_monitor):
  162         """Creates a health monitor.
  163 
  164         :param health_monitor: Provider health monitor dict
  165         :returns: None
  166         :raises NoResultFound: Unable to find the object
  167         """
  168         db_health_monitor = self._health_mon_repo.get(
  169             db_apis.get_session(),
  170             id=health_monitor[constants.HEALTHMONITOR_ID])
  171 
  172         if not db_health_monitor:
  173             LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
  174                         '60 seconds.', 'healthmonitor',
  175                         health_monitor[constants.HEALTHMONITOR_ID])
  176             raise db_exceptions.NoResultFound
  177 
  178         pool = db_health_monitor.pool
  179         pool.health_monitor = db_health_monitor
  180         load_balancer = pool.load_balancer
  181         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
  182             load_balancer).to_dict()
  183 
  184         listeners_dicts = (
  185             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  186                 pool.listeners))
  187 
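               # The store seeds the taskflow storage for the flow; DB objects
               # are converted to provider dicts so the data stays
               # serializable (needed when the flow is posted to a jobboard
               # rather than run in-process).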
  188         store = {constants.HEALTH_MON: health_monitor,
  189                  constants.POOL_ID: pool.id,
  190                  constants.LISTENERS: listeners_dicts,
  191                  constants.LOADBALANCER_ID: load_balancer.id,
  192                  constants.LOADBALANCER: provider_lb}
  193         self.run_flow(
  194             flow_utils.get_create_health_monitor_flow,
  195             store=store)
  196 
  197     def delete_health_monitor(self, health_monitor):
  198         """Deletes a health monitor.
  199 
  200         :param health_monitor: Provider health monitor dict
  201         :returns: None
  202         :raises HMNotFound: The referenced health monitor was not found
  203         """
  204         db_health_monitor = self._health_mon_repo.get(
  205             db_apis.get_session(),
  206             id=health_monitor[constants.HEALTHMONITOR_ID])
  207 
  208         pool = db_health_monitor.pool
  209         load_balancer = pool.load_balancer
  210         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
  211             load_balancer).to_dict()
  212 
  213         listeners_dicts = (
  214             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  215                 pool.listeners))
  216 
  217         store = {constants.HEALTH_MON: health_monitor,
  218                  constants.POOL_ID: pool.id,
  219                  constants.LISTENERS: listeners_dicts,
  220                  constants.LOADBALANCER_ID: load_balancer.id,
  221                  constants.LOADBALANCER: provider_lb,
  222                  constants.PROJECT_ID: load_balancer.project_id}
  223         self.run_flow(
  224             flow_utils.get_delete_health_monitor_flow,
  225             store=store)
  226 
  227     def update_health_monitor(self, original_health_monitor,
  228                               health_monitor_updates):
  229         """Updates a health monitor.
  230 
  231         :param original_health_monitor: Provider health monitor dict
  232         :param health_monitor_updates: Dict containing updated health monitor
  233         :returns: None
  234         :raises HMNotFound: The referenced health monitor was not found
  235         """
  236         try:
  237             db_health_monitor = self._get_db_obj_until_pending_update(
  238                 self._health_mon_repo,
  239                 original_health_monitor[constants.HEALTHMONITOR_ID])
  240         except tenacity.RetryError as e:
  241             LOG.warning('Health monitor did not go into %s in 60 seconds. '
   242                         'This is either due to an in-progress Octavia upgrade '
  243                         'or an overloaded and failing database. Assuming '
  244                         'an upgrade is in progress and continuing.',
  245                         constants.PENDING_UPDATE)
  246             db_health_monitor = e.last_attempt.result()
  247 
  248         pool = db_health_monitor.pool
  249 
  250         listeners_dicts = (
  251             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  252                 pool.listeners))
  253 
  254         load_balancer = pool.load_balancer
  255         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
  256             load_balancer).to_dict()
  257 
  258         store = {constants.HEALTH_MON: original_health_monitor,
  259                  constants.POOL_ID: pool.id,
  260                  constants.LISTENERS: listeners_dicts,
  261                  constants.LOADBALANCER_ID: load_balancer.id,
  262                  constants.LOADBALANCER: provider_lb,
  263                  constants.UPDATE_DICT: health_monitor_updates}
  264         self.run_flow(
  265             flow_utils.get_update_health_monitor_flow,
  266             store=store)
  267 
  268     @tenacity.retry(
  269         retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
  270         wait=tenacity.wait_incrementing(
  271             CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
  272             CONF.haproxy_amphora.api_db_commit_retry_backoff,
  273             CONF.haproxy_amphora.api_db_commit_retry_max),
  274         stop=tenacity.stop_after_attempt(
  275             CONF.haproxy_amphora.api_db_commit_retry_attempts))
  276     def create_listener(self, listener):
  277         """Creates a listener.
  278 
  279         :param listener: A listener provider dictionary.
  280         :returns: None
  281         :raises NoResultFound: Unable to find the object
  282         """
  283         db_listener = self._listener_repo.get(
  284             db_apis.get_session(), id=listener[constants.LISTENER_ID])
  285         if not db_listener:
  286             LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
  287                         '60 seconds.', 'listener',
  288                         listener[constants.LISTENER_ID])
  289             raise db_exceptions.NoResultFound
  290 
  291         load_balancer = db_listener.load_balancer
  292         listeners = load_balancer.listeners
  293         dict_listeners = []
  294         for li in listeners:
  295             dict_listeners.append(
  296                 provider_utils.db_listener_to_provider_listener(li).to_dict())
  297         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
  298             load_balancer).to_dict()
  299 
  300         store = {constants.LISTENERS: dict_listeners,
  301                  constants.LOADBALANCER: provider_lb,
  302                  constants.LOADBALANCER_ID: load_balancer.id}
  303 
  304         self.run_flow(
  305             flow_utils.get_create_listener_flow,
  306             store=store)
  307 
  308     def delete_listener(self, listener):
  309         """Deletes a listener.
  310 
  311         :param listener: A listener provider dictionary to delete
  312         :returns: None
  313         :raises ListenerNotFound: The referenced listener was not found
  314         """
  315         store = {constants.LISTENER: listener,
  316                  constants.LOADBALANCER_ID:
  317                      listener[constants.LOADBALANCER_ID],
  318                  constants.PROJECT_ID: listener[constants.PROJECT_ID]}
  319         self.run_flow(
  320             flow_utils.get_delete_listener_flow,
  321             store=store)
  322 
  323     def update_listener(self, listener, listener_updates):
  324         """Updates a listener.
  325 
  326         :param listener: A listener provider dictionary to update
  327         :param listener_updates: Dict containing updated listener attributes
  328         :returns: None
  329         :raises ListenerNotFound: The referenced listener was not found
  330         """
  331         db_lb = self._lb_repo.get(db_apis.get_session(),
  332                                   id=listener[constants.LOADBALANCER_ID])
  333         store = {constants.LISTENER: listener,
  334                  constants.UPDATE_DICT: listener_updates,
  335                  constants.LOADBALANCER_ID: db_lb.id,
  336                  constants.LISTENERS: [listener]}
  337         self.run_flow(
  338             flow_utils.get_update_listener_flow,
  339             store=store)
  340 
  341     @tenacity.retry(
  342         retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
  343         wait=tenacity.wait_incrementing(
  344             CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
  345             CONF.haproxy_amphora.api_db_commit_retry_backoff,
  346             CONF.haproxy_amphora.api_db_commit_retry_max),
  347         stop=tenacity.stop_after_attempt(
  348             CONF.haproxy_amphora.api_db_commit_retry_attempts))
  349     def create_load_balancer(self, loadbalancer, flavor=None,
  350                              availability_zone=None):
  351         """Creates a load balancer by allocating Amphorae.
  352 
  353         First tries to allocate an existing Amphora in READY state.
  354         If none are available it will attempt to build one specifically
  355         for this load balancer.
  356 
   357         :param loadbalancer: Dict of the load balancer to create
  358         :returns: None
  359         :raises NoResultFound: Unable to find the object
  360         """
  361         lb = self._lb_repo.get(db_apis.get_session(),
  362                                id=loadbalancer[constants.LOADBALANCER_ID])
  363         if not lb:
  364             LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
  365                         '60 seconds.', 'load_balancer',
  366                         loadbalancer[constants.LOADBALANCER_ID])
  367             raise db_exceptions.NoResultFound
  368 
  369         store = {lib_consts.LOADBALANCER_ID:
  370                  loadbalancer[lib_consts.LOADBALANCER_ID],
  371                  constants.BUILD_TYPE_PRIORITY:
  372                  constants.LB_CREATE_NORMAL_PRIORITY,
  373                  lib_consts.FLAVOR: flavor,
  374                  lib_consts.AVAILABILITY_ZONE: availability_zone}
  375 
  376         topology = lb.topology
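               # With anti-affinity disabled, or a single-amphora topology,
               # no nova server group is needed, so the flow is seeded with a
               # null SERVER_GROUP_ID.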
  377         if (not CONF.nova.enable_anti_affinity or
  378                 topology == constants.TOPOLOGY_SINGLE):
  379             store[constants.SERVER_GROUP_ID] = None
  380 
  381         listeners_dicts = (
  382             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  383                 lb.listeners)
  384         )
  385 
  386         store[constants.UPDATE_DICT] = {
  387             constants.TOPOLOGY: topology
  388         }
  389         self.run_flow(
  390             flow_utils.get_create_load_balancer_flow,
  391             topology, listeners=listeners_dicts,
  392             store=store)
  393 
  394     def delete_load_balancer(self, load_balancer, cascade=False):
  395         """Deletes a load balancer by de-allocating Amphorae.
  396 
  397         :param load_balancer: Dict of the load balancer to delete
  398         :returns: None
  399         :raises LBNotFound: The referenced load balancer was not found
  400         """
  401         loadbalancer_id = load_balancer[constants.LOADBALANCER_ID]
  402         db_lb = self._lb_repo.get(db_apis.get_session(), id=loadbalancer_id)
  403         store = {constants.LOADBALANCER: load_balancer,
  404                  constants.LOADBALANCER_ID: loadbalancer_id,
  405                  constants.SERVER_GROUP_ID: db_lb.server_group_id,
  406                  constants.PROJECT_ID: db_lb.project_id}
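               # A cascade delete removes the load balancer together with its
               # child listeners and pools in one flow; the plain delete flow
               # expects those children to have been removed already.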
  407         if cascade:
  408             listeners = flow_utils.get_listeners_on_lb(db_lb)
  409             pools = flow_utils.get_pools_on_lb(db_lb)
  410 
  411             self.run_flow(
  412                 flow_utils.get_cascade_delete_load_balancer_flow,
  413                 load_balancer, listeners, pools, store=store)
  414         else:
  415             self.run_flow(
  416                 flow_utils.get_delete_load_balancer_flow,
  417                 load_balancer, store=store)
  418 
  419     def update_load_balancer(self, original_load_balancer,
  420                              load_balancer_updates):
  421         """Updates a load balancer.
  422 
  423         :param original_load_balancer: Dict of the load balancer to update
  424         :param load_balancer_updates: Dict containing updated load balancer
  425         :returns: None
  426         :raises LBNotFound: The referenced load balancer was not found
  427         """
  428         store = {constants.LOADBALANCER: original_load_balancer,
  429                  constants.LOADBALANCER_ID:
  430                      original_load_balancer[constants.LOADBALANCER_ID],
  431                  constants.UPDATE_DICT: load_balancer_updates}
  432 
  433         self.run_flow(
  434             flow_utils.get_update_load_balancer_flow,
  435             store=store)
  436 
  437     @tenacity.retry(
  438         retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
  439         wait=tenacity.wait_incrementing(
  440             CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
  441             CONF.haproxy_amphora.api_db_commit_retry_backoff,
  442             CONF.haproxy_amphora.api_db_commit_retry_max),
  443         stop=tenacity.stop_after_attempt(
  444             CONF.haproxy_amphora.api_db_commit_retry_attempts))
  445     def create_member(self, member):
  446         """Creates a pool member.
  447 
  448         :param member: A member provider dictionary to create
  449         :returns: None
  450         :raises NoSuitablePool: Unable to find the node pool
  451         """
  452         pool = self._pool_repo.get(db_apis.get_session(),
  453                                    id=member[constants.POOL_ID])
  454         load_balancer = pool.load_balancer
  455         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
  456             load_balancer).to_dict()
  457 
  458         listeners_dicts = (
  459             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  460                 pool.listeners))
  461 
  462         store = {
  463             constants.MEMBER: member,
  464             constants.LISTENERS: listeners_dicts,
  465             constants.LOADBALANCER_ID: load_balancer.id,
  466             constants.LOADBALANCER: provider_lb,
  467             constants.POOL_ID: pool.id}
  468         if load_balancer.availability_zone:
  469             store[constants.AVAILABILITY_ZONE] = (
  470                 self._az_repo.get_availability_zone_metadata_dict(
  471                     db_apis.get_session(), load_balancer.availability_zone))
  472         else:
  473             store[constants.AVAILABILITY_ZONE] = {}
  474 
  475         self.run_flow(
  476             flow_utils.get_create_member_flow,
  477             store=store)
  478 
  479     def delete_member(self, member):
  480         """Deletes a pool member.
  481 
  482         :param member: A member provider dictionary to delete
  483         :returns: None
  484         :raises MemberNotFound: The referenced member was not found
  485         """
  486         pool = self._pool_repo.get(db_apis.get_session(),
  487                                    id=member[constants.POOL_ID])
  488 
  489         load_balancer = pool.load_balancer
  490         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
  491             load_balancer).to_dict()
  492 
  493         listeners_dicts = (
  494             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  495                 pool.listeners))
  496 
  497         store = {
  498             constants.MEMBER: member,
  499             constants.LISTENERS: listeners_dicts,
  500             constants.LOADBALANCER_ID: load_balancer.id,
  501             constants.LOADBALANCER: provider_lb,
  502             constants.POOL_ID: pool.id,
  503             constants.PROJECT_ID: load_balancer.project_id}
  504         if load_balancer.availability_zone:
  505             store[constants.AVAILABILITY_ZONE] = (
  506                 self._az_repo.get_availability_zone_metadata_dict(
  507                     db_apis.get_session(), load_balancer.availability_zone))
  508         else:
  509             store[constants.AVAILABILITY_ZONE] = {}
  510 
  511         self.run_flow(
  512             flow_utils.get_delete_member_flow,
  513             store=store)
  514 
  515     def batch_update_members(self, old_members, new_members,
  516                              updated_members):
  517         updated_members = [
  518             (provider_utils.db_member_to_provider_member(
  519                 self._member_repo.get(db_apis.get_session(),
  520                                       id=m.get(constants.ID))).to_dict(),
  521              m)
  522             for m in updated_members]
  523         provider_old_members = [
  524             provider_utils.db_member_to_provider_member(
  525                 self._member_repo.get(db_apis.get_session(),
  526                                       id=m.get(constants.ID))).to_dict()
  527             for m in old_members]
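               # A batch update targets a single pool, so the pool is resolved
               # from whichever of the old/new/updated member lists is
               # non-empty.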
  528         if old_members:
  529             pool = self._pool_repo.get(db_apis.get_session(),
  530                                        id=old_members[0][constants.POOL_ID])
  531         elif new_members:
  532             pool = self._pool_repo.get(db_apis.get_session(),
  533                                        id=new_members[0][constants.POOL_ID])
  534         else:
  535             pool = self._pool_repo.get(
  536                 db_apis.get_session(),
  537                 id=updated_members[0][0][constants.POOL_ID])
  538         load_balancer = pool.load_balancer
  539 
  540         listeners_dicts = (
  541             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  542                 pool.listeners))
  543         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
  544             load_balancer).to_dict()
  545 
  546         store = {
  547             constants.LISTENERS: listeners_dicts,
  548             constants.LOADBALANCER_ID: load_balancer.id,
  549             constants.LOADBALANCER: provider_lb,
  550             constants.POOL_ID: pool.id,
  551             constants.PROJECT_ID: load_balancer.project_id}
  552         if load_balancer.availability_zone:
  553             store[constants.AVAILABILITY_ZONE] = (
  554                 self._az_repo.get_availability_zone_metadata_dict(
  555                     db_apis.get_session(), load_balancer.availability_zone))
  556         else:
  557             store[constants.AVAILABILITY_ZONE] = {}
  558 
  559         self.run_flow(
  560             flow_utils.get_batch_update_members_flow,
  561             provider_old_members, new_members, updated_members,
  562             store=store)
  563 
  564     def update_member(self, member, member_updates):
  565         """Updates a pool member.
  566 
   567         :param member: A member provider dictionary to update
  568         :param member_updates: Dict containing updated member attributes
  569         :returns: None
  570         :raises MemberNotFound: The referenced member was not found
  571         """
   572         # TODO(ataraday) Revisit this when other flows use dicts
  573         pool = self._pool_repo.get(db_apis.get_session(),
  574                                    id=member[constants.POOL_ID])
  575         load_balancer = pool.load_balancer
  576         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
  577             load_balancer).to_dict()
  578 
  579         listeners_dicts = (
  580             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  581                 pool.listeners))
  582         store = {
  583             constants.MEMBER: member,
  584             constants.LISTENERS: listeners_dicts,
  585             constants.LOADBALANCER_ID: load_balancer.id,
  586             constants.LOADBALANCER: provider_lb,
  587             constants.POOL_ID: pool.id,
  588             constants.UPDATE_DICT: member_updates}
  589         if load_balancer.availability_zone:
  590             store[constants.AVAILABILITY_ZONE] = (
  591                 self._az_repo.get_availability_zone_metadata_dict(
  592                     db_apis.get_session(), load_balancer.availability_zone))
  593         else:
  594             store[constants.AVAILABILITY_ZONE] = {}
  595 
  596         self.run_flow(
  597             flow_utils.get_update_member_flow,
  598             store=store)
  599 
  600     @tenacity.retry(
  601         retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
  602         wait=tenacity.wait_incrementing(
  603             CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
  604             CONF.haproxy_amphora.api_db_commit_retry_backoff,
  605             CONF.haproxy_amphora.api_db_commit_retry_max),
  606         stop=tenacity.stop_after_attempt(
  607             CONF.haproxy_amphora.api_db_commit_retry_attempts))
  608     def create_pool(self, pool):
  609         """Creates a node pool.
  610 
  611         :param pool: Provider pool dict to create
  612         :returns: None
  613         :raises NoResultFound: Unable to find the object
  614         """
  615 
  616         # TODO(ataraday) It seems we need to get db pool here anyway to get
  617         # proper listeners
  618         db_pool = self._pool_repo.get(db_apis.get_session(),
  619                                       id=pool[constants.POOL_ID])
  620         if not db_pool:
  621             LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
  622                         '60 seconds.', 'pool', pool[constants.POOL_ID])
  623             raise db_exceptions.NoResultFound
  624 
  625         load_balancer = db_pool.load_balancer
  626         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
  627             load_balancer).to_dict()
  628 
  629         listeners_dicts = (
  630             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  631                 db_pool.listeners))
  632 
  633         store = {constants.POOL_ID: pool[constants.POOL_ID],
  634                  constants.LISTENERS: listeners_dicts,
  635                  constants.LOADBALANCER_ID: load_balancer.id,
  636                  constants.LOADBALANCER: provider_lb}
  637         self.run_flow(
  638             flow_utils.get_create_pool_flow,
  639             store=store)
  640 
  641     def delete_pool(self, pool):
  642         """Deletes a node pool.
  643 
  644         :param pool: Provider pool dict to delete
  645         :returns: None
  646         :raises PoolNotFound: The referenced pool was not found
  647         """
  648         db_pool = self._pool_repo.get(db_apis.get_session(),
  649                                       id=pool[constants.POOL_ID])
  650 
  651         listeners_dicts = (
  652             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  653                 db_pool.listeners))
  654         load_balancer = db_pool.load_balancer
  655 
  656         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
  657             load_balancer).to_dict()
  658 
  659         store = {constants.POOL_ID: pool[constants.POOL_ID],
  660                  constants.LISTENERS: listeners_dicts,
  661                  constants.LOADBALANCER: provider_lb,
  662                  constants.LOADBALANCER_ID: load_balancer.id,
  663                  constants.PROJECT_ID: db_pool.project_id}
  664         self.run_flow(
  665             flow_utils.get_delete_pool_flow,
  666             store=store)
  667 
  668     def update_pool(self, origin_pool, pool_updates):
  669         """Updates a node pool.
  670 
  671         :param origin_pool: Provider pool dict to update
  672         :param pool_updates: Dict containing updated pool attributes
  673         :returns: None
  674         :raises PoolNotFound: The referenced pool was not found
  675         """
  676         try:
  677             db_pool = self._get_db_obj_until_pending_update(
  678                 self._pool_repo, origin_pool[constants.POOL_ID])
  679         except tenacity.RetryError as e:
  680             LOG.warning('Pool did not go into %s in 60 seconds. '
   681                         'This is either due to an in-progress Octavia upgrade '
  682                         'or an overloaded and failing database. Assuming '
  683                         'an upgrade is in progress and continuing.',
  684                         constants.PENDING_UPDATE)
  685             db_pool = e.last_attempt.result()
  686 
  687         load_balancer = db_pool.load_balancer
  688         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
  689             load_balancer).to_dict()
  690 
  691         listeners_dicts = (
  692             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  693                 db_pool.listeners))
  694 
  695         store = {constants.POOL_ID: db_pool.id,
  696                  constants.LISTENERS: listeners_dicts,
  697                  constants.LOADBALANCER: provider_lb,
  698                  constants.LOADBALANCER_ID: load_balancer.id,
  699                  constants.UPDATE_DICT: pool_updates}
  700         self.run_flow(
  701             flow_utils.get_update_pool_flow,
  702             store=store)
  703 
  704     @tenacity.retry(
  705         retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
  706         wait=tenacity.wait_incrementing(
  707             CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
  708             CONF.haproxy_amphora.api_db_commit_retry_backoff,
  709             CONF.haproxy_amphora.api_db_commit_retry_max),
  710         stop=tenacity.stop_after_attempt(
  711             CONF.haproxy_amphora.api_db_commit_retry_attempts))
  712     def create_l7policy(self, l7policy):
  713         """Creates an L7 Policy.
  714 
  715         :param l7policy: Provider dict of the l7policy to create
  716         :returns: None
  717         :raises NoResultFound: Unable to find the object
  718         """
  719         db_listener = self._listener_repo.get(
  720             db_apis.get_session(), id=l7policy[constants.LISTENER_ID])
  721 
  722         listeners_dicts = (
  723             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  724                 [db_listener]))
  725 
  726         store = {constants.L7POLICY: l7policy,
  727                  constants.LISTENERS: listeners_dicts,
  728                  constants.LOADBALANCER_ID: db_listener.load_balancer.id
  729                  }
  730         self.run_flow(
  731             flow_utils.get_create_l7policy_flow,
  732             store=store)
  733 
  734     def delete_l7policy(self, l7policy):
  735         """Deletes an L7 policy.
  736 
  737         :param l7policy: Provider dict of the l7policy to delete
  738         :returns: None
  739         :raises L7PolicyNotFound: The referenced l7policy was not found
  740         """
  741         db_listener = self._listener_repo.get(
  742             db_apis.get_session(), id=l7policy[constants.LISTENER_ID])
  743         listeners_dicts = (
  744             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  745                 [db_listener]))
  746 
  747         store = {constants.L7POLICY: l7policy,
  748                  constants.LISTENERS: listeners_dicts,
  749                  constants.LOADBALANCER_ID: db_listener.load_balancer.id
  750                  }
  751         self.run_flow(
  752             flow_utils.get_delete_l7policy_flow,
  753             store=store)
  754 
  755     def update_l7policy(self, original_l7policy, l7policy_updates):
  756         """Updates an L7 policy.
  757 
   758         :param original_l7policy: Provider dict of the l7policy to update
  759         :param l7policy_updates: Dict containing updated l7policy attributes
  760         :returns: None
  761         :raises L7PolicyNotFound: The referenced l7policy was not found
  762         """
  763         db_listener = self._listener_repo.get(
  764             db_apis.get_session(), id=original_l7policy[constants.LISTENER_ID])
  765 
  766         listeners_dicts = (
  767             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  768                 [db_listener]))
  769 
  770         store = {constants.L7POLICY: original_l7policy,
  771                  constants.LISTENERS: listeners_dicts,
  772                  constants.LOADBALANCER_ID: db_listener.load_balancer.id,
  773                  constants.UPDATE_DICT: l7policy_updates}
  774         self.run_flow(
  775             flow_utils.get_update_l7policy_flow,
  776             store=store)
  777 
  778     @tenacity.retry(
  779         retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
  780         wait=tenacity.wait_incrementing(
  781             CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
  782             CONF.haproxy_amphora.api_db_commit_retry_backoff,
  783             CONF.haproxy_amphora.api_db_commit_retry_max),
  784         stop=tenacity.stop_after_attempt(
  785             CONF.haproxy_amphora.api_db_commit_retry_attempts))
  786     def create_l7rule(self, l7rule):
  787         """Creates an L7 Rule.
  788 
  789         :param l7rule: Provider dict l7rule
  790         :returns: None
  791         :raises NoResultFound: Unable to find the object
  792         """
  793         db_l7policy = self._l7policy_repo.get(db_apis.get_session(),
  794                                               id=l7rule[constants.L7POLICY_ID])
  795 
  796         load_balancer = db_l7policy.listener.load_balancer
  797 
  798         listeners_dicts = (
  799             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  800                 [db_l7policy.listener]))
  801         l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy(
  802             db_l7policy)
  803 
  804         store = {constants.L7RULE: l7rule,
  805                  constants.L7POLICY: l7policy_dict.to_dict(),
  806                  constants.L7POLICY_ID: db_l7policy.id,
  807                  constants.LISTENERS: listeners_dicts,
  808                  constants.LOADBALANCER_ID: load_balancer.id
  809                  }
  810         self.run_flow(
  811             flow_utils.get_create_l7rule_flow,
  812             store=store)
  813 
  814     def delete_l7rule(self, l7rule):
  815         """Deletes an L7 rule.
  816 
  817         :param l7rule: Provider dict of the l7rule to delete
  818         :returns: None
  819         :raises L7RuleNotFound: The referenced l7rule was not found
  820         """
  821         db_l7policy = self._l7policy_repo.get(db_apis.get_session(),
  822                                               id=l7rule[constants.L7POLICY_ID])
  823         l7policy = provider_utils.db_l7policy_to_provider_l7policy(db_l7policy)
  824         load_balancer = db_l7policy.listener.load_balancer
  825 
  826         listeners_dicts = (
  827             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  828                 [db_l7policy.listener]))
  829 
  830         store = {constants.L7RULE: l7rule,
  831                  constants.L7POLICY: l7policy.to_dict(),
  832                  constants.LISTENERS: listeners_dicts,
  833                  constants.L7POLICY_ID: db_l7policy.id,
  834                  constants.LOADBALANCER_ID: load_balancer.id
  835                  }
  836         self.run_flow(
  837             flow_utils.get_delete_l7rule_flow,
  838             store=store)
  839 
  840     def update_l7rule(self, original_l7rule, l7rule_updates):
  841         """Updates an L7 rule.
  842 
   843         :param original_l7rule: Provider dict of the l7rule to update
  844         :param l7rule_updates: Dict containing updated l7rule attributes
  845         :returns: None
  846         :raises L7RuleNotFound: The referenced l7rule was not found
  847         """
  848         db_l7policy = self._l7policy_repo.get(
  849             db_apis.get_session(), id=original_l7rule[constants.L7POLICY_ID])
  850         load_balancer = db_l7policy.listener.load_balancer
  851 
  852         listeners_dicts = (
  853             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
  854                 [db_l7policy.listener]))
  855         l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy(
  856             db_l7policy)
  857 
  858         store = {constants.L7RULE: original_l7rule,
  859                  constants.L7POLICY: l7policy_dict.to_dict(),
  860                  constants.LISTENERS: listeners_dicts,
  861                  constants.L7POLICY_ID: db_l7policy.id,
  862                  constants.LOADBALANCER_ID: load_balancer.id,
  863                  constants.UPDATE_DICT: l7rule_updates}
  864         self.run_flow(
  865             flow_utils.get_update_l7rule_flow,
  866             store=store)
  867 
  868     def failover_amphora(self, amphora_id, reraise=False):
  869         """Perform failover operations for an amphora.
  870 
  871         Note: This expects the load balancer to already be in
  872         provisioning_status=PENDING_UPDATE state.
  873 
  874         :param amphora_id: ID for amphora to failover
   875         :param reraise: If enabled, reraise any caught exception
  876         :returns: None
  877         :raises octavia.common.exceptions.NotFound: The referenced amphora was
  878                                                     not found
  879         """
  880         amphora = None
  881         try:
  882             amphora = self._amphora_repo.get(db_apis.get_session(),
  883                                              id=amphora_id)
  884             if amphora is None:
  885                 LOG.error('Amphora failover for amphora %s failed because '
  886                           'there is no record of this amphora in the '
  887                           'database. Check that the [house_keeping] '
  888                           'amphora_expiry_age configuration setting is not '
  889                           'too short. Skipping failover.', amphora_id)
  890                 raise exceptions.NotFound(resource=constants.AMPHORA,
  891                                           id=amphora_id)
  892 
  893             if amphora.status == constants.DELETED:
  894                 LOG.warning('Amphora %s is marked DELETED in the database but '
  895                             'was submitted for failover. Deleting it from the '
  896                             'amphora health table to exclude it from health '
  897                             'checks and skipping the failover.', amphora.id)
  898                 self._amphora_health_repo.delete(db_apis.get_session(),
  899                                                  amphora_id=amphora.id)
  900                 return
  901 
  902             loadbalancer = None
  903             if amphora.load_balancer_id:
  904                 loadbalancer = self._lb_repo.get(db_apis.get_session(),
  905                                                  id=amphora.load_balancer_id)
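                   # lb_amp_count tells the failover flow how many amphorae
                   # this load balancer should end up with: two for
                   # ACTIVE_STANDBY, one for SINGLE.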
  906             lb_amp_count = None
  907             if loadbalancer:
  908                 if loadbalancer.topology == constants.TOPOLOGY_ACTIVE_STANDBY:
  909                     lb_amp_count = 2
  910                 elif loadbalancer.topology == constants.TOPOLOGY_SINGLE:
  911                     lb_amp_count = 1
  912 
  913             az_metadata = {}
  914             flavor_dict = {}
  915             lb_id = None
  916             vip_dict = {}
  917             server_group_id = None
  918             if loadbalancer:
  919                 lb_id = loadbalancer.id
  920                 # Even if the LB doesn't have a flavor, create one and
  921                 # pass through the topology.
  922                 if loadbalancer.flavor_id:
  923                     flavor_dict = self._flavor_repo.get_flavor_metadata_dict(
  924                         db_apis.get_session(), loadbalancer.flavor_id)
  925                     flavor_dict[constants.LOADBALANCER_TOPOLOGY] = (
  926                         loadbalancer.topology)
  927                 else:
  928                     flavor_dict = {constants.LOADBALANCER_TOPOLOGY:
  929                                    loadbalancer.topology}
  930                 if loadbalancer.availability_zone:
  931                     az_metadata = (
  932                         self._az_repo.get_availability_zone_metadata_dict(
  933                             db_apis.get_session(),
  934                             loadbalancer.availability_zone))
  935                 vip_dict = loadbalancer.vip.to_dict()
  936                 server_group_id = loadbalancer.server_group_id
   937             provider_lb_dict = (
   938                 provider_utils.db_loadbalancer_to_provider_loadbalancer(
   939                     loadbalancer).to_dict() if loadbalancer else loadbalancer)
  940 
  941             stored_params = {constants.AVAILABILITY_ZONE: az_metadata,
  942                              constants.BUILD_TYPE_PRIORITY:
  943                                  constants.LB_CREATE_FAILOVER_PRIORITY,
  944                              constants.FLAVOR: flavor_dict,
  945                              constants.LOADBALANCER: provider_lb_dict,
  946                              constants.SERVER_GROUP_ID: server_group_id,
  947                              constants.LOADBALANCER_ID: lb_id,
  948                              constants.VIP: vip_dict,
  949                              constants.AMPHORA_ID: amphora_id}
  950 
  951             self.run_flow(
  952                 flow_utils.get_failover_amphora_flow,
  953                 amphora.to_dict(), lb_amp_count,
  954                 store=stored_params, wait=True)
  955 
  956             LOG.info("Successfully completed the failover for an amphora: %s",
  957                      {"id": amphora_id,
  958                       "load_balancer_id": lb_id,
  959                       "lb_network_ip": amphora.lb_network_ip,
  960                       "compute_id": amphora.compute_id,
  961                       "role": amphora.role})
  962 
  963         except Exception as e:
  964             with excutils.save_and_reraise_exception(reraise=reraise):
  965                 LOG.exception("Amphora %s failover exception: %s",
  966                               amphora_id, str(e))
  967                 self._amphora_repo.update(db_apis.get_session(),
  968                                           amphora_id, status=constants.ERROR)
  969                 if amphora and amphora.load_balancer_id:
  970                     self._lb_repo.update(
  971                         db_apis.get_session(), amphora.load_balancer_id,
  972                         provisioning_status=constants.ERROR)
  973 
  974     @staticmethod
  975     def _get_amphorae_for_failover(load_balancer):
  976         """Returns an ordered list of amphora to failover.
  977 
  978         :param load_balancer: The load balancer being failed over.
  979         :returns: An ordered list of amphora to failover,
  980                   first amp to failover is last in the list
  981         :raises octavia.common.exceptions.InvalidTopology: LB has an unknown
  982                                                            topology.
  983         """
  984         if load_balancer.topology == constants.TOPOLOGY_SINGLE:
  985             # In SINGLE topology, amp failover order does not matter
  986             return [a.to_dict() for a in load_balancer.amphorae
  987                     if a.status != constants.DELETED]
  988 
  989         if load_balancer.topology == constants.TOPOLOGY_ACTIVE_STANDBY:
   990             # In Active/Standby we should prefer the standby amp
  991             # for failover first in case the Active is still able to pass
  992             # traffic.
  993             # Note: The active amp can switch at any time and in less than a
  994             #       second, so this is "best effort".
  995             amphora_driver = utils.get_amphora_driver()
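                   # Probe each amphora for the VIP interface with a short,
                   # bounded timeout (the failover_connection_* options) so an
                   # unreachable amphora does not stall the failover.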
  996             timeout_dict = {
  997                 constants.CONN_MAX_RETRIES:
  998                     CONF.haproxy_amphora.failover_connection_max_retries,
  999                 constants.CONN_RETRY_INTERVAL:
 1000                     CONF.haproxy_amphora.failover_connection_retry_interval}
 1001             amps = []
 1002             selected_amp = None
 1003             for amp in load_balancer.amphorae:
 1004                 if amp.status == constants.DELETED:
 1005                     continue
 1006                 if selected_amp is None:
 1007                     try:
 1008                         if amphora_driver.get_interface_from_ip(
 1009                                 amp, load_balancer.vip.ip_address,
 1010                                 timeout_dict):
 1011                             # This is a potential ACTIVE, add it to the list
 1012                             amps.append(amp.to_dict())
 1013                         else:
 1014                             # This one doesn't have the VIP IP, so start
 1015                             # failovers here.
 1016                             selected_amp = amp
 1017                             LOG.debug("Selected amphora %s as the initial "
 1018                                       "failover amphora.", amp.id)
 1019                     except Exception:
 1020                         # This amphora is broken, so start failovers here.
 1021                         selected_amp = amp
 1022                 else:
 1023                     # We have already found a STANDBY, so add the rest to the
 1024                     # list without querying them.
 1025                     amps.append(amp.to_dict())
 1026             # Put the selected amphora at the end of the list so it is
 1027             # first to failover.
 1028             if selected_amp:
 1029                 amps.append(selected_amp.to_dict())
 1030             return amps
 1031 
 1032         LOG.error('Unknown load balancer topology found: %s, aborting '
 1033                   'failover.', load_balancer.topology)
 1034         raise exceptions.InvalidTopology(topology=load_balancer.topology)
 1035 
 1036     def failover_loadbalancer(self, load_balancer_id):
 1037         """Perform failover operations for a load balancer.
 1038 
 1039         Note: This expects the load balancer to already be in
 1040         provisioning_status=PENDING_UPDATE state.
 1041 
 1042         :param load_balancer_id: ID for load balancer to failover
 1043         :returns: None
  1044         :raises octavia.common.exceptions.NotFound: The load balancer was not
 1045                                                     found.
 1046         """
 1047         try:
 1048             lb = self._lb_repo.get(db_apis.get_session(),
 1049                                    id=load_balancer_id)
 1050             if lb is None:
 1051                 raise exceptions.NotFound(resource=constants.LOADBALANCER,
 1052                                           id=load_balancer_id)
 1053 
 1054             # Get the ordered list of amphorae to failover for this LB.
 1055             amps = self._get_amphorae_for_failover(lb)
 1056 
 1057             if lb.topology == constants.TOPOLOGY_SINGLE:
 1058                 if len(amps) != 1:
 1059                     LOG.warning('%d amphorae found on load balancer %s where '
 1060                                 'one should exist. Repairing.', len(amps),
 1061                                 load_balancer_id)
 1062             elif lb.topology == constants.TOPOLOGY_ACTIVE_STANDBY:
 1063 
 1064                 if len(amps) != 2:
 1065                     LOG.warning('%d amphorae found on load balancer %s where '
 1066                                 'two should exist. Repairing.', len(amps),
 1067                                 load_balancer_id)
 1068             else:
 1069                 LOG.error('Unknown load balancer topology found: %s, aborting '
 1070                           'failover!', lb.topology)
 1071                 raise exceptions.InvalidTopology(topology=lb.topology)
 1072 
 1073             # We must provide a topology in the flavor definition
 1074             # here for the amphora to be created with the correct
 1075             # configuration.
 1076             if lb.flavor_id:
 1077                 flavor = self._flavor_repo.get_flavor_metadata_dict(
 1078                     db_apis.get_session(), lb.flavor_id)
 1079                 flavor[constants.LOADBALANCER_TOPOLOGY] = lb.topology
 1080             else:
 1081                 flavor = {constants.LOADBALANCER_TOPOLOGY: lb.topology}
 1082 
 1083             provider_lb_dict = (
 1084                 provider_utils.db_loadbalancer_to_provider_loadbalancer(
 1085                     lb).to_dict() if lb else lb)
 1086 
 1087             provider_lb_dict[constants.FLAVOR] = flavor
 1088 
 1089             stored_params = {constants.LOADBALANCER: provider_lb_dict,
 1090                              constants.BUILD_TYPE_PRIORITY:
 1091                                  constants.LB_CREATE_FAILOVER_PRIORITY,
 1092                              constants.SERVER_GROUP_ID: lb.server_group_id,
 1093                              constants.LOADBALANCER_ID: lb.id,
 1094                              constants.FLAVOR: flavor}
 1095 
 1096             if lb.availability_zone:
 1097                 stored_params[constants.AVAILABILITY_ZONE] = (
 1098                     self._az_repo.get_availability_zone_metadata_dict(
 1099                         db_apis.get_session(), lb.availability_zone))
 1100             else:
 1101                 stored_params[constants.AVAILABILITY_ZONE] = {}
 1102 
 1103             self.run_flow(
 1104                 flow_utils.get_failover_LB_flow, amps, provider_lb_dict,
 1105                 store=stored_params, wait=True)
 1106 
 1107             LOG.info('Failover of load balancer %s completed successfully.',
 1108                      lb.id)
 1109 
 1110         except Exception as e:
 1111             with excutils.save_and_reraise_exception(reraise=False):
 1112                 LOG.exception("LB %(lbid)s failover exception: %(exc)s",
 1113                               {'lbid': load_balancer_id, 'exc': str(e)})
 1114                 self._lb_repo.update(
 1115                     db_apis.get_session(), load_balancer_id,
 1116                     provisioning_status=constants.ERROR)
 1117 
 1118     def amphora_cert_rotation(self, amphora_id):
 1119         """Perform cert rotation for an amphora.
 1120 
 1121         :param amphora_id: ID for amphora to rotate
 1122         :returns: None
 1123         :raises AmphoraNotFound: The referenced amphora was not found
 1124         """
 1125 
 1126         amp = self._amphora_repo.get(db_apis.get_session(),
 1127                                      id=amphora_id)
 1128         LOG.info("Start amphora cert rotation, amphora's id is: %s",
 1129                  amphora_id)
 1130 
 1131         store = {constants.AMPHORA: amp.to_dict(),
 1132                  constants.AMPHORA_ID: amphora_id}
 1133 
 1134         self.run_flow(
 1135             flow_utils.cert_rotate_amphora_flow,
 1136             store=store)
 1137         LOG.info("Finished amphora cert rotation, amphora's id was: %s",
 1138                  amphora_id)
 1139 
 1140     def update_amphora_agent_config(self, amphora_id):
 1141         """Update the amphora agent configuration.
 1142 
 1143         Note: This will update the amphora agent configuration file and
  1144               update the running configuration for mutable configuration
 1145               items.
 1146 
 1147         :param amphora_id: ID of the amphora to update.
 1148         :returns: None
 1149         """
 1150         LOG.info("Start amphora agent configuration update, amphora's id "
 1151                  "is: %s", amphora_id)
 1152         amp = self._amphora_repo.get(db_apis.get_session(), id=amphora_id)
 1153         lb = self._amphora_repo.get_lb_for_amphora(db_apis.get_session(),
 1154                                                    amphora_id)
 1155         flavor = {}
 1156         if lb.flavor_id:
 1157             flavor = self._flavor_repo.get_flavor_metadata_dict(
 1158                 db_apis.get_session(), lb.flavor_id)
 1159 
 1160         store = {constants.AMPHORA: amp.to_dict(),
 1161                  constants.FLAVOR: flavor}
 1162 
 1163         self.run_flow(
 1164             flow_utils.update_amphora_config_flow,
 1165             store=store)
 1166         LOG.info("Finished amphora agent configuration update, amphora's id "
 1167                  "was: %s", amphora_id)