[feat][apikey] update model, add logic, add routes

This commit is contained in:
Tim Gröger 2024-10-15 07:27:29 +02:00
parent 2f7fdec492
commit 7dd3321246
5 changed files with 198 additions and 5 deletions

View File

@ -0,0 +1,37 @@
"""add name and description to api_key
Revision ID: 49118ea16b56
Revises: f9aa4cafa982
Create Date: 2024-10-14 08:15:16.348090
"""
import sqlalchemy as sa
from alembic import op
import flaschengeist
# revision identifiers, used by Alembic.
revision = "49118ea16b56"
down_revision = "f9aa4cafa982"
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("api_key", schema=None) as batch_op:
batch_op.add_column(sa.Column("name", sa.String(length=32), nullable=True))
batch_op.add_column(sa.Column("description", sa.String(length=255), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("api_key", schema=None) as batch_op:
batch_op.drop_column("description")
batch_op.drop_column("name")
# ### end Alembic commands ###

View File

@ -0,0 +1,76 @@
import secrets
from .. import logger
from ..database import db
from ..models import ApiKey
def validate_api_key(api_key, permission):
"""Verify api key
Verify a ApiKey so if the User has permission or not.
Retrieves the access token if valid else retrieves False
Args:
api_key: ApiKey to verify
permission: Permission needed to access restricted routes
Returns:
A ApiKey for this given Token
Raises:
Unauthorized: If api key is invalid
Forbidden: If permission is insufficient
"""
logger.debug("check api_key {{ {} }} is valid".format(api_key))
api_key = ApiKey.query.filter_by(api_key=api_key).one_or_none()
if api_key:
logger.debug("api_key found")
if not permission or api_key.user_.has_permission(permission):
return api_key
else:
raise Forbidden
logger.debug("no valid api key with api_key: {{ {} }} and permission: {{ {} }}".format(api_key, permission))
raise Unauthorized
def create(user, name, description=None) -> ApiKey:
"""Create a ApiKey
Args:
user: For which User is to create a ApiKey
Returns:
A ApiKey for this given User
"""
logger.debug("create api key token")
token_str = secrets.token_hex(16)
logger.debug("create api_key for user {{ {} }}".format(user))
api_key = ApiKey(_user_id=user.id__, name=name, description=description, token=token_str)
db.session.add(api_key)
db.session.commit()
return api_key
def get_users_api_keys(user) -> list[ApiKey]:
"""Get all ApiKeys for a User
Args:
user: For which User is to get all ApiKeys
Returns:
List of ApiKeys for this given User
"""
return ApiKey.query.filter(ApiKey._user_id == user.id_).all()
def delete_api_key(api_key):
"""Delete a ApiKey
Args:
api_key: ApiKey to delete
"""
logger.debug(f"delete api_key {{ {api_key} }} {{ {type(api_key)} }}")
if isinstance(api_key, int):
api_key = ApiKey.query.get(api_key)
logger.debug("delete api_key {{ {} }}".format(api_key.token))
db.session.delete(api_key)
db.session.commit()

View File

@ -1,5 +1,4 @@
from __future__ import \
annotations # TODO: Remove if python requirement is >= 3.12 (? PEP 563 is defered)
from __future__ import annotations # TODO: Remove if python requirement is >= 3.12 (? PEP 563 is defered)
from datetime import datetime, timedelta, timezone
from secrets import compare_digest
@ -22,10 +21,12 @@ class ApiKey(db.Model, ModelSerializeMixin):
__tablename__ = "api_key"
expires: datetime = db.Column(UtcDateTime, nullable=True)
token: str = db.Column(db.String(32), unique=True)
name: str = db.Column(db.String(32))
description: str = db.Column(db.String(255), nullable=True)
lifetime: int = db.Column(db.Integer, nullable=True)
userid: str = ""
_id = db.Column("id", Serial, primary_key=True)
id: int = db.Column("id", Serial, primary_key=True)
_user_id = db.Column("user_id", Serial, db.ForeignKey("user.id"))
user_: User = db.relationship("User", back_populates="api_keys_")

View File

@ -11,7 +11,7 @@ from werkzeug.exceptions import BadRequest, Forbidden, MethodNotAllowed
from flaschengeist import logger
from flaschengeist.config import config
from flaschengeist.controller import userController
from flaschengeist.controller import apiKeyController, userController
from flaschengeist.models import User
from flaschengeist.plugins import Plugin
from flaschengeist.utils.datetime import from_iso_format
@ -281,3 +281,64 @@ def settings(userid, setting, current_session):
user.set_attribute(setting, data)
userController.persist()
return no_content()
@UsersPlugin.blueprint.route("/users/<userid>/api_keys", methods=["GET"])
@login_required()
def get_users_api_keys(userid, current_session):
"""Get all API keys of a user
Route: ``/users/<userid>/api_keys`` | Method: ``GET``
Args:
userid: UserID of user to retrieve
current_session: Session sent with Authorization Header
Returns:
JSON encoded array of `flaschengeist.models.api_key.ApiKey` or HTTP error
"""
if userid != current_session.user_.userid:
raise Unauthorized
return jsonify(apiKeyController.get_users_api_keys(current_session.user_))
@UsersPlugin.blueprint.route("/users/<userid>/api_keys", methods=["POST"])
@login_required()
def create_api_key(userid, current_session):
"""Create a new API key for a user
Route: ``/users/<userid>/api_keys`` | Method: ``POST``
Args:
userid: UserID of user to retrieve
current_session: Session sent with Authorization Header
Returns:
JSON encoded `flaschengeist.models.api_key.ApiKey` or HTTP error
"""
data = request.get_json()
if not data or "name" not in data:
raise BadRequest
if userid != current_session.user_.userid:
raise Unauthorized
return jsonify(apiKeyController.create(current_session.user_, data["name"], data.get("description", None)))
@UsersPlugin.blueprint.route("/users/<userid>/api_keys/<int:keyid>", methods=["DELETE"])
@login_required()
def delete_api_key(userid, keyid, current_session):
"""Delete an API key for a user
Route: ``/users/<userid>/api_keys/<keyid>`` | Method: ``DELETE``
Args:
userid: UserID of user to retrieve
keyid: KeyID of the API key to delete
current_session: Session sent with Authorization Header
Returns:
HTTP-204 or HTTP error
"""
if userid != current_session.user_.userid:
raise Unauthorized
apiKeyController.delete_api_key(keyid)
return no_content()

View File

@ -1,10 +1,25 @@
from functools import wraps
from werkzeug.exceptions import Unauthorized
from flaschengeist import logger
from flaschengeist.controller import sessionController
def extract_api_key(permission=None):
from flask import request
try:
api_key = request.headers.get("X-API-KEY")
logger.debug(f"api_key {{ {api_key} }} | headers {{ {request.headers} }}")
except AttributeError:
logger.debug("Missing X-API-KEY header")
raise Unauthorized
session = sessionController.validate_api_key(api_key, request.headers, permission)
return session
def extract_session(permission=None):
from flask import request
@ -32,7 +47,10 @@ def login_required(permission=None):
def wrap(func):
@wraps(func)
def wrapped_f(*args, **kwargs):
session = extract_session(permission)
try:
session = extract_session(permission)
except Unauthorized:
session = extract_api_key(permission)
kwargs["current_session"] = session
logger.debug("token {{ {} }} is valid".format(session.token))
return func(*args, **kwargs)