"Fossies" - the Fresh Open Source Software Archive

Member "LinOTP-release-2.11/linotpd/src/linotp/lib/tools/import_user/__init__.py" (12 Nov 2019, 8590 Bytes) of package /linux/misc/LinOTP-release-2.11.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file.

    1 # -*- coding: utf-8 -*-
    2 #
    3 #    LinOTP - the open source solution for two factor authentication
    4 #    Copyright (C) 2010 - 2019 KeyIdentity GmbH
    5 #
    6 #    This file is part of LinOTP server.
    7 #
    8 #    This program is free software: you can redistribute it and/or
    9 #    modify it under the terms of the GNU Affero General Public
   10 #    License, version 3, as published by the Free Software Foundation.
   11 #
   12 #    This program is distributed in the hope that it will be useful,
   13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
   14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   15 #    GNU Affero General Public License for more details.
   16 #
   17 #    You should have received a copy of the
   18 #               GNU Affero General Public License
   19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
   20 #
   21 #
   22 #    E-mail: linotp@keyidentity.com
   23 #    Contact: www.linotp.org
   24 #    Support: www.keyidentity.com
   25 #
   26 
   27 """
   28 import user tool -
   29   import a csv file of users
   30 
   31 if called from within LinOTP
   32 
   33 import linotp.model.meta
   34 Session = linotp.model.meta.Session
   35 Engine = meta.engine
   36 
   37 """
   38 import csv
   39 import json
   40 
   41 import logging
   42 from linotp.lib.tools.import_user.ImportHandler import ImportHandler
   43 
   44 
   45 log = logging.getLogger(__name__)
   46 
   47 
   48 class FormatReader(object):
   49     """
   50     support for special csv formats
   51     """
   52     pass
   53 
   54 
   55 class DefaultFormatReader(FormatReader):
   56 
   57     delimiter = ','
   58     quotechar = '"'
   59 
   60     @classmethod
   61     def prepare_row(cls, row):
   62         return row
   63 
   64 
   65 class PasswdFormatReader(FormatReader):
   66 
   67     delimiter = ':'
   68     quotechar = '"'
   69 
   70     @classmethod
   71     def prepare_row(cls, row):
   72 
   73         if len(row) < 5:
   74             return row
   75 
   76         ext_row = row[0:4]
   77 
   78         # support for extend password format, which contains email++
   79 
   80         if ',' in row[4]:
   81             attr = row[4].split(',')
   82             if len(attr) < 5:
   83                 attr.append("")
   84         else:
   85             attr = ',,,,'.split(',')
   86             attr[0] = row[4]
   87 
   88         # now split the name into surname and lastname
   89 
   90         if ' ' in attr[0]:
   91             ext_name = attr[0].split(' ', 1)
   92         else:
   93             ext_name = (attr[0] + " ").split(' ', 1)
   94 
   95         # finally we concat all
   96 
   97         ext_row.extend(ext_name)
   98         ext_row.extend(attr[1:])
   99         ext_row.extend(row[5:])
  100 
  101         return ext_row
  102 
  103 
  104 class UserImport(object):
  105 
  106     def __init__(self, ImportHandler):
  107 
  108         self.user_column_mapping = {}
  109         self.import_handler = ImportHandler
  110         self.encoding = 'UTF-8'
  111 
  112     def set_mapping(self, mapping):
  113         self.user_column_mapping = mapping
  114 
  115     def get_users_from_data(self, csv_data, format_reader,
  116                             passwords_in_plaintext=False):
  117         """
  118         for each row
  119         - iterate over all available database columns and
  120         - check if there is a column for this in the csv data
  121 
  122         and add the group identifier
  123 
  124         """
  125 
  126         reader = csv.reader(csv_data.split('\n'),
  127                             delimiter=format_reader.delimiter,
  128                             quotechar=format_reader.quotechar)
  129 
  130         for row in reader:
  131 
  132             if not row:
  133                 continue
  134 
  135             row = format_reader.prepare_row(row)
  136 
  137             user = self.import_handler.User()
  138 
  139             for entry in self.import_handler.User.user_entries:
  140 
  141                 value = ""
  142                 column_id = self.user_column_mapping.get(entry, -1)
  143 
  144                 if column_id == -1 or column_id >= len(row):
  145                     continue
  146 
  147                 # as the csv converter does not support unicode
  148                 # we have to decode the data
  149 
  150                 value = row[column_id].decode(self.encoding)
  151 
  152                 user.set(entry, value)
  153 
  154                 if entry == 'password' and passwords_in_plaintext:
  155                     user.creat_password_hash(row[column_id])
  156 
  157             yield user
  158 
  159     def import_csv_users(self, csv_data, dryrun=False,
  160                          format_reader=DefaultFormatReader,
  161                          passwords_in_plaintext=False):
  162         """
  163         insert and update users
  164 
  165         update of users is done in 2 steps
  166 
  167         0. get a list of all former stored userid
  168         1. insert all csv data, either update or create
  169         2. all former entries, which have not been update, will be removed
  170 
  171         """
  172         users_deleted = {}
  173         users_created = {}
  174         users_not_modified = {}
  175         users_modified = {}
  176 
  177         processed_users = {}
  178 
  179         former_user_by_id = self.import_handler.prepare()
  180 
  181         try:
  182 
  183             # -------------------------------------------------------------- --
  184 
  185             # finally remove all former, not updated users
  186             # update or insert all user from the csv data
  187 
  188             for user in self.get_users_from_data(
  189                              csv_data,
  190                              format_reader,
  191                              passwords_in_plaintext=passwords_in_plaintext):
  192 
  193                 # only store valid users that have a userid and a username
  194                 if not user.userid or not user.username:
  195                     continue
  196 
  197                 # prevent processing user multiple times
  198                 if (user.userid in processed_users.keys() or
  199                     user.username in processed_users.values()):
  200                     raise Exception("Violation of unique constraint - "
  201                                     "duplicate user in data: %r" % user)
  202                 else:
  203                     processed_users[user.userid] = user.username
  204 
  205                 # search for the user
  206 
  207                 former_user = self.import_handler.lookup(user)
  208 
  209                 # if it does not exist we create a new one
  210 
  211                 if not former_user:
  212                     users_created[user.userid] = user.username
  213                     if not dryrun:
  214                         self.import_handler.add(user)
  215 
  216                 else:
  217 
  218                     if former_user.userid in former_user_by_id:
  219                         del former_user_by_id[former_user.userid]
  220 
  221                     if former_user == user:
  222                         users_not_modified[user.userid] = user.username
  223                     else:
  224                         users_modified[user.userid] = user.username
  225                         if not dryrun:
  226                             self.import_handler.update(former_user, user)
  227 
  228             # -------------------------------------------------------------- --
  229 
  230             # finally remove all former, not updated users
  231 
  232             for del_userid, del_user_name in former_user_by_id.items():
  233                 users_deleted[del_userid] = del_user_name
  234                 if not dryrun:
  235                     self.import_handler.delete_by_id(del_userid)
  236 
  237             result = {
  238                 'created': users_created,
  239                 'updated': users_not_modified,
  240                 'modified': users_modified,
  241                 'deleted': users_deleted,
  242                 }
  243 
  244             if not dryrun:
  245                 self.import_handler.commit()
  246 
  247             return result
  248 
  249         except Exception as exx:
  250 
  251             self.import_handler.rollback()
  252             log.exception(exx)
  253             raise exx
  254 
  255         finally:
  256             self.import_handler.close()
  257 
  258 # ------------------------------------------------------------------------- --
  259 
  260 
  261 def main():
  262 
  263     from linotp.lib.tools.import_user.SQLImportHandler import Shell_DatabaseContext
  264     from linotp.lib.tools.import_user.SQLImportHandler import SQLImportHandler
  265 
  266     # in the test main() we use a password file, which is prepared
  267     # for splitting the description fields into csv data
  268 
  269     with open("/linotp/def-passwd", "r") as f:
  270         csv_data = f.read()
  271 
  272     user_column_map = {
  273             "userid": 2,
  274             "username": 0,
  275             "phone": 8,
  276             "mobile": 7,
  277             "email": 9,
  278             "surname": 5,
  279             "givenname": 4,
  280             "password": 1}
  281 
  282     sql_url = 'postgres://otpd:linotp2d@localhost/otpdb'
  283     shell_db_context = Shell_DatabaseContext(sql_url=sql_url)
  284 
  285     import_handler = SQLImportHandler(
  286                                  groupid="Hello",
  287                                  resolver_name="TestResolver",
  288                                  database_context=shell_db_context)
  289 
  290     user_import = UserImport(import_handler)
  291 
  292     user_import.set_mapping(user_column_map)
  293 
  294     result = user_import.import_csv_users(
  295                                 csv_data,
  296                                 format_reader=PasswdFormatReader())
  297 
  298     print(result)
  299 
  300     return
  301 
  302 if __name__ == "__main__":
  303 
  304     main()