#!/usr/bin/python3 from __future__ import annotations # TODO: Remove if python requirement is >= 3.10 import inspect import argparse import sys import pkg_resources from flaschengeist.config import config class PrefixMiddleware(object): def __init__(self, app, prefix=""): self.app = app self.prefix = prefix def __call__(self, environ, start_response): if environ["PATH_INFO"].startswith(self.prefix): environ["PATH_INFO"] = environ["PATH_INFO"][len(self.prefix) :] environ["SCRIPT_NAME"] = self.prefix return self.app(environ, start_response) else: start_response("404", [("Content-Type", "text/plain")]) return ["This url does not belong to the app.".encode()] class InterfaceGenerator: known = [] classes = {} mapper = { "str": "string", "int": "number", "float": "number", "date": "Date", "datetime": "Date", "NoneType": "null", "bool": "boolean", } def __init__(self, namespace, filename): self.basename = "" self.namespace = namespace self.filename = filename self.this_type = None def pytype(self, cls): a = self._pytype(cls) print(f"{cls} -> {a}") return a def _pytype(self, cls): import typing origin = typing.get_origin(cls) arguments = typing.get_args(cls) if origin is typing.ForwardRef: # isinstance(cls, typing.ForwardRef): return "", "this" if cls.__forward_arg__ == self.this_type else cls.__forward_arg__ if origin is typing.Union: print(f"A1: {arguments[1]}") if len(arguments) == 2 and arguments[1] is type(None): return "?", self.pytype(arguments[0])[1] else: return "", "|".join([self.pytype(pt)[1] for pt in arguments]) if origin is list: return "", "Array<{}>".format("|".join([self.pytype(a_type)[1] for a_type in arguments])) name = cls.__name__ if hasattr(cls, "__name__") else cls if isinstance(cls, str) else None if name is not None: if name in self.mapper: return "", self.mapper[name] else: return "", name print( "WARNING: This python version might not detect all types (try >= 3.9). Could not identify >{}<".format(cls) ) return "?", "any" def walker(self, module): if sys.version_info < (3, 9): raise RuntimeError("Python >= 3.9 is required to export API") import typing if ( inspect.ismodule(module[1]) and module[1].__name__.startswith(self.basename) and module[1].__name__ not in self.known ): self.known.append(module[1].__name__) for cls in inspect.getmembers(module[1], lambda x: inspect.isclass(x) or inspect.ismodule(x)): self.walker(cls) elif ( inspect.isclass(module[1]) and module[1].__module__.startswith(self.basename) and module[0] not in self.classes and not module[0].startswith("_") and hasattr(module[1], "__annotations__") ): self.this_type = module[0] print("\n\n" + module[0] + "\n") d = {} for param, ptype in typing.get_type_hints(module[1], globalns=None, localns=None).items(): if not param.startswith("_") and not param.endswith("_"): print(f"{param} ::: {ptype}") d[param] = self.pytype(ptype) if len(d) == 1: key, value = d.popitem() self.classes[module[0]] = value[1] else: self.classes[module[0]] = d def run(self, models): self.basename = models.__name__ self.walker(("models", models)) def write(self): with open(self.filename, "w") as file: file.write("declare namespace {} {{\n".format(self.namespace)) for cls, params in self.classes.items(): if isinstance(params, str): file.write("\ttype {} = {};\n".format(cls, params)) else: file.write("\tinterface {} {{\n".format(cls)) for name in params: file.write("\t\t{}{}: {};\n".format(name, *params[name])) file.write("\t}\n") file.write("}\n") def install(arguments): from flaschengeist.app import create_app, install_all app = create_app() with app.app_context(): install_all() def run(arguments): from flaschengeist.app import create_app app = create_app() with app.app_context(): app.wsgi_app = PrefixMiddleware(app.wsgi_app, prefix=config["FLASCHENGEIST"].get("root", "")) if arguments.debug: app.run(arguments.host, arguments.port, debug=True) else: app.run(arguments.host, arguments.port, debug=False) def export(arguments): import flaschengeist.models as models from flaschengeist.app import create_app app = create_app() with app.app_context(): gen = InterfaceGenerator(arguments.namespace, arguments.file) if not arguments.no_core: gen.run(models) if arguments.plugins is not None: for entry_point in pkg_resources.iter_entry_points("flaschengeist.plugin"): if len(arguments.plugins) == 0 or entry_point.name in arguments.plugins: plg = entry_point.load() if hasattr(plg, "models") and plg.models is not None: gen.run(plg.models) gen.write() def ldap_sync(arguments): from flaschengeist.app import create_app from flaschengeist.controller import userController from flaschengeist.plugins.auth_ldap import AuthLDAP from ldap3 import SUBTREE app = create_app() with app.app_context(): auth_ldap: AuthLDAP = app.config.get("FG_PLUGINS").get("auth_ldap") if auth_ldap: conn = auth_ldap.ldap.connection if not conn: conn = auth_ldap.ldap.connect(auth_ldap.root_dn, auth_ldap.root_secret) conn.search(auth_ldap.search_dn, "(uid=*)", SUBTREE, attributes=["uid", "givenName", "sn", "mail"]) ldap_users_response = conn.response for ldap_user in ldap_users_response: uid = ldap_user["attributes"]["uid"][0] userController.find_user(uid) exit() raise Exception("auth_ldap not found") if __name__ == "__main__": # create the top-level parser parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(help="sub-command help", dest="sub_command") subparsers.required = True parser_run = subparsers.add_parser("run", help="run flaschengeist") parser_run.set_defaults(func=run) parser_run.add_argument("--host", help="set hostname to listen on", default="127.0.0.1") parser_run.add_argument("--port", help="set port to listen on", type=int, default=5000) parser_run.add_argument("--debug", help="run in debug mode", action="store_true") parser_install = subparsers.add_parser( "install", help="run database setup for flaschengeist and all installed plugins" ) parser_install.set_defaults(func=install) parser_export = subparsers.add_parser("export", help="export models to typescript interfaces") parser_export.set_defaults(func=export) parser_export.add_argument("--file", help="Filename where to save", default="flaschengeist.d.ts") parser_export.add_argument("--namespace", help="Namespace of declarations", default="FG") parser_export.add_argument( "--no-core", help="Do not export core declarations (only useful in conjunction with --plugins)", action="store_true", ) parser_export.add_argument("--plugins", help="Also export plugins (none means all)", nargs="*") parser_ldap_sync = subparsers.add_parser("ldap_sync", help="synch ldap-users with database") parser_ldap_sync.set_defaults(func=ldap_sync) args = parser.parse_args() args.func(args)