瀏覽代碼

Change from generic list to asyncio queue for pending events

Using a generic list can lead to race conditions.
While it could be solved with a .copy() and subsequent .remove(payload)
from the origin list after processing, using the asyncio queue means we
can skip the sleep calls, as .get() will wait for an item to appear.
Daniel Gruno 4 年之前
父節點
當前提交
0a09b1840e
共有 3 個文件被更改,包括 21 次插入16 次删除
  1. 4 0
      CHANGELOG.md
  2. 1 1
      plugins/sqs.py
  3. 16 15
      pypubsub.py

+ 4 - 0
CHANGELOG.md

@@ -1,3 +1,7 @@
+# 0.7.1
+- Use asyncio queues for moifying the list of events pending publishing to avoid potential race conditions.
+- Minor tweak to the handling of backlog size   
+
 # 0.7.0
 - Allow for multiple subscriptions per stream
 

+ 1 - 1
plugins/sqs.py

@@ -67,7 +67,7 @@ async def get_payloads(server, config):
                                 js = json.loads(body)
                                 path = js.get('pubsub_path', '/')  # Default to catch-all pubsub topic
                                 payload = pypubsub.Payload(path, js)
-                                server.pending_events.append(payload)
+                                server.pending_events.put(payload)
                                 backlog_size = server.config.backlog.queue_size
                                 if backlog_size > 0:
                                     server.backlog.append(payload)

+ 16 - 15
pypubsub.py

@@ -33,7 +33,7 @@ import plugins.ldap
 import plugins.sqs
 
 # Some consts
-PUBSUB_VERSION = '0.7.0'
+PUBSUB_VERSION = '0.7.1'
 PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
 PUBSUB_DEFAULT_PORT = 2069
 PUBSUB_DEFAULT_IP = '0.0.0.0'
@@ -97,7 +97,7 @@ class Server:
         self.yaml = yaml.safe_load(open(args.config))
         self.config = Configuration(self.yaml)
         self.subscribers = []
-        self.pending_events = []
+        self.pending_events = asyncio.Queue()
         self.backlog = []
         self.last_ping = time.time()
         self.server = None
@@ -111,17 +111,18 @@ class Server:
     async def poll(self):
         """Polls for new stuff to publish, and if found, publishes to whomever wants it."""
         while True:
-            for payload in self.pending_events:
-                bad_subs = await payload.publish(self.subscribers)
-                # Cull subscribers we couldn't deliver payload to.
-                for bad_sub in bad_subs:
-                    print("Culling %r due to connection errors" % bad_sub)
-                    try:
-                        self.subscribers.remove(bad_sub)
-                    except ValueError:  # Already removed elsewhere
-                        pass
-            self.pending_events = []
-            await asyncio.sleep(0.1)
+            payload = await self.pending_events.get()
+            bad_subs = await payload.publish(self.subscribers)
+            self.pending_events.task_done()
+
+            # Cull subscribers we couldn't deliver payload to.
+            for bad_sub in bad_subs:
+                print("Culling %r due to connection errors" % bad_sub)
+                try:
+                    self.subscribers.remove(bad_sub)
+                except ValueError:  # Already removed elsewhere
+                    pass
+
 
     async def handle_request(self, request):
         """Generic handler for all incoming HTTP requests"""
@@ -152,12 +153,12 @@ class Server:
                     as_json = json.loads(body)
                     assert isinstance(as_json, dict)  # Payload MUST be an dictionary object, {...}
                     pl = Payload(request.path, as_json)
-                    self.pending_events.append(pl)
+                    self.pending_events.put_nowait(pl)
                     # Add to backlog?
                     if self.config.backlog.queue_size > 0:
                         self.backlog.append(pl)
                         # If backlog has grown too large, delete the first (oldest) item in it.
-                        if len(self.backlog) > self.config.backlog.queue_size:
+                        while len(self.backlog) > self.config.backlog.queue_size:
                             del self.backlog[0]
 
                     resp = aiohttp.web.Response(headers=headers, status=202, text=PUBSUB_PAYLOAD_RECEIVED)