109 lines
2.9 KiB
Python
109 lines
2.9 KiB
Python
from flask import current_app
|
|
from sqlalchemy.orm.exc import NoResultFound
|
|
from werkzeug.exceptions import NotFound, BadRequest
|
|
|
|
from flaschengeist.utils.hook import Hook
|
|
from flaschengeist.models.user import User, Role
|
|
from flaschengeist.database import db
|
|
from flaschengeist import logger
|
|
|
|
|
|
class Avatar:
|
|
mimetype = ""
|
|
binary = bytearray()
|
|
|
|
|
|
@Hook
|
|
def load_avatar(avatar: Avatar, user: User):
|
|
pass
|
|
|
|
|
|
@Hook
|
|
def save_avatar(avatar: Avatar, user: User):
|
|
pass
|
|
|
|
|
|
def login_user(username, password):
|
|
logger.info("login user {{ {} }}".format(username))
|
|
try:
|
|
user = User.query.filter(User.userid == username).one()
|
|
except NoResultFound:
|
|
user = User(userid=username)
|
|
db.session.add(user)
|
|
if current_app.config["FG_AUTH_BACKEND"].login(user, password):
|
|
update_user(user)
|
|
return user
|
|
return None
|
|
|
|
|
|
@Hook
|
|
def update_user(user):
|
|
current_app.config["FG_AUTH_BACKEND"].update_user(user)
|
|
if not user.display_name:
|
|
user.display_name = "{} {}.".format(user.firstname, user.lastname[0])
|
|
db.session.commit()
|
|
|
|
|
|
def set_roles(user: User, roles: [str], create=False):
|
|
user.roles_.clear()
|
|
for role_name in roles:
|
|
role = Role.query.filter(Role.name == role_name).one_or_none()
|
|
if not role:
|
|
if not create:
|
|
raise BadRequest("Role not found >{}<".format(role_name))
|
|
role = Role(name=role_name)
|
|
user.roles_.append(role)
|
|
|
|
|
|
def modify_user(user, password, new_password=None):
|
|
"""Modify given user on the backend
|
|
|
|
Args:
|
|
user: User object to sync with backend
|
|
password: Current password (most backends are needing this)
|
|
new_password (optional): New password, if password should be changed
|
|
|
|
Raises:
|
|
NotImplemented: If backend is not capable of this operation
|
|
BadRequest: Password is wrong or other logic issues
|
|
"""
|
|
current_app.config["FG_AUTH_BACKEND"].modify_user(user, password, new_password)
|
|
|
|
|
|
def get_users():
|
|
return User.query.all()
|
|
|
|
|
|
def get_user_by_role(role: Role):
|
|
return User.query.join(User.roles_).filter_by(role_id=role.id).all()
|
|
|
|
|
|
def get_user(uid):
|
|
user = User.query.filter(User.userid == uid).one_or_none()
|
|
if not user:
|
|
raise NotFound
|
|
return user
|
|
|
|
|
|
def delete(user):
|
|
current_app.config["FG_AUTH_BACKEND"].delete_user(user)
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
|
|
def register(data):
|
|
for required in ["firstname", "lastname", "mail"]:
|
|
if required not in data:
|
|
raise BadRequest("Missing required parameters")
|
|
allowed_keys = User().serialize().keys()
|
|
values = {key: value for key, value in data.items() if key in allowed_keys}
|
|
roles = values.pop("roles", [])
|
|
user = User(**values)
|
|
set_roles(user, roles)
|
|
|
|
current_app.config["FG_AUTH_BACKEND"].create_user(user, data["password"])
|
|
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
return user
|