from dataclasses import replace from flask import g, request import sqlite3 from twitter_v2.types import Tweet, TweetExpansions from hogumathi_app.view_model import FeedServiceUser, FeedItem, FeedItemAction, CollectionPage, PublicMetrics, Card, MediaItem from . import oauth2_login url_for = oauth2_login.url_for_with_me def user_model_dc (user, my_url_for=url_for): fsu = FeedServiceUser( id = user.id, name = user.name, username = user.username, created_at = user.created_at, description = user.description, preview_image_url = user.profile_image_url, website = user.url, is_verified = user.verified, is_protected = user.protected, location = user.location, url = my_url_for('twitter_v2_facade.get_profile_html', user_id=user.id), source_url = f'https://twitter.com/{user.username}', raw_user = user ) return fsu def tweet_model_dc_vm (includes: TweetExpansions, tweet: Tweet, me, my_url_for=url_for, my_g=g, reply_depth=0, expand_path=None) -> FeedItem: # retweeted_by, avi_icon_url, display_name, handle, created_at, text user = list(filter(lambda u: u.id == tweet.author_id, includes.users))[0] published_by = user_model_dc(user, my_url_for=my_url_for) url = my_url_for('twitter_v2_facade.get_tweet_html', tweet_id=tweet.id, view='tweet') source_url = 'https://twitter.com/{}/status/{}'.format(user.username, tweet.id) avi_icon_url = my_url_for('get_image', url=user.profile_image_url) retweet_of = None quoted = None replied_to = None if tweet.referenced_tweets: retweet_of = list(filter(lambda r: r.type == 'retweeted', tweet.referenced_tweets)) quoted = list(filter(lambda r: r.type == 'quoted', tweet.referenced_tweets)) replied_to = list(filter(lambda r: r.type == 'replied_to', tweet.referenced_tweets)) if reply_depth: if expand_path: expand_path += f',{tweet.id}' else: expand_path = tweet.id actions = { 'view_replies': FeedItemAction('twitter_v2_facade.get_tweet_html', {'tweet_id': tweet.conversation_id, 'view': 'replies', 'expand': expand_path}), 'view_thread': FeedItemAction('twitter_v2_facade.get_tweet_html', {'tweet_id': tweet.conversation_id, 'view': 'thread'}), 'view_conversation': FeedItemAction('twitter_v2_facade.get_tweet_html', {'tweet_id': tweet.conversation_id, 'view': 'conversation'}), } if reply_depth: vr = actions['view_replies'] url = my_url_for(vr.route, **vr.route_params) is_bookmarked = None if me: cache_db = sqlite3.connect('.data/twitter_v2_cache.db') auth_user_id = me[len('twitter:'):] # this will cache deleted bookmarks. we need a next level abstraction over events / aggregate. is_bookmarked = cache_db.execute('select count(*) from tweet t, query q where q.rowid = t.query_id and q.query_type=? and t.id=? and q.auth_user_id=?', ['bookmarks', tweet.id, auth_user_id]).fetchone()[0] and True cache_db.close() if my_g.get('twitter_user'): actions.update( retweet = FeedItemAction('twitter_v2_facade.post_tweet_retweet', {'tweet_id': tweet.id}) ) if is_bookmarked: actions.update( delete_bookmark = FeedItemAction('twitter_v2_facade.delete_tweet_bookmark', {'tweet_id': tweet.id}) ) else: actions.update( bookmark = FeedItemAction('twitter_v2_facade.post_tweet_bookmark', {'tweet_id': tweet.id}) ) if my_g.get('twitter_live_enabled'): actions.update( view_activity = FeedItemAction('twitter_v2_live_facade.get_tweet_activity_html', {'tweet_id': tweet.id}) ) t = FeedItem( id = tweet.id, text = tweet.text, created_at = tweet.created_at, published_by = published_by, author_is_verified = user.verified, url = url, conversation_id = tweet.conversation_id, avi_icon_url = avi_icon_url, display_name = user.name, handle = user.username, author_url = my_url_for('twitter_v2_facade.get_profile_html', user_id=user.id), author_id = user.id, source_url = source_url, source_author_url = 'https://twitter.com/{}'.format(user.username), #'is_edited': len(tweet['edit_history_tweet_ids']) > 1 actions = actions, is_bookmarked = is_bookmarked ) if reply_depth: t = replace(t, reply_depth = reply_depth) # HACK we should not refer to the request directly... if request and request.args.get('marked_reply') == str(t.id): t = replace(t, is_marked = True) # This is where we should put "is_bookmark", "is_liked", "is_in_collection", etc... if tweet.entities: if tweet.entities.urls: urls = list(filter(lambda u: u.title and u.description, tweet.entities.urls)) if len(urls): url = urls[0] card = Card( display_url = url.display_url.split('/')[0], source_url = url.unwound_url, content = url.description, title = url.title ) if url.images: print(url.images) card = replace(card, preview_image_url = my_url_for('get_image', url=url.images[1].url), image_url = my_url_for('get_image', url=url.images[0].url) ) t = replace(t, card = card) if tweet.public_metrics: public_metrics = PublicMetrics( reply_count = tweet.public_metrics.reply_count, quote_count = tweet.public_metrics.quote_count, retweet_count = tweet.public_metrics.retweet_count, like_count = tweet.public_metrics.like_count, impression_count = tweet.public_metrics.impression_count, bookmark_count = tweet.public_metrics.bookmark_count, ) t = replace(t, public_metrics = public_metrics) if tweet.non_public_metrics: non_public_metrics = NonPublicMetrics( impression_count = tweet.non_public_metrics.impression_count, user_profile_clicks = tweet.non_public_metrics.user_profile_clicks, url_link_clicks = tweet.non_public_metrics.url_link_clicks ) t = replace(t, non_public_metrics = non_public_metrics) if retweet_of and len(retweet_of): print('found retweet_of') t = replace(t, retweeted_tweet_id = retweet_of[0].id) retweeted_tweet:Tweet = list(filter(lambda t: t.id == retweet_of[0].id, includes.tweets))[0] rt = tweet_model_dc_vm(includes, retweeted_tweet, me) t = replace(rt, retweeted_tweet_id = retweet_of[0].id, source_retweeted_by_url = 'https://twitter.com/{}'.format(user.username), retweeted_by = user.name, retweeted_by_url = my_url_for('twitter_v2_facade.get_profile_html', user_id=user.id) ) try: if tweet.attachments and tweet.attachments.media_keys and includes.media: media_keys = tweet.attachments.media_keys def first_media (mk): medias = list(filter(lambda m: m.media_key == mk, includes.media)) if len(medias): return medias[0] return None media = list(filter(lambda m: m != None, map(first_media, media_keys))) photos = filter(lambda m: m.type == 'photo', media) videos = filter(lambda m: m.type == 'video', media) photo_media = map(lambda p: MediaItem( media_key = p.media_key, type = 'photo', preview_image_url = my_url_for('get_image', url=p.url + '?name=tiny&format=webp'), url = my_url_for('get_image', url=p.url), width = p.width, height = p.height ), photos) def video_to_mi (v): use_hls = False # mainly iOS max_bitrate = 100000000 if use_hls: variants = list(filter(lambda var: var.content_type == 'application/x-mpegURL')) else: variants = list(filter(lambda var: var.content_type != 'application/x-mpegURL' and var.bit_rate <= max_bitrate, v.variants)) variants.sort(key=lambda v: v.bit_rate, reverse=True) url = None content_type = None size = None if len(variants): if len(variants) > 1: print('multiple qualifying variants (using first):') print(variants) variant = variants[0] url = my_url_for('get_image', url=variant.url) content_type = variant.content_type size = int(v.duration_ms / 1000 * variant.bit_rate) public_metrics = None if v.public_metrics and v.public_metrics.view_count: public_metrics = PublicMetrics( view_count = v.public_metrics.view_count ) mi = MediaItem( media_key = v.media_key, type = 'video', preview_image_url = my_url_for('get_image', url=v.preview_image_url + '?name=tiny&format=webp'), image_url = my_url_for('get_image', url=v.preview_image_url), width = v.width, height = v.height, url=url, content_type = content_type, duration_ms = v.duration_ms, size = size, public_metrics = public_metrics ) return mi video_media = map(video_to_mi, videos) t = replace(t, photos = list(photo_media), videos = list(video_media) ) elif tweet.attachments and tweet.attachments.media_keys and not includes.media: print('tweet had attachments and media keys, but no expansion media content was given') print(tweet.attachments.media_keys) except: # it seems like this comes when we have a retweeted tweet with media on it. print('exception adding attachments to tweet:') print(tweet) print('view tweet:') print(t) print('included media:') print(includes.media) raise 'exception adding attachments to tweet' try: if quoted and len(quoted): t = replace(t, quoted_tweet_id = quoted[0].id) quoted_tweets = list(filter(lambda t: t.id == quoted[0].id, includes.tweets)) if len(quoted_tweets): t = replace(t, quoted_tweet = tweet_model_dc_vm(includes, quoted_tweets[0], me)) except: raise 'error adding quoted tweet' try: if replied_to and len(replied_to) and includes.tweets: t = replace(t, replied_tweet_id = replied_to[0].id) if reply_depth < 1: replied_tweets = list(filter(lambda t: t.id == replied_to[0].id, includes.tweets)) if len(replied_tweets): t = replace(t, replied_tweet = tweet_model_dc_vm(includes, replied_tweets[0], me, reply_depth=reply_depth + 1)) else: print("No replied tweet found (t={}, rep={})".format(t.id, t.replied_tweet_id)) except: raise 'error adding replied_to tweet' return t