"Fossies" - the Fresh Open Source Software Archive

Member "neutron-14.0.3/neutron/services/logapi/drivers/openvswitch/ovs_firewall_log.py" (22 Oct 2019, 18665 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "ovs_firewall_log.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 14.0.2_vs_15.0.0.

    1 # Copyright (c) 2017 Fujitsu Limited
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 import collections
   17 
   18 from neutron_lib import constants as lib_const
   19 from os_ken.base import app_manager
   20 from os_ken.lib.packet import packet
   21 from oslo_config import cfg
   22 from oslo_log import formatters
   23 from oslo_log import handlers
   24 from oslo_log import log as logging
   25 
   26 from neutron.agent.linux.openvswitch_firewall import constants as ovsfw_consts
   27 from neutron.agent.linux.openvswitch_firewall import firewall as ovsfw
   28 from neutron.agent.linux.openvswitch_firewall import rules
   29 from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants \
   30         as ovs_consts
   31 from neutron.services.logapi.agent import log_extension as log_ext
   32 from neutron.services.logapi.common import constants as log_const
   33 from neutron.services.logapi.common import exceptions as log_exc
   34 from neutron.services.logapi.drivers.openvswitch import log_oskenapp
   35 
   36 LOG = logging.getLogger(__name__)
   37 
   38 OVS_FW_TO_LOG_TABLES = {
   39     ovs_consts.RULES_EGRESS_TABLE: ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
   40     ovs_consts.RULES_INGRESS_TABLE: ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
   41 }
   42 
   43 FIELDS_TO_REMOVE = ['priority', 'actions', 'dl_type',
   44                     'reg_port', 'reg_remote_group']
   45 
   46 REMOTE_RULE_PRIORITY = 70
   47 
   48 
   49 def setup_logging():
   50     log_file = cfg.CONF.network_log.local_output_log_base
   51     if log_file:
   52         from logging import handlers as watch_handler
   53         log_file_handler = watch_handler.WatchedFileHandler(log_file)
   54         log_file_handler.setLevel(
   55             logging.DEBUG if cfg.CONF.debug else logging.INFO)
   56         LOG.logger.addHandler(log_file_handler)
   57         log_file_handler.setFormatter(
   58             formatters.ContextFormatter(
   59                 fmt=cfg.CONF.logging_default_format_string,
   60                 datefmt=cfg.CONF.log_date_format))
   61     elif cfg.CONF.use_journal:
   62         journal_handler = handlers.OSJournalHandler()
   63         LOG.logger.addHandler(journal_handler)
   64     else:
   65         syslog_handler = handlers.OSSysLogHandler()
   66         LOG.logger.addHandler(syslog_handler)
   67 
   68 
   69 def find_deleted_sg_rules(old_port, new_ports):
   70     del_rules = list()
   71     for port in new_ports:
   72         if old_port.id == port.id:
   73             for rule in old_port.secgroup_rules:
   74                 if rule not in port.secgroup_rules:
   75                     del_rules.append(rule)
   76             return del_rules
   77     return del_rules
   78 
   79 
   80 class Cookie(object):
   81 
   82     def __init__(self, cookie_id, port, action, project):
   83         self.id = cookie_id
   84         self.port = port
   85         self.action = action
   86         self.project = project
   87         self.log_object_refs = set()
   88 
   89     def __eq__(self, other):
   90         return (self.id == other.id and
   91                 self.action == other.action and
   92                 self.port == other.port)
   93 
   94     def __hash__(self):
   95         return hash(self.id)
   96 
   97     def add_log_obj_ref(self, log_id):
   98         self.log_object_refs.add(log_id)
   99 
  100     def remove_log_obj_ref(self, log_id):
  101         self.log_object_refs.discard(log_id)
  102 
  103     @property
  104     def is_empty(self):
  105         return not self.log_object_refs
  106 
  107 
  108 class OFPortLog(object):
  109 
  110     def __init__(self, port, ovs_port, log_event):
  111         self.id = port['port_id']
  112         self.ofport = ovs_port.ofport
  113         self.secgroup_rules = [self._update_rule(rule) for rule in
  114                                port['security_group_rules']]
  115         # event can be ALL, DROP and ACCEPT
  116         self.event = log_event
  117 
  118     def _update_rule(self, rule):
  119         protocol = rule.get('protocol')
  120         if protocol is not None:
  121             if not isinstance(protocol, int) and protocol.isdigit():
  122                 rule['protocol'] = int(protocol)
  123             elif (rule.get('ethertype') == lib_const.IPv6 and
  124                   protocol == lib_const.PROTO_NAME_ICMP):
  125                 rule['protocol'] = lib_const.PROTO_NUM_IPV6_ICMP
  126             else:
  127                 rule['protocol'] = lib_const.IP_PROTOCOL_MAP.get(
  128                     protocol, protocol)
  129         return rule
  130 
  131 
  132 class OVSFirewallLoggingDriver(log_ext.LoggingDriver):
  133 
  134     SUPPORTED_LOGGING_TYPES = ['security_group']
  135     REQUIRED_PROTOCOLS = [
  136         ovs_consts.OPENFLOW13,
  137         ovs_consts.OPENFLOW14,
  138     ]
  139 
  140     def __init__(self, agent_api):
  141         integration_bridge = agent_api.request_int_br()
  142         self.int_br = self.initialize_bridge(integration_bridge)
  143         self._deferred = False
  144         self.log_ports = collections.defaultdict(dict)
  145         self.cookies_table = set()
  146         self.cookie_ids_to_delete = set()
  147         self.conj_id_map = ovsfw.ConjIdMap()
  148 
  149     def initialize(self, resource_rpc, **kwargs):
  150         self.resource_rpc = resource_rpc
  151         setup_logging()
  152         self.start_logapp()
  153 
  154     @staticmethod
  155     def initialize_bridge(bridge):
  156         bridge.add_protocols(*OVSFirewallLoggingDriver.REQUIRED_PROTOCOLS)
  157         # set rate limit and burst limit for controller
  158         bridge.set_controller_rate_limit(cfg.CONF.network_log.rate_limit)
  159         bridge.set_controller_burst_limit(cfg.CONF.network_log.burst_limit)
  160         return bridge.deferred(full_ordered=True)
  161 
  162     def start_logapp(self):
  163         app_mgr = app_manager.AppManager.get_instance()
  164         self.log_app = app_mgr.instantiate(log_oskenapp.OVSLogOSKenApp)
  165         self.log_app.start()
  166         self.log_app.register_packet_in_handler(self.packet_in_handler)
  167 
  168     def packet_in_handler(self, ev):
  169         msg = ev.msg
  170         cookie_id = msg.cookie
  171         pkt = packet.Packet(msg.data)
  172         try:
  173             cookie_entry = self._get_cookie_by_id(cookie_id)
  174             LOG.info("action=%s project_id=%s log_resource_ids=%s vm_port=%s "
  175                      "pkt=%s", cookie_entry.action, cookie_entry.project,
  176                      list(cookie_entry.log_object_refs),
  177                      cookie_entry.port, pkt)
  178         except log_exc.CookieNotFound:
  179             LOG.warning("Unknown cookie=%s packet_in pkt=%s", cookie_id, pkt)
  180 
  181     def defer_apply_on(self):
  182         self._deferred = True
  183 
  184     def defer_apply_off(self):
  185         if self._deferred:
  186             self.int_br.apply_flows()
  187             self._cleanup_cookies()
  188             self._deferred = False
  189 
  190     def _get_cookie(self, port_id, action):
  191         for cookie in self.cookies_table:
  192             if cookie.port == port_id and cookie.action == action:
  193                 return cookie
  194 
  195     def _get_cookies_by_port(self, port_id):
  196         cookies_list = []
  197         for cookie in self.cookies_table:
  198             if cookie.port == port_id:
  199                 cookies_list.append(cookie)
  200         return cookies_list
  201 
  202     def _get_cookie_by_id(self, cookie_id):
  203         for cookie in self.cookies_table:
  204             if str(cookie.id) == str(cookie_id):
  205                 return cookie
  206         raise log_exc.CookieNotFound(cookie_id=cookie_id)
  207 
  208     def _cleanup_cookies(self):
  209         cookie_ids = self.cookie_ids_to_delete
  210         self.cookie_ids_to_delete = set()
  211         for cookie_id in cookie_ids:
  212             self.int_br.br.unset_cookie(cookie_id)
  213 
  214     def generate_cookie(self, port_id, action, log_id, project_id):
  215         cookie = self._get_cookie(port_id, action)
  216         if not cookie:
  217             cookie_id = self.int_br.br.request_cookie()
  218             cookie = Cookie(cookie_id=cookie_id, port=port_id,
  219                             action=action, project=project_id)
  220             self.cookies_table.add(cookie)
  221         cookie.add_log_obj_ref(log_id)
  222         return cookie.id
  223 
  224     def _schedule_cookie_deletion(self, cookie):
  225         # discard a cookie object
  226         self.cookies_table.remove(cookie)
  227         # schedule to cleanup cookie_ids later
  228         self.cookie_ids_to_delete.add(cookie.id)
  229 
  230     def start_logging(self, context, **kwargs):
  231         LOG.debug("start logging: %s", str(kwargs))
  232         for resource_type in self.SUPPORTED_LOGGING_TYPES:
  233             # handle port updated, agent restarted
  234             if 'port_id' in kwargs:
  235                 self._handle_logging('_create', context,
  236                                      resource_type, **kwargs)
  237             else:
  238                 self._handle_log_resources_by_type(
  239                     '_create', context, resource_type, **kwargs)
  240 
  241     def stop_logging(self, context, **kwargs):
  242         LOG.debug("stop logging: %s", str(kwargs))
  243         for resource_type in self.SUPPORTED_LOGGING_TYPES:
  244             # handle port deleted
  245             if 'port_id' in kwargs:
  246                 self._handle_logging('_delete', context,
  247                                      resource_type, **kwargs)
  248             else:
  249                 self._handle_log_resources_by_type(
  250                     '_delete', context, resource_type, **kwargs)
  251 
  252     def _handle_log_resources_by_type(
  253             self, action, context, resource_type, **kwargs):
  254 
  255         log_resources = []
  256         for log_obj in kwargs.get('log_resources', []):
  257             if log_obj['resource_type'] == resource_type:
  258                 log_resources.append(log_obj)
  259         if log_resources:
  260             self._handle_logging(
  261                 action, context, resource_type, log_resources=log_resources)
  262 
  263     def _handle_logging(self, action, context, resource_type, **kwargs):
  264         handler_name = "%s_%s_log" % (action, resource_type)
  265         handler = getattr(self, handler_name)
  266         handler(context, **kwargs)
  267 
  268     def create_ofport_log(self, port, log_id, log_event):
  269         port_id = port['port_id']
  270         ovs_port = self.int_br.br.get_vif_port_by_id(port_id)
  271         if ovs_port:
  272             of_port_log = OFPortLog(port, ovs_port, log_event)
  273             self.log_ports[log_id].add(of_port_log)
  274 
  275     def _create_security_group_log(self, context, **kwargs):
  276 
  277         port_id = kwargs.get('port_id')
  278         log_resources = kwargs.get('log_resources')
  279         logs_info = []
  280         if port_id:
  281             # try to clean port flows log for port updated/create event
  282             self._cleanup_port_flows_log(port_id)
  283             logs_info = self.resource_rpc.get_sg_log_info_for_port(
  284                 context,
  285                 resource_type=log_const.SECURITY_GROUP,
  286                 port_id=port_id)
  287         elif log_resources:
  288             logs_info = self.resource_rpc.get_sg_log_info_for_log_resources(
  289                 context,
  290                 resource_type=log_const.SECURITY_GROUP,
  291                 log_resources=log_resources)
  292 
  293         for log_info in logs_info:
  294             log_id = log_info['id']
  295             old_ofport_logs = self.log_ports.get(log_id, [])
  296             ports = log_info.get('ports_log')
  297             self.log_ports[log_id] = set()
  298             for port in ports:
  299                 self.create_ofport_log(port, log_id, log_info.get('event'))
  300 
  301             # try to clean flows log if sg_rules are deleted
  302             for port in old_ofport_logs:
  303                 del_rules = find_deleted_sg_rules(
  304                     port, self.log_ports[log_id])
  305                 if del_rules:
  306                     self._delete_sg_rules_flow_log(port, del_rules)
  307 
  308             for port_log in self.log_ports[log_id]:
  309                 self.add_flows_from_rules(port_log, log_info)
  310 
  311     def _cleanup_port_flows_log(self, port_id):
  312         cookies_list = self._get_cookies_by_port(port_id)
  313         for cookie in cookies_list:
  314             if cookie.action == log_const.ACCEPT_EVENT:
  315                 self._delete_flows(
  316                     table=ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
  317                     cookie=cookie.id)
  318                 self._delete_flows(
  319                     table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
  320                     cookie=cookie.id)
  321             if cookie.action == log_const.DROP_EVENT:
  322                 self._delete_flows(
  323                     table=ovs_consts.DROPPED_TRAFFIC_TABLE,
  324                     cookie=cookie.id)
  325             self._schedule_cookie_deletion(cookie)
  326 
  327     def _delete_security_group_log(self, context, **kwargs):
  328         # port deleted event
  329         port_id = kwargs.get('port_id')
  330 
  331         if port_id:
  332             self._cleanup_port_flows_log(port_id)
  333 
  334         # log resources deleted events
  335         for log_resource in kwargs.get('log_resources', []):
  336             log_id = log_resource.get('id')
  337             of_port_logs = self.log_ports.get(log_id, [])
  338             for of_port_log in of_port_logs:
  339                 self.delete_port_flows_log(of_port_log, log_id)
  340 
  341     def _log_accept_flow(self, **flow):
  342         # log first accepted packet
  343         flow['table'] = OVS_FW_TO_LOG_TABLES[flow['table']]
  344         flow['actions'] = 'controller'
  345         # forward egress accepted packet and log
  346         if flow['table'] == ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE:
  347             flow['actions'] = 'resubmit(,%d),controller' % (
  348                 ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE)
  349         self._add_flow(**flow)
  350 
  351     def _add_flow(self, **kwargs):
  352         dl_type = kwargs.get('dl_type')
  353         ovsfw.create_reg_numbers(kwargs)
  354         if isinstance(dl_type, int):
  355             kwargs['dl_type'] = "0x{:04x}".format(dl_type)
  356         LOG.debug("Add flow firewall log %s", str(kwargs))
  357         if self._deferred:
  358             self.int_br.add_flow(**kwargs)
  359         else:
  360             self.int_br.br.add_flow(**kwargs)
  361 
  362     def _delete_flows(self, **kwargs):
  363         ovsfw.create_reg_numbers(kwargs)
  364         if self._deferred:
  365             self.int_br.delete_flows(**kwargs)
  366         else:
  367             self.int_br.br.delete_flows(**kwargs)
  368 
  369     def _log_drop_packet(self, port, log_id, project_id):
  370         cookie = self.generate_cookie(port.id, log_const.DROP_EVENT,
  371                                       log_id, project_id)
  372         self._add_flow(
  373             cookie=cookie,
  374             table=ovs_consts.DROPPED_TRAFFIC_TABLE,
  375             priority=53,
  376             reg_port=port.ofport,
  377             actions='controller'
  378         )
  379 
  380     def create_rules_generator_for_port(self, port):
  381         for rule in port.secgroup_rules:
  382             yield rule
  383 
  384     def _create_conj_flows_log(self, remote_rule, port):
  385         ethertype = remote_rule['ethertype']
  386         direction = remote_rule['direction']
  387         remote_sg_id = remote_rule['remote_group_id']
  388         secgroup_id = remote_rule['security_group_id']
  389         # we only want to log first accept packet, that means a packet with
  390         # ct_state=+new-est, reg_remote_group=conj_id + 1 will be logged
  391         flow_template = {
  392             'priority': REMOTE_RULE_PRIORITY,
  393             'dl_type': ovsfw_consts.ethertype_to_dl_type_map[ethertype],
  394             'reg_port': port.ofport,
  395             'reg_remote_group': self.conj_id_map.get_conj_id(
  396                 secgroup_id, remote_sg_id, direction, ethertype) + 1,
  397         }
  398         if direction == lib_const.INGRESS_DIRECTION:
  399             flow_template['table'] = ovs_consts.RULES_INGRESS_TABLE
  400         elif direction == lib_const.EGRESS_DIRECTION:
  401             flow_template['table'] = ovs_consts.RULES_EGRESS_TABLE
  402         return [flow_template]
  403 
  404     def _log_accept_packet(self, port, log_id, project_id):
  405         cookie = self.generate_cookie(port.id, log_const.ACCEPT_EVENT,
  406                                       log_id, project_id)
  407         for rule in self.create_rules_generator_for_port(port):
  408             if 'remote_group_id' in rule:
  409                 flows = self._create_conj_flows_log(rule, port)
  410             else:
  411                 flows = rules.create_flows_from_rule_and_port(rule, port)
  412             for flow in flows:
  413                 flow['cookie'] = cookie
  414                 self._log_accept_flow(**flow)
  415 
  416     def add_flows_from_rules(self, port, log_info):
  417         # log event can be ACCEPT or DROP or ALL(both ACCEPT and DROP)
  418         event = log_info['event']
  419         project_id = log_info['project_id']
  420         log_id = log_info['id']
  421         if event == log_const.ACCEPT_EVENT:
  422             self._log_accept_packet(port, log_id, project_id)
  423         elif event == log_const.DROP_EVENT:
  424             self._log_drop_packet(port, log_id, project_id)
  425         else:
  426             self._log_drop_packet(port, log_id, project_id)
  427             self._log_accept_packet(port, log_id, project_id)
  428 
  429     def _delete_accept_flows_log(self, port, log_id):
  430         cookie = self._get_cookie(port.id, log_const.ACCEPT_EVENT)
  431         if cookie:
  432             cookie.remove_log_obj_ref(log_id)
  433             if cookie.is_empty:
  434                 self._delete_flows(
  435                     table=ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
  436                     cookie=cookie.id)
  437                 self._delete_flows(
  438                     table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
  439                     cookie=cookie.id)
  440                 self._schedule_cookie_deletion(cookie)
  441 
  442     def _delete_drop_flows_log(self, port, log_id):
  443         cookie = self._get_cookie(port.id, log_const.DROP_EVENT)
  444         if cookie:
  445             cookie.remove_log_obj_ref(log_id)
  446             if cookie.is_empty:
  447                 self._delete_flows(table=ovs_consts.DROPPED_TRAFFIC_TABLE,
  448                                    cookie=cookie.id)
  449                 self._schedule_cookie_deletion(cookie)
  450 
  451     def delete_port_flows_log(self, port, log_id):
  452         """Delete all flows log for given port and log_id"""
  453         event = port.event
  454         if event == log_const.ACCEPT_EVENT:
  455             self._delete_accept_flows_log(port, log_id)
  456         elif event == log_const.DROP_EVENT:
  457             self._delete_drop_flows_log(port, log_id)
  458         else:
  459             self._delete_accept_flows_log(port, log_id)
  460             self._delete_drop_flows_log(port, log_id)
  461 
  462     def _delete_sg_rules_flow_log(self, port, del_rules):
  463         cookie = self._get_cookie(port.id, log_const.ACCEPT_EVENT)
  464         if not cookie:
  465             return
  466         for rule in del_rules:
  467             if 'remote_group_id' in rule:
  468                 flows = self._create_conj_flows_log(rule, port)
  469             else:
  470                 flows = rules.create_flows_from_rule_and_port(rule, port)
  471             for flow in flows:
  472                 for kw in FIELDS_TO_REMOVE:
  473                     flow.pop(kw, None)
  474                 flow['table'] = OVS_FW_TO_LOG_TABLES[flow['table']]
  475                 flow['cookie'] = cookie.id
  476                 self._delete_flows(**flow)