Compare commits
No commits in common. "7dd3321246ccadb72318a7489ac70f0a71022089" and "81080404fb56ac80557c2bf78dbbbbbbbdaab75f" have entirely different histories.
7dd3321246
...
81080404fb
|
@ -1,37 +0,0 @@
|
|||
"""add name and description to api_key
|
||||
|
||||
Revision ID: 49118ea16b56
|
||||
Revises: f9aa4cafa982
|
||||
Create Date: 2024-10-14 08:15:16.348090
|
||||
|
||||
"""
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
import flaschengeist
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "49118ea16b56"
|
||||
down_revision = "f9aa4cafa982"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table("api_key", schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column("name", sa.String(length=32), nullable=True))
|
||||
batch_op.add_column(sa.Column("description", sa.String(length=255), nullable=True))
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
|
||||
with op.batch_alter_table("api_key", schema=None) as batch_op:
|
||||
batch_op.drop_column("description")
|
||||
batch_op.drop_column("name")
|
||||
|
||||
# ### end Alembic commands ###
|
|
@ -1,41 +0,0 @@
|
|||
"""Add APIKeys
|
||||
|
||||
Revision ID: f9aa4cafa982
|
||||
Revises: 20482a003db8
|
||||
Create Date: 2024-10-11 13:04:21.877288
|
||||
|
||||
"""
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
import flaschengeist
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "f9aa4cafa982"
|
||||
down_revision = "20482a003db8"
|
||||
branch_labels = ()
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table(
|
||||
"api_key",
|
||||
sa.Column("expires", flaschengeist.database.types.UtcDateTime(), nullable=True),
|
||||
sa.Column("token", sa.String(length=32), nullable=True),
|
||||
sa.Column("lifetime", sa.Integer(), nullable=True),
|
||||
sa.Column("id", flaschengeist.database.types.Serial(), nullable=False),
|
||||
sa.Column("user_id", flaschengeist.database.types.Serial(), nullable=True),
|
||||
sa.ForeignKeyConstraint(["user_id"], ["user.id"], name=op.f("fk_api_key_user_id_user")),
|
||||
sa.PrimaryKeyConstraint("id", name=op.f("pk_api_key")),
|
||||
sa.UniqueConstraint("token", name=op.f("uq_api_key_token")),
|
||||
)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_table("api_key")
|
||||
# ### end Alembic commands ###
|
|
@ -1,76 +0,0 @@
|
|||
import secrets
|
||||
|
||||
from .. import logger
|
||||
from ..database import db
|
||||
from ..models import ApiKey
|
||||
|
||||
|
||||
def validate_api_key(api_key, permission):
|
||||
"""Verify api key
|
||||
|
||||
Verify a ApiKey so if the User has permission or not.
|
||||
Retrieves the access token if valid else retrieves False
|
||||
|
||||
Args:
|
||||
api_key: ApiKey to verify
|
||||
permission: Permission needed to access restricted routes
|
||||
Returns:
|
||||
A ApiKey for this given Token
|
||||
Raises:
|
||||
Unauthorized: If api key is invalid
|
||||
Forbidden: If permission is insufficient
|
||||
"""
|
||||
logger.debug("check api_key {{ {} }} is valid".format(api_key))
|
||||
api_key = ApiKey.query.filter_by(api_key=api_key).one_or_none()
|
||||
if api_key:
|
||||
logger.debug("api_key found")
|
||||
if not permission or api_key.user_.has_permission(permission):
|
||||
return api_key
|
||||
else:
|
||||
raise Forbidden
|
||||
logger.debug("no valid api key with api_key: {{ {} }} and permission: {{ {} }}".format(api_key, permission))
|
||||
raise Unauthorized
|
||||
|
||||
|
||||
def create(user, name, description=None) -> ApiKey:
|
||||
"""Create a ApiKey
|
||||
|
||||
Args:
|
||||
user: For which User is to create a ApiKey
|
||||
|
||||
Returns:
|
||||
A ApiKey for this given User
|
||||
"""
|
||||
logger.debug("create api key token")
|
||||
token_str = secrets.token_hex(16)
|
||||
logger.debug("create api_key for user {{ {} }}".format(user))
|
||||
api_key = ApiKey(_user_id=user.id__, name=name, description=description, token=token_str)
|
||||
db.session.add(api_key)
|
||||
db.session.commit()
|
||||
return api_key
|
||||
|
||||
|
||||
def get_users_api_keys(user) -> list[ApiKey]:
|
||||
"""Get all ApiKeys for a User
|
||||
|
||||
Args:
|
||||
user: For which User is to get all ApiKeys
|
||||
|
||||
Returns:
|
||||
List of ApiKeys for this given User
|
||||
"""
|
||||
return ApiKey.query.filter(ApiKey._user_id == user.id_).all()
|
||||
|
||||
|
||||
def delete_api_key(api_key):
|
||||
"""Delete a ApiKey
|
||||
|
||||
Args:
|
||||
api_key: ApiKey to delete
|
||||
"""
|
||||
logger.debug(f"delete api_key {{ {api_key} }} {{ {type(api_key)} }}")
|
||||
if isinstance(api_key, int):
|
||||
api_key = ApiKey.query.get(api_key)
|
||||
logger.debug("delete api_key {{ {} }}".format(api_key.token))
|
||||
db.session.delete(api_key)
|
||||
db.session.commit()
|
|
@ -1,6 +1,5 @@
|
|||
from .api_key import *
|
||||
from .image import *
|
||||
from .notification import *
|
||||
from .plugin import *
|
||||
from .session import *
|
||||
from .user import *
|
||||
from .plugin import *
|
||||
from .notification import *
|
||||
from .image import *
|
||||
|
|
|
@ -1,49 +0,0 @@
|
|||
from __future__ import annotations # TODO: Remove if python requirement is >= 3.12 (? PEP 563 is defered)
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from secrets import compare_digest
|
||||
|
||||
from .. import logger
|
||||
from ..database import db
|
||||
from ..database.types import ModelSerializeMixin, Serial, UtcDateTime
|
||||
|
||||
|
||||
class ApiKey(db.Model, ModelSerializeMixin):
|
||||
"""Model for a Session
|
||||
|
||||
Args:
|
||||
expires: Is a Datetime from current Time.
|
||||
user: Is an User.
|
||||
token: String to verify access later.
|
||||
"""
|
||||
|
||||
__allow_unmapped__ = True
|
||||
__tablename__ = "api_key"
|
||||
expires: datetime = db.Column(UtcDateTime, nullable=True)
|
||||
token: str = db.Column(db.String(32), unique=True)
|
||||
name: str = db.Column(db.String(32))
|
||||
description: str = db.Column(db.String(255), nullable=True)
|
||||
lifetime: int = db.Column(db.Integer, nullable=True)
|
||||
userid: str = ""
|
||||
|
||||
id: int = db.Column("id", Serial, primary_key=True)
|
||||
_user_id = db.Column("user_id", Serial, db.ForeignKey("user.id"))
|
||||
user_: User = db.relationship("User", back_populates="api_keys_")
|
||||
|
||||
@property
|
||||
def userid(self):
|
||||
return self.user_.userid
|
||||
|
||||
def refresh(self):
|
||||
"""Update the Timestamp
|
||||
|
||||
Update the Timestamp to the current Time.
|
||||
"""
|
||||
logger.debug("update timestamp from session with token {{ {} }}".format(self.token))
|
||||
self.expires = datetime.now(timezone.utc) + timedelta(seconds=self.lifetime)
|
||||
|
||||
def __eq__(self, token):
|
||||
if isinstance(token, str):
|
||||
return compare_digest(self.token, token)
|
||||
else:
|
||||
return super(Session, self).__eq__(token)
|
|
@ -1,13 +1,13 @@
|
|||
from __future__ import \
|
||||
annotations # TODO: Remove if python requirement is >= 3.12 (? PEP 563 is defered)
|
||||
from __future__ import (
|
||||
annotations,
|
||||
) # TODO: Remove if python requirement is >= 3.12 (? PEP 563 is defered)
|
||||
|
||||
from typing import Optional, Union, List
|
||||
from datetime import date, datetime
|
||||
from typing import List, Optional, Union
|
||||
|
||||
from sqlalchemy.orm.collections import attribute_mapped_collection
|
||||
|
||||
from ..database import db
|
||||
from ..database.types import ModelSerializeMixin, Serial, UtcDateTime
|
||||
from ..database.types import ModelSerializeMixin, UtcDateTime, Serial
|
||||
|
||||
association_table = db.Table(
|
||||
"user_x_role",
|
||||
|
@ -71,7 +71,6 @@ class User(db.Model, ModelSerializeMixin):
|
|||
id_ = db.Column("id", Serial, primary_key=True)
|
||||
roles_: List[Role] = db.relationship("Role", secondary=association_table, cascade="save-update, merge")
|
||||
sessions_: List[Session] = db.relationship("Session", back_populates="user_", cascade="all, delete, delete-orphan")
|
||||
api_keys_: List[ApiKey] = db.relationship("ApiKey", back_populates="user_", cascade="all, delete, delete-orphan")
|
||||
avatar_: Optional[Image] = db.relationship("Image", cascade="all, delete, delete-orphan", single_parent=True)
|
||||
reset_requests_: List["_PasswordReset"] = db.relationship("_PasswordReset", cascade="all, delete, delete-orphan")
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ from werkzeug.exceptions import BadRequest, Forbidden, MethodNotAllowed
|
|||
|
||||
from flaschengeist import logger
|
||||
from flaschengeist.config import config
|
||||
from flaschengeist.controller import apiKeyController, userController
|
||||
from flaschengeist.controller import userController
|
||||
from flaschengeist.models import User
|
||||
from flaschengeist.plugins import Plugin
|
||||
from flaschengeist.utils.datetime import from_iso_format
|
||||
|
@ -281,64 +281,3 @@ def settings(userid, setting, current_session):
|
|||
user.set_attribute(setting, data)
|
||||
userController.persist()
|
||||
return no_content()
|
||||
|
||||
|
||||
@UsersPlugin.blueprint.route("/users/<userid>/api_keys", methods=["GET"])
|
||||
@login_required()
|
||||
def get_users_api_keys(userid, current_session):
|
||||
"""Get all API keys of a user
|
||||
|
||||
Route: ``/users/<userid>/api_keys`` | Method: ``GET``
|
||||
Args:
|
||||
userid: UserID of user to retrieve
|
||||
current_session: Session sent with Authorization Header
|
||||
|
||||
Returns:
|
||||
JSON encoded array of `flaschengeist.models.api_key.ApiKey` or HTTP error
|
||||
|
||||
"""
|
||||
if userid != current_session.user_.userid:
|
||||
raise Unauthorized
|
||||
return jsonify(apiKeyController.get_users_api_keys(current_session.user_))
|
||||
|
||||
|
||||
@UsersPlugin.blueprint.route("/users/<userid>/api_keys", methods=["POST"])
|
||||
@login_required()
|
||||
def create_api_key(userid, current_session):
|
||||
"""Create a new API key for a user
|
||||
|
||||
Route: ``/users/<userid>/api_keys`` | Method: ``POST``
|
||||
Args:
|
||||
userid: UserID of user to retrieve
|
||||
current_session: Session sent with Authorization Header
|
||||
|
||||
Returns:
|
||||
JSON encoded `flaschengeist.models.api_key.ApiKey` or HTTP error
|
||||
|
||||
"""
|
||||
data = request.get_json()
|
||||
if not data or "name" not in data:
|
||||
raise BadRequest
|
||||
if userid != current_session.user_.userid:
|
||||
raise Unauthorized
|
||||
return jsonify(apiKeyController.create(current_session.user_, data["name"], data.get("description", None)))
|
||||
|
||||
|
||||
@UsersPlugin.blueprint.route("/users/<userid>/api_keys/<int:keyid>", methods=["DELETE"])
|
||||
@login_required()
|
||||
def delete_api_key(userid, keyid, current_session):
|
||||
"""Delete an API key for a user
|
||||
|
||||
Route: ``/users/<userid>/api_keys/<keyid>`` | Method: ``DELETE``
|
||||
Args:
|
||||
userid: UserID of user to retrieve
|
||||
keyid: KeyID of the API key to delete
|
||||
current_session: Session sent with Authorization Header
|
||||
|
||||
Returns:
|
||||
HTTP-204 or HTTP error
|
||||
"""
|
||||
if userid != current_session.user_.userid:
|
||||
raise Unauthorized
|
||||
apiKeyController.delete_api_key(keyid)
|
||||
return no_content()
|
||||
|
|
|
@ -1,25 +1,10 @@
|
|||
from functools import wraps
|
||||
|
||||
from werkzeug.exceptions import Unauthorized
|
||||
|
||||
from flaschengeist import logger
|
||||
from flaschengeist.controller import sessionController
|
||||
|
||||
|
||||
def extract_api_key(permission=None):
|
||||
from flask import request
|
||||
|
||||
try:
|
||||
api_key = request.headers.get("X-API-KEY")
|
||||
logger.debug(f"api_key {{ {api_key} }} | headers {{ {request.headers} }}")
|
||||
except AttributeError:
|
||||
logger.debug("Missing X-API-KEY header")
|
||||
raise Unauthorized
|
||||
|
||||
session = sessionController.validate_api_key(api_key, request.headers, permission)
|
||||
return session
|
||||
|
||||
|
||||
def extract_session(permission=None):
|
||||
from flask import request
|
||||
|
||||
|
@ -47,10 +32,7 @@ def login_required(permission=None):
|
|||
def wrap(func):
|
||||
@wraps(func)
|
||||
def wrapped_f(*args, **kwargs):
|
||||
try:
|
||||
session = extract_session(permission)
|
||||
except Unauthorized:
|
||||
session = extract_api_key(permission)
|
||||
kwargs["current_session"] = session
|
||||
logger.debug("token {{ {} }} is valid".format(session.token))
|
||||
return func(*args, **kwargs)
|
||||
|
|
Loading…
Reference in New Issue