from dataclasses import dataclass
from typing import List, Optional
from dataclass_type_validator import dataclass_type_validator, dataclass_validate, TypeValidationError
from configparser import ConfigParser
import base64
import sqlite3
import os
import json
import uuid  # FIX: DataSet.update_items uses uuid.uuid4() but uuid was never imported
import json_stream
from zipfile import ZipFile
import itertools
import time
from io import BufferedReader
import re
import datetime
import dateutil
import dateutil.parser
import dateutil.tz
import requests
from werkzeug.utils import secure_filename

# NOTE: `from flask import json` deliberately rebinds the stdlib `json` imported
# above to Flask's json wrapper for the rest of this module.
from flask import json, Response, render_template, request, send_from_directory, Blueprint, session, redirect, g, current_app, jsonify
from flask_cors import CORS

from tweet_source import ApiV2TweetSource, TwitterApiV2SocialGraph
import oauth2_login

DATA_DIR = '.data'

twitter_app = Blueprint('twitter_v2_facade', 'twitter_v2_facade',
                        static_folder='static',
                        static_url_path='',
                        url_prefix='/')

twitter_app.register_blueprint(oauth2_login.oauth2_login, url_prefix="/")
twitter_app.context_processor(oauth2_login.inject_me)
twitter_app.before_request(oauth2_login.add_me)

# url_for that carries the "me" account through generated links.
url_for = oauth2_login.url_for_with_me


def run_script(script_name, script_vars):
    """Execute ./<script_name>.py with the given globals.

    Returns the result of exec() (always None) on success, False when the
    script is missing or raises.  Scripts are trusted local hook files
    (e.g. on_tweeted), not user input.
    """
    script_path = './{}.py'.format(script_name)
    if not os.path.exists(script_path):
        # FIX: the original ended with a bare `False` expression (dead code);
        # it was clearly meant to be returned.
        return False
    with open(script_path, 'r') as script_file:
        script = script_file.read()
    try:
        return exec(script, script_vars)
    except Exception:
        # FIX: was a bare `except:` which would also swallow SystemExit/KeyboardInterrupt.
        print('error running script: {}'.format(script_name))
        return False


class ActivityData:
    """SQLite-backed activity log.  Most methods are stubs awaiting implementation."""

    def __init__(self, db_path):
        self.db_path = db_path
        db_exists = os.path.exists(db_path)
        self.db = sqlite3.connect(db_path)
        if not db_exists:
            self.init_db()

    def init_db(self):
        # Create the schema on first use of a fresh database file.
        self.db.execute('create table seen_user (ts, user_id)')
        self.db.execute('create table seen_tweet (ts, tweet_id)')

    def seen_tweet(self, tweet_id):
        # TODO: record into seen_tweet
        return

    def seen_user(self, user_id):
        # TODO: record into seen_user
        return

    def add_tweet_counts(self, user_id, start, end, tweet_count):
        # FIX: `current_ts` was undefined in the original (NameError on call);
        # use the same millisecond timestamp convention as the cache filenames.
        current_ts = int(time.time() * 1000)
        return [current_ts, user_id, start, end, tweet_count]

    def add_tweet_public_metrics(self, tweet_id, like_count, reply_count, retweet_count, quote_count):
        # TODO: persist public metrics
        return

    def add_tweet_non_public_metrics(self, tweet_id, impression_count, click_count, link_click_count, profile_click_count):
        # TODO: persist non-public metrics
        return

    def add_user_public_metrics(self, user_id, followers_count, following_count, tweet_count, listed_count):
        # TODO: persist user metrics
        return


class DataSet:
    """In-memory collection of dict items keyed by their 'id'."""

    def __init__(self):
        self.items = {}

    def update_items(self, items):
        """
        Merges objects by ID. Assigns an ID if none exists. Mutates the
        original objects.  Returns the list of item IDs processed.
        """
        ids = []
        for item in items:
            if not 'id' in item:
                #item = dict(item)
                item['id'] = uuid.uuid4().hex
            else:
                existing_item = self.items.get(item['id'])
                if existing_item:
                    existing_item.update(item)
                    item = existing_item
            self.items[item['id']] = item
            ids.append(item['id'])
        return ids

    def get_items(self):
        return self.items.values()


class TwitterMetadata:
    """Per-tweet JSON metadata files stored under data_dir."""

    def __init__(self, data_dir):
        self.data_dir = data_dir
        # FIX: os.mkdir() does not accept exist_ok; os.makedirs() does.
        os.makedirs(data_dir, exist_ok=True)

    def get_tweet(self, tweet_id):
        """Return the stored metadata dict for tweet_id, or None if absent."""
        path = f'{self.data_dir}/tweet_{tweet_id}.json'
        if not os.path.exists(path):
            return None
        with open(path, 'rt') as f:
            return json.loads(f.read())

    def update_tweet(self, tweet_id, fields):
        """Merge `fields` into the stored metadata for tweet_id and persist it."""
        tweet = self.get_tweet(tweet_id)
        if not tweet:
            tweet = {'id': tweet_id}
        tweet.update(fields)
        with open(f'{self.data_dir}/tweet_{tweet_id}.json', 'wt') as f:
            f.write(json.dumps(tweet))
        return tweet


def collection_from_card_source(url):
    """Placeholder: build a collection from a cards search.  The browser-console
    JS below documents the intended extraction steps.  Not yet implemented."""
    """
    temp1 = await fetch('http://localhost:5000/notes/cards/search?q=twitter.com/&limit=10').then(r => r.json())
    re = /(http[^\s]+twitter\.com\/[^\/]+\/status\/[\d]+)/ig
    tweetLinks = temp1.cards.map(c => c.card.content).map(c => c.match(re))
    tweetLinks2 = tweetLinks.flat().filter(l => l)
    tweetLinksS = Array.from(new Set(tweetLinks2))
    statusUrls = tweetLinksS.map(s => new URL(s))
    //users = Array.from(new Set(statusUrls.map(s => s.pathname.split('/')[1])))
    ids = Array.from(new Set(statusUrls.map(s => parseInt(s.pathname.split('/')[3]))))
    """
    """
    temp1 = JSON.parse(document.body.innerText)
    // get swipe note + created_at + tweet user + tweet ID
    tweetCards = temp1.cards.map(c => c.card).filter(c => c.content.match(re))
    tweets = tweetCards.map(c => ({created_at: c.created_at, content: c.content, tweets: c.content.match(re).map(m => new URL(m))}))
    tweets.filter(t => t.tweets.filter(t2 => t2.user.toLowerCase() == 'stephenmpinto').length)

    // HN
    re = /(http[^\s]+news.ycombinator\.com\/[^\s]+\=[\d]+)/ig
    linkCards = temp1.cards.map(c => c.card).filter(c => c.content.match(re))
    links = linkCards.map(c => ({created_at: c.created_at, content: c.content, links: c.content.match(re).map(m => new URL(m))}))

    // YT (I thnk I've already done this one)
    """
    # more in 2022 twitter report
    return None


def get_tweet_collection(collection_id):
    """Load a saved tweet collection JSON document from DATA_DIR."""
    with open(f'{DATA_DIR}/collection/{collection_id}.json', 'rt', encoding='utf-8') as f:
        collection = json.loads(f.read())
    return collection


# pagination token is the next tweet_ID
# FIX: route was '/collection/.html' — the <collection_id> converter was lost;
# without it Flask cannot supply the view argument.
@twitter_app.get('/collection/<collection_id>.html')
def get_collection_html(collection_id):
    """Render one page of a saved collection, hydrating items via the Twitter API."""
    max_results = int(request.args.get('max_results', 10))
    pagination_token = request.args.get('pagination_token')

    collection = get_tweet_collection(collection_id)

    if 'authorized_users' in collection and g.twitter_user['id'] not in collection['authorized_users']:
        return 'access denied.', 403

    # Select one page of items: skip until the pagination token, then take
    # up to max_results, remembering the next token.
    items = []
    for item in collection['items']:
        tweet_id = item['id']
        if pagination_token and tweet_id != pagination_token:
            continue
        elif tweet_id == pagination_token:
            pagination_token = None
        elif len(items) == max_results:
            pagination_token = tweet_id
            break
        items.append(item)

    if not len(items):
        return 'no tweets', 404

    token = g.twitter_user['access_token']
    tweet_source = ApiV2TweetSource(token)

    tweet_ids = list(map(lambda item: item['id'], items))
    response_json = tweet_source.get_tweets(tweet_ids)
    #print(response_json)

    if 'errors' in response_json:
        # types:
        # https://api.twitter.com/2/problems/not-authorized-for-resource (blocked or suspended)
        # https://api.twitter.com/2/problems/resource-not-found (deleted)
        #print(response_json.get('errors'))
        for err in response_json.get('errors'):
            if not 'type' in err:
                print('unknown error type: ' + str(err))
            elif err['type'] == 'https://api.twitter.com/2/problems/not-authorized-for-resource':
                print('blocked or suspended tweet: ' + err['value'])
            elif err['type'] == 'https://api.twitter.com/2/problems/resource-not-found':
                print('deleted tweet: ' + err['value'])
            else:
                print('unknown error: ' + str(err))

    includes = response_json.get('includes')
    tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))

    for item in items:
        t = list(filter(lambda t: item['id'] == t['id'], tweets))
        if not len(t):
            print("no tweet for item: " + item['id'])
            # Placeholder entry for tweets the API could not return.
            t = {
                "id": item['id'],
                "text": "(Deleted, suspended or blocked)",
                "created_at": "",
                "handle": "error",
                "display_name": "Error"
            }
            # FIXME 1) put this in relative order to the collection
            # FIXME 2) we can use the tweet link to get the user ID...
            tweets.append(t)
        else:
            t = t[0]
        t['note'] = item['note']

    if request.args.get('format') == 'json':
        return jsonify({'ids': tweet_ids, 'data': response_json, 'tweets': tweets,
                        'items': items, 'pagination_token': pagination_token})
    else:
        query = {}
        if pagination_token:
            query['next_data_url'] = url_for('.get_collection_html', collection_id=collection_id,
                                             pagination_token=pagination_token)
        if 'HX-Request' in request.headers:
            return render_template('partial/tweets-timeline.html', tweets=tweets, user={}, query=query)
        else:
            if pagination_token:
                query['next_page_url'] = url_for('.get_collection_html', collection_id=collection_id,
                                                 pagination_token=pagination_token)
            return render_template('tweet-collection.html', tweets=tweets, user={}, query=query)


@twitter_app.post('/data/collection/create/from-cards')
def post_data_collection_create_from_cards():
    """Create a (currently empty) collection; the JS below documents the
    intended card-scraping pipeline that will populate `items`."""
    """
    // create collection from search, supporting multiple Tweets per card and Tweets in multiple Cards.
    re = /(https?[a-z0-9\.\/\:]+twitter\.com\/[0-9a-z\_]+\/status\/[\d]+)/ig
    temp1 = await fetch('http://localhost:5000/notes/cards/search?q=twitter.com/').then(r => r.json())
    cardMatches = temp1.cards
        .map(cm => Object.assign({}, cm, {tweetLinks: Array.from(new Set(cm.card.content.match(re)))}))
        .filter(cm => cm.tweetLinks && cm.tweetLinks.length)
        .map(cm => Object.assign({}, cm, {tweetUrls: cm.tweetLinks.map(l => new URL(l))}))
        .map(cm => Object.assign({}, cm, {tweetInfos: cm.tweetUrls.map(u => ({user: u.pathname.split('/')[1], tweetId: u.pathname.split('/')[3]}))}));

    collectionCards = {}
    cardMatches.forEach(function (cm) {
        if (!cm.tweetLinks.length) { return; }
        cm.tweetInfos.forEach(function (ti) {
            if (!collectionCards[ti.tweetId]) { collectionCards[ti.tweetId] = []; }
            collectionCards[ti.tweetId].push(cm.card);
        })
    })

    var collectionItems = [];
    Object.entries(collectionCards).forEach(function (e) {
        var tweetId = e[0], cards = e[1];
        var note = cards.map(function (card) { return card.created_at + "\n\n" + card.content; }).join("\n\n-\n\n");
        collectionItems.push({id: tweetId, note: note, tweet_infos: cm.tweetInfos, card_infos: cards.map(c => 'card#' + c.id)});
    })
    """
    collection = {
        'items': [],  # described in JS function above
        'authorized_users': [g.twitter_user['id']]
    }
    return jsonify(collection)


#twitter_meta = TwitterMetadata('./data/meta')


@twitter_app.route('/tweets', methods=['POST'])
def post_tweets_create():
    """Create a tweet (optionally a reply or quote) for the logged-in user."""
    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']

    text = request.form.get('text')
    reply_to_tweet_id = request.form.get('reply_to_tweet_id')
    quote_tweet_id = request.form.get('quote_tweet_id')

    tweet_source = ApiV2TweetSource(token)
    result = tweet_source.create_tweet(text, reply_to_tweet_id=reply_to_tweet_id,
                                       quote_tweet_id=quote_tweet_id)
    print(result)

    run_script('on_tweeted', {'twitter_user': g.twitter_user, 'tweet': result})

    if 'HX-Request' in request.headers:
        return render_template('partial/compose-form.html', new_tweet_id=result['data']['id'])
    else:
        # FIX: was jsonify(json.dumps({...})), which double-encodes and returns
        # a JSON *string* rather than a JSON object.
        return jsonify({'result': result})


# FIX: route was '/tweet//retweet' — the <tweet_id> converter was lost.
@twitter_app.route('/tweet/<tweet_id>/retweet', methods=['POST'])
def post_tweet_retweet(tweet_id):
    """Retweet the given tweet as the logged-in user."""
    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']

    tweet_source = ApiV2TweetSource(token)
    result = tweet_source.retweet(tweet_id, user_id=user_id)
    print(result)

    run_script('on_tweeted', {'twitter_user': g.twitter_user, 'retweet': result})

    if 'HX-Request' in request.headers:
        # NOTE(review): this confirmation snippet appears truncated by extraction —
        # there is no '{}' in the literal for .replace() to substitute; the original
        # presumably contained a link template.  Kept as-is pending the real template.
        return """retweeted """.replace('{}', url_for('.get_tweet_html', tweet_id=tweet_id))
    else:
        # FIX: double-encoding via json.dumps removed (see post_tweets_create).
        return jsonify({'result': result})


# FIX: route was '/tweet//bookmark' — the <tweet_id> converter was lost.
@twitter_app.route('/tweet/<tweet_id>/bookmark', methods=['POST'])
def post_tweet_bookmark(tweet_id):
    """Bookmark the given tweet for the logged-in user."""
    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']

    tweet_source = ApiV2TweetSource(token)
    result = tweet_source.bookmark(tweet_id, user_id=user_id)
    print(result)

    if 'HX-Request' in request.headers:
        # NOTE(review): same truncated-template situation as post_tweet_retweet.
        return """bookmarked """.replace('{}', url_for('.get_tweet_html', tweet_id=tweet_id))
    else:
        return jsonify({'result': result})


# FIX: route was '/tweet//bookmark' — the <tweet_id> converter was lost.
@twitter_app.route('/tweet/<tweet_id>/bookmark', methods=['DELETE'])
def delete_tweet_bookmark(tweet_id):
    """Remove a bookmark for the logged-in user."""
    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']

    tweet_source = ApiV2TweetSource(token)
    result = tweet_source.delete_bookmark(tweet_id, user_id=user_id)
    return jsonify({'result': result})


# FIX: route was '/tweet/.html' — the <tweet_id> converter was lost.
@twitter_app.route('/tweet/<tweet_id>.html', methods=['GET'])
def get_tweet_html(tweet_id):
    """Render a tweet with its replies / thread / conversation, per ?view=."""
    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']

    pagination_token = request.args.get('pagination_token')
    view = request.args.get('view', 'replies')

    tweet_source = ApiV2TweetSource(token)
    only_replies = view == 'replies'

    tweets = []
    if not pagination_token:
        # First page: fetch the parent tweet itself.
        response_json = tweet_source.get_tweet(tweet_id)
        print("parent tweet=")
        print(response_json)
        includes = response_json.get('includes')
        tweet = response_json.get('data')[0]
        tweets.append(tweet_model(includes, tweet, g.me))

    response_json = None
    data_route = None
    if view == 'replies':
        data_route = '.get_tweet_html'
        response_json = tweet_source.get_thread(tweet_id, only_replies=True,
                                                pagination_token=pagination_token)
    elif view == 'thread':
        data_route = '.get_tweet_html'
        # FIXME tweets[0] raises IndexError when paginating (parent not fetched).
        response_json = tweet_source.get_thread(tweet_id, only_replies=False,
                                                author_id=tweets[0]['author_id'],
                                                pagination_token=pagination_token)
    elif view == 'conversation':
        data_route = '.get_tweet_html'
        response_json = tweet_source.get_thread(tweet_id, only_replies=False,
                                                pagination_token=pagination_token)
    elif view == 'tweet':
        pass  # parent tweet only; no thread fetch

    next_token = None
    #print("conversation meta:")
    #print(json.dumps(response_json.get('meta'), indent=2))
    if response_json and response_json.get('meta').get('result_count'):
        includes = response_json.get('includes')
        tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data'])) + tweets
        next_token = response_json.get('meta').get('next_token')

    # this method is OK except it doesn't work if there are no replies.
    #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))
    #related_tweets = [] # derived from includes

    tweets.reverse()

    query = {}
    if next_token:
        query = {
            **query,
            # FIXME only_replies
            'next_data_url': url_for(data_route, tweet_id=tweet_id, pagination_token=next_token,
                                     only_replies='1' if only_replies else '0',
                                     author_id=tweets[0]['author_id']),
            'next_page_url': url_for('.get_tweet_html', tweet_id=tweet_id, view=view,
                                     pagination_token=next_token)
        }

    user = {
        'id': user_id
    }

    if 'HX-Request' in request.headers:
        # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
        return render_template('partial/tweets-timeline.html', user=user, tweets=tweets, query=query)
    else:
        return render_template('tweet-collection.html', user=user, tweets=tweets, query=query,
                               show_parent_tweet_controls=True)


# FIX: route was '/followers/.html' — the <user_id> converter was lost.
@twitter_app.route('/followers/<user_id>.html', methods=['GET'])
def get_data_user_followers(user_id):
    """Render the followers of a user, caching the raw API response to disk."""
    token = g.twitter_user['access_token']
    social_source = TwitterApiV2SocialGraph(token)
    response_json = social_source.get_followers(user_id)

    ts = int(time.time() * 1000)
    with open(f'{DATA_DIR}/cache/followers_{user_id}_{ts}.json', 'wt') as f:
        f.write(json.dumps(response_json))
    print(response_json)

    #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': response_json})

    followers = list(map(lambda f: f['id'], response_json.get('data')))
    return render_template('following.html', following=followers)


# FIX: route was '/following/.html' — the <user_id> converter was lost.
@twitter_app.route('/following/<user_id>.html', methods=['GET'])
def get_data_user_following(user_id):
    """Render the accounts a user follows, caching the raw API response to disk."""
    token = g.twitter_user['access_token']
    social_source = TwitterApiV2SocialGraph(token)
    response_json = social_source.get_following(user_id)

    ts = int(time.time() * 1000)
    with open(f'{DATA_DIR}/cache/following_{user_id}_{ts}.json', 'wt') as f:
        f.write(json.dumps(response_json))
    print(response_json)

    #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': response_json})

    following = list(map(lambda f: f['id'], response_json.get('data')))
    return render_template('following.html', following=following)


# FIX: route was '/data/timeline/user//counts' — the <user_id> converter was lost.
@twitter_app.route('/data/timeline/user/<user_id>/counts')
def get_data_timeline_user_counts(user_id):
    """Return tweet counts over time for a user (app-token endpoint)."""
    query = f'from:{user_id}'
    # is:reply is:quote is:retweet has:links has:mentions has:media has:images has:videos has:geo

    if not oauth2_login.app_access_token:
        return 'refresh app token first.', 400

    tweet_source = ApiV2TweetSource(oauth2_login.app_access_token)
    response_json = tweet_source.count_tweets(query)

    ts = int(time.time() * 1000)
    with open(f'{DATA_DIR}/cache/tl_counts_{user_id}_{ts}.json', 'wt') as f:
        f.write(json.dumps(response_json))

    # Keep only the buckets that actually contain tweets.
    data = list(filter(lambda d: d.get('tweet_count') > 0, response_json.get('data')))
    result = {
        'total_count': response_json.get('meta').get('total_tweet_count'),
        'data': data
    }
    return jsonify(result)


# ---------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------
# HTMx partials
# ---------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------


@dataclass_validate
@dataclass
class PublicMetrics:
    # FIX: fields defaulted to None but were annotated plain `int`;
    # Optional[int] matches the actual defaults and the type validator.
    reply_count: Optional[int] = None
    quote_count: Optional[int] = None
    retweet_count: Optional[int] = None
    like_count: Optional[int] = None


@dataclass_validate
@dataclass
class NonPublicMetrics:
    # FIX: same Optional[int] correction as PublicMetrics.
    impression_count: Optional[int] = None
    user_profile_clicks: Optional[int] = None
    url_link_clicks: Optional[int] = None


@dataclass_validate
@dataclass
class MediaItem:
    height: int
    width: int
    url: str
    media_key: str
    type: str
    preview_image_url: str
    image_url: str


@dataclass_validate
@dataclass
class Card:
    display_url: Optional[str] = None
    source_url: Optional[str] = None
    content: Optional[str] = None
    title: Optional[str] = None


@dataclass_validate
@dataclass
class TweetModel:
    """View-model for a tweet as consumed by the templates."""
    id: str
    text: str
    created_at: str
    display_name: str
    handle: str
    author_is_verified: Optional[bool] = None
    url: Optional[str] = None
    conversation_id: Optional[str] = None
    avi_icon_url: Optional[str] = None
    author_url: Optional[str] = None
    author_id: Optional[str] = None
    source_url: Optional[str] = None
    source_author_url: Optional[str] = None
    reply_depth: Optional[int] = 0
    is_marked: Optional[bool] = None
    card: Optional[Card] = None
    public_metrics: Optional[PublicMetrics] = None
    non_public_metrics: Optional[NonPublicMetrics] = None
    retweeted_tweet_id: Optional[str] = None
    source_retweeted_by_url: Optional[str] = None
    retweeted_by: Optional[str] = None
    retweeted_by_url: Optional[str] = None
    videos: Optional[List[MediaItem]] = None
    photos: Optional[List[MediaItem]] = None
    quoted_tweet_id: Optional[str] = None
    quoted_tweet: Optional['TweetModel'] = None
    replied_tweet_id: Optional[str] = None
    replied_tweet: Optional['TweetModel'] = None

    def update(self, new):
        """Copy known keys from dict `new` onto this model, then re-validate."""
        for key, value in new.items():
            if hasattr(self, key):
                setattr(self, key, value)
        dataclass_type_validator(self)


# tm = TweetModel(id="1", text="aa", created_at="fs", display_name="fda", handle="fdsafas")


def tweet_model(includes, tweet_data, me, my_url_for=url_for, reply_depth=0):
    """Build a template-friendly dict for one API tweet object.

    includes: the API response 'includes' expansion (users, tweets, media).
    tweet_data: one element of the response 'data' array.
    me: the viewing account (propagated into nested quoted/replied models).
    reply_depth: recursion depth when following replied_to references (max 1).
    """
    # retweeted_by, avi_icon_url, display_name, handle, created_at, text
    user = list(filter(lambda u: u.get('id') == tweet_data['author_id'], includes.get('users')))[0]

    url = my_url_for('.get_tweet_html', tweet_id=tweet_data['id'])
    source_url = 'https://twitter.com/{}/status/{}'.format(user['username'], tweet_data['id'])
    avi_icon_url = user['profile_image_url']

    retweet_of = None
    quoted = None
    replied_to = None
    if 'referenced_tweets' in tweet_data:
        retweet_of = list(filter(lambda r: r['type'] == 'retweeted', tweet_data['referenced_tweets']))
        quoted = list(filter(lambda r: r['type'] == 'quoted', tweet_data['referenced_tweets']))
        replied_to = list(filter(lambda r: r['type'] == 'replied_to', tweet_data['referenced_tweets']))

    t = {
        'id': tweet_data['id'],
        'text': tweet_data['text'],
        'created_at': tweet_data['created_at'],
        'author_is_verified': user['verified'],
        'url': url,
        'conversation_id': tweet_data['conversation_id'],
        'avi_icon_url': avi_icon_url,
        'display_name': user['name'],
        'handle': user['username'],
        'author_url': my_url_for('.get_profile_html', user_id=user['id']),
        'author_id': user['id'],
        'source_url': source_url,
        'source_author_url': 'https://twitter.com/{}'.format(user['username']),
        #'is_edited': len(tweet_data['edit_history_tweet_ids']) > 1
    }

    if reply_depth:
        t['reply_depth'] = reply_depth

    # HACK we should not refer to the request directly...
    if request and request.args.get('marked_reply') == str(t['id']):
        t['is_marked'] = True

    # This is where we should put "is_bookmark", "is_liked", "is_in_collection", etc...

    if 'entities' in tweet_data:
        if 'urls' in tweet_data['entities']:
            # Only URLs unfurled with a title+description become link cards.
            urls = list(filter(lambda u: 'title' in u and 'description' in u,
                               tweet_data['entities']['urls']))
            if len(urls):
                url = urls[0]
                t['card'] = {
                    'display_url': url['display_url'].split('/')[0],
                    'source_url': url['unwound_url'],
                    'content': url['description'],
                    'title': url['title']
                }

    if 'public_metrics' in tweet_data:
        t['public_metrics'] = {
            'reply_count': tweet_data['public_metrics']['reply_count'],
            'quote_count': tweet_data['public_metrics']['quote_count'],
            'retweet_count': tweet_data['public_metrics']['retweet_count'],
            'like_count': tweet_data['public_metrics']['like_count']
        }
        try:
            pm = PublicMetrics(**t['public_metrics'])
        except Exception:
            print('error populating public_metrics')

    if 'non_public_metrics' in tweet_data:
        t['non_public_metrics'] = {
            'impression_count': tweet_data['non_public_metrics']['impression_count'],
            'user_profile_clicks': tweet_data['non_public_metrics']['user_profile_clicks']
        }
        if 'url_link_clicks' in tweet_data['non_public_metrics']:
            t['non_public_metrics']['url_link_clicks'] = tweet_data['non_public_metrics']['url_link_clicks']

    if retweet_of and len(retweet_of):
        t['retweeted_tweet_id'] = retweet_of[0]['id']
        retweeted_tweet = list(filter(lambda t: t.get('id') == retweet_of[0]['id'],
                                      includes.get('tweets')))[0]
        t.update({
            'source_retweeted_by_url': 'https://twitter.com/{}'.format(user['username']),
            'retweeted_by': user['name'],
            'retweeted_by_url': url_for('.get_profile_html', user_id=user['id'])
        })
        rt = tweet_model(includes, retweeted_tweet, me)
        t.update(rt)

    try:
        if 'attachments' in tweet_data and 'media_keys' in tweet_data['attachments']:
            media_keys = tweet_data['attachments']['media_keys']

            def first_media(mk):
                # Resolve a media_key against the includes.media expansion.
                medias = list(filter(lambda m: m['media_key'] == mk, includes['media']))
                if len(medias):
                    return medias[0]
                return None

            # FIX: `media` was a lazy one-shot iterator shared by the photos and
            # videos filters; materializing photos drained it so videos was
            # always empty.  Materialize the list once instead.
            media = list(filter(lambda m: m != None, map(first_media, media_keys)))
            #print('found media=')
            #print(media)

            photos = filter(lambda m: m['type'] == 'photo', media)
            videos = filter(lambda m: m['type'] == 'video', media)

            photos = map(lambda p: {**p, 'preview_image_url': p['url'] + '?name=tiny&format=webp'}, photos)
            videos = map(lambda p: {**p, 'image_url': p['preview_image_url'],
                                    'preview_image_url': p['preview_image_url'] + '?name=tiny&format=webp'}, videos)

            t['photos'] = list(photos)
            t['videos'] = list(videos)
    except Exception:
        # it seems like this comes when we have a retweeted tweet with media on it.
        print('exception adding attachments to tweet:')
        print(tweet_data)
        print('view tweet:')
        print(t)
        print('included media:')
        print(includes.get('media'))

    try:
        if quoted and len(quoted):
            t['quoted_tweet_id'] = quoted[0]['id']
            quoted_tweets = list(filter(lambda t: t.get('id') == quoted[0]['id'],
                                        includes.get('tweets')))
            if len(quoted_tweets):
                t['quoted_tweet'] = tweet_model(includes, quoted_tweets[0], me)
    except Exception:
        print('error adding quoted tweet')

    try:
        if replied_to and len(replied_to):
            t['replied_tweet_id'] = replied_to[0]['id']
            if reply_depth < 1:
                replied_tweets = list(filter(lambda t: t.get('id') == replied_to[0]['id'],
                                             includes.get('tweets')))
                if len(replied_tweets):
                    print("Found replied Tweet (t={}, rep={})".format(t['id'], t['replied_tweet_id']))
                    t['replied_tweet'] = tweet_model(includes, replied_tweets[0], me,
                                                     reply_depth=reply_depth + 1)
                else:
                    print("No replied tweet found (t={}, rep={})".format(t['id'], t['replied_tweet_id']))
    except Exception:
        print('error adding replied_to tweet')

    try:
        tm = TweetModel(**t)
    except Exception:
        print('error populating TweetModel')

    return t


def tweet_paginated_timeline():
    # TODO: not yet implemented.
    return


# This is a hybrid of get_tweet_html and get_collection_html, where we feed in the IDs.
@twitter_app.route('/data/tweets', methods=['GET'])
def get_twitter_tweets():
    """Fetch specific tweets by ?ids=comma,separated,list and render/return them."""
    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']

    ids = request.args.get('ids')
    if ids:
        ids = ids.split(',')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_tweets(ids)

    user = {
        'id': user_id
    }
    query = {}

    if 'HX-Request' in request.headers:
        includes = response_json.get('includes')
        tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
        return render_template('partial/tweets-timeline.html', user=user, tweets=tweets, query=query)
    else:
        return jsonify(response_json)


# FIX: route was '/data/mentions/' — the <user_id> converter was lost.
@twitter_app.route('/data/mentions/<user_id>', methods=['GET'])
def get_data_mentions(user_id):
    """Return the mentions timeline for a user."""
    token = g.twitter_user['access_token']
    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_mentions_timeline(user_id, pagination_token=pagination_token)

    # the OG tweet is in the include.tweets collection.
    # All thread tweets are as well, clearly. Does it cost a fetch?
    #print(response_json)

    includes = response_json.get('includes')
    # FIX: was tweet_model(includes, t, token) — every other caller passes the
    # viewing account (g.me), not the access token.
    tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
    related_tweets = []  # derived from includes

    tweets.reverse()

    next_token = response_json.get('meta').get('next_token')

    query = {}
    if next_token:
        query = {
            **query,
            # FIX: endpoint name was misspelled ('.get_data_metnions') and the
            # required user_id path argument was missing — url_for would raise.
            'next_data_url': url_for('.get_data_mentions', user_id=user_id,
                                     pagination_token=next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }
        # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
        return render_template('partial/tweets-timeline.html', user=user, tweets=tweets, query=query)
    else:
        # FIX: was jsonify(json.dumps({...})) — double-encoded the payload.
        return jsonify({'tweets': tweets, 'pagination_token': pagination_token,
                        'next_token': next_token})


# FIX: route was '/data/between//' — both <user_id> and <user2_id> converters were lost.
@twitter_app.route('/data/between/<user_id>/<user2_id>', methods=['GET'])
def get_data_between(user_id, user2_id):
    """Return the two-way conversation (tweets between two users)."""
    token = g.twitter_user['access_token']
    pagination_token = request.args.get('pagination_token')

    if user_id == 'me':
        user_id = g.twitter_user['id']
    if user2_id == 'me':
        user2_id = g.twitter_user['id']

    search_query = "(from:{} to:{}) OR (to:{} from:{})".format(user_id, user2_id, user_id, user2_id)

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.search_tweets(search_query, pagination_token=pagination_token)

    # the OG tweet is in the include.tweets collection.
    # All thread tweets are as well, clearly. Does it cost a fetch?
    #print(response_json)

    # augment with archive if one of the users is me
    # /twitter-archive/tweets/search?in_reply_to_user_id=__
    # /twitter-archive/tweets/search?q=@__

    tweets = []
    next_token = None
    if response_json.get('meta').get('result_count'):
        includes = response_json.get('includes')
        tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
        related_tweets = []  # derived from includes
        next_token = response_json.get('meta').get('next_token')

    tweets.reverse()

    query = {}
    if next_token:
        query = {
            **query,
            # FIX: next_data_url omitted the required user_id/user2_id path args.
            'next_data_url': url_for('.get_data_between', user_id=user_id, user2_id=user2_id,
                                     pagination_token=next_token)
        }

    if 'HX-Request' in request.headers:
        # FIX: was {'id': twitter['id']} with `twitter` undefined (NameError).
        user = {
            'id': g.twitter_user['id']
        }
        # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
        # FIX: `me` was an undefined local; the app-wide convention is g.me.
        return render_template('partial/tweets-timeline.html', user=user, tweets=tweets,
                               query=query, me=g.me)
    else:
        return jsonify({'tweets': tweets, 'pagination_token': pagination_token,
                        'next_token': next_token})


# FIX: route was '/data/thread/' — the <tweet_id> converter was lost.
@twitter_app.route('/data/thread/<tweet_id>', methods=['GET'])
def get_data_thread(tweet_id):
    """Return the logged-in user's thread starting at tweet_id."""
    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']
    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_thread(tweet_id, author_id=user_id,
                                            pagination_token=pagination_token)

    # the OG tweet is in the include.tweets collection.
    # All thread tweets are as well, clearly. Does it cost a fetch?
    #print(response_json)

    tweets = []
    next_token = None
    if response_json.get('meta').get('result_count'):
        includes = response_json.get('includes')
        tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
        # FIXME this method is OK except it doesn't work if there are no replies.
        #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))
        #related_tweets = [] # derived from includes
        next_token = response_json.get('meta').get('next_token')

    if not pagination_token:
        # First page: append the parent tweet itself.
        response_json = tweet_source.get_tweet(tweet_id)
        print("parent tweet=")
        #print(response_json)
        includes = response_json.get('includes')
        tweet = response_json.get('data')[0]
        tweets.append(tweet_model(includes, tweet, g.me))

    tweets.reverse()

    query = {}
    if next_token:
        query = {
            **query,
            'next_data_url': url_for('.get_data_thread', tweet_id=tweet_id,
                                     pagination_token=next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }
        # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
        return render_template('partial/tweets-timeline.html', user=user, tweets=tweets, query=query)
    else:
        return jsonify({'tweets': tweets, 'pagination_token': pagination_token,
                        'next_token': next_token})


# FIX: route was '/data/conversation/' — the <tweet_id> converter was lost.
@twitter_app.route('/data/conversation/<tweet_id>', methods=['GET'])
def get_data_conversation(tweet_id):
    """Return the conversation under tweet_id (optionally only direct replies)."""
    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']
    pagination_token = request.args.get('pagination_token')
    only_replies = request.args.get('only_replies')

    tweet_source = ApiV2TweetSource(token)
    # seems to get l
    response_json = tweet_source.get_thread(tweet_id,
                                            only_replies=only_replies == '1',
                                            pagination_token=pagination_token)

    # the OG tweet is in the include.tweets collection.
    # All thread tweets are as well, clearly. Does it cost a fetch?
    #print(response_json)

    tweets = []
    next_token = None

    print("conversation meta:")
    print(json.dumps(response_json.get('meta'), indent=2))

    if response_json.get('meta').get('result_count'):
        includes = response_json.get('includes')
        tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
        next_token = response_json.get('meta').get('next_token')

    # this method is OK except it doesn't work if there are no replies.
    #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))

    if not pagination_token:
        # First page: append the parent tweet itself.
        response_json = tweet_source.get_tweet(tweet_id)
        print("parent tweet=")
        print(response_json)
        includes = response_json.get('includes')
        tweet = response_json.get('data')[0]
        tweets.append(tweet_model(includes, tweet, g.me))

    #related_tweets = [] # derived from includes

    tweets.reverse()

    query = {}
    if next_token:
        query = {
            **query,
            'next_data_url': url_for('.get_data_conversation', tweet_id=tweet_id,
                                     pagination_token=next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }
        # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
        return render_template('partial/tweets-timeline.html', user=user, tweets=tweets, query=query)
    else:
        response_body = {
            'tweets': tweets,
            'pagination_token': pagination_token,
            'next_token': next_token
        }
        return jsonify(response_body)


# FIX: route was '/data/likes/' — the <user_id> converter was lost.
@twitter_app.route('/data/likes/<user_id>', methods=['GET'])
def get_data_likes(user_id):
    """Return tweets liked by a user, caching the raw API response to disk."""
    token = g.twitter_user['access_token']
    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_likes(user_id, pagination_token=pagination_token)

    ts = int(time.time() * 1000)
    with open(f'{DATA_DIR}/cache/likes_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
        f.write(json.dumps(response_json))

    includes = response_json.get('includes')
    tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
    next_token = response_json.get('meta').get('next_token')

    query = {}
    if next_token:
        query = {
            **query,
            'next_data_url': url_for('.get_data_likes', user_id=user_id,
                                     pagination_token=next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }
        return render_template('partial/tweets-timeline.html', user=user, tweets=tweets, query=query)
    else:
        return jsonify({'tweets': tweets, 'query': query})
# FIX: restored the `<user_id>` URL converter (stripped during extraction);
# without it Flask cannot supply the view function's required `user_id` argument.
@twitter_app.route('/data/tweets/user/<user_id>/media', methods=['GET'])
def get_data_tweets_media (user_id):
    """Return a page of a user's image tweets (no replies/retweets) as JSON,
    or an HTMx timeline partial when requested via HX-Request.

    Query args: pagination_token.
    """
    token = g.twitter_user['access_token']
    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_media_tweets(author_id=user_id,
                                                  has_images=True,
                                                  is_reply=False,
                                                  is_retweet=False,
                                                  pagination_token=pagination_token)

    includes = response_json.get('includes')
    tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
    next_token = response_json.get('meta').get('next_token')

    query = {}

    if next_token:
        query = {
            **query,
            'next_data_url': url_for('.get_data_tweets_media', user_id=user_id, pagination_token=next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }
        return render_template('partial/tweets-timeline.html', user=user, tweets=tweets, query=query)
    else:
        # FIX: was `jsonify(json.dumps({...}))`, which double-encodes — the client
        # received a JSON *string* instead of an object.
        return jsonify({
            'tweets': tweets,
            'query': query
        })


# ---------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------
# HTMx views
# ---------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------


@twitter_app.route('/latest.html', methods=['GET'])
def get_timeline_home_html (variant = "reverse_chronological", pagination_token=None):
    """Render the signed-in user's home timeline (full page, or an HTMx
    partial when requested via HX-Request).

    Query args: pagination_token (used when not passed as an argument).
    """
    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']

    if not pagination_token:
        pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_home_timeline(user_id, pagination_token=pagination_token)

    #print(json.dumps(response_json, indent=2))

    includes = response_json.get('includes')
    tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
    next_token = response_json.get('meta').get('next_token')

    query = {}

    if next_token:
        query = {
            **query,
            #'next_data_url': url_for('.get_data_timeline_home', variant=variant, pagination_token=next_token),
            'next_data_url': url_for('.get_timeline_home_html', pagination_token=next_token),
            'next_page_url': url_for('.get_timeline_home_html', pagination_token=next_token)
        }

    user = {
        'id': user_id
    }

    if 'HX-Request' in request.headers:
        return render_template('partial/tweets-timeline.html', user=user, tweets=tweets, query=query)
    else:
        return render_template('tweet-collection.html', user=user, tweets=tweets, query=query)


@twitter_app.route('/bookmarks.html', methods=['GET'])
def get_bookmarks_html ():
    """Render the signed-in user's bookmarks (full page, or an HTMx partial).

    Query args: pagination_token.
    """
    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']
    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_bookmarks(user_id, pagination_token=pagination_token)

    #print(response_json)

    includes = response_json.get('includes')
    tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
    next_token = response_json.get('meta').get('next_token')

    query = {}

    if next_token:
        query = {
            **query,
            'next_data_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=next_token),
            'next_page_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=next_token)
        }

    user = {
        'id': user_id
    }

    if 'HX-Request' in request.headers:
        return render_template('partial/tweets-timeline.html', user=user, tweets=tweets, query=query)
    else:
        return render_template('tweet-collection.html', user=user, tweets=tweets, query=query)


@twitter_app.route('/conversations.html', methods=['GET'])
def get_conversations_html ():
    """Render the signed-in user's DM conversations.

    Calls the DM events endpoint directly (not via ApiV2TweetSource).
    Query args: pagination_token, max_results (default 10).
    """
    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']
    pagination_token = request.args.get('pagination_token')
    max_results = int(request.args.get('max_results', 10))

    # https://developer.twitter.com/en/docs/twitter-api/direct-messages/lookup/api-reference/get-dm_events
    url = "https://api.twitter.com/2/dm_events"
    params = {
        "dm_event.fields": "id,event_type,text,created_at,dm_conversation_id,sender_id,participant_ids,referenced_tweets,attachments",
        "expansions": ",".join(["sender_id", "participant_ids"]),
        "max_results": max_results,
        "user.fields": ",".join(["id", "created_at", "name", "username", "location", "profile_image_url", "url", "verified"])
    }
    if pagination_token:
        params['pagination_token'] = pagination_token

    headers = {"Authorization": "Bearer {}".format(token)}

    response = requests.get(url, params=params, headers=headers)
    response_json = json.loads(response.text)

    print(response.text)

    dm_events = response_json.get('data')
    next_token = response_json.get('meta').get('next_token')

    query = {
        'pagination_token': pagination_token,
        'next_token': next_token
    }

    user = {
        'id': user_id
    }

    return render_template('conversations.html', user=user, dm_events=dm_events, query=query)


# FIX: restored the `<user_id>` URL converter (stripped during extraction).
@twitter_app.route('/profile/<user_id>.html', methods=['GET'])
def get_profile_html (user_id):
    """Render a user's timeline (full page, or an HTMx partial). Caches the
    raw API response to disk. Non-public metrics are requested only for the
    signed-in user's own profile.

    Query args: pagination_token, exclude_replies ('1'), exclude_retweets ('1').
    """
    token = g.twitter_user['access_token']
    is_me = user_id == g.twitter_user['id']

    pagination_token = request.args.get('pagination_token')
    exclude_replies = request.args.get('exclude_replies', '0')
    exclude_retweets = request.args.get('exclude_retweets', '0')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_user_timeline(user_id,
                                                   exclude_replies=exclude_replies == '1',
                                                   exclude_retweets=exclude_retweets == '1',
                                                   pagination_token=pagination_token,
                                                   non_public_metrics=is_me)

    # Snapshot the raw response for later offline processing / debugging.
    ts = int(time.time() * 1000)
    with open(f'{DATA_DIR}/cache/tl_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
        f.write(json.dumps(response_json))

    includes = response_json.get('includes')
    tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
    next_token = response_json.get('meta').get('next_token')

    query = {}

    if next_token:
        query = {
            **query,
            'next_data_url': url_for('.get_profile_html', user_id=user_id, pagination_token=next_token, exclude_replies=exclude_replies, exclude_retweets=exclude_retweets),
            'next_page_url': url_for('.get_profile_html', user_id=user_id, pagination_token=next_token, exclude_replies=exclude_replies, exclude_retweets=exclude_retweets)
        }

    profile_user = {
        'id': user_id
    }

    if 'HX-Request' in request.headers:
        return render_template('partial/tweets-timeline.html', user=profile_user, tweets=tweets, query=query)
    else:
        return render_template('user-profile.html', user=profile_user, tweets=tweets, query=query)


@twitter_app.route('/media/upload', methods=['POST'])
def post_media_upload ():
    """Proxy uploaded files to the (fake) Twitter media upload endpoint.

    Returns {'upload_media': {form_field_name: upload_response_json, ...}}.
    """
    token = g.twitter_user['access_token']

    form = {
        'media_category': 'tweet_image'
    }
    headers = {
        'Authorization': 'Bearer {}'.format(token)
    }

    url = 'http://localhost:5004/twitter/fake-twitter/media/upload'
    #url = 'https://upload.twitter.com/1.1/media/upload.json' # .json

    upload_media = {}

    for media_name, f in request.files.items():
        print('.')
        # FIX: requests documents the multi-part file spec as a tuple
        # (filename, fileobj, content_type); was a list.
        files = {'media': (secure_filename(f.filename), BufferedReader(f), f.content_type)}
        response = requests.post(url, files=files, data=form, headers=headers)

        print(response.status_code)
        print(response.text)

        response_json = json.loads(response.text)
        upload_media[media_name] = response_json

    return jsonify({'upload_media': upload_media})


@twitter_app.route('/fake-twitter/media/upload', methods=['POST'])
def post_media_upload2 ():
    """Fake media-upload endpoint: echoes back metadata for the uploaded file
    in the shape of a Twitter media-upload response (no file is stored)."""
    print(request.content_type)

    f = request.files.get('media')

    # Seek to end-of-stream to measure the upload size.
    f.seek(0, 2)
    media_size = f.tell()

    media = {
        #'_auth': request.headers.get('Authorization'),
        'media_key': '3_{}'.format(secure_filename(f.filename)),
        'media_id': secure_filename(f.filename),
        'size': media_size,
        'expires_after_secs': 86400,
        'image': {
            'image_type': f.content_type,
            'w': 1,
            'h': 1
        }
    }

    return jsonify(media)