from hashlib import sha256
import os
from pathlib import Path
import json

import requests
from flask import (
    Flask,
    g,
    redirect,
    url_for,
    render_template,
    jsonify,
    request,
    send_from_directory,
    render_template_string,
)

from . import content_system as h_cs
from . import view_model as h_vm

api = Flask(__name__, static_url_path='')


@api.context_processor
def add_nav_items_to_template_context ():
    """Inject nav_items into every template context.

    Collects route-level then module-level nav entries from the request
    globals (g.route_nav / g.module_nav); either may be absent.
    """
    nav_items = []

    route_nav = g.get('route_nav')
    if route_nav:
        nav_items += route_nav

    module_nav = g.get('module_nav')
    if module_nav:
        nav_items += module_nav

    #nav_items.sort(key = lambda ni: ni['order'])

    return dict(
        nav_items = nav_items
    )


@api.get('/login.html')
def get_login_html ():
    """Render the login page with OpenGraph metadata for link previews."""
    opengraph_info = dict(
        type = 'webpage', # threads might be article
        url = g.app_url,
        title = 'Hogumathi',
        description = 'An app for Twitter, Mastodon, YouTube, etc; Open Source.'
    )

    return render_template('login.html', opengraph_info=opengraph_info)


@api.get('/')
def index ():
    """Redirect the site root to the login page."""
    return redirect(url_for('.get_login_html'))


@api.get('/img')
def get_image ():
    """Proxy a remote image (?url=...) through an on-disk cache and serve it.

    The cache key is the SHA-256 hex digest of the URL.  Upstream response
    headers are stored next to the body in a ``<hash>.meta`` JSON file so
    the original Content-Type can be replayed on later hits.

    Returns 404 when the upstream fetch does not succeed.

    NOTE(review): the URL is fetched as-is — consider restricting schemes/
    hosts if this endpoint is reachable with attacker-controlled ``url``
    (SSRF risk); confirm against the deployment model.
    """
    print('GET IMG')

    url = request.args['url']
    url_hash = sha256(url.encode('utf-8')).hexdigest()

    cache_dir = Path('.data/cache/media')
    path = cache_dir / url_hash
    print(f'path = {path}')

    if not path.exists():
        # Fix: the directory must exist before we write into it; on a fresh
        # checkout the original code crashed with FileNotFoundError here.
        cache_dir.mkdir(parents=True, exist_ok=True)

        resp = requests.get(url)
        print(f'status_code = {resp.status_code}')

        if 200 <= resp.status_code < 300:
            path.write_bytes(resp.content)

            with open(f'{path}.meta', 'w') as f:
                json.dump(dict(resp.headers), f)
        else:
            return 'not found.', 404

    with open(f'{path}.meta', 'r') as f:
        headers = json.load(f)

    # not sure why some responses use lower case.
    mimetype = headers.get('Content-Type') or headers.get('content-type')

    # Flask goes relative to the module as opposed to the working directory.
    media_cache_dir = Path(Path.cwd(), '.data/cache/media')

    return send_from_directory(media_cache_dir, url_hash, mimetype=mimetype)


@api.get('/content/abc123.html')
def get_abc123_html ():
    """Placeholder content route."""
    return 'abc123'


# Fix: the route was '/content/.html' with no URL converter, so Flask could
# never supply the required content_id parameter; any request to it raised
# a TypeError.  '<content_id>' restores the intended dynamic segment.
@api.get('/content/<content_id>.html')
def get_content_html (content_id, content_kwargs=None):
    """Render a piece of content by id.

    Query parameters prefixed with ``content:`` are stripped of the prefix
    and forwarded to the content system as keyword arguments.  May also be
    called directly from other handlers (see get_def456_html), optionally
    with an explicit content_kwargs dict.

    Dispatches on the content system's return type: FeedItem and lists are
    rendered with the tweet-collection template; anything else is JSON.
    """
    if not content_kwargs:
        content_kwargs = filter(lambda e: e[0].startswith('content:'), request.args.items())
        content_kwargs = dict(map(lambda e: [e[0][len('content:'):], e[1]], content_kwargs))

    content = h_cs.get_content(content_id, **content_kwargs)

    # isinstance instead of type(...) == ... so subclasses dispatch correctly.
    if isinstance(content, h_vm.FeedItem):
        return render_template('tweet-collection.html', tweets=[content], user = {}, query = {})
    elif isinstance(content, h_vm.CollectionPage):
        pagination_token = request.args.get('pagination_token')

        if content.next_token:
            print(f'next_token = {content.next_token}')

        return render_template('tweet-collection.html', tweets=content.items, user = {}, query = {})
    elif isinstance(content, list):
        return render_template('tweet-collection.html', tweets=content, user = {}, query = {})
    else:
        return jsonify(content)


@api.get('/content/def456.html')
def get_def456_html ():
    """Render a fixed brand feed by delegating to the generic content handler."""
    return get_content_html('brand:ispoogedaily')


@api.get('/content/search.html')
def get_content_search_html ():
    """Content search endpoint — parameters are parsed but search is not implemented yet."""
    source_id = request.args.get('source')
    q = request.args.get('q')
    pagination_token = request.args.get('pagination_token')
    max_results = int(request.args.get('limit', 10))

    # search object store
    # search origin sources
    # populate object store with results
    # similar to how messages app works. Multiple sources within one app.
    # That app does not cache results tho, does an online search with each query.

    return 'ok'


@api.get('/schedule/jobs.html')
def get_schedule_jobs_html ():
    """Render the scheduled-jobs page from an inline (currently empty) template."""
    template = """
    {% extends "base-bs.html" %}

    {% block content %}

    {% endblock %}
    """

    view_model = {
        'jobs': [
            {
                'id': '1234',
                'next_run': '',
                'last_run': '',
                'interval': 1,
                'unit': 'minutes',
                'period': '', # period vs. interval?
                'latest': '',
                'start_day': '',
                'cancel_after': ''
            }
        ]
    }

    return render_template_string(template, **view_model)


@api.get('/schedule/create-job.html')
def get_schedule_create_job_html ():
    """Render the create-job page from an inline (currently empty) template."""
    template = """
    {% extends "base-bs.html" %}

    {% block content %}

    {% endblock %}
    """

    view_model = {

    }

    return render_template_string(template, **view_model)