Proposal: Add plugin metadata class to seperate implementation
	
		
			
	
		
	
	
		
			
				
	
				continuous-integration/woodpecker the build failed
				
					Details
				
			
		
	
				
					
				
			
				
	
				continuous-integration/woodpecker the build failed
				
					Details
				
			
		
	This commit is contained in:
		
							parent
							
								
									a6cbc002f6
								
							
						
					
					
						commit
						96eb9799d6
					
				| 
						 | 
				
			
			@ -1,10 +1,13 @@
 | 
			
		|||
import sqlalchemy
 | 
			
		||||
import pkg_resources
 | 
			
		||||
from dataclasses import dataclass
 | 
			
		||||
from flask import Blueprint
 | 
			
		||||
from typing import Callable, Type
 | 
			
		||||
from werkzeug.datastructures import FileStorage
 | 
			
		||||
from werkzeug.exceptions import MethodNotAllowed, NotFound
 | 
			
		||||
from flaschengeist.controller import imageController
 | 
			
		||||
 | 
			
		||||
from flaschengeist.database import db
 | 
			
		||||
from flaschengeist.controller import imageController
 | 
			
		||||
from flaschengeist.models.notification import Notification
 | 
			
		||||
from flaschengeist.models.user import _Avatar, User
 | 
			
		||||
from flaschengeist.models.setting import _PluginSetting
 | 
			
		||||
| 
						 | 
				
			
			@ -49,27 +52,34 @@ Args:
 | 
			
		|||
"""
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Plugin:
 | 
			
		||||
    """Base class for all Plugins
 | 
			
		||||
    If your class uses custom models add a static property called ``models``"""
 | 
			
		||||
@dataclass
 | 
			
		||||
class PluginMetadata:
 | 
			
		||||
    """Class providing metadata of a plugin"""
 | 
			
		||||
 | 
			
		||||
    blueprint = None  # You have to override
 | 
			
		||||
    id: str
 | 
			
		||||
    """Unique ID of the plugin (Hint: FQN)"""
 | 
			
		||||
    name: str
 | 
			
		||||
    """Human readable name of the plugin"""
 | 
			
		||||
    plugin: Callable[[], Type["Plugin"]]
 | 
			
		||||
    """Retrieve the plugin class"""
 | 
			
		||||
    blueprint: Blueprint = None
 | 
			
		||||
    """Override with a `flask.blueprint` if the plugin uses custom routes"""
 | 
			
		||||
    permissions = []  # You have to override
 | 
			
		||||
    permissions: list[str] = []
 | 
			
		||||
    """Override to add custom permissions used by the plugin
 | 
			
		||||
    
 | 
			
		||||
    A good style is to name the permissions with a prefix related to the plugin name,
 | 
			
		||||
    to prevent clashes with other plugins. E. g. instead of *delete* use *plugin_delete*.
 | 
			
		||||
    """
 | 
			
		||||
    id = "dev.flaschengeist.plugin"  # You have to override
 | 
			
		||||
    """Override with the unique ID of the plugin (Hint: FQN)"""
 | 
			
		||||
    name = "plugin"  # You have to override
 | 
			
		||||
    """Override with human readable name of the plugin"""
 | 
			
		||||
    models = None  # You have to override
 | 
			
		||||
    """Override with models module"""
 | 
			
		||||
    migrations_path = None  # Override this with the location of your db migrations directory
 | 
			
		||||
    """Override with path to migration files, if custome db tables are used"""
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Plugin:
 | 
			
		||||
    """Base class for all Plugins
 | 
			
		||||
    If your class uses custom models add a static property called ``models``"""
 | 
			
		||||
 | 
			
		||||
    def __init__(self, config=None):
 | 
			
		||||
        """Constructor called by create_app
 | 
			
		||||
        Args:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,307 +1,10 @@
 | 
			
		|||
"""LDAP Authentication Provider Plugin"""
 | 
			
		||||
import os
 | 
			
		||||
import ssl
 | 
			
		||||
from PIL import Image
 | 
			
		||||
from io import BytesIO
 | 
			
		||||
from flask_ldapconn import LDAPConn
 | 
			
		||||
from flask import current_app as app
 | 
			
		||||
from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE
 | 
			
		||||
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError
 | 
			
		||||
from werkzeug.exceptions import BadRequest, InternalServerError, NotFound
 | 
			
		||||
from werkzeug.datastructures import FileStorage
 | 
			
		||||
 | 
			
		||||
from flaschengeist import logger
 | 
			
		||||
from flaschengeist.controller import userController
 | 
			
		||||
from flaschengeist.models.user import User, Role, _Avatar
 | 
			
		||||
from flaschengeist.plugins import AuthPlugin, before_role_updated
 | 
			
		||||
from flaschengeist.plugins import PluginMetadata
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class AuthLDAP(AuthPlugin):
 | 
			
		||||
    def __init__(self, config):
 | 
			
		||||
        super().__init__()
 | 
			
		||||
        app.config.update(
 | 
			
		||||
            LDAP_SERVER=config.get("host", "localhost"),
 | 
			
		||||
            LDAP_PORT=config.get("port", 389),
 | 
			
		||||
            LDAP_BINDDN=config.get("bind_dn", None),
 | 
			
		||||
            LDAP_SECRET=config.get("secret", None),
 | 
			
		||||
            LDAP_USE_SSL=config.get("use_ssl", False),
 | 
			
		||||
            # That's not TLS, its dirty StartTLS on unencrypted LDAP
 | 
			
		||||
            LDAP_USE_TLS=False,
 | 
			
		||||
            LDAP_TLS_VERSION=ssl.PROTOCOL_TLS,
 | 
			
		||||
            FORCE_ATTRIBUTE_VALUE_AS_LIST=True,
 | 
			
		||||
        )
 | 
			
		||||
        if "ca_cert" in config:
 | 
			
		||||
            app.config["LDAP_CA_CERTS_FILE"] = config["ca_cert"]
 | 
			
		||||
        else:
 | 
			
		||||
            # Default is CERT_REQUIRED
 | 
			
		||||
            app.config["LDAP_REQUIRE_CERT"] = ssl.CERT_OPTIONAL
 | 
			
		||||
        self.ldap = LDAPConn(app)
 | 
			
		||||
        self.base_dn = config["base_dn"]
 | 
			
		||||
        self.search_dn = config.get("search_dn", "ou=people,{base_dn}").format(base_dn=self.base_dn)
 | 
			
		||||
        self.group_dn = config.get("group_dn", "ou=group,{base_dn}").format(base_dn=self.base_dn)
 | 
			
		||||
        self.password_hash = config.get("password_hash", "SSHA").upper()
 | 
			
		||||
        self.object_classes = config.get("object_classes", ["inetOrgPerson"])
 | 
			
		||||
        self.user_attributes: dict = config.get("user_attributes", {})
 | 
			
		||||
        self.dn_template = config.get("dn_template")
 | 
			
		||||
def loader():
 | 
			
		||||
    from .plugin import AuthLDAP
 | 
			
		||||
 | 
			
		||||
        # TODO: might not be set if modify is called
 | 
			
		||||
        self.root_dn = config.get("root_dn", None)
 | 
			
		||||
        self.root_secret = config.get("root_secret", None)
 | 
			
		||||
    return AuthLDAP
 | 
			
		||||
 | 
			
		||||
        @before_role_updated
 | 
			
		||||
        def _role_updated(role, new_name):
 | 
			
		||||
            logger.debug(f"LDAP: before_role_updated called with ({role}, {new_name})")
 | 
			
		||||
            self.__modify_role(role, new_name)
 | 
			
		||||
 | 
			
		||||
    def login(self, user, password):
 | 
			
		||||
        if not user:
 | 
			
		||||
            return False
 | 
			
		||||
        return self.ldap.authenticate(user.userid, password, "uid", self.base_dn)
 | 
			
		||||
 | 
			
		||||
    def find_user(self, userid, mail=None):
 | 
			
		||||
        attr = self.__find(userid, mail)
 | 
			
		||||
        if attr is not None:
 | 
			
		||||
            user = User(userid=attr["uid"][0])
 | 
			
		||||
            self.__update(user, attr)
 | 
			
		||||
            return user
 | 
			
		||||
 | 
			
		||||
    def update_user(self, user):
 | 
			
		||||
        attr = self.__find(user.userid)
 | 
			
		||||
        self.__update(user, attr)
 | 
			
		||||
 | 
			
		||||
    def create_user(self, user, password):
 | 
			
		||||
        if self.root_dn is None:
 | 
			
		||||
            logger.error("root_dn missing in ldap config!")
 | 
			
		||||
            raise InternalServerError
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
            attributes = self.user_attributes.copy()
 | 
			
		||||
            if "uidNumber" in attributes:
 | 
			
		||||
                self.ldap.connection.search(
 | 
			
		||||
                    self.search_dn,
 | 
			
		||||
                    "(uidNumber=*)",
 | 
			
		||||
                    SUBTREE,
 | 
			
		||||
                    attributes=["uidNumber"],
 | 
			
		||||
                )
 | 
			
		||||
                resp = sorted(
 | 
			
		||||
                    self.ldap.response(),
 | 
			
		||||
                    key=lambda i: i["attributes"]["uidNumber"],
 | 
			
		||||
                    reverse=True,
 | 
			
		||||
                )
 | 
			
		||||
                attributes["uidNumber"] = resp[0]["attributes"]["uidNumber"] + 1 if resp else attributes["uidNumber"]
 | 
			
		||||
            dn = self.dn_template.format(
 | 
			
		||||
                user=user,
 | 
			
		||||
                base_dn=self.base_dn,
 | 
			
		||||
            )
 | 
			
		||||
            if "default_gid" in attributes:
 | 
			
		||||
                default_gid = attributes.pop("default_gid")
 | 
			
		||||
                attributes["gidNumber"] = default_gid
 | 
			
		||||
            if "homeDirectory" in attributes:
 | 
			
		||||
                attributes["homeDirectory"] = attributes.get("homeDirectory").format(
 | 
			
		||||
                    firstname=user.firstname,
 | 
			
		||||
                    lastname=user.lastname,
 | 
			
		||||
                    userid=user.userid,
 | 
			
		||||
                    mail=user.mail,
 | 
			
		||||
                    display_name=user.display_name,
 | 
			
		||||
                )
 | 
			
		||||
            attributes.update(
 | 
			
		||||
                {
 | 
			
		||||
                    "sn": user.lastname,
 | 
			
		||||
                    "givenName": user.firstname,
 | 
			
		||||
                    "uid": user.userid,
 | 
			
		||||
                    "userPassword": self.__hash(password),
 | 
			
		||||
                    "mail": user.mail,
 | 
			
		||||
                }
 | 
			
		||||
            )
 | 
			
		||||
            if user.display_name:
 | 
			
		||||
                attributes.update({"displayName": user.display_name})
 | 
			
		||||
            ldap_conn.add(dn, self.object_classes, attributes)
 | 
			
		||||
            self._set_roles(user)
 | 
			
		||||
            self.update_user(user)
 | 
			
		||||
        except (LDAPPasswordIsMandatoryError, LDAPBindError):
 | 
			
		||||
            raise BadRequest
 | 
			
		||||
 | 
			
		||||
    def modify_user(self, user: User, password=None, new_password=None):
 | 
			
		||||
        try:
 | 
			
		||||
            dn = user.get_attribute("DN")
 | 
			
		||||
            if password:
 | 
			
		||||
                ldap_conn = self.ldap.connect(dn, password)
 | 
			
		||||
            else:
 | 
			
		||||
                if self.root_dn is None:
 | 
			
		||||
                    logger.error("root_dn missing in ldap config!")
 | 
			
		||||
                    raise InternalServerError
 | 
			
		||||
                ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
            modifier = {}
 | 
			
		||||
            for name, ldap_name in [
 | 
			
		||||
                ("firstname", "givenName"),
 | 
			
		||||
                ("lastname", "sn"),
 | 
			
		||||
                ("mail", "mail"),
 | 
			
		||||
                ("display_name", "displayName"),
 | 
			
		||||
            ]:
 | 
			
		||||
                if hasattr(user, name):
 | 
			
		||||
                    modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])]
 | 
			
		||||
            if new_password:
 | 
			
		||||
                modifier["userPassword"] = [(MODIFY_REPLACE, [self.__hash(new_password)])]
 | 
			
		||||
            ldap_conn.modify(dn, modifier)
 | 
			
		||||
            self._set_roles(user)
 | 
			
		||||
        except (LDAPPasswordIsMandatoryError, LDAPBindError):
 | 
			
		||||
            raise BadRequest
 | 
			
		||||
 | 
			
		||||
    def get_avatar(self, user):
 | 
			
		||||
        self.ldap.connection.search(
 | 
			
		||||
            self.search_dn,
 | 
			
		||||
            "(uid={})".format(user.userid),
 | 
			
		||||
            SUBTREE,
 | 
			
		||||
            attributes=["jpegPhoto"],
 | 
			
		||||
        )
 | 
			
		||||
        r = self.ldap.connection.response[0]["attributes"]
 | 
			
		||||
 | 
			
		||||
        if "jpegPhoto" in r and len(r["jpegPhoto"]) > 0:
 | 
			
		||||
            avatar = _Avatar()
 | 
			
		||||
            avatar.mimetype = "image/jpeg"
 | 
			
		||||
            avatar.binary = bytearray(r["jpegPhoto"][0])
 | 
			
		||||
            return avatar
 | 
			
		||||
        else:
 | 
			
		||||
            raise NotFound
 | 
			
		||||
 | 
			
		||||
    def set_avatar(self, user: User, file: FileStorage):
 | 
			
		||||
        if self.root_dn is None:
 | 
			
		||||
            logger.error("root_dn missing in ldap config!")
 | 
			
		||||
            raise InternalServerError
 | 
			
		||||
 | 
			
		||||
        image_bytes = BytesIO()
 | 
			
		||||
        try:
 | 
			
		||||
            # Make sure converted to RGB, e.g. png support RGBA but jpeg does not
 | 
			
		||||
            image = Image.open(file).convert("RGB")
 | 
			
		||||
            image.save(image_bytes, format="JPEG")
 | 
			
		||||
        except IOError:
 | 
			
		||||
            logger.debug(f"Could not convert avatar from '{file.mimetype}' to JPEG")
 | 
			
		||||
            raise BadRequest("Unsupported image format")
 | 
			
		||||
 | 
			
		||||
        dn = user.get_attribute("DN")
 | 
			
		||||
        ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
        ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [image_bytes.getvalue()])]})
 | 
			
		||||
 | 
			
		||||
    def delete_avatar(self, user):
 | 
			
		||||
        if self.root_dn is None:
 | 
			
		||||
            logger.error("root_dn missing in ldap config!")
 | 
			
		||||
        dn = user.get_attribute("DN")
 | 
			
		||||
        ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
        ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [])]})
 | 
			
		||||
 | 
			
		||||
    def __find(self, userid, mail=None):
 | 
			
		||||
        """Find attributes of an user by uid or mail in LDAP"""
 | 
			
		||||
        con = self.ldap.connection
 | 
			
		||||
        if not con:
 | 
			
		||||
            con = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
        con.search(
 | 
			
		||||
            self.search_dn,
 | 
			
		||||
            f"(| (uid={userid})(mail={mail}))" if mail else f"(uid={userid})",
 | 
			
		||||
            SUBTREE,
 | 
			
		||||
            attributes=["uid", "givenName", "sn", "mail"],
 | 
			
		||||
        )
 | 
			
		||||
        return con.response[0]["attributes"] if len(con.response) > 0 else None
 | 
			
		||||
 | 
			
		||||
    def __update(self, user, attr):
 | 
			
		||||
        """Update an User object with LDAP attributes"""
 | 
			
		||||
        if attr["uid"][0] == user.userid:
 | 
			
		||||
            user.set_attribute("DN", self.ldap.connection.response[0]["dn"])
 | 
			
		||||
            user.firstname = attr["givenName"][0]
 | 
			
		||||
            user.lastname = attr["sn"][0]
 | 
			
		||||
            if attr["mail"]:
 | 
			
		||||
                user.mail = attr["mail"][0]
 | 
			
		||||
            if "displayName" in attr:
 | 
			
		||||
                user.display_name = attr["displayName"][0]
 | 
			
		||||
            userController.set_roles(user, self._get_groups(user.userid), create=True)
 | 
			
		||||
 | 
			
		||||
    def __modify_role(
 | 
			
		||||
        self,
 | 
			
		||||
        role: Role,
 | 
			
		||||
        new_name,
 | 
			
		||||
    ):
 | 
			
		||||
        if self.root_dn is None:
 | 
			
		||||
            logger.error("root_dn missing in ldap config!")
 | 
			
		||||
            raise InternalServerError
 | 
			
		||||
        try:
 | 
			
		||||
            ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
            ldap_conn.search(self.group_dn, f"(cn={role.name})", SUBTREE, attributes=["cn"])
 | 
			
		||||
            if len(ldap_conn.response) > 0:
 | 
			
		||||
                dn = ldap_conn.response[0]["dn"]
 | 
			
		||||
                if new_name:
 | 
			
		||||
                    ldap_conn.modify_dn(dn, f"cn={new_name}")
 | 
			
		||||
                else:
 | 
			
		||||
                    ldap_conn.delete(dn)
 | 
			
		||||
 | 
			
		||||
        except LDAPPasswordIsMandatoryError:
 | 
			
		||||
            raise BadRequest
 | 
			
		||||
        except LDAPBindError:
 | 
			
		||||
            logger.debug(f"Could not bind to LDAP server", exc_info=True)
 | 
			
		||||
            raise InternalServerError
 | 
			
		||||
 | 
			
		||||
    def __hash(self, password):
 | 
			
		||||
        if self.password_hash == "ARGON2":
 | 
			
		||||
            from argon2 import PasswordHasher
 | 
			
		||||
 | 
			
		||||
            return f"{{ARGON2}}{PasswordHasher().hash(password)}"
 | 
			
		||||
        else:
 | 
			
		||||
            from hashlib import pbkdf2_hmac, sha1
 | 
			
		||||
            import base64
 | 
			
		||||
 | 
			
		||||
            salt = os.urandom(16)
 | 
			
		||||
            if self.password_hash == "PBKDF2":
 | 
			
		||||
                rounds = 200000
 | 
			
		||||
                password_hash = base64.b64encode(pbkdf2_hmac("sha512", password.encode("utf-8"), salt, rounds)).decode()
 | 
			
		||||
                return f"{{PBKDF2-SHA512}}{rounds}${base64.b64encode(salt).decode()}${password_hash}"
 | 
			
		||||
            else:
 | 
			
		||||
                return f"{{SSHA}}{base64.b64encode(sha1(password.encode() + salt).digest() + salt).decode()}"
 | 
			
		||||
 | 
			
		||||
    def _get_groups(self, uid):
 | 
			
		||||
        groups = []
 | 
			
		||||
        self.ldap.connection.search(
 | 
			
		||||
            self.group_dn,
 | 
			
		||||
            "(memberUID={})".format(uid),
 | 
			
		||||
            SUBTREE,
 | 
			
		||||
            attributes=["cn"],
 | 
			
		||||
        )
 | 
			
		||||
        groups_data = self.ldap.connection.response
 | 
			
		||||
        for data in groups_data:
 | 
			
		||||
            groups.append(data["attributes"]["cn"][0])
 | 
			
		||||
        return groups
 | 
			
		||||
 | 
			
		||||
    def _get_all_roles(self):
 | 
			
		||||
        self.ldap.connection.search(
 | 
			
		||||
            self.group_dn,
 | 
			
		||||
            "(cn=*)",
 | 
			
		||||
            SUBTREE,
 | 
			
		||||
            attributes=["cn", "gidNumber", "memberUid"],
 | 
			
		||||
        )
 | 
			
		||||
        return self.ldap.response()
 | 
			
		||||
 | 
			
		||||
    def _set_roles(self, user: User):
 | 
			
		||||
        try:
 | 
			
		||||
            ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
            ldap_roles = self._get_all_roles()
 | 
			
		||||
 | 
			
		||||
            gid_numbers = sorted(ldap_roles, key=lambda i: i["attributes"]["gidNumber"], reverse=True)
 | 
			
		||||
            gid_number = gid_numbers[0]["attributes"]["gidNumber"] + 1
 | 
			
		||||
 | 
			
		||||
            for user_role in user.roles:
 | 
			
		||||
                if user_role not in [role["attributes"]["cn"][0] for role in ldap_roles]:
 | 
			
		||||
                    ldap_conn.add(
 | 
			
		||||
                        f"cn={user_role},{self.group_dn}",
 | 
			
		||||
                        ["posixGroup"],
 | 
			
		||||
                        attributes={"gidNumber": gid_number},
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
            ldap_roles = self._get_all_roles()
 | 
			
		||||
 | 
			
		||||
            for ldap_role in ldap_roles:
 | 
			
		||||
                if ldap_role["attributes"]["cn"][0] in user.roles:
 | 
			
		||||
                    modify = {"memberUid": [(MODIFY_ADD, [user.userid])]}
 | 
			
		||||
                else:
 | 
			
		||||
                    modify = {"memberUid": [(MODIFY_DELETE, [user.userid])]}
 | 
			
		||||
                ldap_conn.modify(ldap_role["dn"], modify)
 | 
			
		||||
 | 
			
		||||
        except (LDAPPasswordIsMandatoryError, LDAPBindError):
 | 
			
		||||
            raise BadRequest
 | 
			
		||||
Plugin = PluginMetadata(id="auth_ldap", name="auth_ldap", plugin=loader)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,307 @@
 | 
			
		|||
"""LDAP Authentication Provider Plugin"""
 | 
			
		||||
import os
 | 
			
		||||
import ssl
 | 
			
		||||
from PIL import Image
 | 
			
		||||
from io import BytesIO
 | 
			
		||||
from flask_ldapconn import LDAPConn
 | 
			
		||||
from flask import current_app as app
 | 
			
		||||
from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE
 | 
			
		||||
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError
 | 
			
		||||
from werkzeug.exceptions import BadRequest, InternalServerError, NotFound
 | 
			
		||||
from werkzeug.datastructures import FileStorage
 | 
			
		||||
 | 
			
		||||
from flaschengeist import logger
 | 
			
		||||
from flaschengeist.controller import userController
 | 
			
		||||
from flaschengeist.models.user import User, Role, _Avatar
 | 
			
		||||
from flaschengeist.plugins import AuthPlugin, before_role_updated
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class AuthLDAP(AuthPlugin):
 | 
			
		||||
    def __init__(self, config):
 | 
			
		||||
        super().__init__()
 | 
			
		||||
        app.config.update(
 | 
			
		||||
            LDAP_SERVER=config.get("host", "localhost"),
 | 
			
		||||
            LDAP_PORT=config.get("port", 389),
 | 
			
		||||
            LDAP_BINDDN=config.get("bind_dn", None),
 | 
			
		||||
            LDAP_SECRET=config.get("secret", None),
 | 
			
		||||
            LDAP_USE_SSL=config.get("use_ssl", False),
 | 
			
		||||
            # That's not TLS, its dirty StartTLS on unencrypted LDAP
 | 
			
		||||
            LDAP_USE_TLS=False,
 | 
			
		||||
            LDAP_TLS_VERSION=ssl.PROTOCOL_TLS,
 | 
			
		||||
            FORCE_ATTRIBUTE_VALUE_AS_LIST=True,
 | 
			
		||||
        )
 | 
			
		||||
        if "ca_cert" in config:
 | 
			
		||||
            app.config["LDAP_CA_CERTS_FILE"] = config["ca_cert"]
 | 
			
		||||
        else:
 | 
			
		||||
            # Default is CERT_REQUIRED
 | 
			
		||||
            app.config["LDAP_REQUIRE_CERT"] = ssl.CERT_OPTIONAL
 | 
			
		||||
        self.ldap = LDAPConn(app)
 | 
			
		||||
        self.base_dn = config["base_dn"]
 | 
			
		||||
        self.search_dn = config.get("search_dn", "ou=people,{base_dn}").format(base_dn=self.base_dn)
 | 
			
		||||
        self.group_dn = config.get("group_dn", "ou=group,{base_dn}").format(base_dn=self.base_dn)
 | 
			
		||||
        self.password_hash = config.get("password_hash", "SSHA").upper()
 | 
			
		||||
        self.object_classes = config.get("object_classes", ["inetOrgPerson"])
 | 
			
		||||
        self.user_attributes: dict = config.get("user_attributes", {})
 | 
			
		||||
        self.dn_template = config.get("dn_template")
 | 
			
		||||
 | 
			
		||||
        # TODO: might not be set if modify is called
 | 
			
		||||
        self.root_dn = config.get("root_dn", None)
 | 
			
		||||
        self.root_secret = config.get("root_secret", None)
 | 
			
		||||
 | 
			
		||||
        @before_role_updated
 | 
			
		||||
        def _role_updated(role, new_name):
 | 
			
		||||
            logger.debug(f"LDAP: before_role_updated called with ({role}, {new_name})")
 | 
			
		||||
            self.__modify_role(role, new_name)
 | 
			
		||||
 | 
			
		||||
    def login(self, user, password):
 | 
			
		||||
        if not user:
 | 
			
		||||
            return False
 | 
			
		||||
        return self.ldap.authenticate(user.userid, password, "uid", self.base_dn)
 | 
			
		||||
 | 
			
		||||
    def find_user(self, userid, mail=None):
 | 
			
		||||
        attr = self.__find(userid, mail)
 | 
			
		||||
        if attr is not None:
 | 
			
		||||
            user = User(userid=attr["uid"][0])
 | 
			
		||||
            self.__update(user, attr)
 | 
			
		||||
            return user
 | 
			
		||||
 | 
			
		||||
    def update_user(self, user):
 | 
			
		||||
        attr = self.__find(user.userid)
 | 
			
		||||
        self.__update(user, attr)
 | 
			
		||||
 | 
			
		||||
    def create_user(self, user, password):
 | 
			
		||||
        if self.root_dn is None:
 | 
			
		||||
            logger.error("root_dn missing in ldap config!")
 | 
			
		||||
            raise InternalServerError
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
            attributes = self.user_attributes.copy()
 | 
			
		||||
            if "uidNumber" in attributes:
 | 
			
		||||
                self.ldap.connection.search(
 | 
			
		||||
                    self.search_dn,
 | 
			
		||||
                    "(uidNumber=*)",
 | 
			
		||||
                    SUBTREE,
 | 
			
		||||
                    attributes=["uidNumber"],
 | 
			
		||||
                )
 | 
			
		||||
                resp = sorted(
 | 
			
		||||
                    self.ldap.response(),
 | 
			
		||||
                    key=lambda i: i["attributes"]["uidNumber"],
 | 
			
		||||
                    reverse=True,
 | 
			
		||||
                )
 | 
			
		||||
                attributes["uidNumber"] = resp[0]["attributes"]["uidNumber"] + 1 if resp else attributes["uidNumber"]
 | 
			
		||||
            dn = self.dn_template.format(
 | 
			
		||||
                user=user,
 | 
			
		||||
                base_dn=self.base_dn,
 | 
			
		||||
            )
 | 
			
		||||
            if "default_gid" in attributes:
 | 
			
		||||
                default_gid = attributes.pop("default_gid")
 | 
			
		||||
                attributes["gidNumber"] = default_gid
 | 
			
		||||
            if "homeDirectory" in attributes:
 | 
			
		||||
                attributes["homeDirectory"] = attributes.get("homeDirectory").format(
 | 
			
		||||
                    firstname=user.firstname,
 | 
			
		||||
                    lastname=user.lastname,
 | 
			
		||||
                    userid=user.userid,
 | 
			
		||||
                    mail=user.mail,
 | 
			
		||||
                    display_name=user.display_name,
 | 
			
		||||
                )
 | 
			
		||||
            attributes.update(
 | 
			
		||||
                {
 | 
			
		||||
                    "sn": user.lastname,
 | 
			
		||||
                    "givenName": user.firstname,
 | 
			
		||||
                    "uid": user.userid,
 | 
			
		||||
                    "userPassword": self.__hash(password),
 | 
			
		||||
                    "mail": user.mail,
 | 
			
		||||
                }
 | 
			
		||||
            )
 | 
			
		||||
            if user.display_name:
 | 
			
		||||
                attributes.update({"displayName": user.display_name})
 | 
			
		||||
            ldap_conn.add(dn, self.object_classes, attributes)
 | 
			
		||||
            self._set_roles(user)
 | 
			
		||||
            self.update_user(user)
 | 
			
		||||
        except (LDAPPasswordIsMandatoryError, LDAPBindError):
 | 
			
		||||
            raise BadRequest
 | 
			
		||||
 | 
			
		||||
    def modify_user(self, user: User, password=None, new_password=None):
 | 
			
		||||
        try:
 | 
			
		||||
            dn = user.get_attribute("DN")
 | 
			
		||||
            if password:
 | 
			
		||||
                ldap_conn = self.ldap.connect(dn, password)
 | 
			
		||||
            else:
 | 
			
		||||
                if self.root_dn is None:
 | 
			
		||||
                    logger.error("root_dn missing in ldap config!")
 | 
			
		||||
                    raise InternalServerError
 | 
			
		||||
                ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
            modifier = {}
 | 
			
		||||
            for name, ldap_name in [
 | 
			
		||||
                ("firstname", "givenName"),
 | 
			
		||||
                ("lastname", "sn"),
 | 
			
		||||
                ("mail", "mail"),
 | 
			
		||||
                ("display_name", "displayName"),
 | 
			
		||||
            ]:
 | 
			
		||||
                if hasattr(user, name):
 | 
			
		||||
                    modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])]
 | 
			
		||||
            if new_password:
 | 
			
		||||
                modifier["userPassword"] = [(MODIFY_REPLACE, [self.__hash(new_password)])]
 | 
			
		||||
            ldap_conn.modify(dn, modifier)
 | 
			
		||||
            self._set_roles(user)
 | 
			
		||||
        except (LDAPPasswordIsMandatoryError, LDAPBindError):
 | 
			
		||||
            raise BadRequest
 | 
			
		||||
 | 
			
		||||
    def get_avatar(self, user):
 | 
			
		||||
        self.ldap.connection.search(
 | 
			
		||||
            self.search_dn,
 | 
			
		||||
            "(uid={})".format(user.userid),
 | 
			
		||||
            SUBTREE,
 | 
			
		||||
            attributes=["jpegPhoto"],
 | 
			
		||||
        )
 | 
			
		||||
        r = self.ldap.connection.response[0]["attributes"]
 | 
			
		||||
 | 
			
		||||
        if "jpegPhoto" in r and len(r["jpegPhoto"]) > 0:
 | 
			
		||||
            avatar = _Avatar()
 | 
			
		||||
            avatar.mimetype = "image/jpeg"
 | 
			
		||||
            avatar.binary = bytearray(r["jpegPhoto"][0])
 | 
			
		||||
            return avatar
 | 
			
		||||
        else:
 | 
			
		||||
            raise NotFound
 | 
			
		||||
 | 
			
		||||
    def set_avatar(self, user: User, file: FileStorage):
 | 
			
		||||
        if self.root_dn is None:
 | 
			
		||||
            logger.error("root_dn missing in ldap config!")
 | 
			
		||||
            raise InternalServerError
 | 
			
		||||
 | 
			
		||||
        image_bytes = BytesIO()
 | 
			
		||||
        try:
 | 
			
		||||
            # Make sure converted to RGB, e.g. png support RGBA but jpeg does not
 | 
			
		||||
            image = Image.open(file).convert("RGB")
 | 
			
		||||
            image.save(image_bytes, format="JPEG")
 | 
			
		||||
        except IOError:
 | 
			
		||||
            logger.debug(f"Could not convert avatar from '{file.mimetype}' to JPEG")
 | 
			
		||||
            raise BadRequest("Unsupported image format")
 | 
			
		||||
 | 
			
		||||
        dn = user.get_attribute("DN")
 | 
			
		||||
        ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
        ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [image_bytes.getvalue()])]})
 | 
			
		||||
 | 
			
		||||
    def delete_avatar(self, user):
 | 
			
		||||
        if self.root_dn is None:
 | 
			
		||||
            logger.error("root_dn missing in ldap config!")
 | 
			
		||||
        dn = user.get_attribute("DN")
 | 
			
		||||
        ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
        ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [])]})
 | 
			
		||||
 | 
			
		||||
    def __find(self, userid, mail=None):
 | 
			
		||||
        """Find attributes of an user by uid or mail in LDAP"""
 | 
			
		||||
        con = self.ldap.connection
 | 
			
		||||
        if not con:
 | 
			
		||||
            con = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
        con.search(
 | 
			
		||||
            self.search_dn,
 | 
			
		||||
            f"(| (uid={userid})(mail={mail}))" if mail else f"(uid={userid})",
 | 
			
		||||
            SUBTREE,
 | 
			
		||||
            attributes=["uid", "givenName", "sn", "mail"],
 | 
			
		||||
        )
 | 
			
		||||
        return con.response[0]["attributes"] if len(con.response) > 0 else None
 | 
			
		||||
 | 
			
		||||
    def __update(self, user, attr):
 | 
			
		||||
        """Update an User object with LDAP attributes"""
 | 
			
		||||
        if attr["uid"][0] == user.userid:
 | 
			
		||||
            user.set_attribute("DN", self.ldap.connection.response[0]["dn"])
 | 
			
		||||
            user.firstname = attr["givenName"][0]
 | 
			
		||||
            user.lastname = attr["sn"][0]
 | 
			
		||||
            if attr["mail"]:
 | 
			
		||||
                user.mail = attr["mail"][0]
 | 
			
		||||
            if "displayName" in attr:
 | 
			
		||||
                user.display_name = attr["displayName"][0]
 | 
			
		||||
            userController.set_roles(user, self._get_groups(user.userid), create=True)
 | 
			
		||||
 | 
			
		||||
    def __modify_role(
 | 
			
		||||
        self,
 | 
			
		||||
        role: Role,
 | 
			
		||||
        new_name,
 | 
			
		||||
    ):
 | 
			
		||||
        if self.root_dn is None:
 | 
			
		||||
            logger.error("root_dn missing in ldap config!")
 | 
			
		||||
            raise InternalServerError
 | 
			
		||||
        try:
 | 
			
		||||
            ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
            ldap_conn.search(self.group_dn, f"(cn={role.name})", SUBTREE, attributes=["cn"])
 | 
			
		||||
            if len(ldap_conn.response) > 0:
 | 
			
		||||
                dn = ldap_conn.response[0]["dn"]
 | 
			
		||||
                if new_name:
 | 
			
		||||
                    ldap_conn.modify_dn(dn, f"cn={new_name}")
 | 
			
		||||
                else:
 | 
			
		||||
                    ldap_conn.delete(dn)
 | 
			
		||||
 | 
			
		||||
        except LDAPPasswordIsMandatoryError:
 | 
			
		||||
            raise BadRequest
 | 
			
		||||
        except LDAPBindError:
 | 
			
		||||
            logger.debug(f"Could not bind to LDAP server", exc_info=True)
 | 
			
		||||
            raise InternalServerError
 | 
			
		||||
 | 
			
		||||
    def __hash(self, password):
 | 
			
		||||
        if self.password_hash == "ARGON2":
 | 
			
		||||
            from argon2 import PasswordHasher
 | 
			
		||||
 | 
			
		||||
            return f"{{ARGON2}}{PasswordHasher().hash(password)}"
 | 
			
		||||
        else:
 | 
			
		||||
            from hashlib import pbkdf2_hmac, sha1
 | 
			
		||||
            import base64
 | 
			
		||||
 | 
			
		||||
            salt = os.urandom(16)
 | 
			
		||||
            if self.password_hash == "PBKDF2":
 | 
			
		||||
                rounds = 200000
 | 
			
		||||
                password_hash = base64.b64encode(pbkdf2_hmac("sha512", password.encode("utf-8"), salt, rounds)).decode()
 | 
			
		||||
                return f"{{PBKDF2-SHA512}}{rounds}${base64.b64encode(salt).decode()}${password_hash}"
 | 
			
		||||
            else:
 | 
			
		||||
                return f"{{SSHA}}{base64.b64encode(sha1(password.encode() + salt).digest() + salt).decode()}"
 | 
			
		||||
 | 
			
		||||
    def _get_groups(self, uid):
 | 
			
		||||
        groups = []
 | 
			
		||||
        self.ldap.connection.search(
 | 
			
		||||
            self.group_dn,
 | 
			
		||||
            "(memberUID={})".format(uid),
 | 
			
		||||
            SUBTREE,
 | 
			
		||||
            attributes=["cn"],
 | 
			
		||||
        )
 | 
			
		||||
        groups_data = self.ldap.connection.response
 | 
			
		||||
        for data in groups_data:
 | 
			
		||||
            groups.append(data["attributes"]["cn"][0])
 | 
			
		||||
        return groups
 | 
			
		||||
 | 
			
		||||
    def _get_all_roles(self):
 | 
			
		||||
        self.ldap.connection.search(
 | 
			
		||||
            self.group_dn,
 | 
			
		||||
            "(cn=*)",
 | 
			
		||||
            SUBTREE,
 | 
			
		||||
            attributes=["cn", "gidNumber", "memberUid"],
 | 
			
		||||
        )
 | 
			
		||||
        return self.ldap.response()
 | 
			
		||||
 | 
			
		||||
    def _set_roles(self, user: User):
 | 
			
		||||
        try:
 | 
			
		||||
            ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
 | 
			
		||||
            ldap_roles = self._get_all_roles()
 | 
			
		||||
 | 
			
		||||
            gid_numbers = sorted(ldap_roles, key=lambda i: i["attributes"]["gidNumber"], reverse=True)
 | 
			
		||||
            gid_number = gid_numbers[0]["attributes"]["gidNumber"] + 1
 | 
			
		||||
 | 
			
		||||
            for user_role in user.roles:
 | 
			
		||||
                if user_role not in [role["attributes"]["cn"][0] for role in ldap_roles]:
 | 
			
		||||
                    ldap_conn.add(
 | 
			
		||||
                        f"cn={user_role},{self.group_dn}",
 | 
			
		||||
                        ["posixGroup"],
 | 
			
		||||
                        attributes={"gidNumber": gid_number},
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
            ldap_roles = self._get_all_roles()
 | 
			
		||||
 | 
			
		||||
            for ldap_role in ldap_roles:
 | 
			
		||||
                if ldap_role["attributes"]["cn"][0] in user.roles:
 | 
			
		||||
                    modify = {"memberUid": [(MODIFY_ADD, [user.userid])]}
 | 
			
		||||
                else:
 | 
			
		||||
                    modify = {"memberUid": [(MODIFY_DELETE, [user.userid])]}
 | 
			
		||||
                ldap_conn.modify(ldap_role["dn"], modify)
 | 
			
		||||
 | 
			
		||||
        except (LDAPPasswordIsMandatoryError, LDAPBindError):
 | 
			
		||||
            raise BadRequest
 | 
			
		||||
		Loading…
	
		Reference in New Issue