84 lines
2.5 KiB
Python
84 lines
2.5 KiB
Python
import pkg_resources
|
|
from datetime import datetime, timedelta
|
|
from flask import Blueprint
|
|
|
|
from flaschengeist import logger
|
|
from flaschengeist.utils.HTTP import no_content
|
|
|
|
from . import Plugin
|
|
|
|
|
|
class __Task:
    """Pairs a callable with the interval at which it is meant to run."""

    def __init__(self, function, **interval_spec):
        """Store the callable and derive its interval.

        Args:
            function: Callable kept as ``self.function``.
            **interval_spec: Keyword arguments forwarded unchanged to
                ``datetime.timedelta`` (for example ``days``, ``hours``,
                ``minutes``); the result is kept as ``self.interval``.
        """
        period = timedelta(**interval_spec)
        self.function, self.interval = function, period
|
|
|
|
|
|
# Module-level registry of scheduled tasks, keyed by the string id that was
# passed to the `scheduled` decorator when the task was registered.
_scheduled_tasks: dict[str, __Task] = {}
|
|
|
|
|
|
def scheduled(id: str, replace=False, **kwargs):
    """Build a decorator that registers a function as a scheduled task.

    Args:
        id: Key under which the task is stored in the module-level registry.
        replace: When true, a task already registered under ``id`` is
            overwritten; otherwise the existing registration is kept.
        **kwargs: Interval specification handed to the task wrapper, which
            forwards it to ``datetime.timedelta`` (e.g. days, hours, minutes).

    Returns:
        A decorator that returns the decorated function unchanged.
    """

    def real_decorator(function):
        already_known = id in _scheduled_tasks
        if already_known and not replace:
            # Keep the first registration unless replacement was requested.
            logger.debug(f"Skipping already registered task: {id}")
            return function

        logger.info(f"Registered task: {id}")
        _scheduled_tasks[id] = __Task(function, **kwargs)
        return function

    return real_decorator
|
|
|
|
|
|
class SchedulerPlugin(Plugin):
    """Plugin that executes the tasks collected in ``_scheduled_tasks``.

    Depending on the ``cron`` configuration value the tasks are triggered
    either after each request (``passive_web``, the default) or by requesting
    the ``/cron`` route of the plugin blueprint (``active_web``).
    """

    id = "dev.flaschengeist.scheduler"
    name = "scheduler"
    blueprint = Blueprint(name, __name__)

    def __init__(self, config=None):
        """Constructor called by create_app

        Args:
            config: Dict configuration containing the plugin section
        """

        def __view_func():
            # Route handler for "active_web": errors propagate to the caller.
            self.run_tasks()
            return no_content()

        def __passiv_func(v):
            # Teardown callback for "passive_web": must never raise, so any
            # task failure is only logged. `Exception` (not a bare except)
            # keeps SystemExit / KeyboardInterrupt untouched.
            try:
                self.run_tasks()
            except Exception:
                logger.error("Error while executing scheduled tasks!", exc_info=True)

        self.version = pkg_resources.get_distribution(self.__module__.split(".")[0]).version
        cron = None if config is None else config.get("cron", "passive_web").lower()

        if cron is None or cron == "passive_web":
            self.blueprint.teardown_app_request(__passiv_func)
        elif cron == "active_web":
            self.blueprint.add_url_rule("/cron", view_func=__view_func)
        # Any other value registers no trigger at all.

    def run_tasks(self):
        """Run every registered task whose interval has elapsed.

        The per-task timestamps are read from the ``status`` setting. A task
        seen for the first time gets the current time as its starting point.
        After a task ran, its timestamp is moved to the current time. The
        setting is written back only if something changed; entries of tasks
        that are no longer registered are dropped at that point.

        Exceptions raised by a task function are not caught here.
        """
        changed = False
        now = datetime.now()
        status = self.get_setting("status", default=dict())

        for id, task in _scheduled_tasks.items():
            if id not in status:
                # Persist the starting point of a new task; otherwise the next
                # call would again start counting from its own "now".
                changed = True
            last_run = status.setdefault(id, now)
            if last_run + task.interval <= now:
                logger.debug(
                    f"Run task {id}, was scheduled for {last_run + task.interval}, next iteration: {now + task.interval}"
                )
                task.function()
                # Record this run so the task is not due again immediately.
                status[id] = now
                changed = True

        if changed:
            # Remove not registered tasks; collect the keys first because a
            # dict must not change size while it is being iterated.
            stale_ids = [known for known in status if known not in _scheduled_tasks]
            for stale in stale_ids:
                del status[stale]
            self.set_setting("status", status)
|