Bläddra i källkod

added python eventsourcing notes demo. focus: eventsourcing module, modeling notes domain, pytest

Harlan J. Iverson 1 år sedan
förälder
incheckning
e395d6d6f5

+ 25 - 0
2023-04-10/python-eventsourcing_notes/README.md

@@ -0,0 +1,25 @@
+# Event Sourced Notes App Demo
+
+Created an Event Sourced demo based on module documentation [1] [2].
+
+In a later Dev Practice I look at serializing custom Entitiy types [3].
+
+The project has very good documentation and the author seems to be responsive.
+
+## Usage
+
+Test with `python -m pytest test_notes_app_es.py`.
+
+## Dev Practice
+
+Focus:
+
+* Learning Python Event Sourcing module
+* Modeling a familiar domain (DDD)
+* PyTest
+
+# References
+
+[1]: https://eventsourcing.readthedocs.io/en/stable/topics/examples/content-management.html
+[2]: https://eventsourcing.readthedocs.io/en/stable/topics/system.html#system-of-applications
+[3]: https://gist.github.com/harlanji/8a4cd0adac32d626d052034bea560e08 "Gist: Python EventSourcing: DataClassAsDict Transcoding"

+ 1 - 0
2023-04-10/python-eventsourcing_notes/dev-requirements.txt

@@ -0,0 +1 @@
+pytest

+ 246 - 0
2023-04-10/python-eventsourcing_notes/notes_app_es.py

@@ -0,0 +1,246 @@
+from __future__ import annotations # do we need this?
+
+from contextvars import ContextVar
+
+from typing import Any, Dict, Iterator, Optional, Union, cast
+from typing import Optional, cast
+
+from dataclasses import dataclass, field
+
+from uuid import NAMESPACE_URL, UUID, uuid5
+
+from eventsourcing.domain import Aggregate, DomainEvent, event
+from eventsourcing.application import AggregateNotFound, Application, EventSourcedLog
+
+from eventsourcing.utils import EnvType
+from eventsourcing.domain import create_utc_datetime_now
+
+from eventsourcing.system import ProcessApplication
+from eventsourcing.dispatch import singledispatchmethod
+
+from diff_match_patch import diff_match_patch
+
+
+
+user_id_cvar: ContextVar[Optional[UUID]] = ContextVar("user_id", default=None)
+
+@dataclass
+class Note(Aggregate):
+    slug: str
+    # Not certain about this strategy combined with Event.apply using the event's TS
+    created_at: datetime = None #field(default_factory=create_utc_datetime_now, init=False)
+    modified_at: datetime = None #field(default_factory=create_utc_datetime_now, init=False)
+    body: str = ""
+    title: Optional[str] = None
+    modified_by: Optional[UUID] = field(default=None, init=False)
+
+    class Event(Aggregate.Event):
+        user_id: Optional[UUID] = field(default_factory=user_id_cvar.get, init=False)
+
+        def apply(self, aggregate: Aggregate) -> None:
+            if type(self) == Note.Created:
+                # Not 100% sure this is legit, default TS from the event...
+                # From the perspective of a subscriber it makes no sense, but then
+                # logical timestamps seem excessive/redundant.
+                if cast(Note, aggregate).created_at:
+                    print(f'Note.Created already has TS; ie. logically specified {self.originator_id}')
+                else:
+                    cast(Note, aggregate).created_at = self.timestamp
+            
+            cast(Note, aggregate).modified_at = self.timestamp
+            
+            cast(Note, aggregate).modified_by = self.user_id
+
+    @event("SlugUpdated")
+    def update_slug(self, slug: str) -> None:
+        self.slug = slug
+
+    @event("TitleUpdated")
+    def update_title(self, title: str) -> None:
+        self.title = title
+
+    def update_body(self, body: str) -> None:
+        self._update_body(create_diff(old=self.body, new=body))
+
+    @event("BodyUpdated")
+    def _update_body(self, diff: str) -> None:
+        self.body = apply_patch(old=self.body, diff=diff)
+
+@dataclass
+class Index(Aggregate):
+    slug: str
+    ref: Optional[UUID]
+
+    class Event(Aggregate.Event):
+        pass
+
+    @staticmethod
+    def create_id(slug: str) -> UUID:
+        return uuid5(NAMESPACE_URL, f"/slugs/{slug}")
+
+    @event("RefChanged")
+    def update_ref(self, ref: Optional[UUID]) -> None:
+        self.ref = ref
+        
+class NoteLogged(DomainEvent):
+    note_id: UUID
+    
+
+dmp = diff_match_patch()
+
+def create_diff(old: str, new: str) -> str:
+    patches = dmp.patch_make(old, new)
+    diff = dmp.patch_toText(patches)
+    
+    return diff
+
+
+def apply_patch(old: str, diff: str) -> str:
+    patches = dmp.patch_fromText(diff)
+    new_text, _ = dmp.patch_apply(patches, old)
+    
+    return new_text
+    
+
+
+
+# In practice we may create a ViewModel/TransferObject that mirrors the Aggregate or just use that.
+NoteDetailsType = Dict[str, Union[str, Any]]
+
+
+class NotesApplication(Application):
+    env = {} # {"COMPRESSOR_TOPIC": "gzip"}
+    snapshotting_intervals = {Note: 5}
+
+    def __init__(self, env: Optional[EnvType] = None) -> None:
+        super().__init__(env)
+        self.note_log: EventSourcedLog[NoteLogged] = EventSourcedLog(
+            self.events, uuid5(NAMESPACE_URL, "/note_log"), NoteLogged
+        )
+
+    def create_note(self, title: str, slug: str, body: Optional[str] = "", created_at: Optional[datetime] = None) -> None:
+        note = Note(title=title, slug=slug, body=body, created_at=created_at, modified_at=created_at)
+        note_logged = self.note_log.trigger_event(note_id=note.id) # timestamp=created_at fails
+        index_entry = Index(slug, ref=note.id)
+        self.save(note, note_logged, index_entry)
+
+    def get_note_details(self, slug: str) -> NoteDetailsType:
+        note = self._get_note_by_slug(slug)
+        return self._details_from_note(note)
+
+    def _details_from_note(self, note: Note) -> NoteDetailsType:
+        return {
+            "title": note.title,
+            "slug": note.slug,
+            "body": note.body,
+            "modified_by": note.modified_by,
+            "created_at": note.created_at,
+            "modified_at": note.modified_at,
+            
+        }
+
+    def update_title(self, slug: str, title: str) -> None:
+        note = self._get_note_by_slug(slug)
+        note.update_title(title=title)
+        self.save(note)
+
+    def update_slug(self, old_slug: str, new_slug: str) -> None:
+        note = self._get_note_by_slug(old_slug)
+        note.update_slug(new_slug)
+        old_index = self._get_index(old_slug)
+        old_index.update_ref(None)
+        try:
+            new_index = self._get_index(new_slug)
+        except AggregateNotFound:
+            new_index = Index(new_slug, note.id)
+        else:
+            if new_index.ref is None:
+                new_index.update_ref(note.id)
+            else:
+                raise SlugConflictError()
+        self.save(note, old_index, new_index)
+
+    def update_body(self, slug: str, body: str) -> None:
+        note = self._get_note_by_slug(slug)
+        note.update_body(body)
+        self.save(note)
+
+    def _get_note_by_slug(self, slug: str) -> Note:
+        try:
+            index = self._get_index(slug)
+        except AggregateNotFound:
+            raise NoteNotFound(slug)
+        if index.ref is None:
+            raise NoteNotFound(slug)
+        note_id = index.ref
+        return self._get_note_by_id(note_id)
+
+    def _get_note_by_id(self, note_id: UUID) -> Note:
+        return cast(Note, self.repository.get(note_id))
+
+    def _get_index(self, slug: str) -> Index:
+        return cast(Index, self.repository.get(Index.create_id(slug)))
+
+    def get_notes(
+        self,
+        gt: Optional[int] = None,
+        lte: Optional[int] = None,
+        desc: bool = False,
+        limit: Optional[int] = None,
+    ) -> Iterator[NoteDetailsType]:
+        for note_logged in self.note_log.get(gt, lte, desc, limit):
+            note = self._get_note_by_id(note_logged.note_id)
+            yield self._details_from_note(note)
+
+
+class NoteNotFound(Exception):
+    """
+    Raised when a note is not found.
+    """
+
+
+class SlugConflictError(Exception):
+    """
+    Raised when updating a note to a slug used by another note.
+    """
+
+class Counter(Aggregate):
+    def __init__(self, name):
+        self.name = name
+        self.count = 0
+
+    @classmethod
+    def create_id(cls, name):
+        return uuid5(NAMESPACE_URL, f'/counters/{name}')
+
+    @event('Incremented')
+    def increment(self):
+        self.count += 1
+
+class NoteAnalytics(ProcessApplication):
+    @singledispatchmethod
+    def policy(self, domain_event, process_event):
+        """Default policy"""
+        
+    @policy.register(Note.BodyUpdated)
+    def _(self, domain_event, process_event):
+        note_id = domain_event.originator_id
+        print(f"NoteAnalytics: Note.BodyUpdated: {note_id}")
+        try:
+            counter_id = Counter.create_id(note_id)
+            counter = self.repository.get(counter_id)
+        except AggregateNotFound:
+            counter = Counter(note_id)
+        counter.increment()
+        
+        print(f"  Count = {counter.count}")
+        
+        process_event.collect_events(counter)
+
+    def get_count(self, note_id):
+        counter_id = Counter.create_id(note_id)
+        try:
+            counter = self.repository.get(counter_id)
+        except AggregateNotFound:
+            return 0
+        return counter.count

+ 2 - 0
2023-04-10/python-eventsourcing_notes/requirements.txt

@@ -0,0 +1,2 @@
+eventsourcing
+diff_match_patch

+ 293 - 0
2023-04-10/python-eventsourcing_notes/test_notes_app_es.py

@@ -0,0 +1,293 @@
+from dataclasses import replace
+from datetime import datetime
+import os
+
+
+from typing import cast
+import unittest
+from unittest import TestCase
+from uuid import uuid4
+
+from notes_app_es import (
+    NotesApplication,
+    NoteNotFound,
+    SlugConflictError,
+    
+)
+from notes_app_es import (
+    Index,
+    Note,
+    user_id_cvar,
+)
+
+from notes_app_es import (
+    NoteAnalytics,
+    Counter,
+)
+
+from eventsourcing.system import NotificationLogReader
+from eventsourcing.domain import create_utc_datetime_now
+
+from eventsourcing.system import System, SingleThreadedRunner
+
+
+from eventsourcing.application import ProcessingEvent
+
+class TestContentManagement(TestCase):
+    def test(self) -> None:
+        # Set user_id context variable.
+        user_id = uuid4()
+        user_id_cvar.set(user_id)
+
+        # Construct application.
+        #app = NotesApplication()
+        #analytics = NoteAnalytics()
+        #system = System(pipes=[[app, analytics]])
+        
+        system = System(pipes=[[NotesApplication, NoteAnalytics]])
+        
+        runner = SingleThreadedRunner(system)
+        runner.start()
+        
+        app = runner.get(NotesApplication)
+        analytics = runner.get(NoteAnalytics)
+
+        # Check the note doesn't exist.
+        with self.assertRaises(NoteNotFound):
+            app.get_note_details(slug="welcome")
+
+        # Check the list of notes is empty.
+        notes = list(app.get_notes())
+        self.assertEqual(len(notes), 0)
+
+        # Create a note.
+        app.create_note(title="Welcome", slug="welcome")
+
+        # Present note identified by the given slug.
+        note = app.get_note_details(slug="welcome")
+        
+        #print(note)
+
+        # Check we got a dict that has the given title and slug.
+        self.assertEqual(note["title"], "Welcome")
+        self.assertEqual(note["slug"], "welcome")
+        self.assertEqual(note["body"], "")
+        self.assertEqual(note["modified_by"], user_id)
+
+        # Update the title.
+        app.update_title(slug="welcome", title="Welcome Visitors")
+
+        # Check the title was updated.
+        note = app.get_note_details(slug="welcome")
+        self.assertEqual(note["title"], "Welcome Visitors")
+        self.assertEqual(note["modified_by"], user_id)
+
+        # Update the slug.
+        app.update_slug(old_slug="welcome", new_slug="welcome-visitors")
+
+        # Check the index was updated.
+        with self.assertRaises(NoteNotFound):
+            app.get_note_details(slug="welcome")
+
+        # Check we can get the note by the new slug.
+        note = app.get_note_details(slug="welcome-visitors")
+        self.assertEqual(note["title"], "Welcome Visitors")
+        self.assertEqual(note["slug"], "welcome-visitors")
+
+        # Update the body.
+        app.update_body(slug="welcome-visitors", body="Welcome to my wiki")
+
+        # Check the body was updated.
+        note = app.get_note_details(slug="welcome-visitors")
+        self.assertEqual(note["body"], "Welcome to my wiki")
+
+        # Update the body.
+        app.update_body(slug="welcome-visitors", body="Welcome to this wiki")
+
+        # Check the body was updated.
+        note = app.get_note_details(slug="welcome-visitors")
+        self.assertEqual(note["body"], "Welcome to this wiki")
+
+        # Update the body.
+        app.update_body(
+            slug="welcome-visitors",
+            body="""
+Welcome to this wiki!
+
+This is a wiki about...
+""",
+        )
+
+        # Check the body was updated.
+        note = app.get_note_details(slug="welcome-visitors")
+        self.assertEqual(
+            note["body"],
+            """
+Welcome to this wiki!
+
+This is a wiki about...
+""",
+        )
+
+        # Check all the Note events have the user_id.
+        for notification in NotificationLogReader(app.notification_log).read(start=1):
+            domain_event = app.mapper.to_domain_event(notification)
+            if isinstance(domain_event, Note.Event):
+                self.assertEqual(domain_event.user_id, user_id)
+
+        # Change user_id context variable.
+        user_id = uuid4()
+        user_id_cvar.set(user_id)
+
+        # Update the body.
+        app.update_body(
+            slug="welcome-visitors",
+            body="""
+Welcome to this wiki!
+
+This is a wiki about us!
+""",
+        )
+
+        # Check 'modified_by' changed.
+        note = app.get_note_details(slug="welcome-visitors")
+        self.assertEqual(note["title"], "Welcome Visitors")
+        self.assertEqual(note["modified_by"], user_id)
+
+        # Check a snapshot was created by now.
+        assert app.snapshots
+        index = cast(Index, app.repository.get(Index.create_id("welcome-visitors")))
+        assert index.ref
+        self.assertTrue(len(list(app.snapshots.get(index.ref))))
+
+        # Create some more pages and list all the pages.
+        app.create_note("Note 2", "note-2")
+        app.create_note("Note 3", "note-3")
+        app.create_note("Note 4", "note-4")
+        app.create_note("Note 5", "note-5")
+        
+        app.create_note(None, "note-no-title")
+        
+        notes = list(app.get_notes(desc=True))
+        self.assertEqual(notes[1]["title"], "Note 5")
+        self.assertEqual(notes[1]["slug"], "note-5")
+        self.assertEqual(notes[2]["title"], "Note 4")
+        self.assertEqual(notes[2]["slug"], "note-4")
+        self.assertEqual(notes[3]["title"], "Note 3")
+        self.assertEqual(notes[3]["slug"], "note-3")
+        self.assertEqual(notes[4]["title"], "Note 2")
+        self.assertEqual(notes[4]["slug"], "note-2")
+        self.assertEqual(notes[5]["title"], "Welcome Visitors")
+        self.assertEqual(notes[5]["slug"], "welcome-visitors")
+        
+        self.assertEqual(notes[0]["title"], None)
+        self.assertEqual(notes[0]["slug"], "note-no-title")
+        
+        notes = list(app.get_notes(desc=True, limit=3))
+        self.assertEqual(len(notes), 3)
+        self.assertEqual(notes[0]["slug"], "note-no-title")
+        self.assertEqual(notes[1]["slug"], "note-5")
+        self.assertEqual(notes[2]["slug"], "note-4")
+
+        notes = list(app.get_notes(desc=True, limit=3, lte=2))
+        self.assertEqual(len(notes), 2)
+        self.assertEqual(notes[0]["slug"], "note-2")
+        self.assertEqual(notes[1]["slug"], "welcome-visitors")
+
+        notes = list(app.get_notes(desc=True, lte=2))
+        self.assertEqual(len(notes), 2)
+        self.assertEqual(notes[0]["slug"], "note-2")
+        self.assertEqual(notes[1]["slug"], "welcome-visitors")
+
+        # Check we can't change the slug of a note to one
+        # that is being used by another note.
+        with self.assertRaises(SlugConflictError):
+            app.update_slug("note-2", "note-3")
+
+        # Check we can change the slug of a note to one
+        # that was previously being used.
+        app.update_slug("welcome-visitors", "welcome")
+
+        note = app.get_note_details(slug="welcome")
+        self.assertEqual(note["title"], "Welcome Visitors")
+        self.assertEqual(note["modified_by"], user_id)
+        
+        now_dt = create_utc_datetime_now()
+        app.create_note(title='imported-note.md', slug='imported-note.md', created_at=now_dt, body='new note')
+        
+        note = app.get_note_details(slug="imported-note.md")
+        
+        self.assertEqual(note["created_at"], now_dt)
+        self.assertEqual(note["modified_at"], now_dt)
+        self.assertEqual(note["body"], 'new note')
+        
+        app.create_note(title='imported-note-no-ts.md', slug='imported-note-no-ts.md', body='new note no ts')
+        
+        note = app.get_note_details(slug="imported-note-no-ts.md")
+        
+        # can we get to the underlying event?
+        self.assertEqual(note["created_at"], note["modified_at"])
+        self.assertEqual(note["body"], 'new note no ts')
+        
+        # ----
+        # Import a note with a retro dated creation event
+        #
+        # Re-implement the Application.save machinery and the create_note logic.
+        
+        created_at = datetime(2020,1,2)
+        import_events = Note(
+                slug='abc.md', 
+                body='abc note', 
+                created_at= created_at
+                ).collect_events()
+            
+        import_events[0] = replace(import_events[0], 
+                                timestamp = created_at)
+        
+        print(import_events)
+        
+        processing_event = ProcessingEvent()
+        processing_event.collect_events(*import_events)
+        
+        recordings = app._record(processing_event)
+        app._take_snapshots(processing_event)
+        app._notify(recordings)
+        app.notify(processing_event.events)
+        
+        note_logged = app.note_log.trigger_event(
+            note_id=import_events[0].originator_id
+            )
+        index_entry = Index(import_events[0].slug, 
+            ref=import_events[0].originator_id
+            )
+        
+        app.save(note_logged, index_entry)
+        
+        imported_note = app.get_note_details(slug = 'abc.md')
+        
+        print(imported_note)
+        
+        # ---
+        
+        
+        # Get a stream of all events
+        #print(analytics.recorder.select_notifications(0, 1000)[0])
+        #print(app.recorder.protocol)
+        
+        runner.stop()
+        
+        
+if __name__ == '__main__':
+    db_path = "./test_notes_app_es.db"
+    
+    os.environ["PERSISTENCE_MODULE"] = "eventsourcing.sqlite"
+    os.environ["SQLITE_DBNAME"] = db_path
+    os.environ["SQLITE_LOCK_TIMEOUT"] = "10"
+    #os.environ["COMPRESSOR_TOPIC"] = "gzip"
+    
+    print(f'Using {db_path}')
+    
+    if os.path.exists(db_path):
+        os.remove(db_path)
+    
+    unittest.main()