feat(users): Add deleted attribute to users.

This allows us to filter out deleted users which could not be deleted and
had to be soft-deleted.
Meaning: users which still had foreign keys on the database,
from e.g. disabled plugins.
This commit is contained in:
Ferdinand Thiessen 2021-12-03 12:51:40 +01:00
parent f9d9494a36
commit 47400f02e9
5 changed files with 77 additions and 18 deletions

View File

@ -12,6 +12,7 @@ from flaschengeist.database import db
from flaschengeist.models.notification import Notification from flaschengeist.models.notification import Notification
from flaschengeist.utils.hook import Hook from flaschengeist.utils.hook import Hook
from flaschengeist.utils.datetime import from_iso_format from flaschengeist.utils.datetime import from_iso_format
from flaschengeist.utils.foreign_keys import merge_references
from flaschengeist.models.user import User, Role, _PasswordReset from flaschengeist.models.user import User, Role, _PasswordReset
from flaschengeist.controller import imageController, messageController, sessionController from flaschengeist.controller import imageController, messageController, sessionController
@ -33,10 +34,6 @@ def _generate_password_reset(user):
return reset return reset
def install():
pass
def login_user(username, password): def login_user(username, password):
logger.info("login user {{ {} }}".format(username)) logger.info("login user {{ {} }}".format(username))
@ -125,23 +122,24 @@ def modify_user(user, password, new_password=None):
messageController.send_message(messageController.Message(user, text, subject)) messageController.send_message(messageController.Message(user, text, subject))
def get_users(): def get_users(deleted=False):
return User.query.all() return User.query.filter(User.deleted == deleted).all()
def get_user_by_role(role: Role): def get_user_by_role(role: Role):
return User.query.join(User.roles_).filter_by(role_id=role.id).all() return User.query.join(User.roles_).filter_by(role_id=role.id).all()
def get_user(uid): def get_user(uid, deleted=False):
"""Get an user by userid from database """Get an user by userid from database
Args: Args:
uid: Userid to search for uid: Userid to search for
deleted: Set to true to also search deleted users
Returns: Returns:
User fround User fround
Raises: Raises:
NotFound if not found""" NotFound if not found"""
user = User.query.filter(User.userid == uid).one_or_none() user = User.query.filter(User.userid == uid, User.deleted == deleted).one_or_none()
if not user: if not user:
raise NotFound raise NotFound
return user return user
@ -184,9 +182,19 @@ def delete_user(user: User):
user.roles_.clear() user.roles_.clear()
user.sessions_.clear() user.sessions_.clear()
user.reset_requests_.clear() user.reset_requests_.clear()
db.session.commit() # Now move all other references to the DELETED_USER
try:
deleted_user = get_user("__deleted_user__", True)
except NotFound:
deleted_user = User(
userid="__deleted_user__", firstname="USER", lastname="DELETED", display_name="DELETED USER", deleted=True
)
db.session.add(user)
db.session.flush()
merge_references(user, deleted_user)
db.session.commit()
# Now try to delete the user for real
try: try:
# Delete the user
db.session.delete(user) db.session.delete(user)
db.session.commit() db.session.commit()
except exc.IntegrityError: except exc.IntegrityError:
@ -196,6 +204,7 @@ def delete_user(user: User):
user.display_name = "DELETED USER" user.display_name = "DELETED USER"
user.firstname = "" user.firstname = ""
user.lastname = "" user.lastname = ""
user.deleted = True
user.birthday = None user.birthday = None
user.mail = None user.mail = None
db.session.commit() db.session.commit()

View File

@ -57,8 +57,9 @@ class User(db.Model, ModelSerializeMixin):
display_name: str = db.Column(db.String(30)) display_name: str = db.Column(db.String(30))
firstname: str = db.Column(db.String(50), nullable=False) firstname: str = db.Column(db.String(50), nullable=False)
lastname: str = db.Column(db.String(50), nullable=False) lastname: str = db.Column(db.String(50), nullable=False)
mail: str = db.Column(db.String(60)) deleted: bool = db.Column(db.Boolean(), default=False)
birthday: Optional[date] = db.Column(db.Date) birthday: Optional[date] = db.Column(db.Date)
mail: str = db.Column(db.String(60))
roles: list[str] = [] roles: list[str] = []
permissions: Optional[list[str]] = None permissions: Optional[list[str]] = None

View File

@ -15,7 +15,7 @@ from flaschengeist import logger
class AuthPlain(AuthPlugin): class AuthPlain(AuthPlugin):
def post_install(self): def post_install(self):
if User.query.first() is None: if User.query.filter(User.deleted == False).count() == 0:
logger.info("Installing admin user") logger.info("Installing admin user")
role = Role.query.filter(Role.name == "Superuser").first() role = Role.query.filter(Role.name == "Superuser").first()
if role is None: if role is None:

View File

@ -22,10 +22,6 @@ class UsersPlugin(Plugin):
blueprint = Blueprint(name, __name__) blueprint = Blueprint(name, __name__)
permissions = permissions.permissions permissions = permissions.permissions
def install(self):
userController.install()
return super().install()
@UsersPlugin.blueprint.route("/users", methods=["POST"]) @UsersPlugin.blueprint.route("/users", methods=["POST"])
def register(): def register():
@ -93,7 +89,9 @@ def get_user(userid, current_session):
JSON encoded `flaschengeist.models.user.User` or if userid is current user also containing permissions or HTTP error JSON encoded `flaschengeist.models.user.User` or if userid is current user also containing permissions or HTTP error
""" """
logger.debug("Get information of user {{ {} }}".format(userid)) logger.debug("Get information of user {{ {} }}".format(userid))
user: User = userController.get_user(userid) user: User = userController.get_user(
userid, True
) # This is the only API point that should return data for deleted users
serial = user.serialize() serial = user.serialize()
if userid == current_session.user_.userid: if userid == current_session.user_.userid:
serial["permissions"] = user.get_permissions() serial["permissions"] = user.get_permissions()

View File

@ -0,0 +1,51 @@
# Borrowed from https://github.com/kvesteri/sqlalchemy-utils
# Modifications see: https://github.com/kvesteri/sqlalchemy-utils/issues/561
# LICENSED under the BSD license, see upstream https://github.com/kvesteri/sqlalchemy-utils/blob/master/LICENSE
import sqlalchemy as sa
from sqlalchemy.orm import object_session
def get_foreign_key_values(fk, obj):
mapper = sa.inspect(obj.__class__)
return dict(
(
fk.constraint.columns.values()[index],
getattr(obj, element.column.key)
if hasattr(obj, element.column.key)
else getattr(obj, mapper.get_property_by_column(element.column).key),
)
for index, element in enumerate(fk.constraint.elements)
)
def get_referencing_foreign_keys(mixed):
tables = [mixed]
referencing_foreign_keys = set()
for table in mixed.metadata.tables.values():
if table not in tables:
for constraint in table.constraints:
if isinstance(constraint, sa.sql.schema.ForeignKeyConstraint):
for fk in constraint.elements:
if any(fk.references(t) for t in tables):
referencing_foreign_keys.add(fk)
return referencing_foreign_keys
def merge_references(from_, to, foreign_keys=None):
"""
Merge the references of an entity into another entity.
"""
if from_.__tablename__ != to.__tablename__:
raise TypeError("The tables of given arguments do not match.")
session = object_session(from_)
foreign_keys = get_referencing_foreign_keys(from_.__table__)
for fk in foreign_keys:
old_values = get_foreign_key_values(fk, from_)
new_values = get_foreign_key_values(fk, to)
session.query(from_.__mapper__).filter(*[k == old_values[k] for k in old_values]).update(
new_values, synchronize_session=False
)