12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409 |
- from typing import List
- from dataclasses import asdict, replace
- from dacite import from_dict
- from configparser import ConfigParser
- import base64
- import sqlite3
- import os
- import json
- import json_stream
- from zipfile import ZipFile
- import itertools
- import time
- from io import BufferedReader
- import re
- import datetime
- import dateutil
- import dateutil.parser
- import dateutil.tz
- import requests
- from werkzeug.utils import secure_filename
- from flask import json, Response, render_template, request, send_from_directory, Blueprint, session, redirect, g, current_app, jsonify
- from flask_cors import CORS
- from tweet_source import ApiV2TweetSource, TwitterApiV2SocialGraph, ApiV2ConversationSource
- from twitter_v2_types import Tweet, TweetExpansions
- from view_model import FeedItem, FeedServiceUser, FeedItemAction, MediaItem, Card, PublicMetrics, NonPublicMetrics, cleandict
- import oauth2_login
# Root directory for on-disk state (stored collections and API response caches).
DATA_DIR = '.data'

# Route helper shared by the views in this module.
url_for = oauth2_login.url_for_with_me

# Blueprint that serves the facade routes plus the bundled static assets.
twitter_app = Blueprint(
    name='twitter_v2_facade',
    import_name='twitter_v2_facade',
    static_folder='static',
    static_url_path='',
    url_prefix='/',
)

# Login support: nested OAuth2 blueprint, template context and per-request hook.
twitter_app.register_blueprint(oauth2_login.oauth2_login, url_prefix="/")
twitter_app.context_processor(oauth2_login.inject_me)
twitter_app.before_request(oauth2_login.add_me)
def run_script(script_name, script_vars):
    """Execute the optional hook script ``./<script_name>.py`` if it exists.

    The script source is run with ``exec`` using ``script_vars`` as its global
    namespace, so the script can read the supplied values and any names it
    defines are left behind in ``script_vars``.

    NOTE(security): this runs arbitrary code from the working directory;
    ``script_name`` must never be derived from untrusted input.

    Returns:
        ``None`` when the script ran to completion (``exec`` has no result),
        ``False`` when the script file is missing or the script raised.
    """
    script_path = './{}.py'.format(script_name)

    if not os.path.exists(script_path):
        return False

    # The context manager closes the handle even when reading fails.
    with open(script_path, 'r') as script_file:
        script = script_file.read()

    try:
        return exec(script, script_vars)
    except Exception:
        # Hooks are best effort: report the failure and keep the caller alive.
        print('error running script: {}'.format(script_name))
        return False
class ActivityData:
    """SQLite-backed store for a user's activity records.

    Opening the store connects to ``db_path``; when that file did not exist
    beforehand the ``seen_user`` and ``seen_tweet`` tables are created.
    Most recording methods are still placeholders that return ``None``.
    """

    def __init__(self, user_id, db_path):
        """Connect to ``db_path`` and create the schema for a new database."""
        self.db_path = db_path
        self.user_id = user_id

        # Must be checked before connecting: sqlite3.connect creates the file.
        db_exists = os.path.exists(db_path)

        self.db = sqlite3.connect(db_path)

        if not db_exists:
            self.init_db()

    def init_db(self):
        """Create the tables used to record seen users and seen tweets."""
        self.db.execute('create table seen_user (ts, user_id)')
        self.db.execute('create table seen_tweet (ts, tweet_id)')

    def seen_tweet(self, tweet_id):
        """Placeholder: record that ``tweet_id`` was seen (not implemented)."""
        return None

    def seen_user(self, user_id):
        """Placeholder: record that ``user_id`` was seen (not implemented)."""
        return None

    def add_tweet_counts(self, user_id, start, end, tweet_count):
        """Build the row for a tweet-count sample; nothing is persisted yet.

        Returns:
            ``[current_ts, user_id, start, end, tweet_count]`` where
            ``current_ts`` is the current time in epoch milliseconds.
        """
        # The original referenced an undefined ``current_ts`` (NameError).
        current_ts = int(time.time() * 1000)
        return [current_ts, user_id, start, end, tweet_count]

    def add_tweet_public_metrics(self, tweet_id, like_count, reply_count, retweet_count, quote_count):
        """Placeholder: record public metrics for a tweet (not implemented)."""
        return None

    def add_tweet_non_public_metrics(self, tweet_id, impression_count, click_count, link_click_count, profile_click_count):
        """Placeholder: record non-public metrics for a tweet (not implemented)."""
        return None

    def add_user_public_metrics(self, user_id, followers_count, following_count, tweet_count, listed_count):
        """Placeholder: record public metrics for a user (not implemented)."""
        return None
class DataSet:
    """In-memory collection of dict items keyed by their ``id``."""

    def __init__(self):
        self.items = {}

    def update_items(self, items):
        """Merge ``items`` into the set by ID and return their IDs in order.

        An item without an ``id`` gets a random hex ID assigned (the passed
        object is mutated). An item whose ID is already stored is merged
        into the stored object, which is kept as the canonical instance.
        """
        # Imported locally: the module's import block does not provide uuid.
        import uuid

        ids = []

        for item in items:
            if 'id' not in item:
                item['id'] = uuid.uuid4().hex
            else:
                existing_item = self.items.get(item['id'])
                if existing_item:
                    existing_item.update(item)
                    item = existing_item

            self.items[item['id']] = item
            ids.append(item['id'])

        return ids

    def get_items(self):
        """Return a view of all stored items."""
        return self.items.values()
class TwitterMetadata:
    """File-backed store of per-tweet metadata, one JSON file per tweet."""

    def __init__(self, data_dir):
        """Remember ``data_dir`` and make sure the directory exists."""
        self.data_dir = data_dir

        # os.mkdir has no ``exist_ok`` parameter (TypeError); makedirs does
        # and also creates missing parent directories.
        os.makedirs(data_dir, exist_ok=True)

    def _tweet_path(self, tweet_id):
        """Return the path of the JSON file holding ``tweet_id``'s metadata."""
        return f'{self.data_dir}/tweet_{tweet_id}.json'

    def get_tweet(self, tweet_id):
        """Return the stored metadata for ``tweet_id``, or ``None`` if absent."""
        path = self._tweet_path(tweet_id)

        if not os.path.exists(path):
            return None

        with open(path, 'rt') as f:
            return json.loads(f.read())

    def update_tweet(self, tweet_id, fields):
        """Merge ``fields`` into the stored metadata, persist and return it.

        A record containing only ``id`` is created first when none exists.
        """
        tweet = self.get_tweet(tweet_id)

        if not tweet:
            tweet = {'id': tweet_id}

        tweet.update(fields)

        with open(self._tweet_path(tweet_id), 'wt') as f:
            f.write(json.dumps(tweet))

        return tweet
def collection_from_card_source(url):
    r"""Placeholder for building a tweet collection from a card search.

    Not implemented: ``url`` is ignored and ``None`` is always returned.
    The browser-console JavaScript sketches below are kept as design notes.
    The docstring is a raw string so the regex backslashes stay literal
    instead of being parsed as (invalid) Python escape sequences.

    Sketch 1 -- collect tweet IDs from cards:

    temp1 = await fetch('http://localhost:5000/notes/cards/search?q=twitter.com/&limit=10').then(r => r.json())
    re = /(http[^\s]+twitter\.com\/[^\/]+\/status\/[\d]+)/ig
    tweetLinks = temp1.cards.map(c => c.card.content).map(c => c.match(re))
    tweetLinks2 = tweetLinks.flat().filter(l => l)
    tweetLinksS = Array.from(new Set(tweetLinks2))
    statusUrls = tweetLinksS.map(s => new URL(s))
    //users = Array.from(new Set(statusUrls.map(s => s.pathname.split('/')[1])))
    ids = Array.from(new Set(statusUrls.map(s => parseInt(s.pathname.split('/')[3]))))

    Sketch 2 -- keep the card context next to each tweet link:

    temp1 = JSON.parse(document.body.innerText)
    // get swipe note + created_at + tweet user + tweet ID
    tweetCards = temp1.cards.map(c => c.card).filter(c => c.content.match(re))
    tweets = tweetCards.map(c => ({created_at: c.created_at, content: c.content, tweets: c.content.match(re).map(m => new URL(m))}))

    tweets.filter(t => t.tweets.filter(t2 => t2.user.toLowerCase() == 'stephenmpinto').length)


    // HN
    re = /(http[^\s]+news.ycombinator\.com\/[^\s]+\=[\d]+)/ig
    linkCards = temp1.cards.map(c => c.card).filter(c => c.content.match(re))
    links = linkCards.map(c => ({created_at: c.created_at, content: c.content, links: c.content.match(re).map(m => new URL(m))}))

    // YT (I thnk I've already done this one)
    """

    # more in 2022 twitter report
    return None
def get_tweet_collection(collection_id):
    """Load and return the collection stored under ``collection_id``.

    The collection is read from ``{DATA_DIR}/collection/<collection_id>.json``
    (UTF-8 encoded JSON).
    """
    collection_path = f'{DATA_DIR}/collection/{collection_id}.json'

    with open(collection_path, 'rt', encoding='utf-8') as collection_file:
        raw_collection = collection_file.read()

    return json.loads(raw_collection)
# pagination token is the next tweet_ID
@twitter_app.get('/collection/<collection_id>.html')
def get_collection_html(collection_id):
    """Render one page of a stored tweet collection.

    Query args:
        max_results: page size (default 10).
        pagination_token: ID of the collection item the page starts at.
        format: ``json`` returns a JSON payload; anything else renders HTML
            (the timeline partial for HTMX requests, the full page otherwise).

    Returns 403 when the collection lists ``authorized_users`` and the
    visitor is not one of them, and 404 when the page holds no items.
    """
    max_results = int(request.args.get('max_results', 10))
    pagination_token = request.args.get('pagination_token')

    collection = get_tweet_collection(collection_id)

    if 'authorized_users' in collection:
        # Anonymous visitors can never be on the allow-list.
        if not g.twitter_user or g.twitter_user['id'] not in collection['authorized_users']:
            return 'access denied.', 403

    items = []
    for item in collection['items']:
        tweet_id = item['id']
        if pagination_token and tweet_id != pagination_token:
            # Still scanning forward to the first item of the requested page.
            continue
        elif tweet_id == pagination_token:
            pagination_token = None
        elif len(items) == max_results:
            # Page is full: this item becomes the token for the next page.
            pagination_token = tweet_id
            break

        items.append(item)

    if not items:
        return 'no tweets', 404

    # Same fallback as the single tweet view: app token for anonymous visitors.
    if g.twitter_user:
        token = g.twitter_user['access_token']
    else:
        token = os.environ.get('BEARER_TOKEN')

    tweet_source = ApiV2TweetSource(token)

    tweet_ids = [item['id'] for item in items]
    response_json = tweet_source.get_tweets(tweet_ids, return_dataclass=True)

    if response_json.errors:
        # types:
        # https://api.twitter.com/2/problems/not-authorized-for-resource (blocked or suspended)
        # https://api.twitter.com/2/problems/resource-not-found (deleted)
        for err in response_json.errors:
            if 'type' not in err:
                print('unknown error type: ' + str(err))
            elif err['type'] == 'https://api.twitter.com/2/problems/not-authorized-for-resource':
                print('blocked or suspended tweet: ' + str(err.get('value')))
            elif err['type'] == 'https://api.twitter.com/2/problems/resource-not-found':
                print('deleted tweet: ' + str(err.get('value')))
            else:
                print('unknown error')

            print(json.dumps(err, indent=2))

    includes = response_json.includes

    # ``data`` is absent when none of the requested tweets could be fetched.
    models_by_id = {}
    for source_tweet in (response_json.data or []):
        models_by_id[source_tweet.id] = tweet_model_dc_vm(includes, source_tweet, g.me)

    # Build the page in collection order with exactly one entry per item.
    # Previously the annotated copies were appended to the list that already
    # held every tweet, so each tweet was rendered twice.
    tweets = []
    for item in items:
        t = models_by_id.get(item['id'])

        if t is None:
            print("no tweet for item: " + item['id'])
            t = FeedItem(
                id = item['id'],
                text = "(Deleted, suspended or blocked)",
                created_at = "",
                handle = "error",
                display_name = "Error"
            )
            # FIXME we can use the tweet link to get the user ID...

        tweets.append(replace(t, note = item.get('note')))

    if request.args.get('format') == 'json':
        return jsonify({'ids': tweet_ids,
                        'data': cleandict(asdict(response_json)),
                        'tweets': tweets,
                        'items': items,
                        'pagination_token': pagination_token})

    query = {}

    if pagination_token:
        query['next_data_url'] = url_for('twitter_v2_facade.get_collection_html', collection_id=collection_id, pagination_token=pagination_token)

    if 'HX-Request' in request.headers:
        return render_template('partial/tweets-timeline.html', tweets = tweets, user = {}, query = query)

    if pagination_token:
        query['next_page_url'] = url_for('twitter_v2_facade.get_collection_html', collection_id=collection_id, pagination_token=pagination_token)

    return render_template('user-profile.html', tweets = tweets, user = {}, query = query)
@twitter_app.post('/data/collection/create/from-cards')
def post_data_collection_create_from_cards():
    r"""Return an empty collection skeleton owned by the logged-in user.

    The items are not populated yet; the browser-console JavaScript below
    sketches how they are meant to be derived. The docstring is a raw string
    so the regex and ``\n`` sequences in the sketch stay literal.

    // create collection from search, supporting multiple Tweets per card and Tweets in multiple Cards.


    re = /(https?[a-z0-9\.\/\:]+twitter\.com\/[0-9a-z\_]+\/status\/[\d]+)/ig

    temp1 = await fetch('http://localhost:5000/notes/cards/search?q=twitter.com/').then(r => r.json())
    cardMatches = temp1.cards
    .map(cm => Object.assign({}, cm, {tweetLinks: Array.from(new Set(cm.card.content.match(re)))}))
    .filter(cm => cm.tweetLinks && cm.tweetLinks.length)
    .map(cm => Object.assign({}, cm, {tweetUrls: cm.tweetLinks.map(l => new URL(l))}))
    .map(cm => Object.assign({}, cm, {tweetInfos: cm.tweetUrls.map(u => ({user: u.pathname.split('/')[1], tweetId: u.pathname.split('/')[3]}))}));

    collectionCards = {}
    cardMatches.forEach(function (cm) {
    if (!cm.tweetLinks.length) { return; }
    cm.tweetInfos.forEach(function (ti) {
    if (!collectionCards[ti.tweetId]) {
    collectionCards[ti.tweetId] = [];
    }
    collectionCards[ti.tweetId].push(cm.card);
    })
    })
    var collectionItems = [];
    Object.entries(collectionCards).forEach(function (e) {
    var tweetId = e[0], cards = e[1];
    var note = cards.map(function (card) {
    return card.created_at + "\n\n" + card.content;
    }).join("\n\n-\n\n");
    collectionItems.push({id: tweetId, note: note, tweet_infos: cm.tweetInfos, card_infos: cards.map(c => 'card#' + c.id)});
    })
    """
    # The new collection is restricted to its creator, so a login is required.
    if not g.twitter_user:
        return 'need to log in.', 403

    collection = {
        'items': [], # described in JS function above
        'authorized_users': [g.twitter_user['id']]
    }

    return jsonify(collection)
- #twitter_meta = TwitterMetadata('./data/meta')
@twitter_app.route('/tweets', methods=['POST'])
def post_tweets_create():
    """Publish a tweet on behalf of the logged-in user.

    Form fields: ``text``, optional ``reply_to_tweet_id`` and
    ``quote_tweet_id``. After posting, the optional ``on_tweeted`` hook
    script is run. HTMX requests get the compose form partial back; other
    clients get ``{"result": ...}`` as JSON.
    """
    if not g.twitter_user:
        return 'need to log in.', 403

    token = g.twitter_user['access_token']

    text = request.form.get('text')
    reply_to_tweet_id = request.form.get('reply_to_tweet_id')
    quote_tweet_id = request.form.get('quote_tweet_id')

    tweet_source = ApiV2TweetSource(token)
    result = tweet_source.create_tweet(text, reply_to_tweet_id=reply_to_tweet_id, quote_tweet_id=quote_tweet_id)

    print(result)

    run_script('on_tweeted', {'twitter_user': g.twitter_user, 'tweet': result})

    if 'HX-Request' in request.headers:
        return render_template('partial/compose-form.html', new_tweet_id=result['data']['id'])

    # jsonify serializes the object itself; dumping it to a string first made
    # the response body a JSON string literal instead of a JSON object.
    return jsonify({'result': result})
@twitter_app.route('/tweet/<tweet_id>/retweet', methods=['POST'])
def post_tweet_retweet(tweet_id):
    """Retweet ``tweet_id`` on behalf of the logged-in user.

    After retweeting, the optional ``on_tweeted`` hook script is run. HTMX
    requests get a snippet that raises a success toast linking to the tweet;
    other clients get ``{"result": ...}`` as JSON.
    """
    if not g.twitter_user:
        return 'need to log in.', 403

    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']

    tweet_source = ApiV2TweetSource(token)
    result = tweet_source.retweet(tweet_id, user_id=user_id)

    print(result)

    run_script('on_tweeted', {'twitter_user': g.twitter_user, 'retweet': result})

    if 'HX-Request' in request.headers:
        # str.replace rather than str.format: the snippet contains JS braces.
        return """retweeted <script>Toast.fire({
icon: 'success',
title: 'Retweet was sent; <a style="text-align: right" href="{}">View</a>.'
});</script>""".replace('{}', url_for('.get_tweet_html', tweet_id=tweet_id))

    # jsonify serializes the object itself; dumping it to a string first made
    # the response body a JSON string literal instead of a JSON object.
    return jsonify({'result': result})
@twitter_app.route('/tweet/<tweet_id>/bookmark', methods=['POST'])
def post_tweet_bookmark(tweet_id):
    """Bookmark ``tweet_id`` for the logged-in user.

    HTMX requests get a snippet that raises a success toast linking to the
    tweet; other clients get ``{"result": ...}`` as JSON.
    """
    if not g.twitter_user:
        return 'need to log in.', 403

    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']

    tweet_source = ApiV2TweetSource(token)
    result = tweet_source.bookmark(tweet_id, user_id=user_id)

    print(result)

    if 'HX-Request' in request.headers:
        # str.replace rather than str.format: the snippet contains JS braces.
        return """bookmarked <script>Toast.fire({
icon: 'success',
title: 'Tweet was bookmarked; <a style="text-align: right" href="{}">View</a>.'
});</script>""".replace('{}', url_for('.get_tweet_html', tweet_id=tweet_id))

    # jsonify serializes the object itself; dumping it to a string first made
    # the response body a JSON string literal instead of a JSON object.
    return jsonify({'result': result})
@twitter_app.route('/tweet/<tweet_id>/bookmark', methods=['DELETE'])
def delete_tweet_bookmark(tweet_id):
    """Remove the logged-in user's bookmark on ``tweet_id``.

    Responds with ``{"result": ...}`` as JSON.
    """
    if not g.twitter_user:
        return 'need to log in.', 403

    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']

    tweet_source = ApiV2TweetSource(token)
    result = tweet_source.delete_bookmark(tweet_id, user_id=user_id)

    # jsonify serializes the object itself; dumping it to a string first made
    # the response body a JSON string literal instead of a JSON object.
    return jsonify({'result': result})
@twitter_app.route('/tweet/<tweet_id>.html', methods=['GET'])
def get_tweet_html(tweet_id):
    """Render a tweet together with its replies, thread or conversation.

    Query args:
        view: ``replies`` (default), ``thread``, ``conversation`` or
            ``tweet``; any other value is treated like ``tweet``.
        pagination_token: continues a previous page of the chosen view.
        author_id: thread author, passed along on paginated requests.

    HTMX requests receive the timeline partial; other requests receive the
    full page including navigation links and OpenGraph metadata. Returns 404
    when the root tweet is needed but the API returned no data for it.
    """
    pagination_token = request.args.get('pagination_token')
    view = request.args.get('view', 'replies')
    author_id = request.args.get('author_id')

    if g.twitter_user:
        token = g.twitter_user['access_token']
    else:
        token = os.environ.get('BEARER_TOKEN')

    tweet_source = ApiV2TweetSource(token)

    is_htmx = 'HX-Request' in request.headers
    only_replies = view == 'replies'

    # The root tweet heads the first page, supplies the metadata of a full
    # page and names the thread author. It used to be fetched only on the
    # first page, which left paginated thread and full-page requests
    # crashing on an empty list / unbound variable.
    tweets_response = None
    tweet = None
    tweets = []

    needs_root = (not pagination_token) or (not is_htmx) or (view == 'thread' and not author_id)

    if needs_root:
        tweets_response = tweet_source.get_tweet(tweet_id, return_dataclass=True)

        if not tweets_response.data:
            return 'tweet not found', 404

        tweet = tweets_response.data[0]

        if not pagination_token:
            root_item = tweet_model_dc_vm(tweets_response.includes, tweet, g.me)
            tweets.append(root_item)
            author_id = root_item.author_id
        elif not author_id:
            author_id = tweet.author_id

    skip_embed_replies = False

    # Stays None for view == 'tweet' and for unrecognised views.
    replies_response = None

    if view == 'replies':
        replies_response = tweet_source.get_thread(tweet_id,
                                                   only_replies=True,
                                                   pagination_token = pagination_token,
                                                   return_dataclass=True)
    elif view == 'thread':
        skip_embed_replies = True
        replies_response = tweet_source.get_thread(tweet_id,
                                                   only_replies=False,
                                                   author_id=author_id,
                                                   pagination_token = pagination_token,
                                                   return_dataclass=True)
    elif view == 'conversation':
        replies_response = tweet_source.get_thread(tweet_id,
                                                   only_replies=False,
                                                   pagination_token = pagination_token,
                                                   return_dataclass=True)

    next_token = None

    if replies_response and replies_response.meta and replies_response.meta.result_count:
        includes = replies_response.includes
        tweets = [tweet_model_dc_vm(includes, t, g.me) for t in replies_response.data] + tweets

        next_token = replies_response.meta.next_token

    # The root tweet was placed last above; reversing moves it to the front.
    tweets.reverse()

    query = {}

    if next_token:
        query = {
            # ``view`` is what this handler reads, so it must be carried to
            # the next page; only_replies is kept for existing links.
            'next_data_url': url_for('.get_tweet_html', tweet_id=tweet_id, view=view, pagination_token=next_token, only_replies = '1' if only_replies else '0', author_id = author_id),
            'next_page_url': url_for('.get_tweet_html', tweet_id=tweet_id, view=view, pagination_token=next_token)
        }

    if is_htmx:
        return render_template('partial/tweets-timeline.html', user = {}, tweets = tweets, query = query)

    page_nav = [
        dict(
            href = url_for('.get_tweet_html', tweet_id=tweet.conversation_id, view='thread'),
            label = 'author thread',
            order = 10
        ),
        dict(
            href = url_for('.get_tweet_html', tweet_id=tweet.conversation_id, view='conversation'),
            label = 'full convo',
            order = 20
        )
    ]

    user = list(filter(lambda u: u.id == tweet.author_id, tweets_response.includes.users))[0]

    source_url = f'https://twitter.com/{user.username}/status/{tweet_id}'
    title = f'Tweet by {user.name} at {tweet.created_at}'

    opengraph_info = dict(
        type = 'webpage', # threads might be article
        url = source_url,
        title = title,
        description = tweet.text,
        image = user.profile_image_url
    )

    return render_template('tweet-collection.html', user = user, tweets = tweets, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info)
@twitter_app.route('/followers/<user_id>.html', methods=['GET'])
def get_followers_html(user_id):
    """Render the followers of ``user_id``.

    Query args:
        use_cache: timestamp of a previously written cache file to render
            instead of calling the API. Must be all digits.

    A live response is written to
    ``{DATA_DIR}/cache/followers_<user_id>_<ts>.json`` with ``ts`` in epoch
    milliseconds. Returns 403 when not logged in and 400 for a malformed
    ``use_cache`` value.
    """
    if not g.twitter_user:
        return 'need to log in.', 403

    use_cache = request.args.get('use_cache')

    if use_cache:
        # The value ends up in a file path; cache names only contain the
        # numeric timestamp, so anything else is rejected.
        if not use_cache.isdigit():
            return 'invalid use_cache value.', 400

        print(f'using cache for user {user_id}: {use_cache}')
        with open(f'{DATA_DIR}/cache/followers_{user_id}_{use_cache}.json', 'rt') as f:
            response_json = json.load(f)

        # Cached responses are plain dicts, so the mapping-based model is
        # used; attribute access (``response_json.data``) fails on them.
        followers = [user_model(u) for u in (response_json.get('data') or [])]
    else:
        token = g.twitter_user['access_token']
        social_source = TwitterApiV2SocialGraph(token)

        response_json = social_source.get_followers(user_id, return_dataclass=True)

        ts = int(time.time() * 1000)

        print(f'followers cache for {user_id}: {ts}')

        with open(f'{DATA_DIR}/cache/followers_{user_id}_{ts}.json', 'wt') as f:
            json.dump(response_json, f, indent=2)

        #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': response_json})

        followers = [user_model_dc(u) for u in (response_json.data or [])]

    return render_template('following.html', users=followers)
@twitter_app.route('/following/<user_id>.html', methods=['GET'])
def get_following_html(user_id):
    """Render the accounts that ``user_id`` follows.

    Requires a logged-in user (403 otherwise). The API response is also
    written to ``{DATA_DIR}/cache/following_<user_id>_<ts>.json``, where
    ``ts`` is the current time in epoch milliseconds.
    """
    if not g.twitter_user:
        return 'need to log in.', 403

    access_token = g.twitter_user['access_token']
    graph = TwitterApiV2SocialGraph(access_token)

    following_response = graph.get_following(user_id, return_dataclass=True)

    # Snapshot the raw response under a timestamped name.
    snapshot_ts = int(time.time() * 1000)
    snapshot_path = f'{DATA_DIR}/cache/following_{user_id}_{snapshot_ts}.json'
    with open(snapshot_path, 'wt') as snapshot_file:
        snapshot_file.write(json.dumps(following_response))

    following_users = [user_model_dc(followed) for followed in following_response.data]

    return render_template('following.html', users=following_users)
-
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- # HTMx partials
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
def user_model(user):
    """Build a ``FeedServiceUser`` from a user given as a mapping.

    Reads the ``id``, ``name``, ``username`` and ``created_at`` keys. The
    description and preview image are left blank.
    """
    user_id = user['id']

    return FeedServiceUser(
        id=user_id,
        name=user['name'],
        username=user['username'],
        created_at=user['created_at'],
        description='',
        preview_image_url='',
        url=url_for('.get_profile_html', user_id=user_id),
    )
def user_model_dc(user):
    """Build a ``FeedServiceUser`` from a user object exposing attributes.

    Copies the profile fields across and adds the in-app profile URL plus
    the user's twitter.com URL.
    """
    profile_url = url_for('.get_profile_html', user_id=user.id)
    twitter_url = f'https://twitter.com/{user.username}'

    return FeedServiceUser(
        id=user.id,
        name=user.name,
        username=user.username,
        created_at=user.created_at,
        description=user.description,
        preview_image_url=user.profile_image_url,
        website=user.url,
        url=profile_url,
        source_url=twitter_url,
    )
def _video_to_media_item(video, use_hls=False, max_bitrate=100000000):
    """Convert an expanded video media object into a ``MediaItem``.

    The playable URL is the highest-bitrate variant that qualifies: HLS
    playlists when ``use_hls`` is set (mainly iOS), otherwise non-HLS
    variants whose bit rate does not exceed ``max_bitrate``.
    """
    if use_hls:
        variants = [var for var in video.variants if var.content_type == 'application/x-mpegURL']
    else:
        variants = [var for var in video.variants
                    if var.content_type != 'application/x-mpegURL' and var.bit_rate <= max_bitrate]

    # Treat a missing bit rate as 0 so sorting cannot fail on None.
    variants.sort(key=lambda var: var.bit_rate or 0, reverse=True)

    url = None
    content_type = None
    size = None

    if variants:
        if len(variants) > 1:
            print('multiple qualifying variants (using first):')
            print(variants)
        variant = variants[0]

        url = variant.url
        content_type = variant.content_type
        if variant.bit_rate and video.duration_ms:
            # Estimate computed as seconds * bit rate.
            size = int(video.duration_ms / 1000 * variant.bit_rate)

    public_metrics = None
    if video.public_metrics and video.public_metrics.view_count:
        public_metrics = PublicMetrics(
            view_count = video.public_metrics.view_count
        )

    return MediaItem(
        media_key = video.media_key,
        type = 'video',
        preview_image_url = video.preview_image_url + '?name=tiny&format=webp',
        image_url = video.preview_image_url,
        width = video.width,
        height = video.height,
        url = url,
        content_type = content_type,
        duration_ms = video.duration_ms,
        size = size,
        public_metrics = public_metrics
    )


def tweet_model_dc_vm(includes: TweetExpansions, tweet: Tweet, me, my_url_for=url_for, reply_depth=0) -> FeedItem:
    """Convert an API tweet plus its expansions into a ``FeedItem``.

    Args:
        includes: expansions of the API response; ``users``, ``tweets`` and
            ``media`` are consulted.
        tweet: the tweet to convert.
        me: passed through unchanged to the recursive conversions.
        my_url_for: URL builder used for in-app links.
        reply_depth: nesting level of replied-to tweets; the replied-to
            tweet is only embedded while this is below 1.

    A retweet is presented as the retweeted tweet annotated with who
    retweeted it. Quoted and replied-to tweets are embedded when they are
    present in ``includes.tweets``.

    Raises:
        RuntimeError: when attaching media, the quoted tweet or the
            replied-to tweet fails; the original error is chained.
    """
    # The author must be present in the user expansions.
    user = list(filter(lambda u: u.id == tweet.author_id, includes.users))[0]

    url = my_url_for('twitter_v2_facade.get_tweet_html', tweet_id=tweet.id, view='tweet')
    source_url = 'https://twitter.com/{}/status/{}'.format(user.username, tweet.id)

    retweet_of = None
    quoted = None
    replied_to = None

    if tweet.referenced_tweets:
        retweet_of = [r for r in tweet.referenced_tweets if r.type == 'retweeted']
        quoted = [r for r in tweet.referenced_tweets if r.type == 'quoted']
        replied_to = [r for r in tweet.referenced_tweets if r.type == 'replied_to']

    actions = {
        'view_replies': FeedItemAction('twitter_v2_facade.get_tweet_html', {'tweet_id': tweet.id, 'view': 'replies'}),

        'view_thread': FeedItemAction('twitter_v2_facade.get_tweet_html', {'tweet_id': tweet.conversation_id, 'view': 'thread'}),
        'view_conversation': FeedItemAction('twitter_v2_facade.get_tweet_html', {'tweet_id': tweet.conversation_id, 'view': 'conversation'}),
    }

    if g.twitter_user:
        actions.update(
            bookmark = FeedItemAction('twitter_v2_facade.post_tweet_bookmark', {'tweet_id': tweet.id}),
            delete_bookmark = FeedItemAction('twitter_v2_facade.delete_tweet_bookmark', {'tweet_id': tweet.id}),

            retweet = FeedItemAction('twitter_v2_facade.post_tweet_retweet', {'tweet_id': tweet.id})
        )

    if g.twitter_live_enabled:
        actions.update(
            view_activity = FeedItemAction('twitter_v2_live_facade.get_tweet_activity_html', {'tweet_id': tweet.id})
        )

    t = FeedItem(
        id = tweet.id,
        text = tweet.text,
        created_at = tweet.created_at,
        author_is_verified = user.verified,
        url = url,

        conversation_id = tweet.conversation_id,

        avi_icon_url = user.profile_image_url,

        display_name = user.name,
        handle = user.username,

        author_url = my_url_for('twitter_v2_facade.get_profile_html', user_id=user.id),
        author_id = user.id,

        source_url = source_url,
        source_author_url = 'https://twitter.com/{}'.format(user.username),

        actions = actions,
    )

    if reply_depth:
        t = replace(t, reply_depth = reply_depth)

    # HACK we should not refer to the request directly...
    if request and request.args.get('marked_reply') == str(t.id):
        t = replace(t, is_marked = True)

    # This is where we should put "is_bookmark", "is_liked", "is_in_collection", etc...

    if tweet.entities and tweet.entities.urls:
        # Only links that came back with a title and description get a card.
        card_urls = [u for u in tweet.entities.urls if u.title and u.description]

        if card_urls:
            card_url = card_urls[0]
            card = Card(
                display_url = card_url.display_url.split('/')[0],
                source_url = card_url.unwound_url,
                content = card_url.description,
                title = card_url.title
            )
            t = replace(t, card = card)

    if tweet.public_metrics:
        public_metrics = PublicMetrics(
            reply_count = tweet.public_metrics.reply_count,
            quote_count = tweet.public_metrics.quote_count,
            retweet_count = tweet.public_metrics.retweet_count,
            like_count = tweet.public_metrics.like_count
        )

        t = replace(t, public_metrics = public_metrics)

    if tweet.non_public_metrics:
        non_public_metrics = NonPublicMetrics(
            impression_count = tweet.non_public_metrics.impression_count,
            user_profile_clicks = tweet.non_public_metrics.user_profile_clicks,
            url_link_clicks = tweet.non_public_metrics.url_link_clicks
        )

        t = replace(t, non_public_metrics = non_public_metrics)

    if retweet_of:
        retweeted_id = retweet_of[0].id
        retweeted_tweets = [rt for rt in (includes.tweets or []) if rt.id == retweeted_id]

        if retweeted_tweets:
            rt = tweet_model_dc_vm(includes, retweeted_tweets[0], me, my_url_for=my_url_for)

            # Show the retweeted tweet, annotated with the retweeter.
            t = replace(rt,
                retweeted_tweet_id = retweeted_id,
                source_retweeted_by_url = 'https://twitter.com/{}'.format(user.username),
                retweeted_by = user.name,
                retweeted_by_url = my_url_for('twitter_v2_facade.get_profile_html', user_id=user.id)
            )
        else:
            # Expansion missing: keep the retweet itself and record the ID.
            print('No retweeted tweet found (t={}, rt={})'.format(tweet.id, retweeted_id))
            t = replace(t, retweeted_tweet_id = retweeted_id)

    try:
        if tweet.attachments and tweet.attachments.media_keys and includes.media:

            def first_media(mk):
                medias = [m for m in includes.media if m.media_key == mk]
                return medias[0] if medias else None

            media = [m for m in map(first_media, tweet.attachments.media_keys) if m is not None]

            photo_media = [
                MediaItem(media_key = p.media_key, type = 'photo', preview_image_url = p.url + '?name=tiny&format=webp', url = p.url, width = p.width, height = p.height)
                for p in media if p.type == 'photo'
            ]
            video_media = [_video_to_media_item(v) for v in media if v.type == 'video']

            t = replace(t,
                photos = photo_media,
                videos = video_media
            )

        elif tweet.attachments and tweet.attachments.media_keys and not includes.media:
            print('tweet had attachments and media keys, but no expansion media content was given')
            print(tweet.attachments.media_keys)

    except Exception as err:
        # it seems like this comes when we have a retweeted tweet with media on it.
        print('exception adding attachments to tweet:')
        print(tweet)
        print('view tweet:')
        print(t)
        print('included media:')
        print(includes.media)

        # Raising a plain string is itself a TypeError in Python 3.
        raise RuntimeError('exception adding attachments to tweet') from err

    try:
        if quoted:
            t = replace(t, quoted_tweet_id = quoted[0].id)

            quoted_tweets = [qt for qt in (includes.tweets or []) if qt.id == quoted[0].id]

            if quoted_tweets:
                t = replace(t, quoted_tweet = tweet_model_dc_vm(includes, quoted_tweets[0], me, my_url_for=my_url_for))
    except Exception as err:
        raise RuntimeError('error adding quoted tweet') from err

    try:
        if replied_to and includes.tweets:
            t = replace(t, replied_tweet_id = replied_to[0].id)

            if reply_depth < 1:
                replied_tweets = [rp for rp in includes.tweets if rp.id == replied_to[0].id]
                if replied_tweets:
                    t = replace(t, replied_tweet = tweet_model_dc_vm(includes, replied_tweets[0], me, my_url_for=my_url_for, reply_depth=reply_depth + 1))
                else:
                    print("No replied tweet found (t={}, rep={})".format(t.id, t.replied_tweet_id))
    except Exception as err:
        raise RuntimeError('error adding replied_to tweet') from err

    return t
def tweet_paginated_timeline():
    """Placeholder with no behaviour yet; always returns ``None``."""
    return None
@twitter_app.route('/data/tweets/user/<user_id>/media', methods=['GET'])
def get_data_tweets_media(user_id):
    """
    Not used anywhere... trying an idea. tweet_model needs to be updated.

    Lists original (non-reply, non-retweet) tweets with images authored by
    ``user_id``. HTMX requests get the timeline partial; other clients get
    ``{"data": ..., "query": ...}`` as JSON. Requires a logged-in user.
    """
    if not g.twitter_user:
        return 'need to log in.', 403

    token = g.twitter_user['access_token']

    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_media_tweets(author_id=user_id,
                                                  has_images=True,
                                                  is_reply=False,
                                                  is_retweet=False,
                                                  pagination_token = pagination_token)

    includes = response_json.get('includes')

    # NOTE(review): tweet_model is not defined in the visible part of this
    # module -- confirm it exists before wiring this endpoint up.
    # ``data`` and ``meta`` may be missing from the response.
    tweets = [tweet_model(includes, t, g.me) for t in (response_json.get('data') or [])]
    next_token = (response_json.get('meta') or {}).get('next_token')

    query = {}

    if next_token:
        query = {
            **query,

            'next_data_url': url_for('.get_data_tweets_media', user_id=user_id, pagination_token=next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }

        return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)

    # jsonify serializes the object itself; dumping it to a string first made
    # the response body a JSON string literal instead of a JSON object.
    return jsonify({
        'data': tweets,
        'query': query
    })
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- # HTMx views
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
@twitter_app.route('/latest.html', methods=['GET'])
def get_timeline_home_html(variant = "reverse_chronological", pagination_token=None):
    """
    Show one page of the logged-in user's home timeline.

    Parameters:
        variant: accepted but not read by this function.
        pagination_token: page token; when falsy it is taken from the
            'pagination_token' query arg instead.

    Query args: pagination_token, since_id, until_id, end_time and
    format ('html' by default, 'feed.json' for a JSON response).

    Returns:
        ('need to login. go to /login.html', 403) when no Twitter user is set
        on g; otherwise JSON for format 'feed.json', the timeline partial for
        HX-Request requests, or the full 'tweet-collection.html' page.
    """
    if not g.twitter_user:
        return 'need to login. go to /login.html', 403

    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']

    pagination_token = pagination_token or request.args.get('pagination_token')
    output_format = request.args.get('format', 'html')

    timeline_args = cleandict({
        'pagination_token': pagination_token,
        'since_id': request.args.get('since_id'),
        'until_id': request.args.get('until_id'),
        'end_time': request.args.get('end_time')
    })

    response = ApiV2TweetSource(token).get_home_timeline(user_id, **timeline_args)

    includes = response.includes
    tweets = [tweet_model_dc_vm(includes, tweet, g.me) for tweet in response.data]
    next_token = response.meta.next_token

    # From here on the args describe the *next* page, not the one just fetched.
    timeline_args['pagination_token'] = next_token

    query = {
        **timeline_args,
        'format': output_format,
        'me': g.me
    }

    if next_token:
        next_url = url_for('.get_timeline_home_html', **timeline_args)
        query.update({
            'next_data_url': next_url,
            'next_page_url': next_url
        })

    if output_format == 'feed.json':
        return jsonify(cleandict({
            'data': tweets,
            'query': query
        }))

    user = {
        'id': user_id
    }

    if 'HX-Request' in request.headers:
        template_name = 'partial/tweets-timeline.html'
    else:
        template_name = 'tweet-collection.html'

    return render_template(template_name, user=user, tweets=tweets, query=query, show_thread_controls=True)
@twitter_app.route('/bookmarks.html', methods=['GET'])
def get_bookmarks_html():
    """
    Show one page of the logged-in user's bookmarked tweets.

    Query args:
        pagination_token: optional token for the page to fetch.

    Returns:
        The timeline partial for HX-Request requests, otherwise the full
        'tweet-collection.html' page. 'query' carries next-page URLs only
        when the response has a next_token.
    """
    twitter_user = g.twitter_user
    user_id = twitter_user['id']
    token = twitter_user['access_token']

    pagination_token = request.args.get('pagination_token')

    bookmarks = ApiV2TweetSource(token).get_bookmarks(user_id,
                                                      pagination_token=pagination_token,
                                                      return_dataclass=True)

    includes = bookmarks.includes
    tweets = [tweet_model_dc_vm(includes, tweet, g.me) for tweet in bookmarks.data]
    next_token = bookmarks.meta.next_token

    query = {}
    if next_token:
        next_url = url_for('.get_bookmarks_html', user_id=user_id, pagination_token=next_token)
        query['next_data_url'] = next_url
        query['next_page_url'] = next_url

    user = {
        'id': user_id
    }

    if 'HX-Request' in request.headers:
        template_name = 'partial/tweets-timeline.html'
    else:
        template_name = 'tweet-collection.html'

    return render_template(template_name, user=user, tweets=tweets, query=query)
@twitter_app.route('/profile/<user_id>.html', methods=['GET'])
def get_profile_html(user_id):
    """
    Show one page of a user's timeline as a profile page, partial or JSON feed.

    Uses the logged-in user's access token when available, otherwise the
    BEARER_TOKEN environment variable.

    Query args:
        pagination_token: optional token for the page to fetch.
        exclude_replies / exclude_retweets: '1' enables the filter (default '0').
        format: 'html' (default) or 'feed.json'.

    Side effects:
        Writes the timeline response as JSON into DATA_DIR/cache.

    Returns:
        A (message, 502) tuple when the timeline source returns nothing;
        JSON for format 'feed.json'; the timeline partial for HX-Request
        requests; otherwise the full 'user-profile.html' page, which needs
        an extra user lookup for title, OpenGraph data and navigation.
    """
    # issue: retweets don't come back if we request non_public_metrics,
    # so they are never requested, not even on the logged-in user's own profile.
    is_me = False

    if g.twitter_user:
        token = g.twitter_user['access_token']
    else:
        token = os.environ.get('BEARER_TOKEN')

    output_format = request.args.get('format', 'html')

    pagination_token = request.args.get('pagination_token')
    exclude_replies = request.args.get('exclude_replies', '0')
    exclude_retweets = request.args.get('exclude_retweets', '0')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_user_timeline(user_id,
                                                   exclude_replies=exclude_replies == '1',
                                                   exclude_retweets=exclude_retweets == '1',
                                                   pagination_token=pagination_token,
                                                   non_public_metrics=is_me,
                                                   return_dataclass=True)

    if not response_json:
        # Nothing below can work without a response; previously this fell
        # through and failed with AttributeError on response_json.meta.
        print('no response_json')
        return 'no response from the timeline source', 502

    if response_json.meta.result_count == 0:
        print('no results')

    if not response_json.includes:
        print(response_json)
        print('no response_json.includes')

    if response_json.errors:
        print('profile get_user_timeline errors:')
        print(response_json.errors)

    # user_id and pagination_token come from the request, so sanitise the
    # file name before it touches the filesystem.
    ts = int(time.time() * 1000)
    cache_name = secure_filename(f'tl_{user_id}_{ts}_{pagination_token}.json')
    with open(f'{DATA_DIR}/cache/{cache_name}', 'wt') as f:
        f.write(json.dumps(cleandict(asdict(response_json))))

    if response_json.data:
        tweets = [tweet_model_dc_vm(response_json.includes, t, g.me) for t in response_json.data]
    else:
        tweets = []

    next_token = response_json.meta.next_token

    query = cleandict({
        'pagination_token': pagination_token,
        'exclude_replies': exclude_replies,
        'exclude_retweets': exclude_retweets,
        'format': output_format
    })

    if next_token:
        next_url = url_for('.get_profile_html',
                           user_id=user_id,
                           pagination_token=next_token,
                           exclude_replies=exclude_replies,
                           exclude_retweets=exclude_retweets)
        query = {
            **query,
            'next_data_url': next_url,
            'next_page_url': next_url
        }

    if output_format == 'feed.json':
        return jsonify(cleandict({
            'data': tweets,
            'query': query
        }))

    if 'HX-Request' in request.headers:
        profile_user = {
            'id': user_id
        }
        return render_template('partial/tweets-timeline.html', user=profile_user, tweets=tweets, query=query)

    # Full page render.
    # FIXME the user is probably present in the tweet expansions info.
    social_graph = TwitterApiV2SocialGraph(token)
    users_response = social_graph.get_user(user_id)

    print(users_response)

    user = users_response['data'][0]

    title = f'{user["name"]} ({user["username"]})'

    # FIXME official Twitter or owner's instance?
    source_url = f'https://www.twitter.com/{user["username"]}'

    opengraph_info = dict(
        type='webpage',  # threads might be article
        url=source_url,
        title=title,
        description=user['description'],
        image=user['profile_image_url']
    )

    # 'order' values are carried along for whatever consumes page_nav;
    # this function does not sort by them.
    page_nav = [
        dict(
            href=url_for('twitter_v2_facade.get_profile_html', user_id=user['id']),
            label='Timeline',
            order=10,
        ),
        dict(
            href=url_for('twitter_v2_facade.get_following_html', user_id=user['id']),
            label='Following',
            order=40,
        ),
        dict(
            href=url_for('twitter_v2_facade.get_followers_html', user_id=user['id']),
            label='Followers',
            order=50,
        ),
    ]

    if g.twitter_live_enabled:
        page_nav += [
            dict(
                href=url_for('twitter_v2_live_facade.get_likes_html', user_id=user['id']),
                label='Likes',
                order=20,
            ),
            dict(
                href=url_for('twitter_v2_live_facade.get_mentions_html', user_id=user['id']),
                label='Mentions',
                order=30,
            ),
            dict(
                href=url_for('twitter_v2_live_facade.get_user_activity_html', user_id=user['id']),
                label='Activity',
                order=60,
            ),
        ]

    return render_template('user-profile.html',
                           user=user,
                           tweets=tweets,
                           query=query,
                           opengraph_info=opengraph_info,
                           page_nav=page_nav)
@twitter_app.route('/users.html', methods=['GET'])
def get_users():
    """
    Look up the users named in the 'ids' query arg and list them.

    The raw lookup response is also written to DATA_DIR/cache as JSON.

    Returns:
        ('supply ids=', 400) when 'ids' is missing or empty, otherwise the
        rendered 'following.html' template with the response's 'data' entry.
    """
    ids = request.args.get('ids')
    if not ids:
        return 'supply ids=', 400

    access_token = g.twitter_user['access_token']
    social_graph = TwitterApiV2SocialGraph(access_token)
    response_json = social_graph.get_users(ids)

    # Keep a timestamped copy of every lookup response on disk.
    timestamp_ms = int(time.time() * 1000)
    cache_path = f'{DATA_DIR}/cache/users_{timestamp_ms}.json'
    with open(cache_path, 'wt') as cache_file:
        cache_file.write(json.dumps(response_json))

    return render_template('following.html', users=response_json.get('data'))
-
@twitter_app.route('/media/upload', methods=['POST'])
def post_media_upload():
    """
    Forward every uploaded file to the media upload endpoint, one request each.

    NOTE: the target is currently the local fake endpoint; the real one
    (https://upload.twitter.com/1.1/media/upload.json) is not in use.

    Returns:
        JSON {'upload_media': {...}} mapping each form field name to the
        decoded JSON response received for that file.
    """
    url = 'http://localhost:5004/twitter/fake-twitter/media/upload'

    access_token = g.twitter_user['access_token']
    request_headers = {'Authorization': 'Bearer {}'.format(access_token)}
    form_fields = {'media_category': 'tweet_image'}

    upload_media = {}
    for media_name, uploaded in request.files.items():
        print('.')

        multipart = {
            'media': [secure_filename(uploaded.filename), BufferedReader(uploaded), uploaded.content_type]
        }
        response = requests.post(url, files=multipart, data=form_fields, headers=request_headers)

        print(response.status_code)
        print(response.text)

        upload_media[media_name] = json.loads(response.text)

    return jsonify({'upload_media': upload_media})
@twitter_app.route('/fake-twitter/media/upload', methods=['POST'])
def post_media_upload2():
    """
    Fake media upload endpoint: describe the uploaded 'media' file without storing it.

    Returns:
        ('supply media=', 400) when the request has no 'media' file part;
        otherwise JSON with a media_key / media_id derived from the sanitised
        file name, the measured size in bytes, a fixed expiry and a
        placeholder 1x1 image description.
    """
    print(request.content_type)

    f = request.files.get('media')
    if f is None:
        # Without this guard the seek below raised AttributeError (HTTP 500).
        return 'supply media=', 400

    safe_name = secure_filename(f.filename)

    # Seek to the end (whence=2) so tell() reports the total size.
    f.seek(0, 2)
    media_size = f.tell()

    media = {
        'media_key': '3_{}'.format(safe_name),
        'media_id': safe_name,
        'size': media_size,
        'expires_after_secs': 86400,
        'image': {
            'image_type': f.content_type,
            'w': 1,
            'h': 1
        }
    }
    return jsonify(media)
def get_nav_items():
    """
    Build this module's navigation entries for the current request.

    Returns:
        A list of dicts with 'href', 'label' and 'order'. The list is empty
        when g has no twitter_user; two extra entries (DMs, Mentions) are
        added when g.twitter_live_enabled is set.
    """
    twitter_user = g.get('twitter_user')
    me = g.get('me')

    nav_items = []

    if not twitter_user:
        return nav_items

    nav_items.extend([
        {
            'href': url_for('twitter_v2_facade.get_timeline_home_html'),
            'label': 'Latest Tweets',
            'order': 0,
        },
        {
            'href': url_for('twitter_v2_facade.get_bookmarks_html'),
            'label': 'Bookmarks',
            'order': 100,
        },
        {
            'href': url_for('twitter_v2_facade.get_profile_html', user_id=twitter_user['id']),
            'label': 'My Profile',
            'order': 200,
        },
        {
            'href': url_for('twitter_v2_facade.oauth2_login.get_logout_html'),
            'label': f'Logout ({me})',
            'order': 1000,
        },
    ])

    # The Mentions link reads twitter_user['id'], so the live entries are
    # only built once a logged-in user is known.
    if g.get('twitter_live_enabled'):
        nav_items.extend([
            {
                'href': url_for('twitter_v2_live_facade.get_conversations_html'),
                'label': 'DMs',
                'order': 10,
            },
            {
                'href': url_for('twitter_v2_live_facade.get_mentions_html', user_id=twitter_user['id']),
                'label': 'Mentions',
                'order': 20,
            },
        ])

    return nav_items
@twitter_app.before_request
def add_module_nav_items_to_template_context():
    """
    Before each request, store this module's navigation entries on g.module_nav.
    """
    module_nav = get_nav_items()
    g.module_nav = module_nav
-
def get_top8(user_id):
    """
    Return the hard-coded "top 8" list for the one supported user.

    Returns:
        A list of eight {'id': '14520320'} dicts when user_id is the string
        '14520320'; None for any other value.
    """
    featured_user_id = '14520320'

    if user_id == featured_user_id:
        # Each slot gets its own dict, as in a literal list of eight entries.
        return [dict(id='14520320') for _ in range(8)]

    return None
|