123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- """ This is the SQS component of PyPubSub """
- import asyncio
- import aiobotocore
- import botocore.exceptions
- import json
- import pypubsub
- import typing
- ITEMS_SEEN: typing.List[str] = []
- async def get_payloads(server: pypubsub.Server, config: dict):
-
- session = aiobotocore.get_session()
- queue_name = config.get('queue', '???')
- while True:
- async with session.create_client('sqs',
- aws_secret_access_key=config.get('secret'),
- aws_access_key_id=config.get('key'),
- region_name=config.get('region', 'default')
- ) as client:
- try:
- response = await client.get_queue_url(QueueName=queue_name)
- except botocore.exceptions.ClientError as err:
- if err.response['Error']['Code'] == \
- 'AWS.SimpleQueueService.NonExistentQueue':
- print(f"SQS item {queue_name} does not exist, bailing!")
- return
- else:
- raise
- queue_url = response['QueueUrl']
- print(f"Connected to SQS {queue_url}, reading stream...")
- while True:
- try:
- response = await client.receive_message(
- QueueUrl=queue_url,
- WaitTimeSeconds=3,
- )
- if 'Messages' in response:
- for msg in response['Messages']:
- body = msg.get('Body', '{}')
- mid = msg.get('MessageId', '')
- try:
-
-
- if mid not in ITEMS_SEEN:
- js = json.loads(body)
- path = js.get('pubsub_path', '/')
- payload = pypubsub.Payload(path, js)
- server.pending_events.put_nowait(payload)
- backlog_size = server.config.backlog.queue_size
- if backlog_size > 0:
- server.backlog.append(payload)
- except ValueError as e:
- print(f"Could not parse payload from SQS: {e}")
-
- if config.get('delete'):
- try:
- await client.delete_message(
- QueueUrl=queue_url,
- ReceiptHandle=msg['ReceiptHandle']
- )
- if mid in ITEMS_SEEN:
- ITEMS_SEEN.remove(mid)
- except Exception as e:
- if mid not in ITEMS_SEEN:
- print(f"Could not remove item from SQS, marking as potential later duplicate!")
- print(e)
- ITEMS_SEEN.append(mid)
- else:
- ITEMS_SEEN.append(mid)
- except KeyboardInterrupt:
- return
- except botocore.exceptions.ClientError as e:
- print(f"Could not receive message(s) from SQS, bouncing connection:")
- print(e)
- break
- await asyncio.sleep(10)
|