"Fossies" - the Fresh Open Source Software Archive 
Member "nss-pam-ldapd-0.9.12/pynslcd/pam.py" (15 Nov 2021, 14964 Bytes) of package /linux/privat/nss-pam-ldapd-0.9.12.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can view or download the uninterpreted source code file here.
For more information about "pam.py" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
0.9.11_vs_0.9.12.
1
2 # pam.py - functions authentication, authorisation and session handling
3 #
4 # Copyright (C) 2010-2019 Arthur de Jong
5 #
6 # This library is free software; you can redistribute it and/or
7 # modify it under the terms of the GNU Lesser General Public
8 # License as published by the Free Software Foundation; either
9 # version 2.1 of the License, or (at your option) any later version.
10 #
11 # This library is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with this library; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301 USA
20
21 import logging
22 import random
23 import socket
24 import time
25
26 import ldap
27 from ldap.controls.ppolicy import PasswordPolicyControl, PasswordPolicyError
28 from ldap.filter import escape_filter_chars
29
30 import cfg
31 import common
32 import constants
33 import passwd
34 import search
35 import shadow
36
37
# Rebind the name `random` to a random.SystemRandom instance so that later
# uses of `random` in this module (such as random.choice() when generating
# session ids) go through SystemRandom instead of the module-level functions.
random = random.SystemRandom()
39
40
def authenticate(binddn, password):
    """Bind to the LDAP server as binddn and report the outcome.

    A new connection is opened for the bind. If cfg.pam_authc_ppolicy is
    set, a PasswordPolicyControl is sent with the bind request. The first
    password policy control in the response that carries an error, an
    expiration time or a grace login count decides the result.

    Returns a (connection, NSLCD_PAM_* code, message) tuple. Exceptions
    raised by the bind are not caught here; ldap.NO_SUCH_OBJECT is raised
    when the entry for binddn cannot be read back after the bind.
    """
    connection = search.Connection()
    # only ask for password policy information when configured
    request_controls = [PasswordPolicyControl()] if cfg.pam_authc_ppolicy else []
    _, _, _, response_controls = connection.simple_bind_s(
        binddn, password, serverctrls=request_controls)
    for control in response_controls:
        # skip anything that is not a password policy control
        if control.controlType != PasswordPolicyControl.controlType:
            continue
        error = control.error
        logging.debug(
            'PasswordPolicyControl found: error=%s (%s), '
            'timeBeforeExpiration=%s, graceAuthNsRemaining=%s',
            'None' if error is None else PasswordPolicyError(error).prettyPrint(),
            error, control.timeBeforeExpiration, control.graceAuthNsRemaining)
        if error == 0:  # passwordExpired
            return (
                connection, constants.NSLCD_PAM_AUTHTOK_EXPIRED,
                PasswordPolicyError(error).prettyPrint())
        if error == 1:  # accountLocked
            return (
                connection, constants.NSLCD_PAM_ACCT_EXPIRED,
                PasswordPolicyError(error).prettyPrint())
        if error == 2:  # changeAfterReset
            return (
                connection, constants.NSLCD_PAM_NEW_AUTHTOK_REQD,
                'Password change is needed after reset')
        if error:  # any other policy error
            return (
                connection, constants.NSLCD_PAM_PERM_DENIED,
                PasswordPolicyError(error).prettyPrint())
        if control.timeBeforeExpiration is not None:
            return (
                connection, constants.NSLCD_PAM_NEW_AUTHTOK_REQD,
                'Password will expire in %d seconds' % control.timeBeforeExpiration)
        if control.graceAuthNsRemaining is not None:
            return (
                connection, constants.NSLCD_PAM_NEW_AUTHTOK_REQD,
                'Password expired, %d grace logins left' % control.graceAuthNsRemaining)
    # read back our own object (just to do any kind of search)
    own_entries = search.LDAPSearch(
        connection, base=binddn, scope=ldap.SCOPE_BASE,
        filter='(objectClass=*)', attributes=['dn'])
    for found in own_entries:
        if found[0] == binddn:
            return connection, constants.NSLCD_PAM_SUCCESS, ''
    # our DN was not found: signal this as a bind failure
    raise ldap.NO_SUCH_OBJECT()
91
92
def pwmod(conn, userdn, oldpassword, newpassword):
    """Change the password of userdn over conn.

    The modification is first attempted without supplying the old
    password. If that raises an LDAP error and an old password is
    available the request is repeated with it; without an old password
    the original error is re-raised.
    """
    try:
        conn.passwd_s(userdn, None, newpassword)
    except ldap.LDAPError:
        if not oldpassword:
            # nothing to retry with
            raise
        conn.passwd_s(userdn, oldpassword, newpassword)
103
104
def update_lastchange(conns, userdn):
    """Make a best-effort attempt to record the password change time.

    The attribute to write is taken from the shadowLastChange mapping.
    Each connection in conns is tried in order until one modification
    succeeds; LDAP errors are ignored, so nothing is reported when every
    connection fails.

    Raises ValueError when the mapping is empty or still contains a '$'.
    """
    target = shadow.attmap['shadowLastChange']
    # this specific expression mapping is treated as the plain attribute
    if str(target) == '"${shadowLastChange:--1}"':
        target = 'shadowLastChange'
    if not target or '$' in str(target):
        raise ValueError('shadowLastChange has unsupported mapping')
    # build the value for the new attribute
    uses_pwdlastset = target.lower() == 'pwdlastset'
    now = int(time.time())
    if uses_pwdlastset:
        # for AD we use another timestamp
        stamp = '%d000000000' % (now // 100 + (134774 * 864))
    else:
        # time in days since Jan 1, 1970
        stamp = '%d' % (now // (60 * 60 * 24))
    # perform the modification, stop at the first success
    for connection in conns:
        try:
            connection.modify_s(
                userdn, [(ldap.MOD_REPLACE, target, [stamp.encode('utf-8')])])
        except ldap.LDAPError:
            continue  # ignore error and try next connection
        return
126
127
class PAMRequest(common.Request):
    """Common base for the PAM request handlers in this module."""

    def validate(self, parameters):
        """Check the username for validity and fill in the DN if needed.

        The user entry is looked up, its DN is stored in
        parameters['userdn'], and parameters['username'] is replaced by
        the name found in the entry when the two differ.

        Raises ValueError when no entry is found for the user or when the
        name stored in the entry is not a valid name. Anything raised by
        common.validate_name() for the supplied name is passed on.
        """
        # check username for validity
        common.validate_name(parameters['username'])
        # look up user DN
        entry = passwd.uid2entry(self.conn, parameters['username'])
        if not entry:
            # FIXME: we should close the stream with an empty response here
            raise ValueError('%r: user not found' % parameters['username'])
        # save the DN
        parameters['userdn'] = entry[0]
        # get the "real" username
        value = passwd.attmap.get_rdn_value(entry[0], 'uid')
        if not value:
            # get the username from the uid attribute
            values = entry[1]['uid']
            if not values or not values[0]:
                logging.warning('%s: is missing a %s attribute', entry[0], passwd.attmap['uid'])
                # There is no usable name to compare with. Keep the supplied
                # username rather than failing on values[0] or replacing the
                # username with an empty value.
                return
            value = values[0]
        # check the username (value is known to be non-empty here)
        if not common.is_valid_name(value):
            # format the message here: ValueError does not interpolate
            # extra arguments the way the logging functions do
            raise ValueError(
                '%s: has invalid %s attribute' % (entry[0], passwd.attmap['uid']))
        # check if the username is different and update it if needed
        if value != parameters['username']:
            logging.info('username changed from %r to %r', parameters['username'], value)
            parameters['username'] = value
156
157
class PAMAuthenticationRequest(PAMRequest):
    """Handler for NSLCD_ACTION_PAM_AUTHC (PAM authentication) requests."""

    action = constants.NSLCD_ACTION_PAM_AUTHC

    def read_parameters(self, fp):
        """Read the request fields from fp, in this order, into a dict."""
        return dict(username=fp.read_string(),
                    service=fp.read_string(),
                    ruser=fp.read_string(),
                    rhost=fp.read_string(),
                    tty=fp.read_string(),
                    password=fp.read_string())
        # TODO: log call with parameters

    def write(self, username, authc=constants.NSLCD_PAM_SUCCESS,
              authz=constants.NSLCD_PAM_SUCCESS, msg=''):
        """Write one result (authc code, username, authz code, message)."""
        self.fp.write_int32(constants.NSLCD_RESULT_BEGIN)
        self.fp.write_int32(authc)
        self.fp.write_string(username)
        self.fp.write_int32(authz)
        self.fp.write_string(msg)
        self.fp.write_int32(constants.NSLCD_RESULT_END)

    def handle_request(self, parameters):
        """Authenticate the user (or the rootpwmoddn) and write the result.

        Raises ValueError when authenticating as rootpwmoddn without any
        usable password. Invalid credentials are reported to the client
        as NSLCD_PAM_AUTH_ERR; other exceptions are not handled here.
        """
        # if the username is blank and rootpwmoddn is configured, try to
        # authenticate as administrator, otherwise validate request as usual
        if not parameters['username'] and cfg.rootpwmoddn:
            # authenticate as rootpwmoddn
            binddn = cfg.rootpwmoddn
            # if the caller is root we will allow the use of rootpwmodpw
            if not parameters['password'] and self.calleruid == 0 and cfg.rootpwmodpw:
                password = cfg.rootpwmodpw
            elif parameters['password']:
                password = parameters['password']
            else:
                raise ValueError('password missing')
        else:
            self.validate(parameters)
            binddn = parameters['userdn']
            password = parameters['password']
        # try authentication
        try:
            conn, authz, msg = authenticate(binddn, password)
        except ldap.INVALID_CREDENTIALS as e:
            # Exceptions cannot be indexed on Python 3, so read the
            # description from e.args, which works on Python 2 and 3 alike.
            try:
                msg = e.args[0]['desc']
            except (IndexError, KeyError, TypeError):
                msg = str(e)
            logging.debug('bind failed: %s', msg)
            self.write(parameters['username'], authc=constants.NSLCD_PAM_AUTH_ERR, msg=msg)
            return
        if authz != constants.NSLCD_PAM_SUCCESS:
            logging.warning('%s: %s: %s', binddn, parameters['username'], msg)
        else:
            logging.debug('bind successful')
        # FIXME: perform shadow attribute checks with check_shadow()
        self.write(parameters['username'], authz=authz, msg=msg)
214
215
class PAMAuthorisationRequest(PAMRequest):
    """Handler for NSLCD_ACTION_PAM_AUTHZ (PAM authorisation) requests."""

    action = constants.NSLCD_ACTION_PAM_AUTHZ

    def read_parameters(self, fp):
        """Read the request fields from fp, in this order, into a dict."""
        return dict(username=fp.read_string(),
                    service=fp.read_string(),
                    ruser=fp.read_string(),
                    rhost=fp.read_string(),
                    tty=fp.read_string())
        # TODO: log call with parameters

    def write(self, authz=constants.NSLCD_PAM_SUCCESS, msg=''):
        """Write one result consisting of the authz code and a message."""
        self.fp.write_int32(constants.NSLCD_RESULT_BEGIN)
        self.fp.write_int32(authz)
        self.fp.write_string(msg)
        self.fp.write_int32(constants.NSLCD_RESULT_END)

    def check_authz_search(self, parameters):
        """Run every configured pam_authz_search and require a match.

        Does nothing when no searches are configured. Raises StopIteration
        as soon as one of the searches yields no entry.
        """
        if not cfg.pam_authz_searches:
            return
        # escape all parameters
        variables = dict((k, escape_filter_chars(v)) for k, v in parameters.items())
        variables.update(
            hostname=escape_filter_chars(socket.gethostname()),
            fqdn=escape_filter_chars(socket.getfqdn()),
            dn=variables['userdn'],
            uid=variables['username'])
        # go over all authz searches
        for x in cfg.pam_authz_searches:
            search_filter = x.value(variables)
            logging.debug('trying pam_authz_search "%s"', search_filter)
            srch = search.LDAPSearch(self.conn, filter=search_filter, attributes=('dn', ))
            try:
                # the builtin next() works on Python 2 and 3, whereas the
                # .next() method only exists on Python 2 iterators
                dn, _ = next(srch.items())
            except StopIteration:
                logging.error('pam_authz_search "%s" found no matches', search_filter)
                raise
            logging.debug('pam_authz_search found "%s"', dn)

    def handle_request(self, parameters):
        """Validate the user, run the authorisation searches and reply."""
        # fill in any missing userdn, etc.
        self.validate(parameters)
        # check authorisation search
        try:
            self.check_authz_search(parameters)
        except StopIteration:
            self.write(constants.NSLCD_PAM_PERM_DENIED,
                       'LDAP authorisation check failed')
            return
        # all tests passed, return OK response
        self.write()
268
269
class PAMPasswordModificationRequest(PAMRequest):
    """Handler for NSLCD_ACTION_PAM_PWMOD (password change) requests."""

    action = constants.NSLCD_ACTION_PAM_PWMOD

    def read_parameters(self, fp):
        """Read the request fields from fp, in this order, into a dict."""
        return dict(username=fp.read_string(),
                    service=fp.read_string(),
                    ruser=fp.read_string(),
                    rhost=fp.read_string(),
                    tty=fp.read_string(),
                    asroot=fp.read_int32(),
                    oldpassword=fp.read_string(),
                    newpassword=fp.read_string())
        # TODO: log call with parameters

    def write(self, rc=constants.NSLCD_PAM_SUCCESS, msg=''):
        """Write one result consisting of the result code and a message."""
        self.fp.write_int32(constants.NSLCD_RESULT_BEGIN)
        self.fp.write_int32(rc)
        self.fp.write_string(msg)
        self.fp.write_int32(constants.NSLCD_RESULT_END)

    def handle_request(self, parameters):
        """Change the user's password and write the result.

        Raises ValueError when acting as rootpwmoddn without any usable
        password. Invalid credentials are reported to the client as
        NSLCD_PAM_PERM_DENIED; other exceptions are not handled here.
        """
        # fill in any missing userdn, etc.
        self.validate(parameters)
        # check if pam_password_prohibit_message is set
        if cfg.pam_password_prohibit_message:
            self.write(constants.NSLCD_PAM_PERM_DENIED,
                       cfg.pam_password_prohibit_message)
            return
        # check if the user passed the rootpwmoddn
        if parameters['asroot']:
            binddn = cfg.rootpwmoddn
            # check if rootpwmodpw should be used
            if not parameters['oldpassword'] and self.calleruid == 0 and cfg.rootpwmodpw:
                password = cfg.rootpwmodpw
            elif parameters['oldpassword']:
                password = parameters['oldpassword']
            else:
                raise ValueError('password missing')
        else:
            binddn = parameters['userdn']
            password = parameters['oldpassword']
        # TODO: check if shadow properties allow password change
        # perform password modification
        try:
            # NOTE(review): the result code returned by authenticate() is
            # not inspected here - confirm whether that is intended
            conn, authz, msg = authenticate(binddn, password)
            pwmod(conn, parameters['userdn'], parameters['oldpassword'], parameters['newpassword'])
            # try to update lastchange with normal or user connection
            update_lastchange((self.conn, conn), parameters['userdn'])
        except ldap.INVALID_CREDENTIALS as e:
            # Exceptions cannot be indexed on Python 3, so read the
            # description from e.args, which works on Python 2 and 3 alike.
            try:
                msg = e.args[0]['desc']
            except (IndexError, KeyError, TypeError):
                msg = str(e)
            logging.debug('pwmod failed: %s', msg)
            self.write(constants.NSLCD_PAM_PERM_DENIED, msg)
            return
        logging.debug('pwmod successful')
        self.write()
329
330
# number of characters in a generated session id
SESSION_ID_LENGTH = 25
# characters a session id is built from; every character is listed exactly
# once so that each one is equally likely to be picked
SESSION_ID_ALPHABET = (
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
    "abcdefghijklmnopqrstuvwxyz" +
    "0123456789"
)


def generate_session_id():
    """Return a random session id.

    The id consists of SESSION_ID_LENGTH characters, each picked
    independently from SESSION_ID_ALPHABET with random.choice().
    """
    return ''.join(
        random.choice(SESSION_ID_ALPHABET)
        for _ in range(SESSION_ID_LENGTH)
    )
344
345
class PAMSessionOpenRequest(PAMRequest):
    """Handler for NSLCD_ACTION_PAM_SESS_O (session open) requests."""

    action = constants.NSLCD_ACTION_PAM_SESS_O

    def read_parameters(self, fp):
        """Read the five string fields of the request from fp, in order."""
        # TODO: log call with parameters
        field_names = ('username', 'service', 'ruser', 'rhost', 'tty')
        return dict((name, fp.read_string()) for name in field_names)

    def write(self, sessionid):
        """Write one result that carries only the session id."""
        out = self.fp
        out.write_int32(constants.NSLCD_RESULT_BEGIN)
        out.write_string(sessionid)
        out.write_int32(constants.NSLCD_RESULT_END)

    def handle_request(self, parameters):
        """Reply with a newly generated session id."""
        self.write(generate_session_id())
367
368
class PAMSessionCloseRequest(PAMRequest):
    """Handler for NSLCD_ACTION_PAM_SESS_C (session close) requests."""

    action = constants.NSLCD_ACTION_PAM_SESS_C

    def read_parameters(self, fp):
        """Read the six string fields of the request from fp, in order."""
        # TODO: log call with parameters
        field_names = ('username', 'service', 'ruser', 'rhost', 'tty', 'session_id')
        return dict((name, fp.read_string()) for name in field_names)

    def write(self):
        """Write a result that consists of the begin and end markers only."""
        out = self.fp
        out.write_int32(constants.NSLCD_RESULT_BEGIN)
        out.write_int32(constants.NSLCD_RESULT_END)

    def handle_request(self, parameters):
        """Acknowledge the request; the parameters are not used."""
        self.write()