flaschengeist/flaschengeist/plugins/auth_ldap/__init__.py

142 lines
5.5 KiB
Python

"""LDAP Authentication Provider Plugin"""
import ssl
from ldap3.utils.hashed import hashed
from ldap3 import SUBTREE, MODIFY_REPLACE, HASHED_SALTED_MD5
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError
from flask import current_app as app
from flask_ldapconn import LDAPConn
from werkzeug.exceptions import BadRequest
from flaschengeist.plugins import AuthPlugin
from flaschengeist.models.user import User
import flaschengeist.controller.userController as userController
class AuthLDAP(AuthPlugin):
def __init__(self, cfg):
super().__init__()
config = {"PORT": 389, "USE_SSL": False}
config.update(cfg)
app.config.update(
LDAP_SERVER=config["URL"],
LDAP_PORT=config["PORT"],
LDAP_BINDDN=config["BINDDN"],
LDAP_USE_TLS=False,
LDAP_USE_SSL=config["USE_SSL"],
LDAP_TLS_VERSION=ssl.PROTOCOL_TLSv1_2,
LDAP_REQUIRE_CERT=ssl.CERT_NONE,
FORCE_ATTRIBUTE_VALUE_AS_LIST=True,
)
if "SECRET" in config:
app.config["LDAP_SECRET"] = (config["SECRET"],)
self.ldap = LDAPConn(app)
self.dn = config["BASEDN"]
# TODO: might not be set if modify is called
if "ADMIN_DN" in config:
self.admin_dn = config["ADMIN_DN"]
self.admin_secret = config["ADMIN_SECRET"]
def login(self, user, password):
if not user:
return False
return self.ldap.authenticate(user.userid, password, "uid", self.dn)
def update_user(self, user):
self.ldap.connection.search(
"ou=user,{}".format(self.dn),
"(uid={})".format(user.userid),
SUBTREE,
attributes=["uid", "givenName", "sn", "mail"],
)
r = self.ldap.connection.response[0]["attributes"]
if r["uid"][0] == user.userid:
user.set_attribute("DN", self.ldap.connection.response[0]["dn"])
user.firstname = r["givenName"][0]
user.lastname = r["sn"][0]
if r["mail"]:
user.mail = r["mail"][0]
if "displayName" in r:
user.display_name = r["displayName"][0]
userController.set_roles(user, self._get_groups(user.userid), create=True)
def create_user(self, user, password):
try:
ldap_conn = self.ldap.connect(self.admin_dn, self.admin_secret)
self.ldap.connection.search(
"ou=user,{}".format(self.dn), "(uidNumber=*)", SUBTREE, attributes=["uidNumber"]
)
uidNumbers = sorted(self.ldap.response(), key = lambda i: i['attributes']['uidNumber'], reverse=True)
uidNumber = uidNumbers[0]['attributes']['uidNumber'] + 1
dn = f'cn={user.firstname} {user.lastname},ou=user,{self.dn}'
object_class = ['inetOrgPerson', 'posixAccount', 'person', 'organizationalPerson']
attributes = {
'sn': user.firstname,
'givenName': user.lastname,
'gidNumber': 15000,
'homeDirectory': f'/home/{user.userid}',
'loginShell': '/bin/bash',
'uid': user.userid,
'userPassword': hashed(HASHED_SALTED_MD5, password),
'uidNumber': uidNumber
}
test = ldap_conn.add(dn, object_class, attributes)
print(test)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest
except Exception as e:
pass
def _get_groups(self, uid):
groups = []
self.ldap.connection.search(
"ou=user,{}".format(self.dn), "(uid={})".format(uid), SUBTREE, attributes=["gidNumber"]
)
# Maingroup ist uninteressant
#main_group_number = self.ldap.connection.response[0]["attributes"]["gidNumber"]
#if main_group_number:
# if type(main_group_number) is list:
# main_group_number = main_group_number[0]
# self.ldap.connection.search(
# "ou=group,{}".format(self.dn), "(gidNumber={})".format(main_group_number), attributes=["cn"]
# )
# groups.append(self.ldap.connection.response[0]["attributes"]["cn"][0])
self.ldap.connection.search(
"ou=group,{}".format(self.dn), "(memberUID={})".format(uid), SUBTREE, attributes=["cn"]
)
groups_data = self.ldap.connection.response
for data in groups_data:
groups.append(data["attributes"]["cn"][0])
return groups
def modify_user(self, user: User, password, new_password=None):
try:
dn = user.get_attribute("DN")
if password:
ldap_conn = self.ldap.connect(dn, password)
else:
ldap_conn = self.ldap.connect(self.admin_dn, self.admin_secret)
modifier = {}
for name, ldap_name in [
("firstname", "givenName"),
("lastname", "sn"),
("mail", "mail"),
("display_name", "displayName"),
]:
if hasattr(user, name):
modifier[ldap_name] = [(MODIFY_REPLACE, [getattr(user, name)])]
if new_password:
# TODO: Use secure hash!
salted_password = hashed(HASHED_SALTED_MD5, new_password)
modifier["userPassword"] = [(MODIFY_REPLACE, [salted_password])]
ldap_conn.modify(dn, modifier)
except (LDAPPasswordIsMandatoryError, LDAPBindError):
raise BadRequest