"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "source4/dsdb/tests/python/vlv.py" between
samba-4.12.3.tar.gz and samba-4.12.5.tar.gz

About: Samba is the standard Windows interoperability suite of programs for Linux and Unix providing secure, stable and fast file and print services for all clients using the SMB/CIFS protocol. 4.12 series.

vlv.py  (samba-4.12.3):vlv.py  (samba-4.12.5)
skipping to change at line 151 skipping to change at line 151
del user[k] del user[k]
self.users.append(user) self.users.append(user)
self.ldb.add(user) self.ldb.add(user)
return user return user
def setUp(self): def setUp(self):
super(TestsWithUserOU, self).setUp() super(TestsWithUserOU, self).setUp()
self.ldb = SamDB(host, credentials=creds, self.ldb = SamDB(host, credentials=creds,
session_info=system_session(lp), lp=lp) session_info=system_session(lp), lp=lp)
self.ldb_ro = self.ldb
self.base_dn = self.ldb.domain_dn() self.base_dn = self.ldb.domain_dn()
self.tree_dn = "ou=vlvtesttree,%s" % self.base_dn self.tree_dn = "ou=vlvtesttree,%s" % self.base_dn
self.ou = "ou=vlvou,%s" % self.tree_dn self.ou = "ou=vlvou,%s" % self.tree_dn
if opts.delete_in_setup: if opts.delete_in_setup:
try: try:
self.ldb.delete(self.tree_dn, ['tree_delete:1']) self.ldb.delete(self.tree_dn, ['tree_delete:1'])
except ldb.LdbError as e: except ldb.LdbError as e:
print("tried deleting %s, got error %s" % (self.tree_dn, e)) print("tried deleting %s, got error %s" % (self.tree_dn, e))
self.ldb.add({ self.ldb.add({
"dn": self.tree_dn, "dn": self.tree_dn,
skipping to change at line 197 skipping to change at line 197
self.numeric_sorted_keys)] self.numeric_sorted_keys)]
# don't try spaces, etc in cn # don't try spaces, etc in cn
self.delicate_keys = ['cn'] self.delicate_keys = ['cn']
def tearDown(self): def tearDown(self):
super(TestsWithUserOU, self).tearDown() super(TestsWithUserOU, self).tearDown()
if not opts.delete_in_setup: if not opts.delete_in_setup:
self.ldb.delete(self.tree_dn, ['tree_delete:1']) self.ldb.delete(self.tree_dn, ['tree_delete:1'])
class VLVTests(TestsWithUserOU): class VLVTestsBase(TestsWithUserOU):
# Run a vlv search and return important fields of the response control
def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
sort_ctrl = "server_sort:1:0:%s" % attr
ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
if cookie:
ctrl += ":" + cookie
res = self.ldb_ro.search(self.ou,
expression=expr,
scope=ldb.SCOPE_ONELEVEL,
attrs=[attr],
controls=[ctrl, sort_ctrl])
results = [str(x[attr][0]) for x in res]
ctrls = [str(c) for c in res.controls if
str(c).startswith('vlv')]
self.assertEqual(len(ctrls), 1)
spl = ctrls[0].rsplit(':')
cookie = ""
if len(spl) == 6:
cookie = spl[-1]
return results, cookie
class VLVTestsRO(VLVTestsBase):
def test_vlv_simple_double_run(self):
"""Do the simplest possible VLV query to confirm if VLV
works at all. Useful for showing VLV as a whole works
on Global Catalog (for example)"""
attr = 'roomNumber'
expr = "(objectclass=user)"
# Start new search
full_results, cookie = self.vlv_search(attr, expr,
after_count=len(self.users))
results, cookie = self.vlv_search(attr, expr, cookie=cookie,
after_count=len(self.users))
expected_results = full_results
self.assertEqual(results, expected_results)
class VLVTestsGC(VLVTestsRO):
def setUp(self):
super(VLVTestsRO, self).setUp()
self.ldb_ro = SamDB(host + ":3268", credentials=creds,
session_info=system_session(lp), lp=lp)
class VLVTests(VLVTestsBase):
def get_full_list(self, attr, include_cn=False): def get_full_list(self, attr, include_cn=False):
"""Fetch the whole list sorted on the attribute, using the VLV. """Fetch the whole list sorted on the attribute, using the VLV.
This way you get a VLV cookie.""" This way you get a VLV cookie."""
n_users = len(self.users) n_users = len(self.users)
sort_control = "server_sort:1:0:%s" % attr sort_control = "server_sort:1:0:%s" % attr
half_n = n_users // 2 half_n = n_users // 2
vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1) vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
attrs = [attr] attrs = [attr]
if include_cn: if include_cn:
attrs.append('cn') attrs.append('cn')
skipping to change at line 1079 skipping to change at line 1128
controls=[sort_control, controls=[sort_control,
"vlv:1:1:1:1:0:%s" % vlv_cookies[-1]]) "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
# but now without the critical flag it just does no VLV. # but now without the critical flag it just does no VLV.
new_ldb.search(self.ou, new_ldb.search(self.ou,
scope=ldb.SCOPE_ONELEVEL, scope=ldb.SCOPE_ONELEVEL,
attrs=[attr], attrs=[attr],
controls=[sort_control, controls=[sort_control,
"vlv:0:1:1:1:0:%s" % vlv_cookies[-1]]) "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
# Run a vlv search and return important fields of the response control
def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
sort_ctrl = "server_sort:1:0:%s" % attr
ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
if cookie:
ctrl += ":" + cookie
res = self.ldb.search(self.ou,
expression=expr,
scope=ldb.SCOPE_ONELEVEL,
attrs=[attr],
controls=[ctrl, sort_ctrl])
results = [str(x[attr][0]) for x in res]
ctrls = [str(c) for c in res.controls if
str(c).startswith('vlv')]
self.assertEqual(len(ctrls), 1)
spl = ctrls[0].rsplit(':')
cookie = ""
if len(spl) == 6:
cookie = spl[-1]
return results, cookie
def test_vlv_modify_during_view(self): def test_vlv_modify_during_view(self):
attr = 'roomNumber' attr = 'roomNumber'
expr = "(objectclass=user)" expr = "(objectclass=user)"
# Start new search # Start new search
full_results, cookie = self.vlv_search(attr, expr, full_results, cookie = self.vlv_search(attr, expr,
after_count=len(self.users)) after_count=len(self.users))
# Edit a user # Edit a user
edit_index = len(self.users)//2 edit_index = len(self.users)//2
skipping to change at line 1215 skipping to change at line 1239
controls.append(sort_ctrl) controls.append(sort_ctrl)
kwargs = {} kwargs = {}
if attrs is not None: if attrs is not None:
kwargs = {"attrs": attrs} kwargs = {"attrs": attrs}
scope = ldb.SCOPE_ONELEVEL scope = ldb.SCOPE_ONELEVEL
if subtree: if subtree:
scope = ldb.SCOPE_SUBTREE scope = ldb.SCOPE_SUBTREE
res = self.ldb.search(ou, res = self.ldb_ro.search(ou,
expression=expr, expression=expr,
scope=scope, scope=scope,
controls=controls, controls=controls,
**kwargs) **kwargs)
results = [str(r['cn'][0]) for r in res] results = [str(r['cn'][0]) for r in res]
ctrls = [str(c) for c in res.controls if ctrls = [str(c) for c in res.controls if
str(c).startswith("paged_results")] str(c).startswith("paged_results")]
assert len(ctrls) == 1, "no paged_results response" assert len(ctrls) == 1, "no paged_results response"
spl = ctrls[0].rsplit(':', 3) spl = ctrls[0].rsplit(':', 3)
cookie = "" cookie = ""
if len(spl) == 3: if len(spl) == 3:
cookie = spl[-1] cookie = spl[-1]
return results, cookie return results, cookie
class PagedResultsTestsRO(PagedResultsTests):
def test_paged_search_lockstep(self):
expr = "(objectClass=*)"
ps = 3
all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
# Run two different but overlapping paged searches simultaneously.
set_1_index = int((len(all_results))//3)
set_2_index = int((2*len(all_results))//3)
set_1 = all_results[set_1_index:]
set_2 = all_results[:set_2_index+1]
set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
self.assertEqual(results, set_1[:ps])
results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
self.assertEqual(results, set_2[:ps])
results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
page_size=ps)
self.assertEqual(results, set_1[ps:ps*2])
results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
page_size=ps)
self.assertEqual(results, set_2[ps:ps*2])
results, _ = self.paged_search(set_1_expr, cookie=cookie1,
page_size=len(self.users))
self.assertEqual(results, set_1[ps*2:])
results, _ = self.paged_search(set_2_expr, cookie=cookie2,
page_size=len(self.users))
self.assertEqual(results, set_2[ps*2:])
class PagedResultsTestsGC(PagedResultsTestsRO):
def setUp(self):
super(PagedResultsTestsRO, self).setUp()
self.ldb_ro = SamDB(host + ":3268", credentials=creds,
session_info=system_session(lp), lp=lp)
class PagedResultsTestsRW(PagedResultsTests):
def test_paged_delete_during_search(self, sort=True): def test_paged_delete_during_search(self, sort=True):
expr = "(objectClass=*)" expr = "(objectClass=*)"
# Start new search # Start new search
first_page_size = 3 first_page_size = 3
results, cookie = self.paged_search(expr, sort=sort, results, cookie = self.paged_search(expr, sort=sort,
page_size=first_page_size) page_size=first_page_size)
# Run normal search to get expected results # Run normal search to get expected results
unedited_results, _ = self.paged_search(expr, sort=sort, unedited_results, _ = self.paged_search(expr, sort=sort,
skipping to change at line 1633 skipping to change at line 1701
results, cookie = self.paged_search(expr, page_size=3, attrs=attrs) results, cookie = self.paged_search(expr, page_size=3, attrs=attrs)
results, cookie = self.paged_search(expr, cookie=cookie, page_size=3, results, cookie = self.paged_search(expr, cookie=cookie, page_size=3,
attrs=attrs) attrs=attrs)
changed_attrs = attrs + ['roomNumber'] changed_attrs = attrs + ['roomNumber']
self.assertPagedSearchRaises(expected_error_num, expr, self.assertPagedSearchRaises(expected_error_num, expr,
cookie, attrs=changed_attrs, cookie, attrs=changed_attrs,
extra_ctrls=[]) extra_ctrls=[])
def test_paged_search_lockstep(self): def test_vlv_paged(self):
expr = "(objectClass=*)" """Testing behaviour with VLV and paged_results set.
ps = 3
all_results, _ = self.paged_search(expr, page_size=len(self.users)+1) A strange combination, certainly
# Run two different but overlapping paged searches simultaneously. Thankfully combining both of these gives
set_1_index = int((len(all_results))//3) unavailable-critical-extension against Windows 1709
set_2_index = int((2*len(all_results))//3)
set_1 = all_results[set_1_index:]
set_2 = all_results[:set_2_index+1]
set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
results, cookie1 = self.paged_search(set_1_expr, page_size=ps) """
self.assertEqual(results, set_1[:ps]) sort_control = "server_sort:1:0:cn"
results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
self.assertEqual(results, set_2[:ps])
results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
page_size=ps)
self.assertEqual(results, set_1[ps:ps*2])
results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
page_size=ps)
self.assertEqual(results, set_2[ps:ps*2])
results, _ = self.paged_search(set_1_expr, cookie=cookie1, try:
page_size=len(self.users)) msgs = self.ldb.search(base=self.base_dn,
self.assertEqual(results, set_1[ps*2:]) scope=ldb.SCOPE_SUBTREE,
results, _ = self.paged_search(set_2_expr, cookie=cookie2, attrs=["objectGUID", "cn", "member"],
page_size=len(self.users)) controls=["vlv:1:20:20:11:0",
self.assertEqual(results, set_2[ps*2:]) sort_control,
"paged_results:1:1024"])
self.fail("should have failed with LDAP_UNAVAILABLE_CRITICAL_EXTENSI
ON")
except ldb.LdbError as e:
(enum, estr) = e.args
self.assertEqual(enum, ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION)
if "://" not in host: if "://" not in host:
if os.path.isfile(host): if os.path.isfile(host):
host = "tdb://%s" % host host = "tdb://%s" % host
else: else:
host = "ldap://%s" % host host = "ldap://%s" % host
TestProgram(module=__name__, opts=subunitopts) TestProgram(module=__name__, opts=subunitopts)
 End of changes. 11 change blocks. 
60 lines changed or deleted 119 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)