"Fossies" - the Fresh Open Source Software Archive

Member "neutron-14.0.3/neutron/agent/l3/agent.py" (22 Oct 2019, 42190 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "agent.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 14.0.2_vs_14.0.3.

    1 # Copyright 2012 VMware, Inc.  All rights reserved.
    2 #
    3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 #    not use this file except in compliance with the License. You may obtain
    5 #    a copy of the License at
    6 #
    7 #         http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 #    Unless required by applicable law or agreed to in writing, software
   10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 #    License for the specific language governing permissions and limitations
   13 #    under the License.
   14 #
   15 
   16 import eventlet
   17 import netaddr
   18 from neutron_lib.agent import constants as agent_consts
   19 from neutron_lib.agent import topics
   20 from neutron_lib.callbacks import events
   21 from neutron_lib.callbacks import registry
   22 from neutron_lib.callbacks import resources
   23 from neutron_lib import constants as lib_const
   24 from neutron_lib import context as n_context
   25 from neutron_lib.exceptions import l3 as l3_exc
   26 from neutron_lib import rpc as n_rpc
   27 from oslo_concurrency import lockutils
   28 from oslo_config import cfg
   29 from oslo_context import context as common_context
   30 from oslo_log import log as logging
   31 import oslo_messaging
   32 from oslo_serialization import jsonutils
   33 from oslo_service import loopingcall
   34 from oslo_service import periodic_task
   35 from oslo_utils import excutils
   36 from oslo_utils import timeutils
   37 from osprofiler import profiler
   38 
   39 from neutron.agent.common import resource_processing_queue as queue
   40 from neutron.agent.common import utils as common_utils
   41 from neutron.agent.l3 import dvr
   42 from neutron.agent.l3 import dvr_edge_ha_router
   43 from neutron.agent.l3 import dvr_edge_router as dvr_router
   44 from neutron.agent.l3 import dvr_local_router
   45 from neutron.agent.l3 import ha
   46 from neutron.agent.l3 import ha_router
   47 from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
   48 from neutron.agent.l3 import l3_agent_extensions_manager as l3_ext_manager
   49 from neutron.agent.l3 import legacy_router
   50 from neutron.agent.l3 import namespace_manager
   51 from neutron.agent.linux import external_process
   52 from neutron.agent.linux import pd
   53 from neutron.agent.linux import utils as linux_utils
   54 from neutron.agent.metadata import driver as metadata_driver
   55 from neutron.agent import rpc as agent_rpc
   56 from neutron.common import constants as l3_constants
   57 from neutron.common import ipv6_utils
   58 from neutron.common import utils
   59 from neutron import manager
   60 
LOG = logging.getLogger(__name__)

# Number of routers to fetch from server at a time on resync.
# Needed to reduce load on server side and to speed up resync on agent side.
SYNC_ROUTERS_MAX_CHUNK_SIZE = 256
SYNC_ROUTERS_MIN_CHUNK_SIZE = 32

# Priorities - lower value is higher priority
PRIORITY_RELATED_ROUTER = 0
PRIORITY_RPC = 1
PRIORITY_SYNC_ROUTERS_TASK = 2
PRIORITY_PD_UPDATE = 3

# Actions
DELETE_ROUTER = 1
DELETE_RELATED_ROUTER = 2
ADD_UPDATE_ROUTER = 3
ADD_UPDATE_RELATED_ROUTER = 4
PD_UPDATE = 5

# Maps a direct action on a router to the action used when processing a
# router that is merely related to the one originally updated/deleted.
RELATED_ACTION_MAP = {DELETE_ROUTER: DELETE_RELATED_ROUTER,
                      ADD_UPDATE_ROUTER: ADD_UPDATE_RELATED_ROUTER}

# Bounds for the router-processing green pool; it is resized between these
# limits based on the number of hosted routers.
ROUTER_PROCESS_GREENLET_MAX = 32
ROUTER_PROCESS_GREENLET_MIN = 8
   86 
   87 
   88 def log_verbose_exc(message, router_payload):
   89     LOG.exception(message)
   90     LOG.debug("Payload:\n%s",
   91               utils.DelayedStringRenderer(jsonutils.dumps,
   92                                           router_payload, indent=5))
   93 
   94 
   95 class L3PluginApi(object):
   96     """Agent side of the l3 agent RPC API.
   97 
   98     API version history:
   99         1.0 - Initial version.
  100         1.1 - Floating IP operational status updates
  101         1.2 - DVR support: new L3 plugin methods added.
  102               - get_ports_by_subnet
  103               - get_agent_gateway_port
  104               Needed by the agent when operating in DVR/DVR_SNAT mode
  105         1.3 - Get the list of activated services
  106         1.4 - Added L3 HA update_router_state. This method was reworked in
  107               to update_ha_routers_states
  108         1.5 - Added update_ha_routers_states
  109         1.6 - Added process_prefix_update
  110         1.7 - DVR support: new L3 plugin methods added.
  111               - delete_agent_gateway_port
  112         1.8 - Added address scope information
  113         1.9 - Added get_router_ids
  114         1.10 Added update_all_ha_network_port_statuses
  115         1.11 Added get_host_ha_router_count
  116     """
  117 
  118     def __init__(self, topic, host):
  119         self.host = host
  120         target = oslo_messaging.Target(topic=topic, version='1.0')
  121         self.client = n_rpc.get_client(target)
  122 
  123     def get_routers(self, context, router_ids=None):
  124         """Make a remote process call to retrieve the sync data for routers."""
  125         cctxt = self.client.prepare()
  126         return cctxt.call(context, 'sync_routers', host=self.host,
  127                           router_ids=router_ids)
  128 
  129     def update_all_ha_network_port_statuses(self, context):
  130         """Make a remote process call to update HA network port status."""
  131         cctxt = self.client.prepare(version='1.10')
  132         return cctxt.call(context, 'update_all_ha_network_port_statuses',
  133                           host=self.host)
  134 
  135     def get_router_ids(self, context):
  136         """Make a remote process call to retrieve scheduled routers ids."""
  137         cctxt = self.client.prepare(version='1.9')
  138         return cctxt.call(context, 'get_router_ids', host=self.host)
  139 
  140     def get_external_network_id(self, context):
  141         """Make a remote process call to retrieve the external network id.
  142 
  143         @raise oslo_messaging.RemoteError: with TooManyExternalNetworks as
  144                                            exc_type if there are more than one
  145                                            external network
  146         """
  147         cctxt = self.client.prepare()
  148         return cctxt.call(context, 'get_external_network_id', host=self.host)
  149 
  150     def update_floatingip_statuses(self, context, router_id, fip_statuses):
  151         """Call the plugin update floating IPs's operational status."""
  152         cctxt = self.client.prepare(version='1.1')
  153         return cctxt.call(context, 'update_floatingip_statuses',
  154                           router_id=router_id, fip_statuses=fip_statuses)
  155 
  156     def get_ports_by_subnet(self, context, subnet_id):
  157         """Retrieve ports by subnet id."""
  158         cctxt = self.client.prepare(version='1.2')
  159         return cctxt.call(context, 'get_ports_by_subnet', host=self.host,
  160                           subnet_id=subnet_id)
  161 
  162     def get_agent_gateway_port(self, context, fip_net):
  163         """Get or create an agent_gateway_port."""
  164         cctxt = self.client.prepare(version='1.2')
  165         return cctxt.call(context, 'get_agent_gateway_port',
  166                           network_id=fip_net, host=self.host)
  167 
  168     def get_service_plugin_list(self, context):
  169         """Make a call to get the list of activated services."""
  170         cctxt = self.client.prepare(version='1.3')
  171         return cctxt.call(context, 'get_service_plugin_list')
  172 
  173     def update_ha_routers_states(self, context, states):
  174         """Update HA routers states."""
  175         cctxt = self.client.prepare(version='1.5')
  176         return cctxt.cast(context, 'update_ha_routers_states',
  177                           host=self.host, states=states)
  178 
  179     def process_prefix_update(self, context, prefix_update):
  180         """Process prefix update whenever prefixes get changed."""
  181         cctxt = self.client.prepare(version='1.6')
  182         return cctxt.call(context, 'process_prefix_update',
  183                           subnets=prefix_update)
  184 
  185     def delete_agent_gateway_port(self, context, fip_net):
  186         """Delete Floatingip_agent_gateway_port."""
  187         cctxt = self.client.prepare(version='1.7')
  188         return cctxt.call(context, 'delete_agent_gateway_port',
  189                           host=self.host, network_id=fip_net)
  190 
  191     def get_host_ha_router_count(self, context):
  192         """Make a call to get the count of HA router."""
  193         cctxt = self.client.prepare(version='1.11')
  194         return cctxt.call(context, 'get_host_ha_router_count', host=self.host)
  195 
  196 
  197 @profiler.trace_cls("l3-agent")
  198 class L3NATAgent(ha.AgentMixin,
  199                  dvr.AgentMixin,
  200                  manager.Manager):
  201     """Manager for L3NatAgent
  202 
  203         API version history:
  204         1.0 initial Version
  205         1.1 changed the type of the routers parameter
  206             to the routers_updated method.
  207             It was previously a list of routers in dict format.
  208             It is now a list of router IDs only.
  209             Per rpc versioning rules,  it is backwards compatible.
  210         1.2 - DVR support: new L3 agent methods added.
  211               - add_arp_entry
  212               - del_arp_entry
  213         1.3 - fipnamespace_delete_on_ext_net - to delete fipnamespace
  214               after the external network is removed
  215               Needed by the L3 service when dealing with DVR
  216         1.4 - support network_update to get MTU updates
  217     """
  218     target = oslo_messaging.Target(version='1.4')
  219 
    def __init__(self, host, conf=None):
        """Initialize the L3 NAT agent.

        :param host: hostname this agent runs on (used for RPC identity).
        :param conf: optional config object; defaults to the global cfg.CONF.
        """
        if conf:
            self.conf = conf
        else:
            self.conf = cfg.CONF
        # Cache of RouterInfo objects hosted by this agent, keyed by
        # router UUID.
        self.router_info = {}

        self._check_config_params()

        self.process_monitor = external_process.ProcessMonitor(
            config=self.conf,
            resource_type='router')

        self.driver = common_utils.load_interface_driver(self.conf)

        self._context = n_context.get_admin_context_without_session()
        self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
        # Force a full resync on the first periodic sync run.
        self.fullsync = True
        self.sync_routers_chunk_size = SYNC_ROUTERS_MAX_CHUNK_SIZE

        # Get the HA router count from Neutron Server
        # This is the first place where we contact neutron-server on startup
        # so retry in case its not ready to respond.
        while True:
            try:
                self.ha_router_count = int(
                    self.plugin_rpc.get_host_ha_router_count(self.context))
            except oslo_messaging.MessagingTimeout as e:
                # NOTE: the RPC call blocks until its own timeout fires, so
                # this retry loop does not busy-spin while the server is down.
                LOG.warning('l3-agent cannot contact neutron server '
                            'to retrieve HA router count. '
                            'Check connectivity to neutron server. '
                            'Retrying... '
                            'Detailed message: %(msg)s.', {'msg': e})
                continue
            break
        LOG.info("Agent HA routers count %s", self.ha_router_count)

        self.init_extension_manager(self.plugin_rpc)

        self.metadata_driver = None
        if self.conf.enable_metadata_proxy:
            self.metadata_driver = metadata_driver.MetadataDriver(self)

        self.namespaces_manager = namespace_manager.NamespaceManager(
            self.conf,
            self.driver,
            self.metadata_driver)

        # L3 agent router processing green pool
        self._pool_size = ROUTER_PROCESS_GREENLET_MIN
        self._pool = eventlet.GreenPool(size=self._pool_size)
        self._queue = queue.ResourceProcessingQueue()
        super(L3NATAgent, self).__init__(host=self.conf.host)

        self.target_ex_net_id = None
        self.use_ipv6 = ipv6_utils.is_enabled_and_bind_by_default()

        self.pd = pd.PrefixDelegation(self.context, self.process_monitor,
                                      self.driver,
                                      self.plugin_rpc.process_prefix_update,
                                      self.create_pd_router_update,
                                      self.conf)

        # Consume network updates to trigger router resync
        consumers = [[topics.NETWORK, topics.UPDATE]]
        agent_rpc.create_consumers([self], topics.AGENT, consumers)

        self._check_ha_router_process_status()
  288 
  289     def _check_ha_router_process_status(self):
  290         """Check HA router VRRP process status in network node.
  291 
  292         Check if the HA router HA routers VRRP (keepalived) process count
  293         and state change python monitor process count meet the expected
  294         quantity. If so, l3-agent will not call neutron to set all related
  295         HA port to down state, this can prevent some unexpected VRRP
  296         re-election. If not, a physical host may have down and just
  297         restarted, set HA network port status to DOWN.
  298         """
  299         if (self.conf.agent_mode not in [lib_const.L3_AGENT_MODE_DVR_SNAT,
  300                                          lib_const.L3_AGENT_MODE_LEGACY]):
  301             return
  302 
  303         if self.ha_router_count <= 0:
  304             return
  305 
  306         # HA routers VRRP (keepalived) process count
  307         vrrp_pcount = linux_utils.get_process_count_by_name("keepalived")
  308         LOG.debug("VRRP process count %s.", vrrp_pcount)
  309         # HA routers state change python monitor process count
  310         vrrp_st_pcount = linux_utils.get_process_count_by_name(
  311             "neutron-keepalived-state-change")
  312         LOG.debug("neutron-keepalived-state-change process count %s.",
  313                   vrrp_st_pcount)
  314 
  315         # Due to the process structure design of keepalived and the current
  316         # config of l3-ha router, it will run one main 'keepalived' process
  317         # and a child  'VRRP' process. So in the following check, we divided
  318         # number of processes by 2 to match the ha router count.
  319         if (not (vrrp_pcount / 2 >= self.ha_router_count and
  320                  vrrp_st_pcount >= self.ha_router_count)):
  321             LOG.debug("Call neutron server to set HA port to DOWN state.")
  322             try:
  323                 # We set HA network port status to DOWN to let l2 agent
  324                 # update it to ACTIVE after wiring. This allows us to spawn
  325                 # keepalived only when l2 agent finished wiring the port.
  326                 self.plugin_rpc.update_all_ha_network_port_statuses(
  327                     self.context)
  328             except Exception:
  329                 LOG.exception('update_all_ha_network_port_statuses failed')
  330 
  331     def _check_config_params(self):
  332         """Check items in configuration files.
  333 
  334         Check for required and invalid configuration items.
  335         The actual values are not verified for correctness.
  336         """
  337         if not self.conf.interface_driver:
  338             msg = 'An interface driver must be specified'
  339             LOG.error(msg)
  340             raise SystemExit(1)
  341 
  342         if self.conf.ipv6_gateway:
  343             # ipv6_gateway configured. Check for valid v6 link-local address.
  344             try:
  345                 msg = ("%s used in config as ipv6_gateway is not a valid "
  346                        "IPv6 link-local address.")
  347                 ip_addr = netaddr.IPAddress(self.conf.ipv6_gateway)
  348                 if ip_addr.version != 6 or not ip_addr.is_link_local():
  349                     LOG.error(msg, self.conf.ipv6_gateway)
  350                     raise SystemExit(1)
  351             except netaddr.AddrFormatError:
  352                 LOG.error(msg, self.conf.ipv6_gateway)
  353                 raise SystemExit(1)
  354 
  355     def _fetch_external_net_id(self, force=False):
  356         """Find UUID of single external network for this agent."""
  357         if self.conf.gateway_external_network_id:
  358             return self.conf.gateway_external_network_id
  359 
  360         if not force and self.target_ex_net_id:
  361             return self.target_ex_net_id
  362 
  363         try:
  364             self.target_ex_net_id = self.plugin_rpc.get_external_network_id(
  365                 self.context)
  366             return self.target_ex_net_id
  367         except oslo_messaging.RemoteError as e:
  368             with excutils.save_and_reraise_exception() as ctx:
  369                 if e.exc_type == 'TooManyExternalNetworks':
  370                     # At this point we know gateway_external_network_id is not
  371                     # defined. Since there are more than one external network,
  372                     # we will handle all of them
  373                     ctx.reraise = False
  374 
  375     def _create_router(self, router_id, router):
  376         args = []
  377         kwargs = {
  378             'agent': self,
  379             'router_id': router_id,
  380             'router': router,
  381             'use_ipv6': self.use_ipv6,
  382             'agent_conf': self.conf,
  383             'interface_driver': self.driver,
  384         }
  385 
  386         if router.get('distributed'):
  387             kwargs['host'] = self.host
  388 
  389         if router.get('distributed') and router.get('ha'):
  390             # Case 1: If the router contains information about the HA interface
  391             # and if the requesting agent is a DVR_SNAT agent then go ahead
  392             # and create a HA router.
  393             # Case 2: If the router does not contain information about the HA
  394             # interface this means that this DVR+HA router needs to host only
  395             # the edge side of it, typically because it's landing on a node
  396             # that needs to provision a router namespace because of a DVR
  397             # service port (e.g. DHCP). So go ahead and create a regular DVR
  398             # edge router.
  399             if (self.conf.agent_mode == lib_const.L3_AGENT_MODE_DVR_SNAT and
  400                     router.get(lib_const.HA_INTERFACE_KEY) is not None):
  401                 kwargs['state_change_callback'] = self.enqueue_state_change
  402                 return dvr_edge_ha_router.DvrEdgeHaRouter(*args, **kwargs)
  403 
  404         if router.get('distributed'):
  405             if self.conf.agent_mode == lib_const.L3_AGENT_MODE_DVR_SNAT:
  406                 return dvr_router.DvrEdgeRouter(*args, **kwargs)
  407             else:
  408                 return dvr_local_router.DvrLocalRouter(*args, **kwargs)
  409 
  410         if router.get('ha'):
  411             kwargs['state_change_callback'] = self.enqueue_state_change
  412             return ha_router.HaRouter(*args, **kwargs)
  413 
  414         return legacy_router.LegacyRouter(*args, **kwargs)
  415 
  416     @lockutils.synchronized('resize_greenpool')
  417     def _resize_process_pool(self):
  418         pool_size = max([ROUTER_PROCESS_GREENLET_MIN,
  419                          min([ROUTER_PROCESS_GREENLET_MAX,
  420                               len(self.router_info)])])
  421         if pool_size == self._pool_size:
  422             return
  423         LOG.info("Resizing router processing queue green pool size to: %d",
  424                  pool_size)
  425         self._pool.resize(pool_size)
  426         self._pool_size = pool_size
  427 
    def _router_added(self, router_id, router):
        """Create a RouterInfo for *router*, cache it and initialize it.

        On initialization failure the router is removed from the cache,
        its namespace is cleaned up, and the exception is re-raised so
        the caller can trigger a complete resync.
        """
        ri = self._create_router(router_id, router)
        registry.notify(resources.ROUTER, events.BEFORE_CREATE,
                        self, router=ri)

        # Cache before initialize() so concurrent lookups can see the router.
        self.router_info[router_id] = ri

        # If initialize() fails, cleanup and retrigger complete sync
        try:
            ri.initialize(self.process_monitor)
        except Exception:
            with excutils.save_and_reraise_exception():
                del self.router_info[router_id]
                LOG.exception('Error while initializing router %s',
                              router_id)
                self.namespaces_manager.ensure_router_cleanup(router_id)
                try:
                    ri.delete()
                except Exception:
                    LOG.exception('Error while deleting router %s',
                                  router_id)

        self._resize_process_pool()
  451 
  452     def _safe_router_removed(self, router_id):
  453         """Try to delete a router and return True if successful."""
  454         # The l3_ext_manager API expects a router dict, look it up
  455         ri = self.router_info.get(router_id)
  456 
  457         try:
  458             self._router_removed(ri, router_id)
  459             if ri:
  460                 self.l3_ext_manager.delete_router(self.context, ri.router)
  461         except Exception:
  462             LOG.exception('Error while deleting router %s', router_id)
  463             return False
  464 
  465         self._resize_process_pool()
  466         return True
  467 
    def _router_removed(self, ri, router_id):
        """Delete the router and stop the auxiliary processes

        This stops the auxiliary processes (keepalived, keepvalived-state-
        change, radvd, etc) and deletes the router ports and the namespace.
        The "router_info" cache is updated too at the beginning of the process,
        to avoid any other concurrent process to handle the router being
        deleted. If an exception is raised, the "router_info" cache is
        restored.
        """
        if ri is None:
            LOG.warning("Info for router %s was not found. "
                        "Performing router cleanup", router_id)
            self.namespaces_manager.ensure_router_cleanup(router_id)
            return

        registry.publish(resources.ROUTER, events.BEFORE_DELETE, self,
                         payload=events.DBEventPayload(
                             self.context, states=(ri,),
                             resource_id=router_id))

        # Uncache first so no concurrent worker picks up this router,
        # then restore the entry if the deletion fails.
        del self.router_info[router_id]
        try:
            ri.delete()
        except Exception:
            with excutils.save_and_reraise_exception():
                self.router_info[router_id] = ri

        registry.notify(resources.ROUTER, events.AFTER_DELETE, self, router=ri)
  497 
  498     def init_extension_manager(self, connection):
  499         l3_ext_manager.register_opts(self.conf)
  500         self.agent_api = l3_ext_api.L3AgentExtensionAPI(self.router_info)
  501         self.l3_ext_manager = (
  502             l3_ext_manager.L3AgentExtensionsManager(self.conf))
  503         self.l3_ext_manager.initialize(
  504             connection, lib_const.L3_AGENT_MODE,
  505             self.agent_api)
  506 
  507     def router_deleted(self, context, router_id):
  508         """Deal with router deletion RPC message."""
  509         LOG.debug('Got router deleted notification for %s', router_id)
  510         update = queue.ResourceUpdate(router_id,
  511                                       PRIORITY_RPC,
  512                                       action=DELETE_ROUTER)
  513         self._queue.add(update)
  514 
  515     def routers_updated(self, context, routers):
  516         """Deal with routers modification and creation RPC message."""
  517         LOG.debug('Got routers updated notification :%s', routers)
  518         if routers:
  519             # This is needed for backward compatibility
  520             if isinstance(routers[0], dict):
  521                 routers = [router['id'] for router in routers]
  522             for id in routers:
  523                 update = queue.ResourceUpdate(
  524                     id, PRIORITY_RPC, action=ADD_UPDATE_ROUTER)
  525                 self._queue.add(update)
  526 
  527     def router_removed_from_agent(self, context, payload):
  528         LOG.debug('Got router removed from agent :%r', payload)
  529         router_id = payload['router_id']
  530         update = queue.ResourceUpdate(router_id,
  531                                       PRIORITY_RPC,
  532                                       action=DELETE_ROUTER)
  533         self._queue.add(update)
  534 
  535     def router_added_to_agent(self, context, payload):
  536         LOG.debug('Got router added to agent :%r', payload)
  537         self.routers_updated(context, payload)
  538 
  539     def network_update(self, context, **kwargs):
  540         network_id = kwargs['network']['id']
  541         for ri in self.router_info.values():
  542             ports = list(ri.internal_ports)
  543             if ri.ex_gw_port:
  544                 ports.append(ri.ex_gw_port)
  545             port_belongs = lambda p: p['network_id'] == network_id
  546             if any(port_belongs(p) for p in ports):
  547                 update = queue.ResourceUpdate(
  548                     ri.router_id, PRIORITY_SYNC_ROUTERS_TASK)
  549                 self._resync_router(update)
  550 
  551     def _process_router_if_compatible(self, router):
  552         # Either ex_net_id or handle_internal_only_routers must be set
  553         ex_net_id = (router['external_gateway_info'] or {}).get('network_id')
  554         if not ex_net_id and not self.conf.handle_internal_only_routers:
  555             raise l3_exc.RouterNotCompatibleWithAgent(router_id=router['id'])
  556 
  557         # If target_ex_net_id and ex_net_id are set they must be equal
  558         target_ex_net_id = self._fetch_external_net_id()
  559         if (target_ex_net_id and ex_net_id and ex_net_id != target_ex_net_id):
  560             # Double check that our single external_net_id has not changed
  561             # by forcing a check by RPC.
  562             if ex_net_id != self._fetch_external_net_id(force=True):
  563                 raise l3_exc.RouterNotCompatibleWithAgent(
  564                     router_id=router['id'])
  565 
  566         if router['id'] not in self.router_info:
  567             self._process_added_router(router)
  568         else:
  569             self._process_updated_router(router)
  570 
  571     def _process_added_router(self, router):
  572         self._router_added(router['id'], router)
  573         ri = self.router_info[router['id']]
  574         ri.router = router
  575         ri.process()
  576         registry.notify(resources.ROUTER, events.AFTER_CREATE, self, router=ri)
  577         self.l3_ext_manager.add_router(self.context, router)
  578 
    def _process_updated_router(self, router):
        """Apply an update to a router already hosted by this agent.

        A DVR+HA router whose HA interface appeared or disappeared on a
        dvr_snat agent cannot be mutated in place: it is removed and
        re-added so the correct router class is instantiated.
        """
        ri = self.router_info[router['id']]
        is_dvr_snat_agent = (self.conf.agent_mode ==
                             lib_const.L3_AGENT_MODE_DVR_SNAT)
        is_dvr_only_agent = (self.conf.agent_mode in
                             [lib_const.L3_AGENT_MODE_DVR,
                              lib_const.L3_AGENT_MODE_DVR_NO_EXTERNAL])
        # Detect the HA interface flipping between present and absent.
        old_router_ha_interface = ri.router.get(lib_const.HA_INTERFACE_KEY)
        current_router_ha_interface = router.get(lib_const.HA_INTERFACE_KEY)
        ha_interface_change = ((old_router_ha_interface is None and
                                current_router_ha_interface is not None) or
                               (old_router_ha_interface is not None and
                                current_router_ha_interface is None))
        is_dvr_ha_router = router.get('distributed') and router.get('ha')

        if is_dvr_snat_agent and is_dvr_ha_router and ha_interface_change:
            LOG.debug("Removing HA router %s, since it is not bound to "
                      "the current agent, and recreating regular DVR router "
                      "based on service port requirements.",
                      router['id'])
            if self._safe_router_removed(router['id']):
                self._process_added_router(router)
        else:
            is_ha_router = getattr(ri, 'ha_state', False)
            # For HA routers check that DB state matches actual state
            if router.get('ha') and not is_dvr_only_agent and is_ha_router:
                self.check_ha_state_for_router(
                    router['id'], router.get(l3_constants.HA_ROUTER_STATE_KEY))
            ri.router = router
            registry.notify(resources.ROUTER, events.BEFORE_UPDATE,
                            self, router=ri)
            ri.process()
            registry.notify(
                resources.ROUTER, events.AFTER_UPDATE, self, router=ri)
            self.l3_ext_manager.update_router(self.context, router)
  614 
    def _resync_router(self, router_update,
                       priority=PRIORITY_SYNC_ROUTERS_TASK):
        """Requeue *router_update* for another processing attempt.

        Updates are not retried forever: once the retry limit is hit the
        router is deleted locally (unless the failing action already was
        a delete) and the update is dropped.
        """
        # Don't keep trying to resync if it's failing
        if router_update.hit_retry_limit():
            LOG.warning("Hit retry limit with router update for %s, action %s",
                        router_update.id, router_update.action)
            if router_update.action != DELETE_ROUTER:
                LOG.debug("Deleting router %s", router_update.id)
                self._safe_router_removed(router_update.id)
            return
        router_update.timestamp = timeutils.utcnow()
        router_update.priority = priority
        router_update.resource = None  # Force the agent to resync the router
        self._queue.add(router_update)
  629 
    def _process_router_update(self):
        """Worker body: drain queued updates, one router at a time.

        Pulls updates off ``self._queue``, re-fetching router data from the
        server when the update carries no payload or is a related-router
        event, then dispatches to processing or removal.  Failures are
        re-queued through ``_resync_router`` (which enforces a retry limit).
        """
        for rp, update in self._queue.each_update_to_next_resource():
            LOG.info("Starting router update for %s, action %s, priority %s, "
                     "update_id %s. Wait time elapsed: %.3f",
                     update.id, update.action, update.priority,
                     update.update_id,
                     update.time_elapsed_since_create)
            if update.action == PD_UPDATE:
                # IPv6 prefix-delegation updates are global, not tied to a
                # specific router; handle and move on.
                self.pd.process_prefix_update()
                LOG.info("Finished a router update for %s IPv6 PD, "
                         "update_id. %s. Time elapsed: %.3f",
                         update.id, update.update_id,
                         update.time_elapsed_since_start)
                continue

            # The update may carry the full router dict as its payload
            # (e.g. events queued by fetch_and_sync_all_routers).
            routers = [update.resource] if update.resource else []

            not_delete_no_routers = (update.action != DELETE_ROUTER and
                                     not routers)
            related_action = update.action in (DELETE_RELATED_ROUTER,
                                               ADD_UPDATE_RELATED_ROUTER)
            # Re-fetch from the server when there is no payload (and the
            # action isn't a delete), or always for related-router events.
            if not_delete_no_routers or related_action:
                try:
                    update.timestamp = timeutils.utcnow()
                    routers = self.plugin_rpc.get_routers(self.context,
                                                          [update.id])
                except Exception:
                    msg = "Failed to fetch router information for '%s'"
                    LOG.exception(msg, update.id)
                    self._resync_router(update)
                    continue

                # For a related action, verify the router is still hosted here,
                # since it could have just been deleted and we don't want to
                # add it back.
                if related_action:
                    routers = [r for r in routers if r['id'] == update.id]

            if not routers:
                # Nothing to process: the router is gone (or was never
                # ours), so make sure local state is cleaned up.
                removed = self._safe_router_removed(update.id)
                if not removed:
                    self._resync_router(update)
                else:
                    # need to update timestamp of removed router in case
                    # there are older events for the same router in the
                    # processing queue (like events from fullsync) in order to
                    # prevent deleted router re-creation
                    rp.fetched_and_processed(update.timestamp)
                LOG.info("Finished a router update for %s, update_id %s. "
                         "Time elapsed: %.3f",
                         update.id, update.update_id,
                         update.time_elapsed_since_start)
                continue

            if not self._process_routers_if_compatible(routers, update):
                # Processing failed; schedule a retry (bounded by the
                # update's retry limit).
                self._resync_router(update)
                continue

            # Mark this update as done so older queued events for the same
            # router are discarded.
            rp.fetched_and_processed(update.timestamp)
            LOG.info("Finished a router update for %s, update_id %s. "
                     "Time elapsed: %.3f",
                     update.id, update.update_id,
                     update.time_elapsed_since_start)
  693 
  694     def _process_routers_if_compatible(self, routers, update):
  695         process_result = True
  696         for router in routers:
  697             if router['id'] != update.id:
  698                 # Don't do the work here, instead create a new update and
  699                 # enqueue it, since there could be another thread working
  700                 # on it already and we don't want to race.
  701                 new_action = RELATED_ACTION_MAP.get(
  702                     update.action, ADD_UPDATE_RELATED_ROUTER)
  703                 new_update = queue.ResourceUpdate(
  704                     router['id'],
  705                     priority=PRIORITY_RELATED_ROUTER,
  706                     action=new_action)
  707                 self._queue.add(new_update)
  708                 LOG.debug('Queued a router update for %(router_id)s '
  709                           '(related router %(related_router_id)s). '
  710                           'Original event action %(action)s, '
  711                           'priority %(priority)s. '
  712                           'New event action %(new_action)s, '
  713                           'priority %(new_priority)s',
  714                           {'router_id': router['id'],
  715                            'related_router_id': update.id,
  716                            'action': update.action,
  717                            'priority': update.priority,
  718                            'new_action': new_update.action,
  719                            'new_priority': new_update.priority})
  720                 continue
  721 
  722             try:
  723                 self._process_router_if_compatible(router)
  724             except l3_exc.RouterNotCompatibleWithAgent as e:
  725                 log_verbose_exc(e.msg, router)
  726                 # Was the router previously handled by this agent?
  727                 if router['id'] in self.router_info:
  728                     LOG.error("Removing incompatible router '%s'",
  729                               router['id'])
  730                     self._safe_router_removed(router['id'])
  731             except Exception:
  732                 log_verbose_exc(
  733                     "Failed to process compatible router: %s" % update.id,
  734                     router)
  735                 process_result = False
  736         return process_result
  737 
  738     def _process_routers_loop(self):
  739         LOG.debug("Starting _process_routers_loop")
  740         while True:
  741             self._pool.spawn_n(self._process_router_update)
  742 
  743     # NOTE(kevinbenton): this is set to 1 second because the actual interval
  744     # is controlled by a FixedIntervalLoopingCall in neutron/service.py that
  745     # is responsible for task execution.
  746     @periodic_task.periodic_task(spacing=1, run_immediately=True)
  747     def periodic_sync_routers_task(self, context):
  748         if not self.fullsync:
  749             return
  750         LOG.debug("Starting fullsync periodic_sync_routers_task")
  751 
  752         # self.fullsync is True at this point. If an exception -- caught or
  753         # uncaught -- prevents setting it to False below then the next call
  754         # to periodic_sync_routers_task will re-enter this code and try again.
  755 
  756         # Context manager self.namespaces_manager captures a picture of
  757         # namespaces *before* fetch_and_sync_all_routers fetches the full list
  758         # of routers from the database.  This is important to correctly
  759         # identify stale ones.
  760 
  761         try:
  762             with self.namespaces_manager as ns_manager:
  763                 self.fetch_and_sync_all_routers(context, ns_manager)
  764         except l3_exc.AbortSyncRouters:
  765             self.fullsync = True
  766 
  767     def fetch_and_sync_all_routers(self, context, ns_manager):
  768         prev_router_ids = set(self.router_info)
  769         curr_router_ids = set()
  770         timestamp = timeutils.utcnow()
  771         router_ids = []
  772         chunk = []
  773         is_snat_agent = (self.conf.agent_mode ==
  774                          lib_const.L3_AGENT_MODE_DVR_SNAT)
  775         try:
  776             router_ids = self.plugin_rpc.get_router_ids(context)
  777             # fetch routers by chunks to reduce the load on server and to
  778             # start router processing earlier
  779             for i in range(0, len(router_ids), self.sync_routers_chunk_size):
  780                 chunk = router_ids[i:i + self.sync_routers_chunk_size]
  781                 routers = self.plugin_rpc.get_routers(context, chunk)
  782                 LOG.debug('Processing :%r', routers)
  783                 for r in routers:
  784                     curr_router_ids.add(r['id'])
  785                     ns_manager.keep_router(r['id'])
  786                     if r.get('distributed'):
  787                         # need to keep fip namespaces as well
  788                         ext_net_id = (r['external_gateway_info'] or {}).get(
  789                             'network_id')
  790                         if ext_net_id:
  791                             ns_manager.keep_ext_net(ext_net_id)
  792                         elif is_snat_agent and not r.get('ha'):
  793                             ns_manager.ensure_snat_cleanup(r['id'])
  794                     update = queue.ResourceUpdate(
  795                         r['id'],
  796                         PRIORITY_SYNC_ROUTERS_TASK,
  797                         resource=r,
  798                         action=ADD_UPDATE_ROUTER,
  799                         timestamp=timestamp)
  800                     self._queue.add(update)
  801         except oslo_messaging.MessagingTimeout:
  802             if self.sync_routers_chunk_size > SYNC_ROUTERS_MIN_CHUNK_SIZE:
  803                 self.sync_routers_chunk_size = max(
  804                     self.sync_routers_chunk_size / 2,
  805                     SYNC_ROUTERS_MIN_CHUNK_SIZE)
  806                 LOG.error('Server failed to return info for routers in '
  807                           'required time, decreasing chunk size to: %s',
  808                           self.sync_routers_chunk_size)
  809             else:
  810                 LOG.error('Server failed to return info for routers in '
  811                           'required time even with min chunk size: %s. '
  812                           'It might be under very high load or '
  813                           'just inoperable',
  814                           self.sync_routers_chunk_size)
  815             raise
  816         except oslo_messaging.MessagingException:
  817             failed_routers = chunk or router_ids
  818             LOG.exception("Failed synchronizing routers '%s' "
  819                           "due to RPC error", failed_routers)
  820             raise l3_exc.AbortSyncRouters()
  821 
  822         self.fullsync = False
  823         LOG.debug("periodic_sync_routers_task successfully completed")
  824         # adjust chunk size after successful sync
  825         if self.sync_routers_chunk_size < SYNC_ROUTERS_MAX_CHUNK_SIZE:
  826             self.sync_routers_chunk_size = min(
  827                 self.sync_routers_chunk_size + SYNC_ROUTERS_MIN_CHUNK_SIZE,
  828                 SYNC_ROUTERS_MAX_CHUNK_SIZE)
  829 
  830         # Delete routers that have disappeared since the last sync
  831         for router_id in prev_router_ids - curr_router_ids:
  832             ns_manager.keep_router(router_id)
  833             update = queue.ResourceUpdate(router_id,
  834                                           PRIORITY_SYNC_ROUTERS_TASK,
  835                                           timestamp=timestamp,
  836                                           action=DELETE_ROUTER)
  837             self._queue.add(update)
  838 
  839     @property
  840     def context(self):
  841         # generate a new request-id on each call to make server side tracking
  842         # of RPC calls easier.
  843         self._context.request_id = common_context.generate_request_id()
  844         return self._context
  845 
  846     def after_start(self):
  847         # Note: the FWaaS' vArmourL3NATAgent is a subclass of L3NATAgent. It
  848         # calls this method here. So Removing this after_start() would break
  849         # vArmourL3NATAgent. We need to find out whether vArmourL3NATAgent
  850         # can have L3NATAgentWithStateReport as its base class instead of
  851         # L3NATAgent.
  852         eventlet.spawn_n(self._process_routers_loop)
  853         LOG.info("L3 agent started")
  854 
  855     def create_pd_router_update(self):
  856         router_id = None
  857         update = queue.ResourceUpdate(router_id,
  858                                       PRIORITY_PD_UPDATE,
  859                                       timestamp=timeutils.utcnow(),
  860                                       action=PD_UPDATE)
  861         self._queue.add(update)
  862 
  863 
  864 class L3NATAgentWithStateReport(L3NATAgent):
  865 
  866     def __init__(self, host, conf=None):
  867         super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
  868         self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
  869         self.failed_report_state = False
  870         self.agent_state = {
  871             'binary': 'neutron-l3-agent',
  872             'host': host,
  873             'availability_zone': self.conf.AGENT.availability_zone,
  874             'topic': topics.L3_AGENT,
  875             'configurations': {
  876                 'agent_mode': self.conf.agent_mode,
  877                 'handle_internal_only_routers':
  878                 self.conf.handle_internal_only_routers,
  879                 'gateway_external_network_id':
  880                 self.conf.gateway_external_network_id,
  881                 'interface_driver': self.conf.interface_driver,
  882                 'log_agent_heartbeats': self.conf.AGENT.log_agent_heartbeats},
  883             'start_flag': True,
  884             'agent_type': lib_const.AGENT_TYPE_L3}
  885         report_interval = self.conf.AGENT.report_interval
  886         if report_interval:
  887             self.heartbeat = loopingcall.FixedIntervalLoopingCall(
  888                 self._report_state)
  889             self.heartbeat.start(interval=report_interval)
  890 
    def _report_state(self):
        """Report agent state and resource counts to the neutron server.

        Invoked periodically by the heartbeat FixedIntervalLoopingCall
        (and once directly from after_start).  Sets ``self.fullsync`` when
        the server reports the agent was revived, and disables the
        heartbeat entirely if the server does not support state reports.
        """
        # Tally per-router resources for the 'configurations' report.
        num_ex_gw_ports = 0
        num_interfaces = 0
        num_floating_ips = 0
        router_infos = self.router_info.values()
        num_routers = len(router_infos)
        for ri in router_infos:
            ex_gw_port = ri.get_ex_gw_port()
            if ex_gw_port:
                num_ex_gw_ports += 1
            num_interfaces += len(ri.router.get(lib_const.INTERFACE_KEY,
                                                []))
            num_floating_ips += len(ri.router.get(lib_const.FLOATINGIP_KEY,
                                                  []))
        configurations = self.agent_state['configurations']
        configurations['routers'] = num_routers
        configurations['ex_gw_ports'] = num_ex_gw_ports
        configurations['interfaces'] = num_interfaces
        configurations['floating_ips'] = num_floating_ips
        try:
            agent_status = self.state_rpc.report_state(self.context,
                                                       self.agent_state,
                                                       True)
            if agent_status == agent_consts.AGENT_REVIVED:
                # Server considered us dead; resync everything to catch up.
                LOG.info('Agent has just been revived. '
                         'Doing a full sync.')
                self.fullsync = True
            # 'start_flag' must only be sent with the first report.
            self.agent_state.pop('start_flag', None)
        except AttributeError:
            # This means the server does not support report_state
            LOG.warning("Neutron server does not support state report. "
                        "State report for this agent will be disabled.")
            self.heartbeat.stop()
            return
        except Exception:
            # Remember the failure so a later success can be logged.
            self.failed_report_state = True
            LOG.exception("Failed reporting state!")
            return
        if self.failed_report_state:
            self.failed_report_state = False
            LOG.info("Successfully reported state after a previous failure.")
  932 
  933     def after_start(self):
  934         eventlet.spawn_n(self._process_routers_loop)
  935         LOG.info("L3 agent started")
  936         # Do the report state before we do the first full sync.
  937         self._report_state()
  938 
  939         self.pd.after_start()
  940 
  941     def agent_updated(self, context, payload):
  942         """Handle the agent_updated notification event."""
  943         self.fullsync = True
  944         LOG.info("agent_updated by server side %s!", payload)