浏览代码

Add support for FS-based persistent backlog

Daniel Gruno 5 年之前
父节点
当前提交
4e6877643f
共有 3 个文件被更改,包括 57 次插入6 次删除
  1. 54 4
      pypubsub.py
  2. 1 1
      pypubsub.yaml
  3. 2 1
      requirements.txt

+ 54 - 4
pypubsub.py

@@ -19,6 +19,8 @@
 """PyPubSub - a simple publisher/subscriber service written in Python 3"""
 """PyPubSub - a simple publisher/subscriber service written in Python 3"""
 import asyncio
 import asyncio
 import aiohttp.web
 import aiohttp.web
+import aiofile
+import os
 import time
 import time
 import json
 import json
 import yaml
 import yaml
@@ -31,7 +33,7 @@ import plugins.ldap
 import plugins.sqs
 import plugins.sqs
 
 
 # Some consts
 # Some consts
-PUBSUB_VERSION = '0.6.0'
+PUBSUB_VERSION = '0.6.1'
 PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
 PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
 PUBSUB_DEFAULT_PORT = 2069
 PUBSUB_DEFAULT_PORT = 2069
 PUBSUB_DEFAULT_IP = '0.0.0.0'
 PUBSUB_DEFAULT_IP = '0.0.0.0'
@@ -65,7 +67,7 @@ class Configuration:
         self.server.payload_limit = int(yml['server'].get('max_payload_size', PUBSUB_DEFAULT_MAX_PAYLOAD_SIZE))
         self.server.payload_limit = int(yml['server'].get('max_payload_size', PUBSUB_DEFAULT_MAX_PAYLOAD_SIZE))
 
 
         # Backlog settings
         # Backlog settings
-        self.backlog = collections.namedtuple('backlogConfig', 'max_age queue_size')
+        self.backlog = collections.namedtuple('backlogConfig', 'max_age queue_size storage')
         bma = yml['server'].get('backlog', {}).get('max_age', PUBSUB_DEFAULT_BACKLOG_AGE)
         bma = yml['server'].get('backlog', {}).get('max_age', PUBSUB_DEFAULT_BACKLOG_AGE)
         if isinstance(bma, str):
         if isinstance(bma, str):
             bma = bma.lower()
             bma = bma.lower()
@@ -80,6 +82,7 @@ class Configuration:
         self.backlog.max_age = bma
         self.backlog.max_age = bma
         self.backlog.queue_size = yml['server'].get('backlog', {}).get('size',
         self.backlog.queue_size = yml['server'].get('backlog', {}).get('size',
                                                                        PUBSUB_DEFAULT_BACKLOG_SIZE)
                                                                        PUBSUB_DEFAULT_BACKLOG_SIZE)
+        self.backlog.storage = yml['server'].get('backlog', {}).get('storage')
 
 
         # Payloaders - clients that can post payloads
         # Payloaders - clients that can post payloads
         self.payloaders = [netaddr.IPNetwork(x) for x in yml['clients'].get('payloaders', [])]
         self.payloaders = [netaddr.IPNetwork(x) for x in yml['clients'].get('payloaders', [])]
@@ -216,6 +219,50 @@ class Server:
             resp = aiohttp.web.Response(headers=headers, status=400, text=PUBSUB_BAD_REQUEST)
             resp = aiohttp.web.Response(headers=headers, status=400, text=PUBSUB_BAD_REQUEST)
             return resp
             return resp
 
 
+    async def write_backlog_storage(self):
+        previous_backlog = []
+        while True:
+            if self.config.backlog.storage:
+                try:
+                    backlog_list = self.backlog.copy()
+                    if backlog_list != previous_backlog:
+                        previous_backlog = backlog_list
+                        async with aiofile.AIOFile(self.config.backlog.storage, 'w+') as afp:
+                            offset = 0
+                            for item in backlog_list:
+                                js =json.dumps({
+                                    'timestamp': item.timestamp,
+                                    'topics': item.topics,
+                                    'json': item.json,
+                                    'private': item.private
+                                }) + '\n'
+                                await afp.write(js, offset=offset)
+                                offset += len(js)
+                            await afp.fsync()
+                except Exception as e:
+                    print(f"Could not write to backlog file {self.config.backlog.storage}: {e}")
+            await asyncio.sleep(10)
+
+    def read_backlog_storage(self):
+        if os.path.exists(self.config.backlog.storage):
+            try:
+                readlines = 0
+                with open(self.config.backlog.storage, 'r') as fp:
+                    for line in fp.readlines():
+                        js = json.loads(line)
+                        readlines += 1
+                        ppath = "/".join(js['topics'])
+                        if js['private']:
+                            ppath = '/private/' + ppath
+                        payload = Payload(ppath, js['json'], js['timestamp'])
+                        self.backlog.append(payload)
+                        if self.config.backlog.queue_size < len(self.backlog):
+                            self.backlog.pop(0)
+            except Exception as e:
+                print(f"Error while reading backlog: {e}")
+
+            print(f"Read {readlines} objects from {self.config.backlog.storage}, applied {len(self.backlog)} to backlog.")
+
     async def server_loop(self, loop):
     async def server_loop(self, loop):
         self.server = aiohttp.web.Server(self.handle_request)
         self.server = aiohttp.web.Server(self.handle_request)
         runner = aiohttp.web.ServerRunner(self.server)
         runner = aiohttp.web.ServerRunner(self.server)
@@ -228,6 +275,8 @@ class Server:
         if self.config.sqs:
         if self.config.sqs:
             for key, config in self.config.sqs.items():
             for key, config in self.config.sqs.items():
                 loop.create_task(plugins.sqs.get_payloads(self, config))
                 loop.create_task(plugins.sqs.get_payloads(self, config))
+        self.read_backlog_storage()
+        loop.create_task(self.write_backlog_storage())
         await self.poll()
         await self.poll()
 
 
     def run(self):
     def run(self):
@@ -239,6 +288,7 @@ class Server:
         loop.close()
         loop.close()
 
 
 
 
+
 class Subscriber:
 class Subscriber:
     """Basic subscriber (client) class. Holds information about the connection and ACL"""
     """Basic subscriber (client) class. Holds information about the connection and ACL"""
 
 
@@ -307,9 +357,9 @@ class Subscriber:
 class Payload:
 class Payload:
     """A payload (event) object sent by a registered publisher."""
     """A payload (event) object sent by a registered publisher."""
 
 
-    def __init__(self, path, data):
+    def __init__(self, path, data, timestamp=None):
         self.json = data
         self.json = data
-        self.timestamp = time.time()
+        self.timestamp = timestamp or time.time()
         self.topics = [x for x in path.split('/') if x]
         self.topics = [x for x in path.split('/') if x]
         self.private = False
         self.private = False
 
 

+ 1 - 1
pypubsub.yaml

@@ -6,7 +6,7 @@ server:
   backlog:
   backlog:
     size:           0   # Max number of payloads to keep in backlog cache (set to 0 to disable)
     size:           0   # Max number of payloads to keep in backlog cache (set to 0 to disable)
     max_age:      48h   # Maximum age of a backlog item before culling it (set to 0 to never prune on age)
     max_age:      48h   # Maximum age of a backlog item before culling it (set to 0 to never prune on age)
-
+#    storage: backlog.json  # File-storage for backlog between restarts. Comment out to disable.
 # Client settings
 # Client settings
 clients:
 clients:
   # Payloaders are clients that are allowed to push events (CIDR-based)
   # Payloaders are clients that are allowed to push events (CIDR-based)

+ 2 - 1
requirements.txt

@@ -4,4 +4,5 @@ netaddr>=0.7.19
 python-ldap>=3.0.0
 python-ldap>=3.0.0
 PyYAML~=5.1.2
 PyYAML~=5.1.2
 aiobotocore~=1.0.4
 aiobotocore~=1.0.4
-botocore~=1.15.32
+botocore~=1.16.4
+aiofile~=1.5.2