"Fossies" - the Fresh Open Source Software Archive

Member "neutron-14.0.3/neutron/agent/common/ovs_lib.py" (22 Oct 2019, 50654 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "ovs_lib.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 14.0.2_vs_14.0.3.

    1 # Copyright 2011 VMware, Inc.
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 import collections
   17 import itertools
   18 import operator
   19 import random
   20 import time
   21 import uuid
   22 
   23 from neutron_lib import constants as p_const
   24 from neutron_lib import exceptions
   25 from neutron_lib.services.qos import constants as qos_constants
   26 from oslo_config import cfg
   27 from oslo_log import log as logging
   28 from oslo_utils import uuidutils
   29 from ovsdbapp.backend.ovs_idl import idlutils
   30 import six
   31 import tenacity
   32 
   33 from neutron._i18n import _
   34 from neutron.agent.common import ip_lib
   35 from neutron.agent.common import utils
   36 from neutron.agent.ovsdb import impl_idl
   37 from neutron.common import constants as common_constants
   38 from neutron.common import utils as common_utils
   39 from neutron.conf.agent import ovs_conf
   40 from neutron.plugins.ml2.drivers.openvswitch.agent.common \
   41     import constants
   42 
   43 UINT64_BITMASK = (1 << 64) - 1
   44 
   45 # Special return value for an invalid OVS ofport
   46 INVALID_OFPORT = -1
   47 UNASSIGNED_OFPORT = []
   48 
   49 # OVS bridge fail modes
   50 FAILMODE_SECURE = 'secure'
   51 FAILMODE_STANDALONE = 'standalone'
   52 
   53 # special values for cookies
   54 COOKIE_ANY = object()
   55 
   56 ovs_conf.register_ovs_agent_opts()
   57 
   58 LOG = logging.getLogger(__name__)
   59 
   60 OVS_DEFAULT_CAPS = {
   61     'datapath_types': [],
   62     'iface_types': [],
   63 }
   64 
   65 # It's default queue, all packets not tagged with 'set_queue' will go through
   66 # this one
   67 QOS_DEFAULT_QUEUE = 0
   68 
   69 _SENTINEL = object()
   70 
   71 CTRL_RATE_LIMIT_MIN = 100
   72 CTRL_BURST_LIMIT_MIN = 25
   73 
   74 
   75 def _ovsdb_result_pending(result):
   76     """Return True if ovsdb indicates the result is still pending."""
   77     # ovsdb can return '[]' for an ofport that has not yet been assigned
   78     return result == []
   79 
   80 
   81 def _ovsdb_retry(fn):
   82     """Decorator for retrying when OVS has yet to assign an ofport.
   83 
   84     The instance's ovsdb_timeout is used as the max waiting time. This relies
   85     on the fact that instance methods receive self as the first argument.
   86     """
   87     @six.wraps(fn)
   88     def wrapped(*args, **kwargs):
   89         self = args[0]
   90         new_fn = tenacity.retry(
   91             reraise=True,
   92             retry=tenacity.retry_if_result(_ovsdb_result_pending),
   93             wait=tenacity.wait_exponential(multiplier=0.01, max=1),
   94             stop=tenacity.stop_after_delay(
   95                 self.ovsdb_timeout))(fn)
   96         return new_fn(*args, **kwargs)
   97     return wrapped
   98 
   99 
  100 class VifPort(object):
  101     def __init__(self, port_name, ofport, vif_id, vif_mac, switch):
  102         self.port_name = port_name
  103         self.ofport = ofport
  104         self.vif_id = vif_id
  105         self.vif_mac = vif_mac
  106         self.switch = switch
  107 
  108     def __str__(self):
  109         return ("iface-id=%s, vif_mac=%s, port_name=%s, ofport=%s, "
  110                 "bridge_name=%s") % (
  111                     self.vif_id, self.vif_mac,
  112                     self.port_name, self.ofport,
  113                     self.switch.br_name)
  114 
  115 
  116 class BaseOVS(object):
  117 
  118     def __init__(self):
  119         self.ovsdb_timeout = cfg.CONF.OVS.ovsdb_timeout
  120         self.ovsdb = impl_idl.api_factory()
  121 
  122     def add_manager(self, connection_uri, timeout=_SENTINEL):
  123         """Have ovsdb-server listen for manager connections
  124 
  125         :param connection_uri: Manager target string
  126         :param timeout: The Manager probe_interval timeout value
  127                         (defaults to ovsdb_timeout)
  128         """
  129         if timeout is _SENTINEL:
  130             timeout = cfg.CONF.OVS.ovsdb_timeout
  131         with self.ovsdb.transaction() as txn:
  132             txn.add(self.ovsdb.add_manager(connection_uri))
  133             if timeout:
  134                 txn.add(
  135                     self.ovsdb.db_set('Manager', connection_uri,
  136                                       ('inactivity_probe', timeout * 1000)))
  137 
  138     def get_manager(self):
  139         return self.ovsdb.get_manager().execute()
  140 
  141     def remove_manager(self, connection_uri):
  142         self.ovsdb.remove_manager(connection_uri).execute()
  143 
  144     def add_bridge(self, bridge_name,
  145                    datapath_type=constants.OVS_DATAPATH_SYSTEM):
  146         br = OVSBridge(bridge_name, datapath_type=datapath_type)
  147         br.create()
  148         return br
  149 
  150     def delete_bridge(self, bridge_name):
  151         self.ovsdb.del_br(bridge_name).execute()
  152 
  153     def bridge_exists(self, bridge_name):
  154         return self.ovsdb.br_exists(bridge_name).execute()
  155 
  156     def port_exists(self, port_name):
  157         cmd = self.ovsdb.db_get('Port', port_name, 'name')
  158         return bool(cmd.execute(check_error=False, log_errors=False))
  159 
  160     def get_bridge_for_iface(self, iface):
  161         return self.ovsdb.iface_to_br(iface).execute()
  162 
  163     def get_bridges(self):
  164         return self.ovsdb.list_br().execute(check_error=True)
  165 
  166     def get_bridge_external_bridge_id(self, bridge, check_error=False,
  167                                       log_errors=True):
  168         return self.ovsdb.br_get_external_id(bridge, 'bridge-id').execute(
  169             check_error=check_error, log_errors=log_errors)
  170 
  171     def set_db_attribute(self, table_name, record, column, value,
  172                          check_error=False, log_errors=True):
  173         self.ovsdb.db_set(table_name, record, (column, value)).execute(
  174             check_error=check_error, log_errors=log_errors)
  175 
  176     def clear_db_attribute(self, table_name, record, column):
  177         self.ovsdb.db_clear(table_name, record, column).execute()
  178 
  179     def db_get_val(self, table, record, column, check_error=False,
  180                    log_errors=True):
  181         return self.ovsdb.db_get(table, record, column).execute(
  182             check_error=check_error, log_errors=log_errors)
  183 
  184     @property
  185     def config(self):
  186         """A dict containing the only row from the root Open_vSwitch table
  187 
  188         This row contains several columns describing the Open vSwitch install
  189         and the system on which it is installed. Useful keys include:
  190             datapath_types: a list of supported datapath types
  191             iface_types: a list of supported interface types
  192             ovs_version: the OVS version
  193         """
  194         return self.ovsdb.db_list("Open_vSwitch").execute()[0]
  195 
  196     @property
  197     def capabilities(self):
  198         _cfg = self.config
  199         return {k: _cfg.get(k, OVS_DEFAULT_CAPS[k]) for k in OVS_DEFAULT_CAPS}
  200 
  201 
  202 # Map from version string to on-the-wire protocol version encoding:
  203 OF_PROTOCOL_TO_VERSION = {
  204     constants.OPENFLOW10: 1,
  205     constants.OPENFLOW11: 2,
  206     constants.OPENFLOW12: 3,
  207     constants.OPENFLOW13: 4,
  208     constants.OPENFLOW14: 5,
  209     constants.OPENFLOW15: 6,
  210 }
  211 
  212 
  213 def version_from_protocol(protocol):
  214     if protocol not in OF_PROTOCOL_TO_VERSION:
  215         raise Exception(_("unknown OVS protocol string, cannot compare: "
  216                           "%(protocol)s, (known: %(known)s)") %
  217                         {'protocol': protocol,
  218                          'known': list(OF_PROTOCOL_TO_VERSION)})
  219     return OF_PROTOCOL_TO_VERSION[protocol]
  220 
  221 
  222 class OVSBridge(BaseOVS):
  223     def __init__(self, br_name, datapath_type=constants.OVS_DATAPATH_SYSTEM):
  224         super(OVSBridge, self).__init__()
  225         self.br_name = br_name
  226         self.datapath_type = datapath_type
  227         self._default_cookie = generate_random_cookie()
  228         self._highest_protocol_needed = constants.OPENFLOW10
  229         self._min_bw_qos_id = uuidutils.generate_uuid()
  230 
  231     @property
  232     def default_cookie(self):
  233         return self._default_cookie
  234 
  235     def set_agent_uuid_stamp(self, val):
  236         self._default_cookie = val
  237 
  238     def set_controller(self, controllers):
  239         self.ovsdb.set_controller(self.br_name,
  240                                   controllers).execute(check_error=True)
  241 
  242     def del_controller(self):
  243         self.ovsdb.del_controller(self.br_name).execute(check_error=True)
  244 
  245     def get_controller(self):
  246         return self.ovsdb.get_controller(self.br_name).execute(
  247             check_error=True)
  248 
  249     def _set_bridge_fail_mode(self, mode):
  250         self.ovsdb.set_fail_mode(self.br_name, mode).execute(check_error=True)
  251 
  252     def set_secure_mode(self):
  253         self._set_bridge_fail_mode(FAILMODE_SECURE)
  254 
  255     def set_standalone_mode(self):
  256         self._set_bridge_fail_mode(FAILMODE_STANDALONE)
  257 
  258     def add_protocols(self, *protocols):
  259         self.ovsdb.db_add('Bridge', self.br_name,
  260                           'protocols', *protocols).execute(check_error=True)
  261 
  262     def use_at_least_protocol(self, protocol):
  263         """Calls to ovs-ofctl will use a protocol version >= 'protocol'"""
  264         self.add_protocols(protocol)
  265         self._highest_protocol_needed = max(self._highest_protocol_needed,
  266                                             protocol,
  267                                             key=version_from_protocol)
  268 
  269     def create(self, secure_mode=False):
  270         other_config = {
  271             'mac-table-size': str(cfg.CONF.OVS.bridge_mac_table_size)}
  272         with self.ovsdb.transaction() as txn:
  273             txn.add(
  274                 self.ovsdb.add_br(self.br_name,
  275                                   datapath_type=self.datapath_type))
  276             # the ovs-ofctl commands below in run_ofctl use OF10, so we
  277             # need to ensure that this version is enabled ; we could reuse
  278             # add_protocols, but doing ovsdb.db_add avoids doing two
  279             # transactions
  280             txn.add(
  281                 self.ovsdb.db_add('Bridge', self.br_name,
  282                                   'protocols', self._highest_protocol_needed))
  283             txn.add(
  284                 self.ovsdb.db_set('Bridge', self.br_name,
  285                                   ('other_config', other_config)))
  286             if secure_mode:
  287                 txn.add(self.ovsdb.set_fail_mode(self.br_name,
  288                                                  FAILMODE_SECURE))
  289 
  290     def destroy(self):
  291         self.delete_bridge(self.br_name)
  292 
  293     def add_port(self, port_name, *interface_attr_tuples):
  294         with self.ovsdb.transaction() as txn:
  295             txn.add(self.ovsdb.add_port(self.br_name, port_name))
  296             if interface_attr_tuples:
  297                 txn.add(self.ovsdb.db_set('Interface', port_name,
  298                                           *interface_attr_tuples))
  299         return self.get_port_ofport(port_name)
  300 
  301     def replace_port(self, port_name, *interface_attr_tuples):
  302         """Replace existing port or create it, and configure port interface."""
  303 
  304         # NOTE(xiaohhui): If del_port is inside the transaction, there will
  305         # only be one command for replace_port. This will cause the new port
  306         # not be found by system, which will lead to Bug #1519926.
  307         self.ovsdb.del_port(port_name).execute()
  308         with self.ovsdb.transaction() as txn:
  309             txn.add(self.ovsdb.add_port(self.br_name, port_name,
  310                                         may_exist=False))
  311             # NOTE(mangelajo): Port is added to dead vlan (4095) by default
  312             # until it's handled by the neutron-openvswitch-agent. Otherwise it
  313             # becomes a trunk port on br-int (receiving traffic for all vlans),
  314             # and also triggers issues on ovs-vswitchd related to the
  315             # datapath flow revalidator thread, see lp#1767422
  316             txn.add(self.ovsdb.db_set(
  317                     'Port', port_name, ('tag', constants.DEAD_VLAN_TAG)))
  318 
  319             # TODO(mangelajo): We could accept attr tuples for the Port too
  320             # but, that could potentially break usage of this function in
  321             # stable branches (where we need to backport).
  322             # https://review.openstack.org/#/c/564825/4/neutron/agent/common/
  323             # ovs_lib.py@289
  324             if interface_attr_tuples:
  325                 txn.add(self.ovsdb.db_set('Interface', port_name,
  326                                           *interface_attr_tuples))
  327 
  328     def delete_port(self, port_name):
  329         self.ovsdb.del_port(port_name, self.br_name).execute()
  330 
  331     def run_ofctl(self, cmd, args, process_input=None):
  332         full_args = ["ovs-ofctl", cmd,
  333                      "-O", self._highest_protocol_needed,
  334                      self.br_name] + args
  335         # TODO(kevinbenton): This error handling is really brittle and only
  336         # detects one specific type of failure. The callers of this need to
  337         # be refactored to expect errors so we can re-raise and they can
  338         # take appropriate action based on the type of error.
  339         for i in range(1, 11):
  340             try:
  341                 return utils.execute(full_args, run_as_root=True,
  342                                      process_input=process_input)
  343             except Exception as e:
  344                 if "failed to connect to socket" in str(e):
  345                     LOG.debug("Failed to connect to OVS. Retrying "
  346                               "in 1 second. Attempt: %s/10", i)
  347                     time.sleep(1)
  348                     continue
  349                 LOG.error("Unable to execute %(cmd)s. Exception: "
  350                           "%(exception)s",
  351                           {'cmd': full_args, 'exception': e})
  352                 break
  353 
  354     def count_flows(self):
  355         flow_list = self.run_ofctl("dump-flows", []).split("\n")[1:]
  356         return len(flow_list) - 1
  357 
  358     def remove_all_flows(self):
  359         self.run_ofctl("del-flows", [])
  360 
  361     @_ovsdb_retry
  362     def _get_port_val(self, port_name, port_val):
  363         return self.db_get_val("Interface", port_name, port_val)
  364 
  365     def get_port_ofport(self, port_name):
  366         """Get the port's assigned ofport, retrying if not yet assigned."""
  367         ofport = INVALID_OFPORT
  368         try:
  369             ofport = self._get_port_val(port_name, "ofport")
  370         except tenacity.RetryError:
  371             LOG.exception("Timed out retrieving ofport on port %s.",
  372                           port_name)
  373         return ofport
  374 
  375     def get_port_external_ids(self, port_name):
  376         """Get the port's assigned ofport, retrying if not yet assigned."""
  377         port_external_ids = dict()
  378         try:
  379             port_external_ids = self._get_port_val(port_name, "external_ids")
  380         except tenacity.RetryError:
  381             LOG.exception("Timed out retrieving external_ids on port %s.",
  382                           port_name)
  383         return port_external_ids
  384 
  385     def get_port_mac(self, port_name):
  386         """Get the port's mac address.
  387 
  388         This is especially useful when the port is not a neutron port.
  389         E.g. networking-sfc needs the MAC address of "patch-tun
  390         """
  391         return self.db_get_val("Interface", port_name, "mac_in_use")
  392 
  393     @_ovsdb_retry
  394     def _get_datapath_id(self):
  395         return self.db_get_val('Bridge', self.br_name, 'datapath_id')
  396 
  397     def get_datapath_id(self):
  398         try:
  399             return self._get_datapath_id()
  400         except tenacity.RetryError:
  401             # if ovs fails to find datapath_id then something is likely to be
  402             # broken here
  403             LOG.exception("Timed out retrieving datapath_id on bridge %s.",
  404                           self.br_name)
  405             raise RuntimeError(_('No datapath_id on bridge %s') % self.br_name)
  406 
  407     def do_action_flows(self, action, kwargs_list, use_bundle=False):
  408         # we can't mix strict and non-strict, so we'll use the first kw
  409         # and check against other kw being different
  410         strict = kwargs_list[0].get('strict', False)
  411 
  412         for kw in kwargs_list:
  413             if action is 'del':
  414                 if kw.get('cookie') == COOKIE_ANY:
  415                     # special value COOKIE_ANY was provided, unset
  416                     # cookie to match flows whatever their cookie is
  417                     kw.pop('cookie')
  418                     if kw.get('cookie_mask'):  # non-zero cookie mask
  419                         raise Exception(_("cookie=COOKIE_ANY but cookie_mask "
  420                                           "set to %s") % kw.get('cookie_mask'))
  421                 elif 'cookie' in kw:
  422                     # a cookie was specified, use it
  423                     kw['cookie'] = check_cookie_mask(kw['cookie'])
  424                 else:
  425                     # nothing was specified about cookies, use default
  426                     kw['cookie'] = "%d/-1" % self._default_cookie
  427             else:
  428                 if 'cookie' not in kw:
  429                     kw['cookie'] = self._default_cookie
  430 
  431             if action in ('mod', 'del'):
  432                 if kw.pop('strict', False) != strict:
  433                     msg = ("cannot mix 'strict' and not 'strict' in a batch "
  434                            "call")
  435                     raise exceptions.InvalidInput(error_message=msg)
  436             else:
  437                 if kw.pop('strict', False):
  438                     msg = "cannot use 'strict' with 'add' action"
  439                     raise exceptions.InvalidInput(error_message=msg)
  440 
  441         extra_param = ["--strict"] if strict else []
  442 
  443         if action == 'del' and {} in kwargs_list:
  444             # the 'del' case simplifies itself if kwargs_list has at least
  445             # one item that matches everything
  446             self.run_ofctl('%s-flows' % action, [])
  447         else:
  448             flow_strs = [_build_flow_expr_str(kw, action, strict)
  449                          for kw in kwargs_list]
  450             LOG.debug("Processing %d OpenFlow rules.", len(flow_strs))
  451             if use_bundle:
  452                 extra_param.append('--bundle')
  453 
  454             step = common_constants.AGENT_RES_PROCESSING_STEP
  455             for i in range(0, len(flow_strs), step):
  456                 self.run_ofctl('%s-flows' % action, extra_param + ['-'],
  457                                '\n'.join(flow_strs[i:i + step]))
  458 
  459     def add_flow(self, **kwargs):
  460         self.do_action_flows('add', [kwargs])
  461 
  462     def mod_flow(self, **kwargs):
  463         self.do_action_flows('mod', [kwargs])
  464 
  465     def delete_flows(self, **kwargs):
  466         self.do_action_flows('del', [kwargs])
  467 
  468     def dump_flows_for_table(self, table):
  469         return self.dump_flows_for(table=table)
  470 
  471     def dump_flows_for(self, **kwargs):
  472         retval = None
  473         if "cookie" in kwargs:
  474             kwargs["cookie"] = check_cookie_mask(str(kwargs["cookie"]))
  475         flow_str = ",".join("=".join([key, str(val)])
  476                             for key, val in kwargs.items())
  477 
  478         flows = self.run_ofctl("dump-flows", [flow_str])
  479         if flows:
  480             retval = '\n'.join(item for item in flows.splitlines()
  481                                if is_a_flow_line(item))
  482         return retval
  483 
  484     def dump_all_flows(self):
  485         return [f for f in self.run_ofctl("dump-flows", []).splitlines()
  486                 if is_a_flow_line(f)]
  487 
  488     def deferred(self, **kwargs):
  489         return DeferredOVSBridge(self, **kwargs)
  490 
  491     def add_tunnel_port(self, port_name, remote_ip, local_ip,
  492                         tunnel_type=p_const.TYPE_GRE,
  493                         vxlan_udp_port=p_const.VXLAN_UDP_PORT,
  494                         dont_fragment=True,
  495                         tunnel_csum=False,
  496                         tos=None):
  497         attrs = [('type', tunnel_type)]
  498         # TODO(twilson) This is an OrderedDict solely to make a test happy
  499         options = collections.OrderedDict()
  500         vxlan_uses_custom_udp_port = (
  501             tunnel_type == p_const.TYPE_VXLAN and
  502             vxlan_udp_port != p_const.VXLAN_UDP_PORT
  503         )
  504         if vxlan_uses_custom_udp_port:
  505             options['dst_port'] = str(vxlan_udp_port)
  506         options['df_default'] = str(dont_fragment).lower()
  507         options['remote_ip'] = remote_ip
  508         options['local_ip'] = local_ip
  509         options['in_key'] = 'flow'
  510         options['out_key'] = 'flow'
  511         options['egress_pkt_mark'] = '0'
  512         if tunnel_csum:
  513             options['csum'] = str(tunnel_csum).lower()
  514         if tos:
  515             options['tos'] = str(tos)
  516         attrs.append(('options', options))
  517 
  518         return self.add_port(port_name, *attrs)
  519 
  520     def add_patch_port(self, local_name, remote_name):
  521         attrs = [('type', 'patch'),
  522                  ('options', {'peer': remote_name})]
  523         return self.add_port(local_name, *attrs)
  524 
  525     def get_iface_name_list(self):
  526         # get the interface name list for this bridge
  527         return self.ovsdb.list_ifaces(self.br_name).execute(check_error=True)
  528 
  529     def get_port_name_list(self):
  530         # get the port name list for this bridge
  531         return self.ovsdb.list_ports(self.br_name).execute(check_error=True)
  532 
  533     def get_port_stats(self, port_name):
  534         return self.db_get_val("Interface", port_name, "statistics")
  535 
  536     def get_ports_attributes(self, table, columns=None, ports=None,
  537                              check_error=True, log_errors=True,
  538                              if_exists=False):
  539         port_names = ports or self.get_port_name_list()
  540         if not port_names:
  541             return []
  542         return (self.ovsdb.db_list(table, port_names, columns=columns,
  543                                    if_exists=if_exists).
  544                 execute(check_error=check_error, log_errors=log_errors))
  545 
  546     # returns a VIF object for each VIF port
  547     def get_vif_ports(self, ofport_filter=None):
  548         edge_ports = []
  549         port_info = self.get_ports_attributes(
  550             'Interface', columns=['name', 'external_ids', 'ofport'],
  551             if_exists=True)
  552         for port in port_info:
  553             name = port['name']
  554             external_ids = port['external_ids']
  555             ofport = port['ofport']
  556             if ofport_filter and ofport in ofport_filter:
  557                 continue
  558             if "iface-id" in external_ids and "attached-mac" in external_ids:
  559                 p = VifPort(name, ofport, external_ids["iface-id"],
  560                             external_ids["attached-mac"], self)
  561                 edge_ports.append(p)
  562 
  563         return edge_ports
  564 
  565     def get_vif_port_to_ofport_map(self):
  566         results = self.get_ports_attributes(
  567             'Interface', columns=['name', 'external_ids', 'ofport'],
  568             if_exists=True)
  569         port_map = {}
  570         for r in results:
  571             # fall back to basic interface name
  572             key = self.portid_from_external_ids(r['external_ids']) or r['name']
  573             try:
  574                 port_map[key] = int(r['ofport'])
  575             except TypeError:
  576                 # port doesn't yet have an ofport entry so we ignore it
  577                 pass
  578         return port_map
  579 
  580     def get_vif_port_set(self):
  581         edge_ports = set()
  582         results = self.get_ports_attributes(
  583             'Interface', columns=['name', 'external_ids', 'ofport'],
  584             if_exists=True)
  585         for result in results:
  586             if result['ofport'] == UNASSIGNED_OFPORT:
  587                 LOG.warning("Found not yet ready openvswitch port: %s",
  588                             result['name'])
  589             elif result['ofport'] == INVALID_OFPORT:
  590                 LOG.warning("Found failed openvswitch port: %s",
  591                             result['name'])
  592             elif 'attached-mac' in result['external_ids']:
  593                 port_id = self.portid_from_external_ids(result['external_ids'])
  594                 if port_id:
  595                     edge_ports.add(port_id)
  596         return edge_ports
  597 
  598     def portid_from_external_ids(self, external_ids):
  599         if 'iface-id' in external_ids:
  600             return external_ids['iface-id']
  601 
  602     def get_port_tag_dict(self):
  603         """Get a dict of port names and associated vlan tags.
  604 
  605         e.g. the returned dict is of the following form::
  606 
  607             {u'int-br-eth2': [],
  608              u'patch-tun': [],
  609              u'qr-76d9e6b6-21': 1,
  610              u'tapce5318ff-78': 1,
  611              u'tape1400310-e6': 1}
  612 
  613         The TAG ID is only available in the "Port" table and is not available
  614         in the "Interface" table queried by the get_vif_port_set() method.
  615 
  616         """
  617         results = self.get_ports_attributes(
  618             'Port', columns=['name', 'tag'], if_exists=True)
  619         return {p['name']: p['tag'] for p in results}
  620 
  621     def get_vifs_by_ids(self, port_ids):
  622         interface_info = self.get_ports_attributes(
  623             "Interface", columns=["name", "external_ids", "ofport"],
  624             if_exists=True)
  625         by_id = {x['external_ids'].get('iface-id'): x for x in interface_info}
  626         result = {}
  627         for port_id in port_ids:
  628             result[port_id] = None
  629             if port_id not in by_id:
  630                 LOG.info("Port %(port_id)s not present in bridge "
  631                          "%(br_name)s",
  632                          {'port_id': port_id, 'br_name': self.br_name})
  633                 continue
  634             pinfo = by_id[port_id]
  635             if not self._check_ofport(port_id, pinfo):
  636                 continue
  637             mac = pinfo['external_ids'].get('attached-mac')
  638             result[port_id] = VifPort(pinfo['name'], pinfo['ofport'],
  639                                       port_id, mac, self)
  640         return result
  641 
  642     @staticmethod
  643     def _check_ofport(port_id, port_info):
  644         if port_info['ofport'] in [UNASSIGNED_OFPORT, INVALID_OFPORT]:
  645             LOG.warning("ofport: %(ofport)s for VIF: %(vif)s "
  646                         "is not a positive integer",
  647                         {'ofport': port_info['ofport'], 'vif': port_id})
  648             return False
  649         return True
  650 
  651     def get_vif_port_by_id(self, port_id):
  652         ports = self.ovsdb.db_find(
  653             'Interface', ('external_ids', '=', {'iface-id': port_id}),
  654             ('external_ids', '!=', {'attached-mac': ''}),
  655             columns=['external_ids', 'name', 'ofport']).execute()
  656         for port in ports:
  657             if self.br_name != self.get_bridge_for_iface(port['name']):
  658                 continue
  659             if not self._check_ofport(port_id, port):
  660                 continue
  661             mac = port['external_ids'].get('attached-mac')
  662             return VifPort(port['name'], port['ofport'], port_id, mac, self)
  663         LOG.info("Port %(port_id)s not present in bridge %(br_name)s",
  664                  {'port_id': port_id, 'br_name': self.br_name})
  665 
  666     def delete_ports(self, all_ports=False):
  667         if all_ports:
  668             port_names = self.get_port_name_list()
  669         else:
  670             port_names = (port.port_name for port in self.get_vif_ports())
  671 
  672         for port_name in port_names:
  673             self.delete_port(port_name)
  674 
  675     def get_local_port_mac(self):
  676         """Retrieve the mac of the bridge's local port."""
  677         address = ip_lib.IPDevice(self.br_name).link.address
  678         if address:
  679             return address
  680         else:
  681             msg = _('Unable to determine mac address for %s') % self.br_name
  682             raise Exception(msg)
  683 
  684     def set_controllers_connection_mode(self, connection_mode):
  685         """Set bridge controllers connection mode.
  686 
  687         :param connection_mode: "out-of-band" or "in-band"
  688         """
  689         self.set_controller_field('connection_mode', connection_mode)
  690 
  691     def set_controllers_inactivity_probe(self, interval):
  692         """Set bridge controllers inactivity probe interval.
  693 
  694         :param interval: inactivity_probe value in seconds.
  695         """
  696         self.set_controller_field('inactivity_probe', interval * 1000)
  697 
  698     def _set_egress_bw_limit_for_port(self, port_name, max_kbps,
  699                                       max_burst_kbps):
  700         with self.ovsdb.transaction(check_error=True) as txn:
  701             txn.add(self.ovsdb.db_set('Interface', port_name,
  702                                       ('ingress_policing_rate', max_kbps)))
  703             txn.add(self.ovsdb.db_set('Interface', port_name,
  704                                       ('ingress_policing_burst',
  705                                        max_burst_kbps)))
  706 
  707     def create_egress_bw_limit_for_port(self, port_name, max_kbps,
  708                                         max_burst_kbps):
  709         self._set_egress_bw_limit_for_port(
  710             port_name, max_kbps, max_burst_kbps)
  711 
  712     def get_egress_bw_limit_for_port(self, port_name):
  713 
  714         max_kbps = self.db_get_val('Interface', port_name,
  715                                    'ingress_policing_rate')
  716         max_burst_kbps = self.db_get_val('Interface', port_name,
  717                                          'ingress_policing_burst')
  718 
  719         max_kbps = max_kbps or None
  720         max_burst_kbps = max_burst_kbps or None
  721 
  722         return max_kbps, max_burst_kbps
  723 
  724     def delete_egress_bw_limit_for_port(self, port_name):
  725         if not self.port_exists(port_name):
  726             return
  727         self._set_egress_bw_limit_for_port(
  728             port_name, 0, 0)
  729 
  730     def find_qos(self, port_name):
  731         qos = self.ovsdb.db_find(
  732             'QoS',
  733             ('external_ids', '=', {'id': port_name}),
  734             columns=['_uuid', 'other_config']).execute(check_error=True)
  735         if qos:
  736             return qos[0]
  737 
  738     def find_queue(self, port_name, queue_type):
  739         queues = self.ovsdb.db_find(
  740             'Queue',
  741             ('external_ids', '=', {'id': port_name,
  742                                    'queue_type': str(queue_type)}),
  743             columns=['_uuid', 'other_config']).execute(check_error=True)
  744         if queues:
  745             return queues[0]
  746 
  747     def _update_bw_limit_queue(self, txn, port_name, queue_uuid, queue_type,
  748                                other_config):
  749         if queue_uuid:
  750             txn.add(self.ovsdb.db_set(
  751                 'Queue', queue_uuid,
  752                 ('other_config', other_config)))
  753         else:
  754             external_ids = {'id': port_name,
  755                             'queue_type': str(queue_type)}
  756             queue_uuid = txn.add(
  757                 self.ovsdb.db_create(
  758                     'Queue', external_ids=external_ids,
  759                     other_config=other_config))
  760         return queue_uuid
  761 
  762     def _update_bw_limit_profile(self, txn, port_name, qos_uuid,
  763                                  queue_uuid, queue_type, qos_other_config):
  764         queues = {queue_type: queue_uuid}
  765         if qos_uuid:
  766             txn.add(self.ovsdb.db_set(
  767                 'QoS', qos_uuid, ('queues', queues)))
  768             txn.add(self.ovsdb.db_set(
  769                 'QoS', qos_uuid, ('other_config', qos_other_config)))
  770         else:
  771             external_ids = {'id': port_name}
  772             qos_uuid = txn.add(
  773                 self.ovsdb.db_create(
  774                     'QoS', external_ids=external_ids,
  775                     type='linux-htb',
  776                     queues=queues,
  777                     other_config=qos_other_config))
  778         return qos_uuid
  779 
  780     def _update_bw_limit_profile_dpdk(self, txn, port_name, qos_uuid,
  781                                       other_config):
  782         if qos_uuid:
  783             txn.add(self.ovsdb.db_set(
  784                 'QoS', qos_uuid, ('other_config', other_config)))
  785         else:
  786             external_ids = {'id': port_name}
  787             qos_uuid = txn.add(
  788                 self.ovsdb.db_create(
  789                     'QoS', external_ids=external_ids, type='egress-policer',
  790                     other_config=other_config))
  791         return qos_uuid
  792 
  793     def _update_ingress_bw_limit_for_port(
  794             self, port_name, max_bw_in_bits, max_burst_in_bits):
  795         qos_other_config = {
  796             'max-rate': str(max_bw_in_bits)
  797         }
  798         queue_other_config = {
  799             'max-rate': str(max_bw_in_bits),
  800             'burst': str(max_burst_in_bits),
  801         }
  802         qos = self.find_qos(port_name)
  803         queue = self.find_queue(port_name, QOS_DEFAULT_QUEUE)
  804         qos_uuid = qos['_uuid'] if qos else None
  805         queue_uuid = queue['_uuid'] if queue else None
  806         with self.ovsdb.transaction(check_error=True) as txn:
  807             queue_uuid = self._update_bw_limit_queue(
  808                 txn, port_name, queue_uuid, QOS_DEFAULT_QUEUE,
  809                 queue_other_config
  810             )
  811 
  812             qos_uuid = self._update_bw_limit_profile(
  813                 txn, port_name, qos_uuid, queue_uuid, QOS_DEFAULT_QUEUE,
  814                 qos_other_config
  815             )
  816 
  817             txn.add(self.ovsdb.db_set(
  818                 'Port', port_name, ('qos', qos_uuid)))
  819 
  820     def _update_ingress_bw_limit_for_dpdk_port(
  821             self, port_name, max_bw_in_bits, max_burst_in_bits):
  822         # cir and cbs should be set in bytes instead of bits
  823         qos_other_config = {
  824             'cir': str(max_bw_in_bits / 8),
  825             'cbs': str(max_burst_in_bits / 8)
  826         }
  827         qos = self.find_qos(port_name)
  828         qos_uuid = qos['_uuid'] if qos else None
  829         with self.ovsdb.transaction(check_error=True) as txn:
  830             qos_uuid = self._update_bw_limit_profile_dpdk(
  831                 txn, port_name, qos_uuid, qos_other_config)
  832             txn.add(self.ovsdb.db_set(
  833                 'Port', port_name, ('qos', qos_uuid)))
  834 
  835     def update_ingress_bw_limit_for_port(self, port_name, max_kbps,
  836                                          max_burst_kbps):
  837         max_bw_in_bits = max_kbps * common_constants.SI_BASE
  838         max_burst_in_bits = max_burst_kbps * common_constants.SI_BASE
  839         port_type = self._get_port_val(port_name, "type")
  840         if port_type in constants.OVS_DPDK_PORT_TYPES:
  841             self._update_ingress_bw_limit_for_dpdk_port(
  842                 port_name, max_bw_in_bits, max_burst_in_bits)
  843         else:
  844             self._update_ingress_bw_limit_for_port(
  845                 port_name, max_bw_in_bits, max_burst_in_bits)
  846 
  847     def get_ingress_bw_limit_for_port(self, port_name):
  848         max_kbps = None
  849         qos_max_kbps = None
  850         queue_max_kbps = None
  851         max_burst_kbit = None
  852 
  853         qos_res = self.find_qos(port_name)
  854         if qos_res:
  855             other_config = qos_res['other_config']
  856             max_bw_in_bits = other_config.get('max-rate')
  857             if max_bw_in_bits is not None:
  858                 qos_max_kbps = int(max_bw_in_bits) / common_constants.SI_BASE
  859 
  860         queue_res = self.find_queue(port_name, QOS_DEFAULT_QUEUE)
  861         if queue_res:
  862             other_config = queue_res['other_config']
  863             max_bw_in_bits = other_config.get('max-rate')
  864             if max_bw_in_bits is not None:
  865                 queue_max_kbps = int(max_bw_in_bits) / common_constants.SI_BASE
  866             max_burst_in_bits = other_config.get('burst')
  867             if max_burst_in_bits is not None:
  868                 max_burst_kbit = (
  869                     int(max_burst_in_bits) / common_constants.SI_BASE)
  870 
  871         if qos_max_kbps == queue_max_kbps:
  872             max_kbps = qos_max_kbps
  873         else:
  874             LOG.warning("qos max-rate %(qos_max_kbps)s is not equal to "
  875                         "queue max-rate %(queue_max_kbps)s",
  876                         {'qos_max_kbps': qos_max_kbps,
  877                          'queue_max_kbps': queue_max_kbps})
  878         return max_kbps, max_burst_kbit
  879 
  880     def get_ingress_bw_limit_for_dpdk_port(self, port_name):
  881         max_kbps = None
  882         max_burst_kbit = None
  883         res = self.find_qos(port_name)
  884         if res:
  885             other_config = res['other_config']
  886             max_bw_in_bytes = other_config.get("cir")
  887             if max_bw_in_bytes is not None:
  888                 max_kbps = common_utils.bits_to_kilobits(
  889                     common_utils.bytes_to_bits(int(float(max_bw_in_bytes))),
  890                     common_constants.SI_BASE)
  891             max_burst_in_bytes = other_config.get("cbs")
  892             if max_burst_in_bytes is not None:
  893                 max_burst_kbit = common_utils.bits_to_kilobits(
  894                     common_utils.bytes_to_bits(int(float(max_burst_in_bytes))),
  895                     common_constants.SI_BASE)
  896         return max_kbps, max_burst_kbit
  897 
  898     def delete_ingress_bw_limit_for_port(self, port_name):
  899         qos = self.find_qos(port_name)
  900         queue = self.find_queue(port_name, QOS_DEFAULT_QUEUE)
  901         does_port_exist = self.port_exists(port_name)
  902         with self.ovsdb.transaction(check_error=True) as txn:
  903             if does_port_exist:
  904                 txn.add(self.ovsdb.db_clear("Port", port_name, 'qos'))
  905             if qos:
  906                 txn.add(self.ovsdb.db_destroy('QoS', qos['_uuid']))
  907             if queue:
  908                 txn.add(self.ovsdb.db_destroy('Queue', queue['_uuid']))
  909 
  910     def set_controller_field(self, field, value):
  911         attr = [(field, value)]
  912         controllers = self.db_get_val('Bridge', self.br_name, 'controller')
  913         controllers = [controllers] if isinstance(
  914             controllers, uuid.UUID) else controllers
  915         with self.ovsdb.transaction(check_error=True) as txn:
  916             for controller_uuid in controllers:
  917                 txn.add(self.ovsdb.db_set(
  918                     'Controller', controller_uuid, *attr))
  919 
  920     def set_controller_rate_limit(self, controller_rate_limit):
  921         """Set bridge controller_rate_limit
  922 
  923         :param controller_rate_limit: at least 100
  924         """
  925         if controller_rate_limit < CTRL_RATE_LIMIT_MIN:
  926             LOG.info("rate limit's value must be at least 100")
  927             controller_rate_limit = CTRL_RATE_LIMIT_MIN
  928         self.set_controller_field(
  929             'controller_rate_limit', controller_rate_limit)
  930 
  931     def set_controller_burst_limit(self, controller_burst_limit):
  932         """Set bridge controller_burst_limit
  933 
  934         :param controller_burst_limit: at least 25
  935         """
  936         if controller_burst_limit < CTRL_BURST_LIMIT_MIN:
  937             LOG.info("burst limit's value must be at least 25")
  938             controller_burst_limit = CTRL_BURST_LIMIT_MIN
  939         self.set_controller_field(
  940             'controller_burst_limit', controller_burst_limit)
  941 
  942     def set_datapath_id(self, datapath_id):
  943         dpid_cfg = {'datapath-id': datapath_id}
  944         self.set_db_attribute('Bridge', self.br_name, 'other_config', dpid_cfg,
  945                               check_error=True)
  946 
  947     def get_egress_min_bw_for_port(self, port_id):
  948         queue = self._find_queue(port_id)
  949         if not queue:
  950             return
  951 
  952         min_bps = queue['other_config'].get('min-rate')
  953         return int(int(min_bps) / 1000) if min_bps else None
  954 
  955     def _set_queue_for_minimum_bandwidth(self, queue_num):
  956         # reg4 is used to memoize if queue was set or not. If it is first visit
  957         # to table 0 for a packet (i.e. reg4 == 0), set queue and memoize (i.e.
  958         # load 1 to reg4), then goto table 0 again. The packet will be handled
  959         # as usual when the second visit to table 0.
  960         self.add_flow(
  961             table=constants.LOCAL_SWITCHING,
  962             in_port=queue_num,
  963             reg4=0,
  964             priority=200,
  965             actions=("set_queue:%s,load:1->NXM_NX_REG4[0],"
  966                      "resubmit(,%s)" % (queue_num, constants.LOCAL_SWITCHING)))
  967 
  968     def _unset_queue_for_minimum_bandwidth(self, queue_num):
  969         self.delete_flows(
  970             table=constants.LOCAL_SWITCHING,
  971             in_port=queue_num,
  972             reg4=0)
  973 
  974     def update_minimum_bandwidth_queue(self, port_id, egress_port_names,
  975                                        queue_num, min_kbps):
  976         queue_num = int(queue_num)
  977         queue_id = self._update_queue(port_id, queue_num, min_kbps=min_kbps)
  978         qos_id, qos_queues = self._find_qos()
  979         if qos_queues:
  980             qos_queues[queue_num] = queue_id
  981         else:
  982             qos_queues = {queue_num: queue_id}
  983         qos_id = self._update_qos(
  984             qos_id=qos_id, queues=qos_queues)
  985         for egress_port_name in egress_port_names:
  986             self._set_port_qos(egress_port_name, qos_id=qos_id)
  987         self._set_queue_for_minimum_bandwidth(queue_num)
  988         return qos_id
  989 
  990     def delete_minimum_bandwidth_queue(self, port_id):
  991         queue = self._find_queue(port_id)
  992         if not queue:
  993             return
  994         queue_num = int(queue['external_ids']['queue-num'])
  995         self._unset_queue_for_minimum_bandwidth(queue_num)
  996         qos_id, qos_queues = self._find_qos()
  997         if not qos_queues:
  998             return
  999         if queue_num in qos_queues.keys():
 1000             qos_queues.pop(queue_num)
 1001             self._update_qos(
 1002                 qos_id=qos_id, queues=qos_queues)
 1003             self._delete_queue(queue['_uuid'])
 1004 
 1005     def clear_minimum_bandwidth_qos(self):
 1006         qoses = self._list_qos(
 1007             qos_type=qos_constants.RULE_TYPE_MINIMUM_BANDWIDTH)
 1008 
 1009         for qos in qoses:
 1010             qos_id = qos['_uuid']
 1011             queues = {num: queue.uuid
 1012                       for num, queue in qos['queues'].items()}
 1013             ports = self.ovsdb.db_find(
 1014                 'Port',
 1015                 ('qos', '=', qos_id),
 1016                 colmuns=['name']).execute(check_error=True)
 1017             for port in ports:
 1018                 self._set_port_qos(port['name'])
 1019             self.ovsdb.db_destroy('QoS', qos_id).execute(check_error=True)
 1020             for queue_uuid in queues.values():
 1021                 self._delete_queue(queue_uuid)
 1022 
 1023     def _update_queue(self, port_id, queue_num, max_kbps=None,
 1024                       max_burst_kbps=None, min_kbps=None):
 1025         other_config = {}
 1026         if max_kbps:
 1027             other_config[six.u('max-rate')] = six.u(str(max_kbps * 1000))
 1028         if max_burst_kbps:
 1029             other_config[six.u('burst')] = six.u(str(max_burst_kbps * 1000))
 1030         if min_kbps:
 1031             other_config[six.u('min-rate')] = six.u(str(min_kbps * 1000))
 1032 
 1033         queue = self._find_queue(port_id)
 1034         if queue and queue['_uuid']:
 1035             if queue['other_config'] != other_config:
 1036                 self.set_db_attribute('Queue', queue['_uuid'], 'other_config',
 1037                                       other_config, check_error=True)
 1038         else:
 1039             # NOTE(ralonsoh): "external_ids" is a map of string-string pairs
 1040             external_ids = {
 1041                 'port': str(port_id),
 1042                 'type': str(qos_constants.RULE_TYPE_MINIMUM_BANDWIDTH),
 1043                 'queue-num': str(queue_num)}
 1044             self.ovsdb.db_create(
 1045                 'Queue', other_config=other_config,
 1046                 external_ids=external_ids).execute(check_error=True)
 1047             queue = self._find_queue(port_id)
 1048         return queue['_uuid']
 1049 
 1050     def _find_queue(self, port_id, _type=None):
 1051         # NOTE(ralonsoh): in ovsdb native library, '{>=}' operator is not
 1052         # implemented yet. This is a workaround: list all queues and compare
 1053         # the external_ids key needed.
 1054         _type = _type or qos_constants.RULE_TYPE_MINIMUM_BANDWIDTH
 1055         queues = self._list_queues(port=port_id, _type=_type)
 1056         if queues:
 1057             return queues[0]
 1058         return None
 1059 
 1060     def _list_queues(self, _type=None, port=None):
 1061         queues = self.ovsdb.db_list(
 1062             'Queue',
 1063             columns=['_uuid', 'external_ids', 'other_config']).execute(
 1064             check_error=True)
 1065         if port:
 1066             queues = [queue for queue in queues
 1067                       if queue['external_ids'].get('port') == str(port)]
 1068         if _type:
 1069             queues = [queue for queue in queues
 1070                       if queue['external_ids'].get('type') == str(_type)]
 1071         return queues
 1072 
 1073     def _delete_queue(self, queue_id):
 1074         try:
 1075             self.ovsdb.db_destroy('Queue', queue_id).execute(check_error=True)
 1076         except idlutils.RowNotFound:
 1077             LOG.info('OVS Queue %s was already deleted', queue_id)
 1078 
 1079     def _update_qos(self, qos_id=None, queues=None):
 1080         queues = queues or {}
 1081         if not qos_id:
 1082             external_ids = {'id': self._min_bw_qos_id,
 1083                             '_type': qos_constants.RULE_TYPE_MINIMUM_BANDWIDTH}
 1084             self.ovsdb.db_create(
 1085                 'QoS',
 1086                 type='linux-htb',
 1087                 queues=queues,
 1088                 external_ids=external_ids).execute(check_error=True)
 1089             qos_id, _ = self._find_qos()
 1090         else:
 1091             self.clear_db_attribute('QoS', qos_id, 'queues')
 1092             if queues:
 1093                 self.set_db_attribute('QoS', qos_id, 'queues', queues,
 1094                                       check_error=True)
 1095         return qos_id
 1096 
 1097     def _list_qos(self, _id=None, qos_type=None):
 1098         external_ids = {}
 1099         if _id:
 1100             external_ids['id'] = _id
 1101         if qos_type:
 1102             external_ids['_type'] = qos_type
 1103         if external_ids:
 1104             return self.ovsdb.db_find(
 1105                 'QoS',
 1106                 ('external_ids', '=', external_ids),
 1107                 colmuns=['_uuid', 'queues']).execute(check_error=True)
 1108 
 1109         return self.ovsdb.db_find(
 1110             'QoS', colmuns=['_uuid', 'queues']).execute(check_error=True)
 1111 
 1112     def _find_qos(self):
 1113         qos_regs = self._list_qos(_id=self._min_bw_qos_id)
 1114         if qos_regs:
 1115             queues = {num: queue.uuid
 1116                       for num, queue in qos_regs[0]['queues'].items()}
 1117             return qos_regs[0]['_uuid'], queues
 1118         return None, None
 1119 
 1120     def _set_port_qos(self, port_name, qos_id=None):
 1121         if qos_id:
 1122             self.set_db_attribute('Port', port_name, 'qos', qos_id,
 1123                                   check_error=True)
 1124         else:
 1125             self.clear_db_attribute('Port', port_name, 'qos')
 1126 
 1127     def get_bridge_ports(self, port_type=None):
 1128         port_names = self.get_port_name_list() + [self.br_name]
 1129         ports = self.get_ports_attributes('Interface',
 1130                                           ports=port_names,
 1131                                           columns=['name', 'type'],
 1132                                           if_exists=True) or []
 1133         if port_type is None:
 1134             return ports
 1135         elif not isinstance(port_type, list):
 1136             port_type = [port_type]
 1137         return [port['name'] for port in ports if port['type'] in port_type]
 1138 
 1139     def __enter__(self):
 1140         self.create()
 1141         return self
 1142 
 1143     def __exit__(self, exc_type, exc_value, exc_tb):
 1144         self.destroy()
 1145 
 1146 
 1147 class DeferredOVSBridge(object):
 1148     '''Deferred OVSBridge.
 1149 
 1150     This class wraps add_flow, mod_flow and delete_flows calls to an OVSBridge
 1151     and defers their application until apply_flows call in order to perform
 1152     bulk calls. It wraps also ALLOWED_PASSTHROUGHS calls to avoid mixing
 1153     OVSBridge and DeferredOVSBridge uses.
 1154     This class can be used as a context, in such case apply_flows is called on
 1155     __exit__ except if an exception is raised.
 1156     This class is not thread-safe, that's why for every use a new instance
 1157     must be implemented.
 1158     '''
 1159     ALLOWED_PASSTHROUGHS = 'add_port', 'add_tunnel_port', 'delete_port'
 1160 
 1161     def __init__(self, br, full_ordered=False,
 1162                  order=('add', 'mod', 'del'), use_bundle=False):
 1163         '''Constructor.
 1164 
 1165         :param br: wrapped bridge
 1166         :param full_ordered: Optional, disable flow reordering (slower)
 1167         :param order: Optional, define in which order flow are applied
 1168         :param use_bundle: Optional, a bool whether --bundle should be passed
 1169                            to all ofctl commands. Default is set to False.
 1170         '''
 1171 
 1172         self.br = br
 1173         self.full_ordered = full_ordered
 1174         self.order = order
 1175         if not self.full_ordered:
 1176             self.weights = dict((y, x) for x, y in enumerate(self.order))
 1177         self.action_flow_tuples = []
 1178         self.use_bundle = use_bundle
 1179 
 1180     def __getattr__(self, name):
 1181         if name in self.ALLOWED_PASSTHROUGHS:
 1182             return getattr(self.br, name)
 1183         raise AttributeError(name)
 1184 
 1185     def add_flow(self, **kwargs):
 1186         self.action_flow_tuples.append(('add', kwargs))
 1187 
 1188     def mod_flow(self, **kwargs):
 1189         self.action_flow_tuples.append(('mod', kwargs))
 1190 
 1191     def delete_flows(self, **kwargs):
 1192         self.action_flow_tuples.append(('del', kwargs))
 1193 
 1194     def apply_flows(self):
 1195         action_flow_tuples = self.action_flow_tuples
 1196         self.action_flow_tuples = []
 1197         if not action_flow_tuples:
 1198             return
 1199 
 1200         if not self.full_ordered:
 1201             action_flow_tuples.sort(key=lambda af: self.weights[af[0]])
 1202 
 1203         grouped = itertools.groupby(action_flow_tuples,
 1204                                     key=operator.itemgetter(0))
 1205         itemgetter_1 = operator.itemgetter(1)
 1206         for action, action_flow_list in grouped:
 1207             flows = list(map(itemgetter_1, action_flow_list))
 1208             self.br.do_action_flows(action, flows, self.use_bundle)
 1209 
 1210     def __enter__(self):
 1211         return self
 1212 
 1213     def __exit__(self, exc_type, exc_value, traceback):
 1214         if exc_type is None:
 1215             self.apply_flows()
 1216         else:
 1217             LOG.exception("OVS flows could not be applied on bridge %s",
 1218                           self.br.br_name)
 1219 
 1220 
 1221 def _build_flow_expr_str(flow_dict, cmd, strict):
 1222     flow_expr_arr = []
 1223     actions = None
 1224 
 1225     if cmd == 'add':
 1226         flow_expr_arr.append("hard_timeout=%s" %
 1227                              flow_dict.pop('hard_timeout', '0'))
 1228         flow_expr_arr.append("idle_timeout=%s" %
 1229                              flow_dict.pop('idle_timeout', '0'))
 1230         flow_expr_arr.append("priority=%s" %
 1231                              flow_dict.pop('priority', '1'))
 1232     elif 'priority' in flow_dict:
 1233         if not strict:
 1234             msg = _("Cannot match priority on flow deletion or modification "
 1235                     "without 'strict'")
 1236             raise exceptions.InvalidInput(error_message=msg)
 1237 
 1238     if cmd != 'del':
 1239         if "actions" not in flow_dict:
 1240             msg = _("Must specify one or more actions on flow addition"
 1241                     " or modification")
 1242             raise exceptions.InvalidInput(error_message=msg)
 1243         actions = "actions=%s" % flow_dict.pop('actions')
 1244 
 1245     for key, value in flow_dict.items():
 1246         if key == 'proto':
 1247             flow_expr_arr.append(value)
 1248         else:
 1249             flow_expr_arr.append("%s=%s" % (key, str(value)))
 1250 
 1251     if actions:
 1252         flow_expr_arr.append(actions)
 1253 
 1254     return ','.join(flow_expr_arr)
 1255 
 1256 
 1257 def generate_random_cookie():
 1258     # The OpenFlow spec forbids use of -1
 1259     return random.randrange(UINT64_BITMASK)
 1260 
 1261 
 1262 def check_cookie_mask(cookie):
 1263     cookie = str(cookie)
 1264     if '/' not in cookie:
 1265         return cookie + '/-1'
 1266     else:
 1267         return cookie
 1268 
 1269 
 1270 def is_a_flow_line(line):
 1271     # this is used to filter out from ovs-ofctl dump-flows the lines that
 1272     # are not flow descriptions but mere indications of the type of openflow
 1273     # message that was used ; e.g.:
 1274     #
 1275     # # ovs-ofctl dump-flows br-int
 1276     # NXST_FLOW reply (xid=0x4):
 1277     #  cookie=0xb7dff131a697c6a5, duration=2411726.809s, table=0, ...
 1278     #  cookie=0xb7dff131a697c6a5, duration=2411726.786s, table=23, ...
 1279     #  cookie=0xb7dff131a697c6a5, duration=2411726.760s, table=24, ...
 1280     #
 1281     return 'NXST' not in line and 'OFPST' not in line