123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251 |
- from configparser import ConfigParser
- import base64
- from flask import json, Response, render_template, request, send_from_directory, Blueprint, session, redirect, g, current_app
- from flask_cors import CORS
- import sqlite3
- import os
- import json
- import json_stream
- from zipfile import ZipFile
- import itertools
- from io import BufferedReader
- from werkzeug.utils import secure_filename
- import datetime
- import dateutil
- import dateutil.parser
- import dateutil.tz
- import requests
- import hashlib
- import re
- from requests.auth import AuthBase, HTTPBasicAuth
- from requests_oauthlib import OAuth2Session
- from tweet_source import ApiV2TweetSource
- from flask import url_for as og_url_for
- app_access_token = None
- app_consumer_key = os.environ.get("TWITTER_CONSUMER_KEY")
- app_secret_key = os.environ.get("TWITTER_CONSUMER_SECRET")
- TWITTER_SCOPES = ["bookmark.read", "tweet.read", "tweet.write", "dm.read", "users.read", "offline.access"]
- twitter_app = Blueprint('twitter_v2_facade', 'twitter_v2_facade',
- static_folder='static',
- static_url_path='',
- url_prefix='/')
- def url_for_with_me (route, *args, **kwargs):
- #print('url_for_with_me')
- if route.endswith('.static'):
- return og_url_for(route, *args, **kwargs)
-
- return og_url_for(route, *args, **{'me': g.me, **kwargs})
- url_for = url_for_with_me
- @twitter_app.before_request
- def add_me ():
- g.me = request.args.get('me')
-
- #if me.startswith('twitter') and me in session:
- g.twitter_user = session.get(g.me)
-
- @twitter_app.context_processor
- def inject_me():
-
- return {'me': g.me, 'twitter_user': g.twitter_user, 'url_for': url_for_with_me}
-
- @twitter_app.route('/logout.html')
- def get_logout_html ():
- del session[g.me]
- return redirect('/')
-
-
- # def add_me(endpoint, values):
- ##values['me'] = request.args.get('me')
- # g.me = request.args.get('me')
- # twitter_app.url_value_preprocessor(add_me)
- @twitter_app.route('/tokens.html')
- def get_tokens_html ():
- return url_for('.get_tokens_html', me=g.me)
-
- @twitter_app.route('/logged-in.html')
- def get_loggedin_html ():
- client_id = os.environ.get('TWITTER_CLIENT_ID')
- client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
-
- code_verifier = session['twitter_code_verifier']
-
- code = request.args.get('code')
- state = request.args.get('state')
-
- endpoint_url = g.app_url
- redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')
-
- authorization_response = redirect_uri + '?code={}&state={}'.format(code, state)
-
- # Fetch your access token
- token_url = "https://api.twitter.com/2/oauth2/token"
-
-
- # The following line of code will only work if you are using a type of App that is a public client
- auth = False
- # If you are using a confidential client you will need to pass in basic encoding of your client ID and client secret.
- # Please remove the comment on the following line if you are using a type of App that is a confidential client
- auth = HTTPBasicAuth(client_id, client_secret)
-
- scopes = TWITTER_SCOPES
- #redirect_uri = 'https://{}/api/logged-in'.format(os.environ.get('PROJECT_DOMAIN') + '.glitch.me')
- oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)
-
- token = oauth.fetch_token(
- token_url=token_url,
- authorization_response=authorization_response,
- auth=auth,
- client_id=client_id,
- include_client_id=True,
- code_verifier=code_verifier,
- )
- # Your access token
- access = token["access_token"]
- refresh = token["refresh_token"]
- expires_at = token["expires_at"] # expires_in
- # Make a request to the users/me endpoint to get your user ID
- user_me = requests.request(
- "GET",
- "https://api.twitter.com/2/users/me",
- headers={"Authorization": "Bearer {}".format(access)},
- ).json()
- user_id = user_me["data"]["id"]
- user_username = user_me["data"]["username"]
- user_name = user_me["data"]["name"]
-
- del session['twitter_code_verifier']
-
- me = 'twitter:{}'.format(user_id)
-
- session[ me ] = {
- 'expires_at': expires_at,
- 'access_token': access,
- 'refresh_token': refresh,
- 'id': user_id,
- 'display_name': user_name,
- 'username': user_username
- }
-
- g.me = me
- g.twitter_user = session[ me ]
-
- return redirect(url_for('.get_timeline_home_html'))
-
- @twitter_app.route('/login.html')
- def get_login_html ():
- client_id = os.environ.get('TWITTER_CLIENT_ID')
- client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
-
- #redirect_uri = 'https://{}/api/logged-in'.format(os.environ.get('PROJECT_DOMAIN') + '.glitch.me')
-
- endpoint_url = g.app_url
- redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')
-
- # Set the scopes
- scopes = TWITTER_SCOPES
- # Create a code verifier
- code_verifier = base64.urlsafe_b64encode(os.urandom(30)).decode("utf-8")
- code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier)
- # Create a code challenge
- code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
- code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
- code_challenge = code_challenge.replace("=", "")
- # Start an OAuth 2.0 session
- oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)
- # Create an authorize URL
- auth_url = "https://twitter.com/i/oauth2/authorize"
- authorization_url, state = oauth.authorization_url(
- auth_url, code_challenge=code_challenge, code_challenge_method="S256"
- )
-
- session['twitter_code_verifier'] = code_verifier
-
- return redirect(authorization_url)
- @twitter_app.route('/refresh-token', methods=['GET'])
- def get_twitter_refresh_token (response_format='json'):
-
- client_id = os.environ.get('TWITTER_CLIENT_ID')
- client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
-
-
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- token = twitter['refresh_token']
-
- basic_auth = base64.b64encode('{}:{}'.format(client_id, client_secret).encode('utf-8')).decode('utf-8')
-
- headers = {
- 'Authorization': 'Basic ' + basic_auth,
- }
-
- data = {
- 'refresh_token': token,
- 'grant_type': 'refresh_token'
- }
-
- response = requests.post('https://api.twitter.com/2/oauth2/token', data=data, headers=headers)
-
- result = json.loads(response.text)
-
- if 'access_token' in result:
-
- twitter['refresh_token'] = result['refresh_token']
- twitter['access_token'] = result['access_token']
-
- session[ me ] = twitter
-
- return response.text
- @twitter_app.route('/app/refresh-token', methods=['GET'])
- def get_twitter_app_refresh_token ():
-
- client_id = os.environ.get('TWITTER_CLIENT_ID')
- client_secret = os.environ.get('TWITTER_CLIENT_SECRET')
-
-
- basic_auth = base64.b64encode('{}:{}'.format(app_consumer_key, app_secret_key).encode('utf-8')).decode('utf-8')
-
- headers = {
- 'Authorization': 'Basic ' + basic_auth,
- }
-
- data = {
- 'grant_type': 'client_credentials'
- }
-
- response = requests.post('https://api.twitter.com/oauth2/token', data=data, headers=headers)
-
- result = json.loads(response.text)
-
- if 'access_token' in result:
-
- app_access_token = result['access_token']
-
-
-
- return response.text
- @twitter_app.route('/', methods=['GET'])
- @twitter_app.route('/accounts.html', methods=['GET'])
- def get_loggedin_accounts_html ():
- twitter_accounts = dict(filter(lambda e: e[0].startswith('twitter:'), session.items()))
-
- return Response(json.dumps(twitter_accounts), mimetype='application/json')
- @twitter_app.route('/tweets/create', methods=['POST'])
- def post_tweets_create ():
-
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- user_id = twitter['id']
- token = twitter['access_token']
-
- text = request.form.get('text')
- reply_to_tweet_id = request.form.get('reply_to_tweet_id')
- quote_tweet_id = request.form.get('quote_tweet_id')
-
- tweet_source = ApiV2TweetSource(token)
- result = tweet_source.create_tweet(text, reply_to_tweet_id=reply_to_tweet_id, quote_tweet_id=quote_tweet_id)
-
- print(result)
-
- if 'HX-Request' in request.headers:
- return render_template('partial/compose-form.html', new_tweet_id=result['data']['id'])
- else:
- response_body = json.dumps({
- 'result': result
- })
- return Response(response_body, mimetype='application/json')
- @twitter_app.route('/data/timeline/user/<user_id>/counts')
- def get_data_timeline_user_counts (user_id):
- query = f'from:{user_id}'
-
- # is:reply is:quote is:retweet has:links has:mentions has:media has:images has:videos has:geo
-
- if not app_access_token:
- return 'refresh app token first.', 400
-
-
- tweet_source = ApiV2TweetSource(app_access_token)
-
- response_json = tweet_source.count_tweets(query)
-
- data = list(filter(lambda d: d.get('tweet_count') > 0, response_json.get('data')))
-
- result = {
- 'total_count': response_json.get('meta').get('total_tweet_count'),
- 'data': data
- }
-
- return Response(json.dumps(result), mimetype='application/json')
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- # HTMx partials
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- def tweet_model (includes, tweet_data, me):
- # retweeted_by, avi_icon_url, display_name, handle, created_at, text
-
-
- user = list(filter(lambda u: u.get('id') == tweet_data['author_id'], includes.get('users')))[0]
- source_url = 'https://twitter.com/{}/status/{}'.format(user['username'], tweet_data['id'])
-
- avi_icon_url = user['profile_image_url']
-
- retweet_of = None
- quoted = None
- if 'referenced_tweets' in tweet_data:
- retweet_of = list(filter(lambda r: r['type'] == 'retweeted', tweet_data['referenced_tweets']))
- quoted = list(filter(lambda r: r['type'] == 'quoted', tweet_data['referenced_tweets']))
-
- t = {
- 'id': tweet_data['id'],
- 'text': tweet_data['text'],
- 'created_at': tweet_data['created_at'],
- 'author_is_verified': user['verified'],
-
- 'conversation_id': tweet_data['conversation_id'],
-
- 'avi_icon_url': avi_icon_url,
-
- 'display_name': user['name'],
- 'handle': user['username'],
-
- 'author_url': url_for('.get_profile_html', user_id=user['id']),
- 'author_id': user['id'],
-
- 'source_url': source_url,
- 'source_author_url': 'https://twitter.com/{}'.format(user['username']),
- #'is_edited': len(tweet_data['edit_history_tweet_ids']) > 1
-
-
- }
-
- if 'entities' in tweet_data:
- if 'urls' in tweet_data['entities']:
- urls = list(filter(lambda u: 'title' in u and 'description' in u, tweet_data['entities']['urls']))
-
- if len(urls):
- url = urls[0]
- t['card'] = {
- 'display_url': url['display_url'].split('/')[0],
- 'source_url': url['unwound_url'],
- 'content': url['description'],
- 'title': url['title']
- }
-
- if 'public_metrics' in tweet_data:
- t['public_metrics'] = tweet_data['public_metrics']
-
- if 'non_public_metrics' in tweet_data:
- t['non_public_metrics'] = tweet_data['non_public_metrics']
-
- try:
- if 'attachments' in tweet_data and 'media_keys' in tweet_data['attachments']:
-
- media = list(map(lambda mk: list(filter(lambda m: m['media_key'] == mk, includes['media']))[0], tweet_data['attachments']['media_keys']))
-
- photos = list(filter(lambda m: m['type'] == 'photo', media))
- videos = list(filter(lambda m: m['type'] == 'video', media))
-
- photos = list(map(lambda p: {**p, 'preview_image_url': p['url'] + '?name=tiny&format=webp'}, photos))
- videos = list(map(lambda p: {**p, 'image_url': p['preview_image_url'], 'preview_image_url': p['preview_image_url'] + '?name=tiny&format=webp'}, videos))
-
- t['photos'] = photos
- t['videos'] = videos
- except:
- print('exception adding attachments to tweet.')
-
- if retweet_of and len(retweet_of):
- retweeted_tweet = list(filter(lambda t: t.get('id') == retweet_of[0]['id'], includes.get('tweets')))[0]
-
- t.update({
- 'source_retweeted_by_url': 'https://twitter.com/{}'.format(user['username']),
- 'retweeted_by': user['name'],
- 'retweeted_by_url': '/profile/{}.html'.format(user['id'])
- })
-
- rt = tweet_model(includes, retweeted_tweet, me)
- t.update(rt)
-
- try:
- if quoted and len(quoted):
- quoted_tweet = list(filter(lambda t: t.get('id') == quoted[0]['id'], includes.get('tweets')))[0]
-
- t['quoted_tweet'] = tweet_model(includes, quoted_tweet, me)
- except:
- print('error adding quoted tweet')
-
- return t
- def tweet_paginated_timeline ():
- return
- @twitter_app.route('/data/tweets', methods=['GET'])
- def get_twitter_tweets ():
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- user_id = twitter['id']
- token = twitter['access_token']
- ids = request.args.get('ids')
- max_id=''
- if ids:
- ids = ids.split(',')
- tweet_source = ApiV2TweetSource(token)
-
- response_json = tweet_source.get_tweets(ids)
-
- user = {
- 'id': user_id
- }
-
- query = {}
-
- if 'HX-Request' in request.headers:
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
-
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- return Response(json.dumps(response_json), mimetype="application/json")
-
-
- @twitter_app.route('/data/timeline/home/<variant>', methods=['GET'])
- def get_data_timeline_home (variant):
- # retweeted_by, avi_icon_url, display_name, handle, created_at, text
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- user_id = twitter['id']
- token = twitter['access_token']
-
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_home_timeline(user_id,
- pagination_token = pagination_token)
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_data_timeline_home', variant=variant, pagination_token=next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': user_id
- }
-
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- response_body = json.dumps({
- 'tweets': tweets,
- 'query': query
- })
- return Response(response_body, mimetype='application/json')
- @twitter_app.route('/data/mentions/<user_id>', methods=['GET'])
- def get_data_mentions (user_id):
-
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- token = twitter['access_token']
-
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_mentions_timeline(user_id,
- pagination_token = pagination_token)
-
- # the OG tweet is in the include.tweets collection.
- # All thread tweets are as well, clearly. Does it cost a fetch?
- #print(response_json)
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, token), response_json['data']))
-
- related_tweets = [] # derived from includes
-
- tweets.reverse()
-
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': '/twitter/data/mentions/{}?me={}&pagination_token={}'.format(user_id, me, next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': user_id
- }
- # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query, me = me)
- else:
- response_body = json.dumps({
- 'tweets': tweets,
- 'pagination_token': pagination_token,
- 'next_token': next_token
- })
- return Response(response_body, mimetype='application/json')
- @twitter_app.route('/data/between/<user_id>/<user2_id>', methods=['GET'])
- def get_data_between (user_id, user2_id):
-
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
- token = twitter['access_token']
-
- pagination_token = request.args.get('pagination_token')
-
- if user_id == 'me':
- user_id = twitter['id']
-
- if user2_id == 'me':
- user2_id = twitter['id']
-
- search_query = "(from:{} to:{}) OR (to:{} from:{})".format(user_id, user2_id, user_id, user2_id)
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.search_tweets(search_query,
- pagination_token = pagination_token)
-
- # the OG tweet is in the include.tweets collection.
- # All thread tweets are as well, clearly. Does it cost a fetch?
- #print(response_json)
-
- # augment with archive if one of the users is me
- # /twitter-archive/tweets/search?in_reply_to_user_id=__
- # /twitter-archive/tweets/search?q=@__
-
- tweets = []
- next_token = None
- if response_json.get('meta').get('result_count'):
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, token), response_json['data']))
-
- related_tweets = [] # derived from includes
-
- next_token = response_json.get('meta').get('next_token')
-
-
-
- tweets.reverse()
-
-
-
- query = {}
-
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': '/twitter/data/mentions/{}?me={}&pagination_token={}'.format(user_id, me, next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': twitter['id']
- }
- # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query, me = me)
- else:
- response_body = json.dumps({
- 'tweets': tweets,
- 'pagination_token': pagination_token,
- 'next_token': next_token
- })
- return Response(response_body, mimetype='application/json')
- @twitter_app.route('/data/thread/<tweet_id>', methods=['GET'])
- def get_data_thread (tweet_id):
-
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- user_id = twitter['id']
- token = twitter['access_token']
-
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_thread(tweet_id, author_id=user_id,
- pagination_token = pagination_token)
-
- # the OG tweet is in the include.tweets collection.
- # All thread tweets are as well, clearly. Does it cost a fetch?
- print(response_json)
-
- tweets = []
- next_token = None
- if response_json.get('meta').get('result_count'):
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
-
- # FIXME this method is OK except it doesn't work if there are no replies.
- #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))
-
- #related_tweets = [] # derived from includes
-
- next_token = response_json.get('meta').get('next_token')
-
- if not pagination_token:
- response_json = tweet_source.get_tweet(tweet_id)
- print("parent tweet=")
- print(response_json)
-
- includes = response_json.get('includes')
- tweet = response_json.get('data')[0]
-
- tweets.append(tweet_model(includes, tweet, me))
-
- tweets.reverse()
-
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': '/twitter/data/thread/{}?me={}&pagination_token={}'.format(tweet_id, me, next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': user_id
- }
- # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- response_body = json.dumps({
- 'tweets': tweets,
- 'pagination_token': pagination_token,
- 'next_token': next_token
- })
- return Response(response_body, mimetype='application/json')
- @twitter_app.route('/data/conversation/<tweet_id>', methods=['GET'])
- def get_data_conversation (tweet_id):
-
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- user_id = twitter['id']
- token = twitter['access_token']
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
-
- # seems to get l
- response_json = tweet_source.get_thread(tweet_id,
- pagination_token = pagination_token)
-
- # the OG tweet is in the include.tweets collection.
- # All thread tweets are as well, clearly. Does it cost a fetch?
- #print(response_json)
-
- tweets = []
- next_token = None
- print("conversation meta:")
- print(json.dumps(response_json.get('meta'), indent=2))
- if response_json.get('meta').get('result_count'):
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
-
- next_token = response_json.get('meta').get('next_token')
-
- # this method is OK except it doesn't work if there are no replies.
- #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))
-
- if not pagination_token:
- response_json = tweet_source.get_tweet(tweet_id)
-
-
-
- print("parent tweet=")
- print(response_json)
-
- includes = response_json.get('includes')
- tweet = response_json.get('data')[0]
-
- tweets.append(tweet_model(includes, tweet, me))
-
- #related_tweets = [] # derived from includes
-
- tweets.reverse()
-
-
-
- query = {}
-
- if next_token:
- query = {
- **query,
- 'next_data_url': '/twitter/data/conversation/{}?me={}&pagination_token={}'.format(tweet_id, me, next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': user_id
- }
-
- # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- response_body = json.dumps({
- 'tweets': tweets,
- 'pagination_token': pagination_token,
- 'next_token': next_token
- })
- return Response(response_body, mimetype='application/json')
- @twitter_app.route('/data/timeline/user/<user_id>', methods=['GET'])
- def get_data_timeline_user (user_id ):
-
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- token = twitter['access_token']
-
- pagination_token = request.args.get('pagination_token')
- exclude_replies = request.args.get('exclude_replies')
-
- is_me = user_id == twitter['id']
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_user_timeline(user_id,
- pagination_token = pagination_token,
- non_public_metrics = is_me,
- exclude_replies = exclude_replies == '1')
-
-
- print(response_json)
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_data_timeline_user', user_id=user_id , pagination_token=next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': user_id
- }
-
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- response_body = json.dumps({
- 'tweets': tweets,
- 'query': query
- })
- return Response(response_body, mimetype='application/json')
- @twitter_app.route('/data/bookmarks/<user_id>', methods=['GET'])
- def get_data_bookmarks (user_id):
- # retweeted_by, avi_icon_url, display_name, handle, created_at, text
-
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- token = twitter['access_token']
-
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_bookmarks(user_id,
- pagination_token = pagination_token)
-
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': '/twitter/data/bookmarks/{}?me={}&pagination_token={}'.format(user_id, me, next_token),
- 'next_page_url': '?me={}&pagination_token={}'.format(me, next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': user_id
- }
-
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- response_body = json.dumps({
- 'tweets': tweets,
- 'query': query
- })
- return Response(response_body, mimetype='application/json')
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- # HTMx views
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- @twitter_app.route('/latest.html', methods=['GET'])
- def get_timeline_home_html (variant = "reverse_chronological", pagination_token=None):
- # retweeted_by, avi_icon_url, display_name, handle, created_at, text
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- user_id = twitter['id']
- token = twitter['access_token']
-
- if not pagination_token:
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_home_timeline(user_id,
- pagination_token = pagination_token)
-
- print(json.dumps(response_json, indent=2))
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_data_timeline_home', variant=variant, pagination_token=next_token),
- 'next_page_url': url_for('.get_timeline_home_html', pagination_token=pagination_token)
- }
-
- user = {
- 'id': user_id
- }
-
- return render_template('tweet-collection.html', user = user, tweets = tweets, query = query)
- @twitter_app.route('/bookmarks.html', methods=['GET'])
- def get_bookmarks_html (user_id = None, pagination_token=None, token=None):
- # retweeted_by, avi_icon_url, display_name, handle, created_at, text
-
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- if not user_id:
- user_id = twitter['id']
-
- token = twitter['access_token']
-
- if not pagination_token:
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_bookmarks(user_id,
- pagination_token = pagination_token)
-
- print(response_json)
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_data_bookmarks', user_id=user_id, pagination_token=next_token),
- 'next_page_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=pagination_token)
- }
-
- user = {
- 'id': user_id
- }
-
- return render_template('tweet-collection.html', user = user, tweets = tweets, query = query)
- @twitter_app.route('/conversations.html', methods=['GET'])
- def get_conversations_html ():
-
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- user_id = twitter['id']
- token = twitter['access_token']
-
-
- pagination_token = request.args.get('pagination_token')
- max_results = int(request.args.get('max_results', 10))
-
- # https://developer.twitter.com/en/docs/twitter-api/direct-messages/lookup/api-reference/get-dm_events
- url = "https://api.twitter.com/2/dm_events"
- params = {
- "dm_event.fields": "id,event_type,text,created_at,dm_conversation_id,sender_id,participant_ids,referenced_tweets,attachments",
- "expansions": ",".join(["sender_id", "participant_ids"]),
- "max_results": max_results,
-
- "user.fields": ",".join(["id", "created_at", "name", "username", "location", "profile_image_url", "url", "verified"])
- }
-
- if pagination_token:
- params['pagination_token'] = pagination_token
-
- headers = {"Authorization": "Bearer {}".format(token)}
-
- response = requests.get(url, params=params, headers=headers)
- response_json = json.loads(response.text)
-
- print(response_json)
-
- dm_events = response_json.get('data')
- next_token = response_json.get('meta').get('next_token')
-
- query = {
- 'pagination_token': pagination_token,
- 'next_token': next_token
- }
-
- user = {
- 'id': user_id
- }
-
- return render_template('conversations.html', user = user, dm_events = dm_events, query = query)
- @twitter_app.route('/profile/<user_id>.html', methods=['GET'])
- def get_profile_html (user_id):
-
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- token = twitter['access_token']
-
- is_me = user_id == twitter['id']
-
- pagination_token = request.args.get('pagination_token')
- exclude_replies = request.args.get('exclude_replies', '1')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_user_timeline(user_id,
- exclude_replies = exclude_replies == '1',
- pagination_token = pagination_token,
- non_public_metrics = is_me)
-
- profile_links = []
- if user_id == "":
- profile_links += [
- {
- 'title': 'Mastodon',
- 'type': 'rss',
- 'url': 'https://mastodon.cloud/@ispoogedaily.rss'
- },
- {
- 'title': 'YouTube',
- 'type': 'rss',
- 'url': 'https://mastodon.cloud/@ispoogedaily.rss'
- },
- {
- 'title': 'Reddit',
- 'type': 'rss',
- 'url': 'https://mastodon.cloud/@ispoogedaily.rss'
- },
- {
- 'title': 'BedRSS',
- 'type': 'rss',
- 'url': 'https://mastodon.cloud/@ispoogedaily.rss'
- },
- {
- 'title': 'ispoogedaily',
- 'type': 'ig',
- 'url': 'https://mastodon.cloud/@ispoogedaily.rss'
- },
- {
- 'title': 'iSpooge Daily',
- 'type': 'yt',
- 'url': 'https://mastodon.cloud/@ispoogedaily.rss'
- },
- {
- 'title': 'US 555-555-5555',
- 'type': 'tel',
- 'url': 'tel:+15555555555'
- },
- {
- 'title': 'biz@example.com',
- 'type': 'email',
- 'url': 'mailto:biz@example.com?subject=hey'
- },
- ]
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, me), response_json['data']))
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_data_timeline_user', user_id=user_id, pagination_token=next_token, exclude_replies=1),
- 'next_page_url': url_for('.get_profile_html', user_id=user_id , pagination_token=next_token)
- }
-
- profile_user = {
- 'id': user_id
- }
-
- return render_template('user-profile.html', user = profile_user, tweets = tweets, query = query)
- @twitter_app.route('/media/upload', methods=['POST'])
- def post_media_upload ():
- me = request.args.get('me')
- twitter = session.get(me)
-
- if not twitter:
- return redirect(url_for('.get_login_html'))
-
- token = twitter['access_token']
- form = {
- 'media_category': 'tweet_image'
- }
-
- headers = {
- 'Authorization': 'Bearer {}'.format(token)
- }
-
- url = 'http://localhost:5004/twitter/fake-twitter/media/upload'
- #url = 'https://upload.twitter.com/1.1/media/upload.json' # .json
-
-
-
- upload_media = {}
- for e in request.files.items():
- media_name = e[0]
- f = e[1]
-
- print('.')
-
- files = {'media': [secure_filename(f.filename), BufferedReader(f), f.content_type]}
- response = requests.post(url, files=files, data=form, headers=headers)
-
- print(response.status_code)
- print(response.text)
-
- response_json = json.loads(response.text)
-
- upload_media[media_name] = response_json
-
- return Response(json.dumps({'upload_media': upload_media}), mimetype='application/json')
- @twitter_app.route('/fake-twitter/media/upload', methods=['POST'])
- def post_media_upload2 ():
- print(request.content_type)
-
- f = request.files.get('media')
-
- f.seek(0,2)
- media_size = f.tell()
- media = {
- #'_auth': request.headers.get('Authorization'),
- 'media_key': '3_{}'.format(secure_filename(f.filename)),
- 'media_id': secure_filename(f.filename),
- 'size': media_size,
- 'expires_after_secs': 86400,
- 'image': {
- 'image_type': f.content_type,
- 'w': 1,
- 'h': 1
- }
- }
- return Response(json.dumps(media), mimetype='application/json')
|