sqs.py 4.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. #!/usr/bin/env python3
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. """ This is the SQS component of PyPubSub """
  19. import aiobotocore
  20. import botocore.exceptions
  21. import json
  22. import pypubsub
  23. import typing
# Global list holding the message IDs of all items seen across all queues.
# get_payloads() consults it to de-duplicate messages that are delivered more
# than once (e.g. because an earlier delete failed or deletion is disabled).
ITEMS_SEEN: typing.List[str] = []
  26. async def get_payloads(server: pypubsub.Server, config: dict):
  27. # Assume everything is configured in the client's .aws config
  28. session = aiobotocore.get_session()
  29. queue_name = config.get('queue', '???')
  30. async with session.create_client('sqs',
  31. aws_secret_access_key=config.get('secret'),
  32. aws_access_key_id=config.get('key'),
  33. region_name=config.get('region', 'default')
  34. ) as client:
  35. try:
  36. response = await client.get_queue_url(QueueName=queue_name)
  37. except botocore.exceptions.ClientError as err:
  38. if err.response['Error']['Code'] == \
  39. 'AWS.SimpleQueueService.NonExistentQueue':
  40. print(f"SQS item {queue_name} does not exist, bailing!")
  41. return
  42. else:
  43. raise
  44. queue_url = response['QueueUrl']
  45. print(f"Connected to SQS {queue_url}, reading stream...")
  46. while True:
  47. try:
  48. response = await client.receive_message(
  49. QueueUrl=queue_url,
  50. WaitTimeSeconds=3,
  51. )
  52. if 'Messages' in response:
  53. for msg in response['Messages']:
  54. body = msg.get('Body', '{}')
  55. mid = msg.get('MessageId', '')
  56. try:
  57. # If we already logged this one, but couldn't delete - skip payload construction,
  58. # but do try to remove it again...
  59. if mid not in ITEMS_SEEN:
  60. js = json.loads(body)
  61. path = js.get('pubsub_path', '/') # Default to catch-all pubsub topic
  62. payload = pypubsub.Payload(path, js)
  63. server.pending_events.put_nowait(payload)
  64. backlog_size = server.config.backlog.queue_size
  65. if backlog_size > 0:
  66. server.backlog.append(payload)
  67. except ValueError as e:
  68. print(f"Could not parse payload from SQS: {e}")
  69. # Do we delete messages or keep them?
  70. if config.get('delete'):
  71. try:
  72. await client.delete_message(
  73. QueueUrl=queue_url,
  74. ReceiptHandle=msg['ReceiptHandle']
  75. )
  76. if mid in ITEMS_SEEN:
  77. ITEMS_SEEN.remove(mid) # Remove if found and now deleted
  78. except Exception as e:
  79. if mid not in ITEMS_SEEN:
  80. print(f"Could not remove item from SQS, marking as potential later duplicate!")
  81. print(e)
  82. ITEMS_SEEN.append(mid)
  83. else: # dedup nonetheless
  84. ITEMS_SEEN.append(mid)
  85. except KeyboardInterrupt:
  86. break