twitter_v2_facade.py 37 KB


  1. from configparser import ConfigParser
  2. import base64
  3. from flask import json, Response, render_template, request, send_from_directory, Blueprint, session, redirect, g, current_app
  4. from flask_cors import CORS
  5. import sqlite3
  6. import os
  7. import json
  8. import json_stream
  9. from zipfile import ZipFile
  10. import itertools
  11. from io import BufferedReader
  12. from werkzeug.utils import secure_filename
  13. import datetime
  14. import dateutil
  15. import dateutil.parser
  16. import dateutil.tz
  17. import requests
  18. import hashlib
  19. import re
  20. from requests.auth import AuthBase, HTTPBasicAuth
  21. from requests_oauthlib import OAuth2Session
  22. from tweet_source import ApiV2TweetSource
  23. from flask import url_for as og_url_for
  24. app_access_token = None
  25. app_consumer_key = os.environ.get("TWITTER_CONSUMER_KEY")
  26. app_secret_key = os.environ.get("TWITTER_CONSUMER_SECRET")
  27. TWITTER_SCOPES = ["bookmark.read", "tweet.read", "tweet.write", "dm.read", "users.read", "offline.access"]
  28. twitter_app = Blueprint('twitter_v2_facade', 'twitter_v2_facade',
  29. static_folder='static',
  30. static_url_path='',
  31. url_prefix='/')
  32. def url_for_with_me (route, *args, **kwargs):
  33. #print('url_for_with_me')
  34. if route.endswith('.static'):
  35. return og_url_for(route, *args, **kwargs)
  36. return og_url_for(route, *args, **{'me': g.me, **kwargs})
  37. url_for = url_for_with_me
  38. @twitter_app.before_request
  39. def add_me ():
  40. g.me = request.args.get('me')
  41. #if me.startswith('twitter') and me in session:
  42. g.twitter_user = session.get(g.me)
  43. @twitter_app.context_processor
  44. def inject_me():
  45. return {'me': g.me, 'twitter_user': g.twitter_user, 'url_for': url_for_with_me}
  46. @twitter_app.route('/logout.html')
  47. def get_logout_html ():
  48. del session[g.me]
  49. return redirect('/')
  50. # def add_me(endpoint, values):
  51. ##values['me'] = request.args.get('me')
  52. # g.me = request.args.get('me')
  53. # twitter_app.url_value_preprocessor(add_me)
  54. @twitter_app.route('/tokens.html')
  55. def get_tokens_html ():
  56. return url_for('.get_tokens_html', me=g.me)
  57. @twitter_app.route('/logged-in.html')
  58. def get_loggedin_html ():
  59. client_id = os.environ.get('TWITTER_CLIENT_ID')
  60. client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
  61. code_verifier = session['twitter_code_verifier']
  62. code = request.args.get('code')
  63. state = request.args.get('state')
  64. endpoint_url = g.app_url
  65. redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')
  66. authorization_response = redirect_uri + '?code={}&state={}'.format(code, state)
  67. # Fetch your access token
  68. token_url = "https://api.twitter.com/2/oauth2/token"
  69. # The following line of code will only work if you are using a type of App that is a public client
  70. auth = False
  71. # If you are using a confidential client you will need to pass in basic encoding of your client ID and client secret.
  72. # Please remove the comment on the following line if you are using a type of App that is a confidential client
  73. auth = HTTPBasicAuth(client_id, client_secret)
  74. scopes = TWITTER_SCOPES
  75. #redirect_uri = 'https://{}/api/logged-in'.format(os.environ.get('PROJECT_DOMAIN') + '.glitch.me')
  76. oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)
  77. token = oauth.fetch_token(
  78. token_url=token_url,
  79. authorization_response=authorization_response,
  80. auth=auth,
  81. client_id=client_id,
  82. include_client_id=True,
  83. code_verifier=code_verifier,
  84. )
  85. # Your access token
  86. access = token["access_token"]
  87. refresh = token["refresh_token"]
  88. expires_at = token["expires_at"] # expires_in
  89. # Make a request to the users/me endpoint to get your user ID
  90. user_me = requests.request(
  91. "GET",
  92. "https://api.twitter.com/2/users/me",
  93. headers={"Authorization": "Bearer {}".format(access)},
  94. ).json()
  95. user_id = user_me["data"]["id"]
  96. user_username = user_me["data"]["username"]
  97. user_name = user_me["data"]["name"]
  98. del session['twitter_code_verifier']
  99. me = 'twitter:{}'.format(user_id)
  100. session[ me ] = {
  101. 'expires_at': expires_at,
  102. 'access_token': access,
  103. 'refresh_token': refresh,
  104. 'id': user_id,
  105. 'display_name': user_name,
  106. 'username': user_username
  107. }
  108. g.me = me
  109. g.twitter_user = session[ me ]
  110. return redirect(url_for('.get_timeline_home_html'))
  111. @twitter_app.route('/login.html')
  112. def get_login_html ():
  113. client_id = os.environ.get('TWITTER_CLIENT_ID')
  114. client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
  115. #redirect_uri = 'https://{}/api/logged-in'.format(os.environ.get('PROJECT_DOMAIN') + '.glitch.me')
  116. endpoint_url = g.app_url
  117. redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')
  118. # Set the scopes
  119. scopes = TWITTER_SCOPES
  120. # Create a code verifier
  121. code_verifier = base64.urlsafe_b64encode(os.urandom(30)).decode("utf-8")
  122. code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier)
  123. # Create a code challenge
  124. code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
  125. code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
  126. code_challenge = code_challenge.replace("=", "")
  127. # Start an OAuth 2.0 session
  128. oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)
  129. # Create an authorize URL
  130. auth_url = "https://twitter.com/i/oauth2/authorize"
  131. authorization_url, state = oauth.authorization_url(
  132. auth_url, code_challenge=code_challenge, code_challenge_method="S256"
  133. )
  134. session['twitter_code_verifier'] = code_verifier
  135. return redirect(authorization_url)
  136. @twitter_app.route('/refresh-token', methods=['GET'])
  137. def get_twitter_refresh_token (response_format='json'):
  138. client_id = os.environ.get('TWITTER_CLIENT_ID')
  139. client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
  140. me = request.args.get('me')
  141. twitter = session.get(me)
  142. if not twitter:
  143. return redirect(url_for('.get_login_html'))
  144. token = twitter['refresh_token']
  145. basic_auth = base64.b64encode('{}:{}'.format(client_id, client_secret).encode('utf-8')).decode('utf-8')
  146. headers = {
  147. 'Authorization': 'Basic ' + basic_auth,
  148. }
  149. data = {
  150. 'refresh_token': token,
  151. 'grant_type': 'refresh_token'
  152. }
  153. response = requests.post('https://api.twitter.com/2/oauth2/token', data=data, headers=headers)
  154. result = json.loads(response.text)
  155. if 'access_token' in result:
  156. twitter['refresh_token'] = result['refresh_token']
  157. twitter['access_token'] = result['access_token']
  158. session[ me ] = twitter
  159. return response.text
  160. @twitter_app.route('/app/refresh-token', methods=['GET'])
  161. def get_twitter_app_refresh_token ():
  162. client_id = os.environ.get('TWITTER_CLIENT_ID')
  163. client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
  164. basic_auth = base64.b64encode('{}:{}'.format(app_consumer_key, app_secret_key).encode('utf-8')).decode('utf-8')
  165. headers = {
  166. 'Authorization': 'Basic ' + basic_auth,
  167. }
  168. data = {
  169. 'grant_type': 'client_credentials'
  170. }
  171. response = requests.post('https://api.twitter.com/oauth2/token', data=data, headers=headers)
  172. result = json.loads(response.text)
  173. if 'access_token' in result:
  174. app_access_token = result['access_token']
  175. return response.text
  176. @twitter_app.route('/', methods=['GET'])
  177. @twitter_app.route('/accounts.html', methods=['GET'])
  178. def get_loggedin_accounts_html ():
  179. twitter_accounts = dict(filter(lambda e: e[0].startswith('twitter:'), session.items()))
  180. return Response(json.dumps(twitter_accounts), mimetype='application/json')
  181. @twitter_app.route('/tweets/create', methods=['POST'])
  182. def post_tweets_create ():
  183. me = request.args.get('me')
  184. twitter = session.get(me)
  185. if not twitter:
  186. return redirect(url_for('.get_login_html'))
  187. user_id = twitter['id']
  188. token = twitter['access_token']
  189. text = request.form.get('text')
  190. reply_to_tweet_id = request.form.get('reply_to_tweet_id')
  191. quote_tweet_id = request.form.get('quote_tweet_id')
  192. tweet_source = ApiV2TweetSource(token)
  193. result = tweet_source.create_tweet(text, reply_to_tweet_id=reply_to_tweet_id, quote_tweet_id=quote_tweet_id)
  194. print(result)
  195. if 'HX-Request' in request.headers:
  196. return render_template('partial/compose-form.html', new_tweet_id=result['data']['id'])
  197. else:
  198. response_body = json.dumps({
  199. 'result': result
  200. })
  201. return Response(response_body, mimetype='application/json')
  202. @twitter_app.route('/data/timeline/user/<user_id>/counts')
  203. def get_data_timeline_user_counts (user_id):
  204. query = f'from:{user_id}'
  205. # is:reply is:quote is:retweet has:links has:mentions has:media has:images has:videos has:geo
  206. if not app_access_token:
  207. return 'refresh app token first.', 400
  208. tweet_source = ApiV2TweetSource(app_access_token)
  209. response_json = tweet_source.count_tweets(query)
  210. data = list(filter(lambda d: d.get('tweet_count') > 0, response_json.get('data')))
  211. result = {
  212. 'total_count': response_json.get('meta').get('total_tweet_count'),
  213. 'data': data
  214. }
  215. return Response(json.dumps(result), mimetype='application/json')
  216. # ---------------------------------------------------------------------------------------------------------
  217. # ---------------------------------------------------------------------------------------------------------
  218. # HTMx partials
  219. # ---------------------------------------------------------------------------------------------------------
  220. # ---------------------------------------------------------------------------------------------------------
  221. def tweet_model (includes, tweet_data, me):
  222. # retweeted_by, avi_icon_url, display_name, handle, created_at, text
  223. user = list(filter(lambda u: u.get('id') == tweet_data['author_id'], includes.get('users')))[0]
  224. source_url = 'https://twitter.com/{}/status/{}'.format(user['username'], tweet_data['id'])
  225. avi_icon_url = user['profile_image_url']
  226. retweet_of = None
  227. quoted = None
  228. if 'referenced_tweets' in tweet_data:
  229. retweet_of = list(filter(lambda r: r['type'] == 'retweeted', tweet_data['referenced_tweets']))
  230. quoted = list(filter(lambda r: r['type'] == 'quoted', tweet_data['referenced_tweets']))
  231. t = {
  232. 'id': tweet_data['id'],
  233. 'text': tweet_data['text'],
  234. 'created_at': tweet_data['created_at'],
  235. 'author_is_verified': user['verified'],
  236. 'conversation_id': tweet_data['conversation_id'],
  237. 'avi_icon_url': avi_icon_url,
  238. 'display_name': user['name'],
  239. 'handle': user['username'],
  240. 'author_url': url_for('.get_profile_html', user_id=user['id']),
  241. 'author_id': user['id'],
  242. 'source_url': source_url,
  243. 'source_author_url': 'https://twitter.com/{}'.format(user['username']),
  244. #'is_edited': len(tweet_data['edit_history_tweet_ids']) > 1
  245. }
  246. if 'entities' in tweet_data:
  247. if 'urls' in tweet_data['entities']:
  248. urls = list(filter(lambda u: 'title' in u and 'description' in u, tweet_data['entities']['urls']))
  249. if len(urls):
  250. url = urls[0]
  251. t['card'] = {
  252. 'display_url': url['display_url'].split('/')[0],
  253. 'source_url': url['unwound_url'],
  254. 'content': url['description'],
  255. 'title': url['title']
  256. }
  257. if 'public_metrics' in tweet_data:
  258. t['public_metrics'] = tweet_data['public_metrics']
  259. if 'non_public_metrics' in tweet_data:
  260. t['non_public_metrics'] = tweet_data['non_public_metrics']
  261. try:
  262. if 'attachments' in tweet_data and 'media_keys' in tweet_data['attachments']:
  263. media = list(map(lambda mk: list(filter(lambda m: m['media_key'] == mk, includes['media']))[0], tweet_data['attachments']['media_keys']))
  264. photos = list(filter(lambda m: m['type'] == 'photo', media))
  265. videos = list(filter(lambda m: m['type'] == 'video', media))
  266. photos = list(map(lambda p: {**p, 'preview_image_url': p['url'] + '?name=tiny&format=webp'}, photos))
  267. videos = list(map(lambda p: {**p, 'image_url': p['preview_image_url'], 'preview_image_url': p['preview_image_url'] + '?name=tiny&format=webp'}, videos))
  268. t['photos'] = photos
  269. t['videos'] = videos
  270. except:
  271. print('exception adding attachments to tweet.')
  272. if retweet_of and len(retweet_of):
  273. retweeted_tweet = list(filter(lambda t: t.get('id') == retweet_of[0]['id'], includes.get('tweets')))[0]
  274. t.update({
  275. 'source_retweeted_by_url': 'https://twitter.com/{}'.format(user['username']),
  276. 'retweeted_by': user['name'],
  277. 'retweeted_by_url': '/profile/{}.html'.format(user['id'])
  278. })
  279. rt = tweet_model(includes, retweeted_tweet, me)
  280. t.update(rt)
  281. try:
  282. if quoted and len(quoted):
  283. quoted_tweet = list(filter(lambda t: t.get('id') == quoted[0]['id'], includes.get('tweets')))[0]
  284. t['quoted_tweet'] = tweet_model(includes, quoted_tweet, me)
  285. except:
  286. print('error adding quoted tweet')
  287. return t
  288. def tweet_paginated_timeline ():
  289. return
  290. @twitter_app.route('/data/tweets', methods=['GET'])
  291. def get_twitter_tweets ():
  292. me = request.args.get('me')
  293. twitter = session.get(me)
  294. if not twitter:
  295. return redirect(url_for('.get_login_html'))
  296. user_id = twitter['id']
  297. token = twitter['access_token']
  298. ids = request.args.get('ids')
  299. max_id=''
  300. if ids:
  301. ids = ids.split(',')
  302. tweet_source = ApiV2TweetSource(token)
  303. response_json = tweet_source.get_tweets(ids)
  304. user = {
  305. 'id': user_id
  306. }
  307. query = {}
  308. if 'HX-Request' in request.headers:
  309. includes = response_json.get('includes')
  310. tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
  311. return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
  312. else:
  313. return Response(json.dumps(response_json), mimetype="application/json")
  314. @twitter_app.route('/data/timeline/home/<variant>', methods=['GET'])
  315. def get_data_timeline_home (variant):
  316. # retweeted_by, avi_icon_url, display_name, handle, created_at, text
  317. me = request.args.get('me')
  318. twitter = session.get(me)
  319. if not twitter:
  320. return redirect(url_for('.get_login_html'))
  321. user_id = twitter['id']
  322. token = twitter['access_token']
  323. pagination_token = request.args.get('pagination_token')
  324. tweet_source = ApiV2TweetSource(token)
  325. response_json = tweet_source.get_home_timeline(user_id,
  326. pagination_token = pagination_token)
  327. includes = response_json.get('includes')
  328. tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
  329. next_token = response_json.get('meta').get('next_token')
  330. query = {}
  331. if next_token:
  332. query = {
  333. **query,
  334. 'next_data_url': url_for('.get_data_timeline_home', variant=variant, pagination_token=next_token)
  335. }
  336. if 'HX-Request' in request.headers:
  337. user = {
  338. 'id': user_id
  339. }
  340. return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
  341. else:
  342. response_body = json.dumps({
  343. 'tweets': tweets,
  344. 'query': query
  345. })
  346. return Response(response_body, mimetype='application/json')
  347. @twitter_app.route('/data/mentions/<user_id>', methods=['GET'])
  348. def get_data_mentions (user_id):
  349. me = request.args.get('me')
  350. twitter = session.get(me)
  351. if not twitter:
  352. return redirect(url_for('.get_login_html'))
  353. token = twitter['access_token']
  354. pagination_token = request.args.get('pagination_token')
  355. tweet_source = ApiV2TweetSource(token)
  356. response_json = tweet_source.get_mentions_timeline(user_id,
  357. pagination_token = pagination_token)
  358. # the OG tweet is in the include.tweets collection.
  359. # All thread tweets are as well, clearly. Does it cost a fetch?
  360. #print(response_json)
  361. includes = response_json.get('includes')
  362. tweets = list(map(lambda t: tweet_model(includes, t, token), response_json['data']))
  363. related_tweets = [] # derived from includes
  364. tweets.reverse()
  365. next_token = response_json.get('meta').get('next_token')
  366. query = {}
  367. if next_token:
  368. query = {
  369. **query,
  370. 'next_data_url': '/twitter/data/mentions/{}?me={}&pagination_token={}'.format(user_id, me, next_token)
  371. }
  372. if 'HX-Request' in request.headers:
  373. user = {
  374. 'id': user_id
  375. }
  376. # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
  377. return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query, me = me)
  378. else:
  379. response_body = json.dumps({
  380. 'tweets': tweets,
  381. 'pagination_token': pagination_token,
  382. 'next_token': next_token
  383. })
  384. return Response(response_body, mimetype='application/json')
  385. @twitter_app.route('/data/between/<user_id>/<user2_id>', methods=['GET'])
  386. def get_data_between (user_id, user2_id):
  387. me = request.args.get('me')
  388. twitter = session.get(me)
  389. if not twitter:
  390. return redirect(url_for('.get_login_html'))
  391. token = twitter['access_token']
  392. pagination_token = request.args.get('pagination_token')
  393. if user_id == 'me':
  394. user_id = twitter['id']
  395. if user2_id == 'me':
  396. user2_id = twitter['id']
  397. search_query = "(from:{} to:{}) OR (to:{} from:{})".format(user_id, user2_id, user_id, user2_id)
  398. tweet_source = ApiV2TweetSource(token)
  399. response_json = tweet_source.search_tweets(search_query,
  400. pagination_token = pagination_token)
  401. # the OG tweet is in the include.tweets collection.
  402. # All thread tweets are as well, clearly. Does it cost a fetch?
  403. #print(response_json)
  404. # augment with archive if one of the users is me
  405. # /twitter-archive/tweets/search?in_reply_to_user_id=__
  406. # /twitter-archive/tweets/search?q=@__
  407. tweets = []
  408. next_token = None
  409. if response_json.get('meta').get('result_count'):
  410. includes = response_json.get('includes')
  411. tweets = list(map(lambda t: tweet_model(includes, t, token), response_json['data']))
  412. related_tweets = [] # derived from includes
  413. next_token = response_json.get('meta').get('next_token')
  414. tweets.reverse()
  415. query = {}
  416. if next_token:
  417. query = {
  418. **query,
  419. 'next_data_url': '/twitter/data/mentions/{}?me={}&pagination_token={}'.format(user_id, me, next_token)
  420. }
  421. if 'HX-Request' in request.headers:
  422. user = {
  423. 'id': twitter['id']
  424. }
  425. # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
  426. return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query, me = me)
  427. else:
  428. response_body = json.dumps({
  429. 'tweets': tweets,
  430. 'pagination_token': pagination_token,
  431. 'next_token': next_token
  432. })
  433. return Response(response_body, mimetype='application/json')
  434. @twitter_app.route('/data/thread/<tweet_id>', methods=['GET'])
  435. def get_data_thread (tweet_id):
  436. me = request.args.get('me')
  437. twitter = session.get(me)
  438. if not twitter:
  439. return redirect(url_for('.get_login_html'))
  440. user_id = twitter['id']
  441. token = twitter['access_token']
  442. pagination_token = request.args.get('pagination_token')
  443. tweet_source = ApiV2TweetSource(token)
  444. response_json = tweet_source.get_thread(tweet_id, author_id=user_id,
  445. pagination_token = pagination_token)
  446. # the OG tweet is in the include.tweets collection.
  447. # All thread tweets are as well, clearly. Does it cost a fetch?
  448. print(response_json)
  449. tweets = []
  450. next_token = None
  451. if response_json.get('meta').get('result_count'):
  452. includes = response_json.get('includes')
  453. tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
  454. # FIXME this method is OK except it doesn't work if there are no replies.
  455. #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))
  456. #related_tweets = [] # derived from includes
  457. next_token = response_json.get('meta').get('next_token')
  458. if not pagination_token:
  459. response_json = tweet_source.get_tweet(tweet_id)
  460. print("parent tweet=")
  461. print(response_json)
  462. includes = response_json.get('includes')
  463. tweet = response_json.get('data')[0]
  464. tweets.append(tweet_model(includes, tweet, me))
  465. tweets.reverse()
  466. query = {}
  467. if next_token:
  468. query = {
  469. **query,
  470. 'next_data_url': '/twitter/data/thread/{}?me={}&pagination_token={}'.format(tweet_id, me, next_token)
  471. }
  472. if 'HX-Request' in request.headers:
  473. user = {
  474. 'id': user_id
  475. }
  476. # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
  477. return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
  478. else:
  479. response_body = json.dumps({
  480. 'tweets': tweets,
  481. 'pagination_token': pagination_token,
  482. 'next_token': next_token
  483. })
  484. return Response(response_body, mimetype='application/json')
  485. @twitter_app.route('/data/conversation/<tweet_id>', methods=['GET'])
  486. def get_data_conversation (tweet_id):
  487. me = request.args.get('me')
  488. twitter = session.get(me)
  489. if not twitter:
  490. return redirect(url_for('.get_login_html'))
  491. user_id = twitter['id']
  492. token = twitter['access_token']
  493. pagination_token = request.args.get('pagination_token')
  494. tweet_source = ApiV2TweetSource(token)
  495. # seems to get l
  496. response_json = tweet_source.get_thread(tweet_id,
  497. pagination_token = pagination_token)
  498. # the OG tweet is in the include.tweets collection.
  499. # All thread tweets are as well, clearly. Does it cost a fetch?
  500. #print(response_json)
  501. tweets = []
  502. next_token = None
  503. print("conversation meta:")
  504. print(json.dumps(response_json.get('meta'), indent=2))
  505. if response_json.get('meta').get('result_count'):
  506. includes = response_json.get('includes')
  507. tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
  508. next_token = response_json.get('meta').get('next_token')
  509. # this method is OK except it doesn't work if there are no replies.
  510. #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))
  511. if not pagination_token:
  512. response_json = tweet_source.get_tweet(tweet_id)
  513. print("parent tweet=")
  514. print(response_json)
  515. includes = response_json.get('includes')
  516. tweet = response_json.get('data')[0]
  517. tweets.append(tweet_model(includes, tweet, me))
  518. #related_tweets = [] # derived from includes
  519. tweets.reverse()
  520. query = {}
  521. if next_token:
  522. query = {
  523. **query,
  524. 'next_data_url': '/twitter/data/conversation/{}?me={}&pagination_token={}'.format(tweet_id, me, next_token)
  525. }
  526. if 'HX-Request' in request.headers:
  527. user = {
  528. 'id': user_id
  529. }
  530. # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
  531. return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
  532. else:
  533. response_body = json.dumps({
  534. 'tweets': tweets,
  535. 'pagination_token': pagination_token,
  536. 'next_token': next_token
  537. })
  538. return Response(response_body, mimetype='application/json')
  539. @twitter_app.route('/data/timeline/user/<user_id>', methods=['GET'])
  540. def get_data_timeline_user (user_id ):
  541. me = request.args.get('me')
  542. twitter = session.get(me)
  543. if not twitter:
  544. return redirect(url_for('.get_login_html'))
  545. token = twitter['access_token']
  546. pagination_token = request.args.get('pagination_token')
  547. exclude_replies = request.args.get('exclude_replies')
  548. is_me = user_id == twitter['id']
  549. tweet_source = ApiV2TweetSource(token)
  550. response_json = tweet_source.get_user_timeline(user_id,
  551. pagination_token = pagination_token,
  552. non_public_metrics = is_me,
  553. exclude_replies = exclude_replies == '1')
  554. print(response_json)
  555. includes = response_json.get('includes')
  556. tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
  557. next_token = response_json.get('meta').get('next_token')
  558. query = {}
  559. if next_token:
  560. query = {
  561. **query,
  562. 'next_data_url': url_for('.get_data_timeline_user', user_id=user_id , pagination_token=next_token)
  563. }
  564. if 'HX-Request' in request.headers:
  565. user = {
  566. 'id': user_id
  567. }
  568. return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
  569. else:
  570. response_body = json.dumps({
  571. 'tweets': tweets,
  572. 'query': query
  573. })
  574. return Response(response_body, mimetype='application/json')
  575. @twitter_app.route('/data/bookmarks/<user_id>', methods=['GET'])
  576. def get_data_bookmarks (user_id):
  577. # retweeted_by, avi_icon_url, display_name, handle, created_at, text
  578. me = request.args.get('me')
  579. twitter = session.get(me)
  580. if not twitter:
  581. return redirect(url_for('.get_login_html'))
  582. token = twitter['access_token']
  583. pagination_token = request.args.get('pagination_token')
  584. tweet_source = ApiV2TweetSource(token)
  585. response_json = tweet_source.get_bookmarks(user_id,
  586. pagination_token = pagination_token)
  587. includes = response_json.get('includes')
  588. tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
  589. next_token = response_json.get('meta').get('next_token')
  590. query = {}
  591. if next_token:
  592. query = {
  593. **query,
  594. 'next_data_url': '/twitter/data/bookmarks/{}?me={}&pagination_token={}'.format(user_id, me, next_token),
  595. 'next_page_url': '?me={}&pagination_token={}'.format(me, next_token)
  596. }
  597. if 'HX-Request' in request.headers:
  598. user = {
  599. 'id': user_id
  600. }
  601. return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
  602. else:
  603. response_body = json.dumps({
  604. 'tweets': tweets,
  605. 'query': query
  606. })
  607. return Response(response_body, mimetype='application/json')
  608. # ---------------------------------------------------------------------------------------------------------
  609. # ---------------------------------------------------------------------------------------------------------
  610. # HTMx views
  611. # ---------------------------------------------------------------------------------------------------------
  612. # ---------------------------------------------------------------------------------------------------------
  613. @twitter_app.route('/latest.html', methods=['GET'])
  614. def get_timeline_home_html (variant = "reverse_chronological", pagination_token=None):
  615. # retweeted_by, avi_icon_url, display_name, handle, created_at, text
  616. me = request.args.get('me')
  617. twitter = session.get(me)
  618. if not twitter:
  619. return redirect(url_for('.get_login_html'))
  620. user_id = twitter['id']
  621. token = twitter['access_token']
  622. if not pagination_token:
  623. pagination_token = request.args.get('pagination_token')
  624. tweet_source = ApiV2TweetSource(token)
  625. response_json = tweet_source.get_home_timeline(user_id,
  626. pagination_token = pagination_token)
  627. print(json.dumps(response_json, indent=2))
  628. includes = response_json.get('includes')
  629. tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
  630. next_token = response_json.get('meta').get('next_token')
  631. query = {}
  632. if next_token:
  633. query = {
  634. **query,
  635. 'next_data_url': url_for('.get_data_timeline_home', variant=variant, pagination_token=next_token),
  636. 'next_page_url': url_for('.get_timeline_home_html', pagination_token=pagination_token)
  637. }
  638. user = {
  639. 'id': user_id
  640. }
  641. return render_template('tweet-collection.html', user = user, tweets = tweets, query = query)
  642. @twitter_app.route('/bookmarks.html', methods=['GET'])
  643. def get_bookmarks_html (user_id = None, pagination_token=None, token=None):
  644. # retweeted_by, avi_icon_url, display_name, handle, created_at, text
  645. me = request.args.get('me')
  646. twitter = session.get(me)
  647. if not twitter:
  648. return redirect(url_for('.get_login_html'))
  649. if not user_id:
  650. user_id = twitter['id']
  651. token = twitter['access_token']
  652. if not pagination_token:
  653. pagination_token = request.args.get('pagination_token')
  654. tweet_source = ApiV2TweetSource(token)
  655. response_json = tweet_source.get_bookmarks(user_id,
  656. pagination_token = pagination_token)
  657. print(response_json)
  658. includes = response_json.get('includes')
  659. tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
  660. next_token = response_json.get('meta').get('next_token')
  661. query = {}
  662. if next_token:
  663. query = {
  664. **query,
  665. 'next_data_url': url_for('.get_data_bookmarks', user_id=user_id, pagination_token=next_token),
  666. 'next_page_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=pagination_token)
  667. }
  668. user = {
  669. 'id': user_id
  670. }
  671. return render_template('tweet-collection.html', user = user, tweets = tweets, query = query)
  672. @twitter_app.route('/conversations.html', methods=['GET'])
  673. def get_conversations_html ():
  674. me = request.args.get('me')
  675. twitter = session.get(me)
  676. if not twitter:
  677. return redirect(url_for('.get_login_html'))
  678. user_id = twitter['id']
  679. token = twitter['access_token']
  680. pagination_token = request.args.get('pagination_token')
  681. max_results = int(request.args.get('max_results', 10))
  682. # https://developer.twitter.com/en/docs/twitter-api/direct-messages/lookup/api-reference/get-dm_events
  683. url = "https://api.twitter.com/2/dm_events"
  684. params = {
  685. "dm_event.fields": "id,event_type,text,created_at,dm_conversation_id,sender_id,participant_ids,referenced_tweets,attachments",
  686. "expansions": ",".join(["sender_id", "participant_ids"]),
  687. "max_results": max_results,
  688. "user.fields": ",".join(["id", "created_at", "name", "username", "location", "profile_image_url", "url", "verified"])
  689. }
  690. if pagination_token:
  691. params['pagination_token'] = pagination_token
  692. headers = {"Authorization": "Bearer {}".format(token)}
  693. response = requests.get(url, params=params, headers=headers)
  694. response_json = json.loads(response.text)
  695. print(response_json)
  696. dm_events = response_json.get('data')
  697. next_token = response_json.get('meta').get('next_token')
  698. query = {
  699. 'pagination_token': pagination_token,
  700. 'next_token': next_token
  701. }
  702. user = {
  703. 'id': user_id
  704. }
  705. return render_template('conversations.html', user = user, dm_events = dm_events, query = query)
  706. @twitter_app.route('/profile/<user_id>.html', methods=['GET'])
  707. def get_profile_html (user_id):
  708. me = request.args.get('me')
  709. twitter = session.get(me)
  710. if not twitter:
  711. return redirect(url_for('.get_login_html'))
  712. token = twitter['access_token']
  713. is_me = user_id == twitter['id']
  714. pagination_token = request.args.get('pagination_token')
  715. exclude_replies = request.args.get('exclude_replies', '1')
  716. tweet_source = ApiV2TweetSource(token)
  717. response_json = tweet_source.get_user_timeline(user_id,
  718. exclude_replies = exclude_replies == '1',
  719. pagination_token = pagination_token,
  720. non_public_metrics = is_me)
  721. profile_links = []
  722. if user_id == "":
  723. profile_links += [
  724. {
  725. 'title': 'Mastodon',
  726. 'type': 'rss',
  727. 'url': 'https://mastodon.cloud/@ispoogedaily.rss'
  728. },
  729. {
  730. 'title': 'YouTube',
  731. 'type': 'rss',
  732. 'url': 'https://mastodon.cloud/@ispoogedaily.rss'
  733. },
  734. {
  735. 'title': 'Reddit',
  736. 'type': 'rss',
  737. 'url': 'https://mastodon.cloud/@ispoogedaily.rss'
  738. },
  739. {
  740. 'title': 'BedRSS',
  741. 'type': 'rss',
  742. 'url': 'https://mastodon.cloud/@ispoogedaily.rss'
  743. },
  744. {
  745. 'title': 'ispoogedaily',
  746. 'type': 'ig',
  747. 'url': 'https://mastodon.cloud/@ispoogedaily.rss'
  748. },
  749. {
  750. 'title': 'iSpooge Daily',
  751. 'type': 'yt',
  752. 'url': 'https://mastodon.cloud/@ispoogedaily.rss'
  753. },
  754. {
  755. 'title': 'US 555-555-5555',
  756. 'type': 'tel',
  757. 'url': 'tel:+15555555555'
  758. },
  759. {
  760. 'title': 'biz@example.com',
  761. 'type': 'email',
  762. 'url': 'mailto:biz@example.com?subject=hey'
  763. },
  764. ]
  765. includes = response_json.get('includes')
  766. tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
  767. next_token = response_json.get('meta').get('next_token')
  768. query = {}
  769. if next_token:
  770. query = {
  771. **query,
  772. 'next_data_url': url_for('.get_data_timeline_user', user_id=user_id, pagination_token=next_token, exclude_replies=1),
  773. 'next_page_url': url_for('.get_profile_html', user_id=user_id , pagination_token=next_token)
  774. }
  775. profile_user = {
  776. 'id': user_id
  777. }
  778. return render_template('user-profile.html', user = profile_user, tweets = tweets, query = query)
  779. @twitter_app.route('/media/upload', methods=['POST'])
  780. def post_media_upload ():
  781. me = request.args.get('me')
  782. twitter = session.get(me)
  783. if not twitter:
  784. return redirect(url_for('.get_login_html'))
  785. token = twitter['access_token']
  786. form = {
  787. 'media_category': 'tweet_image'
  788. }
  789. headers = {
  790. 'Authorization': 'Bearer {}'.format(token)
  791. }
  792. url = 'http://localhost:5004/twitter/fake-twitter/media/upload'
  793. #url = 'https://upload.twitter.com/1.1/media/upload.json' # .json
  794. upload_media = {}
  795. for e in request.files.items():
  796. media_name = e[0]
  797. f = e[1]
  798. print('.')
  799. files = {'media': [secure_filename(f.filename), BufferedReader(f), f.content_type]}
  800. response = requests.post(url, files=files, data=form, headers=headers)
  801. print(response.status_code)
  802. print(response.text)
  803. response_json = json.loads(response.text)
  804. upload_media[media_name] = response_json
  805. return Response(json.dumps({'upload_media': upload_media}), mimetype='application/json')
  806. @twitter_app.route('/fake-twitter/media/upload', methods=['POST'])
  807. def post_media_upload2 ():
  808. print(request.content_type)
  809. f = request.files.get('media')
  810. f.seek(0,2)
  811. media_size = f.tell()
  812. media = {
  813. #'_auth': request.headers.get('Authorization'),
  814. 'media_key': '3_{}'.format(secure_filename(f.filename)),
  815. 'media_id': secure_filename(f.filename),
  816. 'size': media_size,
  817. 'expires_after_secs': 86400,
  818. 'image': {
  819. 'image_type': f.content_type,
  820. 'w': 1,
  821. 'h': 1
  822. }
  823. }
  824. return Response(json.dumps(media), mimetype='application/json')