# content_source.py
  1. """
  2. This translates from the Tweet Source and Twitter v2 types
  3. Into ViewModel types such as FeedItem
  4. And the rest of the Taxonomy.
  5. """
  6. from dataclasses import asdict
  7. from typing import List, Optional
  8. import dacite
  9. import os
  10. from flask import session, g, request
  11. import time
  12. from datetime import datetime, timezone
  13. import json
  14. import sqlite3
  15. from twitter_v2.api import ApiV2TweetSource, TwitterApiV2SocialGraph, ApiV2ConversationSource
  16. import twitter_v2.types as tv2_types
  17. import hogumathi_app.view_model as h_vm
  18. from hogumathi_app.view_model import CollectionPage, cleandict
  19. from hogumathi_app.content_system import register_content_source, get_content, register_hook
  20. from .view_model import tweet_model_dc_vm, user_model_dc
  21. DATA_DIR='.data'
  22. CACHE_PATH = f'{DATA_DIR}/twitter_v2_cache.db'
def init_cache_db ():
    db = sqlite3.connect(CACHE_PATH)
    cur = db.cursor()

    table_exists = cur.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='tweet'").fetchone()[0]

    if not table_exists:
        cur.execute("""
            create table query (
                created_at timestamp,
                user_id text,
                last_accessed_at timestamp,
                next_token text,
                query_type text,
                auth_user_id text
            )
            """)
        cur.execute("""
            create table tweet (
                id text,
                accessed_at timestamp,
                query_id int,
                data text,
                created_at timestamp,
                unique(id, query_id)
            )
            """)
        cur.execute("""
            create table user (
                id text,
                accessed_at timestamp,
                query_id int,
                data text,
                unique(id, query_id)
            )
            """)
        cur.execute("""
            create table medium (
                id text,
                accessed_at timestamp,
                query_id int,
                data text,
                unique(id, query_id)
            )
            """)
        cur.connection.commit()
        print(f'--- created {CACHE_PATH}')

    cur.close()

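# Illustrative sketch (helper is hypothetical and not called anywhere): each
# cached API response gets one row in `query`, and the tweet/user/medium rows
# it returned point back via query_id, so per-query-type stats are a join away.
def _example_cache_stats ():
    db = sqlite3.connect(CACHE_PATH)
    cur = db.cursor()
    rows = cur.execute("""
        select q.query_type, count(t.rowid)
        from query q left join tweet t on t.query_id = q.rowid
        group by q.query_type
        """).fetchall()
    cur.close()
    return dict(rows)
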
def cache_tweets_response (response_tweets, query_type, auth_user_id, user_id=None, pagination_token=None, ts=None):
    """
    With bookmarks I observed that the same next_token is returned even when
    distinct new queries are started, so with abandoned paginations we can end
    up with duplicate next_token records. That means we could update the wrong
    query_id, with downstream timestamp effects.
    """
    includes = response_tweets.includes
    tweets = response_tweets.data or []
    users = includes and includes.users or []
    media = includes and includes.media or []
    ref_tweets = includes and includes.tweets or []

    if response_tweets.meta and 'next_token' in response_tweets.meta:
        next_token = response_tweets.meta.next_token
    else:
        next_token = None

    db = sqlite3.connect(CACHE_PATH)
    cur = db.cursor()

    # SQLite is naive by default, so make sure this is UTC.
    now = datetime.now(timezone.utc)
    if ts:
        now = ts

    if not pagination_token:
        cur.execute("""
            insert into query (
                created_at,
                last_accessed_at,
                user_id,
                next_token,
                query_type,
                auth_user_id
            )
            values (?,?,?,?,?,?)
            """,
            [now, now, user_id, next_token, query_type, auth_user_id]
            )
        query_id = cur.lastrowid
    else:
        query_id = cur.execute("""
            select rowid from query
            where next_token = :next_token
            """,
            {
                'next_token': pagination_token
            }).fetchone()[0]
        cur.execute("""
            update query set
                last_accessed_at = :last_accessed_at,
                next_token = :next_token
            where rowid = :query_id
            """,
            {
                'last_accessed_at': now,
                'next_token': next_token,
                'query_id': query_id
            })

    for tweet in tweets:
        tweet_json = json.dumps(cleandict(asdict(tweet)))
        cur.execute("""
            insert or ignore into tweet (
                id,
                accessed_at,
                query_id,
                data,
                created_at
            )
            values (?,?,?,?,?)
            """,
            # dateutil.parser.parse( tweet.created_at ) if error
            [tweet.id, now, query_id, tweet_json, tweet.created_at]
            )

    # FIXME insert ref_tweets, mark in some way... is_ref = 1? sort_order = NULL?
    # sort_order begins with count having order prior to insert...

    for user in users:
        user_json = json.dumps(cleandict(asdict(user)))
        cur.execute("""
            insert or ignore into user (
                id,
                accessed_at,
                query_id,
                data
            )
            values (?,?,?,?)
            """,
            [user.id, now, query_id, user_json]
            )

    for medium in media:
        medium_json = json.dumps(cleandict(asdict(medium)))
        cur.execute("""
            insert or ignore into medium (
                id,
                accessed_at,
                query_id,
                data
            )
            values (?,?,?,?)
            """,
            [medium.media_key, now, query_id, medium_json]
            )

    cur.connection.commit()
    cur.close()

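# Usage sketch: the optional `ts` parameter lets callers backdate cache entries,
# e.g. when importing a previously saved response instead of a live one
# (illustrative; `archived_response` is hypothetical):
#
#   fetched_at = datetime(2023, 1, 5, tzinfo=timezone.utc)
#   cache_tweets_response(archived_response, 'bookmarks', auth_user_id, ts=fetched_at)
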
def cache_users_response (response_users, query_type, auth_user_id, user_id=None, pagination_token=None, ts=None):
    users = response_users.data or []
    next_token = response_users.meta and response_users.meta.get('next_token')

    db = sqlite3.connect(CACHE_PATH)
    cur = db.cursor()

    # SQLite is naive by default, so make sure this is UTC.
    # (The original left now as None unless ts was given; defaulting to the
    # current UTC time matches cache_tweets_response.)
    now = datetime.now(timezone.utc)
    if ts:
        now = ts

    if not pagination_token:
        cur.execute("""
            insert into query (
                created_at,
                last_accessed_at,
                user_id,
                next_token,
                query_type,
                auth_user_id
            )
            values (?,?,?,?,?,?)
            """,
            [now, now, user_id, next_token, query_type, auth_user_id]
            )
        query_id = cur.lastrowid
    else:
        query_id = cur.execute("""
            select rowid from query
            where next_token = :next_token
            """,
            {
                'next_token': pagination_token
            }).fetchone()[0]
        cur.execute("""
            update query set
                last_accessed_at = :last_accessed_at,
                next_token = :next_token
            where rowid = :query_id
            """,
            {
                'last_accessed_at': now,
                'next_token': next_token,
                'query_id': query_id
            })

    for user in users:
        user_json = json.dumps(cleandict(asdict(user)))
        cur.execute("""
            insert or ignore into user (
                id,
                accessed_at,
                query_id,
                data
            )
            values (?,?,?,?)
            """,
            [user.id, now, query_id, user_json]
            )

    cur.connection.commit()
    cur.close()

def get_cached_collection_all_latest (auth_user_id, query_type='bookmarks', user_id=None):
    """
    Across all queries of a type, return the latest distinct Tweet.
    This is good for bookmarks, likes or retweets, where items are removed
    upstream after a period but we still want to fetch anything we've ever added.
    Ideally we don't need this in the long term and instead auto-sync new items
    to a local collection. "But for now."
    """
    sql = """
        select t.id, t.accessed_at, t.data
        from query q, tweet t
        where
            t.query_id = q.rowid
            and (q.auth_user_id in (:auth_user_id) or q.auth_user_id is null)
            and q.query_type = :query_type
            -- need to store author_id with tweets to get the user data out.
            -- could also make a join table tweet_user, like tweet_media; they won't change.
            --and u.query_id = q.rowid
            --and u.id == t.author_id
        group by t.id
        having t.accessed_at = max(t.accessed_at)
        order by t.id desc, t.accessed_at desc
        limit :limit
        """
    params = {
        'query_type': query_type,
        'auth_user_id': auth_user_id,
        'limit': 10
    }

    db = sqlite3.connect(CACHE_PATH)
    cur = db.cursor()
    cached_tweets = cur.execute(sql, params).fetchall()

    tweets = list()
    user_ids = set()
    media_keys = set()
    referenced_tweet_ids = set()

    for row in cached_tweets:
        tweet_id, accessed_at, tweet_json = row
        tweet = dacite.from_dict(data_class=tv2_types.Tweet, data=json.loads(tweet_json))

        user_ids.add(tweet.author_id)

        # assumes the v2 shape where attachments carries a media_keys list
        if tweet.attachments:
            for mk in tweet.attachments.media_keys:
                media_keys.add(mk)

        #for tweet_ref in tweet.referenced_tweets:
        #    referenced_tweet_ids.add(tweet_ref.id)
        #    # FIXME we also need to reference these users.

        tweets.append(tweet)

    feed_items = []
    includes = {
        'tweets': [],
        'users': [],
        'media': []
    }
    for tweet in tweets:
        # FIXME return view models rather than raw tweets. need to join user and media, see query comment.
        #feed_item = tweet_model_dc_vm(tweet, ...)
        feed_item = tweet
        feed_items.append(feed_item)

    return feed_items

def get_object_over_time (obj_type, obj_id, auth_user_id, only_count=False):
    """
    Return all occurrences of an object over time,
    or if only_count is true then return just the count.
    """
    db = sqlite3.connect(CACHE_PATH)
    cur = db.cursor()

    if only_count:
        fields = 'count(*)'
    else:
        fields = 't.*'

    # obj_type only ever comes from internal callers ('tweet', 'user', 'medium');
    # table names can't be bound as SQL parameters, hence the f-string.
    results = cur.execute(f"""
        select {fields}
        from {obj_type} t, query q
        where
            t.id = :obj_id
            and q.rowid = t.query_id
            and (q.auth_user_id in (:auth_user_id) or q.auth_user_id is null)
        """,
        {
            'obj_id': obj_id,
            'auth_user_id': auth_user_id
        }).fetchall()

    if only_count:
        return results[0][0]

    return list(results)

def get_query_gaps (auth_user_id, query_type='home_feed', min_gap_hours=1.0, max_age_days=21.0):
    sql = """
        WITH ordered_tweets AS
        (
            SELECT
                t.*,
                q.auth_user_id,
                (julianday(current_timestamp) - julianday(t.created_at)) as row_age_days,
                ROW_NUMBER() OVER (ORDER BY t.created_at asc) rn
            FROM tweet t
            JOIN query q
                ON q.rowid = t.query_id
            WHERE
                q.query_type = :QUERY_TYPE
                AND q.auth_user_id = :AUTH_USER_ID
                AND row_age_days < :MAX_AGE_DAYS
        )
        SELECT
            o1.id since_id,
            o1.created_at start_time,
            o2.id until_id,
            o2.created_at end_time,
            --CAST(strftime('%s', o2.created_at) as integer) - CAST(strftime('%s', o1.created_at) as integer) gap_seconds2,
            --(julianday(o2.created_at) - julianday(o1.created_at)) * 86400 gap_seconds,
            (julianday(o2.created_at) - julianday(o1.created_at)) * 24 gap_hours
        FROM ordered_tweets o1
        JOIN ordered_tweets o2
            ON (o1.rn + 1 = o2.rn)
        WHERE gap_hours >= :MIN_GAP_HOURS
        ORDER BY start_time DESC
        """
    params = dict(
        QUERY_TYPE = query_type,
        AUTH_USER_ID = auth_user_id,
        MAX_AGE_DAYS = max_age_days,
        MIN_GAP_HOURS = min_gap_hours
    )

    db = sqlite3.connect(CACHE_PATH)
    cur = db.cursor()
    cur.row_factory = sqlite3.Row
    results = cur.execute(sql, params).fetchall()
    cur.close()

    rows = list(map(dict, results))
    return rows

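# Illustrative sketch (hypothetical helper, not called anywhere): each row from
# get_query_gaps() brackets a hole in the cached timeline, and since_id/until_id
# are the natural bounds for a backfill request against the Twitter API.
def _example_print_gaps (auth_user_id):
    for gap in get_query_gaps(auth_user_id):
        print(f"gap of {gap['gap_hours']:.1f}h: "
              f"since_id={gap['since_id']} until_id={gap['until_id']}")
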
def get_tweet_item (tweet_id, me=None):
    if me:
        twitter_user = session.get(me)
        token = twitter_user['access_token']
    else:
        token = os.environ.get('BEARER_TOKEN')

    tweet_source = ApiV2TweetSource(token)
    tweets_response = tweet_source.get_tweet(tweet_id, return_dataclass=True)

    #print(response_json)
    if tweets_response.errors:
        # types:
        # https://api.twitter.com/2/problems/not-authorized-for-resource (blocked or suspended)
        # https://api.twitter.com/2/problems/resource-not-found (deleted)
        #print(response_json.get('errors'))
        for err in tweets_response.errors:
            if 'type' not in err:
                print('unknown error type: ' + str(err))
            elif err['type'] == 'https://api.twitter.com/2/problems/not-authorized-for-resource':
                print('blocked or suspended tweet: ' + err['value'])
            elif err['type'] == 'https://api.twitter.com/2/problems/resource-not-found':
                print('deleted tweet: ' + err['value'])
            else:
                print('unknown error')
                print(json.dumps(err, indent=2))

    if not tweets_response.data:
        return

    includes = tweets_response.includes
    tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me), tweets_response.data))

    collection_page = CollectionPage(
        id = tweet_id,
        items = tweets,
        next_token = None  # FIXME
    )
    return collection_page

def tweet_embed_template (tweet_id):
    features = '{"tfw_timeline_list":{"bucket":[],"version":null},"tfw_follower_count_sunset":{"bucket":true,"version":null},"tfw_tweet_edit_backend":{"bucket":"on","version":null},"tfw_refsrc_session":{"bucket":"on","version":null},"tfw_mixed_media_15897":{"bucket":"treatment","version":null},"tfw_experiments_cookie_expiration":{"bucket":1209600,"version":null},"tfw_duplicate_scribes_to_settings":{"bucket":"on","version":null},"tfw_video_hls_dynamic_manifests_15082":{"bucket":"true_bitrate","version":null},"tfw_legacy_timeline_sunset":{"bucket":true,"version":null},"tfw_tweet_edit_frontend":{"bucket":"on","version":null}}'

    # base64 + encode URI component
    features_encoded = 'eyJ0ZndfdGltZWxpbmVfbGlzdCI6eyJidWNrZXQiOltdLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X2ZvbGxvd2VyX2NvdW50X3N1bnNldCI6eyJidWNrZXQiOnRydWUsInZlcnNpb24iOm51bGx9LCJ0ZndfdHdlZXRfZWRpdF9iYWNrZW5kIjp7ImJ1Y2tldCI6Im9uIiwidmVyc2lvbiI6bnVsbH0sInRmd19yZWZzcmNfc2Vzc2lvbiI6eyJidWNrZXQiOiJvbiIsInZlcnNpb24iOm51bGx9LCJ0ZndfbWl4ZWRfbWVkaWFfMTU4OTciOnsiYnVja2V0IjoidHJlYXRtZW50IiwidmVyc2lvbiI6bnVsbH0sInRmd19leHBlcmltZW50c19jb29raWVfZXhwaXJhdGlvbiI6eyJidWNrZXQiOjEyMDk2MDAsInZlcnNpb24iOm51bGx9LCJ0ZndfZHVwbGljYXRlX3NjcmliZXNfdG9fc2V0dGluZ3MiOnsiYnVja2V0Ijoib24iLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X3ZpZGVvX2hsc19keW5hbWljX21hbmlmZXN0c18xNTA4MiI6eyJidWNrZXQiOiJ0cnVlX2JpdHJhdGUiLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X2xlZ2FjeV90aW1lbGluZV9zdW5zZXQiOnsiYnVja2V0Ijp0cnVlLCJ2ZXJzaW9uIjpudWxsfSwidGZ3X3R3ZWV0X2VkaXRfZnJvbnRlbmQiOnsiYnVja2V0Ijoib24iLCJ2ZXJzaW9uIjpudWxsfX0%3D'

    origin = f"http%3A%2F%2Flocalhost%3A5004%2Ftwitter%2Ftweet2%2F{tweet_id}.html"

    width = 550
    height = 755
    theme = "dark"  # or light
    hide_card = "false"
    hide_thread = "false"

    # note: theme was defined but unused in the original; the src now honors it.
    src = f"https://platform.twitter.com/embed/Tweet.html?dnt=true&features={features_encoded}&origin={origin}&frame=false&hideCard={hide_card}&hideThread={hide_thread}&id={tweet_id}&lang=en&theme={theme}&width={width}px"

    html = f"""
<iframe src="{src}" data-tweet-id="{tweet_id}"
    scrolling="no" frameborder="0" allowtransparency="true" allowfullscreen="true" class="" style="position: static; visibility: visible; width: {width}px; height: {height}px; display: block; flex-grow: 1;" title="Twitter Tweet"
></iframe>
"""
    return html

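# Sketch of how features_encoded above relates to the features JSON: it is the
# base64 of the JSON, then URL-encoded (hence the trailing %3D padding). With
# only the stdlib:
#
#   import base64, urllib.parse
#   encoded = urllib.parse.quote(base64.b64encode(features.encode()).decode(), safe='')
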
# https://developer.twitter.com/en/docs/twitter-for-websites/embedded-tweets/overview
def get_tweet_embed (tweet_id):
    html = tweet_embed_template(tweet_id)
    post = h_vm.FeedItem(
        id = tweet_id,
        created_at = 'some time',  # placeholder
        display_name = 'Twitter User',
        handle = 'tweetuser',
        html = html
    )
    return post

def get_bookmarks_feed (user_id, pagination_token=None, max_results=10, me=None):
    if not me:
        me = g.get('me') or request.args.get('me')

    print(f'get_bookmarks_feed. me={me}')

    twitter_user = session.get(me)
    if not twitter_user:
        return None

    token = twitter_user['access_token']
    tweet_source = ApiV2TweetSource(token)
    response_tweets = tweet_source.get_bookmarks(user_id,
        pagination_token = pagination_token,
        return_dataclass = True,
        max_results = max_results
        )

    #print(response_json)
    cache_tweets_response(response_tweets, 'bookmarks', user_id, user_id=user_id, pagination_token=pagination_token)

    if response_tweets.data:
        includes = response_tweets.includes
        tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me), response_tweets.data))
        next_token = response_tweets.meta.next_token
    else:
        print('no tweet data:')
        print(response_tweets)
        tweets = []
        next_token = None

    query = {}
    if next_token:
        query = {
            **query,
        }

    user = {
        'id': user_id
    }

    ts = int(time.time() * 1000)
    with open(f'{DATA_DIR}/cache/bookmarks_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
        f.write(json.dumps(cleandict(asdict(response_tweets))))

    collection_page = CollectionPage(
        id = user_id,  # FIXME this should perhaps be the unresolved id
        items = tweets,
        next_token = next_token
    )
    return collection_page

def get_user_feed (user_id, me=None, **twitter_kwargs):
    if not me and 'me' in g:
        me = g.me

    if 'twitter_user' in g and g.twitter_user:
        token = g.twitter_user['access_token']
        # issue: retweets don't come back if we request non_public_metrics
        is_me = False and user_id == g.twitter_user['id']
        auth_user_id = g.twitter_user['id']
    else:
        token = os.environ.get('BEARER_TOKEN')
        is_me = False
        auth_user_id = None

    tweet_source = ApiV2TweetSource(token)
    tweets_response = tweet_source.get_user_timeline(user_id,
        return_dataclass=True,
        **twitter_kwargs)

    tweets = None
    if not tweets_response:
        print('no response_json')
    if tweets_response.meta and tweets_response.meta.result_count == 0:
        print('no results')
        print(tweets_response)
    if not tweets_response.includes:
        print(tweets_response)
        print('no tweets_response.includes')
    if tweets_response.errors:
        print('profile get_user_timeline errors:')
        print(tweets_response.errors)

    tweets = tweets_response.data
    pagination_token = twitter_kwargs.get('pagination_token')

    # NOTE we need to calculate this before we cache the response.
    tweets_viewed = {}
    if auth_user_id and tweets:
        for tweet in tweets:
            tweet_viewed = get_object_over_time('tweet', tweet.id, auth_user_id, only_count=True)
            #tweet_viewed = len(tweet_over_time)
            tweets_viewed[tweet.id] = tweet_viewed

    cache_tweets_response(tweets_response, 'user_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token)

    ts = int(time.time() * 1000)
    with open(f'{DATA_DIR}/cache/tl_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
        f.write(json.dumps(cleandict(asdict(tweets_response))))

    # default so next_token is defined even when there are no tweets
    next_token = None
    if tweets:
        tweets = list(map(lambda t: tweet_model_dc_vm(tweets_response.includes, t, me, tweets_viewed=tweets_viewed), tweets))
        next_token = tweets_response.meta.next_token

    collection_page = CollectionPage(
        id = user_id,
        items = tweets,
        next_token = next_token
    )
    return collection_page

def get_tweets_collection (content_ids, pagination_token=None, max_results=None):
    """
    We might be able to have a generalizer in the content system as well:
    if a source exposes a get-many interface, use it, since we want to avoid
    many singular fetches.
    """
    return []

def get_user (user_id, me=None) -> Optional[h_vm.FeedServiceUser]:
    users = get_users([user_id], me=me)
    if users:
        return users[0]

def get_users (content_ids, me=None, pagination_token=None) -> Optional[List[h_vm.FeedServiceUser]]:
    """
    Fetch one page of users by id, caching the response.
    """
    if me:
        twitter_user = session.get(me)
        token = twitter_user['access_token']
        auth_user_id = twitter_user['id']
    else:
        token = os.environ.get('BEARER_TOKEN')
        auth_user_id = None

    social_graph = TwitterApiV2SocialGraph(token)
    users_response = social_graph.get_users(content_ids, return_dataclass=True)

    if not users_response.data or not len(users_response.data):
        return

    cache_users_response(users_response, 'users', auth_user_id, pagination_token=pagination_token)

    users = list(map(user_model_dc, users_response.data))
    return users

def get_home_feed (user_id, me, **query_kwargs):
    twitter_user = session.get(me)
    token = twitter_user['access_token']
    auth_user_id = twitter_user['id']

    tweet_source = ApiV2TweetSource(token)
    response = tweet_source.get_home_timeline(user_id, **query_kwargs)

    #print(json.dumps(response_json, indent=2))

    pagination_token = query_kwargs.get('pagination_token')

    # NOTE we need to calculate this before we cache the response.
    tweets_viewed = {}
    if response.data:
        for tweet in response.data:
            tweet_viewed = get_object_over_time('tweet', tweet.id, auth_user_id, only_count=True)
            #tweet_viewed = len(tweet_over_time)
            tweets_viewed[tweet.id] = tweet_viewed

        cache_tweets_response(response, 'home_feed', auth_user_id, user_id=user_id, pagination_token=pagination_token)

        includes = response.includes
        tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, me, tweets_viewed=tweets_viewed), response.data))
        next_token = response.meta.next_token
    else:
        print('no tweet data:')
        print(response)
        tweets = []
        next_token = None

    collection_page = CollectionPage(
        id = user_id,
        items = tweets,
        next_token = next_token
    )
    return collection_page

def get_author_threads (user_id):
    """
    Placeholder implementation where we can manually add threads to a collection,
    but ultimately we will query a local Tweet DB that gets populated through various means.
    Once we store Tweets we can easily query this:
    filter by author_id and conversation_id, order by in_reply_to_tweet_id, id.
    """
    return get_content(f'collection:twitter.threads_{user_id}')

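# Sketch of the eventual local query the docstring describes, against a future
# Tweet DB whose rows carry author_id / conversation_id / in_reply_to_tweet_id
# (a hypothetical schema, not the cache tables above):
#
#   select * from tweet
#   where author_id = :author_id and conversation_id = :conversation_id
#   order by in_reply_to_tweet_id, id
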
def get_tweet_replies (conversation_id, in_reply_to_id=None, pagination_token=None, max_results=None, author_id=None, me=None, view='conversation'):
    """
    New function, not used yet.

    NOTE: `me` and `view` were undefined in the original draft and have been
    added as parameters here; `UnrepliedSection` and `ThreadItem` are assumed
    to come from the view model module and still need to be imported before
    this can be called.
    """
    if me:
        twitter_user = session.get(me)
        token = twitter_user['access_token']
        auth_user_id = twitter_user['id']
    else:
        token = os.environ.get('BEARER_TOKEN')
        auth_user_id = None

    tweet_source = ApiV2TweetSource(token)
    only_replies = view == 'replies'

    tweets = []
    skip_embed_replies = False

    if view == 'replies':
        replies_response = tweet_source.get_thread(in_reply_to_id,
            only_replies=True,
            pagination_token = pagination_token,
            return_dataclass=True)
    elif view == 'thread':
        skip_embed_replies = True
        replies_response = tweet_source.get_thread(conversation_id,
            only_replies=False,
            author_id=author_id,
            pagination_token = pagination_token,
            return_dataclass=True)
    elif view == 'conversation':
        replies_response = tweet_source.get_thread(conversation_id,
            only_replies=False,
            pagination_token = pagination_token,
            return_dataclass=True)
    elif view == 'tweet':
        replies_response = None

    next_token = None

    #print("conversation meta:")
    #print(json.dumps(tweets_response.get('meta'), indent=2))

    if replies_response and replies_response.meta and replies_response.meta.result_count:
        # assumption: the original referenced an undefined user_id here; the
        # thread author seems the closest fit.
        cache_tweets_response(replies_response, 'tweet_replies', auth_user_id, user_id=author_id, pagination_token=pagination_token)

        includes = replies_response.includes
        tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), replies_response.data)) + tweets
        next_token = replies_response.meta.next_token

    # this method is OK except it doesn't work if there are no replies.
    #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))

    #related_tweets = [] # derived from includes

    tweets.reverse()

    query = {}

    if next_token:
        # assumption: the original referenced an undefined tweet_id; the
        # conversation root id is used for the links here.
        query = {
            **query,
            # FIXME only_replies
            'next_data_url': url_for('.get_tweet2_html', tweet_id=conversation_id, pagination_token=next_token, only_replies='1' if only_replies else '0', author_id=tweets[0].author_id),
            'next_page_url': url_for('.get_tweet2_html', tweet_id=conversation_id, view=view, pagination_token=next_token)
        }

    user = {}

    # placeholders for names the original draft left undefined
    page_nav = []
    opengraph_info = None
    theme_variant = ''

    if view == 'replies':
        tweet = tweets[0]
        if tweet.id == '1608510741941989378':
            unreplied = [
                UnrepliedSection(
                    description = "Not clear what GS is still.",
                    span = (40, 80)
                )
            ]
            tweet = replace(tweet,
                unreplied = unreplied
            )

        expand_parts = request.args.get('expand')
        if expand_parts:
            expand_parts = expand_parts.split(',')

        def reply_to_thread_item (fi):
            nonlocal expand_parts

            if fi.id == '1609714342211244038':
                print(f'reply_to_thread_item id={fi.id}')
                unreplied = [
                    UnrepliedSection(
                        description = "Is there proof of this claim?",
                        span = (40, 80)
                    )
                ]
                fi = replace(fi,
                    unreplied = unreplied
                )

            children = None
            if expand_parts and len(expand_parts) and fi.id == expand_parts[0]:
                expand_parts = expand_parts[1:]

                print(f'getting expanded replies for tweet={fi.id}')

                expanded_replies_response = tweet_source.get_thread(fi.id,
                    only_replies=True,
                    return_dataclass=True)
                if expanded_replies_response.data:
                    print('we got expanded responses data')
                    children = list(map(lambda t: tweet_model_dc_vm(expanded_replies_response.includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), expanded_replies_response.data))
                    children = list(map(reply_to_thread_item, children))

            return ThreadItem(feed_item=fi, children=children)

        children = list(map(reply_to_thread_item, tweets[1:]))
        root = ThreadItem(
            feed_item = tweet,
            children = children
        )
        return render_template('tweet-thread.html', user=user, root=root, query=query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info)
    else:
        return render_template(f'tweet-collection{theme_variant}.html', user=user, tweets=tweets, query=query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info)

def get_following_users (user_id, me=None, max_results=1000, pagination_token=None):
    if me:
        twitter_user = session.get(me)
        token = twitter_user['access_token']
        auth_user_id = twitter_user['id']
    else:
        token = os.environ.get('BEARER_TOKEN')
        auth_user_id = None

    social_source = TwitterApiV2SocialGraph(token)
    following_resp = social_source.get_following(user_id,
        max_results=max_results, pagination_token=pagination_token, return_dataclass=True)

    cache_users_response(following_resp, 'following', auth_user_id, user_id=user_id, pagination_token=pagination_token)

    ts = int(time.time() * 1000)
    with open(f'{DATA_DIR}/cache/following_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
        f.write(json.dumps(cleandict(asdict(following_resp))))

    #print(following_resp)
    #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': following_resp})
    #following = list(map(lambda f: f['id'], following_resp.get('data')))

    following = list(map(user_model_dc, following_resp.data))
    total_count = following_resp.meta.get('result_count')
    next_token = following_resp.meta.get('next_token')

    collection_page = CollectionPage(
        id = user_id,
        items = following,
        total_count = total_count,
        next_token = next_token
    )
    return collection_page

def get_followers_user (user_id, me=None, max_results=1000, pagination_token=None):
    if me:
        twitter_user = session.get(me)
        token = twitter_user['access_token']
        auth_user_id = twitter_user['id']
    else:
        token = os.environ.get('BEARER_TOKEN')
        auth_user_id = None

    use_cache = False  # this concept is broken for now
    if use_cache:
        print(f'using cache for user {user_id}: {use_cache}')
        with open(f'.data/cache/followers_{user_id}_{pagination_token}_{use_cache}.json', 'rt') as f:
            response_json = json.load(f)
    else:
        social_source = TwitterApiV2SocialGraph(token)
        followers_resp = social_source.get_followers(user_id, max_results=max_results, pagination_token=pagination_token, return_dataclass=True)

        ts = int(time.time() * 1000)
        print(f'followers cache for {user_id}: {ts}')

        cache_users_response(followers_resp, 'followers', auth_user_id, user_id=user_id, pagination_token=pagination_token)

        with open(f'{DATA_DIR}/cache/followers_{user_id}_{ts}.json', 'wt') as f:
            json.dump(cleandict(asdict(followers_resp)), f, indent=2)

    #print(followers_resp)
    #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': followers_resp})
    #followers = list(map(lambda f: f['id'], followers_resp.get('data')))

    followers = list(map(user_model_dc, followers_resp.data))
    total_count = followers_resp.meta.get('result_count')
    next_token = followers_resp.meta.get('next_token')

    collection_page = CollectionPage(
        id = user_id,
        items = followers,
        total_count = total_count,
        next_token = next_token
    )
    return collection_page

def register_content_sources ():
    init_cache_db()

    register_content_source('twitter:tweets', get_tweets_collection, id_pattern='')
    register_content_source('twitter:tweet:', get_tweet_item, id_pattern=r'(?P<tweet_id>\d+)')
    register_content_source('twitter:tweet:', get_tweet_embed, id_pattern=r'(?P<tweet_id>\d+)')
    register_content_source('twitter:bookmarks:', get_bookmarks_feed, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:feed:user:', get_user_feed, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:user:', get_user, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:users', get_users, id_pattern='')
    register_content_source('twitter:feed:reverse_chronological:user:', get_home_feed, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:tweets:replies:', get_tweet_replies, id_pattern=r'(?P<conversation_id>\d+)')
    register_content_source('twitter:following:users:', get_following_users, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:followers:user:', get_followers_user, id_pattern=r'(?P<user_id>\d+)')
    register_content_source('twitter:threads:user:', get_author_threads, id_pattern=r'(?P<user_id>\d+)')
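
# Usage sketch (assuming the content system resolves a registered prefix plus an
# id matching its id_pattern; the exact URI composition is an assumption here):
#
#   register_content_sources()
#   user = get_content('twitter:user:12')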