From e4c552af019cbaccb258cc915625e40c1764e340 Mon Sep 17 00:00:00 2001 From: Ferdinand Thiessen Date: Sun, 26 Dec 2021 15:26:50 +0100 Subject: [PATCH] Proposal: Add plugin metadata class to seperate implementation --- flaschengeist/plugins/__init__.py | 64 +++- flaschengeist/plugins/auth_ldap/__init__.py | 307 +------------------- flaschengeist/plugins/auth_ldap/plugin.py | 307 ++++++++++++++++++++ 3 files changed, 364 insertions(+), 314 deletions(-) create mode 100644 flaschengeist/plugins/auth_ldap/plugin.py diff --git a/flaschengeist/plugins/__init__.py b/flaschengeist/plugins/__init__.py index 2db90c5..fefd82d 100644 --- a/flaschengeist/plugins/__init__.py +++ b/flaschengeist/plugins/__init__.py @@ -1,6 +1,8 @@ -import pkg_resources +from flask import Blueprint +from dataclasses import asdict, dataclass, field from werkzeug.datastructures import FileStorage from werkzeug.exceptions import MethodNotAllowed, NotFound + from flaschengeist.models.user import _Avatar, User from flaschengeist.utils.hook import HookBefore, HookAfter @@ -43,33 +45,71 @@ Args: """ -class Plugin: - """Base class for all Plugins - If your class uses custom models add a static property called ``models``""" +@dataclass +class PluginMetadata: + """Class providing metadata of a plugin""" - blueprint = None # You have to override + id: str + """Unique ID of the plugin (Hint: FQN)""" + + name: str + """Human readable name of the plugin""" + + module: str + """Module and class of the plugin + + Example: + my_pkg.my_module.plugin_class + """ + + version: str = None + """Version of the plugin + + If not provided the version will be set to the + distribution version of the package providing the module""" + + blueprint: Blueprint = None """Override with a `flask.blueprint` if the plugin uses custom routes""" - permissions = [] # You have to override + + permissions: list[str] = field(default_factory=list) """Override to add custom permissions used by the plugin A good style is to name the permissions with a prefix related to the plugin name, to prevent clashes with other plugins. E. g. instead of *delete* use *plugin_delete*. """ - id = "dev.flaschengeist.plugin" # You have to override - """Override with the unique ID of the plugin (Hint: FQN)""" - name = "plugin" # You have to override - """Override with human readable name of the plugin""" + models = None # You have to override """Override with models module""" + migrations_path = None # Override this with the location of your db migrations directory """Override with path to migration files, if custome db tables are used""" - def __init__(self, config=None): + def load(self, config=None) -> "Plugin": + """Load the plugin""" + import importlib + + mod, cls = self.module.rsplit(".", 1) + module = importlib.import_module(mod) + return getattr(module, cls)(self, config) + + def __post_init__(self): + """Post init to set version if not set automatically""" + if not self.version: + import pkg_resources + + self.version = pkg_resources.get_distribution(self.module.split(".")[0]).version + + +class Plugin(PluginMetadata): + """Base class for all Plugins + If your class uses custom models add a static property called ``models``""" + + def __init__(self, metadata: PluginMetadata, config=None): """Constructor called by create_app Args: config: Dict configuration containing the plugin section """ - self.version = pkg_resources.get_distribution(self.__module__.split(".")[0]).version + super().__init__(**asdict(metadata)) def install(self): """Installation routine diff --git a/flaschengeist/plugins/auth_ldap/__init__.py b/flaschengeist/plugins/auth_ldap/__init__.py index 697e972..1bcdb0d 100644 --- a/flaschengeist/plugins/auth_ldap/__init__.py +++ b/flaschengeist/plugins/auth_ldap/__init__.py @@ -1,307 +1,10 @@ -"""LDAP Authentication Provider Plugin""" -import os -import ssl -from PIL import Image -from io import BytesIO -from flask_ldapconn import LDAPConn -from flask import current_app as app -from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE -from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError -from werkzeug.exceptions import BadRequest, InternalServerError, NotFound -from werkzeug.datastructures import FileStorage - -from flaschengeist import logger -from flaschengeist.controller import userController -from flaschengeist.models.user import User, Role, _Avatar -from flaschengeist.plugins import AuthPlugin, before_role_updated +from flaschengeist.plugins import PluginMetadata -class AuthLDAP(AuthPlugin): - def __init__(self, config): - super().__init__() - app.config.update( - LDAP_SERVER=config.get("host", "localhost"), - LDAP_PORT=config.get("port", 389), - LDAP_BINDDN=config.get("bind_dn", None), - LDAP_SECRET=config.get("secret", None), - LDAP_USE_SSL=config.get("use_ssl", False), - # That's not TLS, its dirty StartTLS on unencrypted LDAP - LDAP_USE_TLS=False, - LDAP_TLS_VERSION=ssl.PROTOCOL_TLS, - FORCE_ATTRIBUTE_VALUE_AS_LIST=True, - ) - if "ca_cert" in config: - app.config["LDAP_CA_CERTS_FILE"] = config["ca_cert"] - else: - # Default is CERT_REQUIRED - app.config["LDAP_REQUIRE_CERT"] = ssl.CERT_OPTIONAL - self.ldap = LDAPConn(app) - self.base_dn = config["base_dn"] - self.search_dn = config.get("search_dn", "ou=people,{base_dn}").format(base_dn=self.base_dn) - self.group_dn = config.get("group_dn", "ou=group,{base_dn}").format(base_dn=self.base_dn) - self.password_hash = config.get("password_hash", "SSHA").upper() - self.object_classes = config.get("object_classes", ["inetOrgPerson"]) - self.user_attributes: dict = config.get("user_attributes", {}) - self.dn_template = config.get("dn_template") +def loader(): + from .plugin import AuthLDAP - # TODO: might not be set if modify is called - self.root_dn = config.get("root_dn", None) - self.root_secret = config.get("root_secret", None) + return AuthLDAP - @before_role_updated - def _role_updated(role, new_name): - logger.debug(f"LDAP: before_role_updated called with ({role}, {new_name})") - self.__modify_role(role, new_name) - def login(self, user, password): - if not user: - return False - return self.ldap.authenticate(user.userid, password, "uid", self.base_dn) - - def find_user(self, userid, mail=None): - attr = self.__find(userid, mail) - if attr is not None: - user = User(userid=attr["uid"][0]) - self.__update(user, attr) - return user - - def update_user(self, user): - attr = self.__find(user.userid) - self.__update(user, attr) - - def create_user(self, user, password): - if self.root_dn is None: - logger.error("root_dn missing in ldap config!") - raise InternalServerError - - try: - ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) - attributes = self.user_attributes.copy() - if "uidNumber" in attributes: - self.ldap.connection.search( - self.search_dn, - "(uidNumber=*)", - SUBTREE, - attributes=["uidNumber"], - ) - resp = sorted( - self.ldap.response(), - key=lambda i: i["attributes"]["uidNumber"], - reverse=True, - ) - attributes["uidNumber"] = resp[0]["attributes"]["uidNumber"] + 1 if resp else attributes["uidNumber"] - dn = self.dn_template.format( - user=user, - base_dn=self.base_dn, - ) - if "default_gid" in attributes: - default_gid = attributes.pop("default_gid") - attributes["gidNumber"] = default_gid - if "homeDirectory" in attributes: - attributes["homeDirectory"] = attributes.get("homeDirectory").format( - firstname=user.firstname, - lastname=user.lastname, - userid=user.userid, - mail=user.mail, - display_name=user.display_name, - ) - attributes.update( - { - "sn": user.lastname, - "givenName": user.firstname, - "uid": user.userid, - "userPassword": self.__hash(password), - "mail": user.mail, - } - ) - if user.display_name: - attributes.update({"displayName": user.display_name}) - ldap_conn.add(dn, self.object_classes, attributes) - self._set_roles(user) - self.update_user(user) - except (LDAPPasswordIsMandatoryError, LDAPBindError): - raise BadRequest - - def modify_user(self, user: User, password=None, new_password=None): - try: - dn = user.get_attribute("DN") - if password: - ldap_conn = self.ldap.connect(dn, password) - else: - if self.root_dn is None: - logger.error("root_dn missing in ldap config!") - raise InternalServerError - ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) - modifier = {} - for name, ldap_name in [ - ("firstname", "givenName"), - ("lastname", "sn"), - ("mail", "mail"), - ("display_name", "displayName"), - ]: - if hasattr(user, name): - modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])] - if new_password: - modifier["userPassword"] = [(MODIFY_REPLACE, [self.__hash(new_password)])] - ldap_conn.modify(dn, modifier) - self._set_roles(user) - except (LDAPPasswordIsMandatoryError, LDAPBindError): - raise BadRequest - - def get_avatar(self, user): - self.ldap.connection.search( - self.search_dn, - "(uid={})".format(user.userid), - SUBTREE, - attributes=["jpegPhoto"], - ) - r = self.ldap.connection.response[0]["attributes"] - - if "jpegPhoto" in r and len(r["jpegPhoto"]) > 0: - avatar = _Avatar() - avatar.mimetype = "image/jpeg" - avatar.binary = bytearray(r["jpegPhoto"][0]) - return avatar - else: - raise NotFound - - def set_avatar(self, user: User, file: FileStorage): - if self.root_dn is None: - logger.error("root_dn missing in ldap config!") - raise InternalServerError - - image_bytes = BytesIO() - try: - # Make sure converted to RGB, e.g. png support RGBA but jpeg does not - image = Image.open(file).convert("RGB") - image.save(image_bytes, format="JPEG") - except IOError: - logger.debug(f"Could not convert avatar from '{file.mimetype}' to JPEG") - raise BadRequest("Unsupported image format") - - dn = user.get_attribute("DN") - ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) - ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [image_bytes.getvalue()])]}) - - def delete_avatar(self, user): - if self.root_dn is None: - logger.error("root_dn missing in ldap config!") - dn = user.get_attribute("DN") - ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) - ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [])]}) - - def __find(self, userid, mail=None): - """Find attributes of an user by uid or mail in LDAP""" - con = self.ldap.connection - if not con: - con = self.ldap.connect(self.root_dn, self.root_secret) - con.search( - self.search_dn, - f"(| (uid={userid})(mail={mail}))" if mail else f"(uid={userid})", - SUBTREE, - attributes=["uid", "givenName", "sn", "mail"], - ) - return con.response[0]["attributes"] if len(con.response) > 0 else None - - def __update(self, user, attr): - """Update an User object with LDAP attributes""" - if attr["uid"][0] == user.userid: - user.set_attribute("DN", self.ldap.connection.response[0]["dn"]) - user.firstname = attr["givenName"][0] - user.lastname = attr["sn"][0] - if attr["mail"]: - user.mail = attr["mail"][0] - if "displayName" in attr: - user.display_name = attr["displayName"][0] - userController.set_roles(user, self._get_groups(user.userid), create=True) - - def __modify_role( - self, - role: Role, - new_name, - ): - if self.root_dn is None: - logger.error("root_dn missing in ldap config!") - raise InternalServerError - try: - ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) - ldap_conn.search(self.group_dn, f"(cn={role.name})", SUBTREE, attributes=["cn"]) - if len(ldap_conn.response) > 0: - dn = ldap_conn.response[0]["dn"] - if new_name: - ldap_conn.modify_dn(dn, f"cn={new_name}") - else: - ldap_conn.delete(dn) - - except LDAPPasswordIsMandatoryError: - raise BadRequest - except LDAPBindError: - logger.debug(f"Could not bind to LDAP server", exc_info=True) - raise InternalServerError - - def __hash(self, password): - if self.password_hash == "ARGON2": - from argon2 import PasswordHasher - - return f"{{ARGON2}}{PasswordHasher().hash(password)}" - else: - from hashlib import pbkdf2_hmac, sha1 - import base64 - - salt = os.urandom(16) - if self.password_hash == "PBKDF2": - rounds = 200000 - password_hash = base64.b64encode(pbkdf2_hmac("sha512", password.encode("utf-8"), salt, rounds)).decode() - return f"{{PBKDF2-SHA512}}{rounds}${base64.b64encode(salt).decode()}${password_hash}" - else: - return f"{{SSHA}}{base64.b64encode(sha1(password.encode() + salt).digest() + salt).decode()}" - - def _get_groups(self, uid): - groups = [] - self.ldap.connection.search( - self.group_dn, - "(memberUID={})".format(uid), - SUBTREE, - attributes=["cn"], - ) - groups_data = self.ldap.connection.response - for data in groups_data: - groups.append(data["attributes"]["cn"][0]) - return groups - - def _get_all_roles(self): - self.ldap.connection.search( - self.group_dn, - "(cn=*)", - SUBTREE, - attributes=["cn", "gidNumber", "memberUid"], - ) - return self.ldap.response() - - def _set_roles(self, user: User): - try: - ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) - ldap_roles = self._get_all_roles() - - gid_numbers = sorted(ldap_roles, key=lambda i: i["attributes"]["gidNumber"], reverse=True) - gid_number = gid_numbers[0]["attributes"]["gidNumber"] + 1 - - for user_role in user.roles: - if user_role not in [role["attributes"]["cn"][0] for role in ldap_roles]: - ldap_conn.add( - f"cn={user_role},{self.group_dn}", - ["posixGroup"], - attributes={"gidNumber": gid_number}, - ) - - ldap_roles = self._get_all_roles() - - for ldap_role in ldap_roles: - if ldap_role["attributes"]["cn"][0] in user.roles: - modify = {"memberUid": [(MODIFY_ADD, [user.userid])]} - else: - modify = {"memberUid": [(MODIFY_DELETE, [user.userid])]} - ldap_conn.modify(ldap_role["dn"], modify) - - except (LDAPPasswordIsMandatoryError, LDAPBindError): - raise BadRequest +Plugin = PluginMetadata(id="auth_ldap", name="auth_ldap", plugin=loader) diff --git a/flaschengeist/plugins/auth_ldap/plugin.py b/flaschengeist/plugins/auth_ldap/plugin.py new file mode 100644 index 0000000..697e972 --- /dev/null +++ b/flaschengeist/plugins/auth_ldap/plugin.py @@ -0,0 +1,307 @@ +"""LDAP Authentication Provider Plugin""" +import os +import ssl +from PIL import Image +from io import BytesIO +from flask_ldapconn import LDAPConn +from flask import current_app as app +from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE +from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError +from werkzeug.exceptions import BadRequest, InternalServerError, NotFound +from werkzeug.datastructures import FileStorage + +from flaschengeist import logger +from flaschengeist.controller import userController +from flaschengeist.models.user import User, Role, _Avatar +from flaschengeist.plugins import AuthPlugin, before_role_updated + + +class AuthLDAP(AuthPlugin): + def __init__(self, config): + super().__init__() + app.config.update( + LDAP_SERVER=config.get("host", "localhost"), + LDAP_PORT=config.get("port", 389), + LDAP_BINDDN=config.get("bind_dn", None), + LDAP_SECRET=config.get("secret", None), + LDAP_USE_SSL=config.get("use_ssl", False), + # That's not TLS, its dirty StartTLS on unencrypted LDAP + LDAP_USE_TLS=False, + LDAP_TLS_VERSION=ssl.PROTOCOL_TLS, + FORCE_ATTRIBUTE_VALUE_AS_LIST=True, + ) + if "ca_cert" in config: + app.config["LDAP_CA_CERTS_FILE"] = config["ca_cert"] + else: + # Default is CERT_REQUIRED + app.config["LDAP_REQUIRE_CERT"] = ssl.CERT_OPTIONAL + self.ldap = LDAPConn(app) + self.base_dn = config["base_dn"] + self.search_dn = config.get("search_dn", "ou=people,{base_dn}").format(base_dn=self.base_dn) + self.group_dn = config.get("group_dn", "ou=group,{base_dn}").format(base_dn=self.base_dn) + self.password_hash = config.get("password_hash", "SSHA").upper() + self.object_classes = config.get("object_classes", ["inetOrgPerson"]) + self.user_attributes: dict = config.get("user_attributes", {}) + self.dn_template = config.get("dn_template") + + # TODO: might not be set if modify is called + self.root_dn = config.get("root_dn", None) + self.root_secret = config.get("root_secret", None) + + @before_role_updated + def _role_updated(role, new_name): + logger.debug(f"LDAP: before_role_updated called with ({role}, {new_name})") + self.__modify_role(role, new_name) + + def login(self, user, password): + if not user: + return False + return self.ldap.authenticate(user.userid, password, "uid", self.base_dn) + + def find_user(self, userid, mail=None): + attr = self.__find(userid, mail) + if attr is not None: + user = User(userid=attr["uid"][0]) + self.__update(user, attr) + return user + + def update_user(self, user): + attr = self.__find(user.userid) + self.__update(user, attr) + + def create_user(self, user, password): + if self.root_dn is None: + logger.error("root_dn missing in ldap config!") + raise InternalServerError + + try: + ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) + attributes = self.user_attributes.copy() + if "uidNumber" in attributes: + self.ldap.connection.search( + self.search_dn, + "(uidNumber=*)", + SUBTREE, + attributes=["uidNumber"], + ) + resp = sorted( + self.ldap.response(), + key=lambda i: i["attributes"]["uidNumber"], + reverse=True, + ) + attributes["uidNumber"] = resp[0]["attributes"]["uidNumber"] + 1 if resp else attributes["uidNumber"] + dn = self.dn_template.format( + user=user, + base_dn=self.base_dn, + ) + if "default_gid" in attributes: + default_gid = attributes.pop("default_gid") + attributes["gidNumber"] = default_gid + if "homeDirectory" in attributes: + attributes["homeDirectory"] = attributes.get("homeDirectory").format( + firstname=user.firstname, + lastname=user.lastname, + userid=user.userid, + mail=user.mail, + display_name=user.display_name, + ) + attributes.update( + { + "sn": user.lastname, + "givenName": user.firstname, + "uid": user.userid, + "userPassword": self.__hash(password), + "mail": user.mail, + } + ) + if user.display_name: + attributes.update({"displayName": user.display_name}) + ldap_conn.add(dn, self.object_classes, attributes) + self._set_roles(user) + self.update_user(user) + except (LDAPPasswordIsMandatoryError, LDAPBindError): + raise BadRequest + + def modify_user(self, user: User, password=None, new_password=None): + try: + dn = user.get_attribute("DN") + if password: + ldap_conn = self.ldap.connect(dn, password) + else: + if self.root_dn is None: + logger.error("root_dn missing in ldap config!") + raise InternalServerError + ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) + modifier = {} + for name, ldap_name in [ + ("firstname", "givenName"), + ("lastname", "sn"), + ("mail", "mail"), + ("display_name", "displayName"), + ]: + if hasattr(user, name): + modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])] + if new_password: + modifier["userPassword"] = [(MODIFY_REPLACE, [self.__hash(new_password)])] + ldap_conn.modify(dn, modifier) + self._set_roles(user) + except (LDAPPasswordIsMandatoryError, LDAPBindError): + raise BadRequest + + def get_avatar(self, user): + self.ldap.connection.search( + self.search_dn, + "(uid={})".format(user.userid), + SUBTREE, + attributes=["jpegPhoto"], + ) + r = self.ldap.connection.response[0]["attributes"] + + if "jpegPhoto" in r and len(r["jpegPhoto"]) > 0: + avatar = _Avatar() + avatar.mimetype = "image/jpeg" + avatar.binary = bytearray(r["jpegPhoto"][0]) + return avatar + else: + raise NotFound + + def set_avatar(self, user: User, file: FileStorage): + if self.root_dn is None: + logger.error("root_dn missing in ldap config!") + raise InternalServerError + + image_bytes = BytesIO() + try: + # Make sure converted to RGB, e.g. png support RGBA but jpeg does not + image = Image.open(file).convert("RGB") + image.save(image_bytes, format="JPEG") + except IOError: + logger.debug(f"Could not convert avatar from '{file.mimetype}' to JPEG") + raise BadRequest("Unsupported image format") + + dn = user.get_attribute("DN") + ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) + ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [image_bytes.getvalue()])]}) + + def delete_avatar(self, user): + if self.root_dn is None: + logger.error("root_dn missing in ldap config!") + dn = user.get_attribute("DN") + ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) + ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [])]}) + + def __find(self, userid, mail=None): + """Find attributes of an user by uid or mail in LDAP""" + con = self.ldap.connection + if not con: + con = self.ldap.connect(self.root_dn, self.root_secret) + con.search( + self.search_dn, + f"(| (uid={userid})(mail={mail}))" if mail else f"(uid={userid})", + SUBTREE, + attributes=["uid", "givenName", "sn", "mail"], + ) + return con.response[0]["attributes"] if len(con.response) > 0 else None + + def __update(self, user, attr): + """Update an User object with LDAP attributes""" + if attr["uid"][0] == user.userid: + user.set_attribute("DN", self.ldap.connection.response[0]["dn"]) + user.firstname = attr["givenName"][0] + user.lastname = attr["sn"][0] + if attr["mail"]: + user.mail = attr["mail"][0] + if "displayName" in attr: + user.display_name = attr["displayName"][0] + userController.set_roles(user, self._get_groups(user.userid), create=True) + + def __modify_role( + self, + role: Role, + new_name, + ): + if self.root_dn is None: + logger.error("root_dn missing in ldap config!") + raise InternalServerError + try: + ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) + ldap_conn.search(self.group_dn, f"(cn={role.name})", SUBTREE, attributes=["cn"]) + if len(ldap_conn.response) > 0: + dn = ldap_conn.response[0]["dn"] + if new_name: + ldap_conn.modify_dn(dn, f"cn={new_name}") + else: + ldap_conn.delete(dn) + + except LDAPPasswordIsMandatoryError: + raise BadRequest + except LDAPBindError: + logger.debug(f"Could not bind to LDAP server", exc_info=True) + raise InternalServerError + + def __hash(self, password): + if self.password_hash == "ARGON2": + from argon2 import PasswordHasher + + return f"{{ARGON2}}{PasswordHasher().hash(password)}" + else: + from hashlib import pbkdf2_hmac, sha1 + import base64 + + salt = os.urandom(16) + if self.password_hash == "PBKDF2": + rounds = 200000 + password_hash = base64.b64encode(pbkdf2_hmac("sha512", password.encode("utf-8"), salt, rounds)).decode() + return f"{{PBKDF2-SHA512}}{rounds}${base64.b64encode(salt).decode()}${password_hash}" + else: + return f"{{SSHA}}{base64.b64encode(sha1(password.encode() + salt).digest() + salt).decode()}" + + def _get_groups(self, uid): + groups = [] + self.ldap.connection.search( + self.group_dn, + "(memberUID={})".format(uid), + SUBTREE, + attributes=["cn"], + ) + groups_data = self.ldap.connection.response + for data in groups_data: + groups.append(data["attributes"]["cn"][0]) + return groups + + def _get_all_roles(self): + self.ldap.connection.search( + self.group_dn, + "(cn=*)", + SUBTREE, + attributes=["cn", "gidNumber", "memberUid"], + ) + return self.ldap.response() + + def _set_roles(self, user: User): + try: + ldap_conn = self.ldap.connect(self.root_dn, self.root_secret) + ldap_roles = self._get_all_roles() + + gid_numbers = sorted(ldap_roles, key=lambda i: i["attributes"]["gidNumber"], reverse=True) + gid_number = gid_numbers[0]["attributes"]["gidNumber"] + 1 + + for user_role in user.roles: + if user_role not in [role["attributes"]["cn"][0] for role in ldap_roles]: + ldap_conn.add( + f"cn={user_role},{self.group_dn}", + ["posixGroup"], + attributes={"gidNumber": gid_number}, + ) + + ldap_roles = self._get_all_roles() + + for ldap_role in ldap_roles: + if ldap_role["attributes"]["cn"][0] in user.roles: + modify = {"memberUid": [(MODIFY_ADD, [user.userid])]} + else: + modify = {"memberUid": [(MODIFY_DELETE, [user.userid])]} + ldap_conn.modify(ldap_role["dn"], modify) + + except (LDAPPasswordIsMandatoryError, LDAPBindError): + raise BadRequest