import ssl from ldap3.utils.hashed import hashed from ldap3 import SUBTREE, MODIFY_REPLACE, HASHED_SALTED_SHA512, HASHED_SALTED_MD5 from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError from flask import current_app as app from flask_ldapconn import LDAPConn from werkzeug.exceptions import BadRequest from flaschengeist.modules import AuthPlugin from flaschengeist.system.models.user import User import flaschengeist.system.controller.userController as userController class AuthLDAP(AuthPlugin): def __init__(self, cfg): super().__init__() config = {"PORT": 389, "USE_SSL": False} config.update(cfg) app.config.update( LDAP_SERVER=config["URL"], LDAP_PORT=config["PORT"], LDAP_BINDDN=config["BINDDN"], LDAP_USE_TLS=False, LDAP_USE_SSL=config["USE_SSL"], LDAP_TLS_VERSION=ssl.PROTOCOL_TLSv1_2, LDAP_REQUIRE_CERT=ssl.CERT_NONE, FORCE_ATTRIBUTE_VALUE_AS_LIST=True, ) if "SECRET" in config: app.config["LDAP_SECRET"] = (config["SECRET"],) self.ldap = LDAPConn(app) self.dn = config["BASEDN"] if "ADMIN_DN" in config: self.admin_dn = config["ADMIN_DN"] self.admin_secret = config["ADMIN_SECRET"] def login(self, user, password): if not user: return False return self.ldap.authenticate(user.userid, password, "uid", self.dn) def update_user(self, user): self.ldap.connection.search( "ou=user,{}".format(self.dn), "(uid={})".format(user.userid), SUBTREE, attributes=["uid", "givenName", "sn", "mail"], ) r = self.ldap.connection.response[0]["attributes"] if r["uid"][0] == user.userid: user.set_attribute("DN", self.ldap.connection.response[0]["dn"]) user.firstname = r["givenName"][0] user.lastname = r["sn"][0] if r["mail"]: user.mail = r["mail"][0] if "displayName" in r: user.display_name = r["displayName"][0] userController.set_roles(user, self._get_groups(user.userid)) def _get_groups(self, uid): groups = [] self.ldap.connection.search( "ou=user,{}".format(self.dn), "(uid={})".format(uid), SUBTREE, attributes=["gidNumber"] ) main_group_number = self.ldap.connection.response[0]["attributes"]["gidNumber"] if main_group_number: if type(main_group_number) is list: main_group_number = main_group_number[0] self.ldap.connection.search( "ou=group,{}".format(self.dn), "(gidNumber={})".format(main_group_number), attributes=["cn"] ) groups.append(self.ldap.connection.response[0]["attributes"]["cn"][0]) self.ldap.connection.search( "ou=group,{}".format(self.dn), "(memberUID={})".format(uid), SUBTREE, attributes=["cn"] ) groups_data = self.ldap.connection.response for data in groups_data: groups.append(data["attributes"]["cn"][0]) return groups def modify_user(self, user: User, password, new_password=None): try: dn = user.get_attribute("DN") if password: ldap_conn = self.ldap.connect(dn, password) else: ldap_conn = self.ldap.connect(self.admin_dn, self.admin_secret) modifier = {} for name, ldap_name in [ ("firstname", "givenName"), ("lastname", "sn"), ("mail", "mail"), ("display_name", "displayName"), ]: if hasattr(user, name): modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])] if new_password: # TODO: Use secure hash! salted_password = hashed(HASHED_SALTED_MD5, new_password) modifier["userPassword"] = [(MODIFY_REPLACE, [salted_password])] ldap_conn.modify(dn, modifier) except (LDAPPasswordIsMandatoryError, LDAPBindError): raise BadRequest