feat(scheduler): Add function to add scheduled tasks programmatically

This commit is contained in:
Ferdinand Thiessen 2021-12-18 04:19:57 +01:00
parent 691345cf40
commit e1f919bd20
1 changed files with 14 additions and 6 deletions

View File

@ -17,19 +17,25 @@ class __Task:
_scheduled_tasks: dict[__Task] = dict()
def add_scheduled(id: str, function, replace=False, **kwargs):
if id not in _scheduled_tasks or replace:
_scheduled_tasks[id] = __Task(function, **kwargs)
logger.info(f"Registered task: {id}")
else:
logger.debug(f"Skipping already registered task: {id}")
def scheduled(id: str, replace=False, **kwargs):
"""
kwargs: days, hours, minutes
"""
def real_decorator(function):
if id not in _scheduled_tasks or replace:
logger.info(f"Registered task: {id}")
_scheduled_tasks[id] = __Task(function, **kwargs)
else:
logger.debug(f"Skipping already registered task: {id}")
add_scheduled(id, function, replace, **kwargs)
return function
if not isinstance(id, str):
raise TypeError
return real_decorator
@ -75,9 +81,11 @@ class SchedulerPlugin(Plugin):
)
task.function()
changed = True
else:
logger.debug(f"Skip task {id}, is scheduled for {last_run + task.interval}")
if changed:
# Remove not registered tasks
for id in status.keys():
if id not in _scheduled_tasks.keys():
del status[id]
self.set_setting("status", status)
self.set_setting("status", status)