from typing import List
from dataclasses import asdict, replace
from dacite import from_dict
from importlib.util import find_spec
from configparser import ConfigParser
import base64
import sqlite3
import os
import json
import json_stream
from zipfile import ZipFile
import itertools
import time
import uuid
from io import BufferedReader
import re
import datetime
import dateutil
import dateutil.parser
import dateutil.tz

import requests
from werkzeug.utils import secure_filename
from flask import json, Response, render_template, request, send_from_directory, Blueprint, session, redirect, g, current_app, jsonify
from flask_cors import CORS

from tweet_source import ApiV2TweetSource, TwitterApiV2SocialGraph, ApiV2ConversationSource
from twitter_v2_types import Tweet, TweetExpansions

from view_model import FeedItem, FeedServiceUser, ThreadItem, FeedItemAction, MediaItem, Card, PublicMetrics, NonPublicMetrics, UnrepliedSection, cleandict

import oauth2_login

if find_spec('brands'):
    from brands import find_brand_by_account, fetch_brand_info

DATA_DIR = '.data'

twitter_app = Blueprint('twitter_v2_facade', 'twitter_v2_facade',
                        static_folder='static',
                        static_url_path='',
                        url_prefix='/')

twitter_app.register_blueprint(oauth2_login.oauth2_login, url_prefix="/")
twitter_app.context_processor(oauth2_login.inject_me)
twitter_app.before_request(oauth2_login.add_me)

url_for = oauth2_login.url_for_with_me


def run_script (script_name, script_vars):
    script_path = './{}.py'.format(script_name)
    if os.path.exists(script_path):
        script_file = open(script_path, 'r')
        script = script_file.read()
        script_file.close()
        try:
            return exec(script, script_vars)
        except:
            print('error running script: {}'.format(script_name))
            return False
    return False


class ActivityData:
    def __init__ (self, user_id, db_path):
        self.db_path = db_path
        self.user_id = user_id

        db_exists = os.path.exists(db_path)
        self.db = sqlite3.connect(db_path)

        if not db_exists:
            self.init_db()

        return

    def init_db (self):
        self.db.execute('create table seen_user (ts, user_id)')
        self.db.execute('create table seen_tweet (ts, tweet_id)')
        return

    def seen_tweet (self, tweet_id):
        return

    def seen_user (self, user_id):
        return

    def add_tweet_counts (self, user_id, start, end, tweet_count):
        current_ts = int(time.time())  # stub: timestamp of the observation
        return [current_ts, user_id, start, end, tweet_count]

    def add_tweet_public_metrics (self, tweet_id, like_count, reply_count, retweet_count, quote_count):
        return

    def add_tweet_non_public_metrics (self, tweet_id, impression_count, click_count, link_click_count, profile_click_count):
        return

    def add_user_public_metrics (self, user_id, followers_count, following_count, tweet_count, listed_count):
        return
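
# Illustrative usage sketch for ActivityData (the user id and path below are made up,
# and most of the recording methods are still stubs):
#
#     activity = ActivityData('14520320', f'{DATA_DIR}/activity_14520320.db')
#     activity.seen_tweet('1608510741941989378')   # currently a no-op
#     activity.seen_user('14520320')               # currently a no-op
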
""" ids = [] for item in items: if not 'id' in item: #item = dict(item) item['id'] = uuid.uuid4().hex else: existing_item = self.items.get( item['id'] ) if existing_item: existing_item.update(item) item = existing_item self.items[ item['id'] ] = item ids.append( item['id'] ) return ids def get_items (self): return self.items.values() class TwitterMetadata: def __init__ (self, data_dir): self.data_dir = data_dir os.mkdir(data_dir, exist_ok=True) def get_tweet (self, tweet_id): path = f'{self.data_dir}/tweet_{tweet_id}.json' if not os.path.exists(path): return None with open(path, 'rt') as f: return json.loads(f.read()) def update_tweet (self, tweet_id, fields): tweet = self.get_tweet(tweet_id) if not tweet: tweet = {'id': tweet_id} tweet.update(fields) with open(f'{self.data_dir}/tweet_{tweet_id}.json', 'wt') as f: f.write(json.dumps(tweet)) return tweet #twitter_meta = TwitterMetadata('./data/meta') @twitter_app.route('/tweets', methods=['POST']) def post_tweets_create (): user_id = g.twitter_user['id'] token = g.twitter_user['access_token'] text = request.form.get('text') reply_to_tweet_id = request.form.get('reply_to_tweet_id') quote_tweet_id = request.form.get('quote_tweet_id') tweet_source = ApiV2TweetSource(token) result = tweet_source.create_tweet(text, reply_to_tweet_id=reply_to_tweet_id, quote_tweet_id=quote_tweet_id) print(result) run_script('on_tweeted', {'twitter_user': g.twitter_user, 'tweet': result}) if 'HX-Request' in request.headers: return render_template('partial/compose-form.html', new_tweet_id=result['data']['id']) else: response_body = json.dumps({ 'result': result }) return jsonify(response_body) @twitter_app.route('/tweet//retweet', methods=['POST']) def post_tweet_retweet (tweet_id): user_id = g.twitter_user['id'] token = g.twitter_user['access_token'] tweet_source = ApiV2TweetSource(token) result = tweet_source.retweet(tweet_id, user_id=user_id) print(result) run_script('on_tweeted', {'twitter_user': g.twitter_user, 'retweet': result}) if 'HX-Request' in request.headers: return """retweeted """.replace('{}', url_for('.get_tweet_html', tweet_id=tweet_id)) else: response_body = json.dumps({ 'result': result }) return jsonify(response_body) @twitter_app.route('/tweet//bookmark', methods=['POST']) def post_tweet_bookmark (tweet_id): user_id = g.twitter_user['id'] token = g.twitter_user['access_token'] tweet_source = ApiV2TweetSource(token) result = tweet_source.bookmark(tweet_id, user_id=user_id) print(result) if 'HX-Request' in request.headers: return """bookmarked """.replace('{}', url_for('.get_tweet_html', tweet_id=tweet_id)) else: response_body = json.dumps({ 'result': result }) return jsonify(response_body) @twitter_app.route('/tweet//bookmark', methods=['DELETE']) def delete_tweet_bookmark (tweet_id): user_id = g.twitter_user['id'] token = g.twitter_user['access_token'] tweet_source = ApiV2TweetSource(token) result = tweet_source.delete_bookmark(tweet_id, user_id=user_id) response_body = json.dumps({ 'result': result }) return jsonify(response_body) @twitter_app.route('/tweet/.html', methods=['GET']) def get_tweet_html (tweet_id): pagination_token = request.args.get('pagination_token') view = request.args.get('view', 'replies') if g.twitter_user: token = g.twitter_user['access_token'] else: token = os.environ.get('BEARER_TOKEN') tweet_source = ApiV2TweetSource(token) only_replies = view == 'replies' tweets = [] if not pagination_token: tweets_response = tweet_source.get_tweet(tweet_id, return_dataclass=True) tweet = tweets_response.data[0] 
tweets.append(tweet_model_dc_vm(tweets_response.includes, tweet, g.me)) skip_embed_replies = False if view == 'replies': replies_response = tweet_source.get_thread(tweet_id, only_replies=True, pagination_token = pagination_token, return_dataclass=True) elif view == 'thread': skip_embed_replies = True replies_response = tweet_source.get_thread(tweet_id, only_replies=False, author_id=tweets[0].author_id, pagination_token = pagination_token, return_dataclass=True) elif view == 'conversation': replies_response = tweet_source.get_thread(tweet_id, only_replies=False, pagination_token = pagination_token, return_dataclass=True) elif view == 'tweet': replies_response = None next_token = None #print("conversation meta:") #print(json.dumps(tweets_response.get('meta'), indent=2)) if replies_response and replies_response.meta and replies_response.meta.result_count: includes = replies_response.includes tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me), replies_response.data)) + tweets next_token = replies_response.meta.next_token # this method is OK except it doesn't work if there are no replies. #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me)) #related_tweets = [] # derived from includes tweets.reverse() query = {} if next_token: query = { **query, # FIXME only_replies 'next_data_url': url_for('.get_tweet_html', tweet_id=tweet_id, pagination_token=next_token, only_replies = '1' if only_replies else '0', author_id = tweets[0].author_id), 'next_page_url': url_for('.get_tweet_html', tweet_id=tweet_id, view=view, pagination_token=next_token) } user = { } if 'HX-Request' in request.headers: # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n")) return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query) else: page_nav = [ dict( href=url_for('.get_tweet_html', tweet_id=tweets[0].conversation_id, view='thread'), label = 'author thread', order = 10 ), dict( href = url_for('.get_tweet_html', tweet_id=tweets[0].conversation_id, view='conversation'), label = 'full convo', order = 20 ) ] tweet = tweets_response.data[0] user = list(filter(lambda u: u.id == tweet.author_id, tweets_response.includes.users))[0] source_url = f'https://twitter.com/{user.username}/status/{tweet_id}' title = f'Tweet by {user.name} at {tweet.created_at}' opengraph_info = dict( type = 'webpage', # threads might be article url = source_url, title = title, description = tweet.text, image = user.profile_image_url ) if view == 'replies': tweet = tweets[0] if tweet.id == '1608510741941989378': unreplied = [ UnrepliedSection( description = "Not clear what GS is still.", span = (40, 80) ) ] tweet = replace(tweet, unreplied = unreplied ) expand_parts = request.args.get('expand') if expand_parts: expand_parts = expand_parts.split(',') def reply_to_thread_item (fi): nonlocal expand_parts if fi.id == '1609714342211244038': print(f'reply_to_thread_item id={fi.id}') unreplied = [ UnrepliedSection( description = "Is there proof of this claim?", span = (40, 80) ) ] fi = replace(fi, unreplied = unreplied ) children = None if expand_parts and len(expand_parts) and fi.id == expand_parts[0]: expand_parts = expand_parts[1:] print(f'getting expanded replied for tweet={fi.id}') expanded_replies_response = tweet_source.get_thread(fi.id, only_replies=True, return_dataclass=True) if expanded_replies_response.data: print('we got expanded responses data') children = list(map(lambda t: tweet_model_dc_vm(expanded_replies_response.includes, t, 
g.me), expanded_replies_response.data)) children = list(map(reply_to_thread_item, children)) return ThreadItem(feed_item=fi, children=children) children = list(map(reply_to_thread_item, tweets[1:])) root = ThreadItem( feed_item = tweet, children = children ) return render_template('tweet-thread.html', user = user, root = root, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info) else: return render_template('tweet-collection.html', user = user, tweets = tweets, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info) @twitter_app.route('/followers/.html', methods=['GET']) def get_followers_html (user_id): if not g.twitter_user: return 'need to log in.', 403 use_cache = request.args.get('use_cache') token = g.twitter_user['access_token'] social_source = TwitterApiV2SocialGraph(token) if use_cache: print(f'using cache for user {user_id}: {use_cache}') with open(f'.data/cache/followers_{user_id}_{use_cache}.json', 'rt') as f: response_json = json.load(f) else: response_json = social_source.get_followers(user_id, return_dataclass=True) if not use_cache: ts = int(time.time() * 1000) print(f'followers cache for {user_id}: {ts}') with open(f'{DATA_DIR}/cache/followers_{user_id}_{ts}.json', 'wt') as f: json.dump(response_json, f, indent=2) #print(response_json) #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': response_json}) #followers = list(map(lambda f: f['id'], response_json.get('data'))) followers = response_json.data followers = list(map(user_model_dc, followers)) return render_template('following.html', users=followers) @twitter_app.route('/following/.html', methods=['GET']) def get_following_html (user_id): if not g.twitter_user: return 'need to log in.', 403 token = g.twitter_user['access_token'] social_source = TwitterApiV2SocialGraph(token) response_json = social_source.get_following(user_id, return_dataclass=True) ts = int(time.time() * 1000) with open(f'{DATA_DIR}/cache/following_{user_id}_{ts}.json', 'wt') as f: f.write(json.dumps(response_json)) #print(response_json) #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': response_json}) #following = list(map(lambda f: f['id'], response_json.get('data'))) following = list(map(user_model_dc, response_json.data)) return render_template('following.html', users=following) # --------------------------------------------------------------------------------------------------------- # --------------------------------------------------------------------------------------------------------- # HTMx partials # --------------------------------------------------------------------------------------------------------- # --------------------------------------------------------------------------------------------------------- def user_model (user): fsu = FeedServiceUser( id = user['id'], name = user['name'], username = user['username'], created_at = user['created_at'], description = '', # user['description'], preview_image_url = '', # user['profile_image_url'], url = url_for('.get_profile_html', user_id=user['id']) ) return fsu def user_model_dc (user): fsu = FeedServiceUser( id = user.id, name = user.name, username = user.username, created_at = user.created_at, description = user.description, preview_image_url = user.profile_image_url, website = user.url, url = url_for('.get_profile_html', user_id=user.id), source_url = f'https://twitter.com/{user.username}' ) return fsu def tweet_model_dc_vm (includes: TweetExpansions, 
def tweet_model_dc_vm (includes: TweetExpansions, tweet: Tweet, me, my_url_for=url_for, reply_depth=0) -> FeedItem:

    # retweeted_by, avi_icon_url, display_name, handle, created_at, text

    user = list(filter(lambda u: u.id == tweet.author_id, includes.users))[0]

    url = my_url_for('twitter_v2_facade.get_tweet_html', tweet_id=tweet.id, view='tweet')

    source_url = 'https://twitter.com/{}/status/{}'.format(user.username, tweet.id)

    avi_icon_url = user.profile_image_url

    retweet_of = None
    quoted = None
    replied_to = None

    if tweet.referenced_tweets:
        retweet_of = list(filter(lambda r: r.type == 'retweeted', tweet.referenced_tweets))
        quoted = list(filter(lambda r: r.type == 'quoted', tweet.referenced_tweets))
        replied_to = list(filter(lambda r: r.type == 'replied_to', tweet.referenced_tweets))

    actions = {
        'view_replies': FeedItemAction('twitter_v2_facade.get_tweet_html', {'tweet_id': tweet.id, 'view': 'replies'}),
        'view_thread': FeedItemAction('twitter_v2_facade.get_tweet_html', {'tweet_id': tweet.conversation_id, 'view': 'thread'}),
        'view_conversation': FeedItemAction('twitter_v2_facade.get_tweet_html', {'tweet_id': tweet.conversation_id, 'view': 'conversation'}),
    }

    if g.get('twitter_user'):
        actions.update(
            bookmark = FeedItemAction('twitter_v2_facade.post_tweet_bookmark', {'tweet_id': tweet.id}),
            delete_bookmark = FeedItemAction('twitter_v2_facade.delete_tweet_bookmark', {'tweet_id': tweet.id}),
            retweet = FeedItemAction('twitter_v2_facade.post_tweet_retweet', {'tweet_id': tweet.id})
        )

    if g.twitter_live_enabled:
        actions.update(
            view_activity = FeedItemAction('twitter_v2_live_facade.get_tweet_activity_html', {'tweet_id': tweet.id})
        )

    t = FeedItem(
        id = tweet.id,
        text = tweet.text,
        created_at = tweet.created_at,
        author_is_verified = user.verified,
        url = url,
        conversation_id = tweet.conversation_id,
        avi_icon_url = avi_icon_url,
        display_name = user.name,
        handle = user.username,
        author_url = my_url_for('twitter_v2_facade.get_profile_html', user_id=user.id),
        author_id = user.id,
        source_url = source_url,
        source_author_url = 'https://twitter.com/{}'.format(user.username),
        #'is_edited': len(tweet['edit_history_tweet_ids']) > 1
        actions = actions,
    )

    if reply_depth:
        t = replace(t, reply_depth = reply_depth)

    # HACK we should not refer to the request directly...
    if request and request.args.get('marked_reply') == str(t.id):
        t = replace(t, is_marked = True)

    # This is where we should put "is_bookmark", "is_liked", "is_in_collection", etc...
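    # A sketch of how such a flag could be attached, assuming the caller supplied a set of
    # bookmarked tweet ids (``my_bookmark_ids`` is hypothetical and not provided anywhere yet,
    # and FeedItem would need a matching field):
    #
    #     if my_bookmark_ids is not None:
    #         t = replace(t, is_bookmarked = tweet.id in my_bookmark_ids)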
    if tweet.entities:
        if tweet.entities.urls:
            urls = list(filter(lambda u: u.title and u.description, tweet.entities.urls))
            if len(urls):
                url = urls[0]
                card = Card(
                    display_url = url.display_url.split('/')[0],
                    source_url = url.unwound_url,
                    content = url.description,
                    title = url.title
                )
                t = replace(t, card = card)

    if tweet.public_metrics:
        public_metrics = PublicMetrics(
            reply_count = tweet.public_metrics.reply_count,
            quote_count = tweet.public_metrics.quote_count,
            retweet_count = tweet.public_metrics.retweet_count,
            like_count = tweet.public_metrics.like_count
        )
        t = replace(t, public_metrics = public_metrics)

    if tweet.non_public_metrics:
        non_public_metrics = NonPublicMetrics(
            impression_count = tweet.non_public_metrics.impression_count,
            user_profile_clicks = tweet.non_public_metrics.user_profile_clicks,
            url_link_clicks = tweet.non_public_metrics.url_link_clicks
        )
        t = replace(t, non_public_metrics = non_public_metrics)

    if retweet_of and len(retweet_of):
        print('found retweet_of')

        t = replace(t, retweeted_tweet_id = retweet_of[0].id)

        retweeted_tweet: Tweet = list(filter(lambda t: t.id == retweet_of[0].id, includes.tweets))[0]
        rt = tweet_model_dc_vm(includes, retweeted_tweet, me)

        t = replace(rt,
                    retweeted_tweet_id = retweet_of[0].id,
                    source_retweeted_by_url = 'https://twitter.com/{}'.format(user.username),
                    retweeted_by = user.name,
                    retweeted_by_url = url_for('.get_profile_html', user_id=user.id)
                    )

    try:
        if tweet.attachments and tweet.attachments.media_keys and includes.media:
            media_keys = tweet.attachments.media_keys

            def first_media (mk):
                medias = list(filter(lambda m: m.media_key == mk, includes.media))
                if len(medias):
                    return medias[0]
                return None

            media = list(filter(lambda m: m is not None, map(first_media, media_keys)))

            photos = filter(lambda m: m.type == 'photo', media)
            videos = filter(lambda m: m.type == 'video', media)

            photo_media = map(lambda p: MediaItem(media_key = p.media_key,
                                                  type = 'photo',
                                                  preview_image_url = p.url + '?name=tiny&format=webp',
                                                  url = p.url,
                                                  width = p.width,
                                                  height = p.height),
                              photos)

            def video_to_mi (v):
                use_hls = False # mainly iOS
                max_bitrate = 100000000

                if use_hls:
                    variants = list(filter(lambda var: var.content_type == 'application/x-mpegURL', v.variants))
                else:
                    variants = list(filter(lambda var: var.content_type != 'application/x-mpegURL' and var.bit_rate <= max_bitrate, v.variants))
                    variants.sort(key=lambda v: v.bit_rate, reverse=True)

                url = None
                content_type = None
                size = None
                if len(variants):
                    if len(variants) > 1:
                        print('multiple qualifying variants (using first):')
                        print(variants)
                    variant = variants[0]
                    url = variant.url
                    content_type = variant.content_type
                    size = int(v.duration_ms / 1000 * variant.bit_rate)

                public_metrics = None
                if v.public_metrics and v.public_metrics.view_count:
                    public_metrics = PublicMetrics(
                        view_count = v.public_metrics.view_count
                    )

                mi = MediaItem(
                    media_key = v.media_key,
                    type = 'video',
                    preview_image_url = v.preview_image_url + '?name=tiny&format=webp',
                    image_url = v.preview_image_url,
                    width = v.width,
                    height = v.height,
                    url = url,
                    content_type = content_type,
                    duration_ms = v.duration_ms,
                    size = size,
                    public_metrics = public_metrics
                )
                return mi

            video_media = map(video_to_mi, videos)

            t = replace(t,
                        photos = list(photo_media),
                        videos = list(video_media)
                        )
        elif tweet.attachments and tweet.attachments.media_keys and not includes.media:
            print('tweet had attachments and media keys, but no expansion media content was given')
            print(tweet.attachments.media_keys)
    except:
        # it seems like this comes when we have a retweeted tweet with media on it.
        print('exception adding attachments to tweet:')
        print(tweet)
        print('view tweet:')
        print(t)
        print('included media:')
        print(includes.media)
        raise Exception('exception adding attachments to tweet')

    try:
        if quoted and len(quoted):
            t = replace(t, quoted_tweet_id = quoted[0].id)

            quoted_tweets = list(filter(lambda t: t.id == quoted[0].id, includes.tweets))
            if len(quoted_tweets):
                t = replace(t, quoted_tweet = tweet_model_dc_vm(includes, quoted_tweets[0], me))
    except:
        raise Exception('error adding quoted tweet')

    try:
        if replied_to and len(replied_to) and includes.tweets:
            t = replace(t, replied_tweet_id = replied_to[0].id)

            if reply_depth < 1:
                replied_tweets = list(filter(lambda t: t.id == replied_to[0].id, includes.tweets))
                if len(replied_tweets):
                    t = replace(t, replied_tweet = tweet_model_dc_vm(includes, replied_tweets[0], me, reply_depth=reply_depth + 1))
                else:
                    print("No replied tweet found (t={}, rep={})".format(t.id, t.replied_tweet_id))
    except:
        raise Exception('error adding replied_to tweet')

    return t


def tweet_paginated_timeline ():
    return


@twitter_app.route('/data/tweets/user/<user_id>/media', methods=['GET'])
def get_data_tweets_media (user_id):
    """
    Not used anywhere... trying an idea. tweet_model needs to be updated.
    """
    token = g.twitter_user['access_token']

    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_media_tweets(author_id=user_id,
                                                  has_images=True,
                                                  is_reply=False,
                                                  is_retweet=False,
                                                  pagination_token = pagination_token)
    includes = response_json.get('includes')
    tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
    next_token = response_json.get('meta').get('next_token')

    query = {}
    if next_token:
        query = {
            **query,
            'next_data_url': url_for('.get_data_tweets_media', user_id=user_id, pagination_token=next_token)
        }

    if 'HX-Request' in request.headers:
        user = {
            'id': user_id
        }
        return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
    else:
        return jsonify({'data': tweets, 'query': query})


# ---------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------
# HTMx views
# ---------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------
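
# The timeline views below share one pagination pattern: each response's meta.next_token is
# folded back into the query as 'next_data_url' (presumably fetched by the HTMX partial for
# the next chunk) and 'next_page_url' (a plain link), so the same handler serves the first
# page and every later one. Illustrative request sequence (the token value is made up):
#
#     GET /latest.html                          -> page 1, query carries pagination_token=abc123
#     GET /latest.html?pagination_token=abc123  -> page 2, and so on until next_token is absent
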
@twitter_app.route('/latest.html', methods=['GET'])
def get_timeline_home_html (variant = "reverse_chronological", pagination_token=None):
    if not g.twitter_user:
        return 'need to login. go to /login.html', 403

    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']

    if not pagination_token:
        pagination_token = request.args.get('pagination_token')

    output_format = request.args.get('format', 'html')

    tq = cleandict({
        'pagination_token': pagination_token,
        'since_id': request.args.get('since_id'),
        'until_id': request.args.get('until_id'),
        'end_time': request.args.get('end_time')
    })

    tweet_source = ApiV2TweetSource(token)
    response = tweet_source.get_home_timeline(user_id, **tq)

    #print(json.dumps(response_json, indent=2))

    includes = response.includes
    tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me), response.data))
    next_token = response.meta.next_token

    tq['pagination_token'] = next_token

    query = {
        **tq,
        'format': output_format,
        'me': g.me
    }

    if next_token:
        query = {
            **query,
            #'next_data_url': url_for('.get_data_timeline_home', variant=variant, pagination_token=next_token),
            'next_data_url': url_for('.get_timeline_home_html', **tq),
            'next_page_url': url_for('.get_timeline_home_html', **tq)
        }

    user = {
        'id': user_id
    }

    if output_format == 'feed.json':
        return jsonify(cleandict({'data': tweets, 'query': query}))
    elif 'HX-Request' in request.headers:
        return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query, show_thread_controls=True)
    else:
        return render_template('tweet-collection.html', user = user, tweets = tweets, query = query, show_thread_controls=True)


@twitter_app.route('/bookmarks.html', methods=['GET'])
def get_bookmarks_html ():
    user_id = g.twitter_user['id']
    token = g.twitter_user['access_token']
    pagination_token = request.args.get('pagination_token')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_bookmarks(user_id,
                                               pagination_token = pagination_token,
                                               return_dataclass=True)

    #print(response_json)

    includes = response_json.includes
    tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me), response_json.data))
    next_token = response_json.meta.next_token

    query = {}
    if next_token:
        query = {
            **query,
            'next_data_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=next_token),
            'next_page_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=next_token)
        }

    user = {
        'id': user_id
    }

    if 'HX-Request' in request.headers:
        return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
    else:
        return render_template('tweet-collection.html', user = user, tweets = tweets, query = query)
@twitter_app.route('/profile/<user_id>.html', methods=['GET'])
def get_profile_html (user_id):
    if g.twitter_user:
        token = g.twitter_user['access_token']
        # issue: retweets don't come back if we request non_public_metrics
        is_me = False and user_id == g.twitter_user['id']
    else:
        token = os.environ.get('BEARER_TOKEN')
        is_me = False

    output_format = request.args.get('format', 'html')
    pagination_token = request.args.get('pagination_token')
    exclude_replies = request.args.get('exclude_replies', '0')
    exclude_retweets = request.args.get('exclude_retweets', '0')

    tweet_source = ApiV2TweetSource(token)
    response_json = tweet_source.get_user_timeline(user_id,
                                                   exclude_replies = exclude_replies == '1',
                                                   exclude_retweets = exclude_retweets == '1',
                                                   pagination_token = pagination_token,
                                                   non_public_metrics = is_me,
                                                   return_dataclass=True)

    if not response_json:
        print('no response_json')

    if response_json.meta.result_count == 0:
        print('no results')

    if not response_json.includes:
        print(response_json)
        print('no response_json.includes')

    if response_json.errors:
        print('profile get_user_timeline errors:')
        print(response_json.errors)

    ts = int(time.time() * 1000)
    with open(f'{DATA_DIR}/cache/tl_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
        f.write(json.dumps(cleandict(asdict(response_json))))

    if response_json.data:
        tweets = list(map(lambda t: tweet_model_dc_vm(response_json.includes, t, g.me), response_json.data))
    else:
        tweets = []

    next_token = response_json.meta.next_token

    query = cleandict({
        'pagination_token': pagination_token,
        'exclude_replies': exclude_replies,
        'exclude_retweets': exclude_retweets,
        'format': output_format
    })

    if next_token:
        query = {
            **query,
            'next_data_url': url_for('.get_profile_html', user_id=user_id, pagination_token=next_token, exclude_replies=exclude_replies, exclude_retweets=exclude_retweets),
            'next_page_url': url_for('.get_profile_html', user_id=user_id, pagination_token=next_token, exclude_replies=exclude_replies, exclude_retweets=exclude_retweets)
        }

    if output_format == 'feed.json':
        return jsonify(cleandict({'data': tweets, 'query': query}))
    elif 'HX-Request' in request.headers:
        profile_user = {
            'id': user_id
        }
        return render_template('partial/tweets-timeline.html', user = profile_user, tweets = tweets, query = query)
    else:
        # FIXME the user is probably present in the tweet expansions info.
        social_graph = TwitterApiV2SocialGraph(token)
        users_response = social_graph.get_user(user_id)
        print(users_response)
        user = users_response['data'][0]

        title = f'{user["name"]} ({user["username"]})'
        # FIXME official Twitter or owner's instance?
        source_url = f'https://www.twitter.com/{user["username"]}'

        opengraph_info = dict(
            type = 'webpage', # threads might be article
            url = source_url,
            title = title,
            description = user['description'],
            image = user['profile_image_url']
        )

        page_nav = [
            dict(
                href = url_for('twitter_v2_facade.get_profile_html', user_id=user['id']),
                label = 'Timeline',
                order = 10,
            ),
            dict(
                href = url_for('twitter_v2_facade.get_following_html', user_id=user['id']),
                label = 'Following',
                order = 40,
            ),
            dict(
                href = url_for('twitter_v2_facade.get_followers_html', user_id=user['id']),
                label = 'Followers',
                order = 50,
            )
        ]

        if not g.twitter_user:
            for uid, acct in session.items():
                if uid.startswith('twitter:'):
                    page_nav += [
                        dict(
                            href = url_for('twitter_v2_facade.get_profile_html', user_id=user_id, me=uid),
                            label = f'View as {acct["id"]}',
                            order = 1000,
                        )
                    ]

        if g.twitter_live_enabled:
            page_nav += [
                dict(
                    href = url_for('twitter_v2_live_facade.get_likes_html', user_id=user['id']),
                    label = 'Likes',
                    order = 20,
                ),
                dict(
                    href = url_for('twitter_v2_live_facade.get_mentions_html', user_id=user['id']),
                    label = 'Mentions',
                    order = 30,
                ),
                dict(
                    href = url_for('twitter_v2_live_facade.get_user_activity_html', user_id=user['id']),
                    label = 'Activity',
                    order = 60,
                )
            ]

        top8 = get_top8(user_id)

        brand_info = {}

        if g.twitter_live_enabled:
            brand = find_brand_by_account(f'twitter:{user_id}')
            if brand:
                page_nav += [
                    dict(
                        href = url_for('brands.get_brand_html', brand_id=brand['id']),
                        label = 'Brand Page',
                        order = 5000,
                    )
                ]
                brand_info = fetch_brand_info(brand)
                brand_info.update({'brand': brand, 'twitter': None})

        return render_template('user-profile.html', user = user, tweets = tweets, query = query,
                               opengraph_info=opengraph_info, page_nav = page_nav, top8=top8, **brand_info)
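
# Illustrative use of the 'format' parameter handled above (the host and port are assumptions
# taken from the local media-upload URL used elsewhere in this module):
#
#     curl 'http://localhost:5004/profile/14520320.html?format=feed.json'
#
# returns the mapped FeedItem list plus the query/pagination info as JSON instead of HTML.
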
@twitter_app.route('/users.html', methods=['GET'])
def get_users ():
    ids = request.args.get('ids')
    if not ids:
        return 'supply ids=', 400

    token = g.twitter_user['access_token']

    tweet_source = TwitterApiV2SocialGraph(token)
    response_json = tweet_source.get_users(ids)

    ts = int(time.time() * 1000)
    with open(f'{DATA_DIR}/cache/users_{ts}.json', 'wt') as f:
        f.write(json.dumps(response_json))

    #print(response_json)
    #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': response_json})
    #following = list(map(lambda f: f['id'], response_json.get('data')))

    users = response_json.get('data')

    return render_template('following.html', users=users)


@twitter_app.route('/media/upload', methods=['POST'])
def post_media_upload ():
    token = g.twitter_user['access_token']

    form = {
        'media_category': 'tweet_image'
    }

    headers = {
        'Authorization': 'Bearer {}'.format(token)
    }

    url = 'http://localhost:5004/twitter/fake-twitter/media/upload'
    #url = 'https://upload.twitter.com/1.1/media/upload.json' # .json

    upload_media = {}
    for e in request.files.items():
        media_name = e[0]
        f = e[1]
        print('.')
        files = {'media': [secure_filename(f.filename), BufferedReader(f), f.content_type]}
        response = requests.post(url, files=files, data=form, headers=headers)
        print(response.status_code)
        print(response.text)
        response_json = json.loads(response.text)
        upload_media[media_name] = response_json

    return jsonify({'upload_media': upload_media})


@twitter_app.route('/fake-twitter/media/upload', methods=['POST'])
def post_media_upload2 ():
    print(request.content_type)

    f = request.files.get('media')

    f.seek(0, 2)
    media_size = f.tell()

    media = {
        #'_auth': request.headers.get('Authorization'),
        'media_key': '3_{}'.format(secure_filename(f.filename)),
        'media_id': secure_filename(f.filename),
        'size': media_size,
        'expires_after_secs': 86400,
        'image': {
            'image_type': f.content_type,
            'w': 1,
            'h': 1
        }
    }

    return jsonify(media)


def get_nav_items ():
    nav_items = []

    twitter_user = g.get('twitter_user')
    me = g.get('me')

    if twitter_user:
        nav_items += [
            dict(
                href = url_for('twitter_v2_facade.get_timeline_home_html'),
                label = 'Latest Tweets',
                order = 0
            ),
            dict(
                href = url_for('twitter_v2_facade.get_bookmarks_html'),
                label = 'Bookmarks',
                order = 100
            ),
            dict(
                href = url_for('twitter_v2_facade.get_profile_html', user_id=twitter_user['id']),
                label = 'My Profile',
                order = 200
            ),
            dict(
                href = url_for('twitter_v2_facade.oauth2_login.get_logout_html'),
                label = f'Logout ({me})',
                order = 1000
            )
        ]

        if g.get('twitter_live_enabled'):
            nav_items += [
                dict(
                    href = url_for('twitter_v2_live_facade.get_conversations_html'),
                    label = 'DMs',
                    order = 10
                ),
                dict(
                    href = url_for('twitter_v2_live_facade.get_mentions_html', user_id=twitter_user['id']),
                    label = 'Mentions',
                    order = 20
                )
            ]

    return nav_items


@twitter_app.before_request
def add_module_nav_items_to_template_context ():
    g.module_nav = get_nav_items()


def get_top8 (user_id):
    if user_id != '14520320':
        return

    return [
        dict( id='14520320' ),
        dict( id='14520320' ),
        dict( id='14520320' ),
        dict( id='14520320' ),
        dict( id='14520320' ),
        dict( id='14520320' ),
        dict( id='14520320' ),
        dict( id='14520320' ),
    ]
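
# Minimal wiring sketch for this blueprint (illustrative only; the hosting app, secret key,
# and port are assumptions and not defined in this module):
#
#     from flask import Flask
#     import twitter_v2_facade
#
#     app = Flask(__name__)
#     app.secret_key = 'dev-only-secret'
#     app.register_blueprint(twitter_v2_facade.twitter_app)
#
#     if __name__ == '__main__':
#         app.run(port=5004)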