168 lines
6.1 KiB
Python
168 lines
6.1 KiB
Python
#!/usr/bin/python3
|
|
import inspect
|
|
import argparse
|
|
import sys
|
|
|
|
import pkg_resources
|
|
from werkzeug.middleware.dispatcher import DispatcherMiddleware
|
|
|
|
from flaschengeist.config import config
|
|
|
|
|
|
class PrefixMiddleware(object):
    """WSGI middleware that serves the wrapped application below a URL prefix.

    Requests whose ``PATH_INFO`` starts with ``prefix`` are forwarded to the
    wrapped application with the prefix moved from ``PATH_INFO`` into
    ``SCRIPT_NAME``. Every other request is answered with a plain-text 404.
    """

    def __init__(self, app, prefix=''):
        """Remember the wrapped WSGI callable and the prefix it is mounted at.

        Args:
            app: WSGI application that receives the matching requests.
            prefix: Leading path to strip; the default empty string matches
                every request.
        """
        self.app = app
        self.prefix = prefix

    def __call__(self, environ, start_response):
        """Dispatch one WSGI request.

        Args:
            environ: WSGI environment; ``PATH_INFO`` and ``SCRIPT_NAME`` are
                rewritten in place when the prefix matches.
            start_response: WSGI ``start_response`` callable.

        Returns:
            The wrapped application's response iterable, or a one-element
            list holding the 404 message as bytes.
        """
        path = environ['PATH_INFO']
        if path.startswith(self.prefix):
            environ['PATH_INFO'] = path[len(self.prefix):]
            environ['SCRIPT_NAME'] = self.prefix
            return self.app(environ, start_response)

        # PEP 3333 requires "<status code> <reason phrase>"; a bare "404" is
        # rejected by strict servers and by wsgiref.
        start_response('404 Not Found', [('Content-Type', 'text/plain')])
        return ["This url does not belong to the app.".encode()]
|
|
|
|
|
|
class InterfaceGenerator:
    """Collect annotated classes from Python modules and emit TypeScript declarations.

    ``run`` walks a module and the modules reachable from it, recording every
    public class that carries annotations. ``write`` dumps everything recorded
    so far into a ``declare namespace`` block in ``filename``.
    """

    # Python type name -> TypeScript type name. Names that are not listed here
    # are emitted unchanged.
    mapper = {"str": "string", "int": "number", "float": "number", "date": "Date", "datetime": "Date", "NoneType": "null"}

    def __init__(self, namespace, filename):
        """Set up an empty generator.

        Args:
            namespace: Name used for the emitted ``declare namespace`` block.
            filename: Path of the declaration file ``write`` creates.
        """
        self.basename = ""
        self.namespace = namespace
        self.filename = filename
        self.this_type = None
        # Kept per instance: as class-level attributes these containers would
        # be shared by every generator and leak results between instances.
        self.known = []  # names of the modules that were already walked
        self.classes = {}  # class name -> alias string or {field: (suffix, type)}

    def pytype(self, cls):
        """Translate one annotation into its TypeScript representation.

        Args:
            cls: Annotation value: a class, a one-element list such as
                ``[int]``, a ``typing.ForwardRef`` or a union.

        Returns:
            A ``(suffix, type)`` tuple. ``suffix`` is ``"?"`` for a two-member
            union ending in ``None`` and for unidentified annotations,
            otherwise ``""``.
        """
        if isinstance(cls, list):
            # An empty list carries no element type to translate.
            element = self.pytype(cls[0])[1] if cls else "any"
            return "", "Array<{}>".format(element)
        if sys.version_info >= (3, 8):
            import types as builtin_types
            import typing

            if isinstance(cls, typing.ForwardRef):
                return "", "this" if cls.__forward_arg__ == self.this_type else cls.__forward_arg__
            origin = typing.get_origin(cls)
            # PEP 604 unions (``int | None``) report ``types.UnionType`` as
            # their origin on Python 3.10+; the attribute is absent before.
            pep604_union = getattr(builtin_types, "UnionType", None)
            if origin is typing.Union or (pep604_union is not None and origin is pep604_union):
                members = typing.get_args(cls)
                if len(members) == 2 and members[-1] is type(None):
                    return "?", self.pytype(members[0])[1]
                return "", "|".join([self.pytype(member)[1] for member in members])
        if hasattr(cls, "__name__"):
            return "", self.mapper.get(cls.__name__, cls.__name__)
        print(
            "WARNING: This python version might not detect all types (try >= 3.8). Could not identify >{}<".format(cls)
        )
        return "?", "any"

    def walker(self, module):
        """Record the classes reachable from one ``(name, object)`` member pair.

        Modules whose name starts with ``self.basename`` are walked once each.
        Classes are recorded when their ``__module__`` starts with
        ``self.basename``, their member name is public and not yet recorded,
        and they have ``__annotations__``.

        Args:
            module: ``(name, object)`` tuple as produced by ``inspect.getmembers``.
        """
        name, obj = module
        if inspect.ismodule(obj):
            if obj.__name__.startswith(self.basename) and obj.__name__ not in self.known:
                # Mark the module before descending so import cycles terminate.
                self.known.append(obj.__name__)
                for member in inspect.getmembers(obj, lambda x: inspect.isclass(x) or inspect.ismodule(x)):
                    self.walker(member)
            return
        if not (
            inspect.isclass(obj)
            and obj.__module__.startswith(self.basename)
            and name not in self.classes
            and not name.startswith("_")
            and hasattr(obj, "__annotations__")
        ):
            return

        # Forward references naming the class itself are rendered as "this".
        self.this_type = name
        fields = {
            param: self.pytype(ptype)
            for param, ptype in obj.__annotations__.items()
            if not param.startswith("_") and not param.endswith("_")
        }
        if len(fields) == 1:
            # A class with exactly one public field is exported as a type
            # alias of that field's type; the optional marker is dropped.
            self.classes[name] = next(iter(fields.values()))[1]
        else:
            self.classes[name] = fields

    def run(self, models):
        """Walk ``models`` and everything below it that shares its name prefix.

        Args:
            models: Module object to start from; its ``__name__`` becomes the
                prefix used to filter modules and classes.
        """
        self.basename = models.__name__
        self.walker(("models", models))

    def write(self):
        """Write all recorded classes to ``self.filename`` as TypeScript declarations."""
        lines = ["declare namespace {} {{\n".format(self.namespace)]
        for cls, params in self.classes.items():
            if isinstance(params, str):
                lines.append("\ttype {} = {};\n".format(cls, params))
                continue
            lines.append("\tinterface {} {{\n".format(cls))
            for field, (suffix, ts_type) in params.items():
                lines.append("\t\t{}{}: {};\n".format(field, suffix, ts_type))
            lines.append("\t}\n")
        lines.append("}\n")
        # Fixed encoding keeps the output independent of the platform default.
        with open(self.filename, "w", encoding="utf-8") as file:
            file.writelines(lines)
|
|
|
|
|
|
def install(arguments):
    """Handle the ``install`` sub-command.

    Creates the application and calls ``install_all`` inside its application
    context.

    Args:
        arguments: Parsed command line arguments; not read by this command.
    """
    # Imported lazily so the other sub-commands do not pay for it.
    from flaschengeist.app import create_app, install_all

    application = create_app()
    with application.app_context():
        install_all()
|
|
|
|
|
|
def run(arguments):
    """Handle the ``run`` sub-command: serve the application.

    The application's WSGI callable is wrapped in ``PrefixMiddleware`` using
    the ``root`` entry of the ``FLASCHENGEIST`` config section (empty string
    when unset) before the server is started.

    Args:
        arguments: Parsed command line arguments providing ``host``, ``port``
            and ``debug``.
    """
    from flaschengeist.app import create_app

    application = create_app()
    with application.app_context():
        wrapped = application.wsgi_app
        url_root = config["FLASCHENGEIST"].get("root", "")
        application.wsgi_app = PrefixMiddleware(wrapped, prefix=url_root)
        # Normalise the flag to a real boolean before handing it over.
        debug_mode = bool(arguments.debug)
        application.run(arguments.host, arguments.port, debug=debug_mode)
|
|
|
|
|
|
def export(arguments):
    """Handle the ``export`` sub-command: write models as TypeScript declarations.

    The core models are always exported. With ``arguments.plugins`` set, every
    entry point of the ``flaschengeist.plugin`` group is loaded and the ones
    exposing a ``models`` attribute are exported as well.

    Args:
        arguments: Parsed command line arguments providing ``namespace``,
            ``file`` and ``plugins``.
    """
    import flaschengeist.models as models
    from flaschengeist.app import create_app

    application = create_app()
    with application.app_context():
        generator = InterfaceGenerator(arguments.namespace, arguments.file)
        generator.run(models)
        if arguments.plugins:
            # Lazy generator: each plugin is loaded right before it is inspected.
            loaded = (entry.load() for entry in pkg_resources.iter_entry_points("flaschengeist.plugin"))
            for plugin in loaded:
                if not hasattr(plugin, "models"):
                    continue
                generator.run(plugin.models)
        generator.write()
|
|
|
|
|
|
if __name__ == "__main__":
    # Top-level parser; exactly one sub-command has to be given.
    cli = argparse.ArgumentParser()
    commands = cli.add_subparsers(dest="sub_command", help="sub-command help")
    commands.required = True

    # "run": start the server.
    run_command = commands.add_parser("run", help="run flaschengeist")
    run_command.set_defaults(func=run)
    run_command.add_argument("--host", default="127.0.0.1", help="set hostname to listen on")
    run_command.add_argument("--port", default=5000, type=int, help="set port to listen on")
    run_command.add_argument("--debug", action="store_true", help="run in debug mode")

    # "install": set up the database.
    install_command = commands.add_parser(
        "install",
        help="run database setup for flaschengeist and all installed plugins",
    )
    install_command.set_defaults(func=install)

    # "export": generate TypeScript declarations from the models.
    export_command = commands.add_parser("export", help="export models to typescript interfaces")
    export_command.set_defaults(func=export)
    export_command.add_argument("--file", default="flaschengeist.d.ts", help="Filename where to save")
    export_command.add_argument("--namespace", default="FG", help="Namespace of declarations")
    export_command.add_argument("--plugins", action="store_true", help="Also export plugins")

    # Each sub-command stored its handler as ``func``; dispatch to it.
    parsed = cli.parse_args()
    parsed.func(parsed)
|