123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- import datetime
- import schedule
- import time
- from uuid import uuid4
- from typing import Optional, Any
- from dataclasses import dataclass
@dataclass
class ScheduledJob:
    """
    Plain-data record of one scheduled job.

    The fields mirror the attributes of a ``schedule.Job`` so that a job can
    be stored as simple values and turned back into a ``schedule.Job`` later:

    https://schedule.readthedocs.io/en/stable/_modules/schedule.html#Job

    Only ``id`` and ``interval`` are required; every other field defaults to
    ``None``.
    """

    # Identifier of this job record.
    id: str
    # Pause between runs, expressed as ``interval * unit``.
    interval: int

    # Upper limit to the interval.
    latest: Optional[int] = None
    # NOTE: the callable is not kept as a ``functools.partial`` here
    # (``job_func`` on ``schedule.Job``); see ``job_fn`` at the bottom.
    # Time units, e.g. 'minutes', 'hours', ...
    unit: Optional[str] = None
    # Optional time at which this job runs.
    at_time: Optional[datetime.time] = None
    # Optional time zone of ``at_time`` (str or pytz.BaseTzInfo).
    # Only relevant when ``at_time`` is not None.
    at_time_zone: Optional[Any] = None
    # Datetime of the last run.
    last_run: Optional[datetime.datetime] = None
    # Datetime of the next run.
    next_run: Optional[datetime.datetime] = None
    # Timedelta between runs. Open question from the original author:
    # how does this differ from ``interval``?
    period: Optional[datetime.timedelta] = None
    # Specific day of the week to start on.
    start_day: Optional[str] = None
    # Optional time of the final run.
    cancel_after: Optional[datetime.datetime] = None

    # The job function associated with this record.
    job_fn: Optional[Any] = None
class ScheduleApplication:
    """
    Maintains a persistent schedule of jobs to run and manages execution.

    Attributes:
        scheduler: the ``schedule`` scheduler that executes the jobs
            (``schedule.default_scheduler``).
        jobs: mapping of job id -> ``ScheduledJob`` record (the part that is
            meant to be persisted).
        _live_jobs: mapping of job id -> the ``schedule.Job`` registered on
            ``scheduler`` for that record, kept so the job can be cancelled.
    """

    # Attributes copied one-to-one between a ``schedule.Job`` and a
    # ``ScheduledJob`` record (``interval`` is passed to the constructors).
    _SPEC_FIELDS = (
        "latest",
        "unit",
        "at_time",
        "at_time_zone",
        "next_run",
        "last_run",
        "period",
        "start_day",
        "cancel_after",
    )

    def __init__(self):
        self.scheduler = schedule.default_scheduler
        self.jobs = {}
        self._live_jobs = {}

    def schedule_job(self, sched_job_spec: "schedule.Job", job_fn: Any):
        """
        Register a new job and return its id.

        Usage:

            schedule_job(schedule.every(3).weeks.at(...), 'module.fn')

        ``module.fn`` needs to be ready to run when the job scheduler comes
        online. It can't be something like a partial: it needs to be able to
        be recreated at startup, which is why a dotted name is preferred.
        An already-callable ``job_fn`` is still accepted and used as-is.

        A ``schedule.Job`` can be persisted using its properties and restored
        upon startup:

        https://schedule.readthedocs.io/en/stable/_modules/schedule.html#Job

        Args:
            sched_job_spec: a ``schedule.Job`` describing when to run. Its
                next run is computed here, so it is mutated.
            job_fn: dotted name (``'package.module.fn'``) or a callable.

        Returns:
            The hex id under which the job is stored in ``self.jobs``.

        Raises:
            ValueError, TypeError, ImportError, AttributeError: if ``job_fn``
                cannot be resolved to a callable (see ``_resolve_job_fn``).
        """
        # Resolve first: a bad job_fn must not leave a half-registered job.
        job_func = self._resolve_job_fn(job_fn)

        # Populates next_run / period on the spec before they are copied.
        sched_job_spec._schedule_next_run()

        job_id = uuid4().hex
        copied = {
            name: getattr(sched_job_spec, name) for name in self._SPEC_FIELDS
        }
        job = ScheduledJob(
            id=job_id,
            interval=sched_job_spec.interval,
            job_fn=job_fn,
            **copied,
        )

        self.jobs[job_id] = job
        self._wire(job_id, job, job_func)

        # FIXME save jobs

        return job_id

    def get_missed_jobs(self):
        """Not implemented yet; currently returns ``None``."""
        return None

    def delete_job(self, job_id):
        """
        Remove a job record and cancel its live job on the scheduler.

        Raises:
            KeyError: if ``job_id`` is not a known job.
        """
        del self.jobs[job_id]

        # Without this the schedule.Job would stay in scheduler.jobs and keep
        # firing after its record is gone.
        live_job = self._live_jobs.pop(job_id, None)
        if live_job is not None:
            self.scheduler.cancel_job(live_job)

        # FIXME save jobs

    def _as_sched_job_spec(self, job):
        """Build a fresh, unregistered ``schedule.Job`` from a record."""
        sched_job_spec = schedule.Job(interval=job.interval)
        for name in self._SPEC_FIELDS:
            setattr(sched_job_spec, name, getattr(job, name))
        return sched_job_spec

    def _wire(self, job_id, job, job_func):
        """Create the live ``schedule.Job`` for ``job`` and register it."""
        live_job = self._as_sched_job_spec(job)
        live_job.scheduler = self.scheduler
        # NOTE(review): schedule's Job.do() appears to recompute next_run from
        # the current time, so the copied next_run is not honored -- confirm.
        live_job.do(job_func)
        self._live_jobs[job_id] = live_job

    @staticmethod
    def _resolve_job_fn(job_fn):
        """
        Turn ``job_fn`` into a callable.

        A callable is returned unchanged. A string is treated as a dotted
        name ``'package.module.attr'``: the module part is imported and the
        attribute after the last dot is looked up on it.

        Raises:
            TypeError: if ``job_fn`` is neither a callable nor a string, or
                the resolved attribute is not callable.
            ValueError: if the string has no ``module.attr`` shape.
            ImportError: if the module cannot be imported.
            AttributeError: if the module has no such attribute.
        """
        import importlib

        if callable(job_fn):
            return job_fn
        if not isinstance(job_fn, str):
            raise TypeError(
                f"job_fn must be a callable or a dotted name, got {job_fn!r}"
            )

        module_name, _, attr_name = job_fn.rpartition(".")
        if not module_name or not attr_name:
            raise ValueError(
                f"job_fn must look like 'module.fn', got {job_fn!r}"
            )

        resolved = getattr(importlib.import_module(module_name), attr_name)
        if not callable(resolved):
            raise TypeError(f"{job_fn!r} does not refer to a callable")
        return resolved

    def restore_schedule(self):
        """
        Re-register previously saved jobs on the scheduler.

        Loading from storage is not implemented yet, so there is currently
        nothing to restore and this is a no-op.
        """
        # FIXME load from storage
        saved_jobs = {}

        for job_id, job in saved_jobs.items():
            # TODO check if we missed a run:
            # if next_run is in the past we want to get user input for what
            # to do.

            # Job functions are stored by fully qualified name (no partials
            # or lambdas) so they can be recreated here.
            job_func = self._resolve_job_fn(job.job_fn)

            self.jobs[job_id] = job
            self._wire(job_id, job, job_func)

    def run_pending(self):
        """Delegate to the scheduler's ``run_pending()``."""
        self.scheduler.run_pending()
-
|