12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- """ This is the SQS component of PyPubSub """
- import asyncio
- import aiobotocore
- import botocore.exceptions
- import sys
- import json
- import pypubsub
# Message IDs that get_payloads() has already handled. It is consulted before
# a message is turned into a payload so that a message received again from
# SQS (for example because it could not be deleted) is not published twice.
ITEMS_SEEN = []
async def get_payloads(server, config):
    """Read messages from an AWS SQS queue and hand them to the PyPubSub server.

    The queue named by ``config['queue']`` is resolved to a URL and then
    polled in an endless loop. Every message body is parsed as a JSON object,
    wrapped in a ``pypubsub.Payload`` addressed to the object's
    ``pubsub_path`` key (``/`` when the key is absent), and appended to
    ``server.pending_events``. It is also appended to ``server.backlog`` when
    the configured ``payload_backlog_size`` is greater than zero.

    Args:
        server: Server object; this function reads ``server.config['clients']``
            and appends to ``server.pending_events`` and ``server.backlog``.
        config: Mapping with the SQS settings. Keys read here: ``queue``,
            ``secret``, ``key``, ``region`` and ``delete`` (when truthy,
            messages are deleted from the queue after being handled).

    Returns:
        None. Returns early when the queue does not exist; otherwise the loop
        only ends on ``KeyboardInterrupt``.

    Raises:
        botocore.exceptions.ClientError: If looking up the queue URL fails for
            any reason other than the queue not existing.
    """
    # NOTE(review): newer aiobotocore releases expose get_session() from
    # aiobotocore.session instead -- confirm against the pinned version.
    session = aiobotocore.get_session()
    queue_name = config.get('queue', '???')
    async with session.create_client(
        'sqs',
        aws_secret_access_key=config.get('secret'),
        aws_access_key_id=config.get('key'),
        region_name=config.get('region', 'default'),
    ) as client:
        try:
            response = await client.get_queue_url(QueueName=queue_name)
        except botocore.exceptions.ClientError as err:
            if err.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
                print(f"SQS item {queue_name} does not exist, bailing!")
                return
            raise
        queue_url = response['QueueUrl']
        print(f"Connected to SQS {queue_url}, reading stream...")
        while True:
            try:
                response = await client.receive_message(
                    QueueUrl=queue_url,
                    WaitTimeSeconds=3,
                )
                for msg in response.get('Messages', []):
                    body = msg.get('Body', '{}')
                    mid = msg.get('MessageId', '')
                    # A message that was already published but could not be
                    # deleted is not published again; it goes straight to the
                    # delete attempt below.
                    if mid not in ITEMS_SEEN:
                        try:
                            js = json.loads(body)
                            # Valid JSON that is not an object (list, string,
                            # number...) has no .get(); reject it here instead
                            # of letting an AttributeError kill the poll loop.
                            if not isinstance(js, dict):
                                raise ValueError("payload is not a JSON object")
                            path = js.get('pubsub_path', '/')
                            payload = pypubsub.Payload(path, js)
                            server.pending_events.append(payload)
                            backlog_size = server.config['clients'].get(
                                'payload_backlog_size',
                                pypubsub.PUBSUB_DEFAULT_BACKLOG_SIZE,
                            )
                            if backlog_size > 0:
                                server.backlog.append(payload)
                        except ValueError as e:
                            print(f"Could not parse payload from SQS: {e}")

                    if config.get('delete'):
                        try:
                            await client.delete_message(
                                QueueUrl=queue_url,
                                ReceiptHandle=msg['ReceiptHandle'],
                            )
                            # Successfully removed from the queue, so the ID
                            # no longer needs to be remembered.
                            if mid in ITEMS_SEEN:
                                ITEMS_SEEN.remove(mid)
                        except Exception as e:
                            # Deliberately best-effort: a failed delete must
                            # not stop the reader. Remember the ID so the
                            # message is not published again if it comes back.
                            if mid not in ITEMS_SEEN:
                                print("Could not remove item from SQS, marking as potential later duplicate!")
                                print(e)
                                ITEMS_SEEN.append(mid)
                    elif mid not in ITEMS_SEEN:
                        # Messages are kept on the queue: record the ID once
                        # so repeated deliveries neither republish the message
                        # nor add duplicate entries to ITEMS_SEEN.
                        ITEMS_SEEN.append(mid)
            except KeyboardInterrupt:
                break
|