content_source.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. """
  2. This module translates from the Tweet Source and Twitter v2 types
  3. into ViewModel types such as FeedItem
  4. and the rest of the Taxonomy.
  5. """
  6. from dataclasses import asdict
  7. import os
  8. from flask import session, g, request
  9. import time
  10. import json
  11. from twitter_v2.api import ApiV2TweetSource, TwitterApiV2SocialGraph, ApiV2ConversationSource
  12. from hogumathi_app.view_model import CollectionPage, cleandict
  13. from .view_model import tweet_model_dc_vm, user_model_dc
  14. from hogumathi_app.content_system import register_content_source, get_content, register_hook
  # Root directory for locally stored data; the feed functions in this module
  # write raw API responses to files under its 'cache' subdirectory.
  DATA_DIR = '.data'
  def get_tweet_item (tweet_id, me=None):
      """
      Fetch a single tweet and wrap it in a CollectionPage.

      tweet_id -- ID of the tweet to look up.
      me -- optional session key of the acting account. When given, that
            session entry's 'access_token' is used; otherwise the
            BEARER_TOKEN environment variable is used.

      Returns a CollectionPage whose id is tweet_id and whose items are the
      view models built from the response data (an empty list when the
      response carries no data). Returns None when `me` is given but no
      session entry is stored under it, matching get_bookmarks_feed.
      """
      if me:
          twitter_user = session.get(me)
          if not twitter_user:
              # Nothing stored in the session for this key, so there is no
              # token to call the API with.
              return None
          token = twitter_user['access_token']
      else:
          token = os.environ.get('BEARER_TOKEN')

      tweet_source = ApiV2TweetSource(token)
      tweets_response = tweet_source.get_tweet(tweet_id, return_dataclass=True)

      # Error types:
      # https://api.twitter.com/2/problems/not-authorized-for-resource (blocked or suspended)
      # https://api.twitter.com/2/problems/resource-not-found (deleted)
      for err in tweets_response.errors or []:
          if 'type' not in err:
              print('unknown error type: ' + str(err))
          elif err['type'] == 'https://api.twitter.com/2/problems/not-authorized-for-resource':
              # .get(): an error without a 'value' must not abort the request.
              print('blocked or suspended tweet: ' + str(err.get('value')))
          elif err['type'] == 'https://api.twitter.com/2/problems/resource-not-found':
              print('deleted tweet: ' + str(err.get('value')))
          else:
              print('unknown error')
              print(json.dumps(err, indent=2))

      includes = tweets_response.includes
      # data can be absent when the lookup only produced errors.
      tweets = [tweet_model_dc_vm(includes, t, me) for t in tweets_response.data or []]

      collection_page = CollectionPage(
          id = tweet_id,
          items = tweets,
          next_token = None # Fixme
      )

      return collection_page
  def get_bookmarks_feed (user_id, pagination_token=None, max_results=10, me=None):
      """
      Fetch one page of a user's bookmarks as a CollectionPage.

      user_id -- ID of the user whose bookmarks are requested.
      pagination_token -- token of the page to fetch; None for the first page.
      max_results -- page size passed through to the tweet source.
      me -- session key of the acting account. Falls back to g.me and then to
            the 'me' query argument when not given.

      Returns None when no session entry exists for `me`. Otherwise returns a
      CollectionPage with id=user_id, the tweet view models as items and the
      response's next_token.

      Side effect: the raw response is written as JSON to
      {DATA_DIR}/cache/bookmarks_{user_id}_{timestamp_ms}_{pagination_token}.json.
      """
      if not me:
          me = g.get('me') or request.args.get('me')

      print(f'get_bookmarks_feed. me={me}')

      twitter_user = session.get(me)
      if not twitter_user:
          return None

      token = twitter_user['access_token']

      tweet_source = ApiV2TweetSource(token)
      response_tweets = tweet_source.get_bookmarks(user_id,
                          pagination_token = pagination_token,
                          return_dataclass=True,
                          max_results=max_results
                          )

      includes = response_tweets.includes
      # data can be absent when the page is empty.
      tweets = [tweet_model_dc_vm(includes, t, me) for t in response_tweets.data or []]

      meta = response_tweets.meta
      next_token = meta.next_token if meta is not None else None

      # Keep a copy of the raw response on disk; create the directory first so
      # a fresh checkout does not fail on the write.
      cache_dir = f'{DATA_DIR}/cache'
      os.makedirs(cache_dir, exist_ok=True)
      ts = int(time.time() * 1000)
      with open(f'{cache_dir}/bookmarks_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
          f.write(json.dumps(cleandict(asdict(response_tweets))))

      collection_page = CollectionPage(
          id = user_id, # FIXME this should perhaps be the unresolved id
          items = tweets,
          next_token = next_token
      )

      return collection_page
  def get_user_feed (user_id, pagination_token=None, me=None, exclude_replies=False, exclude_retweets=True, format=None):
      """
      Fetch one page of a user's timeline as a CollectionPage.

      user_id -- ID of the user whose timeline is requested.
      pagination_token -- token of the page to fetch; None for the first page.
      me -- acting account; defaults to g.me when that is set.
      exclude_replies / exclude_retweets -- passed through to the tweet source.
      format -- accepted for interface compatibility; not used here.

      The access token comes from g.twitter_user when present, otherwise from
      the BEARER_TOKEN environment variable.

      Returns a CollectionPage with id=user_id. items is the list of tweet
      view models, or None when the response has no data; next_token comes
      from the response meta. When the tweet source returns no response at
      all, an empty page (items=None, next_token=None) is returned.

      Side effect: the raw response is written as JSON to
      {DATA_DIR}/cache/tl_{user_id}_{timestamp_ms}_{pagination_token}.json.
      """
      if not me and 'me' in g:
          me = g.me

      if 'twitter_user' in g and g.twitter_user:
          token = g.twitter_user['access_token']
      else:
          token = os.environ.get('BEARER_TOKEN')

      # issue: retweets don't come back if we request non_public_metrics,
      # so non_public_metrics is always requested as False.
      tweet_source = ApiV2TweetSource(token)
      tweets_response = tweet_source.get_user_timeline(user_id,
                          exclude_replies = exclude_replies,
                          exclude_retweets = exclude_retweets,
                          pagination_token = pagination_token,
                          non_public_metrics = False,
                          return_dataclass=True)

      if not tweets_response:
          print('no response_json')
          # Nothing to inspect, cache or map; hand back an empty page instead
          # of failing on the attribute accesses below.
          return CollectionPage(
              id = user_id,
              items = None,
              next_token = None
          )

      meta = tweets_response.meta

      if meta is not None and meta.result_count == 0:
          print('no results')

      if not tweets_response.includes:
          print(tweets_response)
          print('no tweets_response.includes')

      if tweets_response.errors:
          print('profile get_user_timeline errors:')
          print(tweets_response.errors)

      # Keep a copy of the raw response on disk; create the directory first so
      # a fresh checkout does not fail on the write.
      cache_dir = f'{DATA_DIR}/cache'
      os.makedirs(cache_dir, exist_ok=True)
      ts = int(time.time() * 1000)
      with open(f'{cache_dir}/tl_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
          f.write(json.dumps(cleandict(asdict(tweets_response))))

      # items stays None (not an empty list) when there is no data, as before.
      tweets = None
      if tweets_response.data:
          includes = tweets_response.includes
          tweets = [tweet_model_dc_vm(includes, t, me) for t in tweets_response.data]

      next_token = meta.next_token if meta is not None else None

      collection_page = CollectionPage(
          id = user_id,
          items = tweets,
          next_token = next_token
      )

      return collection_page
  def get_tweets_collection (tweet_ids, pagination_token=None, max_results=None):
      """
      Placeholder for fetching many tweets in one call; currently always
      returns an empty list and ignores all of its arguments.

      Design note: the content system might offer a generalizer for this.
      When a source exposes a get-many interface it should be used, to avoid
      many singular fetches.
      """
      no_tweets = []
      return no_tweets
  def get_user (user_id, me=None):
      """
      Look up a single user and convert it to the user view model.

      user_id -- ID of the user to look up.
      me -- optional session key of the acting account. When given, that
            session entry's 'access_token' is used; otherwise the
            BEARER_TOKEN environment variable is used.

      Returns the result of user_model_dc for the first user in the response,
      or None when the response has no user data or when `me` is given but no
      session entry is stored under it.
      """
      if me:
          twitter_user = session.get(me)
          if not twitter_user:
              # Nothing stored in the session for this key, so there is no
              # token to call the API with.
              return None
          token = twitter_user['access_token']
      else:
          token = os.environ.get('BEARER_TOKEN')

      social_graph = TwitterApiV2SocialGraph(token)
      users_response = social_graph.get_user(user_id, return_dataclass=True)

      print(users_response)

      # Truthiness covers both an empty list and a missing (None) data field;
      # len() would raise on the latter.
      if not users_response.data:
          return None

      return user_model_dc(users_response.data[0])
  def register_content_sources ():
      """
      Register this module's Twitter fetchers with the content system.

      Each call binds a content ID prefix to a fetcher function. Where an
      id_pattern is given, its named group matches the fetcher's first
      parameter name (tweet_id / user_id).
      """
      # Patterns are raw strings: '\d' in a normal literal is an invalid escape
      # sequence, which Python warns about. The pattern text is unchanged.
      register_content_source('twitter:tweets', get_tweets_collection)
      register_content_source('twitter:tweet:', get_tweet_item, id_pattern=r'(?P<tweet_id>\d+)')
      register_content_source('twitter:bookmarks:', get_bookmarks_feed, id_pattern=r'(?P<user_id>\d+)')
      register_content_source('twitter:feed:user:', get_user_feed, id_pattern=r'(?P<user_id>\d+)')
      register_content_source('twitter:user:', get_user, id_pattern=r'(?P<user_id>\d+)')