pypubsub.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. #!/usr/bin/env python3
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. """PyPubSub - a simple publisher/subscriber service written in Python 3"""
  19. import asyncio
  20. import aiohttp.web
  21. import time
  22. import json
  23. import yaml
  24. import netaddr
  25. import binascii
  26. import base64
  27. import pypubsub_ldap
  28. # Some consts
  29. PUBSUB_VERSION = '0.4.0'
  30. PUBSUB_BAD_REQUEST = "I could not understand your request, sorry! Please see https://pubsub.apache.org/api.html \
  31. for usage documentation.\n"
  32. PUBSUB_PAYLOAD_RECEIVED = "Payload received, thank you very much!\n"
  33. PUBSUB_NOT_ALLOWED = "You are not authorized to deliver payloads!\n"
  34. PUBSUB_BAD_PAYLOAD = "Bad payload type. Payloads must be JSON dictionary objects, {..}!\n"
  35. class Server:
  36. def __init__(self):
  37. self.config = yaml.safe_load(open('pypubsub.yaml'))
  38. self.lconfig = None
  39. self.no_requests = 0
  40. self.subscribers = []
  41. self.pending_events = []
  42. self.last_ping = time.time()
  43. if 'ldap' in self.config.get('clients', {}):
  44. pypubsub_ldap.vet_settings(self.config['clients']['ldap'])
  45. self.lconfig = self.config['clients']['ldap']
  46. self.acl = {}
  47. try:
  48. self.acl = yaml.safe_load(open('pypubsub_acl.yaml'))
  49. except FileNotFoundError:
  50. print("No ACL configuration file found, private events will not be broadcast.")
  51. self.payloaders = [netaddr.IPNetwork(x) for x in self.config['clients']['payloaders']]
  52. async def poll(self):
  53. """Polls for new stuff to publish, and if found, publishes to whomever wants it."""
  54. while True:
  55. for payload in self.pending_events:
  56. await payload.publish(self.subscribers)
  57. self.pending_events = []
  58. await asyncio.sleep(0.5)
  59. async def handle_request(self, request):
  60. """Generic handler for all incoming HTTP requests"""
  61. self.no_requests += 1
  62. # Define response headers first...
  63. headers = {
  64. 'Server': 'PyPubSub/%s' % PUBSUB_VERSION,
  65. 'X-Subscribers': str(len(self.subscribers)),
  66. 'X-Requests': str(self.no_requests),
  67. }
  68. # Are we handling a publisher payload request? (PUT/POST)
  69. if request.method in ['PUT', 'POST']:
  70. ip = netaddr.IPAddress(request.remote)
  71. allowed = False
  72. for network in self.payloaders:
  73. if ip in network:
  74. allowed = True
  75. break
  76. if not allowed:
  77. resp = aiohttp.web.Response(headers=headers, status=403, text=PUBSUB_NOT_ALLOWED)
  78. return resp
  79. if request.can_read_body:
  80. try:
  81. body = await request.json()
  82. assert isinstance(body, dict) # Payload MUST be an dictionary object, {...}
  83. self.pending_events.append(Payload(request.path, body))
  84. resp = aiohttp.web.Response(headers=headers, status=202, text=PUBSUB_PAYLOAD_RECEIVED)
  85. return resp
  86. except json.decoder.JSONDecodeError:
  87. resp = aiohttp.web.Response(headers=headers, status=400, text=PUBSUB_BAD_REQUEST)
  88. return resp
  89. except AssertionError:
  90. resp = aiohttp.web.Response(headers=headers, status=400, text=PUBSUB_BAD_PAYLOAD)
  91. return resp
  92. # Is this a subscriber request? (GET)
  93. elif request.method == 'GET':
  94. resp = aiohttp.web.StreamResponse(headers=headers)
  95. # We do not support HTTP 1.0 here...
  96. if request.version.major == 1 and request.version.minor == 0:
  97. return resp
  98. subscriber = Subscriber(self, resp, request)
  99. # Is there a basic auth in this request? If so, set up ACL
  100. auth = request.headers.get('Authorization')
  101. if auth:
  102. subscriber.acl = await subscriber.parse_acl(auth)
  103. self.subscribers.append(subscriber)
  104. # We'll change the content type once we're ready
  105. # resp.content_type = 'application/vnd.apache-pubsub-stream'
  106. resp.content_type = 'application/json'
  107. try:
  108. resp.enable_chunked_encoding()
  109. await resp.prepare(request)
  110. while True:
  111. await subscriber.ping()
  112. await asyncio.sleep(5)
  113. # We may get exception types we don't have imported, so grab ANY exception and kick out the subscriber
  114. except:
  115. pass
  116. self.subscribers.remove(subscriber)
  117. return resp
  118. elif request.method == 'HEAD':
  119. resp = aiohttp.web.Response(headers=headers, status=204, text="")
  120. return resp
  121. # I don't know this type of request :/ (DELETE, PATCH, etc)
  122. else:
  123. resp = aiohttp.web.Response(headers=headers, status=400, text=PUBSUB_BAD_REQUEST)
  124. return resp
  125. async def server_loop(self):
  126. server = aiohttp.web.Server(self.handle_request)
  127. runner = aiohttp.web.ServerRunner(server)
  128. await runner.setup()
  129. site = aiohttp.web.TCPSite(runner, self.config['server']['bind'], self.config['server']['port'])
  130. await site.start()
  131. print("==== PyPubSub v/%s starting... ====" % PUBSUB_VERSION)
  132. print("==== Serving up PubSub goodness at %s:%s ====" % (self.config['server']['bind'], self.config['server']['port']))
  133. await self.poll()
  134. def run(self):
  135. loop = asyncio.get_event_loop()
  136. try:
  137. loop.run_until_complete(self.server_loop())
  138. except KeyboardInterrupt:
  139. pass
  140. loop.close()
  141. class Subscriber:
  142. """Basic subscriber (client) class. Holds information about the connection and ACL"""
  143. def __init__(self, server, connection, request):
  144. self.connection = connection
  145. self.acl = {}
  146. self.server = server
  147. # Set topics subscribed to
  148. self.topics = [x for x in request.path.split('/') if x]
  149. # Is the client old and expecting zero-terminators?
  150. self.old_school = False
  151. for ua in self.server.config['clients'].get('oldscoolers', []):
  152. if ua in request.headers.get('User-Agent', ''):
  153. self.old_school = True
  154. break
  155. async def parse_acl(self, basic):
  156. """Sets the ACL if possible, based on Basic Auth"""
  157. try:
  158. decoded = str(base64.decodebytes(bytes(basic.replace('Basic ', ''), 'ascii')), 'utf-8')
  159. u, p = decoded.split(':', 1)
  160. if u in self.server.acl:
  161. acl_pass = self.server.acl[u].get('password')
  162. if acl_pass and acl_pass == p:
  163. acl = self.server.acl[u].get('acl', {})
  164. # Vet ACL for user
  165. assert isinstance(acl, dict), f"ACL for user {u} " \
  166. f"must be a dictionary of sub-IDs and topics, but is not."
  167. # Make sure each ACL segment is a list of topics
  168. for k, v in acl.items():
  169. assert isinstance(v, list), f"ACL segment {k} for user {u} is not a list of topics!"
  170. print(f"Client {u} successfully authenticated (and ACL is valid).")
  171. return acl
  172. elif self.server.lconfig:
  173. acl = {}
  174. groups = await pypubsub_ldap.get_groups(self.server.lconfig, u, p)
  175. # Make sure each ACL segment is a list of topics
  176. for k, v in self.server.lconfig['acl'].items():
  177. assert isinstance(v, list), f"ACL segment {k} for user {u} is not a list of topics!"
  178. if k in groups:
  179. print(f"Enabling ACL segment {k} for user {u}")
  180. acl[k] = v
  181. return acl
  182. except binascii.Error as e:
  183. pass # Bad Basic Auth params, bail quietly
  184. except AssertionError as e:
  185. print(e)
  186. print(f"ACL configuration error: ACL scheme for {u} contains errors, setting ACL to nothing.")
  187. except Exception as e:
  188. print(f"Basic unknown exception occurred: {e}")
  189. return {}
  190. async def ping(self):
  191. """Generic ping-back to the client"""
  192. js = b"%s\n" % json.dumps({"stillalive": time.time()}).encode('utf-8')
  193. if self.old_school:
  194. js += b"\0"
  195. await self.connection.write(js)
  196. class Payload:
  197. """A payload (event) object sent by a registered publisher."""
  198. def __init__(self, path, data):
  199. self.json = data
  200. self.topics = [x for x in path.split('/') if x]
  201. self.private = False
  202. # Private payload?
  203. if self.topics[0] == 'private':
  204. self.private = True
  205. del self.topics[0] # Remove the private bit from topics now.
  206. self.json['pubsub_topics'] = self.topics
  207. self.json['pubsub_path'] = path
  208. async def publish(self, subscribers):
  209. """Publishes an object to all subscribers using those topics (or a sub-set thereof)"""
  210. js = b"%s\n" % json.dumps(self.json).encode('utf-8')
  211. ojs = js + b"\0"
  212. for sub in subscribers:
  213. # If a private payload, check ACL and bail if not a match
  214. if self.private:
  215. can_see = False
  216. for key, private_topics in sub.acl.items():
  217. if all(el in self.topics for el in private_topics):
  218. can_see = True
  219. break
  220. if not can_see:
  221. continue
  222. # If subscribed to all the topics, tell a subscriber about this
  223. if all(el in self.topics for el in sub.topics):
  224. try:
  225. if sub.old_school:
  226. await sub.connection.write(ojs)
  227. else:
  228. await sub.connection.write(js)
  229. except ConnectionResetError:
  230. pass
  231. except RuntimeError:
  232. pass
  233. except AssertionError: # drain helper throws these sometimes
  234. pass
  235. if __name__ == '__main__':
  236. pubsub_server = Server()
  237. pubsub_server.run()