import pkg_resources
from datetime import datetime, timedelta

from flask import Blueprint

from flaschengeist import logger
from flaschengeist.utils.HTTP import no_content

from . import Plugin


class __Task:
    """A registered callable together with the interval between two runs."""

    def __init__(self, function, **kwargs):
        """Store the task callable and its interval.

        Args:
            function: Callable invoked without arguments when the task is due.
            **kwargs: Forwarded unchanged to ``datetime.timedelta`` to build the
                interval (e.g. ``days``, ``hours``, ``minutes``).
        """
        self.function = function
        self.interval = timedelta(**kwargs)


# Registry of all known tasks, keyed by task id.
_scheduled_tasks: dict[str, __Task] = dict()


def add_scheduled(id: str, function, replace=False, **kwargs):
    """Register ``function`` as a periodic task under ``id``.

    Args:
        id: Unique identifier of the task.
        function: Callable invoked without arguments when the task is due.
        replace: If true, an already registered task with the same id is
            overwritten; otherwise the existing registration is kept.
        **kwargs: Interval specification, forwarded to ``datetime.timedelta``.
    """
    if id in _scheduled_tasks and not replace:
        logger.debug(f"Skipping already registered task: {id}")
        return
    _scheduled_tasks[id] = __Task(function, **kwargs)
    logger.info(f"Registered task: {id}")


def scheduled(id: str, replace=False, **kwargs):
    """Decorator registering the decorated function as a periodic task.

    Args:
        id: Unique identifier of the task.
        replace: If true, replace an already registered task with the same id.
        **kwargs: Interval specification, forwarded to ``datetime.timedelta``
            (e.g. ``days``, ``hours``, ``minutes``).

    Returns:
        A decorator which registers the function and returns it unchanged.

    Raises:
        TypeError: If ``id`` is not a string, which is what happens when the
            decorator is applied without arguments (``@scheduled``).
    """
    # Validate first so a misuse fails before any closure is built.
    if not isinstance(id, str):
        raise TypeError(
            f"scheduled() expects the task id as a string, got {type(id).__name__}; "
            "use @scheduled('task-id', ...)"
        )

    def real_decorator(function):
        add_scheduled(id, function, replace, **kwargs)
        return function

    return real_decorator


class SchedulerPlugin(Plugin):
    """Plugin running the registered tasks, triggered through web requests."""

    id = "dev.flaschengeist.scheduler"
    name = "scheduler"
    blueprint = Blueprint(name, __name__)

    def __init__(self, config=None):
        """Constructor called by create_app

        Depending on the ``cron`` option the tasks are triggered either after
        every request (``passive_web``, the default) or by requesting the
        ``/cron`` route of the blueprint (``active_web``). Any other value
        registers no trigger.

        Args:
            config: Dict configuration containing the plugin section
        """

        def __view_func():
            self.run_tasks()
            return no_content()

        def __passiv_func(v):
            # Teardown handlers must not break the request, so failures are
            # only logged. Catch Exception rather than everything so that
            # SystemExit / KeyboardInterrupt still propagate.
            try:
                self.run_tasks()
            except Exception:
                logger.error("Error while executing scheduled tasks!", exc_info=True)

        self.version = pkg_resources.get_distribution(self.__module__.split(".")[0]).version

        cron = None if config is None else config.get("cron", "passive_web")
        # A missing config or an explicit null value both select the default.
        cron = "passive_web" if cron is None else cron.lower()

        if cron == "passive_web":
            self.blueprint.teardown_app_request(__passiv_func)
        elif cron == "active_web":
            self.blueprint.add_url_rule("/cron", view_func=__view_func)

    def run_tasks(self):
        """Run every registered task whose interval has elapsed.

        The time of the last run of each task is kept in the ``status``
        setting. A task seen for the first time is stamped with the current
        time, so its first run happens one interval later. Exceptions raised
        by a task propagate to the caller; the status is not stored then.
        """
        changed = False
        now = datetime.now()
        status = self.get_setting("status", default=dict())

        for task_id, task in _scheduled_tasks.items():
            if task_id not in status:
                # Persist the first-seen time, otherwise the interval would
                # restart on every invocation until some other task runs.
                status[task_id] = now
                changed = True

            due = status[task_id] + task.interval
            if due <= now:
                logger.debug(
                    f"Run task {task_id}, was scheduled for {due}, next iteration: {now + task.interval}"
                )
                task.function()
                # Record the run so the next one is scheduled one interval from now.
                status[task_id] = now
                changed = True
            else:
                logger.debug(f"Skip task {task_id}, is scheduled for {due}")

        if changed:
            # Remove not registered tasks; iterate over a copy of the keys,
            # deleting while iterating the dict itself raises RuntimeError.
            for task_id in list(status):
                if task_id not in _scheduled_tasks:
                    del status[task_id]
            self.set_setting("status", status)