Преглед изворни кода

better handling of dislodged (errored) subscriber connections

Daniel Gruno пре 5 година
родитељ
комит
3aea443615
1 измењених фајлова са 14 додато и 10 уклоњено
  1. 14 10
      pypubsub.py

+ 14 - 10
pypubsub.py

@@ -29,7 +29,7 @@ import argparse
 import plugins.ldap
 
 # Some consts
-PUBSUB_VERSION = '0.4.4'
+PUBSUB_VERSION = '0.4.5'
 PUBSUB_BAD_REQUEST = "I could not understand your request, sorry! Please see https://pubsub.apache.org/api.html \
 for usage documentation.\n"
 PUBSUB_PAYLOAD_RECEIVED = "Payload received, thank you very much!\n"
@@ -61,7 +61,11 @@ class Server:
         """Polls for new stuff to publish, and if found, publishes to whomever wants it."""
         while True:
             for payload in self.pending_events:
-                await payload.publish(self.subscribers)
+                bad_subs = await payload.publish(self.subscribers)
+                # Cull subscribers we couldn't deliver payload to.
+                for bad_sub in bad_subs:
+                    print("Culling %r due to connection errors" % bad_sub)
+                    self.subscribers.remove(bad_sub)
             self.pending_events = []
             await asyncio.sleep(0.5)
 
@@ -121,11 +125,14 @@ class Server:
                 await resp.prepare(request)
                 while True:
                     await subscriber.ping()
+                    if subscriber not in self.subscribers: # If we got dislodged somehow, end session
+                        break
                     await asyncio.sleep(5)
             # We may get exception types we don't have imported, so grab ANY exception and kick out the subscriber
             except:
                 pass
-            self.subscribers.remove(subscriber)
+            if subscriber in self.subscribers:
+                self.subscribers.remove(subscriber)
             return resp
         elif request.method == 'HEAD':
             resp = aiohttp.web.Response(headers=headers, status=204, text="")
@@ -236,6 +243,7 @@ class Payload:
         """Publishes an object to all subscribers using those topics (or a sub-set thereof)"""
         js = b"%s\n" % json.dumps(self.json).encode('utf-8')
         ojs = js + b"\0"
+        bad_subs = []
         for sub in subscribers:
             # If a private payload, check ACL and bail if not a match
             if self.private:
@@ -253,13 +261,9 @@ class Payload:
                         await sub.connection.write(ojs)
                     else:
                         await sub.connection.write(js)
-                except ConnectionResetError:
-                    pass
-                except RuntimeError:
-                    pass
-                except AssertionError:  # drain helper throws these sometimes
-                    pass
-
+                except Exception:
+                    bad_subs.append(sub)
+        return bad_subs
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()