# test_notes_app_es.py (9.4 KB)
  1. from dataclasses import replace
  2. from datetime import datetime
  3. import os
  4. from typing import cast
  5. import unittest
  6. from unittest import TestCase
  7. from uuid import uuid4
  8. from notes_app_es import (
  9. NotesApplication,
  10. NoteNotFound,
  11. SlugConflictError,
  12. )
  13. from notes_app_es import (
  14. Index,
  15. Note,
  16. user_id_cvar,
  17. )
  18. from notes_app_es import (
  19. NoteAnalytics,
  20. Counter,
  21. )
  22. from eventsourcing.system import NotificationLogReader
  23. from eventsourcing.domain import create_utc_datetime_now
  24. from eventsourcing.system import System, SingleThreadedRunner
  25. from eventsourcing.application import ProcessingEvent
  26. class TestContentManagement(TestCase):
  27. def test(self) -> None:
  28. # Set user_id context variable.
  29. user_id = uuid4()
  30. user_id_cvar.set(user_id)
  31. # Construct application.
  32. #app = NotesApplication()
  33. #analytics = NoteAnalytics()
  34. #system = System(pipes=[[app, analytics]])
  35. system = System(pipes=[[NotesApplication, NoteAnalytics]])
  36. runner = SingleThreadedRunner(system)
  37. runner.start()
  38. app = runner.get(NotesApplication)
  39. analytics = runner.get(NoteAnalytics)
  40. # Check the note doesn't exist.
  41. with self.assertRaises(NoteNotFound):
  42. app.get_note_details(slug="welcome")
  43. # Check the list of notes is empty.
  44. notes = list(app.get_notes())
  45. self.assertEqual(len(notes), 0)
  46. # Create a note.
  47. app.create_note(title="Welcome", slug="welcome")
  48. # Present note identified by the given slug.
  49. note = app.get_note_details(slug="welcome")
  50. #print(note)
  51. # Check we got a dict that has the given title and slug.
  52. self.assertEqual(note["title"], "Welcome")
  53. self.assertEqual(note["slug"], "welcome")
  54. self.assertEqual(note["body"], "")
  55. self.assertEqual(note["modified_by"], user_id)
  56. # Update the title.
  57. app.update_title(slug="welcome", title="Welcome Visitors")
  58. # Check the title was updated.
  59. note = app.get_note_details(slug="welcome")
  60. self.assertEqual(note["title"], "Welcome Visitors")
  61. self.assertEqual(note["modified_by"], user_id)
  62. # Update the slug.
  63. app.update_slug(old_slug="welcome", new_slug="welcome-visitors")
  64. # Check the index was updated.
  65. with self.assertRaises(NoteNotFound):
  66. app.get_note_details(slug="welcome")
  67. # Check we can get the note by the new slug.
  68. note = app.get_note_details(slug="welcome-visitors")
  69. self.assertEqual(note["title"], "Welcome Visitors")
  70. self.assertEqual(note["slug"], "welcome-visitors")
  71. # Update the body.
  72. app.update_body(slug="welcome-visitors", body="Welcome to my wiki")
  73. # Check the body was updated.
  74. note = app.get_note_details(slug="welcome-visitors")
  75. self.assertEqual(note["body"], "Welcome to my wiki")
  76. # Update the body.
  77. app.update_body(slug="welcome-visitors", body="Welcome to this wiki")
  78. # Check the body was updated.
  79. note = app.get_note_details(slug="welcome-visitors")
  80. self.assertEqual(note["body"], "Welcome to this wiki")
  81. # Update the body.
  82. app.update_body(
  83. slug="welcome-visitors",
  84. body="""
  85. Welcome to this wiki!
  86. This is a wiki about...
  87. """,
  88. )
  89. # Check the body was updated.
  90. note = app.get_note_details(slug="welcome-visitors")
  91. self.assertEqual(
  92. note["body"],
  93. """
  94. Welcome to this wiki!
  95. This is a wiki about...
  96. """,
  97. )
  98. # Check all the Note events have the user_id.
  99. for notification in NotificationLogReader(app.notification_log).read(start=1):
  100. domain_event = app.mapper.to_domain_event(notification)
  101. if isinstance(domain_event, Note.Event):
  102. self.assertEqual(domain_event.user_id, user_id)
  103. # Change user_id context variable.
  104. user_id = uuid4()
  105. user_id_cvar.set(user_id)
  106. # Update the body.
  107. app.update_body(
  108. slug="welcome-visitors",
  109. body="""
  110. Welcome to this wiki!
  111. This is a wiki about us!
  112. """,
  113. )
  114. # Check 'modified_by' changed.
  115. note = app.get_note_details(slug="welcome-visitors")
  116. self.assertEqual(note["title"], "Welcome Visitors")
  117. self.assertEqual(note["modified_by"], user_id)
  118. # Check a snapshot was created by now.
  119. assert app.snapshots
  120. index = cast(Index, app.repository.get(Index.create_id("welcome-visitors")))
  121. assert index.ref
  122. self.assertTrue(len(list(app.snapshots.get(index.ref))))
  123. # Create some more pages and list all the pages.
  124. app.create_note("Note 2", "note-2")
  125. app.create_note("Note 3", "note-3")
  126. app.create_note("Note 4", "note-4")
  127. app.create_note("Note 5", "note-5")
  128. app.create_note(None, "note-no-title")
  129. notes = list(app.get_notes(desc=True))
  130. self.assertEqual(notes[1]["title"], "Note 5")
  131. self.assertEqual(notes[1]["slug"], "note-5")
  132. self.assertEqual(notes[2]["title"], "Note 4")
  133. self.assertEqual(notes[2]["slug"], "note-4")
  134. self.assertEqual(notes[3]["title"], "Note 3")
  135. self.assertEqual(notes[3]["slug"], "note-3")
  136. self.assertEqual(notes[4]["title"], "Note 2")
  137. self.assertEqual(notes[4]["slug"], "note-2")
  138. self.assertEqual(notes[5]["title"], "Welcome Visitors")
  139. self.assertEqual(notes[5]["slug"], "welcome-visitors")
  140. self.assertEqual(notes[0]["title"], None)
  141. self.assertEqual(notes[0]["slug"], "note-no-title")
  142. notes = list(app.get_notes(desc=True, limit=3))
  143. self.assertEqual(len(notes), 3)
  144. self.assertEqual(notes[0]["slug"], "note-no-title")
  145. self.assertEqual(notes[1]["slug"], "note-5")
  146. self.assertEqual(notes[2]["slug"], "note-4")
  147. notes = list(app.get_notes(desc=True, limit=3, lte=2))
  148. self.assertEqual(len(notes), 2)
  149. self.assertEqual(notes[0]["slug"], "note-2")
  150. self.assertEqual(notes[1]["slug"], "welcome-visitors")
  151. notes = list(app.get_notes(desc=True, lte=2))
  152. self.assertEqual(len(notes), 2)
  153. self.assertEqual(notes[0]["slug"], "note-2")
  154. self.assertEqual(notes[1]["slug"], "welcome-visitors")
  155. # Check we can't change the slug of a note to one
  156. # that is being used by another note.
  157. with self.assertRaises(SlugConflictError):
  158. app.update_slug("note-2", "note-3")
  159. # Check we can change the slug of a note to one
  160. # that was previously being used.
  161. app.update_slug("welcome-visitors", "welcome")
  162. note = app.get_note_details(slug="welcome")
  163. self.assertEqual(note["title"], "Welcome Visitors")
  164. self.assertEqual(note["modified_by"], user_id)
  165. now_dt = create_utc_datetime_now()
  166. app.create_note(title='imported-note.md', slug='imported-note.md', created_at=now_dt, body='new note')
  167. note = app.get_note_details(slug="imported-note.md")
  168. self.assertEqual(note["created_at"], now_dt)
  169. self.assertEqual(note["modified_at"], now_dt)
  170. self.assertEqual(note["body"], 'new note')
  171. app.create_note(title='imported-note-no-ts.md', slug='imported-note-no-ts.md', body='new note no ts')
  172. note = app.get_note_details(slug="imported-note-no-ts.md")
  173. # can we get to the underlying event?
  174. self.assertEqual(note["created_at"], note["modified_at"])
  175. self.assertEqual(note["body"], 'new note no ts')
  176. # ----
  177. # Import a note with a retro dated creation event
  178. #
  179. # Re-implement the Application.save machinery and the create_note logic.
  180. created_at = datetime(2020,1,2)
  181. import_events = Note(
  182. slug='abc.md',
  183. body='abc note',
  184. created_at= created_at
  185. ).collect_events()
  186. import_events[0] = replace(import_events[0],
  187. timestamp = created_at)
  188. print(import_events)
  189. processing_event = ProcessingEvent()
  190. processing_event.collect_events(*import_events)
  191. recordings = app._record(processing_event)
  192. app._take_snapshots(processing_event)
  193. app._notify(recordings)
  194. app.notify(processing_event.events)
  195. note_logged = app.note_log.trigger_event(
  196. note_id=import_events[0].originator_id
  197. )
  198. index_entry = Index(import_events[0].slug,
  199. ref=import_events[0].originator_id
  200. )
  201. app.save(note_logged, index_entry)
  202. imported_note = app.get_note_details(slug = 'abc.md')
  203. print(imported_note)
  204. # ---
  205. # Get a stream of all events
  206. #print(analytics.recorder.select_notifications(0, 1000)[0])
  207. #print(app.recorder.protocol)
  208. runner.stop()
  209. if __name__ == '__main__':
  210. db_path = "./test_notes_app_es.db"
  211. os.environ["PERSISTENCE_MODULE"] = "eventsourcing.sqlite"
  212. os.environ["SQLITE_DBNAME"] = db_path
  213. os.environ["SQLITE_LOCK_TIMEOUT"] = "10"
  214. #os.environ["COMPRESSOR_TOPIC"] = "gzip"
  215. print(f'Using {db_path}')
  216. if os.path.exists(db_path):
  217. os.remove(db_path)
  218. unittest.main()