Compare commits
2 Commits
main
...
proposal/p
Author | SHA1 | Date |
---|---|---|
Ferdinand Thiessen | ccb1f9b005 | |
Ferdinand Thiessen | e4c552af01 |
|
@ -0,0 +1,82 @@
|
|||
# Example on Plugin Development
|
||||
|
||||
## File Structure
|
||||
```
|
||||
- root
|
||||
- flaschengeist_example
|
||||
- __init__.py
|
||||
- plugin.py
|
||||
[- migrations]
|
||||
- setup.cfg
|
||||
```
|
||||
|
||||
In a minimal example, you would only need two python source files and
|
||||
your setup configuration.
|
||||
If you also use custom database tables, you would also need a directory
|
||||
containing your _alembic_-migration files.
|
||||
|
||||
## Source code
|
||||
Code is seperated in two locations, the meta data of your plugin goes into the `__init__.py` and all logic goes into the `plugin.py`, of cause you could add more files, but make sure to not depend on some external packages in the `__init__.py`.
|
||||
|
||||
### Project data / setuptools
|
||||
For **Flaschengeist** to find your plugin, you have to
|
||||
install it corretly, especially the entry points have to be set.
|
||||
```ini
|
||||
[metadata]
|
||||
license = MIT
|
||||
version = 1.0.0
|
||||
name = flaschengeist-example
|
||||
description = Example plugin for Flaschengeist
|
||||
# ...
|
||||
|
||||
[options]
|
||||
packages =
|
||||
flaschengeist_example
|
||||
install_requires =
|
||||
flaschengeist == 2.0.*
|
||||
|
||||
[options.entry_points]
|
||||
# Path to your meta data instance
|
||||
flaschengeist.plugins =
|
||||
example = flaschengeist_example:plugin
|
||||
|
||||
```
|
||||
|
||||
|
||||
### Metadata
|
||||
The `__init__.py` contains the plugin meta data.
|
||||
|
||||
```python
|
||||
# __init__.py
|
||||
from flaschengeist.plugins import PluginMetadata
|
||||
plugin = PluginMetadata(
|
||||
id="com.example.plugin", # unique, recommend FQN
|
||||
name="Example Plugin", # Human readable name
|
||||
version="1.0.0", # Optional
|
||||
module="flaschengeist_example.plugin.ExamplePlugin" # module and class of your plugin
|
||||
)
|
||||
```
|
||||
|
||||
`version` is optional, if not provided the version of your distribution (set in setup.cfg) is used.
|
||||
|
||||
### Implementation
|
||||
You plugin implementation goes into the `plugin.py`.
|
||||
Here you define your Plugin class, make sure it inheritates from `flaschengeist.plugins.Plugin` or `flaschengeist.plugins.AuthPlugin` respectivly.
|
||||
|
||||
It will get initalized with the meta data you defined in the `__init__.py` and the configuration object.
|
||||
|
||||
```python
|
||||
from flaschengeist.plugins import Plugin
|
||||
from flask import Blueprint
|
||||
|
||||
bp = Blueprint(...)
|
||||
|
||||
class ExamplePlugin(Plugin):
|
||||
blueprint = bp
|
||||
...
|
||||
|
||||
@bp.route("/example")
|
||||
def get_example():
|
||||
...
|
||||
|
||||
```
|
|
@ -1,6 +1,8 @@
|
|||
import pkg_resources
|
||||
from flask import Blueprint
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from werkzeug.datastructures import FileStorage
|
||||
from werkzeug.exceptions import MethodNotAllowed, NotFound
|
||||
|
||||
from flaschengeist.models.user import _Avatar, User
|
||||
from flaschengeist.utils.hook import HookBefore, HookAfter
|
||||
|
||||
|
@ -43,33 +45,80 @@ Args:
|
|||
"""
|
||||
|
||||
|
||||
class Plugin:
|
||||
"""Base class for all Plugins
|
||||
If your class uses custom models add a static property called ``models``"""
|
||||
@dataclass
|
||||
class PluginMetadata:
|
||||
"""Class providing metadata of a plugin"""
|
||||
|
||||
blueprint = None # You have to override
|
||||
id: str
|
||||
"""Unique ID of the plugin (Hint: FQN)"""
|
||||
|
||||
name: str
|
||||
"""Human readable name of the plugin"""
|
||||
|
||||
module: str
|
||||
"""Module and class of the plugin
|
||||
|
||||
Example:
|
||||
my_pkg.my_module.plugin_class
|
||||
"""
|
||||
|
||||
version: str = None
|
||||
"""Version of the plugin
|
||||
|
||||
If not provided the version will be set to the
|
||||
distribution version of the package providing the module"""
|
||||
|
||||
migrations_path = None # Override this with the location of your db migrations directory
|
||||
"""Override with path to migration files, if custome db tables are used"""
|
||||
|
||||
def load(self, config=None) -> "Plugin":
|
||||
"""Load the plugin"""
|
||||
import importlib
|
||||
|
||||
mod, cls = self.module.rsplit(".", 1)
|
||||
module = importlib.import_module(mod)
|
||||
return getattr(module, cls)(self, config)
|
||||
|
||||
def __post_init__(self):
|
||||
"""Post init to set version if not set automatically"""
|
||||
if not self.version:
|
||||
import pkg_resources
|
||||
|
||||
self.version = pkg_resources.get_distribution(self.module.split(".")[0]).version
|
||||
|
||||
|
||||
class Plugin(PluginMetadata):
|
||||
"""Base class for all Plugins
|
||||
|
||||
All plugins must interitate from this class for adding their logic.
|
||||
This class gets filled with the meta data of the plugin automatically.
|
||||
|
||||
Additionally the `flask.Blueprint` can be added for API routes.
|
||||
"""
|
||||
|
||||
blueprint: Blueprint = None
|
||||
"""Override with a `flask.blueprint` if the plugin uses custom routes"""
|
||||
permissions = [] # You have to override
|
||||
|
||||
models = None
|
||||
"""Module with all custom sqlalchemy models
|
||||
|
||||
Override this with the module containing all your sqlalchemy models module
|
||||
to enable the `flaschengeist` tool to generate TS API.
|
||||
"""
|
||||
|
||||
permissions: list[str] = []
|
||||
"""Override to add custom permissions used by the plugin
|
||||
|
||||
A good style is to name the permissions with a prefix related to the plugin name,
|
||||
to prevent clashes with other plugins. E. g. instead of *delete* use *plugin_delete*.
|
||||
"""
|
||||
id = "dev.flaschengeist.plugin" # You have to override
|
||||
"""Override with the unique ID of the plugin (Hint: FQN)"""
|
||||
name = "plugin" # You have to override
|
||||
"""Override with human readable name of the plugin"""
|
||||
models = None # You have to override
|
||||
"""Override with models module"""
|
||||
migrations_path = None # Override this with the location of your db migrations directory
|
||||
"""Override with path to migration files, if custome db tables are used"""
|
||||
|
||||
def __init__(self, config=None):
|
||||
def __init__(self, metadata: PluginMetadata, config=None):
|
||||
"""Constructor called by create_app
|
||||
Args:
|
||||
config: Dict configuration containing the plugin section
|
||||
"""
|
||||
self.version = pkg_resources.get_distribution(self.__module__.split(".")[0]).version
|
||||
super().__init__(**asdict(metadata))
|
||||
|
||||
def install(self):
|
||||
"""Installation routine
|
||||
|
|
|
@ -1,177 +1,7 @@
|
|||
"""Authentication plugin, provides basic routes
|
||||
from flaschengeist.plugins import PluginMetadata
|
||||
|
||||
Allow management of authentication, login, logout, etc.
|
||||
"""
|
||||
from flask import Blueprint, request, jsonify
|
||||
from werkzeug.exceptions import Forbidden, BadRequest, Unauthorized
|
||||
|
||||
from flaschengeist import logger
|
||||
from flaschengeist.plugins import Plugin
|
||||
from flaschengeist.utils.HTTP import no_content, created
|
||||
from flaschengeist.utils.decorators import login_required
|
||||
from flaschengeist.controller import sessionController, userController
|
||||
|
||||
|
||||
class AuthRoutePlugin(Plugin):
|
||||
name = "auth"
|
||||
blueprint = Blueprint(name, __name__)
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth", methods=["POST"])
|
||||
def login():
|
||||
"""Login in an user and create a session
|
||||
|
||||
Route: ``/auth`` | Method: ``POST``
|
||||
|
||||
POST-data: ``{userid: string, password: string}``
|
||||
|
||||
Returns:
|
||||
A JSON object with `flaschengeist.models.user.User` and created
|
||||
`flaschengeist.models.session.Session` or HTTP error
|
||||
"""
|
||||
logger.debug("Start log in.")
|
||||
data = request.get_json()
|
||||
try:
|
||||
userid = str(data["userid"])
|
||||
password = str(data["password"])
|
||||
except (KeyError, ValueError, TypeError):
|
||||
raise BadRequest("Missing parameter(s)")
|
||||
|
||||
logger.debug(f"search user {userid} in database")
|
||||
user = userController.login_user(userid, password)
|
||||
if not user:
|
||||
raise Unauthorized
|
||||
session = sessionController.create(user, user_agent=request.user_agent)
|
||||
logger.debug(f"token is {session.token}")
|
||||
logger.info(f"User {userid} logged in.")
|
||||
|
||||
# Lets cleanup the DB
|
||||
sessionController.clear_expired()
|
||||
return created(session)
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth", methods=["GET"])
|
||||
@login_required()
|
||||
def get_sessions(current_session, **kwargs):
|
||||
"""Get all valid sessions of current user
|
||||
|
||||
Route: ``/auth`` | Method: ``GET``
|
||||
|
||||
Returns:
|
||||
A JSON array of `flaschengeist.models.session.Session` or HTTP error
|
||||
"""
|
||||
sessions = sessionController.get_users_sessions(current_session.user_)
|
||||
return jsonify(sessions)
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth/<token>", methods=["DELETE"])
|
||||
@login_required()
|
||||
def delete_session(token, current_session, **kwargs):
|
||||
"""Delete a session aka "logout"
|
||||
|
||||
Route: ``/auth/<token>`` | Method: ``DELETE``
|
||||
|
||||
Returns:
|
||||
200 Status (empty) or HTTP error
|
||||
"""
|
||||
logger.debug("Try to delete access token {{ {} }}".format(token))
|
||||
session = sessionController.get_session(token, current_session.user_)
|
||||
if not session:
|
||||
logger.debug("Token not found in database!")
|
||||
# Return 403 error, so that users can not bruteforce tokens
|
||||
# Valid tokens from other users and invalid tokens now are looking the same
|
||||
raise Forbidden
|
||||
sessionController.delete_session(session)
|
||||
sessionController.clear_expired()
|
||||
return ""
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth/<token>", methods=["GET"])
|
||||
@login_required()
|
||||
def get_session(token, current_session, **kwargs):
|
||||
"""Retrieve information about a session
|
||||
|
||||
Route: ``/auth/<token>`` | Method: ``GET``
|
||||
|
||||
Attributes:
|
||||
token: Token identifying session to retrieve
|
||||
current_session: Session sent with Authorization Header
|
||||
|
||||
Returns:
|
||||
JSON encoded `flaschengeist.models.session.Session` or HTTP error
|
||||
"""
|
||||
logger.debug("get token {{ {} }}".format(token))
|
||||
session = sessionController.get_session(token, current_session.user_)
|
||||
if not session:
|
||||
# Return 403 error, so that users can not bruteforce tokens
|
||||
# Valid tokens from other users and invalid tokens now are looking the same
|
||||
raise Forbidden
|
||||
return jsonify(session)
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth/<token>", methods=["PUT"])
|
||||
@login_required()
|
||||
def set_lifetime(token, current_session, **kwargs):
|
||||
"""Set lifetime of a session
|
||||
|
||||
Route: ``/auth/<token>`` | Method: ``PUT``
|
||||
|
||||
POST-data: ``{value: int}``
|
||||
|
||||
Attributes:
|
||||
token: Token identifying the session
|
||||
current_session: Session sent with Authorization Header
|
||||
|
||||
Returns:
|
||||
HTTP-204 or HTTP error
|
||||
"""
|
||||
session = sessionController.get_session(token, current_session.user_)
|
||||
if not session:
|
||||
# Return 403 error, so that users can not bruteforce tokens
|
||||
# Valid tokens from other users and invalid tokens now are looking the same
|
||||
raise Forbidden
|
||||
try:
|
||||
lifetime = request.get_json()["value"]
|
||||
logger.debug(f"set lifetime >{lifetime}< to access token >{token}<")
|
||||
sessionController.set_lifetime(session, lifetime)
|
||||
return jsonify(sessionController.get_session(token, current_session.user_))
|
||||
except (KeyError, TypeError):
|
||||
raise BadRequest
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth/<token>/user", methods=["GET"])
|
||||
@login_required()
|
||||
def get_assocd_user(token, current_session, **kwargs):
|
||||
"""Retrieve user owning a session
|
||||
|
||||
Route: ``/auth/<token>/user`` | Method: ``GET``
|
||||
|
||||
Attributes:
|
||||
token: Token identifying the session
|
||||
current_session: Session sent with Authorization Header
|
||||
|
||||
Returns:
|
||||
JSON encoded `flaschengeist.models.user.User` or HTTP error
|
||||
"""
|
||||
logger.debug("get token {{ {} }}".format(token))
|
||||
session = sessionController.get_session(token, current_session.user_)
|
||||
if not session:
|
||||
# Return 403 error, so that users can not bruteforce tokens
|
||||
# Valid tokens from other users and invalid tokens now are looking the same
|
||||
raise Forbidden
|
||||
return jsonify(session.user_)
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth/reset", methods=["POST"])
|
||||
def reset_password():
|
||||
data = request.get_json()
|
||||
if "userid" in data:
|
||||
user = userController.find_user(data["userid"])
|
||||
if user:
|
||||
userController.request_reset(user)
|
||||
elif "password" in data and "token" in data:
|
||||
userController.reset_password(data["token"], data["password"])
|
||||
else:
|
||||
raise BadRequest("Missing parameter(s)")
|
||||
|
||||
return no_content()
|
||||
plugin = PluginMetadata(
|
||||
id = "auth",
|
||||
name="auth",
|
||||
module=__name__ + ".plugin.AuthPlugin"
|
||||
)
|
|
@ -0,0 +1,176 @@
|
|||
"""Authentication plugin, provides basic routes
|
||||
|
||||
Allow management of authentication, login, logout, etc.
|
||||
"""
|
||||
from flask import Blueprint, request, jsonify
|
||||
from werkzeug.exceptions import Forbidden, BadRequest, Unauthorized
|
||||
|
||||
from flaschengeist import logger
|
||||
from flaschengeist.plugins import Plugin
|
||||
from flaschengeist.utils.HTTP import no_content, created
|
||||
from flaschengeist.utils.decorators import login_required
|
||||
from flaschengeist.controller import sessionController, userController
|
||||
|
||||
|
||||
class AuthRoutePlugin(Plugin):
|
||||
blueprint = Blueprint("auth", __name__)
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth", methods=["POST"])
|
||||
def login():
|
||||
"""Login in an user and create a session
|
||||
|
||||
Route: ``/auth`` | Method: ``POST``
|
||||
|
||||
POST-data: ``{userid: string, password: string}``
|
||||
|
||||
Returns:
|
||||
A JSON object with `flaschengeist.models.user.User` and created
|
||||
`flaschengeist.models.session.Session` or HTTP error
|
||||
"""
|
||||
logger.debug("Start log in.")
|
||||
data = request.get_json()
|
||||
try:
|
||||
userid = str(data["userid"])
|
||||
password = str(data["password"])
|
||||
except (KeyError, ValueError, TypeError):
|
||||
raise BadRequest("Missing parameter(s)")
|
||||
|
||||
logger.debug(f"search user {userid} in database")
|
||||
user = userController.login_user(userid, password)
|
||||
if not user:
|
||||
raise Unauthorized
|
||||
session = sessionController.create(user, user_agent=request.user_agent)
|
||||
logger.debug(f"token is {session.token}")
|
||||
logger.info(f"User {userid} logged in.")
|
||||
|
||||
# Lets cleanup the DB
|
||||
sessionController.clear_expired()
|
||||
return created(session)
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth", methods=["GET"])
|
||||
@login_required()
|
||||
def get_sessions(current_session, **kwargs):
|
||||
"""Get all valid sessions of current user
|
||||
|
||||
Route: ``/auth`` | Method: ``GET``
|
||||
|
||||
Returns:
|
||||
A JSON array of `flaschengeist.models.session.Session` or HTTP error
|
||||
"""
|
||||
sessions = sessionController.get_users_sessions(current_session.user_)
|
||||
return jsonify(sessions)
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth/<token>", methods=["DELETE"])
|
||||
@login_required()
|
||||
def delete_session(token, current_session, **kwargs):
|
||||
"""Delete a session aka "logout"
|
||||
|
||||
Route: ``/auth/<token>`` | Method: ``DELETE``
|
||||
|
||||
Returns:
|
||||
200 Status (empty) or HTTP error
|
||||
"""
|
||||
logger.debug("Try to delete access token {{ {} }}".format(token))
|
||||
session = sessionController.get_session(token, current_session.user_)
|
||||
if not session:
|
||||
logger.debug("Token not found in database!")
|
||||
# Return 403 error, so that users can not bruteforce tokens
|
||||
# Valid tokens from other users and invalid tokens now are looking the same
|
||||
raise Forbidden
|
||||
sessionController.delete_session(session)
|
||||
sessionController.clear_expired()
|
||||
return ""
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth/<token>", methods=["GET"])
|
||||
@login_required()
|
||||
def get_session(token, current_session, **kwargs):
|
||||
"""Retrieve information about a session
|
||||
|
||||
Route: ``/auth/<token>`` | Method: ``GET``
|
||||
|
||||
Attributes:
|
||||
token: Token identifying session to retrieve
|
||||
current_session: Session sent with Authorization Header
|
||||
|
||||
Returns:
|
||||
JSON encoded `flaschengeist.models.session.Session` or HTTP error
|
||||
"""
|
||||
logger.debug("get token {{ {} }}".format(token))
|
||||
session = sessionController.get_session(token, current_session.user_)
|
||||
if not session:
|
||||
# Return 403 error, so that users can not bruteforce tokens
|
||||
# Valid tokens from other users and invalid tokens now are looking the same
|
||||
raise Forbidden
|
||||
return jsonify(session)
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth/<token>", methods=["PUT"])
|
||||
@login_required()
|
||||
def set_lifetime(token, current_session, **kwargs):
|
||||
"""Set lifetime of a session
|
||||
|
||||
Route: ``/auth/<token>`` | Method: ``PUT``
|
||||
|
||||
POST-data: ``{value: int}``
|
||||
|
||||
Attributes:
|
||||
token: Token identifying the session
|
||||
current_session: Session sent with Authorization Header
|
||||
|
||||
Returns:
|
||||
HTTP-204 or HTTP error
|
||||
"""
|
||||
session = sessionController.get_session(token, current_session.user_)
|
||||
if not session:
|
||||
# Return 403 error, so that users can not bruteforce tokens
|
||||
# Valid tokens from other users and invalid tokens now are looking the same
|
||||
raise Forbidden
|
||||
try:
|
||||
lifetime = request.get_json()["value"]
|
||||
logger.debug(f"set lifetime >{lifetime}< to access token >{token}<")
|
||||
sessionController.set_lifetime(session, lifetime)
|
||||
return jsonify(sessionController.get_session(token, current_session.user_))
|
||||
except (KeyError, TypeError):
|
||||
raise BadRequest
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth/<token>/user", methods=["GET"])
|
||||
@login_required()
|
||||
def get_assocd_user(token, current_session, **kwargs):
|
||||
"""Retrieve user owning a session
|
||||
|
||||
Route: ``/auth/<token>/user`` | Method: ``GET``
|
||||
|
||||
Attributes:
|
||||
token: Token identifying the session
|
||||
current_session: Session sent with Authorization Header
|
||||
|
||||
Returns:
|
||||
JSON encoded `flaschengeist.models.user.User` or HTTP error
|
||||
"""
|
||||
logger.debug("get token {{ {} }}".format(token))
|
||||
session = sessionController.get_session(token, current_session.user_)
|
||||
if not session:
|
||||
# Return 403 error, so that users can not bruteforce tokens
|
||||
# Valid tokens from other users and invalid tokens now are looking the same
|
||||
raise Forbidden
|
||||
return jsonify(session.user_)
|
||||
|
||||
|
||||
@AuthRoutePlugin.blueprint.route("/auth/reset", methods=["POST"])
|
||||
def reset_password():
|
||||
data = request.get_json()
|
||||
if "userid" in data:
|
||||
user = userController.find_user(data["userid"])
|
||||
if user:
|
||||
userController.request_reset(user)
|
||||
elif "password" in data and "token" in data:
|
||||
userController.reset_password(data["token"], data["password"])
|
||||
else:
|
||||
raise BadRequest("Missing parameter(s)")
|
||||
|
||||
return no_content()
|
|
@ -1,307 +1,7 @@
|
|||
"""LDAP Authentication Provider Plugin"""
|
||||
import os
|
||||
import ssl
|
||||
from PIL import Image
|
||||
from io import BytesIO
|
||||
from flask_ldapconn import LDAPConn
|
||||
from flask import current_app as app
|
||||
from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE
|
||||
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError
|
||||
from werkzeug.exceptions import BadRequest, InternalServerError, NotFound
|
||||
from werkzeug.datastructures import FileStorage
|
||||
|
||||
from flaschengeist import logger
|
||||
from flaschengeist.controller import userController
|
||||
from flaschengeist.models.user import User, Role, _Avatar
|
||||
from flaschengeist.plugins import AuthPlugin, before_role_updated
|
||||
from flaschengeist.plugins import PluginMetadata
|
||||
|
||||
|
||||
class AuthLDAP(AuthPlugin):
|
||||
def __init__(self, config):
|
||||
super().__init__()
|
||||
app.config.update(
|
||||
LDAP_SERVER=config.get("host", "localhost"),
|
||||
LDAP_PORT=config.get("port", 389),
|
||||
LDAP_BINDDN=config.get("bind_dn", None),
|
||||
LDAP_SECRET=config.get("secret", None),
|
||||
LDAP_USE_SSL=config.get("use_ssl", False),
|
||||
# That's not TLS, its dirty StartTLS on unencrypted LDAP
|
||||
LDAP_USE_TLS=False,
|
||||
LDAP_TLS_VERSION=ssl.PROTOCOL_TLS,
|
||||
FORCE_ATTRIBUTE_VALUE_AS_LIST=True,
|
||||
)
|
||||
if "ca_cert" in config:
|
||||
app.config["LDAP_CA_CERTS_FILE"] = config["ca_cert"]
|
||||
else:
|
||||
# Default is CERT_REQUIRED
|
||||
app.config["LDAP_REQUIRE_CERT"] = ssl.CERT_OPTIONAL
|
||||
self.ldap = LDAPConn(app)
|
||||
self.base_dn = config["base_dn"]
|
||||
self.search_dn = config.get("search_dn", "ou=people,{base_dn}").format(base_dn=self.base_dn)
|
||||
self.group_dn = config.get("group_dn", "ou=group,{base_dn}").format(base_dn=self.base_dn)
|
||||
self.password_hash = config.get("password_hash", "SSHA").upper()
|
||||
self.object_classes = config.get("object_classes", ["inetOrgPerson"])
|
||||
self.user_attributes: dict = config.get("user_attributes", {})
|
||||
self.dn_template = config.get("dn_template")
|
||||
|
||||
# TODO: might not be set if modify is called
|
||||
self.root_dn = config.get("root_dn", None)
|
||||
self.root_secret = config.get("root_secret", None)
|
||||
|
||||
@before_role_updated
|
||||
def _role_updated(role, new_name):
|
||||
logger.debug(f"LDAP: before_role_updated called with ({role}, {new_name})")
|
||||
self.__modify_role(role, new_name)
|
||||
|
||||
def login(self, user, password):
|
||||
if not user:
|
||||
return False
|
||||
return self.ldap.authenticate(user.userid, password, "uid", self.base_dn)
|
||||
|
||||
def find_user(self, userid, mail=None):
|
||||
attr = self.__find(userid, mail)
|
||||
if attr is not None:
|
||||
user = User(userid=attr["uid"][0])
|
||||
self.__update(user, attr)
|
||||
return user
|
||||
|
||||
def update_user(self, user):
|
||||
attr = self.__find(user.userid)
|
||||
self.__update(user, attr)
|
||||
|
||||
def create_user(self, user, password):
|
||||
if self.root_dn is None:
|
||||
logger.error("root_dn missing in ldap config!")
|
||||
raise InternalServerError
|
||||
|
||||
try:
|
||||
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
attributes = self.user_attributes.copy()
|
||||
if "uidNumber" in attributes:
|
||||
self.ldap.connection.search(
|
||||
self.search_dn,
|
||||
"(uidNumber=*)",
|
||||
SUBTREE,
|
||||
attributes=["uidNumber"],
|
||||
)
|
||||
resp = sorted(
|
||||
self.ldap.response(),
|
||||
key=lambda i: i["attributes"]["uidNumber"],
|
||||
reverse=True,
|
||||
)
|
||||
attributes["uidNumber"] = resp[0]["attributes"]["uidNumber"] + 1 if resp else attributes["uidNumber"]
|
||||
dn = self.dn_template.format(
|
||||
user=user,
|
||||
base_dn=self.base_dn,
|
||||
)
|
||||
if "default_gid" in attributes:
|
||||
default_gid = attributes.pop("default_gid")
|
||||
attributes["gidNumber"] = default_gid
|
||||
if "homeDirectory" in attributes:
|
||||
attributes["homeDirectory"] = attributes.get("homeDirectory").format(
|
||||
firstname=user.firstname,
|
||||
lastname=user.lastname,
|
||||
userid=user.userid,
|
||||
mail=user.mail,
|
||||
display_name=user.display_name,
|
||||
)
|
||||
attributes.update(
|
||||
{
|
||||
"sn": user.lastname,
|
||||
"givenName": user.firstname,
|
||||
"uid": user.userid,
|
||||
"userPassword": self.__hash(password),
|
||||
"mail": user.mail,
|
||||
}
|
||||
)
|
||||
if user.display_name:
|
||||
attributes.update({"displayName": user.display_name})
|
||||
ldap_conn.add(dn, self.object_classes, attributes)
|
||||
self._set_roles(user)
|
||||
self.update_user(user)
|
||||
except (LDAPPasswordIsMandatoryError, LDAPBindError):
|
||||
raise BadRequest
|
||||
|
||||
def modify_user(self, user: User, password=None, new_password=None):
|
||||
try:
|
||||
dn = user.get_attribute("DN")
|
||||
if password:
|
||||
ldap_conn = self.ldap.connect(dn, password)
|
||||
else:
|
||||
if self.root_dn is None:
|
||||
logger.error("root_dn missing in ldap config!")
|
||||
raise InternalServerError
|
||||
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
modifier = {}
|
||||
for name, ldap_name in [
|
||||
("firstname", "givenName"),
|
||||
("lastname", "sn"),
|
||||
("mail", "mail"),
|
||||
("display_name", "displayName"),
|
||||
]:
|
||||
if hasattr(user, name):
|
||||
modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])]
|
||||
if new_password:
|
||||
modifier["userPassword"] = [(MODIFY_REPLACE, [self.__hash(new_password)])]
|
||||
ldap_conn.modify(dn, modifier)
|
||||
self._set_roles(user)
|
||||
except (LDAPPasswordIsMandatoryError, LDAPBindError):
|
||||
raise BadRequest
|
||||
|
||||
def get_avatar(self, user):
|
||||
self.ldap.connection.search(
|
||||
self.search_dn,
|
||||
"(uid={})".format(user.userid),
|
||||
SUBTREE,
|
||||
attributes=["jpegPhoto"],
|
||||
)
|
||||
r = self.ldap.connection.response[0]["attributes"]
|
||||
|
||||
if "jpegPhoto" in r and len(r["jpegPhoto"]) > 0:
|
||||
avatar = _Avatar()
|
||||
avatar.mimetype = "image/jpeg"
|
||||
avatar.binary = bytearray(r["jpegPhoto"][0])
|
||||
return avatar
|
||||
else:
|
||||
raise NotFound
|
||||
|
||||
def set_avatar(self, user: User, file: FileStorage):
|
||||
if self.root_dn is None:
|
||||
logger.error("root_dn missing in ldap config!")
|
||||
raise InternalServerError
|
||||
|
||||
image_bytes = BytesIO()
|
||||
try:
|
||||
# Make sure converted to RGB, e.g. png support RGBA but jpeg does not
|
||||
image = Image.open(file).convert("RGB")
|
||||
image.save(image_bytes, format="JPEG")
|
||||
except IOError:
|
||||
logger.debug(f"Could not convert avatar from '{file.mimetype}' to JPEG")
|
||||
raise BadRequest("Unsupported image format")
|
||||
|
||||
dn = user.get_attribute("DN")
|
||||
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [image_bytes.getvalue()])]})
|
||||
|
||||
def delete_avatar(self, user):
|
||||
if self.root_dn is None:
|
||||
logger.error("root_dn missing in ldap config!")
|
||||
dn = user.get_attribute("DN")
|
||||
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [])]})
|
||||
|
||||
def __find(self, userid, mail=None):
|
||||
"""Find attributes of an user by uid or mail in LDAP"""
|
||||
con = self.ldap.connection
|
||||
if not con:
|
||||
con = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
con.search(
|
||||
self.search_dn,
|
||||
f"(| (uid={userid})(mail={mail}))" if mail else f"(uid={userid})",
|
||||
SUBTREE,
|
||||
attributes=["uid", "givenName", "sn", "mail"],
|
||||
)
|
||||
return con.response[0]["attributes"] if len(con.response) > 0 else None
|
||||
|
||||
def __update(self, user, attr):
|
||||
"""Update an User object with LDAP attributes"""
|
||||
if attr["uid"][0] == user.userid:
|
||||
user.set_attribute("DN", self.ldap.connection.response[0]["dn"])
|
||||
user.firstname = attr["givenName"][0]
|
||||
user.lastname = attr["sn"][0]
|
||||
if attr["mail"]:
|
||||
user.mail = attr["mail"][0]
|
||||
if "displayName" in attr:
|
||||
user.display_name = attr["displayName"][0]
|
||||
userController.set_roles(user, self._get_groups(user.userid), create=True)
|
||||
|
||||
def __modify_role(
|
||||
self,
|
||||
role: Role,
|
||||
new_name,
|
||||
):
|
||||
if self.root_dn is None:
|
||||
logger.error("root_dn missing in ldap config!")
|
||||
raise InternalServerError
|
||||
try:
|
||||
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
ldap_conn.search(self.group_dn, f"(cn={role.name})", SUBTREE, attributes=["cn"])
|
||||
if len(ldap_conn.response) > 0:
|
||||
dn = ldap_conn.response[0]["dn"]
|
||||
if new_name:
|
||||
ldap_conn.modify_dn(dn, f"cn={new_name}")
|
||||
else:
|
||||
ldap_conn.delete(dn)
|
||||
|
||||
except LDAPPasswordIsMandatoryError:
|
||||
raise BadRequest
|
||||
except LDAPBindError:
|
||||
logger.debug(f"Could not bind to LDAP server", exc_info=True)
|
||||
raise InternalServerError
|
||||
|
||||
def __hash(self, password):
|
||||
if self.password_hash == "ARGON2":
|
||||
from argon2 import PasswordHasher
|
||||
|
||||
return f"{{ARGON2}}{PasswordHasher().hash(password)}"
|
||||
else:
|
||||
from hashlib import pbkdf2_hmac, sha1
|
||||
import base64
|
||||
|
||||
salt = os.urandom(16)
|
||||
if self.password_hash == "PBKDF2":
|
||||
rounds = 200000
|
||||
password_hash = base64.b64encode(pbkdf2_hmac("sha512", password.encode("utf-8"), salt, rounds)).decode()
|
||||
return f"{{PBKDF2-SHA512}}{rounds}${base64.b64encode(salt).decode()}${password_hash}"
|
||||
else:
|
||||
return f"{{SSHA}}{base64.b64encode(sha1(password.encode() + salt).digest() + salt).decode()}"
|
||||
|
||||
def _get_groups(self, uid):
|
||||
groups = []
|
||||
self.ldap.connection.search(
|
||||
self.group_dn,
|
||||
"(memberUID={})".format(uid),
|
||||
SUBTREE,
|
||||
attributes=["cn"],
|
||||
)
|
||||
groups_data = self.ldap.connection.response
|
||||
for data in groups_data:
|
||||
groups.append(data["attributes"]["cn"][0])
|
||||
return groups
|
||||
|
||||
def _get_all_roles(self):
|
||||
self.ldap.connection.search(
|
||||
self.group_dn,
|
||||
"(cn=*)",
|
||||
SUBTREE,
|
||||
attributes=["cn", "gidNumber", "memberUid"],
|
||||
)
|
||||
return self.ldap.response()
|
||||
|
||||
def _set_roles(self, user: User):
|
||||
try:
|
||||
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
ldap_roles = self._get_all_roles()
|
||||
|
||||
gid_numbers = sorted(ldap_roles, key=lambda i: i["attributes"]["gidNumber"], reverse=True)
|
||||
gid_number = gid_numbers[0]["attributes"]["gidNumber"] + 1
|
||||
|
||||
for user_role in user.roles:
|
||||
if user_role not in [role["attributes"]["cn"][0] for role in ldap_roles]:
|
||||
ldap_conn.add(
|
||||
f"cn={user_role},{self.group_dn}",
|
||||
["posixGroup"],
|
||||
attributes={"gidNumber": gid_number},
|
||||
)
|
||||
|
||||
ldap_roles = self._get_all_roles()
|
||||
|
||||
for ldap_role in ldap_roles:
|
||||
if ldap_role["attributes"]["cn"][0] in user.roles:
|
||||
modify = {"memberUid": [(MODIFY_ADD, [user.userid])]}
|
||||
else:
|
||||
modify = {"memberUid": [(MODIFY_DELETE, [user.userid])]}
|
||||
ldap_conn.modify(ldap_role["dn"], modify)
|
||||
|
||||
except (LDAPPasswordIsMandatoryError, LDAPBindError):
|
||||
raise BadRequest
|
||||
plugin = PluginMetadata(
|
||||
id="auth_ldap",
|
||||
|
||||
)
|
||||
|
|
|
@ -0,0 +1,307 @@
|
|||
"""LDAP Authentication Provider Plugin"""
|
||||
import os
|
||||
import ssl
|
||||
from PIL import Image
|
||||
from io import BytesIO
|
||||
from flask_ldapconn import LDAPConn
|
||||
from flask import current_app as app
|
||||
from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE
|
||||
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError
|
||||
from werkzeug.exceptions import BadRequest, InternalServerError, NotFound
|
||||
from werkzeug.datastructures import FileStorage
|
||||
|
||||
from flaschengeist import logger
|
||||
from flaschengeist.controller import userController
|
||||
from flaschengeist.models.user import User, Role, _Avatar
|
||||
from flaschengeist.plugins import AuthPlugin, before_role_updated
|
||||
|
||||
|
||||
class AuthLDAP(AuthPlugin):
|
||||
def __init__(self, config):
|
||||
super().__init__()
|
||||
app.config.update(
|
||||
LDAP_SERVER=config.get("host", "localhost"),
|
||||
LDAP_PORT=config.get("port", 389),
|
||||
LDAP_BINDDN=config.get("bind_dn", None),
|
||||
LDAP_SECRET=config.get("secret", None),
|
||||
LDAP_USE_SSL=config.get("use_ssl", False),
|
||||
# That's not TLS, its dirty StartTLS on unencrypted LDAP
|
||||
LDAP_USE_TLS=False,
|
||||
LDAP_TLS_VERSION=ssl.PROTOCOL_TLS,
|
||||
FORCE_ATTRIBUTE_VALUE_AS_LIST=True,
|
||||
)
|
||||
if "ca_cert" in config:
|
||||
app.config["LDAP_CA_CERTS_FILE"] = config["ca_cert"]
|
||||
else:
|
||||
# Default is CERT_REQUIRED
|
||||
app.config["LDAP_REQUIRE_CERT"] = ssl.CERT_OPTIONAL
|
||||
self.ldap = LDAPConn(app)
|
||||
self.base_dn = config["base_dn"]
|
||||
self.search_dn = config.get("search_dn", "ou=people,{base_dn}").format(base_dn=self.base_dn)
|
||||
self.group_dn = config.get("group_dn", "ou=group,{base_dn}").format(base_dn=self.base_dn)
|
||||
self.password_hash = config.get("password_hash", "SSHA").upper()
|
||||
self.object_classes = config.get("object_classes", ["inetOrgPerson"])
|
||||
self.user_attributes: dict = config.get("user_attributes", {})
|
||||
self.dn_template = config.get("dn_template")
|
||||
|
||||
# TODO: might not be set if modify is called
|
||||
self.root_dn = config.get("root_dn", None)
|
||||
self.root_secret = config.get("root_secret", None)
|
||||
|
||||
@before_role_updated
|
||||
def _role_updated(role, new_name):
|
||||
logger.debug(f"LDAP: before_role_updated called with ({role}, {new_name})")
|
||||
self.__modify_role(role, new_name)
|
||||
|
||||
def login(self, user, password):
|
||||
if not user:
|
||||
return False
|
||||
return self.ldap.authenticate(user.userid, password, "uid", self.base_dn)
|
||||
|
||||
def find_user(self, userid, mail=None):
|
||||
attr = self.__find(userid, mail)
|
||||
if attr is not None:
|
||||
user = User(userid=attr["uid"][0])
|
||||
self.__update(user, attr)
|
||||
return user
|
||||
|
||||
def update_user(self, user):
|
||||
attr = self.__find(user.userid)
|
||||
self.__update(user, attr)
|
||||
|
||||
def create_user(self, user, password):
|
||||
if self.root_dn is None:
|
||||
logger.error("root_dn missing in ldap config!")
|
||||
raise InternalServerError
|
||||
|
||||
try:
|
||||
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
attributes = self.user_attributes.copy()
|
||||
if "uidNumber" in attributes:
|
||||
self.ldap.connection.search(
|
||||
self.search_dn,
|
||||
"(uidNumber=*)",
|
||||
SUBTREE,
|
||||
attributes=["uidNumber"],
|
||||
)
|
||||
resp = sorted(
|
||||
self.ldap.response(),
|
||||
key=lambda i: i["attributes"]["uidNumber"],
|
||||
reverse=True,
|
||||
)
|
||||
attributes["uidNumber"] = resp[0]["attributes"]["uidNumber"] + 1 if resp else attributes["uidNumber"]
|
||||
dn = self.dn_template.format(
|
||||
user=user,
|
||||
base_dn=self.base_dn,
|
||||
)
|
||||
if "default_gid" in attributes:
|
||||
default_gid = attributes.pop("default_gid")
|
||||
attributes["gidNumber"] = default_gid
|
||||
if "homeDirectory" in attributes:
|
||||
attributes["homeDirectory"] = attributes.get("homeDirectory").format(
|
||||
firstname=user.firstname,
|
||||
lastname=user.lastname,
|
||||
userid=user.userid,
|
||||
mail=user.mail,
|
||||
display_name=user.display_name,
|
||||
)
|
||||
attributes.update(
|
||||
{
|
||||
"sn": user.lastname,
|
||||
"givenName": user.firstname,
|
||||
"uid": user.userid,
|
||||
"userPassword": self.__hash(password),
|
||||
"mail": user.mail,
|
||||
}
|
||||
)
|
||||
if user.display_name:
|
||||
attributes.update({"displayName": user.display_name})
|
||||
ldap_conn.add(dn, self.object_classes, attributes)
|
||||
self._set_roles(user)
|
||||
self.update_user(user)
|
||||
except (LDAPPasswordIsMandatoryError, LDAPBindError):
|
||||
raise BadRequest
|
||||
|
||||
def modify_user(self, user: User, password=None, new_password=None):
|
||||
try:
|
||||
dn = user.get_attribute("DN")
|
||||
if password:
|
||||
ldap_conn = self.ldap.connect(dn, password)
|
||||
else:
|
||||
if self.root_dn is None:
|
||||
logger.error("root_dn missing in ldap config!")
|
||||
raise InternalServerError
|
||||
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
modifier = {}
|
||||
for name, ldap_name in [
|
||||
("firstname", "givenName"),
|
||||
("lastname", "sn"),
|
||||
("mail", "mail"),
|
||||
("display_name", "displayName"),
|
||||
]:
|
||||
if hasattr(user, name):
|
||||
modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])]
|
||||
if new_password:
|
||||
modifier["userPassword"] = [(MODIFY_REPLACE, [self.__hash(new_password)])]
|
||||
ldap_conn.modify(dn, modifier)
|
||||
self._set_roles(user)
|
||||
except (LDAPPasswordIsMandatoryError, LDAPBindError):
|
||||
raise BadRequest
|
||||
|
||||
def get_avatar(self, user):
|
||||
self.ldap.connection.search(
|
||||
self.search_dn,
|
||||
"(uid={})".format(user.userid),
|
||||
SUBTREE,
|
||||
attributes=["jpegPhoto"],
|
||||
)
|
||||
r = self.ldap.connection.response[0]["attributes"]
|
||||
|
||||
if "jpegPhoto" in r and len(r["jpegPhoto"]) > 0:
|
||||
avatar = _Avatar()
|
||||
avatar.mimetype = "image/jpeg"
|
||||
avatar.binary = bytearray(r["jpegPhoto"][0])
|
||||
return avatar
|
||||
else:
|
||||
raise NotFound
|
||||
|
||||
def set_avatar(self, user: User, file: FileStorage):
|
||||
if self.root_dn is None:
|
||||
logger.error("root_dn missing in ldap config!")
|
||||
raise InternalServerError
|
||||
|
||||
image_bytes = BytesIO()
|
||||
try:
|
||||
# Make sure converted to RGB, e.g. png support RGBA but jpeg does not
|
||||
image = Image.open(file).convert("RGB")
|
||||
image.save(image_bytes, format="JPEG")
|
||||
except IOError:
|
||||
logger.debug(f"Could not convert avatar from '{file.mimetype}' to JPEG")
|
||||
raise BadRequest("Unsupported image format")
|
||||
|
||||
dn = user.get_attribute("DN")
|
||||
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [image_bytes.getvalue()])]})
|
||||
|
||||
def delete_avatar(self, user):
|
||||
if self.root_dn is None:
|
||||
logger.error("root_dn missing in ldap config!")
|
||||
dn = user.get_attribute("DN")
|
||||
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [])]})
|
||||
|
||||
def __find(self, userid, mail=None):
|
||||
"""Find attributes of an user by uid or mail in LDAP"""
|
||||
con = self.ldap.connection
|
||||
if not con:
|
||||
con = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
con.search(
|
||||
self.search_dn,
|
||||
f"(| (uid={userid})(mail={mail}))" if mail else f"(uid={userid})",
|
||||
SUBTREE,
|
||||
attributes=["uid", "givenName", "sn", "mail"],
|
||||
)
|
||||
return con.response[0]["attributes"] if len(con.response) > 0 else None
|
||||
|
||||
def __update(self, user, attr):
|
||||
"""Update an User object with LDAP attributes"""
|
||||
if attr["uid"][0] == user.userid:
|
||||
user.set_attribute("DN", self.ldap.connection.response[0]["dn"])
|
||||
user.firstname = attr["givenName"][0]
|
||||
user.lastname = attr["sn"][0]
|
||||
if attr["mail"]:
|
||||
user.mail = attr["mail"][0]
|
||||
if "displayName" in attr:
|
||||
user.display_name = attr["displayName"][0]
|
||||
userController.set_roles(user, self._get_groups(user.userid), create=True)
|
||||
|
||||
def __modify_role(
|
||||
self,
|
||||
role: Role,
|
||||
new_name,
|
||||
):
|
||||
if self.root_dn is None:
|
||||
logger.error("root_dn missing in ldap config!")
|
||||
raise InternalServerError
|
||||
try:
|
||||
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
ldap_conn.search(self.group_dn, f"(cn={role.name})", SUBTREE, attributes=["cn"])
|
||||
if len(ldap_conn.response) > 0:
|
||||
dn = ldap_conn.response[0]["dn"]
|
||||
if new_name:
|
||||
ldap_conn.modify_dn(dn, f"cn={new_name}")
|
||||
else:
|
||||
ldap_conn.delete(dn)
|
||||
|
||||
except LDAPPasswordIsMandatoryError:
|
||||
raise BadRequest
|
||||
except LDAPBindError:
|
||||
logger.debug(f"Could not bind to LDAP server", exc_info=True)
|
||||
raise InternalServerError
|
||||
|
||||
def __hash(self, password):
|
||||
if self.password_hash == "ARGON2":
|
||||
from argon2 import PasswordHasher
|
||||
|
||||
return f"{{ARGON2}}{PasswordHasher().hash(password)}"
|
||||
else:
|
||||
from hashlib import pbkdf2_hmac, sha1
|
||||
import base64
|
||||
|
||||
salt = os.urandom(16)
|
||||
if self.password_hash == "PBKDF2":
|
||||
rounds = 200000
|
||||
password_hash = base64.b64encode(pbkdf2_hmac("sha512", password.encode("utf-8"), salt, rounds)).decode()
|
||||
return f"{{PBKDF2-SHA512}}{rounds}${base64.b64encode(salt).decode()}${password_hash}"
|
||||
else:
|
||||
return f"{{SSHA}}{base64.b64encode(sha1(password.encode() + salt).digest() + salt).decode()}"
|
||||
|
||||
def _get_groups(self, uid):
|
||||
groups = []
|
||||
self.ldap.connection.search(
|
||||
self.group_dn,
|
||||
"(memberUID={})".format(uid),
|
||||
SUBTREE,
|
||||
attributes=["cn"],
|
||||
)
|
||||
groups_data = self.ldap.connection.response
|
||||
for data in groups_data:
|
||||
groups.append(data["attributes"]["cn"][0])
|
||||
return groups
|
||||
|
||||
def _get_all_roles(self):
|
||||
self.ldap.connection.search(
|
||||
self.group_dn,
|
||||
"(cn=*)",
|
||||
SUBTREE,
|
||||
attributes=["cn", "gidNumber", "memberUid"],
|
||||
)
|
||||
return self.ldap.response()
|
||||
|
||||
def _set_roles(self, user: User):
|
||||
try:
|
||||
ldap_conn = self.ldap.connect(self.root_dn, self.root_secret)
|
||||
ldap_roles = self._get_all_roles()
|
||||
|
||||
gid_numbers = sorted(ldap_roles, key=lambda i: i["attributes"]["gidNumber"], reverse=True)
|
||||
gid_number = gid_numbers[0]["attributes"]["gidNumber"] + 1
|
||||
|
||||
for user_role in user.roles:
|
||||
if user_role not in [role["attributes"]["cn"][0] for role in ldap_roles]:
|
||||
ldap_conn.add(
|
||||
f"cn={user_role},{self.group_dn}",
|
||||
["posixGroup"],
|
||||
attributes={"gidNumber": gid_number},
|
||||
)
|
||||
|
||||
ldap_roles = self._get_all_roles()
|
||||
|
||||
for ldap_role in ldap_roles:
|
||||
if ldap_role["attributes"]["cn"][0] in user.roles:
|
||||
modify = {"memberUid": [(MODIFY_ADD, [user.userid])]}
|
||||
else:
|
||||
modify = {"memberUid": [(MODIFY_DELETE, [user.userid])]}
|
||||
ldap_conn.modify(ldap_role["dn"], modify)
|
||||
|
||||
except (LDAPPasswordIsMandatoryError, LDAPBindError):
|
||||
raise BadRequest
|
Loading…
Reference in New Issue