"""Flask blueprint that exposes a Mastodon instance through Twitter-style routes.

The blueprint handles OAuth app registration and login against a Mastodon
instance, stores the resulting credentials in the Flask session under a
``mastodon:<instance>:<account id>`` key (passed around as the ``me`` query
parameter), and renders timelines, bookmarks and profiles as feed items.
"""

from configparser import ConfigParser
import base64
from flask import json, Response, render_template, request, send_from_directory, Blueprint, url_for, session, redirect, g
from flask_cors import CORS
import sqlite3
import os
# NOTE: this rebinds ``json`` over the name imported from flask above, so
# every ``json.*`` call below uses the standard-library module.
import json
import json_stream
from zipfile import ZipFile
import itertools
import re
from urllib.parse import urlencode as urlencode_qs
import datetime
import dateutil
import dateutil.parser
import dateutil.tz
import requests

from mastodon_v2.api import MastodonAPSource
from mastodon_v2.types import Status

from hogumathi_app.view_model import FeedItem, FeedItemAction, PublicMetrics


DATA_DIR = '.data'

# Account key that used to be hard-coded into every author link. Kept only as
# a last-resort fallback so links rendered without a ``me`` keep their old form.
LEGACY_DEFAULT_ME = 'mastodon:mastodon.cloud:109271381872332822'

# Host name with an optional port. ``instance`` comes from the request and is
# interpolated into both a file path and an outbound URL, so anything that
# could contain a path separator or URL syntax is rejected.
_INSTANCE_RE = re.compile(
    r'[A-Za-z0-9](?:[A-Za-z0-9.-]*[A-Za-z0-9])?(?::\d{1,5})?'
)

twitter_app = Blueprint('mastodon_facade', 'mastodon_facade',
    static_folder='static',
    static_url_path='',
    url_prefix='/')


def _is_valid_instance(instance):
    """Return True when *instance* looks like a bare host name (optionally host:port)."""
    return bool(instance) and _INSTANCE_RE.fullmatch(instance) is not None


def _client_info_path(instance):
    """Return the path of the file holding the registered OAuth app for *instance*."""
    return f'{DATA_DIR}/mastodon-client_{instance}.json'


def _bearer_headers(access_token):
    """Return the Authorization header dict for *access_token*."""
    return {'Authorization': f'Bearer {access_token}'}


def _build_feed_page(statuses, endpoint, me, **url_args):
    """Convert *statuses* to feed items and build the next-page query dict.

    Returns ``(items, query)``. ``query`` is empty when there are no statuses;
    otherwise it carries ``next_data_url`` and ``next_page_url`` pointing at
    *endpoint* with ``max_id`` set to the id of the last status.
    """
    next_max_id = statuses[-1].id if len(statuses) else ''
    items = [mastodon_model_dc_vm(status, me=me) for status in statuses]

    # f-string rather than concatenation so a non-string id cannot raise here.
    print(f"max_id = {next_max_id}")

    query = {}
    if items:
        next_url = url_for(endpoint, me=me, max_id=next_max_id, **url_args)
        query = {
            'next_data_url': next_url,
            'next_page_url': next_url,
        }
    return items, query


@twitter_app.before_request
def add_module_nav_to_template_context():
    """Publish this module's navigation links on ``g`` for Mastodon sessions.

    Only runs when the ``me`` query parameter starts with ``mastodon:``.
    The session is intentionally not read here: a missing session entry must
    not fail every request before the view gets a chance to handle it.
    """
    me = request.args.get('me')

    if me and me.startswith('mastodon:'):
        g.module_nav = [
            dict(
                href = url_for('.get_timeline_home_html', me=me),
                label = 'Public Timeline',
                order = 100
            ),
            dict(
                href = url_for('.get_bookmarks_html', me=me),
                label = 'Bookmarks',
                order = 200
            )
        ]


def mastodon_model_dc_vm(post_data: Status, me=None) -> FeedItem:
    """Convert a Mastodon ``Status`` into a ``FeedItem`` view model.

    This is the method we should use. The others should be refactored out.

    Args:
        post_data: The status to convert.
        me: Session key to embed in the author link. When omitted, the
            current request's ``me`` query parameter is used, falling back to
            the previously hard-coded account.

    Returns:
        The populated ``FeedItem``.
    """
    print(post_data)

    if me is None:
        me = request.args.get('me') or LEGACY_DEFAULT_ME

    user = post_data.account

    source_url = post_data.url

    avi_icon_url = user.avatar

    url = url_for('mastodon_facade.get_tweet_html', tweet_id=post_data.id)

    actions = {
        'bookmark': FeedItemAction('mastodon_facade.post_tweet_bookmark', {'tweet_id': post_data.id}),
        'delete_bookmark': FeedItemAction('mastodon_facade.delete_tweet_bookmark', {'tweet_id': post_data.id}),
        'retweet': FeedItemAction('mastodon_facade.post_tweet_retweet', {'tweet_id': post_data.id})
    }

    t = FeedItem(
        id = post_data.id,
        # hugely not a fan of allowing the server's HTML out. can we sanitize it ourselves?
        html = post_data.content,
        url = url,
        source_url = source_url,
        created_at = post_data.created_at,
        avi_icon_url = avi_icon_url,
        display_name = user.display_name,
        handle = user.acct,

        author_url = url_for('mastodon_facade.get_profile_html', user_id = user.id, me=me),
        source_author_url = user.url,

        public_metrics = PublicMetrics(
            reply_count = post_data.replies_count,
            retweet_count = post_data.reblogs_count,
            like_count = post_data.favourites_count
        ),

        actions = actions,

        debug_source_data = post_data
    )

    return t


def register_app(instance):
    """Register this application with *instance* and cache the client credentials.

    Posts to ``/api/v1/apps`` on the instance. The response body is written to
    the per-instance client file only when the request succeeded, so a failed
    registration is not later mistaken for valid credentials.

    Args:
        instance: Host name of the Mastodon instance.

    Returns:
        The ``requests`` response of the registration call.

    Raises:
        ValueError: If *instance* is not a plain host name.
    """
    if not _is_valid_instance(instance):
        raise ValueError(f'invalid instance: {instance!r}')

    endpoint_url = g.app_url
    # NOTE(review): the original called an undefined ``og_url_for`` with a
    # non-existent endpoint name; this assumes the OAuth callback is the
    # ``/logged-in.html`` view below — confirm.
    redirect_uri = endpoint_url + url_for('.get_logged_in_html')

    client_name = os.environ.get('MASTODON_CLIENT_NAME')

    url = f'https://{instance}/api/v1/apps'

    params = {
        'client_name': client_name,
        'redirect_uris': redirect_uri,
        'scopes': 'read write'
    }

    resp = requests.post(url, params=params)

    if resp.ok:
        os.makedirs(DATA_DIR, exist_ok=True)
        with open(_client_info_path(instance), 'wt') as f:
            f.write(resp.text)

    return resp


@twitter_app.get('/api/app/<instance>/register')
def post_api_app_instance_register(instance):
    """Register the app with *instance* and relay the instance's response."""
    if not instance:
        return 'pass isntance= in request params', 400

    if not _is_valid_instance(instance):
        return 'invalid instance', 400

    resp = register_app(instance)

    return resp.text, resp.status_code


@twitter_app.get('/api/app/<instance>')
def get_api_app_instance(instance):
    """Return the cached app registration for *instance* as JSON.

    NOTE(review): the cached file is whatever the instance returned at
    registration, which presumably includes ``client_secret`` — confirm that
    serving it unauthenticated is intended.
    """
    if not _is_valid_instance(instance):
        return 'invalid instance', 400

    try:
        with open(_client_info_path(instance), 'rt') as f:
            body = f.read()
    except FileNotFoundError:
        return 'instance is not registered', 404

    return Response(body, mimetype='application/json')


@twitter_app.get('/login.html')
def get_login_html():
    """Redirect the browser to the instance's OAuth authorization page.

    Reads ``instance`` (required) and ``force_login`` (optional) from the
    query string. Registers the app with the instance first if no client
    file is cached yet.
    """
    instance = request.args.get('instance')
    force_login = request.args.get('force_login')

    if not instance:
        return 'provide instance= in query string.', 400

    if not _is_valid_instance(instance):
        return 'invalid instance', 400

    client_path = _client_info_path(instance)

    if not os.path.exists(client_path):
        resp = register_app(instance)
        print(resp)
        if not resp.ok:
            # Nothing was cached, so there are no credentials to continue with.
            return resp.text, resp.status_code

    with open(client_path, 'rt') as f:
        app_info = json.load(f)

    params = {
        'client_id': app_info['client_id'],
        'redirect_uri': app_info['redirect_uri'],
        'scope': 'read write',
        'response_type': 'code'
    }

    if force_login:
        params['force_login'] = force_login

    url = f'https://{instance}/oauth/authorize?' + urlencode_qs(params)

    return redirect(url)


@twitter_app.get('/logged-in.html')
def get_logged_in_html():
    """Complete the OAuth flow and store the account in the session.

    Exchanges the ``code`` query parameter for an access token, fetches the
    account via ``verify_credentials``, stores both under the
    ``mastodon:<instance>:<id>`` session key and redirects to the timeline.

    NOTE(review): ``instance`` defaults to ``mastodon.cloud`` when the
    callback does not carry it — confirm how other instances are expected to
    round-trip through the redirect.
    """
    code = request.args.get('code')
    instance = request.args.get('instance', 'mastodon.cloud')

    if not instance:
        return 'provide instance= in query string.', 400

    if not _is_valid_instance(instance):
        return 'invalid instance', 400

    if not code:
        return 'provide code= in query string.', 400

    try:
        with open(_client_info_path(instance), 'rt') as f:
            app_info = json.load(f)
    except FileNotFoundError:
        return 'instance is not registered', 400

    params = {
        'client_id': app_info['client_id'],
        'client_secret': app_info['client_secret'],
        'redirect_uri': app_info['redirect_uri'],
        'scope': 'read write',
        'grant_type': 'authorization_code',
        'code': code,
    }

    url = f'https://{instance}/oauth/token'

    resp = requests.post(url, data=params)
    if not resp.ok:
        return resp.text, resp.status_code

    auth_info = json.loads(resp.text)

    access_token = auth_info['access_token']

    url = f'https://{instance}/api/v1/accounts/verify_credentials'

    resp = requests.get(url, headers=_bearer_headers(access_token))
    if not resp.ok:
        return resp.text, resp.status_code

    mastodon_user = json.loads(resp.text)

    me = f'mastodon:{instance}:{mastodon_user["id"]}'

    session_info = {
        **auth_info,
        'instance': instance,
        'id': mastodon_user['id'],
        'acct': mastodon_user['acct'],
        'username': mastodon_user['username'],
        'display_name': mastodon_user['display_name'],
        'source_url': mastodon_user['url'],
        'created_at': mastodon_user['created_at'],
    }

    session[me] = session_info

    return redirect(url_for('.get_timeline_home_html', me=me))


@twitter_app.post('/data/statuses')
def post_tweets_create():
    """Publish a new status on behalf of the session identified by ``me``.

    Form fields: ``text`` (the status body) and optional
    ``reply_to_tweet_id``. For HTMX requests the compose form partial is
    rendered; otherwise the instance's response is relayed.

    Raises:
        KeyError: If ``me`` does not identify a stored session.
    """
    me = request.args.get('me')

    mastodon_user = session[me]

    instance = mastodon_user['instance']
    access_token = mastodon_user["access_token"]

    text = request.form.get('text')
    in_reply_to_id = request.form.get('reply_to_tweet_id')

    # The Mastodon API names the body field ``status``.
    params = {
        'status': text
    }

    if in_reply_to_id:
        params['in_reply_to_id'] = in_reply_to_id

    url = f'https://{instance}/api/v1/statuses'
    resp = requests.post(url, data=params, headers=_bearer_headers(access_token))

    if not resp.ok:
        return resp.text, resp.status_code

    new_status = json.loads(resp.text)

    new_tweet_id = new_status['id']

    if 'HX-Request' in request.headers:
        return render_template('partial/compose-form.html', new_tweet_id=new_tweet_id, me=me)

    return resp.text, resp.status_code


@twitter_app.post('/data/status/<tweet_id>/retweet')
def post_tweet_retweet(tweet_id):
    """Reblog *tweet_id* for the session identified by ``me`` and relay the response.

    Raises:
        KeyError: If ``me`` does not identify a stored session.
    """
    me = request.args.get('me')

    mastodon_user = session[me]

    instance = mastodon_user['instance']
    access_token = mastodon_user["access_token"]

    url = f'https://{instance}/api/v1/statuses/{tweet_id}/reblog'
    resp = requests.post(url, headers=_bearer_headers(access_token))

    # NOTE: per the original author's notes, the response is the new status
    # and the original one is found under its ``reblog`` property.
    return resp.text, resp.status_code


@twitter_app.get('/status/<tweet_id>.html')
@twitter_app.get('/statuses.html')
def get_tweet_html(tweet_id = None):
    """Placeholder view for one status (path) or several (``ids=`` query)."""
    if tweet_id:
        return f'tweet: {tweet_id}'

    ids_arg = request.args.get('ids')
    if not ids_arg:
        return 'provide ids= in query string.', 400

    ids = ids_arg.split(',')
    return f'tweets: {ids}'


@twitter_app.get('/profile/<user_id>.html')
def get_profile_html(user_id):
    """Render the statuses of *user_id* (numeric id or ``acct`` handle).

    The instance comes from the ``instance`` query parameter or, failing
    that, from the session identified by ``me``. The session's access token
    is only sent when the target instance is the session's own instance.
    """
    me = request.args.get('me')
    mastodon_user = session.get(me)

    max_id = request.args.get('max_id')

    requested_instance = request.args.get('instance')
    session_instance = mastodon_user.get('instance') if mastodon_user else None
    instance = requested_instance or session_instance

    if not instance:
        return 'provide instance= in query string.', 400

    if not _is_valid_instance(instance):
        return 'invalid instance', 400

    # Never hand the session's token to a host other than the one that issued it.
    access_token = None
    if mastodon_user and instance == session_instance:
        access_token = mastodon_user.get('access_token')

    print(instance)

    headers = _bearer_headers(access_token) if access_token else {}

    mastodon_source = MastodonAPSource(f'https://{instance}', access_token)

    params = {}

    try:
        # if the UID isn't numeric then we'll fallback to string.
        url = f'https://{instance}/api/v1/accounts/{int(user_id)}'
    except ValueError:
        url = f'https://{instance}/api/v1/accounts/lookup'
        params = {
            'acct': user_id
        }

    resp = requests.get(url, params=params, headers=headers)
    if not resp.ok:
        return resp.text, resp.status_code

    account_info = json.loads(resp.text)

    user_id = account_info["id"]

    tweets = mastodon_source.get_user_statuses(user_id, max_id=max_id, return_dataclasses=True)

    # Carry an explicit instance over to the next page so paging keeps
    # working without a session.
    url_args = {'user_id': user_id}
    if requested_instance:
        url_args['instance'] = requested_instance

    tweets, query = _build_feed_page(tweets, '.get_profile_html', me, **url_args)

    user = {
        'id': user_id
    }

    if 'HX-Request' in request.headers:
        template = 'partial/tweets-timeline.html'
    else:
        template = 'user-profile.html'

    return render_template(template, user = user, tweets = tweets, query = query, me=me, mastodon_user=mastodon_user)


@twitter_app.route('/latest.html', methods=['GET'])
def get_timeline_home_html(variant = "reverse_chronological"):
    """Render the local public timeline of the session's instance.

    Args:
        variant: Accepted for interface compatibility; not used.

    Raises:
        KeyError: If ``me`` does not identify a stored session.
    """
    me = request.args.get('me')

    mastodon_user = session[me]

    instance = mastodon_user.get('instance')
    token = mastodon_user.get('access_token')

    max_id = request.args.get('max_id')

    tweet_source = MastodonAPSource(f"https://{instance}", token)

    timeline_params = {
        'local': True
    }

    if max_id:
        timeline_params['max_id'] = max_id

    tweets = tweet_source.get_timeline("public", timeline_params, return_dataclasses=True)

    tweets, query = _build_feed_page(tweets, '.get_timeline_home_html', me)

    user = {}

    return render_template('tweet-collection.html', user = user, tweets = tweets, query = query, me=me, mastodon_user=mastodon_user)


@twitter_app.get('/bookmarks.html')
def get_bookmarks_html():
    """Render the bookmarks of the session identified by ``me``.

    Raises:
        KeyError: If ``me`` does not identify a stored session.
    """
    me = request.args.get('me')

    mastodon_user = session[me]

    instance = mastodon_user.get('instance')
    token = mastodon_user.get('access_token')

    max_id = request.args.get('max_id')

    tweet_source = MastodonAPSource(f"https://{instance}", token)

    tweets = tweet_source.get_bookmarks(max_id=max_id, return_dataclasses=True)

    tweets, query = _build_feed_page(tweets, '.get_bookmarks_html', me)

    user = {}

    return render_template('tweet-collection.html', user = user, tweets = tweets, query = query, me=me, mastodon_user=mastodon_user)


@twitter_app.post('/data/bookmarks/<tweet_id>')
def post_tweet_bookmark(tweet_id):
    """Bookmark *tweet_id* for the session identified by ``me`` and relay the response.

    Raises:
        KeyError: If ``me`` does not identify a stored session.
    """
    me = request.args.get('me')

    mastodon_user = session[me]

    instance = mastodon_user['instance']
    access_token = mastodon_user["access_token"]

    url = f'https://{instance}/api/v1/statuses/{tweet_id}/bookmark'
    resp = requests.post(url, headers=_bearer_headers(access_token))

    return resp.text, resp.status_code


@twitter_app.delete('/data/bookmarks/<tweet_id>')
def delete_tweet_bookmark(tweet_id):
    """Remove the bookmark on *tweet_id* for the session identified by ``me``.

    The upstream call is a POST to the ``unbookmark`` endpoint; its response
    is relayed as-is.

    Raises:
        KeyError: If ``me`` does not identify a stored session.
    """
    me = request.args.get('me')

    mastodon_user = session[me]

    instance = mastodon_user['instance']
    access_token = mastodon_user["access_token"]

    url = f'https://{instance}/api/v1/statuses/{tweet_id}/unbookmark'
    resp = requests.post(url, headers=_bearer_headers(access_token))

    return resp.text, resp.status_code