Proposal: Add plugin metadata class to seperate implementation

This commit is contained in:
Ferdinand Thiessen 2021-12-26 15:26:50 +01:00
parent 90999bbefb
commit e4c552af01
3 changed files with 364 additions and 314 deletions

View File

@ -1,6 +1,8 @@
import pkg_resources
from flask import Blueprint
from dataclasses import asdict, dataclass, field
from werkzeug.datastructures import FileStorage
from werkzeug.exceptions import MethodNotAllowed, NotFound
from flaschengeist.models.user import _Avatar, User
from flaschengeist.utils.hook import HookBefore, HookAfter
@ -43,33 +45,71 @@ Args:
"""
class Plugin:
"""Base class for all Plugins
If your class uses custom models add a static property called ``models``"""
@dataclass
class PluginMetadata:
"""Class providing metadata of a plugin"""
blueprint = None # You have to override
id: str
"""Unique ID of the plugin (Hint: FQN)"""
name: str
"""Human readable name of the plugin"""
module: str
"""Module and class of the plugin
Example:
my_pkg.my_module.plugin_class
"""
version: str = None
"""Version of the plugin
If not provided the version will be set to the
distribution version of the package providing the module"""
blueprint: Blueprint = None
"""Override with a `flask.blueprint` if the plugin uses custom routes"""
permissions = [] # You have to override
permissions: list[str] = field(default_factory=list)
"""Override to add custom permissions used by the plugin
A good style is to name the permissions with a prefix related to the plugin name,
to prevent clashes with other plugins. E. g. instead of *delete* use *plugin_delete*.
"""
id = "dev.flaschengeist.plugin" # You have to override
"""Override with the unique ID of the plugin (Hint: FQN)"""
name = "plugin" # You have to override
"""Override with human readable name of the plugin"""
models = None # You have to override
"""Override with models module"""
migrations_path = None # Override this with the location of your db migrations directory
"""Override with path to migration files, if custome db tables are used"""
def __init__(self, config=None):
def load(self, config=None) -> "Plugin":
"""Load the plugin"""
import importlib
mod, cls = self.module.rsplit(".", 1)
module = importlib.import_module(mod)
return getattr(module, cls)(self, config)
def __post_init__(self):
"""Post init to set version if not set automatically"""
if not self.version:
import pkg_resources
self.version = pkg_resources.get_distribution(self.module.split(".")[0]).version
class Plugin(PluginMetadata):
"""Base class for all Plugins
If your class uses custom models add a static property called ``models``"""
def __init__(self, metadata: PluginMetadata, config=None):
"""Constructor called by create_app
Args:
config: Dict configuration containing the plugin section
"""
self.version = pkg_resources.get_distribution(self.__module__.split(".")[0]).version
super().__init__(**asdict(metadata))
def install(self):
"""Installation routine

View File

@ -1,307 +1,10 @@
"""LDAP Authentication Provider Plugin"""
import os
import ssl
from PIL import Image
from io import BytesIO
from flask_ldapconn import LDAPConn
from flask import current_app as app
from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError
from werkzeug.exceptions import BadRequest, InternalServerError, NotFound
from werkzeug.datastructures import FileStorage
from flaschengeist import logger
from flaschengeist.controller import userController
from flaschengeist.models.user import User, Role, _Avatar
from flaschengeist.plugins import AuthPlugin, before_role_updated
from flaschengeist.plugins import PluginMetadata
class AuthLDAP(AuthPlugin):
def __init__(self, config):
super().__init__()
app.config.update(
LDAP_SERVER=config.get("host", "localhost"),
LDAP_PORT=config.get("port", 389),
LDAP_BINDDN=config.get("bind_dn", None),
LDAP_SECRET=config.get("secret", None),
LDAP_USE_SSL=config.get("use_ssl", False),
# That's not TLS, its dirty StartTLS on unencrypted LDAP
LDAP_USE_TLS=False,
LDAP_TLS_VERSION=ssl.PROTOCOL_TLS,
FORCE_ATTRIBUTE_VALUE_AS_LIST=True,
)
if "ca_cert" in config:
app.config["LDAP_CA_CERTS_FILE"] = config["ca_cert"]
else:
# Default is CERT_REQUIRED
app.config["LDAP_REQUIRE_CERT"] = ssl.CERT_OPTIONAL
self.ldap = LDAPConn(app)
self.base_dn = config["base_dn"]
self.search_dn = config.get("search_dn", "ou=people,{base_dn}").format(base_dn=self.base_dn)
self.group_dn = config.get("group_dn", "ou=group,{base_dn}").format(base_dn=self.base_dn)
self.password_hash = config.get("password_hash", "SSHA").upper()
self.object_classes = config.get("object_classes", ["inetOrgPerson"])
self.user_attributes: dict = config.get("user_attributes", {})
self.dn_template = config.get("dn_template")
def loader():
from .plugin import AuthLDAP
# TODO: might not be set if modify is called
self.root_dn = config.get("root_dn", None)
self.root_secret = config.get("root_secret", None)
return AuthLDAP
@before_role_updated
def _role_updated(role, new_name):
logger.debug(f"LDAP: before_role_updated called with ({role}, {new_name})")
self.__modify_role(role, new_name)
def login(self, user, password):
if not user:
return False
return self.ldap.authenticate(user.userid, password, "uid", self.base_dn)
def find_user(self, userid, mail=None):
attr = self.__find(userid, mail)
if attr is not None:
user = User(userid=attr["uid"][0])
self.__update(user, attr)
return user
def update_user(self, user):
attr = self.__find(user.userid)
self.__update(user, attr)
def create_user(self, user, password):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
try:
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
attributes = self.user_attributes.copy()
if "uidNumber" in attributes:
self.ldap.connection.search(
self.search_dn,
"(uidNumber=*)",
SUBTREE,
attributes=["uidNumber"],
)
resp = sorted(
self.ldap.response(),
key=lambda i: i["attributes"]["uidNumber"],
reverse=True,
)
attributes["uidNumber"] = resp[0]["attributes"]["uidNumber"] + 1 if resp else attributes["uidNumber"]
dn = self.dn_template.format(
user=user,
base_dn=self.base_dn,
)
if "default_gid" in attributes:
default_gid = attributes.pop("default_gid")
attributes["gidNumber"] = default_gid
if "homeDirectory" in attributes:
attributes["homeDirectory"] = attributes.get("homeDirectory").format(
firstname=user.firstname,
lastname=user.lastname,
userid=user.userid,
mail=user.mail,
display_name=user.display_name,
)
attributes.update(
{
"sn": user.lastname,
"givenName": user.firstname,
"uid": user.userid,
"userPassword": self.__hash(password),
"mail": user.mail,
}
)
if user.display_name:
attributes.update({"displayName": user.display_name})
ldap_conn.add(dn, self.object_classes, attributes)
self._set_roles(user)
self.update_user(user)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest
def modify_user(self, user: User, password=None, new_password=None):
try:
dn = user.get_attribute("DN")
if password:
ldap_conn = self.ldap.connect(dn, password)
else:
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
modifier = {}
for name, ldap_name in [
("firstname", "givenName"),
("lastname", "sn"),
("mail", "mail"),
("display_name", "displayName"),
]:
if hasattr(user, name):
modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])]
if new_password:
modifier["userPassword"] = [(MODIFY_REPLACE, [self.__hash(new_password)])]
ldap_conn.modify(dn, modifier)
self._set_roles(user)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest
def get_avatar(self, user):
self.ldap.connection.search(
self.search_dn,
"(uid={})".format(user.userid),
SUBTREE,
attributes=["jpegPhoto"],
)
r = self.ldap.connection.response[0]["attributes"]
if "jpegPhoto" in r and len(r["jpegPhoto"]) > 0:
avatar = _Avatar()
avatar.mimetype = "image/jpeg"
avatar.binary = bytearray(r["jpegPhoto"][0])
return avatar
else:
raise NotFound
def set_avatar(self, user: User, file: FileStorage):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
image_bytes = BytesIO()
try:
# Make sure converted to RGB, e.g. png support RGBA but jpeg does not
image = Image.open(file).convert("RGB")
image.save(image_bytes, format="JPEG")
except IOError:
logger.debug(f"Could not convert avatar from '{file.mimetype}' to JPEG")
raise BadRequest("Unsupported image format")
dn = user.get_attribute("DN")
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [image_bytes.getvalue()])]})
def delete_avatar(self, user):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
dn = user.get_attribute("DN")
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [])]})
def __find(self, userid, mail=None):
"""Find attributes of an user by uid or mail in LDAP"""
con = self.ldap.connection
if not con:
con = self.ldap.connect(self.root_dn, self.root_secret)
con.search(
self.search_dn,
f"(| (uid={userid})(mail={mail}))" if mail else f"(uid={userid})",
SUBTREE,
attributes=["uid", "givenName", "sn", "mail"],
)
return con.response[0]["attributes"] if len(con.response) > 0 else None
def __update(self, user, attr):
"""Update an User object with LDAP attributes"""
if attr["uid"][0] == user.userid:
user.set_attribute("DN", self.ldap.connection.response[0]["dn"])
user.firstname = attr["givenName"][0]
user.lastname = attr["sn"][0]
if attr["mail"]:
user.mail = attr["mail"][0]
if "displayName" in attr:
user.display_name = attr["displayName"][0]
userController.set_roles(user, self._get_groups(user.userid), create=True)
def __modify_role(
self,
role: Role,
new_name,
):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
try:
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_conn.search(self.group_dn, f"(cn={role.name})", SUBTREE, attributes=["cn"])
if len(ldap_conn.response) > 0:
dn = ldap_conn.response[0]["dn"]
if new_name:
ldap_conn.modify_dn(dn, f"cn={new_name}")
else:
ldap_conn.delete(dn)
except LDAPPasswordIsMandatoryError:
raise BadRequest
except LDAPBindError:
logger.debug(f"Could not bind to LDAP server", exc_info=True)
raise InternalServerError
def __hash(self, password):
if self.password_hash == "ARGON2":
from argon2 import PasswordHasher
return f"{{ARGON2}}{PasswordHasher().hash(password)}"
else:
from hashlib import pbkdf2_hmac, sha1
import base64
salt = os.urandom(16)
if self.password_hash == "PBKDF2":
rounds = 200000
password_hash = base64.b64encode(pbkdf2_hmac("sha512", password.encode("utf-8"), salt, rounds)).decode()
return f"{{PBKDF2-SHA512}}{rounds}${base64.b64encode(salt).decode()}${password_hash}"
else:
return f"{{SSHA}}{base64.b64encode(sha1(password.encode() + salt).digest() + salt).decode()}"
def _get_groups(self, uid):
groups = []
self.ldap.connection.search(
self.group_dn,
"(memberUID={})".format(uid),
SUBTREE,
attributes=["cn"],
)
groups_data = self.ldap.connection.response
for data in groups_data:
groups.append(data["attributes"]["cn"][0])
return groups
def _get_all_roles(self):
self.ldap.connection.search(
self.group_dn,
"(cn=*)",
SUBTREE,
attributes=["cn", "gidNumber", "memberUid"],
)
return self.ldap.response()
def _set_roles(self, user: User):
try:
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_roles = self._get_all_roles()
gid_numbers = sorted(ldap_roles, key=lambda i: i["attributes"]["gidNumber"], reverse=True)
gid_number = gid_numbers[0]["attributes"]["gidNumber"] + 1
for user_role in user.roles:
if user_role not in [role["attributes"]["cn"][0] for role in ldap_roles]:
ldap_conn.add(
f"cn={user_role},{self.group_dn}",
["posixGroup"],
attributes={"gidNumber": gid_number},
)
ldap_roles = self._get_all_roles()
for ldap_role in ldap_roles:
if ldap_role["attributes"]["cn"][0] in user.roles:
modify = {"memberUid": [(MODIFY_ADD, [user.userid])]}
else:
modify = {"memberUid": [(MODIFY_DELETE, [user.userid])]}
ldap_conn.modify(ldap_role["dn"], modify)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest
Plugin = PluginMetadata(id="auth_ldap", name="auth_ldap", plugin=loader)

View File

@ -0,0 +1,307 @@
"""LDAP Authentication Provider Plugin"""
import os
import ssl
from PIL import Image
from io import BytesIO
from flask_ldapconn import LDAPConn
from flask import current_app as app
from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError
from werkzeug.exceptions import BadRequest, InternalServerError, NotFound
from werkzeug.datastructures import FileStorage
from flaschengeist import logger
from flaschengeist.controller import userController
from flaschengeist.models.user import User, Role, _Avatar
from flaschengeist.plugins import AuthPlugin, before_role_updated
class AuthLDAP(AuthPlugin):
def __init__(self, config):
super().__init__()
app.config.update(
LDAP_SERVER=config.get("host", "localhost"),
LDAP_PORT=config.get("port", 389),
LDAP_BINDDN=config.get("bind_dn", None),
LDAP_SECRET=config.get("secret", None),
LDAP_USE_SSL=config.get("use_ssl", False),
# That's not TLS, its dirty StartTLS on unencrypted LDAP
LDAP_USE_TLS=False,
LDAP_TLS_VERSION=ssl.PROTOCOL_TLS,
FORCE_ATTRIBUTE_VALUE_AS_LIST=True,
)
if "ca_cert" in config:
app.config["LDAP_CA_CERTS_FILE"] = config["ca_cert"]
else:
# Default is CERT_REQUIRED
app.config["LDAP_REQUIRE_CERT"] = ssl.CERT_OPTIONAL
self.ldap = LDAPConn(app)
self.base_dn = config["base_dn"]
self.search_dn = config.get("search_dn", "ou=people,{base_dn}").format(base_dn=self.base_dn)
self.group_dn = config.get("group_dn", "ou=group,{base_dn}").format(base_dn=self.base_dn)
self.password_hash = config.get("password_hash", "SSHA").upper()
self.object_classes = config.get("object_classes", ["inetOrgPerson"])
self.user_attributes: dict = config.get("user_attributes", {})
self.dn_template = config.get("dn_template")
# TODO: might not be set if modify is called
self.root_dn = config.get("root_dn", None)
self.root_secret = config.get("root_secret", None)
@before_role_updated
def _role_updated(role, new_name):
logger.debug(f"LDAP: before_role_updated called with ({role}, {new_name})")
self.__modify_role(role, new_name)
def login(self, user, password):
if not user:
return False
return self.ldap.authenticate(user.userid, password, "uid", self.base_dn)
def find_user(self, userid, mail=None):
attr = self.__find(userid, mail)
if attr is not None:
user = User(userid=attr["uid"][0])
self.__update(user, attr)
return user
def update_user(self, user):
attr = self.__find(user.userid)
self.__update(user, attr)
def create_user(self, user, password):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
try:
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
attributes = self.user_attributes.copy()
if "uidNumber" in attributes:
self.ldap.connection.search(
self.search_dn,
"(uidNumber=*)",
SUBTREE,
attributes=["uidNumber"],
)
resp = sorted(
self.ldap.response(),
key=lambda i: i["attributes"]["uidNumber"],
reverse=True,
)
attributes["uidNumber"] = resp[0]["attributes"]["uidNumber"] + 1 if resp else attributes["uidNumber"]
dn = self.dn_template.format(
user=user,
base_dn=self.base_dn,
)
if "default_gid" in attributes:
default_gid = attributes.pop("default_gid")
attributes["gidNumber"] = default_gid
if "homeDirectory" in attributes:
attributes["homeDirectory"] = attributes.get("homeDirectory").format(
firstname=user.firstname,
lastname=user.lastname,
userid=user.userid,
mail=user.mail,
display_name=user.display_name,
)
attributes.update(
{
"sn": user.lastname,
"givenName": user.firstname,
"uid": user.userid,
"userPassword": self.__hash(password),
"mail": user.mail,
}
)
if user.display_name:
attributes.update({"displayName": user.display_name})
ldap_conn.add(dn, self.object_classes, attributes)
self._set_roles(user)
self.update_user(user)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest
def modify_user(self, user: User, password=None, new_password=None):
try:
dn = user.get_attribute("DN")
if password:
ldap_conn = self.ldap.connect(dn, password)
else:
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
modifier = {}
for name, ldap_name in [
("firstname", "givenName"),
("lastname", "sn"),
("mail", "mail"),
("display_name", "displayName"),
]:
if hasattr(user, name):
modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])]
if new_password:
modifier["userPassword"] = [(MODIFY_REPLACE, [self.__hash(new_password)])]
ldap_conn.modify(dn, modifier)
self._set_roles(user)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest
def get_avatar(self, user):
self.ldap.connection.search(
self.search_dn,
"(uid={})".format(user.userid),
SUBTREE,
attributes=["jpegPhoto"],
)
r = self.ldap.connection.response[0]["attributes"]
if "jpegPhoto" in r and len(r["jpegPhoto"]) > 0:
avatar = _Avatar()
avatar.mimetype = "image/jpeg"
avatar.binary = bytearray(r["jpegPhoto"][0])
return avatar
else:
raise NotFound
def set_avatar(self, user: User, file: FileStorage):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
image_bytes = BytesIO()
try:
# Make sure converted to RGB, e.g. png support RGBA but jpeg does not
image = Image.open(file).convert("RGB")
image.save(image_bytes, format="JPEG")
except IOError:
logger.debug(f"Could not convert avatar from '{file.mimetype}' to JPEG")
raise BadRequest("Unsupported image format")
dn = user.get_attribute("DN")
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [image_bytes.getvalue()])]})
def delete_avatar(self, user):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
dn = user.get_attribute("DN")
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [])]})
def __find(self, userid, mail=None):
"""Find attributes of an user by uid or mail in LDAP"""
con = self.ldap.connection
if not con:
con = self.ldap.connect(self.root_dn, self.root_secret)
con.search(
self.search_dn,
f"(| (uid={userid})(mail={mail}))" if mail else f"(uid={userid})",
SUBTREE,
attributes=["uid", "givenName", "sn", "mail"],
)
return con.response[0]["attributes"] if len(con.response) > 0 else None
def __update(self, user, attr):
"""Update an User object with LDAP attributes"""
if attr["uid"][0] == user.userid:
user.set_attribute("DN", self.ldap.connection.response[0]["dn"])
user.firstname = attr["givenName"][0]
user.lastname = attr["sn"][0]
if attr["mail"]:
user.mail = attr["mail"][0]
if "displayName" in attr:
user.display_name = attr["displayName"][0]
userController.set_roles(user, self._get_groups(user.userid), create=True)
def __modify_role(
self,
role: Role,
new_name,
):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
try:
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_conn.search(self.group_dn, f"(cn={role.name})", SUBTREE, attributes=["cn"])
if len(ldap_conn.response) > 0:
dn = ldap_conn.response[0]["dn"]
if new_name:
ldap_conn.modify_dn(dn, f"cn={new_name}")
else:
ldap_conn.delete(dn)
except LDAPPasswordIsMandatoryError:
raise BadRequest
except LDAPBindError:
logger.debug(f"Could not bind to LDAP server", exc_info=True)
raise InternalServerError
def __hash(self, password):
if self.password_hash == "ARGON2":
from argon2 import PasswordHasher
return f"{{ARGON2}}{PasswordHasher().hash(password)}"
else:
from hashlib import pbkdf2_hmac, sha1
import base64
salt = os.urandom(16)
if self.password_hash == "PBKDF2":
rounds = 200000
password_hash = base64.b64encode(pbkdf2_hmac("sha512", password.encode("utf-8"), salt, rounds)).decode()
return f"{{PBKDF2-SHA512}}{rounds}${base64.b64encode(salt).decode()}${password_hash}"
else:
return f"{{SSHA}}{base64.b64encode(sha1(password.encode() + salt).digest() + salt).decode()}"
def _get_groups(self, uid):
groups = []
self.ldap.connection.search(
self.group_dn,
"(memberUID={})".format(uid),
SUBTREE,
attributes=["cn"],
)
groups_data = self.ldap.connection.response
for data in groups_data:
groups.append(data["attributes"]["cn"][0])
return groups
def _get_all_roles(self):
self.ldap.connection.search(
self.group_dn,
"(cn=*)",
SUBTREE,
attributes=["cn", "gidNumber", "memberUid"],
)
return self.ldap.response()
def _set_roles(self, user: User):
try:
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_roles = self._get_all_roles()
gid_numbers = sorted(ldap_roles, key=lambda i: i["attributes"]["gidNumber"], reverse=True)
gid_number = gid_numbers[0]["attributes"]["gidNumber"] + 1
for user_role in user.roles:
if user_role not in [role["attributes"]["cn"][0] for role in ldap_roles]:
ldap_conn.add(
f"cn={user_role},{self.group_dn}",
["posixGroup"],
attributes={"gidNumber": gid_number},
)
ldap_roles = self._get_all_roles()
for ldap_role in ldap_roles:
if ldap_role["attributes"]["cn"][0] in user.roles:
modify = {"memberUid": [(MODIFY_ADD, [user.userid])]}
else:
modify = {"memberUid": [(MODIFY_DELETE, [user.userid])]}
ldap_conn.modify(ldap_role["dn"], modify)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest