feat(cli): Ported CLI to use native click / flask cli

This commit is contained in:
Ferdinand Thiessen 2021-12-18 01:39:04 +01:00
parent 38ebaf0e79
commit ece6893675
7 changed files with 360 additions and 248 deletions

View File

@ -95,7 +95,7 @@ def install_all():
roleController.create_permissions(plugin.permissions) roleController.create_permissions(plugin.permissions)
def create_app(test_config=None): def create_app(test_config=None, cli=False):
app = Flask(__name__) app = Flask(__name__)
app.json_encoder = CustomJSONEncoder app.json_encoder = CustomJSONEncoder
CORS(app) CORS(app)
@ -103,7 +103,7 @@ def create_app(test_config=None):
with app.app_context(): with app.app_context():
from flaschengeist.database import db from flaschengeist.database import db
configure_app(app, test_config) configure_app(app, test_config, cli)
db.init_app(app) db.init_app(app)
__load_plugins(app) __load_plugins(app)
@ -113,14 +113,6 @@ def create_app(test_config=None):
return jsonify({"plugins": app.config["FG_PLUGINS"], "version": version}) return jsonify({"plugins": app.config["FG_PLUGINS"], "version": version})
@app.after_request
def after_request(response):
header = response.headers
header["Access-Control-Allow-Origin"] = "*"
header["Access-Control-Allow-Methods"] = "GET,HEAD,OPTIONS,POST,PUT"
header["Access-Control-Allow-Headers"] = "Origin, X-Requested-With, Content-Type, Accept, Authorization"
return response
@app.errorhandler(Exception) @app.errorhandler(Exception)
def handle_exception(e): def handle_exception(e):
if isinstance(e, HTTPException): if isinstance(e, HTTPException):

View File

@ -0,0 +1,107 @@
import sys
import inspect
import logging
class InterfaceGenerator:
known = []
classes = {}
mapper = {
"str": "string",
"int": "number",
"float": "number",
"date": "Date",
"datetime": "Date",
"NoneType": "null",
"bool": "boolean",
}
def __init__(self, namespace, filename, logger=logging.getLogger()):
self.basename = ""
self.namespace = namespace
self.filename = filename
self.this_type = None
self.logger = logger
def pytype(self, cls):
a = self._pytype(cls)
return a
def _pytype(self, cls):
import typing
origin = typing.get_origin(cls)
arguments = typing.get_args(cls)
if origin is typing.ForwardRef: # isinstance(cls, typing.ForwardRef):
return "", "this" if cls.__forward_arg__ == self.this_type else cls.__forward_arg__
if origin is typing.Union:
if len(arguments) == 2 and arguments[1] is type(None):
return "?", self.pytype(arguments[0])[1]
else:
return "", "|".join([self.pytype(pt)[1] for pt in arguments])
if origin is list:
return "", "Array<{}>".format("|".join([self.pytype(a_type)[1] for a_type in arguments]))
if cls is typing.Any:
return "", "any"
name = cls.__name__ if hasattr(cls, "__name__") else cls if isinstance(cls, str) else None
if name is not None:
if name in self.mapper:
return "", self.mapper[name]
else:
return "", name
self.logger.warning(f"This python version might not detect all types (try >= 3.9). Could not identify >{cls}<")
return "?", "any"
def walker(self, module):
if sys.version_info < (3, 9):
raise RuntimeError("Python >= 3.9 is required to export API")
import typing
if (
inspect.ismodule(module[1])
and module[1].__name__.startswith(self.basename)
and module[1].__name__ not in self.known
):
self.known.append(module[1].__name__)
for cls in inspect.getmembers(module[1], lambda x: inspect.isclass(x) or inspect.ismodule(x)):
self.walker(cls)
elif (
inspect.isclass(module[1])
and module[1].__module__.startswith(self.basename)
and module[0] not in self.classes
and not module[0].startswith("_")
and hasattr(module[1], "__annotations__")
):
self.this_type = module[0]
d = {}
for param, ptype in typing.get_type_hints(module[1], globalns=None, localns=None).items():
if not param.startswith("_") and not param.endswith("_"):
d[param] = self.pytype(ptype)
if len(d) == 1:
key, value = d.popitem()
self.classes[module[0]] = value[1]
else:
self.classes[module[0]] = d
def run(self, models):
self.basename = models.__name__
self.walker(("models", models))
def write(self):
with (open(self.filename, "w") if self.filename else sys.stdout) as file:
file.write("declare namespace {} {{\n".format(self.namespace))
for cls, params in self.classes.items():
if isinstance(params, str):
file.write("\ttype {} = {};\n".format(cls, params))
else:
file.write("\tinterface {} {{\n".format(cls))
for name in params:
file.write("\t\t{}{}: {};\n".format(name, *params[name]))
file.write("\t}\n")
file.write("}\n")

View File

@ -0,0 +1,160 @@
import pathlib
import subprocess
import click
from os import environ
from flask import current_app
from flask.cli import FlaskGroup, run_command, with_appcontext
import pkg_resources
from flaschengeist.app import create_app
from flaschengeist.config import configure_logger
def get_version(ctx, param, value):
if not value or ctx.resilient_parsing:
return
import platform
from werkzeug import __version__ as werkzeug_version
from flask import __version__ as flask_version
from flaschengeist import __version__
click.echo(
f"Python {platform.python_version()}\n"
f"Flask {flask_version}\n"
f"Werkzeug {werkzeug_version}\n"
f"Flaschengeist {__version__}",
color=ctx.color,
)
ctx.exit()
@with_appcontext
def verbosity(ctx, param, value):
"""Toggle verbosity between WARNING <-> DEBUG"""
if not value or ctx.resilient_parsing:
return
configure_logger(cli=40 - max(0, min(value * 10, 30)))
@click.group(
cls=FlaskGroup,
add_version_option=False,
add_default_commands=False,
create_app=lambda: create_app(cli=True),
)
@click.option(
"--version",
help="Show the flask version",
expose_value=False,
callback=get_version,
is_flag=True,
is_eager=True,
)
@click.option(
"--verbose",
"-v",
help="Increase logging level",
callback=verbosity,
count=True,
expose_value=False,
)
def cli():
"""Management script for the Flaschengeist application."""
pass
@cli.command()
@with_appcontext
def install():
"""Install and initialize enabled plugins.
Most plugins need to install custom tables into the database
running this command will lookup all enabled plugins and run
their database initalization routines.
"""
from flaschengeist.app import install_all
install_all()
@cli.command()
@click.option("--output", "-o", help="Output file, default is stdout", type=click.Path())
@click.option("--namespace", "-n", help="TS namespace for the interfaces", type=str, show_default=True)
@click.option("--plugin", "-p", help="Also export types for a plugin (even if disabled)", multiple=True, type=str)
@click.option("--no-core", help="Skip models / types from flaschengeist core", is_flag=True)
def export(namespace, output, no_core, plugin):
from flaschengeist import models
from flaschengeist import logger
from .InterfaceGenerator import InterfaceGenerator
gen = InterfaceGenerator(namespace, output, logger)
if not no_core:
gen.run(models)
if plugin:
for entry_point in pkg_resources.iter_entry_points("flaschengeist.plugins"):
if len(plugin) == 0 or entry_point.name in plugin:
plg = entry_point.load()
if hasattr(plg, "models") and plg.models is not None:
gen.run(plg.models)
gen.write()
@cli.command()
@click.option("--host", help="set hostname to listen on", default="127.0.0.1", show_default=True)
@click.option("--port", help="set port to listen on", type=int, default=5000, show_default=True)
@click.option("--debug", help="run in debug mode", is_flag=True)
@with_appcontext
@click.pass_context
def run(ctx, host, port, debug):
"""Run Flaschengeist using a development server."""
class PrefixMiddleware(object):
def __init__(self, app, prefix=""):
self.app = app
self.prefix = prefix
def __call__(self, environ, start_response):
if environ["PATH_INFO"].startswith(self.prefix):
environ["PATH_INFO"] = environ["PATH_INFO"][len(self.prefix) :]
environ["SCRIPT_NAME"] = self.prefix
return self.app(environ, start_response)
else:
start_response("404", [("Content-Type", "text/plain")])
return ["This url does not belong to the app.".encode()]
from flaschengeist.config import config
# re configure logger, as we are no logger in CLI mode
configure_logger()
current_app.wsgi_app = PrefixMiddleware(current_app.wsgi_app, prefix=config["FLASCHENGEIST"].get("root", ""))
if debug:
environ["FLASK_DEBUG"] = "1"
environ["FLASK_ENV"] = "development"
ctx.invoke(run_command, host=host, port=port, debugger=debug)
@cli.command()
@click.option(
"--output",
"-o",
help="Documentation output path",
default="./docs",
type=click.Path(file_okay=False, path_type=pathlib.Path),
)
def docs(output: pathlib.Path):
"""Generate and export API documentation using pdoc3"""
output.mkdir(parents=True, exist_ok=True)
command = [
"python",
"-m",
"pdoc",
"--skip-errors",
"--html",
"--output-dir",
str(output),
"flaschengeist",
]
click.echo(f"Running command: {command}")
subprocess.check_call(command)

View File

@ -33,7 +33,7 @@ def read_configuration(test_config):
for loc in paths: for loc in paths:
try: try:
with (loc / "flaschengeist.toml").open() as source: with (loc / "flaschengeist.toml").open() as source:
print("Reading config file from >{}<".format(loc)) logger.warning(f"Reading config file from >{loc}<") # default root logger, goes to stderr
update_dict(config, toml.load(source)) update_dict(config, toml.load(source))
except IOError: except IOError:
pass pass
@ -41,7 +41,7 @@ def read_configuration(test_config):
update_dict(config, test_config) update_dict(config, test_config)
def configure_logger(): def configure_logger(cli=False):
global config global config
# Read default config # Read default config
logger_config = toml.load(_module_path / "logging.toml") logger_config = toml.load(_module_path / "logging.toml")
@ -50,25 +50,27 @@ def configure_logger():
# Override with user config # Override with user config
update_dict(logger_config, config.get("LOGGING")) update_dict(logger_config, config.get("LOGGING"))
# Check for shortcuts # Check for shortcuts
if "level" in config["LOGGING"]: if "level" in config["LOGGING"] or isinstance(cli, int):
logger_config["loggers"]["flaschengeist"] = {"level": config["LOGGING"]["level"]} level = cli if isinstance(cli, int) else config["LOGGING"]["level"]
logger_config["handlers"]["console"]["level"] = config["LOGGING"]["level"] logger_config["loggers"]["flaschengeist"] = {"level": level}
logger_config["handlers"]["file"]["level"] = config["LOGGING"]["level"] logger_config["handlers"]["console"]["level"] = level
if not config["LOGGING"].get("console", True): logger_config["handlers"]["file"]["level"] = level
if cli is True or not config["LOGGING"].get("console", True):
logger_config["handlers"]["console"]["level"] = "CRITICAL" logger_config["handlers"]["console"]["level"] = "CRITICAL"
if "file" in config["LOGGING"]: if not cli and isinstance(config["LOGGING"].get("file", False), str):
logger_config["root"]["handlers"].append("file") logger_config["root"]["handlers"].append("file")
logger_config["handlers"]["file"]["filename"] = config["LOGGING"]["file"] logger_config["handlers"]["file"]["filename"] = config["LOGGING"]["file"]
path = Path(config["LOGGING"]["file"]) Path(config["LOGGING"]["file"]).parent.mkdir(parents=True, exist_ok=True)
path.parent.mkdir(parents=True, exist_ok=True) else:
del logger_config["handlers"]["file"]
logging.config.dictConfig(logger_config) logging.config.dictConfig(logger_config)
def configure_app(app, test_config=None): def configure_app(app, test_config=None, cli=False):
global config global config
read_configuration(test_config) read_configuration(test_config)
configure_logger() configure_logger(cli)
# Always enable this builtin plugins! # Always enable this builtin plugins!
update_dict( update_dict(

View File

@ -0,0 +1,27 @@
import click
from flask import current_app
from flask.cli import with_appcontext
@click.command(no_args_is_help=True)
@click.option("--sync", is_flag=True, default=False, help="Synchronize users from LDAP -> database")
@with_appcontext
@click.pass_context
def ldap(ctx, sync):
"""Tools for the LDAP authentification"""
if sync:
from flaschengeist.controller import userController
from flaschengeist.plugins.auth_ldap import AuthLDAP
from ldap3 import SUBTREE
auth_ldap: AuthLDAP = current_app.config.get("FG_AUTH_BACKEND")
if auth_ldap is None or not isinstance(auth_ldap, AuthLDAP):
ctx.fail("auth_ldap plugin not found or not enabled!")
conn = auth_ldap.ldap.connection
if not conn:
conn = auth_ldap.ldap.connect(auth_ldap.root_dn, auth_ldap.root_secret)
conn.search(auth_ldap.search_dn, "(uid=*)", SUBTREE, attributes=["uid", "givenName", "sn", "mail"])
ldap_users_response = conn.response
for ldap_user in ldap_users_response:
uid = ldap_user["attributes"]["uid"][0]
userController.find_user(uid)

View File

@ -0,0 +1,50 @@
import click
from flask.cli import with_appcontext
from werkzeug.exceptions import BadRequest, Conflict, NotFound
from flaschengeist.controller import roleController, userController
USER_KEY = f"{__name__}.user"
def user(ctx, param, value):
if not value or ctx.resilient_parsing:
return
click.echo("Adding new user")
ctx.meta[USER_KEY] = {}
try:
ctx.meta[USER_KEY]["userid"] = click.prompt("userid", type=str)
ctx.meta[USER_KEY]["firstname"] = click.prompt("firstname", type=str)
ctx.meta[USER_KEY]["lastname"] = click.prompt("lastname", type=str)
ctx.meta[USER_KEY]["display_name"] = click.prompt("displayed name", type=str, default="")
ctx.meta[USER_KEY]["mail"] = click.prompt("mail", type=str, default="")
ctx.meta[USER_KEY]["password"] = click.prompt("password", type=str, confirmation_prompt=True, hide_input=True)
ctx.meta[USER_KEY] = {k: v for k, v in ctx.meta[USER_KEY].items() if v != ""}
except click.Abort:
click.echo("\n!!! User was not added, aborted.")
del ctx.meta[USER_KEY]
@click.command()
@click.option("--add-role", help="Add new role", type=str)
@click.option("--set-admin", help="Make a role an admin role, adding all permissions", type=str)
@click.option("--add-user", help="Add new user interactivly", callback=user, is_flag=True, expose_value=False)
@with_appcontext
def users(add_role, set_admin):
from flaschengeist.database import db
ctx = click.get_current_context()
try:
if add_role:
roleController.create_role(add_role)
if set_admin:
role = roleController.get(set_admin)
role.permissions = roleController.get_permissions()
db.session.commit()
if USER_KEY in ctx.meta:
userController.register(ctx.meta[USER_KEY], ctx.meta[USER_KEY]["password"])
except (BadRequest, NotFound) as e:
ctx.fail(e.description)

View File

@ -1,226 +0,0 @@
#!/usr/bin/python3
from __future__ import annotations # TODO: Remove if python requirement is >= 3.10
import inspect
import argparse
import sys
import pkg_resources
from os import environ
from flaschengeist.app import create_app, install_all
from flaschengeist.config import config
class PrefixMiddleware(object):
def __init__(self, app, prefix=""):
self.app = app
self.prefix = prefix
def __call__(self, environ, start_response):
if environ["PATH_INFO"].startswith(self.prefix):
environ["PATH_INFO"] = environ["PATH_INFO"][len(self.prefix) :]
environ["SCRIPT_NAME"] = self.prefix
return self.app(environ, start_response)
else:
start_response("404", [("Content-Type", "text/plain")])
return ["This url does not belong to the app.".encode()]
class InterfaceGenerator:
known = []
classes = {}
mapper = {
"str": "string",
"int": "number",
"float": "number",
"date": "Date",
"datetime": "Date",
"NoneType": "null",
"bool": "boolean",
}
def __init__(self, namespace, filename):
self.basename = ""
self.namespace = namespace
self.filename = filename
self.this_type = None
def pytype(self, cls):
a = self._pytype(cls)
print(f"{cls} -> {a}")
return a
def _pytype(self, cls):
import typing
origin = typing.get_origin(cls)
arguments = typing.get_args(cls)
if origin is typing.ForwardRef: # isinstance(cls, typing.ForwardRef):
return "", "this" if cls.__forward_arg__ == self.this_type else cls.__forward_arg__
if origin is typing.Union:
print(f"A1: {arguments[1]}")
if len(arguments) == 2 and arguments[1] is type(None):
return "?", self.pytype(arguments[0])[1]
else:
return "", "|".join([self.pytype(pt)[1] for pt in arguments])
if origin is list:
return "", "Array<{}>".format("|".join([self.pytype(a_type)[1] for a_type in arguments]))
name = cls.__name__ if hasattr(cls, "__name__") else cls if isinstance(cls, str) else None
if name is not None:
if name in self.mapper:
return "", self.mapper[name]
else:
return "", name
print(
"WARNING: This python version might not detect all types (try >= 3.9). Could not identify >{}<".format(cls)
)
return "?", "any"
def walker(self, module):
if sys.version_info < (3, 9):
raise RuntimeError("Python >= 3.9 is required to export API")
import typing
if (
inspect.ismodule(module[1])
and module[1].__name__.startswith(self.basename)
and module[1].__name__ not in self.known
):
self.known.append(module[1].__name__)
for cls in inspect.getmembers(module[1], lambda x: inspect.isclass(x) or inspect.ismodule(x)):
self.walker(cls)
elif (
inspect.isclass(module[1])
and module[1].__module__.startswith(self.basename)
and module[0] not in self.classes
and not module[0].startswith("_")
and hasattr(module[1], "__annotations__")
):
self.this_type = module[0]
print("\n\n" + module[0] + "\n")
d = {}
for param, ptype in typing.get_type_hints(module[1], globalns=None, localns=None).items():
if not param.startswith("_") and not param.endswith("_"):
print(f"{param} ::: {ptype}")
d[param] = self.pytype(ptype)
if len(d) == 1:
key, value = d.popitem()
self.classes[module[0]] = value[1]
else:
self.classes[module[0]] = d
def run(self, models):
self.basename = models.__name__
self.walker(("models", models))
def write(self):
with open(self.filename, "w") as file:
file.write("declare namespace {} {{\n".format(self.namespace))
for cls, params in self.classes.items():
if isinstance(params, str):
file.write("\ttype {} = {};\n".format(cls, params))
else:
file.write("\tinterface {} {{\n".format(cls))
for name in params:
file.write("\t\t{}{}: {};\n".format(name, *params[name]))
file.write("\t}\n")
file.write("}\n")
def install(arguments):
app = create_app()
with app.app_context():
install_all()
def run(arguments):
app = create_app()
with app.app_context():
app.wsgi_app = PrefixMiddleware(app.wsgi_app, prefix=config["FLASCHENGEIST"].get("root", ""))
if arguments.debug:
environ["FLASK_DEBUG"] = "1"
app.run(arguments.host, arguments.port, debug=True)
else:
app.run(arguments.host, arguments.port, debug=False)
def export(arguments):
import flaschengeist.models as models
app = create_app()
with app.app_context():
gen = InterfaceGenerator(arguments.namespace, arguments.file)
if not arguments.no_core:
gen.run(models)
if arguments.plugins is not None:
for entry_point in pkg_resources.iter_entry_points("flaschengeist.plugin"):
if len(arguments.plugins) == 0 or entry_point.name in arguments.plugins:
plg = entry_point.load()
if hasattr(plg, "models") and plg.models is not None:
gen.run(plg.models)
gen.write()
def ldap(arguments):
app = create_app()
with app.app_context():
if arguments.set_admin:
from flaschengeist.controller import roleController
from flaschengeist.database import db
role = roleController.get(arguments.set_admin)
role.permissions = roleController.get_permissions()
db.session.commit()
if arguments.sync:
from flaschengeist.controller import userController
from flaschengeist.plugins.auth_ldap import AuthLDAP
from ldap3 import SUBTREE
auth_ldap: AuthLDAP = app.config.get("FG_AUTH_BACKEND")
if auth_ldap is None or not isinstance(auth_ldap, AuthLDAP):
raise Exception("Plugin >auth_ldap< not found")
conn = auth_ldap.ldap.connection
if not conn:
conn = auth_ldap.ldap.connect(auth_ldap.root_dn, auth_ldap.root_secret)
conn.search(auth_ldap.search_dn, "(uid=*)", SUBTREE, attributes=["uid", "givenName", "sn", "mail"])
ldap_users_response = conn.response
for ldap_user in ldap_users_response:
uid = ldap_user["attributes"]["uid"][0]
userController.find_user(uid)
if __name__ == "__main__":
# create the top-level parser
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(help="sub-command help", dest="sub_command")
subparsers.required = True
parser_run = subparsers.add_parser("run", help="run flaschengeist")
parser_run.set_defaults(func=run)
parser_run.add_argument("--host", help="set hostname to listen on", default="127.0.0.1")
parser_run.add_argument("--port", help="set port to listen on", type=int, default=5000)
parser_run.add_argument("--debug", help="run in debug mode", action="store_true")
parser_install = subparsers.add_parser(
"install", help="run database setup for flaschengeist and all installed plugins"
)
parser_install.set_defaults(func=install)
parser_export = subparsers.add_parser("export", help="export models to typescript interfaces")
parser_export.set_defaults(func=export)
parser_export.add_argument("--file", help="Filename where to save", default="flaschengeist.d.ts")
parser_export.add_argument("--namespace", help="Namespace of declarations", default="FG")
parser_export.add_argument(
"--no-core",
help="Do not export core declarations (only useful in conjunction with --plugins)",
action="store_true",
)
parser_export.add_argument("--plugins", help="Also export plugins (none means all)", nargs="*")
parser_ldap = subparsers.add_parser("ldap", help="LDAP helper utils")
parser_ldap.set_defaults(func=ldap)
parser_ldap.add_argument('--sync', action="store_true", help="Sync ldap-users with database")
parser_ldap.add_argument('--set-admin', type=str, help="Assign all permissions this to group")
args = parser.parse_args()
args.func(args)