|
@@ -26,12 +26,15 @@ import netaddr
|
|
|
import binascii
|
|
|
import base64
|
|
|
import argparse
|
|
|
+import collections
|
|
|
import plugins.ldap
|
|
|
import plugins.sqs
|
|
|
|
|
|
# Some consts
|
|
|
-PUBSUB_VERSION = '0.5.1'
|
|
|
+PUBSUB_VERSION = '0.6.0'
|
|
|
PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
|
|
|
+PUBSUB_DEFAULT_PORT = 2069
|
|
|
+PUBSUB_DEFAULT_IP = '0.0.0.0'
|
|
|
PUBSUB_DEFAULT_MAX_PAYLOAD_SIZE = 102400
|
|
|
PUBSUB_DEFAULT_BACKLOG_SIZE = 0
|
|
|
PUBSUB_DEFAULT_BACKLOG_AGE = 0
|
|
@@ -43,21 +46,27 @@ PUBSUB_BAD_PAYLOAD = "Bad payload type. Payloads must be JSON dictionary objects
|
|
|
PUBSUB_PAYLOAD_TOO_LARGE = "Payload is too large for me to serve, please make it shorter.\n"
|
|
|
|
|
|
|
|
|
-class Server:
|
|
|
- """Main server class, responsible for handling requests and publishing events """
|
|
|
+class Configuration:
|
|
|
+ def __init__(self, yml):
|
|
|
|
|
|
- def __init__(self, args):
|
|
|
- self.config = yaml.safe_load(open(args.config))
|
|
|
- self.lconfig = None
|
|
|
- self.sqsconfig = None
|
|
|
- self.subscribers = []
|
|
|
- self.pending_events = []
|
|
|
- self.backlog = []
|
|
|
- self.last_ping = time.time()
|
|
|
- self.server = None
|
|
|
+ # LDAP Settings
|
|
|
+ self.ldap = None
|
|
|
+ lyml = yml.get('clients', {}).get('ldap')
|
|
|
+ if isinstance(lyml, dict):
|
|
|
+ self.ldap = plugins.ldap.LDAPConnection(lyml)
|
|
|
+
|
|
|
+ # SQS?
|
|
|
+ self.sqs = yml.get('sqs')
|
|
|
|
|
|
- # Backlog age calcs
|
|
|
- bma = self.config['clients'].get('payload_backlog_max_age', PUBSUB_DEFAULT_BACKLOG_AGE)
|
|
|
+ # Main server config
|
|
|
+ self.server = collections.namedtuple('serverConfig', 'ip port payload_limit')
|
|
|
+ self.server.ip = yml['server'].get('bind', PUBSUB_DEFAULT_IP)
|
|
|
+ self.server.port = int(yml['server'].get('port', PUBSUB_DEFAULT_PORT))
|
|
|
+ self.server.payload_limit = int(yml['server'].get('max_payload_size', PUBSUB_DEFAULT_MAX_PAYLOAD_SIZE))
|
|
|
+
|
|
|
+ # Backlog settings
|
|
|
+ self.backlog = collections.namedtuple('backlogConfig', 'max_age queue_size')
|
|
|
+ bma = yml['server'].get('backlog', {}).get('max_age', PUBSUB_DEFAULT_BACKLOG_AGE)
|
|
|
if isinstance(bma, str):
|
|
|
bma = bma.lower()
|
|
|
if bma.endswith('s'):
|
|
@@ -68,22 +77,33 @@ class Server:
|
|
|
bma = int(bma.replace('h', '')) * 3600
|
|
|
elif bma.endswith('d'):
|
|
|
bma = int(bma.replace('d', '')) * 86400
|
|
|
- self.backlog_max_age = bma
|
|
|
+ self.backlog.max_age = bma
|
|
|
+ self.backlog.queue_size = yml['server'].get('backlog', {}).get('size',
|
|
|
+ PUBSUB_DEFAULT_BACKLOG_SIZE)
|
|
|
|
|
|
- # LDAP configuration present?
|
|
|
- if 'ldap' in self.config.get('clients', {}):
|
|
|
- self.lconfig = self.config['clients']['ldap']
|
|
|
- plugins.ldap.vet_settings(self.lconfig)
|
|
|
- self.acl = {}
|
|
|
+ # Payloaders - clients that can post payloads
|
|
|
+ self.payloaders = [netaddr.IPNetwork(x) for x in yml['clients'].get('payloaders', [])]
|
|
|
+
|
|
|
+ # Binary backwards compatibility
|
|
|
+ self.oldschoolers = yml['clients'].get('oldschoolers', [])
|
|
|
|
|
|
- # SQS configuration present?
|
|
|
- if 'sqs' in self.config:
|
|
|
- self.sqsconfig = self.config.get('sqs')
|
|
|
+class Server:
|
|
|
+ """Main server class, responsible for handling requests and publishing events """
|
|
|
+
|
|
|
+ def __init__(self, args):
|
|
|
+ self.yaml = yaml.safe_load(open(args.config))
|
|
|
+ self.config = Configuration(self.yaml)
|
|
|
+ self.subscribers = []
|
|
|
+ self.pending_events = []
|
|
|
+ self.backlog = []
|
|
|
+ self.last_ping = time.time()
|
|
|
+ self.server = None
|
|
|
+ self.acl = {}
|
|
|
try:
|
|
|
self.acl = yaml.safe_load(open(args.acl))
|
|
|
except FileNotFoundError:
|
|
|
print(f"ACL configuration file {args.acl} not found, private events will not be broadcast.")
|
|
|
- self.payloaders = [netaddr.IPNetwork(x) for x in self.config['clients']['payloaders']]
|
|
|
+
|
|
|
|
|
|
async def poll(self):
|
|
|
"""Polls for new stuff to publish, and if found, publishes to whomever wants it."""
|
|
@@ -110,7 +130,7 @@ class Server:
|
|
|
if request.method in ['PUT', 'POST']:
|
|
|
ip = netaddr.IPAddress(request.remote)
|
|
|
allowed = False
|
|
|
- for network in self.payloaders:
|
|
|
+ for network in self.config.payloaders:
|
|
|
if ip in network:
|
|
|
allowed = True
|
|
|
break
|
|
@@ -119,8 +139,7 @@ class Server:
|
|
|
return resp
|
|
|
if request.can_read_body:
|
|
|
try:
|
|
|
- if request.content_length > self.config['clients'].get('max_payload_size',
|
|
|
- PUBSUB_DEFAULT_MAX_PAYLOAD_SIZE):
|
|
|
+ if request.content_length > self.config.server.payload_limit:
|
|
|
resp = aiohttp.web.Response(headers=headers, status=400, text=PUBSUB_PAYLOAD_TOO_LARGE)
|
|
|
return resp
|
|
|
body = await request.text()
|
|
@@ -128,11 +147,11 @@ class Server:
|
|
|
assert isinstance(as_json, dict) # Payload MUST be an dictionary object, {...}
|
|
|
pl = Payload(request.path, as_json)
|
|
|
self.pending_events.append(pl)
|
|
|
- backlog_size = self.config['clients'].get('payload_backlog_size', PUBSUB_DEFAULT_BACKLOG_SIZE)
|
|
|
- if backlog_size > 0:
|
|
|
+ # Add to backlog?
|
|
|
+ if self.config.backlog.queue_size > 0:
|
|
|
self.backlog.append(pl)
|
|
|
# If backlog has grown too large, delete the first (oldest) item in it.
|
|
|
- if len(self.backlog) > backlog_size:
|
|
|
+ if len(self.backlog) > self.config.backlog.queue_size:
|
|
|
del self.backlog[0]
|
|
|
|
|
|
resp = aiohttp.web.Response(headers=headers, status=202, text=PUBSUB_PAYLOAD_RECEIVED)
|
|
@@ -171,8 +190,8 @@ class Server:
|
|
|
except ValueError: # Default to 0 if we can't parse the epoch
|
|
|
backlog_ts = 0
|
|
|
# If max age is specified, force the TS to minimum that age
|
|
|
- if self.backlog_max_age and self.backlog_max_age > 0:
|
|
|
- backlog_ts = max(backlog_ts, time.time() - self.backlog_max_age)
|
|
|
+ if self.config.backlog.max_age > 0:
|
|
|
+ backlog_ts = max(backlog_ts, time.time() - self.config.backlog.max_age)
|
|
|
# For each item, publish to client if new enough.
|
|
|
for item in self.backlog:
|
|
|
if item.timestamp >= backlog_ts:
|
|
@@ -201,13 +220,13 @@ class Server:
|
|
|
self.server = aiohttp.web.Server(self.handle_request)
|
|
|
runner = aiohttp.web.ServerRunner(self.server)
|
|
|
await runner.setup()
|
|
|
- site = aiohttp.web.TCPSite(runner, self.config['server']['bind'], self.config['server']['port'])
|
|
|
+ site = aiohttp.web.TCPSite(runner, self.config.server.ip, self.config.server.port)
|
|
|
await site.start()
|
|
|
print("==== PyPubSub v/%s starting... ====" % PUBSUB_VERSION)
|
|
|
print("==== Serving up PubSub goodness at %s:%s ====" % (
|
|
|
- self.config['server']['bind'], self.config['server']['port']))
|
|
|
- if self.sqsconfig:
|
|
|
- for key, config in self.sqsconfig.items():
|
|
|
+ self.config.server.ip, self.config.server.port))
|
|
|
+ if self.config.sqs:
|
|
|
+ for key, config in self.config.sqs.items():
|
|
|
loop.create_task(plugins.sqs.get_payloads(self, config))
|
|
|
await self.poll()
|
|
|
|
|
@@ -233,7 +252,7 @@ class Subscriber:
|
|
|
|
|
|
# Is the client old and expecting zero-terminators?
|
|
|
self.old_school = False
|
|
|
- for ua in self.server.config['clients'].get('oldschoolers', []):
|
|
|
+ for ua in self.server.config.oldschoolers:
|
|
|
if ua in request.headers.get('User-Agent', ''):
|
|
|
self.old_school = True
|
|
|
break
|
|
@@ -255,11 +274,11 @@ class Subscriber:
|
|
|
assert isinstance(v, list), f"ACL segment {k} for user {u} is not a list of topics!"
|
|
|
print(f"Client {u} successfully authenticated (and ACL is valid).")
|
|
|
return acl
|
|
|
- elif self.server.lconfig:
|
|
|
+ elif self.server.config.ldap:
|
|
|
acl = {}
|
|
|
- groups = await plugins.ldap.get_groups(self.server.lconfig, u, p)
|
|
|
+ groups = await self.server.config.ldap.get_groups(u,p)
|
|
|
# Make sure each ACL segment is a list of topics
|
|
|
- for k, v in self.server.lconfig['acl'].items():
|
|
|
+ for k, v in self.server.config.ldap.acl.items():
|
|
|
if k in groups:
|
|
|
assert isinstance(v, dict), f"ACL segment {k} for user {u} is not a dictionary of segments!"
|
|
|
for segment, topics in v.items():
|