# flaschengeist/flaschengeist/plugins/auth_ldap/__init__.py
# (274 lines, 10 KiB, Python)

"""LDAP Authentication Provider Plugin"""
import io
import ssl
from contextlib import contextmanager
from typing import Optional

from flask import current_app as app
from flask_ldapconn import LDAPConn
from ldap3 import SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE, HASHED_SALTED_MD5
from ldap3.core.exceptions import LDAPPasswordIsMandatoryError, LDAPBindError
from ldap3.utils.conv import escape_filter_chars
from ldap3.utils.hashed import hashed
from werkzeug.exceptions import BadRequest, InternalServerError, NotFound

from flaschengeist import logger
from flaschengeist.plugins import AuthPlugin, after_role_updated
from flaschengeist.models.user import User, Role, _Avatar
import flaschengeist.controller.userController as userController


class AuthLDAP(AuthPlugin):
    """Authentication provider backed by an LDAP directory.

    Users are searched below ``ou=user,<base_dn>`` and roles are kept as
    ``posixGroup`` entries below ``ou=group,<base_dn>``.  Most searches use
    the connection managed by ``flask_ldapconn`` (``self.ldap.connection``);
    write operations bind either as the user or with the optional
    ``admin_dn`` / ``admin_secret`` credentials.
    """

    def __init__(self, cfg):
        """Configure the LDAP connection from the plugin configuration.

        Args:
            cfg: Mapping with the plugin settings.  ``host``, ``bind_dn``,
                ``base_dn`` and ``default_gid`` are read unconditionally;
                ``port`` (default 389), ``use_ssl`` (default ``False``),
                ``secret`` and ``admin_dn`` are optional.  ``admin_secret``
                is read whenever ``admin_dn`` is present.

        Raises:
            KeyError: If one of the unconditionally read settings is missing.
        """
        super().__init__()
        config = {"port": 389, "use_ssl": False}
        config.update(cfg)
        # NOTE(review): StartTLS is off and certificates are not verified
        # (CERT_NONE) - confirm this is intended outside of development.
        app.config.update(
            LDAP_SERVER=config["host"],
            LDAP_PORT=config["port"],
            LDAP_BINDDN=config["bind_dn"],
            LDAP_USE_TLS=False,
            LDAP_USE_SSL=config["use_ssl"],
            LDAP_TLS_VERSION=ssl.PROTOCOL_TLSv1_2,
            LDAP_REQUIRE_CERT=ssl.CERT_NONE,
            FORCE_ATTRIBUTE_VALUE_AS_LIST=True,
        )
        # The key that is checked must be the key that is read, and the
        # secret has to be stored as plain value (not wrapped in a tuple).
        if "secret" in config:
            app.config["LDAP_SECRET"] = config["secret"]
        self.ldap = LDAPConn(app)
        self.dn = config["base_dn"]
        self.default_gid = config["default_gid"]
        if "admin_dn" in config:
            self.admin_dn = config["admin_dn"]
            self.admin_secret = config["admin_secret"]
        else:
            # Always define both attributes so later reads can not fail
            # with AttributeError when no admin account is configured.
            self.admin_dn = None
            self.admin_secret = None

        @after_role_updated
        def _role_updated(role, new_name):
            self.__modify_role(role, new_name)

    def __require_admin(self):
        """Ensure admin credentials are configured.

        Raises:
            InternalServerError: If no ``admin_dn`` is configured.
        """
        if self.admin_dn is None:
            logger.error("admin_dn missing in ldap config!")
            raise InternalServerError

    @contextmanager
    def __connect_as(self, dn, secret):
        """Yield a connection bound as ``dn`` and unbind it afterwards."""
        ldap_conn = self.ldap.connect(dn, secret)
        try:
            yield ldap_conn
        finally:
            ldap_conn.unbind()

    @contextmanager
    def __admin_connection(self):
        """Yield a connection bound as the admin account and unbind it afterwards.

        Raises:
            InternalServerError: If no ``admin_dn`` is configured.
        """
        self.__require_admin()
        with self.__connect_as(self.admin_dn, self.admin_secret) as ldap_conn:
            yield ldap_conn

    def login(self, user, password):
        """Authenticate ``user`` against the directory.

        Args:
            user: User object providing ``userid``; may be falsy.
            password: Password to check.

        Returns:
            ``False`` if no user was given, otherwise the result of
            ``LDAPConn.authenticate`` for the ``uid`` attribute below the
            configured base DN.
        """
        if not user:
            return False
        return self.ldap.authenticate(user.userid, password, "uid", self.dn)

    def find_user(self, userid, mail=None):
        """Look up a user by uid (or mail) and build a ``User`` from the entry.

        Args:
            userid: The uid to search for.
            mail: Optional mail address used as alternative search key.

        Returns:
            A ``User`` updated with the LDAP attributes, or ``None`` if no
            entry matched.
        """
        entry = self.__find(userid, mail)
        if entry is None:
            return None
        user = User(userid=entry["attributes"]["uid"][0])
        self.__update(user, entry["attributes"], entry["dn"])
        return user

    def update_user(self, user):
        """Refresh ``user`` with the attributes stored in LDAP.

        The user is left untouched (and a warning is logged) when no entry
        with its uid exists.
        """
        entry = self.__find(user.userid)
        if entry is None:
            logger.warning("User '%s' not found in LDAP, skipping update", user.userid)
            return
        self.__update(user, entry["attributes"], entry["dn"])

    def create_user(self, user, password):
        """Create a new LDAP entry for ``user`` and sync its roles.

        The new ``uidNumber`` is the highest ``uidNumber`` found below
        ``ou=user`` plus one.

        Args:
            user: User providing ``userid``, ``firstname``, ``lastname``
                and ``roles``.
            password: Initial password; stored as salted MD5 hash.

        Raises:
            InternalServerError: If no admin account is configured or no
                existing ``uidNumber`` is available to derive a new one.
            BadRequest: If binding to the directory fails.
        """
        self.__require_admin()
        try:
            with self.__admin_connection() as ldap_conn:
                connection = self.ldap.connection
                connection.search(
                    f"ou=user,{self.dn}",
                    "(uidNumber=*)",
                    SUBTREE,
                    attributes=["uidNumber"],
                )
                uid_numbers = [entry["attributes"]["uidNumber"] for entry in connection.response]
                if not uid_numbers:
                    logger.error("No uidNumber found in LDAP, can not derive a new one!")
                    raise InternalServerError
                uid_number = max(uid_numbers) + 1

                # NOTE(review): the names are placed into the DN unescaped;
                # consider escaping the RDN value if names may contain
                # characters like ',' or '+'.
                dn = f"cn={user.firstname} {user.lastname},ou=user,{self.dn}"
                object_class = [
                    "inetOrgPerson",
                    "posixAccount",
                    "person",
                    "organizationalPerson",
                ]
                attributes = {
                    # Same mapping as modify_user / __update:
                    # lastname <-> sn, firstname <-> givenName.
                    "sn": user.lastname,
                    "givenName": user.firstname,
                    "gidNumber": self.default_gid,
                    "homeDirectory": f"/home/{user.userid}",
                    "loginShell": "/bin/bash",
                    "uid": user.userid,
                    # TODO: Use secure hash!
                    "userPassword": hashed(HASHED_SALTED_MD5, password),
                    "uidNumber": uid_number,
                }
                ldap_conn.add(dn, object_class, attributes)
            self._set_roles(user)
        except (LDAPPasswordIsMandatoryError, LDAPBindError) as err:
            raise BadRequest from err

    def modify_user(self, user: User, password=None, new_password=None):
        """Write the user's attributes (and optionally a new password) to LDAP.

        Args:
            user: User whose ``DN`` attribute identifies the entry.
            password: Current password of the user.  If given, the
                modification is done bound as the user, otherwise the admin
                account is used.
            new_password: If given, replaces the stored password.

        Raises:
            InternalServerError: If the admin account is needed but not
                configured.
            BadRequest: If binding to the directory fails.
        """
        try:
            dn = user.get_attribute("DN")
            if password:
                connection = self.__connect_as(dn, password)
            else:
                connection = self.__admin_connection()
            with connection as ldap_conn:
                modifier = {}
                for name, ldap_name in (
                    ("firstname", "givenName"),
                    ("lastname", "sn"),
                    ("mail", "mail"),
                    ("display_name", "displayName"),
                ):
                    value = getattr(user, name, None)
                    # Unset attributes are skipped instead of sending [None].
                    if value is not None:
                        modifier[ldap_name] = [(MODIFY_REPLACE, [value])]
                if new_password:
                    # TODO: Use secure hash!
                    salted_password = hashed(HASHED_SALTED_MD5, new_password)
                    modifier["userPassword"] = [(MODIFY_REPLACE, [salted_password])]
                if modifier:
                    ldap_conn.modify(dn, modifier)
            self._set_roles(user)
        except (LDAPPasswordIsMandatoryError, LDAPBindError) as err:
            raise BadRequest from err

    def get_avatar(self, user):
        """Return the user's ``jpegPhoto`` as ``_Avatar``.

        Raises:
            NotFound: If the user has no entry or no photo stored.
        """
        connection = self.ldap.connection
        connection.search(
            f"ou=user,{self.dn}",
            f"(uid={escape_filter_chars(str(user.userid))})",
            SUBTREE,
            attributes=["jpegPhoto"],
        )
        if not connection.response:
            raise NotFound
        photos = connection.response[0]["attributes"].get("jpegPhoto")
        if not photos:
            raise NotFound
        avatar = _Avatar()
        avatar.mimetype = "image/jpeg"
        avatar.binary.extend(photos[0])
        return avatar

    def set_avatar(self, user, avatar: _Avatar):
        """Store ``avatar`` as the user's ``jpegPhoto``.

        Non-JPEG images are converted with Pillow if it is installed; the
        conversion result is written back to ``avatar``.

        Raises:
            InternalServerError: If no admin account is configured.
            BadRequest: If the image is not JPEG and can not be converted.
        """
        self.__require_admin()

        if avatar.mimetype != "image/jpeg":
            # Try converting using Pillow (if installed)
            try:
                from PIL import Image

                image = Image.open(io.BytesIO(avatar.binary))
                if image.mode not in ("RGB", "L"):
                    # JPEG can not store alpha channels or palettes, so
                    # e.g. RGBA / P images have to be converted first.
                    image = image.convert("RGB")
                image_bytes = io.BytesIO()
                image.save(image_bytes, format="JPEG")
                avatar.binary = image_bytes.getvalue()
                avatar.mimetype = "image/jpeg"
            except ImportError:
                logger.debug("Pillow not installed for image conversion")
                raise BadRequest("Unsupported image format")
            except IOError:
                logger.debug(f"Could not convert avatar from '{avatar.mimetype}' to JPEG")
                raise BadRequest("Unsupported image format")

        dn = user.get_attribute("DN")
        with self.__admin_connection() as ldap_conn:
            ldap_conn.modify(dn, {"jpegPhoto": [(MODIFY_REPLACE, [avatar.binary])]})

    def __find(self, userid, mail=None):
        """Find the LDAP entry of a user by uid or mail.

        Returns:
            The first matching response entry (a dict providing ``dn`` and
            ``attributes``) or ``None`` if nothing matched.
        """
        con = self.ldap.connection
        own_connection = not con
        if own_connection:
            # No managed connection available, fall back to the admin bind.
            con = self.ldap.connect(self.admin_dn, self.admin_secret)
        try:
            # Escape the values so they can not alter the filter structure.
            search_filter = f"(uid={escape_filter_chars(str(userid))})"
            if mail:
                search_filter = f"(|{search_filter}(mail={escape_filter_chars(str(mail))}))"
            con.search(
                f"ou=user,{self.dn}",
                search_filter,
                SUBTREE,
                attributes=["uid", "givenName", "sn", "mail"],
            )
            return con.response[0] if con.response else None
        finally:
            if own_connection:
                con.unbind()

    def __update(self, user, attr, dn):
        """Update an User object with LDAP attributes.

        Nothing is changed if the uid of the entry differs from the user's
        ``userid``.

        Args:
            user: The user to update.
            attr: Attribute mapping of the LDAP entry.
            dn: Distinguished name of the LDAP entry.
        """
        if attr["uid"][0] != user.userid:
            return
        user.set_attribute("DN", dn)
        user.firstname = attr["givenName"][0]
        user.lastname = attr["sn"][0]
        if attr["mail"]:
            user.mail = attr["mail"][0]
        # NOTE(review): __find does not request displayName, so this branch
        # looks unreachable for entries coming from there - confirm intent.
        if "displayName" in attr:
            user.display_name = attr["displayName"][0]
        userController.set_roles(user, self._get_groups(user.userid), create=True)

    def __modify_role(
        self,
        role: Role,
        new_name: Optional[str],
    ):
        """Rename or delete the LDAP group belonging to ``role``.

        Args:
            role: The role whose group (matched by ``cn``) is modified.
            new_name: New name of the group; the group is deleted if falsy.

        Raises:
            InternalServerError: If no admin account is configured.
            BadRequest: If binding to the directory fails.
        """
        try:
            with self.__admin_connection() as ldap_conn:
                ldap_conn.search(
                    f"ou=group,{self.dn}",
                    f"(cn={escape_filter_chars(str(role.name))})",
                    SUBTREE,
                    attributes=["cn"],
                )
                if not ldap_conn.response:
                    return
                dn = ldap_conn.response[0]["dn"]
                if new_name:
                    # NOTE(review): new_name is used unescaped in the RDN.
                    ldap_conn.modify_dn(dn, f"cn={new_name}")
                else:
                    ldap_conn.delete(dn)
        except (LDAPPasswordIsMandatoryError, LDAPBindError) as err:
            raise BadRequest from err

    def _get_groups(self, uid):
        """Return the ``cn`` of every group listing ``uid`` as member."""
        connection = self.ldap.connection
        connection.search(
            f"ou=group,{self.dn}",
            f"(memberUID={escape_filter_chars(str(uid))})",
            SUBTREE,
            attributes=["cn"],
        )
        return [entry["attributes"]["cn"][0] for entry in connection.response]

    def _get_all_roles(self):
        """Return the response entries of all groups below ``ou=group``.

        Each entry is requested with ``cn``, ``gidNumber`` and ``memberUid``.
        """
        connection = self.ldap.connection
        connection.search(
            f"ou=group,{self.dn}",
            "(cn=*)",
            SUBTREE,
            attributes=["cn", "gidNumber", "memberUid"],
        )
        # Copy so the result is independent of later searches.
        return list(connection.response)

    def _set_roles(self, user: User):
        """Sync the group memberships in LDAP with ``user.roles``.

        Roles without a matching group are created as ``posixGroup`` with
        consecutive ``gidNumber`` values above the highest one in use.
        Afterwards the user's uid is added to / removed from ``memberUid``
        of every group whose membership differs from ``user.roles``.

        Raises:
            InternalServerError: If no admin account is configured, or a
                group has to be created but no ``gidNumber`` exists to
                derive a new one.
            BadRequest: If binding to the directory fails.
        """
        try:
            with self.__admin_connection() as ldap_conn:
                ldap_roles = self._get_all_roles()
                known_names = {role["attributes"]["cn"][0] for role in ldap_roles}
                # dict.fromkeys drops duplicates while keeping the order.
                missing = [name for name in dict.fromkeys(user.roles) if name not in known_names]
                if missing:
                    gid_numbers = [role["attributes"]["gidNumber"] for role in ldap_roles]
                    if not gid_numbers:
                        logger.error("No gidNumber found in LDAP, can not derive a new one!")
                        raise InternalServerError
                    gid_number = max(gid_numbers) + 1
                    for name in missing:
                        ldap_conn.add(
                            f"cn={name},ou=group,{self.dn}",
                            ["posixGroup"],
                            attributes={"gidNumber": gid_number},
                        )
                        # Every new group needs its own gidNumber.
                        gid_number += 1
                    ldap_roles = self._get_all_roles()

                wanted = set(user.roles)
                for ldap_role in ldap_roles:
                    role_attributes = ldap_role["attributes"]
                    members = role_attributes.get("memberUid") or []
                    if isinstance(members, str):
                        members = [members]
                    should_be_member = role_attributes["cn"][0] in wanted
                    if should_be_member == (user.userid in members):
                        # Membership already matches, nothing to send.
                        continue
                    operation = MODIFY_ADD if should_be_member else MODIFY_DELETE
                    ldap_conn.modify(ldap_role["dn"], {"memberUid": [(operation, [user.userid])]})
        except (LDAPPasswordIsMandatoryError, LDAPBindError) as err:
            raise BadRequest from err