197 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			197 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
	
#!/usr/bin/python3
 | 
						|
from __future__ import annotations  # TODO: Remove if python requirement is >= 3.10
 | 
						|
import inspect
 | 
						|
import argparse
 | 
						|
import sys
 | 
						|
 | 
						|
import pkg_resources
 | 
						|
 | 
						|
from flaschengeist.config import config
 | 
						|
 | 
						|
 | 
						|
class PrefixMiddleware(object):
    """WSGI middleware that serves the wrapped application below a URL prefix.

    Requests whose ``PATH_INFO`` starts with ``prefix`` are forwarded to the
    wrapped application with the prefix moved from ``PATH_INFO`` to
    ``SCRIPT_NAME``. Every other request is answered with a plain-text 404.
    """

    def __init__(self, app, prefix=""):
        """Store the wrapped WSGI callable and the prefix it is mounted at."""
        self.app = app
        self.prefix = prefix

    def __call__(self, environ, start_response):
        """Dispatch one WSGI request.

        Args:
            environ: The WSGI environment; it is modified in place when the
                request is forwarded.
            start_response: The WSGI ``start_response`` callable.

        Returns:
            The response iterable of the wrapped application, or a single
            encoded message when the path is outside of the prefix.
        """
        # PEP 3333 allows servers to omit PATH_INFO when it would be empty.
        path = environ.get("PATH_INFO", "")
        if path.startswith(self.prefix):
            environ["PATH_INFO"] = path[len(self.prefix) :]
            environ["SCRIPT_NAME"] = self.prefix
            return self.app(environ, start_response)

        # PEP 3333 requires the status to be "<code> <reason phrase>".
        start_response("404 Not Found", [("Content-Type", "text/plain")])
        return ["This url does not belong to the app.".encode()]
 | 
						|
 | 
						|
 | 
						|
class InterfaceGenerator:
    """Generate TypeScript declarations from annotated Python classes.

    ``run`` walks a module tree and records every public, annotated class in
    ``classes``. ``write`` dumps the collected declarations into ``filename``,
    wrapped in ``declare namespace <namespace> { ... }``.
    """

    # Python type name -> TypeScript type name; unknown names are emitted unchanged.
    mapper = {
        "str": "string",
        "int": "number",
        "float": "number",
        "date": "Date",
        "datetime": "Date",
        "NoneType": "null",
        "bool": "boolean",
    }

    def __init__(self, namespace, filename):
        """Create a generator writing namespace ``namespace`` to ``filename``."""
        self.basename = ""
        self.namespace = namespace
        self.filename = filename
        self.this_type = None
        # Kept per instance so that independent generators do not share results.
        self.known = []  # names of the modules that were already walked
        self.classes = {}  # class name -> alias type (str) or {field: (marker, type)}

    def pytype(self, cls):
        """Translate ``cls`` like ``_pytype`` and print the resulting mapping."""
        a = self._pytype(cls)
        print(f"{cls} -> {a}")
        return a

    def _pytype(self, cls):
        """Translate a Python type annotation into a TypeScript type.

        Args:
            cls: A class, a ``typing`` construct, a forward reference or a
                plain type name given as string.

        Returns:
            A tuple ``(marker, type)``. ``marker`` is ``"?"`` for optional
            members (and for unidentified types) and ``""`` otherwise;
            ``type`` is the TypeScript type expression.
        """
        import types
        import typing

        # get_origin() is None for a ForwardRef, so the instance itself has to be tested.
        if isinstance(cls, typing.ForwardRef):
            forward = cls.__forward_arg__
            return "", "this" if forward == self.this_type else forward

        origin = typing.get_origin(cls)
        arguments = typing.get_args(cls)

        # ``X | Y`` unions may report ``types.UnionType`` (where it exists) instead of typing.Union.
        if origin is typing.Union or origin is getattr(types, "UnionType", typing.Union):
            none_type = type(None)
            non_null = [arg for arg in arguments if arg is not none_type]
            # Optional[X], regardless of the position of None, becomes an optional member.
            if len(arguments) == 2 and len(non_null) == 1:
                return "?", self.pytype(non_null[0])[1]
            return "", "|".join([self.pytype(pt)[1] for pt in arguments])
        if origin is list:
            if not arguments:
                # An unparametrized List carries no element type.
                return "", "Array<any>"
            return "", "Array<{}>".format("|".join([self.pytype(a_type)[1] for a_type in arguments]))

        name = cls.__name__ if hasattr(cls, "__name__") else cls if isinstance(cls, str) else None
        if name is not None:
            return "", self.mapper.get(name, name)
        print(
            "WARNING: This python version might not detect all types (try >= 3.9). Could not identify >{}<".format(cls)
        )
        return "?", "any"

    def walker(self, module):
        """Recursively collect the annotated classes below ``module``.

        Args:
            module: A ``(name, object)`` pair as produced by
                ``inspect.getmembers``; the object is a module or a class.

        Raises:
            RuntimeError: If the interpreter is older than Python 3.9.
        """
        if sys.version_info < (3, 9):
            raise RuntimeError("Python >= 3.9 is required to export API")
        import typing

        name, member = module
        if (
            inspect.ismodule(member)
            and member.__name__.startswith(self.basename)
            and member.__name__ not in self.known
        ):
            # Remember the module before descending, so that cyclic imports terminate.
            self.known.append(member.__name__)
            for cls in inspect.getmembers(member, lambda x: inspect.isclass(x) or inspect.ismodule(x)):
                self.walker(cls)
        elif (
            inspect.isclass(member)
            and member.__module__.startswith(self.basename)
            and name not in self.classes
            and not name.startswith("_")
            and hasattr(member, "__annotations__")
        ):
            # Forward references to the class itself are exported as "this".
            self.this_type = name
            print("\n\n" + name + "\n")
            d = {}
            for param, ptype in typing.get_type_hints(member, globalns=None, localns=None).items():
                if not param.startswith("_") and not param.endswith("_"):
                    print(f"{param} ::: {ptype}")
                    d[param] = self.pytype(ptype)

            if len(d) == 1:
                # A class with a single exported field is stored as an alias of that field's type.
                _, value = d.popitem()
                self.classes[name] = value[1]
            else:
                self.classes[name] = d

    def run(self, models):
        """Collect all classes of the module ``models`` and of its submodules."""
        self.basename = models.__name__
        self.walker(("models", models))

    def write(self):
        """Write the collected declarations to ``self.filename``."""
        with open(self.filename, "w", encoding="utf-8") as file:
            file.write("declare namespace {} {{\n".format(self.namespace))
            for cls, params in self.classes.items():
                if isinstance(params, str):
                    file.write("\ttype {} = {};\n".format(cls, params))
                else:
                    file.write("\tinterface {} {{\n".format(cls))
                    for name in params:
                        # params[name] is the (marker, type) pair built by pytype().
                        file.write("\t\t{}{}: {};\n".format(name, *params[name]))
                    file.write("\t}\n")
            file.write("}\n")
 | 
						|
 | 
						|
 | 
						|
def install(arguments):
    """Sub-command ``install``: call ``install_all`` inside an application context.

    Args:
        arguments: The parsed command line. It is accepted so that all
            sub-commands share one signature, but it is not used here.
    """
    from flaschengeist.app import create_app, install_all

    flask_app = create_app()
    context = flask_app.app_context()
    with context:
        install_all()
 | 
						|
 | 
						|
 | 
						|
def run(arguments):
    """Sub-command ``run``: start the application server.

    The WSGI application is wrapped in ``PrefixMiddleware`` using the ``root``
    entry of the ``FLASCHENGEIST`` configuration section (empty by default).

    Args:
        arguments: The parsed command line providing ``host``, ``port`` and
            ``debug``.
    """
    from flaschengeist.app import create_app

    server = create_app()
    with server.app_context():
        server.wsgi_app = PrefixMiddleware(server.wsgi_app, prefix=config["FLASCHENGEIST"].get("root", ""))
        # Debug mode is either fully on or fully off, depending on the flag.
        debug_mode = True if arguments.debug else False
        server.run(arguments.host, arguments.port, debug=debug_mode)
 | 
						|
 | 
						|
 | 
						|
def export(arguments):
    """Sub-command ``export``: write the models as TypeScript declarations.

    The core models are exported unless ``no_core`` is set. Plugins registered
    under the ``flaschengeist.plugin`` entry point group are exported when
    ``plugins`` is given; an empty list selects all of them.

    Args:
        arguments: The parsed command line providing ``namespace``, ``file``,
            ``no_core`` and ``plugins``.
    """
    import flaschengeist.models as models
    from flaschengeist.app import create_app

    flask_app = create_app()
    with flask_app.app_context():
        generator = InterfaceGenerator(arguments.namespace, arguments.file)
        if not arguments.no_core:
            generator.run(models)

        selected = arguments.plugins
        if selected is not None:
            for entry_point in pkg_resources.iter_entry_points("flaschengeist.plugin"):
                # A non-empty selection restricts the export to the named plugins.
                if len(selected) != 0 and entry_point.name not in selected:
                    continue
                plugin = entry_point.load()
                if getattr(plugin, "models", None) is not None:
                    generator.run(plugin.models)

        generator.write()
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
    # Top-level parser; exactly one sub-command has to be given.
    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers(help="sub-command help", dest="sub_command", required=True)

    # "run": start the server.
    run_command = commands.add_parser("run", help="run flaschengeist")
    run_command.add_argument("--host", help="set hostname to listen on", default="127.0.0.1")
    run_command.add_argument("--port", help="set port to listen on", type=int, default=5000)
    run_command.add_argument("--debug", help="run in debug mode", action="store_true")
    run_command.set_defaults(func=run)

    # "install": database setup.
    install_command = commands.add_parser(
        "install",
        help="run database setup for flaschengeist and all installed plugins",
    )
    install_command.set_defaults(func=install)

    # "export": TypeScript declarations of the models.
    export_command = commands.add_parser("export", help="export models to typescript interfaces")
    export_command.add_argument("--file", help="Filename where to save", default="flaschengeist.d.ts")
    export_command.add_argument("--namespace", help="Namespace of declarations", default="FG")
    export_command.add_argument(
        "--no-core",
        action="store_true",
        help="Do not export core declarations (only useful in conjunction with --plugins)",
    )
    export_command.add_argument("--plugins", help="Also export plugins (none means all)", nargs="*")
    export_command.set_defaults(func=export)

    # Each sub-parser registered its handler as ``func``.
    parsed = parser.parse_args()
    parsed.func(parsed)
 |