Compare commits

...

2 Commits

6 changed files with 641 additions and 497 deletions

82
doc/plugin.md Normal file
View File

@ -0,0 +1,82 @@
# Example on Plugin Development
## File Structure
```
- root
- flaschengeist_example
- __init__.py
- plugin.py
[- migrations]
- setup.cfg
```
In a minimal example, you would only need two python source files and
your setup configuration.
If you also use custom database tables, you would also need a directory
containing your _alembic_-migration files.
## Source code
Code is seperated in two locations, the meta data of your plugin goes into the `__init__.py` and all logic goes into the `plugin.py`, of cause you could add more files, but make sure to not depend on some external packages in the `__init__.py`.
### Project data / setuptools
For **Flaschengeist** to find your plugin, you have to
install it corretly, especially the entry points have to be set.
```ini
[metadata]
license = MIT
version = 1.0.0
name = flaschengeist-example
description = Example plugin for Flaschengeist
# ...
[options]
packages =
flaschengeist_example
install_requires =
flaschengeist == 2.0.*
[options.entry_points]
# Path to your meta data instance
flaschengeist.plugins =
example = flaschengeist_example:plugin
```
### Metadata
The `__init__.py` contains the plugin meta data.
```python
# __init__.py
from flaschengeist.plugins import PluginMetadata
plugin = PluginMetadata(
id="com.example.plugin", # unique, recommend FQN
name="Example Plugin", # Human readable name
version="1.0.0", # Optional
module="flaschengeist_example.plugin.ExamplePlugin" # module and class of your plugin
)
```
`version` is optional, if not provided the version of your distribution (set in setup.cfg) is used.
### Implementation
You plugin implementation goes into the `plugin.py`.
Here you define your Plugin class, make sure it inheritates from `flaschengeist.plugins.Plugin` or `flaschengeist.plugins.AuthPlugin` respectivly.
It will get initalized with the meta data you defined in the `__init__.py` and the configuration object.
```python
from flaschengeist.plugins import Plugin
from flask import Blueprint
bp = Blueprint(...)
class ExamplePlugin(Plugin):
blueprint = bp
...
@bp.route("/example")
def get_example():
...
```

View File

@ -1,6 +1,8 @@
import pkg_resources from flask import Blueprint
from dataclasses import asdict, dataclass, field
from werkzeug.datastructures import FileStorage from werkzeug.datastructures import FileStorage
from werkzeug.exceptions import MethodNotAllowed, NotFound from werkzeug.exceptions import MethodNotAllowed, NotFound
from flaschengeist.models.user import _Avatar, User from flaschengeist.models.user import _Avatar, User
from flaschengeist.utils.hook import HookBefore, HookAfter from flaschengeist.utils.hook import HookBefore, HookAfter
@ -43,33 +45,80 @@ Args:
""" """
class Plugin: @dataclass
"""Base class for all Plugins class PluginMetadata:
If your class uses custom models add a static property called ``models``""" """Class providing metadata of a plugin"""
blueprint = None # You have to override id: str
"""Unique ID of the plugin (Hint: FQN)"""
name: str
"""Human readable name of the plugin"""
module: str
"""Module and class of the plugin
Example:
my_pkg.my_module.plugin_class
"""
version: str = None
"""Version of the plugin
If not provided the version will be set to the
distribution version of the package providing the module"""
migrations_path = None # Override this with the location of your db migrations directory
"""Override with path to migration files, if custome db tables are used"""
def load(self, config=None) -> "Plugin":
"""Load the plugin"""
import importlib
mod, cls = self.module.rsplit(".", 1)
module = importlib.import_module(mod)
return getattr(module, cls)(self, config)
def __post_init__(self):
"""Post init to set version if not set automatically"""
if not self.version:
import pkg_resources
self.version = pkg_resources.get_distribution(self.module.split(".")[0]).version
class Plugin(PluginMetadata):
"""Base class for all Plugins
All plugins must interitate from this class for adding their logic.
This class gets filled with the meta data of the plugin automatically.
Additionally the `flask.Blueprint` can be added for API routes.
"""
blueprint: Blueprint = None
"""Override with a `flask.blueprint` if the plugin uses custom routes""" """Override with a `flask.blueprint` if the plugin uses custom routes"""
permissions = [] # You have to override
models = None
"""Module with all custom sqlalchemy models
Override this with the module containing all your sqlalchemy models module
to enable the `flaschengeist` tool to generate TS API.
"""
permissions: list[str] = []
"""Override to add custom permissions used by the plugin """Override to add custom permissions used by the plugin
A good style is to name the permissions with a prefix related to the plugin name, A good style is to name the permissions with a prefix related to the plugin name,
to prevent clashes with other plugins. E. g. instead of *delete* use *plugin_delete*. to prevent clashes with other plugins. E. g. instead of *delete* use *plugin_delete*.
""" """
id = "dev.flaschengeist.plugin" # You have to override
"""Override with the unique ID of the plugin (Hint: FQN)"""
name = "plugin" # You have to override
"""Override with human readable name of the plugin"""
models = None # You have to override
"""Override with models module"""
migrations_path = None # Override this with the location of your db migrations directory
"""Override with path to migration files, if custome db tables are used"""
def __init__(self, config=None): def __init__(self, metadata: PluginMetadata, config=None):
"""Constructor called by create_app """Constructor called by create_app
Args: Args:
config: Dict configuration containing the plugin section config: Dict configuration containing the plugin section
""" """
self.version = pkg_resources.get_distribution(self.__module__.split(".")[0]).version super().__init__(**asdict(metadata))
def install(self): def install(self):
"""Installation routine """Installation routine

View File

@ -1,177 +1,7 @@
"""Authentication plugin, provides basic routes from flaschengeist.plugins import PluginMetadata
Allow management of authentication, login, logout, etc. plugin = PluginMetadata(
""" id = "auth",
from flask import Blueprint, request, jsonify name="auth",
from werkzeug.exceptions import Forbidden, BadRequest, Unauthorized module=__name__ + ".plugin.AuthPlugin"
)
from flaschengeist import logger
from flaschengeist.plugins import Plugin
from flaschengeist.utils.HTTP import no_content, created
from flaschengeist.utils.decorators import login_required
from flaschengeist.controller import sessionController, userController
class AuthRoutePlugin(Plugin):
name = "auth"
blueprint = Blueprint(name, __name__)
@AuthRoutePlugin.blueprint.route("/auth", methods=["POST"])
def login():
"""Login in an user and create a session
Route: ``/auth`` | Method: ``POST``
POST-data: ``{userid: string, password: string}``
Returns:
A JSON object with `flaschengeist.models.user.User` and created
`flaschengeist.models.session.Session` or HTTP error
"""
logger.debug("Start log in.")
data = request.get_json()
try:
userid = str(data["userid"])
password = str(data["password"])
except (KeyError, ValueError, TypeError):
raise BadRequest("Missing parameter(s)")
logger.debug(f"search user {userid} in database")
user = userController.login_user(userid, password)
if not user:
raise Unauthorized
session = sessionController.create(user, user_agent=request.user_agent)
logger.debug(f"token is {session.token}")
logger.info(f"User {userid} logged in.")
# Lets cleanup the DB
sessionController.clear_expired()
return created(session)
@AuthRoutePlugin.blueprint.route("/auth", methods=["GET"])
@login_required()
def get_sessions(current_session, **kwargs):
"""Get all valid sessions of current user
Route: ``/auth`` | Method: ``GET``
Returns:
A JSON array of `flaschengeist.models.session.Session` or HTTP error
"""
sessions = sessionController.get_users_sessions(current_session.user_)
return jsonify(sessions)
@AuthRoutePlugin.blueprint.route("/auth/<token>", methods=["DELETE"])
@login_required()
def delete_session(token, current_session, **kwargs):
"""Delete a session aka "logout"
Route: ``/auth/<token>`` | Method: ``DELETE``
Returns:
200 Status (empty) or HTTP error
"""
logger.debug("Try to delete access token {{ {} }}".format(token))
session = sessionController.get_session(token, current_session.user_)
if not session:
logger.debug("Token not found in database!")
# Return 403 error, so that users can not bruteforce tokens
# Valid tokens from other users and invalid tokens now are looking the same
raise Forbidden
sessionController.delete_session(session)
sessionController.clear_expired()
return ""
@AuthRoutePlugin.blueprint.route("/auth/<token>", methods=["GET"])
@login_required()
def get_session(token, current_session, **kwargs):
"""Retrieve information about a session
Route: ``/auth/<token>`` | Method: ``GET``
Attributes:
token: Token identifying session to retrieve
current_session: Session sent with Authorization Header
Returns:
JSON encoded `flaschengeist.models.session.Session` or HTTP error
"""
logger.debug("get token {{ {} }}".format(token))
session = sessionController.get_session(token, current_session.user_)
if not session:
# Return 403 error, so that users can not bruteforce tokens
# Valid tokens from other users and invalid tokens now are looking the same
raise Forbidden
return jsonify(session)
@AuthRoutePlugin.blueprint.route("/auth/<token>", methods=["PUT"])
@login_required()
def set_lifetime(token, current_session, **kwargs):
"""Set lifetime of a session
Route: ``/auth/<token>`` | Method: ``PUT``
POST-data: ``{value: int}``
Attributes:
token: Token identifying the session
current_session: Session sent with Authorization Header
Returns:
HTTP-204 or HTTP error
"""
session = sessionController.get_session(token, current_session.user_)
if not session:
# Return 403 error, so that users can not bruteforce tokens
# Valid tokens from other users and invalid tokens now are looking the same
raise Forbidden
try:
lifetime = request.get_json()["value"]
logger.debug(f"set lifetime >{lifetime}< to access token >{token}<")
sessionController.set_lifetime(session, lifetime)
return jsonify(sessionController.get_session(token, current_session.user_))
except (KeyError, TypeError):
raise BadRequest
@AuthRoutePlugin.blueprint.route("/auth/<token>/user", methods=["GET"])
@login_required()
def get_assocd_user(token, current_session, **kwargs):
"""Retrieve user owning a session
Route: ``/auth/<token>/user`` | Method: ``GET``
Attributes:
token: Token identifying the session
current_session: Session sent with Authorization Header
Returns:
JSON encoded `flaschengeist.models.user.User` or HTTP error
"""
logger.debug("get token {{ {} }}".format(token))
session = sessionController.get_session(token, current_session.user_)
if not session:
# Return 403 error, so that users can not bruteforce tokens
# Valid tokens from other users and invalid tokens now are looking the same
raise Forbidden
return jsonify(session.user_)
@AuthRoutePlugin.blueprint.route("/auth/reset", methods=["POST"])
def reset_password():
data = request.get_json()
if "userid" in data:
user = userController.find_user(data["userid"])
if user:
userController.request_reset(user)
elif "password" in data and "token" in data:
userController.reset_password(data["token"], data["password"])
else:
raise BadRequest("Missing parameter(s)")
return no_content()

View File

@ -0,0 +1,176 @@
"""Authentication plugin, provides basic routes
Allow management of authentication, login, logout, etc.
"""
from flask import Blueprint, request, jsonify
from werkzeug.exceptions import Forbidden, BadRequest, Unauthorized
from flaschengeist import logger
from flaschengeist.plugins import Plugin
from flaschengeist.utils.HTTP import no_content, created
from flaschengeist.utils.decorators import login_required
from flaschengeist.controller import sessionController, userController
class AuthRoutePlugin(Plugin):
blueprint = Blueprint("auth", __name__)
@AuthRoutePlugin.blueprint.route("/auth", methods=["POST"])
def login():
"""Login in an user and create a session
Route: ``/auth`` | Method: ``POST``
POST-data: ``{userid: string, password: string}``
Returns:
A JSON object with `flaschengeist.models.user.User` and created
`flaschengeist.models.session.Session` or HTTP error
"""
logger.debug("Start log in.")
data = request.get_json()
try:
userid = str(data["userid"])
password = str(data["password"])
except (KeyError, ValueError, TypeError):
raise BadRequest("Missing parameter(s)")
logger.debug(f"search user {userid} in database")
user = userController.login_user(userid, password)
if not user:
raise Unauthorized
session = sessionController.create(user, user_agent=request.user_agent)
logger.debug(f"token is {session.token}")
logger.info(f"User {userid} logged in.")
# Lets cleanup the DB
sessionController.clear_expired()
return created(session)
@AuthRoutePlugin.blueprint.route("/auth", methods=["GET"])
@login_required()
def get_sessions(current_session, **kwargs):
"""Get all valid sessions of current user
Route: ``/auth`` | Method: ``GET``
Returns:
A JSON array of `flaschengeist.models.session.Session` or HTTP error
"""
sessions = sessionController.get_users_sessions(current_session.user_)
return jsonify(sessions)
@AuthRoutePlugin.blueprint.route("/auth/<token>", methods=["DELETE"])
@login_required()
def delete_session(token, current_session, **kwargs):
"""Delete a session aka "logout"
Route: ``/auth/<token>`` | Method: ``DELETE``
Returns:
200 Status (empty) or HTTP error
"""
logger.debug("Try to delete access token {{ {} }}".format(token))
session = sessionController.get_session(token, current_session.user_)
if not session:
logger.debug("Token not found in database!")
# Return 403 error, so that users can not bruteforce tokens
# Valid tokens from other users and invalid tokens now are looking the same
raise Forbidden
sessionController.delete_session(session)
sessionController.clear_expired()
return ""
@AuthRoutePlugin.blueprint.route("/auth/<token>", methods=["GET"])
@login_required()
def get_session(token, current_session, **kwargs):
"""Retrieve information about a session
Route: ``/auth/<token>`` | Method: ``GET``
Attributes:
token: Token identifying session to retrieve
current_session: Session sent with Authorization Header
Returns:
JSON encoded `flaschengeist.models.session.Session` or HTTP error
"""
logger.debug("get token {{ {} }}".format(token))
session = sessionController.get_session(token, current_session.user_)
if not session:
# Return 403 error, so that users can not bruteforce tokens
# Valid tokens from other users and invalid tokens now are looking the same
raise Forbidden
return jsonify(session)
@AuthRoutePlugin.blueprint.route("/auth/<token>", methods=["PUT"])
@login_required()
def set_lifetime(token, current_session, **kwargs):
"""Set lifetime of a session
Route: ``/auth/<token>`` | Method: ``PUT``
POST-data: ``{value: int}``
Attributes:
token: Token identifying the session
current_session: Session sent with Authorization Header
Returns:
HTTP-204 or HTTP error
"""
session = sessionController.get_session(token, current_session.user_)
if not session:
# Return 403 error, so that users can not bruteforce tokens
# Valid tokens from other users and invalid tokens now are looking the same
raise Forbidden
try:
lifetime = request.get_json()["value"]
logger.debug(f"set lifetime >{lifetime}< to access token >{token}<")
sessionController.set_lifetime(session, lifetime)
return jsonify(sessionController.get_session(token, current_session.user_))
except (KeyError, TypeError):
raise BadRequest
@AuthRoutePlugin.blueprint.route("/auth/<token>/user", methods=["GET"])
@login_required()
def get_assocd_user(token, current_session, **kwargs):
"""Retrieve user owning a session
Route: ``/auth/<token>/user`` | Method: ``GET``
Attributes:
token: Token identifying the session
current_session: Session sent with Authorization Header
Returns:
JSON encoded `flaschengeist.models.user.User` or HTTP error
"""
logger.debug("get token {{ {} }}".format(token))
session = sessionController.get_session(token, current_session.user_)
if not session:
# Return 403 error, so that users can not bruteforce tokens
# Valid tokens from other users and invalid tokens now are looking the same
raise Forbidden
return jsonify(session.user_)
@AuthRoutePlugin.blueprint.route("/auth/reset", methods=["POST"])
def reset_password():
data = request.get_json()
if "userid" in data:
user = userController.find_user(data["userid"])
if user:
userController.request_reset(user)
elif "password" in data and "token" in data:
userController.reset_password(data["token"], data["password"])
else:
raise BadRequest("Missing parameter(s)")
return no_content()

View File

@ -1,307 +1,7 @@
"""LDAP Authentication Provider Plugin""" from flaschengeist.plugins import PluginMetadata
import os
import ssl
from PIL import Image
from io import BytesIO
from flask_ldapconn import LDAPConn
from flask import current_app as app
from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError
from werkzeug.exceptions import BadRequest, InternalServerError, NotFound
from werkzeug.datastructures import FileStorage
from flaschengeist import logger
from flaschengeist.controller import userController
from flaschengeist.models.user import User, Role, _Avatar
from flaschengeist.plugins import AuthPlugin, before_role_updated
class AuthLDAP(AuthPlugin): plugin = PluginMetadata(
def __init__(self, config): id="auth_ldap",
super().__init__()
app.config.update(
LDAP_SERVER=config.get("host", "localhost"),
LDAP_PORT=config.get("port", 389),
LDAP_BINDDN=config.get("bind_dn", None),
LDAP_SECRET=config.get("secret", None),
LDAP_USE_SSL=config.get("use_ssl", False),
# That's not TLS, its dirty StartTLS on unencrypted LDAP
LDAP_USE_TLS=False,
LDAP_TLS_VERSION=ssl.PROTOCOL_TLS,
FORCE_ATTRIBUTE_VALUE_AS_LIST=True,
)
if "ca_cert" in config:
app.config["LDAP_CA_CERTS_FILE"] = config["ca_cert"]
else:
# Default is CERT_REQUIRED
app.config["LDAP_REQUIRE_CERT"] = ssl.CERT_OPTIONAL
self.ldap = LDAPConn(app)
self.base_dn = config["base_dn"]
self.search_dn = config.get("search_dn", "ou=people,{base_dn}").format(base_dn=self.base_dn)
self.group_dn = config.get("group_dn", "ou=group,{base_dn}").format(base_dn=self.base_dn)
self.password_hash = config.get("password_hash", "SSHA").upper()
self.object_classes = config.get("object_classes", ["inetOrgPerson"])
self.user_attributes: dict = config.get("user_attributes", {})
self.dn_template = config.get("dn_template")
# TODO: might not be set if modify is called )
self.root_dn = config.get("root_dn", None)
self.root_secret = config.get("root_secret", None)
@before_role_updated
def _role_updated(role, new_name):
logger.debug(f"LDAP: before_role_updated called with ({role}, {new_name})")
self.__modify_role(role, new_name)
def login(self, user, password):
if not user:
return False
return self.ldap.authenticate(user.userid, password, "uid", self.base_dn)
def find_user(self, userid, mail=None):
attr = self.__find(userid, mail)
if attr is not None:
user = User(userid=attr["uid"][0])
self.__update(user, attr)
return user
def update_user(self, user):
attr = self.__find(user.userid)
self.__update(user, attr)
def create_user(self, user, password):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
try:
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
attributes = self.user_attributes.copy()
if "uidNumber" in attributes:
self.ldap.connection.search(
self.search_dn,
"(uidNumber=*)",
SUBTREE,
attributes=["uidNumber"],
)
resp = sorted(
self.ldap.response(),
key=lambda i: i["attributes"]["uidNumber"],
reverse=True,
)
attributes["uidNumber"] = resp[0]["attributes"]["uidNumber"] + 1 if resp else attributes["uidNumber"]
dn = self.dn_template.format(
user=user,
base_dn=self.base_dn,
)
if "default_gid" in attributes:
default_gid = attributes.pop("default_gid")
attributes["gidNumber"] = default_gid
if "homeDirectory" in attributes:
attributes["homeDirectory"] = attributes.get("homeDirectory").format(
firstname=user.firstname,
lastname=user.lastname,
userid=user.userid,
mail=user.mail,
display_name=user.display_name,
)
attributes.update(
{
"sn": user.lastname,
"givenName": user.firstname,
"uid": user.userid,
"userPassword": self.__hash(password),
"mail": user.mail,
}
)
if user.display_name:
attributes.update({"displayName": user.display_name})
ldap_conn.add(dn, self.object_classes, attributes)
self._set_roles(user)
self.update_user(user)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest
def modify_user(self, user: User, password=None, new_password=None):
try:
dn = user.get_attribute("DN")
if password:
ldap_conn = self.ldap.connect(dn, password)
else:
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
modifier = {}
for name, ldap_name in [
("firstname", "givenName"),
("lastname", "sn"),
("mail", "mail"),
("display_name", "displayName"),
]:
if hasattr(user, name):
modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])]
if new_password:
modifier["userPassword"] = [(MODIFY_REPLACE, [self.__hash(new_password)])]
ldap_conn.modify(dn, modifier)
self._set_roles(user)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest
def get_avatar(self, user):
self.ldap.connection.search(
self.search_dn,
"(uid={})".format(user.userid),
SUBTREE,
attributes=["jpegPhoto"],
)
r = self.ldap.connection.response[0]["attributes"]
if "jpegPhoto" in r and len(r["jpegPhoto"]) > 0:
avatar = _Avatar()
avatar.mimetype = "image/jpeg"
avatar.binary = bytearray(r["jpegPhoto"][0])
return avatar
else:
raise NotFound
def set_avatar(self, user: User, file: FileStorage):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
image_bytes = BytesIO()
try:
# Make sure converted to RGB, e.g. png support RGBA but jpeg does not
image = Image.open(file).convert("RGB")
image.save(image_bytes, format="JPEG")
except IOError:
logger.debug(f"Could not convert avatar from '{file.mimetype}' to JPEG")
raise BadRequest("Unsupported image format")
dn = user.get_attribute("DN")
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [image_bytes.getvalue()])]})
def delete_avatar(self, user):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
dn = user.get_attribute("DN")
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [])]})
def __find(self, userid, mail=None):
"""Find attributes of an user by uid or mail in LDAP"""
con = self.ldap.connection
if not con:
con = self.ldap.connect(self.root_dn, self.root_secret)
con.search(
self.search_dn,
f"(| (uid={userid})(mail={mail}))" if mail else f"(uid={userid})",
SUBTREE,
attributes=["uid", "givenName", "sn", "mail"],
)
return con.response[0]["attributes"] if len(con.response) > 0 else None
def __update(self, user, attr):
"""Update an User object with LDAP attributes"""
if attr["uid"][0] == user.userid:
user.set_attribute("DN", self.ldap.connection.response[0]["dn"])
user.firstname = attr["givenName"][0]
user.lastname = attr["sn"][0]
if attr["mail"]:
user.mail = attr["mail"][0]
if "displayName" in attr:
user.display_name = attr["displayName"][0]
userController.set_roles(user, self._get_groups(user.userid), create=True)
def __modify_role(
self,
role: Role,
new_name,
):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
try:
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_conn.search(self.group_dn, f"(cn={role.name})", SUBTREE, attributes=["cn"])
if len(ldap_conn.response) > 0:
dn = ldap_conn.response[0]["dn"]
if new_name:
ldap_conn.modify_dn(dn, f"cn={new_name}")
else:
ldap_conn.delete(dn)
except LDAPPasswordIsMandatoryError:
raise BadRequest
except LDAPBindError:
logger.debug(f"Could not bind to LDAP server", exc_info=True)
raise InternalServerError
def __hash(self, password):
if self.password_hash == "ARGON2":
from argon2 import PasswordHasher
return f"{{ARGON2}}{PasswordHasher().hash(password)}"
else:
from hashlib import pbkdf2_hmac, sha1
import base64
salt = os.urandom(16)
if self.password_hash == "PBKDF2":
rounds = 200000
password_hash = base64.b64encode(pbkdf2_hmac("sha512", password.encode("utf-8"), salt, rounds)).decode()
return f"{{PBKDF2-SHA512}}{rounds}${base64.b64encode(salt).decode()}${password_hash}"
else:
return f"{{SSHA}}{base64.b64encode(sha1(password.encode() + salt).digest() + salt).decode()}"
def _get_groups(self, uid):
groups = []
self.ldap.connection.search(
self.group_dn,
"(memberUID={})".format(uid),
SUBTREE,
attributes=["cn"],
)
groups_data = self.ldap.connection.response
for data in groups_data:
groups.append(data["attributes"]["cn"][0])
return groups
def _get_all_roles(self):
self.ldap.connection.search(
self.group_dn,
"(cn=*)",
SUBTREE,
attributes=["cn", "gidNumber", "memberUid"],
)
return self.ldap.response()
def _set_roles(self, user: User):
try:
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_roles = self._get_all_roles()
gid_numbers = sorted(ldap_roles, key=lambda i: i["attributes"]["gidNumber"], reverse=True)
gid_number = gid_numbers[0]["attributes"]["gidNumber"] + 1
for user_role in user.roles:
if user_role not in [role["attributes"]["cn"][0] for role in ldap_roles]:
ldap_conn.add(
f"cn={user_role},{self.group_dn}",
["posixGroup"],
attributes={"gidNumber": gid_number},
)
ldap_roles = self._get_all_roles()
for ldap_role in ldap_roles:
if ldap_role["attributes"]["cn"][0] in user.roles:
modify = {"memberUid": [(MODIFY_ADD, [user.userid])]}
else:
modify = {"memberUid": [(MODIFY_DELETE, [user.userid])]}
ldap_conn.modify(ldap_role["dn"], modify)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest

View File

@ -0,0 +1,307 @@
"""LDAP Authentication Provider Plugin"""
import os
import ssl
from PIL import Image
from io import BytesIO
from flask_ldapconn import LDAPConn
from flask import current_app as app
from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError
from werkzeug.exceptions import BadRequest, InternalServerError, NotFound
from werkzeug.datastructures import FileStorage
from flaschengeist import logger
from flaschengeist.controller import userController
from flaschengeist.models.user import User, Role, _Avatar
from flaschengeist.plugins import AuthPlugin, before_role_updated
class AuthLDAP(AuthPlugin):
def __init__(self, config):
super().__init__()
app.config.update(
LDAP_SERVER=config.get("host", "localhost"),
LDAP_PORT=config.get("port", 389),
LDAP_BINDDN=config.get("bind_dn", None),
LDAP_SECRET=config.get("secret", None),
LDAP_USE_SSL=config.get("use_ssl", False),
# That's not TLS, its dirty StartTLS on unencrypted LDAP
LDAP_USE_TLS=False,
LDAP_TLS_VERSION=ssl.PROTOCOL_TLS,
FORCE_ATTRIBUTE_VALUE_AS_LIST=True,
)
if "ca_cert" in config:
app.config["LDAP_CA_CERTS_FILE"] = config["ca_cert"]
else:
# Default is CERT_REQUIRED
app.config["LDAP_REQUIRE_CERT"] = ssl.CERT_OPTIONAL
self.ldap = LDAPConn(app)
self.base_dn = config["base_dn"]
self.search_dn = config.get("search_dn", "ou=people,{base_dn}").format(base_dn=self.base_dn)
self.group_dn = config.get("group_dn", "ou=group,{base_dn}").format(base_dn=self.base_dn)
self.password_hash = config.get("password_hash", "SSHA").upper()
self.object_classes = config.get("object_classes", ["inetOrgPerson"])
self.user_attributes: dict = config.get("user_attributes", {})
self.dn_template = config.get("dn_template")
# TODO: might not be set if modify is called
self.root_dn = config.get("root_dn", None)
self.root_secret = config.get("root_secret", None)
@before_role_updated
def _role_updated(role, new_name):
logger.debug(f"LDAP: before_role_updated called with ({role}, {new_name})")
self.__modify_role(role, new_name)
def login(self, user, password):
if not user:
return False
return self.ldap.authenticate(user.userid, password, "uid", self.base_dn)
def find_user(self, userid, mail=None):
attr = self.__find(userid, mail)
if attr is not None:
user = User(userid=attr["uid"][0])
self.__update(user, attr)
return user
def update_user(self, user):
attr = self.__find(user.userid)
self.__update(user, attr)
def create_user(self, user, password):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
try:
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
attributes = self.user_attributes.copy()
if "uidNumber" in attributes:
self.ldap.connection.search(
self.search_dn,
"(uidNumber=*)",
SUBTREE,
attributes=["uidNumber"],
)
resp = sorted(
self.ldap.response(),
key=lambda i: i["attributes"]["uidNumber"],
reverse=True,
)
attributes["uidNumber"] = resp[0]["attributes"]["uidNumber"] + 1 if resp else attributes["uidNumber"]
dn = self.dn_template.format(
user=user,
base_dn=self.base_dn,
)
if "default_gid" in attributes:
default_gid = attributes.pop("default_gid")
attributes["gidNumber"] = default_gid
if "homeDirectory" in attributes:
attributes["homeDirectory"] = attributes.get("homeDirectory").format(
firstname=user.firstname,
lastname=user.lastname,
userid=user.userid,
mail=user.mail,
display_name=user.display_name,
)
attributes.update(
{
"sn": user.lastname,
"givenName": user.firstname,
"uid": user.userid,
"userPassword": self.__hash(password),
"mail": user.mail,
}
)
if user.display_name:
attributes.update({"displayName": user.display_name})
ldap_conn.add(dn, self.object_classes, attributes)
self._set_roles(user)
self.update_user(user)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest
def modify_user(self, user: User, password=None, new_password=None):
try:
dn = user.get_attribute("DN")
if password:
ldap_conn = self.ldap.connect(dn, password)
else:
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
modifier = {}
for name, ldap_name in [
("firstname", "givenName"),
("lastname", "sn"),
("mail", "mail"),
("display_name", "displayName"),
]:
if hasattr(user, name):
modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])]
if new_password:
modifier["userPassword"] = [(MODIFY_REPLACE, [self.__hash(new_password)])]
ldap_conn.modify(dn, modifier)
self._set_roles(user)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest
def get_avatar(self, user):
self.ldap.connection.search(
self.search_dn,
"(uid={})".format(user.userid),
SUBTREE,
attributes=["jpegPhoto"],
)
r = self.ldap.connection.response[0]["attributes"]
if "jpegPhoto" in r and len(r["jpegPhoto"]) > 0:
avatar = _Avatar()
avatar.mimetype = "image/jpeg"
avatar.binary = bytearray(r["jpegPhoto"][0])
return avatar
else:
raise NotFound
def set_avatar(self, user: User, file: FileStorage):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
image_bytes = BytesIO()
try:
# Make sure converted to RGB, e.g. png support RGBA but jpeg does not
image = Image.open(file).convert("RGB")
image.save(image_bytes, format="JPEG")
except IOError:
logger.debug(f"Could not convert avatar from '{file.mimetype}' to JPEG")
raise BadRequest("Unsupported image format")
dn = user.get_attribute("DN")
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [image_bytes.getvalue()])]})
def delete_avatar(self, user):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
dn = user.get_attribute("DN")
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [])]})
def __find(self, userid, mail=None):
"""Find attributes of an user by uid or mail in LDAP"""
con = self.ldap.connection
if not con:
con = self.ldap.connect(self.root_dn, self.root_secret)
con.search(
self.search_dn,
f"(| (uid={userid})(mail={mail}))" if mail else f"(uid={userid})",
SUBTREE,
attributes=["uid", "givenName", "sn", "mail"],
)
return con.response[0]["attributes"] if len(con.response) > 0 else None
def __update(self, user, attr):
"""Update an User object with LDAP attributes"""
if attr["uid"][0] == user.userid:
user.set_attribute("DN", self.ldap.connection.response[0]["dn"])
user.firstname = attr["givenName"][0]
user.lastname = attr["sn"][0]
if attr["mail"]:
user.mail = attr["mail"][0]
if "displayName" in attr:
user.display_name = attr["displayName"][0]
userController.set_roles(user, self._get_groups(user.userid), create=True)
def __modify_role(
self,
role: Role,
new_name,
):
if self.root_dn is None:
logger.error("root_dn missing in ldap config!")
raise InternalServerError
try:
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_conn.search(self.group_dn, f"(cn={role.name})", SUBTREE, attributes=["cn"])
if len(ldap_conn.response) > 0:
dn = ldap_conn.response[0]["dn"]
if new_name:
ldap_conn.modify_dn(dn, f"cn={new_name}")
else:
ldap_conn.delete(dn)
except LDAPPasswordIsMandatoryError:
raise BadRequest
except LDAPBindError:
logger.debug(f"Could not bind to LDAP server", exc_info=True)
raise InternalServerError
def __hash(self, password):
if self.password_hash == "ARGON2":
from argon2 import PasswordHasher
return f"{{ARGON2}}{PasswordHasher().hash(password)}"
else:
from hashlib import pbkdf2_hmac, sha1
import base64
salt = os.urandom(16)
if self.password_hash == "PBKDF2":
rounds = 200000
password_hash = base64.b64encode(pbkdf2_hmac("sha512", password.encode("utf-8"), salt, rounds)).decode()
return f"{{PBKDF2-SHA512}}{rounds}${base64.b64encode(salt).decode()}${password_hash}"
else:
return f"{{SSHA}}{base64.b64encode(sha1(password.encode() + salt).digest() + salt).decode()}"
def _get_groups(self, uid):
groups = []
self.ldap.connection.search(
self.group_dn,
"(memberUID={})".format(uid),
SUBTREE,
attributes=["cn"],
)
groups_data = self.ldap.connection.response
for data in groups_data:
groups.append(data["attributes"]["cn"][0])
return groups
def _get_all_roles(self):
self.ldap.connection.search(
self.group_dn,
"(cn=*)",
SUBTREE,
attributes=["cn", "gidNumber", "memberUid"],
)
return self.ldap.response()
def _set_roles(self, user: User):
try:
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
ldap_roles = self._get_all_roles()
gid_numbers = sorted(ldap_roles, key=lambda i: i["attributes"]["gidNumber"], reverse=True)
gid_number = gid_numbers[0]["attributes"]["gidNumber"] + 1
for user_role in user.roles:
if user_role not in [role["attributes"]["cn"][0] for role in ldap_roles]:
ldap_conn.add(
f"cn={user_role},{self.group_dn}",
["posixGroup"],
attributes={"gidNumber": gid_number},
)
ldap_roles = self._get_all_roles()
for ldap_role in ldap_roles:
if ldap_role["attributes"]["cn"][0] in user.roles:
modify = {"memberUid": [(MODIFY_ADD, [user.userid])]}
else:
modify = {"memberUid": [(MODIFY_DELETE, [user.userid])]}
ldap_conn.modify(ldap_role["dn"], modify)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest