86 lines
2.5 KiB
Python
86 lines
2.5 KiB
Python
from flask import Blueprint
|
|
from datetime import datetime, timedelta
|
|
|
|
from flaschengeist import logger
|
|
from flaschengeist.utils.HTTP import no_content
|
|
|
|
from . import Plugin
|
|
|
|
|
|
class __Task:
    """A registered callable paired with the interval at which it should run."""

    def __init__(self, function, **kwargs):
        """Store the callable and derive its run interval.

        function: object kept as ``self.function``; ``run_tasks`` calls it without arguments
        kwargs: keyword arguments passed unchanged to ``datetime.timedelta``
                (no arguments yields a zero interval)
        """
        interval = timedelta(**kwargs)
        self.function = function
        self.interval = interval
|
|
|
|
|
|
# Module-wide registry of scheduled tasks, keyed by the id given to add_scheduled().
_scheduled_tasks: dict[str, __Task] = dict()
|
|
|
|
|
|
def add_scheduled(id: str, function, replace=False, **kwargs):
    """Register ``function`` as a scheduled task under ``id``.

    An existing registration with the same id is kept unless ``replace`` is true.

    id: key under which the task is stored in the module-level registry
    function: callable to schedule
    replace: overwrite a task that is already registered under ``id``
    kwargs: interval specification, handed to the task object (and from there to ``timedelta``)
    """
    already_registered = id in _scheduled_tasks
    if already_registered and not replace:
        logger.debug(f"Skipping already registered task: {id}")
        return

    _scheduled_tasks[id] = __Task(function, **kwargs)
    logger.info(f"Registered task: {id}")
|
|
|
|
|
|
def scheduled(id: str, replace=False, **kwargs):
    """
    Decorator factory: register the decorated function as a scheduled task.

    The decorated function is passed to ``add_scheduled`` and returned unchanged.

    id: task id, must be a ``str`` (otherwise ``TypeError`` is raised)
    replace: forwarded to ``add_scheduled``
    kwargs: interval specification forwarded to ``add_scheduled``, e.g. days, hours, minutes
    """
    # Reject misuse (e.g. a non-string id) before building the decorator.
    if not isinstance(id, str):
        raise TypeError

    def register(function):
        add_scheduled(id, function, replace, **kwargs)
        return function

    return register
|
|
|
|
|
|
class SchedulerPlugin(Plugin):
    """Plugin that executes the tasks stored in the module-level task registry.

    The ``cron`` config value selects how ``run_tasks`` is triggered:

    * ``passive_web`` (default, also used when no config is given): registered
      as an application-wide request teardown handler on the blueprint.
    * ``active_web``: exposed as the ``/cron`` route on the blueprint.

    Any other value registers no trigger in this class.
    """

    id = "scheduler"

    def __init__(self, entry_point, config=None):
        """Create the blueprint and hook ``run_tasks`` in according to ``config["cron"]``."""
        super().__init__(entry_point, config)
        self.blueprint = Blueprint(self.name, __name__)

        def __view_func():
            self.run_tasks()
            return no_content()

        def __passiv_func(v):
            # Runs during request teardown, so failures are logged instead of raised.
            # Catch ``Exception`` rather than everything: SystemExit and
            # KeyboardInterrupt must still be able to stop the process.
            try:
                self.run_tasks()
            except Exception:
                logger.error("Error while executing scheduled tasks!", exc_info=True)

        cron = None if config is None else config.get("cron", "passive_web").lower()

        if cron is None or cron == "passive_web":
            self.blueprint.teardown_app_request(__passiv_func)
        elif cron == "active_web":
            self.blueprint.add_url_rule("/cron", view_func=__view_func)

    def run_tasks(self):
        """Run every registered task whose interval has elapsed.

        The plugin setting ``"status"`` maps task ids to the time the task was
        last run (or first seen). It is written back only when something changed.
        An exception raised by a task propagates to the caller.
        """
        changed = False
        now = datetime.now()
        # NOTE(review): assumes get_setting returns ``default`` when nothing is stored yet.
        status = self.get_setting("status", default=dict())

        for task_id, task in _scheduled_tasks.items():
            if task_id not in status:
                # First sighting: the interval starts now. This must be persisted,
                # otherwise the start time resets on every call and the task is never due.
                status[task_id] = now
                changed = True
            last_run = status[task_id]
            due = last_run + task.interval

            if due <= now:
                logger.debug(
                    f"Run task {task_id}, was scheduled for {due}, next iteration: {now + task.interval}"
                )
                task.function()
                # Record the run so the next one is a full interval away.
                status[task_id] = now
                changed = True
            else:
                logger.debug(f"Skip task {task_id}, is scheduled for {due}")

        if changed:
            # Remove not registered tasks; iterate over a snapshot of the keys
            # because deleting while iterating the dict raises RuntimeError.
            for task_id in list(status):
                if task_id not in _scheduled_tasks:
                    del status[task_id]
            self.set_setting("status", status)
|