86 lines
2.4 KiB
Python
86 lines
2.4 KiB
Python
from typing import Union
|
|
from sqlalchemy.exc import IntegrityError
|
|
from werkzeug.exceptions import BadRequest, Conflict, NotFound
|
|
|
|
from flaschengeist import logger
|
|
from flaschengeist.models.user import Role, Permission
|
|
from flaschengeist.database import db, case_sensitive
|
|
from flaschengeist.utils.hook import Hook
|
|
|
|
|
|
def get_all():
|
|
return Role.query.all()
|
|
|
|
|
|
def get(role_name: Union[int, str]) -> Role:
|
|
"""Get role by ID or name
|
|
Args:
|
|
role_name: Name or ID of the role
|
|
Returns:
|
|
Matching role
|
|
Raises:
|
|
NotFound
|
|
"""
|
|
if type(role_name) is int:
|
|
role = Role.query.get(role_name)
|
|
else:
|
|
role = Role.query.filter(Role.name == role_name).one_or_none()
|
|
if not role:
|
|
raise NotFound("no such role")
|
|
return role
|
|
|
|
|
|
def get_permissions():
|
|
return Permission.query.all()
|
|
|
|
|
|
@Hook
|
|
def update_role(role, new_name):
|
|
if new_name is None or not isinstance(new_name, str):
|
|
raise BadRequest("Invalid new name")
|
|
else:
|
|
if role.name == new_name or db.session.query(db.exists().where(Role.name == case_sensitive(new_name))).scalar():
|
|
raise BadRequest("Name already used")
|
|
role.name = new_name
|
|
db.session.commit()
|
|
|
|
|
|
def set_permissions(role, permissions):
|
|
perms = Permission.query.filter(Permission.name.in_(permissions)).all()
|
|
if len(perms) < len(permissions):
|
|
raise BadRequest("Invalid permission name")
|
|
role.permissions = list(perms)
|
|
db.session.commit()
|
|
|
|
|
|
def create_permissions(permissions):
|
|
for permission in permissions:
|
|
if Permission.query.filter(Permission.name == permission).count() > 0:
|
|
continue
|
|
p = Permission(name=permission)
|
|
db.session.add(p)
|
|
db.session.commit()
|
|
|
|
|
|
def create_role(name: str, permissions=[]):
|
|
logger.debug(f"Create new role with name: {name}")
|
|
try:
|
|
role = Role(name=name)
|
|
db.session.add(role)
|
|
set_permissions(role, permissions)
|
|
db.session.commit()
|
|
logger.debug(f"Created role: {role.serialize()}")
|
|
except IntegrityError:
|
|
raise BadRequest("role already exists")
|
|
return role
|
|
|
|
|
|
def delete(role):
|
|
role.permissions.clear()
|
|
try:
|
|
db.session.delete(role)
|
|
db.session.commit()
|
|
except IntegrityError:
|
|
logger.debug("IntegrityError: Role might still be in use", exc_info=True)
|
|
raise Conflict("Role still in use")
|