flaschengeist/flaschengeist/modules/auth_ldap/__init__.py

97 lines
4.0 KiB
Python

import ssl
from ldap3.utils.hashed import hashed
from ldap3 import SUBTREE, MODIFY_REPLACE, HASHED_SALTED_SHA512
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError
from flask import current_app as app
from flask_ldapconn import LDAPConn
from werkzeug.exceptions import BadRequest
from flaschengeist.modules import AuthPlugin
from flaschengeist.system.models.user import User
class AuthLDAP(AuthPlugin):
def __init__(self, config):
super().__init__()
defaults = {
'PORT': '389',
'USE_SSL': 'False'
}
for name in defaults:
if name not in config:
config[name] = defaults[name]
app.config.update(
LDAP_SERVER=config['URL'],
LDAP_PORT=config.getint('PORT'),
LDAP_BINDDN=config['BINDDN'],
LDAP_USE_TLS=False,
LDAP_USE_SSL=config.getboolean('USE_SSL'),
LDAP_TLS_VERSION=ssl.PROTOCOL_TLSv1_2,
LDAP_REQUIRE_CERT=ssl.CERT_NONE,
FORCE_ATTRIBUTE_VALUE_AS_LIST=True
)
if 'SECRET' in config:
app.config['LDAP_SECRET'] = config['SECRET'],
self.ldap = LDAPConn(app)
self.dn = config['BASEDN']
def login(self, user, password):
if not user:
return False
return self.ldap.authenticate(user.uid, password, 'uid', self.dn)
def update_user(self, user):
self.ldap.connection.search('ou=user,{}'.format(self.dn), '(uid={})'.format(user.uid), SUBTREE,
attributes=['uid', 'givenName', 'sn', 'mail'])
r = self.ldap.connection.response[0]['attributes']
if r['uid'][0] == user.uid:
user.set_attribute('DN', self.ldap.connection.response[0]['dn'])
user.firstname = r['givenName'][0]
user.lastname = r['sn'][0]
if r['mail']:
user.mail = r['mail'][0]
if 'displayName' in r:
user.display_name = r['displayName'][0]
for group in self._get_groups(user.uid):
user.add_role(group)
def _get_groups(self, uid):
groups = []
self.ldap.connection.search('ou=user,{}'.format(self.dn), '(uid={})'.format(uid), SUBTREE,
attributes=['gidNumber'])
main_group_number = self.ldap.connection.response[0]['attributes']['gidNumber']
if main_group_number:
if type(main_group_number) is list:
main_group_number = main_group_number[0]
self.ldap.connection.search('ou=group,{}'.format(self.dn), '(gidNumber={})'.format(main_group_number),
attributes=['cn'])
groups.append(self.ldap.connection.response[0]['attributes']['cn'][0])
self.ldap.connection.search('ou=group,{}'.format(self.dn), '(memberUID={})'.format(uid), SUBTREE,
attributes=['cn'])
groups_data = self.ldap.connection.response
for data in groups_data:
groups.append(data['attributes']['cn'][0])
return groups
def modify_user(self, user: User, password, new_password=None):
try:
dn = user.attributes['DN'].value
ldap_conn = self.ldap.connect(dn, password)
modifier = {}
for name, ldap_name in [("firstname", "givenName"),
("lastname", "sn"),
("mail", "mail"),
("display_name", "displayName")]:
if getattr(user, name):
modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])]
if new_password:
salted_password = hashed(HASHED_SALTED_SHA512, new_password)
modifier['userPassword'] = [(MODIFY_REPLACE, [salted_password])]
ldap_conn.modify(dn, modifier)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest