"Fossies" - the Fresh Open Source Software Archive 
Member "neutron-14.0.3/neutron/services/logapi/drivers/openvswitch/ovs_firewall_log.py" (22 Oct 2019, 18665 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively you can here
view or
download the uninterpreted source code file.
For more information about "ovs_firewall_log.py" see the
Fossies "Dox" file reference documentation and the last
Fossies "Diffs" side-by-side code changes report:
14.0.2_vs_15.0.0.
1 # Copyright (c) 2017 Fujitsu Limited
2 # All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15
16 import collections
17
18 from neutron_lib import constants as lib_const
19 from os_ken.base import app_manager
20 from os_ken.lib.packet import packet
21 from oslo_config import cfg
22 from oslo_log import formatters
23 from oslo_log import handlers
24 from oslo_log import log as logging
25
26 from neutron.agent.linux.openvswitch_firewall import constants as ovsfw_consts
27 from neutron.agent.linux.openvswitch_firewall import firewall as ovsfw
28 from neutron.agent.linux.openvswitch_firewall import rules
29 from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants \
30 as ovs_consts
31 from neutron.services.logapi.agent import log_extension as log_ext
32 from neutron.services.logapi.common import constants as log_const
33 from neutron.services.logapi.common import exceptions as log_exc
34 from neutron.services.logapi.drivers.openvswitch import log_oskenapp
35
36 LOG = logging.getLogger(__name__)
37
38 OVS_FW_TO_LOG_TABLES = {
39 ovs_consts.RULES_EGRESS_TABLE: ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
40 ovs_consts.RULES_INGRESS_TABLE: ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
41 }
42
43 FIELDS_TO_REMOVE = ['priority', 'actions', 'dl_type',
44 'reg_port', 'reg_remote_group']
45
46 REMOTE_RULE_PRIORITY = 70
47
48
49 def setup_logging():
50 log_file = cfg.CONF.network_log.local_output_log_base
51 if log_file:
52 from logging import handlers as watch_handler
53 log_file_handler = watch_handler.WatchedFileHandler(log_file)
54 log_file_handler.setLevel(
55 logging.DEBUG if cfg.CONF.debug else logging.INFO)
56 LOG.logger.addHandler(log_file_handler)
57 log_file_handler.setFormatter(
58 formatters.ContextFormatter(
59 fmt=cfg.CONF.logging_default_format_string,
60 datefmt=cfg.CONF.log_date_format))
61 elif cfg.CONF.use_journal:
62 journal_handler = handlers.OSJournalHandler()
63 LOG.logger.addHandler(journal_handler)
64 else:
65 syslog_handler = handlers.OSSysLogHandler()
66 LOG.logger.addHandler(syslog_handler)
67
68
69 def find_deleted_sg_rules(old_port, new_ports):
70 del_rules = list()
71 for port in new_ports:
72 if old_port.id == port.id:
73 for rule in old_port.secgroup_rules:
74 if rule not in port.secgroup_rules:
75 del_rules.append(rule)
76 return del_rules
77 return del_rules
78
79
80 class Cookie(object):
81
82 def __init__(self, cookie_id, port, action, project):
83 self.id = cookie_id
84 self.port = port
85 self.action = action
86 self.project = project
87 self.log_object_refs = set()
88
89 def __eq__(self, other):
90 return (self.id == other.id and
91 self.action == other.action and
92 self.port == other.port)
93
94 def __hash__(self):
95 return hash(self.id)
96
97 def add_log_obj_ref(self, log_id):
98 self.log_object_refs.add(log_id)
99
100 def remove_log_obj_ref(self, log_id):
101 self.log_object_refs.discard(log_id)
102
103 @property
104 def is_empty(self):
105 return not self.log_object_refs
106
107
108 class OFPortLog(object):
109
110 def __init__(self, port, ovs_port, log_event):
111 self.id = port['port_id']
112 self.ofport = ovs_port.ofport
113 self.secgroup_rules = [self._update_rule(rule) for rule in
114 port['security_group_rules']]
115 # event can be ALL, DROP and ACCEPT
116 self.event = log_event
117
118 def _update_rule(self, rule):
119 protocol = rule.get('protocol')
120 if protocol is not None:
121 if not isinstance(protocol, int) and protocol.isdigit():
122 rule['protocol'] = int(protocol)
123 elif (rule.get('ethertype') == lib_const.IPv6 and
124 protocol == lib_const.PROTO_NAME_ICMP):
125 rule['protocol'] = lib_const.PROTO_NUM_IPV6_ICMP
126 else:
127 rule['protocol'] = lib_const.IP_PROTOCOL_MAP.get(
128 protocol, protocol)
129 return rule
130
131
132 class OVSFirewallLoggingDriver(log_ext.LoggingDriver):
133
134 SUPPORTED_LOGGING_TYPES = ['security_group']
135 REQUIRED_PROTOCOLS = [
136 ovs_consts.OPENFLOW13,
137 ovs_consts.OPENFLOW14,
138 ]
139
140 def __init__(self, agent_api):
141 integration_bridge = agent_api.request_int_br()
142 self.int_br = self.initialize_bridge(integration_bridge)
143 self._deferred = False
144 self.log_ports = collections.defaultdict(dict)
145 self.cookies_table = set()
146 self.cookie_ids_to_delete = set()
147 self.conj_id_map = ovsfw.ConjIdMap()
148
149 def initialize(self, resource_rpc, **kwargs):
150 self.resource_rpc = resource_rpc
151 setup_logging()
152 self.start_logapp()
153
154 @staticmethod
155 def initialize_bridge(bridge):
156 bridge.add_protocols(*OVSFirewallLoggingDriver.REQUIRED_PROTOCOLS)
157 # set rate limit and burst limit for controller
158 bridge.set_controller_rate_limit(cfg.CONF.network_log.rate_limit)
159 bridge.set_controller_burst_limit(cfg.CONF.network_log.burst_limit)
160 return bridge.deferred(full_ordered=True)
161
162 def start_logapp(self):
163 app_mgr = app_manager.AppManager.get_instance()
164 self.log_app = app_mgr.instantiate(log_oskenapp.OVSLogOSKenApp)
165 self.log_app.start()
166 self.log_app.register_packet_in_handler(self.packet_in_handler)
167
168 def packet_in_handler(self, ev):
169 msg = ev.msg
170 cookie_id = msg.cookie
171 pkt = packet.Packet(msg.data)
172 try:
173 cookie_entry = self._get_cookie_by_id(cookie_id)
174 LOG.info("action=%s project_id=%s log_resource_ids=%s vm_port=%s "
175 "pkt=%s", cookie_entry.action, cookie_entry.project,
176 list(cookie_entry.log_object_refs),
177 cookie_entry.port, pkt)
178 except log_exc.CookieNotFound:
179 LOG.warning("Unknown cookie=%s packet_in pkt=%s", cookie_id, pkt)
180
181 def defer_apply_on(self):
182 self._deferred = True
183
184 def defer_apply_off(self):
185 if self._deferred:
186 self.int_br.apply_flows()
187 self._cleanup_cookies()
188 self._deferred = False
189
190 def _get_cookie(self, port_id, action):
191 for cookie in self.cookies_table:
192 if cookie.port == port_id and cookie.action == action:
193 return cookie
194
195 def _get_cookies_by_port(self, port_id):
196 cookies_list = []
197 for cookie in self.cookies_table:
198 if cookie.port == port_id:
199 cookies_list.append(cookie)
200 return cookies_list
201
202 def _get_cookie_by_id(self, cookie_id):
203 for cookie in self.cookies_table:
204 if str(cookie.id) == str(cookie_id):
205 return cookie
206 raise log_exc.CookieNotFound(cookie_id=cookie_id)
207
208 def _cleanup_cookies(self):
209 cookie_ids = self.cookie_ids_to_delete
210 self.cookie_ids_to_delete = set()
211 for cookie_id in cookie_ids:
212 self.int_br.br.unset_cookie(cookie_id)
213
214 def generate_cookie(self, port_id, action, log_id, project_id):
215 cookie = self._get_cookie(port_id, action)
216 if not cookie:
217 cookie_id = self.int_br.br.request_cookie()
218 cookie = Cookie(cookie_id=cookie_id, port=port_id,
219 action=action, project=project_id)
220 self.cookies_table.add(cookie)
221 cookie.add_log_obj_ref(log_id)
222 return cookie.id
223
224 def _schedule_cookie_deletion(self, cookie):
225 # discard a cookie object
226 self.cookies_table.remove(cookie)
227 # schedule to cleanup cookie_ids later
228 self.cookie_ids_to_delete.add(cookie.id)
229
230 def start_logging(self, context, **kwargs):
231 LOG.debug("start logging: %s", str(kwargs))
232 for resource_type in self.SUPPORTED_LOGGING_TYPES:
233 # handle port updated, agent restarted
234 if 'port_id' in kwargs:
235 self._handle_logging('_create', context,
236 resource_type, **kwargs)
237 else:
238 self._handle_log_resources_by_type(
239 '_create', context, resource_type, **kwargs)
240
241 def stop_logging(self, context, **kwargs):
242 LOG.debug("stop logging: %s", str(kwargs))
243 for resource_type in self.SUPPORTED_LOGGING_TYPES:
244 # handle port deleted
245 if 'port_id' in kwargs:
246 self._handle_logging('_delete', context,
247 resource_type, **kwargs)
248 else:
249 self._handle_log_resources_by_type(
250 '_delete', context, resource_type, **kwargs)
251
252 def _handle_log_resources_by_type(
253 self, action, context, resource_type, **kwargs):
254
255 log_resources = []
256 for log_obj in kwargs.get('log_resources', []):
257 if log_obj['resource_type'] == resource_type:
258 log_resources.append(log_obj)
259 if log_resources:
260 self._handle_logging(
261 action, context, resource_type, log_resources=log_resources)
262
263 def _handle_logging(self, action, context, resource_type, **kwargs):
264 handler_name = "%s_%s_log" % (action, resource_type)
265 handler = getattr(self, handler_name)
266 handler(context, **kwargs)
267
268 def create_ofport_log(self, port, log_id, log_event):
269 port_id = port['port_id']
270 ovs_port = self.int_br.br.get_vif_port_by_id(port_id)
271 if ovs_port:
272 of_port_log = OFPortLog(port, ovs_port, log_event)
273 self.log_ports[log_id].add(of_port_log)
274
275 def _create_security_group_log(self, context, **kwargs):
276
277 port_id = kwargs.get('port_id')
278 log_resources = kwargs.get('log_resources')
279 logs_info = []
280 if port_id:
281 # try to clean port flows log for port updated/create event
282 self._cleanup_port_flows_log(port_id)
283 logs_info = self.resource_rpc.get_sg_log_info_for_port(
284 context,
285 resource_type=log_const.SECURITY_GROUP,
286 port_id=port_id)
287 elif log_resources:
288 logs_info = self.resource_rpc.get_sg_log_info_for_log_resources(
289 context,
290 resource_type=log_const.SECURITY_GROUP,
291 log_resources=log_resources)
292
293 for log_info in logs_info:
294 log_id = log_info['id']
295 old_ofport_logs = self.log_ports.get(log_id, [])
296 ports = log_info.get('ports_log')
297 self.log_ports[log_id] = set()
298 for port in ports:
299 self.create_ofport_log(port, log_id, log_info.get('event'))
300
301 # try to clean flows log if sg_rules are deleted
302 for port in old_ofport_logs:
303 del_rules = find_deleted_sg_rules(
304 port, self.log_ports[log_id])
305 if del_rules:
306 self._delete_sg_rules_flow_log(port, del_rules)
307
308 for port_log in self.log_ports[log_id]:
309 self.add_flows_from_rules(port_log, log_info)
310
311 def _cleanup_port_flows_log(self, port_id):
312 cookies_list = self._get_cookies_by_port(port_id)
313 for cookie in cookies_list:
314 if cookie.action == log_const.ACCEPT_EVENT:
315 self._delete_flows(
316 table=ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
317 cookie=cookie.id)
318 self._delete_flows(
319 table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
320 cookie=cookie.id)
321 if cookie.action == log_const.DROP_EVENT:
322 self._delete_flows(
323 table=ovs_consts.DROPPED_TRAFFIC_TABLE,
324 cookie=cookie.id)
325 self._schedule_cookie_deletion(cookie)
326
327 def _delete_security_group_log(self, context, **kwargs):
328 # port deleted event
329 port_id = kwargs.get('port_id')
330
331 if port_id:
332 self._cleanup_port_flows_log(port_id)
333
334 # log resources deleted events
335 for log_resource in kwargs.get('log_resources', []):
336 log_id = log_resource.get('id')
337 of_port_logs = self.log_ports.get(log_id, [])
338 for of_port_log in of_port_logs:
339 self.delete_port_flows_log(of_port_log, log_id)
340
341 def _log_accept_flow(self, **flow):
342 # log first accepted packet
343 flow['table'] = OVS_FW_TO_LOG_TABLES[flow['table']]
344 flow['actions'] = 'controller'
345 # forward egress accepted packet and log
346 if flow['table'] == ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE:
347 flow['actions'] = 'resubmit(,%d),controller' % (
348 ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE)
349 self._add_flow(**flow)
350
351 def _add_flow(self, **kwargs):
352 dl_type = kwargs.get('dl_type')
353 ovsfw.create_reg_numbers(kwargs)
354 if isinstance(dl_type, int):
355 kwargs['dl_type'] = "0x{:04x}".format(dl_type)
356 LOG.debug("Add flow firewall log %s", str(kwargs))
357 if self._deferred:
358 self.int_br.add_flow(**kwargs)
359 else:
360 self.int_br.br.add_flow(**kwargs)
361
362 def _delete_flows(self, **kwargs):
363 ovsfw.create_reg_numbers(kwargs)
364 if self._deferred:
365 self.int_br.delete_flows(**kwargs)
366 else:
367 self.int_br.br.delete_flows(**kwargs)
368
369 def _log_drop_packet(self, port, log_id, project_id):
370 cookie = self.generate_cookie(port.id, log_const.DROP_EVENT,
371 log_id, project_id)
372 self._add_flow(
373 cookie=cookie,
374 table=ovs_consts.DROPPED_TRAFFIC_TABLE,
375 priority=53,
376 reg_port=port.ofport,
377 actions='controller'
378 )
379
380 def create_rules_generator_for_port(self, port):
381 for rule in port.secgroup_rules:
382 yield rule
383
384 def _create_conj_flows_log(self, remote_rule, port):
385 ethertype = remote_rule['ethertype']
386 direction = remote_rule['direction']
387 remote_sg_id = remote_rule['remote_group_id']
388 secgroup_id = remote_rule['security_group_id']
389 # we only want to log first accept packet, that means a packet with
390 # ct_state=+new-est, reg_remote_group=conj_id + 1 will be logged
391 flow_template = {
392 'priority': REMOTE_RULE_PRIORITY,
393 'dl_type': ovsfw_consts.ethertype_to_dl_type_map[ethertype],
394 'reg_port': port.ofport,
395 'reg_remote_group': self.conj_id_map.get_conj_id(
396 secgroup_id, remote_sg_id, direction, ethertype) + 1,
397 }
398 if direction == lib_const.INGRESS_DIRECTION:
399 flow_template['table'] = ovs_consts.RULES_INGRESS_TABLE
400 elif direction == lib_const.EGRESS_DIRECTION:
401 flow_template['table'] = ovs_consts.RULES_EGRESS_TABLE
402 return [flow_template]
403
404 def _log_accept_packet(self, port, log_id, project_id):
405 cookie = self.generate_cookie(port.id, log_const.ACCEPT_EVENT,
406 log_id, project_id)
407 for rule in self.create_rules_generator_for_port(port):
408 if 'remote_group_id' in rule:
409 flows = self._create_conj_flows_log(rule, port)
410 else:
411 flows = rules.create_flows_from_rule_and_port(rule, port)
412 for flow in flows:
413 flow['cookie'] = cookie
414 self._log_accept_flow(**flow)
415
416 def add_flows_from_rules(self, port, log_info):
417 # log event can be ACCEPT or DROP or ALL(both ACCEPT and DROP)
418 event = log_info['event']
419 project_id = log_info['project_id']
420 log_id = log_info['id']
421 if event == log_const.ACCEPT_EVENT:
422 self._log_accept_packet(port, log_id, project_id)
423 elif event == log_const.DROP_EVENT:
424 self._log_drop_packet(port, log_id, project_id)
425 else:
426 self._log_drop_packet(port, log_id, project_id)
427 self._log_accept_packet(port, log_id, project_id)
428
429 def _delete_accept_flows_log(self, port, log_id):
430 cookie = self._get_cookie(port.id, log_const.ACCEPT_EVENT)
431 if cookie:
432 cookie.remove_log_obj_ref(log_id)
433 if cookie.is_empty:
434 self._delete_flows(
435 table=ovs_consts.ACCEPTED_INGRESS_TRAFFIC_TABLE,
436 cookie=cookie.id)
437 self._delete_flows(
438 table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_TABLE,
439 cookie=cookie.id)
440 self._schedule_cookie_deletion(cookie)
441
442 def _delete_drop_flows_log(self, port, log_id):
443 cookie = self._get_cookie(port.id, log_const.DROP_EVENT)
444 if cookie:
445 cookie.remove_log_obj_ref(log_id)
446 if cookie.is_empty:
447 self._delete_flows(table=ovs_consts.DROPPED_TRAFFIC_TABLE,
448 cookie=cookie.id)
449 self._schedule_cookie_deletion(cookie)
450
451 def delete_port_flows_log(self, port, log_id):
452 """Delete all flows log for given port and log_id"""
453 event = port.event
454 if event == log_const.ACCEPT_EVENT:
455 self._delete_accept_flows_log(port, log_id)
456 elif event == log_const.DROP_EVENT:
457 self._delete_drop_flows_log(port, log_id)
458 else:
459 self._delete_accept_flows_log(port, log_id)
460 self._delete_drop_flows_log(port, log_id)
461
462 def _delete_sg_rules_flow_log(self, port, del_rules):
463 cookie = self._get_cookie(port.id, log_const.ACCEPT_EVENT)
464 if not cookie:
465 return
466 for rule in del_rules:
467 if 'remote_group_id' in rule:
468 flows = self._create_conj_flows_log(rule, port)
469 else:
470 flows = rules.create_flows_from_rule_and_port(rule, port)
471 for flow in flows:
472 for kw in FIELDS_TO_REMOVE:
473 flow.pop(kw, None)
474 flow['table'] = OVS_FW_TO_LOG_TABLES[flow['table']]
475 flow['cookie'] = cookie.id
476 self._delete_flows(**flow)