"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "test/db_test_base.py" between
roundup-1.6.1.tar.gz and roundup-2.0.0.tar.gz

About: Roundup is an highly customisable issue-tracking system with command-line, web and e-mail interfaces (written in Python).

db_test_base.py  (roundup-1.6.1):db_test_base.py  (roundup-2.0.0)
skipping to change at line 18 skipping to change at line 18
# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING # DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
# OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE # OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
# #
# BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, # BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS # BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" # FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, # BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
import unittest, os, shutil, errno, imp, sys, time, pprint, base64, os.path from __future__ import print_function
import unittest, os, shutil, errno, imp, sys, time, pprint, os.path
try:
from base64 import encodebytes as base64_encode # python3 only
except ImportError:
# python2 and deplricated in 3
from base64 import encodestring as base64_encode
import logging, cgi import logging, cgi
import gpgmelib from . import gpgmelib
from email.parser import FeedParser from email import message_from_string
import pytest import pytest
from roundup.hyperdb import String, Password, Link, Multilink, Date, \ from roundup.hyperdb import String, Password, Link, Multilink, Date, \
Interval, DatabaseError, Boolean, Number, Node, Integer Interval, DatabaseError, Boolean, Number, Node, Integer
from roundup.mailer import Mailer from roundup.mailer import Mailer
from roundup import date, password, init, instance, configuration, \ from roundup import date, password, init, instance, configuration, \
roundupdb, i18n, hyperdb roundupdb, i18n, hyperdb
from roundup.cgi.templating import HTMLItem from roundup.cgi.templating import HTMLItem
from roundup.cgi.templating import HTMLProperty, _HTMLItem, anti_csrf_nonce from roundup.cgi.templating import HTMLProperty, _HTMLItem, anti_csrf_nonce
from roundup.cgi import client, actions from roundup.cgi import client, actions
from roundup.cgi.engine_zopetal import RoundupPageTemplate from roundup.cgi.engine_zopetal import RoundupPageTemplate
from roundup.cgi.templating import HTMLItem from roundup.cgi.templating import HTMLItem
from roundup.exceptions import UsageError, Reject from roundup.exceptions import UsageError, Reject
from mocknull import MockNull from roundup.anypy.strings import b2s, s2b, u2s
from roundup.anypy.cmp_ import NoneAndDictComparable
from roundup.anypy.email_ import message_from_bytes
from .mocknull import MockNull
config = configuration.CoreConfig() config = configuration.CoreConfig()
config.DATABASE = "db" config.DATABASE = "db"
config.RDBMS_NAME = "rounduptest" config.RDBMS_NAME = "rounduptest"
config.RDBMS_HOST = "localhost" config.RDBMS_HOST = "localhost"
config.RDBMS_USER = "rounduptest" config.RDBMS_USER = "rounduptest"
config.RDBMS_PASSWORD = "rounduptest" config.RDBMS_PASSWORD = "rounduptest"
config.RDBMS_TEMPLATE = "template0" config.RDBMS_TEMPLATE = "template0"
# these TRACKER_WEB and MAIL_DOMAIN values are used in mailgw tests # these TRACKER_WEB and MAIL_DOMAIN values are used in mailgw tests
config.MAIL_DOMAIN = "your.tracker.email.domain.example" config.MAIL_DOMAIN = "your.tracker.email.domain.example"
config.TRACKER_WEB = "http://tracker.example/cgi-bin/roundup.cgi/bugs/" config.TRACKER_WEB = "http://tracker.example/cgi-bin/roundup.cgi/bugs/"
# uncomment the following to have excessive debug output from test cases # uncomment the following to have excessive debug output from test cases
# FIXME: tracker logging level should be increased by -v arguments # FIXME: tracker logging level should be increased by -v arguments
# to 'run_tests.py' script # to 'run_tests.py' script
#config.LOGGING_FILENAME = "/tmp/logfile" #config.LOGGING_FILENAME = "/tmp/logfile"
#config.LOGGING_LEVEL = "DEBUG" #config.LOGGING_LEVEL = "DEBUG"
config.init_logging() config.init_logging()
def setupTracker(dirname, backend="anydbm"): def setupTracker(dirname, backend="anydbm", optimize=False):
"""Install and initialize new tracker in dirname; return tracker instance. """Install and initialize new tracker in dirname; return tracker instance.
If the directory exists, it is wiped out before the operation. If the directory exists, it is wiped out before the operation.
""" """
global config global config
try: try:
shutil.rmtree(dirname) shutil.rmtree(dirname)
except OSError as error: except OSError as error:
if error.errno not in (errno.ENOENT, errno.ESRCH): raise if error.errno not in (errno.ENOENT, errno.ESRCH): raise
# create the instance # create the instance
init.install(dirname, os.path.join(os.path.dirname(__file__), init.install(dirname, os.path.join(os.path.dirname(__file__),
'..', '..',
'share', 'share',
'roundup', 'roundup',
'templates', 'templates',
'classic')) 'classic'))
config.RDBMS_BACKEND = backend config.RDBMS_BACKEND = backend
config.save(os.path.join(dirname, 'config.ini')) config.save(os.path.join(dirname, 'config.ini'))
tracker = instance.open(dirname) tracker = instance.open(dirname, optimize=optimize)
if tracker.exists(): if tracker.exists():
tracker.nuke() tracker.nuke()
tracker.init(password.Password('sekrit')) tracker.init(password.Password('sekrit'))
return tracker return tracker
def setupSchema(db, create, module): def setupSchema(db, create, module):
mls = module.Class(db, "mls", name=String()) mls = module.Class(db, "mls", name=String())
mls.setkey("name") mls.setkey("name")
status = module.Class(db, "status", name=String(), mls=Multilink("mls")) status = module.Class(db, "status", name=String(), mls=Multilink("mls"))
status.setkey("name") status.setkey("name")
skipping to change at line 99 skipping to change at line 111
password=Password(quiet=True), assignable=Boolean(quiet=True), password=Password(quiet=True), assignable=Boolean(quiet=True),
age=Number(quiet=True), roles=String(), address=String(), age=Number(quiet=True), roles=String(), address=String(),
rating=Integer(quiet=True), supervisor=Link('user'), rating=Integer(quiet=True), supervisor=Link('user'),
realname=String(quiet=True), longnumber=Number(use_double=True)) realname=String(quiet=True), longnumber=Number(use_double=True))
user.setkey("username") user.setkey("username")
file = module.FileClass(db, "file", name=String(), type=String(), file = module.FileClass(db, "file", name=String(), type=String(),
comment=String(indexme="yes"), fooz=Password()) comment=String(indexme="yes"), fooz=Password())
file_nidx = module.FileClass(db, "file_nidx", content=String(indexme='no')) file_nidx = module.FileClass(db, "file_nidx", content=String(indexme='no'))
# initialize quiet mode a second way without using Multilink("user", quiet=T rue) # initialize quiet mode a second way without using Multilink("user", quiet=T rue)
mynosy = Multilink("user") mynosy = Multilink("user", rev_multilink='nosy_issues')
mynosy.quiet = True mynosy.quiet = True
issue = module.IssueClass(db, "issue", title=String(indexme="yes"), issue = module.IssueClass(db, "issue", title=String(indexme="yes"),
status=Link("status"), nosy=mynosy, deadline=Date(quiet=True), status=Link("status"), nosy=mynosy, deadline=Date(quiet=True),
foo=Interval(quiet=True, default_value=date.Interval('-1w')), foo=Interval(quiet=True, default_value=date.Interval('-1w')),
files=Multilink("file"), assignedto=Link('user', quiet=True), files=Multilink("file"), assignedto=Link('user', quiet=True,
priority=Link('priority'), spam=Multilink('msg'), feedback=Link('msg')) rev_multilink='issues'), priority=Link('priority'),
spam=Multilink('msg'), feedback=Link('msg'))
stuff = module.Class(db, "stuff", stuff=String()) stuff = module.Class(db, "stuff", stuff=String())
session = module.Class(db, 'session', title=String()) session = module.Class(db, 'session', title=String())
msg = module.FileClass(db, "msg", date=Date(), msg = module.FileClass(db, "msg", date=Date(),
author=Link("user", do_journal='no'), files=Multilink('file'), author=Link("user", do_journal='no'), files=Multilink('file'),
inreplyto=String(), messageid=String(), inreplyto=String(), messageid=String(),
recipients=Multilink("user", do_journal='no')) recipients=Multilink("user", do_journal='no'))
session.disableJournalling() session.disableJournalling()
db.post_init() db.post_init()
if create: if create:
user.create(username="admin", roles='Admin', user.create(username="admin", roles='Admin',
skipping to change at line 151 skipping to change at line 164
class MyTestCase(object): class MyTestCase(object):
def tearDown(self): def tearDown(self):
if hasattr(self, 'db'): if hasattr(self, 'db'):
self.db.close() self.db.close()
if os.path.exists(config.DATABASE): if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE) shutil.rmtree(config.DATABASE)
def open_database(self, user='admin'): def open_database(self, user='admin'):
self.db = self.module.Database(config, user) self.db = self.module.Database(config, user)
if os.environ.has_key('LOGGING_LEVEL'): if 'LOGGING_LEVEL' in os.environ:
logger = logging.getLogger('roundup.hyperdb') logger = logging.getLogger('roundup.hyperdb')
logger.setLevel(os.environ['LOGGING_LEVEL']) logger.setLevel(os.environ['LOGGING_LEVEL'])
class commonDBTest(MyTestCase): class commonDBTest(MyTestCase):
def setUp(self): def setUp(self):
# remove previous test, ignore errors # remove previous test, ignore errors
if os.path.exists(config.DATABASE): if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE) shutil.rmtree(config.DATABASE)
os.makedirs(config.DATABASE + '/files') os.makedirs(config.DATABASE + '/files')
self.open_database() self.open_database()
skipping to change at line 285 skipping to change at line 298
nid = self.db.file.create(content="spam") nid = self.db.file.create(content="spam")
self.assertEqual(self.db.file.get(nid, 'content'), 'spam') self.assertEqual(self.db.file.get(nid, 'content'), 'spam')
# change and make sure we retrieve the correct value # change and make sure we retrieve the correct value
self.db.file.set(nid, content='eggs') self.db.file.set(nid, content='eggs')
if commit: self.db.commit() if commit: self.db.commit()
self.assertEqual(self.db.file.get(nid, 'content'), 'eggs') self.assertEqual(self.db.file.get(nid, 'content'), 'eggs')
def testStringUnicode(self): def testStringUnicode(self):
# test set & retrieve # test set & retrieve
ustr = u'\xe4\xf6\xfc\u20ac'.encode('utf8') ustr = u2s(u'\xe4\xf6\xfc\u20ac')
nid = self.db.issue.create(title=ustr, status='1') nid = self.db.issue.create(title=ustr, status='1')
self.assertEqual(self.db.issue.get(nid, 'title'), ustr) self.assertEqual(self.db.issue.get(nid, 'title'), ustr)
# change and make sure we retrieve the correct value # change and make sure we retrieve the correct value
ustr2 = u'change \u20ac change'.encode('utf8') ustr2 = u2s(u'change \u20ac change')
self.db.issue.set(nid, title=ustr2) self.db.issue.set(nid, title=ustr2)
self.db.commit() self.db.commit()
self.assertEqual(self.db.issue.get(nid, 'title'), ustr2) self.assertEqual(self.db.issue.get(nid, 'title'), ustr2)
# test set & retrieve (this time for file contents)
nid = self.db.file.create(content=ustr)
self.assertEqual(self.db.file.get(nid, 'content'), ustr)
self.assertEqual(self.db.file.get(nid, 'binary_content'), s2b(ustr))
def testStringBinary(self):
''' Create file with binary content that is not able
to be interpreted as unicode. Try to cause file module
trigger and handle UnicodeDecodeError
and get valid output
'''
# test set & retrieve
bstr = b'\x00\xF0\x34\x33' # random binary data
# test set & retrieve (this time for file contents)
nid = self.db.file.create(content=bstr)
print(nid)
print(repr(self.db.file.get(nid, 'content')))
print(repr(self.db.file.get(nid, 'binary_content')))
p3val='file1 is not text, retrieve using binary_content property. mdsum:
0e1d1b47e4bd1beab3afc9b79f596c1d'
if sys.version_info[0] > 2:
# python 3
self.assertEqual(self.db.file.get(nid, 'content'), p3val)
self.assertEqual(self.db.file.get(nid, 'binary_content'),
bstr)
else:
# python 2
self.assertEqual(self.db.file.get(nid, 'content'), bstr)
self.assertEqual(self.db.file.get(nid, 'binary_content'), bstr)
# Link # Link
def testLinkChange(self): def testLinkChange(self):
self.assertRaises(IndexError, self.db.issue.create, title="spam", self.assertRaises(IndexError, self.db.issue.create, title="spam",
status='100') status='100')
for commit in (0,1): for commit in (0,1):
nid = self.db.issue.create(title="spam", status='1') nid = self.db.issue.create(title="spam", status='1')
if commit: self.db.commit() if commit: self.db.commit()
self.assertEqual(self.db.issue.get(nid, "status"), '1') self.assertEqual(self.db.issue.get(nid, "status"), '1')
self.db.issue.set(nid, status='2') self.db.issue.set(nid, status='2')
if commit: self.db.commit() if commit: self.db.commit()
skipping to change at line 414 skipping to change at line 458
if commit: self.db.commit() if commit: self.db.commit()
self.db.issue.set(nid, deadline=date.Date()) self.db.issue.set(nid, deadline=date.Date())
b = self.db.issue.get(nid, "deadline") b = self.db.issue.get(nid, "deadline")
if commit: self.db.commit() if commit: self.db.commit()
self.assertNotEqual(a, b) self.assertNotEqual(a, b)
self.assertNotEqual(b, date.Date('1970-1-1.00:00:00')) self.assertNotEqual(b, date.Date('1970-1-1.00:00:00'))
# The 1970 date will fail for metakit -- it is used # The 1970 date will fail for metakit -- it is used
# internally for storing NULL. The others would, too # internally for storing NULL. The others would, too
# because metakit tries to convert date.timestamp to an int # because metakit tries to convert date.timestamp to an int
# for storing and fails with an overflow. # for storing and fails with an overflow.
for d in [date.Date (x) for x in '2038', '1970', '0033', '9999']: for d in [date.Date (x) for x in ('2038', '1970', '0033', '9999')]:
self.db.issue.set(nid, deadline=d) self.db.issue.set(nid, deadline=d)
if commit: self.db.commit() if commit: self.db.commit()
c = self.db.issue.get(nid, "deadline") c = self.db.issue.get(nid, "deadline")
self.assertEqual(c, d) self.assertEqual(c, d)
def testDateLeapYear(self): def testDateLeapYear(self):
nid = self.db.issue.create(title='spam', status='1', nid = self.db.issue.create(title='spam', status='1',
deadline=date.Date('2008-02-29')) deadline=date.Date('2008-02-29'))
self.assertEquals(str(self.db.issue.get(nid, 'deadline')), self.assertEqual(str(self.db.issue.get(nid, 'deadline')),
'2008-02-29.00:00:00') '2008-02-29.00:00:00')
self.assertEquals(self.db.issue.filter(None, self.assertEqual(self.db.issue.filter(None,
{'deadline': '2008-02-29'}), [nid]) {'deadline': '2008-02-29'}), [nid])
self.assertEquals(list(self.db.issue.filter_iter(None, self.assertEqual(list(self.db.issue.filter_iter(None,
{'deadline': '2008-02-29'})), [nid]) {'deadline': '2008-02-29'})), [nid])
self.db.issue.set(nid, deadline=date.Date('2008-03-01')) self.db.issue.set(nid, deadline=date.Date('2008-03-01'))
self.assertEquals(str(self.db.issue.get(nid, 'deadline')), self.assertEqual(str(self.db.issue.get(nid, 'deadline')),
'2008-03-01.00:00:00') '2008-03-01.00:00:00')
self.assertEquals(self.db.issue.filter(None, self.assertEqual(self.db.issue.filter(None,
{'deadline': '2008-02-29'}), []) {'deadline': '2008-02-29'}), [])
self.assertEquals(list(self.db.issue.filter_iter(None, self.assertEqual(list(self.db.issue.filter_iter(None,
{'deadline': '2008-02-29'})), []) {'deadline': '2008-02-29'})), [])
def testDateUnset(self): def testDateUnset(self):
for commit in (0,1): for commit in (0,1):
nid = self.db.issue.create(title="spam", status='1') nid = self.db.issue.create(title="spam", status='1')
self.db.issue.set(nid, deadline=date.Date()) self.db.issue.set(nid, deadline=date.Date())
if commit: self.db.commit() if commit: self.db.commit()
self.assertNotEqual(self.db.issue.get(nid, "deadline"), None) self.assertNotEqual(self.db.issue.get(nid, "deadline"), None)
self.db.issue.set(nid, deadline=None) self.db.issue.set(nid, deadline=None)
if commit: self.db.commit() if commit: self.db.commit()
skipping to change at line 663 skipping to change at line 707
others = nodeids[:] others = nodeids[:]
others.remove('1') others.remove('1')
self.assertEqual(set(self.db.status.getnodeids()), self.assertEqual(set(self.db.status.getnodeids()),
set(nodeids)) set(nodeids))
self.assertEqual(set(self.db.status.getnodeids(retired=True)), self.assertEqual(set(self.db.status.getnodeids(retired=True)),
set(['1'])) set(['1']))
self.assertEqual(set(self.db.status.getnodeids(retired=False)), self.assertEqual(set(self.db.status.getnodeids(retired=False)),
set(others)) set(others))
self.assert_(self.db.status.is_retired('1')) self.assertTrue(self.db.status.is_retired('1'))
# make sure the list is different # make sure the list is different
self.assertNotEqual(a, self.db.status.list()) self.assertNotEqual(a, self.db.status.list())
# can still access the node if necessary # can still access the node if necessary
self.assertEqual(self.db.status.get('1', 'name'), b) self.assertEqual(self.db.status.get('1', 'name'), b)
self.assertRaises(IndexError, self.db.status.set, '1', name='hello') self.assertRaises(IndexError, self.db.status.set, '1', name='hello')
self.db.commit() self.db.commit()
self.assert_(self.db.status.is_retired('1')) self.assertTrue(self.db.status.is_retired('1'))
self.assertEqual(self.db.status.get('1', 'name'), b) self.assertEqual(self.db.status.get('1', 'name'), b)
self.assertNotEqual(a, self.db.status.list()) self.assertNotEqual(a, self.db.status.list())
# try to restore retired node # try to restore retired node
self.db.status.restore('1') self.db.status.restore('1')
self.assert_(not self.db.status.is_retired('1')) self.assertTrue(not self.db.status.is_retired('1'))
def testCacheCreateSet(self): def testCacheCreateSet(self):
self.db.issue.create(title="spam", status='1') self.db.issue.create(title="spam", status='1')
a = self.db.issue.get('1', 'title') a = self.db.issue.get('1', 'title')
self.assertEqual(a, 'spam') self.assertEqual(a, 'spam')
self.db.issue.set('1', title='ham') self.db.issue.set('1', title='ham')
b = self.db.issue.get('1', 'title') b = self.db.issue.get('1', 'title')
self.assertEqual(b, 'ham') self.assertEqual(b, 'ham')
def testSerialisation(self): def testSerialisation(self):
skipping to change at line 1047 skipping to change at line 1091
# change all quiet params on issue. # change all quiet params on issue.
result=self.db.issue.set(new_issue, title="title2", result=self.db.issue.set(new_issue, title="title2",
deadline=date.Date('2016-07-30.22:39'), deadline=date.Date('2016-07-30.22:39'),
assignedto="2", nosy=["3", "2"]) assignedto="2", nosy=["3", "2"])
result=self.db.issue.generateCreateNote(new_issue) result=self.db.issue.generateCreateNote(new_issue)
self.assertEqual(result, '\n----------\ntitle: title2') self.assertEqual(result, '\n----------\ntitle: title2')
# check history including quiet properties # check history including quiet properties
result=self.db.issue.history(new_issue, skipquiet=False) result=self.db.issue.history(new_issue, skipquiet=False)
print result print(result)
''' output should be like: ''' output should be like:
[ ... ('1', <Date 2017-04-14.01:41:08.466>, '1', 'set', [ ... ('1', <Date 2017-04-14.01:41:08.466>, '1', 'set',
{'assignedto': None, 'nosy': (('+', ['3', '2']),), {'assignedto': None, 'nosy': (('+', ['3', '2']),),
'deadline': <Date 2016-06-30.22:39:00.000>, 'deadline': <Date 2016-06-30.22:39:00.000>,
'title': 'title'}) 'title': 'title'})
''' '''
expected = {'assignedto': None, expected = {'assignedto': None,
'nosy': (('+', ['3', '2']),), 'nosy': (('+', ['3', '2']),),
'deadline': date.Date('2016-06-30.22:39'), 'deadline': date.Date('2016-06-30.22:39'),
'title': 'title'} 'title': 'title'}
result.sort() result.sort()
print "history include quiet props", result[-1] print("history include quiet props", result[-1])
(id, tx_date, user, action, args) = result[-1] (id, tx_date, user, action, args) = result[-1]
# check piecewise ignoring date of transaction # check piecewise ignoring date of transaction
self.assertEqual('1', id) self.assertEqual('1', id)
self.assertEqual('1', user) self.assertEqual('1', user)
self.assertEqual('set', action) self.assertEqual('set', action)
self.assertEqual(expected, args) self.assertEqual(expected, args)
# check history removing quiet properties # check history removing quiet properties
result=self.db.issue.history(new_issue) result=self.db.issue.history(new_issue)
''' output should be like: ''' output should be like:
[ ... ('1', <Date 2017-04-14.01:41:08.466>, '1', 'set', [ ... ('1', <Date 2017-04-14.01:41:08.466>, '1', 'set',
{'title': 'title'}) {'title': 'title'})
''' '''
expected = {'title': 'title'} expected = {'title': 'title'}
result.sort() result.sort()
print "history remove quiet props", result[-1] print("history remove quiet props", result[-1])
(id, tx_date, user, action, args) = result[-1] (id, tx_date, user, action, args) = result[-1]
# check piecewise # check piecewise
self.assertEqual('1', id) self.assertEqual('1', id)
self.assertEqual('1', user) self.assertEqual('1', user)
self.assertEqual('set', action) self.assertEqual('set', action)
self.assertEqual(expected, args) self.assertEqual(expected, args)
# also test that we can make a property noisy # also test that we can make a property noisy
self.db.issue.properties['nosy'].quiet=False self.db.issue.properties['nosy'].quiet=False
self.db.issue.properties['deadline'].quiet=False self.db.issue.properties['deadline'].quiet=False
skipping to change at line 1110 skipping to change at line 1154
assignedto="2", nosy=["1", "2"]) assignedto="2", nosy=["1", "2"])
result=self.db.issue.generateCreateNote(new_issue) result=self.db.issue.generateCreateNote(new_issue)
self.assertEqual(result, '\n----------\ndeadline: 2016-07-13.22:39:00\nn osy: admin, fred\ntitle: title2') self.assertEqual(result, '\n----------\ndeadline: 2016-07-13.22:39:00\nn osy: admin, fred\ntitle: title2')
# check history removing the current quiet properties # check history removing the current quiet properties
result=self.db.issue.history(new_issue) result=self.db.issue.history(new_issue)
expected = {'nosy': (('+', ['1']), ('-', ['3'])), expected = {'nosy': (('+', ['1']), ('-', ['3'])),
'deadline': date.Date("2016-07-30.22:39:00.000")} 'deadline': date.Date("2016-07-30.22:39:00.000")}
result.sort() result.sort()
print "result unquiet", result print("result unquiet", result)
(id, tx_date, user, action, args) = result[-1] (id, tx_date, user, action, args) = result[-1]
# check piecewise # check piecewise
self.assertEqual('1', id) self.assertEqual('1', id)
self.assertEqual('1', user) self.assertEqual('1', user)
self.assertEqual('set', action) self.assertEqual('set', action)
self.assertEqual(expected, args) self.assertEqual(expected, args)
result=self.db.user.history('2') result=self.db.user.history('2')
result.sort() result.sort()
skipping to change at line 1227 skipping to change at line 1271
self.db.issue.create(title="spam", status='1') self.db.issue.create(title="spam", status='1')
self.db.commit() self.db.commit()
# journal entry for issue create # journal entry for issue create
journal = self.db.getjournal('issue', '1') journal = self.db.getjournal('issue', '1')
self.assertEqual(1, len(journal)) self.assertEqual(1, len(journal))
(nodeid, date_stamp, journaltag, action, params) = journal[0] (nodeid, date_stamp, journaltag, action, params) = journal[0]
self.assertEqual(nodeid, '1') self.assertEqual(nodeid, '1')
self.assertEqual(journaltag, self.db.user.lookup('admin')) self.assertEqual(journaltag, self.db.user.lookup('admin'))
self.assertEqual(action, 'create') self.assertEqual(action, 'create')
keys = params.keys() keys = sorted(params.keys())
keys.sort()
self.assertEqual(keys, []) self.assertEqual(keys, [])
# journal entry for link # journal entry for link
journal = self.db.getjournal('user', '1') journal = self.db.getjournal('user', '1')
self.assertEqual(1, len(journal)) self.assertEqual(1, len(journal))
self.db.issue.set('1', assignedto='1') self.db.issue.set('1', assignedto='1')
self.db.commit() self.db.commit()
journal = self.db.getjournal('user', '1') journal = self.db.getjournal('user', '1')
self.assertEqual(2, len(journal)) self.assertEqual(2, len(journal))
(nodeid, date_stamp, journaltag, action, params) = journal[1] (nodeid, date_stamp, journaltag, action, params) = journal[1]
skipping to change at line 1313 skipping to change at line 1356
result.sort() result.sort()
# anydbm drops unknown properties during serialisation # anydbm drops unknown properties during serialisation
if self.db.dbtype == 'anydbm': if self.db.dbtype == 'anydbm':
self.assertEqual(len(result), 4) self.assertEqual(len(result), 4)
self.assertEqual(result [1][4], jp0) self.assertEqual(result [1][4], jp0)
self.assertEqual(result [2][4], jp2) self.assertEqual(result [2][4], jp2)
self.assertEqual(result [3][4], jp3) self.assertEqual(result [3][4], jp3)
else: else:
self.assertEqual(len(result), 5) self.assertEqual(len(result), 5)
self.assertEqual(result [1][4], jp0) self.assertEqual(result [1][4], jp0)
print(result) # following test fails sometimes under sqlite
# in travis. Looks like an ordering issue
# in python 3.5. Print result to debug.
self.assertEqual(result [2][4], jp1) self.assertEqual(result [2][4], jp1)
self.assertEqual(result [3][4], jp2) self.assertEqual(result [3][4], jp2)
self.assertEqual(result [4][4], jp3) self.assertEqual(result [4][4], jp3)
self.db.close() self.db.close()
# Verify that normal user doesn't see obsolete props/classes # Verify that normal user doesn't see obsolete props/classes
# Backend memorydb cannot re-open db for different user # Backend memorydb cannot re-open db for different user
if self.db.dbtype != 'memorydb': if self.db.dbtype != 'memorydb':
self.open_database('mary') self.open_database('mary')
setupSchema(self.db, 0, self.module) setupSchema(self.db, 0, self.module)
# allow mary to see issue fields like title # allow mary to see issue fields like title
skipping to change at line 1364 skipping to change at line 1410
self.assertEqual(jlen-1, len(self.db.getjournal('issue', id))) self.assertEqual(jlen-1, len(self.db.getjournal('issue', id)))
def testIndexerSearching(self): def testIndexerSearching(self):
f1 = self.db.file.create(content='hello', type="text/plain") f1 = self.db.file.create(content='hello', type="text/plain")
# content='world' has the wrong content-type and won't be indexed # content='world' has the wrong content-type and won't be indexed
f2 = self.db.file.create(content='world', type="text/frozz", f2 = self.db.file.create(content='world', type="text/frozz",
comment='blah blah') comment='blah blah')
i1 = self.db.issue.create(files=[f1, f2], title="flebble plop") i1 = self.db.issue.create(files=[f1, f2], title="flebble plop")
i2 = self.db.issue.create(title="flebble the frooz") i2 = self.db.issue.create(title="flebble the frooz")
self.db.commit() self.db.commit()
self.assertEquals(self.db.indexer.search([], self.db.issue), {}) self.assertEqual(self.db.indexer.search([], self.db.issue), {})
self.assertEquals(self.db.indexer.search(['hello'], self.db.issue), self.assertEqual(self.db.indexer.search(['hello'], self.db.issue),
{i1: {'files': [f1]}}) {i1: {'files': [f1]}})
# content='world' has the wrong content-type and shouldn't be indexed # content='world' has the wrong content-type and shouldn't be indexed
self.assertEquals(self.db.indexer.search(['world'], self.db.issue), {}) self.assertEqual(self.db.indexer.search(['world'], self.db.issue), {})
self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue), self.assertEqual(self.db.indexer.search(['frooz'], self.db.issue),
{i2: {}}) {i2: {}})
self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue), self.assertEqual(self.db.indexer.search(['flebble'], self.db.issue),
{i1: {}, i2: {}}) {i1: {}, i2: {}})
# test AND'ing of search terms # test AND'ing of search terms
self.assertEquals(self.db.indexer.search(['frooz', 'flebble'], self.assertEqual(self.db.indexer.search(['frooz', 'flebble'],
self.db.issue), {i2: {}}) self.db.issue), {i2: {}})
# unindexed stopword # unindexed stopword
self.assertEquals(self.db.indexer.search(['the'], self.db.issue), {}) self.assertEqual(self.db.indexer.search(['the'], self.db.issue), {})
def testIndexerSearchingLink(self): def testIndexerSearchingLink(self):
m1 = self.db.msg.create(content="one two") m1 = self.db.msg.create(content="one two")
i1 = self.db.issue.create(messages=[m1]) i1 = self.db.issue.create(messages=[m1])
m2 = self.db.msg.create(content="two three") m2 = self.db.msg.create(content="two three")
i2 = self.db.issue.create(feedback=m2) i2 = self.db.issue.create(feedback=m2)
self.db.commit() self.db.commit()
self.assertEquals(self.db.indexer.search(['two'], self.db.issue), self.assertEqual(self.db.indexer.search(['two'], self.db.issue),
{i1: {'messages': [m1]}, i2: {'feedback': [m2]}}) {i1: {'messages': [m1]}, i2: {'feedback': [m2]}})
def testIndexerSearchMulti(self): def testIndexerSearchMulti(self):
m1 = self.db.msg.create(content="one two") m1 = self.db.msg.create(content="one two")
m2 = self.db.msg.create(content="two three") m2 = self.db.msg.create(content="two three")
i1 = self.db.issue.create(messages=[m1]) i1 = self.db.issue.create(messages=[m1])
i2 = self.db.issue.create(spam=[m2]) i2 = self.db.issue.create(spam=[m2])
self.db.commit() self.db.commit()
self.assertEquals(self.db.indexer.search([], self.db.issue), {}) self.assertEqual(self.db.indexer.search([], self.db.issue), {})
self.assertEquals(self.db.indexer.search(['one'], self.db.issue), self.assertEqual(self.db.indexer.search(['one'], self.db.issue),
{i1: {'messages': [m1]}}) {i1: {'messages': [m1]}})
self.assertEquals(self.db.indexer.search(['two'], self.db.issue), self.assertEqual(self.db.indexer.search(['two'], self.db.issue),
{i1: {'messages': [m1]}, i2: {'spam': [m2]}}) {i1: {'messages': [m1]}, i2: {'spam': [m2]}})
self.assertEquals(self.db.indexer.search(['three'], self.db.issue), self.assertEqual(self.db.indexer.search(['three'], self.db.issue),
{i2: {'spam': [m2]}}) {i2: {'spam': [m2]}})
def testReindexingChange(self): def testReindexingChange(self):
search = self.db.indexer.search search = self.db.indexer.search
issue = self.db.issue issue = self.db.issue
i1 = issue.create(title="flebble plop") i1 = issue.create(title="flebble plop")
i2 = issue.create(title="flebble frooz") i2 = issue.create(title="flebble frooz")
self.db.commit() self.db.commit()
self.assertEquals(search(['plop'], issue), {i1: {}}) self.assertEqual(search(['plop'], issue), {i1: {}})
self.assertEquals(search(['flebble'], issue), {i1: {}, i2: {}}) self.assertEqual(search(['flebble'], issue), {i1: {}, i2: {}})
# change i1's title # change i1's title
issue.set(i1, title="plop") issue.set(i1, title="plop")
self.db.commit() self.db.commit()
self.assertEquals(search(['plop'], issue), {i1: {}}) self.assertEqual(search(['plop'], issue), {i1: {}})
self.assertEquals(search(['flebble'], issue), {i2: {}}) self.assertEqual(search(['flebble'], issue), {i2: {}})
def testReindexingClear(self): def testReindexingClear(self):
search = self.db.indexer.search search = self.db.indexer.search
issue = self.db.issue issue = self.db.issue
i1 = issue.create(title="flebble plop") i1 = issue.create(title="flebble plop")
i2 = issue.create(title="flebble frooz") i2 = issue.create(title="flebble frooz")
self.db.commit() self.db.commit()
self.assertEquals(search(['plop'], issue), {i1: {}}) self.assertEqual(search(['plop'], issue), {i1: {}})
self.assertEquals(search(['flebble'], issue), {i1: {}, i2: {}}) self.assertEqual(search(['flebble'], issue), {i1: {}, i2: {}})
# unset i1's title # unset i1's title
issue.set(i1, title="") issue.set(i1, title="")
self.db.commit() self.db.commit()
self.assertEquals(search(['plop'], issue), {}) self.assertEqual(search(['plop'], issue), {})
self.assertEquals(search(['flebble'], issue), {i2: {}}) self.assertEqual(search(['flebble'], issue), {i2: {}})
def testFileClassReindexing(self): def testFileClassReindexing(self):
f1 = self.db.file.create(content='hello') f1 = self.db.file.create(content='hello')
f2 = self.db.file.create(content='hello, world') f2 = self.db.file.create(content='hello, world')
i1 = self.db.issue.create(files=[f1, f2]) i1 = self.db.issue.create(files=[f1, f2])
self.db.commit() self.db.commit()
d = self.db.indexer.search(['hello'], self.db.issue) d = self.db.indexer.search(['hello'], self.db.issue)
self.assert_(d.has_key(i1)) self.assertTrue(i1 in d)
d[i1]['files'].sort() d[i1]['files'].sort()
self.assertEquals(d, {i1: {'files': [f1, f2]}}) self.assertEqual(d, {i1: {'files': [f1, f2]}})
self.assertEquals(self.db.indexer.search(['world'], self.db.issue), self.assertEqual(self.db.indexer.search(['world'], self.db.issue),
{i1: {'files': [f2]}}) {i1: {'files': [f2]}})
self.db.file.set(f1, content="world") self.db.file.set(f1, content="world")
self.db.commit() self.db.commit()
d = self.db.indexer.search(['world'], self.db.issue) d = self.db.indexer.search(['world'], self.db.issue)
d[i1]['files'].sort() d[i1]['files'].sort()
self.assertEquals(d, {i1: {'files': [f1, f2]}}) self.assertEqual(d, {i1: {'files': [f1, f2]}})
self.assertEquals(self.db.indexer.search(['hello'], self.db.issue), self.assertEqual(self.db.indexer.search(['hello'], self.db.issue),
{i1: {'files': [f2]}}) {i1: {'files': [f2]}})
def testFileClassIndexingNoNoNo(self): def testFileClassIndexingNoNoNo(self):
f1 = self.db.file.create(content='hello') f1 = self.db.file.create(content='hello')
self.db.commit() self.db.commit()
self.assertEquals(self.db.indexer.search(['hello'], self.db.file), self.assertEqual(self.db.indexer.search(['hello'], self.db.file),
{'1': {}}) {'1': {}})
f1 = self.db.file_nidx.create(content='hello') f1 = self.db.file_nidx.create(content='hello')
self.db.commit() self.db.commit()
self.assertEquals(self.db.indexer.search(['hello'], self.db.file_nidx), self.assertEqual(self.db.indexer.search(['hello'], self.db.file_nidx),
{}) {})
def testForcedReindexing(self): def testForcedReindexing(self):
self.db.issue.create(title="flebble frooz") self.db.issue.create(title="flebble frooz")
self.db.commit() self.db.commit()
self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue), self.assertEqual(self.db.indexer.search(['flebble'], self.db.issue),
{'1': {}}) {'1': {}})
self.db.indexer.quiet = 1 self.db.indexer.quiet = 1
self.db.indexer.force_reindex() self.db.indexer.force_reindex()
self.db.post_init() self.db.post_init()
self.db.indexer.quiet = 9 self.db.indexer.quiet = 9
self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue), self.assertEqual(self.db.indexer.search(['flebble'], self.db.issue),
{'1': {}}) {'1': {}})
def testIndexingPropertiesOnImport(self): def testIndexingPropertiesOnImport(self):
# import an issue # import an issue
title = 'Bzzt' title = 'Bzzt'
nodeid = self.db.issue.import_list(['title', 'messages', 'files', nodeid = self.db.issue.import_list(['title', 'messages', 'files',
'spam', 'nosy', 'superseder'], [repr(title), '[]', '[]', 'spam', 'nosy', 'superseder'], [repr(title), '[]', '[]',
'[]', '[]', '[]']) '[]', '[]', '[]'])
self.db.commit() self.db.commit()
# Content of title attribute is indexed # Content of title attribute is indexed
self.assertEquals(self.db.indexer.search([title], self.db.issue), self.assertEqual(self.db.indexer.search([title], self.db.issue),
{str(nodeid):{}}) {str(nodeid):{}})
# #
# searching tests follow # searching tests follow
# #
def testFindIncorrectProperty(self): def testFindIncorrectProperty(self):
self.assertRaises(TypeError, self.db.issue.find, title='fubar') self.assertRaises(TypeError, self.db.issue.find, title='fubar')
def _find_test_setup(self): def _find_test_setup(self):
self.db.file.create(content='') self.db.file.create(content='')
skipping to change at line 1515 skipping to change at line 1561
def testFindLink(self): def testFindLink(self):
one, two, three, four = self._find_test_setup() one, two, three, four = self._find_test_setup()
got = self.db.issue.find(status='1') got = self.db.issue.find(status='1')
got.sort() got.sort()
self.assertEqual(got, [one, three]) self.assertEqual(got, [one, three])
got = self.db.issue.find(status={'1':1}) got = self.db.issue.find(status={'1':1})
got.sort() got.sort()
self.assertEqual(got, [one, three]) self.assertEqual(got, [one, three])
def testFindRevLinkMultilink(self):
ae, filter, filter_iter = self.filteringSetupTransitiveSearch('user')
ni = 'nosy_issues'
self.db.issue.set('6', nosy=['3', '4', '5'])
self.db.issue.set('7', nosy=['5'])
# After this setup we have the following values for nosy:
# issue assignedto nosy
# 1: 6 4
# 2: 6 5
# 3: 7
# 4: 8
# 5: 9
# 6: 10 3, 4, 5
# 7: 10 5
# 8: 10
# assignedto links back from 'issues'
# nosy links back from 'nosy_issues'
self.assertEqual(self.db.user.find(issues={'1':1}), ['6'])
self.assertEqual(self.db.user.find(issues={'8':1}), ['10'])
self.assertEqual(self.db.user.find(issues={'2':1, '5':1}), ['6', '9'])
self.assertEqual(self.db.user.find(nosy_issues={'8':1}), [])
self.assertEqual(self.db.user.find(nosy_issues={'6':1}),
['3', '4', '5'])
self.assertEqual(self.db.user.find(nosy_issues={'3':1, '5':1}), [])
self.assertEqual(self.db.user.find(nosy_issues={'2':1, '6':1, '7':1}),
['3', '4', '5'])
def testFindLinkFail(self): def testFindLinkFail(self):
self._find_test_setup() self._find_test_setup()
self.assertEqual(self.db.issue.find(status='4'), []) self.assertEqual(self.db.issue.find(status='4'), [])
self.assertEqual(self.db.issue.find(status={'4':1}), []) self.assertEqual(self.db.issue.find(status={'4':1}), [])
def testFindLinkUnset(self): def testFindLinkUnset(self):
one, two, three, four = self._find_test_setup() one, two, three, four = self._find_test_setup()
got = self.db.issue.find(assignedto=None) got = self.db.issue.find(assignedto=None)
got.sort() got.sort()
self.assertEqual(got, [one, three]) self.assertEqual(got, [one, three])
got = self.db.issue.find(assignedto={None:1}) got = self.db.issue.find(assignedto={None:1})
got.sort() got.sort()
self.assertEqual(got, [one, three]) self.assertEqual(got, [one, three])
def testFindMultipleLink(self): def testFindMultipleLink(self):
one, two, three, four = self._find_test_setup() one, two, three, four = self._find_test_setup()
l = self.db.issue.find(status={'1':1, '3':1}) l = self.db.issue.find(status={'1':1, '3':1})
l.sort() l.sort()
self.assertEqual(l, [one, three, four]) self.assertEqual(l, [one, three, four])
l = self.db.issue.find(status=('1', '3'))
l.sort()
self.assertEqual(l, [one, three, four])
l = self.db.issue.find(status=['1', '3'])
l.sort()
self.assertEqual(l, [one, three, four])
l = self.db.issue.find(assignedto={None:1, '1':1}) l = self.db.issue.find(assignedto={None:1, '1':1})
l.sort() l.sort()
self.assertEqual(l, [one, three, four]) self.assertEqual(l, [one, three, four])
def testFindMultilink(self): def testFindMultilink(self):
one, two, three, four = self._find_test_setup() one, two, three, four = self._find_test_setup()
got = self.db.issue.find(nosy='2') got = self.db.issue.find(nosy='2')
got.sort() got.sort()
self.assertEqual(got, [two, three]) self.assertEqual(got, [two, three])
got = self.db.issue.find(nosy={'2':1}) got = self.db.issue.find(nosy={'2':1})
skipping to change at line 1701 skipping to change at line 1780
ae(filt(None, {'title': ['One']}, ('+','id'), (None,None)), ['1']) ae(filt(None, {'title': ['One']}, ('+','id'), (None,None)), ['1'])
ae(filt(None, {'title': ['Issue One']}, ('+','id'), (None,None)), ae(filt(None, {'title': ['Issue One']}, ('+','id'), (None,None)),
['1']) ['1'])
ae(filt(None, {'title': ['ISSUE', 'ONE']}, ('+','id'), (None,None)), ae(filt(None, {'title': ['ISSUE', 'ONE']}, ('+','id'), (None,None)),
['1']) ['1'])
ae(filt(None, {'title': ['iSSUE']}, ('+','id'), (None,None)), ae(filt(None, {'title': ['iSSUE']}, ('+','id'), (None,None)),
['1','2','3']) ['1','2','3'])
ae(filt(None, {'title': ['One', 'Two']}, ('+','id'), (None,None)), ae(filt(None, {'title': ['One', 'Two']}, ('+','id'), (None,None)),
[]) [])
def testFilteringStringExactMatch(self):
ae, filter, filter_iter = self.filteringSetup()
# Change title of issue2 to 'issue' so we can test substring
# search vs exact search
self.db.issue.set('2', title='issue')
#self.db.commit()
for filt in filter, filter_iter:
ae(filt(None, {}, exact_match_spec =
{'title': ['one']}), [])
ae(filt(None, {}, exact_match_spec =
{'title': ['issue one']}), ['1'])
ae(filt(None, {}, exact_match_spec =
{'title': ['issue', 'one']}), [])
ae(filt(None, {}, exact_match_spec =
{'title': ['issue']}), ['2'])
ae(filt(None, {}, exact_match_spec =
{'title': ['one', 'two']}), [])
ae(filt(None, {}, exact_match_spec =
{'title': ['One']}), [])
ae(filt(None, {}, exact_match_spec =
{'title': ['Issue One']}), [])
ae(filt(None, {}, exact_match_spec =
{'title': ['ISSUE', 'ONE']}), [])
ae(filt(None, {}, exact_match_spec =
{'title': ['iSSUE']}), [])
ae(filt(None, {}, exact_match_spec =
{'title': ['One', 'Two']}), [])
ae(filt(None, {}, exact_match_spec =
{'title': ['non four']}), ['4'])
# Both, filterspec and exact_match_spec on same prop
ae(filt(None, {'title': 'iSSUE'}, exact_match_spec =
{'title': ['issue']}), ['2'])
def testFilteringSpecialChars(self): def testFilteringSpecialChars(self):
""" Special characters in SQL search are '%' and '_', some used """ Special characters in SQL search are '%' and '_', some used
to lead to a traceback. to lead to a traceback.
""" """
ae, filter, filter_iter = self.filteringSetup() ae, filter, filter_iter = self.filteringSetup()
self.db.issue.set('1', title="With % symbol") self.db.issue.set('1', title="With % symbol")
self.db.issue.set('2', title="With _ symbol") self.db.issue.set('2', title="With _ symbol")
self.db.issue.set('3', title="With \\ symbol") self.db.issue.set('3', title="With \\ symbol")
self.db.issue.set('4', title="With ' symbol") self.db.issue.set('4', title="With ' symbol")
d = dict (status = '1') d = dict (status = '1')
skipping to change at line 1729 skipping to change at line 1841
a = 'assignedto' a = 'assignedto'
grp = (None, None) grp = (None, None)
for filt in filter, filter_iter: for filt in filter, filter_iter:
ae(filt(None, {'status': '1'}, ('+','id'), grp), ['2','3']) ae(filt(None, {'status': '1'}, ('+','id'), grp), ['2','3'])
ae(filt(None, {a: '-1'}, ('+','id'), grp), ['3','4']) ae(filt(None, {a: '-1'}, ('+','id'), grp), ['3','4'])
ae(filt(None, {a: None}, ('+','id'), grp), ['3','4']) ae(filt(None, {a: None}, ('+','id'), grp), ['3','4'])
ae(filt(None, {a: [None]}, ('+','id'), grp), ['3','4']) ae(filt(None, {a: [None]}, ('+','id'), grp), ['3','4'])
ae(filt(None, {a: ['-1', None]}, ('+','id'), grp), ['3','4']) ae(filt(None, {a: ['-1', None]}, ('+','id'), grp), ['3','4'])
ae(filt(None, {a: ['1', None]}, ('+','id'), grp), ['1', '3','4']) ae(filt(None, {a: ['1', None]}, ('+','id'), grp), ['1', '3','4'])
def testFilteringRevLink(self):
ae, filter, filter_iter = self.filteringSetupTransitiveSearch('user')
# We have
# issue assignedto
# 1: 6
# 2: 6
# 3: 7
# 4: 8
# 5: 9
# 6: 10
# 7: 10
# 8: 10
for filt in filter, filter_iter:
ae(filt(None, {'issues': ['3', '4']}), ['7', '8'])
ae(filt(None, {'issues': ['1', '4', '8']}), ['6', '8', '10'])
ae(filt(None, {'issues.title': ['ts2']}), ['6'])
ae(filt(None, {'issues': ['-1']}), ['1', '2', '3', '4', '5'])
ae(filt(None, {'issues': '-1'}), ['1', '2', '3', '4', '5'])
def ls(x):
return list(sorted(x))
self.assertEqual(ls(self.db.user.get('6', 'issues')), ['1', '2'])
self.assertEqual(ls(self.db.user.get('7', 'issues')), ['3'])
self.assertEqual(ls(self.db.user.get('10', 'issues')), ['6', '7', '8'])
n = self.db.user.getnode('6')
self.assertEqual(ls(n.issues), ['1', '2'])
# Now retire some linked-to issues and retry
self.db.issue.retire('6')
self.db.issue.retire('2')
self.db.issue.retire('3')
self.db.commit()
for filt in filter, filter_iter:
ae(filt(None, {'issues': ['3', '4']}), ['8'])
ae(filt(None, {'issues': ['1', '4', '8']}), ['6', '8', '10'])
ae(filt(None, {'issues.title': ['ts2']}), [])
ae(filt(None, {'issues': ['-1']}), ['1', '2', '3', '4', '5', '7'])
ae(filt(None, {'issues': '-1'}), ['1', '2', '3', '4', '5', '7'])
self.assertEqual(ls(self.db.user.get('6', 'issues')), ['1'])
self.assertEqual(ls(self.db.user.get('7', 'issues')), [])
self.assertEqual(ls(self.db.user.get('10', 'issues')), ['7', '8'])
def testFilteringLinkSortSearchMultilink(self): def testFilteringLinkSortSearchMultilink(self):
ae, filter, filter_iter = self.filteringSetup() ae, filter, filter_iter = self.filteringSetup()
a = 'assignedto' a = 'assignedto'
grp = (None, None) grp = (None, None)
for filt in filter, filter_iter: for filt in filter, filter_iter:
ae(filt(None, {'status.mls': '1'}, ('+','status')), ['2','3']) ae(filt(None, {'status.mls': '1'}, ('+','status')), ['2','3'])
ae(filt(None, {'status.mls': '2'}, ('+','status')), ['2','3']) ae(filt(None, {'status.mls': '2'}, ('+','status')), ['2','3'])
def testFilteringMultilinkAndGroup(self): def testFilteringMultilinkAndGroup(self):
"""testFilteringMultilinkAndGroup: """testFilteringMultilinkAndGroup:
skipping to change at line 1760 skipping to change at line 1912
ae(f(None, {'status': '1'}, ('+','id'), (None,None)), ['3']) ae(f(None, {'status': '1'}, ('+','id'), (None,None)), ['3'])
def testFilteringMultilink(self): def testFilteringMultilink(self):
ae, filter, filter_iter = self.filteringSetup() ae, filter, filter_iter = self.filteringSetup()
for filt in filter, filter_iter: for filt in filter, filter_iter:
ae(filt(None, {'nosy': '3'}, ('+','id'), (None,None)), ['4']) ae(filt(None, {'nosy': '3'}, ('+','id'), (None,None)), ['4'])
ae(filt(None, {'nosy': '-1'}, ('+','id'), (None,None)), ['1', '2']) ae(filt(None, {'nosy': '-1'}, ('+','id'), (None,None)), ['1', '2'])
ae(filt(None, {'nosy': ['1','2']}, ('+', 'status'), ae(filt(None, {'nosy': ['1','2']}, ('+', 'status'),
('-', 'deadline')), ['4', '3']) ('-', 'deadline')), ['4', '3'])
def testFilteringRevMultilink(self):
ae, filter, filter_iter = self.filteringSetupTransitiveSearch('user')
ni = 'nosy_issues'
self.db.issue.set('6', nosy=['3', '4', '5'])
self.db.issue.set('7', nosy=['5'])
# After this setup we have the following values for nosy:
# issue nosy
# 1: 4
# 2: 5
# 3:
# 4:
# 5:
# 6: 3, 4, 5
# 7: 5
# 8:
for filt in filter, filter_iter:
ae(filt(None, {ni: ['1', '2']}), ['4', '5'])
ae(filt(None, {ni: ['6','7']}), ['3', '4', '5'])
ae(filt(None, {'nosy_issues.title': ['ts2']}), ['5'])
ae(filt(None, {ni: ['-1']}), ['1', '2', '6', '7', '8', '9', '10'])
ae(filt(None, {ni: '-1'}), ['1', '2', '6', '7', '8', '9', '10'])
def ls(x):
return list(sorted(x))
self.assertEqual(ls(self.db.user.get('4', ni)), ['1', '6'])
self.assertEqual(ls(self.db.user.get('5', ni)), ['2', '6', '7'])
n = self.db.user.getnode('4')
self.assertEqual(ls(n.nosy_issues), ['1', '6'])
# Now retire some linked-to issues and retry
self.db.issue.retire('2')
self.db.issue.retire('6')
self.db.commit()
for filt in filter, filter_iter:
ae(filt(None, {ni: ['1', '2']}), ['4'])
ae(filt(None, {ni: ['6','7']}), ['5'])
ae(filt(None, {'nosy_issues.title': ['ts2']}), [])
ae(filt(None, {ni: ['-1']}),
['1', '2', '3', '6', '7', '8', '9', '10'])
ae(filt(None, {ni: '-1'}),
['1', '2', '3', '6', '7', '8', '9', '10'])
self.assertEqual(ls(self.db.user.get('4', ni)), ['1'])
self.assertEqual(ls(self.db.user.get('5', ni)), ['7'])
def testFilteringMany(self): def testFilteringMany(self):
ae, filter, filter_iter = self.filteringSetup() ae, filter, filter_iter = self.filteringSetup()
for f in filter, filter_iter: for f in filter, filter_iter:
ae(f(None, {'nosy': '2', 'status': '1'}, ('+','id'), (None,None)), ae(f(None, {'nosy': '2', 'status': '1'}, ('+','id'), (None,None)),
['3']) ['3'])
def testFilteringRangeBasic(self): def testFilteringRangeBasic(self):
ae, filter, filter_iter = self.filteringSetup() ae, filter, filter_iter = self.filteringSetup()
d = 'deadline' d = 'deadline'
for f in filter, filter_iter: for f in filter, filter_iter:
skipping to change at line 1802 skipping to change at line 1996
for n in range(1, month+1): for n in range(1, month+1):
i = self.db.issue.create(title='%d.%d'%(month, n), i = self.db.issue.create(title='%d.%d'%(month, n),
deadline=date.Date('2001-%02d-%02d.00:00'%(month, n))) deadline=date.Date('2001-%02d-%02d.00:00'%(month, n)))
self.db.commit() self.db.commit()
for month in range(1, 13): for month in range(1, 13):
for filt in filter, filter_iter: for filt in filter, filter_iter:
r = filt(None, dict(deadline='2001-%02d'%month)) r = filt(None, dict(deadline='2001-%02d'%month))
assert len(r) == month, 'month %d != length %d'%(month, len(r)) assert len(r) == month, 'month %d != length %d'%(month, len(r))
def testFilteringDateRangeMulti(self):
ae, filter, filter_iter = self.filteringSetup()
self.db.issue.create(title='no deadline')
self.db.commit()
for filt in filter, filter_iter:
r = filt (None, dict(deadline='-'))
self.assertEqual(r, ['5'])
r = filt (None, dict(deadline=';2003-02-01,2004;'))
self.assertEqual(r, ['2', '4'])
r = filt (None, dict(deadline='-,;2003-02-01,2004;'))
self.assertEqual(r, ['2', '4', '5'])
def testFilteringRangeInterval(self): def testFilteringRangeInterval(self):
ae, filter, filter_iter = self.filteringSetup() ae, filter, filter_iter = self.filteringSetup()
for filt in filter, filter_iter: for filt in filter, filter_iter:
ae(filt(None, {'foo': 'from 0:50 to 2:00'}), ['1']) ae(filt(None, {'foo': 'from 0:50 to 2:00'}), ['1'])
ae(filt(None, {'foo': 'from 0:50 to 1d 2:00'}), ['1', '2']) ae(filt(None, {'foo': 'from 0:50 to 1d 2:00'}), ['1', '2'])
ae(filt(None, {'foo': 'from 5:50'}), ['2']) ae(filt(None, {'foo': 'from 5:50'}), ['2'])
ae(filt(None, {'foo': 'to 0:05'}), []) ae(filt(None, {'foo': 'to 0:05'}), [])
def testFilteringRangeGeekInterval(self): def testFilteringRangeGeekInterval(self):
ae, filter, filter_iter = self.filteringSetup() ae, filter, filter_iter = self.filteringSetup()
skipping to change at line 1974 skipping to change at line 2180
ae(f(None, {'supervisor.username': 'grouplead1'}, ('+','username')), ae(f(None, {'supervisor.username': 'grouplead1'}, ('+','username')),
['6', '7']) ['6', '7'])
ae(f(None, {'supervisor.username': 'grouplead2'}, ('+','username')), ae(f(None, {'supervisor.username': 'grouplead2'}, ('+','username')),
['8', '9', '10']) ['8', '9', '10'])
ae(f(None, {'supervisor.username': 'grouplead2', ae(f(None, {'supervisor.username': 'grouplead2',
'supervisor.supervisor.username': 'ceo'}, ('+','username')), 'supervisor.supervisor.username': 'ceo'}, ('+','username')),
['8', '9', '10']) ['8', '9', '10'])
ae(f(None, {'supervisor.supervisor': '3', 'supervisor': '4'}, ae(f(None, {'supervisor.supervisor': '3', 'supervisor': '4'},
('+','username')), ['6', '7']) ('+','username')), ['6', '7'])
def testFilteringTransitiveLinkUserLimit(self):
ae, filter, filter_iter = self.filteringSetupTransitiveSearch('user')
for f in filter, filter_iter:
ae(f(None, {'supervisor.username': 'ceo'}, ('+','username'),
limit=1), ['4'])
ae(f(None, {'supervisor.supervisor.username': 'ceo'},
('+','username'), limit=4), ['6', '7', '8', '9'])
ae(f(None, {'supervisor.supervisor': '3'}, ('+','username'),
limit=2, offset=2), ['8', '9'])
ae(f(None, {'supervisor.supervisor.id': '3'}, ('+','username'),
limit=3, offset=1), ['7', '8', '9'])
ae(f(None, {'supervisor.username': 'grouplead2'}, ('+','username'),
limit=2, offset=2), ['10'])
ae(f(None, {'supervisor.username': 'grouplead2',
'supervisor.supervisor.username': 'ceo'}, ('+','username'),
limit=4, offset=3), [])
ae(f(None, {'supervisor.supervisor': '3', 'supervisor': '4'},
('+','username'), limit=1, offset=5), [])
def testFilteringTransitiveLinkSort(self): def testFilteringTransitiveLinkSort(self):
ae, filter, filter_iter = self.filteringSetupTransitiveSearch() ae, filter, filter_iter = self.filteringSetupTransitiveSearch()
ae, ufilter, ufilter_iter = self.iterSetup('user') ae, ufilter, ufilter_iter = self.iterSetup('user')
# Need to make ceo his own (and first two users') supervisor, # Need to make ceo his own (and first two users') supervisor,
# otherwise we will depend on sorting order of NULL values. # otherwise we will depend on sorting order of NULL values.
# Leave that to a separate test. # Leave that to a separate test.
self.db.user.set('1', supervisor = '3') self.db.user.set('1', supervisor = '3')
self.db.user.set('2', supervisor = '3') self.db.user.set('2', supervisor = '3')
self.db.user.set('3', supervisor = '3') self.db.user.set('3', supervisor = '3')
for ufilt in ufilter, ufilter_iter: for ufilt in ufilter, ufilter_iter:
skipping to change at line 2275 skipping to change at line 2500
klass.import_files('_test_export', str(id)) klass.import_files('_test_export', str(id))
maxid = max(maxid, id) maxid = max(maxid, id)
self.db.setid(cn, str(maxid+1)) self.db.setid(cn, str(maxid+1))
klass.import_journals(journals[cn]) klass.import_journals(journals[cn])
# This is needed, otherwise journals won't be there for anydbm # This is needed, otherwise journals won't be there for anydbm
self.db.commit() self.db.commit()
finally: finally:
shutil.rmtree('_test_export') shutil.rmtree('_test_export')
# compare with snapshot of the database # compare with snapshot of the database
for cn, items in orig.iteritems(): for cn, items in orig.items():
klass = self.db.classes[cn] klass = self.db.classes[cn]
propdefs = klass.getprops(1) propdefs = klass.getprops(1)
# ensure retired items are retired :) # ensure retired items are retired :)
l = items.keys(); l.sort() l = sorted(items.keys())
m = klass.list(); m.sort() m = klass.list(); m.sort()
ae(l, m, '%s id list wrong %r vs. %r'%(cn, l, m)) ae(l, m, '%s id list wrong %r vs. %r'%(cn, l, m))
for id, props in items.items(): for id, props in items.items():
for name, value in props.items(): for name, value in props.items():
l = klass.get(id, name) l = klass.get(id, name)
if isinstance(value, type([])): if isinstance(value, type([])):
value.sort() value.sort()
l.sort() l.sort()
try: try:
ae(l, value) ae(l, value)
except AssertionError: except AssertionError:
if not isinstance(propdefs[name], Date): if not isinstance(propdefs[name], Date):
raise raise
# don't get hung up on rounding errors # don't get hung up on rounding errors
assert not l.__cmp__(value, int_seconds=1) assert not l.__cmp__(value, int_seconds=1)
for jc, items in origj.iteritems(): for jc, items in origj.items():
for id, oj in items.iteritems(): for id, oj in items.items():
rj = self.db.getjournal(jc, id) rj = self.db.getjournal(jc, id)
# Both mysql and postgresql have some minor issues with # Both mysql and postgresql have some minor issues with
# rounded seconds on export/import, so we compare only # rounded seconds on export/import, so we compare only
# the integer part. # the integer part.
for j in oj: for j in oj:
j[1].second = float(int(j[1].second)) j[1].second = float(int(j[1].second))
for j in rj: for j in rj:
j[1].second = float(int(j[1].second)) j[1].second = float(int(j[1].second))
oj.sort() oj.sort(key = NoneAndDictComparable)
rj.sort() rj.sort(key = NoneAndDictComparable)
ae(oj, rj) ae(oj, rj)
# make sure the retired items are actually imported # make sure the retired items are actually imported
ae(self.db.user.get('4', 'username'), 'blop') ae(self.db.user.get('4', 'username'), 'blop')
ae(self.db.issue.get('2', 'title'), 'issue two') ae(self.db.issue.get('2', 'title'), 'issue two')
# make sure id counters are set correctly # make sure id counters are set correctly
maxid = max([int(id) for id in self.db.user.list()]) maxid = max([int(id) for id in self.db.user.list()])
newid = self.db.user.create(username='testing') newid = int(self.db.user.create(username='testing'))
assert newid > maxid assert newid > maxid
# test import/export via admin interface # test import/export via admin interface
def testAdminImportExport(self): def testAdminImportExport(self):
import roundup.admin import roundup.admin
import csv import csv
# use the filtering setup to create a bunch of items # use the filtering setup to create a bunch of items
ae, dummy1, dummy2 = self.filteringSetup() ae, dummy1, dummy2 = self.filteringSetup()
# create large field # create large field
self.db.priority.create(name = 'X' * 500) self.db.priority.create(name = 'X' * 500)
skipping to change at line 2346 skipping to change at line 2571
try: try:
roundup.admin.sys.stderr.write = stderrwrite roundup.admin.sys.stderr.write = stderrwrite
tool = roundup.admin.AdminTool() tool = roundup.admin.AdminTool()
home = '.' home = '.'
tool.tracker_home = home tool.tracker_home = home
tool.db = self.db tool.db = self.db
tool.verbose = False tool.verbose = False
tool.do_export (['_test_export']) tool.do_export (['_test_export'])
self.assertEqual(len(output), 2) self.assertEqual(len(output), 2)
self.assertEqual(output [1], '\n') self.assertEqual(output [1], '\n')
self.failUnless(output [0].startswith self.assertTrue(output [0].startswith
('Warning: config csv_field_size should be at least')) ('Warning: config csv_field_size should be at least'))
self.failUnless(int(output[0].split()[-1]) > 500) self.assertTrue(int(output[0].split()[-1]) > 500)
if hasattr(roundup.admin.csv, 'field_size_limit'): if hasattr(roundup.admin.csv, 'field_size_limit'):
self.nukeAndCreate() self.nukeAndCreate()
self.db.config.CSV_FIELD_SIZE = 400 self.db.config.CSV_FIELD_SIZE = 400
tool = roundup.admin.AdminTool() tool = roundup.admin.AdminTool()
tool.tracker_home = home tool.tracker_home = home
tool.db = self.db tool.db = self.db
tool.verbose = False tool.verbose = False
self.assertRaises(csv.Error, tool.do_import, ['_test_export']) self.assertRaises(csv.Error, tool.do_import, ['_test_export'])
skipping to change at line 2417 skipping to change at line 2642
self.assertEqual(props, {'fullname': 'robert', 'friends': '+rouilj,+ other', 'key': None}) self.assertEqual(props, {'fullname': 'robert', 'friends': '+rouilj,+ other', 'key': None})
# test get_class() # test get_class()
self.assertRaises(UsageError, tool.get_class, "bar") # invalid class self.assertRaises(UsageError, tool.get_class, "bar") # invalid class
# This writes to stdout, need to figure out how to redirect to a var iable. # This writes to stdout, need to figure out how to redirect to a var iable.
# classhandle = tool.get_class("user") # valid class # classhandle = tool.get_class("user") # valid class
# FIXME there should be some test here # FIXME there should be some test here
issue_class_spec = tool.do_specification(["issue"]) issue_class_spec = tool.do_specification(["issue"])
self.assertEqual(soutput, ['files: <roundup.hyperdb.Multilink to "fi self.assertEqual(sorted (soutput),
le">\n', ['assignedto: <roundup.hyperdb.Link to "user">\n',
'status: <roundup.hyperdb.Link to "status 'deadline: <roundup.hyperdb.Date>\n',
">\n', 'feedback: <roundup.hyperdb.Link to "msg">\n',
'feedback: <roundup.hyperdb.Link to "msg" 'files: <roundup.hyperdb.Multilink to "file">\n',
>\n', 'foo: <roundup.hyperdb.Interval>\n',
'spam: <roundup.hyperdb.Multilink to "msg 'messages: <roundup.hyperdb.Multilink to "msg">\n'
">\n', ,
'nosy: <roundup.hyperdb.Multilink to "use 'nosy: <roundup.hyperdb.Multilink to "user">\n',
r">\n', 'priority: <roundup.hyperdb.Link to "priority">\n'
'title: <roundup.hyperdb.String>\n', ,
'messages: <roundup.hyperdb.Multilink to 'spam: <roundup.hyperdb.Multilink to "msg">\n',
"msg">\n', 'status: <roundup.hyperdb.Link to "status">\n',
'priority: <roundup.hyperdb.Link to "prio 'superseder: <roundup.hyperdb.Multilink to "issue"
rity">\n', >\n',
'assignedto: <roundup.hyperdb.Link to "us 'title: <roundup.hyperdb.String>\n'])
er">\n',
'deadline: <roundup.hyperdb.Date>\n',
'foo: <roundup.hyperdb.Interval>\n',
'superseder: <roundup.hyperdb.Multilink t
o "issue">\n'])
#userclassprop=tool.do_list(["mls"]) #userclassprop=tool.do_list(["mls"])
#tool.print_designator = False #tool.print_designator = False
#userclassprop=tool.do_get(["realname","user1"]) #userclassprop=tool.do_get(["realname","user1"])
# test do_create # test do_create
soutput[:] = [] # empty for next round of output soutput[:] = [] # empty for next round of output
userclass=tool.do_create(["issue", "title='title1 title'", "nosy=1,3 "]) # should be issue 5 userclass=tool.do_create(["issue", "title='title1 title'", "nosy=1,3 "]) # should be issue 5
userclass=tool.do_create(["issue", "title='title2 title'", "nosy=2,3 "]) # should be issue 6 userclass=tool.do_create(["issue", "title='title2 title'", "nosy=2,3 "]) # should be issue 6
self.assertEqual(soutput, ['5\n', '6\n']) self.assertEqual(soutput, ['5\n', '6\n'])
skipping to change at line 2456 skipping to change at line 2682
userclass=tool.do_set(["issue5,issue6", "nosy=-3"]) userclass=tool.do_set(["issue5,issue6", "nosy=-3"])
# verify proper result # verify proper result
props=self.db.issue.get('5', "nosy") props=self.db.issue.get('5', "nosy")
self.assertEqual(props, ['1']) self.assertEqual(props, ['1'])
props=self.db.issue.get('6', "nosy") props=self.db.issue.get('6', "nosy")
self.assertEqual(props, ['2']) self.assertEqual(props, ['2'])
# basic usage test. TODO add full output verification # basic usage test. TODO add full output verification
soutput[:] = [] # empty for next round of output soutput[:] = [] # empty for next round of output
tool.usage(message="Hello World") tool.usage(message="Hello World")
self.failUnless(soutput[0].startswith('Problem: Hello World'), None) self.assertTrue(soutput[0].startswith('Problem: Hello World'), None)
# check security output # check security output
soutput[:] = [] # empty for next round of output soutput[:] = [] # empty for next round of output
tool.do_security("Admin") tool.do_security("Admin")
self.assertEqual(soutput, [ 'New Web users get the Role "User"\n', expected = [ 'New Web users get the Role "User"\n',
'New Email users get the Role "User"\n', 'New Email users get the Role "User"\n',
'Role "admin":\n', 'Role "admin":\n',
' User may create everything (Create)\n', ' User may create everything (Create)\n',
' User may edit everything (Edit)\n', ' User may edit everything (Edit)\n',
' User may restore everything (Restore)\n ' User may restore everything (Restore)\n',
', ' User may retire everything (Retire)\n',
' User may retire everything (Retire)\n', ' User may view everything (View)\n',
' User may view everything (View)\n', ' User may access the web interface (Web Access)\n',
' User may access the web interface (Web ' User may access the rest interface (Rest Access)\n',
Access)\n', ' User may access the xmlrpc interface (Xmlrpc Access)
' User may manipulate user Roles through \n',
the web (Web Roles)\n', ' User may manipulate user Roles through the web (Web
' User may use the email interface (Email Roles)\n',
Access)\n', ' User may use the email interface (Email Access)\n',
'Role "anonymous":\n', 'Role "user":\n', 'Role "anonymous":\n', 'Role "user":\n',
' User is allowed to access msg (View for ' User is allowed to access msg (View for "msg" only)\
"msg" only)\n', n',
' Prevent users from seeing roles (View f ' Prevent users from seeing roles (View for "user": [\
or "user": [\'username\', \'supervisor\', \'assignable\'] only)\n']) 'username\', \'supervisor\', \'assignable\'] only)\n']
self.assertEqual(soutput, expected)
self.nukeAndCreate() self.nukeAndCreate()
tool = roundup.admin.AdminTool() tool = roundup.admin.AdminTool()
tool.tracker_home = home tool.tracker_home = home
tool.db = self.db tool.db = self.db
tool.verbose = False tool.verbose = False
finally: finally:
roundup.admin.sys = sys roundup.admin.sys = sys
# test duplicate relative tracker home initialisation (issue2550757) # test duplicate relative tracker home initialisation (issue2550757)
skipping to change at line 2519 skipping to change at line 2749
shutil.rmtree(t) shutil.rmtree(t)
def testAddProperty(self): def testAddProperty(self):
self.db.issue.create(title="spam", status='1') self.db.issue.create(title="spam", status='1')
self.db.commit() self.db.commit()
self.db.issue.addprop(fixer=Link("user")) self.db.issue.addprop(fixer=Link("user"))
# force any post-init stuff to happen # force any post-init stuff to happen
self.db.post_init() self.db.post_init()
props = self.db.issue.getprops() props = self.db.issue.getprops()
keys = props.keys() keys = sorted(props.keys())
keys.sort()
self.assertEqual(keys, ['activity', 'actor', 'assignedto', 'creation', self.assertEqual(keys, ['activity', 'actor', 'assignedto', 'creation',
'creator', 'deadline', 'feedback', 'files', 'fixer', 'foo', 'id', 'm essages', 'creator', 'deadline', 'feedback', 'files', 'fixer', 'foo', 'id', 'm essages',
'nosy', 'priority', 'spam', 'status', 'superseder', 'title']) 'nosy', 'priority', 'spam', 'status', 'superseder', 'title'])
self.assertEqual(self.db.issue.get('1', "fixer"), None) self.assertEqual(self.db.issue.get('1', "fixer"), None)
def testRemoveProperty(self): def testRemoveProperty(self):
self.db.issue.create(title="spam", status='1') self.db.issue.create(title="spam", status='1')
self.db.commit() self.db.commit()
del self.db.issue.properties['title'] del self.db.issue.properties['title']
self.db.post_init() self.db.post_init()
props = self.db.issue.getprops() props = self.db.issue.getprops()
keys = props.keys() keys = sorted(props.keys())
keys.sort()
self.assertEqual(keys, ['activity', 'actor', 'assignedto', 'creation', self.assertEqual(keys, ['activity', 'actor', 'assignedto', 'creation',
'creator', 'deadline', 'feedback', 'files', 'foo', 'id', 'messages', 'creator', 'deadline', 'feedback', 'files', 'foo', 'id', 'messages',
'nosy', 'priority', 'spam', 'status', 'superseder']) 'nosy', 'priority', 'spam', 'status', 'superseder'])
self.assertEqual(self.db.issue.list(), ['1']) self.assertEqual(self.db.issue.list(), ['1'])
def testAddRemoveProperty(self): def testAddRemoveProperty(self):
self.db.issue.create(title="spam", status='1') self.db.issue.create(title="spam", status='1')
self.db.commit() self.db.commit()
self.db.issue.addprop(fixer=Link("user")) self.db.issue.addprop(fixer=Link("user"))
del self.db.issue.properties['title'] del self.db.issue.properties['title']
self.db.post_init() self.db.post_init()
props = self.db.issue.getprops() props = self.db.issue.getprops()
keys = props.keys() keys = sorted(props.keys())
keys.sort()
self.assertEqual(keys, ['activity', 'actor', 'assignedto', 'creation', self.assertEqual(keys, ['activity', 'actor', 'assignedto', 'creation',
'creator', 'deadline', 'feedback', 'files', 'fixer', 'foo', 'id', 'creator', 'deadline', 'feedback', 'files', 'fixer', 'foo', 'id',
'messages', 'nosy', 'priority', 'spam', 'status', 'superseder']) 'messages', 'nosy', 'priority', 'spam', 'status', 'superseder'])
self.assertEqual(self.db.issue.list(), ['1']) self.assertEqual(self.db.issue.list(), ['1'])
def testNosyMail(self) : def testNosyMail(self) :
"""Creates one issue with two attachments, one smaller and one larger """Creates one issue with two attachments, one smaller and one larger
than the set max_attachment_size. than the set max_attachment_size.
""" """
old_translate_ = roundupdb._ old_translate_ = roundupdb._
roundupdb._ = i18n.get_translation(language='C').gettext roundupdb._ = i18n.get_translation(language='C').gettext
db = self.db db = self.db
db.config.NOSY_MAX_ATTACHMENT_SIZE = 4096 db.config.NOSY_MAX_ATTACHMENT_SIZE = 4096
res = dict(mail_to = None, mail_msg = None) res = dict(mail_to = None, mail_msg = None)
def dummy_snd(s, to, msg, res=res) : def dummy_snd(s, to, msg, res=res) :
res["mail_to"], res["mail_msg"] = to, msg res["mail_to"], res["mail_msg"] = to, msg
backup, Mailer.smtp_send = Mailer.smtp_send, dummy_snd backup, Mailer.smtp_send = Mailer.smtp_send, dummy_snd
try : try :
f1 = db.file.create(name="test1.txt", content="x" * 20) f1 = db.file.create(name="test1.txt", content="x" * 20, type="applic
f2 = db.file.create(name="test2.txt", content="y" * 5000) ation/octet-stream")
f2 = db.file.create(name="test2.txt", content="y" * 5000, type="appl
ication/octet-stream")
m = db.msg.create(content="one two", author="admin",
files = [f1, f2])
i = db.issue.create(title='spam', files = [f1, f2],
messages = [m], nosy = [db.user.lookup("fred")])
db.issue.nosymessage(i, m, {})
mail_msg = str(res["mail_msg"])
self.assertEqual(res["mail_to"], ["fred@example.com"])
self.assertTrue("From: admin" in mail_msg)
self.assertTrue("Subject: [issue1] spam" in mail_msg)
self.assertTrue("New submission from admin" in mail_msg)
self.assertTrue("one two" in mail_msg)
self.assertTrue("File 'test1.txt' not attached" not in mail_msg)
self.assertTrue(b2s(base64_encode(s2b("xxx"))).rstrip() in mail_msg)
self.assertTrue("File 'test2.txt' not attached" in mail_msg)
self.assertTrue(b2s(base64_encode(s2b("yyy"))).rstrip() not in mail_
msg)
finally :
roundupdb._ = old_translate_
Mailer.smtp_send = backup
def testNosyMailTextAndBinary(self) :
"""Creates one issue with two attachments, one as text and one as binary
.
"""
old_translate_ = roundupdb._
roundupdb._ = i18n.get_translation(language='C').gettext
db = self.db
res = dict(mail_to = None, mail_msg = None)
def dummy_snd(s, to, msg, res=res) :
res["mail_to"], res["mail_msg"] = to, msg
backup, Mailer.smtp_send = Mailer.smtp_send, dummy_snd
try :
f1 = db.file.create(name="test1.txt", content="Hello world", type="t
ext/plain")
f2 = db.file.create(name="test2.bin", content=b"\x01\x02\x03\xfe\xff
", type="application/octet-stream")
m = db.msg.create(content="one two", author="admin", m = db.msg.create(content="one two", author="admin",
files = [f1, f2]) files = [f1, f2])
i = db.issue.create(title='spam', files = [f1, f2], i = db.issue.create(title='spam', files = [f1, f2],
messages = [m], nosy = [db.user.lookup("fred")]) messages = [m], nosy = [db.user.lookup("fred")])
db.issue.nosymessage(i, m, {}) db.issue.nosymessage(i, m, {})
mail_msg = str(res["mail_msg"]) mail_msg = str(res["mail_msg"])
self.assertEqual(res["mail_to"], ["fred@example.com"]) self.assertEqual(res["mail_to"], ["fred@example.com"])
self.assert_("From: admin" in mail_msg) self.assertTrue("From: admin" in mail_msg)
self.assert_("Subject: [issue1] spam" in mail_msg) self.assertTrue("Subject: [issue1] spam" in mail_msg)
self.assert_("New submission from admin" in mail_msg) self.assertTrue("New submission from admin" in mail_msg)
self.assert_("one two" in mail_msg) self.assertTrue("one two" in mail_msg)
self.assert_("File 'test1.txt' not attached" not in mail_msg) self.assertTrue("Hello world" in mail_msg)
self.assert_(base64.encodestring("xxx").rstrip() in mail_msg) self.assertTrue(b2s(base64_encode(b"\x01\x02\x03\xfe\xff")).rstrip()
self.assert_("File 'test2.txt' not attached" in mail_msg) in mail_msg)
self.assert_(base64.encodestring("yyy").rstrip() not in mail_msg)
finally : finally :
roundupdb._ = old_translate_ roundupdb._ = old_translate_
Mailer.smtp_send = backup Mailer.smtp_send = backup
@pytest.mark.skipif(gpgmelib.pyme is None, reason='Skipping PGPNosy test') @pytest.mark.skipif(gpgmelib.gpg is None, reason='Skipping PGPNosy test')
def testPGPNosyMail(self) : def testPGPNosyMail(self) :
"""Creates one issue with two attachments, one smaller and one larger """Creates one issue with two attachments, one smaller and one larger
than the set max_attachment_size. Recipients are one with and than the set max_attachment_size. Recipients are one with and
one without encryption enabled via a gpg group. one without encryption enabled via a gpg group.
""" """
old_translate_ = roundupdb._ old_translate_ = roundupdb._
roundupdb._ = i18n.get_translation(language='C').gettext roundupdb._ = i18n.get_translation(language='C').gettext
db = self.db db = self.db
db.config.NOSY_MAX_ATTACHMENT_SIZE = 4096 db.config.NOSY_MAX_ATTACHMENT_SIZE = 4096
db.config['PGP_HOMEDIR'] = gpgmelib.pgphome db.config['PGP_HOMEDIR'] = gpgmelib.pgphome
skipping to change at line 2612 skipping to change at line 2870
db.config['PGP_ENABLE'] = True db.config['PGP_ENABLE'] = True
db.config['PGP_ENCRYPT'] = True db.config['PGP_ENCRYPT'] = True
gpgmelib.setUpPGP() gpgmelib.setUpPGP()
res = [] res = []
def dummy_snd(s, to, msg, res=res) : def dummy_snd(s, to, msg, res=res) :
res.append (dict (mail_to = to, mail_msg = msg)) res.append (dict (mail_to = to, mail_msg = msg))
backup, Mailer.smtp_send = Mailer.smtp_send, dummy_snd backup, Mailer.smtp_send = Mailer.smtp_send, dummy_snd
try : try :
john = db.user.create(username="john", roles='User,pgp', john = db.user.create(username="john", roles='User,pgp',
address='john@test.test', realname='John Doe') address='john@test.test', realname='John Doe')
f1 = db.file.create(name="test1.txt", content="x" * 20) f1 = db.file.create(name="test1.txt", content="x" * 20, type="applic
f2 = db.file.create(name="test2.txt", content="y" * 5000) ation/octet-stream")
f2 = db.file.create(name="test2.txt", content="y" * 5000, type="appl
ication/octet-stream")
m = db.msg.create(content="one two", author="admin", m = db.msg.create(content="one two", author="admin",
files = [f1, f2]) files = [f1, f2])
i = db.issue.create(title='spam', files = [f1, f2], i = db.issue.create(title='spam', files = [f1, f2],
messages = [m], nosy = [db.user.lookup("fred"), john]) messages = [m], nosy = [db.user.lookup("fred"), john])
db.issue.nosymessage(i, m, {}) db.issue.nosymessage(i, m, {})
res.sort(key=lambda x: x['mail_to']) res.sort(key=lambda x: x['mail_to'])
self.assertEqual(res[0]["mail_to"], ["fred@example.com"]) self.assertEqual(res[0]["mail_to"], ["fred@example.com"])
self.assertEqual(res[1]["mail_to"], ["john@test.test"]) self.assertEqual(res[1]["mail_to"], ["john@test.test"])
mail_msg = str(res[0]["mail_msg"]) mail_msg = str(res[0]["mail_msg"])
self.assert_("From: admin" in mail_msg) self.assertTrue("From: admin" in mail_msg)
self.assert_("Subject: [issue1] spam" in mail_msg) self.assertTrue("Subject: [issue1] spam" in mail_msg)
self.assert_("New submission from admin" in mail_msg) self.assertTrue("New submission from admin" in mail_msg)
self.assert_("one two" in mail_msg) self.assertTrue("one two" in mail_msg)
self.assert_("File 'test1.txt' not attached" not in mail_msg) self.assertTrue("File 'test1.txt' not attached" not in mail_msg)
self.assert_(base64.encodestring("xxx").rstrip() in mail_msg) self.assertTrue(b2s(base64_encode(s2b("xxx"))).rstrip() in mail_msg)
self.assert_("File 'test2.txt' not attached" in mail_msg) self.assertTrue("File 'test2.txt' not attached" in mail_msg)
self.assert_(base64.encodestring("yyy").rstrip() not in mail_msg) self.assertTrue(b2s(base64_encode(s2b("yyy"))).rstrip() not in mail_
fp = FeedParser() msg)
mail_msg = str(res[1]["mail_msg"]) mail_msg = str(res[1]["mail_msg"])
fp.feed(mail_msg) parts = message_from_string(mail_msg).get_payload()
parts = fp.close().get_payload()
self.assertEqual(len(parts),2) self.assertEqual(len(parts),2)
self.assertEqual(parts[0].get_payload().strip(), 'Version: 1') self.assertEqual(parts[0].get_payload().strip(), 'Version: 1')
crypt = gpgmelib.pyme.core.Data(parts[1].get_payload()) crypt = gpgmelib.gpg.core.Data(parts[1].get_payload())
plain = gpgmelib.pyme.core.Data() plain = gpgmelib.gpg.core.Data()
ctx = gpgmelib.pyme.core.Context() ctx = gpgmelib.gpg.core.Context()
res = ctx.op_decrypt(crypt, plain) res = ctx.op_decrypt(crypt, plain)
self.assertEqual(res, None) self.assertEqual(res, None)
plain.seek(0,0) plain.seek(0,0)
fp = FeedParser() self.assertTrue("From: admin" in mail_msg)
fp.feed(plain.read()) self.assertTrue("Subject: [issue1] spam" in mail_msg)
self.assert_("From: admin" in mail_msg) mail_msg = str(message_from_bytes(plain.read()))
self.assert_("Subject: [issue1] spam" in mail_msg) self.assertTrue("New submission from admin" in mail_msg)
mail_msg = str(fp.close()) self.assertTrue("one two" in mail_msg)
self.assert_("New submission from admin" in mail_msg) self.assertTrue("File 'test1.txt' not attached" not in mail_msg)
self.assert_("one two" in mail_msg) self.assertTrue(b2s(base64_encode(s2b("xxx"))).rstrip() in mail_msg)
self.assert_("File 'test1.txt' not attached" not in mail_msg) self.assertTrue("File 'test2.txt' not attached" in mail_msg)
self.assert_(base64.encodestring("xxx").rstrip() in mail_msg) self.assertTrue(b2s(base64_encode(s2b("yyy"))).rstrip() not in mail_
self.assert_("File 'test2.txt' not attached" in mail_msg) msg)
self.assert_(base64.encodestring("yyy").rstrip() not in mail_msg)
finally : finally :
roundupdb._ = old_translate_ roundupdb._ = old_translate_
Mailer.smtp_send = backup Mailer.smtp_send = backup
gpgmelib.tearDownPGP() gpgmelib.tearDownPGP()
class ROTest(MyTestCase): class ROTest(MyTestCase):
def setUp(self): def setUp(self):
# remove previous test, ignore errors # remove previous test, ignore errors
if os.path.exists(config.DATABASE): if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE) shutil.rmtree(config.DATABASE)
skipping to change at line 2709 skipping to change at line 2963
def init_a(self): def init_a(self):
self.open_database() self.open_database()
a = self.module.Class(self.db, "a", name=String()) a = self.module.Class(self.db, "a", name=String())
a.setkey("name") a.setkey("name")
self.db.post_init() self.db.post_init()
def test_fileClassProps(self): def test_fileClassProps(self):
self.open_database() self.open_database()
a = self.module.FileClass(self.db, 'a') a = self.module.FileClass(self.db, 'a')
l = a.getprops().keys() l = sorted(a.getprops().keys())
l.sort() self.assertTrue(l, ['activity', 'actor', 'content', 'created',
self.assert_(l, ['activity', 'actor', 'content', 'created',
'creation', 'type']) 'creation', 'type'])
def init_ab(self): def init_ab(self):
self.open_database() self.open_database()
a = self.module.Class(self.db, "a", name=String()) a = self.module.Class(self.db, "a", name=String())
a.setkey("name") a.setkey("name")
b = self.module.Class(self.db, "b", name=String(), b = self.module.Class(self.db, "b", name=String(),
fooz=Multilink('a')) fooz=Multilink('a'))
b.setkey("name") b.setkey("name")
self.db.post_init() self.db.post_init()
skipping to change at line 2777 skipping to change at line 3030
bid = self.db.b.create(name='bear') bid = self.db.b.create(name='bear')
self.db.commit(); self.db.close() self.db.commit(); self.db.close()
# modify "a" schema # modify "a" schema
self.init_amod() self.init_amod()
self.assertEqual(self.db.a.get(aid, 'name'), 'apple') self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
self.assertEqual(self.db.a.get(aid, 'newstr'), None) self.assertEqual(self.db.a.get(aid, 'newstr'), None)
self.assertEqual(self.db.a.get(aid, 'newint'), None) self.assertEqual(self.db.a.get(aid, 'newint'), None)
# hack - metakit can't return None for missing values, and we're not # hack - metakit can't return None for missing values, and we're not
# really checking for that behavior here anyway # really checking for that behavior here anyway
self.assert_(not self.db.a.get(aid, 'newnum')) self.assertTrue(not self.db.a.get(aid, 'newnum'))
self.assert_(not self.db.a.get(aid, 'newbool')) self.assertTrue(not self.db.a.get(aid, 'newbool'))
self.assertEqual(self.db.a.get(aid, 'newdate'), None) self.assertEqual(self.db.a.get(aid, 'newdate'), None)
self.assertEqual(self.db.b.get(aid, 'name'), 'bear') self.assertEqual(self.db.b.get(aid, 'name'), 'bear')
aid2 = self.db.a.create(name='aardvark', newstr='booz') aid2 = self.db.a.create(name='aardvark', newstr='booz')
self.db.commit(); self.db.close() self.db.commit(); self.db.close()
# test # test
self.init_amod() self.init_amod()
self.assertEqual(self.db.a.get(aid, 'name'), 'apple') self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
self.assertEqual(self.db.a.get(aid, 'newstr'), None) self.assertEqual(self.db.a.get(aid, 'newstr'), None)
self.assertEqual(self.db.b.get(aid, 'name'), 'bear') self.assertEqual(self.db.b.get(aid, 'name'), 'bear')
skipping to change at line 2997 skipping to change at line 3250
result = [] result = []
self.db.clearCache() self.db.clearCache()
for id in ufilt(None, {}, [('+','supervisor.supervisor.supervisor'), for id in ufilt(None, {}, [('+','supervisor.supervisor.supervisor'),
('-','supervisor.supervisor'), ('-','supervisor'), ('-','supervisor.supervisor'), ('-','supervisor'),
('+','username')]): ('+','username')]):
result.append(id) result.append(id)
nodeid = id nodeid = id
for x in range(4): for x in range(4):
assert(('user', nodeid) in self.db.cache) assert(('user', nodeid) in self.db.cache)
n = self.db.user.getnode(nodeid) n = self.db.user.getnode(nodeid)
for k, v in user_result[nodeid].iteritems(): for k, v in user_result[nodeid].items():
ae((k, n[k]), (k, v)) ae((k, n[k]), (k, v))
for k in 'creation', 'activity': for k in 'creation', 'activity':
assert(n[k]) assert(n[k])
nodeid = n.supervisor nodeid = n.supervisor
self.db.clearCache() self.db.clearCache()
ae (result, ['8', '9', '10', '6', '7', '1', '3', '2', '4', '5']) ae (result, ['8', '9', '10', '6', '7', '1', '3', '2', '4', '5'])
result = [] result = []
self.db.clearCache() self.db.clearCache()
for id in filt(None, {}, for id in filt(None, {},
[('+','assignedto.supervisor.supervisor.supervisor'), [('+','assignedto.supervisor.supervisor.supervisor'),
('+','assignedto.supervisor.supervisor'), ('+','assignedto.supervisor.supervisor'),
('-','assignedto.supervisor'), ('+','assignedto')]): ('-','assignedto.supervisor'), ('+','assignedto')]):
result.append(id) result.append(id)
assert(('issue', id) in self.db.cache) assert(('issue', id) in self.db.cache)
n = self.db.issue.getnode(id) n = self.db.issue.getnode(id)
for k, v in issue_result[id].iteritems(): for k, v in issue_result[id].items():
ae((k, n[k]), (k, v)) ae((k, n[k]), (k, v))
for k in 'creation', 'activity': for k in 'creation', 'activity':
assert(n[k]) assert(n[k])
nodeid = n.assignedto nodeid = n.assignedto
for x in range(4): for x in range(4):
assert(('user', nodeid) in self.db.cache) assert(('user', nodeid) in self.db.cache)
n = self.db.user.getnode(nodeid) n = self.db.user.getnode(nodeid)
for k, v in user_result[nodeid].iteritems(): for k, v in user_result[nodeid].items():
ae((k, n[k]), (k, v)) ae((k, n[k]), (k, v))
for k in 'creation', 'activity': for k in 'creation', 'activity':
assert(n[k]) assert(n[k])
nodeid = n.supervisor nodeid = n.supervisor
self.db.clearCache() self.db.clearCache()
ae (result, ['4', '5', '6', '7', '8', '1', '2', '3']) ae (result, ['4', '5', '6', '7', '8', '1', '2', '3'])
class ClassicInitBase(object): class ClassicInitBase(object):
count = 0 count = 0
db = None db = None
skipping to change at line 3229 skipping to change at line 3482
def setUp(self): def setUp(self):
self.dirname = '_test_cgi_form' self.dirname = '_test_cgi_form'
# set up and open a tracker # set up and open a tracker
self.instance = setupTracker(self.dirname, backend = self.backend) self.instance = setupTracker(self.dirname, backend = self.backend)
# We may want to register separate detectors # We may want to register separate detectors
self.setupDetectors() self.setupDetectors()
# open the database # open the database
self.db = self.instance.open('admin') self.db = self.instance.open('admin')
self.db.Otk = MockNull()
self.db.Otk.data = {}
self.db.Otk.getall = self.data_get
self.db.Otk.set = self.data_set
self.db.tx_Source = "web" self.db.tx_Source = "web"
self.db.user.create(username='Chef', address='chef@bork.bork.bork', self.db.user.create(username='Chef', address='chef@bork.bork.bork',
realname='Bork, Chef', roles='User') realname='Bork, Chef', roles='User')
self.db.user.create(username='mary', address='mary@test.test', self.db.user.create(username='mary', address='mary@test.test',
roles='User', realname='Contrary, Mary') roles='User', realname='Contrary, Mary')
self.db.issue.addprop(tx_Source=hyperdb.String()) self.db.issue.addprop(tx_Source=hyperdb.String())
self.db.msg.addprop(tx_Source=hyperdb.String()) self.db.msg.addprop(tx_Source=hyperdb.String())
self.db.post_init() self.db.post_init()
skipping to change at line 3254 skipping to change at line 3511
cl.nodeid = nodeid cl.nodeid = nodeid
cl.language = ('en',) cl.language = ('en',)
cl.userid = '1' cl.userid = '1'
cl.db = self.db cl.db = self.db
cl.user = 'admin' cl.user = 'admin'
cl.template = template cl.template = template
if env_addon is not None: if env_addon is not None:
cl.env.update(env_addon) cl.env.update(env_addon)
return cl return cl
def data_get(self, key):
return self.db.Otk.data[key]
def data_set(self, key, **value):
self.db.Otk.data[key] = value
def parseForm(self, form, classname='test', nodeid=None): def parseForm(self, form, classname='test', nodeid=None):
cl = self.setupClient(form, classname, nodeid) cl = self.setupClient(form, classname, nodeid)
return cl.parsePropsFromForm(create=1) return cl.parsePropsFromForm(create=1)
def tearDown(self): def tearDown(self):
self.db.close() self.db.close()
try: try:
shutil.rmtree(self.dirname) shutil.rmtree(self.dirname)
except OSError as error: except OSError as error:
if error.errno not in (errno.ENOENT, errno.ESRCH): raise if error.errno not in (errno.ENOENT, errno.ESRCH): raise
class SpecialAction(actions.EditItemAction): class SpecialAction(actions.EditItemAction):
x = False x = False
def handle(self): def handle(self):
self.__class__.x = True self.__class__.x = True
cl = self.db.getclass(self.classname) cl = self.db.getclass(self.classname)
cl.set(self.nodeid, status='2') cl.set(self.nodeid, status='2')
cl.set(self.nodeid, title="Just a test") cl.set(self.nodeid, title="Just a test")
assert(0, "not reached") assert 0, "not reached"
self.db.commit() self.db.commit()
def reject_title(db, cl, nodeid, newvalues): def reject_title(db, cl, nodeid, newvalues):
if 'title' in newvalues: if 'title' in newvalues:
raise Reject ("REJECT TITLE CHANGE") raise Reject ("REJECT TITLE CHANGE")
def init_reject(db): def init_reject(db):
db.issue.audit("set", reject_title) db.issue.audit("set", reject_title)
def get_extensions(self, what): def get_extensions(self, what):
skipping to change at line 3305 skipping to change at line 3568
def ge(what): def ge(what):
return get_extensions(self.instance, what) return get_extensions(self.instance, what)
self.instance.get_extensions = ge self.instance.get_extensions = ge
def setUp(self): def setUp(self):
FormTestParent.setUp(self) FormTestParent.setUp(self)
self.instance.registerAction('special', SpecialAction) self.instance.registerAction('special', SpecialAction)
self.issue = self.db.issue.create (title = "hello", status='1') self.issue = self.db.issue.create (title = "hello", status='1')
self.db.commit () self.db.commit ()
if not os.environ.has_key('SENDMAILDEBUG'): if 'SENDMAILDEBUG' not in os.environ:
os.environ['SENDMAILDEBUG'] = 'mail-test2.log' os.environ['SENDMAILDEBUG'] = 'mail-test2.log'
self.SENDMAILDEBUG = os.environ['SENDMAILDEBUG'] self.SENDMAILDEBUG = os.environ['SENDMAILDEBUG']
page_template = """ page_template = """
<html> <html>
<body> <body>
<p tal:condition="options/error_message|nothing" <p tal:condition="options/error_message|nothing"
tal:repeat="m options/error_message" tal:repeat="m options/error_message"
tal:content="structure m"/> tal:content="structure m"/>
<p tal:content="context/title/plain"/> <p tal:content="context/title/plain"/>
<p tal:content="context/status/plain"/> <p tal:content="context/status/plain"/>
skipping to change at line 3360 skipping to change at line 3623
actions.Action.hasPermission = self.hasPermission actions.Action.hasPermission = self.hasPermission
_HTMLItem.is_edit_ok = self.e1 _HTMLItem.is_edit_ok = self.e1
HTMLProperty.is_edit_ok = self.e2 HTMLProperty.is_edit_ok = self.e2
if os.path.exists(self.SENDMAILDEBUG): if os.path.exists(self.SENDMAILDEBUG):
#os.remove(self.SENDMAILDEBUG) #os.remove(self.SENDMAILDEBUG)
pass pass
def testInnerMain(self): def testInnerMain(self):
cl = self.client cl = self.client
cl.session_api = MockNull(_sid="1234567890") cl.session_api = MockNull(_sid="1234567890")
self.form ['@nonce'] = anti_csrf_nonce(cl, cl) self.form ['@nonce'] = anti_csrf_nonce(cl)
cl.form = makeForm(self.form) cl.form = makeForm(self.form)
# inner_main will re-open the database! # inner_main will re-open the database!
# Note that in the template above, the rendering of the # Note that in the template above, the rendering of the
# context/submit button will also call anti_csrf_nonce which # context/submit button will also call anti_csrf_nonce which
# does a commit of the otk to the database. # does a commit of the otk to the database.
cl.inner_main() cl.inner_main()
cl.db.close() cl.db.close()
print self.out print(self.out)
# Make sure the action was called # Make sure the action was called
self.assertEqual(SpecialAction.x, True) self.assertEqual(SpecialAction.x, True)
# Check that the Reject worked: # Check that the Reject worked:
self.assertNotEqual(-1, self.out[0].index('REJECT TITLE CHANGE')) self.assertNotEqual(-1, self.out[0].index('REJECT TITLE CHANGE'))
# Re-open db # Re-open db
self.db.close() self.db.close()
self.db = self.instance.open ('admin') self.db = self.instance.open ('admin')
# We shouldn't see any changes # We shouldn't see any changes
self.assertEqual(self.db.issue.get(self.issue, 'title'), 'hello') self.assertEqual(self.db.issue.get(self.issue, 'title'), 'hello')
self.assertEqual(self.db.issue.get(self.issue, 'status'), '1') self.assertEqual(self.db.issue.get(self.issue, 'status'), '1')
 End of changes. 87 change blocks. 
165 lines changed or deleted 432 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)