mastodon_facade.py 14 KB


  1. from configparser import ConfigParser
  2. import base64
  3. from flask import json, Response, render_template, request, send_from_directory, Blueprint, url_for, session, redirect, g
  4. from flask_cors import CORS
  5. import sqlite3
  6. import os
  7. import json
  8. import json_stream
  9. from zipfile import ZipFile
  10. import itertools
  11. from urllib.parse import urlencode as urlencode_qs
  12. import datetime
  13. import dateutil
  14. import dateutil.parser
  15. import dateutil.tz
  16. import requests
  17. from mastodon_source import MastodonAPSource
  18. from mastodon_v2_types import Status
  19. from view_model import FeedItem, FeedItemAction, PublicMetrics
  20. DATA_DIR='.data'
  21. twitter_app = Blueprint('mastodon_facade', 'mastodon_facade',
  22. static_folder='static',
  23. static_url_path='',
  24. url_prefix='/')
  25. @twitter_app.before_request
  26. def add_module_nav_to_template_context ():
  27. me = request.args.get('me')
  28. if me and me.startswith('mastodon:'):
  29. youtube_user = session[ me ]
  30. g.module_nav = [
  31. dict(
  32. href = url_for('.get_timeline_home_html', me=me),
  33. label = 'Public Timeline',
  34. order = 100
  35. ),
  36. dict(
  37. href = url_for('.get_bookmarks_html', me=me),
  38. label = 'Bookmarks',
  39. order = 200
  40. )
  41. ]
  42. def mastodon_model_dc_vm (post_data: Status) -> FeedItem:
  43. """
  44. This is the method we should use. The others should be refactored out.
  45. """
  46. print(post_data)
  47. user = post_data.account
  48. source_url = post_data.url
  49. avi_icon_url = user.avatar
  50. url = url_for('mastodon_facade.get_tweet_html', tweet_id=post_data.id)
  51. actions = {
  52. 'bookmark': FeedItemAction('mastodon_facade.post_tweet_bookmark', {'tweet_id': post_data.id}),
  53. 'delete_bookmark': FeedItemAction('mastodon_facade.delete_tweet_bookmark', {'tweet_id': post_data.id}),
  54. 'retweet': FeedItemAction('mastodon_facade.post_tweet_retweet', {'tweet_id': post_data.id})
  55. }
  56. t = FeedItem(
  57. id = post_data.id,
  58. # hugely not a fan of allowing the server's HTML out. can we sanitize it ourselves?
  59. html = post_data.content,
  60. url = url,
  61. source_url = source_url,
  62. created_at = post_data.created_at,
  63. avi_icon_url = avi_icon_url,
  64. display_name = user.display_name,
  65. handle = user.acct,
  66. author_url = url_for('mastodon_facade.get_profile_html', user_id = user.id, me='mastodon:mastodon.cloud:109271381872332822'),
  67. source_author_url = user.url,
  68. public_metrics = PublicMetrics(
  69. reply_count = post_data.replies_count,
  70. retweet_count = post_data.reblogs_count,
  71. like_count = post_data.favourites_count
  72. ),
  73. actions = actions,
  74. debug_source_data = post_data
  75. )
  76. return t
  77. def register_app (instance):
  78. endpoint_url = g.app_url
  79. redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')
  80. client_name = os.environ.get('MASTODON_CLIENT_NAME')
  81. url = f'https://{instance}/api/v1/apps'
  82. params = {
  83. 'client_name': client_name,
  84. 'redirect_uris': redirect_uri,
  85. 'scopes': 'read write'
  86. }
  87. resp = requests.post(url, params=params)
  88. with open(f'{DATA_DIR}/mastodon-client_{instance}.json', 'wt') as f:
  89. f.write(resp.text)
  90. return resp
  91. @twitter_app.get('/api/app/<instance>/register')
  92. def post_api_app_instance_register (instance):
  93. if not instance:
  94. return 'pass isntance= in request params', 400
  95. resp = register_app(instance)
  96. return resp.text, resp.status_code
  97. @twitter_app.get('/api/app/<instance>')
  98. def get_api_app_instance (instance):
  99. with open(f'{DATA_DIR}/mastodon-client_{instance}.json', 'rt') as f:
  100. return Response(f.read(), mimetype='application/json')
  101. @twitter_app.get('/login.html')
  102. def get_login_html ():
  103. instance = request.args.get('instance')
  104. force_login = request.args.get('force_login')
  105. if not instance:
  106. return 'provide instance= in query string.', 400
  107. if not os.path.exists(f'{DATA_DIR}/mastodon-client_{instance}.json'):
  108. resp = register_app(instance)
  109. print(resp)
  110. with open(f'{DATA_DIR}/mastodon-client_{instance}.json', 'rt') as f:
  111. app_info = json.loads(f.read())
  112. params = {
  113. 'client_id': app_info['client_id'],
  114. 'redirect_uri': app_info['redirect_uri'],
  115. 'scope': 'read write',
  116. 'response_type': 'code'
  117. }
  118. if force_login:
  119. params['force_login'] = force_login
  120. url = f'https://{instance}/oauth/authorize?' + urlencode_qs(params)
  121. return redirect(url)
  122. @twitter_app.get('/logged-in.html')
  123. def get_logged_in_html ():
  124. code = request.args.get('code')
  125. instance = request.args.get('instance', 'mastodon.cloud')
  126. if not instance:
  127. return 'provide instance= in query string.', 400
  128. with open(f'{DATA_DIR}/mastodon-client_{instance}.json', 'rt') as f:
  129. app_info = json.loads(f.read())
  130. params = {
  131. 'client_id': app_info['client_id'],
  132. 'client_secret': app_info['client_secret'],
  133. 'redirect_uri': app_info['redirect_uri'],
  134. 'scope': 'read write',
  135. 'grant_type': 'authorization_code',
  136. 'code': code,
  137. # force_login: True
  138. }
  139. url = f'https://{instance}/oauth/token'
  140. resp = requests.post(url, data=params)
  141. auth_info = json.loads(resp.text)
  142. #auth_info = {"access_token":"6C6-hD2_OK1vDFc_WcPQnZC5KL0jOUePgQgMQELeV0k","token_type":"Bearer","scope":"read write","created_at":1670088545}
  143. access_token = auth_info['access_token']
  144. url = f'https://{instance}/api/v1/accounts/verify_credentials'
  145. headers = {
  146. 'Authorization': f'Bearer {access_token}'
  147. }
  148. resp = requests.get(url, headers=headers)
  149. mastodon_user = json.loads(resp.text)
  150. me = f'mastodon:{instance}:{mastodon_user["id"]}'
  151. session_info = {
  152. **auth_info,
  153. 'instance': instance,
  154. 'id': mastodon_user['id'],
  155. 'acct': mastodon_user['acct'],
  156. 'username': mastodon_user['username'],
  157. 'display_name': mastodon_user['display_name'],
  158. 'source_url': mastodon_user['url'],
  159. 'created_at': mastodon_user['created_at'],
  160. }
  161. session[me] = session_info
  162. return redirect(url_for('.get_timeline_home_html', me=me))
  163. #return resp.text, resp.status_code
  164. @twitter_app.post('/data/statuses')
  165. def post_tweets_create ():
  166. me = request.args.get('me')
  167. mastodon_user = session[me]
  168. instance = mastodon_user['instance']
  169. access_token = mastodon_user["access_token"]
  170. text = request.form.get('text')
  171. in_reply_to_id = request.form.get('reply_to_tweet_id')
  172. headers = {
  173. 'Authorization': f'Bearer {access_token}'
  174. }
  175. params = {
  176. 'text': text
  177. }
  178. if in_reply_to_id:
  179. params['in_reply_to_id'] = in_reply_to_id
  180. url = f'https://{instance}/api/v1/statuses/{tweet_id}/bookmark'
  181. resp = requests.post(url, data=params, headers=headers)
  182. new_status = json.loads(resp.text)
  183. new_tweet_id=new_status['id']
  184. if 'HX-Request' in request.headers:
  185. return render_template('partial/compose-form.html', new_tweet_id=new_tweet_id, me=me)
  186. else:
  187. return resp.text, resp.status_code
  188. @twitter_app.post('/data/status/<tweet_id>/retweet')
  189. def post_tweet_retweet (tweet_id):
  190. me = request.args.get('me')
  191. mastodon_user = session[me]
  192. instance = mastodon_user['instance']
  193. access_token = mastodon_user["access_token"]
  194. headers = {
  195. 'Authorization': f'Bearer {access_token}'
  196. }
  197. url = f'https://{instance}/api/v1/statuses/{tweet_id}/reblog'
  198. resp = requests.post(url, headers=headers)
  199. # new_status = json.loads(resp.text)
  200. # new_tweet_id=new_status['id']
  201. ## old status is in reblog: {id:} property.
  202. # if 'HX-Request' in request.headers:
  203. # return 'retweeted, new ID: ' + new_tweet_id
  204. # else:
  205. return resp.text, resp.status_code
  206. @twitter_app.get('/status/<tweet_id>.html')
  207. @twitter_app.get('/statuses.html')
  208. def get_tweet_html (tweet_id = None):
  209. if not tweet_id:
  210. ids = request.args.get('ids').split(',')
  211. return f'tweets: {ids}'
  212. else:
  213. return 'tweet: {tweet_id}'
  214. @twitter_app.get('/profile/<user_id>.html')
  215. def get_profile_html (user_id):
  216. me = request.args.get('me')
  217. mastodon_user = session.get(me)
  218. max_id = request.args.get('max_id')
  219. instance = request.args.get('instance') or mastodon_user['instance']
  220. access_token = mastodon_user and mastodon_user.get('access_token')
  221. print(instance)
  222. headers = {}
  223. if access_token:
  224. headers.update({
  225. 'Authorization': f'Bearer {access_token}'
  226. })
  227. mastodon_source = MastodonAPSource(f'https://{instance}', access_token)
  228. params = {}
  229. try:
  230. # if the UID isn't numeric then we'll fallback to string.
  231. url = f'https://{instance}/api/v1/accounts/{int(user_id)}'
  232. except:
  233. url = f'https://{instance}/api/v1/accounts/lookup'
  234. params = {
  235. 'acct': user_id
  236. }
  237. resp = requests.get(url, params=params, headers=headers)
  238. account_info = json.loads(resp.text)
  239. user_id = account_info["id"]
  240. tweets = mastodon_source.get_user_statuses(user_id, max_id=max_id, return_dataclasses=True)
  241. #tweets = tweet_source.get_timeline("public", timeline_params)
  242. max_id = tweets[-1].id if len(tweets) else ''
  243. tweets = list(map(mastodon_model_dc_vm, tweets))
  244. print("max_id = " + max_id)
  245. query = {}
  246. if len(tweets):
  247. query = {
  248. #'next_data_hx_select': '#tweets',
  249. 'next_data_url': url_for('.get_profile_html', user_id=user_id, me=me, max_id=max_id),
  250. 'next_page_url': url_for('.get_profile_html', user_id=user_id, me=me, max_id=max_id)
  251. }
  252. user = {
  253. 'id': user_id
  254. }
  255. #return Response(response_json, mimetype='application/json')
  256. if 'HX-Request' in request.headers:
  257. return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query,
  258. me=me, mastodon_user=mastodon_user)
  259. else:
  260. return render_template('user-profile.html', user = user, tweets = tweets, query = query,
  261. me=me, mastodon_user=mastodon_user)
  262. @twitter_app.route('/latest.html', methods=['GET'])
  263. def get_timeline_home_html (variant = "reverse_chronological"):
  264. me = request.args.get('me')
  265. mastodon_user = session[me]
  266. instance = mastodon_user.get('instance')
  267. token = mastodon_user.get('access_token')
  268. max_id = request.args.get('max_id')
  269. tweet_source = MastodonAPSource(f"https://{instance}", token)
  270. timeline_params = {
  271. 'local': True
  272. }
  273. if max_id:
  274. timeline_params['max_id'] = max_id
  275. tweets = tweet_source.get_timeline("public", timeline_params, return_dataclasses=True)
  276. max_id = tweets[-1].id if len(tweets) else ''
  277. tweets = list(map(mastodon_model_dc_vm, tweets))
  278. print("max_id = " + max_id)
  279. query = {}
  280. if len(tweets):
  281. query = {
  282. 'next_data_url': url_for('.get_timeline_home_html', max_id=max_id, me=me),
  283. 'next_page_url': url_for('.get_timeline_home_html', max_id=max_id, me=me)
  284. }
  285. user = {}
  286. #return Response(response_json, mimetype='application/json')
  287. return render_template('tweet-collection.html', user = user, tweets = tweets, query = query,
  288. me=me, mastodon_user=mastodon_user)
  289. @twitter_app.get('/bookmarks.html')
  290. def get_bookmarks_html ():
  291. me = request.args.get('me')
  292. mastodon_user = session[me]
  293. instance = mastodon_user.get('instance')
  294. token = mastodon_user.get('access_token')
  295. max_id = request.args.get('max_id')
  296. tweet_source = MastodonAPSource(f"https://{instance}", token)
  297. tweets = tweet_source.get_bookmarks(max_id=max_id, return_dataclasses=True)
  298. #tweets = tweet_source.get_timeline("public", timeline_params)
  299. max_id = tweets[-1].id if len(tweets) else ''
  300. tweets = list(map(mastodon_model_dc_vm, tweets))
  301. print("max_id = " + max_id)
  302. query = {}
  303. if len(tweets):
  304. query = {
  305. 'next_data_url': url_for('.get_bookmarks_html', me=me, max_id=max_id),
  306. 'next_page_url': url_for('.get_bookmarks_html', me=me, max_id=max_id)
  307. }
  308. user = {}
  309. #return Response(response_json, mimetype='application/json')
  310. return render_template('tweet-collection.html', user = user, tweets = tweets, query = query,
  311. me=me, mastodon_user=mastodon_user)
  312. @twitter_app.post('/data/bookmarks/<tweet_id>')
  313. def post_tweet_bookmark (tweet_id):
  314. me = request.args.get('me')
  315. mastodon_user = session[me]
  316. instance = mastodon_user['instance']
  317. access_token = mastodon_user["access_token"]
  318. max_id = request.args.get('max_id')
  319. headers = {
  320. 'Authorization': f'Bearer {access_token}'
  321. }
  322. params = {}
  323. url = f'https://{instance}/api/v1/statuses/{tweet_id}/bookmark'
  324. resp = requests.post(url, headers=headers)
  325. return resp.text, resp.status_code
  326. @twitter_app.delete('/data/bookmarks/<tweet_id>')
  327. def delete_tweet_bookmark (tweet_id):
  328. me = request.args.get('me')
  329. mastodon_user = session[me]
  330. instance = mastodon_user['instance']
  331. access_token = mastodon_user["access_token"]
  332. max_id = request.args.get('max_id')
  333. headers = {
  334. 'Authorization': f'Bearer {access_token}'
  335. }
  336. params = {}
  337. url = f'https://{instance}/api/v1/statuses/{tweet_id}/unbookmark'
  338. resp = requests.post(url, headers=headers)
  339. return resp.text, resp.status_code