from dataclasses import dataclass from typing import List, Dict, Optional from hashlib import sha256 import os from pathlib import Path import json import requests from flask import Flask, g, redirect, url_for, render_template, jsonify, request, send_from_directory, render_template_string from . import content_system as h_cs from . import view_model as h_vm api = Flask(__name__, static_url_path='') @api.context_processor def add_nav_items_to_template_context (): nav_items = [] route_nav = g.get('route_nav') if route_nav: nav_items += route_nav module_nav = g.get('module_nav') if module_nav: nav_items += module_nav #nav_items.sort(key = lambda ni: ni['order']) return dict( nav_items = nav_items ) @api.get('/login.html') def get_login_html (): opengraph_info = dict( type = 'webpage', # threads might be article url = g.app_url, title = 'Hogumathi', description = 'An app for Twitter, Mastodon, YouTube, etc; Open Source.' ) return render_template('login.html', opengraph_info=opengraph_info) @api.get('/') def index (): return redirect(url_for('.get_login_html')) @api.get('/img') def get_image (): print('GET IMG: FIXME this should be done with caching proxy') url = request.args['url'] url_hash = sha256(url.encode('utf-8')).hexdigest() path = f'.data/cache/media/{url_hash}' print(f'path = {path}') if not os.path.exists(path): resp = requests.get(url) print(f'status_code = {resp.status_code}') if resp.status_code >= 200 and resp.status_code < 300: with open(path, 'wb') as f: f.write(resp.content) with open(f'{path}.meta', 'w') as f: headers = dict(resp.headers) json.dump(headers, f) else: return 'not found.', 404 with open(f'{path}.meta', 'r') as f: headers = json.load(f) #print(url) #print(url_hash) #print(headers) # not sure why some responses use lower case. mimetype = headers.get('Content-Type') or headers.get('content-type') # Flask goes relative to the module as opposed to the working directory. media_cache_dir = Path(Path.cwd(), '.data/cache/media') return send_from_directory(media_cache_dir, url_hash, mimetype=mimetype) @api.get('/content/abc123.html') def get_abc123_html (): return 'abc123' @api.get('/content/.') def get_content_html (content_id, response_format='json', content_kwargs=None): if not content_kwargs: content_kwargs = filter(lambda e: e[0].startswith('content:'), request.args.items()) content_kwargs = dict(map(lambda e: [e[0][len('content:'):], e[1]], content_kwargs)) content = h_cs.get_content(content_id, **content_kwargs) if type(content) == h_vm.FeedItem: return render_template('tweet-collection.html', tweets=[content], user = {}, query = {}) elif type(content) == h_vm.CollectionPage: pagination_token = request.args.get('pagination_token') if content.next_token: print(f'next_token = {content.next_token}') return render_template('tweet-collection.html', tweets=content.items, user = {}, query = {}) elif type(content) == list: return render_template('tweet-collection.html', tweets=content, user = {}, query = {}) else: return jsonify(content) @api.get('/content/def456.html') def get_def456_html (): return get_content_html('brand:ispoogedaily', response_format='html') @api.get('/content/search.') def get_content_search_html (response_format = 'html'): source_id = request.args.get('source') q = request.args.get('q') pagination_token = request.args.get('pagination_token') max_results = int(request.args.get('limit', 10)) # search object store # search origin sources # populate object store with results # similar to how messages app works. Multiple sources within one app. # That app does not cache results tho, does an online search with each query. return 'ok' @api.get('/schedule/jobs.html') def get_schedule_jobs_html (): template = """ {% extends "base-bs.html" %} {% block content %} {% endblock %} """ view_model = { 'jobs': [ { 'id': '1234', 'next_run': '', 'last_run': '', 'interval': 1, 'unit': 'minutes', 'period': '', # period vs. interval? 'latest': '', 'start_day': '', 'cancel_after': '' } ] } return render_template_string(template, **view_model) @api.get('/schedule/create-job.html') def get_schedule_create_job_html (): template = """ {% extends "base-bs.html" %} {% block content %} {% endblock %} """ view_model = { } return render_template_string(template, **view_model) @dataclass class FeedItemMedia: url: str mimetype: str @dataclass class FeedItem: id: str text: str media: Optional[List[FeedItemMedia]] = None def ingest_feed_item (feed_item: FeedItem) -> FeedItem: with sqlite3.connect('.data/ingest.db') as db: with db.cursor() as cur: #cur.row_factory = sqlite3.Row feed_item_table_exists = False # example in Hogumathi, twitter archive plugin I think if not feed_item_table_exists: cur.execute(""" create table feed_item ( id text, text text ) """) cur.execute(""" create table feed_item_media ( feed_item_id integer, url text, mimetype text ) """) sql = 'insert into feed_item (id, text) values (?, ?)' params = [ feed_item.id, feed_item.text ] res = cur.execute(sql, params) if not res: print('could not ingest feed_item') return False return feed_item def ingest_feed_item_media (feed_item: FeedItem) -> FeedItem: print('ingest_feed_item_media') if not feed_item.media: return feed_item with sqlite3.connect('.data/ingest.db') as db: with db.cursor() as cur: #cur.row_factory = sqlite3.Row for media_item in feed_item.media: # TODO import URL to files app and store that URL. # may want to support several URLs, so that offline LANs work. sql = 'insert into feed_item_media (feed_item_id, url, mimetype) values (?, ?, ?)' params = [ feed_item.id, media_item.url, media_item.mimetype ] res = cur.execute(sql, params) if not res: print('could not ingest feed_item_media') return feed_item @api.post('/api/ingest/feed-item') def api_ingest_feed_item (): """ Eventually other content sources with ingest here, and this will be the main DB. Via inigest_feed_Item and ingest_feed_item_media. They could be worker tasks. Work when submitted directly from browser extension. """ print('api_ingest_feed_item') ingest_media = int(request.args.get('ingest_media', 1)) feed_item = request.args.get('feed_item') # FIXME might want to use post body/form feed_item = from_dict(data_class=FeedItem, data=feed_item) fi_i_res = ingest_feed_item(feed_item) if ingest_media: # and fi_i_res(blocking=True, timeout=5): fi_i_media_res = ingest_feed_item_media(feed_item) #fi_i_media_res(blocking=True) return 'ok' @api.get('/health') def get_health (): return 'ok'