123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551 |
- from configparser import ConfigParser
- import base64
- from flask import json, Response, render_template, request, send_from_directory, Blueprint, url_for, session, redirect
- from flask_cors import CORS
- import sqlite3
- import os
- import json
- import json_stream
- from zipfile import ZipFile
- import itertools
- from urllib.parse import urlencode as urlencode_qs
- import datetime
- import dateutil
- import dateutil.parser
- import dateutil.tz
- import requests
- DATA_DIR='.data'
- twitter_app = Blueprint('mastodon_facade', 'mastodon_facade',
- static_folder='static',
- static_url_path='',
- url_prefix='/')
- def mastodon_model (post_data):
- # retweeted_by, avi_icon_url, display_name, handle, created_at, text
-
-
- user = post_data['account']
-
- source_url = post_data['url']
-
- avi_icon_url = user['avatar']
-
- url = url_for('.get_tweet_html', tweet_id=post_data['id'])
-
-
- t = {
- 'id': post_data['id'],
- # hugely not a fan of allowing the server's HTML out. can we sanitize it ourselves?
- 'html': post_data['content'],
- 'url': url,
- 'source_url': source_url,
- 'created_at': post_data['created_at'],
- 'avi_icon_url': avi_icon_url,
- 'display_name': user['display_name'],
- 'handle': user['acct'],
-
- 'author_url': url_for('.get_profile_html', user_id = user['id'], me='mastodon:mastodon.cloud:109271381872332822'),
- 'source_author_url': user['url'],
-
- 'public_metrics': {
- 'reply_count': post_data['replies_count'],
- 'retweet_count': post_data['reblogs_count'],
- 'like_count': post_data['favourites_count']
- },
-
- 'activity': post_data
- }
-
- return t
-
-
- def register_app (instance):
- client_name = os.environ.get('MASTODON_CLIENT_NAME')
- redirect_uri = 'http://localhost:5004/mastodon/logged-in.html'
-
- url = f'https://{instance}/api/v1/apps'
-
- params = {
- 'client_name': client_name,
- 'redirect_uris': redirect_uri,
- 'scopes': 'read write'
- }
-
- resp = requests.post(url, params=params)
-
- with open(f'{DATA_DIR}/mastodon-client_{instance}.json', 'wt') as f:
- f.write(resp.text)
-
- return resp
- @twitter_app.get('/api/app/<instance>/register')
- def post_api_app_instance_register (instance):
-
- if not instance:
- return 'pass isntance= in request params', 400
-
- resp = register_app(instance)
-
- return resp.text, resp.status_code
- @twitter_app.get('/api/app/<instance>')
- def get_api_app_instance (instance):
-
- with open(f'{DATA_DIR}/mastodon-client_{instance}.json', 'rt') as f:
- return Response(f.read(), mimetype='application/json')
- @twitter_app.get('/login.html')
- def get_login_html ():
- instance = request.args.get('instance')
- force_login = request.args.get('force_login')
-
- if not instance:
- return 'provide instance= in query string.', 400
-
- if not os.path.exists(f'{DATA_DIR}/mastodon-client_{instance}.json'):
- resp = register_app(instance)
-
- print(resp)
-
- with open(f'{DATA_DIR}/mastodon-client_{instance}.json', 'rt') as f:
- app_info = json.loads(f.read())
-
- params = {
- 'client_id': app_info['client_id'],
- 'redirect_uri': app_info['redirect_uri'],
- 'scope': 'read write',
-
- 'response_type': 'code'
- }
-
- if force_login:
- params['force_login'] = force_login
-
- url = f'https://{instance}/oauth/authorize?' + urlencode_qs(params)
-
- return redirect(url)
- @twitter_app.get('/logged-in.html')
- def get_logged_in_html ():
- code = request.args.get('code')
- instance = request.args.get('instance', 'mastodon.cloud')
-
- if not instance:
- return 'provide instance= in query string.', 400
-
- with open(f'{DATA_DIR}/mastodon-client_{instance}.json', 'rt') as f:
- app_info = json.loads(f.read())
-
- params = {
- 'client_id': app_info['client_id'],
- 'client_secret': app_info['client_secret'],
-
- 'redirect_uri': app_info['redirect_uri'],
- 'scope': 'read write',
-
- 'grant_type': 'authorization_code',
- 'code': code,
- # force_login: True
- }
-
- url = f'https://{instance}/oauth/token'
-
- resp = requests.post(url, data=params)
-
- auth_info = json.loads(resp.text)
-
-
- #auth_info = {"access_token":"6C6-hD2_OK1vDFc_WcPQnZC5KL0jOUePgQgMQELeV0k","token_type":"Bearer","scope":"read write","created_at":1670088545}
-
- access_token = auth_info['access_token']
-
-
- url = f'https://{instance}/api/v1/accounts/verify_credentials'
-
-
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
-
- resp = requests.get(url, headers=headers)
-
- mastodon_user = json.loads(resp.text)
-
- me = f'mastodon:{instance}:{mastodon_user["id"]}'
-
- session_info = {
- **auth_info,
-
- 'instance': instance,
-
- 'id': mastodon_user['id'],
- 'acct': mastodon_user['acct'],
- 'username': mastodon_user['username'],
- 'display_name': mastodon_user['display_name'],
- 'source_url': mastodon_user['url'],
- 'created_at': mastodon_user['created_at'],
- }
-
- session[me] = session_info
-
- return redirect(url_for('.get_latest_html', me=me))
-
- #return resp.text, resp.status_code
- @twitter_app.post('/data/statuses')
- def post_tweets_create ():
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user['instance']
- access_token = mastodon_user["access_token"]
-
- text = request.form.get('text')
- in_reply_to_id = request.form.get('reply_to_tweet_id')
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- params = {
- 'text': text
- }
-
- if in_reply_to_id:
- params['in_reply_to_id'] = in_reply_to_id
-
- url = f'https://{instance}/api/v1/statuses/{tweet_id}/bookmark'
-
- resp = requests.post(url, data=params, headers=headers)
-
- new_status = json.loads(resp.text)
-
- new_tweet_id=new_status['id']
-
- if 'HX-Request' in request.headers:
- return render_template('partial/compose-form.html', new_tweet_id=new_tweet_id, me=me)
- else:
- return resp.text, resp.status_code
- @twitter_app.post('/data/status/<tweet_id>/retweet')
- def post_tweet_retweet (tweet_id):
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user['instance']
- access_token = mastodon_user["access_token"]
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- url = f'https://{instance}/api/v1/statuses/{tweet_id}/reblog'
-
- resp = requests.post(url, headers=headers)
-
- # new_status = json.loads(resp.text)
-
- # new_tweet_id=new_status['id']
- ## old status is in reblog: {id:} property.
-
- # if 'HX-Request' in request.headers:
- # return 'retweeted, new ID: ' + new_tweet_id
- # else:
- return resp.text, resp.status_code
- @twitter_app.get('/status/<tweet_id>.html')
- def get_tweet_html (tweet_id):
- return 'tweet: ' + tweet_id
- @twitter_app.get('/profile/<user_id>.html')
- def get_profile_html (user_id):
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user['instance']
- access_token = mastodon_user["access_token"]
-
- max_id = request.args.get('max_id')
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- params = {}
-
- try:
- # if the UID isn't numeric then we'll fallback to string.
- url = f'https://{instance}/api/v1/accounts/{int(user_id)}'
- except:
- url = f'https://{instance}/api/v1/accounts/lookup'
- params = {
- 'acct': user_id
- }
-
- resp = requests.get(url, params=params, headers=headers)
- account_info = json.loads(resp.text)
-
-
- user_id = account_info["id"]
-
- url = f'https://{instance}/api/v1/accounts/{user_id}/statuses'
-
- params = {}
-
- if max_id:
- params['max_id'] = max_id
-
- resp = requests.get(url, params=params, headers=headers)
- tweets = json.loads(resp.text)
- #tweets = tweet_source.get_timeline("public", timeline_params)
-
- max_id = tweets[-1]["id"] if len(tweets) else ''
- tweets = list(map(mastodon_model, tweets))
-
- print("max_id = " + max_id)
-
- query = {}
-
- if len(tweets):
- query = {
- #'next_data_hx_select': '#tweets',
- 'next_data_url': url_for('.get_profile_html', user_id=user_id, me=me, max_id=max_id),
- 'next_page_url': url_for('.get_profile_html', user_id=user_id, me=me, max_id=max_id)
- }
-
- user = {
- 'id': user_id
- }
-
- #return Response(response_json, mimetype='application/json')
-
- if 'HX-Request' in request.headers:
- return render_template('partial/tweets-timeline.html', user = user, tweets = tweets, query = query,
- me=me, mastodon_user=mastodon_user)
- else:
- return render_template('user-profile.html', user = user, tweets = tweets, query = query,
- me=me, mastodon_user=mastodon_user)
- @twitter_app.route('/latest.html', methods=['GET'])
- def get_timeline_home_html (variant = "reverse_chronological"):
- # retweeted_by, avi_icon_url, display_name, handle, created_at, text
- me = request.args.get('me')
- mastodon_user = session[me]
-
- token = mastodon_user.get('access_token')
- max_id = request.args.get('max_id')
-
- # comes from token or session
- user_id = "ispoogedaily"
-
- if not token:
- print("No token provided or found in environ.")
- return Response('{"err": "No token."}', mimetype="application/json", status=400)
-
-
- tweet_source = MastodonAPSource("https://mastodon.cloud", token)
-
- timeline_params = {
- 'local': True
- }
-
- if max_id:
- timeline_params['max_id'] = max_id
-
- tweets = tweet_source.get_timeline("public", timeline_params)
-
- max_id = tweets[-1]["id"] if len(tweets) else ''
- tweets = list(map(mastodon_model, tweets))
-
- print("max_id = " + max_id)
-
- query = {}
-
- if len(tweets):
- query = {
- 'next_data_url': url_for('.get_timeline_home_html', max_id=max_id, me=me),
- 'next_page_url': url_for('.get_timeline_home_html', max_id=max_id, me=me)
- }
-
- user = {
- 'id': user_id
- }
-
- #return Response(response_json, mimetype='application/json')
-
- return render_template('tweet-collection.html', user = user, tweets = tweets, query = query,
- me=me, mastodon_user=mastodon_user)
- @twitter_app.get('/bookmarks.html')
- def get_bookmarks_html ():
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user['instance']
- access_token = mastodon_user["access_token"]
-
- max_id = request.args.get('max_id')
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- params = {}
-
- url = f'https://{instance}/api/v1/bookmarks'
-
- params = {}
-
- if max_id:
- params['max_id'] = max_id
-
- resp = requests.get(url, params=params, headers=headers)
- tweets = json.loads(resp.text)
- #tweets = tweet_source.get_timeline("public", timeline_params)
-
- max_id = tweets[-1]["id"] if len(tweets) else ''
- tweets = list(map(mastodon_model, tweets))
-
- print("max_id = " + max_id)
-
- query = {}
-
- if len(tweets):
- query = {
- 'next_data_url': url_for('.get_bookmarks_html', me=me, max_id=max_id),
- 'next_page_url': url_for('.get_bookmarks_html', me=me, max_id=max_id)
- }
-
- user = {}
-
- #return Response(response_json, mimetype='application/json')
-
- return render_template('tweet-collection.html', user = user, tweets = tweets, query = query,
- me=me, mastodon_user=mastodon_user)
- @twitter_app.post('/data/bookmarks/<tweet_id>')
- def post_tweet_bookmark (tweet_id):
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user['instance']
- access_token = mastodon_user["access_token"]
-
- max_id = request.args.get('max_id')
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- params = {}
-
- url = f'https://{instance}/api/v1/statuses/{tweet_id}/bookmark'
-
- resp = requests.post(url, headers=headers)
-
- return resp.text, resp.status_code
- @twitter_app.delete('/data/bookmarks/<tweet_id>')
- def delete_tweet_bookmark (tweet_id):
- me = request.args.get('me')
- mastodon_user = session[me]
-
- instance = mastodon_user['instance']
- access_token = mastodon_user["access_token"]
-
- max_id = request.args.get('max_id')
- headers = {
- 'Authorization': f'Bearer {access_token}'
- }
- params = {}
-
- url = f'https://{instance}/api/v1/statuses/{tweet_id}/unbookmark'
-
- resp = requests.post(url, headers=headers)
-
- return resp.text, resp.status_code
- class MastodonAPSource:
- def __init__ (self, endpoint, token):
- self.endpoint = endpoint
- self.token = token
- super().__init__()
- def get_timeline (self, path = "home", params = {}):
- url = self.endpoint + "/api/v1/timelines/" + path
- params = {
- **params
- }
-
-
- headers = {"Authorization": "Bearer {}".format(self.token)}
-
- response = requests.get(url, params=params, headers=headers)
-
- print(response)
-
- response_json = json.loads(response.text)
-
- return response_json
-
- # nice parallel is validation
- # https://github.com/moko256/twitlatte/blob/master/component_client_mastodon/src/main/java/com/github/moko256/latte/client/mastodon/MastodonApiClientImpl.kt
-
- def get_mentions_timeline (self):
- # /api/v1/notifications?exclude_types=follow,favourite,reblog,poll,follow_request
- return
-
- def get_favorited_timeline (self):
- # /api/v1/notifications?exclude_types=follow,reblog,mention,poll,follow_request
- return
-
- def get_conversations_timeline (self):
- # /api/v1/conversations
- return
-
- def get_bookmarks_timeline (self):
- # /account/bookmarks
- return
-
- def get_favorites_timeline (self):
- # /account/favourites
- return
-
-
- def get_user_statuses (self, account_id):
- # /api/v2/search?type=statuses&account_id=
- # /api/v1/accounts/:id/statuses
- return
-
- def get_statuses (self, ids):
- # /api/v1/statuses/:id
- return
-
- def get_profile (self, username):
- # /api/v1/accounts/search
- return self.search('@' + username, result_type='accounts')
-
- def search (q, result_type = None, following = False):
- # /api/v2/search
- return
|