""" This translates from the Tweet Source and Twitter v2 types Into ViewModel types such as FeedItem And the rest of the Taxonomy. """ from dataclasses import asdict from typing import List, Optional import os from flask import session, g, request import time from datetime import datetime, timezone import json import sqlite3 from twitter_v2.api import ApiV2TweetSource, TwitterApiV2SocialGraph, ApiV2ConversationSource import hogumathi_app.view_model as h_vm from hogumathi_app.view_model import CollectionPage, cleandict from hogumathi_app.content_system import register_content_source, get_content, register_hook from .view_model import tweet_model_dc_vm, user_model_dc DATA_DIR='.data' CACHE_PATH = f'{DATA_DIR}/twitter_v2_cache.db' def init_cache_db (): db = sqlite3.connect(CACHE_PATH) cur = db.cursor() table_exists = cur.execute(f"SELECT count(*) FROM sqlite_master WHERE type='table' AND name='tweet'").fetchone()[0] if not table_exists: cur.execute(""" create table query ( created_at timestamp, user_id text, last_accessed_at timestamp, next_token text, query_type text, auth_user_id text ) """) cur.execute(""" create table tweet ( id text, accessed_at timestamp, query_id int, data text, unique(id, query_id) ) """) cur.execute(""" create table user ( id text, accessed_at timestamp, query_id int, data text, unique(id, query_id) ) """) cur.execute(""" create table medium ( id text, accessed_at timestamp, query_id int, data text, unique(id, query_id) ) """) cur.connection.commit() print(f'--- created {CACHE_PATH}') cur.close() def cache_tweets_response (response_tweets, query_type, auth_user_id, user_id = None, pagination_token=None, ts=None): """ In bookmarks I observed that the same next_token is returned even with distinct new queries started. So in the case of abandoned paginations, we can end up with duplicate next_token records, meaning we could update the wrong query_id, having downstream timestamp effects. """ includes = response_tweets.includes tweets = response_tweets.data or [] users = includes and includes.users or [] media = includes and includes.media or [] next_token = response_tweets.meta.next_token db = sqlite3.connect(CACHE_PATH) cur = db.cursor() # SQLite is naive by default, so make sure this is UTC. now = datetime.now(timezone.utc) if ts: now = ts if not pagination_token: cur.execute(""" insert into query ( created_at, last_accessed_at, user_id, next_token, query_type, auth_user_id ) values ( ?,?,?,?,?,? ) """, [now, now, user_id, next_token, query_type, auth_user_id] ) query_id = cur.lastrowid else: query_id = cur.execute(""" select rowid from query where next_token = :next_token """, { 'next_token': pagination_token }).fetchone()[0] cur.execute(""" update query set last_accessed_at = :last_accessed_at, next_token = :next_token where rowid = :query_id """, { 'last_accessed_at': now, 'next_token': next_token, 'query_id': query_id }) for tweet in tweets: tweet_json = json.dumps(cleandict(asdict(tweet))) cur.execute(""" insert or ignore into tweet ( id, accessed_at, query_id, data ) values ( ?,?,?,? ) """, [ tweet.id, now, query_id, tweet_json ] ) for user in users: user_json = json.dumps(cleandict(asdict(user))) cur.execute(""" insert or ignore into user ( id, accessed_at, query_id, data ) values ( ?,?,?,? ) """, [ user.id, now, query_id, user_json ] ) for medium in media: medium_json = json.dumps(cleandict(asdict(medium))) cur.execute(""" insert or ignore into medium ( id, accessed_at, query_id, data ) values ( ?,?,?,? ) """, [ medium.media_key, now, query_id, medium_json ] ) cur.connection.commit() cur.close() def cache_users_response (response_users, query_type, auth_user_id, user_id = None, pagination_token=None, ts=None): users = response_users.data or [] next_token = response_users.meta and response_users.meta.get('next_token') db = sqlite3.connect(CACHE_PATH) cur = db.cursor() # SQLite is naive by default, so make sure this is UTC. now = None if ts: now = ts if not pagination_token: cur.execute(""" insert into query ( created_at, last_accessed_at, user_id, next_token, query_type, auth_user_id ) values ( ?,?,?,?,?,? ) """, [now, now, user_id, next_token, query_type, auth_user_id] ) query_id = cur.lastrowid else: query_id = cur.execute(""" select rowid from query where next_token = :next_token """, { 'next_token': pagination_token }).fetchone()[0] cur.execute(""" update query set last_accessed_at = :last_accessed_at, next_token = :next_token where rowid = :query_id """, { 'last_accessed_at': now, 'next_token': next_token, 'query_id': query_id }) for user in users: user_json = json.dumps(cleandict(asdict(user))) cur.execute(""" insert or ignore into user ( id, accessed_at, query_id, data ) values ( ?,?,?,? ) """, [ user.id, now, query_id, user_json ] ) cur.connection.commit() cur.close() def get_cached_query (query_type, auth_user_id, user_id=None): sql = """ select * from query where (auth_user_id in ('14520320') or auth_user_id is null) and query_type = 'bookmarks' """ results = [] next_token = None return results, next_token def get_object_over_time (obj_type, obj_id, auth_user_id): cur = None results = cur.execute(f""" --select id, count(*) c from tweet group by id having c > 1 select t.* from {obj_type} t, query q where t.id = :obj_id and q.rowid = t.query_id and (q.auth_user_id in (:auth_user_id) or q.auth_user_id is null) """, { 'obj_id': obj_id, 'auth_user_id': auth_user_id }) results = [] next_token = None return results, next_token def get_tweet_item (tweet_id, me=None): if me: twitter_user = session.get(me) token = twitter_user['access_token'] else: token = os.environ.get('BEARER_TOKEN') tweet_source = ApiV2TweetSource(token) tweets_response = tweet_source.get_tweet(tweet_id, return_dataclass=True) #print(response_json) if tweets_response.errors: # types: # https://api.twitter.com/2/problems/not-authorized-for-resource (blocked or suspended) # https://api.twitter.com/2/problems/resource-not-found (deleted) #print(response_json.get('errors')) for err in tweets_response.errors: if not 'type' in err: print('unknown error type: ' + str(err)) elif err['type'] == 'https://api.twitter.com/2/problems/not-authorized-for-resource': print('blocked or suspended tweet: ' + err['value']) elif err['type'] == 'https://api.twitter.com/2/problems/resource-not-found': print('deleted tweet: ' + err['value']) else: print('unknown error') print(json.dumps(err, indent=2)) includes = tweets_response.includes tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me), tweets_response.data)) collection_page = CollectionPage( id = tweet_id, items = tweets, next_token = None # Fixme ) return collection_page def tweet_embed_template (tweet_id): features = '{"tfw_timeline_list":{"bucket":[],"version":null},"tfw_follower_count_sunset":{"bucket":true,"version":null},"tfw_tweet_edit_backend":{"bucket":"on","version":null},"tfw_refsrc_session":{"bucket":"on","version":null},"tfw_mixed_media_15897":{"bucket":"treatment","version":null},"tfw_experiments_cookie_expiration":{"bucket":1209600,"version":null},"tfw_duplicate_scribes_to_settings":{"bucket":"on","version":null},"tfw_video_hls_dynamic_manifests_15082":{"bucket":"true_bitrate","version":null},"tfw_legacy_timeline_sunset":{"bucket":true,"version":null},"tfw_tweet_edit_frontend":{"bucket":"on","version":null}}' # base64 + encode URI component features_encoded = 'eyJ0ZndfdGltZWxpbmVfbGlzdCI6eyJidWNrZXQiOltdLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X2ZvbGxvd2VyX2NvdW50X3N1bnNldCI6eyJidWNrZXQiOnRydWUsInZlcnNpb24iOm51bGx9LCJ0ZndfdHdlZXRfZWRpdF9iYWNrZW5kIjp7ImJ1Y2tldCI6Im9uIiwidmVyc2lvbiI6bnVsbH0sInRmd19yZWZzcmNfc2Vzc2lvbiI6eyJidWNrZXQiOiJvbiIsInZlcnNpb24iOm51bGx9LCJ0ZndfbWl4ZWRfbWVkaWFfMTU4OTciOnsiYnVja2V0IjoidHJlYXRtZW50IiwidmVyc2lvbiI6bnVsbH0sInRmd19leHBlcmltZW50c19jb29raWVfZXhwaXJhdGlvbiI6eyJidWNrZXQiOjEyMDk2MDAsInZlcnNpb24iOm51bGx9LCJ0ZndfZHVwbGljYXRlX3NjcmliZXNfdG9fc2V0dGluZ3MiOnsiYnVja2V0Ijoib24iLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X3ZpZGVvX2hsc19keW5hbWljX21hbmlmZXN0c18xNTA4MiI6eyJidWNrZXQiOiJ0cnVlX2JpdHJhdGUiLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X2xlZ2FjeV90aW1lbGluZV9zdW5zZXQiOnsiYnVja2V0Ijp0cnVlLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X3R3ZWV0X2VkaXRfZnJvbnRlbmQiOnsiYnVja2V0Ijoib24iLCJ2ZXJzaW9uIjpudWxsfX0%3D' origin = f"http%3A%2F%2Flocalhost%3A5004%2Ftwitter%2Ftweet2%2F{tweet_id}.html" width = 550 height = 755 theme = "dark" # or light hide_card = "false" hide_thread = "false" src = f"https://platform.twitter.com/embed/Tweet.html?dnt=true&features={features_encoded}&origin={origin}&frame=false&hideCard={hide_card}&hideThread={hide_thread}&id={tweet_id}&lang=en&theme=dark&width={width}px" html = f""" """ return html # https://developer.twitter.com/en/docs/twitter-for-websites/embedded-tweets/overview def get_tweet_embed (tweet_id): html = tweet_embed_template(tweet_id) post = h_vm.FeedItem( id = tweet_id, created_at = 'some time', display_name = 'Twitter User', handle = 'tweetuser', html = html ) return post def get_bookmarks_feed (user_id, pagination_token=None, max_results=10, me=None): if not me: me = g.get('me') or request.args.get('me') print(f'get_bookmarks_feed. me={me}') twitter_user = session.get( me ) if not twitter_user: return None token = twitter_user['access_token'] tweet_source = ApiV2TweetSource(token) response_tweets = tweet_source.get_bookmarks(user_id, pagination_token = pagination_token, return_dataclass=True, max_results=max_results ) #print(response_json) cache_tweets_response(response_tweets, 'bookmarks', user_id, user_id=user_id, pagination_token=pagination_token) includes = response_tweets.includes tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me), response_tweets.data)) next_token = response_tweets.meta.next_token query = {} if next_token: query = { **query, } user = { 'id': user_id } ts = int(time.time() * 1000) with open(f'{DATA_DIR}/cache/bookmarks_{user_id}_{ts}_{pagination_token}.json', 'wt') as f: f.write(json.dumps(cleandict(asdict(response_tweets)))) collection_page = CollectionPage( id = user_id, # FIXME this should perhaps be the unresolved id items = tweets, next_token = next_token ) return collection_page def get_user_feed (user_id, me=None, **twitter_kwargs): if not me and 'me' in g: me = g.me if 'twitter_user' in g and g.twitter_user: token = g.twitter_user['access_token'] # issue: retweets don't come back if we request non_public_metrics is_me = False and user_id == g.twitter_user['id'] auth_user_id = g.twitter_user['id'] else: token = os.environ.get('BEARER_TOKEN') is_me = False auth_user_id = None tweet_source = ApiV2TweetSource(token) tweets_response = tweet_source.get_user_timeline(user_id, return_dataclass=True, **twitter_kwargs) tweets = None if not tweets_response: print('no response_json') if tweets_response.meta and tweets_response.meta.result_count == 0: print('no results') print(tweets_response) if not tweets_response.includes: print(tweets_response) print('no tweets_response.includes') if tweets_response.errors: print('profile get_user_timeline errors:') print(tweets_response.errors) pagination_token=twitter_kwargs.get('pagination_token') cache_tweets_response(tweets_response, 'user_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token) ts = int(time.time() * 1000) with open(f'{DATA_DIR}/cache/tl_{user_id}_{ts}_{pagination_token}.json', 'wt') as f: f.write(json.dumps(cleandict(asdict(tweets_response)))) if tweets_response.data: tweets = list(map(lambda t: tweet_model_dc_vm(tweets_response.includes, t, me), tweets_response.data)) next_token = tweets_response.meta.next_token collection_page = CollectionPage( id = user_id, items = tweets, next_token = next_token ) return collection_page def get_tweets_collection (content_ids, pagination_token=None, max_results=None): """ We might be able to have a generalizer in the content system as well... If a source exposes a get many interface then use it. We want to avoid many singular fetches. """ return [] def get_user (user_id, me=None) -> Optional[h_vm.FeedServiceUser]: users = get_users([user_id], me=me) if users: return users[0] def get_users (content_ids, me=None, pagination_token=None) -> Optional[List[h_vm.FeedServiceUser]]: """ """ if me: twitter_user = session.get(me) token = twitter_user['access_token'] auth_user_id = twitter_user['id'] else: token = os.environ.get('BEARER_TOKEN') auth_user_id = None social_graph = TwitterApiV2SocialGraph(token) users_response = social_graph.get_users(content_ids, return_dataclass=True) if not len(users_response.data): return cache_users_response(users_response, f'users', auth_user_id, pagination_token=pagination_token) users = list(map(user_model_dc, users_response.data)) return users def get_home_feed (user_id, me, **query_kwargs): twitter_user = session.get(me) token = twitter_user['access_token'] auth_user_id = twitter_user['id'] tweet_source = ApiV2TweetSource(token) response = tweet_source.get_home_timeline(user_id, **query_kwargs) #print(json.dumps(response_json, indent=2)) pagination_token = query_kwargs.get('pagination_token') cache_tweets_response(response, 'home_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token) includes = response.includes tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me), response.data)) next_token = response.meta.next_token collection_page = CollectionPage( id = user_id, items = tweets, next_token = next_token ) return collection_page def get_author_threads (user_id): """ Placeholder implementation where we can manually add threads to a collection, but ultimately we will query a local Tweet DB that gets populated through various means. Once we store Tweets we can easily query this. We can filter by author_id,conversation_id order by in_reply_to_tweet_id,id """ return get_content(f'collection:twitter.threads_{user_id}') def get_tweet_replies (conversation_id, in_reply_to_id=None, pagination_token=None, max_results=None, author_id=None): """ New function, not used yet """ tweet_source = ApiV2TweetSource(token) auth_user_id = None only_replies = view == 'replies' tweets = [] skip_embed_replies = False if view == 'replies': replies_response = tweet_source.get_thread(in_reply_to_id, only_replies=True, pagination_token = pagination_token, return_dataclass=True) elif view == 'thread': skip_embed_replies = True replies_response = tweet_source.get_thread(conversation_id, only_replies=False, author_id=author_id, pagination_token = pagination_token, return_dataclass=True) elif view == 'conversation': replies_response = tweet_source.get_thread(conversation_id, only_replies=False, pagination_token = pagination_token, return_dataclass=True) elif view == 'tweet': replies_response = None next_token = None #print("conversation meta:") #print(json.dumps(tweets_response.get('meta'), indent=2)) if replies_response and replies_response.meta and replies_response.meta.result_count: cache_tweets_response(replies_response, 'tweet_replies', auth_user_id, user_id=user_id, pagination_token=pagination_token) includes = replies_response.includes tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), replies_response.data)) + tweets next_token = replies_response.meta.next_token # this method is OK except it doesn't work if there are no replies. #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me)) #related_tweets = [] # derived from includes tweets.reverse() query = {} if next_token: query = { **query, # FIXME only_replies 'next_data_url': url_for('.get_tweet2_html', tweet_id=tweet_id, pagination_token=next_token, only_replies = '1' if only_replies else '0', author_id = tweets[0].author_id), 'next_page_url': url_for('.get_tweet2_html', tweet_id=tweet_id, view=view, pagination_token=next_token) } user = { } if view == 'replies': tweet = tweets[0] if tweet.id == '1608510741941989378': unreplied = [ UnrepliedSection( description = "Not clear what GS is still.", span = (40, 80) ) ] tweet = replace(tweet, unreplied = unreplied ) expand_parts = request.args.get('expand') if expand_parts: expand_parts = expand_parts.split(',') def reply_to_thread_item (fi): nonlocal expand_parts if fi.id == '1609714342211244038': print(f'reply_to_thread_item id={fi.id}') unreplied = [ UnrepliedSection( description = "Is there proof of this claim?", span = (40, 80) ) ] fi = replace(fi, unreplied = unreplied ) children = None if expand_parts and len(expand_parts) and fi.id == expand_parts[0]: expand_parts = expand_parts[1:] print(f'getting expanded replied for tweet={fi.id}') expanded_replies_response = tweet_source.get_thread(fi.id, only_replies=True, return_dataclass=True) if expanded_replies_response.data: print('we got expanded responses data') children = list(map(lambda t: tweet_model_dc_vm(expanded_replies_response.includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), expanded_replies_response.data)) children = list(map(reply_to_thread_item, children)) return ThreadItem(feed_item=fi, children=children) children = list(map(reply_to_thread_item, tweets[1:])) root = ThreadItem( feed_item = tweet, children = children ) return render_template('tweet-thread.html', user = user, root = root, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info) else: return render_template(f'tweet-collection{theme_variant}.html', user = user, tweets = tweets, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info) def get_following_users (user_id, me=None, max_results=1000, pagination_token=None): if me: twitter_user = session.get(me) token = twitter_user['access_token'] auth_user_id = twitter_user['id'] else: token = os.environ.get('BEARER_TOKEN') auth_user_id = None social_source = TwitterApiV2SocialGraph(token) following_resp = social_source.get_following(user_id, max_results=max_results, pagination_token=pagination_token, return_dataclass=True) cache_users_response(following_resp, 'following', auth_user_id, user_id = user_id, pagination_token=pagination_token) ts = int(time.time() * 1000) with open(f'{DATA_DIR}/cache/following_{user_id}_{ts}_{pagination_token}.json', 'wt') as f: f.write(json.dumps(cleandict(asdict(following_resp)))) #print(following_resp) #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': following_resp}) #following = list(map(lambda f: f['id'], following_resp.get('data'))) following = list(map(user_model_dc, following_resp.data)) total_count = following_resp.meta.get('result_count') next_token = following_resp.meta.get('next_token') collection_page = CollectionPage( id = user_id, items = following, total_count = total_count, next_token = next_token ) return collection_page def get_followers_user (user_id, me=None, max_results=1000, pagination_token=None): if me: twitter_user = session.get(me) token = twitter_user['access_token'] auth_user_id = twitter_user['id'] else: token = os.environ.get('BEARER_TOKEN') auth_user_id = None use_cache = False # this concept is broken for now if use_cache: # this concept is broken for now print(f'using cache for user {user_id}: {use_cache}') with open(f'.data/cache/followers_{user_id}_{pagination_token}_{use_cache}.json', 'rt') as f: response_json = json.load(f) else: social_source = TwitterApiV2SocialGraph(token) followers_resp = social_source.get_followers(user_id, max_results=max_results, pagination_token=pagination_token, return_dataclass=True) ts = int(time.time() * 1000) print(f'followers cache for {user_id}: {ts}') cache_users_response(followers_resp, 'followers', auth_user_id, user_id = user_id, pagination_token=pagination_token) with open(f'{DATA_DIR}/cache/followers_{user_id}_{ts}.json', 'wt') as f: json.dump(cleandict(asdict(followers_resp)), f, indent=2) #print(followers_resp) #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': followers_resp}) #followers = list(map(lambda f: f['id'], followers_resp.get('data'))) followers = followers_resp.data followers = list(map(user_model_dc, followers)) followers = list(map(user_model_dc, followers_resp.data)) total_count = followers_resp.meta.get('result_count') next_token = followers_resp.meta.get('next_token') collection_page = CollectionPage( id = user_id, items = followers, total_count = total_count, next_token = next_token ) return collection_page def register_content_sources (): init_cache_db() register_content_source('twitter:tweets', get_tweets_collection, id_pattern='') register_content_source('twitter:tweet:', get_tweet_item, id_pattern='(?P\d+)') register_content_source('twitter:tweet:', get_tweet_embed, id_pattern='(?P\d+)') register_content_source('twitter:bookmarks:', get_bookmarks_feed, id_pattern='(?P\d+)') register_content_source('twitter:feed:user:', get_user_feed, id_pattern='(?P\d+)') register_content_source('twitter:user:', get_user, id_pattern='(?P\d+)') register_content_source('twitter:users', get_users, id_pattern='') register_content_source('twitter:feed:reverse_chronological:user:', get_home_feed, id_pattern='(?P\d+)') register_content_source('twitter:tweets:replies:', get_tweet_replies, id_pattern='(?P\d+)') register_content_source('twitter:following:users:', get_following_users, id_pattern='(?P\d+)') register_content_source('twitter:followers:user:', get_followers_user, id_pattern='(?P\d+)') register_content_source('twitter:threads:user:', get_author_threads, id_pattern='(?P\d+)')