schedule_system.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import datetime
  2. import schedule
  3. import time
  4. from uuid import uuid4
  5. from typing import Optional, Any
  6. from dataclasses import dataclass
  7. @dataclass
  8. class ScheduledJob:
  9. """
  10. Basically an import of our schedule library:
  11. https://schedule.readthedocs.io/en/stable/_modules/schedule.html#Job
  12. """
  13. id: str
  14. interval: int # pause interval * unit between runs
  15. latest: Optional[int] = None # upper limit to the interval
  16. #job_func: Optional[functools.partial] = None # the job job_func to run
  17. # time units, e.g. 'minutes', 'hours', ...
  18. unit: Optional[str] = None
  19. # optional time at which this job runs
  20. at_time: Optional[datetime.time] = None
  21. # optional time zone of the self.at_time field. Only relevant when at_time is not None
  22. at_time_zone:Optional[Any] = None # str or pytz.BaseTzInfo
  23. # datetime of the last run
  24. last_run: Optional[datetime.datetime] = None
  25. # datetime of the next run
  26. next_run: Optional[datetime.datetime] = None
  27. # timedelta between runs, only valid for
  28. period: Optional[datetime.timedelta] = None # period vs. interval?
  29. # Specific day of the week to start on
  30. start_day: Optional[str] = None
  31. # optional time of final run
  32. cancel_after: Optional[datetime.datetime] = None
  33. job_fn:Optional[Any] = None
  34. class ScheduleApplication:
  35. """
  36. Maintains a persistent schedule of jobs to run and manages execution.
  37. """
  38. def __init__ (self):
  39. self.scheduler = schedule.default_scheduler
  40. self.jobs = {}
  41. def schedule_job (self, sched_job_spec: schedule.Job, job_fn: str):
  42. """
  43. Usage:
  44. schedule_job(schedule.every(3).weeks.at(...), 'module.fn')
  45. module.fn needs to be ready to run when the job scheduler comes online.
  46. Can't be something like a partial, needs to be able to be recreated at startup.
  47. -
  48. we can persist a schedule.Job using its properties,
  49. and restore it upon startup.
  50. https://schedule.readthedocs.io/en/stable/_modules/schedule.html#Job
  51. """
  52. job_id = uuid4().hex
  53. sched_job_spec._schedule_next_run()
  54. job = ScheduledJob(
  55. id = job_id,
  56. interval = sched_job_spec.interval,
  57. latest = sched_job_spec.latest,
  58. unit = sched_job_spec.unit,
  59. at_time = sched_job_spec.at_time,
  60. at_time_zone = sched_job_spec.at_time_zone,
  61. next_run = sched_job_spec.next_run,
  62. last_run = sched_job_spec.last_run,
  63. period = sched_job_spec.period,
  64. start_day = sched_job_spec.start_day,
  65. cancel_after = sched_job_spec.cancel_after,
  66. job_fn = job_fn
  67. )
  68. self.jobs[ job_id ] = job
  69. sched_job = self._as_sched_job_spec( job )
  70. sched_job.scheduler = self.scheduler
  71. sched_job.do( job.job_fn )
  72. # FIXME save jobs
  73. return job_id
  74. def get_missed_jobs (self):
  75. pass
  76. def delete_job (self, job_id):
  77. del self.jobs[job_id]
  78. # FIXME save jobs
  79. def _as_sched_job_spec (self, job):
  80. sched_job_spec = schedule.Job( interval=job.interval )
  81. sched_job_spec.latest = job.latest
  82. sched_job_spec.unit = job.unit
  83. sched_job_spec.at_time = job.at_time
  84. sched_job_spec.at_time_zone = job.at_time_zone
  85. sched_job_spec.next_run = job.next_run
  86. sched_job_spec.last_run = job.last_run
  87. sched_job_spec.period = job.period
  88. sched_job_spec.start_day = job.start_day
  89. sched_job_spec.cancel_after = job.cancel_after
  90. return sched_job_spec
  91. def restore_schedule (self):
  92. # FIXME load from storage
  93. saved_jobs = {}
  94. for job_id, job in saved_jobs:
  95. job_func = lambda: print(f'running job: {job_id}')
  96. # TODO check if we missed a run
  97. # if next_run is in the past
  98. # we want to get user input for what to do.
  99. # Wire and schedule for execution
  100. #
  101. #sched_job_spec.job_func = job_func
  102. #
  103. # two options:
  104. # job.scheduler = ...
  105. # job.do( job_func )
  106. # scheduler.jobs.append(sched_job_spec)
  107. # FIXME we need to find a way to serialize this. Maybe just full qual name with some rules, like no partials/lambdas.
  108. job_fn = None
  109. # replace(job, job_fn => job_fn)
  110. #self.jobs[ job_id ] = job
  111. #sched_job_spec = self._as_sched_job_spec( job )
  112. #sched_job_spec.scheduler = self.scheduler
  113. #sched_job_spec.do( job_fn )
  114. def run_pending (self):
  115. self.scheduler.run_pending()