"""Flask blueprint serving item collections (tweets + YouTube videos) stored as
JSON documents under ``{DATA_DIR}/collection/<id>.json``, plus content-system
sources for listing, fetching and updating them."""

from ulid import ULID
from dataclasses import asdict, replace
from importlib.util import find_spec

import os
import json

from flask import request, g, jsonify, render_template, Blueprint, url_for, session

from twitter_v2.api import ApiV2TweetSource

from . import view_model as h_vm
from .view_model import FeedItem, CollectionPage, cleandict
from .content_system import get_content, get_all_content, register_content_source

# Optional facades: only import them when the package is actually installed.
twitter_enabled = False
if find_spec('twitter_v2_facade'):
    from twitter_v2_facade.view_model import tweet_model_dc_vm
    twitter_enabled = True

youtube_enabled = False
if find_spec('youtube_v3_facade'):
    from youtube_v3_facade.content_source import youtube_model, get_youtube_builder
    youtube_enabled = True

DATA_DIR = ".data"

item_collections_bp = Blueprint('item_collections', 'item_collections',
                                static_folder='static',
                                static_url_path='',
                                url_prefix='/')


def get_tweet_collection (collection_id):
    """Load a collection's JSON document from disk.

    Returns the parsed dict, or None when no file exists for collection_id.
    """
    json_path = f'{DATA_DIR}/collection/{collection_id}.json'
    if not os.path.exists(json_path):
        return

    with open(json_path, 'rt', encoding='utf-8') as f:
        collection = json.loads(f.read())

    return collection


def collection_from_card_source (url):
    """
    temp1 = await fetch('http://localhost:5000/notes/cards/search?q=twitter.com/&limit=10').then(r => r.json())

    re = /(http[^\s]+twitter\.com\/[^\/]+\/status\/[\d]+)/ig

    tweetLinks = temp1.cards.map(c => c.card.content).map(c => c.match(re))

    tweetLinks2 = tweetLinks.flat().filter(l => l)

    tweetLinksS = Array.from(new Set(tweetLinks2))

    statusUrls = tweetLinksS.map(s => new URL(s))

    //users = Array.from(new Set(statusUrls.map(s => s.pathname.split('/')[1])))

    ids = Array.from(new Set(statusUrls.map(s => parseInt(s.pathname.split('/')[3]))))
    """

    """
    temp1 = JSON.parse(document.body.innerText)

    // get swipe note + created_at + tweet user + tweet ID
    tweetCards = temp1.cards.map(c => c.card).filter(c => c.content.match(re))

    tweets = tweetCards.map(c => ({created_at: c.created_at, content: c.content, tweets: c.content.match(re).map(m => new URL(m))}))

    tweets.filter(t => t.tweets.filter(t2 => t2.user.toLowerCase() == 'stephenmpinto').length)

    // HN
    re = /(http[^\s]+news.ycombinator\.com\/[^\s]+\=[\d]+)/ig
    linkCards = temp1.cards.map(c => c.card).filter(c => c.content.match(re))
    links = linkCards.map(c => ({created_at: c.created_at, content: c.content, links: c.content.match(re).map(m => new URL(m))}))

    // YT (I thnk I've already done this one)
    """

    # more in 2022 twitter report

    return None


def expand_item (item, me, tweets = None, includes = None, yt_videos = None):
    """Resolve a stored collection item into a renderable feed item.

    Items with an 'id' key are matched against the fetched tweets; items with
    a 'yt_id' key are matched against the fetched YouTube videos. The item's
    optional 'note' is attached to the resulting feed item.
    """
    if 'id' in item:
        t = list(filter(lambda t: item['id'] == t.id, tweets))
        if not len(t):
            # Tweet could not be fetched (deleted / suspended / blocked author);
            # emit a placeholder so the collection keeps its slot visible.
            print("no tweet for item: " + item['id'])
            feed_item = FeedItem(
                id = item['id'],
                text = "(Deleted, suspended or blocked)",
                created_at = "",
                handle = "error",
                display_name = "Error"
            )
            # FIXME 1) put this in relative order to the collection
            # FIXME 2) we can use the tweet link to get the user ID...
        else:
            t = t[0]
            feed_item = tweet_model_dc_vm(includes, t, me)
            note = item.get('note')
            feed_item = replace(feed_item, note = note)
    elif 'yt_id' in item:
        yt_id = item['yt_id']
        vid = list(filter(lambda v: v['id'] == yt_id, yt_videos))[0]
        feed_item = youtube_model(vid)
        note = item.get('note')
        feed_item.update({'note': note})

    return feed_item


# pagination token is the next tweet_ID
# FIX: the route was '/collection/.html' with no URL variable, but the handler
# requires collection_id -- the '<collection_id>' converter was evidently lost;
# restored here.
@item_collections_bp.get('/collection/<collection_id>.html')
def get_collection_html (collection_id):
    """Render a collection page, fetching its tweets and YouTube videos live."""
    me = request.args.get('me')
    acct = session.get(me)

    max_results = int(request.args.get('max_results', 10))
    pagination_token = int(request.args.get('pagination_token', 0))

    collection = get_tweet_collection(collection_id)

    # FIX: get_tweet_collection returns None for an unknown ID; the original
    # crashed with a TypeError on the membership test below.
    if not collection:
        return 'collection not found', 404

    if 'authorized_users' in collection and (not acct or not me in collection['authorized_users']):
        return 'access denied.', 403

    items = collection['items'][pagination_token:(pagination_token + max_results)]

    if not len(items):
        return 'no tweets', 404

    twitter_token = os.environ.get('BEARER_TOKEN')
    if me and me.startswith('twitter:') and acct:
        # prefer the viewer's own token so protected/blocked visibility matches them
        twitter_token = acct['access_token']

    tweet_source = ApiV2TweetSource(twitter_token)

    tweet_ids = filter(lambda i: 'id' in i, items)
    tweet_ids = list(map(lambda item: item['id'], tweet_ids))

    tweets_response = tweet_source.get_tweets(tweet_ids, return_dataclass=True)

    yt_ids = filter(lambda i: 'yt_id' in i, items)
    yt_ids = list(map(lambda item: item['yt_id'], yt_ids))

    youtube = get_youtube_builder()
    videos_response = youtube.videos().list(id=','.join(yt_ids), part='snippet,contentDetails,liveStreamingDetails,statistics,recordingDetails', maxResults=1).execute()

    if tweets_response.errors:
        # error types:
        # https://api.twitter.com/2/problems/not-authorized-for-resource (blocked or suspended)
        # https://api.twitter.com/2/problems/resource-not-found (deleted)
        for err in tweets_response.errors:
            if not 'type' in err:
                print('unknown error type: ' + str(err))
            elif err['type'] == 'https://api.twitter.com/2/problems/not-authorized-for-resource':
                print('blocked or suspended tweet: ' + err['value'])
            elif err['type'] == 'https://api.twitter.com/2/problems/resource-not-found':
                print('deleted tweet: ' + err['value'])
            else:
                print('unknown error')
                print(json.dumps(err, indent=2))

    includes = tweets_response.includes
    tweets = tweets_response.data

    feed_items = list(map(lambda item: expand_item(item, me, tweets, includes, videos_response['items']), items))

    if request.args.get('format') == 'json':
        return jsonify({'ids': tweet_ids,
                        'tweets': cleandict(asdict(tweets_response)),
                        'feed_items': feed_items,
                        'items': items,
                        'pagination_token': pagination_token})
    else:
        query = {}
        if pagination_token:
            # NOTE(review): this reuses the *current* pagination_token, so the
            # "next" link never advances; presumably it should be
            # pagination_token + max_results -- confirm against the templates.
            query['next_data_url'] = url_for('.get_collection_html', collection_id=collection_id, pagination_token=pagination_token)

        if 'HX-Request' in request.headers:
            return render_template('partial/tweets-timeline.html', tweets = feed_items, user = {}, query = query)
        else:
            if pagination_token:
                query['next_page_url'] = url_for('.get_collection_html', collection_id=collection_id, pagination_token=pagination_token)
            return render_template('tweet-collection.html', tweets = feed_items, user = {}, query = query)


def get_collection_list (me = None):
    """List collections visible to the current viewer.

    NOTE(review): the ``me`` parameter is immediately overwritten from the
    request args, even though this is registered as a content source that
    passes ``me`` explicitly -- confirm which should win.
    """
    me = request.args.get('me')
    acct = session.get(me)

    collections = []
    with os.scandir(f'{DATA_DIR}/collection') as collections_files:
        for collection_file in collections_files:
            if not collection_file.name.endswith('.json'):
                continue

            with open(collection_file.path, 'rt', encoding='utf-8') as f:
                coll = json.load(f)

            # skip collections the viewer is not authorized to see
            if 'authorized_users' in coll and (not acct or not me in coll['authorized_users']):
                continue

            collection_id = collection_file.name[:-len('.json')]

            version = coll.get('_version')
            if not version:
                # legacy documents have no version; synthesize one
                version = str(ULID())

            coll_info = dict(
                id = collection_id,
                _version = version,
                href = url_for('.get_collection_html', collection_id=collection_id)
            )

            collections.append(coll_info)

    return collections


# pagination token is the next tweet_ID
@item_collections_bp.get('/collections.html')
def get_collections_html ():
    """Return the viewer's collection list as JSON (despite the .html path)."""
    me = request.args.get('me')
    collections = get_content('collections:list', me=me)
    return jsonify(collections)


def update_collection (collection_id, new_collection, op='replace', version=None, me = None):
    """Write a collection document, applying op ('replace'/'append'/'prepend').

    Performs an optimistic-concurrency check against the stored '_version';
    raises ValueError on a stale version or an unsupported op. Returns the
    newly assigned version string.
    """
    path = f'{DATA_DIR}/collection/{collection_id}.json'

    existing_collection = None
    if os.path.exists(path):
        with open(path, 'rt', encoding='utf-8') as f:
            existing_collection = json.load(f)

    existing_version = existing_collection and existing_collection.get('_version')
    if existing_collection and existing_version != version:
        # FIX: the original raised the undefined name 'Error' (a NameError at
        # runtime); ValueError conveys the intended stale-version failure.
        raise ValueError('updating with a wrong version. probably using a stale copy. fetch and retry op.')

    if op == 'insert':
        after_id = request.form.get('after_id')
        raise ValueError('not supported yet')
    elif op == 'append':
        existing_collection['items'] += new_collection['items']
        new_collection = existing_collection
    elif op == 'prepend':
        existing_collection['items'] = new_collection['items'] + existing_collection['items']
        new_collection = existing_collection

    new_version = str(ULID())  # content addressable hash of json w/o this key or similar

    new_collection['_version'] = new_version

    with open(path, 'wt', encoding='utf-8') as f:
        json.dump(new_collection, f)

    return new_version


# FIX: restored the '<collection_id>' URL variable (handler requires it).
@item_collections_bp.post('/collection/<collection_id>.html')
def post_collection_html (collection_id):
    """Accept an updated collection document from a form post."""
    # FIX: 'me' was undefined in the original (NameError on every request).
    me = request.args.get('me')

    op = request.form.get('op', 'replace')
    version = request.form.get('version')
    # NOTE(review): 'version' is parsed but never forwarded to the update call.

    new_collection = request.form.get('collection.json')
    new_collection = json.loads(new_collection)  # FIXME probably wrong

    new_version = get_content('collection:update', collection_id, new_collection, op=op, me=me)

    return jsonify({'_version': new_version})


# FIX: restored the '<collection_id>' URL variable (handler requires it).
@item_collections_bp.get('/collection/test-update/<collection_id>.html')
def get_collection_test_update_html (collection_id):
    """Dev endpoint: prepend a dummy item to exercise the update path."""
    me = None
    op = 'prepend'
    version = request.args.get('version')
    new_collection = {
        'items': [{'id': 'zzz999'}]
    }
    new_version = get_content(f'collections:update:{collection_id}', new_collection, version=version, op=op, me=me)
    return jsonify({'_version': new_version})


@item_collections_bp.post('/data/collection/create/from-cards')
def post_data_collection_create_from_cards ():
    """
    // create collection from search, supporting multiple Tweets per card and Tweets in multiple Cards.

    re = /(https?[a-z0-9\.\/\:]+twitter\.com\/[0-9a-z\_]+\/status\/[\d]+)/ig

    temp1 = await fetch('http://localhost:5000/notes/cards/search?q=twitter.com/').then(r => r.json())

    cardMatches = temp1.cards
        .map(cm => Object.assign({}, cm, {tweetLinks: Array.from(new Set(cm.card.content.match(re)))}))
        .filter(cm => cm.tweetLinks && cm.tweetLinks.length)
        .map(cm => Object.assign({}, cm, {tweetUrls: cm.tweetLinks.map(l => new URL(l))}))
        .map(cm => Object.assign({}, cm, {tweetInfos: cm.tweetUrls.map(u => ({user: u.pathname.split('/')[1], tweetId: u.pathname.split('/')[3]}))}));

    collectionCards = {}
    cardMatches.forEach(function (cm) {
        if (!cm.tweetLinks.length) { return; }
        cm.tweetInfos.forEach(function (ti) {
            if (!collectionCards[ti.tweetId]) {
                collectionCards[ti.tweetId] = [];
            }
            collectionCards[ti.tweetId].push(cm.card);
        })
    })

    var collectionItems = [];
    Object.entries(collectionCards).forEach(function (e) {
        var tweetId = e[0], cards = e[1];
        var note = cards.map(function (card) { return card.created_at + "\n\n" + card.content; }).join("\n\n-\n\n");
        collectionItems.push({id: tweetId, note: note, tweet_infos: cm.tweetInfos, card_infos: cards.map(c => 'card#' + c.id)});
    })
    """

    collection = {
        'items': [],  # described in JS function above
        'authorized_users': [g.twitter_user['id']]
    }

    return jsonify(collection)


def get_item_id (obj_or_dict, id_key='id'):
    """Fetch an ID from either a dict (key access) or an object (attribute)."""
    if type(obj_or_dict) == dict:
        return obj_or_dict[id_key]
    else:
        return getattr(obj_or_dict, id_key)


def expand_item2 (item, me, content_responses):
    """Resolve a collection item via pre-fetched content-system responses.

    content_responses maps content IDs to a CollectionPage, a list, or a
    single item; the matching entry (by ID suffix) becomes the feed item,
    with the collection item's 'note' attached.
    """
    content_id = item['id']
    content_response = content_responses[ content_id ]

    if type(content_response) == CollectionPage:
        tweets = content_response.items
    elif type(content_response) == list:
        tweets = content_response
    else:
        tweets = [content_response]

    # endswith is a hack. Really FeedItems should return a full ID with prefix.
    t = list(filter(lambda t: content_id.endswith(f':{get_item_id(t)}'), tweets))
    if not len(t):
        print("no tweet for item: " + item['id'])
        feed_item = FeedItem(
            id = item['id'],
            text = "(Deleted, suspended or blocked)",
            created_at = "",
            handle = "error",
            display_name = "Error"
        )
        # FIXME 1) put this in relative order to the collection
        # FIXME 2) we can use the tweet link to get the user ID...
    else:
        feed_item = t[0]
        note = item.get('note')
        if type(feed_item) == dict:
            feed_item.update(note = note)
        else:
            feed_item = replace(feed_item, note = note)

    return feed_item


def get_collection (collection_id, me=None, pagination_token:str = None, max_results:int = 10):
    """Content source: return one page of a collection with items expanded.

    pagination_token is the stringified start index into the item list; a
    'next_token' is added when the page is full (so there may be more).
    Returns None for an unknown collection.
    """
    collection = get_tweet_collection(collection_id)

    if not collection:
        return

    first_idx = int(pagination_token or 0)
    last_idx = first_idx + max_results

    items = collection['items'][first_idx:last_idx]

    content_ids = list(map(lambda item: item['id'], items))

    content_responses = get_all_content( tuple(content_ids), enable_bulk_fetch=True )

    feed_items = list(map(lambda item: expand_item2(item, me, content_responses), items))

    collection['items'] = feed_items

    if len(collection['items']) == max_results:
        collection['next_token'] = str(last_idx)

    return collection


def register_content_sources ():
    """Register this module's content sources with the content system."""
    register_content_source("collection:", get_collection, id_pattern="([^:]+)")
    register_content_source("collections:list", get_collection_list, id_pattern="")
    # raw string avoids invalid-escape warnings in the regex pattern
    register_content_source("collections:update:", update_collection, id_pattern=r"([A-Za-z0-9\-\_\.]+)")


# pagination token is the next tweet_ID
# FIX: restored the '<collection_id>' URL variable (handler requires it).
@item_collections_bp.get('/collection2/<collection_id>.html')
def get_collection2_html (collection_id):
    """Render a collection page using the content system (v2 of the view)."""
    me = request.args.get('me')
    acct = session.get(me)

    max_results = int(request.args.get('limit', 1))
    pagination_token = request.args.get('pagination_token', 0)

    #collection = get_tweet_collection(collection_id)
    collection = get_content(f'collection:{collection_id}', me=me, pagination_token=pagination_token, max_results=max_results)

    # FIX: the content source returns None for an unknown collection; the
    # original crashed with a TypeError on the membership test below.
    if not collection:
        return 'collection not found', 404

    if 'authorized_users' in collection and (not acct or not me in collection['authorized_users']):
        return 'access denied.', 403

    feed_items = collection['items']
    pagination_token = collection.get('next_token')

    if not len(feed_items):
        return 'no tweets', 404

    if request.args.get('format') == 'json':
        # FIX: the original referenced undefined names here (tweet_ids,
        # tweets_response, items), raising NameError on every JSON request;
        # return only the data this handler actually has.
        return jsonify({'feed_items': feed_items,
                        'pagination_token': pagination_token})
    else:
        query = {}
        if pagination_token:
            query['next_data_url'] = url_for('.get_collection2_html', collection_id=collection_id, pagination_token=pagination_token, limit=max_results, me=me)
            query['next_page_url'] = url_for('.get_collection2_html', collection_id=collection_id, pagination_token=pagination_token, limit=max_results, me=me)

        if 'HX-Request' in request.headers:
            return render_template('partial/tweets-timeline.html', tweets = feed_items, user = {}, query = query)
        else:
            if pagination_token:
                query['next_page_url'] = url_for('.get_collection2_html', me=me, collection_id=collection_id, pagination_token=pagination_token)

            source_url = url_for('.get_collection2_html', collection_id=collection_id, _external=True)
            title = f'Collection: {collection_id} on Hogumathi'

            opengraph_info = dict(
                type = 'webpage',  # threads might be article
                url = source_url,
                title = title,
                description = title,
                #image = user.profile_image_url
            )

            return render_template('tweet-collection.html', tweets = feed_items, user = {}, query = query, opengraph_info=opengraph_info)