#!/usr/bin/env python3 # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ This is the SQS component of PyPubSub """ import asyncio import aiobotocore import botocore.exceptions import json import pypubsub import typing # Global to hold ID of all items seem across all queues, to dedup things. ITEMS_SEEN: typing.List[str] = [] async def get_payloads(server: pypubsub.Server, config: dict): # Assume everything is configured in the client's .aws config session = aiobotocore.get_session() queue_name = config.get('queue', '???') while True: async with session.create_client('sqs', aws_secret_access_key=config.get('secret'), aws_access_key_id=config.get('key'), region_name=config.get('region', 'default') ) as client: try: response = await client.get_queue_url(QueueName=queue_name) except botocore.exceptions.ClientError as err: if err.response['Error']['Code'] == \ 'AWS.SimpleQueueService.NonExistentQueue': print(f"SQS item {queue_name} does not exist, bailing!") return else: raise queue_url = response['QueueUrl'] print(f"Connected to SQS {queue_url}, reading stream...") while True: try: response = await client.receive_message( QueueUrl=queue_url, WaitTimeSeconds=3, ) if 'Messages' in response: for msg in response['Messages']: body = msg.get('Body', '{}') mid = msg.get('MessageId', '') try: # If we already logged this one, but couldn't delete - skip payload construction, # but do try to remove it again... if mid not in ITEMS_SEEN: js = json.loads(body) path = js.get('pubsub_path', '/') # Default to catch-all pubsub topic payload = pypubsub.Payload(path, js) server.pending_events.put_nowait(payload) backlog_size = server.config.backlog.queue_size if backlog_size > 0: server.backlog.append(payload) except ValueError as e: print(f"Could not parse payload from SQS: {e}") # Do we delete messages or keep them? if config.get('delete'): try: await client.delete_message( QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'] ) if mid in ITEMS_SEEN: ITEMS_SEEN.remove(mid) # Remove if found and now deleted except Exception as e: if mid not in ITEMS_SEEN: print(f"Could not remove item from SQS, marking as potential later duplicate!") print(e) ITEMS_SEEN.append(mid) else: # dedup nonetheless ITEMS_SEEN.append(mid) except KeyboardInterrupt: return except botocore.exceptions.ClientError as e: print(f"Could not receive message(s) from SQS, bouncing connection:") print(e) break await asyncio.sleep(10) # Sleep for 10 before bouncing SQS connection so as to not retry too often.