|
@@ -33,6 +33,7 @@ import plugins.ldap
|
|
|
import plugins.sqs
|
|
|
import typing
|
|
|
import signal
|
|
|
+import uuid
|
|
|
|
|
|
|
|
|
PUBSUB_VERSION = '0.7.2'
|
|
@@ -237,8 +238,9 @@ class Server:
|
|
|
await resp.prepare(request)
|
|
|
|
|
|
|
|
|
- backlog = request.headers.get('X-Fetch-Since')
|
|
|
- if backlog:
|
|
|
+ epoch_based_backlog = request.headers.get('X-Fetch-Since')
|
|
|
+ cursor_based_backlog = request.headers.get('X-Fetch-Since-Cursor')
|
|
|
+ if epoch_based_backlog:
|
|
|
try:
|
|
|
backlog_ts = int(backlog)
|
|
|
except ValueError:
|
|
@@ -250,6 +252,15 @@ class Server:
|
|
|
for item in self.backlog:
|
|
|
if item.timestamp >= backlog_ts:
|
|
|
await item.publish([subscriber])
|
|
|
+
|
|
|
+ if cursor_based_backlog and len(cursor_based_backlog) == 36:
|
|
|
+
|
|
|
+ is_after_cursor = False
|
|
|
+ for item in self.backlog:
|
|
|
+ if item.cursor == cursor_based_backlog:
|
|
|
+ is_after_cursor = True
|
|
|
+ elif is_after_cursor:
|
|
|
+ await item.publish([subscriber])
|
|
|
|
|
|
while True:
|
|
|
await subscriber.ping()
|
|
@@ -427,15 +438,18 @@ class Payload:
|
|
|
self.timestamp = timestamp or time.time()
|
|
|
self.topics = [x for x in path.split('/') if x]
|
|
|
self.private = False
|
|
|
+ self.cursor = str(uuid.uuid4())
|
|
|
|
|
|
|
|
|
if self.topics and self.topics[0] == 'private':
|
|
|
self.private = True
|
|
|
del self.topics[0]
|
|
|
|
|
|
+
|
|
|
self.json['pubsub_timestamp'] = self.timestamp
|
|
|
self.json['pubsub_topics'] = self.topics
|
|
|
self.json['pubsub_path'] = path
|
|
|
+ self.json['pubsub_cursor'] = self.cursor
|
|
|
|
|
|
async def publish(self, subscribers: typing.List[Subscriber]):
|
|
|
"""Publishes an object to all subscribers using those topics (or a sub-set thereof)"""
|