"""
This translates from the Tweet Source and Twitter v2 types
into ViewModel types such as FeedItem
and the rest of the Taxonomy.
"""

from contextlib import closing
from dataclasses import asdict
from datetime import datetime, timezone
from typing import List, Optional
import json
import os
import sqlite3
import time

import dacite
from flask import session, g, request

from twitter_v2.api import ApiV2TweetSource, TwitterApiV2SocialGraph, ApiV2ConversationSource
import twitter_v2.types as tv2_types

import hogumathi_app.view_model as h_vm
from hogumathi_app.view_model import CollectionPage, cleandict
from hogumathi_app.content_system import register_content_source, get_content, register_hook

from .view_model import tweet_model_dc_vm, user_model_dc

DATA_DIR='.data'
CACHE_PATH = f'{DATA_DIR}/twitter_v2_cache.db'

# Tables created by init_cache_db that get_object_over_time may query by name.
_CACHED_OBJECT_TABLES = ('tweet', 'user', 'medium')


def init_cache_db():
    """
    Create the SQLite cache schema at CACHE_PATH.

    Does nothing if a `tweet` table already exists. DATA_DIR is created first
    because sqlite3.connect cannot create missing parent directories.
    """
    os.makedirs(DATA_DIR, exist_ok=True)

    with closing(sqlite3.connect(CACHE_PATH)) as db, closing(db.cursor()) as cur:
        table_exists = cur.execute(
            "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='tweet'"
        ).fetchone()[0]

        if table_exists:
            return

        cur.execute("""
            create table query (
                created_at timestamp,
                user_id text,
                last_accessed_at timestamp,
                next_token text,
                query_type text,
                auth_user_id text
            )
            """)

        cur.execute("""
            create table tweet (
                id text,
                accessed_at timestamp,
                query_id int,
                data text,
                created_at timestamp,
                unique(id, query_id)
            )
            """)

        cur.execute("""
            create table user (
                id text,
                accessed_at timestamp,
                query_id int,
                data text,
                unique(id, query_id)
            )
            """)

        cur.execute("""
            create table medium (
                id text,
                accessed_at timestamp,
                query_id int,
                data text,
                unique(id, query_id)
            )
            """)

        db.commit()
        print(f'--- created {CACHE_PATH}')


def _record_query(cur, now, query_type, auth_user_id, user_id, next_token, pagination_token):
    """
    Insert or update the `query` row for one API call and return its rowid.

    Without a pagination_token a new row is inserted. With one, the row whose
    stored next_token equals it is treated as the query being continued: its
    last_accessed_at and next_token are updated. The same next_token can be
    stored on several rows (abandoned paginations), so the most recently
    inserted match is used. If nothing matches, a new row is inserted rather
    than failing.
    """
    if pagination_token:
        row = cur.execute("""
            select rowid
            from query
            where next_token = :next_token
            order by rowid desc
            limit 1
            """,
            { 'next_token': pagination_token }
        ).fetchone()

        if row:
            query_id = row[0]
            cur.execute("""
                update query
                set
                    last_accessed_at = :last_accessed_at,
                    next_token = :next_token
                where rowid = :query_id
                """,
                {
                    'last_accessed_at': now,
                    'next_token': next_token,
                    'query_id': query_id
                }
            )
            return query_id

    cur.execute("""
        insert into query (
            created_at,
            last_accessed_at,
            user_id,
            next_token,
            query_type,
            auth_user_id
        )
        values (
            ?,?,?,?,?,?
        )
        """,
        [now, now, user_id, next_token, query_type, auth_user_id]
    )
    return cur.lastrowid


def _tweets_next_token(response):
    """Return the next_token from a tweets response's meta, or None if absent."""
    meta = response.meta
    if meta and 'next_token' in meta:
        return meta.next_token
    return None


def _write_response_snapshot(filename, response, **json_kwargs):
    """
    Write a JSON dump of an API response dataclass to DATA_DIR/cache/filename.

    The directory is created on demand; json_kwargs are passed to json.dump.
    """
    cache_dir = f'{DATA_DIR}/cache'
    os.makedirs(cache_dir, exist_ok=True)

    with open(f'{cache_dir}/{filename}', 'wt', encoding='utf-8') as f:
        json.dump(cleandict(asdict(response)), f, **json_kwargs)


def _resolve_auth(me):
    """
    Return (token, auth_user_id) for the session user stored under key `me`.

    Falls back to the BEARER_TOKEN environment variable and a None user id
    when `me` is not given.
    """
    if me:
        twitter_user = session.get(me)
        return twitter_user['access_token'], twitter_user['id']

    return os.environ.get('BEARER_TOKEN'), None


def cache_tweets_response(response_tweets, query_type, auth_user_id, user_id = None, pagination_token=None, ts=None):
    """
    Store the tweets, users and media of a tweets response in the SQLite cache.

    Each call is tied to a row in `query` (see _record_query); objects are
    inserted with `insert or ignore`, so an object already stored for the
    same query is left untouched.

    In bookmarks I observed that the same next_token is returned even with
    distinct new queries started. So in the case of abandoned paginations,
    we can end up with duplicate next_token records, meaning we could update
    the wrong query_id, having downstream timestamp effects.

    ts overrides the access timestamp; it defaults to the current UTC time.
    """
    includes = response_tweets.includes
    tweets = response_tweets.data or []
    users = (includes.users if includes else None) or []
    media = (includes.media if includes else None) or []

    next_token = _tweets_next_token(response_tweets)

    # SQLite is naive by default, so make sure this is UTC.
    now = ts if ts else datetime.now(timezone.utc)

    with closing(sqlite3.connect(CACHE_PATH)) as db, closing(db.cursor()) as cur:
        query_id = _record_query(cur, now, query_type, auth_user_id, user_id, next_token, pagination_token)

        for tweet in tweets:
            tweet_json = json.dumps(cleandict(asdict(tweet)))

            cur.execute("""
                insert or ignore into tweet (
                    id,
                    accessed_at,
                    query_id,
                    data,
                    created_at
                )
                values (
                    ?,?,?,?, ?
                )
                """,
                # dateutil.parser.parse( tweet. created_at ) if error
                [ tweet.id, now, query_id, tweet_json, tweet.created_at ]
            )

        # FIXME insert ref_tweets (includes.tweets), mark in some way... is_ref = 1? sort_order = NULL?
        # sort_order begins with count having order prior to insert...

        for user in users:
            user_json = json.dumps(cleandict(asdict(user)))

            cur.execute("""
                insert or ignore into user (
                    id,
                    accessed_at,
                    query_id,
                    data
                )
                values (
                    ?,?,?,?
                )
                """,
                [ user.id, now, query_id, user_json ]
            )

        for medium in media:
            medium_json = json.dumps(cleandict(asdict(medium)))

            cur.execute("""
                insert or ignore into medium (
                    id,
                    accessed_at,
                    query_id,
                    data
                )
                values (
                    ?,?,?,?
                )
                """,
                [ medium.media_key, now, query_id, medium_json ]
            )

        db.commit()


def cache_users_response(response_users, query_type, auth_user_id, user_id = None, pagination_token=None, ts=None):
    """
    Store the users of a users response in the SQLite cache.

    Query bookkeeping works as in cache_tweets_response. ts overrides the
    access timestamp; it defaults to the current UTC time.
    """
    users = response_users.data or []
    next_token = response_users.meta and response_users.meta.get('next_token')

    # SQLite is naive by default, so make sure this is UTC.
    now = ts if ts else datetime.now(timezone.utc)

    with closing(sqlite3.connect(CACHE_PATH)) as db, closing(db.cursor()) as cur:
        query_id = _record_query(cur, now, query_type, auth_user_id, user_id, next_token, pagination_token)

        for user in users:
            user_json = json.dumps(cleandict(asdict(user)))

            cur.execute("""
                insert or ignore into user (
                    id,
                    accessed_at,
                    query_id,
                    data
                )
                values (
                    ?,?,?,?
                )
                """,
                [ user.id, now, query_id, user_json ]
            )

        db.commit()


def get_cached_collection_all_latest(auth_user_id, query_type = 'bookmarks', user_id=None):
    """
    Across all queries of a type, return the latest distinct Tweet.

    This is good for bookmarks, likes or retweets where we remove them after
    a period upstream but still want to fetch anything we've ever added.

    Ideally we don't need this in the long term and instead auto sync new
    items to a local collection. "But for now."

    Returns a list of tv2_types.Tweet, at most 10 (the limit bound in the
    query). user_id is accepted but not used by the query yet.
    """
    sql = """
        select t.id, t.accessed_at, t.data
        from query q, tweet t
        where
            t.query_id = q.rowid
            and (q.auth_user_id in (:auth_user_id) or q.auth_user_id is null)
            and q.query_type = :query_type

            -- need to store author_id with tweets to get the user data out.
            -- could also make a join table tweet_user, like tweet_media; they won't change.

            --and u.query_id = q.rowid
            --and u.id == t.author_id

        group by t.id
        having t.accessed_at = max(t.accessed_at)

        order by t.id desc, t.accessed_at desc
        limit :limit
        """
    params = {
        'query_type': query_type,
        'auth_user_id': auth_user_id,
        'limit': 10
    }

    with closing(sqlite3.connect(CACHE_PATH)) as db, closing(db.cursor()) as cur:
        cached_tweets = cur.execute(sql, params).fetchall()

    # FIXME return view models rather than raw tweets. need to join user and media, see query comment.
    # FIXME referenced tweets and their users also need to be resolved for that.
    feed_items = [
        dacite.from_dict(data_class=tv2_types.Tweet, data=json.loads(tweet_json))
        for _tweet_id, _accessed_at, tweet_json in cached_tweets
    ]

    return feed_items


def get_object_over_time(obj_type, obj_id, auth_user_id, only_count = False):
    """
    Return all occurrences of an object over time,
    or if only_count is true then return just the count.

    obj_type is the cache table to search and must be one of
    _CACHED_OBJECT_TABLES; a ValueError is raised otherwise, because the
    table name cannot be bound as a SQL parameter and is interpolated.
    """
    if obj_type not in _CACHED_OBJECT_TABLES:
        raise ValueError(f'unsupported obj_type: {obj_type!r}')

    fields = 'count(*)' if only_count else 't.*'

    with closing(sqlite3.connect(CACHE_PATH)) as db, closing(db.cursor()) as cur:
        results = cur.execute(f"""
            select {fields}
            from {obj_type} t, query q
            where
                t.id = :obj_id
                and q.rowid = t.query_id
                and (q.auth_user_id in (:auth_user_id) or q.auth_user_id is null)
            """,
            {
                'obj_id': obj_id,
                'auth_user_id': auth_user_id
            }
        ).fetchall()

    if only_count:
        return results[0][0]

    return list(results)


def get_query_gaps(auth_user_id, query_type = 'home_feed', min_gap_hours = 1.0, max_age_days = 21.0):
    """
    Find gaps between consecutive cached tweets of one query type.

    Tweets cached for auth_user_id and query_type that are younger than
    max_age_days are ordered by created_at; each adjacent pair at least
    min_gap_hours apart is returned as a dict with since_id, start_time,
    until_id, end_time and gap_hours, newest gap first.
    """
    sql = """
        WITH ordered_tweets AS
        (
            SELECT
                t.*,
                q.auth_user_id,
                (julianday(current_timestamp) - julianday(t.created_at)) as row_age_days,
                ROW_NUMBER() OVER (ORDER BY t.created_at asc) rn
            FROM tweet t
            JOIN query q
                on q.rowid = t.query_id
            WHERE
                q.query_type = :QUERY_TYPE
                AND q.auth_user_id = :AUTH_USER_ID
                AND row_age_days < :MAX_AGE_DAYS
        )
        SELECT
            o1.id since_id,
            o1.created_at start_time,
            o2.id until_id,
            o2.created_at end_time,
            --CAST(strftime('%s', o2.created_at) as integer) - CAST(strftime('%s', o1.created_at) as integer) gap_seconds2,
            --(julianday(o2.created_at) - julianday(o1.created_at)) * 86400 gap_seconds,
            (julianday(o2.created_at) - julianday(o1.created_at)) * 24 gap_hours
        FROM ordered_tweets o1
        JOIN ordered_tweets o2
            ON (
                o1.rn + 1 = o2.rn
            )
        WHERE gap_hours >= :MIN_GAP_HOURS
        order by start_time desc
        """

    params = dict(
        QUERY_TYPE = query_type,
        AUTH_USER_ID = auth_user_id,
        MAX_AGE_DAYS = max_age_days,
        MIN_GAP_HOURS = min_gap_hours
    )

    with closing(sqlite3.connect(CACHE_PATH)) as db, closing(db.cursor()) as cur:
        # Row objects convert cleanly to dicts keyed by column alias.
        cur.row_factory = sqlite3.Row
        results = cur.execute(sql, params).fetchall()

    return list(map(dict, results))


def get_tweet_item(tweet_id, me=None):
    """
    Fetch a single tweet and return it as a one-item CollectionPage.

    Uses the access token of the session user stored under `me`, or the
    BEARER_TOKEN environment variable when `me` is not given. API errors are
    printed; None is returned when the response carries no data.
    """
    if me:
        twitter_user = session.get(me)
        token = twitter_user['access_token']
    else:
        token = os.environ.get('BEARER_TOKEN')

    tweet_source = ApiV2TweetSource(token)
    tweets_response = tweet_source.get_tweet(tweet_id, return_dataclass=True)

    if tweets_response.errors:
        # types:
        # https://api.twitter.com/2/problems/not-authorized-for-resource (blocked or suspended)
        # https://api.twitter.com/2/problems/resource-not-found (deleted)
        for err in tweets_response.errors:
            if 'type' not in err:
                print('unknown error type: ' + str(err))
            elif err['type'] == 'https://api.twitter.com/2/problems/not-authorized-for-resource':
                # .get: do not fail on an error object that lacks 'value'.
                print(f"blocked or suspended tweet: {err.get('value')}")
            elif err['type'] == 'https://api.twitter.com/2/problems/resource-not-found':
                print(f"deleted tweet: {err.get('value')}")
            else:
                print('unknown error')
                print(json.dumps(err, indent=2))

    if not tweets_response.data:
        return None

    includes = tweets_response.includes
    tweets = [tweet_model_dc_vm(includes, t, me) for t in tweets_response.data]

    collection_page = CollectionPage(
        id = tweet_id,
        items = tweets,
        next_token = None # Fixme
    )

    return collection_page


def tweet_embed_template(tweet_id):
    """
    Build the embed markup for a tweet.

    NOTE(review): the returned template is currently blank and `src`,
    `width` and `height` are computed but never used; the markup that
    consumed them (presumably an iframe) appears to be missing. Confirm
    against the intended template before relying on this.
    """
    features = '{"tfw_timeline_list":{"bucket":[],"version":null},"tfw_follower_count_sunset":{"bucket":true,"version":null},"tfw_tweet_edit_backend":{"bucket":"on","version":null},"tfw_refsrc_session":{"bucket":"on","version":null},"tfw_mixed_media_15897":{"bucket":"treatment","version":null},"tfw_experiments_cookie_expiration":{"bucket":1209600,"version":null},"tfw_duplicate_scribes_to_settings":{"bucket":"on","version":null},"tfw_video_hls_dynamic_manifests_15082":{"bucket":"true_bitrate","version":null},"tfw_legacy_timeline_sunset":{"bucket":true,"version":null},"tfw_tweet_edit_frontend":{"bucket":"on","version":null}}'

    # base64 + encode URI component
    features_encoded = 'eyJ0ZndfdGltZWxpbmVfbGlzdCI6eyJidWNrZXQiOltdLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X2ZvbGxvd2VyX2NvdW50X3N1bnNldCI6eyJidWNrZXQiOnRydWUsInZlcnNpb24iOm51bGx9LCJ0ZndfdHdlZXRfZWRpdF9iYWNrZW5kIjp7ImJ1Y2tldCI6Im9uIiwidmVyc2lvbiI6bnVsbH0sInRmd19yZWZzcmNfc2Vzc2lvbiI6eyJidWNrZXQiOiJvbiIsInZlcnNpb24iOm51bGx9LCJ0ZndfbWl4ZWRfbWVkaWFfMTU4OTciOnsiYnVja2V0IjoidHJlYXRtZW50IiwidmVyc2lvbiI6bnVsbH0sInRmd19leHBlcmltZW50c19jb29raWVfZXhwaXJhdGlvbiI6eyJidWNrZXQiOjEyMDk2MDAsInZlcnNpb24iOm51bGx9LCJ0ZndfZHVwbGljYXRlX3NjcmliZXNfdG9fc2V0dGluZ3MiOnsiYnVja2V0Ijoib24iLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X3ZpZGVvX2hsc19keW5hbWljX21hbmlmZXN0c18xNTA4MiI6eyJidWNrZXQiOiJ0cnVlX2JpdHJhdGUiLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X2xlZ2FjeV90aW1lbGluZV9zdW5zZXQiOnsiYnVja2V0Ijp0cnVlLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X3R3ZWV0X2VkaXRfZnJvbnRlbmQiOnsiYnVja2V0Ijoib24iLCJ2ZXJzaW9uIjpudWxsfX0%3D'

    origin = f"http%3A%2F%2Flocalhost%3A5004%2Ftwitter%2Ftweet2%2F{tweet_id}.html"

    width = 550
    height = 755
    theme = "dark" # or light
    hide_card = "false"
    hide_thread = "false"

    src = f"https://platform.twitter.com/embed/Tweet.html?dnt=true&features={features_encoded}&origin={origin}&frame=false&hideCard={hide_card}&hideThread={hide_thread}&id={tweet_id}&lang=en&theme=dark&width={width}px"

    html = f""" """

    return html


# https://developer.twitter.com/en/docs/twitter-for-websites/embedded-tweets/overview
def get_tweet_embed(tweet_id):
    """Return a FeedItem whose html is the embed template for tweet_id; the other fields are placeholders."""
    html = tweet_embed_template(tweet_id)
    post = h_vm.FeedItem(
        id = tweet_id,
        created_at = 'some time',
        display_name = 'Twitter User',
        handle = 'tweetuser',
        html = html
    )

    return post


def get_bookmarks_feed(user_id, pagination_token=None, max_results=10, me=None):
    """
    Fetch one page of a user's bookmarks as a CollectionPage.

    `me` is the session key of the authenticated user; when omitted it is
    taken from flask.g or the request arguments. Returns None if the session
    holds no user under that key. The response is stored in the SQLite cache
    and also written as a JSON snapshot under DATA_DIR/cache.
    """
    if not me:
        me = g.get('me') or request.args.get('me')

    print(f'get_bookmarks_feed. me={me}')

    twitter_user = session.get( me )

    if not twitter_user:
        return None

    token = twitter_user['access_token']

    tweet_source = ApiV2TweetSource(token)
    response_tweets = tweet_source.get_bookmarks(user_id,
                                    pagination_token = pagination_token,
                                    return_dataclass=True,
                                    max_results=max_results
                                    )

    # The requested user_id is recorded as the authenticated user as well.
    cache_tweets_response(response_tweets, 'bookmarks', user_id, user_id=user_id, pagination_token=pagination_token)

    if response_tweets.data:
        includes = response_tweets.includes
        tweets = [tweet_model_dc_vm(includes, t, me) for t in response_tweets.data]
        next_token = _tweets_next_token(response_tweets)
    else:
        print('no tweet data:')
        print(response_tweets)
        tweets = []
        next_token = None

    ts = int(time.time() * 1000)
    _write_response_snapshot(f'bookmarks_{user_id}_{ts}_{pagination_token}.json', response_tweets)

    collection_page = CollectionPage(
        id = user_id, # FIXME this should perhaps be the unresolved id
        items = tweets,
        next_token = next_token
    )

    return collection_page


def get_user_feed(user_id, me=None, **twitter_kwargs):
    """
    Fetch one page of a user's timeline as a CollectionPage.

    Uses the access token of g.twitter_user when present, otherwise the
    BEARER_TOKEN environment variable. twitter_kwargs are forwarded to
    get_user_timeline. Returns None if the API call yields no response
    object. The response is stored in the SQLite cache and also written as
    a JSON snapshot under DATA_DIR/cache.
    """
    if not me and 'me' in g:
        me = g.me

    if 'twitter_user' in g and g.twitter_user:
        token = g.twitter_user['access_token']
        # issue: retweets don't come back if we request non_public_metrics
        auth_user_id = g.twitter_user['id']
    else:
        token = os.environ.get('BEARER_TOKEN')
        auth_user_id = None

    tweet_source = ApiV2TweetSource(token)
    tweets_response = tweet_source.get_user_timeline(user_id,
                                                    return_dataclass=True,
                                                    **twitter_kwargs)

    if not tweets_response:
        print('no response_json')
        # Nothing below can work without a response object.
        return None

    if tweets_response.meta and tweets_response.meta.result_count == 0:
        print('no results')
        print(tweets_response)

    if not tweets_response.includes:
        print(tweets_response)
        print('no tweets_response.includes')

    if tweets_response.errors:
        print('profile get_user_timeline errors:')
        print(tweets_response.errors)

    tweets = tweets_response.data or []

    pagination_token = twitter_kwargs.get('pagination_token')

    # NOTE we need to calculate this before we cache the response.
    tweets_viewed = {}

    if auth_user_id:
        for tweet in tweets:
            tweets_viewed[tweet.id] = get_object_over_time('tweet', tweet.id, auth_user_id, only_count=True)

    cache_tweets_response(tweets_response, 'user_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token)

    ts = int(time.time() * 1000)
    _write_response_snapshot(f'tl_{user_id}_{ts}_{pagination_token}.json', tweets_response)

    includes = tweets_response.includes
    items = [tweet_model_dc_vm(includes, t, me, tweets_viewed=tweets_viewed) for t in tweets]

    # Defined for every path, including a page without tweets.
    next_token = _tweets_next_token(tweets_response)

    collection_page = CollectionPage(
        id = user_id,
        items = items,
        next_token = next_token
    )

    return collection_page


def get_tweets_collection(content_ids, pagination_token=None, max_results=None):
    """
    We might be able to have a generalizer in the content system as well...
    If a source exposes a get many interface then use it.
    We want to avoid many singular fetches.

    Not implemented yet: always returns an empty list.
    """
    return []


def get_user(user_id, me=None) -> Optional[h_vm.FeedServiceUser]:
    """Return the view model for a single user, or None if the lookup returns nothing."""
    users = get_users([user_id], me=me)

    if users:
        return users[0]

    return None


def get_users(content_ids, me=None, pagination_token=None) -> Optional[List[h_vm.FeedServiceUser]]:
    """
    Look up users by id and return their view models.

    Returns None when the response carries no data. A non-empty response is
    stored in the SQLite cache under query type 'users'.
    """
    token, auth_user_id = _resolve_auth(me)

    social_graph = TwitterApiV2SocialGraph(token)
    users_response = social_graph.get_users(content_ids, return_dataclass=True)

    if not users_response.data:
        return None

    cache_users_response(users_response, 'users', auth_user_id, pagination_token=pagination_token)

    return list(map(user_model_dc, users_response.data))


def get_home_feed(user_id, me, **query_kwargs):
    """
    Fetch one page of the authenticated user's home timeline as a CollectionPage.

    `me` is the session key of the authenticated user. query_kwargs are
    forwarded to get_home_timeline. A response with data is stored in the
    SQLite cache.

    NOTE(review): unlike the other sources this call does not pass
    return_dataclass=True, yet the result is read via attributes; confirm
    get_home_timeline returns dataclasses by default.
    """
    twitter_user = session.get(me)
    token = twitter_user['access_token']
    auth_user_id = twitter_user['id']

    tweet_source = ApiV2TweetSource(token)
    response = tweet_source.get_home_timeline(user_id, **query_kwargs)

    pagination_token = query_kwargs.get('pagination_token')

    # NOTE we need to calculate this before we cache the response.
    tweets_viewed = {}

    if response.data:
        for tweet in response.data:
            tweets_viewed[tweet.id] = get_object_over_time('tweet', tweet.id, auth_user_id, only_count=True)

        cache_tweets_response(response, 'home_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token)

        includes = response.includes
        tweets = [tweet_model_dc_vm(includes, t, me, tweets_viewed=tweets_viewed) for t in response.data]
        next_token = _tweets_next_token(response)
    else:
        print('no tweet data:')
        print(response)
        tweets = []
        next_token = None

    collection_page = CollectionPage(
        id = user_id,
        items = tweets,
        next_token = next_token
    )

    return collection_page


def get_author_threads(user_id):
    """
    Placeholder implementation where we can manually add threads to a collection,
    but ultimately we will query a local Tweet DB that gets populated through various means.

    Once we store Tweets we can easily query this.

    We can filter by author_id,conversation_id order by in_reply_to_tweet_id,id
    """
    return get_content(f'collection:twitter.threads_{user_id}')


def get_tweet_replies(conversation_id, in_reply_to_id=None, pagination_token=None, max_results=None, author_id=None):
    """
    New function, not used yet.

    FIXME(review): this body was lifted from a view function and cannot run
    as written. It reads names that are neither parameters nor defined or
    imported in this module: `token`, `view`, `user_id`, `tweet_id`,
    `url_for`, `render_template`, `replace`, `UnrepliedSection`,
    `ThreadItem`, `page_nav`, `opengraph_info` and `theme_variant`. The first
    statement therefore raises NameError. It also returns rendered templates
    rather than a CollectionPage like the other content sources. The logic is
    kept unchanged until the intended contract is decided.
    """
    tweet_source = ApiV2TweetSource(token)
    auth_user_id = None

    only_replies = view == 'replies'

    tweets = []

    skip_embed_replies = False

    if view == 'replies':
        replies_response = tweet_source.get_thread(in_reply_to_id,
                                                only_replies=True,
                                                pagination_token = pagination_token,
                                                return_dataclass=True)
    elif view == 'thread':
        skip_embed_replies = True
        replies_response = tweet_source.get_thread(conversation_id,
                                                only_replies=False,
                                                author_id=author_id,
                                                pagination_token = pagination_token,
                                                return_dataclass=True)
    elif view == 'conversation':
        replies_response = tweet_source.get_thread(conversation_id,
                                                only_replies=False,
                                                pagination_token = pagination_token,
                                                return_dataclass=True)
    elif view == 'tweet':
        replies_response = None

    next_token = None

    #print("conversation meta:")
    #print(json.dumps(tweets_response.get('meta'), indent=2))

    if replies_response and replies_response.meta and replies_response.meta.result_count:
        cache_tweets_response(replies_response, 'tweet_replies', auth_user_id, user_id=user_id, pagination_token=pagination_token)

        includes = replies_response.includes
        tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), replies_response.data)) + tweets

        next_token = replies_response.meta.next_token

    # this method is OK except it doesn't work if there are no replies.
    #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))

    #related_tweets = [] # derived from includes

    tweets.reverse()

    query = {}

    if next_token:
        query = {
            **query,
            # FIXME only_replies
            'next_data_url': url_for('.get_tweet2_html', tweet_id=tweet_id, pagination_token=next_token, only_replies = '1' if only_replies else '0', author_id = tweets[0].author_id),
            'next_page_url': url_for('.get_tweet2_html', tweet_id=tweet_id, view=view, pagination_token=next_token)
        }

    user = {
    }

    if view == 'replies':
        tweet = tweets[0]

        if tweet.id == '1608510741941989378':
            unreplied = [
                UnrepliedSection(
                    description = "Not clear what GS is still.",
                    span = (40, 80)
                )
            ]
            tweet = replace(tweet,
                unreplied = unreplied
                )

        expand_parts = request.args.get('expand')
        if expand_parts:
            expand_parts = expand_parts.split(',')

        def reply_to_thread_item (fi):
            nonlocal expand_parts

            if fi.id == '1609714342211244038':
                print(f'reply_to_thread_item id={fi.id}')
                unreplied = [
                    UnrepliedSection(
                        description = "Is there proof of this claim?",
                        span = (40, 80)
                    )
                ]
                fi = replace(fi,
                    unreplied = unreplied
                    )

            children = None

            if expand_parts and len(expand_parts) and fi.id == expand_parts[0]:
                expand_parts = expand_parts[1:]

                print(f'getting expanded replied for tweet={fi.id}')

                expanded_replies_response = tweet_source.get_thread(fi.id,
                                                only_replies=True,
                                                return_dataclass=True)
                if expanded_replies_response.data:
                    print('we got expanded responses data')

                    children = list(map(lambda t: tweet_model_dc_vm(expanded_replies_response.includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), expanded_replies_response.data))
                    children = list(map(reply_to_thread_item, children))

            return ThreadItem(feed_item=fi, children=children)

        children = list(map(reply_to_thread_item, tweets[1:]))

        root = ThreadItem(
            feed_item = tweet,
            children = children
        )
        return render_template('tweet-thread.html', user = user, root = root, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info)
    else:
        return render_template(f'tweet-collection{theme_variant}.html', user = user, tweets = tweets, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info)


def get_following_users(user_id, me=None, max_results=1000, pagination_token=None):
    """
    Fetch one page of the accounts user_id follows as a CollectionPage.

    The response is stored in the SQLite cache and also written as a JSON
    snapshot under DATA_DIR/cache.
    """
    token, auth_user_id = _resolve_auth(me)

    social_source = TwitterApiV2SocialGraph(token)

    following_resp = social_source.get_following(user_id,
        max_results=max_results, pagination_token=pagination_token, return_dataclass=True)

    cache_users_response(following_resp, 'following', auth_user_id, user_id = user_id, pagination_token=pagination_token)

    ts = int(time.time() * 1000)
    _write_response_snapshot(f'following_{user_id}_{ts}_{pagination_token}.json', following_resp)

    #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': following_resp})

    # data and meta may be missing on an empty page.
    following = list(map(user_model_dc, following_resp.data or []))
    meta = following_resp.meta or {}

    collection_page = CollectionPage(
        id = user_id,
        items = following,
        total_count = meta.get('result_count'),
        next_token = meta.get('next_token')
    )

    return collection_page


def get_followers_user(user_id, me=None, max_results=1000, pagination_token=None):
    """
    Fetch one page of user_id's followers as a CollectionPage.

    The response is stored in the SQLite cache and also written as an
    indented JSON snapshot under DATA_DIR/cache.

    An earlier read-back path from the snapshot files (`use_cache`) was
    hard-disabled ("this concept is broken for now") and never produced a
    usable response, so it is not carried here.
    """
    token, auth_user_id = _resolve_auth(me)

    social_source = TwitterApiV2SocialGraph(token)
    followers_resp = social_source.get_followers(user_id,
        max_results=max_results, pagination_token=pagination_token, return_dataclass=True)

    ts = int(time.time() * 1000)

    print(f'followers cache for {user_id}: {ts}')

    cache_users_response(followers_resp, 'followers', auth_user_id, user_id = user_id, pagination_token=pagination_token)

    _write_response_snapshot(f'followers_{user_id}_{ts}.json', followers_resp, indent=2)

    #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': followers_resp})

    # data and meta may be missing on an empty page.
    followers = list(map(user_model_dc, followers_resp.data or []))
    meta = followers_resp.meta or {}

    collection_page = CollectionPage(
        id = user_id,
        items = followers,
        total_count = meta.get('result_count'),
        next_token = meta.get('next_token')
    )

    return collection_page


def register_content_sources():
    """
    Initialise the cache DB and register every Twitter content source.

    Each id_pattern captures the numeric id in a named group matching the
    handler's first parameter.
    NOTE(review): this assumes register_content_source passes named groups to
    the handler as keyword arguments; confirm against the content system.
    NOTE(review): 'twitter:tweet:' is registered twice (item and embed);
    confirm which handler the content system is meant to prefer.
    """
    init_cache_db()

    register_content_source('twitter:tweets', get_tweets_collection, id_pattern='')
    register_content_source('twitter:tweet:', get_tweet_item, id_pattern=r'(?P<tweet_id>\d+)')
    register_content_source('twitter:tweet:', get_tweet_embed, id_pattern=r'(?P<tweet_id>\d+)')
    register_content_source('twitter:bookmarks:', get_bookmarks_feed, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:feed:user:', get_user_feed, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:user:', get_user, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:users', get_users, id_pattern='')

    register_content_source('twitter:feed:reverse_chronological:user:', get_home_feed, id_pattern=r'(?P<user_id>\d+)')

    register_content_source('twitter:tweets:replies:', get_tweet_replies, id_pattern=r'(?P<conversation_id>\d+)')

    register_content_source('twitter:following:users:', get_following_users, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:followers:user:', get_followers_user, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:threads:user:', get_author_threads, id_pattern=r'(?P<user_id>\d+)')