@@ -9,6 +9,8 @@ And the rest of the Taxonomy.
from dataclasses import asdict
from typing import List, Optional

+import dacite
+
import os
from flask import session, g, request
import time
@@ -17,6 +19,8 @@ import json
import sqlite3

from twitter_v2.api import ApiV2TweetSource, TwitterApiV2SocialGraph, ApiV2ConversationSource
+import twitter_v2.types as tv2_types
+

import hogumathi_app.view_model as h_vm

@@ -55,6 +59,7 @@ def init_cache_db ():
            accessed_at timestamp,
            query_id int,
            data text,
+           created_at timestamp,
            unique(id, query_id)
        )
    """)
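
A note on the new created_at column: `create table if not exists` won't alter a cache database that already exists, so existing installs need a one-off migration. A minimal sketch, assuming the tweet table named in the insert statement below and the module's CACHE_PATH:

    import sqlite3

    db = sqlite3.connect(CACHE_PATH)
    try:
        # no-op on fresh databases; adds the column to databases created before this patch
        db.execute('alter table tweet add column created_at timestamp')
        db.commit()
    except sqlite3.OperationalError:
        pass  # column already exists
    db.close()
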
@@ -94,7 +99,12 @@ def cache_tweets_response (response_tweets, query_type, auth_user_id, user_id =
    tweets = response_tweets.data or []
    users = includes and includes.users or []
    media = includes and includes.media or []
-   next_token = response_tweets.meta.next_token
+   ref_tweets = includes and includes.tweets or []
+
+   if response_tweets.meta and 'next_token' in response_tweets.meta:
+       next_token = response_tweets.meta.next_token
+   else:
+       next_token = None

    db = sqlite3.connect(CACHE_PATH)
    cur = db.cursor()
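
One caution on the guard above: `'next_token' in response_tweets.meta` works when meta is dict-shaped, but the dataclass responses used elsewhere in this module (return_dataclass=True) are not iterable, so `in` would raise TypeError there. A hedged helper covering both shapes, not part of the patch:

    def meta_next_token (meta):
        # works whether meta is a dict or a dataclass-style object
        if meta is None:
            return None
        if isinstance(meta, dict):
            return meta.get('next_token')
        return getattr(meta, 'next_token', None)
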
@@ -151,15 +161,20 @@ def cache_tweets_response (response_tweets, query_type, auth_user_id, user_id =
                id,
                accessed_at,
                query_id,
-               data
+               data,
+               created_at
            )
            values (
-               ?,?,?,?
+               ?,?,?,?,?
            )
            """,
-           [ tweet.id, now, query_id, tweet_json ]
+           # if this errors, parse with dateutil.parser.parse(tweet.created_at)
+           [ tweet.id, now, query_id, tweet_json, tweet.created_at ]
        )

+   # FIXME insert ref_tweets, mark in some way... is_ref = 1? sort_order = NULL?
+   # sort_order begins with count having order prior to insert...
+
    for user in users:
        user_json = json.dumps(cleandict(asdict(user)))
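
On the dateutil comment above: tweet.created_at arrives from the API as an ISO-8601 string (e.g. '2023-01-01T12:34:56.000Z'), which sqlite3 stores as text. If parsing to a datetime is ever needed, a standard-library sketch (this helper is illustrative, not in the patch; note fromisoformat() rejects the trailing 'Z' before Python 3.11):

    from datetime import datetime, timezone

    def parse_created_at (value):
        # '2023-01-01T12:34:56.000Z' -> aware datetime in UTC
        if isinstance(value, str):
            return datetime.fromisoformat(value.replace('Z', '+00:00')).astimezone(timezone.utc)
        return value
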
@@ -270,39 +285,178 @@ def cache_users_response (response_users, query_type, auth_user_id, user_id = No
    cur.close()


-def get_cached_query (query_type, auth_user_id, user_id=None):
+def get_cached_collection_all_latest (auth_user_id, query_type = 'bookmarks', user_id=None):
+   """
+   Across all queries of a given type, return the latest version of each distinct Tweet.
+
+   This is good for bookmarks, likes or retweets, where items are removed upstream after a period
+   but we still want to fetch anything we've ever added.
+
+   Ideally we won't need this long term and will instead auto-sync new items to a local collection.
+   "But for now."
+   """
+
sql = """
|
|
|
- select * from query
|
|
|
+ select t.id, t.accessed_at, t.data
|
|
|
+ from query q, tweet t
|
|
|
where
|
|
|
- (auth_user_id in ('14520320') or auth_user_id is null)
|
|
|
- and query_type = 'bookmarks'
|
|
|
+ t.query_id = q.rowid
|
|
|
+ and (q.auth_user_id in (:auth_user_id) or q.auth_user_id is null)
|
|
|
+ and q.query_type = :query_type
|
|
|
+
|
|
|
+ -- need to store author_id with tweets to get the user data out.
|
|
|
+ -- could also make a join table tweet_user, like tweet_media; they won't change.
|
|
|
+
|
|
|
+ --and u.query_id = q.rowid
|
|
|
+ --and u.id == t.author_id
|
|
|
+
|
|
|
+ group by t.id
|
|
|
+ having t.accessed_at = max(t.accessed_at)
|
|
|
+
|
|
|
+ order by t.id desc, t.accessed_at desc
|
|
|
+ limit :limit
|
|
|
"""
|
|
|
- results = []
|
|
|
- next_token = None
|
|
|
+ params = {
|
|
|
+ 'query_type': query_type,
|
|
|
+ 'auth_user_id': auth_user_id,
|
|
|
+ 'limit': 10
|
|
|
+ }
|
|
|
+
|
|
|
+ db = sqlite3.connect(CACHE_PATH)
|
|
|
+ cur = db.cursor()
|
|
|
|
|
|

-   return results, next_token
-
-def get_object_over_time (obj_type, obj_id, auth_user_id):
-   cur = None
-
-   results = cur.execute(f"""
-       --select id, count(*) c from tweet group by id having c > 1
+   cached_tweets = cur.execute(sql, params).fetchall()
+
+   tweets = list()
+   user_ids = set()
+   media_keys = set()
+   referenced_tweet_ids = set()
+
+   for row in cached_tweets:
+       tweet_id, accessed_at, tweet_json = row
+       tweet = dacite.from_dict( data_class=tv2_types.Tweet, data=json.loads(tweet_json) )
+
+       user_ids.add(tweet.author_id)
+
+       # guard: attachments is absent for tweets without media
+       for a in tweet.attachments or []:
+           for mk in a.media_keys:
+               media_keys.add(mk)
+
+
+       #for tweet_ref in tweet.referenced_tweets:
+       #   referenced_tweet_ids.add(tweet_ref.id)
+       #   # FIXME we also need to reference these users.
+
+       tweets.append(tweet)
+
+   feed_items = []
+
+   includes = {
+       'tweets': [],
+       'users': [],
+       'media': []
+   }
+   for tweet in tweets:
+       # FIXME return view models rather than raw tweets. need to join user and media, see query comment.
+       #feed_item = tweet_model_dc_vm(tweet, ...)
+       feed_item = tweet

-       select t.*
+       feed_items.append(feed_item)
+
+   return feed_items
+
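
Since rows store json.dumps(cleandict(asdict(tweet))), rehydration with dacite has to tolerate keys that cleandict stripped: dacite falls back to the dataclass default when a key is absent, so Optional fields need None defaults. A self-contained sketch (MiniTweet is illustrative, not a tv2_types class):

    from dataclasses import dataclass
    from typing import List, Optional
    import json
    import dacite

    @dataclass
    class MiniTweet:
        id: str
        author_id: str
        referenced_tweets: Optional[List[dict]] = None  # dropped by cleandict when None

    row_json = '{"id": "1", "author_id": "2"}'  # no referenced_tweets key in the stored JSON
    t = dacite.from_dict(data_class=MiniTweet, data=json.loads(row_json))
    assert t.referenced_tweets is None
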
+def get_object_over_time (obj_type, obj_id, auth_user_id, only_count = False):
+   """
+   Return all occurrences of an object over time,
+   or, if only_count is true, just the count.
+   """
+   db = sqlite3.connect(CACHE_PATH)
+   cur = db.cursor()
+
+   if only_count:
+       fields = 'count(*)'
+   else:
+       fields = 't.*'
+
+   results = cur.execute(f"""
+       select {fields}
        from {obj_type} t, query q
        where
            t.id = :obj_id
            and q.rowid = t.query_id
            and (q.auth_user_id in (:auth_user_id) or q.auth_user_id is null)
+
        """,
        {
            'obj_id': obj_id,
            'auth_user_id': auth_user_id
-       })
-   results = []
-   next_token = None
+       }).fetchall()
+
+   if only_count:
+       return results[0][0]
+   else:
+       return list(results)
+
+
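
Worth noting: obj_type is interpolated into the SQL via f-string, so it must only ever come from trusted call sites (here it is always the literal 'tweet'), never from request input. Usage, as the feed functions below call it:

    # how many times the authed user has already been served this tweet
    seen_count = get_object_over_time('tweet', tweet.id, auth_user_id, only_count=True)

    # or the full row history
    rows = get_object_over_time('tweet', tweet.id, auth_user_id)
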
+def get_query_gaps (auth_user_id, query_type = 'home_feed', min_gap_hours = 1.0, max_age_days = 21.0):
+
+   sql = """
+       WITH ordered_tweets AS
+       (
+           SELECT
+               t.*,
+               q.auth_user_id,
+               (julianday(current_timestamp) - julianday(t.created_at)) as row_age_days,
+               ROW_NUMBER() OVER (ORDER BY t.created_at asc) rn
+           FROM tweet t
+           JOIN query q
+               on q.rowid = t.query_id
+           WHERE
+               q.query_type = :QUERY_TYPE
+               AND q.auth_user_id = :AUTH_USER_ID
+               AND row_age_days < :MAX_AGE_DAYS
+       )
+       SELECT
+           o1.id since_id,
+           o1.created_at start_time,
+           o2.id until_id,
+           o2.created_at end_time,
+           --CAST(strftime('%s', o2.created_at) as integer) - CAST(strftime('%s', o1.created_at) as integer) gap_seconds2,
+           --(julianday(o2.created_at) - julianday(o1.created_at)) * 86400 gap_seconds,
+           (julianday(o2.created_at) - julianday(o1.created_at)) * 24 gap_hours
+       FROM ordered_tweets o1
+       JOIN ordered_tweets o2
+           ON (
+               o1.rn + 1 = o2.rn
+           )
+       WHERE gap_hours >= :MIN_GAP_HOURS
+       order by start_time desc
+   """
+
+   params = dict(
+       QUERY_TYPE = query_type,
+       AUTH_USER_ID = auth_user_id,
+       MAX_AGE_DAYS = max_age_days,
+       MIN_GAP_HOURS = min_gap_hours
+   )
+
+   db = sqlite3.connect(CACHE_PATH)
+
+   cur = db.cursor()
+   cur.row_factory = sqlite3.Row
+
+   results = cur.execute(sql, params).fetchall()
+
+   cur.close()

-   return results, next_token
+   rows = list(map(dict, results))
+
+   return rows
+
+
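
Each returned row brackets one gap between consecutive cached tweets, keyed since_id/until_id, so a backfill job can page exactly the missing window. A hedged sketch of the intended consumption (the fetch step itself is illustrative):

    for gap in get_query_gaps(auth_user_id, query_type='home_feed'):
        print(f"{gap['gap_hours']:.1f}h gap: since_id={gap['since_id']}, until_id={gap['until_id']}")
        # e.g. fetch the timeline with since_id/until_id as the Twitter v2
        # pagination bounds, then cache_tweets_response(...) the result
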


def get_tweet_item (tweet_id, me=None):

@@ -335,7 +489,9 @@ def get_tweet_item (tweet_id, me=None):

        print(json.dumps(err, indent=2))

-
+   if not tweets_response.data:
+       return
+
    includes = tweets_response.includes
    tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me), tweets_response.data))

@@ -418,9 +574,15 @@ def get_bookmarks_feed (user_id, pagination_token=None, max_results=10, me=None)

    cache_tweets_response(response_tweets, 'bookmarks', user_id, user_id=user_id, pagination_token=pagination_token)

-   includes = response_tweets.includes
-   tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me), response_tweets.data))
-   next_token = response_tweets.meta.next_token
+   if response_tweets.data:
+       includes = response_tweets.includes
+       tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me), response_tweets.data))
+       next_token = response_tweets.meta.next_token
+   else:
+       print('no tweet data:')
+       print(response_tweets)
+       tweets = []
+       next_token = None

    query = {}
@@ -463,6 +625,8 @@ def get_user_feed (user_id, me=None, **twitter_kwargs):


    tweet_source = ApiV2TweetSource(token)
+
+
    tweets_response = tweet_source.get_user_timeline(user_id,
                                                     return_dataclass=True,
                                                     **twitter_kwargs)
@@ -483,16 +647,29 @@ def get_user_feed (user_id, me=None, **twitter_kwargs):
        print('profile get_user_timeline errors:')
        print(tweets_response.errors)

+   tweets = tweets_response.data
+
    pagination_token=twitter_kwargs.get('pagination_token')

+
+   # NOTE we need to calculate this before we cache the response.
+   tweets_viewed = {}
+
+   if auth_user_id and tweets:
+       for tweet in tweets:
+           tweet_viewed = get_object_over_time('tweet', tweet.id, auth_user_id, only_count=True)
+           #tweet_viewed = len(tweet_over_time)
+
+           tweets_viewed[tweet.id] = tweet_viewed
+
    cache_tweets_response(tweets_response, 'user_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token)

    ts = int(time.time() * 1000)
    with open(f'{DATA_DIR}/cache/tl_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
        f.write(json.dumps(cleandict(asdict(tweets_response))))

-   if tweets_response.data:
-       tweets = list(map(lambda t: tweet_model_dc_vm(tweets_response.includes, t, me), tweets_response.data))
+   if tweets:
+       tweets = list(map(lambda t: tweet_model_dc_vm(tweets_response.includes, t, me, tweets_viewed=tweets_viewed), tweets))

        next_token = tweets_response.meta.next_token
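
The tweets_viewed loop above issues one count query per tweet, which is fine for a page of 10 but scales linearly with page size. A hedged batch variant over the same schema, one grouped query instead of N (count_tweets_viewed is not part of the patch):

    def count_tweets_viewed (tweet_ids, auth_user_id):
        db = sqlite3.connect(CACHE_PATH)
        cur = db.cursor()
        placeholders = ','.join('?' * len(tweet_ids))
        rows = cur.execute(f"""
            select t.id, count(*)
            from tweet t join query q on q.rowid = t.query_id
            where t.id in ({placeholders})
                and (q.auth_user_id = ? or q.auth_user_id is null)
            group by t.id
            """,
            [*tweet_ids, auth_user_id]).fetchall()
        cur.close()
        viewed = {tweet_id: 0 for tweet_id in tweet_ids}  # default for never-seen tweets
        viewed.update(dict(rows))
        return viewed
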
@@ -534,7 +711,7 @@ def get_users (content_ids, me=None, pagination_token=None) -> Optional[List[h_v
    social_graph = TwitterApiV2SocialGraph(token)
    users_response = social_graph.get_users(content_ids, return_dataclass=True)

-   if not len(users_response.data):
+   if not users_response.data or not len(users_response.data):
        return

    cache_users_response(users_response, f'users', auth_user_id, pagination_token=pagination_token)
@@ -556,12 +733,30 @@ def get_home_feed (user_id, me, **query_kwargs):

    pagination_token = query_kwargs.get('pagination_token')

-   cache_tweets_response(response, 'home_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token)

-   includes = response.includes
-   tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me), response.data))
-   next_token = response.meta.next_token

+   # NOTE we need to calculate this before we cache the response.
+   tweets_viewed = {}
+
+   if response.data:
+       for tweet in response.data:
+           tweet_viewed = get_object_over_time('tweet', tweet.id, auth_user_id, only_count=True)
+           #tweet_viewed = len(tweet_over_time)
+
+           tweets_viewed[tweet.id] = tweet_viewed
+
+
+       cache_tweets_response(response, 'home_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token)
+
+       includes = response.includes
+       tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me, tweets_viewed=tweets_viewed), response.data))
+       next_token = response.meta.next_token
+   else:
+       print('no tweet data:')
+       print(response)
+       tweets = []
+       next_token = None
+
    collection_page = CollectionPage(
        id = user_id,
        items = tweets,