Some more on autodetection API interfaces for frontend

* Breaks API for Session and User
This commit is contained in:
Ferdinand Thiessen 2020-10-19 16:48:34 +02:00
parent 28b202cf30
commit addfb7c7c4
10 changed files with 131 additions and 119 deletions

View File

@ -12,6 +12,7 @@ from flaschengeist import logger
from flaschengeist.modules import Plugin
from flaschengeist.system.decorator import login_required
from flaschengeist.system.controller import sessionController, userController, messageController
from flaschengeist.system.models.session import Session
session_controller = LocalProxy(lambda: sessionController.SessionController())
auth_bp = Blueprint("auth", __name__)
@ -21,6 +22,7 @@ class AuthRoutePlugin(Plugin):
def __init__(self, conf):
super().__init__(blueprint=auth_bp)
#################################################
# Routes #
# #
@ -60,21 +62,21 @@ def _login():
# Lets cleanup the DB
session_controller.clear_expired()
return jsonify({"session": session, "permissions": user.get_permissions()})
return jsonify({"session": session, "user": user})
@auth_bp.route("/auth", methods=["GET"])
@login_required()
def _get_tokens(access_token, **kwargs):
tokens = session_controller.get_users_sessions(access_token.user)
a = messageController.Message(access_token.user, "Go", "Bar")
def _get_sessions(access_token: Session, **kwargs):
tokens = session_controller.get_users_sessions(access_token._user)
a = messageController.Message(access_token._user, "Go", "Bar")
messageController.send_message(a)
return jsonify(tokens)
@auth_bp.route("/auth/<token>", methods=["DELETE"])
@login_required()
def _delete_token(access_token, token, **kwargs):
def _delete_session(access_token, token, **kwargs):
logger.debug("Try to delete access token {{ {} }}".format(token))
token = session_controller.get_session(token, access_token.user)
if not token:
@ -89,14 +91,26 @@ def _delete_token(access_token, token, **kwargs):
@auth_bp.route("/auth/<token>", methods=["GET"])
@login_required()
def _get_token(token, access_token, **kwargs):
def _get_session(token, access_token, **kwargs):
logger.debug("get token {{ {} }}".format(token))
session = session_controller.get_session(token, access_token.user)
if not token:
# Return 403 error, so that users can not bruteforce tokens
# Valid tokens from other users and invalid tokens now are looking the same
raise Forbidden
return jsonify({"session": session, "permissions": session.user.get_permissions()})
return jsonify(session)
@auth_bp.route("/auth/<token>/user", methods=["GET"])
@login_required()
def _get_assocd_user(token, access_token, **kwargs):
logger.debug("get token {{ {} }}".format(token))
session = session_controller.get_session(token, access_token.user)
if not token:
# Return 403 error, so that users can not bruteforce tokens
# Valid tokens from other users and invalid tokens now are looking the same
raise Forbidden
return jsonify(session._user)
@auth_bp.route("/auth/<token>", methods=["PUT"])

View File

@ -12,6 +12,7 @@ class RolesPlugin(Plugin):
def __init__(self, config):
super().__init__(config, roles_bp)
######################################################
# Routes #
# #

View File

@ -16,6 +16,7 @@ class SchedulePlugin(Plugin):
def __init__(self, config):
super().__init__(blueprint=schedule_bp)
####################################################################################
# Routes #
# #

View File

@ -62,7 +62,11 @@ class SessionController(metaclass=Singleton):
logger.debug("create access token")
token_str = secrets.token_hex(16)
session = Session(
token=token_str, user=user, lifetime=self.lifetime, browser=user_agent.browser, platform=user_agent.platform
token=token_str,
_user=user,
lifetime=self.lifetime,
browser=user_agent.browser,
platform=user_agent.platform,
)
session.refresh()
db.session.add(session)
@ -89,7 +93,7 @@ class SessionController(metaclass=Singleton):
return session
def get_users_sessions(self, user):
return Session.query.filter(Session.user == user)
return Session.query.filter(Session._user == user)
@staticmethod
def delete_session(token: Session):

View File

@ -19,6 +19,8 @@ def login_user(username, password):
def update_user(user):
current_app.config["FG_AUTH_BACKEND"].update_user(user)
if not user.display_name:
user.display_name = user.firstname[0] + user.lastname
db.session.commit()

View File

@ -17,4 +17,4 @@ class HookCall(object):
def __call__(self, function):
_hook_dict[self.name] = function
return function
return function

View File

@ -0,0 +1,7 @@
class ModelSerializeMixin:
def serialize(self):
d = {param: getattr(self, param) for param in self.__class__.__annotations__ if not param.startswith("_")}
if len(d) == 1:
key, value = d.popitem()
return value
return d

View File

@ -1,12 +1,13 @@
from datetime import datetime, timedelta, timezone
from . import ModelSerializeMixin
from .user import User
from ..database import db
from secrets import compare_digest
from flaschengeist import logger
class Session(db.Model):
class Session(db.Model, ModelSerializeMixin):
"""Model for a Session
Args:
@ -16,16 +17,16 @@ class Session(db.Model):
"""
__tablename__ = "session"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
user: User = db.relationship("User", back_populates="sessions")
expires: datetime = db.Column(db.DateTime)
token: str = db.Column(db.String(32), unique=True)
lifetime: int = db.Column(db.Integer)
browser: str = db.Column(db.String(30))
platform: str = db.Column(db.String(30))
_id = db.Column("id", db.Integer, primary_key=True)
_user: User = db.relationship("User", back_populates="_sessions")
_user_id = db.Column("user_id", db.Integer, db.ForeignKey("user.id"))
def refresh(self):
"""Update the Timestamp
@ -33,21 +34,7 @@ class Session(db.Model):
"""
logger.debug("update timestamp from session with token {{ {} }}".format(self))
self.expires = datetime.utcnow() + timedelta(seconds=self.lifetime)
def serialize(self):
"""Create Dic to dump in JSON
Returns:
A Dic with static Attributes.
"""
return {
"token": self.token,
"expires": self.expires.replace(tzinfo=timezone.utc),
"lifetime": self.lifetime,
"user": self.user,
"browser": self.browser,
"platform": self.platform,
}
self.expires.replace(tzinfo=timezone.utc)
def __eq__(self, token):
return compare_digest(self.token, token)

View File

@ -1,9 +1,10 @@
from ..database import db
from sqlalchemy.orm.collections import attribute_mapped_collection
from flask import current_app
from werkzeug.local import LocalProxy
from typing import List
logger = LocalProxy(lambda: current_app.logger)
from sqlalchemy.orm.collections import attribute_mapped_collection
from . import ModelSerializeMixin
from ..database import db
from ... import logger
association_table = db.Table(
"user_x_role",
@ -11,8 +12,31 @@ association_table = db.Table(
db.Column("role_id", db.Integer, db.ForeignKey("role.id")),
)
role_permission_association_table = db.Table(
"role_x_permission",
db.Column("role_id", db.Integer, db.ForeignKey("role.id")),
db.Column("permission_id", db.Integer, db.ForeignKey("permission.id")),
)
class User(db.Model):
class Permission(db.Model, ModelSerializeMixin):
__tablename__ = "permission"
name: str = db.Column(db.String(30), unique=True)
_id = db.Column("id", db.Integer, primary_key=True)
class Role(db.Model, ModelSerializeMixin):
__tablename__ = "role"
name: str = db.Column(db.String(30), unique=True)
permissions: [Permission] = db.relationship(
"Permission", secondary=role_permission_association_table, cascade="all, delete"
)
_id = db.Column("id", db.Integer, primary_key=True)
class User(db.Model, ModelSerializeMixin):
"""Database Object for User
Table for all saved User
@ -27,23 +51,24 @@ class User(db.Model):
"""
__tablename__ = "user"
id = db.Column(db.Integer, primary_key=True)
userid = db.Column(db.String(30))
display_name = db.Column(db.String(30))
firstname = db.Column(db.String(30))
lastname = db.Column(db.String(30))
mail = db.Column(db.String(30))
roles = db.relationship("Role", secondary=association_table)
sessions = db.relationship("Session", back_populates="user")
attributes = db.relationship(
"UserAttribute", collection_class=attribute_mapped_collection("name"), cascade="all, delete"
userid: str = db.Column(db.String(30))
display_name: str = db.Column(db.String(30))
firstname: str = db.Column(db.String(30))
lastname: str = db.Column(db.String(30))
mail: str = db.Column(db.String(30))
roles: [Role] = db.relationship("Role", secondary=association_table)
_id = db.Column("id", db.Integer, primary_key=True)
_sessions = db.relationship("Session", back_populates="_user")
_attributes = db.relationship(
"_UserAttribute", collection_class=attribute_mapped_collection("name"), cascade="all, delete"
)
def set_attribute(self, name, value):
if name in self.attributes:
self.attributes[name].value = value
if name in self._attributes:
self._attributes[name].value = value
else:
self.attributes[name] = UserAttribute(name=name, value=value)
self._attributes[name] = _UserAttribute(name=name, value=value)
def add_role(self, name):
r = Role.query.filter_by(name=name).first()
@ -74,46 +99,10 @@ class User(db.Model):
return True
return False
def serialize(self):
return {
"userid": self.userid,
"display_name": self.display_name,
"firstname": self.firstname,
"lastname": self.lastname,
"mail": self.mail,
"roles": [r.name for r in self.roles],
}
class UserAttribute(db.Model):
class _UserAttribute(db.Model, ModelSerializeMixin):
__tablename__ = "user_attribute"
id = db.Column(db.Integer, primary_key=True)
user = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False)
name = db.Column(db.String(30))
value = db.Column(db.String(192))
role_permission_association_table = db.Table(
"role_x_permission",
db.Column("role_id", db.Integer, db.ForeignKey("role.id")),
db.Column("permission_id", db.Integer, db.ForeignKey("permission.id")),
)
class Role(db.Model):
__tablename__ = "role"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(30), unique=True)
permissions = db.relationship("Permission", secondary=role_permission_association_table, cascade="all, delete")
def serialize(self):
return self.name
class Permission(db.Model):
__tablename__ = "permission"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(30), unique=True)
def serialize(self):
return self.name
id = db.Column("id", db.Integer, primary_key=True)
user: User = db.Column("user", db.Integer, db.ForeignKey("user.id"), nullable=False)
name: str = db.Column(db.String(30))
value: str = db.Column(db.String(192))

View File

@ -1,6 +1,8 @@
#!/usr/bin/python3
import inspect
import argparse
from datetime import datetime
import bjoern
import sqlalchemy
from sqlalchemy.orm import RelationshipProperty, ColumnProperty
@ -27,47 +29,53 @@ def run(arguments):
def export(arguments):
import flaschengeist.system.models as models
known = []
done = []
classes = {}
def orm_type(attr):
if hasattr(attr, 'type'):
tt = attr.type
if isinstance(attr, ColumnProperty):
tt = attr.columns[0].type
elif isinstance(attr, RelationshipProperty):
proto = "{}"
if attr.key.endswith("s"):
proto = "Array<{}>"
return proto.format(attr.mapper.class_.__name__)
def pytype(cls):
if isinstance(cls, list):
return "Array<{}>".format(pytype(cls[0]))
mapper = {"str": "string", "int": "number", "datetime": "Date"}
if cls.__name__ in mapper:
return mapper[cls.__name__]
else:
raise TypeError("Couldn't inspect type.")
tt = tt.__str__().split("(")[0]
return {"INTEGER": "number",
"VARCHAR": "string",
"DATETIME": "Date"}[tt]
return cls.__name__
def walker(mod, file):
if inspect.isclass(mod[1]) and mod[1].__module__.startswith(models.__name__) and mod[0] not in done:
mapper = sqlalchemy.inspect(mod[1], False)
if mapper is not None:
file.write("interface {} {{\n".format(mod[0]))
for desc in mapper.attrs:
file.write(" {}: {};\n".format(desc.key, orm_type(desc)))
file.write("}\n")
done.append(mod[0])
elif inspect.ismodule(mod[1]) and mod[1].__name__.startswith(models.__name__) and mod[1].__name__ not in known:
def walker(mod):
if inspect.ismodule(mod[1]) and mod[1].__name__.startswith(models.__name__) and mod[1].__name__ not in known:
known.append(mod[1].__name__)
for cls in inspect.getmembers(mod[1], lambda x: inspect.isclass(x) or inspect.ismodule(x)):
walker(cls, file)
walker(cls)
elif (
inspect.isclass(mod[1])
and mod[1].__module__.startswith(models.__name__)
and mod[0] not in classes
and not mod[0].startswith("_")
and hasattr(mod[1], "__annotations__")
):
d = {param: pytype(ptype) for param, ptype in mod[1].__annotations__.items() if not param.startswith("_")}
if len(d) == 1:
key, value = d.popitem()
classes[mod[0]] = value
else:
classes[mod[0]] = d
from flaschengeist.app import create_app
app = create_app()
with app.app_context():
walker(("models", models))
with open(arguments.file, "w") as file:
walker(("models", models), file)
for cls, params in classes.items():
if isinstance(params, str):
file.write("type {} = {};\n".format(cls, params))
else:
file.write("interface {} {{\n".format(cls))
for name in params:
file.write(" {}: {};\n".format(name, params[name]))
file.write("}\n")
if __name__ == "__main__":
@ -88,6 +96,5 @@ if __name__ == "__main__":
parser_export.set_defaults(func=export)
parser_export.add_argument("--file", help="Filename where to save", default="flaschengeist.d.ts")
args = parser.parse_args()
args.func(args)