from dataclasses import dataclass
from typing import List, Dict, Optional
from hashlib import sha256
import os
import sqlite3
from pathlib import Path
import json

import requests
from dacite import from_dict
from flask import Flask, g, redirect, url_for, render_template, jsonify, request, send_from_directory, render_template_string

from . import content_system as h_cs
from . import view_model as h_vm

api = Flask(__name__, static_url_path='')

@api.context_processor
def add_nav_items_to_template_context ():
    nav_items = []

    route_nav = g.get('route_nav')

    if route_nav:
        nav_items += route_nav

    module_nav = g.get('module_nav')

    if module_nav:
        nav_items += module_nav

    #nav_items.sort(key = lambda ni: ni['order'])

    return dict(
        nav_items = nav_items
    )
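
# Example (illustrative): a request hook can seed g.route_nav, and the context
# processor above will merge it into every template's nav_items. The
# 'href'/'label'/'order' keys are assumptions about the nav item shape, not an
# established contract.
#
# @api.before_request
# def add_route_nav ():
#     g.route_nav = [
#         {'href': url_for('.get_login_html'), 'label': 'Login', 'order': 0}
#     ]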

@api.get('/login.html')
def get_login_html ():
    opengraph_info = dict(
        type = 'webpage', # threads might be article
        url = g.app_url,
        title = 'Hogumathi',
        description = 'An app for Twitter, Mastodon, YouTube, etc; Open Source.'
    )

    return render_template('login.html', opengraph_info=opengraph_info)

@api.get('/')
def index ():
    return redirect(url_for('.get_login_html'))

@api.get('/img')
def get_image ():
    print('GET IMG: FIXME this should be done with a caching proxy')
    url = request.args['url']
    url_hash = sha256(url.encode('utf-8')).hexdigest()
    path = f'.data/cache/media/{url_hash}'

    print(f'path = {path}')

    if not os.path.exists(path):
        os.makedirs(os.path.dirname(path), exist_ok=True)

        resp = requests.get(url)
        print(f'status_code = {resp.status_code}')
        if 200 <= resp.status_code < 300:
            with open(path, 'wb') as f:
                f.write(resp.content)
            with open(f'{path}.meta', 'w') as f:
                headers = dict(resp.headers)
                json.dump(headers, f)
        else:
            return 'not found.', 404

    with open(f'{path}.meta', 'r') as f:
        headers = json.load(f)

    #print(url)
    #print(url_hash)
    #print(headers)

    # HTTP header names are case-insensitive, and requests' CaseInsensitiveDict
    # becomes a plain dict once round-tripped through JSON, so check both casings.
    mimetype = headers.get('Content-Type') or headers.get('content-type')

    # send_from_directory resolves relative paths against the Flask app root
    # rather than the working directory, so build an absolute path.
    media_cache_dir = Path(Path.cwd(), '.data/cache/media')

    return send_from_directory(media_cache_dir, url_hash, mimetype=mimetype)
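
# Usage note (illustrative): templates can route remote media through this cache
# by rewriting image URLs, e.g. <img src="/img?url={{ media_url | urlencode }}">.
# The first request downloads and stores the file under .data/cache/media;
# later requests are served from disk without re-fetching.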

@api.get('/content/abc123.html')
def get_abc123_html ():
    return 'abc123'

@api.get('/content/<content_id>.<response_format>')
def get_content_html (content_id, response_format='json', content_kwargs=None):

    if not content_kwargs:
        # query args prefixed with 'content:' are forwarded to the content source
        content_kwargs = {k[len('content:'):]: v for k, v in request.args.items() if k.startswith('content:')}

    content = h_cs.get_content(content_id, **content_kwargs)

    if isinstance(content, h_vm.FeedItem):
        return render_template('tweet-collection.html', tweets=[content], user = {}, query = {})
    elif isinstance(content, h_vm.CollectionPage):
        pagination_token = request.args.get('pagination_token')

        if content.next_token:
            print(f'next_token = {content.next_token}')

        return render_template('tweet-collection.html', tweets=content.items, user = {}, query = {})
    elif isinstance(content, list):
        return render_template('tweet-collection.html', tweets=content, user = {}, query = {})
    else:
        return jsonify(content)
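
# Example (illustrative): GET /content/brand:ispoogedaily.html?content:max_results=10
# strips the 'content:' prefix and calls h_cs.get_content('brand:ispoogedaily',
# max_results='10'). Values arrive as strings, and 'max_results' is a
# hypothetical kwarg name here, not a documented parameter.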

@api.get('/content/def456.html')
def get_def456_html ():
    return get_content_html('brand:ispoogedaily', response_format='html')

@api.get('/content/search.<response_format>')
def get_content_search_html (response_format = 'html'):
    source_id = request.args.get('source')
    q = request.args.get('q')
    pagination_token = request.args.get('pagination_token')
    max_results = int(request.args.get('limit', 10))

    # search the object store
    # search origin sources
    # populate the object store with the results

    # Similar to how the Messages app works: multiple sources within one app.
    # That app does not cache results though; it does an online search with each query.

    return 'ok'
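
# A minimal sketch of the flow outlined above, assuming hypothetical
# h_cs.search_object_store and h_cs.search_origin_source helpers (neither exists
# yet); origin results are ingested so that repeat queries hit the object store:
#
# def search_all (source_id, q, max_results):
#     results = h_cs.search_object_store(q, limit=max_results)
#     if not results and source_id:
#         results = h_cs.search_origin_source(source_id, q, limit=max_results)
#         for item in results:
#             ingest_feed_item(item) # populate the object store with the results
#     return results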

@api.get('/schedule/jobs.html')
def get_schedule_jobs_html ():

    template = """
    {% extends "base-bs.html" %}

    {% block content %}

    {% endblock %}
    """

    view_model = {
        'jobs': [
            {
                'id': '1234',
                'next_run': '',
                'last_run': '',
                'interval': 1,
                'unit': 'minutes',
                'period': '', # period vs. interval? see the note below
                'latest': '',
                'start_day': '',
                'cancel_after': ''
            }
        ]
    }

    return render_template_string(template, **view_model)
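
# The placeholder view model above mirrors the attributes of a Job in the
# `schedule` library (interval, unit, period, latest, start_day, cancel_after);
# there, `interval` is the raw number and `period` is the derived timedelta.
# If that is the scheduler in use, the jobs list could be built from the live
# scheduler instead. A sketch, assuming `import schedule` and that jobs are
# registered elsewhere:
#
# def schedule_jobs_view_model ():
#     return {
#         'jobs': [
#             {
#                 'id': str(id(job)), # schedule jobs have no stable id; illustrative only
#                 'next_run': job.next_run,
#                 'last_run': job.last_run,
#                 'interval': job.interval,
#                 'unit': job.unit,
#                 'period': job.period,
#                 'latest': job.latest,
#                 'start_day': job.start_day,
#                 'cancel_after': job.cancel_after
#             }
#             for job in schedule.get_jobs()
#         ]
#     }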

@api.get('/schedule/create-job.html')
def get_schedule_create_job_html ():

    template = """
    {% extends "base-bs.html" %}

    {% block content %}

    {% endblock %}
    """

    view_model = {

    }

    return render_template_string(template, **view_model)

@dataclass
class FeedItemMedia:
    url: str
    mimetype: str

@dataclass
class FeedItem:
    id: str
    text: str
    media: Optional[List[FeedItemMedia]] = None

def ingest_feed_item (feed_item: FeedItem) -> Optional[FeedItem]:
    with sqlite3.connect('.data/ingest.db') as db:
        # sqlite3 cursors are not context managers, so create one directly
        cur = db.cursor()
        #cur.row_factory = sqlite3.Row

        # check sqlite_master rather than hardcoding; there is a similar example
        # in Hogumathi, the twitter archive plugin I think
        cur.execute("select name from sqlite_master where type = 'table' and name = 'feed_item'")
        feed_item_table_exists = cur.fetchone() is not None

        if not feed_item_table_exists:
            cur.execute("""
                create table feed_item (
                    id text,
                    text text
                )
            """)
            cur.execute("""
                create table feed_item_media (
                    feed_item_id integer,
                    url text,
                    mimetype text
                )
            """)

        sql = 'insert into feed_item (id, text) values (?, ?)'
        params = [
            feed_item.id,
            feed_item.text
        ]

        cur.execute(sql, params)

        # cursor.execute returns the cursor itself (always truthy), so check
        # rowcount to confirm the insert happened
        if cur.rowcount != 1:
            print('could not ingest feed_item')
            return None

    return feed_item

def ingest_feed_item_media (feed_item: FeedItem) -> FeedItem:
    print('ingest_feed_item_media')

    if not feed_item.media:
        return feed_item

    with sqlite3.connect('.data/ingest.db') as db:
        # sqlite3 cursors are not context managers, so create one directly
        cur = db.cursor()
        #cur.row_factory = sqlite3.Row

        for media_item in feed_item.media:

            # TODO import the URL into the files app and store that URL.
            # May want to support several URLs, so that offline LANs work.

            sql = 'insert into feed_item_media (feed_item_id, url, mimetype) values (?, ?, ?)'
            params = [
                feed_item.id,
                media_item.url,
                media_item.mimetype
            ]
            cur.execute(sql, params)

            # as above, check rowcount rather than the cursor returned by execute
            if cur.rowcount != 1:
                print('could not ingest feed_item_media')

    return feed_item

@api.post('/api/ingest/feed-item')
def api_ingest_feed_item ():
    """
    Eventually other content sources will ingest here, and this will be the main DB,
    via ingest_feed_item and ingest_feed_item_media.

    They could be worker tasks; for now they run inline when submitted directly
    from the browser extension.
    """
    print('api_ingest_feed_item')
    ingest_media = int(request.args.get('ingest_media', 1))
    feed_item = request.args.get('feed_item') # FIXME might want to use the post body/form instead
    # the query arg arrives as a JSON string; from_dict (dacite) needs a dict
    feed_item = from_dict(data_class=FeedItem, data=json.loads(feed_item))

    fi_i_res = ingest_feed_item(feed_item)

    if ingest_media: # and fi_i_res(blocking=True, timeout=5):
        fi_i_media_res = ingest_feed_item_media(feed_item)

        #fi_i_media_res(blocking=True)

    return 'ok'
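
# Example call (illustrative), matching the query-arg transport used above; the
# FeedItem travels JSON-encoded in the query string until the FIXME above moves
# it into the POST body:
#
# requests.post(
#     'http://localhost:5000/api/ingest/feed-item',
#     params = {
#         'feed_item': json.dumps({'id': '123', 'text': 'hello'}),
#         'ingest_media': 0
#     }
# )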

@api.get('/health')
def get_health ():
    return 'ok'