|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
""" This is the SQS component of PyPubSub """
|
|
|
|
|
|
+import asyncio
|
|
|
import aiobotocore
|
|
|
import botocore.exceptions
|
|
|
import json
|
|
@@ -32,61 +33,67 @@ async def get_payloads(server: pypubsub.Server, config: dict):
|
|
|
# Assume everything is configured in the client's .aws config
|
|
|
session = aiobotocore.get_session()
|
|
|
queue_name = config.get('queue', '???')
|
|
|
- async with session.create_client('sqs',
|
|
|
- aws_secret_access_key=config.get('secret'),
|
|
|
- aws_access_key_id=config.get('key'),
|
|
|
- region_name=config.get('region', 'default')
|
|
|
- ) as client:
|
|
|
- try:
|
|
|
- response = await client.get_queue_url(QueueName=queue_name)
|
|
|
- except botocore.exceptions.ClientError as err:
|
|
|
- if err.response['Error']['Code'] == \
|
|
|
- 'AWS.SimpleQueueService.NonExistentQueue':
|
|
|
- print(f"SQS item {queue_name} does not exist, bailing!")
|
|
|
- return
|
|
|
- else:
|
|
|
- raise
|
|
|
- queue_url = response['QueueUrl']
|
|
|
- print(f"Connected to SQS {queue_url}, reading stream...")
|
|
|
- while True:
|
|
|
+ while True:
|
|
|
+ async with session.create_client('sqs',
|
|
|
+ aws_secret_access_key=config.get('secret'),
|
|
|
+ aws_access_key_id=config.get('key'),
|
|
|
+ region_name=config.get('region', 'default')
|
|
|
+ ) as client:
|
|
|
try:
|
|
|
- response = await client.receive_message(
|
|
|
- QueueUrl=queue_url,
|
|
|
- WaitTimeSeconds=3,
|
|
|
- )
|
|
|
+ response = await client.get_queue_url(QueueName=queue_name)
|
|
|
+ except botocore.exceptions.ClientError as err:
|
|
|
+ if err.response['Error']['Code'] == \
|
|
|
+ 'AWS.SimpleQueueService.NonExistentQueue':
|
|
|
+ print(f"SQS item {queue_name} does not exist, bailing!")
|
|
|
+ return
|
|
|
+ else:
|
|
|
+ raise
|
|
|
+ queue_url = response['QueueUrl']
|
|
|
+ print(f"Connected to SQS {queue_url}, reading stream...")
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ response = await client.receive_message(
|
|
|
+ QueueUrl=queue_url,
|
|
|
+ WaitTimeSeconds=3,
|
|
|
+ )
|
|
|
|
|
|
- if 'Messages' in response:
|
|
|
- for msg in response['Messages']:
|
|
|
- body = msg.get('Body', '{}')
|
|
|
- mid = msg.get('MessageId', '')
|
|
|
- try:
|
|
|
- # If we already logged this one, but couldn't delete - skip payload construction,
|
|
|
- # but do try to remove it again...
|
|
|
- if mid not in ITEMS_SEEN:
|
|
|
- js = json.loads(body)
|
|
|
- path = js.get('pubsub_path', '/') # Default to catch-all pubsub topic
|
|
|
- payload = pypubsub.Payload(path, js)
|
|
|
- server.pending_events.put_nowait(payload)
|
|
|
- backlog_size = server.config.backlog.queue_size
|
|
|
- if backlog_size > 0:
|
|
|
- server.backlog.append(payload)
|
|
|
- except ValueError as e:
|
|
|
- print(f"Could not parse payload from SQS: {e}")
|
|
|
- # Do we delete messages or keep them?
|
|
|
- if config.get('delete'):
|
|
|
+ if 'Messages' in response:
|
|
|
+ for msg in response['Messages']:
|
|
|
+ body = msg.get('Body', '{}')
|
|
|
+ mid = msg.get('MessageId', '')
|
|
|
try:
|
|
|
- await client.delete_message(
|
|
|
- QueueUrl=queue_url,
|
|
|
- ReceiptHandle=msg['ReceiptHandle']
|
|
|
- )
|
|
|
- if mid in ITEMS_SEEN:
|
|
|
- ITEMS_SEEN.remove(mid) # Remove if found and now deleted
|
|
|
- except Exception as e:
|
|
|
+ # If we already logged this one, but couldn't delete - skip payload construction,
|
|
|
+ # but do try to remove it again...
|
|
|
if mid not in ITEMS_SEEN:
|
|
|
- print(f"Could not remove item from SQS, marking as potential later duplicate!")
|
|
|
- print(e)
|
|
|
- ITEMS_SEEN.append(mid)
|
|
|
- else: # dedup nonetheless
|
|
|
- ITEMS_SEEN.append(mid)
|
|
|
- except KeyboardInterrupt:
|
|
|
- break
|
|
|
+ js = json.loads(body)
|
|
|
+ path = js.get('pubsub_path', '/') # Default to catch-all pubsub topic
|
|
|
+ payload = pypubsub.Payload(path, js)
|
|
|
+ server.pending_events.put_nowait(payload)
|
|
|
+ backlog_size = server.config.backlog.queue_size
|
|
|
+ if backlog_size > 0:
|
|
|
+ server.backlog.append(payload)
|
|
|
+ except ValueError as e:
|
|
|
+ print(f"Could not parse payload from SQS: {e}")
|
|
|
+ # Do we delete messages or keep them?
|
|
|
+ if config.get('delete'):
|
|
|
+ try:
|
|
|
+ await client.delete_message(
|
|
|
+ QueueUrl=queue_url,
|
|
|
+ ReceiptHandle=msg['ReceiptHandle']
|
|
|
+ )
|
|
|
+ if mid in ITEMS_SEEN:
|
|
|
+ ITEMS_SEEN.remove(mid) # Remove if found and now deleted
|
|
|
+ except Exception as e:
|
|
|
+ if mid not in ITEMS_SEEN:
|
|
|
+ print(f"Could not remove item from SQS, marking as potential later duplicate!")
|
|
|
+ print(e)
|
|
|
+ ITEMS_SEEN.append(mid)
|
|
|
+ else: # dedup nonetheless
|
|
|
+ ITEMS_SEEN.append(mid)
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ return
|
|
|
+ except botocore.exceptions.ClientError as e:
|
|
|
+ print(f"Could not receive message(s) from SQS, bouncing connection:")
|
|
|
+ print(e)
|
|
|
+ break
|
|
|
+ await asyncio.sleep(10) # Sleep for 10 before bouncing SQS connection so as to not retry too often.
|