"Fossies" - the Fresh Open Source Software Archive

Member "LinOTP-release-2.10.5.2/linotpd/src/linotp/tests/load/test_ocra_p.py" (13 May 2019, 35994 Bytes) of package /linux/misc/LinOTP-release-2.10.5.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here.

    1 # -*- coding: utf-8 -*-
    2 #
    3 #    LinOTP - the open source solution for two factor authentication
    4 #    Copyright (C) 2010 - 2019 KeyIdentity GmbH
    5 #
    6 #    This file is part of LinOTP server.
    7 #
    8 #    This program is free software: you can redistribute it and/or
    9 #    modify it under the terms of the GNU Affero General Public
   10 #    License, version 3, as published by the Free Software Foundation.
   11 #
   12 #    This program is distributed in the hope that it will be useful,
   13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
   14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   15 #    GNU Affero General Public License for more details.
   16 #
   17 #    You should have received a copy of the
   18 #               GNU Affero General Public License
   19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
   20 #
   21 #
   22 #    E-mail: linotp@keyidentity.com
   23 #    Contact: www.linotp.org
   24 #    Support: www.keyidentity.com
   25 #
   26 """ parallel test"""
   27 
   28 import logging
   29 import unittest
   30 import binascii
   31 import base64
   32 import random
   33 import time
   34 import sys
   35 
   36 from linotp.lib.ext.pbkdf2 import PBKDF2
   37 
   38 from Cryptodome.Hash import HMAC
   39 from Cryptodome.Hash import SHA as SHA1
   40 from Cryptodome.Hash import SHA256 as SHA256
   41 
   42 from linotp.tokens.ocra import OcraSuite
   43 from linotp.lib.crypto import kdf2, createActivationCode, check
   44 from linotp.lib.crypto import geturandom
   45 from linotp.lib.crypto import encrypt, decrypt
   46 
   47 from linotp.lib.crypto import check
   48 
   49 from paste.registry import RegistryManager, StackedObjectProxy
   50 myglobal = StackedObjectProxy()
   51 
   52 
   53 from linotp.tests import *
   54 from linotp.lib.selftest import isSelfTest
   55 from simplejson import loads
   56 
   57 from datetime import datetime
   58 from datetime import timedelta
   59 import time
   60 import os
   61 
   62 try:
   63     import json
   64 except ImportError:
   65     import simplejson as json
   66 
   67 from urlparse import urlparse
   68 from urlparse import parse_qs
   69 
   70 import unittest
   71 import binascii
   72 
   73 
   74 import logging
   75 log = logging.getLogger(__name__)
   76 
   77 
   78 class OcraOtp(object):
   79 
   80     def __init__(self, ocrapin=None):
   81         self.ocra = None
   82         self.bkey = None
   83         self.ocrapin = ocrapin
   84         self.activationkey = None
   85         self.sharedsecret = None
   86         self.ocrasuite = None
   87         self.serial = None
   88         self.counter = 0
   89 
   90 
   91     def init_1(self, response):
   92         ''' take the response of the first init to setup the OcraOtp'''
   93 
   94         jresp = json.loads(response.body)
   95         app_import = str(jresp.get('detail').get('app_import'))
   96         self.sharedsecret = str(jresp.get('detail').get('sharedsecret'))
   97         self.serial = str(jresp.get('detail').get('serial'))
   98 
   99         ''' now parse the appurl for the ocrasuite '''
  100         uri = urlparse(app_import.replace('lseqr://', 'http://'))
  101         qs = uri.query
  102         qdict = parse_qs(qs)
  103 
  104         ocrasuite = qdict.get('os', None)
  105         if ocrasuite is not None and len(ocrasuite) > 0:
  106             ocrasuite = ocrasuite[0]
  107 
  108         self.ocrasuite = ocrasuite
  109 
  110         return (self.ocrasuite, self.sharedsecret, self.serial)
  111 
  112 
  113     def init_2(self, response, activationKey):
  114         self.activationkey = activationKey
  115 
  116         jresp = json.loads(response.body)
  117         self.nonce = str(jresp.get('detail').get('nonce'))
  118         self.transid = str(jresp.get('detail').get('transactionid'))
  119         app_import = str(jresp.get('detail').get('app_import'))
  120 
  121 
  122         ''' now parse the appurl for the ocrasuite '''
  123         uri = urlparse(app_import.replace('lseqr://', 'http://'))
  124         qs = uri.query
  125         qdict = parse_qs(qs)
  126         nonce = qdict.get('no', None)
  127         if nonce is not None and len(nonce) > 0:
  128             nonce = nonce[0]
  129 
  130         challenge = qdict.get('ch', None)
  131         if challenge is not None and len(challenge) > 0:
  132             challenge = challenge[0]
  133 
  134         self.challenge = challenge
  135         self.ocra = None
  136         self.bkey = None
  137 
  138         return (self.challenge, self.transid)
  139 
  140 
  141     def _setup_(self):
  142 
  143         if self.ocra is not None and self.bkey is not None:
  144             return
  145 
  146         key_len = 20
  147         if self.ocrasuite.find('-SHA256'):
  148             key_len = 32
  149         elif self.ocrasuite.find('-SHA512'):
  150             key_len = 64
  151 
  152         self.bkey = kdf2(self.sharedsecret, self.nonce, self.activationkey, len=key_len)
  153         self.ocra = OcraSuite(self.ocrasuite)
  154 
  155         self.counter = 0
  156 
  157         return
  158 
  159     def callcOtp(self, challenge=None, ocrapin=None, counter= -1):
  160 
  161         if self.ocra is None:
  162             self._setup_()
  163 
  164         if ocrapin is None:
  165             ocrapin = self.ocrapin
  166 
  167         if challenge is None:
  168             challenge = self.challenge
  169         if counter == -1:
  170             counter = self.counter
  171 
  172         param = {}
  173         param['C'] = counter
  174         param['Q'] = challenge
  175         param['P'] = ocrapin
  176         param['S'] = ''
  177         if self.ocra.T is not None:
  178             '''    Default value for G is 1M, i.e., time-step size is one minute and the
  179                    T represents the number of minutes since epoch time [UT].
  180             '''
  181             now = datetime.now()
  182             stime = now.strftime("%s")
  183             itime = int(stime)
  184             param['T'] = itime
  185 
  186         data = self.ocra.combineData(**param)
  187         otp = self.ocra.compute(data, self.bkey)
  188 
  189         if counter == -1:
  190             self.counter += 1
  191 
  192         return otp
  193 
  194 
  195 import threading
  196 
  197 class doRequest(threading.Thread):
  198 
  199     def __init__ (self, utest, rid=1, test=None):
  200 
  201         threading.Thread.__init__(self)
  202 
  203         ## unit test obj
  204         self.utest = utest
  205         self.test_name = test
  206 
  207         ## the identificator
  208         self.rid = rid
  209         self.response = None
  210 
  211     def run(self):
  212         if hasattr(self.utest, self.test_name):
  213             self.response = getattr(self.utest, self.test_name)(self.rid)
  214         return
  215 
  216     def status(self):
  217         res = '"status": true,' in self.response
  218         return res
  219 
  220     def stat(self):
  221         return (self.rid, self.response)
  222 
  223 
  224 def genUrl(controller='admin', action='init'):
  225     return '/%s/%s' % (controller, action)
  226 
  227 
  228 class OcraTest(TestController):
  229 
  230 
  231     fkey = 'a74f89f9251eda9a5d54a9955be4569f9720abe8'.decode('hex')
  232     key20h = '3132333435363738393031323334353637383930'
  233     key20 = key20h.decode('hex')
  234 
  235     key32h = '3132333435363738393031323334353637383930313233343536373839303132'
  236     key32 = key32h.decode('hex')
  237     key64h = '31323334353637383930313233343536373839303132333435363738393031323\
  238 334353637383930313233343536373839303132333435363738393031323334'
  239     key64 = key64h.decode('hex')
  240 
  241     pin = '1234'
  242     pin_sha1 = '7110eda4d09e062aa5e4a390b0a572ac0d2c0220'.decode('hex')
  243 
  244     testsnp = [ { 'ocrasuite': 'OCRA-1:HOTP-SHA1-6:QN08',
  245                 'key': key20,
  246                 'keyh': key20h,
  247                 'vectors': [
  248                     {'params': { 'Q': '00000000' }, 'result': '237653' },
  249                     {'params': { 'Q': '11111111' }, 'result': '243178' },
  250                     {'params': { 'Q': '22222222' }, 'result': '653583' },
  251                     {'params': { 'Q': '33333333' }, 'result': '740991' },
  252                     {'params': { 'Q': '44444444' }, 'result': '608993' },
  253                     {'params': { 'Q': '55555555' }, 'result': '388898' },
  254                     {'params': { 'Q': '66666666' }, 'result': '816933' },
  255                     {'params': { 'Q': '77777777' }, 'result': '224598' },
  256                     {'params': { 'Q': '88888888' }, 'result': '750600' },
  257                     {'params': { 'Q': '99999999' }, 'result': '294470' }
  258                 ]
  259               },
  260               { 'ocrasuite': 'OCRA-1:HOTP-SHA512-8:C-QN08',
  261                 'key': key64,
  262                 'keyh': key64h,
  263                 'vectors': [
  264                     {'params': { 'C': '00000', 'Q': '00000000' }, 'result': '07016083' },
  265                     {'params': { 'C': '00001', 'Q': '11111111' }, 'result': '63947962' },
  266                     {'params': { 'C': '00002', 'Q': '22222222' }, 'result': '70123924' },
  267                     {'params': { 'C': '00003', 'Q': '33333333' }, 'result': '25341727' },
  268                     {'params': { 'C': '00004', 'Q': '44444444' }, 'result': '33203315' },
  269                     {'params': { 'C': '00005', 'Q': '55555555' }, 'result': '34205738' },
  270                     {'params': { 'C': '00006', 'Q': '66666666' }, 'result': '44343969' },
  271                     {'params': { 'C': '00007', 'Q': '77777777' }, 'result': '51946085' },
  272                     {'params': { 'C': '00008', 'Q': '88888888' }, 'result': '20403879' },
  273                     {'params': { 'C': '00009', 'Q': '99999999' }, 'result': '31409299' }
  274                 ]
  275               },
  276               { 'ocrasuite': 'OCRA-1:HOTP-SHA512-8:QN08-T1M',
  277                 'key': key64,
  278                 'keyh': key64h,
  279                 'vectors': [
  280                     {'params': { 'Q': '00000000', 'T_precomputed': int('132d0b6', 16) },
  281                         'result': '95209754' },
  282                     {'params': { 'Q': '11111111', 'T_precomputed': int('132d0b6', 16) },
  283                         'result': '55907591' },
  284                     {'params': { 'Q': '22222222', 'T_precomputed': int('132d0b6', 16) },
  285                         'result': '22048402' },
  286                     {'params': { 'Q': '33333333', 'T_precomputed': int('132d0b6', 16) },
  287                         'result': '24218844' },
  288                     {'params': { 'Q': '44444444', 'T_precomputed': int('132d0b6', 16) },
  289                         'result': '36209546' },
  290                 ]
  291               },
  292             ]
  293 
  294 
  295 
  296     tests = [ { 'ocrasuite': 'OCRA-1:HOTP-SHA1-6:QN08',
  297                 'key': key20,
  298                 'keyh': key20h,
  299                 'vectors': [
  300                     {'params': { 'Q': '00000000' }, 'result': '237653' },
  301                     {'params': { 'Q': '11111111' }, 'result': '243178' },
  302                     {'params': { 'Q': '22222222' }, 'result': '653583' },
  303                     {'params': { 'Q': '33333333' }, 'result': '740991' },
  304                     {'params': { 'Q': '44444444' }, 'result': '608993' },
  305                     {'params': { 'Q': '55555555' }, 'result': '388898' },
  306                     {'params': { 'Q': '66666666' }, 'result': '816933' },
  307                     {'params': { 'Q': '77777777' }, 'result': '224598' },
  308                     {'params': { 'Q': '88888888' }, 'result': '750600' },
  309                     {'params': { 'Q': '99999999' }, 'result': '294470' }
  310                 ]
  311               },
  312               { 'ocrasuite': 'OCRA-1:HOTP-SHA256-8:C-QN08-PSHA1',
  313                 'key': key32,
  314                 'keyh': key32h,
  315                 'vectors': [
  316                     {'params': { 'C': 0, 'Q': '12345678' }, 'result': '65347737' },
  317                     {'params': { 'C': 1, 'Q': '12345678' }, 'result': '86775851' },
  318                     {'params': { 'C': 2, 'Q': '12345678' }, 'result': '78192410' },
  319                     {'params': { 'C': 3, 'Q': '12345678' }, 'result': '71565254' },
  320                     {'params': { 'C': 4, 'Q': '12345678' }, 'result': '10104329' },
  321                     {'params': { 'C': 5, 'Q': '12345678' }, 'result': '65983500' },
  322                     {'params': { 'C': 6, 'Q': '12345678' }, 'result': '70069104' },
  323                     {'params': { 'C': 7, 'Q': '12345678' }, 'result': '91771096' },
  324                     {'params': { 'C': 8, 'Q': '12345678' }, 'result': '75011558' },
  325                     {'params': { 'C': 9, 'Q': '12345678' }, 'result': '08522129' }
  326                 ]
  327               },
  328              { 'ocrasuite': 'OCRA-1:HOTP-SHA256-8:QN08-PSHA1',
  329                 'key': key32,
  330                 'keyh': key32h,
  331                 'vectors': [
  332                     {'params': { 'Q': '00000000' }, 'result': '83238735' },
  333                     {'params': { 'Q': '11111111' }, 'result': '01501458' },
  334                     {'params': { 'Q': '22222222' }, 'result': '17957585' },
  335                     {'params': { 'Q': '33333333' }, 'result': '86776967' },
  336                     {'params': { 'Q': '44444444' }, 'result': '86807031' }
  337                 ]
  338               },
  339 
  340               {'ocrasuite': 'OCRA-1:HOTP-SHA512-8:C-QN08',
  341                 'key': key64,
  342                 'keyh': key64h,
  343                 'vectors': [
  344                     {'params': { 'C': '00000', 'Q': '00000000' }, 'result': '07016083' },
  345                     {'params': { 'C': '00001', 'Q': '11111111' }, 'result': '63947962' },
  346                     {'params': { 'C': '00002', 'Q': '22222222' }, 'result': '70123924' },
  347                     {'params': { 'C': '00003', 'Q': '33333333' }, 'result': '25341727' },
  348                     {'params': { 'C': '00004', 'Q': '44444444' }, 'result': '33203315' },
  349                     {'params': { 'C': '00005', 'Q': '55555555' }, 'result': '34205738' },
  350                     {'params': { 'C': '00006', 'Q': '66666666' }, 'result': '44343969' },
  351                     {'params': { 'C': '00007', 'Q': '77777777' }, 'result': '51946085' },
  352                     {'params': { 'C': '00008', 'Q': '88888888' }, 'result': '20403879' },
  353                     {'params': { 'C': '00009', 'Q': '99999999' }, 'result': '31409299' }
  354                 ]
  355               },
  356               { 'ocrasuite': 'OCRA-1:HOTP-SHA512-8:QN08-T1M',
  357                 'key': key64,
  358                 'keyh': key64h,
  359                 'vectors': [
  360                     {'params': { 'Q': '00000000', 'T_precomputed': int('132d0b6', 16) },
  361                         'result': '95209754' },
  362                     {'params': { 'Q': '11111111', 'T_precomputed': int('132d0b6', 16) },
  363                         'result': '55907591' },
  364                     {'params': { 'Q': '22222222', 'T_precomputed': int('132d0b6', 16) },
  365                         'result': '22048402' },
  366                     {'params': { 'Q': '33333333', 'T_precomputed': int('132d0b6', 16) },
  367                         'result': '24218844' },
  368                     {'params': { 'Q': '44444444', 'T_precomputed': int('132d0b6', 16) },
  369                         'result': '36209546' },
  370                 ]
  371               },
  372             ]
  373 
  374     def setUp(self):
  375         TestController.setUp(self)
  376         self.removeTokens()
  377         self.setupPolicies()
  378         self.sqlconnect = self.appconf.get('sqlalchemy.url')
  379         # sys.argv[1] = 'arg'
  380         # del sys.argv[2] # remember that -s is in sys.argv[2], see below
  381         print sys.argv
  382 
  383         self.runs = 5
  384         self.threads = 6
  385 
  386         for arg in sys.argv:
  387             if arg.startswith('ptest'):
  388                 k = arg.split('=')
  389                 if k[0] == 'ptest.threads':
  390                     self.threads = int(k[1])
  391                 if k[0] == 'ptest.runs':
  392                     self.runs = int(k[1])
  393         return
  394 
  395     def tearDown(self):
  396         return
  397 
  398     def setupPolicies(self, check_url='http://127.0.0.1/ocra/check_t'):
  399 
  400         params = {
  401                 'name'  :   'CheckURLPolicy',
  402                 'scope' :   'authentication',
  403                 'realm' :   'mydefrealm',
  404         }
  405         params['action'] = 'qrtanurl=%s' % (str(check_url))
  406         response = self.app.get(genUrl(controller='system', action='setPolicy'), params=params)
  407 
  408         return response
  409 
  410     def check_otp(self, transid, otp, pin='pin'):
  411         ''' -3.a- verify the otp value to finish the rollout '''
  412         parameters = {"transactionid"   : transid, "pass" : '' + pin + otp }
  413         response = self.app.get(genUrl(controller='ocra', action='check_t'), params=parameters)
  414         return response
  415 
  416     def gen_challenge_data(self):
  417 
  418         testchall = [ { 'ocrasuite': 'OCRA-1:HOTP-SHA256-6:C-QA64',
  419             'key': '12345678901234567890',
  420             "app_import1": "lseqr://init?sh=12345678901234567890&os=OCRA-1%3AHOTP-SHA256-6%3AC-QA64&se=LSOC00000001",
  421             "app_import2": "lseqr://nonce?me=abcdefg+1234567+%2B-%2A%23+%C3%B6%C3%A4%C3%BC%C3%9F&ch=abcdefg12345670000Xch3tNAkIWpmj6du0PVBSvFOmJqWu0wq9AL9BKYxGjGkVg&no=492321549d56446d31682adabe64efc4bc6d7f0e31202ebdd75335b550a87690a1a3fcafc9e52a04e4dde40dea5634ad0c7becfe9d3961690b95d135844b866d&tr=954472011597&u=http%253A%252F%252F127.0.0.1%252Focra%252Fcheck_t&se=LSOC00000001&si=790eb52b398c5b37aaeba56b374947e0b3193ff98e2553c04ac15ae49440abb9",
  422             'vectors': [
  423                 { 'param' : { 'data':'irgendwas'} , 'otp' : '12345'},
  424                 { 'param' : { 'data':'DasisteinTest'}, 'otp' : '12345' },
  425                 { 'param' : { 'data':'Irgendwas'}, 'otp' : '12345' },
  426                 { 'param' : { 'data':'1234567890123'}, 'otp' : '12345' },
  427                 { 'param' : { 'data':'Dasisteintest'}, 'otp' : '12345' },
  428                 { 'param' : { 'data':'Dasisteintest'}, 'otp' : '12345' },
  429                 { 'param' : { 'data':'Dasist'}, 'otp' : '12345' },
  430                 { 'param' : { 'data':'EinTestdasist'}, 'otp' : '12345' },
  431                 { 'param' : { 'data':'ss'}, 'otp' : '12345' },
  432                 { 'param' : { 'data':'SS'}, 'otp' : '12345' },
  433                 { 'param' : { 'data':'SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS'}, 'otp' : '12345' },
  434                 { 'param' : { 'data':'DasisteinTExt'}, 'otp' : '12345' },
  435                 { 'param' : { 'data':'Das'}, 'otp' : '12345' },
  436                 { 'param' : { 'data':'EinLeerzeichen'}, 'otp' : '12345' },
  437                 { 'param' : { 'data':'Ein Leerzeichen'}, 'otp' : '12345' },
  438                 { 'param' : { 'data':'Ein+Leerzeichen'}, 'otp' : '12345' },
  439             ]
  440           }, ]
  441 
  442 
  443         self.setupPolicies(check_url='https://ebanking.1882.de')
  444 
  445         tt = []
  446         for test in testchall:
  447             testdata = {}
  448 
  449             ocra = OcraOtp()
  450             response1 = self.init_0_QR_Token(user='root', ocrasuite=test['ocrasuite'])
  451             ocra.init_1(response1)
  452 
  453             jresp = json.loads(response1.body)
  454             app_import_1 = str(jresp.get('detail').get('app_import'))
  455 
  456             message = 'abc'
  457             (response2, activationkey) = self.init_1_QR_Token(user='root', message=message, activationkey='GEZDGNBVGY3TQOJQ01', ocrasuite=test['ocrasuite'])
  458             (challenge, transid) = ocra.init_2(response2, activationkey)
  459 
  460             jresp = json.loads(response2.body)
  461             app_import_2 = str(jresp.get('detail').get('app_import'))
  462 
  463             testdata['ocrasuite'] = ocra.ocrasuite
  464             testdata['nonce'] = ocra.nonce
  465             testdata['activationcode'] = ocra.activationkey
  466             testdata['sharedsecret'] = ocra.sharedsecret
  467             testdata['app_import_1'] = app_import_1
  468             testdata['app_import_2'] = app_import_2
  469 
  470             counter = 0
  471             ''' finish rollout '''
  472             otp = ocra.callcOtp(challenge, counter=counter)
  473 
  474             bkey = ocra.bkey
  475             key = binascii.hexlify(bkey)
  476             testdata['key'] = key
  477 
  478             response = self.check_otp(transid, otp)
  479             assert '"result": true' in response
  480 
  481 
  482             testv = []
  483 
  484             ''' initial challenge '''
  485             test_set = {}
  486             test_set['message'] = message
  487             test_set['data'] = app_import_2
  488             test_set['challenge'] = challenge
  489             test_set['otp'] = otp
  490             testv.append(test_set)
  491 
  492 
  493 
  494             for v in test.get('vectors'):
  495                 param = v.get('param')
  496                 ''' get next challenge'''
  497                 (response, challenge, transid) = self.get_challenge(ocra.serial, challenge_data=param.get('data'))
  498                 jresp = json.loads(response.body)
  499                 app_import = str(jresp.get('detail').get('data'))
  500                 challenge = str(jresp.get('detail').get("challenge"))
  501 
  502                 counter += 1
  503                 otp = ocra.callcOtp(challenge, counter=counter)
  504 
  505                 ''' correct response '''
  506                 response = self.check_otp(transid, otp)
  507                 assert '"result": true' in response
  508 
  509                 ''' push test data in our test set'''
  510                 test_set = {}
  511                 test_set['message'] = param.get('data')
  512                 test_set['data'] = app_import
  513                 test_set['challenge'] = challenge
  514                 test_set['otp'] = otp
  515                 testv.append(test_set)
  516 
  517             testdata['vectors'] = testv
  518             tt.append(testdata)
  519 
  520 
  521         self.removeTokens(serial=ocra.serial)
  522 
  523         f = open('/tmp/challengeTestSet', 'w+')
  524         testStr = json.dumps(tt, indent=4)
  525         f.write(testStr)
  526         f.close()
  527 
  528         return
  529 
  530     def randOTP(self, otp):
  531         ''' randomly change the chars in an otp - to gen a wron otp '''
  532         rotp = otp
  533         lenotp = len(str(otp))
  534         if lenotp > 1:
  535             while rotp == otp:
  536                 for i in range(0, 3):
  537                     idx1 = random.randint(0, lenotp - 1)
  538                     idx2 = random.randint(0, lenotp - 1)
  539                     if idx1 != idx2:
  540                         c1 = rotp[idx1]
  541                         c2 = rotp[idx2]
  542                         rotp = rotp[:idx1] + c2 + rotp[idx1 + 1:]
  543                         rotp = rotp[:idx2] + c1 + rotp[idx2 + 1:]
  544         return rotp
  545 
  546     def init_0_QR_Token(self, tokentype='ocra', ocrapin='', pin='pin', user='root', description='QRTestToken',
  547                         serial='QR123', sharedsecret='1', genkey='1', otpkey=None, ocrasuite='OCRA-1:HOTP-SHA256-8:C-QA64'):
  548         ''' -1- create an ocra token '''
  549         parameters = {}
  550 
  551         if tokentype is not None:
  552             parameters['type'] = tokentype
  553 
  554         if pin is not None:
  555             parameters['pin'] = pin
  556 
  557         if genkey is not None:
  558             parameters['genkey'] = genkey
  559 
  560         if otpkey is not None:
  561             parameters['otpkey'] = otpkey
  562 
  563         if sharedsecret is not None:
  564             parameters['sharedsecret'] = sharedsecret
  565 
  566         if ocrapin is not None:
  567             parameters['ocrapin'] = ocrapin
  568 
  569         if ocrasuite is not None:
  570             parameters['ocrasuite'] = ocrasuite
  571 
  572         if user is not None:
  573             parameters['user'] = user
  574         elif serial is not None:
  575             parameters['serial'] = serial
  576 
  577 
  578         response = self.app.get(genUrl(controller='admin', action='init'), params=parameters)
  579         return response
  580 
  581 
  582     def init_1_QR_Token(self, activationkey=None, tokentype='ocra', serial=None, user=None, pin='pin', message='Message', ocrapin='', genkey='1', ocrasuite='OCRA-1:HOTP-SHA256-8:C-QA64'):
  583         ''' -2- acivate ocra token '''
  584         parameters = {}
  585 
  586         if tokentype is not None:
  587             parameters['type'] = tokentype
  588 
  589         if pin is not None:
  590             parameters['pin'] = pin
  591 
  592         if message is not None:
  593             parameters['message'] = message
  594 
  595         if genkey is not None:
  596             parameters['genkey'] = genkey
  597 
  598         if ocrapin is not None:
  599             parameters['ocrapin'] = ocrapin
  600 
  601 
  602         if user is not None:
  603             parameters['user'] = user
  604         elif serial is not None:
  605             parameters['serial'] = serial
  606 
  607         if activationkey is None:
  608             activationkey = createActivationCode('1234567890')
  609         parameters['activationcode'] = activationkey
  610 
  611         if ocrasuite is not None:
  612             parameters['ocrasuite'] = ocrasuite
  613 
  614         response = self.app.get(genUrl(controller='admin', action='init'), params=parameters)
  615         return (response, activationkey)
  616 
  617 
  618 
  619     def removeTokens(self, user=None, serial=None):
  620         serials = []
  621 
  622         if user is not None:
  623             p = {"user" : user }
  624             response = self.app.get(genUrl(controller='admin', action='remove'), params=p)
  625             log.info("response %s\n", response)
  626             assert '"value": 1' in response
  627 
  628         if serial is not None:
  629             p = {"serial" : serial }
  630             response = self.app.get(genUrl(controller='admin', action='remove'), params=p)
  631             log.info("response %s\n", response)
  632             assert '"value": 1' in response
  633 
  634 
  635         if serial is None and user is None:
  636             parameters = {}
  637             response = self.app.get(genUrl(controller='admin', action='show'), params=parameters)
  638             log.info("response %s\n", response)
  639             assert '"status": true' in response
  640 
  641             jresp = json.loads(response.body)
  642 
  643             d_root = jresp.get('result').get('value').get('data')
  644             for tok in d_root:
  645                 serial = tok.get("LinOtp.TokenSerialnumber")
  646                 serials.append(serial)
  647 
  648             for serial in  serials:
  649                 p = {"serial" : serial }
  650                 response = self.app.get(genUrl(controller='admin', action='remove'), params=p)
  651                 log.info("response %s\n", response)
  652                 assert '"value": 1' in response
  653 
  654 
  655 
  656     def _getChallenge(self, ocrasuite, bkey, serial, ocrapin='', data=None, count=0, ttime=None):
  657 
  658         otp1 = None
  659 
  660         p = {"serial"      : serial,
  661              "data"        : "0105037311 Konto 50150850 BLZ 1752,03 Eur"
  662             }
  663         if data is not None:
  664             p[data] = data
  665 
  666         response = self.app.get(genUrl(controller='ocra', action='request'), params=p)
  667         log.info("response %s\n", response)
  668         assert '"value": true' in response
  669 
  670         ''' -2b- from the response get the challenge '''
  671         jresp = json.loads(response.body)
  672         challenge1 = str(jresp.get('detail').get('challenge'))
  673         transid1 = str(jresp.get('detail').get('transactionid'))
  674 
  675 
  676         now = datetime.now()
  677         if ttime is not None:
  678             now = ttime
  679         stime = now.strftime("%s")
  680         itime = int(stime)
  681 
  682         param = {}
  683         param['C'] = count
  684         param['Q'] = challenge1
  685         param['P'] = ocrapin
  686         param['S'] = ''
  687         param['T'] = itime
  688 
  689         ocra = OcraSuite(ocrasuite)
  690         data = ocra.combineData(**param)
  691         otp1 = ocra.compute(data, bkey)
  692 
  693         return (otp1, transid1)
  694 
  695     def get_challenge(self, serial, user=None, challenge_data=None):
  696         p = {
  697 
  698              "data"        : challenge_data,
  699             }
  700         if user is None:
  701             p["serial"] = serial
  702         else:
  703             p["user"] = user
  704 
  705         response = self.app.get(genUrl(controller='ocra', action='request'), params=p)
  706         try:
  707             jresp = json.loads(response.body)
  708             challenge = str(jresp.get('detail').get('challenge'))
  709             transid = str(jresp.get('detail').get('transactionid'))
  710         except Exception, e:
  711             challenge = None
  712             transid = None
  713 
  714         return (response, challenge, transid)
  715 
  716     def createSpassToken(self, serial=None, user='root', pin='spass'):
  717         if serial is None:
  718             serial = "TSpass"
  719         parameters = {
  720                       "serial"      : serial,
  721                       "user"        : user,
  722                       "pin"         : pin,
  723                       "description" : "SpassToken",
  724                       "type"        : "spass"
  725                       }
  726 
  727         response = self.app.get(genUrl(controller='admin', action='init'), params=parameters)
  728         assert '"value": true' in response
  729         return serial
  730 
  731     def test_ocra_paralell(self):
  732 
  733         if 'sqlite' in self.sqlconnect:
  734             error = "This test will fail for sqlite db, as it does not support enough concurrency"
  735             assert ('sqlite' not in self.sqlconnect), error
  736 
  737         self.createPolicy_Super()
  738 
  739         if self.runs == -1:
  740             i = 0
  741             while(1 == 1):
  742                 i = i + 1
  743                 self._prun_(i, self.threads)
  744         else:
  745             for i in range(0, self.runs):
  746                 self._prun_(i, self.threads)
  747 
  748         return
  749 
  750     def _prun_(self, run_num, numthreads):
  751         '''
  752         worker method
  753         '''
  754         p_tests = []
  755 
  756         for _i in range(0, numthreads):
  757             p_test = doRequest(self, rid=_i,
  758                                test='ptest_OCRA_token_failcounterInc')
  759             p_tests.append(p_test)
  760             if environ.has_key('paste.registry'):
  761                 environ['paste.registry'].register(myglobal, p_test)
  762 
  763             #prevent that all threads start at same time:
  764             # this will cause an p_thread error within the odbc layer
  765             sleep = random.uniform(0.1, 0.3)
  766             time.sleep(sleep)
  767 
  768             p_test.start()
  769 
  770         ''' wait till all threads are completed '''
  771         for p_test in p_tests:
  772             p_test.join()
  773 
  774         log.debug(' %d Threads finished' % (numthreads))
  775     # as there are asserts in the test, the thread will stop
  776     # if something is wrong. So we don't care for the results
  777 
  778 
  779     def createPolicy_Super(self):
  780         '''
  781         Policy 01: create a policy for the superadmin
  782         '''
  783         parameters = { 'name' : 'ManageAll',
  784                        'scope' : 'admin',
  785                        'realm' : '*',
  786                        'action' : '*',
  787                        'user' : 'superadmin, Administrator',
  788                        'selftest_admin' : 'superadmin'
  789                       }
  790         response = self.app.get(url(controller='system', action='setPolicy'), params=parameters)
  791         log.error(response)
  792         assert '"status": true' in response
  793 
  794 
  795 ###############################################################################
  796 ## parallel test starts here
  797 ###############################################################################
  798 
  799     def ptest_OCRA_token_failcounterInc(self, tid=1):
  800         '''
  801             test_OCRA_token_failcounterInc: failcounter increment
  802 
  803             description:
  804                 for all ocrasuites:
  805                    create and enroll token
  806                    verify the first otp
  807                    get some challenges
  808                    4 times:
  809                       verify a wrong otp
  810                       verify a wrong transaction
  811                       check status and if fail counter has incremented
  812         '''
  813         tcount = 0
  814         for test in self.tests:
  815             ocrasuite = test['ocrasuite']
  816             key = test['keyh']
  817             bkey = test['key']
  818             ocrapin = 'myocrapin'
  819             tid = tid
  820             serial = "QR_One_%r_%r_%r_%r" % (tid, tcount, int(time.time()), random.randint(0, 100))
  821             log.info("## serial: %s" % serial)
  822             count = 0
  823             tcount = tcount + 1
  824 
  825             ocra = OcraSuite(ocrasuite)
  826             pinlen = ocra.truncation
  827             ''' -1- create an ocra token '''
  828             parameters = {
  829                           "serial"      : serial,
  830                           "user"        : "root",
  831                           "pin"         : "pin",
  832                           "description" : "first QRToken",
  833                           'type'        : 'ocra',
  834                           'ocrapin'     : ocrapin,
  835                           'otpkey'      : key,
  836                           'ocrasuite'   : ocrasuite
  837                           }
  838 
  839             response = self.app.get(genUrl(controller='admin', action='init'), params=parameters)
  840             assert '"value": true' in response
  841 
  842             ## verify that the token is usable
  843             ''' -2- fetch the challenge '''
  844             p = {"serial"      : serial,
  845                  "data"        : "0105037311 Konto 50150850 BLZ 1752,03 Eur"
  846                 }
  847             response = self.app.get(genUrl(controller='ocra', action='request'), params=p)
  848             log.info("response %s\n", response)
  849             if '"value": true' not in response:
  850                 assert '"value": true' in response
  851 
  852             ''' -3.a- from the response get the challenge '''
  853             jresp = json.loads(response.body)
  854             challenge = str(jresp.get('detail').get('challenge'))
  855             transid = str(jresp.get('detail').get('transactionid'))
  856 
  857             param = {}
  858             param['C'] = count
  859             param['Q'] = challenge
  860             param['P'] = ocrapin
  861             param['S'] = ''
  862             if ocra.T is not None:
  863                 '''    Default value for G is 1M, i.e., time-step size is one minute and the
  864                        T represents the number of minutes since epoch time [UT].
  865                 '''
  866                 now = datetime.now()
  867                 stime = now.strftime("%s")
  868                 itime = int(stime)
  869                 param['T'] = itime
  870 
  871             ocra = OcraSuite(ocrasuite)
  872             data = ocra.combineData(**param)
  873             otp = ocra.compute(data, bkey)
  874 
  875             ppin = 'pin' + otp
  876 
  877             ''' -3.b- verify the correct otp value '''
  878             parameters = {"transactionid"   : transid,
  879                           "pass"            : ppin,
  880                           }
  881             response = self.app.get(genUrl(controller='ocra', action='check_t'), params=parameters)
  882             log.info("response %s\n", response)
  883             if '"result": true' not in response:
  884                 assert '"result": true' in response
  885 
  886             # verify that the failcounter increments (max is 10)
  887             fcount = 0
  888             for count in range(1, 3):
  889 
  890                 ## create more than one challenge
  891                 chals = random.randint(2, 5)
  892                 for cc in range(1, chals):
  893                     ''' -2- fetch the challenge '''
  894                     p = {"serial"      : serial,
  895                          "data"        : "0105037311 Konto 50150850 BLZ 1752,03 Eur"
  896                         }
  897                     response = self.app.get(genUrl(controller='ocra', action='request'), params=p)
  898                     log.info("response %s\n", response)
  899                     if '"value": true' not in response:
  900                         assert '"value": true' in response
  901 
  902 
  903                 ''' -3.a- from the response get the challenge '''
  904                 jresp = json.loads(response.body)
  905                 challenge = str(jresp.get('detail').get('challenge'))
  906                 transid = str(jresp.get('detail').get('transactionid'))
  907 
  908                 ppin = 'pin' + 'a' * pinlen
  909 
  910                 ''' -4- verify the wrong otp value '''
  911                 parameters = {"transactionid"   : transid,
  912                               "pass"            : ppin,
  913                               }
  914                 response = self.app.get(genUrl(controller='ocra', action='check_t'), params=parameters)
  915                 log.info("response %s\n", response)
  916                 if '"result": false' not in response:
  917                     assert '"result": false' in response
  918                 fcount += 1
  919 
  920                 ppin = 'pin' + '4' * pinlen
  921 
  922                 ''' -5- verify the wrong otp value '''
  923                 parameters = {"transactionid"   : transid,
  924                               "pass"            : ppin,
  925                               }
  926                 response = self.app.get(genUrl(controller='ocra', action='check_t'), params=parameters)
  927                 log.info("response %s\n", response)
  928                 if not '"result": false' in response:
  929                     assert '"result": false' in response
  930                 fcount += 1
  931 
  932                 ''' -6- check if the failcounter has incremented  '''
  933                 parameters = {"transactionid"   : transid,
  934                               }
  935                 response = self.app.get(genUrl(controller='ocra', action='checkstatus'), params=parameters)
  936                 log.info("response %s\n", response)
  937                 assert '"status": true' in response
  938                 assstring = '"failcount": %d,' % (fcount)
  939                 log.info("assert %s\n", assstring)
  940                 if assstring not in response:
  941                     log.error(response)
  942                     assert assstring in response
  943 
  944                 sleep = random.uniform(0.0, 0.3)
  945                 time.sleep(sleep)
  946 
  947             ''' -remove the ocra token '''
  948             parameters = {"serial"      : serial, }
  949             response = self.app.get(genUrl(controller='admin', action='remove'), params=parameters)
  950             log.info("response %s\n", response)
  951             assert '"value": 1' in response
  952 
  953             for _iii in range(0, 3):
  954                 parameters = {"serial"      : serial, }
  955                 response = self.app.get(genUrl(controller='admin', action='remove'), params=parameters)
  956 
  957 
  958         return response
  959