Compare commits

..

No commits in common. "7dd3321246ccadb72318a7489ac70f0a71022089" and "81080404fb56ac80557c2bf78dbbbbbbbdaab75f" have entirely different histories.

8 changed files with 10 additions and 294 deletions

View File

@ -1,37 +0,0 @@
"""add name and description to api_key
Revision ID: 49118ea16b56
Revises: f9aa4cafa982
Create Date: 2024-10-14 08:15:16.348090
"""
import sqlalchemy as sa
from alembic import op
import flaschengeist
# revision identifiers, used by Alembic.
revision = "49118ea16b56"
down_revision = "f9aa4cafa982"
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("api_key", schema=None) as batch_op:
batch_op.add_column(sa.Column("name", sa.String(length=32), nullable=True))
batch_op.add_column(sa.Column("description", sa.String(length=255), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("api_key", schema=None) as batch_op:
batch_op.drop_column("description")
batch_op.drop_column("name")
# ### end Alembic commands ###

View File

@ -1,41 +0,0 @@
"""Add APIKeys
Revision ID: f9aa4cafa982
Revises: 20482a003db8
Create Date: 2024-10-11 13:04:21.877288
"""
import sqlalchemy as sa
from alembic import op
import flaschengeist
# revision identifiers, used by Alembic.
revision = "f9aa4cafa982"
down_revision = "20482a003db8"
branch_labels = ()
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"api_key",
sa.Column("expires", flaschengeist.database.types.UtcDateTime(), nullable=True),
sa.Column("token", sa.String(length=32), nullable=True),
sa.Column("lifetime", sa.Integer(), nullable=True),
sa.Column("id", flaschengeist.database.types.Serial(), nullable=False),
sa.Column("user_id", flaschengeist.database.types.Serial(), nullable=True),
sa.ForeignKeyConstraint(["user_id"], ["user.id"], name=op.f("fk_api_key_user_id_user")),
sa.PrimaryKeyConstraint("id", name=op.f("pk_api_key")),
sa.UniqueConstraint("token", name=op.f("uq_api_key_token")),
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("api_key")
# ### end Alembic commands ###

View File

@ -1,76 +0,0 @@
import secrets
from .. import logger
from ..database import db
from ..models import ApiKey
def validate_api_key(api_key, permission):
"""Verify api key
Verify a ApiKey so if the User has permission or not.
Retrieves the access token if valid else retrieves False
Args:
api_key: ApiKey to verify
permission: Permission needed to access restricted routes
Returns:
A ApiKey for this given Token
Raises:
Unauthorized: If api key is invalid
Forbidden: If permission is insufficient
"""
logger.debug("check api_key {{ {} }} is valid".format(api_key))
api_key = ApiKey.query.filter_by(api_key=api_key).one_or_none()
if api_key:
logger.debug("api_key found")
if not permission or api_key.user_.has_permission(permission):
return api_key
else:
raise Forbidden
logger.debug("no valid api key with api_key: {{ {} }} and permission: {{ {} }}".format(api_key, permission))
raise Unauthorized
def create(user, name, description=None) -> ApiKey:
"""Create a ApiKey
Args:
user: For which User is to create a ApiKey
Returns:
A ApiKey for this given User
"""
logger.debug("create api key token")
token_str = secrets.token_hex(16)
logger.debug("create api_key for user {{ {} }}".format(user))
api_key = ApiKey(_user_id=user.id__, name=name, description=description, token=token_str)
db.session.add(api_key)
db.session.commit()
return api_key
def get_users_api_keys(user) -> list[ApiKey]:
"""Get all ApiKeys for a User
Args:
user: For which User is to get all ApiKeys
Returns:
List of ApiKeys for this given User
"""
return ApiKey.query.filter(ApiKey._user_id == user.id_).all()
def delete_api_key(api_key):
"""Delete a ApiKey
Args:
api_key: ApiKey to delete
"""
logger.debug(f"delete api_key {{ {api_key} }} {{ {type(api_key)} }}")
if isinstance(api_key, int):
api_key = ApiKey.query.get(api_key)
logger.debug("delete api_key {{ {} }}".format(api_key.token))
db.session.delete(api_key)
db.session.commit()

View File

@ -1,6 +1,5 @@
from .api_key import *
from .image import *
from .notification import *
from .plugin import *
from .session import *
from .user import *
from .plugin import *
from .notification import *
from .image import *

View File

@ -1,49 +0,0 @@
from __future__ import annotations # TODO: Remove if python requirement is >= 3.12 (? PEP 563 is defered)
from datetime import datetime, timedelta, timezone
from secrets import compare_digest
from .. import logger
from ..database import db
from ..database.types import ModelSerializeMixin, Serial, UtcDateTime
class ApiKey(db.Model, ModelSerializeMixin):
"""Model for a Session
Args:
expires: Is a Datetime from current Time.
user: Is an User.
token: String to verify access later.
"""
__allow_unmapped__ = True
__tablename__ = "api_key"
expires: datetime = db.Column(UtcDateTime, nullable=True)
token: str = db.Column(db.String(32), unique=True)
name: str = db.Column(db.String(32))
description: str = db.Column(db.String(255), nullable=True)
lifetime: int = db.Column(db.Integer, nullable=True)
userid: str = ""
id: int = db.Column("id", Serial, primary_key=True)
_user_id = db.Column("user_id", Serial, db.ForeignKey("user.id"))
user_: User = db.relationship("User", back_populates="api_keys_")
@property
def userid(self):
return self.user_.userid
def refresh(self):
"""Update the Timestamp
Update the Timestamp to the current Time.
"""
logger.debug("update timestamp from session with token {{ {} }}".format(self.token))
self.expires = datetime.now(timezone.utc) + timedelta(seconds=self.lifetime)
def __eq__(self, token):
if isinstance(token, str):
return compare_digest(self.token, token)
else:
return super(Session, self).__eq__(token)

View File

@ -1,13 +1,13 @@
from __future__ import \
annotations # TODO: Remove if python requirement is >= 3.12 (? PEP 563 is defered)
from __future__ import (
annotations,
) # TODO: Remove if python requirement is >= 3.12 (? PEP 563 is defered)
from typing import Optional, Union, List
from datetime import date, datetime
from typing import List, Optional, Union
from sqlalchemy.orm.collections import attribute_mapped_collection
from ..database import db
from ..database.types import ModelSerializeMixin, Serial, UtcDateTime
from ..database.types import ModelSerializeMixin, UtcDateTime, Serial
association_table = db.Table(
"user_x_role",
@ -71,7 +71,6 @@ class User(db.Model, ModelSerializeMixin):
id_ = db.Column("id", Serial, primary_key=True)
roles_: List[Role] = db.relationship("Role", secondary=association_table, cascade="save-update, merge")
sessions_: List[Session] = db.relationship("Session", back_populates="user_", cascade="all, delete, delete-orphan")
api_keys_: List[ApiKey] = db.relationship("ApiKey", back_populates="user_", cascade="all, delete, delete-orphan")
avatar_: Optional[Image] = db.relationship("Image", cascade="all, delete, delete-orphan", single_parent=True)
reset_requests_: List["_PasswordReset"] = db.relationship("_PasswordReset", cascade="all, delete, delete-orphan")

View File

@ -11,7 +11,7 @@ from werkzeug.exceptions import BadRequest, Forbidden, MethodNotAllowed
from flaschengeist import logger
from flaschengeist.config import config
from flaschengeist.controller import apiKeyController, userController
from flaschengeist.controller import userController
from flaschengeist.models import User
from flaschengeist.plugins import Plugin
from flaschengeist.utils.datetime import from_iso_format
@ -281,64 +281,3 @@ def settings(userid, setting, current_session):
user.set_attribute(setting, data)
userController.persist()
return no_content()
@UsersPlugin.blueprint.route("/users/<userid>/api_keys", methods=["GET"])
@login_required()
def get_users_api_keys(userid, current_session):
"""Get all API keys of a user
Route: ``/users/<userid>/api_keys`` | Method: ``GET``
Args:
userid: UserID of user to retrieve
current_session: Session sent with Authorization Header
Returns:
JSON encoded array of `flaschengeist.models.api_key.ApiKey` or HTTP error
"""
if userid != current_session.user_.userid:
raise Unauthorized
return jsonify(apiKeyController.get_users_api_keys(current_session.user_))
@UsersPlugin.blueprint.route("/users/<userid>/api_keys", methods=["POST"])
@login_required()
def create_api_key(userid, current_session):
"""Create a new API key for a user
Route: ``/users/<userid>/api_keys`` | Method: ``POST``
Args:
userid: UserID of user to retrieve
current_session: Session sent with Authorization Header
Returns:
JSON encoded `flaschengeist.models.api_key.ApiKey` or HTTP error
"""
data = request.get_json()
if not data or "name" not in data:
raise BadRequest
if userid != current_session.user_.userid:
raise Unauthorized
return jsonify(apiKeyController.create(current_session.user_, data["name"], data.get("description", None)))
@UsersPlugin.blueprint.route("/users/<userid>/api_keys/<int:keyid>", methods=["DELETE"])
@login_required()
def delete_api_key(userid, keyid, current_session):
"""Delete an API key for a user
Route: ``/users/<userid>/api_keys/<keyid>`` | Method: ``DELETE``
Args:
userid: UserID of user to retrieve
keyid: KeyID of the API key to delete
current_session: Session sent with Authorization Header
Returns:
HTTP-204 or HTTP error
"""
if userid != current_session.user_.userid:
raise Unauthorized
apiKeyController.delete_api_key(keyid)
return no_content()

View File

@ -1,25 +1,10 @@
from functools import wraps
from werkzeug.exceptions import Unauthorized
from flaschengeist import logger
from flaschengeist.controller import sessionController
def extract_api_key(permission=None):
from flask import request
try:
api_key = request.headers.get("X-API-KEY")
logger.debug(f"api_key {{ {api_key} }} | headers {{ {request.headers} }}")
except AttributeError:
logger.debug("Missing X-API-KEY header")
raise Unauthorized
session = sessionController.validate_api_key(api_key, request.headers, permission)
return session
def extract_session(permission=None):
from flask import request
@ -47,10 +32,7 @@ def login_required(permission=None):
def wrap(func):
@wraps(func)
def wrapped_f(*args, **kwargs):
try:
session = extract_session(permission)
except Unauthorized:
session = extract_api_key(permission)
kwargs["current_session"] = session
logger.debug("token {{ {} }} is valid".format(session.token))
return func(*args, **kwargs)