import datetime import schedule import time from uuid import uuid4 from typing import Optional, Any from dataclasses import dataclass @dataclass class ScheduledJob: """ Basically an import of our schedule library: https://schedule.readthedocs.io/en/stable/_modules/schedule.html#Job """ id: str interval: int # pause interval * unit between runs latest: Optional[int] = None # upper limit to the interval #job_func: Optional[functools.partial] = None # the job job_func to run # time units, e.g. 'minutes', 'hours', ... unit: Optional[str] = None # optional time at which this job runs at_time: Optional[datetime.time] = None # optional time zone of the self.at_time field. Only relevant when at_time is not None at_time_zone:Optional[Any] = None # str or pytz.BaseTzInfo # datetime of the last run last_run: Optional[datetime.datetime] = None # datetime of the next run next_run: Optional[datetime.datetime] = None # timedelta between runs, only valid for period: Optional[datetime.timedelta] = None # period vs. interval? # Specific day of the week to start on start_day: Optional[str] = None # optional time of final run cancel_after: Optional[datetime.datetime] = None job_fn:Optional[Any] = None class ScheduleApplication: """ Maintains a persistent schedule of jobs to run and manages execution. """ def __init__ (self): self.scheduler = schedule.default_scheduler self.jobs = {} def schedule_job (self, sched_job_spec: schedule.Job, job_fn: str): """ Usage: schedule_job(schedule.every(3).weeks.at(...), 'module.fn') module.fn needs to be ready to run when the job scheduler comes online. Can't be something like a partial, needs to be able to be recreated at startup. - we can persist a schedule.Job using its properties, and restore it upon startup. https://schedule.readthedocs.io/en/stable/_modules/schedule.html#Job """ job_id = uuid4().hex sched_job_spec._schedule_next_run() job = ScheduledJob( id = job_id, interval = sched_job_spec.interval, latest = sched_job_spec.latest, unit = sched_job_spec.unit, at_time = sched_job_spec.at_time, at_time_zone = sched_job_spec.at_time_zone, next_run = sched_job_spec.next_run, last_run = sched_job_spec.last_run, period = sched_job_spec.period, start_day = sched_job_spec.start_day, cancel_after = sched_job_spec.cancel_after, job_fn = job_fn ) self.jobs[ job_id ] = job sched_job = self._as_sched_job_spec( job ) sched_job.scheduler = self.scheduler sched_job.do( job.job_fn ) # FIXME save jobs return job_id def get_missed_jobs (self): pass def delete_job (self, job_id): del self.jobs[job_id] # FIXME save jobs def _as_sched_job_spec (self, job): sched_job_spec = schedule.Job( interval=job.interval ) sched_job_spec.latest = job.latest sched_job_spec.unit = job.unit sched_job_spec.at_time = job.at_time sched_job_spec.at_time_zone = job.at_time_zone sched_job_spec.next_run = job.next_run sched_job_spec.last_run = job.last_run sched_job_spec.period = job.period sched_job_spec.start_day = job.start_day sched_job_spec.cancel_after = job.cancel_after return sched_job_spec def restore_schedule (self): # FIXME load from storage saved_jobs = {} for job_id, job in saved_jobs: job_func = lambda: print(f'running job: {job_id}') # TODO check if we missed a run # if next_run is in the past # we want to get user input for what to do. # Wire and schedule for execution # #sched_job_spec.job_func = job_func # # two options: # job.scheduler = ... # job.do( job_func ) # scheduler.jobs.append(sched_job_spec) # FIXME we need to find a way to serialize this. Maybe just full qual name with some rules, like no partials/lambdas. job_fn = None # replace(job, job_fn => job_fn) #self.jobs[ job_id ] = job #sched_job_spec = self._as_sched_job_spec( job ) #sched_job_spec.scheduler = self.scheduler #sched_job_spec.do( job_fn ) def run_pending (self): self.scheduler.run_pending()