"Fossies" - the Fresh Open Source Software Archive

Member "neutron-14.0.3/neutron/scheduler/l3_agent_scheduler.py" (22 Oct 2019, 18288 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "l3_agent_scheduler.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 14.0.2_vs_15.0.0.

    1 # Copyright (c) 2013 OpenStack Foundation.
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 import abc
   17 import collections
   18 import functools
   19 import itertools
   20 import random
   21 
   22 from neutron_lib.api.definitions import availability_zone as az_def
   23 from neutron_lib import constants as lib_const
   24 from neutron_lib.db import api as lib_db_api
   25 from neutron_lib.exceptions import l3 as l3_exc
   26 from oslo_config import cfg
   27 from oslo_db import exception as db_exc
   28 from oslo_log import log as logging
   29 import six
   30 
   31 from neutron.common import utils
   32 from neutron.conf.db import l3_hamode_db
   33 from neutron.db.models import l3agent as rb_model
   34 from neutron.objects import l3agent as rb_obj
   35 
   36 
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Register the L3 HA options on the global configuration object at import
# time. NOTE(review): presumably this is what makes
# cfg.CONF.max_l3_agents_per_router (read in L3Scheduler.__init__) available
# -- confirm against neutron.conf.db.l3_hamode_db.
cfg.CONF.register_opts(l3_hamode_db.L3_HA_OPTS)
   39 
   40 
   41 @six.add_metaclass(abc.ABCMeta)
   42 class L3Scheduler(object):
   43 
   44     def __init__(self):
   45         self.max_ha_agents = cfg.CONF.max_l3_agents_per_router
   46 
   47     def schedule(self, plugin, context, router_id, candidates=None):
   48         """Schedule the router to an active L3 agent.
   49 
   50         Schedule the router only if it is not already scheduled.
   51         """
   52         return self._schedule_router(
   53             plugin, context, router_id, candidates=candidates)
   54 
   55     def _router_has_binding(self, context, router_id, l3_agent_id):
   56         router_binding_model = rb_model.RouterL3AgentBinding
   57 
   58         query = context.session.query(router_binding_model.router_id)
   59         query = query.filter(router_binding_model.router_id == router_id,
   60                              router_binding_model.l3_agent_id == l3_agent_id)
   61 
   62         return query.count() > 0
   63 
   64     def _get_routers_can_schedule(self, plugin, context, routers, l3_agent):
   65         """Get the subset of routers that can be scheduled on the L3 agent."""
   66         ids_to_discard = set()
   67         for router in routers:
   68             # check if the l3 agent is compatible with the router
   69             candidates = plugin.get_l3_agent_candidates(
   70                 context, router, [l3_agent])
   71             if not candidates:
   72                 ids_to_discard.add(router['id'])
   73 
   74         return [r for r in routers if r['id'] not in ids_to_discard]
   75 
   76     def auto_schedule_routers(self, plugin, context, host):
   77         """Schedule under-scheduled routers to L3 Agents.
   78 
   79         An under-scheduled router is a router that is either completely
   80         un-scheduled (scheduled to 0 agents), or an HA router that is
   81         under-scheduled (scheduled to less than max_l3_agents configuration
   82         option. The function finds all the under-scheduled routers and
   83         schedules them.
   84 
   85         :param host: if unspecified, under-scheduled routers are scheduled to
   86                      all agents (not necessarily from the requesting host). If
   87                      specified, under-scheduled routers are scheduled only to
   88                      the agent on 'host'.
   89         """
   90         l3_agent = plugin.get_enabled_agent_on_host(
   91             context, lib_const.AGENT_TYPE_L3, host)
   92         if not l3_agent:
   93             return
   94 
   95         underscheduled_routers = self._get_underscheduled_routers(
   96             plugin, context)
   97         target_routers = self._get_routers_can_schedule(
   98             plugin, context, underscheduled_routers, l3_agent)
   99 
  100         for router in target_routers:
  101             self.schedule(plugin, context, router['id'], candidates=[l3_agent])
  102 
  103     def _get_underscheduled_routers(self, plugin, context):
  104         underscheduled_routers = []
  105         max_agents_for_ha = plugin.get_number_of_agents_for_scheduling(context)
  106 
  107         for router, count in plugin.get_routers_l3_agents_count(context):
  108             if (count < 1 or
  109                     router.get('ha', False) and count < max_agents_for_ha):
  110                 # Either the router was un-scheduled (scheduled to 0 agents),
  111                 # or it's an HA router and it was under-scheduled (scheduled to
  112                 # less than max_agents_for_ha). Either way, it should be added
  113                 # to the list of routers we want to handle.
  114                 underscheduled_routers.append(router)
  115         return underscheduled_routers
  116 
  117     def _get_candidates(self, plugin, context, sync_router):
  118         """Return L3 agents where a router could be scheduled."""
  119         is_ha = sync_router.get('ha', False)
  120         with context.session.begin(subtransactions=True):
  121             # allow one router is hosted by just
  122             # one enabled l3 agent hosting since active is just a
  123             # timing problem. Non-active l3 agent can return to
  124             # active any time
  125             current_l3_agents = plugin.get_l3_agents_hosting_routers(
  126                 context, [sync_router['id']], admin_state_up=True)
  127             if current_l3_agents and not is_ha:
  128                 LOG.debug('Router %(router_id)s has already been hosted '
  129                           'by L3 agent %(agent_id)s',
  130                           {'router_id': sync_router['id'],
  131                            'agent_id': current_l3_agents[0]['id']})
  132                 return []
  133 
  134             active_l3_agents = plugin.get_l3_agents(context, active=True)
  135             if not active_l3_agents:
  136                 LOG.warning('No active L3 agents')
  137                 return []
  138             candidates = plugin.get_l3_agent_candidates(context,
  139                                                         sync_router,
  140                                                         active_l3_agents)
  141             if not candidates:
  142                 LOG.warning('No L3 agents can host the router %s',
  143                             sync_router['id'])
  144 
  145             return candidates
  146 
  147     def _bind_routers(self, plugin, context, routers, l3_agent):
  148         for router in routers:
  149             if router.get('ha'):
  150                 if not self._router_has_binding(context, router['id'],
  151                                                 l3_agent.id):
  152                     self.create_ha_port_and_bind(
  153                         plugin, context, router['id'],
  154                         router['tenant_id'], l3_agent)
  155             else:
  156                 self.bind_router(plugin, context, router['id'], l3_agent.id)
  157 
  158     @lib_db_api.retry_db_errors
  159     def bind_router(self, plugin, context, router_id, agent_id,
  160                     is_manual_scheduling=False, is_ha=False):
  161         """Bind the router to the l3 agent which has been chosen.
  162 
  163         The function tries to create a RouterL3AgentBinding object and add it
  164         to the database. It returns the binding that was created or None if it
  165         failed to create it due to some conflict.
  166 
  167         In the HA router case, when creating a RouterL3AgentBinding (with some
  168         binding_index) fails because some other RouterL3AgentBinding was
  169         concurrently created using the same binding_index, then the function
  170         will retry to create an entry with a new binding_index. This creation
  171         will be retried up to db_api.MAX_RETRIES times.
  172         If, still in the HA router case, the creation failed because the
  173         router has already been bound to the l3 agent in question or has been
  174         removed (by a concurrent operation), then no further attempts will be
  175         made and the function will return None.
  176 
  177         Note that for non-HA routers, the function will always perform exactly
  178         one try, regardless of the error preventing the addition of a new
  179         RouterL3AgentBinding object to the database.
  180         """
  181 
  182         if rb_obj.RouterL3AgentBinding.objects_exist(
  183                 context, router_id=router_id, l3_agent_id=agent_id):
  184             LOG.debug('Router %(router_id)s has already been scheduled '
  185                       'to L3 agent %(agent_id)s.',
  186                       {'router_id': router_id, 'agent_id': agent_id})
  187             return
  188 
  189         if not is_ha:
  190             binding_index = rb_model.LOWEST_BINDING_INDEX
  191             if rb_obj.RouterL3AgentBinding.objects_exist(
  192                     context, router_id=router_id, binding_index=binding_index):
  193                 LOG.debug('Non-HA router %s has already been scheduled',
  194                           router_id)
  195                 return
  196         else:
  197             binding_index = plugin.get_vacant_binding_index(
  198                 context, router_id, is_manual_scheduling)
  199             if binding_index < rb_model.LOWEST_BINDING_INDEX:
  200                 LOG.debug('Unable to find a vacant binding_index for '
  201                           'router %(router_id)s and agent %(agent_id)s',
  202                           {'router_id': router_id,
  203                            'agent_id': agent_id})
  204                 return
  205 
  206         try:
  207             binding = rb_obj.RouterL3AgentBinding(
  208                 context, l3_agent_id=agent_id,
  209                 router_id=router_id, binding_index=binding_index)
  210             binding.create()
  211             LOG.debug('Router %(router_id)s is scheduled to L3 agent '
  212                       '%(agent_id)s with binding_index %(binding_index)d',
  213                       {'router_id': router_id,
  214                        'agent_id': agent_id,
  215                        'binding_index': binding_index})
  216             return binding
  217         except db_exc.DBReferenceError:
  218             LOG.debug('Router %s has already been removed '
  219                       'by concurrent operation', router_id)
  220 
  221     def _schedule_router(self, plugin, context, router_id,
  222                          candidates=None):
  223         if not plugin.router_supports_scheduling(context, router_id):
  224             return
  225         sync_router = plugin.get_router(context, router_id)
  226         candidates = candidates or self._get_candidates(
  227             plugin, context, sync_router)
  228         if not candidates:
  229             return
  230         elif sync_router.get('ha', False):
  231             chosen_agents = self._bind_ha_router(plugin, context,
  232                                                  router_id,
  233                                                  sync_router.get('tenant_id'),
  234                                                  candidates)
  235             if not chosen_agents:
  236                 return
  237             chosen_agent = chosen_agents[-1]
  238         else:
  239             chosen_agent = self._choose_router_agent(
  240                 plugin, context, candidates)
  241             self.bind_router(plugin, context, router_id, chosen_agent.id)
  242         return chosen_agent
  243 
  244     @abc.abstractmethod
  245     def _choose_router_agent(self, plugin, context, candidates):
  246         """Choose an agent from candidates based on a specific policy."""
  247         pass
  248 
  249     @abc.abstractmethod
  250     def _choose_router_agents_for_ha(self, plugin, context, candidates):
  251         """Choose agents from candidates based on a specific policy."""
  252         pass
  253 
  254     def _get_num_of_agents_for_ha(self, candidates_count):
  255         return (min(self.max_ha_agents, candidates_count) if self.max_ha_agents
  256                 else candidates_count)
  257 
  258     def _add_port_from_net_and_ensure_vr_id(self, plugin, ctxt, router_db,
  259                                             tenant_id, ha_net):
  260         plugin._ensure_vr_id(ctxt, router_db, ha_net)
  261         return plugin.add_ha_port(ctxt, router_db.id, ha_net.network_id,
  262                                   tenant_id)
  263 
  264     def create_ha_port_and_bind(self, plugin, context, router_id,
  265                                 tenant_id, agent, is_manual_scheduling=False):
  266         """Creates and binds a new HA port for this agent."""
  267         ctxt = context.elevated()
  268         router_db = plugin._get_router(ctxt, router_id)
  269         creator = functools.partial(self._add_port_from_net_and_ensure_vr_id,
  270                                     plugin, ctxt, router_db, tenant_id)
  271         dep_getter = functools.partial(plugin.get_ha_network, ctxt, tenant_id)
  272         dep_creator = functools.partial(plugin._create_ha_network,
  273                                         ctxt, tenant_id)
  274         dep_deleter = functools.partial(plugin._delete_ha_network, ctxt)
  275         dep_id_attr = 'network_id'
  276 
  277         # This might fail in case of concurrent calls, which is good for us
  278         # as we can skip the rest of this function.
  279         binding = self.bind_router(
  280             plugin, context, router_id, agent['id'],
  281             is_manual_scheduling=is_manual_scheduling, is_ha=True)
  282         if not binding:
  283             return
  284 
  285         try:
  286             port_binding = utils.create_object_with_dependency(
  287                 creator, dep_getter, dep_creator,
  288                 dep_id_attr, dep_deleter)[0]
  289             with lib_db_api.autonested_transaction(context.session):
  290                 port_binding.l3_agent_id = agent['id']
  291         except db_exc.DBDuplicateEntry:
  292             LOG.debug("Router %(router)s already scheduled for agent "
  293                       "%(agent)s", {'router': router_id,
  294                                     'agent': agent['id']})
  295             port_id = port_binding.port_id
  296             # Below call will also delete entry from L3HARouterAgentPortBinding
  297             # and RouterPort tables
  298             plugin._core_plugin.delete_port(context, port_id,
  299                                             l3_port_check=False)
  300         except l3_exc.RouterNotFound:
  301             LOG.debug('Router %s has already been removed '
  302                       'by concurrent operation', router_id)
  303             # we try to clear the HA network here in case the port we created
  304             # blocked the concurrent router delete operation from getting rid
  305             # of the HA network
  306             ha_net = plugin.get_ha_network(ctxt, tenant_id)
  307             if ha_net:
  308                 plugin.safe_delete_ha_network(ctxt, ha_net, tenant_id)
  309 
  310     def _filter_scheduled_agents(self, plugin, context, router_id, candidates):
  311         hosting = plugin.get_l3_agents_hosting_routers(context, [router_id])
  312         # convert to comparable types
  313         hosting_list = [tuple(host) for host in hosting]
  314         return list(set(candidates) - set(hosting_list))
  315 
  316     def _bind_ha_router(self, plugin, context, router_id,
  317                         tenant_id, candidates):
  318         """Bind a HA router to agents based on a specific policy."""
  319 
  320         candidates = self._filter_scheduled_agents(plugin, context, router_id,
  321                                                    candidates)
  322 
  323         chosen_agents = self._choose_router_agents_for_ha(
  324             plugin, context, candidates)
  325 
  326         for agent in chosen_agents:
  327             self.create_ha_port_and_bind(plugin, context, router_id,
  328                                          tenant_id, agent)
  329 
  330         return chosen_agents
  331 
  332 
  333 class ChanceScheduler(L3Scheduler):
  334     """Randomly allocate an L3 agent for a router."""
  335 
  336     def _choose_router_agent(self, plugin, context, candidates):
  337         return random.choice(candidates)
  338 
  339     def _choose_router_agents_for_ha(self, plugin, context, candidates):
  340         num_agents = self._get_num_of_agents_for_ha(len(candidates))
  341         return random.sample(candidates, num_agents)
  342 
  343 
  344 class LeastRoutersScheduler(L3Scheduler):
  345     """Allocate to an L3 agent with the least number of routers bound."""
  346 
  347     def _choose_router_agent(self, plugin, context, candidates):
  348         candidate_ids = [candidate['id'] for candidate in candidates]
  349         chosen_agent = plugin.get_l3_agent_with_min_routers(
  350             context, candidate_ids)
  351         return chosen_agent
  352 
  353     def _choose_router_agents_for_ha(self, plugin, context, candidates):
  354         num_agents = self._get_num_of_agents_for_ha(len(candidates))
  355         ordered_agents = plugin.get_l3_agents_ordered_by_num_routers(
  356             context, [candidate['id'] for candidate in candidates])
  357         return ordered_agents[:num_agents]
  358 
  359 
  360 class AZLeastRoutersScheduler(LeastRoutersScheduler):
  361     """Availability zone aware scheduler.
  362 
  363        If a router is ha router, allocate L3 agents distributed AZs
  364        according to router's az_hints.
  365     """
  366     def _get_az_hints(self, router):
  367         return (router.get(az_def.AZ_HINTS) or
  368                 cfg.CONF.default_availability_zones)
  369 
  370     def _get_routers_can_schedule(self, plugin, context, routers, l3_agent):
  371         """Overwrite L3Scheduler's method to filter by availability zone."""
  372         target_routers = []
  373         for r in routers:
  374             az_hints = self._get_az_hints(r)
  375             if not az_hints or l3_agent['availability_zone'] in az_hints:
  376                 target_routers.append(r)
  377 
  378         if not target_routers:
  379             return []
  380 
  381         return super(AZLeastRoutersScheduler, self)._get_routers_can_schedule(
  382             plugin, context, target_routers, l3_agent)
  383 
  384     def _get_candidates(self, plugin, context, sync_router):
  385         """Overwrite L3Scheduler's method to filter by availability zone."""
  386         all_candidates = (
  387             super(AZLeastRoutersScheduler, self)._get_candidates(
  388                 plugin, context, sync_router))
  389 
  390         candidates = []
  391         az_hints = self._get_az_hints(sync_router)
  392         for agent in all_candidates:
  393             if not az_hints or agent['availability_zone'] in az_hints:
  394                 candidates.append(agent)
  395 
  396         return candidates
  397 
  398     def _choose_router_agents_for_ha(self, plugin, context, candidates):
  399         ordered_agents = plugin.get_l3_agents_ordered_by_num_routers(
  400             context, [candidate['id'] for candidate in candidates])
  401         num_agents = self._get_num_of_agents_for_ha(len(ordered_agents))
  402 
  403         # Order is kept in each az
  404         group_by_az = collections.defaultdict(list)
  405         for agent in ordered_agents:
  406             az = agent['availability_zone']
  407             group_by_az[az].append(agent)
  408 
  409         selected_agents = []
  410         for az, agents in itertools.cycle(group_by_az.items()):
  411             if not agents:
  412                 continue
  413             selected_agents.append(agents.pop(0))
  414             if len(selected_agents) >= num_agents:
  415                 break
  416         return selected_agents