|
@@ -32,6 +32,7 @@ import plugins.ldap
|
|
|
PUBSUB_VERSION = '0.4.6'
|
|
|
PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
|
|
|
PUBSUB_DEFAULT_MAX_PAYLOAD_SIZE = 102400
|
|
|
+PUBSUB_DEFAULT_BACKLOG_SIZE = 0
|
|
|
PUBSUB_BAD_REQUEST = "I could not understand your request, sorry! Please see https://pubsub.apache.org/api.html \
|
|
|
for usage documentation.\n"
|
|
|
PUBSUB_PAYLOAD_RECEIVED = "Payload received, thank you very much!\n"
|
|
@@ -46,6 +47,7 @@ class Server:
|
|
|
self.lconfig = None
|
|
|
self.subscribers = []
|
|
|
self.pending_events = []
|
|
|
+ self.backlog = []
|
|
|
self.last_ping = time.time()
|
|
|
self.server = None
|
|
|
|
|
@@ -100,7 +102,15 @@ class Server:
|
|
|
body = await request.text()
|
|
|
as_json = json.loads(body)
|
|
|
assert isinstance(as_json, dict) # Payload MUST be an dictionary object, {...}
|
|
|
- self.pending_events.append(Payload(request.path, as_json))
|
|
|
+ pl = Payload(request.path, as_json)
|
|
|
+ self.pending_events.append(pl)
|
|
|
+ backlog_size = self.config['clients'].get('payload_backlog_size', PUBSUB_DEFAULT_BACKLOG_SIZE)
|
|
|
+ if backlog_size > 0:
|
|
|
+ self.backlog.append(pl)
|
|
|
+ # If backlog has grown too large, delete the first (oldest) item in it.
|
|
|
+ if len(self.backlog) > backlog_size:
|
|
|
+ del self.backlog[0]
|
|
|
+
|
|
|
resp = aiohttp.web.Response(headers=headers, status=202, text=PUBSUB_PAYLOAD_RECEIVED)
|
|
|
return resp
|
|
|
except json.decoder.JSONDecodeError:
|
|
@@ -122,11 +132,25 @@ class Server:
|
|
|
if auth:
|
|
|
subscriber.acl = await subscriber.parse_acl(auth)
|
|
|
|
|
|
+ # Subscribe the user before we deal with the potential backlog request and pings
|
|
|
self.subscribers.append(subscriber)
|
|
|
resp.content_type = PUBSUB_CONTENT_TYPE
|
|
|
try:
|
|
|
resp.enable_chunked_encoding()
|
|
|
await resp.prepare(request)
|
|
|
+
|
|
|
+ # Is the client requesting a backlog of items?
|
|
|
+ backlog = request.headers.get('X-Fetch-Since')
|
|
|
+ if backlog:
|
|
|
+ try:
|
|
|
+ backlog_ts = int(backlog)
|
|
|
+ except ValueError: # Default to 0 if we can't parse the epoch
|
|
|
+ backlog_ts = 0
|
|
|
+ # For each item, publish to client if new enough.
|
|
|
+ for item in self.backlog:
|
|
|
+ if item.timestamp >= backlog_ts:
|
|
|
+ await item.publish([subscriber])
|
|
|
+
|
|
|
while True:
|
|
|
await subscriber.ping()
|
|
|
if subscriber not in self.subscribers: # If we got dislodged somehow, end session
|
|
@@ -232,6 +256,7 @@ class Payload:
|
|
|
"""A payload (event) object sent by a registered publisher."""
|
|
|
def __init__(self, path, data):
|
|
|
self.json = data
|
|
|
+ self.timestamp = time.time()
|
|
|
self.topics = [x for x in path.split('/') if x]
|
|
|
self.private = False
|
|
|
|
|
@@ -240,6 +265,7 @@ class Payload:
|
|
|
self.private = True
|
|
|
del self.topics[0] # Remove the private bit from topics now.
|
|
|
|
|
|
+ self.json['pubsub_timestamp'] = self.timestamp
|
|
|
self.json['pubsub_topics'] = self.topics
|
|
|
self.json['pubsub_path'] = path
|
|
|
|