# NOTE: the import order below is significant and intentionally preserved:
# the later ``import json`` rebinds ``json`` from ``flask.json`` to the
# standard-library module. Do not regroup these without checking every
# ``json.`` call site.
from configparser import ConfigParser
import base64
from flask import json, Response, render_template, request, send_from_directory, Blueprint, session, redirect, g, current_app
from flask_cors import CORS
import sqlite3
import os
import json
import json_stream
from zipfile import ZipFile
import itertools
from io import BufferedReader
from werkzeug.utils import secure_filename
import datetime
import dateutil
import dateutil.parser
import dateutil.tz
import requests
import hashlib
import re
from requests.auth import AuthBase, HTTPBasicAuth
from requests_oauthlib import OAuth2Session
from tweet_source import ApiV2TweetSource
from flask import url_for as og_url_for

# App-only bearer token; populated by the /app/refresh-token view.
app_access_token = None
app_consumer_key = os.environ.get("TWITTER_CONSUMER_KEY")
app_secret_key = os.environ.get("TWITTER_CONSUMER_SECRET")

TWITTER_SCOPES = ["bookmark.read", "tweet.read", "tweet.write", "dm.read", "users.read", "offline.access"]

twitter_app = Blueprint('twitter_v2_facade', 'twitter_v2_facade',
    static_folder='static',
    static_url_path='',
    url_prefix='/')


def url_for_with_me(route, *args, **kwargs):
    """Build a URL like ``flask.url_for`` but carry the current ``me`` value.

    Static routes (endpoint names ending in ``.static``) are passed through
    untouched. For every other route ``me=g.me`` is added, unless the caller
    supplies its own ``me`` keyword, which takes precedence.
    """
    if route.endswith('.static'):
        return og_url_for(route, *args, **kwargs)
    return og_url_for(route, *args, **{'me': g.me, **kwargs})


# Views in this module call ``url_for`` and get the ``me``-aware variant.
url_for = url_for_with_me


@twitter_app.before_request
def add_me():
    """Expose the ``me`` query parameter and its session entry on ``g``."""
    g.me = request.args.get('me')
    #if me.startswith('twitter') and me in session:
    g.twitter_user = session.get(g.me)


@twitter_app.context_processor
def inject_me():
    """Make ``me``, ``twitter_user`` and the ``me``-aware ``url_for`` available to templates."""
    return {'me': g.me, 'twitter_user': g.twitter_user, 'url_for': url_for_with_me}


@twitter_app.route('/logout.html')
def get_logout_html():
    """Forget the account named by ``me`` and redirect to ``/``.

    Uses ``pop`` with a default so that a missing or unknown ``me`` is a
    no-op instead of a ``KeyError`` (HTTP 500).
    """
    session.pop(g.me, None)
    return redirect('/')


# def add_me(endpoint, values):
    ##values['me'] = request.args.get('me')
#    g.me = request.args.get('me')
# twitter_app.url_value_preprocessor(add_me)


@twitter_app.route('/tokens.html')
def get_tokens_html():
    """Return this view's own URL (including ``me``) as the response body."""
    return url_for('.get_tokens_html', me=g.me)


@twitter_app.route('/logged-in.html')
def get_loggedin_html():
    """OAuth 2.0 redirect target: exchange the code for tokens and log the user in.

    Reads ``code`` and ``state`` from the query string and the PKCE code
    verifier from the session, fetches the token from Twitter, looks up the
    authenticated user via ``/2/users/me``, stores the account in the session
    under the key ``twitter:<user id>`` and redirects to the home timeline.

    Raises:
        KeyError: if the session has no ``twitter_code_verifier`` or the
            token / user responses lack the expected fields.
    """
    client_id = os.environ.get('TWITTER_CLIENT_ID')
    client_secret = os.environ.get('TWITTER_CLIENT_SECRET')

    code_verifier = session['twitter_code_verifier']

    code = request.args.get('code')
    state = request.args.get('state')

    # NOTE(review): g.app_url is not set anywhere in this file; presumably
    # the hosting application sets it before the request reaches here.
    endpoint_url = g.app_url
    redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')

    authorization_response = redirect_uri + '?code={}&state={}'.format(code, state)

    # Fetch your access token
    token_url = "https://api.twitter.com/2/oauth2/token"

    # This app is a confidential client, so the client ID and client secret
    # are sent with HTTP basic auth. (A public client would pass auth=False.)
    auth = HTTPBasicAuth(client_id, client_secret)

    scopes = TWITTER_SCOPES

    #redirect_uri = 'https://{}/api/logged-in'.format(os.environ.get('PROJECT_DOMAIN') + '.glitch.me')

    oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)

    token = oauth.fetch_token(
        token_url=token_url,
        authorization_response=authorization_response,
        auth=auth,
        client_id=client_id,
        include_client_id=True,
        code_verifier=code_verifier,
    )

    # Your access token
    access = token["access_token"]
    refresh = token["refresh_token"]
    expires_at = token["expires_at"]  # expires_in

    # Make a request to the users/me endpoint to get your user ID
    user_me = requests.request(
        "GET",
        "https://api.twitter.com/2/users/me",
        headers={"Authorization": "Bearer {}".format(access)},
    ).json()
    user_id = user_me["data"]["id"]
    user_username = user_me["data"]["username"]
    user_name = user_me["data"]["name"]

    # The verifier is single-use; drop it once the exchange has succeeded.
    del session['twitter_code_verifier']

    me = 'twitter:{}'.format(user_id)

    session[me] = {
        'expires_at': expires_at,
        'access_token': access,
        'refresh_token': refresh,
        'id': user_id,
        'display_name': user_name,
        'username': user_username
    }

    # Set these before building the redirect so url_for() carries the new me.
    g.me = me
    g.twitter_user = session[me]

    return redirect(url_for('.get_timeline_home_html'))
@twitter_app.route('/login.html')
def get_login_html():
    """Start the OAuth 2.0 PKCE flow and redirect the browser to Twitter.

    A fresh code verifier is generated, stored in the session under
    ``twitter_code_verifier`` and its S256 challenge is sent to Twitter.
    """
    client_id = os.environ.get('TWITTER_CLIENT_ID')

    #redirect_uri = 'https://{}/api/logged-in'.format(os.environ.get('PROJECT_DOMAIN') + '.glitch.me')
    endpoint_url = g.app_url
    redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')

    # Set the scopes
    scopes = TWITTER_SCOPES

    # Create a code verifier (non-alphanumeric characters are stripped)
    code_verifier = base64.urlsafe_b64encode(os.urandom(30)).decode("utf-8")
    code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier)

    # Create a code challenge
    code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
    code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
    code_challenge = code_challenge.replace("=", "")

    # Start an OAuth 2.0 session
    oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)

    # Create an authorize URL
    auth_url = "https://twitter.com/i/oauth2/authorize"
    authorization_url, state = oauth.authorization_url(
        auth_url, code_challenge=code_challenge, code_challenge_method="S256"
    )

    session['twitter_code_verifier'] = code_verifier

    return redirect(authorization_url)


@twitter_app.route('/refresh-token', methods=['GET'])
def get_twitter_refresh_token(response_format='json'):
    """Refresh the user token for the account named by ``me``.

    On success the new access and refresh tokens replace the ones stored in
    the session. The raw token-endpoint response text is returned either way.
    ``response_format`` is accepted for interface compatibility and unused.
    """
    client_id = os.environ.get('TWITTER_CLIENT_ID')
    client_secret = os.environ.get('TWITTER_CLIENT_SECRET')

    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    token = twitter['refresh_token']

    basic_auth = base64.b64encode('{}:{}'.format(client_id, client_secret).encode('utf-8')).decode('utf-8')

    headers = {
        'Authorization': 'Basic ' + basic_auth,
    }

    data = {
        'refresh_token': token,
        'grant_type': 'refresh_token'
    }

    response = requests.post('https://api.twitter.com/2/oauth2/token', data=data, headers=headers)

    result = json.loads(response.text)

    if 'access_token' in result:
        twitter['refresh_token'] = result['refresh_token']
        twitter['access_token'] = result['access_token']

        # Re-assign so the session registers the nested change.
        session[me] = twitter

    return response.text


@twitter_app.route('/app/refresh-token', methods=['GET'])
def get_twitter_app_refresh_token():
    """Fetch an app-only bearer token and store it in ``app_access_token``.

    Returns the raw token-endpoint response text.
    """
    # Without this declaration the assignment below would only create a
    # local and the module-level token would stay None forever.
    global app_access_token

    basic_auth = base64.b64encode('{}:{}'.format(app_consumer_key, app_secret_key).encode('utf-8')).decode('utf-8')

    headers = {
        'Authorization': 'Basic ' + basic_auth,
    }

    data = {
        'grant_type': 'client_credentials'
    }

    response = requests.post('https://api.twitter.com/oauth2/token', data=data, headers=headers)

    result = json.loads(response.text)

    if 'access_token' in result:
        app_access_token = result['access_token']

    return response.text


@twitter_app.route('/', methods=['GET'])
@twitter_app.route('/accounts.html', methods=['GET'])
def get_loggedin_accounts_html():
    """Return every session entry whose key starts with ``twitter:`` as JSON."""
    twitter_accounts = {
        key: value for key, value in session.items() if key.startswith('twitter:')
    }

    return Response(json.dumps(twitter_accounts), mimetype='application/json')


@twitter_app.route('/tweets/create', methods=['POST'])
def post_tweets_create():
    """Create a tweet from the posted form (``text``, optional reply/quote ids).

    HTMx requests get the compose-form partial with the new tweet id; other
    clients get the API result wrapped in JSON.
    """
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    token = twitter['access_token']

    text = request.form.get('text')
    reply_to_tweet_id = request.form.get('reply_to_tweet_id')
    quote_tweet_id = request.form.get('quote_tweet_id')

    tweet_source = ApiV2TweetSource(token)
    result = tweet_source.create_tweet(text, reply_to_tweet_id=reply_to_tweet_id, quote_tweet_id=quote_tweet_id)

    print(result)

    if 'HX-Request' in request.headers:
        return render_template('partial/compose-form.html', new_tweet_id=result['data']['id'])
    else:
        response_body = json.dumps({
            'result': result
        })
        return Response(response_body, mimetype='application/json')


# The URL variable was missing from the rule even though the view requires
# ``user_id``; it is restored here.
@twitter_app.route('/data/timeline/user/<user_id>/counts')
def get_data_timeline_user_counts(user_id):
    """Return non-empty tweet-count buckets for ``from:<user_id>`` as JSON.

    Needs the app-only token; responds with 400 until
    ``/app/refresh-token`` has been called.
    """
    query = f'from:{user_id}'

    # is:reply is:quote is:retweet has:links has:mentions has:media has:images has:videos has:geo

    if not app_access_token:
        return 'refresh app token first.', 400

    tweet_source = ApiV2TweetSource(app_access_token)

    response_json = tweet_source.count_tweets(query)

    # ``data`` may be absent; treat that as no buckets.
    data = [d for d in (response_json.get('data') or []) if d.get('tweet_count') > 0]

    result = {
        'total_count': response_json.get('meta').get('total_tweet_count'),
        'data': data
    }

    return Response(json.dumps(result), mimetype='application/json')


# ---------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------
# HTMx partials
# ---------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------


def _find_included_tweet(includes, tweet_id):
    """Return the tweet with ``tweet_id`` from ``includes['tweets']``, or None."""
    for candidate in (includes or {}).get('tweets') or []:
        if candidate.get('id') == tweet_id:
            return candidate
    return None


def tweet_model(includes, tweet_data, me):
    """Flatten one API v2 tweet plus its ``includes`` into a template-friendly dict.

    Args:
        includes: the ``includes`` object of the API response (users, tweets, media).
        tweet_data: one tweet object from the response.
        me: session key of the acting account; only forwarded to nested calls.

    Returns:
        A dict with author, text and link fields; optionally ``card``,
        ``public_metrics``, ``non_public_metrics``, ``photos``, ``videos``
        and ``quoted_tweet``. For a retweet, the retweeted tweet's fields
        overwrite the wrapper's and ``retweeted_by*`` keys are added.

    Raises:
        IndexError: if the tweet's author is not present in ``includes['users']``.
        KeyError: if a required tweet or user field is missing.
    """
    # retweeted_by, avi_icon_url, display_name, handle, created_at, text

    user = [u for u in includes.get('users') if u.get('id') == tweet_data['author_id']][0]

    source_url = 'https://twitter.com/{}/status/{}'.format(user['username'], tweet_data['id'])

    avi_icon_url = user['profile_image_url']

    retweet_of = None
    quoted = None
    if 'referenced_tweets' in tweet_data:
        retweet_of = [r for r in tweet_data['referenced_tweets'] if r['type'] == 'retweeted']
        quoted = [r for r in tweet_data['referenced_tweets'] if r['type'] == 'quoted']

    t = {
        'id': tweet_data['id'],
        'text': tweet_data['text'],
        'created_at': tweet_data['created_at'],
        'author_is_verified': user['verified'],

        'conversation_id': tweet_data['conversation_id'],

        'avi_icon_url': avi_icon_url,

        'display_name': user['name'],
        'handle': user['username'],

        'author_url': url_for('.get_profile_html', user_id=user['id']),
        'author_id': user['id'],

        'source_url': source_url,
        'source_author_url': 'https://twitter.com/{}'.format(user['username']),
        #'is_edited': len(tweet_data['edit_history_tweet_ids']) > 1
    }

    # The first URL entity that carries both a title and a description
    # becomes the link card.
    card_urls = [
        u for u in tweet_data.get('entities', {}).get('urls', [])
        if 'title' in u and 'description' in u
    ]
    if card_urls:
        url = card_urls[0]
        t['card'] = {
            'display_url': url['display_url'].split('/')[0],
            'source_url': url['unwound_url'],
            'content': url['description'],
            'title': url['title']
        }

    if 'public_metrics' in tweet_data:
        t['public_metrics'] = tweet_data['public_metrics']

    if 'non_public_metrics' in tweet_data:
        t['non_public_metrics'] = tweet_data['non_public_metrics']

    # Attachments are best-effort: a malformed media entry must not take
    # down the whole timeline.
    try:
        if 'attachments' in tweet_data and 'media_keys' in tweet_data['attachments']:
            media = [
                [m for m in includes['media'] if m['media_key'] == mk][0]
                for mk in tweet_data['attachments']['media_keys']
            ]

            photos = [
                {**p, 'preview_image_url': p['url'] + '?name=tiny&format=webp'}
                for p in media if p['type'] == 'photo'
            ]
            videos = [
                {**v, 'image_url': v['preview_image_url'], 'preview_image_url': v['preview_image_url'] + '?name=tiny&format=webp'}
                for v in media if v['type'] == 'video'
            ]

            t['photos'] = photos
            t['videos'] = videos
    except Exception:
        print('exception adding attachments to tweet.')

    if retweet_of:
        retweeted_tweet = _find_included_tweet(includes, retweet_of[0]['id'])

        # A referenced tweet can be missing from includes; in that case
        # show the wrapper tweet as-is instead of failing.
        if retweeted_tweet is not None:
            t.update({
                'source_retweeted_by_url': 'https://twitter.com/{}'.format(user['username']),
                'retweeted_by': user['name'],
                'retweeted_by_url': '/profile/{}.html'.format(user['id'])
            })

            t.update(tweet_model(includes, retweeted_tweet, me))

    try:
        if quoted:
            quoted_tweet = _find_included_tweet(includes, quoted[0]['id'])

            if quoted_tweet is not None:
                t['quoted_tweet'] = tweet_model(includes, quoted_tweet, me)
    except Exception:
        print('error adding quoted tweet')

    return t


def _timeline_tweets(response_json, me):
    """Model every tweet in ``response_json['data']``; a missing ``data`` yields []."""
    includes = response_json.get('includes')
    return [tweet_model(includes, t, me) for t in response_json.get('data', [])]


def tweet_paginated_timeline():
    """Placeholder; currently does nothing and returns None."""
    return


@twitter_app.route('/data/tweets', methods=['GET'])
def get_twitter_tweets():
    """Look up tweets by the comma-separated ``ids`` query parameter.

    Responds with 400 when ``ids`` is missing (previously this hit an
    unbound local and failed with a 500).
    """
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    user_id = twitter['id']
    token = twitter['access_token']

    ids = request.args.get('ids')

    if not ids:
        return 'ids parameter is required.', 400

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_tweets(ids.split(','))

    user = {
        'id': user_id
    }

    query = {}

    if 'HX-Request' in request.headers:
        tweets = _timeline_tweets(response_json, me)

        return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
    else:
        return Response(json.dumps(response_json), mimetype="application/json")


# ``<variant>`` restored: the view requires it and url_for() passes it.
@twitter_app.route('/data/timeline/home/<variant>', methods=['GET'])
def get_data_timeline_home(variant):
    """Return one page of the logged-in user's home timeline (partial or JSON)."""
    # retweeted_by, avi_icon_url, display_name, handle, created_at, text
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    user_id = twitter['id']
    token = twitter['access_token']

    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_home_timeline(user_id, pagination_token = pagination_token)

    tweets = _timeline_tweets(response_json, me)
    next_token = response_json.get('meta').get('next_token')

    query = {}

    if next_token:
        query = {
            **query,

            'next_data_url': url_for('.get_data_timeline_home', variant=variant, pagination_token=next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }

        return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
    else:
        response_body = json.dumps({
            'tweets': tweets,
            'query': query
        })
        return Response(response_body, mimetype='application/json')


@twitter_app.route('/data/mentions/<user_id>', methods=['GET'])
def get_data_mentions(user_id):
    """Return one page of ``user_id``'s mentions, oldest first (partial or JSON)."""
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    token = twitter['access_token']

    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_mentions_timeline(user_id, pagination_token = pagination_token)

    # the OG tweet is in the include.tweets collection.
    # All thread tweets are as well, clearly. Does it cost a fetch?
    #print(response_json)

    # tweet_model expects the ``me`` session key here (the access token
    # used to be passed by mistake).
    tweets = _timeline_tweets(response_json, me)

    tweets.reverse()

    next_token = response_json.get('meta').get('next_token')

    query = {}

    if next_token:
        query = {
            **query,

            'next_data_url': '/twitter/data/mentions/{}?me={}&pagination_token={}'.format(user_id, me, next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }

        return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query, me = me)
    else:
        response_body = json.dumps({
            'tweets': tweets,
            'pagination_token': pagination_token,
            'next_token': next_token
        })
        return Response(response_body, mimetype='application/json')


@twitter_app.route('/data/between/<user_id>/<user2_id>', methods=['GET'])
def get_data_between(user_id, user2_id):
    """Search tweets exchanged between two users, oldest first.

    Either id may be the literal ``me``, which resolves to the logged-in
    user's id.
    """
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    token = twitter['access_token']

    pagination_token = request.args.get('pagination_token')

    if user_id == 'me':
        user_id = twitter['id']

    if user2_id == 'me':
        user2_id = twitter['id']

    search_query = "(from:{} to:{}) OR (to:{} from:{})".format(user_id, user2_id, user_id, user2_id)

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.search_tweets(search_query, pagination_token = pagination_token)

    # the OG tweet is in the include.tweets collection.
    # All thread tweets are as well, clearly. Does it cost a fetch?
    #print(response_json)

    # augment with archive if one of the users is me
    # /twitter-archive/tweets/search?in_reply_to_user_id=__
    # /twitter-archive/tweets/search?q=@__

    tweets = []
    next_token = None
    if response_json.get('meta').get('result_count'):
        tweets = _timeline_tweets(response_json, me)
        next_token = response_json.get('meta').get('next_token')

    tweets.reverse()

    query = {}

    if next_token:
        query = {
            **query,

            # Page through this same search (this used to point at the
            # mentions endpoint).
            'next_data_url': '/twitter/data/between/{}/{}?me={}&pagination_token={}'.format(user_id, user2_id, me, next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': twitter['id']
        }

        return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query, me = me)
    else:
        response_body = json.dumps({
            'tweets': tweets,
            'pagination_token': pagination_token,
            'next_token': next_token
        })
        return Response(response_body, mimetype='application/json')


@twitter_app.route('/data/thread/<tweet_id>', methods=['GET'])
def get_data_thread(tweet_id):
    """Return the logged-in user's own thread for ``tweet_id``, oldest first.

    On the first page (no ``pagination_token``) the parent tweet is fetched
    separately and included, so a thread without replies still shows it.
    """
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    user_id = twitter['id']
    token = twitter['access_token']

    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)

    response_json = tweet_source.get_thread(tweet_id,
                                          author_id=user_id,
                                          pagination_token = pagination_token)

    # the OG tweet is in the include.tweets collection.
    # All thread tweets are as well, clearly. Does it cost a fetch?
    print(response_json)

    tweets = []
    next_token = None
    if response_json.get('meta').get('result_count'):
        tweets = _timeline_tweets(response_json, me)
        next_token = response_json.get('meta').get('next_token')

    # NOTE(review): the original indentation was lost; this is placed outside
    # the result_count check because the old in-includes approach "doesn't
    # work if there are no replies" -- confirm against history.
    if not pagination_token:
        response_json = tweet_source.get_tweet(tweet_id)

        print("parent tweet=")
        print(response_json)

        includes = response_json.get('includes')
        tweet = response_json.get('data')[0]

        tweets.append(tweet_model(includes, tweet, me))

    tweets.reverse()

    query = {}

    if next_token:
        query = {
            **query,

            'next_data_url': '/twitter/data/thread/{}?me={}&pagination_token={}'.format(tweet_id, me, next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }

        return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
    else:
        response_body = json.dumps({
            'tweets': tweets,
            'pagination_token': pagination_token,
            'next_token': next_token
        })
        return Response(response_body, mimetype='application/json')


@twitter_app.route('/data/conversation/<tweet_id>', methods=['GET'])
def get_data_conversation(tweet_id):
    """Return the whole conversation for ``tweet_id`` (all authors), oldest first.

    On the first page (no ``pagination_token``) the parent tweet is fetched
    separately and included.
    """
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    user_id = twitter['id']
    token = twitter['access_token']

    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)

    response_json = tweet_source.get_thread(tweet_id,
                                          pagination_token = pagination_token)

    # the OG tweet is in the include.tweets collection.
    # All thread tweets are as well, clearly. Does it cost a fetch?
    #print(response_json)

    tweets = []
    next_token = None

    print("conversation meta:")
    print(json.dumps(response_json.get('meta'), indent=2))

    if response_json.get('meta').get('result_count'):
        tweets = _timeline_tweets(response_json, me)
        next_token = response_json.get('meta').get('next_token')

    # NOTE(review): placed outside the result_count check for the same
    # reason as in get_data_thread -- confirm.
    if not pagination_token:
        response_json = tweet_source.get_tweet(tweet_id)

        print("parent tweet=")
        print(response_json)

        includes = response_json.get('includes')
        tweet = response_json.get('data')[0]

        tweets.append(tweet_model(includes, tweet, me))

    tweets.reverse()

    query = {}

    if next_token:
        query = {
            **query,

            'next_data_url': '/twitter/data/conversation/{}?me={}&pagination_token={}'.format(tweet_id, me, next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }

        return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
    else:
        response_body = json.dumps({
            'tweets': tweets,
            'pagination_token': pagination_token,
            'next_token': next_token
        })
        return Response(response_body, mimetype='application/json')


@twitter_app.route('/data/timeline/user/<user_id>', methods=['GET'])
def get_data_timeline_user(user_id):
    """Return one page of ``user_id``'s timeline (partial or JSON).

    Non-public metrics are requested only when viewing one's own timeline;
    ``exclude_replies=1`` in the query string drops replies.
    """
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    token = twitter['access_token']

    pagination_token = request.args.get('pagination_token')
    exclude_replies = request.args.get('exclude_replies')

    is_me = user_id == twitter['id']

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_user_timeline(user_id,
                                                   pagination_token = pagination_token,
                                                   non_public_metrics = is_me,
                                                   exclude_replies = exclude_replies == '1')

    print(response_json)

    tweets = _timeline_tweets(response_json, me)
    next_token = response_json.get('meta').get('next_token')

    query = {}

    if next_token:
        query = {
            **query,

            'next_data_url': url_for('.get_data_timeline_user', user_id=user_id, pagination_token=next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }

        return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
    else:
        response_body = json.dumps({
            'tweets': tweets,
            'query': query
        })
        return Response(response_body, mimetype='application/json')


@twitter_app.route('/data/bookmarks/<user_id>', methods=['GET'])
def get_data_bookmarks(user_id):
    """Return one page of ``user_id``'s bookmarks (partial or JSON)."""
    # retweeted_by, avi_icon_url, display_name, handle, created_at, text
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    token = twitter['access_token']

    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_bookmarks(user_id, pagination_token = pagination_token)

    tweets = _timeline_tweets(response_json, me)
    next_token = response_json.get('meta').get('next_token')

    query = {}

    if next_token:
        query = {
            **query,

            'next_data_url': '/twitter/data/bookmarks/{}?me={}&pagination_token={}'.format(user_id, me, next_token),
            'next_page_url': '?me={}&pagination_token={}'.format(me, next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }

        return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
    else:
        response_body = json.dumps({
            'tweets': tweets,
            'query': query
        })
        return Response(response_body, mimetype='application/json')


# ---------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------
# HTMx views
# ---------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------


@twitter_app.route('/latest.html', methods=['GET'])
def get_timeline_home_html(variant = "reverse_chronological", pagination_token=None):
    """Render the full-page home timeline for the logged-in user.

    ``pagination_token`` falls back to the query-string value when not
    passed explicitly.
    """
    # retweeted_by, avi_icon_url, display_name, handle, created_at, text
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    user_id = twitter['id']
    token = twitter['access_token']

    if not pagination_token:
        pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_home_timeline(user_id, pagination_token = pagination_token)

    print(json.dumps(response_json, indent=2))

    tweets = _timeline_tweets(response_json, me)
    next_token = response_json.get('meta').get('next_token')

    query = {}

    if next_token:
        query = {
            **query,

            'next_data_url': url_for('.get_data_timeline_home', variant=variant, pagination_token=next_token),
            # The next page must use next_token; the current page's token
            # would just reload the same page.
            'next_page_url': url_for('.get_timeline_home_html', pagination_token=next_token)
        }

    user = {
        'id': user_id
    }

    return render_template('tweet-collection.html', user = user, tweets = tweets, query = query)


@twitter_app.route('/bookmarks.html', methods=['GET'])
def get_bookmarks_html(user_id = None, pagination_token=None, token=None):
    """Render the full-page bookmarks view.

    ``user_id`` and ``token`` default to the logged-in account's values and
    ``pagination_token`` to the query-string value.
    """
    # retweeted_by, avi_icon_url, display_name, handle, created_at, text
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    if not user_id:
        user_id = twitter['id']

    # NOTE(review): original nesting of this assignment was lost; falling
    # back to the session token only when none is passed honours the parameter.
    if not token:
        token = twitter['access_token']

    if not pagination_token:
        pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_bookmarks(user_id, pagination_token = pagination_token)

    print(response_json)

    tweets = _timeline_tweets(response_json, me)
    next_token = response_json.get('meta').get('next_token')

    query = {}

    if next_token:
        query = {
            **query,

            'next_data_url': url_for('.get_data_bookmarks', user_id=user_id, pagination_token=next_token),
            # next_token, not the current page's token (see get_timeline_home_html).
            'next_page_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=next_token)
        }

    user = {
        'id': user_id
    }

    return render_template('tweet-collection.html', user = user, tweets = tweets, query = query)


@twitter_app.route('/conversations.html', methods=['GET'])
def get_conversations_html():
    """Render a page of the logged-in user's direct-message events."""
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    user_id = twitter['id']
    token = twitter['access_token']

    pagination_token = request.args.get('pagination_token')
    max_results = int(request.args.get('max_results', 10))

    # https://developer.twitter.com/en/docs/twitter-api/direct-messages/lookup/api-reference/get-dm_events
    url = "https://api.twitter.com/2/dm_events"

    params = {
        "dm_event.fields": "id,event_type,text,created_at,dm_conversation_id,sender_id,participant_ids,referenced_tweets,attachments",
        "expansions": ",".join(["sender_id", "participant_ids"]),
        "max_results": max_results,

        "user.fields": ",".join(["id", "created_at", "name", "username", "location", "profile_image_url", "url", "verified"])
    }

    if pagination_token:
        params['pagination_token'] = pagination_token

    headers = {"Authorization": "Bearer {}".format(token)}

    response = requests.get(url, params=params, headers=headers)
    response_json = json.loads(response.text)

    print(response_json)

    dm_events = response_json.get('data')

    next_token = response_json.get('meta').get('next_token')

    query = {
        'pagination_token': pagination_token,
        'next_token': next_token
    }

    user = {
        'id': user_id
    }

    return render_template('conversations.html', user = user, dm_events = dm_events, query = query)


@twitter_app.route('/profile/<user_id>.html', methods=['GET'])
def get_profile_html(user_id):
    """Render the profile page (timeline) of ``user_id``.

    Replies are excluded unless ``exclude_replies`` is given and is not ``1``.
    """
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    token = twitter['access_token']

    is_me = user_id == twitter['id']

    pagination_token = request.args.get('pagination_token')
    exclude_replies = request.args.get('exclude_replies', '1')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_user_timeline(user_id,
                                                   exclude_replies = exclude_replies == '1',
                                                   pagination_token = pagination_token,
                                                   non_public_metrics = is_me)

    # NOTE(review): profile_links is built but never passed to the template,
    # and user_id cannot be "" for this route -- looks like placeholder data
    # for a specific account; confirm before wiring it up.
    profile_links = []
    if user_id == "":
        profile_links += [
            {
                'title': 'Mastodon',
                'type': 'rss',
                'url': 'https://mastodon.cloud/@ispoogedaily.rss'
            },
            {
                'title': 'YouTube',
                'type': 'rss',
                'url': 'https://mastodon.cloud/@ispoogedaily.rss'
            },
            {
                'title': 'Reddit',
                'type': 'rss',
                'url': 'https://mastodon.cloud/@ispoogedaily.rss'
            },
            {
                'title': 'BedRSS',
                'type': 'rss',
                'url': 'https://mastodon.cloud/@ispoogedaily.rss'
            },
            {
                'title': 'ispoogedaily',
                'type': 'ig',
                'url': 'https://mastodon.cloud/@ispoogedaily.rss'
            },
            {
                'title': 'iSpooge Daily',
                'type': 'yt',
                'url': 'https://mastodon.cloud/@ispoogedaily.rss'
            },
            {
                'title': 'US 555-555-5555',
                'type': 'tel',
                'url': 'tel:+15555555555'
            },
            {
                'title': 'biz@example.com',
                'type': 'email',
                'url': 'mailto:biz@example.com?subject=hey'
            },
        ]

    tweets = _timeline_tweets(response_json, me)
    next_token = response_json.get('meta').get('next_token')

    query = {}

    if next_token:
        query = {
            **query,

            'next_data_url': url_for('.get_data_timeline_user', user_id=user_id, pagination_token=next_token, exclude_replies=1),
            'next_page_url': url_for('.get_profile_html', user_id=user_id, pagination_token=next_token)
        }

    profile_user = {
        'id': user_id
    }

    return render_template('user-profile.html', user = profile_user, tweets = tweets, query = query)


@twitter_app.route('/media/upload', methods=['POST'])
def post_media_upload():
    """Forward each uploaded file to the media-upload endpoint.

    Returns JSON mapping each form field name to the upstream response.
    Currently targets the local fake endpoint rather than Twitter.
    """
    me = request.args.get('me')
    twitter = session.get(me)

    if not twitter:
        return redirect(url_for('.get_login_html'))

    token = twitter['access_token']

    form = {
        'media_category': 'tweet_image'
    }

    headers = {
        'Authorization': 'Bearer {}'.format(token)
    }

    url = 'http://localhost:5004/twitter/fake-twitter/media/upload'
    #url = 'https://upload.twitter.com/1.1/media/upload.json' # .json

    upload_media = {}

    for media_name, f in request.files.items():
        print('.')

        files = {'media': [secure_filename(f.filename), BufferedReader(f), f.content_type]}

        response = requests.post(url, files=files, data=form, headers=headers)

        print(response.status_code)
        print(response.text)

        response_json = json.loads(response.text)

        upload_media[media_name] = response_json

    return Response(json.dumps({'upload_media': upload_media}), mimetype='application/json')


@twitter_app.route('/fake-twitter/media/upload', methods=['POST'])
def post_media_upload2():
    """Local stand-in for the media upload API: echo metadata about ``media``.

    Responds with 400 when the ``media`` file part is missing.
    """
    print(request.content_type)

    f = request.files.get('media')

    if f is None:
        return 'media file is required.', 400

    # Seek to the end so tell() reports the upload's size in bytes.
    f.seek(0, 2)
    media_size = f.tell()

    media = {
        #'_auth': request.headers.get('Authorization'),
        'media_key': '3_{}'.format(secure_filename(f.filename)),
        'media_id': secure_filename(f.filename),
        'size': media_size,
        'expires_after_secs': 86400,
        'image': {
            'image_type': f.content_type,
            'w': 1,
            'h': 1
        }
    }

    return Response(json.dumps(media), mimetype='application/json')