"""Flask blueprint exposing a Twitter-like facade over the Mastodon REST API.

The blueprint handles app registration and the OAuth login flow for a
Mastodon instance, keeps per-account credentials in the Flask session under a
``mastodon:<instance>:<account id>`` key (passed around as the ``me`` query
argument), and renders timelines, profiles and bookmarks through templates.
"""
from configparser import ConfigParser
import base64
from flask import json, Response, render_template, request, send_from_directory, Blueprint, url_for, session, redirect
from flask_cors import CORS
import sqlite3
import os
import re
import json
import json_stream
from zipfile import ZipFile
import itertools
from urllib.parse import urlencode as urlencode_qs
import datetime
import dateutil
import dateutil.parser
import dateutil.tz
import requests

DATA_DIR = '.data'

# Session key used for author links when the caller does not say which
# logged-in account is browsing. This is the value that used to be hard-coded
# inside mastodon_model.
DEFAULT_ME = 'mastodon:mastodon.cloud:109271381872332822'

# Instance names come from the request and end up in file paths and URLs, so
# only plain host names (optionally with a port) are accepted.
_INSTANCE_RE = re.compile(r'[A-Za-z0-9.-]+(:[0-9]+)?')

twitter_app = Blueprint('mastodon_facade', 'mastodon_facade',
                        static_folder='static',
                        static_url_path='',
                        url_prefix='/')


def _is_valid_instance(instance):
    """Return True when *instance* looks like a bare host name (``host[:port]``)."""
    return bool(instance) and _INSTANCE_RE.fullmatch(instance) is not None


def _client_file(instance):
    """Return the path of the stored app registration for *instance*.

    Raises:
        ValueError: if *instance* is not a plain host name; this keeps
            request-supplied values from escaping ``DATA_DIR``.
    """
    if not _is_valid_instance(instance):
        raise ValueError(f'invalid instance host name: {instance!r}')
    return f'{DATA_DIR}/mastodon-client_{instance}.json'


def _auth_headers(access_token):
    """Build the bearer-token header sent with authenticated API calls."""
    return {'Authorization': f'Bearer {access_token}'}


def _session_user():
    """Return ``(me, session_info)`` for the ``me`` query argument.

    ``session_info`` is ``None`` when ``me`` is missing or has no session
    entry, so callers can answer with an error instead of a KeyError.
    """
    me = request.args.get('me')
    return me, (session.get(me) if me else None)


def _not_logged_in():
    """Response returned by routes that need a session but found none."""
    return 'no session found for me=; log in via login.html first.', 401


def mastodon_model(post_data, me=DEFAULT_ME):
    """Convert a Mastodon status dict into the view model used by the templates.

    Args:
        post_data: status object as returned by the Mastodon API; the keys
            read here are ``account``, ``url``, ``id``, ``content``,
            ``created_at`` and the three ``*_count`` fields.
        me: session key of the browsing account, embedded in the author link.
            Defaults to the previously hard-coded account.

    Returns:
        dict with the fields the tweet templates consume; the raw status is
        kept under ``activity``.
    """
    # retweeted_by, avi_icon_url, display_name, handle, created_at, text
    user = post_data['account']

    return {
        'id': post_data['id'],
        # hugely not a fan of allowing the server's HTML out. can we sanitize it ourselves?
        'html': post_data['content'],
        'url': url_for('.get_tweet_html', tweet_id=post_data['id']),
        'source_url': post_data['url'],
        'created_at': post_data['created_at'],
        'avi_icon_url': user['avatar'],
        'display_name': user['display_name'],
        'handle': user['acct'],
        'author_url': url_for('.get_profile_html', user_id=user['id'], me=me),
        'source_author_url': user['url'],
        'public_metrics': {
            'reply_count': post_data['replies_count'],
            'retweet_count': post_data['reblogs_count'],
            'like_count': post_data['favourites_count'],
        },
        'activity': post_data,
    }


def register_app(instance):
    """Register this client with *instance* and store the returned credentials.

    The response body is written to the per-instance client file only when
    the registration succeeded, so an error body is never mistaken for
    credentials on a later login.

    Args:
        instance: host name of the Mastodon instance.

    Returns:
        The ``requests`` response of the registration call.

    Raises:
        ValueError: if *instance* is not a plain host name.
    """
    client_path = _client_file(instance)  # validates the name before any I/O

    client_name = os.environ.get('MASTODON_CLIENT_NAME')
    redirect_uri = 'http://localhost:5004/mastodon/logged-in.html'

    url = f'https://{instance}/api/v1/apps'
    params = {
        'client_name': client_name,
        'redirect_uris': redirect_uri,
        'scopes': 'read write',
    }

    resp = requests.post(url, params=params)

    if resp.ok:
        os.makedirs(DATA_DIR, exist_ok=True)
        with open(client_path, 'wt') as f:
            f.write(resp.text)

    return resp


@twitter_app.get('/api/app/<instance>/register')
def post_api_app_instance_register(instance):
    """Register the client app on *instance* and relay the instance's answer."""
    if not instance:
        return 'pass isntance= in request params', 400
    if not _is_valid_instance(instance):
        return 'invalid instance host name.', 400

    resp = register_app(instance)

    return resp.text, resp.status_code


@twitter_app.get('/api/app/<instance>')
def get_api_app_instance(instance):
    """Return the stored app registration JSON for *instance*.

    NOTE(review): the stored file is returned verbatim; if it contains the
    client secret, as the token exchange in get_logged_in_html implies, this
    endpoint exposes it. Confirm that is intended.
    """
    if not _is_valid_instance(instance):
        return 'invalid instance host name.', 400

    try:
        with open(_client_file(instance), 'rt') as f:
            body = f.read()
    except FileNotFoundError:
        return 'no app registered for this instance.', 404

    return Response(body, mimetype='application/json')


@twitter_app.get('/login.html')
def get_login_html():
    """Start the OAuth flow: redirect the browser to the instance's authorize page.

    Query args: ``instance`` (required) and ``force_login`` (optional, passed
    through). The app is registered on first use of an instance.
    """
    instance = request.args.get('instance')
    force_login = request.args.get('force_login')

    if not instance:
        return 'provide instance= in query string.', 400
    if not _is_valid_instance(instance):
        return 'invalid instance host name.', 400

    client_path = _client_file(instance)

    if not os.path.exists(client_path):
        resp = register_app(instance)
        print(resp)
        if not resp.ok:
            # Nothing was stored, so there are no credentials to continue with.
            return resp.text, resp.status_code

    with open(client_path, 'rt') as f:
        app_info = json.load(f)

    params = {
        'client_id': app_info['client_id'],
        'redirect_uri': app_info['redirect_uri'],
        'scope': 'read write',
        'response_type': 'code',
    }

    if force_login:
        params['force_login'] = force_login

    url = f'https://{instance}/oauth/authorize?' + urlencode_qs(params)

    return redirect(url)


@twitter_app.get('/logged-in.html')
def get_logged_in_html():
    """OAuth callback: exchange ``code`` for a token and open a session.

    The token and basic account details are stored in the Flask session under
    ``mastodon:<instance>:<account id>``; the browser is then redirected to the
    home timeline for that key.
    """
    code = request.args.get('code')
    instance = request.args.get('instance', 'mastodon.cloud')

    if not instance:
        return 'provide instance= in query string.', 400
    if not _is_valid_instance(instance):
        return 'invalid instance host name.', 400
    if not code:
        return 'provide code= in query string.', 400

    try:
        with open(_client_file(instance), 'rt') as f:
            app_info = json.load(f)
    except FileNotFoundError:
        return 'no app registered for this instance.', 400

    params = {
        'client_id': app_info['client_id'],
        'client_secret': app_info['client_secret'],
        'redirect_uri': app_info['redirect_uri'],
        'scope': 'read write',
        'grant_type': 'authorization_code',
        'code': code,
    }

    url = f'https://{instance}/oauth/token'
    resp = requests.post(url, data=params)
    if not resp.ok:
        return resp.text, resp.status_code

    auth_info = json.loads(resp.text)
    access_token = auth_info['access_token']

    url = f'https://{instance}/api/v1/accounts/verify_credentials'
    resp = requests.get(url, headers=_auth_headers(access_token))
    if not resp.ok:
        return resp.text, resp.status_code

    mastodon_user = json.loads(resp.text)

    me = f'mastodon:{instance}:{mastodon_user["id"]}'

    # NOTE(review): this places the access token in the Flask session; with
    # the default cookie-backed session that is signed but readable by the
    # client. Confirm the session backend in use.
    session[me] = {
        **auth_info,
        'instance': instance,
        'id': mastodon_user['id'],
        'acct': mastodon_user['acct'],
        'username': mastodon_user['username'],
        'display_name': mastodon_user['display_name'],
        'source_url': mastodon_user['url'],
        'created_at': mastodon_user['created_at'],
    }

    return redirect(url_for('.get_timeline_home_html', me=me))


@twitter_app.post('/data/statuses')
def post_tweets_create():
    """Publish a new status (optionally a reply) for the account named by ``me``.

    Form fields: ``text`` (status body) and ``reply_to_tweet_id`` (optional).
    HTMX requests get the compose-form partial back; other clients get the
    instance's raw response.
    """
    me, mastodon_user = _session_user()
    if not mastodon_user:
        return _not_logged_in()

    instance = mastodon_user['instance']
    access_token = mastodon_user['access_token']

    text = request.form.get('text')
    in_reply_to_id = request.form.get('reply_to_tweet_id')

    # The statuses endpoint takes the body in the ``status`` field.
    params = {'status': text}
    if in_reply_to_id:
        params['in_reply_to_id'] = in_reply_to_id

    url = f'https://{instance}/api/v1/statuses'
    resp = requests.post(url, data=params, headers=_auth_headers(access_token))

    if resp.ok and 'HX-Request' in request.headers:
        new_status = json.loads(resp.text)
        return render_template('partial/compose-form.html',
                               new_tweet_id=new_status['id'], me=me)

    return resp.text, resp.status_code


@twitter_app.post('/data/status/<tweet_id>/retweet')
def post_tweet_retweet(tweet_id):
    """Reblog *tweet_id* for the account named by ``me`` and relay the response."""
    me, mastodon_user = _session_user()
    if not mastodon_user:
        return _not_logged_in()

    instance = mastodon_user['instance']
    access_token = mastodon_user['access_token']

    url = f'https://{instance}/api/v1/statuses/{tweet_id}/reblog'
    resp = requests.post(url, headers=_auth_headers(access_token))

    # The response is the new (reblog) status; the old status is in its
    # reblog: {id:} property.
    return resp.text, resp.status_code


@twitter_app.get('/status/<tweet_id>.html')
def get_tweet_html(tweet_id):
    """Placeholder page for a single status."""
    return 'tweet: ' + tweet_id


@twitter_app.get('/profile/<user_id>.html')
def get_profile_html(user_id):
    """Render the statuses of an account.

    *user_id* is either a numeric account id or an ``acct`` handle; handles
    are resolved through the account lookup endpoint. ``max_id`` pages
    backwards through the statuses.
    """
    me, mastodon_user = _session_user()
    if not mastodon_user:
        return _not_logged_in()

    instance = mastodon_user['instance']
    access_token = mastodon_user['access_token']
    max_id = request.args.get('max_id')

    headers = _auth_headers(access_token)

    params = {}
    try:
        # if the UID isn't numeric then we'll fallback to string.
        url = f'https://{instance}/api/v1/accounts/{int(user_id)}'
    except ValueError:
        url = f'https://{instance}/api/v1/accounts/lookup'
        params = {'acct': user_id}

    resp = requests.get(url, params=params, headers=headers)
    if not resp.ok:
        return resp.text, resp.status_code

    account_info = json.loads(resp.text)
    user_id = account_info['id']

    url = f'https://{instance}/api/v1/accounts/{user_id}/statuses'
    params = {}
    if max_id:
        params['max_id'] = max_id

    resp = requests.get(url, params=params, headers=headers)
    if not resp.ok:
        return resp.text, resp.status_code

    tweets = json.loads(resp.text)

    # The oldest status on this page is the cursor for the next page.
    max_id = tweets[-1]['id'] if tweets else ''

    tweets = [mastodon_model(tweet, me=me) for tweet in tweets]

    print("max_id = " + max_id)

    query = {}
    if tweets:
        next_url = url_for('.get_profile_html', user_id=user_id, me=me, max_id=max_id)
        query = {
            'next_data_url': next_url,
            'next_page_url': next_url,
        }

    user = {'id': user_id}

    if 'HX-Request' in request.headers:
        template = 'partial/tweets-timeline.html'
    else:
        template = 'user-profile.html'

    return render_template(template, user=user, tweets=tweets, query=query,
                           me=me, mastodon_user=mastodon_user)


@twitter_app.route('/latest.html', methods=['GET'])
def get_timeline_home_html(variant="reverse_chronological"):
    """Render the local public timeline of the logged-in account's instance.

    ``max_id`` pages backwards. *variant* is accepted for interface
    compatibility and is not used.
    """
    # retweeted_by, avi_icon_url, display_name, handle, created_at, text
    me, mastodon_user = _session_user()

    token = mastodon_user.get('access_token') if mastodon_user else None
    max_id = request.args.get('max_id')

    # comes from token or session
    # NOTE(review): still a fixed placeholder; confirm what the template
    # expects before switching it to the session's account.
    user_id = "ispoogedaily"

    if not token:
        print("No token provided or found in environ.")
        return Response('{"err": "No token."}', mimetype="application/json", status=400)

    # Query the instance the session belongs to; sessions without an instance
    # fall back to the host that used to be hard-coded here.
    instance = mastodon_user.get('instance', 'mastodon.cloud')
    tweet_source = MastodonAPSource(f"https://{instance}", token)

    timeline_params = {
        'local': True,
    }
    if max_id:
        timeline_params['max_id'] = max_id

    tweets = tweet_source.get_timeline("public", timeline_params)

    if not isinstance(tweets, list):
        # An upstream error arrives as a JSON object rather than a list.
        return Response(json.dumps(tweets), mimetype="application/json", status=502)

    max_id = tweets[-1]['id'] if tweets else ''

    tweets = [mastodon_model(tweet, me=me) for tweet in tweets]

    print("max_id = " + max_id)

    query = {}
    if tweets:
        next_url = url_for('.get_timeline_home_html', max_id=max_id, me=me)
        query = {
            'next_data_url': next_url,
            'next_page_url': next_url,
        }

    user = {'id': user_id}

    return render_template('tweet-collection.html', user=user, tweets=tweets,
                           query=query, me=me, mastodon_user=mastodon_user)


@twitter_app.get('/bookmarks.html')
def get_bookmarks_html():
    """Render the bookmarked statuses of the account named by ``me``."""
    me, mastodon_user = _session_user()
    if not mastodon_user:
        return _not_logged_in()

    instance = mastodon_user['instance']
    access_token = mastodon_user['access_token']
    max_id = request.args.get('max_id')

    url = f'https://{instance}/api/v1/bookmarks'
    params = {}
    if max_id:
        params['max_id'] = max_id

    resp = requests.get(url, params=params, headers=_auth_headers(access_token))
    if not resp.ok:
        return resp.text, resp.status_code

    tweets = json.loads(resp.text)

    max_id = tweets[-1]['id'] if tweets else ''

    tweets = [mastodon_model(tweet, me=me) for tweet in tweets]

    print("max_id = " + max_id)

    query = {}
    if tweets:
        next_url = url_for('.get_bookmarks_html', me=me, max_id=max_id)
        query = {
            'next_data_url': next_url,
            'next_page_url': next_url,
        }

    user = {}

    return render_template('tweet-collection.html', user=user, tweets=tweets,
                           query=query, me=me, mastodon_user=mastodon_user)


@twitter_app.post('/data/bookmarks/<tweet_id>')
def post_tweet_bookmark(tweet_id):
    """Bookmark *tweet_id* for the account named by ``me`` and relay the response."""
    me, mastodon_user = _session_user()
    if not mastodon_user:
        return _not_logged_in()

    instance = mastodon_user['instance']
    access_token = mastodon_user['access_token']

    url = f'https://{instance}/api/v1/statuses/{tweet_id}/bookmark'
    resp = requests.post(url, headers=_auth_headers(access_token))

    return resp.text, resp.status_code


@twitter_app.delete('/data/bookmarks/<tweet_id>')
def delete_tweet_bookmark(tweet_id):
    """Remove the bookmark on *tweet_id* and relay the instance's response."""
    me, mastodon_user = _session_user()
    if not mastodon_user:
        return _not_logged_in()

    instance = mastodon_user['instance']
    access_token = mastodon_user['access_token']

    url = f'https://{instance}/api/v1/statuses/{tweet_id}/unbookmark'
    resp = requests.post(url, headers=_auth_headers(access_token))

    return resp.text, resp.status_code


class MastodonAPSource:
    """Thin client for one Mastodon instance, authenticated with a bearer token.

    Only ``get_timeline`` talks to the API; the remaining methods are
    placeholders annotated with the endpoint they are meant to wrap.
    """

    def __init__(self, endpoint, token):
        # endpoint: base URL of the instance, e.g. "https://mastodon.cloud"
        self.endpoint = endpoint
        self.token = token
        super().__init__()

    def get_timeline(self, path="home", params=None):
        """Fetch ``/api/v1/timelines/<path>`` and return the decoded JSON.

        Args:
            path: timeline name appended to the timelines endpoint.
            params: optional query parameters; the mapping is copied, never
                mutated.
        """
        url = self.endpoint + "/api/v1/timelines/" + path
        headers = {"Authorization": "Bearer {}".format(self.token)}

        response = requests.get(url, params=dict(params or {}), headers=headers)

        print(response)

        return json.loads(response.text)

    # nice parallel is validation
    # https://github.com/moko256/twitlatte/blob/master/component_client_mastodon/src/main/java/com/github/moko256/latte/client/mastodon/MastodonApiClientImpl.kt

    def get_mentions_timeline(self):
        # /api/v1/notifications?exclude_types=follow,favourite,reblog,poll,follow_request
        return

    def get_favorited_timeline(self):
        # /api/v1/notifications?exclude_types=follow,reblog,mention,poll,follow_request
        return

    def get_conversations_timeline(self):
        # /api/v1/conversations
        return

    def get_bookmarks_timeline(self):
        # /account/bookmarks
        return

    def get_favorites_timeline(self):
        # /account/favourites
        return

    def get_user_statuses(self, account_id):
        # /api/v2/search?type=statuses&account_id=
        # /api/v1/accounts/:id/statuses
        return

    def get_statuses(self, ids):
        # /api/v1/statuses/:id
        return

    def get_profile(self, username):
        # /api/v1/accounts/search
        return self.search('@' + username, result_type='accounts')

    def search(self, q, result_type=None, following=False):
        # /api/v2/search
        return