flaschengeist/flaschengeist/plugins/auth_ldap/__init__.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

142 lines
5.5 KiB
Python
Raw Normal View History

"""LDAP Authentication Provider Plugin"""
import ssl
from ldap3.utils.hashed import hashed
from ldap3 import SUBTREE, MODIFY_REPLACE, HASHED_SALTED_MD5
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError
from flask import current_app as app
2020-08-25 02:36:05 +00:00
from flask_ldapconn import LDAPConn
from werkzeug.exceptions import BadRequest
2020-08-25 02:36:05 +00:00
from flaschengeist.plugins import AuthPlugin
from flaschengeist.models.user import User
import flaschengeist.controller.userController as userController
class AuthLDAP(AuthPlugin):
    """Authentication provider that verifies and manages users in LDAP.

    User entries are searched below ``ou=user,<BASEDN>`` and group entries
    below ``ou=group,<BASEDN>``.
    """

    # (user model field, LDAP attribute) pairs kept in sync by
    # ``update_user`` and ``modify_user``.
    _ATTRIBUTE_MAP = (
        ("firstname", "givenName"),
        ("lastname", "sn"),
        ("mail", "mail"),
        ("display_name", "displayName"),
    )

    def __init__(self, cfg):
        """Configure the LDAP connection from the plugin configuration.

        Args:
            cfg: Mapping of plugin settings. ``URL``, ``BINDDN`` and
                ``BASEDN`` are read unconditionally; ``PORT`` defaults to 389
                and ``USE_SSL`` to ``False``; ``SECRET`` and ``ADMIN_DN``
                (together with ``ADMIN_SECRET``) are optional.

        Raises:
            KeyError: If a required key is missing, or if ``ADMIN_DN`` is
                given without ``ADMIN_SECRET``.
        """
        super().__init__()
        config = {"PORT": 389, "USE_SSL": False}
        config.update(cfg)

        app.config.update(
            LDAP_SERVER=config["URL"],
            LDAP_PORT=config["PORT"],
            LDAP_BINDDN=config["BINDDN"],
            LDAP_USE_TLS=False,
            LDAP_USE_SSL=config["USE_SSL"],
            LDAP_TLS_VERSION=ssl.PROTOCOL_TLSv1_2,
            # NOTE(review): certificate verification is disabled here.
            LDAP_REQUIRE_CERT=ssl.CERT_NONE,
            FORCE_ATTRIBUTE_VALUE_AS_LIST=True,
        )
        if "SECRET" in config:
            # Store the plain secret; a stray trailing comma previously
            # wrapped it in a one-element tuple.
            app.config["LDAP_SECRET"] = config["SECRET"]
        self.ldap = LDAPConn(app)
        self.dn = config["BASEDN"]

        # Always define the admin credentials so that other methods can test
        # for them instead of failing with AttributeError.
        self.admin_dn = None
        self.admin_secret = None
        if "ADMIN_DN" in config:
            self.admin_dn = config["ADMIN_DN"]
            self.admin_secret = config["ADMIN_SECRET"]

    @staticmethod
    def _first(value):
        """Return the first item of a list/tuple value, else the value itself.

        An empty list/tuple yields ``None``.
        """
        if isinstance(value, (list, tuple)):
            return value[0] if value else None
        return value

    def login(self, user, password):
        """Check the password of ``user`` against the directory.

        Returns:
            ``False`` if no user or no password is given, otherwise the
            result of the LDAP authentication.
        """
        # An empty password can never be a valid login; reject it before it
        # reaches the directory.
        if not user or not password:
            return False
        return self.ldap.authenticate(user.userid, password, "uid", self.dn)

    def update_user(self, user):
        """Copy DN, names, mail, display name and roles from LDAP to ``user``.

        Nothing is changed if the search yields no entry or if the first
        entry's ``uid`` differs from ``user.userid``.
        """
        from ldap3.utils.conv import escape_filter_chars

        self.ldap.connection.search(
            "ou=user,{}".format(self.dn),
            # Escape the id so filter metacharacters cannot alter the query.
            "(uid={})".format(escape_filter_chars(user.userid)),
            SUBTREE,
            # displayName must be requested explicitly, otherwise it is never
            # part of the response and would never be synchronised.
            attributes=["uid", "givenName", "sn", "mail", "displayName"],
        )
        entries = [
            entry
            for entry in self.ldap.connection.response or []
            if "attributes" in entry
        ]
        if not entries:
            return
        entry = entries[0]
        attrs = entry["attributes"]
        if self._first(attrs.get("uid")) != user.userid:
            return

        user.set_attribute("DN", entry["dn"])
        for field, ldap_name in self._ATTRIBUTE_MAP:
            value = self._first(attrs.get(ldap_name))
            # Only overwrite fields for which the directory holds a value.
            if value:
                setattr(user, field, value)
        userController.set_roles(user, self._get_groups(user.userid), create=True)

    def create_user(self, user, password):
        """Create an LDAP entry for ``user`` using the admin credentials.

        The new ``uidNumber`` is one above the highest ``uidNumber`` found
        below ``ou=user``. Failures other than bind errors are logged and not
        raised.

        Raises:
            BadRequest: If binding with the admin credentials fails.
        """
        from ldap3.utils.dn import escape_rdn

        if self.admin_dn is None:
            app.logger.error("LDAP: cannot create user, no ADMIN_DN configured")
            return

        ldap_conn = None
        try:
            ldap_conn = self.ldap.connect(self.admin_dn, self.admin_secret)
            self.ldap.connection.search(
                "ou=user,{}".format(self.dn),
                "(uidNumber=*)",
                SUBTREE,
                attributes=["uidNumber"],
            )
            # uidNumber may be delivered as a scalar or as a one-element list.
            uid_numbers = [
                int(self._first(entry["attributes"]["uidNumber"]))
                for entry in self.ldap.connection.response or []
                if self._first(entry.get("attributes", {}).get("uidNumber")) is not None
            ]
            if not uid_numbers:
                app.logger.error(
                    "LDAP: cannot create user, no existing uidNumber to derive the next one from"
                )
                return

            # Escape the name so DN metacharacters cannot alter the entry DN.
            rdn_value = escape_rdn(f"{user.firstname} {user.lastname}")
            dn = f"cn={rdn_value},ou=user,{self.dn}"
            object_class = ["inetOrgPerson", "posixAccount", "person", "organizationalPerson"]
            attributes = {
                # Same mapping as update_user/modify_user: sn is the last
                # name, givenName the first name.
                "sn": user.lastname,
                "givenName": user.firstname,
                "gidNumber": 15000,
                "homeDirectory": f"/home/{user.userid}",
                "loginShell": "/bin/bash",
                "uid": user.userid,
                # TODO: Use secure hash!
                "userPassword": hashed(HASHED_SALTED_MD5, password),
                "uidNumber": max(uid_numbers) + 1,
            }
            if not ldap_conn.add(dn, object_class, attributes):
                app.logger.error("LDAP: creating %s failed: %s", dn, ldap_conn.result)
        except (LDAPPasswordIsMandatoryError, LDAPBindError) as err:
            raise BadRequest from err
        except Exception:
            # Still non-fatal for the caller, but no longer silent.
            app.logger.exception("LDAP: unexpected error while creating user")
        finally:
            if ldap_conn is not None:
                ldap_conn.unbind()

    def _get_groups(self, uid):
        """Return the ``cn`` of every group that lists ``uid`` as ``memberUID``.

        The user's primary group (``gidNumber``) is deliberately not included.
        """
        from ldap3.utils.conv import escape_filter_chars

        self.ldap.connection.search(
            "ou=group,{}".format(self.dn),
            "(memberUID={})".format(escape_filter_chars(uid)),
            SUBTREE,
            attributes=["cn"],
        )
        return [
            data["attributes"]["cn"][0]
            for data in self.ldap.connection.response or []
            if data.get("attributes", {}).get("cn")
        ]

    def modify_user(self, user: User, password, new_password=None):
        """Write the user's fields, and optionally a new password, to LDAP.

        Args:
            user: User whose ``DN`` attribute identifies the entry to modify.
            password: Current password of the user; if empty, the admin
                credentials are used to bind instead.
            new_password: If given, replaces the stored password.

        Raises:
            BadRequest: If binding fails, or if neither a password nor admin
                credentials are available.
        """
        dn = user.get_attribute("DN")
        if password:
            bind_dn, bind_secret = dn, password
        elif self.admin_dn is not None:
            bind_dn, bind_secret = self.admin_dn, self.admin_secret
        else:
            # Neither the user's password nor admin credentials: cannot bind.
            raise BadRequest

        ldap_conn = None
        try:
            ldap_conn = self.ldap.connect(bind_dn, bind_secret)
            modifier = {}
            for name, ldap_name in self._ATTRIBUTE_MAP:
                value = getattr(user, name, None)
                # Skip unset fields; sending None as a replacement value
                # would make the modify request invalid.
                if value:
                    modifier[ldap_name] = [(MODIFY_REPLACE, [value])]
            if new_password:
                # TODO: Use secure hash!
                salted_password = hashed(HASHED_SALTED_MD5, new_password)
                modifier["userPassword"] = [(MODIFY_REPLACE, [salted_password])]
            if modifier and not ldap_conn.modify(dn, modifier):
                app.logger.warning("LDAP: modifying %s failed: %s", dn, ldap_conn.result)
        except (LDAPPasswordIsMandatoryError, LDAPBindError) as err:
            raise BadRequest from err
        finally:
            if ldap_conn is not None:
                ldap_conn.unbind()