173 lines
5.5 KiB
Python
173 lines
5.5 KiB
Python
"""Controller for Plugin logic
|
|
|
|
Used by plugins for setting and notification functionality.
|
|
"""
|
|
|
|
from typing import Union, List
|
|
from flask import current_app
|
|
from werkzeug.exceptions import NotFound, BadRequest
|
|
from sqlalchemy.exc import OperationalError, ProgrammingError
|
|
from flask_migrate import upgrade as database_upgrade
|
|
from importlib.metadata import entry_points
|
|
|
|
from flaschengeist import version as flaschengeist_version
|
|
|
|
from .. import logger
|
|
from ..database import db
|
|
from ..utils.hook import Hook
|
|
from ..plugins import Plugin, AuthPlugin
|
|
from ..models import Notification
|
|
|
|
|
|
__required_plugins = ["users", "roles", "scheduler", "auth"]
|
|
|
|
|
|
def get_authentication_provider():
|
|
return [
|
|
current_app.config["FG_PLUGINS"][plugin.name]
|
|
for plugin in get_loaded_plugins().values()
|
|
if isinstance(plugin, AuthPlugin)
|
|
]
|
|
|
|
|
|
def get_loaded_plugins(plugin_name: str = None):
|
|
"""Get loaded plugin(s)"""
|
|
plugins = current_app.config["FG_PLUGINS"]
|
|
if plugin_name is not None:
|
|
plugins = [plugins[plugin_name]]
|
|
return {name: db.session.merge(plugins[name], load=False) for name in plugins}
|
|
|
|
|
|
def get_installed_plugins() -> list[Plugin]:
|
|
"""Get all installed plugins"""
|
|
return Plugin.query.all()
|
|
|
|
|
|
def get_enabled_plugins() -> list[Plugin]:
|
|
"""Get all installed and enabled plugins"""
|
|
try:
|
|
enabled_plugins = Plugin.query.filter(Plugin.enabled == True).all()
|
|
except (OperationalError, ProgrammingError) as e:
|
|
logger.error("Could not connect to database or database not initialized! No plugins enabled!")
|
|
logger.debug("Can not query enabled plugins", exc_info=True)
|
|
# Fake load required plugins so the database can at least be installed
|
|
enabled_plugins = [
|
|
entry_points(group="flaschengeist.plugins", name=name)[0].load()(
|
|
name=name, enabled=True, installed_version=flaschengeist_version
|
|
)
|
|
for name in __required_plugins
|
|
]
|
|
return enabled_plugins
|
|
|
|
|
|
def notify(plugin_id: int, user, text: str, data=None):
|
|
"""Create a new notification for an user
|
|
|
|
Args:
|
|
plugin_id: ID of the plugin
|
|
user: `flaschengeist.models.user.User` to notify
|
|
text: Visibile notification text
|
|
data: Optional data passed to the notificaton
|
|
Returns:
|
|
ID of the created `flaschengeist.models.notification.Notification`
|
|
|
|
Hint: use the data for frontend actions.
|
|
"""
|
|
if not user.deleted:
|
|
n = Notification(text=text, data=data, plugin_id_=plugin_id, user_=user)
|
|
db.session.add(n)
|
|
db.session.commit()
|
|
return n.id
|
|
|
|
|
|
def get_notifications(plugin_id) -> List[Notification]:
|
|
"""Get all notifications for a plugin
|
|
|
|
Args:
|
|
plugin_id: ID of the plugin
|
|
Returns:
|
|
List of `flaschengeist.models.notification.Notification`
|
|
"""
|
|
return db.session.execute(db.select(Notification).where(Notification.plugin_id_ == plugin_id)).scalars().all()
|
|
|
|
|
|
@Hook("plugins.installed")
|
|
def install_plugin(plugin_name: str):
|
|
logger.debug(f"Installing plugin {plugin_name}")
|
|
entry_point = entry_points(group="flaschengeist.plugins", name=plugin_name)
|
|
if not entry_point:
|
|
raise NotFound
|
|
|
|
cls = entry_point[0].load()
|
|
plugin: Plugin = cls.query.filter(Plugin.name == plugin_name).one_or_none()
|
|
if plugin is None:
|
|
plugin = cls(name=plugin_name, installed_version=entry_point[0].dist.version)
|
|
db.session.add(plugin)
|
|
db.session.commit()
|
|
# Custom installation steps
|
|
plugin.install()
|
|
# Check migrations
|
|
directory = entry_point[0].dist.locate_file("")
|
|
logger.debug(f"Checking for migrations in {directory}")
|
|
for loc in entry_point[0].module.split(".") + ["migrations"]:
|
|
directory /= loc
|
|
logger.debug(f"Checking for migrations with loc in {directory}")
|
|
if directory.exists():
|
|
logger.debug(f"Found migrations in {directory}")
|
|
database_upgrade(revision=f"{plugin_name}@head")
|
|
db.session.commit()
|
|
return plugin
|
|
|
|
|
|
@Hook("plugin.uninstalled")
|
|
def uninstall_plugin(plugin_id: Union[str, int, Plugin]):
|
|
plugin = disable_plugin(plugin_id)
|
|
logger.debug(f"Uninstall plugin {plugin.name}")
|
|
plugin.uninstall()
|
|
db.session.delete(plugin)
|
|
db.session.commit()
|
|
|
|
|
|
@Hook("plugins.enabled")
|
|
def enable_plugin(plugin_id: Union[str, int]) -> Plugin:
|
|
logger.debug(f"Enabling plugin {plugin_id}")
|
|
plugin = Plugin.query
|
|
if isinstance(plugin_id, str):
|
|
plugin = plugin.filter(Plugin.name == plugin_id).one_or_none()
|
|
elif isinstance(plugin_id, int):
|
|
plugin = plugin.get(plugin_id)
|
|
else:
|
|
raise TypeError
|
|
if plugin is None:
|
|
raise NotFound
|
|
plugin.enabled = True
|
|
db.session.commit()
|
|
plugin = plugin.entry_point.load().query.get(plugin.id)
|
|
current_app.config["FG_PLUGINS"][plugin.name] = plugin
|
|
return plugin
|
|
|
|
|
|
@Hook("plugins.disabled")
|
|
def disable_plugin(plugin_id: Union[str, int, Plugin]):
|
|
logger.debug(f"Disabling plugin {plugin_id}")
|
|
plugin: Plugin = Plugin.query
|
|
if isinstance(plugin_id, str):
|
|
plugin = plugin.filter(Plugin.name == plugin_id).one_or_none()
|
|
elif isinstance(plugin_id, int):
|
|
plugin = plugin.get(plugin_id)
|
|
elif isinstance(plugin_id, Plugin):
|
|
plugin = plugin_id
|
|
else:
|
|
raise TypeError
|
|
if plugin is None:
|
|
raise NotFound
|
|
if plugin.name in __required_plugins:
|
|
raise BadRequest
|
|
plugin.enabled = False
|
|
db.session.commit()
|
|
|
|
if plugin.name in current_app.config["FG_PLUGINS"].keys():
|
|
del current_app.config["FG_PLUGINS"][plugin.name]
|
|
|
|
return plugin
|