from typing import List from dataclasses import asdict, replace from dacite import from_dict from importlib.util import find_spec from configparser import ConfigParser import base64 import sqlite3 import os import json import json_stream from zipfile import ZipFile import itertools import time from io import BufferedReader import re import datetime import dateutil import dateutil.parser import dateutil.tz import requests from werkzeug.utils import secure_filename from flask import json, Response, render_template, request, send_from_directory, Blueprint, session, redirect, g, current_app, jsonify from flask_cors import CORS from twitter_v2.api import ApiV2TweetSource, TwitterApiV2SocialGraph, ApiV2ConversationSource from twitter_v2.types import Tweet, TweetExpansions from hogumathi_app.view_model import FeedItem, FeedServiceUser, ThreadItem, FeedItemAction, MediaItem, Card, PublicMetrics, NonPublicMetrics, UnrepliedSection, CollectionPage, cleandict from .view_model import user_model_dc, tweet_model_dc_vm from . import content_source from . import oauth2_login theme_variant = '' if find_spec('theme_bootstrap5'): # FIXME use g. theme_variant = '-bs' DATA_DIR='.data' twitter_app = Blueprint('twitter_v2_facade', 'twitter_v2_facade', static_folder='static', static_url_path='', url_prefix='/') twitter_app.register_blueprint(oauth2_login.oauth2_login, url_prefix="/") twitter_app.context_processor(oauth2_login.inject_me) twitter_app.before_request(oauth2_login.add_me) url_for = oauth2_login.url_for_with_me def run_script(script_name, script_vars): script_path = './{}.py'.format(script_name) if (os.path.exists(script_path)): script_file = open(script_path, 'r') script = script_file.read() script_file.close() try: return exec(script, script_vars) except: print('error running script: {}'.format(script_name)) return False False class ActivityData: def __init__ (self, user_id, db_path): self.db_path = db_path self.user_id = user_id db_exists = os.path.exists(db_path) self.db = sqlite3.connect(db_path) if not db_exists: self.init_db() return def init_db (self): self.db.execute('create table seen_user (ts, user_id)') self.db.execute('create table seen_tweet (ts, tweet_id)') return def seen_tweet (self, tweet_id): return def seen_user (self, user_id): return def add_tweet_counts (self, user_id, start, end, tweet_count): return [current_ts, user_id, start, end, tweet_count] def add_tweet_public_metrics (self, tweet_id, like_count, reply_count, retweet_count, quote_count): return def add_tweet_non_public_metrics (self, tweet_id, impression_count, click_count, link_click_count, profile_click_count): return def add_user_public_metrics (self, user_id, followers_count, following_count, tweet_count, listed_count): return class DataSet: def __init__ (self): self.items = {} return def update_items (self, items): """ merges objects by ID. Asssigns an ID if none exists. Mutates OG object. """ ids = [] for item in items: if not 'id' in item: #item = dict(item) item['id'] = uuid.uuid4().hex else: existing_item = self.items.get( item['id'] ) if existing_item: existing_item.update(item) item = existing_item self.items[ item['id'] ] = item ids.append( item['id'] ) return ids def get_items (self): return self.items.values() class TwitterMetadata: def __init__ (self, data_dir): self.data_dir = data_dir os.mkdir(data_dir, exist_ok=True) def get_tweet (self, tweet_id): path = f'{self.data_dir}/tweet_{tweet_id}.json' if not os.path.exists(path): return None with open(path, 'rt') as f: return json.loads(f.read()) def update_tweet (self, tweet_id, fields): tweet = self.get_tweet(tweet_id) if not tweet: tweet = {'id': tweet_id} tweet.update(fields) with open(f'{self.data_dir}/tweet_{tweet_id}.json', 'wt') as f: f.write(json.dumps(tweet)) return tweet #twitter_meta = TwitterMetadata('./data/meta') @twitter_app.route('/tweets', methods=['POST']) def post_tweets_create (): user_id = g.twitter_user['id'] token = g.twitter_user['access_token'] text = request.form.get('text') reply_to_tweet_id = request.form.get('reply_to_tweet_id') quote_tweet_id = request.form.get('quote_tweet_id') tweet_source = ApiV2TweetSource(token) result = tweet_source.create_tweet(text, reply_to_tweet_id=reply_to_tweet_id, quote_tweet_id=quote_tweet_id) print(result) run_script('on_tweeted', {'twitter_user': g.twitter_user, 'tweet': result}) if 'HX-Request' in request.headers: return render_template('partial/compose-form.html', new_tweet_id=result['data']['id']) else: response_body = json.dumps({ 'result': result }) return jsonify(response_body) @twitter_app.route('/tweet//retweet', methods=['POST']) def post_tweet_retweet (tweet_id): user_id = g.twitter_user['id'] token = g.twitter_user['access_token'] tweet_source = ApiV2TweetSource(token) result = tweet_source.retweet(tweet_id, user_id=user_id) print(result) run_script('on_tweeted', {'twitter_user': g.twitter_user, 'retweet': result}) if 'HX-Request' in request.headers: return """retweeted """.replace('{}', url_for('.get_tweet_html', tweet_id=tweet_id)) else: response_body = json.dumps({ 'result': result }) return jsonify(response_body) @twitter_app.route('/tweet//bookmark', methods=['POST']) def post_tweet_bookmark (tweet_id): user_id = g.twitter_user['id'] token = g.twitter_user['access_token'] tweet_source = ApiV2TweetSource(token) result = tweet_source.bookmark(tweet_id, user_id=user_id) print(result) if 'HX-Request' in request.headers: return """bookmarked """.replace('{}', url_for('.get_tweet_html', tweet_id=tweet_id)) else: response_body = json.dumps({ 'result': result }) return jsonify(response_body) @twitter_app.route('/tweet//bookmark', methods=['DELETE']) def delete_tweet_bookmark (tweet_id): user_id = g.twitter_user['id'] token = g.twitter_user['access_token'] tweet_source = ApiV2TweetSource(token) result = tweet_source.delete_bookmark(tweet_id, user_id=user_id) response_body = json.dumps({ 'result': result }) return jsonify(response_body) @twitter_app.route('/tweet/.html', methods=['GET']) def get_tweet_html (tweet_id): pagination_token = request.args.get('pagination_token') view = request.args.get('view', 'replies') if g.twitter_user: token = g.twitter_user['access_token'] else: token = os.environ.get('BEARER_TOKEN') tweet_source = ApiV2TweetSource(token) only_replies = view == 'replies' tweets = [] if not pagination_token: tweets_response = tweet_source.get_tweet(tweet_id, return_dataclass=True) tweet = tweets_response.data[0] tweets.append(tweet_model_dc_vm(tweets_response.includes, tweet, g.me)) skip_embed_replies = False if view == 'replies': replies_response = tweet_source.get_thread(tweet_id, only_replies=True, pagination_token = pagination_token, return_dataclass=True) elif view == 'thread': skip_embed_replies = True replies_response = tweet_source.get_thread(tweet_id, only_replies=False, author_id=tweets[0].author_id, pagination_token = pagination_token, return_dataclass=True) elif view == 'conversation': replies_response = tweet_source.get_thread(tweet_id, only_replies=False, pagination_token = pagination_token, return_dataclass=True) elif view == 'tweet': replies_response = None next_token = None #print("conversation meta:") #print(json.dumps(tweets_response.get('meta'), indent=2)) if replies_response and replies_response.meta and replies_response.meta.result_count: includes = replies_response.includes tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), replies_response.data)) + tweets next_token = replies_response.meta.next_token # this method is OK except it doesn't work if there are no replies. #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me)) #related_tweets = [] # derived from includes tweets.reverse() query = {} if next_token: query = { **query, # FIXME only_replies 'next_data_url': url_for('.get_tweet_html', tweet_id=tweet_id, pagination_token=next_token, only_replies = '1' if only_replies else '0', author_id = tweets[0].author_id), 'next_page_url': url_for('.get_tweet_html', tweet_id=tweet_id, view=view, pagination_token=next_token) } user = { } if 'HX-Request' in request.headers: # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n")) return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query) else: page_nav = [ dict( href=url_for('.get_tweet_html', tweet_id=tweets[0].conversation_id, view='thread'), label = 'author thread', order = 10 ), dict( href = url_for('.get_tweet_html', tweet_id=tweets[0].conversation_id, view='conversation'), label = 'full convo', order = 20 ) ] tweet = tweets_response.data[0] user = list(filter(lambda u: u.id == tweet.author_id, tweets_response.includes.users))[0] source_url = f'https://twitter.com/{user.username}/status/{tweet_id}' title = f'Tweet by {user.name} at {tweet.created_at}' opengraph_info = dict( type = 'webpage', # threads might be article url = source_url, title = title, description = tweet.text, image = user.profile_image_url ) if view == 'replies': tweet = tweets[0] if tweet.id == '1608510741941989378': unreplied = [ UnrepliedSection( description = "Not clear what GS is still.", span = (40, 80) ) ] tweet = replace(tweet, unreplied = unreplied ) expand_parts = request.args.get('expand') if expand_parts: expand_parts = expand_parts.split(',') def reply_to_thread_item (fi): nonlocal expand_parts if fi.id == '1609714342211244038': print(f'reply_to_thread_item id={fi.id}') unreplied = [ UnrepliedSection( description = "Is there proof of this claim?", span = (40, 80) ) ] fi = replace(fi, unreplied = unreplied ) children = None if expand_parts and len(expand_parts) and fi.id == expand_parts[0]: expand_parts = expand_parts[1:] print(f'getting expanded replied for tweet={fi.id}') expanded_replies_response = tweet_source.get_thread(fi.id, only_replies=True, return_dataclass=True) if expanded_replies_response.data: print('we got expanded responses data') children = list(map(lambda t: tweet_model_dc_vm(expanded_replies_response.includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), expanded_replies_response.data)) children = list(map(reply_to_thread_item, children)) return ThreadItem(feed_item=fi, children=children) children = list(map(reply_to_thread_item, tweets[1:])) root = ThreadItem( feed_item = tweet, children = children ) return render_template('tweet-thread.html', user = user, root = root, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info) else: return render_template(f'tweet-collection{theme_variant}.html', user = user, tweets = tweets, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info) @twitter_app.route('/followers/.html', methods=['GET']) def get_followers_html (user_id): if not g.twitter_user: return 'need to log in.', 403 use_cache = request.args.get('use_cache') token = g.twitter_user['access_token'] social_source = TwitterApiV2SocialGraph(token) if use_cache: print(f'using cache for user {user_id}: {use_cache}') with open(f'.data/cache/followers_{user_id}_{use_cache}.json', 'rt') as f: response_json = json.load(f) else: response_json = social_source.get_followers(user_id, max_results=1000, return_dataclass=True) ts = int(time.time() * 1000) print(f'followers cache for {user_id}: {ts}') with open(f'{DATA_DIR}/cache/followers_{user_id}_{ts}.json', 'wt') as f: json.dump(response_json, f, indent=2) #print(response_json) #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': response_json}) #followers = list(map(lambda f: f['id'], response_json.get('data'))) followers = response_json.data followers = list(map(user_model_dc, followers)) return render_template('following.html', users=followers) @twitter_app.route('/following/.html', methods=['GET']) def get_following_html (user_id): if not g.twitter_user: return 'need to log in.', 403 token = g.twitter_user['access_token'] social_source = TwitterApiV2SocialGraph(token) response_json = social_source.get_following(user_id, max_results=1000, return_dataclass=True) ts = int(time.time() * 1000) with open(f'{DATA_DIR}/cache/following_{user_id}_{ts}.json', 'wt') as f: f.write(json.dumps(response_json)) #print(response_json) #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': response_json}) #following = list(map(lambda f: f['id'], response_json.get('data'))) following = list(map(user_model_dc, response_json.data)) return render_template('following.html', users=following) # --------------------------------------------------------------------------------------------------------- # --------------------------------------------------------------------------------------------------------- # HTMx partials # --------------------------------------------------------------------------------------------------------- # --------------------------------------------------------------------------------------------------------- def tweet_paginated_timeline (): return @twitter_app.route('/data/tweets/user//media', methods=['GET']) def get_data_tweets_media (user_id): """ Not used anywhere... trying an idea. tweet_model needs to be updated. """ token = g.twitter_user['access_token'] pagination_token = request.args.get('pagination_token') tweet_source = ApiV2TweetSource(token) response_json = tweet_source.get_media_tweets(author_id=user_id, has_images=True, is_reply=False, is_retweet=False, pagination_token = pagination_token) includes = response_json.get('includes') tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data'])) next_token = response_json.get('meta').get('next_token') query = {} if next_token: query = { **query, 'next_data_url': url_for('.get_data_tweets_media', user_id=user_id, pagination_token=next_token) } if 'HX-Request' in request.headers: user = { 'id': user_id } return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query) else: response_body = json.dumps({ 'data': tweets, 'query': query }) return jsonify(response_body) # --------------------------------------------------------------------------------------------------------- # --------------------------------------------------------------------------------------------------------- # HTMx views # --------------------------------------------------------------------------------------------------------- # --------------------------------------------------------------------------------------------------------- @twitter_app.route('/latest.html', methods=['GET']) def get_timeline_home_html (variant = "reverse_chronological", pagination_token=None): if not g.twitter_user: return 'need to login. go to /login.html', 403 user_id = g.twitter_user['id'] token = g.twitter_user['access_token'] if not pagination_token: pagination_token = request.args.get('pagination_token') output_format = request.args.get('format', 'html') tq = cleandict({ 'pagination_token': pagination_token, 'since_id': request.args.get('since_id'), 'until_id': request.args.get('until_id'), 'end_time': request.args.get('end_time') }) tweet_source = ApiV2TweetSource(token) response = tweet_source.get_home_timeline(user_id, **tq) #print(json.dumps(response_json, indent=2)) includes = response.includes tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me), response.data)) next_token = response.meta.next_token tq['pagination_token'] = next_token query = { **tq, 'format': output_format, 'me': g.me } if next_token: query = { **query, #'next_data_url': url_for('.get_data_timeline_home', variant=variant, pagination_token=next_token), 'next_data_url': url_for('.get_timeline_home_html', **tq), 'next_page_url': url_for('.get_timeline_home_html', **tq) } user = { 'id': user_id } if output_format == 'feed.json': return jsonify(cleandict({ 'data': tweets, 'query': query })) elif 'HX-Request' in request.headers: return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query, show_thread_controls=True) else: return render_template('tweet-collection.html', user = user, tweets = tweets, query = query, show_thread_controls=True) @twitter_app.route('/bookmarks.html', methods=['GET']) def get_bookmarks2_html (): user_id = g.twitter_user['id'] token = g.twitter_user['access_token'] pagination_token = request.args.get('pagination_token') max_results = int(request.args.get('limit', 10)) collection_page = get_content(f'twitter:bookmarks:{user_id}', pagination_token=pagination_token, max_results=max_results) tweets = collection_page.items next_token = collection_page.next_token query = {} if next_token: query = { **query, 'next_data_url': url_for('.get_bookmarks2_html', user_id=user_id, pagination_token=next_token, limit=max_results), 'next_page_url': url_for('.get_bookmarks2_html', user_id=user_id, pagination_token=next_token, limit=max_results) } user = { 'id': user_id } if 'HX-Request' in request.headers: return render_template(f'partial/tweets-timeline{theme_variant}.html', user = user, tweets = tweets, query = query) else: return render_template(f'tweet-collection{theme_variant}.html', user = user, tweets = tweets, query = query) #@twitter_app.route('/bookmarks.html', methods=['GET']) def get_bookmarks_old_html (): user_id = g.twitter_user['id'] token = g.twitter_user['access_token'] pagination_token = request.args.get('pagination_token') max_results = int(request.args.get('limit', 10)) tweet_source = ApiV2TweetSource(token) response_json = tweet_source.get_bookmarks(user_id, pagination_token = pagination_token, return_dataclass=True, max_results=max_results ) #print(response_json) includes = response_json.includes tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me), response_json.data)) next_token = response_json.meta.next_token query = {} if next_token: query = { **query, 'next_data_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=next_token, limit=max_results), 'next_page_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=next_token, limit=max_results) } user = { 'id': user_id } ts = int(time.time() * 1000) with open(f'{DATA_DIR}/cache/bookmarks_{user_id}_{ts}_{pagination_token}.json', 'wt') as f: f.write(json.dumps(response_json)) if 'HX-Request' in request.headers: return render_template(f'partial/tweets-timeline{theme_variant}.html', user = user, tweets = tweets, query = query) else: return render_template(f'tweet-collection{theme_variant}.html', user = user, tweets = tweets, query = query) from hogumathi_app.content_system import get_content @twitter_app.route('/profile/.html', methods=['GET']) def get_profile_html (user_id): me = g.get('me') if g.twitter_user: token = g.twitter_user['access_token'] # issue: retweets don't come back if we request non_public_metrics is_me = False and user_id == g.twitter_user['id'] else: token = os.environ.get('BEARER_TOKEN') is_me = False output_format = request.args.get('format', 'html') pagination_token = request.args.get('pagination_token') exclude_replies = request.args.get('exclude_replies', '0') exclude_retweets = request.args.get('exclude_retweets', '0') query = cleandict({ 'pagination_token': pagination_token, 'exclude_replies': exclude_replies, 'exclude_retweets': exclude_retweets, 'format': output_format }) collection_page = get_content(f'twitter:feed:user:{user_id}', me=me, **query) tweets = collection_page.items next_token = collection_page.next_token if next_token: query = { **query, 'next_data_url': url_for('.get_profile_html', user_id=user_id, pagination_token=next_token, exclude_replies=exclude_replies, exclude_retweets=exclude_retweets), 'next_page_url': url_for('.get_profile_html', user_id=user_id , pagination_token=next_token, exclude_replies=exclude_replies, exclude_retweets=exclude_retweets) } if output_format == 'feed.json': return jsonify(cleandict({ 'data': tweets, 'query': query })) elif 'HX-Request' in request.headers: profile_user = { 'id': user_id } return render_template(f'partial/tweets-timeline{theme_variant}.html', user = profile_user, tweets = tweets, query = query) else: # FIXME the user is probably present in the tweet expansions info. #social_graph = TwitterApiV2SocialGraph(token) #users_response = social_graph.get_user(user_id) #print(users_response) #user = users_response['data'][0] user = get_content(f'twitter:user:{user_id}', me=me) title = f'{user.name} ({user.username})' # FIXME official Twitter or owner's instance? source_url = f'https://www.twitter.com/{user.username}' opengraph_info = dict( type = 'webpage', # threads might be article url = source_url, title = title, description = user.description, image = user.avatar_image_url ) page_nav = [ dict( href = url_for('twitter_v2_facade.get_profile_html', user_id=user.id), label = 'Timeline', order = 10, ), dict ( href = url_for('twitter_v2_facade.get_following_html', user_id=user.id), label = 'Following', order = 40, ), dict ( href = url_for('twitter_v2_facade.get_followers_html', user_id=user.id), label = 'Followers', order = 50, ) ] if not g.twitter_user: for uid, acct in session.items(): if uid.startswith('twitter:'): page_nav += [ dict( href = url_for('twitter_v2_facade.get_profile_html', user_id=user_id, me=uid), label = f'View as {acct["id"]}', order = 1000, ) ] if g.twitter_live_enabled: page_nav += [ dict( href = url_for('twitter_v2_live_facade.get_likes_html', user_id=user.id), label = 'Likes', order = 20, ), dict ( href = url_for('twitter_v2_live_facade.get_mentions_html', user_id=user.id), label = 'Mentions', order = 30, ), dict ( href = url_for('twitter_v2_live_facade.get_user_activity_html', user_id=user.id), label = 'Activity', order = 60, ) ] top8 = get_top8(user_id) brand = None brand_info = {} if g.twitter_live_enabled: brand = get_content(f'brand:search:account:twitter:{user_id}', expand=False) if brand: page_nav += [ dict( href = url_for('brands.get_brand_html', brand_id=brand['id']), label = 'Brand Page', order = 5000, ) ] brand_info = brand.get('expanded', {}) return render_template(f'user-profile{theme_variant}.html', user = user.raw_user, user_dc = user, tweets = tweets, query = query, opengraph_info=opengraph_info, page_nav = page_nav, top8=top8, brand=brand, **brand_info ) @twitter_app.route('/users.html', methods=['GET']) def get_users (): ids = request.args.get('ids') if not ids: return 'supply ids=', 400 token = g.twitter_user['access_token'] tweet_source = TwitterApiV2SocialGraph(token) response_json = tweet_source.get_users(ids) ts = int(time.time() * 1000) with open(f'{DATA_DIR}/cache/users_{ts}.json', 'wt') as f: f.write(json.dumps(response_json)) #print(response_json) #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': response_json}) #following = list(map(lambda f: f['id'], response_json.get('data'))) users = response_json.get('data') return render_template('following.html', users=users) @twitter_app.route('/media/upload', methods=['POST']) def post_media_upload (): token = g.twitter_user['access_token'] form = { 'media_category': 'tweet_image' } headers = { 'Authorization': 'Bearer {}'.format(token) } url = 'http://localhost:5004/twitter/fake-twitter/media/upload' #url = 'https://upload.twitter.com/1.1/media/upload.json' # .json upload_media = {} for e in request.files.items(): media_name = e[0] f = e[1] print('.') files = {'media': [secure_filename(f.filename), BufferedReader(f), f.content_type]} response = requests.post(url, files=files, data=form, headers=headers) print(response.status_code) print(response.text) response_json = json.loads(response.text) upload_media[media_name] = response_json return jsonify({'upload_media': upload_media}) @twitter_app.route('/fake-twitter/media/upload', methods=['POST']) def post_media_upload2 (): print(request.content_type) f = request.files.get('media') f.seek(0,2) media_size = f.tell() media = { #'_auth': request.headers.get('Authorization'), 'media_key': '3_{}'.format(secure_filename(f.filename)), 'media_id': secure_filename(f.filename), 'size': media_size, 'expires_after_secs': 86400, 'image': { 'image_type': f.content_type, 'w': 1, 'h': 1 } } return jsonify(media) def get_nav_items (): nav_items = [ ] twitter_user = g.get('twitter_user') me = g.get('me') if twitter_user: nav_items += [ dict( href = url_for('twitter_v2_facade.get_timeline_home_html'), label = 'Latest Tweets', order = 0 ), dict ( href = url_for('twitter_v2_facade.get_bookmarks2_html'), label = 'Bookmarks', order = 100 ), dict ( href = url_for('twitter_v2_facade.get_profile_html', user_id=twitter_user['id']), label = 'My Profile', order = 200 ), dict ( href = url_for('twitter_v2_facade.oauth2_login.get_logout_html'), label = f'Logout ({me})', order = 1000 ) ] if g.get('twitter_live_enabled'): nav_items += [ dict ( href = url_for('twitter_v2_live_facade.get_conversations_html'), label = 'DMs', order = 10 ), dict ( href = url_for('twitter_v2_live_facade.get_mentions_html', user_id=twitter_user['id']), label = 'Mentions', order = 20 ) ] return nav_items @twitter_app.before_request def add_module_nav_items_to_template_context (): g.module_nav = get_nav_items() def get_top8 (user_id): if user_id != '14520320': return return [ dict( id='14520320' ), dict( id='14520320' ), dict( id='14520320' ), dict( id='14520320' ), dict( id='14520320' ), dict( id='14520320' ), dict( id='14520320' ), dict( id='14520320' ), ]