flaschengeist/run_flaschengeist

227 lines
8.6 KiB
Python

#!/usr/bin/python3
from __future__ import annotations # TODO: Remove if python requirement is >= 3.10
import inspect
import argparse
import sys
import pkg_resources
from os import environ
from flaschengeist.app import create_app, install_all
from flaschengeist.config import config
class PrefixMiddleware(object):
    """WSGI middleware that mounts the wrapped application below a URL prefix.

    Requests whose ``PATH_INFO`` starts with ``prefix`` are forwarded to the
    wrapped application with the prefix moved from ``PATH_INFO`` into
    ``SCRIPT_NAME``.  All other requests are answered directly with a
    plain-text 404 response.  Matching is a plain string-prefix test.
    """

    def __init__(self, app, prefix=""):
        """Remember the wrapped WSGI callable and the prefix to strip.

        Args:
            app: WSGI application that receives the matching requests.
            prefix: Leading part of the path to strip; the default ``""``
                matches every request.
        """
        self.app = app
        self.prefix = prefix

    def __call__(self, environ, start_response):
        """Handle one WSGI request.

        Args:
            environ: WSGI environment; ``PATH_INFO`` and ``SCRIPT_NAME`` are
                rewritten in place for matching requests.
            start_response: WSGI ``start_response`` callable.

        Returns:
            The wrapped application's response iterable, or a single-element
            list with the 404 message for paths outside of the prefix.
        """
        path = environ["PATH_INFO"]
        if not path.startswith(self.prefix):
            # PEP 3333 requires "<code> <reason phrase>"; a bare "404" is
            # rejected by strict WSGI servers and validators.
            start_response("404 Not Found", [("Content-Type", "text/plain")])
            return ["This url does not belong to the app.".encode()]

        environ["PATH_INFO"] = path[len(self.prefix):]
        environ["SCRIPT_NAME"] = self.prefix
        return self.app(environ, start_response)
class InterfaceGenerator:
    """Collect annotated classes and write them as TypeScript declarations.

    ``run`` walks a module together with the modules and classes reachable
    from it whose (module) names start with that module's name.  Every public
    class with annotations is recorded either as an interface (attribute name
    mapped to a ``(modifier, type)`` pair) or, when exactly one attribute is
    exported, as a type alias.  ``write`` dumps everything collected so far
    into ``filename`` inside ``declare namespace <namespace>``.
    """

    # Python type name -> TypeScript type name; unknown names are kept as-is.
    mapper = {
        "str": "string",
        "int": "number",
        "float": "number",
        "date": "Date",
        "datetime": "Date",
        "NoneType": "null",
        "bool": "boolean",
    }

    def __init__(self, namespace, filename):
        """Create an empty generator.

        Args:
            namespace: Name used in the emitted ``declare namespace`` line.
            filename: Path of the declaration file written by ``write``.
        """
        self.basename = ""
        self.namespace = namespace
        self.filename = filename
        self.this_type = None
        # Per-instance state: as class attributes these containers would be
        # shared (and polluted) between independent generator instances.
        self.known = []  # names of modules that were already walked
        self.classes = {}  # class name -> alias string or attribute dict

    def pytype(self, cls):
        """Translate ``cls`` via ``_pytype`` and print the mapping as a trace.

        Returns:
            The ``(modifier, typescript_type)`` tuple produced by ``_pytype``.
        """
        a = self._pytype(cls)
        print(f"{cls} -> {a}")
        return a

    def _pytype(self, cls):
        """Translate a Python type annotation into a TypeScript type.

        Args:
            cls: A class, typing construct, forward reference or type name.

        Returns:
            A tuple ``(modifier, typescript_type)`` where ``modifier`` is
            ``"?"`` for optional attributes and ``""`` otherwise.
        """
        import types
        import typing

        # A ForwardRef has no origin (typing.get_origin returns None for it),
        # so it must be recognised by instance check.
        if isinstance(cls, typing.ForwardRef):
            target = cls.__forward_arg__
            return "", "this" if target == self.this_type else target

        origin = typing.get_origin(cls)
        arguments = typing.get_args(cls)

        # ``X | Y`` (PEP 604) reports types.UnionType as origin on Python 3.10+;
        # fall back to typing.Union where that type does not exist.
        pep604_union = getattr(types, "UnionType", typing.Union)
        if origin is typing.Union or origin is pep604_union:
            if len(arguments) == 2 and arguments[1] is type(None):
                # Optional[X] becomes an optional attribute of type X.
                return "?", self.pytype(arguments[0])[1]
            return "", "|".join([self.pytype(pt)[1] for pt in arguments])

        if origin is list:
            return "", "Array<{}>".format("|".join([self.pytype(a_type)[1] for a_type in arguments]))

        name = cls.__name__ if hasattr(cls, "__name__") else cls if isinstance(cls, str) else None
        if name is not None:
            return "", self.mapper.get(name, name)

        print(
            "WARNING: This python version might not detect all types (try >= 3.9). Could not identify >{}<".format(cls)
        )
        return "?", "any"

    def walker(self, module):
        """Recursively record the classes reachable from ``module``.

        Args:
            module: ``(name, object)`` pair as yielded by
                ``inspect.getmembers``; the object is a module or a class.

        Raises:
            RuntimeError: When running on Python older than 3.9.
        """
        if sys.version_info < (3, 9):
            raise RuntimeError("Python >= 3.9 is required to export API")
        import typing

        name, member = module[0], module[1]
        if (
            inspect.ismodule(member)
            and member.__name__.startswith(self.basename)
            and member.__name__ not in self.known
        ):
            # Remember the module first so import cycles cannot recurse forever.
            self.known.append(member.__name__)
            for child in inspect.getmembers(member, lambda x: inspect.isclass(x) or inspect.ismodule(x)):
                self.walker(child)
        elif (
            inspect.isclass(member)
            and member.__module__.startswith(self.basename)
            and name not in self.classes
            and not name.startswith("_")
            and hasattr(member, "__annotations__")
        ):
            # Lets _pytype emit "this" for forward references to this class.
            self.this_type = name
            print("\n\n" + name + "\n")

            d = {}
            for param, ptype in typing.get_type_hints(member, globalns=None, localns=None).items():
                # Attributes with a leading or trailing underscore are not exported.
                if not param.startswith("_") and not param.endswith("_"):
                    print(f"{param} ::: {ptype}")
                    d[param] = self.pytype(ptype)

            if len(d) == 1:
                # A single exported attribute turns the class into a type alias.
                _, value = d.popitem()
                self.classes[name] = value[1]
            else:
                self.classes[name] = d

    def run(self, models):
        """Walk ``models``, restricting the walk to names starting with its name."""
        self.basename = models.__name__
        self.walker(("models", models))

    def write(self):
        """Write all collected declarations to ``self.filename``."""
        with open(self.filename, "w") as file:
            file.write("declare namespace {} {{\n".format(self.namespace))
            for cls, params in self.classes.items():
                if isinstance(params, str):
                    file.write("\ttype {} = {};\n".format(cls, params))
                else:
                    file.write("\tinterface {} {{\n".format(cls))
                    for name in params:
                        # params[name] is (modifier, type) -> "name?: type"
                        file.write("\t\t{}{}: {};\n".format(name, *params[name]))
                    file.write("\t}\n")
            file.write("}\n")
def install(arguments):
    """Sub-command ``install``: run ``install_all`` inside an application context.

    Args:
        arguments: Parsed command line arguments (not used by this command).
    """
    application = create_app()
    with application.app_context():
        install_all()
def run(arguments):
    """Sub-command ``run``: serve the application on the requested host and port.

    The WSGI app is wrapped in ``PrefixMiddleware`` using the ``root`` entry
    of the ``FLASCHENGEIST`` config section (empty prefix when unset).

    Args:
        arguments: Parsed command line arguments providing ``host``, ``port``
            and the ``debug`` flag.
    """
    application = create_app()
    with application.app_context():
        inner_app = application.wsgi_app
        url_root = config["FLASCHENGEIST"].get("root", "")
        application.wsgi_app = PrefixMiddleware(inner_app, prefix=url_root)

        debug_mode = bool(arguments.debug)
        if debug_mode:
            # Exported through the process environment in debug mode only.
            environ["FLASK_DEBUG"] = "1"
        application.run(arguments.host, arguments.port, debug=debug_mode)
def export(arguments):
    """Sub-command ``export``: write model declarations as TypeScript.

    Core models are exported unless ``no_core`` is set.  When ``plugins`` is
    given, the models of every ``flaschengeist.plugin`` entry point are added;
    an empty list selects all plugins, otherwise only the named ones.

    Args:
        arguments: Parsed command line arguments providing ``namespace``,
            ``file``, ``no_core`` and ``plugins``.
    """
    import flaschengeist.models as models

    application = create_app()
    with application.app_context():
        generator = InterfaceGenerator(arguments.namespace, arguments.file)
        if not arguments.no_core:
            generator.run(models)

        if arguments.plugins is not None:
            for entry_point in pkg_resources.iter_entry_points("flaschengeist.plugin"):
                # A non-empty selection restricts the export to the named plugins.
                if len(arguments.plugins) != 0 and entry_point.name not in arguments.plugins:
                    continue
                plugin = entry_point.load()
                if not hasattr(plugin, "models") or plugin.models is None:
                    continue
                generator.run(plugin.models)

        generator.write()
def ldap(arguments):
    """Sub-command ``ldap``: LDAP helper utilities.

    With ``set_admin`` the role of that name receives every permission
    returned by ``roleController.get_permissions()`` and the change is
    committed.  With ``sync`` all LDAP entries matching ``(uid=*)`` below the
    backend's search DN are looked up through ``userController.find_user``.

    Args:
        arguments: Parsed command line arguments providing ``set_admin``
            (role name or ``None``) and the ``sync`` flag.

    Raises:
        Exception: If ``sync`` is requested but the configured authentication
            backend is missing or is not the ``auth_ldap`` plugin.
    """
    app = create_app()
    with app.app_context():
        if arguments.set_admin:
            from flaschengeist.controller import roleController
            from flaschengeist.database import db

            role = roleController.get(arguments.set_admin)
            role.permissions = roleController.get_permissions()
            db.session.commit()

        if arguments.sync:
            from flaschengeist.controller import userController
            from flaschengeist.plugins.auth_ldap import AuthLDAP
            from ldap3 import SUBTREE

            auth_ldap: AuthLDAP = app.config.get("FG_AUTH_BACKEND")
            # The former check ``is None and isinstance(...)`` could never be
            # true; None is not an AuthLDAP instance, so this covers both the
            # missing and the wrong-backend case.
            if not isinstance(auth_ldap, AuthLDAP):
                raise Exception("Plugin >auth_ldap< not found")

            conn = auth_ldap.ldap.connection
            if not conn:
                conn = auth_ldap.ldap.connect(auth_ldap.root_dn, auth_ldap.root_secret)
            conn.search(auth_ldap.search_dn, "(uid=*)", SUBTREE, attributes=["uid", "givenName", "sn", "mail"])
            for ldap_user in conn.response:
                uid = ldap_user["attributes"]["uid"][0]
                # NOTE(review): presumably find_user imports unknown users
                # from the auth backend -- confirm against userController.
                userController.find_user(uid)
if __name__ == "__main__":
    # Top-level parser; exactly one sub-command has to be given.
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(help="sub-command help", dest="sub_command", required=True)

    # run: start the web server
    parser_run = subparsers.add_parser("run", help="run flaschengeist")
    parser_run.set_defaults(func=run)
    parser_run.add_argument("--host", help="set hostname to listen on", default="127.0.0.1")
    parser_run.add_argument("--port", help="set port to listen on", type=int, default=5000)
    parser_run.add_argument("--debug", help="run in debug mode", action="store_true")

    # install: database setup
    parser_install = subparsers.add_parser(
        "install",
        help="run database setup for flaschengeist and all installed plugins",
    )
    parser_install.set_defaults(func=install)

    # export: TypeScript declarations of the models
    parser_export = subparsers.add_parser("export", help="export models to typescript interfaces")
    parser_export.set_defaults(func=export)
    parser_export.add_argument("--file", help="Filename where to save", default="flaschengeist.d.ts")
    parser_export.add_argument("--namespace", help="Namespace of declarations", default="FG")
    parser_export.add_argument(
        "--no-core",
        help="Do not export core declarations (only useful in conjunction with --plugins)",
        action="store_true",
    )
    parser_export.add_argument("--plugins", help="Also export plugins (none means all)", nargs="*")

    # ldap: helper utilities for the LDAP backend
    parser_ldap = subparsers.add_parser("ldap", help="LDAP helper utils")
    parser_ldap.set_defaults(func=ldap)
    parser_ldap.add_argument("--sync", action="store_true", help="Sync ldap-users with database")
    parser_ldap.add_argument("--set-admin", type=str, help="Assign all permissions this to group")

    # Dispatch to the handler registered through set_defaults(func=...).
    args = parser.parse_args()
    args.func(args)