123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326 |
- from dataclasses import replace
- from flask import g, request
- import sqlite3
- from twitter_v2.types import Tweet, TweetExpansions
- from hogumathi_app.view_model import FeedServiceUser, FeedItem, FeedItemAction, CollectionPage, PublicMetrics, Card, MediaItem
- from . import oauth2_login
- url_for = oauth2_login.url_for_with_me
- def user_model_dc (user, my_url_for=url_for):
-
- fsu = FeedServiceUser(
- id = user.id,
- name = user.name,
- username = user.username,
- created_at = user.created_at,
- description = user.description,
- preview_image_url = user.profile_image_url,
- website = user.url,
- is_verified = user.verified,
- is_protected = user.protected,
- location = user.location,
-
- url = my_url_for('twitter_v2_facade.get_profile_html', user_id=user.id),
- source_url = f'https://twitter.com/{user.username}',
-
- raw_user = user
- )
-
- return fsu
- def tweet_model_dc_vm (includes: TweetExpansions, tweet: Tweet, me, my_url_for=url_for, my_g=g, reply_depth=0, expand_path=None) -> FeedItem:
-
- # retweeted_by, avi_icon_url, display_name, handle, created_at, text
-
-
- user = list(filter(lambda u: u.id == tweet.author_id, includes.users))[0]
-
- published_by = user_model_dc(user, my_url_for=my_url_for)
-
- url = my_url_for('twitter_v2_facade.get_tweet_html', tweet_id=tweet.id, view='tweet')
- source_url = 'https://twitter.com/{}/status/{}'.format(user.username, tweet.id)
-
- avi_icon_url = my_url_for('get_image', url=user.profile_image_url)
-
- retweet_of = None
- quoted = None
- replied_to = None
-
- if tweet.referenced_tweets:
- retweet_of = list(filter(lambda r: r.type == 'retweeted', tweet.referenced_tweets))
- quoted = list(filter(lambda r: r.type == 'quoted', tweet.referenced_tweets))
- replied_to = list(filter(lambda r: r.type == 'replied_to', tweet.referenced_tweets))
-
- if reply_depth:
- if expand_path:
- expand_path += f',{tweet.id}'
- else:
- expand_path = tweet.id
-
- actions = {
- 'view_replies': FeedItemAction('twitter_v2_facade.get_tweet_html', {'tweet_id': tweet.conversation_id, 'view': 'replies', 'expand': expand_path}),
-
- 'view_thread': FeedItemAction('twitter_v2_facade.get_tweet_html', {'tweet_id': tweet.conversation_id, 'view': 'thread'}),
- 'view_conversation': FeedItemAction('twitter_v2_facade.get_tweet_html', {'tweet_id': tweet.conversation_id, 'view': 'conversation'}),
- }
-
- if reply_depth:
- vr = actions['view_replies']
- url = my_url_for(vr.route, **vr.route_params)
-
-
- is_bookmarked = None
- if me:
- cache_db = sqlite3.connect('.data/twitter_v2_cache.db')
- auth_user_id = me[len('twitter:'):]
- # this will cache deleted bookmarks. we need a next level abstraction over events / aggregate.
- is_bookmarked = cache_db.execute('select count(*) from tweet t, query q where q.rowid = t.query_id and q.query_type=? and t.id=? and q.auth_user_id=?', ['bookmarks', tweet.id, auth_user_id]).fetchone()[0] and True
- cache_db.close()
-
- if my_g.get('twitter_user'):
- actions.update(
- retweet = FeedItemAction('twitter_v2_facade.post_tweet_retweet', {'tweet_id': tweet.id})
- )
- if is_bookmarked:
- actions.update(
- delete_bookmark = FeedItemAction('twitter_v2_facade.delete_tweet_bookmark', {'tweet_id': tweet.id})
- )
- else:
- actions.update(
- bookmark = FeedItemAction('twitter_v2_facade.post_tweet_bookmark', {'tweet_id': tweet.id})
- )
-
- if my_g.get('twitter_live_enabled'):
- actions.update(
- view_activity = FeedItemAction('twitter_v2_live_facade.get_tweet_activity_html', {'tweet_id': tweet.id})
- )
-
-
- t = FeedItem(
- id = tweet.id,
- text = tweet.text,
- created_at = tweet.created_at,
- published_by = published_by,
- author_is_verified = user.verified,
- url = url,
-
- conversation_id = tweet.conversation_id,
-
- avi_icon_url = avi_icon_url,
-
- display_name = user.name,
- handle = user.username,
-
- author_url = my_url_for('twitter_v2_facade.get_profile_html', user_id=user.id),
- author_id = user.id,
-
- source_url = source_url,
- source_author_url = 'https://twitter.com/{}'.format(user.username),
- #'is_edited': len(tweet['edit_history_tweet_ids']) > 1
-
- actions = actions,
- is_bookmarked = is_bookmarked
- )
-
- if reply_depth:
- t = replace(t, reply_depth = reply_depth)
-
- # HACK we should not refer to the request directly...
- if request and request.args.get('marked_reply') == str(t.id):
- t = replace(t, is_marked = True)
-
- # This is where we should put "is_bookmark", "is_liked", "is_in_collection", etc...
-
- if tweet.entities:
- if tweet.entities.urls:
- urls = list(filter(lambda u: u.title and u.description, tweet.entities.urls))
-
- if len(urls):
- url = urls[0]
- card = Card(
- display_url = url.display_url.split('/')[0],
- source_url = url.unwound_url,
- content = url.description,
- title = url.title
- )
-
- if url.images:
- print(url.images)
- card = replace(card,
- preview_image_url = my_url_for('get_image', url=url.images[1].url),
- image_url = my_url_for('get_image', url=url.images[0].url)
- )
-
- t = replace(t, card = card)
-
- if tweet.public_metrics:
- public_metrics = PublicMetrics(
- reply_count = tweet.public_metrics.reply_count,
- quote_count = tweet.public_metrics.quote_count,
- retweet_count = tweet.public_metrics.retweet_count,
- like_count = tweet.public_metrics.like_count,
- impression_count = tweet.public_metrics.impression_count,
- bookmark_count = tweet.public_metrics.bookmark_count,
-
- )
-
- t = replace(t, public_metrics = public_metrics)
-
- if tweet.non_public_metrics:
- non_public_metrics = NonPublicMetrics(
- impression_count = tweet.non_public_metrics.impression_count,
- user_profile_clicks = tweet.non_public_metrics.user_profile_clicks,
- url_link_clicks = tweet.non_public_metrics.url_link_clicks
- )
-
- t = replace(t, non_public_metrics = non_public_metrics)
-
- if retweet_of and len(retweet_of):
- print('found retweet_of')
- t = replace(t, retweeted_tweet_id = retweet_of[0].id)
-
- retweeted_tweet:Tweet = list(filter(lambda t: t.id == retweet_of[0].id, includes.tweets))[0]
-
- rt = tweet_model_dc_vm(includes, retweeted_tweet, me)
-
- t = replace(rt,
- retweeted_tweet_id = retweet_of[0].id,
- source_retweeted_by_url = 'https://twitter.com/{}'.format(user.username),
- retweeted_by = user.name,
- retweeted_by_url = my_url_for('twitter_v2_facade.get_profile_html', user_id=user.id)
- )
-
-
- try:
- if tweet.attachments and tweet.attachments.media_keys and includes.media:
-
- media_keys = tweet.attachments.media_keys
-
- def first_media (mk):
- medias = list(filter(lambda m: m.media_key == mk, includes.media))
- if len(medias):
- return medias[0]
- return None
-
- media = list(filter(lambda m: m != None, map(first_media, media_keys)))
-
- photos = filter(lambda m: m.type == 'photo', media)
- videos = filter(lambda m: m.type == 'video', media)
-
- photo_media = map(lambda p: MediaItem(
- media_key = p.media_key,
- type = 'photo',
- preview_image_url = my_url_for('get_image', url=p.url + '?name=tiny&format=webp'),
- url = my_url_for('get_image', url=p.url),
- width = p.width,
- height = p.height
- ), photos)
-
- def video_to_mi (v):
- use_hls = False # mainly iOS
- max_bitrate = 100000000
-
- if use_hls:
- variants = list(filter(lambda var: var.content_type == 'application/x-mpegURL'))
- else:
-
- variants = list(filter(lambda var: var.content_type != 'application/x-mpegURL' and var.bit_rate <= max_bitrate, v.variants))
-
- variants.sort(key=lambda v: v.bit_rate, reverse=True)
-
- url = None
- content_type = None
- size = None
- if len(variants):
- if len(variants) > 1:
- print('multiple qualifying variants (using first):')
- print(variants)
- variant = variants[0]
-
- url = my_url_for('get_image', url=variant.url)
- content_type = variant.content_type
- size = int(v.duration_ms / 1000 * variant.bit_rate)
-
- public_metrics = None
- if v.public_metrics and v.public_metrics.view_count:
- public_metrics = PublicMetrics(
- view_count = v.public_metrics.view_count
- )
-
- mi = MediaItem(
- media_key = v.media_key,
- type = 'video',
- preview_image_url = my_url_for('get_image', url=v.preview_image_url + '?name=tiny&format=webp'),
- image_url = my_url_for('get_image', url=v.preview_image_url),
- width = v.width,
- height = v.height,
- url=url,
- content_type = content_type,
- duration_ms = v.duration_ms,
- size = size,
- public_metrics = public_metrics
- )
-
- return mi
-
- video_media = map(video_to_mi, videos)
-
-
-
- t = replace(t,
- photos = list(photo_media),
- videos = list(video_media)
- )
-
- elif tweet.attachments and tweet.attachments.media_keys and not includes.media:
- print('tweet had attachments and media keys, but no expansion media content was given')
- print(tweet.attachments.media_keys)
-
- except:
- # it seems like this comes when we have a retweeted tweet with media on it.
- print('exception adding attachments to tweet:')
- print(tweet)
- print('view tweet:')
- print(t)
- print('included media:')
- print(includes.media)
-
- raise 'exception adding attachments to tweet'
-
-
-
- try:
- if quoted and len(quoted):
- t = replace(t, quoted_tweet_id = quoted[0].id)
-
- quoted_tweets = list(filter(lambda t: t.id == quoted[0].id, includes.tweets))
-
- if len(quoted_tweets):
- t = replace(t, quoted_tweet = tweet_model_dc_vm(includes, quoted_tweets[0], me))
- except:
- raise 'error adding quoted tweet'
-
- try:
- if replied_to and len(replied_to) and includes.tweets:
- t = replace(t, replied_tweet_id = replied_to[0].id)
-
- if reply_depth < 1:
-
- replied_tweets = list(filter(lambda t: t.id == replied_to[0].id, includes.tweets))
- if len(replied_tweets):
- t = replace(t, replied_tweet = tweet_model_dc_vm(includes, replied_tweets[0], me, reply_depth=reply_depth + 1))
- else:
- print("No replied tweet found (t={}, rep={})".format(t.id, t.replied_tweet_id))
- except:
- raise 'error adding replied_to tweet'
-
- return t
|