sqs.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. #!/usr/bin/env python3
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. """ This is the SQS component of PyPubSub """
  19. import asyncio
  20. import aiobotocore
  21. import botocore.exceptions
  22. import json
  23. import pypubsub
  24. import typing
# Global holding the IDs of all items seen across all queues, used to dedup things.
# get_payloads() adds a MessageId here once the message has been handled but is
# still on the queue (deletion is disabled or failed), and removes it again once
# the message has been deleted successfully.
ITEMS_SEEN: typing.List[str] = []
  27. async def get_payloads(server: pypubsub.Server, config: dict):
  28. # Assume everything is configured in the client's .aws config
  29. session = aiobotocore.get_session()
  30. queue_name = config.get('queue', '???')
  31. while True:
  32. async with session.create_client('sqs',
  33. aws_secret_access_key=config.get('secret'),
  34. aws_access_key_id=config.get('key'),
  35. region_name=config.get('region', 'default')
  36. ) as client:
  37. try:
  38. response = await client.get_queue_url(QueueName=queue_name)
  39. except botocore.exceptions.ClientError as err:
  40. if err.response['Error']['Code'] == \
  41. 'AWS.SimpleQueueService.NonExistentQueue':
  42. print(f"SQS item {queue_name} does not exist, bailing!")
  43. return
  44. else:
  45. raise
  46. queue_url = response['QueueUrl']
  47. print(f"Connected to SQS {queue_url}, reading stream...")
  48. while True:
  49. try:
  50. response = await client.receive_message(
  51. QueueUrl=queue_url,
  52. WaitTimeSeconds=3,
  53. )
  54. if 'Messages' in response:
  55. for msg in response['Messages']:
  56. body = msg.get('Body', '{}')
  57. mid = msg.get('MessageId', '')
  58. try:
  59. # If we already logged this one, but couldn't delete - skip payload construction,
  60. # but do try to remove it again...
  61. if mid not in ITEMS_SEEN:
  62. js = json.loads(body)
  63. path = js.get('pubsub_path', '/') # Default to catch-all pubsub topic
  64. payload = pypubsub.Payload(path, js)
  65. server.pending_events.put_nowait(payload)
  66. backlog_size = server.config.backlog.queue_size
  67. if backlog_size > 0:
  68. server.backlog.append(payload)
  69. except ValueError as e:
  70. print(f"Could not parse payload from SQS: {e}")
  71. # Do we delete messages or keep them?
  72. if config.get('delete'):
  73. try:
  74. await client.delete_message(
  75. QueueUrl=queue_url,
  76. ReceiptHandle=msg['ReceiptHandle']
  77. )
  78. if mid in ITEMS_SEEN:
  79. ITEMS_SEEN.remove(mid) # Remove if found and now deleted
  80. except Exception as e:
  81. if mid not in ITEMS_SEEN:
  82. print(f"Could not remove item from SQS, marking as potential later duplicate!")
  83. print(e)
  84. ITEMS_SEEN.append(mid)
  85. else: # dedup nonetheless
  86. ITEMS_SEEN.append(mid)
  87. except KeyboardInterrupt:
  88. return
  89. except botocore.exceptions.ClientError as e:
  90. print(f"Could not receive message(s) from SQS, bouncing connection:")
  91. print(e)
  92. break
  93. await asyncio.sleep(10) # Sleep for 10 before bouncing SQS connection so as to not retry too often.