"""LDAP Authentication Provider Plugin""" import os import ssl from PIL import Image from io import BytesIO from flask_ldapconn import LDAPConn from flask import current_app as app from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError from werkzeug.exceptions import BadRequest, InternalServerError, NotFound from werkzeug.datastructures import FileStorage from flaschengeist import logger from flaschengeist.controller import userController from flaschengeist.models.user import User, Role, _Avatar from flaschengeist.plugins import AuthPlugin, before_role_updated class AuthLDAP(AuthPlugin): id = "auth_ldap" def __init__(self, config): super().__init__() app.config.update( LDAP_SERVER=config.get("host", "localhost"), LDAP_PORT=config.get("port", 389), LDAP_BINDDN=config.get("bind_dn", None), LDAP_SECRET=config.get("secret", None), LDAP_USE_SSL=config.get("use_ssl", False), # That's not TLS, its dirty StartTLS on unencrypted LDAP LDAP_USE_TLS=False, LDAP_TLS_VERSION=ssl.PROTOCOL_TLS, FORCE_ATTRIBUTE_VALUE_AS_LIST=True, ) if "ca_cert" in config: app.config["LDAP_CA_CERTS_FILE"] = config["ca_cert"] else: # Default is CERT_REQUIRED app.config["LDAP_REQUIRE_CERT"] = ssl.CERT_OPTIONAL self.ldap = LDAPConn(app) self.base_dn = config["base_dn"] self.search_dn = config.get("search_dn", "ou=people,{base_dn}").format(base_dn=self.base_dn) self.group_dn = config.get("group_dn", "ou=group,{base_dn}").format(base_dn=self.base_dn) self.password_hash = config.get("password_hash", "SSHA").upper() self.object_classes = config.get("object_classes", ["inetOrgPerson"]) self.user_attributes: dict = config.get("user_attributes", {}) self.dn_template = config.get("dn_template") # TODO: might not be set if modify is called self.root_dn = config.get("root_dn", None) self.root_secret = config.get("root_secret", None) @before_role_updated def _role_updated(role, new_name): logger.debug(f"LDAP: before_role_updated called with ({role}, {new_name})") self.__modify_role(role, new_name) def login(self, user, password): if not user: return False return self.ldap.authenticate(user.userid, password, "uid", self.base_dn) def find_user(self, userid, mail=None): attr = self.__find(userid, mail) if attr is not None: user = User(userid=attr["uid"][0]) self.__update(user, attr) return user def update_user(self, user): attr = self.__find(user.userid) self.__update(user, attr) def create_user(self, user, password): if self.root_dn is None: logger.error("root_dn missing in ldap config!") raise InternalServerError try: ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) attributes = self.user_attributes.copy() if "uidNumber" in attributes: self.ldap.connection.search( self.search_dn, "(uidNumber=*)", SUBTREE, attributes=["uidNumber"], ) resp = sorted( self.ldap.response(), key=lambda i: i["attributes"]["uidNumber"], reverse=True, ) attributes["uidNumber"] = resp[0]["attributes"]["uidNumber"] + 1 if resp else attributes["uidNumber"] dn = self.dn_template.format( user=user, base_dn=self.base_dn, ) if "default_gid" in attributes: default_gid = attributes.pop("default_gid") attributes["gidNumber"] = default_gid if "homeDirectory" in attributes: attributes["homeDirectory"] = attributes.get("homeDirectory").format( firstname=user.firstname, lastname=user.lastname, userid=user.userid, mail=user.mail, display_name=user.display_name, ) attributes.update( { "sn": user.lastname, "givenName": user.firstname, "uid": user.userid, "userPassword": self.__hash(password), "mail": user.mail, } ) if user.display_name: attributes.update({"displayName": user.display_name}) ldap_conn.add(dn, self.object_classes, attributes) self._set_roles(user) self.update_user(user) except (LDAPPasswordIsMandatoryError, LDAPBindError): raise BadRequest def modify_user(self, user: User, password=None, new_password=None): try: dn = user.get_attribute("DN") if password: ldap_conn = self.ldap.connect(dn, password) else: if self.root_dn is None: logger.error("root_dn missing in ldap config!") raise InternalServerError ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) modifier = {} for name, ldap_name in [ ("firstname", "givenName"), ("lastname", "sn"), ("mail", "mail"), ("display_name", "displayName"), ]: if hasattr(user, name): modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])] if new_password: modifier["userPassword"] = [(MODIFY_REPLACE, [self.__hash(new_password)])] ldap_conn.modify(dn, modifier) self._set_roles(user) except (LDAPPasswordIsMandatoryError, LDAPBindError): raise BadRequest def get_avatar(self, user): self.ldap.connection.search( self.search_dn, "(uid={})".format(user.userid), SUBTREE, attributes=["jpegPhoto"], ) r = self.ldap.connection.response[0]["attributes"] if "jpegPhoto" in r and len(r["jpegPhoto"]) > 0: avatar = _Avatar() avatar.mimetype = "image/jpeg" avatar.binary = bytearray(r["jpegPhoto"][0]) return avatar else: raise NotFound def set_avatar(self, user: User, file: FileStorage): if self.root_dn is None: logger.error("root_dn missing in ldap config!") raise InternalServerError image_bytes = BytesIO() try: # Make sure converted to RGB, e.g. png support RGBA but jpeg does not image = Image.open(file).convert("RGB") image.save(image_bytes, format="JPEG") except IOError: logger.debug(f"Could not convert avatar from '{file.mimetype}' to JPEG") raise BadRequest("Unsupported image format") dn = user.get_attribute("DN") ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [image_bytes.getvalue()])]}) def delete_avatar(self, user): if self.root_dn is None: logger.error("root_dn missing in ldap config!") dn = user.get_attribute("DN") ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [])]}) def __find(self, userid, mail=None): """Find attributes of an user by uid or mail in LDAP""" con = self.ldap.connection if not con: con = self.ldap.connect(self.root_dn, self.root_secret) con.search( self.search_dn, f"(| (uid={userid})(mail={mail}))" if mail else f"(uid={userid})", SUBTREE, attributes=["uid", "givenName", "sn", "mail"], ) return con.response[0]["attributes"] if len(con.response) > 0 else None def __update(self, user, attr): """Update an User object with LDAP attributes""" if attr["uid"][0] == user.userid: user.set_attribute("DN", self.ldap.connection.response[0]["dn"]) user.firstname = attr["givenName"][0] user.lastname = attr["sn"][0] if attr["mail"]: user.mail = attr["mail"][0] if "displayName" in attr: user.display_name = attr["displayName"][0] userController.set_roles(user, self._get_groups(user.userid), create=True) def __modify_role( self, role: Role, new_name, ): if self.root_dn is None: logger.error("root_dn missing in ldap config!") raise InternalServerError try: ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) ldap_conn.search(self.group_dn, f"(cn={role.name})", SUBTREE, attributes=["cn"]) if len(ldap_conn.response) > 0: dn = ldap_conn.response[0]["dn"] if new_name: ldap_conn.modify_dn(dn, f"cn={new_name}") else: ldap_conn.delete(dn) except LDAPPasswordIsMandatoryError: raise BadRequest except LDAPBindError: logger.debug(f"Could not bind to LDAP server", exc_info=True) raise InternalServerError def __hash(self, password): if self.password_hash == "ARGON2": from argon2 import PasswordHasher return f"{{ARGON2}}{PasswordHasher().hash(password)}" else: from hashlib import pbkdf2_hmac, sha1 import base64 salt = os.urandom(16) if self.password_hash == "PBKDF2": rounds = 200000 password_hash = base64.b64encode(pbkdf2_hmac("sha512", password.encode("utf-8"), salt, rounds)).decode() return f"{{PBKDF2-SHA512}}{rounds}${base64.b64encode(salt).decode()}${password_hash}" else: return f"{{SSHA}}{base64.b64encode(sha1(password.encode() + salt).digest() + salt).decode()}" def _get_groups(self, uid): groups = [] self.ldap.connection.search( self.group_dn, "(memberUID={})".format(uid), SUBTREE, attributes=["cn"], ) groups_data = self.ldap.connection.response for data in groups_data: groups.append(data["attributes"]["cn"][0]) return groups def _get_all_roles(self): self.ldap.connection.search( self.group_dn, "(cn=*)", SUBTREE, attributes=["cn", "gidNumber", "memberUid"], ) return self.ldap.response() def _set_roles(self, user: User): try: ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) ldap_roles = self._get_all_roles() gid_numbers = sorted(ldap_roles, key=lambda i: i["attributes"]["gidNumber"], reverse=True) gid_number = gid_numbers[0]["attributes"]["gidNumber"] + 1 for user_role in user.roles: if user_role not in [role["attributes"]["cn"][0] for role in ldap_roles]: ldap_conn.add( f"cn={user_role},{self.group_dn}", ["posixGroup"], attributes={"gidNumber": gid_number}, ) ldap_roles = self._get_all_roles() for ldap_role in ldap_roles: if ldap_role["attributes"]["cn"][0] in user.roles: modify = {"memberUid": [(MODIFY_ADD, [user.userid])]} else: modify = {"memberUid": [(MODIFY_DELETE, [user.userid])]} ldap_conn.modify(ldap_role["dn"], modify) except (LDAPPasswordIsMandatoryError, LDAPBindError): raise BadRequest