"Fossies" - the Fresh Open Source Software Archive 
Member "nss-pam-ldapd-0.9.12/pynslcd/pam.py" (15 Nov 2021, 14964 Bytes) of package /linux/privat/nss-pam-ldapd-0.9.12.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can view or download the uninterpreted source code file here.
1
2 # pam.py - functions authentication, authorisation and session handling
3 #
4 # Copyright (C) 2010-2019 Arthur de Jong
5 #
6 # This library is free software; you can redistribute it and/or
7 # modify it under the terms of the GNU Lesser General Public
8 # License as published by the Free Software Foundation; either
9 # version 2.1 of the License, or (at your option) any later version.
10 #
11 # This library is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with this library; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301 USA
20
21 import logging
22 import random
23 import socket
24 import time
25
26 import ldap
27 from ldap.controls.ppolicy import PasswordPolicyControl, PasswordPolicyError
28 from ldap.filter import escape_filter_chars
29
30 import cfg
31 import common
32 import constants
33 import passwd
34 import search
35 import shadow
36
37
# Rebind the module name to a SystemRandom instance so that random.choice()
# calls further down draw from the operating system's randomness source
# instead of the default pseudo-random generator.
random = random.SystemRandom()
39
40
def authenticate(binddn, password):
    """Bind as binddn on a fresh connection and report the outcome.

    When cfg.pam_authc_ppolicy is set a PasswordPolicyControl is sent along
    with the bind request. Any password policy control found in the bind
    response is translated into an NSLCD_PAM_* code with a message.

    Returns a (connection, code, message) tuple. Exceptions raised by the
    bind propagate to the caller, and ldap.NO_SUCH_OBJECT is raised when the
    search for binddn after the bind does not return that entry.
    """
    conn = search.Connection()
    # only ask for password policy information when configured to do so
    request_controls = [PasswordPolicyControl()] if cfg.pam_authc_ppolicy else []
    res, data, msgid, response_controls = conn.simple_bind_s(
        binddn, password, serverctrls=request_controls)
    for ctrl in response_controls:
        if ctrl.controlType != PasswordPolicyControl.controlType:
            # not a password policy control, nothing to inspect
            continue
        logging.debug(
            'PasswordPolicyControl found: error=%s (%s), '
            'timeBeforeExpiration=%s, graceAuthNsRemaining=%s',
            'None' if ctrl.error is None else PasswordPolicyError(ctrl.error).prettyPrint(),
            ctrl.error, ctrl.timeBeforeExpiration, ctrl.graceAuthNsRemaining)
        # every branch below returns, so plain ifs are sufficient
        if ctrl.error == 0:  # passwordExpired
            return (
                conn, constants.NSLCD_PAM_AUTHTOK_EXPIRED,
                PasswordPolicyError(ctrl.error).prettyPrint())
        if ctrl.error == 1:  # accountLocked
            return (
                conn, constants.NSLCD_PAM_ACCT_EXPIRED,
                PasswordPolicyError(ctrl.error).prettyPrint())
        if ctrl.error == 2:  # changeAfterReset
            return (
                conn, constants.NSLCD_PAM_NEW_AUTHTOK_REQD,
                'Password change is needed after reset')
        if ctrl.error:
            # any other password policy error denies access
            return (
                conn, constants.NSLCD_PAM_PERM_DENIED,
                PasswordPolicyError(ctrl.error).prettyPrint())
        if ctrl.timeBeforeExpiration is not None:
            return (
                conn, constants.NSLCD_PAM_NEW_AUTHTOK_REQD,
                'Password will expire in %d seconds' % ctrl.timeBeforeExpiration)
        if ctrl.graceAuthNsRemaining is not None:
            return (
                conn, constants.NSLCD_PAM_NEW_AUTHTOK_REQD,
                'Password expired, %d grace logins left' % ctrl.graceAuthNsRemaining)
    # look up our own entry (just to do any kind of search as the bound user)
    own_entries = search.LDAPSearch(
        conn, base=binddn, scope=ldap.SCOPE_BASE,
        filter='(objectClass=*)', attributes=['dn'])
    if any(entry[0] == binddn for entry in own_entries):
        return conn, constants.NSLCD_PAM_SUCCESS, ''
    # our DN was not returned: signal this as a bind failure
    raise ldap.NO_SUCH_OBJECT()
91
92
def pwmod(conn, userdn, oldpassword, newpassword):
    """Change the password of userdn to newpassword over conn.

    The request is first sent without the old password. If that fails with
    an ldap.LDAPError and an old password is available the request is sent
    once more including the old password; without an old password the
    original error is re-raised.
    """
    try:
        conn.passwd_s(userdn, None, newpassword)
    except ldap.LDAPError:
        if not oldpassword:
            # nothing to retry with, report the original failure
            raise
        # retry, this time supplying the old password
        conn.passwd_s(userdn, oldpassword, newpassword)
103
104
def update_lastchange(conns, userdn):
    """Try to update the shadowLastChange attribute of the entry.

    The attribute name is taken from shadow.attmap. The default mapping
    expression is treated as the plain shadowLastChange attribute; any other
    mapping that contains an expression is rejected.

    The modification is attempted on every connection in conns in turn and
    the function returns after the first one that succeeds. This is a
    best-effort operation: a failure on a connection is logged at debug
    level and the next connection is tried; nothing is raised when all
    connections fail.

    Raises ValueError when shadowLastChange is unmapped or mapped to an
    unsupported expression.
    """
    attribute = shadow.attmap['shadowLastChange']
    if str(attribute) == '"${shadowLastChange:--1}"':
        attribute = 'shadowLastChange'
    if not attribute or '$' in str(attribute):
        raise ValueError('shadowLastChange has unsupported mapping')
    # build the value for the new attribute
    now = int(time.time())
    if attribute.lower() == 'pwdlastset':
        # for AD we use another timestamp: the epoch time is offset by
        # 134774 days and padded with zeros
        value = '%d000000000' % (now // 100 + (134774 * 864))
    else:
        # time in days since Jan 1, 1970
        value = '%d' % (now // (60 * 60 * 24))
    # the modification list is the same for every connection
    modlist = [(ldap.MOD_REPLACE, attribute, [value.encode('utf-8')])]
    # perform the modification, return at first success
    for conn in conns:
        try:
            conn.modify_s(userdn, modlist)
            return
        except ldap.LDAPError as e:
            # not fatal: record the failure and try the next connection
            logging.debug('%s: update of %s failed: %s', userdn, attribute, e)
126
127
class PAMRequest(common.Request):
    """Base class for the PAM requests, providing user name validation."""

    def validate(self, parameters):
        """Check the username for validity and fill in the DN if needed.

        The DN of the user entry is stored in parameters['userdn'] and
        parameters['username'] is replaced by the user name found in the
        entry when the two differ. If the entry provides no user name at
        all a warning is logged and the requested user name is kept.

        Raises ValueError when the user cannot be found or when the user
        name from the entry is not a valid name. Whatever
        common.validate_name() raises for the requested name propagates.
        """
        username = parameters['username']
        # check username for validity
        common.validate_name(username)
        # look up user DN
        entry = passwd.uid2entry(self.conn, username)
        if not entry:
            # FIXME: we should close the stream with an empty response here
            raise ValueError('%r: user not found' % username)
        # save the DN
        userdn = entry[0]
        parameters['userdn'] = userdn
        # get the "real" username
        value = passwd.attmap.get_rdn_value(userdn, 'uid')
        if not value:
            # get the username from the uid attribute
            values = entry[1]['uid']
            if not values or not values[0]:
                logging.warning('%s: is missing a %s attribute', userdn, passwd.attmap['uid'])
                # nothing to compare against: keep the requested username
                # instead of failing on values[0] or blanking the name
                return
            value = values[0]
        # check the username
        if not common.is_valid_name(value):
            # format the message here; exceptions do not interpolate
            # logging-style arguments
            raise ValueError(
                '%s: has invalid %s attribute' % (userdn, passwd.attmap['uid']))
        # check if the username is different and update it if needed
        if value != username:
            logging.info('username changed from %r to %r', username, value)
            parameters['username'] = value
156
157
class PAMAuthenticationRequest(PAMRequest):
    """Handle the PAM authentication action by binding to the LDAP server."""

    action = constants.NSLCD_ACTION_PAM_AUTHC

    def read_parameters(self, fp):
        """Read the request fields from fp and return them as a dict."""
        return dict(username=fp.read_string(),
                    service=fp.read_string(),
                    ruser=fp.read_string(),
                    rhost=fp.read_string(),
                    tty=fp.read_string(),
                    password=fp.read_string())
        # TODO: log call with parameters

    def write(self, username, authc=constants.NSLCD_PAM_SUCCESS,
              authz=constants.NSLCD_PAM_SUCCESS, msg=''):
        """Write a single result with the authentication outcome."""
        self.fp.write_int32(constants.NSLCD_RESULT_BEGIN)
        self.fp.write_int32(authc)
        self.fp.write_string(username)
        self.fp.write_int32(authz)
        self.fp.write_string(msg)
        self.fp.write_int32(constants.NSLCD_RESULT_END)

    def handle_request(self, parameters):
        """Authenticate the user (or rootpwmoddn) and write the result.

        Raises ValueError when authenticating as rootpwmoddn without any
        usable password; errors from validate() propagate.
        """
        # if the username is blank and rootpwmoddn is configured, try to
        # authenticate as administrator, otherwise validate request as usual
        if not parameters['username'] and cfg.rootpwmoddn:
            # authenticate as rootpwmoddn
            binddn = cfg.rootpwmoddn
            # if the caller is root we will allow the use of rootpwmodpw
            if not parameters['password'] and self.calleruid == 0 and cfg.rootpwmodpw:
                password = cfg.rootpwmodpw
            elif parameters['password']:
                password = parameters['password']
            else:
                raise ValueError('password missing')
        else:
            self.validate(parameters)
            binddn = parameters['userdn']
            password = parameters['password']
        # try authentication
        try:
            conn, authz, msg = authenticate(binddn, password)
        except ldap.INVALID_CREDENTIALS as e:
            # the error details are a dict in e.args[0]; indexing the
            # exception itself only works on Python 2
            try:
                msg = e.args[0]['desc']
            except (IndexError, KeyError, TypeError):
                msg = str(e)
            logging.debug('bind failed: %s', msg)
            self.write(parameters['username'], authc=constants.NSLCD_PAM_AUTH_ERR, msg=msg)
            return
        if authz != constants.NSLCD_PAM_SUCCESS:
            logging.warning('%s: %s: %s', binddn, parameters['username'], msg)
        else:
            logging.debug('bind successful')
        # FIXME: perform shadow attribute checks with check_shadow()
        self.write(parameters['username'], authz=authz, msg=msg)
214
215
class PAMAuthorisationRequest(PAMRequest):
    """Handle the PAM authorisation action using the pam_authz_search checks."""

    action = constants.NSLCD_ACTION_PAM_AUTHZ

    def read_parameters(self, fp):
        """Read the request fields from fp and return them as a dict."""
        return dict(username=fp.read_string(),
                    service=fp.read_string(),
                    ruser=fp.read_string(),
                    rhost=fp.read_string(),
                    tty=fp.read_string())
        # TODO: log call with parameters

    def write(self, authz=constants.NSLCD_PAM_SUCCESS, msg=''):
        """Write a single result with the authorisation outcome."""
        self.fp.write_int32(constants.NSLCD_RESULT_BEGIN)
        self.fp.write_int32(authz)
        self.fp.write_string(msg)
        self.fp.write_int32(constants.NSLCD_RESULT_END)

    def check_authz_search(self, parameters):
        """Run every configured pam_authz_search and require a match.

        Does nothing when no searches are configured. Raises StopIteration
        as soon as one of the searches returns no entries.
        """
        if not cfg.pam_authz_searches:
            return
        # escape all parameters
        variables = dict((k, escape_filter_chars(v)) for k, v in parameters.items())
        variables.update(
            hostname=escape_filter_chars(socket.gethostname()),
            fqdn=escape_filter_chars(socket.getfqdn()),
            dn=variables['userdn'],
            uid=variables['username'])
        # go over all authz searches
        for authz_search in cfg.pam_authz_searches:
            search_filter = authz_search.value(variables)
            logging.debug('trying pam_authz_search "%s"', search_filter)
            srch = search.LDAPSearch(self.conn, filter=search_filter, attributes=('dn', ))
            try:
                # the next() builtin works on both Python 2 and 3, the
                # .next() method only exists on Python 2 iterators
                dn, values = next(srch.items())
            except StopIteration:
                logging.error('pam_authz_search "%s" found no matches', search_filter)
                raise
            logging.debug('pam_authz_search found "%s"', dn)

    def handle_request(self, parameters):
        """Validate the user, run the authorisation searches and reply."""
        # fill in any missing userdn, etc.
        self.validate(parameters)
        # check authorisation search
        try:
            self.check_authz_search(parameters)
        except StopIteration:
            self.write(constants.NSLCD_PAM_PERM_DENIED,
                       'LDAP authorisation check failed')
            return
        # all tests passed, return OK response
        self.write()
268
269
class PAMPasswordModificationRequest(PAMRequest):
    """Handle the PAM password modification action."""

    action = constants.NSLCD_ACTION_PAM_PWMOD

    def read_parameters(self, fp):
        """Read the request fields from fp and return them as a dict."""
        return dict(username=fp.read_string(),
                    service=fp.read_string(),
                    ruser=fp.read_string(),
                    rhost=fp.read_string(),
                    tty=fp.read_string(),
                    asroot=fp.read_int32(),
                    oldpassword=fp.read_string(),
                    newpassword=fp.read_string())
        # TODO: log call with parameters

    def write(self, rc=constants.NSLCD_PAM_SUCCESS, msg=''):
        """Write a single result with the modification outcome."""
        self.fp.write_int32(constants.NSLCD_RESULT_BEGIN)
        self.fp.write_int32(rc)
        self.fp.write_string(msg)
        self.fp.write_int32(constants.NSLCD_RESULT_END)

    def handle_request(self, parameters):
        """Change the password of the user and write the result.

        The change is refused with cfg.pam_password_prohibit_message when
        that option is set. Raises ValueError when acting as rootpwmoddn
        without any usable password; errors from validate() propagate.
        """
        # fill in any missing userdn, etc.
        self.validate(parameters)
        # check if pam_password_prohibit_message is set
        if cfg.pam_password_prohibit_message:
            self.write(constants.NSLCD_PAM_PERM_DENIED,
                       cfg.pam_password_prohibit_message)
            return
        # check if the user passed the rootpwmoddn
        if parameters['asroot']:
            binddn = cfg.rootpwmoddn
            # check if rootpwmodpw should be used
            if not parameters['oldpassword'] and self.calleruid == 0 and cfg.rootpwmodpw:
                password = cfg.rootpwmodpw
            elif parameters['oldpassword']:
                password = parameters['oldpassword']
            else:
                raise ValueError('password missing')
        else:
            binddn = parameters['userdn']
            password = parameters['oldpassword']
        # TODO: check if shadow properties allow password change
        # perform password modification
        try:
            conn, authz, msg = authenticate(binddn, password)
            pwmod(conn, parameters['userdn'], parameters['oldpassword'], parameters['newpassword'])
            # try to update lastchange with normal or user connection
            update_lastchange((self.conn, conn), parameters['userdn'])
        except ldap.INVALID_CREDENTIALS as e:
            # the error details are a dict in e.args[0]; indexing the
            # exception itself only works on Python 2
            try:
                msg = e.args[0]['desc']
            except (IndexError, KeyError, TypeError):
                msg = str(e)
            logging.debug('pwmod failed: %s', msg)
            self.write(constants.NSLCD_PAM_PERM_DENIED, msg)
            return
        logging.debug('pwmod successful')
        self.write()
329
330
# number of characters in a generated session id
SESSION_ID_LENGTH = 25
# characters a session id is built from; every character is listed exactly
# once so that each one is equally likely to be picked
SESSION_ID_ALPHABET = (
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
    "abcdefghijklmnopqrstuvwxyz" +
    "0123456789"
)


def generate_session_id():
    """Return a random string of SESSION_ID_LENGTH alphabet characters."""
    return ''.join(
        random.choice(SESSION_ID_ALPHABET)
        for _ in range(SESSION_ID_LENGTH)
    )
344
345
class PAMSessionOpenRequest(PAMRequest):
    """Handle the PAM session open action by handing out a session id."""

    action = constants.NSLCD_ACTION_PAM_SESS_O

    def read_parameters(self, fp):
        """Read the string fields of the request from fp, in this order."""
        names = ('username', 'service', 'ruser', 'rhost', 'tty')
        return dict((name, fp.read_string()) for name in names)
        # TODO: log call with parameters

    def write(self, sessionid):
        """Write a single result that holds the session id."""
        out = self.fp
        out.write_int32(constants.NSLCD_RESULT_BEGIN)
        out.write_string(sessionid)
        out.write_int32(constants.NSLCD_RESULT_END)

    def handle_request(self, parameters):
        """Reply with a freshly generated session id."""
        self.write(generate_session_id())
367
368
class PAMSessionCloseRequest(PAMRequest):
    """Handle the PAM session close action; the reply carries no data."""

    action = constants.NSLCD_ACTION_PAM_SESS_C

    def read_parameters(self, fp):
        """Read the string fields of the request from fp, in this order."""
        names = ('username', 'service', 'ruser', 'rhost', 'tty', 'session_id')
        return dict((name, fp.read_string()) for name in names)
        # TODO: log call with parameters

    def write(self):
        """Write an empty result: the begin marker followed by the end marker."""
        out = self.fp
        out.write_int32(constants.NSLCD_RESULT_BEGIN)
        out.write_int32(constants.NSLCD_RESULT_END)

    def handle_request(self, parameters):
        """Acknowledge the request without using any of its parameters."""
        self.write()