168 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			168 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
	
| #!/usr/bin/python3
 | |
| import inspect
 | |
| import argparse
 | |
| import sys
 | |
| 
 | |
| import pkg_resources
 | |
| from werkzeug.middleware.dispatcher import DispatcherMiddleware
 | |
| 
 | |
| from flaschengeist.config import config
 | |
| 
 | |
| 
 | |
| class PrefixMiddleware(object):
 | |
| 
 | |
|     def __init__(self, app, prefix=''):
 | |
|         self.app = app
 | |
|         self.prefix = prefix
 | |
| 
 | |
|     def __call__(self, environ, start_response):
 | |
| 
 | |
|         if environ['PATH_INFO'].startswith(self.prefix):
 | |
|             environ['PATH_INFO'] = environ['PATH_INFO'][len(self.prefix):]
 | |
|             environ['SCRIPT_NAME'] = self.prefix
 | |
|             return self.app(environ, start_response)
 | |
|         else:
 | |
|             start_response('404', [('Content-Type', 'text/plain')])
 | |
|             return ["This url does not belong to the app.".encode()]
 | |
| 
 | |
| 
 | |
| class InterfaceGenerator:
 | |
|     known = []
 | |
|     classes = {}
 | |
|     mapper = {"str": "string", "int": "number", "float": "number", "date": "Date", "datetime": "Date", "NoneType": "null"}
 | |
| 
 | |
|     def __init__(self, namespace, filename):
 | |
|         self.basename = ""
 | |
|         self.namespace = namespace
 | |
|         self.filename = filename
 | |
|         self.this_type = None
 | |
| 
 | |
|     def pytype(self, cls):
 | |
|         if isinstance(cls, list):
 | |
|             return "", "Array<{}>".format(self.pytype(cls[0])[1])
 | |
|         if sys.version_info >= (3, 8):
 | |
|             import typing
 | |
| 
 | |
|             if isinstance(cls, typing.ForwardRef):
 | |
|                 return "", "this" if cls.__forward_arg__ == self.this_type else cls.__forward_arg__
 | |
|             if typing.get_origin(cls) == typing.Union:
 | |
|                 types = typing.get_args(cls)
 | |
|                 if len(types) == 2 and types[-1] is type(None):
 | |
|                     return "?", self.pytype(types[0])[1]
 | |
|                 else:
 | |
|                     return "", "|".join([self.pytype(pt)[1] for pt in types])
 | |
|         if hasattr(cls, "__name__"):
 | |
|             if cls.__name__ in self.mapper:
 | |
|                 return "", self.mapper[cls.__name__]
 | |
|             else:
 | |
|                 return "", cls.__name__
 | |
|         print(
 | |
|             "WARNING: This python version might not detect all types (try >= 3.8). Could not identify >{}<".format(cls)
 | |
|         )
 | |
|         return "?", "any"
 | |
| 
 | |
|     def walker(self, module):
 | |
|         if (
 | |
|             inspect.ismodule(module[1])
 | |
|             and module[1].__name__.startswith(self.basename)
 | |
|             and module[1].__name__ not in self.known
 | |
|         ):
 | |
|             self.known.append(module[1].__name__)
 | |
|             for cls in inspect.getmembers(module[1], lambda x: inspect.isclass(x) or inspect.ismodule(x)):
 | |
|                 self.walker(cls)
 | |
|         elif (
 | |
|             inspect.isclass(module[1])
 | |
|             and module[1].__module__.startswith(self.basename)
 | |
|             and module[0] not in self.classes
 | |
|             and not module[0].startswith("_")
 | |
|             and hasattr(module[1], "__annotations__")
 | |
|         ):
 | |
|             self.this_type = module[0]
 | |
|             d = {
 | |
|                 param: self.pytype(ptype)
 | |
|                 for param, ptype in module[1].__annotations__.items()
 | |
|                 if not param.startswith("_") and not param.endswith("_")
 | |
|             }
 | |
|             if len(d) == 1:
 | |
|                 key, value = d.popitem()
 | |
|                 self.classes[module[0]] = value[1]
 | |
|             else:
 | |
|                 self.classes[module[0]] = d
 | |
| 
 | |
|     def run(self, models):
 | |
|         self.basename = models.__name__
 | |
|         self.walker(("models", models))
 | |
| 
 | |
|     def write(self):
 | |
|         with open(self.filename, "w") as file:
 | |
|             file.write("declare namespace {} {{\n".format(self.namespace))
 | |
|             for cls, params in self.classes.items():
 | |
|                 if isinstance(params, str):
 | |
|                     file.write("\ttype {} = {};\n".format(cls, params))
 | |
|                 else:
 | |
|                     file.write("\tinterface {} {{\n".format(cls))
 | |
|                     for name in params:
 | |
|                         file.write("\t\t{}{}: {};\n".format(name, *params[name]))
 | |
|                     file.write("\t}\n")
 | |
|             file.write("}\n")
 | |
| 
 | |
| 
 | |
| def install(arguments):
 | |
|     from flaschengeist.app import create_app, install_all
 | |
| 
 | |
|     app = create_app()
 | |
|     with app.app_context():
 | |
|         install_all()
 | |
| 
 | |
| 
 | |
| def run(arguments):
 | |
|     from flaschengeist.app import create_app
 | |
| 
 | |
|     app = create_app()
 | |
|     with app.app_context():
 | |
|         app.wsgi_app = PrefixMiddleware(app.wsgi_app, prefix=config["FLASCHENGEIST"].get("root", ""))
 | |
|         if arguments.debug:
 | |
|             app.run(arguments.host, arguments.port, debug=True)
 | |
|         else:
 | |
|             app.run(arguments.host, arguments.port, debug=False)
 | |
| 
 | |
| 
 | |
| def export(arguments):
 | |
|     import flaschengeist.models as models
 | |
|     from flaschengeist.app import create_app
 | |
| 
 | |
|     app = create_app()
 | |
|     with app.app_context():
 | |
|         gen = InterfaceGenerator(arguments.namespace, arguments.file)
 | |
|         gen.run(models)
 | |
|         if arguments.plugins:
 | |
|             for entry_point in pkg_resources.iter_entry_points("flaschengeist.plugin"):
 | |
|                 plg = entry_point.load()
 | |
|                 if hasattr(plg, "models"):
 | |
|                     gen.run(plg.models)
 | |
|         gen.write()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     # create the top-level parser
 | |
|     parser = argparse.ArgumentParser()
 | |
|     subparsers = parser.add_subparsers(help="sub-command help", dest="sub_command")
 | |
|     subparsers.required = True
 | |
|     parser_run = subparsers.add_parser("run", help="run flaschengeist")
 | |
|     parser_run.set_defaults(func=run)
 | |
|     parser_run.add_argument("--host", help="set hostname to listen on", default="127.0.0.1")
 | |
|     parser_run.add_argument("--port", help="set port to listen on", type=int, default=5000)
 | |
|     parser_run.add_argument("--debug", help="run in debug mode", action="store_true")
 | |
|     parser_install = subparsers.add_parser(
 | |
|         "install", help="run database setup for flaschengeist and all installed plugins"
 | |
|     )
 | |
|     parser_install.set_defaults(func=install)
 | |
|     parser_export = subparsers.add_parser("export", help="export models to typescript interfaces")
 | |
|     parser_export.set_defaults(func=export)
 | |
|     parser_export.add_argument("--file", help="Filename where to save", default="flaschengeist.d.ts")
 | |
|     parser_export.add_argument("--namespace", help="Namespace of declarations", default="FG")
 | |
|     parser_export.add_argument("--plugins", help="Also export plugins", action="store_true")
 | |
| 
 | |
|     args = parser.parse_args()
 | |
|     args.func(args)
 |