"Fossies" - the Fresh Open Source Software Archive

Member "rbldnsd-0.998/test_btrie.py" (7 Apr 2013, 15741 Bytes) of package /linux/misc/dns/rbldnsd-0.998.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file.

    1 """ Tests for btrie.c
    2 """
    3 
    4 import os
    5 import re
    6 from subprocess import Popen, PIPE
    7 from tempfile import TemporaryFile, NamedTemporaryFile
    8 import unittest
    9 
   10 from rbldnsd import Rbldnsd, ZoneFile
   11 
   12 __all__ = [
   13     'Test_coalesce_lc_node',
   14     'Test_shorten_lc_node',
   15     'Test_convert_lc_node_1',
   16     'Test_convert_lc_node',
   17     'Test_insert_lc_node',
   18     'Test_init_tbm_node',
   19     'Test_add_to_trie',
   20     'Test_search_trie',
   21     ]
   22 
   23 def deduce_pointer_size(makefile='./Makefile'):
   24     """ Deduce the pointer size (in the current compilation environment)
   25     """
   26     with file(makefile) as f:
   27         make_vars = dict(
   28             m.groups()
   29             for m in (re.match(r'\s*(\w+)\s*=\s*(.*?)\s*\Z', line)
   30                       for line in f)
   31             if m is not None)
   32     cc = make_vars['CC']
   33     cflags = make_vars['CFLAGS']
   34 
   35     test_c = NamedTemporaryFile(suffix=".c")
   36     test_c.write(r'''
   37 #include <stdio.h>
   38 #ifndef __SIZEOF_POINTER__
   39 # define __SIZEOF_POINTER__ sizeof(void *)
   40 #endif
   41 int main () {
   42   printf("%u\n", (unsigned)__SIZEOF_POINTER__);
   43   return 0;
   44 }
   45 ''')
   46     test_c.flush()
   47     src = test_c.name
   48 
   49     try:
   50         proc = Popen("%(cc)s %(cflags)s -o %(src)s.bin %(src)s && %(src)s.bin"
   51                      % locals(),
   52                      shell=True, stdout=PIPE)
   53         output = proc.stdout.read()
   54         if proc.wait() != 0:
   55             raise RuntimeError("test prog exited with code %d"
   56                                % proc.returncode)
   57         return int(output)
   58     finally:
   59         try:
   60             os.unlink(src + '.bin')
   61         except:
   62             pass
   63 
   64 try:
   65     sizeof_pointer = deduce_pointer_size()
   66 except Exception:
   67     print "Can not deduce size of pointer. Assuming pointer size of 8."
   68     sizeof_pointer = 8
   69 
   70 if sizeof_pointer == 8:
   71     STRIDE = 5
   72     LC_BYTES_PER_NODE = 7
   73 elif sizeof_pointer == 4:
   74     STRIDE = 4
   75     LC_BYTES_PER_NODE = 3
   76 else:
   77     raise RuntimeError("Unsupported pointer size (%d)" % sizeof_pointer)
   78 
   79 def pad_prefix(prefix, plen):
   80     """Pad prefix on the right with zeros to a full 128 bits
   81     """
   82     if not isinstance(prefix, (int, long)):
   83         raise TypeError("prefix must be an integer")
   84     if not 0 <= int(plen) <= 128:
   85         raise ValueError("plen out of range")
   86     if not 0 <= prefix < (1 << plen):
   87         raise ValueError("prefix out of range")
   88     return prefix << (128 - plen)
   89 
   90 class BTrie(object):
   91     """ A class to construct and perform lookups on a btrie.
   92 
   93     Since we do not have python bindings for btrie, we do this in a
   94     roundabout way by running rbldnsd with a single ip6trie dataset,
   95     and then querying the rbldnsd to perform the lookup.
   96 
   97     """
   98     def __init__(self, prefixes, **kwargs):
   99         self.rbldnsd = Rbldnsd(**kwargs)
  100         zonedata = (self._zone_entry(*prefix) for prefix in prefixes)
  101         self.rbldnsd.add_dataset('ip6trie', ZoneFile(zonedata))
  102 
  103     def __enter__(self):
  104         self.rbldnsd.__enter__()
  105         return self
  106 
  107     def __exit__(self, exc_type, exc_value, exc_tb):
  108         return self.rbldnsd.__exit__(exc_type, exc_value, exc_tb)
  109 
  110     def lookup(self, prefix, plen):
  111         prefix = pad_prefix(prefix, plen)
  112         nibbles = '.'.join("%x" % ((prefix >> n) & 0x0f)
  113                            for n in range(0, 128, 4))
  114         return self.rbldnsd.query(nibbles + '.example.com')
  115 
  116     def _zone_entry(self, prefix, plen, data):
  117         prefix = pad_prefix(prefix, plen)
  118         ip6addr = ':'.join("%x" % ((prefix >> n) & 0xffff)
  119                            for n in range(112, -16, -16))
  120         return "%s/%u :1:%s" % (ip6addr, plen, data)
  121 
  122 class CaptureOutput(object):
  123     def __init__(self):
  124         self._file = TemporaryFile()
  125 
  126     def fileno(self):
  127         return self._file.fileno()
  128 
  129     def __contains__(self, substr):
  130         return substr in str(self)
  131 
  132     def __str__(self):
  133         self._file.seek(0, 0)
  134         return self._file.read()
  135 
  136 class Test_coalesce_lc_node(unittest.TestCase):
  137     def test_merge(self):
  138         # test coverage of coalesce_lc_node
  139         prefixes = [
  140             # this prefix is too long for a single LC node
  141             # but after we stick the TBM node at 0/0 it should
  142             # just fit into a single LC extension path
  143             (0, 8 * (LC_BYTES_PER_NODE + 1), "term"),
  144             # Add a TBM node to shorten the above LC node
  145             (0, (8 - STRIDE), "root"),
  146             ]
  147         with BTrie(prefixes) as btrie:
  148             self.assertEqual(btrie.lookup(0, 8), "term")
  149             self.assertEqual(btrie.lookup(1, 8), "root")
  150 
  151     def test_steal_bits(self):
  152         # test coverage of coalesce_lc_node
  153         prefixes = [
  154             # This prefix is too long for a single LC node.  After we
  155             # stick the TBM node at 0/0 it should still be too long
  156             # for a single LC node, but the upper LC node should steal
  157             # bits from the terminal LC node.
  158             (0, 8 * (LC_BYTES_PER_NODE + 1) + 1, "term"),
  159             (0, (8 - STRIDE), "root"),
  160             ]
  161         with BTrie(prefixes) as btrie:
  162             self.assertEqual(btrie.lookup(0, 0), "term")
  163             self.assertEqual(btrie.lookup(1, 8), "root")
  164 
  165 class Test_shorten_lc_node(unittest.TestCase):
  166     def test_steal_child(self):
  167         # test coverage of coalesce_lc_node
  168         prefixes = [
  169             # this prefix is too long for a single LC node
  170             # but after we stick the TBM node at 0/0 it should
  171             # just fit into a single LC extension path
  172             (0, 9, "tbm root"),
  173             (0, 10, "term"),
  174             # Add a TBM node to shorten the above LC node
  175             (0, 9 - STRIDE, "root"),
  176             ]
  177         with BTrie(prefixes) as btrie:
  178             self.assertEqual(btrie.lookup(0, 8), "term")
  179             self.assertEqual(btrie.lookup(1, 8), "root")
  180 
  181 class Test_convert_lc_node_1(unittest.TestCase):
  182     def test_left_child(self):
  183         # test coverage of coalesce_lc_node
  184         prefixes = [
  185             # create TBM node at depth 1
  186             (0, 2, "term"),
  187             (0, 1, "tbm node"),
  188             # promote to depth 0
  189             (0, 0, "root"),
  190             ]
  191         with BTrie(prefixes) as btrie:
  192             self.assertEqual(btrie.lookup(0, 0), "term")
  193             self.assertEqual(btrie.lookup(1, 2), "tbm node")
  194             self.assertEqual(btrie.lookup(1, 1), "root")
  195 
  196     def test_right_child(self):
  197         # test coverage of coalesce_lc_node
  198         prefixes = [
  199             (3, 2, "term"),
  200             (1, 1, "tbm node"),
  201             (0, 0, "root"),
  202             ]
  203         with BTrie(prefixes) as btrie:
  204             self.assertEqual(btrie.lookup(3, 2), "term")
  205             self.assertEqual(btrie.lookup(2, 2), "tbm node")
  206             self.assertEqual(btrie.lookup(0, 1), "root")
  207 
  208 class Test_convert_lc_node(unittest.TestCase):
  209     def test_left_child(self):
  210         # test coverage of coalesce_lc_node
  211         prefixes = [
  212             # create TBM node at depth STRIDE - 1
  213             (0, STRIDE, "term"),
  214             (0, STRIDE - 1, "tbm node"),
  215             # promote to depth 0
  216             (0, 0, "root"),
  217             ]
  218         with BTrie(prefixes) as btrie:
  219             self.assertEqual(btrie.lookup(0, STRIDE), "term")
  220             self.assertEqual(btrie.lookup(1, STRIDE), "tbm node")
  221             self.assertEqual(btrie.lookup(1, 1), "root")
  222 
  223 class Test_insert_lc_node(unittest.TestCase):
  224     def test_insert_lc_len_1(self):
  225         prefixes = [
  226             # create TBM node at depth 1 with TBM extending path
  227             (0, STRIDE + 2, "term"),
  228             (0, STRIDE + 1, "tbm ext path"),
  229             (0, 1, "tbm node"),
  230             # promote to depth 0
  231             (0, 0, "root"),
  232             ]
  233         with BTrie(prefixes) as btrie:
  234             self.assertEqual(btrie.lookup(0, 0), "term")
  235             self.assertEqual(btrie.lookup(1, STRIDE + 2), "tbm ext path")
  236             self.assertEqual(btrie.lookup(1, 2), "tbm node")
  237             self.assertEqual(btrie.lookup(1, 1), "root")
  238 
  239     def test_extend_lc_tail_optimization(self):
  240         prefixes = [
  241             # create TBM node at depth 1 with LC extending path
  242             (1, STRIDE + 2, "term"),
  243             (0, 1, "tbm node"),
  244             # promote to depth 0
  245             (0, 0, "root"),
  246             ]
  247         with BTrie(prefixes) as btrie:
  248             self.assertEqual(btrie.lookup(1, STRIDE + 2), "term")
  249             self.assertEqual(btrie.lookup(0, 0), "tbm node")
  250             self.assertEqual(btrie.lookup(1, 1), "root")
  251 
  252     def test_coalesce_lc_tail(self):
  253         prefixes = [
  254             # create TBM node with LC extending path which starts
  255             # at a byte boundary.
  256             (0, 10, "term"),
  257             (0, 8 - STRIDE, "tbm node"),
  258             # promote one level
  259             (0, 7 - STRIDE, "promoted"),
  260             ]
  261         with BTrie(prefixes) as btrie:
  262             self.assertEqual(btrie.lookup(0, 0), "term")
  263             self.assertEqual(btrie.lookup(1, 9 - STRIDE), "tbm node")
  264             self.assertEqual(btrie.lookup(1, 8 - STRIDE), "promoted")
  265 
  266 class Test_init_tbm_node(unittest.TestCase):
  267     def test_short_lc_children(self):
  268         # this exercises the convert_lc_node calls in init_tbm_node()
  269         prefixes = [
  270             # create TBM node at depth 1, with two LC extending paths
  271             # from a deep internal node
  272             (0, 1, "tbm"),
  273             (0, STRIDE + 2, "term0"),
  274             (2, STRIDE + 2, "term1"),
  275             # promote one level
  276             (0, 0, "root"),
  277             ]
  278         with BTrie(prefixes) as btrie:
  279             self.assertEqual(btrie.lookup(0, 0), "term0")
  280             self.assertEqual(btrie.lookup(1, STRIDE + 1), "term1")
  281             self.assertEqual(btrie.lookup(1, 2), "tbm")
  282             self.assertEqual(btrie.lookup(1, 1), "root")
  283 
  284     def test_long_lc_children(self):
  285         # this exercises the shorten_lc_node calls in init_tbm_node()
  286         prefixes = [
  287             # create TBM node at depth 1, with two LC extending paths
  288             # from a deep internal node
  289             (0, 1, "tbm"),
  290             (0, STRIDE + 9, "term0"),
  291             (0x100, STRIDE + 9, "term1"),
  292             # promote one level
  293             (0, 0, "root"),
  294             ]
  295         with BTrie(prefixes) as btrie:
  296             self.assertEqual(btrie.lookup(0, 0), "term0")
  297             self.assertEqual(btrie.lookup(1, STRIDE + 1), "term1")
  298             self.assertEqual(btrie.lookup(1, 2), "tbm")
  299             self.assertEqual(btrie.lookup(1, 1), "root")
  300 
  301     def test_set_internal_data_for_root_prefix(self):
  302         # this exercises the "set internal data for root prefix" code
  303         prefixes = [
  304             # create TBM node at depth 1, with internal prefix data
  305             # and an extending path on a deep internal node
  306             (0, 1, "tbm"),
  307             (0, STRIDE, "int data"),
  308             (0, STRIDE + 1, "ext path"),
  309             # promote one level
  310             (0, 0, "root"),
  311             ]
  312         with BTrie(prefixes) as btrie:
  313             self.assertEqual(btrie.lookup(0, 0), "ext path")
  314             self.assertEqual(btrie.lookup(1, STRIDE + 1), "int data")
  315             self.assertEqual(btrie.lookup(1, 2), "tbm")
  316             self.assertEqual(btrie.lookup(1, 1), "root")
  317 
  318     def test_set_right_ext_path(self):
  319         # this exercises the insert_lc_node(right_ext) call in init_tbm_node()
  320         # this also exercises next_pbyte with (pos + TBM_STRIDE) % 8 == 0
  321         prefixes = [
  322             # create TBM node at depth (9 - STRIDE) with a right TBM
  323             # extending path on a deep internal node
  324             (0, 9 - STRIDE, "tbm"),
  325             (1, 9, "ext path"),
  326             (2, 10, "term"),
  327             # promote one level to depth (8 - STRIDE)
  328             (0, 8 - STRIDE, "top"),
  329             ]
  330         with BTrie(prefixes) as btrie:
  331             self.assertEqual(btrie.lookup(0, 0), "tbm")
  332             self.assertEqual(btrie.lookup(2, 10), "term")
  333             self.assertEqual(btrie.lookup(3, 10), "ext path")
  334             self.assertEqual(btrie.lookup(1, 9 - STRIDE), "top")
  335 
  336 class Test_add_to_trie(unittest.TestCase):
  337     def test_duplicate_terminal_lc(self):
  338         prefixes = [
  339             (0, 1, "term"),
  340             (0, 1, "term"),
  341             ]
  342         stderr = CaptureOutput()
  343         with BTrie(prefixes, stderr=stderr) as btrie:
  344             self.assertEqual(btrie.lookup(0, 0), "term")
  345         self.assertTrue("duplicated entry for" in stderr,
  346                         "No duplicated entry error message in stderr: %r"
  347                         % str(stderr))
  348 
  349     def test_duplicate_internal_data(self):
  350         prefixes = [
  351             (0, 0, "root"),
  352             (2, 3, "term"),
  353             (2, 3, "term"),
  354             ]
  355         stderr = CaptureOutput()
  356         with BTrie(prefixes, stderr=stderr) as btrie:
  357             self.assertEqual(btrie.lookup(4, 4), "term")
  358             self.assertEqual(btrie.lookup(0, 0), "root")
  359         self.assertTrue("duplicated entry for" in stderr,
  360                         "No duplicated entry error message in stderr: %r"
  361                         % str(stderr))
  362 
  363     def test_split_first_byte_of_lc_prefix(self):
  364         # this is for coverage of common_prefix()
  365         prefixes = [
  366             (0x1234, 16, "long"),
  367             (0x1000, 16, "splitter"),
  368             ]
  369         with BTrie(prefixes) as btrie:
  370             self.assertEqual(btrie.lookup(0x1234, 16), "long")
  371             self.assertEqual(btrie.lookup(0x1000, 16), "splitter")
  372 
  373     def test_split_last_byte_of_lc_prefix(self):
  374         # this is for coverage of common_prefix()
  375         prefixes = [
  376             (0x1234, 15, "long"),
  377             (0x1238, 15, "splitter"),
  378             ]
  379         with BTrie(prefixes) as btrie:
  380             self.assertEqual(btrie.lookup(0x1234, 15), "long")
  381             self.assertEqual(btrie.lookup(0x1238, 15), "splitter")
  382 
  383 class Test_search_trie(unittest.TestCase):
  384     def test_tbm_root_data(self):
  385         # test access to root internal node in a TBM node
  386         prefixes = [(0, 127, "tbm root"),
  387                     (1, 128, "int data")]
  388         with BTrie(prefixes) as btrie:
  389             self.assertEqual(btrie.lookup(0, 0), "tbm root")
  390 
  391     def test_tbm_internal_data(self):
  392         # test access to each (non-root) internal node in a TBM node
  393         for plen in range(1, STRIDE):
  394             # TBM node
  395             prefixes = [(0, 128 - plen, "tbm root")]
  396             prefixes.extend((pfx, 128, "%u/%u" % (pfx, plen))
  397                             for pfx in range(1 << plen))
  398             with BTrie(prefixes) as btrie:
  399                 for pfx in range(1 << plen):
  400                     self.assertEqual(btrie.lookup(pfx, 128),
  401                                      "%u/%u" % (pfx, plen))
  402 
  403     def test_tbm_extending_paths(self):
  404         # test access to each extended path of a TBM node
  405         prefixes = [(0,0,"root")] # make sure to create top-level TBM node
  406         prefixes.extend((pfx, STRIDE, str(pfx)) for pfx in range(1 << STRIDE))
  407         with BTrie(prefixes) as btrie:
  408             for pfx in range(1 << STRIDE):
  409                 self.assertEqual(btrie.lookup(pfx, STRIDE), str(pfx))
  410 
  411     def test_no_match(self):
  412         prefixes = [
  413             (1, 2, "term"),
  414             ]
  415         with BTrie(prefixes) as btrie:
  416             self.assertEqual(btrie.lookup(0,0), None)
  417 
  418     def test_follow_lc(self):
  419         prefixes = [
  420             (0, 2 * STRIDE, "term"),
  421             ]
  422         with BTrie(prefixes) as btrie:
  423             self.assertEqual(btrie.lookup(0,0), "term")
  424 
  425     def test_parents_internal_data(self):
  426         prefixes = [
  427             (0, 0, "root"),
  428             (2, 2, "int data"),
  429             (0x200, 10, "term"),
  430             ]
  431         with BTrie(prefixes) as btrie:
  432             self.assertEqual(btrie.lookup(0x201, 10), "int data")
  433 
  434 if __name__ == '__main__':
  435     unittest.main()