"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "scripts/irods/test/test_native_rule_engine_plugin.py" between
irods-4.2.5.tar.gz and irods-4.2.6.tar.gz

About: iRODS (the integrated Rule Oriented Data System) is a distributed data-management system for creating data grids, digital libraries, persistent archives, and real-time data systems.

test_native_rule_engine_plugin.py  (irods-4.2.5):test_native_rule_engine_plugin.py  (irods-4.2.6)
skipping to change at line 12 skipping to change at line 12
import sys import sys
if sys.version_info < (2, 7): if sys.version_info < (2, 7):
import unittest2 as unittest import unittest2 as unittest
else: else:
import unittest import unittest
import inspect import inspect
import json import json
import os import os
import time import time
import tempfile
from textwrap import dedent
from . import resource_suite from . import resource_suite
from .. import test from .. import test
from . import settings from . import settings
from .. import paths from .. import paths
from .. import lib from .. import lib
from ..configuration import IrodsConfig from ..configuration import IrodsConfig
from ..core_file import temporary_core_file from ..core_file import temporary_core_file
from .rule_texts_for_tests import rule_texts from .rule_texts_for_tests import rule_texts
from ..controller import IrodsController from ..controller import IrodsController
skipping to change at line 65 skipping to change at line 67
super(Test_Native_Rule_Engine_Plugin, self).setUp() super(Test_Native_Rule_Engine_Plugin, self).setUp()
def tearDown(self): def tearDown(self):
super(Test_Native_Rule_Engine_Plugin, self).tearDown() super(Test_Native_Rule_Engine_Plugin, self).tearDown()
def helper_test_pep(self, rules_to_add, icommand, strings_to_check_for=['THI S IS AN OUT VARIABLE'], number_of_strings_to_look_for=1): def helper_test_pep(self, rules_to_add, icommand, strings_to_check_for=['THI S IS AN OUT VARIABLE'], number_of_strings_to_look_for=1):
with temporary_core_file() as core: with temporary_core_file() as core:
time.sleep(1) # remove once file hash fix is committed #2279 time.sleep(1) # remove once file hash fix is committed #2279
core.add_rule(rules_to_add) core.add_rule(rules_to_add)
time.sleep(1) # remove once file hash fix is committed #2279 time.sleep(1) # remove once file hash fix is committed #2279
initial_size_of_server_log = lib.get_file_size_by_path(paths.server_ log_path()) initial_size_of_server_log = lib.get_file_size_by_path(paths.server_ log_path())
self.admin.run_icommand(icommand) self.admin.run_icommand(icommand)
for s in strings_to_check_for: def check_string_count_in_log_section(string, n_occurrences):
count = lib.count_occurrences_of_string_in_log(paths.server_log_path count = lib.count_occurrences_of_string_in_log(paths.server_log_path
(), s, start_index=initial_size_of_server_log) (), string, start_index=initial_size_of_server_log)
assert number_of_strings_to_look_for == count, 'Found {0} instead of self.assertTrue (n_occurrences == count, msg='Found {0} instead of {
{1} occurrences of {2}'.format(count, number_of_strings_to_look_for, s) 1} occurrences of {2}'.format(count, n_occurrences, string))
if isinstance(strings_to_check_for, dict):
for s,n in strings_to_check_for.items():
check_string_count_in_log_section (s,n)
else:
for s in strings_to_check_for:
check_string_count_in_log_section (s,number_of_strings_to_look_f
or)
@unittest.skipIf(plugin_name == 'irods_rule_engine_plugin-python' or test.se
ttings.TOPOLOGY_FROM_RESOURCE_SERVER, 'Native only test when not in a topology')
def test_peps_for_parallel_mode_transfers__4404(self):
(fd, largefile) = tempfile.mkstemp()
os.write(fd,"123456789abcdef\n"*(6*1024**2))
os.close(fd)
temp_base = os.path.basename(largefile)
temp_dir = os.path.dirname(largefile)
extrafile = ""
try:
get_peps = dedent("""\
pep_api_data_obj_get_pre (*INSTANCE_NAME, *COMM, *DATAOBJINP, *B
UFFER, *PORTAL_OPR_OUT)
{
writeLine("serverLog", "data-obj-get-pre")
}
pep_api_data_obj_get_post (*INSTANCE_NAME, *COMM, *DATAOBJINP, *
BUFFER, *PORTAL_OPR_OUT)
{
writeLine("serverLog", "data-obj-get-post")
}
pep_api_data_obj_get_except (*INSTANCE_NAME, *COMM, *DATAOBJINP,
*BUFFER, *PORTAL_OPR_OUT)
{
writeLine("serverLog", "data-obj-get-except")
}
""")
put_peps = dedent("""\
pep_api_data_obj_put_pre (*INSTANCE_NAME, *COMM, *DATAOBJINP, *B
UFFER, *PORTAL_OPR_OUT)
{
writeLine("serverLog", "data-obj-put-pre")
}
pep_api_data_obj_put_post (*INSTANCE_NAME, *COMM, *DATAOBJINP, *
BUFFER, *PORTAL_OPR_OUT)
{
writeLine("serverLog", "data-obj-put-post")
}
pep_api_data_obj_put_except (*INSTANCE_NAME, *COMM, *DATAOBJINP,
*BUFFER, *PORTAL_OPR_OUT)
{
writeLine("serverLog", "data-obj-put-except")
}
""")
self.helper_test_pep( put_peps, "iput -f {}".format(largefile),
{ "data-obj-put-pre": 1, "data-obj-put-post": 1, "d
ata-obj-put-except": 0 } )
self.helper_test_pep( get_peps, "iget -f {} {}".format(temp_base,tem
p_dir),
{ "data-obj-get-pre": 1, "data-obj-get-post": 1, "d
ata-obj-get-except": 0 } )
self.helper_test_pep( put_peps, "iput {}".format(largefile),
{ "data-obj-put-pre": 1, "data-obj-put-post": 0, "d
ata-obj-put-except": 1 } )
self.admin.run_icommand('ichmod null {} {}'.format(self.admin.userna
me,temp_base))
extrafile = os.path.join(temp_dir, temp_base + ".x")
self.helper_test_pep( get_peps, "iget {} {}".format(temp_base,extraf
ile),
{ "data-obj-get-pre": 1, "data-obj-get-post": 0, "d
ata-obj-get-except": 1 } )
finally:
self.admin.run_icommand('ichmod own {} {}'.format(self.admin.usernam
e,temp_base))
os.unlink(largefile)
if extrafile and os.path.isfile(extrafile):
os.unlink(extrafile)
@unittest.skipIf(plugin_name == 'irods_rule_engine_plugin-python' or test.se ttings.TOPOLOGY_FROM_RESOURCE_SERVER, 'Native only test when not in a topology') @unittest.skipIf(plugin_name == 'irods_rule_engine_plugin-python' or test.se ttings.TOPOLOGY_FROM_RESOURCE_SERVER, 'Native only test when not in a topology')
def test_dynamic_policy_enforcement_point_exception_for_plugins__4128(self): def test_dynamic_policy_enforcement_point_exception_for_plugins__4128(self):
path = self.admin.get_vault_session_path('demoResc') path = self.admin.get_vault_session_path('demoResc')
self.admin.run_icommand('iput -f '+self.testfile+' good_file') self.admin.run_icommand('iput -f '+self.testfile+' good_file')
self.admin.run_icommand('iput -f '+self.testfile+' bad_file') self.admin.run_icommand('iput -f '+self.testfile+' bad_file')
os.unlink(os.path.join(path, 'bad_file')) os.unlink(os.path.join(path, 'bad_file'))
pre_pep_fail = """ pre_pep_fail = """
pep_resource_open_pre(*INST, *CTX, *OUT) { pep_resource_open_pre(*INST, *CTX, *OUT) {
skipping to change at line 312 skipping to change at line 384
} }
INPUT null INPUT null
OUTPUT ruleExecOut OUTPUT ruleExecOut
''' '''
with open(rule_file, 'w') as f: with open(rule_file, 'w') as f:
f.write(rule_string) f.write(rule_string)
self.admin.assert_icommand('irule -F ' + rule_file, 'STDERR_SINGLELINE', 'SYS_NOT_SUPPORTED') self.admin.assert_icommand('irule -F ' + rule_file, 'STDERR_SINGLELINE', 'SYS_NOT_SUPPORTED')
os.unlink(rule_file) os.unlink(rule_file)
max_literal_strlen = 1021 # MAX_TOKEN_TEXT_LEN - 2
@unittest.skipIf(plugin_name == 'irods_rule_engine_plugin-python', 'rule lan
guage only: irods#4311')
def test_string_literal__4311(self):
rule_text = '''
main {
*b=".%s"
msiStrlen(*b,*L)
writeLine("stdout",*L)
}
INPUT null
OUTPUT ruleExecOut
''' % ('a'*self.max_literal_strlen,)
rule_file = 'test_string_literal__4311.r'
with open(rule_file, 'w') as f:
f.write(rule_text)
self.admin.assert_icommand(['irule', '-F', rule_file], 'STDERR', ["ERROR
"], desired_rc = 4)
rule_file_2 = 'test_string_literal__4311_noerr.r'
with open(rule_file_2, 'w') as f:
f.write(rule_text.replace('".','"'))
self.admin.assert_icommand(['irule', '-F', rule_file_2],'STDOUT_SINGLELI
NE',str(self.max_literal_strlen))
os.remove(rule_file)
os.remove(rule_file_2)
@unittest.skipIf(plugin_name == 'irods_rule_engine_plugin-python', 'rule lan
guage only: irods#4311')
def test_string_input__4311(self):
rule_text = '''
main {
msiStrlen(*a,*L)
writeLine("stdout",*L)
}
INPUT *a=".%s"
OUTPUT ruleExecOut
''' % ('a'*self.max_literal_strlen,)
rule_file = 'test_string_input__4311.r'
with open(rule_file, 'w') as f:
f.write(rule_text)
self.admin.assert_icommand(['irule', '-F', rule_file], 'STDERR', ["ERROR
"], desired_rc = 4)
rule_file_2 = 'test_string_input__4311_noerr.r'
with open(rule_file_2, 'w') as f:
f.write(rule_text.replace('".','"'))
self.admin.assert_icommand(['irule', '-F', rule_file_2],'STDOUT_SINGLELI
NE',str(self.max_literal_strlen))
os.remove(rule_file)
os.remove(rule_file_2)
@unittest.skipIf(plugin_name == 'irods_rule_engine_plugin-python', 'rule lan
guage only')
def test_msiSegFault(self):
rule_text = rule_texts[self.plugin_name][self.class_name]['test_msiSegFa
ult']
rule_file = 'test_msiSegFault.r'
with open(rule_file, 'w') as f:
f.write(rule_text)
try:
# Should get SYS_INTERNAL_ERR because it's a segmentation fault
self.admin.assert_icommand(['irule', '-F', rule_file],'STDERR','SYS_
INTERNAL_ERR')
# Should get CAT_INSUFFICIENT_PRIVILEGE_LEVEL because this is for ad
min users only
self.user0.assert_icommand(['irule', '-F', rule_file],'STDERR','CAT_
INSUFFICIENT_PRIVILEGE_LEVEL')
finally:
os.unlink(rule_file)
 End of changes. 4 change blocks. 
6 lines changed or deleted 94 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)