3web2ldap plugin classes for DNS attributes
5https://drift.uninett.no/nett/ip-nett/dnsattributes.schema
12import ldap0.schema.models
13from ldap0.dn
import DNObj
14from ldap0.res
import SearchResultEntry
15from ldap0.schema.models
import AttributeType
19from ..searchform
import (
20 SEARCH_OPT_ATTR_EXISTS,
23from ..schema.syntaxes
import (
26 DynamicValueSelectList,
34 oid: str =
'AssociatedDomain-oid'
35 desc: str =
'Associated DNS domain name (see RFC 4524, section 2.1.)'
38 result = DNSDomain._validate(self, attr_value)
39 ocs = self.
_entry.object_class_oid_set()
40 if 'dNSDomain' in ocs
or 'dNSDomain2' in ocs:
42 dc_aval = self.
_entry[
'dc'][0]
46 result = result
and (attr_value == dc_aval
or attr_value.startswith(dc_aval+b
'.'))
51 Return the best matching domain entry for the given DN
55 ldap_result = self.
_app.ls.l.search_s(
58 '(&(objectClass=dNSDomain)(|(sOARecord=*)(nSRecord=*))(associatedDomain=*))',
59 attrlist=[
'associatedDomain'],
64 DNObj.from_str(res.dn_s): res.entry_s[
'associatedDomain'][0]
65 for res
in ldap_result
66 if isinstance(res, SearchResultEntry)
70 matched_dn = self.
dn.match(dn2domain.keys())
73 return dn2domain.get(matched_dn,
None)
76 def sanitize(self, attr_value: bytes) -> bytes:
77 attr_value = DNSDomain.sanitize(self, attr_value)
81 dc_value = self.
_entry[
'dc'][0]
82 except (KeyError, IndexError):
85 attr_value = DNSDomain.sanitize(self, b
'.'.join((dc_value, parent_domain)))
89 fval = DNSDomain.form_value(self)
93 dc_value = self.
_entry[
'dc'][0].decode(self.
_app.ls.charset)
94 except (KeyError, IndexError):
97 fval =
'.'.join((dc_value, parent_domain))
101 res = [DNSDomain.display(self, vidx, links)]
103 aval = self.
av_u.lower()
104 res.append(self.
_app.anchor(
105 'search',
'Ref. RRs',
107 (
'dn',
str(self.
_app.naming_context)),
108 (
'searchform_mode',
'adv'),
109 (
'search_mode',
'(|%s)'),
110 (
'search_attr',
'cNAMERecord'),
111 (
'search_option', SEARCH_OPT_IS_EQUAL),
112 (
'search_string', aval),
113 (
'search_attr',
'nSRecord'),
114 (
'search_option', SEARCH_OPT_IS_EQUAL),
115 (
'search_string', aval),
116 (
'search_attr',
'pTRRecord'),
117 (
'search_option', SEARCH_OPT_IS_EQUAL),
118 (
'search_string', aval),
120 title=
'Search referencing DNS RR entries',
122 parent_domain =
'.'.join(aval.strip().split(
'.')[1:])
123 if parent_domain
and 'sOARecord' not in self.
_entry:
124 res.append(self.
_app.anchor(
127 (
'dn',
str(self.
_app.naming_context)),
128 (
'searchform_mode',
'adv'),
129 (
'search_attr',
'sOARecord'),
130 (
'search_option', SEARCH_OPT_ATTR_EXISTS),
131 (
'search_string',
''),
132 (
'search_attr',
'associatedDomain'),
133 (
'search_option', SEARCH_OPT_IS_EQUAL),
134 (
'search_string', parent_domain),
136 title=
'Search SOA RR entry of parent domain',
138 if aval.endswith(
'.in-addr.arpa'):
140 ip_addr_u =
'.'.join(
141 map(str, reversed(list(map(int, aval.split(
'.')[0:4]))))
146 res.append(self.
_app.anchor(
149 (
'dn',
str(self.
_app.naming_context)),
150 (
'searchform_mode',
'adv'),
151 (
'search_attr',
'aRecord'),
152 (
'search_option', SEARCH_OPT_IS_EQUAL),
153 (
'search_string', ip_addr_u),
155 title=
'Search referencing DNS A RR entries',
157 if '1.3.6.1.1.1.1.19' in self.
_schema.sed[AttributeType]:
158 res.append(self.
_app.anchor(
159 'search',
'IP host(s)',
161 (
'dn',
str(self.
_app.naming_context)),
162 (
'searchform_mode',
'adv'),
163 (
'search_attr',
'ipHostNumber'),
164 (
'search_option', SEARCH_OPT_IS_EQUAL),
165 (
'search_string', ip_addr_u),
167 title=
'Search IP host(s) for this A address',
169 if '2.16.840.1.113719.1.203.4.3' in self.
_schema.sed[AttributeType]:
170 res.append(self.
_app.anchor(
171 'search',
'DHCP host(s)',
173 (
'dn',
str(self.
_app.naming_context)),
174 (
'searchform_mode',
'adv'),
175 (
'search_attr',
'dhcpStatements'),
176 (
'search_option', SEARCH_OPT_IS_EQUAL),
177 (
'search_string',
'fixed-address %s' % ip_addr_u),
179 title=
'Search DHCP host(s) for this A address',
181 return web2ldapcnf.command_link_separator.join(res)
183syntax_registry.reg_at(
184 AssociatedDomain.oid, [
185 '0.9.2342.19200300.100.1.37',
195 oid: str =
'ResourceRecord-oid'
196 desc: str =
'A resource record pointing to another DNS RR'
197 ldap_url =
'ldap:///_?associatedDomain,associatedDomain?sub?(objectClass=domainRelatedObject)'
199 def __init__(self, app, dn: str, schema, attrType: str, attr_value: bytes, entry=
None):
200 DynamicValueSelectList.__init__(self, app, dn, schema, attrType, attr_value, entry)
203 return DynamicValueSelectList.display(self, vidx, links)
205syntax_registry.reg_at(
206 ResourceRecord.oid, [
207 '1.3.6.1.4.1.2428.20.1.12',
208 '0.9.2342.19200300.100.1.29',
214 oid: str =
'CNAMERecord-oid'
215 desc: str =
'A resource record used as alias (CNAME)'
218syntax_registry.reg_at(
220 '0.9.2342.19200300.100.1.31',
226 oid: str =
'MXRecord-oid'
227 desc: str =
'A resource record pointing to a mail exchanger (MX)'
228 pattern = re.compile(
r'^[0-9]+[ ]+[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)*$')
232 _, hostname = attr_value.split(
' ', 1)
235 return ResourceRecord._search_ref(self, hostname.strip())
237syntax_registry.reg_at(
239 '0.9.2342.19200300.100.1.28',
245 oid: str =
'ARecord-oid'
246 desc: str =
'A resource record pointing to IPv4 address'
249 res = [IPv4HostAddress.display(self, vidx, links)]
252 res.append(self.
_app.anchor(
255 (
'dn',
str(self.
_app.naming_context)),
256 (
'searchform_mode',
'adv'),
257 (
'search_attr',
'associatedDomain'),
258 (
'search_option', SEARCH_OPT_IS_EQUAL),
259 (
'search_string', ip_addr.reverse_pointer),
261 title=
'Search PTR RR for this A address',
263 if '1.3.6.1.1.1.1.19' in self.
_schema.sed[AttributeType]:
264 res.append(self.
_app.anchor(
265 'search',
'IP host(s)',
267 (
'dn',
str(self.
_app.naming_context)),
268 (
'searchform_mode',
'adv'),
269 (
'search_attr',
'ipHostNumber'),
270 (
'search_option', SEARCH_OPT_IS_EQUAL),
271 (
'search_string',
str(ip_addr)),
273 title=
'Search IP host(s) for this A address',
275 if '2.16.840.1.113719.1.203.4.3' in self.
_schema.sed[AttributeType]:
276 res.append(self.
_app.anchor(
277 'search',
'DHCP host(s)',
279 (
'dn',
str(self.
_app.naming_context)),
280 (
'searchform_mode',
'adv'),
281 (
'search_attr',
'dhcpStatements'),
282 (
'search_option', SEARCH_OPT_IS_EQUAL),
283 (
'search_string',
'fixed-address %s' %
str(ip_addr)),
285 title=
'Search DHCP host(s) for this A address',
287 return web2ldapcnf.command_link_separator.join(res)
289syntax_registry.reg_at(
291 '0.9.2342.19200300.100.1.26',
297 oid: str =
'AAAARecord-oid'
298 desc: str =
'AAAA resource record pointing to IPv6 address'
301 res = [IPv6HostAddress.display(self, vidx, links)]
304 res.append(self.
_app.anchor(
307 (
'dn',
str(self.
_app.naming_context)),
308 (
'searchform_mode',
'adv'),
309 (
'search_attr',
'associatedDomain'),
310 (
'search_option', SEARCH_OPT_IS_EQUAL),
311 (
'search_string', ip_addr.reverse_pointer),
313 title=
'Search PTR RR for this AAAA address',
315 return web2ldapcnf.command_link_separator.join(res)
317syntax_registry.reg_at(
319 '1.3.6.1.4.1.2428.20.1.28',
325 oid: str =
'SSHFPRecord-oid'
326 desc: str =
'A resource record with SSH fingerprint (SSHFP)'
327 pattern = re.compile(
'^[0-4]? [0-2]? [0-9a-fA-F]+$')
341 b
'1': 2*hashlib.sha1().digest_size,
342 b
'2': 2*hashlib.sha256().digest_size,
349 key_algo, fp_algo, fp_value = filter(
351 map(bytes.strip, attr_value.lower().split(b
' '))
355 return b
' '.join((key_algo, fp_algo, fp_value))
359 key_algo, fp_algo, fp_value = tuple(
360 filter(
None, map(bytes.strip, attr_value.split(b
' ')))
371 result = result
and len(fp_value) == fp_algo_len
375 display_value = IA5String.display(self, vidx, links)
377 key_algo, fp_algo, _ = tuple(filter(
None, map(bytes.strip, self.
_av.split(b
' '))))
390 'key_algo={key_algo_name} '
391 'fp_algo={fp_algo_name}:<br>'
392 '<code>{display_value}</code>'
394 key_algo_name=key_algo_name,
395 fp_algo_name=fp_algo_name,
396 display_value=display_value,
400syntax_registry.reg_at(
402 '1.3.6.1.4.1.2428.20.1.44',
408syntax_registry.reg_syntaxes(__name__)
str display(self, vidx, links)
str display(self, vidx, links)
str display(self, vidx, links)
bytes sanitize(self, bytes attr_value)
bool _validate(self, bytes attr_value)
def _search_ref(self, str attr_value)
str display(self, vidx, links)
def __init__(self, app, str dn, schema, str attrType, bytes attr_value, entry=None)
bytes sanitize(self, bytes attr_value)
str display(self, vidx, links)
bool _validate(self, bytes attr_value)