[Doc] More documentation on decorator and plugins roles and auth*

This commit is contained in:
Ferdinand Thiessen 2020-10-30 04:05:59 +01:00
parent a5d3b837cd
commit 8a9776ae0e
6 changed files with 134 additions and 65 deletions

View File

@ -10,8 +10,8 @@ def get_all():
return Role.query.all()
def get(rid):
role = Role.query.get(rid).one_or_none()
def get(role_name):
role = Role.query.filter(Role.name == role_name).one_or_none()
if not role:
raise NotFound
@ -42,11 +42,11 @@ def create_permissions(permissions):
db.session.commit()
def create_role(name, permissions=[]):
def create_role(name: str, permissions=[]):
role = Role(name=name)
db.session.add(role)
set_permissions(role, permissions)
return role.id
return role
def delete(role):

View File

@ -7,6 +7,15 @@ from flaschengeist.controller import sessionController
def login_required(permission=None):
"""Decorator use to make a route only accessible by logged in users.
Sets ``current_session`` into kwargs of wrapped function with session identified by Authorization header.
Attributes:
permission: Optional permission needed for this route
Returns:
Wrapped function with login (and permission) guard
"""
def wrap(func):
@wraps(func)
def wrapped_f(*args, **kwargs):

View File

@ -28,8 +28,8 @@ def login():
POST-data: {'userid': string, 'password': string}
Returns:
A JSON object with `flaschengeist.system.models.user.User` and created
`flaschengeist.system.models.session.Session` or HTTP error
A JSON object with `flaschengeist.models.user.User` and created
`flaschengeist.models.session.Session` or HTTP error
"""
logger.debug("Start log in.")
data = request.get_json()
@ -60,7 +60,7 @@ def get_sessions(current_session, **kwargs):
Route: ``/auth`` | Method: ``GET``
Returns:
A JSON array of `flaschengeist.system.models.session.Session` or HTTP error
A JSON array of `flaschengeist.models.session.Session` or HTTP error
"""
sessions = sessionController.get_users_sessions(current_session._user)
return jsonify(sessions)
@ -100,7 +100,7 @@ def get_session(token, current_session, **kwargs):
current_session: Session sent with Authorization Header
Returns:
JSON encoded `flaschengeist.system.models.session.Session` or HTTP error
JSON encoded `flaschengeist.models.session.Session` or HTTP error
"""
logger.debug("get token {{ {} }}".format(token))
session = sessionController.get_session(token, current_session._user)
@ -153,7 +153,7 @@ def get_assocd_user(token, current_session, **kwargs):
current_session: Session sent with Authorization Header
Returns:
JSON encoded `flaschengeist.system.models.user.User` or HTTP error
JSON encoded `flaschengeist.models.user.User` or HTTP error
"""
logger.debug("get token {{ {} }}".format(token))
session = sessionController.get_session(token, current_session._user)

View File

@ -1,3 +1,5 @@
"""LDAP Authentication Provider Plugin"""
import ssl
from ldap3.utils.hashed import hashed
from ldap3 import SUBTREE, MODIFY_REPLACE, HASHED_SALTED_MD5

View File

@ -1,36 +1,40 @@
import binascii
import hashlib
import os
"""Authentication Provider Plugin
Allows simple authentication using Username-Password pair with password saved into
Flaschengeist database (as User attribute)
"""
import os
import hashlib
import binascii
from werkzeug.exceptions import BadRequest
from flaschengeist.plugins import AuthPlugin
from flaschengeist.models.user import User
def _hash_password(password):
salt = hashlib.sha256(os.urandom(60)).hexdigest().encode("ascii")
pass_hash = hashlib.pbkdf2_hmac("sha3-512", password.encode("utf-8"), salt, 100000)
pass_hash = binascii.hexlify(pass_hash)
return (salt + pass_hash).decode("ascii")
def _verify_password(stored_password, provided_password):
salt = stored_password[:64]
stored_password = stored_password[64:]
pass_hash = hashlib.pbkdf2_hmac("sha3-512", provided_password.encode("utf-8"), salt.encode("ascii"), 100000)
pass_hash = binascii.hexlify(pass_hash).decode("ascii")
return pass_hash == stored_password
class AuthPlain(AuthPlugin):
def login(self, user: User, password: str):
if user.has_attribute("password"):
return _verify_password(user.get_attribute("password"), password)
return AuthPlain._verify_password(user.get_attribute("password"), password)
return False
def modify_user(self, user, password, new_password=None):
if password is not None and not self.login(user, password):
raise BadRequest
if new_password:
user.set_attribute("password", _hash_password(new_password))
user.set_attribute("password", AuthPlain._hash_password(new_password))
@staticmethod
def _hash_password(password):
salt = hashlib.sha256(os.urandom(60)).hexdigest().encode("ascii")
pass_hash = hashlib.pbkdf2_hmac("sha3-512", password.encode("utf-8"), salt, 100000)
pass_hash = binascii.hexlify(pass_hash)
return (salt + pass_hash).decode("ascii")
@staticmethod
def _verify_password(stored_password, provided_password):
salt = stored_password[:64]
stored_password = stored_password[64:]
pass_hash = hashlib.pbkdf2_hmac("sha3-512", provided_password.encode("utf-8"), salt.encode("ascii"), 100000)
pass_hash = binascii.hexlify(pass_hash).decode("ascii")
return pass_hash == stored_password

View File

@ -1,3 +1,8 @@
"""Roles plugin
Provides routes used to configure roles and permissions of users / roles.
"""
from flask import Blueprint, request, jsonify
from werkzeug.exceptions import NotFound, BadRequest
@ -15,57 +20,98 @@ class RolesPlugin(Plugin):
super().__init__(config, roles_bp, permissions=[_permission_edit, _permission_delete])
######################################################
# Routes #
# #
# /roles POST: register new #
# GET: get all roles #
# /roles/permissions GET: get all permissions #
# /roles/<rid> GET: get role with rid #
# PUT: modify role / permission #
# DELETE: remove role #
######################################################
@roles_bp.route("/roles", methods=["GET"])
@login_required()
def list_roles(current_session):
"""List all existing roles
Route: ``/roles`` | Method: ``GET``
Args:
current_session: Session sent with Authorization Header
Returns:
JSON encodes array of `flaschengeist.models.user.Role`
"""
roles = roleController.get_all()
return jsonify(roles)
@roles_bp.route("/roles", methods=["POST"])
@login_required(permission=_permission_edit)
def add_role(**kwargs):
def create_role(current_session):
"""Create new role
Route: ``/roles`` | Method: ``POST``
POST-data: ``{name: string, permissions?: string[]}``
Args:
current_session: Session sent with Authorization Header
Returns:
HTTP-200 or HTTP error
"""
data = request.get_json()
if not data or "name" not in data:
raise BadRequest
if "permissions" in data:
permissions = data["permissions"]
role = roleController.create_role(data["name"], permissions)
return jsonify({"ok": "ok", "id": role.id})
@roles_bp.route("/roles", methods=["GET"])
@login_required()
def list_roles(**kwargs):
roles = roleController.get_all()
return jsonify(roles)
roleController.create_role(data["name"], permissions)
@roles_bp.route("/roles/permissions", methods=["GET"])
@login_required()
def list_permissions(**kwargs):
def list_permissions(current_session):
"""List all existing permissions
Route: ``/roles/permissions`` | Method: ``GET``
Args:
current_session: Session sent with Authorization Header
Returns:
JSON encoded list of `flaschengeist.models.user.Permission`
"""
permissions = roleController.get_permissions()
return jsonify(permissions)
@roles_bp.route("/roles/<rid>", methods=["GET"])
@roles_bp.route("/roles/<role_name>", methods=["GET"])
@login_required()
def __get_role(rid, **kwargs):
role = roleController.get(rid)
if role:
return jsonify({"id": role.id, "name": role, "permissions": role.permissions})
raise NotFound
def get_role(role_name, current_session):
"""Get role by name
Route: ``/roles/<role_name>`` | Method: ``GET``
Args:
role_name: Name of role to retrieve
current_session: Session sent with Authorization Header
Returns:
JSON encoded `flaschengeist.models.user.Role` or HTTP error
"""
role = roleController.get(role_name)
return jsonify(role)
@roles_bp.route("/roles/<rid>", methods=["PUT"])
@roles_bp.route("/roles/<role_name>", methods=["PUT"])
@login_required(permission=_permission_edit)
def __edit_role(rid, **kwargs):
role = roleController.get(rid)
def edit_role(role_name, current_session):
"""Edit role, rename and / or set permissions
Route: ``/roles/<role_name>`` | Method: ``PUT``
POST-data: ``{name?: string, permissions?: string[]}``
Args:
role_name: Name of role
current_session: Session sent with Authorization Header
Returns:
HTTP-200 or HTTP error
"""
role = roleController.get(role_name)
data = request.get_json()
if "name" in data:
@ -73,13 +119,21 @@ def __edit_role(rid, **kwargs):
if "permissions" in data:
roleController.set_permissions(role, data["permissions"])
roleController.update_role(role)
return jsonify({"ok": "ok"})
@roles_bp.route("/roles/<rid>", methods=["DELETE"])
@roles_bp.route("/roles/<role_name>", methods=["DELETE"])
@login_required(permission=_permission_delete)
def __delete_role(rid, **kwargs):
role = roleController.get(rid)
roleController.delete(role)
def delete_role(role_name, current_session):
"""Delete role
return jsonify({"ok": "ok"})
Route: ``/roles/<role_name>`` | Method: ``DELETE``
Args:
role_name: Name of role
current_session: Session sent with Authorization Header
Returns:
HTTP-200 or HTTP error
"""
role = roleController.get(role_name)
roleController.delete(role)