"""
This translates from the Tweet Source and Twitter v2 types
Into ViewModel types such as FeedItem
And the rest of the Taxonomy.
"""
from dataclasses import asdict, replace
from typing import List, Optional
import os
from flask import session, g, request, url_for, render_template
import time
from datetime import datetime, timezone
import json
import sqlite3
from twitter_v2.api import ApiV2TweetSource, TwitterApiV2SocialGraph, ApiV2ConversationSource
import hogumathi_app.view_model as h_vm
from hogumathi_app.view_model import CollectionPage, cleandict, ThreadItem, UnrepliedSection  # ThreadItem/UnrepliedSection assumed to live here; used by get_tweet_replies
from hogumathi_app.content_system import register_content_source, get_content, register_hook
from .view_model import tweet_model_dc_vm, user_model_dc
DATA_DIR='.data'
CACHE_PATH = f'{DATA_DIR}/twitter_v2_cache.db'
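
# Several functions below write JSON snapshots under DATA_DIR/cache; make sure
# both directories exist up front (assumption: creating them at import time is acceptable here).
os.makedirs(f'{DATA_DIR}/cache', exist_ok=True)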
def init_cache_db ():
db = sqlite3.connect(CACHE_PATH)
cur = db.cursor()
    table_exists = cur.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='tweet'").fetchone()[0]
if not table_exists:
cur.execute("""
create table query (
created_at timestamp,
user_id text,
last_accessed_at timestamp,
next_token text,
query_type text,
auth_user_id text
)
""")
cur.execute("""
create table tweet (
id text,
accessed_at timestamp,
query_id int,
data text,
unique(id, query_id)
)
""")
cur.execute("""
create table user (
id text,
accessed_at timestamp,
query_id int,
data text,
unique(id, query_id)
)
""")
cur.execute("""
create table medium (
id text,
accessed_at timestamp,
query_id int,
data text,
unique(id, query_id)
)
""")
cur.connection.commit()
print(f'--- created {CACHE_PATH}')
cur.close()
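
# Illustration of the schema above: every cached tweet/user/medium row points
# back to the query that fetched it, so an object's history can be reconstructed
# by joining through query.rowid. A minimal sketch (hypothetical tweet id):
#
#   select t.accessed_at, t.data
#   from tweet t join query q on q.rowid = t.query_id
#   where t.id = '1234567890'
#   order by t.accessed_at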
def cache_tweets_response (response_tweets, query_type, auth_user_id, user_id=None, pagination_token=None, ts=None):
    """
    Note: with bookmarks I observed the same next_token being returned even when
    a distinct new query was started. Abandoned paginations can therefore leave
    duplicate next_token records, meaning we could update the wrong query_id,
    with downstream timestamp effects.
    """
includes = response_tweets.includes
tweets = response_tweets.data or []
users = includes and includes.users or []
media = includes and includes.media or []
next_token = response_tweets.meta.next_token
db = sqlite3.connect(CACHE_PATH)
cur = db.cursor()
# SQLite is naive by default, so make sure this is UTC.
now = datetime.now(timezone.utc)
if ts:
now = ts
if not pagination_token:
cur.execute("""
insert into query (
created_at,
last_accessed_at,
user_id,
next_token,
query_type,
auth_user_id
)
values (
?,?,?,?,?,?
)
""",
[now, now, user_id, next_token, query_type, auth_user_id]
)
query_id = cur.lastrowid
else:
query_id = cur.execute("""
select rowid from query
where next_token = :next_token
""",
{
'next_token': pagination_token
}).fetchone()[0]
cur.execute("""
update query set
last_accessed_at = :last_accessed_at,
next_token = :next_token
where rowid = :query_id
""",
{
'last_accessed_at': now,
'next_token': next_token,
'query_id': query_id
})
for tweet in tweets:
tweet_json = json.dumps(cleandict(asdict(tweet)))
cur.execute("""
insert or ignore into tweet (
id,
accessed_at,
query_id,
data
)
values (
?,?,?,?
)
""",
[ tweet.id, now, query_id, tweet_json ]
)
for user in users:
user_json = json.dumps(cleandict(asdict(user)))
cur.execute("""
insert or ignore into user (
id,
accessed_at,
query_id,
data
)
values (
?,?,?,?
)
""",
[ user.id, now, query_id, user_json ]
)
for medium in media:
medium_json = json.dumps(cleandict(asdict(medium)))
cur.execute("""
insert or ignore into medium (
id,
accessed_at,
query_id,
data
)
values (
?,?,?,?
)
""",
[ medium.media_key, now, query_id, medium_json ]
)
cur.connection.commit()
cur.close()
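
# Usage sketch for the caching flow (hypothetical values): the first page is
# cached with no pagination_token, which creates the query row; each subsequent
# page passes the previous response's next_token so the same query row is
# updated rather than duplicated.
#
#   resp = tweet_source.get_bookmarks('14520320', return_dataclass=True)
#   cache_tweets_response(resp, 'bookmarks', '14520320', user_id='14520320')
#   resp2 = tweet_source.get_bookmarks('14520320', pagination_token=resp.meta.next_token, return_dataclass=True)
#   cache_tweets_response(resp2, 'bookmarks', '14520320', user_id='14520320', pagination_token=resp.meta.next_token)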
def cache_users_response (response_users, query_type, auth_user_id, user_id=None, pagination_token=None, ts=None):
    users = response_users.data or []
    next_token = response_users.meta and response_users.meta.get('next_token')
    db = sqlite3.connect(CACHE_PATH)
    cur = db.cursor()
    # SQLite is naive by default, so make sure this is UTC.
    now = datetime.now(timezone.utc)
    if ts:
        now = ts
if not pagination_token:
cur.execute("""
insert into query (
created_at,
last_accessed_at,
user_id,
next_token,
query_type,
auth_user_id
)
values (
?,?,?,?,?,?
)
""",
[now, now, user_id, next_token, query_type, auth_user_id]
)
query_id = cur.lastrowid
else:
query_id = cur.execute("""
select rowid from query
where next_token = :next_token
""",
{
'next_token': pagination_token
}).fetchone()[0]
cur.execute("""
update query set
last_accessed_at = :last_accessed_at,
next_token = :next_token
where rowid = :query_id
""",
{
'last_accessed_at': now,
'next_token': next_token,
'query_id': query_id
})
for user in users:
user_json = json.dumps(cleandict(asdict(user)))
cur.execute("""
insert or ignore into user (
id,
accessed_at,
query_id,
data
)
values (
?,?,?,?
)
""",
[ user.id, now, query_id, user_json ]
)
cur.connection.commit()
cur.close()
def get_cached_query (query_type, auth_user_id, user_id=None):
    db = sqlite3.connect(CACHE_PATH)
    cur = db.cursor()
    # The original hard-coded a debug auth_user_id and query_type; parameterize instead.
    results = cur.execute("""
        select * from query
        where
            (auth_user_id = :auth_user_id or auth_user_id is null)
            and query_type = :query_type
        """,
        {
            'auth_user_id': auth_user_id,
            'query_type': query_type
        }).fetchall()
    cur.close()
    next_token = None
    return results, next_token
def get_object_over_time (obj_type, obj_id, auth_user_id):
    db = sqlite3.connect(CACHE_PATH)
    cur = db.cursor()
    # obj_type is interpolated into the SQL, so restrict it to known tables.
    if obj_type not in ('tweet', 'user', 'medium'):
        raise ValueError(f'unknown object type: {obj_type}')
    results = cur.execute(f"""
        select t.*
        from {obj_type} t, query q
        where
            t.id = :obj_id
            and q.rowid = t.query_id
            and (q.auth_user_id in (:auth_user_id) or q.auth_user_id is null)
        """,
        {
            'obj_id': obj_id,
            'auth_user_id': auth_user_id
        }).fetchall()
    cur.close()
    next_token = None
    return results, next_token
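
# Usage sketch: rows come back with the JSON snapshot in the `data` column, so a
# caller can decode each cached version of the object (column order follows the
# table definitions above):
#
#   rows, _ = get_object_over_time('tweet', '1234567890', auth_user_id)
#   versions = [json.loads(data) for (_id, _accessed_at, _query_id, data) in rows]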
def get_tweet_item (tweet_id, me=None):
if me:
twitter_user = session.get(me)
token = twitter_user['access_token']
else:
token = os.environ.get('BEARER_TOKEN')
tweet_source = ApiV2TweetSource(token)
tweets_response = tweet_source.get_tweet(tweet_id, return_dataclass=True)
#print(response_json)
if tweets_response.errors:
# types:
# https://api.twitter.com/2/problems/not-authorized-for-resource (blocked or suspended)
# https://api.twitter.com/2/problems/resource-not-found (deleted)
#print(response_json.get('errors'))
for err in tweets_response.errors:
            if 'type' not in err:
print('unknown error type: ' + str(err))
elif err['type'] == 'https://api.twitter.com/2/problems/not-authorized-for-resource':
print('blocked or suspended tweet: ' + err['value'])
elif err['type'] == 'https://api.twitter.com/2/problems/resource-not-found':
print('deleted tweet: ' + err['value'])
else:
print('unknown error')
print(json.dumps(err, indent=2))
    includes = tweets_response.includes
    tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me), tweets_response.data or []))
collection_page = CollectionPage(
id = tweet_id,
items = tweets,
next_token = None # Fixme
)
return collection_page
def tweet_embed_template (tweet_id):
features = '{"tfw_timeline_list":{"bucket":[],"version":null},"tfw_follower_count_sunset":{"bucket":true,"version":null},"tfw_tweet_edit_backend":{"bucket":"on","version":null},"tfw_refsrc_session":{"bucket":"on","version":null},"tfw_mixed_media_15897":{"bucket":"treatment","version":null},"tfw_experiments_cookie_expiration":{"bucket":1209600,"version":null},"tfw_duplicate_scribes_to_settings":{"bucket":"on","version":null},"tfw_video_hls_dynamic_manifests_15082":{"bucket":"true_bitrate","version":null},"tfw_legacy_timeline_sunset":{"bucket":true,"version":null},"tfw_tweet_edit_frontend":{"bucket":"on","version":null}}'
# base64 + encode URI component
features_encoded = 'eyJ0ZndfdGltZWxpbmVfbGlzdCI6eyJidWNrZXQiOltdLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X2ZvbGxvd2VyX2NvdW50X3N1bnNldCI6eyJidWNrZXQiOnRydWUsInZlcnNpb24iOm51bGx9LCJ0ZndfdHdlZXRfZWRpdF9iYWNrZW5kIjp7ImJ1Y2tldCI6Im9uIiwidmVyc2lvbiI6bnVsbH0sInRmd19yZWZzcmNfc2Vzc2lvbiI6eyJidWNrZXQiOiJvbiIsInZlcnNpb24iOm51bGx9LCJ0ZndfbWl4ZWRfbWVkaWFfMTU4OTciOnsiYnVja2V0IjoidHJlYXRtZW50IiwidmVyc2lvbiI6bnVsbH0sInRmd19leHBlcmltZW50c19jb29raWVfZXhwaXJhdGlvbiI6eyJidWNrZXQiOjEyMDk2MDAsInZlcnNpb24iOm51bGx9LCJ0ZndfZHVwbGljYXRlX3NjcmliZXNfdG9fc2V0dGluZ3MiOnsiYnVja2V0Ijoib24iLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X3ZpZGVvX2hsc19keW5hbWljX21hbmlmZXN0c18xNTA4MiI6eyJidWNrZXQiOiJ0cnVlX2JpdHJhdGUiLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X2xlZ2FjeV90aW1lbGluZV9zdW5zZXQiOnsiYnVja2V0Ijp0cnVlLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X3R3ZWV0X2VkaXRfZnJvbnRlbmQiOnsiYnVja2V0Ijoib24iLCJ2ZXJzaW9uIjpudWxsfX0%3D'
origin = f"http%3A%2F%2Flocalhost%3A5004%2Ftwitter%2Ftweet2%2F{tweet_id}.html"
width = 550
height = 755
theme = "dark" # or light
hide_card = "false"
hide_thread = "false"
src = f"https://platform.twitter.com/embed/Tweet.html?dnt=true&features={features_encoded}&origin={origin}&frame=false&hideCard={hide_card}&hideThread={hide_thread}&id={tweet_id}&lang=en&theme=dark&width={width}px"
html = f"""
"""
return html
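
# The hard-coded features_encoded above is (assumed) just the `features` JSON,
# base64-encoded and then percent-encoded, per the "base64 + encode URI
# component" note. A sketch to regenerate it instead of hard-coding:
#
#   import base64, urllib.parse
#   features_encoded = urllib.parse.quote(
#       base64.b64encode(features.encode('utf-8')).decode('ascii'), safe='')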
# https://developer.twitter.com/en/docs/twitter-for-websites/embedded-tweets/overview
def get_tweet_embed (tweet_id):
html = tweet_embed_template(tweet_id)
post = h_vm.FeedItem(
id = tweet_id,
created_at = 'some time',
display_name = 'Twitter User',
handle = 'tweetuser',
html = html
)
return post
def get_bookmarks_feed (user_id, pagination_token=None, max_results=10, me=None):
if not me:
me = g.get('me') or request.args.get('me')
print(f'get_bookmarks_feed. me={me}')
    twitter_user = session.get(me)
if not twitter_user:
return None
token = twitter_user['access_token']
tweet_source = ApiV2TweetSource(token)
response_tweets = tweet_source.get_bookmarks(user_id,
pagination_token = pagination_token,
return_dataclass=True,
max_results=max_results
)
#print(response_json)
cache_tweets_response(response_tweets, 'bookmarks', user_id, user_id=user_id, pagination_token=pagination_token)
includes = response_tweets.includes
tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me), response_tweets.data))
next_token = response_tweets.meta.next_token
ts = int(time.time() * 1000)
with open(f'{DATA_DIR}/cache/bookmarks_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
f.write(json.dumps(cleandict(asdict(response_tweets))))
collection_page = CollectionPage(
id = user_id, # FIXME this should perhaps be the unresolved id
items = tweets,
next_token = next_token
)
return collection_page
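
# Usage sketch (hypothetical ids and session-key format for `me`): pages chain
# through CollectionPage.next_token.
#
#   page = get_bookmarks_feed('14520320', me='twitter:14520320')
#   while page and page.next_token:
#       page = get_bookmarks_feed('14520320', pagination_token=page.next_token, me='twitter:14520320')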
def get_user_feed (user_id, me=None, **twitter_kwargs):
if not me and 'me' in g:
me = g.me
if 'twitter_user' in g and g.twitter_user:
token = g.twitter_user['access_token']
        # issue: retweets don't come back if we request non_public_metrics,
        # so is_me is forced off for now.
        is_me = False and user_id == g.twitter_user['id']
auth_user_id = g.twitter_user['id']
else:
token = os.environ.get('BEARER_TOKEN')
is_me = False
auth_user_id = None
tweet_source = ApiV2TweetSource(token)
tweets_response = tweet_source.get_user_timeline(user_id,
return_dataclass=True,
**twitter_kwargs)
    tweets = None
    next_token = None
    if not tweets_response:
        print('no tweets_response')
        return None
    if tweets_response.meta and tweets_response.meta.result_count == 0:
        print('no results')
        print(tweets_response)
    if not tweets_response.includes:
        print(tweets_response)
        print('no tweets_response.includes')
    if tweets_response.errors:
        print('profile get_user_timeline errors:')
        print(tweets_response.errors)
pagination_token=twitter_kwargs.get('pagination_token')
cache_tweets_response(tweets_response, 'user_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token)
ts = int(time.time() * 1000)
with open(f'{DATA_DIR}/cache/tl_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
f.write(json.dumps(cleandict(asdict(tweets_response))))
if tweets_response.data:
tweets = list(map(lambda t: tweet_model_dc_vm(tweets_response.includes, t, me), tweets_response.data))
next_token = tweets_response.meta.next_token
collection_page = CollectionPage(
id = user_id,
items = tweets,
next_token = next_token
)
return collection_page
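
# Usage sketch: extra keyword arguments pass straight through to
# ApiV2TweetSource.get_user_timeline (assumed supported parameters shown):
#
#   page = get_user_feed('12', max_results=5)
#   next_page = get_user_feed('12', pagination_token=page.next_token)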
def get_tweets_collection (content_ids, pagination_token=None, max_results=None):
"""
We might be able to have a generalizer in the content system as well...
If a source exposes a get many interface then use it. We want to avoid many singular fetches.
"""
return []
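
# A sketch of the batched fetch the docstring describes, assuming
# ApiV2TweetSource grows a multi-ID lookup (get_tweets below is hypothetical):
#
#   def get_tweets_collection (content_ids, pagination_token=None, max_results=None):
#       token = os.environ.get('BEARER_TOKEN')
#       tweet_source = ApiV2TweetSource(token)
#       resp = tweet_source.get_tweets(content_ids, return_dataclass=True)
#       return list(map(lambda t: tweet_model_dc_vm(resp.includes, t, None), resp.data or []))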
def get_user (user_id, me=None) -> Optional[h_vm.FeedServiceUser]:
users = get_users([user_id], me=me)
if users:
return users[0]
def get_users (content_ids, me=None, pagination_token=None) -> Optional[List[h_vm.FeedServiceUser]]:
"""
"""
if me:
twitter_user = session.get(me)
token = twitter_user['access_token']
auth_user_id = twitter_user['id']
else:
token = os.environ.get('BEARER_TOKEN')
auth_user_id = None
social_graph = TwitterApiV2SocialGraph(token)
users_response = social_graph.get_users(content_ids, return_dataclass=True)
    if not users_response.data:
        return None
    cache_users_response(users_response, 'users', auth_user_id, pagination_token=pagination_token)
users = list(map(user_model_dc, users_response.data))
return users
def get_home_feed (user_id, me, **query_kwargs):
twitter_user = session.get(me)
token = twitter_user['access_token']
auth_user_id = twitter_user['id']
tweet_source = ApiV2TweetSource(token)
    response = tweet_source.get_home_timeline(user_id, return_dataclass=True, **query_kwargs)
#print(json.dumps(response_json, indent=2))
pagination_token = query_kwargs.get('pagination_token')
cache_tweets_response(response, 'home_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token)
    includes = response.includes
    tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me), response.data or []))
next_token = response.meta.next_token
collection_page = CollectionPage(
id = user_id,
items = tweets,
next_token = next_token
)
return collection_page
def get_author_threads (user_id):
"""
Placeholder implementation where we can manually add threads to a collection,
but ultimately we will query a local Tweet DB that gets populated through various means.
Once we store Tweets we can easily query this.
We can filter by author_id,conversation_id order by in_reply_to_tweet_id,id
"""
return get_content(f'collection:twitter.threads_{user_id}')
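
# Once tweets are stored locally, the docstring's query might look like this
# against the cache DB (sketch; assumes author_id, conversation_id and
# in_reply_to_tweet_id get extracted into columns rather than living only in
# the JSON blob):
#
#   select * from tweet
#   where author_id = :author_id and conversation_id = :conversation_id
#   order by in_reply_to_tweet_id, id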
def get_tweet_replies (conversation_id, in_reply_to_id=None, view='replies', pagination_token=None, max_results=None, author_id=None, me=None):
    """
    New function, not used yet. Renders a conversation in one of several
    views: 'replies', 'thread', 'conversation' or 'tweet'.
    """
    if me:
        twitter_user = session.get(me)
        token = twitter_user['access_token']
        auth_user_id = twitter_user['id']
    else:
        token = os.environ.get('BEARER_TOKEN')
        auth_user_id = None
    tweet_id = in_reply_to_id or conversation_id  # assumed focus of the rendered page
    page_nav, opengraph_info, theme_variant = [], None, ''  # assumed to be supplied by the route layer once wired up
    tweet_source = ApiV2TweetSource(token)
    only_replies = view == 'replies'
    tweets = []
    skip_embed_replies = False
if view == 'replies':
replies_response = tweet_source.get_thread(in_reply_to_id,
only_replies=True,
pagination_token = pagination_token,
return_dataclass=True)
elif view == 'thread':
skip_embed_replies = True
replies_response = tweet_source.get_thread(conversation_id,
only_replies=False,
author_id=author_id,
pagination_token = pagination_token,
return_dataclass=True)
elif view == 'conversation':
replies_response = tweet_source.get_thread(conversation_id,
only_replies=False,
pagination_token = pagination_token,
return_dataclass=True)
elif view == 'tweet':
replies_response = None
next_token = None
#print("conversation meta:")
#print(json.dumps(tweets_response.get('meta'), indent=2))
if replies_response and replies_response.meta and replies_response.meta.result_count:
        cache_tweets_response(replies_response, 'tweet_replies', auth_user_id, user_id=author_id, pagination_token=pagination_token)
includes = replies_response.includes
tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), replies_response.data)) + tweets
next_token = replies_response.meta.next_token
# this method is OK except it doesn't work if there are no replies.
#tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))
#related_tweets = [] # derived from includes
tweets.reverse()
query = {}
if next_token:
query = {
**query,
# FIXME only_replies
'next_data_url': url_for('.get_tweet2_html', tweet_id=tweet_id, pagination_token=next_token, only_replies = '1' if only_replies else '0', author_id = tweets[0].author_id),
'next_page_url': url_for('.get_tweet2_html', tweet_id=tweet_id, view=view, pagination_token=next_token)
}
user = {
}
if view == 'replies':
tweet = tweets[0]
if tweet.id == '1608510741941989378':
unreplied = [
UnrepliedSection(
description = "Not clear what GS is still.",
span = (40, 80)
)
]
tweet = replace(tweet,
unreplied = unreplied
)
expand_parts = request.args.get('expand')
if expand_parts:
expand_parts = expand_parts.split(',')
def reply_to_thread_item (fi):
nonlocal expand_parts
if fi.id == '1609714342211244038':
print(f'reply_to_thread_item id={fi.id}')
unreplied = [
UnrepliedSection(
description = "Is there proof of this claim?",
span = (40, 80)
)
]
fi = replace(fi,
unreplied = unreplied
)
children = None
if expand_parts and len(expand_parts) and fi.id == expand_parts[0]:
expand_parts = expand_parts[1:]
print(f'getting expanded replied for tweet={fi.id}')
expanded_replies_response = tweet_source.get_thread(fi.id,
only_replies=True,
return_dataclass=True)
if expanded_replies_response.data:
print('we got expanded responses data')
children = list(map(lambda t: tweet_model_dc_vm(expanded_replies_response.includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), expanded_replies_response.data))
children = list(map(reply_to_thread_item, children))
return ThreadItem(feed_item=fi, children=children)
children = list(map(reply_to_thread_item, tweets[1:]))
root = ThreadItem(
feed_item = tweet,
children = children
)
return render_template('tweet-thread.html', user = user, root = root, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info)
else:
return render_template(f'tweet-collection{theme_variant}.html', user = user, tweets = tweets, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info)
def get_following_users (user_id, me=None, max_results=1000, pagination_token=None):
if me:
twitter_user = session.get(me)
token = twitter_user['access_token']
auth_user_id = twitter_user['id']
else:
token = os.environ.get('BEARER_TOKEN')
auth_user_id = None
social_source = TwitterApiV2SocialGraph(token)
following_resp = social_source.get_following(user_id,
max_results=max_results, pagination_token=pagination_token, return_dataclass=True)
cache_users_response(following_resp, 'following', auth_user_id, user_id = user_id, pagination_token=pagination_token)
ts = int(time.time() * 1000)
with open(f'{DATA_DIR}/cache/following_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
f.write(json.dumps(cleandict(asdict(following_resp))))
#print(following_resp)
#run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': following_resp})
#following = list(map(lambda f: f['id'], following_resp.get('data')))
following = list(map(user_model_dc, following_resp.data))
total_count = following_resp.meta.get('result_count')
next_token = following_resp.meta.get('next_token')
collection_page = CollectionPage(
id = user_id,
items = following,
total_count = total_count,
next_token = next_token
)
return collection_page
def get_followers_user (user_id, me=None, max_results=1000, pagination_token=None):
if me:
twitter_user = session.get(me)
token = twitter_user['access_token']
auth_user_id = twitter_user['id']
else:
token = os.environ.get('BEARER_TOKEN')
auth_user_id = None
    use_cache = False # this concept is broken for now
    if use_cache:
        print(f'using cache for user {user_id}: {use_cache}')
        with open(f'{DATA_DIR}/cache/followers_{user_id}_{pagination_token}_{use_cache}.json', 'rt') as f:
            response_json = json.load(f)
else:
social_source = TwitterApiV2SocialGraph(token)
followers_resp = social_source.get_followers(user_id, max_results=max_results, pagination_token=pagination_token, return_dataclass=True)
ts = int(time.time() * 1000)
print(f'followers cache for {user_id}: {ts}')
cache_users_response(followers_resp, 'followers', auth_user_id, user_id = user_id, pagination_token=pagination_token)
with open(f'{DATA_DIR}/cache/followers_{user_id}_{ts}.json', 'wt') as f:
json.dump(cleandict(asdict(followers_resp)), f, indent=2)
#print(followers_resp)
#run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': followers_resp})
#followers = list(map(lambda f: f['id'], followers_resp.get('data')))
    followers = list(map(user_model_dc, followers_resp.data))
total_count = followers_resp.meta.get('result_count')
next_token = followers_resp.meta.get('next_token')
collection_page = CollectionPage(
id = user_id,
items = followers,
total_count = total_count,
next_token = next_token
)
return collection_page
def register_content_sources ():
init_cache_db()
register_content_source('twitter:tweets', get_tweets_collection, id_pattern='')
    register_content_source('twitter:tweet:', get_tweet_item, id_pattern='(?P<tweet_id>\d+)')
    register_content_source('twitter:tweet:', get_tweet_embed, id_pattern='(?P<tweet_id>\d+)')
    register_content_source('twitter:bookmarks:', get_bookmarks_feed, id_pattern='(?P<user_id>\d+)')
    register_content_source('twitter:feed:user:', get_user_feed, id_pattern='(?P<user_id>\d+)')
    register_content_source('twitter:user:', get_user, id_pattern='(?P<user_id>\d+)')
    register_content_source('twitter:users', get_users, id_pattern='')
    register_content_source('twitter:feed:reverse_chronological:user:', get_home_feed, id_pattern='(?P<user_id>\d+)')
    register_content_source('twitter:tweets:replies:', get_tweet_replies, id_pattern='(?P<conversation_id>\d+)')
    register_content_source('twitter:following:users:', get_following_users, id_pattern='(?P<user_id>\d+)')
    register_content_source('twitter:followers:user:', get_followers_user, id_pattern='(?P<user_id>\d+)')
    register_content_source('twitter:threads:user:', get_author_threads, id_pattern='(?P<user_id>\d+)')
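
# Wiring sketch (assumed usage): after registration, items are resolved through
# the content system rather than by calling the functions above directly.
#
#   register_content_sources()
#   tweet = get_content('twitter:tweet:1234567890')   # hypothetical tweet id
#   user = get_content('twitter:user:14520320')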