Browse Source

0.5.0: Add SQS support

Daniel Gruno 5 years ago
parent
commit
c9eae4cfee
6 changed files with 128 additions and 4 deletions
  1. 3 0
      CHANGELOG.md
  2. 14 0
      README.md
  3. 89 0
      plugins/sqs.py
  4. 13 3
      pypubsub.py
  5. 6 0
      pypubsub.yaml
  6. 3 1
      requirements.txt

+ 3 - 0
CHANGELOG.md

@@ -1,3 +1,6 @@
+# 0.5.0
+- Added SQS support for weaving in items from AWS SQS
+
 # 0.4.6
 - Changed content type to better reflect that this is a custom stream
 - Switched to internal counter for number of requests served

+ 14 - 0
README.md

@@ -196,5 +196,19 @@ PyPubSub supports ACL via asynchronous LDAP, either through group memberships or
 
 See `pypubsub.yaml` for an LDAP example.
 
+## Working with Amazon SQS
+PyPubSub supports AWS SQS for weaving in payloads from their server-less Simple Queue Services.
+Multiple queues can be supported and items pushed to SQS will seamlessly appear in the 
+pubsub stream. For these objects to be compatible with pypubsub, they must be JSONified 
+strings with a `pubsub_path` element specifying the URI they would have otherwise been 
+posted to via PyPubSub, for instance `"pubsub_path": "/fruits/apples"` for a public 
+payload or `"pubsub_path": "/private/secretstuff"` for a private (auth-required) 
+payload. If no such path is specified, PyPubSub will assume a default empty topic 
+list (free-for-all).
+
+For more information on how to configure SQS, please see `pypubsub.yaml`.
+SQS support assumes that the AWS CLI has been set up, and the user has AWS configured 
+in their .aws directory before startup.
+
 ## License
 PyPubSub is licensed under the Apache License v/2.

+ 89 - 0
plugins/sqs.py

@@ -0,0 +1,89 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+""" This is the SQS component of PyPubSub """
+
+import asyncio
+import aiobotocore
+import botocore.exceptions
+import sys
+import json
+import pypubsub
+
+# Global to hold ID of all items seem across all queues, to dedup things.
+ITEMS_SEEN = []
+
+
+async def get_payloads(server, config):
+    # Assume everything is configured in the client's .aws config
+    session = aiobotocore.get_session()
+    queue_name = config.get('queue', '???')
+    async with session.create_client('sqs', region_name=config.get('region', 'default')) as client:
+        try:
+            response = await client.get_queue_url(QueueName=queue_name)
+        except botocore.exceptions.ClientError as err:
+            if err.response['Error']['Code'] == \
+                    'AWS.SimpleQueueService.NonExistentQueue':
+                print(f"SQS item {queue_name} does not exist, bailing!")
+                return
+            else:
+                raise
+        queue_url = response['QueueUrl']
+        print(f"Connected to SQS {queue_url}, reading stream...")
+        while True:
+            try:
+                response = await client.receive_message(
+                    QueueUrl=queue_url,
+                    WaitTimeSeconds=3,
+                )
+
+                if 'Messages' in response:
+                    for msg in response['Messages']:
+                        body = msg.get('Body', '{}')
+                        mid = msg.get('MessageId', '')
+                        try:
+                            # If we already logged this one, but couldn't delete - skip payload construction,
+                            # but do try to remove it again...
+                            if mid not in ITEMS_SEEN:
+                                js = json.loads(body)
+                                path = js.get('pubsub_path', '/')  # Default to catch-all pubsub topic
+                                payload = pypubsub.Payload(path, js)
+                                server.pending_events.append(payload)
+                                backlog_size = server.config['clients'].get('payload_backlog_size',
+                                                                            pypubsub.PUBSUB_DEFAULT_BACKLOG_SIZE)
+                                if backlog_size > 0:
+                                    server.backlog.append(payload)
+                        except ValueError as e:
+                            print(f"Could not parse payload from SQS: {e}")
+                        # Do we delete messages or keep them?
+                        if config.get('delete'):
+                            try:
+                                await client.delete_message(
+                                    QueueUrl=queue_url,
+                                    ReceiptHandle=msg['ReceiptHandle']
+                                )
+                                ITEMS_SEEN.remove(mid) # Remove if found and now deleted
+                            except Exception as e:
+                                if mid not in ITEMS_SEEN:
+                                    print(f"Could not remove item from SQS, marking as potential later duplicate!")
+                                    print(e)
+                                    ITEMS_SEEN.append(mid)
+                        else: # dedup nonetheless
+                            ITEMS_SEEN.append(mid)
+            except KeyboardInterrupt:
+                break

+ 13 - 3
pypubsub.py

@@ -27,9 +27,10 @@ import binascii
 import base64
 import argparse
 import plugins.ldap
+import plugins.sqs
 
 # Some consts
-PUBSUB_VERSION = '0.4.6'
+PUBSUB_VERSION = '0.5.0'
 PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
 PUBSUB_DEFAULT_MAX_PAYLOAD_SIZE = 102400
 PUBSUB_DEFAULT_BACKLOG_SIZE = 0
@@ -48,6 +49,7 @@ class Server:
     def __init__(self, args):
         self.config = yaml.safe_load(open(args.config))
         self.lconfig = None
+        self.sqsconfig = None
         self.subscribers = []
         self.pending_events = []
         self.backlog = []
@@ -68,10 +70,15 @@ class Server:
                 bma = int(bma.replace('d', '')) * 86400
         self.backlog_max_age = bma
 
+        # LDAP configuration present?
         if 'ldap' in self.config.get('clients', {}):
             self.lconfig = self.config['clients']['ldap']
             plugins.ldap.vet_settings(self.lconfig)
         self.acl = {}
+
+        # SQS configuration present?
+        if 'sqs' in self.config:
+            self.sqsconfig = self.config.get('sqs')
         try:
             self.acl = yaml.safe_load(open(args.acl))
         except FileNotFoundError:
@@ -190,7 +197,7 @@ class Server:
             resp = aiohttp.web.Response(headers=headers, status=400, text=PUBSUB_BAD_REQUEST)
             return resp
 
-    async def server_loop(self):
+    async def server_loop(self, loop):
         self.server = aiohttp.web.Server(self.handle_request)
         runner = aiohttp.web.ServerRunner(self.server)
         await runner.setup()
@@ -199,12 +206,15 @@ class Server:
         print("==== PyPubSub v/%s starting... ====" % PUBSUB_VERSION)
         print("==== Serving up PubSub goodness at %s:%s ====" % (
         self.config['server']['bind'], self.config['server']['port']))
+        if self.sqsconfig:
+            for key, config in self.sqsconfig.items():
+                loop.create_task(plugins.sqs.get_payloads(self, config))
         await self.poll()
 
     def run(self):
         loop = asyncio.get_event_loop()
         try:
-            loop.run_until_complete(self.server_loop())
+            loop.run_until_complete(self.server_loop(loop))
         except KeyboardInterrupt:
             pass
         loop.close()

+ 6 - 0
pypubsub.yaml

@@ -30,3 +30,9 @@ clients:
 #        - accounts
 #        - contracts
 #        - hr
+#sqs:
+#  defaultQueue:
+#    queue: myQueueName.fifo
+#    region: us-east-1
+#    delete: true  # If set, deletes items from SQS as they are processed. Comment out to keep items in SQS
+

+ 3 - 1
requirements.txt

@@ -2,4 +2,6 @@ aiohttp>=3.6.0
 asyncio>=3.4.3
 netaddr>=0.7.19
 python-ldap>=3.0.0
-PyYAML~=5.1.2
+PyYAML~=5.1.2
+aiobotocore~=1.0.4
+botocore~=1.15.32