1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105 |
- from typing import List
- from dataclasses import asdict, replace
- from dacite import from_dict
- from importlib.util import find_spec
- from configparser import ConfigParser
- import base64
- import sqlite3
- import os
- import json
- import json_stream
- from zipfile import ZipFile
- import itertools
- import time
- from io import BufferedReader
- import re
- import datetime
- import dateutil
- import dateutil.parser
- import dateutil.tz
- import requests
- from werkzeug.utils import secure_filename
- from flask import json, Response, render_template, request, send_from_directory, Blueprint, session, redirect, g, current_app, jsonify
- from flask_cors import CORS
- from twitter_v2.api import ApiV2TweetSource, TwitterApiV2SocialGraph, ApiV2ConversationSource
- from twitter_v2.types import Tweet, TweetExpansions
- from hogumathi_app.view_model import FeedItem, FeedServiceUser, ThreadItem, FeedItemAction, MediaItem, Card, PublicMetrics, NonPublicMetrics, UnrepliedSection, CollectionPage, cleandict
- from .view_model import user_model_dc, tweet_model_dc_vm
- from . import content_source
- from . import oauth2_login
- theme_variant = ''
- if find_spec('theme_bootstrap5'): # FIXME use g.
- theme_variant = '-bs'
- DATA_DIR='.data'
- twitter_app = Blueprint('twitter_v2_facade', 'twitter_v2_facade',
- static_folder='static',
- static_url_path='',
- url_prefix='/')
-
- twitter_app.register_blueprint(oauth2_login.oauth2_login, url_prefix="/")
- twitter_app.context_processor(oauth2_login.inject_me)
- twitter_app.before_request(oauth2_login.add_me)
- url_for = oauth2_login.url_for_with_me
- def run_script(script_name, script_vars):
- script_path = './{}.py'.format(script_name)
- if (os.path.exists(script_path)):
- script_file = open(script_path, 'r')
- script = script_file.read()
- script_file.close()
- try:
- return exec(script, script_vars)
- except:
- print('error running script: {}'.format(script_name))
- return False
- False
- class ActivityData:
- def __init__ (self, user_id, db_path):
-
- self.db_path = db_path
- self.user_id = user_id
-
- db_exists = os.path.exists(db_path)
-
- self.db = sqlite3.connect(db_path)
-
- if not db_exists:
- self.init_db()
-
- return
-
- def init_db (self):
-
- self.db.execute('create table seen_user (ts, user_id)')
- self.db.execute('create table seen_tweet (ts, tweet_id)')
-
- return
-
- def seen_tweet (self, tweet_id):
- return
-
- def seen_user (self, user_id):
- return
-
- def add_tweet_counts (self, user_id, start, end, tweet_count):
- return [current_ts, user_id, start, end, tweet_count]
-
- def add_tweet_public_metrics (self, tweet_id, like_count, reply_count, retweet_count, quote_count):
- return
-
- def add_tweet_non_public_metrics (self, tweet_id, impression_count, click_count, link_click_count, profile_click_count):
- return
-
- def add_user_public_metrics (self, user_id, followers_count, following_count, tweet_count, listed_count):
- return
- class DataSet:
- def __init__ (self):
- self.items = {}
- return
-
- def update_items (self, items):
- """
- merges objects by ID. Asssigns an ID if none exists. Mutates OG object.
- """
-
- ids = []
-
- for item in items:
- if not 'id' in item:
- #item = dict(item)
- item['id'] = uuid.uuid4().hex
- else:
- existing_item = self.items.get( item['id'] )
- if existing_item:
- existing_item.update(item)
- item = existing_item
- self.items[ item['id'] ] = item
- ids.append( item['id'] )
-
- return ids
-
- def get_items (self):
- return self.items.values()
- class TwitterMetadata:
- def __init__ (self, data_dir):
- self.data_dir = data_dir
-
- os.mkdir(data_dir, exist_ok=True)
-
- def get_tweet (self, tweet_id):
- path = f'{self.data_dir}/tweet_{tweet_id}.json'
-
- if not os.path.exists(path):
- return None
-
- with open(path, 'rt') as f:
- return json.loads(f.read())
-
-
- def update_tweet (self, tweet_id, fields):
- tweet = self.get_tweet(tweet_id)
-
- if not tweet:
- tweet = {'id': tweet_id}
-
- tweet.update(fields)
-
- with open(f'{self.data_dir}/tweet_{tweet_id}.json', 'wt') as f:
- f.write(json.dumps(tweet))
-
- return tweet
- #twitter_meta = TwitterMetadata('./data/meta')
- @twitter_app.route('/tweets', methods=['POST'])
- def post_tweets_create ():
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- text = request.form.get('text')
- reply_to_tweet_id = request.form.get('reply_to_tweet_id')
- quote_tweet_id = request.form.get('quote_tweet_id')
-
- tweet_source = ApiV2TweetSource(token)
- result = tweet_source.create_tweet(text, reply_to_tweet_id=reply_to_tweet_id, quote_tweet_id=quote_tweet_id)
-
- print(result)
-
- run_script('on_tweeted', {'twitter_user': g.twitter_user, 'tweet': result})
-
- if 'HX-Request' in request.headers:
- return render_template('partial/compose-form.html', new_tweet_id=result['data']['id'])
- else:
- response_body = json.dumps({
- 'result': result
- })
- return jsonify(response_body)
- @twitter_app.route('/tweet/<tweet_id>/retweet', methods=['POST'])
- def post_tweet_retweet (tweet_id):
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- tweet_source = ApiV2TweetSource(token)
- result = tweet_source.retweet(tweet_id, user_id=user_id)
-
- print(result)
-
- run_script('on_tweeted', {'twitter_user': g.twitter_user, 'retweet': result})
-
- if 'HX-Request' in request.headers:
- return """retweeted <script>Toast.fire({
- icon: 'success',
- title: 'Retweet was sent; <a style="text-align: right" href="{}">View</a>.'
- });</script>""".replace('{}', url_for('.get_tweet_html', tweet_id=tweet_id))
- else:
- response_body = json.dumps({
- 'result': result
- })
- return jsonify(response_body)
- @twitter_app.route('/tweet/<tweet_id>/bookmark', methods=['POST'])
- def post_tweet_bookmark (tweet_id):
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- tweet_source = ApiV2TweetSource(token)
- result = tweet_source.bookmark(tweet_id, user_id=user_id)
-
- print(result)
- if 'HX-Request' in request.headers:
- return """bookmarked <script>Toast.fire({
- icon: 'success',
- title: 'Tweet was bookmarked; <a style="text-align: right" href="{}">View</a>.'
- });</script>""".replace('{}', url_for('.get_tweet_html', tweet_id=tweet_id))
- else:
- response_body = json.dumps({
- 'result': result
- })
- return jsonify(response_body)
- @twitter_app.route('/tweet/<tweet_id>/bookmark', methods=['DELETE'])
- def delete_tweet_bookmark (tweet_id):
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- tweet_source = ApiV2TweetSource(token)
- result = tweet_source.delete_bookmark(tweet_id, user_id=user_id)
-
- response_body = json.dumps({
- 'result': result
- })
- return jsonify(response_body)
- @twitter_app.route('/tweet/<tweet_id>.html', methods=['GET'])
- def get_tweet_html (tweet_id):
-
- pagination_token = request.args.get('pagination_token')
- view = request.args.get('view', 'replies')
-
-
- if g.twitter_user:
- token = g.twitter_user['access_token']
- else:
- token = os.environ.get('BEARER_TOKEN')
-
-
- tweet_source = ApiV2TweetSource(token)
-
- only_replies = view == 'replies'
-
-
- tweets = []
- if not pagination_token:
- tweets_response = tweet_source.get_tweet(tweet_id, return_dataclass=True)
-
- tweet = tweets_response.data[0]
-
- tweets.append(tweet_model_dc_vm(tweets_response.includes, tweet, g.me))
-
- skip_embed_replies = False
-
- if view == 'replies':
- replies_response = tweet_source.get_thread(tweet_id,
- only_replies=True,
- pagination_token = pagination_token,
- return_dataclass=True)
- elif view == 'thread':
- skip_embed_replies = True
- replies_response = tweet_source.get_thread(tweet_id,
- only_replies=False,
- author_id=tweets[0].author_id,
- pagination_token = pagination_token,
- return_dataclass=True)
-
- elif view == 'conversation':
- replies_response = tweet_source.get_thread(tweet_id,
- only_replies=False,
- pagination_token = pagination_token,
- return_dataclass=True)
- elif view == 'tweet':
- replies_response = None
-
- next_token = None
-
- #print("conversation meta:")
- #print(json.dumps(tweets_response.get('meta'), indent=2))
-
- if replies_response and replies_response.meta and replies_response.meta.result_count:
-
- includes = replies_response.includes
- tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), replies_response.data)) + tweets
-
- next_token = replies_response.meta.next_token
-
- # this method is OK except it doesn't work if there are no replies.
- #tweets.append(tweet_model(includes, list(filter(lambda t: t['id'] == tweet_id, includes.get('tweets')))[0], me))
-
- #related_tweets = [] # derived from includes
-
- tweets.reverse()
-
-
-
- query = {}
-
- if next_token:
- query = {
- **query,
- # FIXME only_replies
- 'next_data_url': url_for('.get_tweet_html', tweet_id=tweet_id, pagination_token=next_token, only_replies = '1' if only_replies else '0', author_id = tweets[0].author_id),
- 'next_page_url': url_for('.get_tweet_html', tweet_id=tweet_id, view=view, pagination_token=next_token)
- }
-
- user = {
- }
-
- if 'HX-Request' in request.headers:
-
- # console.log(res.tweets.map(t => t.text).join("\n\n-\n\n"))
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- page_nav = [
- dict(
- href=url_for('.get_tweet_html', tweet_id=tweets[0].conversation_id, view='thread'),
- label = 'author thread',
- order = 10
- ),
- dict(
- href = url_for('.get_tweet_html', tweet_id=tweets[0].conversation_id, view='conversation'),
- label = 'full convo',
- order = 20
- )
- ]
-
- tweet = tweets_response.data[0]
- user = list(filter(lambda u: u.id == tweet.author_id, tweets_response.includes.users))[0]
-
- source_url = f'https://twitter.com/{user.username}/status/{tweet_id}'
- title = f'Tweet by {user.name} at {tweet.created_at}'
-
- opengraph_info = dict(
- type = 'webpage', # threads might be article
- url = source_url,
- title = title,
- description = tweet.text,
- image = user.profile_image_url
- )
-
-
- if view == 'replies':
- tweet = tweets[0]
-
- if tweet.id == '1608510741941989378':
- unreplied = [
- UnrepliedSection(
- description = "Not clear what GS is still.",
- span = (40, 80)
- )
- ]
- tweet = replace(tweet,
- unreplied = unreplied
- )
-
- expand_parts = request.args.get('expand')
- if expand_parts:
- expand_parts = expand_parts.split(',')
-
- def reply_to_thread_item (fi):
- nonlocal expand_parts
-
- if fi.id == '1609714342211244038':
- print(f'reply_to_thread_item id={fi.id}')
- unreplied = [
- UnrepliedSection(
- description = "Is there proof of this claim?",
- span = (40, 80)
- )
- ]
- fi = replace(fi,
- unreplied = unreplied
- )
-
- children = None
-
- if expand_parts and len(expand_parts) and fi.id == expand_parts[0]:
- expand_parts = expand_parts[1:]
-
- print(f'getting expanded replied for tweet={fi.id}')
-
- expanded_replies_response = tweet_source.get_thread(fi.id,
- only_replies=True,
- return_dataclass=True)
- if expanded_replies_response.data:
- print('we got expanded responses data')
-
- children = list(map(lambda t: tweet_model_dc_vm(expanded_replies_response.includes, t, g.me, expand_path=request.args.get('expand'), reply_depth=1), expanded_replies_response.data))
- children = list(map(reply_to_thread_item, children))
-
-
- return ThreadItem(feed_item=fi, children=children)
-
- children = list(map(reply_to_thread_item, tweets[1:]))
-
- root = ThreadItem(
- feed_item = tweet,
- children = children
- )
- return render_template('tweet-thread.html', user = user, root = root, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info)
- else:
- return render_template(f'tweet-collection{theme_variant}.html', user = user, tweets = tweets, query = query, page_nav=page_nav, skip_embed_replies=skip_embed_replies, opengraph_info=opengraph_info)
- @twitter_app.route('/tweet2/<tweet_id>.html', methods=['GET'])
- def get_tweet2_html (tweet_id):
-
- me = g.me
- use_embed = int(request.args.get('embed', 0))
- pagination_token = request.args.get('pagination_token')
- view = request.args.get('view', 'replies')
-
-
- if g.twitter_user:
- token = g.twitter_user['access_token']
- else:
- token = os.environ.get('BEARER_TOKEN')
-
-
- tweets = []
- if not pagination_token:
- if use_embed:
- tweet = get_content(f'twitter:tweet:{tweet_id}', content_source_id='twitter_v2_facade.content_source:get_tweet_embed')
- tweets.append(tweet)
- else:
- tweet_page = get_content(f'twitter:tweet:{tweet_id}', me=me)
- tweets.append(tweet_page.items[0])
-
-
- return render_template(f'tweet-collection{theme_variant}.html', user = {}, tweets = tweets, query = {})
- @twitter_app.route('/followers/<user_id>.html', methods=['GET'])
- def get_followers_html (user_id):
-
- me = g.me
-
- content_params = cleandict({
- 'max_results': int(request.args.get('max_results', 1000)),
- 'pagination_token': request.args.get('pagination_token')
- })
-
- followers_page = get_content(f'twitter:followers:user:{user_id}', me=me, **content_params)
-
- followers = followers_page.items
-
- content_params['pagination_token'] = followers_page.next_token
-
- query = {
- 'next_data_url': url_for('.get_followers_html', me=me, user_id=user_id, **content_params)
- }
-
- if 'HX-Request' in request.headers:
- return render_template('partial/users-list.html', users=followers, query=query)
- else:
- return render_template('followers.html', users=followers, query=query)
-
- @twitter_app.route('/following/<user_id>.html', methods=['GET'])
- def get_following_html (user_id):
-
- me = g.me
-
- content_params = cleandict({
- 'max_results': int(request.args.get('max_results', 1000)),
- 'pagination_token': request.args.get('pagination_token')
- })
-
- following_page = get_content(f'twitter:following:users:{user_id}', me=me, **content_params)
-
- following = following_page.items
-
- content_params['pagination_token'] = following_page.next_token
-
- query = {
- 'next_data_url': url_for('.get_following_html', me=me, user_id=user_id, **content_params)
- }
-
- if 'HX-Request' in request.headers:
- return render_template('partial/users-list.html', users=following, query=query)
- else:
- return render_template('following.html', users=following, query=query)
-
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- # HTMx partials
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- def tweet_paginated_timeline ():
- return
- @twitter_app.route('/data/tweets/user/<user_id>/media', methods=['GET'])
- def get_data_tweets_media (user_id):
- """
- Not used anywhere... trying an idea. tweet_model needs to be updated.
- """
- token = g.twitter_user['access_token']
-
- pagination_token = request.args.get('pagination_token')
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_media_tweets(author_id=user_id,
- has_images=True,
- is_reply=False,
- is_retweet=False,
- pagination_token = pagination_token)
-
-
- includes = response_json.get('includes')
- tweets = list(map(lambda t: tweet_model(includes, t, g.me), response_json['data']))
- next_token = response_json.get('meta').get('next_token')
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_data_tweets_media', user_id=user_id, pagination_token=next_token)
- }
-
- if 'HX-Request' in request.headers:
- user = {
- 'id': user_id
- }
-
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query)
- else:
- response_body = json.dumps({
- 'data': tweets,
- 'query': query
- })
- return jsonify(response_body)
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- # HTMx views
- # ---------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------------------------------------------------------------
- @twitter_app.route('/latest.html', methods=['GET'])
- def get_timeline_home_html (variant = "reverse_chronological", pagination_token=None):
-
- if not g.twitter_user:
- return 'need to login. go to /login.html', 403
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- if not pagination_token:
- pagination_token = request.args.get('pagination_token')
-
- output_format = request.args.get('format', 'html')
-
- tq = cleandict({
- 'pagination_token': pagination_token,
- 'since_id': request.args.get('since_id'),
- 'until_id': request.args.get('until_id'),
- 'end_time': request.args.get('end_time'),
- 'start_time': request.args.get('start_time')
- })
-
- timeline_page = get_content(f'twitter:feed:reverse_chronological:user:{user_id}', me=g.me, **tq)
-
- next_token = timeline_page.next_token
- tweets = timeline_page.items
-
- tq['pagination_token'] = next_token
-
- query = {
- **tq,
- 'format': output_format,
- 'me': g.me
- }
-
- if next_token:
- query = {
- **query,
-
- #'next_data_url': url_for('.get_data_timeline_home', variant=variant, pagination_token=next_token),
- 'next_data_url': url_for('.get_timeline_home_html', **tq),
- 'next_page_url': url_for('.get_timeline_home_html', **tq)
- }
-
- user = {
- 'id': user_id
- }
-
-
- if output_format == 'feed.json':
- return jsonify(cleandict({
- 'data': tweets,
- 'query': query
- }))
- elif 'HX-Request' in request.headers:
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query, show_thread_controls=True)
- else:
- return render_template('tweet-collection.html', user = user, tweets = tweets, query = query, show_thread_controls=True)
- @twitter_app.route('/bookmarks.html', methods=['GET'])
- def get_bookmarks2_html ():
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- pagination_token = request.args.get('pagination_token')
- max_results = int(request.args.get('limit', 10))
-
- collection_page = get_content(f'twitter:bookmarks:{user_id}', pagination_token=pagination_token, max_results=max_results)
- tweets = collection_page.items
-
- next_token = collection_page.next_token
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_bookmarks2_html', user_id=user_id, pagination_token=next_token, limit=max_results),
- 'next_page_url': url_for('.get_bookmarks2_html', user_id=user_id, pagination_token=next_token, limit=max_results)
- }
-
- user = {
- 'id': user_id
- }
-
-
- if 'HX-Request' in request.headers:
- return render_template(f'partial/tweets-timeline{theme_variant}.html', user = user, tweets = tweets, query = query)
- else:
- return render_template(f'tweet-collection{theme_variant}.html', user = user, tweets = tweets, query = query)
-
- #@twitter_app.route('/bookmarks.html', methods=['GET'])
- def get_bookmarks_old_html ():
-
- user_id = g.twitter_user['id']
- token = g.twitter_user['access_token']
-
- pagination_token = request.args.get('pagination_token')
- max_results = int(request.args.get('limit', 10))
-
- tweet_source = ApiV2TweetSource(token)
- response_json = tweet_source.get_bookmarks(user_id,
- pagination_token = pagination_token, return_dataclass=True,
- max_results=max_results
- )
-
- #print(response_json)
-
- includes = response_json.includes
- tweets = list(map(lambda t: tweet_model_dc_vm(includes, t, g.me), response_json.data))
- next_token = response_json.meta.next_token
-
- query = {}
-
- if next_token:
- query = {
- **query,
-
- 'next_data_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=next_token, limit=max_results),
- 'next_page_url': url_for('.get_bookmarks_html', user_id=user_id, pagination_token=next_token, limit=max_results)
- }
-
- user = {
- 'id': user_id
- }
-
- ts = int(time.time() * 1000)
- with open(f'{DATA_DIR}/cache/bookmarks_{user_id}_{ts}_{pagination_token}.json', 'wt') as f:
- f.write(json.dumps(response_json))
-
-
- if 'HX-Request' in request.headers:
- return render_template(f'partial/tweets-timeline{theme_variant}.html', user = user, tweets = tweets, query = query)
- else:
- return render_template(f'tweet-collection{theme_variant}.html', user = user, tweets = tweets, query = query)
- from hogumathi_app.content_system import get_content
- @twitter_app.route('/profile/<user_id>/threads.html', methods=['GET'])
- def get_threads_html (user_id):
- category = request.args.get('category')
-
- collection = get_content(f'twitter:threads:user:{user_id}')
-
- print(collection)
-
- return 'ok'
- @twitter_app.route('/profile/<user_id>.html', methods=['GET'])
- def get_profile_html (user_id):
- me = g.get('me')
-
- if g.twitter_user:
- token = g.twitter_user['access_token']
- # issue: retweets don't come back if we request non_public_metrics
- is_me = False and user_id == g.twitter_user['id']
- else:
- token = os.environ.get('BEARER_TOKEN')
- is_me = False
-
- output_format = request.args.get('format', 'html')
-
- pagination_token = request.args.get('pagination_token')
- exclude_replies = int(request.args.get('exclude_replies', 0))
- exclude_retweets = int(request.args.get('exclude_retweets', 0))
- max_results = int(request.args.get('limit', 10))
- since_id = request.args.get('since_id')
- until_id = request.args.get('until_id')
- start_time = request.args.get('start_time')
- end_time = request.args.get('end_time')
-
- query = cleandict({
- 'pagination_token': pagination_token,
- 'exclude_replies': exclude_replies,
- 'exclude_retweets': exclude_retweets,
- 'max_results': max_results,
- 'since_id': since_id,
- 'until_id': until_id,
- 'start_time': start_time,
- 'end_time': end_time
- })
-
- collection_page = get_content(f'twitter:feed:user:{user_id}', me=me, **query)
-
- tweets = collection_page.items
- next_token = collection_page.next_token
-
- # FIXME janky
- query['pagination_token'] = next_token
-
- if next_token:
- query = {
- **query,
-
- 'format': output_format,
-
- 'next_data_url': url_for('.get_profile_html', user_id=user_id, **query),
- 'next_page_url': url_for('.get_profile_html', user_id=user_id , **query)
- }
-
-
-
- if output_format == 'feed.json':
- return jsonify(cleandict({
- 'data': tweets,
- 'query': query
- }))
- elif 'HX-Request' in request.headers:
- profile_user = {
- 'id': user_id
- }
- return render_template(f'partial/tweets-timeline{theme_variant}.html', user = profile_user, tweets = tweets, query = query)
- else:
- # FIXME the user is probably present in the tweet expansions info.
-
- #social_graph = TwitterApiV2SocialGraph(token)
- #users_response = social_graph.get_user(user_id)
-
- #print(users_response)
-
- #user = users_response['data'][0]
-
- user = get_content(f'twitter:user:{user_id}', me=me)
-
- title = f'{user.name} ({user.username})'
-
- # FIXME official Twitter or owner's instance?
- source_url = f'https://www.twitter.com/{user.username}'
-
- opengraph_info = dict(
- type = 'webpage', # threads might be article
- url = source_url,
- title = title,
- description = user.description,
- image = user.avatar_image_url
- )
- page_nav = [
- dict(
- href = url_for('twitter_v2_facade.get_profile_html', user_id=user.id),
- label = 'Timeline',
- order = 10,
- ),
- dict (
- href = url_for('twitter_v2_facade.get_following_html', user_id=user.id),
- label = 'Following',
- order = 40,
- ),
- dict (
- href = url_for('twitter_v2_facade.get_followers_html', user_id=user.id),
- label = 'Followers',
- order = 50,
- ),
- dict (
- href = url_for('twitter_v2_facade.get_threads_html', user_id=user.id),
- label = 'Threads',
- order = 55,
- )
- ]
-
- if not g.twitter_user:
- for uid, acct in session.items():
- if uid.startswith('twitter:'):
- page_nav += [
- dict(
- href = url_for('twitter_v2_facade.get_profile_html', user_id=user_id, me=uid, **query),
- label = f'View as {acct["id"]}',
- order = 1000,
- )
- ]
- if g.twitter_live_enabled:
- page_nav += [
- dict(
- href = url_for('twitter_v2_live_facade.get_likes_html', user_id=user.id),
- label = 'Likes',
- order = 20,
- ),
- dict (
- href = url_for('twitter_v2_live_facade.get_mentions_html', user_id=user.id),
- label = 'Mentions',
- order = 30,
- ),
- dict (
- href = url_for('twitter_v2_live_facade.get_user_activity_html', user_id=user.id),
- label = 'Activity',
- order = 60,
- )
- ]
-
- top8 = get_top8(user_id)
-
- brand = None
- brand_info = {}
- if g.twitter_live_enabled:
- brand = get_content(f'brand:search:account:twitter:{user_id}', expand=False)
-
- if brand:
- page_nav += [
- dict(
- href = url_for('brands.get_brand_html', brand_id=brand['id']),
- label = 'Brand Page',
- order = 5000,
- )
- ]
- brand_info = brand.get('expanded', {})
-
- return render_template(f'user-profile{theme_variant}.html',
- user = user.raw_user,
- user_dc = user,
- tweets = tweets,
- query = query,
- opengraph_info=opengraph_info,
- page_nav = page_nav,
- top8=top8,
- brand=brand,
- **brand_info
- )
- @twitter_app.route('/users.html', methods=['GET'])
- def get_users ():
-
- ids = request.args.get('ids')
-
- if not ids:
- return 'supply ids=', 400
-
- token = g.twitter_user['access_token']
-
- tweet_source = TwitterApiV2SocialGraph(token)
- response_json = tweet_source.get_users(ids)
-
- ts = int(time.time() * 1000)
- with open(f'{DATA_DIR}/cache/users_{ts}.json', 'wt') as f:
- f.write(json.dumps(response_json))
-
- #print(response_json)
- #run_script('on_user_seen', {'twitter_user': g.twitter_user, 'users': response_json})
-
- #following = list(map(lambda f: f['id'], response_json.get('data')))
- users = response_json.get('data')
-
- return render_template('following.html', users=users)
-
- @twitter_app.route('/media/upload', methods=['POST'])
- def post_media_upload ():
-
- token = g.twitter_user['access_token']
- form = {
- 'media_category': 'tweet_image'
- }
-
- headers = {
- 'Authorization': 'Bearer {}'.format(token)
- }
-
- url = 'http://localhost:5004/twitter/fake-twitter/media/upload'
- #url = 'https://upload.twitter.com/1.1/media/upload.json' # .json
-
-
-
- upload_media = {}
- for e in request.files.items():
- media_name = e[0]
- f = e[1]
-
- print('.')
-
- files = {'media': [secure_filename(f.filename), BufferedReader(f), f.content_type]}
- response = requests.post(url, files=files, data=form, headers=headers)
-
- print(response.status_code)
- print(response.text)
-
- response_json = json.loads(response.text)
-
- upload_media[media_name] = response_json
-
- return jsonify({'upload_media': upload_media})
- @twitter_app.route('/fake-twitter/media/upload', methods=['POST'])
- def post_media_upload2 ():
- print(request.content_type)
-
- f = request.files.get('media')
-
- f.seek(0,2)
- media_size = f.tell()
- media = {
- #'_auth': request.headers.get('Authorization'),
- 'media_key': '3_{}'.format(secure_filename(f.filename)),
- 'media_id': secure_filename(f.filename),
- 'size': media_size,
- 'expires_after_secs': 86400,
- 'image': {
- 'image_type': f.content_type,
- 'w': 1,
- 'h': 1
- }
- }
- return jsonify(media)
- def get_nav_items ():
-
- nav_items = [
-
- ]
-
- twitter_user = g.get('twitter_user')
- me = g.get('me')
-
- if twitter_user:
- nav_items += [
- dict(
- href = url_for('twitter_v2_facade.get_timeline_home_html'),
- label = 'Latest Tweets',
- order = 0
- ),
- dict (
- href = url_for('twitter_v2_facade.get_bookmarks2_html'),
- label = 'Bookmarks',
- order = 100
- ),
- dict (
- href = url_for('twitter_v2_facade.get_profile_html', user_id=twitter_user['id']),
- label = 'My Profile',
- order = 200
- ),
- dict (
- href = url_for('twitter_v2_facade.oauth2_login.get_logout_html'),
- label = f'Logout ({me})',
- order = 1000
- )
- ]
-
- if g.get('twitter_live_enabled'):
- nav_items += [
- dict (
- href = url_for('twitter_v2_live_facade.get_conversations_html'),
- label = 'DMs',
- order = 10
- ),
- dict (
- href = url_for('twitter_v2_live_facade.get_mentions_html', user_id=twitter_user['id']),
- label = 'Mentions',
- order = 20
- )
- ]
-
- return nav_items
- @twitter_app.before_request
- def add_module_nav_items_to_template_context ():
- g.module_nav = get_nav_items()
-
- def get_top8 (user_id):
- if user_id != '14520320':
- return
-
- return [
- dict(
- id='14520320'
- ),
- dict(
- id='14520320'
- ),
- dict(
- id='14520320'
- ),
- dict(
- id='14520320'
- ),
-
- dict(
- id='14520320'
- ),
- dict(
- id='14520320'
- ),
- dict(
- id='14520320'
- ),
- dict(
- id='14520320'
- ),
- ]
-
|