"""
This translates from the Tweet Source and Twitter v2 types
into ViewModel types such as FeedItem
and the rest of the taxonomy.
"""
from dataclasses import asdict
from typing import List, Optional
import dacite
import os
from flask import session, g, request
import time
from datetime import datetime, timezone
import json
import sqlite3
from twitter_v2.api import ApiV2TweetSource, TwitterApiV2SocialGraph, ApiV2ConversationSource
import twitter_v2.types as tv2_types
import hogumathi_app.view_model as h_vm
from hogumathi_app.view_model import CollectionPage, cleandict
from hogumathi_app.content_system import register_content_source, get_content, register_hook
from .view_model import tweet_model_dc_vm, user_model_dc
# Base directory for cached data; the JSON response dumps written by the
# feed functions in this module go to f'{DATA_DIR}/cache/'.
DATA_DIR='.data'
# SQLite database file opened by every cache function in this module.
CACHE_PATH = f'{DATA_DIR}/twitter_v2_cache.db'
def init_cache_db ():
    """
    Create the SQLite cache database at CACHE_PATH if it is not set up yet.

    The presence of the ``tweet`` table marks an initialized database. When
    it is missing, the ``query``, ``tweet``, ``user`` and ``medium`` tables
    are created and a short notice is printed.
    """
    # sqlite3.connect does not create missing parent directories.
    os.makedirs(DATA_DIR, exist_ok=True)

    db = sqlite3.connect(CACHE_PATH)
    try:
        cur = db.cursor()
        table_exists = cur.execute(
            "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='tweet'"
        ).fetchone()[0]

        if not table_exists:
            # "if not exists" lets an interrupted earlier run be completed
            # instead of failing on the tables it had already created.
            cur.execute("""
            create table if not exists query (
                created_at timestamp,
                user_id text,
                last_accessed_at timestamp,
                next_token text,
                query_type text,
                auth_user_id text
            )
            """)

            cur.execute("""
            create table if not exists tweet (
                id text,
                accessed_at timestamp,
                query_id int,
                data text,
                created_at timestamp,
                unique(id, query_id)
            )
            """)

            cur.execute("""
            create table if not exists user (
                id text,
                accessed_at timestamp,
                query_id int,
                data text,
                unique(id, query_id)
            )
            """)

            cur.execute("""
            create table if not exists medium (
                id text,
                accessed_at timestamp,
                query_id int,
                data text,
                unique(id, query_id)
            )
            """)

            db.commit()
            print(f'--- created {CACHE_PATH}')

        cur.close()
    finally:
        # Always release the connection, also when a statement fails.
        db.close()
def cache_tweets_response (response_tweets, query_type, auth_user_id, user_id = None, pagination_token=None, ts=None):
    """
    Store the tweets, users and media of an API response in the SQLite cache.

    A request without pagination_token inserts a new row into ``query``.
    A follow-up page looks up the query whose stored next_token equals
    pagination_token and updates it, so all pages of one pagination share
    the same query_id. If no such query is found, a new query row is
    inserted instead of failing.

    In bookmarks I observed that the same next_token is returned even with distinct new queries started.
    So in the case of abandoned paginations, we can end up with duplicate next_token records,
    meaning we could update the wrong query_id, having downstream timestamp effects.

    response_tweets: response object exposing .data, .includes and .meta
    query_type: label stored with the query, e.g. 'bookmarks' or 'home_feed'
    auth_user_id: id of the authenticated user, or None
    user_id: id of the user the query is about
    pagination_token: token that was used to fetch this page, if any
    ts: optional timestamp used instead of the current UTC time
    """
    includes = response_tweets.includes
    tweets = response_tweets.data or []
    users = includes and includes.users or []
    media = includes and includes.media or []

    if response_tweets.meta and 'next_token' in response_tweets.meta:
        next_token = response_tweets.meta.next_token
    else:
        next_token = None

    # SQLite is naive by default, so make sure this is UTC.
    now = ts if ts else datetime.now(timezone.utc)

    db = sqlite3.connect(CACHE_PATH)
    try:
        cur = db.cursor()

        query_id = None

        if pagination_token:
            row = cur.execute("""
                select rowid from query
                where next_token = :next_token
                """,
                {
                    'next_token': pagination_token
                }).fetchone()

            if row:
                query_id = row[0]
                cur.execute("""
                    update query set
                        last_accessed_at = :last_accessed_at,
                        next_token = :next_token
                    where rowid = :query_id
                    """,
                    {
                        'last_accessed_at': now,
                        'next_token': next_token,
                        'query_id': query_id
                    })

        if query_id is None:
            # First page, or a pagination token we have no record of.
            cur.execute("""
                insert into query (
                    created_at,
                    last_accessed_at,
                    user_id,
                    next_token,
                    query_type,
                    auth_user_id
                )
                values (
                    ?,?,?,?,?,?
                )
                """,
                [now, now, user_id, next_token, query_type, auth_user_id]
            )
            query_id = cur.lastrowid

        # "insert or ignore" relies on unique(id, query_id): an object is
        # stored at most once per query.
        for tweet in tweets:
            tweet_json = json.dumps(cleandict(asdict(tweet)))
            cur.execute("""
                insert or ignore into tweet (
                    id,
                    accessed_at,
                    query_id,
                    data,
                    created_at
                )
                values (
                    ?,?,?,?, ?
                )
                """,
                # dateutil.parser.parse( tweet. created_at ) if error
                [ tweet.id, now, query_id, tweet_json, tweet.created_at ]
            )

        # FIXME insert ref_tweets (includes.tweets), mark in some way... is_ref = 1? sort_order = NULL?
        # sort_order begins with count having order prior to insert...

        for user in users:
            user_json = json.dumps(cleandict(asdict(user)))
            cur.execute("""
                insert or ignore into user (
                    id,
                    accessed_at,
                    query_id,
                    data
                )
                values (
                    ?,?,?,?
                )
                """,
                [ user.id, now, query_id, user_json ]
            )

        for medium in media:
            medium_json = json.dumps(cleandict(asdict(medium)))
            cur.execute("""
                insert or ignore into medium (
                    id,
                    accessed_at,
                    query_id,
                    data
                )
                values (
                    ?,?,?,?
                )
                """,
                [ medium.media_key, now, query_id, medium_json ]
            )

        db.commit()
        cur.close()
    finally:
        # Closing without a commit discards a partially written response.
        db.close()
def cache_users_response (response_users, query_type, auth_user_id, user_id = None, pagination_token=None, ts=None):
    """
    Store the users of an API response in the SQLite cache.

    A request without pagination_token inserts a new row into ``query``.
    A follow-up page looks up the query whose stored next_token equals
    pagination_token and updates it; if no such query is found, a new
    query row is inserted instead of failing.

    response_users: response object exposing .data and .meta
    query_type: label stored with the query, e.g. 'following' or 'users'
    auth_user_id: id of the authenticated user, or None
    user_id: id of the user the query is about
    pagination_token: token that was used to fetch this page, if any
    ts: optional timestamp used instead of the current UTC time
    """
    users = response_users.data or []
    next_token = response_users.meta and response_users.meta.get('next_token')

    # SQLite is naive by default, so make sure this is UTC.
    # Without an explicit ts the current time is used, as in
    # cache_tweets_response, so timestamps are never stored as NULL.
    now = ts if ts else datetime.now(timezone.utc)

    db = sqlite3.connect(CACHE_PATH)
    try:
        cur = db.cursor()

        query_id = None

        if pagination_token:
            row = cur.execute("""
                select rowid from query
                where next_token = :next_token
                """,
                {
                    'next_token': pagination_token
                }).fetchone()

            if row:
                query_id = row[0]
                cur.execute("""
                    update query set
                        last_accessed_at = :last_accessed_at,
                        next_token = :next_token
                    where rowid = :query_id
                    """,
                    {
                        'last_accessed_at': now,
                        'next_token': next_token,
                        'query_id': query_id
                    })

        if query_id is None:
            # First page, or a pagination token we have no record of.
            cur.execute("""
                insert into query (
                    created_at,
                    last_accessed_at,
                    user_id,
                    next_token,
                    query_type,
                    auth_user_id
                )
                values (
                    ?,?,?,?,?,?
                )
                """,
                [now, now, user_id, next_token, query_type, auth_user_id]
            )
            query_id = cur.lastrowid

        # "insert or ignore" relies on unique(id, query_id): a user is
        # stored at most once per query.
        for user in users:
            user_json = json.dumps(cleandict(asdict(user)))
            cur.execute("""
                insert or ignore into user (
                    id,
                    accessed_at,
                    query_id,
                    data
                )
                values (
                    ?,?,?,?
                )
                """,
                [ user.id, now, query_id, user_json ]
            )

        db.commit()
        cur.close()
    finally:
        # Closing without a commit discards a partially written response.
        db.close()
def get_cached_collection_all_latest (auth_user_id, query_type = 'bookmarks', user_id=None, limit=10):
    """
    Across all queries of a type, return the latest distinct Tweet.

    This is good for bookmarks, likes or retweets where we remove them after a period upstream
    but still want to fetch anything we've ever added.

    Ideally we don't need this in the long term and instead auto sync new items to a local collection.
    "But for now."

    auth_user_id: only queries made by this user (or with no user) are considered
    query_type: the query label to look up, e.g. 'bookmarks'
    user_id: currently unused
    limit: maximum number of tweets returned

    Returns a list of tv2_types.Tweet parsed from the cached JSON.
    """
    sql = """
    select t.id, t.accessed_at, t.data
    from query q, tweet t
    where
        t.query_id = q.rowid
        and (q.auth_user_id in (:auth_user_id) or q.auth_user_id is null)
        and q.query_type = :query_type

        -- need to store author_id with tweets to get the user data out.
        -- could also make a join table tweet_user, like tweet_media; they won't change.

        --and u.query_id = q.rowid
        --and u.id == t.author_id

    group by t.id
    having t.accessed_at = max(t.accessed_at)

    order by t.id desc, t.accessed_at desc
    limit :limit
    """
    params = {
        'query_type': query_type,
        'auth_user_id': auth_user_id,
        'limit': limit
    }

    db = sqlite3.connect(CACHE_PATH)
    try:
        cached_tweets = db.execute(sql, params).fetchall()
    finally:
        db.close()

    # FIXME return view models rather than raw tweets. need to join user and media, see query comment.
    # That join will need the author ids, the attachment media keys and the
    # referenced tweet ids (plus their users) of every tweet.
    #feed_item = tweet_model_dc_vm(tweet, ...)
    feed_items = []
    for _tweet_id, _accessed_at, tweet_json in cached_tweets:
        tweet = dacite.from_dict( data_class=tv2_types.Tweet, data=json.loads(tweet_json) )
        feed_items.append(tweet)

    return feed_items
def get_object_over_time (obj_type, obj_id, auth_user_id, only_count = False):
    """
    Return all occurrences of a cached object over time,
    or, if only_count is true, return just the count.

    obj_type: name of the cache table to search: 'tweet', 'user' or 'medium'
    obj_id: id of the object
    auth_user_id: only queries made by this user (or with no user) are considered
    only_count: return the number of matching rows instead of the rows

    Raises ValueError when obj_type is not one of the cache tables.
    """
    # obj_type is interpolated into the SQL as a table name (a table name
    # cannot be a bound parameter), so only the known tables are accepted.
    if obj_type not in ('tweet', 'user', 'medium'):
        raise ValueError(f'unsupported obj_type: {obj_type!r}')

    fields = 'count(*)' if only_count else 't.*'

    db = sqlite3.connect(CACHE_PATH)
    try:
        results = db.execute(f"""
            select {fields}
            from {obj_type} t, query q
            where
                t.id = :obj_id
                and q.rowid = t.query_id
                and (q.auth_user_id in (:auth_user_id) or q.auth_user_id is null)
            """,
            {
                'obj_id': obj_id,
                'auth_user_id': auth_user_id
            }).fetchall()
    finally:
        db.close()

    if only_count:
        return results[0][0]

    return list(results)
def get_query_gaps (auth_user_id, query_type = 'home_feed', min_gap_hours = 1.0, max_age_days = 21.0):
    """
    Find time gaps between consecutive cached tweets of one query type.

    Cached tweets of the given user and query type that are younger than
    max_age_days are ordered by created_at; every pair of neighbours whose
    created_at values are at least min_gap_hours apart is reported.

    Returns a list of dicts with the keys since_id, start_time, until_id,
    end_time and gap_hours, newest gap first.
    """
    sql = """
        WITH ordered_tweets AS
        (
            SELECT
                t.*,
                q.auth_user_id,
                (julianday(current_timestamp) - julianday(t.created_at)) as row_age_days,
                ROW_NUMBER() OVER (ORDER BY t.created_at asc) rn
            FROM tweet t
            JOIN query q
                on q.rowid = t.query_id
            WHERE
                q.query_type = :QUERY_TYPE
                AND q.auth_user_id = :AUTH_USER_ID
                AND row_age_days < :MAX_AGE_DAYS
        )
        SELECT
            o1.id since_id,
            o1.created_at start_time,
            o2.id until_id,
            o2.created_at end_time,
            --CAST(strftime('%s', o2.created_at) as integer) - CAST(strftime('%s', o1.created_at) as integer) gap_seconds2,
            --(julianday(o2.created_at) - julianday(o1.created_at)) * 86400 gap_seconds,
            (julianday(o2.created_at) - julianday(o1.created_at)) * 24 gap_hours
        FROM ordered_tweets o1
        JOIN ordered_tweets o2
            ON (
                o1.rn + 1 = o2.rn
            )
        WHERE gap_hours >= :MIN_GAP_HOURS
        order by start_time desc
    """

    params = dict(
        QUERY_TYPE = query_type,
        AUTH_USER_ID = auth_user_id,
        MAX_AGE_DAYS = max_age_days,
        MIN_GAP_HOURS = min_gap_hours
    )

    db = sqlite3.connect(CACHE_PATH)
    try:
        cur = db.cursor()
        # sqlite3.Row lets each result row be converted to a dict below.
        cur.row_factory = sqlite3.Row

        results = cur.execute(sql, params).fetchall()
        cur.close()
    finally:
        db.close()

    return [dict(row) for row in results]
def get_tweet_item (tweet_id, me=None):
    """
    Fetch a single tweet and return it as a one-item CollectionPage.

    tweet_id: id of the tweet to fetch
    me: session key of the acting user; when omitted the BEARER_TOKEN
        environment variable is used for authentication

    Returns None when ``me`` has no session entry or when the API returned
    no tweet data (e.g. deleted, blocked or suspended tweets).
    """
    if me:
        twitter_user = session.get(me)
        if not twitter_user:
            # Same convention as get_bookmarks_feed: no session, no result.
            return None
        token = twitter_user['access_token']
    else:
        token = os.environ.get('BEARER_TOKEN')

    tweet_source = ApiV2TweetSource(token)
    tweets_response = tweet_source.get_tweet(tweet_id, return_dataclass=True)

    if tweets_response.errors:
        # types:
        # https://api.twitter.com/2/problems/not-authorized-for-resource (blocked or suspended)
        # https://api.twitter.com/2/problems/resource-not-found (deleted)
        for err in tweets_response.errors:
            if 'type' not in err:
                print('unknown error type: ' + str(err))
            elif err['type'] == 'https://api.twitter.com/2/problems/not-authorized-for-resource':
                # .get: an error without 'value' must not break error reporting.
                print(f"blocked or suspended tweet: {err.get('value')}")
            elif err['type'] == 'https://api.twitter.com/2/problems/resource-not-found':
                print(f"deleted tweet: {err.get('value')}")
            else:
                print('unknown error')
                print(json.dumps(err, indent=2))

    if not tweets_response.data:
        return None

    includes = tweets_response.includes
    tweets = [tweet_model_dc_vm(includes, t, me) for t in tweets_response.data]

    collection_page = CollectionPage(
        id = tweet_id,
        items = tweets,
        next_token = None # Fixme
    )

    return collection_page
def tweet_embed_template (tweet_id):
    """
    Build the HTML used to embed a tweet through platform.twitter.com.

    NOTE(review): the template assigned to ``html`` below contains no markup,
    so the computed ``src`` URL is never emitted and only whitespace is
    returned. Presumably an element pointing at ``src`` was intended --
    confirm against the intended template.
    """
    # Apparently the decoded form of features_encoded; not used below.
    features = '{"tfw_timeline_list":{"bucket":[],"version":null},"tfw_follower_count_sunset":{"bucket":true,"version":null},"tfw_tweet_edit_backend":{"bucket":"on","version":null},"tfw_refsrc_session":{"bucket":"on","version":null},"tfw_mixed_media_15897":{"bucket":"treatment","version":null},"tfw_experiments_cookie_expiration":{"bucket":1209600,"version":null},"tfw_duplicate_scribes_to_settings":{"bucket":"on","version":null},"tfw_video_hls_dynamic_manifests_15082":{"bucket":"true_bitrate","version":null},"tfw_legacy_timeline_sunset":{"bucket":true,"version":null},"tfw_tweet_edit_frontend":{"bucket":"on","version":null}}'
    # base64 + encode URI component
    features_encoded = 'eyJ0ZndfdGltZWxpbmVfbGlzdCI6eyJidWNrZXQiOltdLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X2ZvbGxvd2VyX2NvdW50X3N1bnNldCI6eyJidWNrZXQiOnRydWUsInZlcnNpb24iOm51bGx9LCJ0ZndfdHdlZXRfZWRpdF9iYWNrZW5kIjp7ImJ1Y2tldCI6Im9uIiwidmVyc2lvbiI6bnVsbH0sInRmd19yZWZzcmNfc2Vzc2lvbiI6eyJidWNrZXQiOiJvbiIsInZlcnNpb24iOm51bGx9LCJ0ZndfbWl4ZWRfbWVkaWFfMTU4OTciOnsiYnVja2V0IjoidHJlYXRtZW50IiwidmVyc2lvbiI6bnVsbH0sInRmd19leHBlcmltZW50c19jb29raWVfZXhwaXJhdGlvbiI6eyJidWNrZXQiOjEyMDk2MDAsInZlcnNpb24iOm51bGx9LCJ0ZndfZHVwbGljYXRlX3NjcmliZXNfdG9fc2V0dGluZ3MiOnsiYnVja2V0Ijoib24iLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X3ZpZGVvX2hsc19keW5hbWljX21hbmlmZXN0c18xNTA4MiI6eyJidWNrZXQiOiJ0cnVlX2JpdHJhdGUiLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X2xlZ2FjeV90aW1lbGluZV9zdW5zZXQiOnsiYnVja2V0Ijp0cnVlLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X3R3ZWV0X2VkaXRfZnJvbnRlbmQiOnsiYnVja2V0Ijoib24iLCJ2ZXJzaW9uIjpudWxsfX0%3D'
    # URI-encoded origin; host and port are fixed to localhost:5004.
    origin = f"http%3A%2F%2Flocalhost%3A5004%2Ftwitter%2Ftweet2%2F{tweet_id}.html"
    width = 550
    # height, theme are not used below; src carries a literal theme=dark.
    height = 755
    theme = "dark" # or light
    hide_card = "false"
    hide_thread = "false"
    src = f"https://platform.twitter.com/embed/Tweet.html?dnt=true&features={features_encoded}&origin={origin}&frame=false&hideCard={hide_card}&hideThread={hide_thread}&id={tweet_id}&lang=en&theme=dark&width={width}px"
    html = f"""
"""
    return html
# https://developer.twitter.com/en/docs/twitter-for-websites/embedded-tweets/overview
def get_tweet_embed (tweet_id):
    """
    Wrap the embed HTML for a tweet in a FeedItem.

    Only ``id`` and ``html`` carry real data; the timestamp, display name
    and handle are fixed placeholder strings.
    """
    return h_vm.FeedItem(
        id = tweet_id,
        created_at = 'some time',
        display_name = 'Twitter User',
        handle = 'tweetuser',
        html = tweet_embed_template(tweet_id)
    )
def get_bookmarks_feed (user_id, pagination_token=None, max_results=10, me=None):
    """
    Fetch one page of a user's bookmarks as a CollectionPage.

    The raw response is stored in the SQLite cache and also dumped as JSON
    under f'{DATA_DIR}/cache/'.

    user_id: id of the user whose bookmarks are fetched
    pagination_token: token of the page to fetch, None for the first page
    max_results: page size passed to the API
    me: session key of the acting user; falls back to g.me, then to the
        'me' request argument

    Returns None when there is no session entry for ``me``.
    """
    if not me:
        me = g.get('me') or request.args.get('me')

    print(f'get_bookmarks_feed. me={me}')

    twitter_user = session.get( me )

    if not twitter_user:
        return None

    token = twitter_user['access_token']

    tweet_source = ApiV2TweetSource(token)
    response_tweets = tweet_source.get_bookmarks(user_id,
        pagination_token = pagination_token,
        return_dataclass=True,
        max_results=max_results
        )

    cache_tweets_response(response_tweets, 'bookmarks', user_id, user_id=user_id, pagination_token=pagination_token)

    if response_tweets.data:
        includes = response_tweets.includes
        tweets = [tweet_model_dc_vm(includes, t, me) for t in response_tweets.data]
        meta = response_tweets.meta
        next_token = meta.next_token if meta else None
    else:
        print('no tweet data:')
        print(response_tweets)
        tweets = []
        next_token = None

    ts = int(time.time() * 1000)
    # The dump directory is not created anywhere else in this module.
    os.makedirs(f'{DATA_DIR}/cache', exist_ok=True)
    with open(f'{DATA_DIR}/cache/bookmarks_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
        f.write(json.dumps(cleandict(asdict(response_tweets))))

    collection_page = CollectionPage(
        id = user_id, # FIXME this should perhaps be the unresolved id
        items = tweets,
        next_token = next_token
    )

    return collection_page
def get_user_feed (user_id, me=None, **twitter_kwargs):
    """
    Fetch one page of a user's timeline as a CollectionPage.

    The raw response is stored in the SQLite cache and also dumped as JSON
    under f'{DATA_DIR}/cache/'. For an authenticated user, each tweet's
    previous cache hit count is passed to the view model as tweets_viewed.

    user_id: id of the user whose timeline is fetched
    me: session key of the acting user; defaults to g.me when present
    twitter_kwargs: passed through to get_user_timeline; 'pagination_token'
        is also used for caching

    Returns a CollectionPage; it is empty when the API returned no tweets.
    """
    if not me and 'me' in g:
        me = g.me

    if 'twitter_user' in g and g.twitter_user:
        token = g.twitter_user['access_token']
        # issue: retweets don't come back if we request non_public_metrics,
        # so the user's own feed is requested like any other.
        auth_user_id = g.twitter_user['id']
    else:
        token = os.environ.get('BEARER_TOKEN')
        auth_user_id = None

    tweet_source = ApiV2TweetSource(token)
    tweets_response = tweet_source.get_user_timeline(user_id,
        return_dataclass=True,
        **twitter_kwargs)

    if not tweets_response:
        print('no response_json')
        # Nothing to inspect, cache or render.
        return CollectionPage(
            id = user_id,
            items = [],
            next_token = None
        )

    meta = tweets_response.meta

    if meta and meta.result_count == 0:
        print('no results')
        print(tweets_response)

    if not tweets_response.includes:
        print(tweets_response)
        print('no tweets_response.includes')

    if tweets_response.errors:
        print('profile get_user_timeline errors:')
        print(tweets_response.errors)

    tweets = tweets_response.data or []

    pagination_token = twitter_kwargs.get('pagination_token')

    # NOTE we need to calculate this before we cache the response.
    tweets_viewed = {}

    if auth_user_id:
        for tweet in tweets:
            tweets_viewed[tweet.id] = get_object_over_time('tweet', tweet.id, auth_user_id, only_count=True)

    cache_tweets_response(tweets_response, 'user_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token)

    ts = int(time.time() * 1000)
    # The dump directory is not created anywhere else in this module.
    os.makedirs(f'{DATA_DIR}/cache', exist_ok=True)
    with open(f'{DATA_DIR}/cache/tl_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
        f.write(json.dumps(cleandict(asdict(tweets_response))))

    includes = tweets_response.includes
    items = [tweet_model_dc_vm(includes, t, me, tweets_viewed=tweets_viewed) for t in tweets]

    # Always bound, also when the page carries no tweets or no meta.
    next_token = meta.next_token if meta else None

    collection_page = CollectionPage(
        id = user_id,
        items = items,
        next_token = next_token
    )

    return collection_page
def get_tweets_collection (content_ids, pagination_token=None, max_results=None):
    """
    Placeholder for a "get many tweets" source; always yields an empty list.

    The content system may gain a generalizer: when a source exposes a
    get-many interface it should be used, to avoid many singular fetches.
    """
    no_tweets = list()
    return no_tweets
def get_user (user_id, me=None) -> Optional[h_vm.FeedServiceUser]:
    """
    Look up a single user through get_users.

    Returns the first user found, or None when the lookup yields nothing.
    """
    found = get_users([user_id], me=me)
    return found[0] if found else None
def get_users (content_ids, me=None, pagination_token=None) -> Optional[List[h_vm.FeedServiceUser]]:
    """
    Fetch several users by id and return them as view models.

    content_ids: ids of the users to fetch
    me: session key of the acting user; when omitted the BEARER_TOKEN
        environment variable is used for authentication
    pagination_token: only used when caching the response

    Returns None when ``me`` has no session entry or when the API returned
    no users.
    """
    if me:
        twitter_user = session.get(me)
        if not twitter_user:
            # Same convention as get_bookmarks_feed: no session, no result.
            return None
        token = twitter_user['access_token']
        auth_user_id = twitter_user['id']
    else:
        token = os.environ.get('BEARER_TOKEN')
        auth_user_id = None

    social_graph = TwitterApiV2SocialGraph(token)
    users_response = social_graph.get_users(content_ids, return_dataclass=True)

    if not users_response.data:
        return None

    cache_users_response(users_response, 'users', auth_user_id, pagination_token=pagination_token)

    users = [user_model_dc(u) for u in users_response.data]

    return users
def get_home_feed (user_id, me, **query_kwargs):
    """
    Fetch one page of the reverse-chronological home timeline.

    Each tweet's previous cache hit count is computed before the response is
    cached and is passed to the view model as tweets_viewed.

    user_id: id of the user whose home timeline is fetched
    me: session key of the acting user
    query_kwargs: passed through to get_home_timeline; 'pagination_token'
        is also used for caching

    Returns a CollectionPage, or None when ``me`` has no session entry.
    """
    twitter_user = session.get(me)

    if not twitter_user:
        # Same convention as get_bookmarks_feed: no session, no result.
        return None

    token = twitter_user['access_token']
    auth_user_id = twitter_user['id']

    tweet_source = ApiV2TweetSource(token)
    response = tweet_source.get_home_timeline(user_id, **query_kwargs)

    pagination_token = query_kwargs.get('pagination_token')

    # NOTE we need to calculate this before we cache the response.
    tweets_viewed = {}

    if response.data:
        for tweet in response.data:
            tweets_viewed[tweet.id] = get_object_over_time('tweet', tweet.id, auth_user_id, only_count=True)

        cache_tweets_response(response, 'home_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token)

        includes = response.includes
        tweets = [tweet_model_dc_vm(includes, t, me, tweets_viewed=tweets_viewed) for t in response.data]
        next_token = response.meta.next_token
    else:
        print('no tweet data:')
        print(response)
        tweets = []
        next_token = None

    collection_page = CollectionPage(
        id = user_id,
        items = tweets,
        next_token = next_token
    )

    return collection_page
def get_author_threads (user_id):
    """
    Return the threads collection of an author.

    Placeholder: threads are added to the collection by hand for now.
    Eventually this should query a local Tweet DB, populated through
    various means, filtering by author_id,conversation_id and ordering by
    in_reply_to_tweet_id,id.
    """
    collection_id = f'collection:twitter.threads_{user_id}'
    return get_content(collection_id)
def get_tweet_replies (conversation_id, in_reply_to_id=None, pagination_token=None, max_results=None, author_id=None):
    """
    New function, not used yet

    Fetches a thread / conversation / replies page through
    ApiV2TweetSource.get_thread and renders it with a template.

    NOTE(review): this body reads like it was lifted from a Flask view. It
    references names that are neither parameters nor defined/imported in the
    visible part of this module: token, view, user_id, tweet_id, url_for,
    replace, UnrepliedSection, ThreadItem, render_template, page_nav,
    opengraph_info and theme_variant. Unless they are provided elsewhere,
    calling this function raises NameError on the first line. max_results is
    accepted but not used.
    """
    # NOTE(review): `token` is not defined in this function.
    tweet_source = ApiV2TweetSource(token)
    auth_user_id = None
    # NOTE(review): `view` is not a parameter; presumably one of
    # 'replies', 'thread', 'conversation' or 'tweet' -- confirm.
    only_replies = view == 'replies'
    tweets = []
    skip_embed_replies = False
    if view == 'replies':
        replies_response = tweet_source.get_thread(in_reply_to_id,
            only_replies=True,
            pagination_token = pagination_token,
            return_dataclass=True)
    elif view == 'thread':
        skip_embed_replies = True
        replies_response = tweet_source.get_thread(conversation_id,
            only_replies=False,
            author_id=author_id,
            pagination_token = pagination_token,
            return_dataclass=True)
    elif view == 'conversation':
        replies_response = tweet_source.get_thread(conversation_id,
            only_replies=False,
            pagination_token = pagination_token,
            return_dataclass=True)
    elif view == 'tweet':
        replies_response = None
    next_token = None
    #print("conversation meta:")
    #print(json.dumps(tweets_response.get('meta'), indent=2))
    if replies_response and replies_response.meta and replies_response.meta.result_count:
        # NOTE(review): `user_id` is not defined in this function.
        cache_tweets_response(replies_response, 'tweet_replies', auth_user_id, user_id=user_id, pagination_token=pagination_token)
        includes = replies_response.includes
        tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), replies_response.data)) + tweets
        next_token = replies_response.meta.next_token
    # this method is OK except it doesn't work if there are no replies.
    #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))
    #related_tweets = [] # derived from includes
    tweets.reverse()
    query = {}
    if next_token:
        query = {
            **query,
            # FIXME only_replies
            # NOTE(review): url_for and tweet_id are not defined here;
            # tweets[0] raises IndexError when the page is empty.
            'next_data_url': url_for('.get_tweet2_html', tweet_id=tweet_id, pagination_token=next_token, only_replies = '1' if only_replies else '0', author_id = tweets[0].author_id),
            'next_page_url': url_for('.get_tweet2_html', tweet_id=tweet_id, view=view, pagination_token=next_token)
        }
    user = {
    }
    if view == 'replies':
        # The first item is treated as the thread root, the rest as replies.
        tweet = tweets[0]
        # Hard-coded tweet id that gets a sample "unreplied" annotation.
        if tweet.id == '1608510741941989378':
            unreplied = [
                UnrepliedSection(
                    description = "Not clear what GS is still.",
                    span = (40, 80)
                )
            ]
            tweet = replace(tweet,
                unreplied = unreplied
                )
        # 'expand' request argument: comma separated tweet ids; each id that
        # matches the head of the list has its replies fetched and nested.
        expand_parts = request.args.get('expand')
        if expand_parts:
            expand_parts = expand_parts.split(',')
        def reply_to_thread_item (fi):
            # Converts a feed item into a ThreadItem, consuming expand_parts
            # from the enclosing scope as matching ids are expanded.
            nonlocal expand_parts
            # Hard-coded tweet id that gets a sample "unreplied" annotation.
            if fi.id == '1609714342211244038':
                print(f'reply_to_thread_item id={fi.id}')
                unreplied = [
                    UnrepliedSection(
                        description = "Is there proof of this claim?",
                        span = (40, 80)
                    )
                ]
                fi = replace(fi,
                    unreplied = unreplied
                    )
            children = None
            if expand_parts and len(expand_parts) and fi.id == expand_parts[0]:
                expand_parts = expand_parts[1:]
                print(f'getting expanded replied for tweet={fi.id}')
                expanded_replies_response = tweet_source.get_thread(fi.id,
                    only_replies=True,
                    return_dataclass=True)
                if expanded_replies_response.data:
                    print('we got expanded responses data')
                    children = list(map(lambda t: tweet_model_dc_vm(expanded_replies_response.includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), expanded_replies_response.data))
                    # Recurse so that nested ids in expand_parts are expanded too.
                    children = list(map(reply_to_thread_item, children))
            return ThreadItem(feed_item=fi, children=children)
        children = list(map(reply_to_thread_item, tweets[1:]))
        root = ThreadItem(
            feed_item = tweet,
            children = children
        )
        # NOTE(review): render_template, page_nav and opengraph_info are not
        # defined in the visible part of this module.
        return render_template('tweet-thread.html', user = user, root = root, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info)
    else:
        # NOTE(review): theme_variant is not defined here either.
        return render_template(f'tweet-collection{theme_variant}.html', user = user, tweets = tweets, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info)
def get_following_users (user_id, me=None, max_results=1000, pagination_token=None):
    """
    Fetch one page of the users that user_id follows.

    The raw response is stored in the SQLite cache and also dumped as JSON
    under f'{DATA_DIR}/cache/'.

    user_id: id of the user whose following list is fetched
    me: session key of the acting user; when omitted the BEARER_TOKEN
        environment variable is used for authentication
    max_results: page size passed to the API
    pagination_token: token of the page to fetch, None for the first page

    Returns a CollectionPage, or None when ``me`` has no session entry.
    """
    if me:
        twitter_user = session.get(me)
        if not twitter_user:
            # Same convention as get_bookmarks_feed: no session, no result.
            return None
        token = twitter_user['access_token']
        auth_user_id = twitter_user['id']
    else:
        token = os.environ.get('BEARER_TOKEN')
        auth_user_id = None

    social_source = TwitterApiV2SocialGraph(token)

    following_resp = social_source.get_following(user_id,
        max_results=max_results, pagination_token=pagination_token, return_dataclass=True)

    cache_users_response(following_resp, 'following', auth_user_id, user_id = user_id, pagination_token=pagination_token)

    ts = int(time.time() * 1000)
    # The dump directory is not created anywhere else in this module.
    os.makedirs(f'{DATA_DIR}/cache', exist_ok=True)
    with open(f'{DATA_DIR}/cache/following_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
        f.write(json.dumps(cleandict(asdict(following_resp))))

    #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': following_resp})

    # data and meta may be missing; cache_users_response tolerates that too.
    following = [user_model_dc(u) for u in following_resp.data or []]
    meta = following_resp.meta or {}
    total_count = meta.get('result_count')
    next_token = meta.get('next_token')

    collection_page = CollectionPage(
        id = user_id,
        items = following,
        total_count = total_count,
        next_token = next_token
    )

    return collection_page
def get_followers_user (user_id, me=None, max_results=1000, pagination_token=None):
    """
    Fetch one page of the followers of user_id.

    The raw response is stored in the SQLite cache and also dumped as JSON
    under f'{DATA_DIR}/cache/'.

    user_id: id of the user whose followers are fetched
    me: session key of the acting user; when omitted the BEARER_TOKEN
        environment variable is used for authentication
    max_results: page size passed to the API
    pagination_token: token of the page to fetch, None for the first page

    Returns a CollectionPage, or None when ``me`` has no session entry.
    """
    if me:
        twitter_user = session.get(me)
        if not twitter_user:
            # Same convention as get_bookmarks_feed: no session, no result.
            return None
        token = twitter_user['access_token']
        auth_user_id = twitter_user['id']
    else:
        token = os.environ.get('BEARER_TOKEN')
        auth_user_id = None

    # Reading the response back from a JSON dump instead of calling the API
    # ("use_cache") is a broken concept for now, so the API is always used.
    social_source = TwitterApiV2SocialGraph(token)
    followers_resp = social_source.get_followers(user_id, max_results=max_results, pagination_token=pagination_token, return_dataclass=True)

    ts = int(time.time() * 1000)

    print(f'followers cache for {user_id}: {ts}')

    cache_users_response(followers_resp, 'followers', auth_user_id, user_id = user_id, pagination_token=pagination_token)

    # The dump directory is not created anywhere else in this module.
    os.makedirs(f'{DATA_DIR}/cache', exist_ok=True)
    with open(f'{DATA_DIR}/cache/followers_{user_id}_{ts}.json', 'wt') as f:
        json.dump(cleandict(asdict(followers_resp)), f, indent=2)

    #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': followers_resp})

    # data and meta may be missing; cache_users_response tolerates that too.
    followers = [user_model_dc(u) for u in followers_resp.data or []]
    meta = followers_resp.meta or {}
    total_count = meta.get('result_count')
    next_token = meta.get('next_token')

    collection_page = CollectionPage(
        id = user_id,
        items = followers,
        total_count = total_count,
        next_token = next_token
    )

    return collection_page
def register_content_sources ():
    """
    Initialize the cache database and register this module's functions
    as content sources under their 'twitter:' id prefixes.

    NOTE(review): the id patterns previously read '(?P\\d+)', which is not a
    valid Python regular expression (a named group needs '(?P<name>...)').
    The group names below are taken from the first parameter of each
    handler, presuming the content system passes named groups as keyword
    arguments -- confirm against register_content_source.
    """
    init_cache_db()

    register_content_source('twitter:tweets', get_tweets_collection, id_pattern='')
    # NOTE(review): 'twitter:tweet:' is registered twice; which handler wins
    # depends on register_content_source.
    register_content_source('twitter:tweet:', get_tweet_item, id_pattern=r'(?P<tweet_id>\d+)')
    register_content_source('twitter:tweet:', get_tweet_embed, id_pattern=r'(?P<tweet_id>\d+)')
    register_content_source('twitter:bookmarks:', get_bookmarks_feed, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:feed:user:', get_user_feed, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:user:', get_user, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:users', get_users, id_pattern='')

    register_content_source('twitter:feed:reverse_chronological:user:', get_home_feed, id_pattern=r'(?P<user_id>\d+)')

    register_content_source('twitter:tweets:replies:', get_tweet_replies, id_pattern=r'(?P<conversation_id>\d+)')

    register_content_source('twitter:following:users:', get_following_users, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:followers:user:', get_followers_user, id_pattern=r'(?P<user_id>\d+)')

    register_content_source('twitter:threads:user:', get_author_threads, id_pattern=r'(?P<user_id>\d+)')