notes_app_es.py 7.7 KB


  1. from __future__ import annotations # do we need this?
  2. from contextvars import ContextVar
  3. from typing import Any, Dict, Iterator, Optional, Union, cast
  4. from typing import Optional, cast
  5. from dataclasses import dataclass, field
  6. from uuid import NAMESPACE_URL, UUID, uuid5
  7. from eventsourcing.domain import Aggregate, DomainEvent, event
  8. from eventsourcing.application import AggregateNotFound, Application, EventSourcedLog
  9. from eventsourcing.utils import EnvType
  10. from eventsourcing.domain import create_utc_datetime_now
  11. from eventsourcing.system import ProcessApplication
  12. from eventsourcing.dispatch import singledispatchmethod
  13. from diff_match_patch import diff_match_patch
  14. user_id_cvar: ContextVar[Optional[UUID]] = ContextVar("user_id", default=None)
  15. @dataclass
  16. class Note(Aggregate):
  17. slug: str
  18. # Not certain about this strategy combined with Event.apply using the event's TS
  19. created_at: datetime = None #field(default_factory=create_utc_datetime_now, init=False)
  20. modified_at: datetime = None #field(default_factory=create_utc_datetime_now, init=False)
  21. body: str = ""
  22. title: Optional[str] = None
  23. modified_by: Optional[UUID] = field(default=None, init=False)
  24. class Event(Aggregate.Event):
  25. user_id: Optional[UUID] = field(default_factory=user_id_cvar.get, init=False)
  26. def apply(self, aggregate: Aggregate) -> None:
  27. if type(self) == Note.Created:
  28. # Not 100% sure this is legit, default TS from the event...
  29. # From the perspective of a subscriber it makes no sense, but then
  30. # logical timestamps seem excessive/redundant.
  31. if cast(Note, aggregate).created_at:
  32. print(f'Note.Created already has TS; ie. logically specified {self.originator_id}')
  33. else:
  34. cast(Note, aggregate).created_at = self.timestamp
  35. cast(Note, aggregate).modified_at = self.timestamp
  36. cast(Note, aggregate).modified_by = self.user_id
  37. @event("SlugUpdated")
  38. def update_slug(self, slug: str) -> None:
  39. self.slug = slug
  40. @event("TitleUpdated")
  41. def update_title(self, title: str) -> None:
  42. self.title = title
  43. def update_body(self, body: str) -> None:
  44. self._update_body(create_diff(old=self.body, new=body))
  45. @event("BodyUpdated")
  46. def _update_body(self, diff: str) -> None:
  47. self.body = apply_patch(old=self.body, diff=diff)
  48. @dataclass
  49. class Index(Aggregate):
  50. slug: str
  51. ref: Optional[UUID]
  52. class Event(Aggregate.Event):
  53. pass
  54. @staticmethod
  55. def create_id(slug: str) -> UUID:
  56. return uuid5(NAMESPACE_URL, f"/slugs/{slug}")
  57. @event("RefChanged")
  58. def update_ref(self, ref: Optional[UUID]) -> None:
  59. self.ref = ref
  60. class NoteLogged(DomainEvent):
  61. note_id: UUID
  62. dmp = diff_match_patch()
  63. def create_diff(old: str, new: str) -> str:
  64. patches = dmp.patch_make(old, new)
  65. diff = dmp.patch_toText(patches)
  66. return diff
  67. def apply_patch(old: str, diff: str) -> str:
  68. patches = dmp.patch_fromText(diff)
  69. new_text, _ = dmp.patch_apply(patches, old)
  70. return new_text
  71. # In practice we may create a ViewModel/TransferObject that mirrors the Aggregate or just use that.
  72. NoteDetailsType = Dict[str, Union[str, Any]]
  73. class NotesApplication(Application):
  74. env = {} # {"COMPRESSOR_TOPIC": "gzip"}
  75. snapshotting_intervals = {Note: 5}
  76. def __init__(self, env: Optional[EnvType] = None) -> None:
  77. super().__init__(env)
  78. self.note_log: EventSourcedLog[NoteLogged] = EventSourcedLog(
  79. self.events, uuid5(NAMESPACE_URL, "/note_log"), NoteLogged
  80. )
  81. def create_note(self, title: str, slug: str, body: Optional[str] = "", created_at: Optional[datetime] = None) -> None:
  82. note = Note(title=title, slug=slug, body=body, created_at=created_at, modified_at=created_at)
  83. note_logged = self.note_log.trigger_event(note_id=note.id) # timestamp=created_at fails
  84. index_entry = Index(slug, ref=note.id)
  85. self.save(note, note_logged, index_entry)
  86. def get_note_details(self, slug: str) -> NoteDetailsType:
  87. note = self._get_note_by_slug(slug)
  88. return self._details_from_note(note)
  89. def _details_from_note(self, note: Note) -> NoteDetailsType:
  90. return {
  91. "title": note.title,
  92. "slug": note.slug,
  93. "body": note.body,
  94. "modified_by": note.modified_by,
  95. "created_at": note.created_at,
  96. "modified_at": note.modified_at,
  97. }
  98. def update_title(self, slug: str, title: str) -> None:
  99. note = self._get_note_by_slug(slug)
  100. note.update_title(title=title)
  101. self.save(note)
  102. def update_slug(self, old_slug: str, new_slug: str) -> None:
  103. note = self._get_note_by_slug(old_slug)
  104. note.update_slug(new_slug)
  105. old_index = self._get_index(old_slug)
  106. old_index.update_ref(None)
  107. try:
  108. new_index = self._get_index(new_slug)
  109. except AggregateNotFound:
  110. new_index = Index(new_slug, note.id)
  111. else:
  112. if new_index.ref is None:
  113. new_index.update_ref(note.id)
  114. else:
  115. raise SlugConflictError()
  116. self.save(note, old_index, new_index)
  117. def update_body(self, slug: str, body: str) -> None:
  118. note = self._get_note_by_slug(slug)
  119. note.update_body(body)
  120. self.save(note)
  121. def _get_note_by_slug(self, slug: str) -> Note:
  122. try:
  123. index = self._get_index(slug)
  124. except AggregateNotFound:
  125. raise NoteNotFound(slug)
  126. if index.ref is None:
  127. raise NoteNotFound(slug)
  128. note_id = index.ref
  129. return self._get_note_by_id(note_id)
  130. def _get_note_by_id(self, note_id: UUID) -> Note:
  131. return cast(Note, self.repository.get(note_id))
  132. def _get_index(self, slug: str) -> Index:
  133. return cast(Index, self.repository.get(Index.create_id(slug)))
  134. def get_notes(
  135. self,
  136. gt: Optional[int] = None,
  137. lte: Optional[int] = None,
  138. desc: bool = False,
  139. limit: Optional[int] = None,
  140. ) -> Iterator[NoteDetailsType]:
  141. for note_logged in self.note_log.get(gt, lte, desc, limit):
  142. note = self._get_note_by_id(note_logged.note_id)
  143. yield self._details_from_note(note)
  144. class NoteNotFound(Exception):
  145. """
  146. Raised when a note is not found.
  147. """
  148. class SlugConflictError(Exception):
  149. """
  150. Raised when updating a note to a slug used by another note.
  151. """
  152. class Counter(Aggregate):
  153. def __init__(self, name):
  154. self.name = name
  155. self.count = 0
  156. @classmethod
  157. def create_id(cls, name):
  158. return uuid5(NAMESPACE_URL, f'/counters/{name}')
  159. @event('Incremented')
  160. def increment(self):
  161. self.count += 1
  162. class NoteAnalytics(ProcessApplication):
  163. @singledispatchmethod
  164. def policy(self, domain_event, process_event):
  165. """Default policy"""
  166. @policy.register(Note.BodyUpdated)
  167. def _(self, domain_event, process_event):
  168. note_id = domain_event.originator_id
  169. print(f"NoteAnalytics: Note.BodyUpdated: {note_id}")
  170. try:
  171. counter_id = Counter.create_id(note_id)
  172. counter = self.repository.get(counter_id)
  173. except AggregateNotFound:
  174. counter = Counter(note_id)
  175. counter.increment()
  176. print(f" Count = {counter.count}")
  177. process_event.collect_events(counter)
  178. def get_count(self, note_id):
  179. counter_id = Counter.create_id(note_id)
  180. try:
  181. counter = self.repository.get(counter_id)
  182. except AggregateNotFound:
  183. return 0
  184. return counter.count