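import click
from flask import current_app
from flask.cli import with_appcontext
from werkzeug.exceptions import NotFound


def _require_auth_ldap(ctx):
    """Return the enabled AuthLDAP plugin instance, or abort the command.

    Looks the plugin up under ``FG_PLUGINS["auth_ldap"]`` in the Flask app
    config. If the key is missing or holds anything other than an
    ``AuthLDAP`` instance, the command fails via ``ctx.fail`` (which raises),
    so callers never continue with a wrong object.
    """
    # Imported lazily, as in the rest of this module, so the plugin package is
    # only loaded once the command actually runs.
    from flaschengeist.plugins.auth_ldap import AuthLDAP

    # FG_PLUGINS may be absent altogether; treat that as "plugin not enabled"
    # instead of crashing with AttributeError on None.
    plugins = current_app.config.get("FG_PLUGINS") or {}
    auth_ldap = plugins.get("auth_ldap")
    if not isinstance(auth_ldap, AuthLDAP):  # also covers None
        ctx.fail("auth_ldap plugin not found or not enabled!")
    return auth_ldap


# CLI entry point for LDAP <-> database user synchronisation.
#
#   --sync       walk every LDAP entry matching (uid=*) below the plugin's
#                search_dn; create a local User for uids unknown to the
#                database, then call userController.update_user for each.
#   --sync-ldap  call userController.update_user for every database user.
#
# The function docstring below is what click prints as the command help text,
# so it is left unchanged.
@click.command(no_args_is_help=True)
@click.option("--sync", is_flag=True, default=False, help="Synchronize users from LDAP -> database")
@click.option("--sync-ldap", is_flag=True, default=False, help="Synchronize users from database -> LDAP")
@with_appcontext
@click.pass_context
def ldap(ctx, sync, sync_ldap):
    """Tools for the LDAP authentification"""
    from flaschengeist.controller import userController

    if sync:
        click.echo("Synchronizing users from LDAP -> database")
        from ldap3 import SUBTREE
        from flaschengeist.models import User
        from flaschengeist.database import db

        auth_ldap = _require_auth_ldap(ctx)

        # Reuse the plugin's existing connection if there is one; otherwise
        # bind with the plugin's root_dn/root_secret. That is a privileged
        # bind, so this command runs with whatever rights the root DN has in
        # the directory.
        conn = auth_ldap.ldap.connection
        if not conn:
            conn = auth_ldap.ldap.connect(auth_ldap.root_dn, auth_ldap.root_secret)

        # The filter is a constant, so no user-controlled data reaches the
        # LDAP query. Only the attributes listed here are requested.
        conn.search(auth_ldap.search_dn, "(uid=*)", SUBTREE, attributes=["uid", "givenName", "sn", "mail"])

        for ldap_user in conn.response or []:
            # conn.response can also contain search result references
            # (referrals); those carry no "attributes" and are not users.
            attributes = ldap_user.get("attributes")
            if not attributes:
                continue

            # The directory is the source of truth for account creation here:
            # every uid returned becomes a local account. Refuse entries whose
            # uid is missing or empty instead of creating a nameless user.
            uid_values = attributes.get("uid")
            if not uid_values:
                click.echo(f"Skipping LDAP entry without uid: {ldap_user.get('dn')}", err=True)
                continue
            uid = uid_values[0]

            try:
                user = userController.get_user(uid)
            except NotFound:
                # Unknown to the database so far: register a new user row.
                user = User(userid=uid)
                db.session.add(user)
            # NOTE(review): update_user is defined elsewhere; presumably it
            # refreshes the user from the auth backend and persists the
            # session, confirm that new users are actually committed.
            userController.update_user(user, auth_ldap)

    if sync_ldap:
        click.echo("Synchronizing users from database -> LDAP")
        auth_ldap = _require_auth_ldap(ctx)
        # NOTE(review): this is the same update_user call as in the --sync
        # branch; confirm it really pushes database state to LDAP as the
        # message above claims.
        for user in userController.get_users():
            userController.update_user(user, auth_ldap)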