web2ldap  1.7.7
About: web2ldap is a full-featured web-based LDAPv3 client.
  Fossies Dox: web2ldap-1.7.7.tar.gz  ("unofficial" and yet experimental doxygen-generated source code documentation)  

dns.py
Go to the documentation of this file.
1# -*- coding: ascii -*-
2"""
3web2ldap plugin classes for DNS attributes
4
5https://drift.uninett.no/nett/ip-nett/dnsattributes.schema
6"""
7
8import re
9import hashlib
10
11import ldap0
12import ldap0.schema.models
13from ldap0.dn import DNObj
14from ldap0.res import SearchResultEntry
15from ldap0.schema.models import AttributeType
16
17import web2ldapcnf
18
19from ..searchform import (
20 SEARCH_OPT_ATTR_EXISTS,
21 SEARCH_OPT_IS_EQUAL,
22)
23from ..schema.syntaxes import (
24 IA5String,
25 DNSDomain,
26 DynamicValueSelectList,
27 IPv4HostAddress,
28 IPv6HostAddress,
29 syntax_registry,
30)
31
32
34 oid: str = 'AssociatedDomain-oid'
35 desc: str = 'Associated DNS domain name (see RFC 4524, section 2.1.)'
36
def _validate(self, attr_value: bytes) -> bool:
    """
    Check a value of attribute 'associatedDomain'.

    The value must pass the DNSDomain check. If the entry has object class
    'dNSDomain' or 'dNSDomain2' and carries attribute 'dc', the value must
    additionally be equal to the first 'dc' value or start with that value
    followed by a dot.
    """
    result = DNSDomain._validate(self, attr_value)
    ocs = self._entry.object_class_oid_set()
    if 'dNSDomain' in ocs or 'dNSDomain2' in ocs:
        try:
            dc_aval = self._entry['dc'][0]
        except (KeyError, IndexError):
            # attribute 'dc' missing or without values: nothing to compare
            # against (same handling as in sanitize() and form_value())
            pass
        else:
            result = result and (
                attr_value == dc_aval
                or attr_value.startswith(dc_aval+b'.')
            )
    return result
48
def _parent_domain(self):
    """
    Return the 'associatedDomain' value of the domain entry whose DN
    matches the DN of the current entry best, or None if there is none
    """
    if not self._dn:
        return None
    search_base = str(self._app.ls.get_search_root(self._dn))
    search_results = self._app.ls.l.search_s(
        search_base,
        ldap0.SCOPE_SUBTREE,
        '(&(objectClass=dNSDomain)(|(sOARecord=*)(nSRecord=*))(associatedDomain=*))',
        attrlist=['associatedDomain'],
    )
    if not search_results:
        return None
    # map the DN of each found domain entry to its first domain name,
    # skipping everything which is not a real search result entry
    domain_by_dn = {}
    for item in search_results:
        if isinstance(item, SearchResultEntry):
            domain_by_dn[DNObj.from_str(item.dn_s)] = item.entry_s['associatedDomain'][0]
    if not domain_by_dn:
        return None
    best_dn = self.dn.match(domain_by_dn.keys())
    if not best_dn:
        return None
    return domain_by_dn.get(best_dn, None)
75
def sanitize(self, attr_value: bytes) -> bytes:
    """
    Return the sanitized domain name.

    If the sanitized input is empty, a default is derived by joining the
    first 'dc' value of the entry and the parent domain with a dot.
    Without a 'dc' value the (empty) sanitized input is returned.
    """
    attr_value = DNSDomain.sanitize(self, attr_value)
    if attr_value:
        return attr_value
    try:
        dc_value = self._entry['dc'][0]
    except (KeyError, IndexError):
        # no 'dc' value to build a default from
        return attr_value
    # The parent domain is looked up only after a 'dc' value was found,
    # because the lookup sends a subtree search to the LDAP server.
    parent_domain = (self._parent_domain() or '').encode(self._app.ls.charset)
    # NOTE(review): with an empty parent domain the joined value ends with
    # a dot - confirm whether that is intended
    return DNSDomain.sanitize(self, b'.'.join((dc_value, parent_domain)))
87
def form_value(self) -> str:
    """
    Return the string to be used as input field value.

    If DNSDomain.form_value() yields an empty value, a default is derived
    by joining the first 'dc' value of the entry and the parent domain
    with a dot. Without a 'dc' value the empty value is returned.
    """
    fval = DNSDomain.form_value(self)
    if fval:
        # value already present, no need to search for a parent domain
        return fval
    try:
        dc_value = self._entry['dc'][0].decode(self._app.ls.charset)
    except (KeyError, IndexError):
        return fval
    # The parent domain is looked up only when really needed, because the
    # lookup sends a subtree search to the LDAP server.
    # NOTE(review): with an empty parent domain the joined value ends with
    # a dot - confirm whether that is intended
    return '.'.join((dc_value, self._parent_domain() or ''))
99
def display(self, vidx, links) -> str:
    """
    Return the displayed domain name, followed by search links if
    argument links is true:

    - entries referencing this domain in a CNAME, NS or PTR RR
    - SOA RR entry of the parent domain (only if the current entry
      has no attribute 'sOARecord')
    - for names ending with '.in-addr.arpa' starting with four numeric
      labels: entries referencing the reversed IPv4 address
    """
    parts = [DNSDomain.display(self, vidx, links)]
    if not links:
        return web2ldapcnf.command_link_separator.join(parts)

    domain = self.av_u.lower()
    search_base = ('dn', str(self._app.naming_context))
    adv_mode = ('searchform_mode', 'adv')

    # one equality clause per RR attribute, OR-ed together
    ref_params = [search_base, adv_mode, ('search_mode', '(|%s)')]
    for rr_attr in ('cNAMERecord', 'nSRecord', 'pTRRecord'):
        ref_params.append(('search_attr', rr_attr))
        ref_params.append(('search_option', SEARCH_OPT_IS_EQUAL))
        ref_params.append(('search_string', domain))
    parts.append(self._app.anchor(
        'search', 'Ref. RRs',
        tuple(ref_params),
        title='Search referencing DNS RR entries',
    ))

    # everything after the first dot
    parent_domain = domain.strip().partition('.')[2]
    if parent_domain and 'sOARecord' not in self._entry:
        parts.append(self._app.anchor(
            'search', 'SOA RR',
            (
                search_base,
                adv_mode,
                ('search_attr', 'sOARecord'),
                ('search_option', SEARCH_OPT_ATTR_EXISTS),
                ('search_string', ''),
                ('search_attr', 'associatedDomain'),
                ('search_option', SEARCH_OPT_IS_EQUAL),
                ('search_string', parent_domain),
            ),
            title='Search SOA RR entry of parent domain',
        ))

    if domain.endswith('.in-addr.arpa'):
        try:
            octets = [int(label) for label in domain.split('.')[0:4]]
        except ValueError:
            # not four numeric labels => no IPv4 address to search for
            octets = None
        if octets is not None:
            octets.reverse()
            ip_addr_u = '.'.join([str(octet) for octet in octets])
            attr_types = self._schema.sed[AttributeType]
            parts.append(self._app.anchor(
                'search', 'A RRs',
                (
                    search_base,
                    adv_mode,
                    ('search_attr', 'aRecord'),
                    ('search_option', SEARCH_OPT_IS_EQUAL),
                    ('search_string', ip_addr_u),
                ),
                title='Search referencing DNS A RR entries',
            ))
            # only offered if the attribute type is found in the schema
            if '1.3.6.1.1.1.1.19' in attr_types:
                parts.append(self._app.anchor(
                    'search', 'IP host(s)',
                    (
                        search_base,
                        adv_mode,
                        ('search_attr', 'ipHostNumber'),
                        ('search_option', SEARCH_OPT_IS_EQUAL),
                        ('search_string', ip_addr_u),
                    ),
                    title='Search IP host(s) for this A address',
                ))
            if '2.16.840.1.113719.1.203.4.3' in attr_types:
                parts.append(self._app.anchor(
                    'search', 'DHCP host(s)',
                    (
                        search_base,
                        adv_mode,
                        ('search_attr', 'dhcpStatements'),
                        ('search_option', SEARCH_OPT_IS_EQUAL),
                        ('search_string', 'fixed-address %s' % ip_addr_u),
                    ),
                    title='Search DHCP host(s) for this A address',
                ))

    return web2ldapcnf.command_link_separator.join(parts)
182
183syntax_registry.reg_at(
184 AssociatedDomain.oid, [
185 '0.9.2342.19200300.100.1.37', # associatedDomain
186 ],
187 #structural_oc_oids=[
188 # '0.9.2342.19200300.100.4.15', # dNSDomain
189 # '1.3.6.1.4.1.2428.20.2', # dNSDomain2
190 #],
191)
192
193
195 oid: str = 'ResourceRecord-oid'
196 desc: str = 'A resource record pointing to another DNS RR'
197 ldap_url = 'ldap:///_?associatedDomain,associatedDomain?sub?(objectClass=domainRelatedObject)'
198
def __init__(self, app, dn: str, schema, attrType: str, attr_value: bytes, entry=None):
    """
    Pass all arguments unchanged to DynamicValueSelectList.__init__()
    """
    DynamicValueSelectList.__init__(
        self,
        app,
        dn,
        schema,
        attrType,
        attr_value,
        entry,
    )
201
def display(self, vidx, links) -> str:
    """
    Return whatever DynamicValueSelectList.display() returns
    """
    displayed = DynamicValueSelectList.display(self, vidx, links)
    return displayed
204
205syntax_registry.reg_at(
206 ResourceRecord.oid, [
207 '1.3.6.1.4.1.2428.20.1.12', # pTRRecord
208 '0.9.2342.19200300.100.1.29', # nSRecord
209 ]
210)
211
212
214 oid: str = 'CNAMERecord-oid'
215 desc: str = 'A resource record used as alias (CNAME)'
216 max_values = 1 # It's illegal to have multiple CNAME RR values
217
218syntax_registry.reg_at(
219 CNAMERecord.oid, [
220 '0.9.2342.19200300.100.1.31', # cNAMERecord
221 ]
222)
223
224
226 oid: str = 'MXRecord-oid'
227 desc: str = 'A resource record pointing to a mail exchanger (MX)'
228 pattern = re.compile(r'^[0-9]+[ ]+[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)*$')
229
def _search_ref(self, attr_value: str):
    """
    Split attr_value at the first space and pass the stripped part after
    it to ResourceRecord._search_ref().

    Returns None if attr_value contains no space.
    """
    _, space, hostname = attr_value.partition(' ')
    if not space:
        return None
    return ResourceRecord._search_ref(self, hostname.strip())
236
237syntax_registry.reg_at(
238 MXRecord.oid, [
239 '0.9.2342.19200300.100.1.28', # mXRecord
240 ]
241)
242
243
245 oid: str = 'ARecord-oid'
246 desc: str = 'A resource record pointing to IPv4 address'
247
def display(self, vidx, links) -> str:
    """
    Return the displayed IPv4 address, followed by search links if
    argument links is true:

    - PTR RR entry for the address
    - IP host entries and DHCP host entries with this address (each only
      if the respective attribute type is found in the schema)
    """
    res = [IPv4HostAddress.display(self, vidx, links)]
    if links:
        # presumably addr_class is the address class provided by
        # IPv4HostAddress and reverse_pointer the name of the matching
        # PTR record - verify against the base class
        ip_addr = self.addr_class(self.av_u)
        res.append(self._app.anchor(
            'search', 'PTR RR',
            (
                ('dn', str(self._app.naming_context)),
                ('searchform_mode', 'adv'),
                ('search_attr', 'associatedDomain'),
                ('search_option', SEARCH_OPT_IS_EQUAL),
                ('search_string', ip_addr.reverse_pointer),
            ),
            title='Search PTR RR for this A address',
        ))
        if '1.3.6.1.1.1.1.19' in self._schema.sed[AttributeType]:
            res.append(self._app.anchor(
                'search', 'IP host(s)',
                (
                    ('dn', str(self._app.naming_context)),
                    ('searchform_mode', 'adv'),
                    ('search_attr', 'ipHostNumber'),
                    ('search_option', SEARCH_OPT_IS_EQUAL),
                    ('search_string', str(ip_addr)),
                ),
                title='Search IP host(s) for this A address',
            ))
        if '2.16.840.1.113719.1.203.4.3' in self._schema.sed[AttributeType]:
            res.append(self._app.anchor(
                'search', 'DHCP host(s)',
                (
                    ('dn', str(self._app.naming_context)),
                    ('searchform_mode', 'adv'),
                    ('search_attr', 'dhcpStatements'),
                    ('search_option', SEARCH_OPT_IS_EQUAL),
                    ('search_string', 'fixed-address %s' % str(ip_addr)),
                ),
                title='Search DHCP host(s) for this A address',
            ))
    return web2ldapcnf.command_link_separator.join(res)
288
289syntax_registry.reg_at(
290 ARecord.oid, [
291 '0.9.2342.19200300.100.1.26', # aRecord
292 ]
293)
294
295
297 oid: str = 'AAAARecord-oid'
298 desc: str = 'AAAA resource record pointing to IPv6 address'
299
def display(self, vidx, links) -> str:
    """
    Return the displayed IPv6 address, followed by a link for searching
    the PTR RR entry for the address if argument links is true
    """
    res = [IPv6HostAddress.display(self, vidx, links)]
    if links:
        # presumably addr_class is the address class provided by
        # IPv6HostAddress and reverse_pointer the name of the matching
        # PTR record - verify against the base class
        ip_addr = self.addr_class(self.av_u)
        res.append(self._app.anchor(
            'search', 'PTR RR',
            (
                ('dn', str(self._app.naming_context)),
                ('searchform_mode', 'adv'),
                ('search_attr', 'associatedDomain'),
                ('search_option', SEARCH_OPT_IS_EQUAL),
                ('search_string', ip_addr.reverse_pointer),
            ),
            title='Search PTR RR for this AAAA address',
        ))
    return web2ldapcnf.command_link_separator.join(res)
316
317syntax_registry.reg_at(
318 AAAARecord.oid, [
319 '1.3.6.1.4.1.2428.20.1.28', # aAAARecord
320 ]
321)
322
323
325 oid: str = 'SSHFPRecord-oid'
326 desc: str = 'A resource record with SSH fingerprint (SSHFP)'
327 pattern = re.compile('^[0-4]? [0-2]? [0-9a-fA-F]+$')
328 key_algo_dict = {
329 b'0': 'reserved',
330 b'1': 'RSA',
331 b'2': 'DSA',
332 b'3': 'ECDSA',
333 b'4': 'ED25519',
334 }
335 fp_algo_dict = {
336 b'0': 'reserved',
337 b'1': 'SHA-1',
338 b'2': 'SHA-256',
339 }
340 fp_algo_len = {
341 b'1': 2*hashlib.sha1().digest_size,
342 b'2': 2*hashlib.sha256().digest_size,
343 }
344
def sanitize(self, attr_value: bytes) -> bytes:
    """
    Return the normalized form of a SSHFP value.

    A value consisting of exactly three whitespace-separated fields
    (key algorithm, fingerprint algorithm, fingerprint) is lower-cased
    and its fields are joined with single spaces. Any other value,
    including the empty one, is returned unchanged.
    """
    if not attr_value:
        return attr_value
    # bytes.split() without separator splits at runs of ASCII whitespace
    # and never yields empty fields, so values separated by tabs or
    # ending with a line break are normalized, too
    fields = attr_value.lower().split()
    if len(fields) != 3:
        return attr_value
    return b' '.join(fields)
356
def _validate(self, attr_value: bytes) -> bool:
    """
    Check a SSHFP value.

    The value must consist of exactly three space-separated fields. The
    first one must be a key of key_algo_dict and the second one a key of
    fp_algo_dict. If fp_algo_len has an item for the second field, the
    length of the third field must be equal to that item.
    """
    fields = [
        field
        for field in (part.strip() for part in attr_value.split(b' '))
        if field
    ]
    if len(fields) != 3:
        return False
    key_algo, fp_algo, fp_value = fields
    if key_algo not in self.key_algo_dict:
        return False
    if fp_algo not in self.fp_algo_dict:
        return False
    if fp_algo in self.fp_algo_len:
        return len(fp_value) == self.fp_algo_len[fp_algo]
    # no known fingerprint length for this algorithm => no length check
    return True
373
def display(self, vidx, links) -> str:
    """
    Return the displayed SSHFP value.

    If the raw value consists of three space-separated fields, the names
    of key algorithm and fingerprint algorithm ('?' if unknown) are
    shown in front of the value. Otherwise the plain IA5String
    representation is returned.
    """
    display_value = IA5String.display(self, vidx, links)
    fields = [
        field
        for field in (part.strip() for part in self._av.split(b' '))
        if field
    ]
    if len(fields) != 3:
        return display_value
    key_algo, fp_algo = fields[0], fields[1]
    template = (
        'key_algo={key_algo_name} '
        'fp_algo={fp_algo_name}:<br>'
        '<code>{display_value}</code>'
    )
    return template.format(
        key_algo_name=self.key_algo_dict.get(key_algo, '?'),
        fp_algo_name=self.fp_algo_dict.get(fp_algo, '?'),
        display_value=display_value,
    )
399
400syntax_registry.reg_at(
401 SSHFPRecord.oid, [
402 '1.3.6.1.4.1.2428.20.1.44', # sSHFPRecord
403 ]
404)
405
406
407# Register all syntax classes in this module
408syntax_registry.reg_syntaxes(__name__)
str display(self, vidx, links)
Definition: dns.py:300
str display(self, vidx, links)
Definition: dns.py:248
str display(self, vidx, links)
Definition: dns.py:100
bytes sanitize(self, bytes attr_value)
Definition: dns.py:76
bool _validate(self, bytes attr_value)
Definition: dns.py:37
def _search_ref(self, str attr_value)
Definition: dns.py:230
str display(self, vidx, links)
Definition: dns.py:202
def __init__(self, app, str dn, schema, str attrType, bytes attr_value, entry=None)
Definition: dns.py:199
bytes sanitize(self, bytes attr_value)
Definition: dns.py:345
str display(self, vidx, links)
Definition: dns.py:374
bool _validate(self, bytes attr_value)
Definition: dns.py:357