浏览代码

Allow for multiple subscriptions in a single stream

Daniel Gruno 4 年之前
父节点
当前提交
7f10ec6b85
共有 3 个文件被更改,包括 29 次插入12 次删除
  1. 2 0
      CHANGELOG.md
  2. 10 0
      README.md
  3. 17 12
      pypubsub.py

+ 2 - 0
CHANGELOG.md

@@ -1,3 +1,5 @@
+# 0.7.0
+- Allow for multiple subscriptions per stream
 
 
 # 0.6.3
 # 0.6.3
 - Fixed an issue with payload delivery stalling due to client pipe timeouts
 - Fixed an issue with payload delivery stalling due to client pipe timeouts

+ 10 - 0
README.md

@@ -6,6 +6,7 @@
 - [Introduction](#introduction)
 - [Introduction](#introduction)
 - [Installing](#installing)
 - [Installing](#installing)
 - [Topics and publishing/subscribing](#topics-and-publishingsubscribing)
 - [Topics and publishing/subscribing](#topics-and-publishingsubscribing)
+  * [Subscribing to multiple topic streams](#subscribing-to-multiple-topic-streams)
 - [Pushing an event to PyPubSub](#pushing-an-event-to-pypubsub)
 - [Pushing an event to PyPubSub](#pushing-an-event-to-pypubsub)
   * [Pushing an event via Python](#pushing-an-event-via-python)
   * [Pushing an event via Python](#pushing-an-event-via-python)
 - [Listening for events](#listening-for-events)
 - [Listening for events](#listening-for-events)
@@ -52,6 +53,15 @@ The below matrix shows how subscription paths match topics:
 | fruits + apples + red | ✓ | ✓ | ✓ | ✗ | ✓ |
 | fruits + apples + red | ✓ | ✓ | ✓ | ✗ | ✓ |
 | fruits + oranges | ✓ | ✗ | ✗ | ✓ | ✗ |
 | fruits + oranges | ✓ | ✗ | ✗ | ✓ | ✗ |
 
 
+### Subscribing to multiple topic streams
+As mentioned above, subscription topics are typically AND'ed together, with more topics narrowing the 
+event stream. Thus, if you wanted to subscribe to `bar` OR `foo`, you would need two streams.
+
+It is possible (from 0.7.0 onwards) to utilize a single stream to subscribe to multiple topic streams at once,
+using a comma as a delimiter between topic batches. For instance, to subscribe to both 
+`apples/red` events AND `oranges/ripe` events, you may subscribe to:
+
+`http://localhost:2069/apples/red,oranges/ripe`.
 
 
 ## Pushing an event to PyPubSub
 ## Pushing an event to PyPubSub
 Event payloads requires that the IP or IP range (Ipv4 or IPv6) is listed in `pypubsub.yaml` under `payloaders` first.
 Event payloads requires that the IP or IP range (Ipv4 or IPv6) is listed in `pypubsub.yaml` under `payloaders` first.

+ 17 - 12
pypubsub.py

@@ -33,7 +33,7 @@ import plugins.ldap
 import plugins.sqs
 import plugins.sqs
 
 
 # Some consts
 # Some consts
-PUBSUB_VERSION = '0.6.3'
+PUBSUB_VERSION = '0.7.0'
 PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
 PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
 PUBSUB_DEFAULT_PORT = 2069
 PUBSUB_DEFAULT_PORT = 2069
 PUBSUB_DEFAULT_IP = '0.0.0.0'
 PUBSUB_DEFAULT_IP = '0.0.0.0'
@@ -302,7 +302,10 @@ class Subscriber:
         self.lock = asyncio.Lock()
         self.lock = asyncio.Lock()
 
 
         # Set topics subscribed to
         # Set topics subscribed to
-        self.topics = [x for x in request.path.split('/') if x]
+        self.topics = []
+        for topic_batch in request.path.split(','):
+            sub_to = [x for x in topic_batch.split('/') if x]
+            self.topics.append(sub_to)
 
 
         # Is the client old and expecting zero-terminators?
         # Is the client old and expecting zero-terminators?
         self.old_school = False
         self.old_school = False
@@ -393,16 +396,18 @@ class Payload:
                 if not can_see:
                 if not can_see:
                     continue
                     continue
             # If subscribed to all the topics, tell a subscriber about this
             # If subscribed to all the topics, tell a subscriber about this
-            if all(el in self.topics for el in sub.topics):
-                try:
-                    if sub.old_school:
-                        async with sub.lock:
-                            await asyncio.wait_for(sub.connection.write(ojs), timeout=PUBSUB_WRITE_TIMEOUT)
-                    else:
-                        async with sub.lock:
-                            await asyncio.wait_for(sub.connection.write(js), timeout=PUBSUB_WRITE_TIMEOUT)
-                except Exception:
-                    bad_subs.append(sub)
+            for topic_batch in sub.topics:
+                if all(el in self.topics for el in topic_batch):
+                    try:
+                        if sub.old_school:
+                            async with sub.lock:
+                                await asyncio.wait_for(sub.connection.write(ojs), timeout=PUBSUB_WRITE_TIMEOUT)
+                        else:
+                            async with sub.lock:
+                                await asyncio.wait_for(sub.connection.write(js), timeout=PUBSUB_WRITE_TIMEOUT)
+                    except Exception:
+                        bad_subs.append(sub)
+                    break
         return bad_subs
         return bad_subs