243 lines
7.5 KiB
Python
243 lines
7.5 KiB
Python
import secrets
|
|
from io import BytesIO
|
|
from flask import current_app
|
|
from datetime import datetime, timedelta, timezone
|
|
from flask.helpers import send_file
|
|
from werkzeug.exceptions import NotFound, BadRequest, Forbidden
|
|
|
|
from flaschengeist import logger
|
|
from flaschengeist.config import config
|
|
from flaschengeist.database import db
|
|
from flaschengeist.models.notification import Notification
|
|
from flaschengeist.utils.hook import Hook
|
|
from flaschengeist.utils.datetime import from_iso_format
|
|
from flaschengeist.models.user import User, Role, _PasswordReset
|
|
from flaschengeist.controller import imageController, messageController, sessionController
|
|
|
|
|
|
def _generate_password_reset(user):
    """Return the password reset entry of the user, renewing it when needed.

    The entry is looked up by the id of the user; a missing entry is created
    and added to the session. If the entry has no expiry date or is already
    expired, a new token valid for 12 hours is generated and committed.
    An entry which is still valid is returned unchanged.

    Args:
        user: User the reset entry belongs to
    Returns:
        The (possibly renewed) password reset entry
    """
    entry = _PasswordReset.query.get(user.id_)
    if not entry:
        entry = _PasswordReset(_user_id=user.id_)
        db.session.add(entry)

    now = datetime.now(tz=timezone.utc)
    if entry.expires and not entry.expires < now:
        # Still valid, keep the token which was already issued
        return entry

    entry.expires = now + timedelta(hours=12)
    entry.token = secrets.token_urlsafe(24)
    db.session.commit()
    return entry
|
|
|
|
|
|
def login_user(username, password):
    """Try to log in a user using the configured auth backend

    Args:
        username: Name used to look up the user (passed to `find_user`)
        password: Password handed over to the backend
    Returns:
        The user (after `update_user` was called) if the backend accepted
        the login, else None
    """
    logger.info("login user {{ {} }}".format(username))

    user = find_user(username)
    if not user:
        # Not known yet: give the backend a fresh user attached to the session
        logger.debug("User not found in Database.")
        user = User(userid=username)
        db.session.add(user)

    backend = current_app.config["FG_AUTH_BACKEND"]
    if not backend.login(user, password):
        return None

    update_user(user)
    return user
|
|
|
|
|
|
def request_reset(user: User):
    """Send the user a message containing a password reset link

    Subject and text are taken from the `MESSAGES` section of the config
    (`password_subject`, `password_text`) and formatted with the display
    name, the userid and - for the text - the reset link.

    Args:
        user: User requesting the reset
    """
    logger.debug(f"New password reset request for {user.userid}")
    reset = _generate_password_reset(user)

    templates = config["MESSAGES"]
    subject = str(templates["password_subject"]).format(name=user.display_name, username=user.userid)
    text_template = str(templates["password_text"])
    reset_link = f'https://{config["FLASCHENGEIST"]["domain"]}/reset?token={reset.token}'
    text = text_template.format(name=user.display_name, username=user.userid, link=reset_link)

    message = messageController.Message(user, text, subject)
    messageController.send_message(message)
|
|
|
|
|
|
def reset_password(token: str, password: str):
    """Set a new password using a password reset token

    On success all sessions of the user are deleted and the reset entry is
    removed.

    Args:
        token: Reset token, has to be exactly 32 characters long
        password: New password
    Raises:
        BadRequest: Token has not the expected length
        Forbidden: Token is unknown or expired
    """
    # Tokens are created using secrets.token_urlsafe(24), which yields 32 characters
    if len(token) != 32:
        raise BadRequest

    entry = _PasswordReset.query.filter(_PasswordReset.token == token).one_or_none()
    logger.debug(f"Token is {'valid' if entry else 'invalid'}")
    if not entry:
        raise Forbidden
    if entry.expires < datetime.now(tz=timezone.utc):
        raise Forbidden

    owner = entry.user
    modify_user(owner, None, password)
    sessionController.delete_sessions(entry.user)

    db.session.delete(entry)
    db.session.commit()
|
|
|
|
|
|
@Hook
def update_user(user):
    """Update the user from the auth backend and commit the result

    If the user has no display name afterwards, one is derived in the form
    "<firstname> <first letter of lastname>.". When there is no lastname to
    take the initial from, the firstname (or, if that is empty too, the
    userid) is used instead of failing on the index access.

    Args:
        user: User object to sync with the backend
    """
    current_app.config["FG_AUTH_BACKEND"].update_user(user)
    if not user.display_name:
        if user.lastname:
            user.display_name = "{} {}.".format(user.firstname, user.lastname[0])
        else:
            # Empty or missing lastname: `lastname[0]` would raise here
            user.display_name = user.firstname or user.userid
    db.session.commit()
|
|
|
|
|
|
def set_roles(user: User, roles: list[str], create=False):
    """Replace the roles of the user by the roles with the given names

    All names are resolved before the user is touched, so a failing lookup
    leaves the current roles of the user unchanged.

    Args:
        user: User whose roles are replaced
        roles: Names of the roles the user should have afterwards
        create: If True, roles which do not exist are created
    Raises:
        BadRequest: A role was not found and `create` is not set
    """
    resolved = []
    created = {}  # roles created during this call, reused if a name is repeated
    for role_name in roles:
        role = Role.query.filter(Role.name == role_name).one_or_none()
        if not role:
            if not create:
                raise BadRequest("Role not found >{}<".format(role_name))
            role = created.get(role_name)
            if role is None:
                role = created[role_name] = Role(name=role_name)
        resolved.append(role)

    # Only modify the user once every role name was resolved successfully
    user.roles_.clear()
    for role in resolved:
        user.roles_.append(role)
|
|
|
|
|
|
def modify_user(user, password, new_password=None):
    """Modify the given user on the auth backend

    If a new password was given, the user is notified about the password
    change using the `password_changed_subject` and `password_changed_text`
    templates of the `MESSAGES` config section.

    Args:
        user: User object to sync with backend
        password: Current password (most backends are needing this)
        new_password (optional): New password, if password should be changed

    Raises:
        NotImplemented: If backend is not capable of this operation
        BadRequest: Password is wrong or other logic issues
    """
    backend = current_app.config["FG_AUTH_BACKEND"]
    backend.modify_user(user, password, new_password)

    if not new_password:
        return

    logger.debug(f"Password changed for user {user.userid}")
    templates = config["MESSAGES"]
    subject = str(templates["password_changed_subject"]).format(name=user.display_name, username=user.userid)
    text = str(templates["password_changed_text"]).format(name=user.display_name, username=user.userid)
    message = messageController.Message(user, text, subject)
    messageController.send_message(message)
|
|
|
|
|
|
def get_users():
    """Get all users stored in the database

    Returns:
        Result of `User.query.all()`
    """
    users = User.query.all()
    return users
|
|
|
|
|
|
def get_user_by_role(role: Role):
    """Get all users having the given role

    Args:
        role: Role to search users for
    Returns:
        All users matching the role filter
    """
    joined = User.query.join(User.roles_)
    matching = joined.filter_by(role_id=role.id)
    return matching.all()
|
|
|
|
|
|
def get_user(uid):
    """Get a user by userid from the database

    Args:
        uid: Userid to search for
    Returns:
        User found
    Raises:
        NotFound: No user with this userid exists
    """
    found = User.query.filter(User.userid == uid).one_or_none()
    if found:
        return found
    raise NotFound
|
|
|
|
|
|
def find_user(uid_mail):
    """Find a user by userid or mail in the database or the auth backend

    The database is searched first; a user found there is refreshed using
    `update_user`. Otherwise the auth backend is asked, and a user returned by
    it is added to the database. If such a user has no display name, one is
    derived as "<firstname> <first letter of lastname>."; without a lastname
    the firstname (or the userid) is used instead of failing on the index
    access.

    Args:
        uid_mail: userid and or mail to search for
    Returns:
        User if found or None
    """
    # Treat the input as mail only if it is exactly "<non-empty>@<non-empty>"
    parts = uid_mail.split("@")
    is_mail = len(parts) == 2 and len(parts[0]) > 0 and len(parts[1]) > 0

    query = User.userid == uid_mail
    if is_mail:
        query |= User.mail == uid_mail
    user = User.query.filter(query).one_or_none()
    if user:
        update_user(user)
        return user

    backend = current_app.config["FG_AUTH_BACKEND"]
    user = backend.find_user(uid_mail, uid_mail if is_mail else None)
    if user:
        if not user.display_name:
            if user.lastname:
                user.display_name = "{} {}.".format(user.firstname, user.lastname[0])
            else:
                # Empty or missing lastname: `lastname[0]` would raise here
                user.display_name = user.firstname or user.userid
        db.session.add(user)
        db.session.commit()
    return user
|
|
|
|
|
|
def delete(user):
    """Delete the given user on the auth backend and in the database

    Args:
        user: User to delete
    """
    backend = current_app.config["FG_AUTH_BACKEND"]
    backend.delete_user(user)

    # Backend went first, now drop the database entry
    db.session.delete(user)
    db.session.commit()
|
|
|
|
|
|
def register(data):
    """Register a new user and send a welcome message

    Only keys which are part of a serialized `User` are taken from `data`.
    The user is created on the auth backend with a random password, stored in
    the database and receives a welcome message containing a password reset
    link.

    Args:
        data: Mapping holding the attributes of the new user, may contain
            `roles` (role names) and `birthday` (ISO formatted)
    Returns:
        The newly created user
    """
    allowed_keys = User().serialize().keys()
    values = {}
    for key, value in data.items():
        if key in allowed_keys:
            values[key] = value
    role_names = values.pop("roles", [])
    if "birthday" in data:
        values["birthday"] = from_iso_format(data["birthday"]).date()

    user = User(**values)
    set_roles(user, role_names)

    # Random initial password, the user sets the real one using the reset link
    initial_password = secrets.token_urlsafe(16)
    current_app.config["FG_AUTH_BACKEND"].create_user(user, initial_password)
    db.session.add(user)
    db.session.commit()

    reset = _generate_password_reset(user)

    templates = config["MESSAGES"]
    subject = str(templates["welcome_subject"]).format(name=user.display_name, username=user.userid)
    text_template = str(templates["welcome_text"])
    reset_link = f'https://{config["FLASCHENGEIST"]["domain"]}/reset?token={reset.token}'
    text = text_template.format(name=user.display_name, username=user.userid, password_link=reset_link)
    messageController.send_message(messageController.Message(user, text, subject))

    # Return value is not used, the lookup is done for its side effects only
    find_user(user.userid)

    return user
|
|
|
|
|
|
def load_avatar(user: User):
    """Build the response delivering the avatar of the user

    An avatar stored with the user is preferred, otherwise the auth backend
    is asked for one.

    Args:
        user: User whose avatar is requested
    Returns:
        Result of `imageController.send_image` or `send_file`
    Raises:
        NotFound: Neither the user nor the backend provides a non-empty avatar
    """
    if user.avatar_ is not None:
        return imageController.send_image(image=user.avatar_)

    backend = current_app.config["FG_AUTH_BACKEND"]
    avatar = backend.get_avatar(user)
    if len(avatar.binary) > 0:
        stream = BytesIO(avatar.binary)
        return send_file(stream, avatar.mimetype)
    raise NotFound
|
|
|
|
|
|
def save_avatar(user, file):
    """Set the avatar of the user using the auth backend and commit

    Args:
        user: User whose avatar is set
        file: Avatar data, passed unchanged to the backend
    """
    backend = current_app.config["FG_AUTH_BACKEND"]
    backend.set_avatar(user, file)
    db.session.commit()
|
|
|
|
|
|
def delete_avatar(user):
    """Delete the avatar of the user using the auth backend and commit

    Args:
        user: User whose avatar is deleted
    """
    backend = current_app.config["FG_AUTH_BACKEND"]
    backend.delete_avatar(user)
    db.session.commit()
|
|
|
|
|
|
def persist(user=None):
    """Commit the database session, adding the user first if one is given

    Args:
        user (optional): User to add to the session before committing
    """
    session = db.session
    if user:
        session.add(user)
    session.commit()
|
|
|
|
|
|
def get_notifications(user, start=None):
    """Get the notifications of the user ordered by time

    Args:
        user: User whose notifications are requested
        start (optional): If given, only notifications with a time after
            `start` are returned
    Returns:
        List of matching notifications
    """
    criteria = [Notification.user_id_ == user.id_]
    if start is not None:
        criteria.append(Notification.time > start)
    ordered = Notification.query.filter(*criteria).order_by(Notification.time)
    return ordered.all()
|
|
|
|
|
|
def delete_notification(nid, user):
    """Delete a notification of the user and commit

    The delete is restricted to notifications of the given user, so the id of
    a notification belonging to someone else matches nothing.

    Args:
        nid: Id of the notification
        user: User the notification has to belong to
    """
    by_id = Notification.query.filter(Notification.id == nid)
    owned = by_id.filter(Notification.user_ == user)
    owned.delete()
    db.session.commit()
|