"""LDAP Authentication Provider Plugin""" import io import os import ssl from typing import Optional from flask_ldapconn import LDAPConn from flask import current_app as app from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE from werkzeug.exceptions import BadRequest, InternalServerError, NotFound from flaschengeist import logger from flaschengeist.plugins import AuthPlugin, after_role_updated from flaschengeist.models.user import User, Role, _Avatar import flaschengeist.controller.userController as userController class AuthLDAP(AuthPlugin): def __init__(self, config): super().__init__() app.config.update( LDAP_SERVER=config.get("host", "localhost"), LDAP_PORT=config.get("port", 389), LDAP_BINDDN=config.get("bind_dn", None), LDAP_SECRET=config.get("secret", None), LDAP_USE_SSL=config.get("use_ssl", False), # That's not TLS, its dirty StartTLS on unencrypted LDAP LDAP_USE_TLS=False, LDAP_TLS_VERSION=ssl.PROTOCOL_TLS, FORCE_ATTRIBUTE_VALUE_AS_LIST=True, ) logger.warning(app.config.get("LDAP_USE_SSL")) if "ca_cert" in config: app.config["LDAP_CA_CERTS_FILE"] = config["ca_cert"] else: # Default is CERT_REQUIRED app.config["LDAP_REQUIRE_CERT"] = ssl.CERT_OPTIONAL self.ldap = LDAPConn(app) self.base_dn = config["base_dn"] self.search_dn = config.get("search_dn", "ou=people,{base_dn}").format(base_dn=self.base_dn) self.group_dn = config.get("group_dn", "ou=group,{base_dn}").format(base_dn=self.base_dn) self.password_hash = config.get("password_hash", "SSHA").upper() self.object_classes = config.get("object_classes", ["inetOrgPerson"]) self.user_attributes: dict = config.get("user_attributes", {}) # TODO: might not be set if modify is called self.root_dn = config.get("root_dn", None) self.root_secret = config.get("root_secret", None) @after_role_updated def _role_updated(role, new_name): self.__modify_role(role, new_name) def login(self, user, password): if not user: return False return self.ldap.authenticate(user.userid, password, "uid", self.base_dn) def find_user(self, userid, mail=None): attr = self.__find(userid, mail) if attr is not None: user = User(userid=attr["uid"][0]) self.__update(user, attr) return user def update_user(self, user): attr = self.__find(user.userid) self.__update(user, attr) def create_user(self, user, password): if self.root_dn is None: logger.error("root_dn missing in ldap config!") raise InternalServerError try: ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) attributes = self.user_attributes.copy() if "uidNumber" in attributes: self.ldap.connection.search( self.search_dn, "(uidNumber=*)", SUBTREE, attributes=["uidNumber"], ) resp = sorted( self.ldap.response(), key=lambda i: i["attributes"]["uidNumber"], reverse=True, ) attributes = resp[0]["attributes"]["uidNumber"] + 1 if resp else attributes["uidNumber"] dn = self.dn_template.format( firstname=user.firstname, lastname=user.lastname, userid=user.userid, mail=user.mail, display_name=user.display_name, base_dn=self.base_dn, ) attributes.update({ "sn": user.lastname, "givenName": user.firstname, "uid": user.userid, "userPassword": self.__hash(password), }) ldap_conn.add(dn, self.object_classes, attributes) self._set_roles(user) except (LDAPPasswordIsMandatoryError, LDAPBindError): raise BadRequest def modify_user(self, user: User, password=None, new_password=None): try: dn = user.get_attribute("DN") if password: ldap_conn = self.ldap.connect(dn, password) else: if self.root_dn is None: logger.error("root_dn missing in ldap config!") raise InternalServerError ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) modifier = {} for name, ldap_name in [ ("firstname", "givenName"), ("lastname", "sn"), ("mail", "mail"), ("display_name", "displayName"), ]: if hasattr(user, name): modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])] if new_password: modifier["userPassword"] = [(MODIFY_REPLACE, [self.__hash(new_password)])] ldap_conn.modify(dn, modifier) self._set_roles(user) except (LDAPPasswordIsMandatoryError, LDAPBindError): raise BadRequest def get_avatar(self, user): self.ldap.connection.search( self.search_dn, "(uid={})".format(user.userid), SUBTREE, attributes=["jpegPhoto"], ) r = self.ldap.connection.response[0]["attributes"] if "jpegPhoto" in r and len(r["jpegPhoto"]) > 0: avatar = _Avatar() avatar.mimetype = "image/jpeg" avatar.binary.extend(r["jpegPhoto"][0]) return avatar else: raise NotFound def set_avatar(self, user, avatar: _Avatar): if self.root_dn is None: logger.error("root_dn missing in ldap config!") raise InternalServerError if avatar.mimetype != "image/jpeg": # Try converting using Pillow (if installed) try: from PIL import Image image = Image.open(io.BytesIO(avatar.binary)) image_bytes = io.BytesIO() image.save(image_bytes, format="JPEG") avatar.binary = image_bytes.getvalue() avatar.mimetype = "image/jpeg" except ImportError: logger.debug("Pillow not installed for image conversion") raise BadRequest("Unsupported image format") except IOError: logger.debug(f"Could not convert avatar from '{avatar.mimetype}' to JPEG") raise BadRequest("Unsupported image format") dn = user.get_attribute("DN") ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [avatar.binary])]}) def __find(self, userid, mail=None): """Find attributes of an user by uid or mail in LDAP""" con = self.ldap.connection if not con: con = self.ldap.connect(self.root_dn, self.root_secret) con.search( self.search_dn, f"(| (uid={userid})(mail={mail}))" if mail else f"(uid={userid})", SUBTREE, attributes=["uid", "givenName", "sn", "mail"], ) return con.response[0]["attributes"] if len(con.response) > 0 else None def __update(self, user, attr): """Update an User object with LDAP attributes""" if attr["uid"][0] == user.userid: user.set_attribute("DN", self.ldap.connection.response[0]["dn"]) user.firstname = attr["givenName"][0] user.lastname = attr["sn"][0] if attr["mail"]: user.mail = attr["mail"][0] if "displayName" in attr: user.display_name = attr["displayName"][0] userController.set_roles(user, self._get_groups(user.userid), create=True) def __modify_role( self, role: Role, new_name: Optional[str], ): if self.root_dn is None: logger.error("root_dn missing in ldap config!") raise InternalServerError try: ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) ldap_conn.search(self.group_dn, f"(cn={role.name})", SUBTREE, attributes=["cn"]) if len(ldap_conn.response) > 0: dn = ldap_conn.response[0]["dn"] if new_name: ldap_conn.modify_dn(dn, f"cn={new_name}") else: ldap_conn.delete(dn) except LDAPPasswordIsMandatoryError: raise BadRequest except LDAPBindError: logger.debug(f"Could not bind to LDAP server", exc_info=True) raise InternalServerError def __hash(self, password): if self.password_hash == "ARGON2": from argon2 import PasswordHasher return f"{{ARGON2}}{PasswordHasher().hash(password)}" else: from hashlib import pbkdf2_hmac, sha1 import base64 salt = os.urandom(16) if self.password_hash == "PBKDF2": rounds = 200000 password_hash = base64.b64encode(pbkdf2_hmac("sha512", password.encode("utf-8"), salt, rounds)).decode() return f"{{PBKDF2-SHA512}}{rounds}${base64.b64encode(salt).decode()}${password_hash}" else: return f"{{SSHA}}{base64.b64encode(sha1(password + salt) + salt)}" def _get_groups(self, uid): groups = [] self.ldap.connection.search( self.group_dn, "(memberUID={})".format(uid), SUBTREE, attributes=["cn"], ) groups_data = self.ldap.connection.response for data in groups_data: groups.append(data["attributes"]["cn"][0]) return groups def _get_all_roles(self): self.ldap.connection.search( self.group_dn, "(cn=*)", SUBTREE, attributes=["cn", "gidNumber", "memberUid"], ) return self.ldap.response() def _set_roles(self, user: User): try: ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) ldap_roles = self._get_all_roles() gid_numbers = sorted(ldap_roles, key=lambda i: i["attributes"]["gidNumber"], reverse=True) gid_number = gid_numbers[0]["attributes"]["gidNumber"] + 1 for user_role in user.roles: if user_role not in [role["attributes"]["cn"][0] for role in ldap_roles]: ldap_conn.add( f"cn={user_role},{self.group_dn}", ["posixGroup"], attributes={"gidNumber": gid_number}, ) ldap_roles = self._get_all_roles() for ldap_role in ldap_roles: if ldap_role["attributes"]["cn"][0] in user.roles: modify = {"memberUid": [(MODIFY_ADD, [user.userid])]} else: modify = {"memberUid": [(MODIFY_DELETE, [user.userid])]} ldap_conn.modify(ldap_role["dn"], modify) except (LDAPPasswordIsMandatoryError, LDAPBindError): raise BadRequest