123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501 |
- from configparser import ConfigParser
- import base64
- from flask import json, Response, render_template, request, send_from_directory, Blueprint, url_for, session, redirect, g
- from flask_cors import CORS
- import sqlite3
- import os
- import json
- import json_stream
- from zipfile import ZipFile
- import itertools
- from urllib.parse import urlencode as urlencode_qs
- import datetime
- import dateutil
- import dateutil.parser
- import dateutil.tz
- import requests
- from mastodon_source import MastodonAPSource
- from mastodon_v2_types import Status
- from view_model import FeedItem, FeedItemAction, PublicMetrics
- DATA_DIR='.data'
- twitter_app = Blueprint('mastodon_facade', 'mastodon_facade',
- static_folder='static',
- static_url_path='',
- url_prefix='/')
- @twitter_app.before_request
- def add_module_nav_to_template_context ():
-
- me = request.args.get('me')
-
- if me and me.startswith('mastodon:'):
- youtube_user = session[ me ]
-
- g.module_nav = [
- dict(
- href = url_for('.get_timeline_home_html', me=me),
- label = 'Public Timeline',
- order = 100
- ),
- dict(
- href = url_for('.get_bookmarks_html', me=me),
- label = 'Bookmarks',
- order = 200
- )
- ]
- def mastodon_model_dc_vm (post_data: Status) -> FeedItem:
- """
- This is the method we should use. The others should be refactored out.
- """
-
- user = post_data.account
-
- source_url = post_data.url
- avi_icon_url = user.avatar
- url = url_for('mastodon_facade.get_tweet_html', tweet_id=post_data.id)
-
- actions = {
- 'bookmark': FeedItemAction('mastodon_facade.post_tweet_bookmark', {'tweet_id': post_data.id}),
- 'delete_bookmark': FeedItemAction('mastodon_facade.delete_tweet_bookmark', {'tweet_id': post_data.id}),
-
- 'retweet': FeedItemAction('mastodon_facade.post_tweet_retweet', {'tweet_id': post_data.id})
- }
-
- t = FeedItem(
- id = post_data.id,
- # hugely not a fan of allowing the server's HTML out. can we sanitize it ourselves?
- html = post_data.content,
- url = url,
- source_url = source_url,
- created_at = post_data.created_at,
- avi_icon_url = avi_icon_url,
- display_name = user.display_name,
- handle = user.acct,
-
- author_url = url_for('mastodon_facade.get_profile_html', user_id = user.id, me='mastodon:mastodon.cloud:109271381872332822'),
- source_author_url = user.url,
-
- public_metrics = PublicMetrics(
- reply_count = post_data.replies_count,
- retweet_count = post_data.reblogs_count,
- like_count = post_data.favourites_count
- ),
-
- actions = actions,
-
- debug_source_data = post_data
- )
-
- return t
- def register_app (instance):
- endpoint_url = g.app_url
- redirect_uri = endpoint_url + og_url_for('.get_loggedin_html')
-
- client_name = os.environ.get('MASTODON_CLIENT_NAME')
-
- url = f'https://{instance}/api/v1/apps'
-
- params = {
- 'client_name': client_name,
- 'redirect_uris': redirect_uri,
- 'scopes': 'read write'
- }
-
- resp = requests.post(url, params=params)
-
- with open(f'{DATA_DIR}/mastodon-client_{instance}.json', 'wt') as f:
- f.write(resp.text)
-
- return resp
- @twitter_app.get('/api/app/<instance>/register')
- def post_api_app_instance_register (instance):
-
- if not instance:
- return 'pass isntance= in request params', 400
-
- resp = register_app(instance)
-
- return resp.text, resp.status_code
- @twitter_app.get('/api/app/<instance>')
- def get_api_app_instance (instance):
-
- with open(f'{DATA_DIR}/mastodon-client_{instance}.json', 'rt') as f:
- return Response(f.read(), mimetype='application/json')
- @twitter_app.get('/login.html')
- def get_login_html ():
- instance = request.args.get('instance')
- force_login = request.args.get('force_login')
-
- if not instance:
- return 'provide instance= in query string.', 400
-
- if not os.path.exists(f'{DATA_DIR}/mastodon-client_{instance}.json'):
- resp = register_app(instance)
-
- print(resp)
-
- with open(f'{DATA_DIR}/mastodon-client_{instance}.json', 'rt') as f:
- app_info = json.loads(f.read())
-
- params = {
- 'client_id': app_info['client_id'],
- 'redirect_uri': app_info['redirect_uri'],
- 'scope': 'read write',
-
- 'response_type': 'code'
- }
-
- if force_login:
- params['force_login'] = force_login
-
- url = f'https://{instance}/oauth/authorize?' + urlencode_qs(params)
-
- return redirect(url)
- @twitter_app.get('/logged-in.html')
- def get_logged_in_html ():
- code = request.args.get('code')
- instance = request.args.get('instance', 'mastodon.cloud')
-
- if not instance:
- return 'provide instance= in query string.', 400
-
- with open(f'{DATA_DIR}/mastodon-client_{instance}.json', 'rt') as f:
- app_info = json.loads(f.read())
-
- params = {
- 'client_id': app_info['client_id'],
- 'client_secret': app_info['client_secret'],
-
- 'redirect_uri': app_info['redirect_uri'],
- 'scope': 'read write',
-
- 'grant_type': 'authorization_code',
- 'code': code,
- # force_login: True
- }
-
- url = f'https://{instance}/oauth/token'
-
- resp = requests.post(url, data=params)
-
- auth_info = json.loads(resp.text)
-
-
- #auth_info = {"access_token":"6C6-hD2_OK1vDFc_WcPQnZC5KL0jOUePgQgMQELeV0k","token_type":"Bearer","scope":"read write","created_at":1670088545}
-
- access_token = auth_info['access_token']
-
-
- url = f'https://{instance}/api/v1/accounts/verify_credentials'
-
-
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
-
- resp = requests.get(url, headers=headers)
-
- mastodon_user = json.loads(resp.text)
-
- me = f'mastodon:{instance}:{mastodon_user["id"]}'
-
- session_info = {
- **auth_info,
-
- 'instance': instance,
-
- 'id': mastodon_user['id'],
- 'acct': mastodon_user['acct'],
- 'username': mastodon_user['username'],
- 'display_name': mastodon_user['display_name'],
- 'source_url': mastodon_user['url'],
- 'created_at': mastodon_user['created_at'],
- }
-
- session[me] = session_info
-
- return redirect(url_for('.get_timeline_home_html', me=me))
-
- #return resp.text, resp.status_code
- @twitter_app.post('/data/statuses')
- def post_tweets_create ():
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user['instance']
- access_token = mastodon_user["access_token"]
-
- text = request.form.get('text')
- in_reply_to_id = request.form.get('reply_to_tweet_id')
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- params = {
- 'text': text
- }
-
- if in_reply_to_id:
- params['in_reply_to_id'] = in_reply_to_id
-
- url = f'https://{instance}/api/v1/statuses/{tweet_id}/bookmark'
-
- resp = requests.post(url, data=params, headers=headers)
-
- new_status = json.loads(resp.text)
-
- new_tweet_id=new_status['id']
-
- if 'HX-Request' in request.headers:
- return render_template('partial/compose-form.html', new_tweet_id=new_tweet_id, me=me)
- else:
- return resp.text, resp.status_code
- @twitter_app.post('/data/status/<tweet_id>/retweet')
- def post_tweet_retweet (tweet_id):
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user['instance']
- access_token = mastodon_user["access_token"]
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- url = f'https://{instance}/api/v1/statuses/{tweet_id}/reblog'
-
- resp = requests.post(url, headers=headers)
-
- # new_status = json.loads(resp.text)
-
- # new_tweet_id=new_status['id']
- ## old status is in reblog: {id:} property.
-
- # if 'HX-Request' in request.headers:
- # return 'retweeted, new ID: ' + new_tweet_id
- # else:
- return resp.text, resp.status_code
- @twitter_app.get('/status/<tweet_id>.html')
- @twitter_app.get('/statuses.html')
- def get_tweet_html (tweet_id = None):
- if not tweet_id:
- ids = request.args.get('ids').split(',')
- return f'tweets: {ids}'
- else:
- return 'tweet: {tweet_id}'
- @twitter_app.get('/profile/<user_id>.html')
- def get_profile_html (user_id):
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user['instance']
- access_token = mastodon_user["access_token"]
-
- max_id = request.args.get('max_id')
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- params = {}
-
- try:
- # if the UID isn't numeric then we'll fallback to string.
- url = f'https://{instance}/api/v1/accounts/{int(user_id)}'
- except:
- url = f'https://{instance}/api/v1/accounts/lookup'
- params = {
- 'acct': user_id
- }
-
- resp = requests.get(url, params=params, headers=headers)
- account_info = json.loads(resp.text)
-
-
- user_id = account_info["id"]
-
- url = f'https://{instance}/api/v1/accounts/{user_id}/statuses'
-
- params = {}
-
- if max_id:
- params['max_id'] = max_id
-
- resp = requests.get(url, params=params, headers=headers)
- tweets = json.loads(resp.text)
- #tweets = tweet_source.get_timeline("public", timeline_params)
-
- max_id = tweets[-1]["id"] if len(tweets) else ''
- tweets = list(map(mastodon_model_dc_vm, tweets))
-
- print("max_id = " + max_id)
-
- query = {}
-
- if len(tweets):
- query = {
- #'next_data_hx_select': '#tweets',
- 'next_data_url': url_for('.get_profile_html', user_id=user_id, me=me, max_id=max_id),
- 'next_page_url': url_for('.get_profile_html', user_id=user_id, me=me, max_id=max_id)
- }
-
- user = {
- 'id': user_id
- }
-
- #return Response(response_json, mimetype='application/json')
-
- if 'HX-Request' in request.headers:
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query,
- me=me, mastodon_user=mastodon_user)
- else:
- return render_template('user-profile.html', user = user, tweets = tweets, query = query,
- me=me, mastodon_user=mastodon_user)
- @twitter_app.route('/latest.html', methods=['GET'])
- def get_timeline_home_html (variant = "reverse_chronological"):
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user.get('instance')
- token = mastodon_user.get('access_token')
-
- max_id = request.args.get('max_id')
- tweet_source = MastodonAPSource(f"https://{instance}", token)
-
- timeline_params = {
- 'local': True
- }
-
- if max_id:
- timeline_params['max_id'] = max_id
-
- tweets = tweet_source.get_timeline("public", timeline_params, return_dataclasses=True)
-
- max_id = tweets[-1].id if len(tweets) else ''
- tweets = list(map(mastodon_model_dc_vm, tweets))
-
- print("max_id = " + max_id)
-
- query = {}
-
- if len(tweets):
- query = {
- 'next_data_url': url_for('.get_timeline_home_html', max_id=max_id, me=me),
- 'next_page_url': url_for('.get_timeline_home_html', max_id=max_id, me=me)
- }
-
- user = {}
-
- #return Response(response_json, mimetype='application/json')
-
- return render_template('tweet-collection.html', user = user, tweets = tweets, query = query,
- me=me, mastodon_user=mastodon_user)
- @twitter_app.get('/bookmarks.html')
- def get_bookmarks_html ():
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user.get('instance')
- token = mastodon_user.get('access_token')
-
- max_id = request.args.get('max_id')
- tweet_source = MastodonAPSource(f"https://{instance}", token)
-
- tweets = tweet_source.get_bookmarks(max_id=max_id, return_dataclasses=True)
-
- #tweets = tweet_source.get_timeline("public", timeline_params)
-
- max_id = tweets[-1].id if len(tweets) else ''
- tweets = list(map(mastodon_model_dc_vm, tweets))
-
- print("max_id = " + max_id)
-
- query = {}
-
- if len(tweets):
- query = {
- 'next_data_url': url_for('.get_bookmarks_html', me=me, max_id=max_id),
- 'next_page_url': url_for('.get_bookmarks_html', me=me, max_id=max_id)
- }
-
- user = {}
-
- #return Response(response_json, mimetype='application/json')
-
- return render_template('tweet-collection.html', user = user, tweets = tweets, query = query,
- me=me, mastodon_user=mastodon_user)
- @twitter_app.post('/data/bookmarks/<tweet_id>')
- def post_tweet_bookmark (tweet_id):
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user['instance']
- access_token = mastodon_user["access_token"]
-
- max_id = request.args.get('max_id')
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- params = {}
-
- url = f'https://{instance}/api/v1/statuses/{tweet_id}/bookmark'
-
- resp = requests.post(url, headers=headers)
-
- return resp.text, resp.status_code
- @twitter_app.delete('/data/bookmarks/<tweet_id>')
- def delete_tweet_bookmark (tweet_id):
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user['instance']
- access_token = mastodon_user["access_token"]
-
- max_id = request.args.get('max_id')
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- params = {}
-
- url = f'https://{instance}/api/v1/statuses/{tweet_id}/unbookmark'
-
- resp = requests.post(url, headers=headers)
-
- return resp.text, resp.status_code
|