123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545 |
- from dataclasses import dataclass
- from typing import List, Optional
- from dataclass_type_validator import dataclass_type_validator, dataclass_validate, TypeValidationError
- from configparser import ConfigParser
- import base64
- import sqlite3
- import os
- import json
- import json_stream
- from zipfile import ZipFile
- import itertools
- import time
- from io import BufferedReader
- import re
- import datetime
- import dateutil
- import dateutil.parser
- import dateutil.tz
- import requests
- from werkzeug.utils import secure_filename
- from flask import json, Response, render_template, request, send_from_directory, Blueprint, session, redirect, g, current_app, jsonify
- from flask_cors import CORS
- from tweet_source import ApiV2TweetSource, TwitterApiV2SocialGraph
- import oauth2_login
- DATA_DIR='.data'
- twitter_app = Blueprint('twitter_v2_facade', 'twitter_v2_facade',
- static_folder='static',
- static_url_path='',
- url_prefix='/')
-
- twitter_app.register_blueprint(oauth2_login.oauth2_login, url_prefix="/")
- twitter_app.context_processor(oauth2_login.inject_me)
- twitter_app.before_request(oauth2_login.add_me)
- url_for = oauth2_login.url_for_with_me
- def run_script(script_name, script_vars):
- script_path = './{}.py'.format(script_name)
- if (os.path.exists(script_path)):
- script_file = open(script_path, 'r')
- script = script_file.read()
- script_file.close()
- try:
- return exec(script, script_vars)
- except:
- print('error running script: {}'.format(script_name))
- return False
- False
- class ActivityData:
- def __init__ (self,db_path):
-
- self.db_path = db_path
-
- db_exists = os.path.exists(db_path)
-
- self.db = sqlite3.connect(db_path)
-
- if not db_exists:
- self.init_db()
-
- return
-
- def init_db (self):
-
- self.db.execute('create table seen_user (ts, user_id)')
- self.db.execute('create table seen_tweet (ts, tweet_id)')
-
- return
-
- def seen_tweet (self, tweet_id):
- return
-
- def seen_user (self, user_id):
- return
-
- def add_tweet_counts (self, user_id, start, end, tweet_count):
- return [current_ts, user_id, start, end, tweet_count]
-
- def add_tweet_public_metrics (self, tweet_id, like_count, reply_count, retweet_count, quote_count):
- return
-
- def add_tweet_non_public_metrics (self, tweet_id, impression_count, click_count, link_click_count, profile_click_count):
- return
-
- def add_user_public_metrics (self, user_id, followers_count, following_count, tweet_count, listed_count):
- return
- class DataSet:
- def __init__ (self):
- self.items = {}
- return
-
- def update_items (self, items):
- """
- merges objects by ID. Asssigns an ID if none exists. Mutates OG object.
- """
-
- ids = []
-
- for item in items:
- if not 'id' in item:
- #item = dict(item)
- item['id'] = uuid.uuid4().hex
- else:
- existing_item = self.items.get( item['id'] )
- if existing_item:
- existing_item.update(item)
- item = existing_item
- self.items[ item['id'] ] = item
- ids.append( item['id'] )
-
- return ids
-
- def get_items (self):
- return self.items.values()
- class TwitterMetadata:
- def __init__ (self, data_dir):
- self.data_dir = data_dir
-
- os.mkdir(data_dir, exist_ok=True)
-
- def get_tweet (self, tweet_id):
- path = f'{self.data_dir}/tweet_{tweet_id}.json'
-
- if not os.path.exists(path):
- return None
-
- with open(path, 'rt') as f:
- return json.loads(f.read())
-
-
- def update_tweet (self, tweet_id, fields):
- tweet = self.get_tweet(tweet_id)
-
- if not tweet:
- tweet = {'id': tweet_id}
-
- tweet.update(fields)
-
- with open(f'{self.data_dir}/tweet_{tweet_id}.json', 'wt') as f:
- f.write(json.dumps(tweet))
-
- return tweet
- def collection_from_card_source (url):
- """
- temp1 = await fetch('http://localhost:5000/notes/cards/search?q=twitter.com/&limit=10').then(r => r.json())
- re = /(http[^\s]+twitter\.com\/[^\/]+\/status\/[\d]+)/ig
- tweetLinks = temp1.cards.map(c => c.card.content).map(c => c.match(re))
- tweetLinks2 = tweetLinks.flat().filter(l => l)
- tweetLinksS = Array.from(new Set(tweetLinks2))
- statusUrls = tweetLinksS.map(s => new URL(s))
- //users = Array.from(new Set(statusUrls.map(s => s.pathname.split('/')[1])))
- ids = Array.from(new Set(statusUrls.map(s => parseInt(s.pathname.split('/')[3]))))
- """
-
- """
- temp1 = JSON.parse(document.body.innerText)
- // get swipe note + created_at + tweet user + tweet ID
- tweetCards = temp1.cards.map(c => c.card).filter(c => c.content.match(re))
- tweets = tweetCards.map(c => ({created_at: c.created_at, content: c.content, tweets: c.content.match(re).map(m => new URL(m))}))
-
- tweets.filter(t => t.tweets.filter(t2 => t2.user.toLowerCase() == 'stephenmpinto').length)
-
-
- // HN
- re = /(http[^\s]+news.ycombinator\.com\/[^\s]+\=[\d]+)/ig
- linkCards = temp1.cards.map(c => c.card).filter(c => c.content.match(re))
- links = linkCards.map(c => ({created_at: c.created_at, content: c.content, links: c.content.match(re).map(m => new URL(m))}))
-
- // YT (I thnk I've already done this one)
-
- """
-
- # more in 2022 twitter report
- return None
- def get_tweet_collection (collection_id):
- with open(f'{DATA_DIR}/collection/{collection_id}.json', 'rt', encoding='utf-8') as f:
- collection = json.loads(f.read())
-
- return collection
- # pagination token is the next tweet_ID
- @twitter_app.get('/collection/<collection_id>.html')
- def get_collection_html (collection_id):
-
- max_results = int(request.args.get('max_results', 10))
-
- pagination_token = request.args.get('pagination_token')
-
- collection = get_tweet_collection(collection_id)
-
- if 'authorized_users' in collection and g.twitter_user['id'] not in collection['authorized_users']:
- return 'access denied.', 403
-
- items = []
- for item in collection['items']:
- tweet_id = item['id']
- if pagination_token and tweet_id != pagination_token:
- continue
- elif tweet_id == pagination_token:
- pagination_token = None
- elif len(items) == max_results:
- pagination_token = tweet_id
- break
-
- items.append(item)
-
-
- if not len(items):
- return 'no tweets', 404
-
- token = g.twitter_user['access_token']
-
-
- tweet_source = ApiV2TweetSource(token)
-
- tweet_ids = list(map(lambda item: item['id'], items))
- response_json = tweet_source.get_tweets( tweet_ids )
-
- #print(response_json)
- if 'errors' in response_json:
- # types:
- # https://api.twitter.com/2/problems/not-authorized-for-resource (blocked or suspended)
- # https://api.twitter.com/2/problems/resource-not-found (deleted)
- #print(response_json.get('errors'))
- for err in response_json.get('errors'):
- if not 'type' in err:
- print('unknown error type: ' + str(err))
- elif err['type'] == 'https://api.twitter.com/2/problems/not-authorized-for-resource':
- print('blocked or suspended tweet: ' + err['value'])
- elif err['type'] == 'https://api.twitter.com/2/problems/resource-not-found':
- print('deleted tweet: ' + err['value'])
- else:
- print('unknown error: ' + str(err))
-
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
-
- for item in items:
- t = list(filter(lambda t: item['id'] == t['id'], tweets))
-
- if not len(t):
- print("no tweet for item: " + item['id'])
- t = {
- "id": item['id'],
- "text": "(Deleted, suspended or blocked)",
- "created_at": "",
- "handle": "error",
- "display_name": "Error"
- }
- # FIXME 1) put this in relative order to the collection
- # FIXME 2) we can use the tweet link to get the user ID...
- tweets.append(t)
- else:
- t = t[0]
-
- t['note'] = item['note']
-
- if request.args.get('format') == 'json':
- return jsonify({'ids': tweet_ids,
- 'data': response_json,
- 'tweets': tweets,
- 'items': items,
- 'pagination_token': pagination_token})
- else:
- query = {}
-
- if pagination_token:
- query['next_data_url'] = url_for('.get_collection_html', collection_id=collection_id, pagination_token=pagination_token)
-
- if 'HX-Request' in request.headers:
- return render_template('partial/tweets-timeline.html', tweets = tweets, user = {}, query = query)
- else:
- if pagination_token:
- query['next_page_url'] = url_for('.get_collection_html', collection_id=collection_id, pagination_token=pagination_token)
- return render_template('tweet-collection.html', tweets = tweets, user = {}, query = query)
- @twitter_app.post('/data/collection/create/from-cards')
- def post_data_collection_create_from_cards ():
- """
- // create collection from search, supporting multiple Tweets per card and Tweets in multiple Cards.
-
-
- re = /(https?[a-z0-9\.\/\:]+twitter\.com\/[0-9a-z\_]+\/status\/[\d]+)/ig
-
- temp1 = await fetch('http://localhost:5000/notes/cards/search?q=twitter.com/').then(r => r.json())
- cardMatches = temp1.cards
- .map(cm => Object.assign({}, cm, {tweetLinks: Array.from(new Set(cm.card.content.match(re)))}))
- .filter(cm => cm.tweetLinks && cm.tweetLinks.length)
- .map(cm => Object.assign({}, cm, {tweetUrls: cm.tweetLinks.map(l => new URL(l))}))
- .map(cm => Object.assign({}, cm, {tweetInfos: cm.tweetUrls.map(u => ({user: u.pathname.split('/')[1], tweetId: u.pathname.split('/')[3]}))}));
-
- collectionCards = {}
- cardMatches.forEach(function (cm) {
- if (!cm.tweetLinks.length) { return; }
- cm.tweetInfos.forEach(function (ti) {
- if (!collectionCards[ti.tweetId]) {
- collectionCards[ti.tweetId] = [];
- }
- collectionCards[ti.tweetId].push(cm.card);
- })
- })
- var collectionItems = [];
- Object.entries(collectionCards).forEach(function (e) {
- var tweetId = e[0], cards = e[1];
- var note = cards.map(function (card) {
- return card.created_at + "\n\n" + card.content;
- }).join("\n\n-\n\n");
- collectionItems.push({id: tweetId, note: note, tweet_infos: cm.tweetInfos, card_infos: cards.map(c => 'card#' + c.id)});
- })
- """
-
- collection = {
- 'items': [], # described in JS function above
- 'authorized_users': [g.twitter_user['id']]
- }
-
- return jsonify(collection)
- #twitter_meta = TwitterMetadata('./data/meta')
- @twitter_app.route('/tweets', methods=['POST'])
- def post_tweets_create ():
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- text = request.form.get('text')
- reply_to_tweet_id = request.form.get('reply_to_tweet_id')
- quote_tweet_id = request.form.get('quote_tweet_id')
-
- tweet_source = ApiV2TweetSource(token)
- result = tweet_source.create_tweet(text, reply_to_tweet_id=reply_to_tweet_id, quote_tweet_id=quote_tweet_id)
-
- print(result)
-
- run_script('on_tweeted', {'twitter_user': g.twitter_user, 'tweet': result})
-
- if 'HX-Request' in request.headers:
- return render_template('partial/compose-form.html', new_tweet_id=result['data']['id'])
- else:
- response_body = json.dumps({
- 'result': result
- })
- return jsonify(response_body)
- @twitter_app.route('/tweet/<tweet_id>/retweet', methods=['POST'])
- def post_tweet_retweet (tweet_id):
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- tweet_source = ApiV2TweetSource(token)
- result = tweet_source.retweet(tweet_id, user_id=user_id)
-
- print(result)
-
- run_script('on_tweeted', {'twitter_user': g.twitter_user, 'retweet': result})
-
- if 'HX-Request' in request.headers:
- return """retweeted <script>Toast.fire({
- icon: 'success',
- title: 'Retweet was sent; <a style="text-align: right" href="{}">View</a>.'
- });</script>""".replace('{}', url_for('.get_tweet_html', tweet_id=tweet_id))
- else:
- response_body = json.dumps({
- 'result': result
- })
- return jsonify(response_body)
- @twitter_app.route('/tweet/<tweet_id>/bookmark', methods=['POST'])
- def post_tweet_bookmark (tweet_id):
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- tweet_source = ApiV2TweetSource(token)
- result = tweet_source.bookmark(tweet_id, user_id=user_id)
-
- print(result)
- if 'HX-Request' in request.headers:
- return """bookmarked <script>Toast.fire({
- icon: 'success',
- title: 'Tweet was bookmarked; <a style="text-align: right" href="{}">View</a>.'
- });</script>""".replace('{}', url_for('.get_tweet_html', tweet_id=tweet_id))
- else:
- response_body = json.dumps({
- 'result': result
- })
- return jsonify(response_body)
- @twitter_app.route('/tweet/<tweet_id>/bookmark', methods=['DELETE'])
- def delete_tweet_bookmark (tweet_id):
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- tweet_source = ApiV2TweetSource(token)
- result = tweet_source.delete_bookmark(tweet_id, user_id=user_id)
-
- response_body = json.dumps({
- 'result': result
- })
- return jsonify(response_body)
- @twitter_app.route('/tweet/<tweet_id>.html', methods=['GET'])
- def get_tweet_html (tweet_id):
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- pagination_token = request.args.get('pagination_token')
- view = request.args.get('view', 'replies')
-
- token = g.twitter_user['access_token']
-
-
- tweet_source = ApiV2TweetSource(token)
-
- only_replies = view == 'replies'
-
-
- tweets = []
- if not pagination_token:
- response_json = tweet_source.get_tweet(tweet_id)
-
-
-
- print("parent tweet=")
- print(response_json)
-
- includes = response_json.get('includes')
- tweet = response_json.get('data')[0]
-
- tweets.append(tweet_model(includes, tweet, g.me))
-
- response_json = None
- data_route = None
-
- if view == 'replies':
- data_route = '.get_tweet_html'
- response_json = tweet_source.get_thread(tweet_id,
- only_replies=True,
- pagination_token = pagination_token)
- elif view == 'thread':
- data_route = '.get_tweet_html'
- response_json = tweet_source.get_thread(tweet_id,
- only_replies=False,
- author_id=tweets[0]['author_id'],
- pagination_token = pagination_token)
-
- elif view == 'conversation':
- data_route = '.get_tweet_html'
- response_json = tweet_source.get_thread(tweet_id,
- only_replies=False,
- pagination_token = pagination_token)
- elif view == 'tweet':
- None
-
- next_token = None
-
- #print("conversation meta:")
- #print(json.dumps(response_json.get('meta'), indent=2))
-
- if response_json and response_json.get('meta').get('result_count'):
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data'])) + tweets
-
- next_token = response_json.get('meta').get('next_token')
-
- # this method is OK except it doesn't work if there are no replies.
- #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))
-
- #related_tweets = [] # derived from includes
-
- tweets.reverse()
-
-
-
- query = {}
-
- if next_token:
- query = {
- **query,
- # FIXME only_replies
- 'next_data_url': url_for(data_route, tweet_id=tweet_id, pagination_token=next_token, only_replies = '1' if only_replies else '0', author_id = tweets[0]['author_id']),
- 'next_page_url': url_for('.get_tweet_html', tweet_id=tweet_id, view=view, pagination_token=next_token)
- }
-
- user = {
- 'id': user_id
- }
-
- if 'HX-Request' in request.headers:
-
- # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- return render_template('tweet-collection.html', user = user, tweets = tweets, query = query, show_parent_tweet_controls=True)
- @twitter_app.route('/followers/<user_id>.html', methods=['GET'])
- def get_data_user_followers (user_id):
-
- token = g.twitter_user['access_token']
-
- social_source = TwitterApiV2SocialGraph(token)
-
- response_json = social_source.get_followers(user_id)
-
- ts = int(time.time() * 1000)
- with open(f'{DATA_DIR}/cache/followers_{user_id}_{ts}.json', 'wt') as f:
- f.write(json.dumps(response_json))
-
- print(response_json)
- #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': response_json})
-
- followers = list(map(lambda f: f['id'], response_json.get('data')))
-
- return render_template('following.html', following=followers)
-
- @twitter_app.route('/following/<user_id>.html', methods=['GET'])
- def get_data_user_following (user_id):
-
- token = g.twitter_user['access_token']
-
- social_source = TwitterApiV2SocialGraph(token)
-
- response_json = social_source.get_following(user_id)
-
- ts = int(time.time() * 1000)
- with open(f'{DATA_DIR}/cache/following_{user_id}_{ts}.json', 'wt') as f:
- f.write(json.dumps(response_json))
-
- print(response_json)
- #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': response_json})
-
- following = list(map(lambda f: f['id'], response_json.get('data')))
-
- return render_template('following.html', following=following)
-
- @twitter_app.route('/data/timeline/user/<user_id>/counts')
- def get_data_timeline_user_counts (user_id):
- query = f'from:{user_id}'
-
- # is:reply is:quote is:retweet has:links has:mentions has:media has:images has:videos has:geo
-
- if not oauth2_login.app_access_token:
- return 'refresh app token first.', 400
-
-
- tweet_source = ApiV2TweetSource(oauth2_login.app_access_token)
-
- response_json = tweet_source.count_tweets(query)
- ts = int(time.time() * 1000)
- with open(f'{DATA_DIR}/cache/tl_counts_{user_id}_{ts}.json', 'wt') as f:
- f.write(json.dumps(response_json))
- data = list(filter(lambda d: d.get('tweet_count') > 0, response_json.get('data')))
-
- result = {
- 'total_count': response_json.get('meta').get('total_tweet_count'),
- 'data': data
- }
-
-
- return jsonify(result)
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- # HTMx partials
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- @dataclass_validate
- @dataclass
- class PublicMetrics:
- reply_count: int = None
- quote_count: int = None
- retweet_count: int = None
- like_count: int = None
- @dataclass_validate
- @dataclass
- class NonPublicMetrics:
- impression_count: int = None
- user_profile_clicks: int = None
- url_link_clicks: int = None
-
- @dataclass_validate
- @dataclass
- class MediaItem:
- height: int
- width: int
- url: str
-
- media_key: str
- type: str
- preview_image_url: str
- image_url: str
- @dataclass_validate
- @dataclass
- class Card:
- display_url: Optional[str] = None
- source_url: Optional[str] = None
- content: Optional[str] = None
- title: Optional[str] = None
- @dataclass_validate
- @dataclass
- class TweetModel:
- id: str
- text: str
- created_at: str
-
- display_name: str
- handle: str
-
- author_is_verified: Optional[bool] = None
- url: Optional[str] = None
- conversation_id: Optional[str] = None
-
- avi_icon_url: Optional[str] = None
-
- author_url: Optional[str] = None
- author_id: Optional[str] = None
-
- source_url: Optional[str] = None
- source_author_url: Optional[str] = None
-
- reply_depth: Optional[int] = 0
-
- is_marked: Optional[bool] = None
-
- card: Optional[Card] = None
-
- public_metrics: Optional[PublicMetrics] = None
- non_public_metrics: Optional[NonPublicMetrics] = None
-
- retweeted_tweet_id: Optional[str] = None
-
- source_retweeted_by_url: Optional[str] = None
- retweeted_by: Optional[str] = None
- retweeted_by_url: Optional[str] = None
-
- videos: Optional[List[MediaItem]] = None
- photos: Optional[List[MediaItem]] = None
-
- quoted_tweet_id: Optional[str] = None
- quoted_tweet: Optional['TweetModel'] = None
-
- replied_tweet_id: Optional[str] = None
- replied_tweet: Optional['TweetModel'] = None
-
- def update(self, new):
- for key, value in new.items():
- if hasattr(self, key):
- setattr(self, key, value)
- dataclass_type_validator(self)
- # tm = TweetModel(id="1", text="aa", created_at="fs", display_name="fda", handle="fdsafas")
- def tweet_model (includes, tweet_data, me, my_url_for=url_for, reply_depth=0):
- # retweeted_by, avi_icon_url, display_name, handle, created_at, text
-
-
- user = list(filter(lambda u: u.get('id') == tweet_data['author_id'], includes.get('users')))[0]
-
-
- url = my_url_for('.get_tweet_html', tweet_id=tweet_data['id'])
- source_url = 'https://twitter.com/{}/status/{}'.format(user['username'], tweet_data['id'])
-
- avi_icon_url = user['profile_image_url']
-
- retweet_of = None
- quoted = None
- replied_to = None
-
- if 'referenced_tweets' in tweet_data:
- retweet_of = list(filter(lambda r: r['type'] == 'retweeted', tweet_data['referenced_tweets']))
- quoted = list(filter(lambda r: r['type'] == 'quoted', tweet_data['referenced_tweets']))
- replied_to = list(filter(lambda r: r['type'] == 'replied_to', tweet_data['referenced_tweets']))
-
- t = {
- 'id': tweet_data['id'],
- 'text': tweet_data['text'],
- 'created_at': tweet_data['created_at'],
- 'author_is_verified': user['verified'],
- 'url': url,
-
- 'conversation_id': tweet_data['conversation_id'],
-
- 'avi_icon_url': avi_icon_url,
-
- 'display_name': user['name'],
- 'handle': user['username'],
-
- 'author_url': my_url_for('.get_profile_html', user_id=user['id']),
- 'author_id': user['id'],
-
- 'source_url': source_url,
- 'source_author_url': 'https://twitter.com/{}'.format(user['username']),
- #'is_edited': len(tweet_data['edit_history_tweet_ids']) > 1
- }
-
- if reply_depth:
- t['reply_depth'] = reply_depth
-
- # HACK we should not refer to the request directly...
- if request and request.args.get('marked_reply') == str(t['id']):
- t['is_marked'] = True
-
- # This is where we should put "is_bookmark", "is_liked", "is_in_collection", etc...
-
- if 'entities' in tweet_data:
- if 'urls' in tweet_data['entities']:
- urls = list(filter(lambda u: 'title' in u and 'description' in u, tweet_data['entities']['urls']))
-
- if len(urls):
- url = urls[0]
- t['card'] = {
- 'display_url': url['display_url'].split('/')[0],
- 'source_url': url['unwound_url'],
- 'content': url['description'],
- 'title': url['title']
- }
-
- if 'public_metrics' in tweet_data:
- t['public_metrics'] = {
-
- 'reply_count': tweet_data['public_metrics']['reply_count'],
- 'quote_count': tweet_data['public_metrics']['quote_count'],
- 'retweet_count': tweet_data['public_metrics']['retweet_count'],
- 'like_count': tweet_data['public_metrics']['like_count']
- }
-
- try:
- pm = PublicMetrics(**t['public_metrics'])
- except:
- print('error populating public_metrics')
-
- if 'non_public_metrics' in tweet_data:
- t['non_public_metrics'] = {
-
- 'impression_count': tweet_data['non_public_metrics']['impression_count'],
- 'user_profile_clicks': tweet_data['non_public_metrics']['user_profile_clicks']
- }
-
- if 'url_link_clicks' in tweet_data['non_public_metrics']:
- t['non_public_metrics']['url_link_clicks'] = tweet_data['non_public_metrics']['url_link_clicks']
-
- if retweet_of and len(retweet_of):
- t['retweeted_tweet_id'] = retweet_of[0]['id']
-
- retweeted_tweet = list(filter(lambda t: t.get('id') == retweet_of[0]['id'], includes.get('tweets')))[0]
-
- t.update({
- 'source_retweeted_by_url': 'https://twitter.com/{}'.format(user['username']),
- 'retweeted_by': user['name'],
- 'retweeted_by_url': url_for('.get_profile_html', user_id=user['id'])
- })
-
- rt = tweet_model(includes, retweeted_tweet, me)
- t.update(rt)
- try:
- if 'attachments' in tweet_data and 'media_keys' in tweet_data['attachments']:
-
- media_keys = tweet_data['attachments']['media_keys']
-
- def first_media (mk):
- medias = list(filter(lambda m: m['media_key'] == mk, includes['media']))
- if len(medias):
- return medias[0]
- return None
-
- media = filter(lambda m: m != None, map(first_media, media_keys))
-
- #print('found media=')
- #print(media)
-
- photos = filter(lambda m: m['type'] == 'photo', media)
- videos = filter(lambda m: m['type'] == 'video', media)
-
- photos = map(lambda p: {**p, 'preview_image_url': p['url'] + '?name=tiny&format=webp'}, photos)
- videos = map(lambda p: {**p, 'image_url': p['preview_image_url'], 'preview_image_url': p['preview_image_url'] + '?name=tiny&format=webp'}, videos)
-
- t['photos'] = list(photos)
- t['videos'] = list(videos)
-
- except:
- # it seems like this comes when we have a retweeted tweet with media on it.
- print('exception adding attachments to tweet:')
- print(tweet_data)
- print('view tweet:')
- print(t)
- print('included media:')
- print(includes.get('media'))
-
-
- try:
- if quoted and len(quoted):
- t['quoted_tweet_id'] = quoted[0]['id']
-
- quoted_tweets = list(filter(lambda t: t.get('id') == quoted[0]['id'], includes.get('tweets')))
-
- if len(quoted_tweets):
- t['quoted_tweet'] = tweet_model(includes, quoted_tweets[0], me)
- except:
- print('error adding quoted tweet')
-
- try:
- if replied_to and len(replied_to):
- t['replied_tweet_id'] = replied_to[0]['id']
-
- if reply_depth < 1:
-
- replied_tweets = list(filter(lambda t: t.get('id') == replied_to[0]['id'], includes.get('tweets')))
- if len(replied_tweets):
- print("Found replied Tweet (t={}, rep={})".format(t['id'], t['replied_tweet_id']))
- t['replied_tweet'] = tweet_model(includes, replied_tweets[0], me, reply_depth=reply_depth + 1)
- else:
- print("No replied tweet found (t={}, rep={})".format(t['id'], t['replied_tweet_id']))
- except:
- print('error adding replied_to tweet')
-
-
- try:
- tm = TweetModel(**t)
- except:
- print('error populating TweetModel')
-
- return t
- def tweet_paginated_timeline ():
- return
- # This is a hybrid of get_tweet_html and get_collection_html, where we feed in the IDs.
- @twitter_app.route('/data/tweets', methods=['GET'])
- def get_twitter_tweets ():
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
- ids = request.args.get('ids')
- max_id=''
- if ids:
- ids = ids.split(',')
- tweet_source = ApiV2TweetSource(token)
-
- response_json = tweet_source.get_tweets(ids)
-
- user = {
- 'id': user_id
- }
-
- query = {}
-
- if 'HX-Request' in request.headers:
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
-
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- return jsonify(response_json)
-
-
- @twitter_app.route('/data/mentions/<user_id>', methods=['GET'])
- def get_data_mentions (user_id):
-
- token = g.twitter_user['access_token']
-
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_mentions_timeline(user_id,
- pagination_token = pagination_token)
-
- # the OG tweet is in the include.tweets collection.
- # All thread tweets are as well, clearly. Does it cost a fetch?
- #print(response_json)
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, token), response_json['data']))
-
- related_tweets = [] # derived from includes
-
- tweets.reverse()
-
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_data_metnions', pagination_token=next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': user_id
- }
- # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- response_body = json.dumps({
- 'tweets': tweets,
- 'pagination_token': pagination_token,
- 'next_token': next_token
- })
- return jsonify(response_body)
- @twitter_app.route('/data/between/<user_id>/<user2_id>', methods=['GET'])
- def get_data_between (user_id, user2_id):
-
- token = g.twitter_user['access_token']
-
- pagination_token = request.args.get('pagination_token')
-
- if user_id == 'me':
- user_id = g.twitter_user['id']
-
- if user2_id == 'me':
- user2_id = g.twitter_user['id']
-
- search_query = "(from:{} to:{}) OR (to:{} from:{})".format(user_id, user2_id, user_id, user2_id)
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.search_tweets(search_query,
- pagination_token = pagination_token)
-
- # the OG tweet is in the include.tweets collection.
- # All thread tweets are as well, clearly. Does it cost a fetch?
- #print(response_json)
-
- # augment with archive if one of the users is me
- # /twitter-archive/tweets/search?in_reply_to_user_id=__
- # /twitter-archive/tweets/search?q=@__
-
- tweets = []
- next_token = None
- if response_json.get('meta').get('result_count'):
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
-
- related_tweets = [] # derived from includes
-
- next_token = response_json.get('meta').get('next_token')
-
-
-
- tweets.reverse()
-
-
-
- query = {}
-
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_data_between', pagination_token=next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': twitter['id']
- }
- # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query, me = me)
- else:
- response_body = json.dumps({
- 'tweets': tweets,
- 'pagination_token': pagination_token,
- 'next_token': next_token
- })
- return jsonify(response_body)
- @twitter_app.route('/data/thread/<tweet_id>', methods=['GET'])
- def get_data_thread (tweet_id):
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_thread(tweet_id,
- author_id=user_id,
- pagination_token = pagination_token)
-
- # the OG tweet is in the include.tweets collection.
- # All thread tweets are as well, clearly. Does it cost a fetch?
- #print(response_json)
-
- tweets = []
- next_token = None
- if response_json.get('meta').get('result_count'):
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
-
- # FIXME this method is OK except it doesn't work if there are no replies.
- #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))
-
- #related_tweets = [] # derived from includes
-
- next_token = response_json.get('meta').get('next_token')
-
- if not pagination_token:
- response_json = tweet_source.get_tweet(tweet_id)
- print("parent tweet=")
- #print(response_json)
-
- includes = response_json.get('includes')
- tweet = response_json.get('data')[0]
-
- tweets.append(tweet_model(includes, tweet, g.me))
-
- tweets.reverse()
-
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_data_thread', tweet_id=tweet_id, pagination_token=next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': user_id
- }
- # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- response_body = json.dumps({
- 'tweets': tweets,
- 'pagination_token': pagination_token,
- 'next_token': next_token
- })
- return jsonify(response_body)
- @twitter_app.route('/data/conversation/<tweet_id>', methods=['GET'])
- def get_data_conversation (tweet_id):
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
- pagination_token = request.args.get('pagination_token')
- only_replies = request.args.get('only_replies')
-
- tweet_source = ApiV2TweetSource(token)
-
- # seems to get l
- response_json = tweet_source.get_thread(tweet_id,
- only_replies = only_replies == '1',
- pagination_token = pagination_token)
-
- # the OG tweet is in the include.tweets collection.
- # All thread tweets are as well, clearly. Does it cost a fetch?
- #print(response_json)
-
- tweets = []
- next_token = None
- print("conversation meta:")
- print(json.dumps(response_json.get('meta'), indent=2))
- if response_json.get('meta').get('result_count'):
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
-
- next_token = response_json.get('meta').get('next_token')
-
- # this method is OK except it doesn't work if there are no replies.
- #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))
-
- if not pagination_token:
- response_json = tweet_source.get_tweet(tweet_id)
-
-
-
- print("parent tweet=")
- print(response_json)
-
- includes = response_json.get('includes')
- tweet = response_json.get('data')[0]
-
- tweets.append(tweet_model(includes, tweet, g.me))
-
- #related_tweets = [] # derived from includes
-
- tweets.reverse()
-
-
-
- query = {}
-
- if next_token:
- query = {
- **query,
- 'next_data_url': url_for('.get_data_conversation', tweet_id=tweet_id, pagination_token=next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': user_id
- }
-
- # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- response_body = {
- 'tweets': tweets,
- 'pagination_token': pagination_token,
- 'next_token': next_token
- }
- return jsonify(response_body)
- @twitter_app.route('/data/likes/<user_id>', methods=['GET'])
- def get_data_likes (user_id):
-
- token = g.twitter_user['access_token']
-
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_likes(user_id,
- pagination_token = pagination_token)
-
- ts = int(time.time() * 1000)
- with open(f'{DATA_DIR}/cache/likes_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
- f.write(json.dumps(response_json))
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_data_likes', user_id=user_id, pagination_token=next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': user_id
- }
-
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- response_body = json.dumps({
- 'tweets': tweets,
- 'query': query
- })
- return jsonify(response_body)
- @twitter_app.route('/data/tweets/user/<user_id>/media', methods=['GET'])
- def get_data_tweets_media (user_id):
-
- token = g.twitter_user['access_token']
-
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_media_tweets(author_id=user_id,
- has_images=True,
- is_reply=False,
- is_retweet=False,
- pagination_token = pagination_token)
-
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_data_tweets_media', user_id=user_id, pagination_token=next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': user_id
- }
-
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- response_body = json.dumps({
- 'tweets': tweets,
- 'query': query
- })
- return jsonify(response_body)
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- # HTMx views
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- @twitter_app.route('/latest.html', methods=['GET'])
- def get_timeline_home_html (variant = "reverse_chronological", pagination_token=None):
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- if not pagination_token:
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_home_timeline(user_id,
- pagination_token = pagination_token)
-
- #print(json.dumps(response_json, indent=2))
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- #'next_data_url': url_for('.get_data_timeline_home', variant=variant, pagination_token=next_token),
- 'next_data_url': url_for('.get_timeline_home_html', pagination_token=next_token),
- 'next_page_url': url_for('.get_timeline_home_html', pagination_token=next_token)
- }
-
- user = {
- 'id': user_id
- }
-
- if 'HX-Request' in request.headers:
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- return render_template('tweet-collection.html', user = user, tweets = tweets, query = query)
- @twitter_app.route('/bookmarks.html', methods=['GET'])
- def get_bookmarks_html ():
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_bookmarks(user_id,
- pagination_token = pagination_token)
-
- #print(response_json)
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=next_token),
- 'next_page_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=next_token)
- }
-
- user = {
- 'id': user_id
- }
-
- if 'HX-Request' in request.headers:
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- return render_template('tweet-collection.html', user = user, tweets = tweets, query = query)
- @twitter_app.route('/conversations.html', methods=['GET'])
- def get_conversations_html ():
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
-
- pagination_token = request.args.get('pagination_token')
- max_results = int(request.args.get('max_results', 10))
-
- # https://developer.twitter.com/en/docs/twitter-api/direct-messages/lookup/api-reference/get-dm_events
- url = "https://api.twitter.com/2/dm_events"
- params = {
- "dm_event.fields": "id,event_type,text,created_at,dm_conversation_id,sender_id,participant_ids,referenced_tweets,attachments",
- "expansions": ",".join(["sender_id", "participant_ids"]),
- "max_results": max_results,
-
- "user.fields": ",".join(["id", "created_at", "name", "username", "location", "profile_image_url", "url", "verified"])
- }
-
- if pagination_token:
- params['pagination_token'] = pagination_token
-
- headers = {"Authorization": "Bearer {}".format(token)}
-
- response = requests.get(url, params=params, headers=headers)
- response_json = json.loads(response.text)
-
- print(response.text)
-
- dm_events = response_json.get('data')
- next_token = response_json.get('meta').get('next_token')
-
- query = {
- 'pagination_token': pagination_token,
- 'next_token': next_token
- }
-
- user = {
- 'id': user_id
- }
-
- return render_template('conversations.html', user = user, dm_events = dm_events, query = query)
- @twitter_app.route('/profile/<user_id>.html', methods=['GET'])
- def get_profile_html (user_id):
- token = g.twitter_user['access_token']
- is_me = user_id == g.twitter_user['id']
-
- pagination_token = request.args.get('pagination_token')
- exclude_replies = request.args.get('exclude_replies', '0')
- exclude_retweets = request.args.get('exclude_retweets', '0')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_user_timeline(user_id,
- exclude_replies = exclude_replies == '1',
- exclude_retweets = exclude_retweets == '1',
- pagination_token = pagination_token,
- non_public_metrics = is_me)
- ts = int(time.time() * 1000)
- with open(f'{DATA_DIR}/cache/tl_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
- f.write(json.dumps(response_json))
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_profile_html', user_id=user_id, pagination_token=next_token, exclude_replies=exclude_replies, exclude_retweets=exclude_retweets),
- 'next_page_url': url_for('.get_profile_html', user_id=user_id , pagination_token=next_token, exclude_replies=exclude_replies, exclude_retweets=exclude_retweets)
- }
-
- profile_user = {
- 'id': user_id
- }
-
-
- if 'HX-Request' in request.headers:
- return render_template('partial/tweets-timeline.html', user = profile_user, tweets = tweets, query = query)
- else:
- return render_template('user-profile.html', user = profile_user, tweets = tweets, query = query)
- @twitter_app.route('/media/upload', methods=['POST'])
- def post_media_upload ():
-
- token = g.twitter_user['access_token']
- form = {
- 'media_category': 'tweet_image'
- }
-
- headers = {
- 'Authorization': 'Bearer {}'.format(token)
- }
-
- url = 'http://localhost:5004/twitter/fake-twitter/media/upload'
- #url = 'https://upload.twitter.com/1.1/media/upload.json' # .json
-
-
-
- upload_media = {}
- for e in request.files.items():
- media_name = e[0]
- f = e[1]
-
- print('.')
-
- files = {'media': [secure_filename(f.filename), BufferedReader(f), f.content_type]}
- response = requests.post(url, files=files, data=form, headers=headers)
-
- print(response.status_code)
- print(response.text)
-
- response_json = json.loads(response.text)
-
- upload_media[media_name] = response_json
-
- return jsonify({'upload_media': upload_media})
- @twitter_app.route('/fake-twitter/media/upload', methods=['POST'])
- def post_media_upload2 ():
- print(request.content_type)
-
- f = request.files.get('media')
-
- f.seek(0,2)
- media_size = f.tell()
- media = {
- #'_auth': request.headers.get('Authorization'),
- 'media_key': '3_{}'.format(secure_filename(f.filename)),
- 'media_id': secure_filename(f.filename),
- 'size': media_size,
- 'expires_after_secs': 86400,
- 'image': {
- 'image_type': f.content_type,
- 'w': 1,
- 'h': 1
- }
- }
- return jsonify(media)
|