flaschengeist/flaschengeist/controller/userController.py

208 lines
6.3 KiB
Python

import secrets
from flask import current_app
from datetime import datetime, timedelta, timezone
from werkzeug.exceptions import NotFound, BadRequest, Forbidden
from flaschengeist import logger
from flaschengeist.config import config
from flaschengeist.database import db
from flaschengeist.utils.hook import Hook
from flaschengeist.models.user import User, Role, _PasswordReset
from flaschengeist.controller import messageController, sessionController
def _generate_password_reset(user):
"""Generate a password reset link for the user"""
reset = _PasswordReset.query.get(user._id)
if not reset:
reset = _PasswordReset(_user_id=user._id)
db.session.add(reset)
expires = datetime.now(tz=timezone.utc)
if not reset.expires or reset.expires < expires:
expires = expires + timedelta(hours=12)
reset.expires = expires
reset.token = secrets.token_urlsafe(24)
db.session.commit()
return reset
def login_user(username, password):
logger.info("login user {{ {} }}".format(username))
user = find_user(username)
if not user:
logger.debug("User not found in Database.")
user = User(userid=username)
db.session.add(user)
if current_app.config["FG_AUTH_BACKEND"].login(user, password):
update_user(user)
return user
return None
def request_reset(user: User):
logger.debug(f"New password reset request for {user.userid}")
reset = _generate_password_reset(user)
subject = str(config["MESSAGES"]["password_subject"]).format(name=user.display_name, username=user.userid)
text = str(config["MESSAGES"]["password_text"]).format(
name=user.display_name,
username=user.userid,
link=f'https://{config["FLASCHENGEIST"]["domain"]}/reset?token={reset.token}'
)
messageController.send_message(messageController.Message(user, text, subject))
def reset_password(token: str, password: str):
if len(token) != 32:
raise BadRequest
reset = _PasswordReset.query.filter(_PasswordReset.token == token).one_or_none()
logger.debug(f"Token is {'valid' if reset else 'invalid'}")
if not reset or reset.expires < datetime.now(tz=timezone.utc):
raise Forbidden
modify_user(reset.user, None, password)
sessionController.delete_sessions(reset.user)
db.session.delete(reset)
db.session.commit()
@Hook
def update_user(user):
current_app.config["FG_AUTH_BACKEND"].update_user(user)
if not user.display_name:
user.display_name = "{} {}.".format(user.firstname, user.lastname[0])
db.session.commit()
def set_roles(user: User, roles: [str], create=False):
user.roles_.clear()
for role_name in roles:
role = Role.query.filter(Role.name == role_name).one_or_none()
if not role:
if not create:
raise BadRequest("Role not found >{}<".format(role_name))
role = Role(name=role_name)
user.roles_.append(role)
def modify_user(user, password, new_password=None):
"""Modify given user on the backend
Args:
user: User object to sync with backend
password: Current password (most backends are needing this)
new_password (optional): New password, if password should be changed
Raises:
NotImplemented: If backend is not capable of this operation
BadRequest: Password is wrong or other logic issues
"""
current_app.config["FG_AUTH_BACKEND"].modify_user(user, password, new_password)
if new_password:
logger.debug(f"Password changed for user {user.userid}")
subject = str(config["MESSAGES"]["password_changed_subject"]).format(name=user.display_name, username=user.userid)
text = str(config["MESSAGES"]["password_changed_text"]).format(
name=user.display_name,
username=user.userid,
)
messageController.send_message(messageController.Message(user, text, subject))
def get_users():
return User.query.all()
def get_user_by_role(role: Role):
return User.query.join(User.roles_).filter_by(role_id=role.id).all()
def get_user(uid):
"""Get an user by userid from database
Args:
uid: Userid to search for
Returns:
User fround
Raises:
NotFound if not found"""
user = User.query.filter(User.userid == uid).one_or_none()
if not user:
raise NotFound
return user
def find_user(uid_mail):
"""Finding an user by userid or mail in database or auth-backend
Args:
uid_mail: userid and or mail to search for
Returns:
User if found or None
"""
mail = uid_mail.split("@")
mail = len(mail) == 2 and len(mail[0]) > 0 and len(mail[1]) > 0
query = User.userid == uid_mail
if mail:
query |= User.mail == uid_mail
user = User.query.filter(query).one_or_none()
if user:
update_user(user)
else:
user = current_app.config["FG_AUTH_BACKEND"].find_user(uid_mail, uid_mail if mail else None)
if user:
db.session.add(user)
db.session.commit()
return user
def delete(user):
"""Delete given user"""
current_app.config["FG_AUTH_BACKEND"].delete_user(user)
db.session.delete(user)
db.session.commit()
def register(data):
allowed_keys = User().serialize().keys()
values = {key: value for key, value in data.items() if key in allowed_keys}
roles = values.pop("roles", [])
user = User(**values)
set_roles(user, roles)
password = secrets.token_bytes(16)
current_app.config["FG_AUTH_BACKEND"].create_user(user, password)
db.session.add(user)
db.session.commit()
reset = _generate_password_reset(user)
subject = str(config["MESSAGES"]["welcome_subject"]).format(name=user.display_name, username=user.userid)
text = str(config["MESSAGES"]["welcome_text"]).format(
name=user.display_name,
username=user.userid,
password_link=f'https://{config["FLASCHENGEIST"]["domain"]}/reset?token={reset.token}'
)
messageController.send_message(messageController.Message(user, text, subject))
return user
def load_avatar(user: User):
return current_app.config["FG_AUTH_BACKEND"].get_avatar(user)
def save_avatar(user, avatar):
current_app.config["FG_AUTH_BACKEND"].set_avatar(user, avatar)
db.session.commit()
def persist(user=None):
if user:
db.session.add(user)
db.session.commit()