dnspython  1.16.0
About: dnspython is a DNS toolkit (for Python 2.x) that supports almost all record types.
  Fossies Dox: dnspython-1.16.0.tar.gz  ("unofficial" and yet experimental doxygen-generated source code documentation)

rdata.py
Go to the documentation of this file.
1 # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
2 
3 # Copyright (C) 2001-2017 Nominum, Inc.
4 #
5 # Permission to use, copy, modify, and distribute this software and its
6 # documentation for any purpose with or without fee is hereby granted,
7 # provided that the above copyright notice and this permission notice
8 # appear in all copies.
9 #
10 # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 
18 """DNS rdata."""
19 
20 from io import BytesIO
21 import base64
22 import binascii
23 
24 import dns.exception
25 import dns.name
26 import dns.rdataclass
27 import dns.rdatatype
28 import dns.tokenizer
29 import dns.wiredata
30 from ._compat import xrange, string_types, text_type
31 
32 try:
33  import threading as _threading
34 except ImportError:
35  import dummy_threading as _threading
36 
_hex_chunksize = 32


def _hexify(data, chunksize=_hex_chunksize):
    """Convert a binary string into its hex encoding, broken up into chunks
    of chunksize characters separated by a space.

    *data*, a ``binary``, the bytes to encode.

    *chunksize*, an ``int``, the number of hex characters in each chunk
    (the final chunk may be shorter).

    Returns a ``text``.
    """

    line = binascii.hexlify(data)
    # Slice the hex digits into fixed-size pieces while still bytes, and
    # decode once at the end.
    return b' '.join([line[i:i + chunksize]
                      for i
                      in range(0, len(line), chunksize)]).decode()
49 
_base64_chunksize = 32


def _base64ify(data, chunksize=_base64_chunksize):
    """Convert a binary string into its base64 encoding, broken up into chunks
    of chunksize characters separated by a space.

    *data*, a ``binary``, the bytes to encode.

    *chunksize*, an ``int``, the number of base64 characters in each chunk
    (the final chunk may be shorter).

    Returns a ``text``.
    """

    line = base64.b64encode(data)
    # Slice the encoded form into fixed-size pieces while still bytes, and
    # decode once at the end.
    return b' '.join([line[i:i + chunksize]
                      for i
                      in range(0, len(line), chunksize)]).decode()
62 
# Byte values that must be preceded by a backslash inside a quoted string:
# the double quote and the backslash itself.
__escaped = bytearray(b'"\\')


def _escapify(qstring):
    """Escape the characters in a quoted string which need it.

    *qstring*, a ``text``, ``binary`` or ``bytearray``.  Text is converted
    with ``encode()`` first; anything that is not already a ``bytearray``
    is wrapped in one so that iteration yields integer byte values.

    Double quotes and backslashes are emitted with a leading backslash,
    bytes in the range 0x20..0x7E are emitted as-is, and every other byte
    is emitted as a backslash followed by its three-digit decimal value.

    Returns a ``text``.
    """

    if isinstance(qstring, text_type):
        qstring = qstring.encode()
    if not isinstance(qstring, bytearray):
        qstring = bytearray(qstring)

    # Collect the pieces and join once rather than growing a string
    # with += on every byte.
    pieces = []
    for c in qstring:
        if c in __escaped:
            pieces.append('\\' + chr(c))
        elif 0x20 <= c < 0x7F:
            pieces.append(chr(c))
        else:
            pieces.append('\\%03d' % c)
    return ''.join(pieces)
82 
83 
def _truncate_bitmap(what):
    """Strip trailing zero bytes from a bitmap.

    Finds the greatest index whose byte is not zero and returns the slice
    of *what* from the start through that byte, inclusive.  If no byte is
    nonzero, ``what[0:1]`` is returned (a single zero byte, or an empty
    sequence when *what* is empty).

    *what*, a sequence whose items compare against ``0`` (for example a
    ``bytearray``).

    Returns a slice of *what*.
    """

    # Scan from the end so the first nonzero byte found is the last one.
    for i in range(len(what) - 1, -1, -1):
        if what[i] != 0:
            return what[0: i + 1]
    return what[0:1]
93 
94 
class Rdata(object):
    """Base class for all DNS rdata types."""

    __slots__ = ['rdclass', 'rdtype']

    def __init__(self, rdclass, rdtype):
        """Initialize an rdata.

        *rdclass*, an ``int`` is the rdataclass of the Rdata.
        *rdtype*, an ``int`` is the rdatatype of the Rdata.
        """

        self.rdclass = rdclass
        self.rdtype = rdtype

    def covers(self):
        """Return the type a Rdata covers.

        DNS SIG/RRSIG rdatas apply to a specific type; this type is
        returned by the covers() function.  If the rdata type is not
        SIG or RRSIG, dns.rdatatype.NONE is returned.  This is useful when
        creating rdatasets, allowing the rdataset to contain only RRSIGs
        of a particular type, e.g. RRSIG(NS).

        Returns an ``int``.
        """

        return dns.rdatatype.NONE

    def extended_rdatatype(self):
        """Return a 32-bit type value, the least significant 16 bits of
        which are the ordinary DNS type, and the upper 16 bits of which are
        the "covered" type, if any.

        Returns an ``int``.
        """

        return self.covers() << 16 | self.rdtype

    def to_text(self, origin=None, relativize=True, **kw):
        """Convert an rdata to text format.

        The base class does not implement this; subclasses must override it.

        Returns a ``text``.
        """

        raise NotImplementedError

    def to_wire(self, file, compress=None, origin=None):
        """Write the wire format of the rdata to *file*.

        The base class does not implement this; subclasses must override it.
        to_digestable() calls this method with a ``BytesIO`` as *file* and
        reads the result back from that buffer.
        """

        raise NotImplementedError

    def to_digestable(self, origin=None):
        """Convert rdata to a format suitable for digesting in hashes.  This
        is also the DNSSEC canonical form.

        Returns a ``binary``.
        """

        f = BytesIO()
        # No compression table is passed.
        self.to_wire(f, None, origin)
        return f.getvalue()

    def validate(self):
        """Check that the current contents of the rdata's fields are
        valid.

        If you change an rdata by assigning to its fields,
        it is a good idea to call validate() when you are done making
        changes.

        The check is done by converting the rdata to text and parsing that
        text again.

        Raises various exceptions if there are problems.

        Returns ``None``.
        """

        dns.rdata.from_text(self.rdclass, self.rdtype, self.to_text())

    def __repr__(self):
        covers = self.covers()
        if covers == dns.rdatatype.NONE:
            ctext = ''
        else:
            ctext = '(' + dns.rdatatype.to_text(covers) + ')'
        return '<DNS ' + dns.rdataclass.to_text(self.rdclass) + ' ' + \
            dns.rdatatype.to_text(self.rdtype) + ctext + ' rdata: ' + \
            str(self) + '>'

    def __str__(self):
        return self.to_text()

    def _cmp(self, other):
        """Compare an rdata with another rdata of the same rdtype and
        rdclass.

        The comparison is made on the digestable forms of both rdatas,
        computed with the root name as origin.

        Return < 0 if self < other in the DNSSEC ordering, 0 if self
        == other, and > 0 if self > other.
        """

        our = self.to_digestable(dns.name.root)
        their = other.to_digestable(dns.name.root)
        if our == their:
            return 0
        elif our > their:
            return 1
        else:
            return -1

    def __eq__(self, other):
        if not isinstance(other, Rdata):
            return False
        if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return False
        return self._cmp(other) == 0

    def __ne__(self, other):
        if not isinstance(other, Rdata):
            return True
        if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return True
        return self._cmp(other) != 0

    # The ordering operators are only defined between rdatas of the same
    # class and type; anything else yields NotImplemented.

    def __lt__(self, other):
        if not isinstance(other, Rdata) or \
                self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return NotImplemented
        return self._cmp(other) < 0

    def __le__(self, other):
        if not isinstance(other, Rdata) or \
                self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return NotImplemented
        return self._cmp(other) <= 0

    def __ge__(self, other):
        if not isinstance(other, Rdata) or \
                self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return NotImplemented
        return self._cmp(other) >= 0

    def __gt__(self, other):
        if not isinstance(other, Rdata) or \
                self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return NotImplemented
        return self._cmp(other) > 0

    def __hash__(self):
        # Hash the same form that _cmp() compares.
        return hash(self.to_digestable(dns.name.root))

    @classmethod
    def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an rdata from tokenizer *tok*; subclasses must override."""
        raise NotImplementedError

    @classmethod
    def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
        """Build an rdata from *rdlen* bytes of *wire* starting at
        *current*; subclasses must override.
        """
        raise NotImplementedError

    def choose_relativity(self, origin=None, relativize=True):
        """Convert any domain names in the rdata to the specified
        relativization.

        The base class implementation does nothing.
        """
260 
class GenericRdata(Rdata):

    """Generic Rdata Class

    This class is used for rdata types for which we have no better
    implementation.  It implements the DNS "unknown RRs" scheme.
    """

    __slots__ = ['data']

    def __init__(self, rdclass, rdtype, data):
        """Initialize a generic rdata.

        *data* is the raw rdata content; it is written unchanged by
        to_wire() and hex-encoded by to_text().
        """
        super(GenericRdata, self).__init__(rdclass, rdtype)
        self.data = data

    def to_text(self, origin=None, relativize=True, **kw):
        """Return the generic text form: ``\\#``, the data length, and the
        data as chunked hex.
        """
        return r'\# %d ' % len(self.data) + _hexify(self.data)

    @classmethod
    def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Parse the generic text form from tokenizer *tok*.

        The input must start with the identifier ``\\#`` followed by an
        integer length; every remaining token up to end of line or end of
        file is concatenated and decoded as hex.

        Raises ``dns.exception.SyntaxError`` if the leading ``\\#`` is
        missing or the decoded data does not have the declared length.
        """
        token = tok.get()
        if not token.is_identifier() or token.value != r'\#':
            raise dns.exception.SyntaxError(
                r'generic rdata does not start with \#')
        length = tok.get_int()
        chunks = []
        while True:
            token = tok.get()
            if token.is_eol_or_eof():
                break
            chunks.append(token.value.encode())
        hex_text = b''.join(chunks)
        data = binascii.unhexlify(hex_text)
        if len(data) != length:
            raise dns.exception.SyntaxError(
                'generic rdata hex data has wrong length')
        return cls(rdclass, rdtype, data)

    def to_wire(self, file, compress=None, origin=None):
        """Write the stored data to *file* unchanged."""
        file.write(self.data)

    @classmethod
    def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
        """Build a generic rdata from the *rdlen* bytes of *wire* that start
        at offset *current*.
        """
        return cls(rdclass, rdtype, wire[current: current + rdlen])
304 
# Cache of implementation modules, keyed by (rdclass, rdtype).
_rdata_modules = {}
_module_prefix = 'dns.rdtypes'
# Serializes the dynamic imports performed by get_rdata_class().
_import_lock = _threading.Lock()


def get_rdata_class(rdclass, rdtype):
    """Return the class implementing the given rdata class and type.

    The module cache is consulted first, under the exact (rdclass, rdtype)
    key and then under the class-independent (ANY, rdtype) key.  On a miss
    the module ``dns.rdtypes.<CLASS>.<TYPE>`` is imported, falling back to
    ``dns.rdtypes.ANY.<TYPE>``; a successfully imported module is cached.
    Hyphens in the type's text form are replaced with underscores to form
    the module and class name.

    *rdclass*, an ``int``, the rdataclass.

    *rdtype*, an ``int``, the rdatatype.

    Returns the attribute of the module named after the type, or
    ``GenericRdata`` if no implementation module is found.
    """

    def import_module(name):
        with _import_lock:
            # __import__ returns the top-level package, so walk down the
            # dotted path to reach the requested submodule.
            mod = __import__(name)
            components = name.split('.')
            for comp in components[1:]:
                mod = getattr(mod, comp)
            return mod

    mod = _rdata_modules.get((rdclass, rdtype))
    rdclass_text = dns.rdataclass.to_text(rdclass)
    rdtype_text = dns.rdatatype.to_text(rdtype)
    rdtype_text = rdtype_text.replace('-', '_')
    if not mod:
        # Use the same key namespace (rdataclass ANY) that the fallback
        # import below uses when it stores the module.
        mod = _rdata_modules.get((dns.rdataclass.ANY, rdtype))
        if not mod:
            try:
                mod = import_module('.'.join([_module_prefix,
                                              rdclass_text, rdtype_text]))
                _rdata_modules[(rdclass, rdtype)] = mod
            except ImportError:
                try:
                    mod = import_module('.'.join([_module_prefix,
                                                  'ANY', rdtype_text]))
                    _rdata_modules[(dns.rdataclass.ANY, rdtype)] = mod
                except ImportError:
                    mod = None
    if mod:
        cls = getattr(mod, rdtype_text)
    else:
        cls = GenericRdata
    return cls
342 
343 
def from_text(rdclass, rdtype, tok, origin=None, relativize=True):
    """Build an rdata object from text format.

    This function attempts to dynamically load a class which
    implements the specified rdata class and type.  If there is no
    class-and-type-specific implementation, the GenericRdata class
    is used.

    Once a class is chosen, its from_text() class method is called
    with the parameters to this function.  The exception is input for a
    known type that is written in the generic ``\\#`` syntax: it is parsed
    by GenericRdata and the resulting bytes are passed to from_wire().

    If *tok* is a ``text``, then a tokenizer is created and the string
    is used as its input.

    *rdclass*, an ``int``, the rdataclass.

    *rdtype*, an ``int``, the rdatatype.

    *tok*, a ``dns.tokenizer.Tokenizer`` or a ``text``.

    *origin*, a ``dns.name.Name`` (or ``None``), the
    origin to use for relative names.

    *relativize*, a ``bool``.  If true, name will be relativized to
    the specified origin.

    Returns an instance of the chosen Rdata subclass.
    """

    if isinstance(tok, string_types):
        tok = dns.tokenizer.Tokenizer(tok)
    cls = get_rdata_class(rdclass, rdtype)
    if cls is not GenericRdata:
        # peek at first token
        token = tok.get()
        tok.unget(token)
        if token.is_identifier() and \
           token.value == r'\#':
            #
            # Known type using the generic syntax.  Extract the
            # wire form from the generic syntax, and then run
            # from_wire on it.
            #
            rdata = GenericRdata.from_text(rdclass, rdtype, tok, origin,
                                           relativize)
            return from_wire(rdclass, rdtype, rdata.data, 0, len(rdata.data),
                             origin)
    return cls.from_text(rdclass, rdtype, tok, origin, relativize)
392 
393 
def from_wire(rdclass, rdtype, wire, current, rdlen, origin=None):
    """Build an rdata object from wire format

    This function attempts to dynamically load a class which
    implements the specified rdata class and type.  If there is no
    class-and-type-specific implementation, the GenericRdata class
    is used.

    Once a class is chosen, its from_wire() class method is called
    with the parameters to this function, after *wire* has been passed
    through ``dns.wiredata.maybe_wrap()``.

    *rdclass*, an ``int``, the rdataclass.

    *rdtype*, an ``int``, the rdatatype.

    *wire*, a ``binary``, the wire-format message.

    *current*, an ``int``, the offset in wire of the beginning of
    the rdata.

    *rdlen*, an ``int``, the length of the wire-format rdata

    *origin*, a ``dns.name.Name`` (or ``None``).  If not ``None``,
    then names will be relativized to this origin.

    Returns an instance of the chosen Rdata subclass.
    """

    wire = dns.wiredata.maybe_wrap(wire)
    cls = get_rdata_class(rdclass, rdtype)
    return cls.from_wire(rdclass, rdtype, wire, current, rdlen, origin)
425 
426 
class RdatatypeExists(dns.exception.DNSException):
    """DNS rdatatype already exists."""
    # Keyword arguments accepted when raising, and the message template
    # they are substituted into.
    supp_kwargs = {'rdclass', 'rdtype'}
    fmt = "The rdata type with class {rdclass} and rdtype {rdtype} " + \
        "already exists."
432 
433 
def register_type(implementation, rdtype, rdtype_text, is_singleton=False,
                  rdclass=dns.rdataclass.IN):
    """Dynamically register a module to handle an rdatatype.

    *implementation*, a module implementing the type in the usual dnspython
    way.

    *rdtype*, an ``int``, the rdatatype to register.

    *rdtype_text*, a ``text``, the textual form of the rdatatype.

    *is_singleton*, a ``bool``, indicating if the type is a singleton (i.e.
    RRsets of the type can have only one member.)

    *rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if
    it applies to all classes.

    Raises ``RdatatypeExists`` if get_rdata_class() already resolves the
    class and type to something other than ``GenericRdata``.
    """

    existing_cls = get_rdata_class(rdclass, rdtype)
    if existing_cls is not GenericRdata:
        raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype)
    _rdata_modules[(rdclass, rdtype)] = implementation
    dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)
dns.exception.SyntaxError
Definition: exception.py:113
dns.rdata.Rdata.__le__
def __le__(self, other)
Definition: rdata.py:227
dns.rdata.Rdata.__ne__
def __ne__(self, other)
Definition: rdata.py:213
dns.rdata.Rdata.__init__
def __init__(self, rdclass, rdtype)
Definition: rdata.py:100
dns.rdata.Rdata.__str__
def __str__(self)
Definition: rdata.py:186
dns.rdata.Rdata._cmp
def _cmp(self, other)
Definition: rdata.py:189
dns.rdata.GenericRdata.to_wire
def to_wire(self, file, compress=None, origin=None)
Definition: rdata.py:298
dns.rdata.get_rdata_class
def get_rdata_class(rdclass, rdtype)
Definition: rdata.py:309
dns.rdata._truncate_bitmap
def _truncate_bitmap(what)
Definition: rdata.py:84
dns.rdata.GenericRdata.__init__
def __init__(self, rdclass, rdtype, data)
Definition: rdata.py:271
dns.rdata.Rdata.from_wire
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None)
Definition: rdata.py:253
dns.rdataclass
Definition: rdataclass.py:1
dns.exception.DNSException
Definition: exception.py:24
dns.rdata.GenericRdata.from_text
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True)
Definition: rdata.py:279
dns.rdata._hexify
def _hexify(data, chunksize=_hex_chunksize)
Definition: rdata.py:40
dns.rdata.Rdata.validate
def validate(self)
Definition: rdata.py:161
dns.rdata.GenericRdata.data
data
Definition: rdata.py:273
dns.rdata.Rdata.__repr__
def __repr__(self)
Definition: rdata.py:176
dns.rdata.Rdata.to_text
def to_text(self, origin=None, relativize=True, **kw)
Definition: rdata.py:134
dns.rdata.Rdata.rdclass
rdclass
Definition: rdata.py:107
dns.rdata.Rdata.__hash__
def __hash__(self)
Definition: rdata.py:245
dns.rdata.Rdata.__ge__
def __ge__(self, other)
Definition: rdata.py:233
dns.rdata.GenericRdata.from_wire
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None)
Definition: rdata.py:302
dns.rdata.register_type
def register_type(implementation, rdtype, rdtype_text, is_singleton=False, rdclass=dns.rdataclass.IN)
Definition: rdata.py:434
dns.rdata.Rdata.choose_relativity
def choose_relativity(self, origin=None, relativize=True)
Definition: rdata.py:256
dns.rdata.GenericRdata.to_text
def to_text(self, origin=None, relativize=True, **kw)
Definition: rdata.py:275
dns.rdata.from_text
def from_text(rdclass, rdtype, tok, origin=None, relativize=True)
Definition: rdata.py:344
dns.rdata._escapify
def _escapify(qstring)
Definition: rdata.py:65
dns.rdata.Rdata.__eq__
def __eq__(self, other)
Definition: rdata.py:206
dns.rdataclass.to_text
def to_text(value)
Definition: rdataclass.py:93
dns.rdata.Rdata.__lt__
def __lt__(self, other)
Definition: rdata.py:220
dns.rdatatype.register_type
def register_type(rdtype, rdtype_text, is_singleton=False)
Definition: rdatatype.py:273
dns.wiredata
Definition: wiredata.py:1
dns.rdata.Rdata.from_text
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True)
Definition: rdata.py:249
dns.tokenizer.Tokenizer
Definition: tokenizer.py:152
dns.name
Definition: name.py:1
dns.rdatatype
Definition: rdatatype.py:1
dns.rdata.RdatatypeExists
Definition: rdata.py:427
dns.rdatatype.to_text
def to_text(value)
Definition: rdatatype.py:219
dns.rdata.Rdata.to_digestable
def to_digestable(self, origin=None)
Definition: rdata.py:150
dns.rdata._base64ify
def _base64ify(data, chunksize=_base64_chunksize)
Definition: rdata.py:53
dns.rdata.Rdata.to_wire
def to_wire(self, file, compress=None, origin=None)
Definition: rdata.py:142
dns.rdata.from_wire
def from_wire(rdclass, rdtype, wire, current, rdlen, origin=None)
Definition: rdata.py:394
dns._compat.xrange
xrange
Definition: _compat.py:11
dns.rdata.Rdata.extended_rdatatype
def extended_rdatatype(self)
Definition: rdata.py:124
dns.rdata.Rdata.__gt__
def __gt__(self, other)
Definition: rdata.py:239
dns.rdata.Rdata.covers
def covers(self)
Definition: rdata.py:110
dns.tokenizer
Definition: tokenizer.py:1
dns.rdata.Rdata.rdtype
rdtype
Definition: rdata.py:108
dns.exception
Definition: exception.py:1
dns.rdata.Rdata
Definition: rdata.py:95
dns.wiredata.maybe_wrap
def maybe_wrap(wire)
Definition: wiredata.py:96