|
@@ -40,8 +40,10 @@ PUBSUB_NOT_ALLOWED = "You are not authorized to deliver payloads!\n"
|
|
|
PUBSUB_BAD_PAYLOAD = "Bad payload type. Payloads must be JSON dictionary objects, {..}!\n"
|
|
|
PUBSUB_PAYLOAD_TOO_LARGE = "Payload is too large for me to serve, please make it shorter.\n"
|
|
|
|
|
|
+
|
|
|
class Server:
|
|
|
"""Main server class, responsible for handling requests and publishing events """
|
|
|
+
|
|
|
def __init__(self, args):
|
|
|
self.config = yaml.safe_load(open(args.config))
|
|
|
self.lconfig = None
|
|
@@ -96,7 +98,7 @@ class Server:
|
|
|
if request.can_read_body:
|
|
|
try:
|
|
|
if request.content_length > self.config['clients'].get('max_payload_size',
|
|
|
- PUBSUB_DEFAULT_MAX_PAYLOAD_SIZE):
|
|
|
+ PUBSUB_DEFAULT_MAX_PAYLOAD_SIZE):
|
|
|
resp = aiohttp.web.Response(headers=headers, status=400, text=PUBSUB_PAYLOAD_TOO_LARGE)
|
|
|
return resp
|
|
|
body = await request.text()
|
|
@@ -153,7 +155,7 @@ class Server:
|
|
|
|
|
|
while True:
|
|
|
await subscriber.ping()
|
|
|
- if subscriber not in self.subscribers: # If we got dislodged somehow, end session
|
|
|
+ if subscriber not in self.subscribers: # If we got dislodged somehow, end session
|
|
|
break
|
|
|
await asyncio.sleep(5)
|
|
|
# We may get exception types we don't have imported, so grab ANY exception and kick out the subscriber
|
|
@@ -177,7 +179,8 @@ class Server:
|
|
|
site = aiohttp.web.TCPSite(runner, self.config['server']['bind'], self.config['server']['port'])
|
|
|
await site.start()
|
|
|
print("==== PyPubSub v/%s starting... ====" % PUBSUB_VERSION)
|
|
|
- print("==== Serving up PubSub goodness at %s:%s ====" % (self.config['server']['bind'], self.config['server']['port']))
|
|
|
+ print("==== Serving up PubSub goodness at %s:%s ====" % (
|
|
|
+ self.config['server']['bind'], self.config['server']['port']))
|
|
|
await self.poll()
|
|
|
|
|
|
def run(self):
|
|
@@ -191,6 +194,7 @@ class Server:
|
|
|
|
|
|
class Subscriber:
|
|
|
"""Basic subscriber (client) class. Holds information about the connection and ACL"""
|
|
|
+
|
|
|
def __init__(self, server, connection, request):
|
|
|
self.connection = connection
|
|
|
self.acl = {}
|
|
@@ -217,7 +221,7 @@ class Subscriber:
|
|
|
acl = self.server.acl[u].get('acl', {})
|
|
|
# Vet ACL for user
|
|
|
assert isinstance(acl, dict), f"ACL for user {u} " \
|
|
|
- f"must be a dictionary of sub-IDs and topics, but is not."
|
|
|
+ f"must be a dictionary of sub-IDs and topics, but is not."
|
|
|
# Make sure each ACL segment is a list of topics
|
|
|
for k, v in acl.items():
|
|
|
assert isinstance(v, list), f"ACL segment {k} for user {u} is not a list of topics!"
|
|
@@ -232,7 +236,8 @@ class Subscriber:
|
|
|
assert isinstance(v, dict), f"ACL segment {k} for user {u} is not a dictionary of segments!"
|
|
|
for segment, topics in v.items():
|
|
|
print(f"Enabling ACL segment {segment} for user {u}")
|
|
|
- assert isinstance(topics, list), f"ACL segment {segment} for user {u} is not a list of topics!"
|
|
|
+ assert isinstance(topics,
|
|
|
+ list), f"ACL segment {segment} for user {u} is not a list of topics!"
|
|
|
acl[segment] = topics
|
|
|
return acl
|
|
|
except binascii.Error as e:
|
|
@@ -254,6 +259,7 @@ class Subscriber:
|
|
|
|
|
|
class Payload:
|
|
|
"""A payload (event) object sent by a registered publisher."""
|
|
|
+
|
|
|
def __init__(self, path, data):
|
|
|
self.json = data
|
|
|
self.timestamp = time.time()
|
|
@@ -295,11 +301,12 @@ class Payload:
|
|
|
bad_subs.append(sub)
|
|
|
return bad_subs
|
|
|
|
|
|
+
|
|
|
if __name__ == '__main__':
|
|
|
parser = argparse.ArgumentParser()
|
|
|
parser.add_argument("--config", help="Configuration file to load (default: pypubsub.yaml)", default="pypubsub.yaml")
|
|
|
- parser.add_argument("--acl", help="ACL Configuration file to load (default: pypubsub_acl.yaml)", default="pypubsub_acl.yaml")
|
|
|
+ parser.add_argument("--acl", help="ACL Configuration file to load (default: pypubsub_acl.yaml)",
|
|
|
+ default="pypubsub_acl.yaml")
|
|
|
cliargs = parser.parse_args()
|
|
|
pubsub_server = Server(cliargs)
|
|
|
pubsub_server.run()
|
|
|
-
|