Bläddra i källkod

Fix issue with client pipes stalling the payload delivery

Daniel Gruno 4 år sedan
förälder
incheckning
1a0364db55
2 ändrade filer med 18 tillägg och 8 borttagningar
  1. 3 0
      CHANGELOG.md
  2. 15 8
      pypubsub.py

+ 3 - 0
CHANGELOG.md

@@ -1,3 +1,6 @@
+# 0.6.2
+- Fixed an issue with payload delivery stalling due to client pipe timeouts
+
 # 0.6.1
 - Added FS-backed persistant backlog storage for persisting backlog through restarts
 

+ 15 - 8
pypubsub.py

@@ -33,7 +33,7 @@ import plugins.ldap
 import plugins.sqs
 
 # Some consts
-PUBSUB_VERSION = '0.6.1'
+PUBSUB_VERSION = '0.6.2'
 PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
 PUBSUB_DEFAULT_PORT = 2069
 PUBSUB_DEFAULT_IP = '0.0.0.0'
@@ -46,7 +46,7 @@ PUBSUB_PAYLOAD_RECEIVED = "Payload received, thank you very much!\n"
 PUBSUB_NOT_ALLOWED = "You are not authorized to deliver payloads!\n"
 PUBSUB_BAD_PAYLOAD = "Bad payload type. Payloads must be JSON dictionary objects, {..}!\n"
 PUBSUB_PAYLOAD_TOO_LARGE = "Payload is too large for me to serve, please make it shorter.\n"
-
+PUBSUB_WRITE_TIMEOUT = 0.35  # If we can't deliver to a pipe within N seconds, drop it.
 
 class Configuration:
     def __init__(self, yml):
@@ -116,9 +116,12 @@ class Server:
                 # Cull subscribers we couldn't deliver payload to.
                 for bad_sub in bad_subs:
                     print("Culling %r due to connection errors" % bad_sub)
-                    self.subscribers.remove(bad_sub)
+                    try:
+                        self.subscribers.remove(bad_sub)
+                    except ValueError:  # Already removed elsewhere
+                        pass
             self.pending_events = []
-            await asyncio.sleep(0.5)
+            await asyncio.sleep(0.1)
 
     async def handle_request(self, request):
         """Generic handler for all incoming HTTP requests"""
@@ -244,7 +247,7 @@ class Server:
             await asyncio.sleep(10)
 
     def read_backlog_storage(self):
-        if os.path.exists(self.config.backlog.storage):
+        if self.config.backlog.storage and os.path.exists(self.config.backlog.storage):
             try:
                 readlines = 0
                 with open(self.config.backlog.storage, 'r') as fp:
@@ -296,6 +299,7 @@ class Subscriber:
         self.connection = connection
         self.acl = {}
         self.server = server
+        self.lock = asyncio.Lock()
 
         # Set topics subscribed to
         self.topics = [x for x in request.path.split('/') if x]
@@ -351,7 +355,8 @@ class Subscriber:
         js = b"%s\n" % json.dumps({"stillalive": time.time()}).encode('utf-8')
         if self.old_school:
             js += b"\0"
-        await self.connection.write(js)
+        async with self.lock:
+            await asyncio.wait_for(self.connection.write(js), timeout=PUBSUB_WRITE_TIMEOUT)
 
 
 class Payload:
@@ -391,9 +396,11 @@ class Payload:
             if all(el in self.topics for el in sub.topics):
                 try:
                     if sub.old_school:
-                        await sub.connection.write(ojs)
+                        async with sub.lock:
+                            await asyncio.wait_for(sub.connection.write(ojs), timeout=PUBSUB_WRITE_TIMEOUT)
                     else:
-                        await sub.connection.write(js)
+                        async with sub.lock:
+                            await asyncio.wait_for(sub.connection.write(js), timeout=PUBSUB_WRITE_TIMEOUT)
                 except Exception:
                     bad_subs.append(sub)
         return bad_subs