web.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. from dataclasses import dataclass
  2. from typing import List, Dict, Optional
  3. from hashlib import sha256
  4. import os
  5. from pathlib import Path
  6. import json
  7. import requests
  8. from flask import Flask, g, redirect, url_for, render_template, jsonify, request, send_from_directory, render_template_string
  9. from . import content_system as h_cs
  10. from . import view_model as h_vm
  11. api = Flask(__name__, static_url_path='')
  12. @api.context_processor
  13. def add_nav_items_to_template_context ():
  14. nav_items = []
  15. route_nav = g.get('route_nav')
  16. if route_nav:
  17. nav_items += route_nav
  18. module_nav = g.get('module_nav')
  19. if module_nav:
  20. nav_items += module_nav
  21. #nav_items.sort(key = lambda ni: ni['order'])
  22. return dict(
  23. nav_items = nav_items
  24. )
  25. @api.get('/login.html')
  26. def get_login_html ():
  27. opengraph_info = dict(
  28. type = 'webpage', # threads might be article
  29. url = g.app_url,
  30. title = 'Hogumathi',
  31. description = 'An app for Twitter, Mastodon, YouTube, etc; Open Source.'
  32. )
  33. return render_template('login.html', opengraph_info=opengraph_info)
  34. @api.get('/')
  35. def index ():
  36. return redirect(url_for('.get_login_html'))
  37. @api.get('/img')
  38. def get_image ():
  39. print('GET IMG: FIXME this should be done with caching proxy')
  40. url = request.args['url']
  41. url_hash = sha256(url.encode('utf-8')).hexdigest()
  42. path = f'.data/cache/media/{url_hash}'
  43. print(f'path = {path}')
  44. if not os.path.exists(path):
  45. resp = requests.get(url)
  46. print(f'status_code = {resp.status_code}')
  47. if resp.status_code >= 200 and resp.status_code < 300:
  48. with open(path, 'wb') as f:
  49. f.write(resp.content)
  50. with open(f'{path}.meta', 'w') as f:
  51. headers = dict(resp.headers)
  52. json.dump(headers, f)
  53. else:
  54. return 'not found.', 404
  55. with open(f'{path}.meta', 'r') as f:
  56. headers = json.load(f)
  57. #print(url)
  58. #print(url_hash)
  59. #print(headers)
  60. # not sure why some responses use lower case.
  61. mimetype = headers.get('Content-Type') or headers.get('content-type')
  62. # Flask goes relative to the module as opposed to the working directory.
  63. media_cache_dir = Path(Path.cwd(), '.data/cache/media')
  64. return send_from_directory(media_cache_dir, url_hash, mimetype=mimetype)
  65. @api.get('/content/abc123.html')
  66. def get_abc123_html ():
  67. return 'abc123'
  68. @api.get('/content/<content_id>.<response_format>')
  69. def get_content_html (content_id, response_format='json', content_kwargs=None):
  70. if not content_kwargs:
  71. content_kwargs = filter(lambda e: e[0].startswith('content:'), request.args.items())
  72. content_kwargs = dict(map(lambda e: [e[0][len('content:'):], e[1]], content_kwargs))
  73. content = h_cs.get_content(content_id, **content_kwargs)
  74. if type(content) == h_vm.FeedItem:
  75. return render_template('tweet-collection.html', tweets=[content], user = {}, query = {})
  76. elif type(content) == h_vm.CollectionPage:
  77. pagination_token = request.args.get('pagination_token')
  78. if content.next_token:
  79. print(f'next_token = {content.next_token}')
  80. return render_template('tweet-collection.html', tweets=content.items, user = {}, query = {})
  81. elif type(content) == list:
  82. return render_template('tweet-collection.html', tweets=content, user = {}, query = {})
  83. else:
  84. return jsonify(content)
  85. @api.get('/content/def456.html')
  86. def get_def456_html ():
  87. return get_content_html('brand:ispoogedaily', response_format='html')
  88. @api.get('/content/search.<response_format>')
  89. def get_content_search_html (response_format = 'html'):
  90. source_id = request.args.get('source')
  91. q = request.args.get('q')
  92. pagination_token = request.args.get('pagination_token')
  93. max_results = int(request.args.get('limit', 10))
  94. # search object store
  95. # search origin sources
  96. # populate object store with results
  97. # similar to how messages app works. Multiple sources within one app.
  98. # That app does not cache results tho, does an online search with each query.
  99. return 'ok'
  100. @api.get('/schedule/jobs.html')
  101. def get_schedule_jobs_html ():
  102. template = """
  103. {% extends "base-bs.html" %}
  104. {% block content %}
  105. {% endblock %}
  106. """
  107. view_model = {
  108. 'jobs': [
  109. {
  110. 'id': '1234',
  111. 'next_run': '',
  112. 'last_run': '',
  113. 'interval': 1,
  114. 'unit': 'minutes',
  115. 'period': '', # period vs. interval?
  116. 'latest': '',
  117. 'start_day': '',
  118. 'cancel_after': ''
  119. }
  120. ]
  121. }
  122. return render_template_string(template, **view_model)
  123. @api.get('/schedule/create-job.html')
  124. def get_schedule_create_job_html ():
  125. template = """
  126. {% extends "base-bs.html" %}
  127. {% block content %}
  128. {% endblock %}
  129. """
  130. view_model = {
  131. }
  132. return render_template_string(template, **view_model)
  133. @dataclass
  134. class FeedItemMedia:
  135. url: str
  136. mimetype: str
  137. @dataclass
  138. class FeedItem:
  139. id: str
  140. text: str
  141. media: Optional[List[FeedItemMedia]] = None
  142. def ingest_feed_item (feed_item: FeedItem) -> FeedItem:
  143. with sqlite3.connect('.data/ingest.db') as db:
  144. with db.cursor() as cur:
  145. #cur.row_factory = sqlite3.Row
  146. feed_item_table_exists = False # example in Hogumathi, twitter archive plugin I think
  147. if not feed_item_table_exists:
  148. cur.execute("""
  149. create table feed_item (
  150. id text,
  151. text text
  152. )
  153. """)
  154. cur.execute("""
  155. create table feed_item_media (
  156. feed_item_id integer,
  157. url text,
  158. mimetype text
  159. )
  160. """)
  161. sql = 'insert into feed_item (id, text) values (?, ?)'
  162. params = [
  163. feed_item.id,
  164. feed_item.text
  165. ]
  166. res = cur.execute(sql, params)
  167. if not res:
  168. print('could not ingest feed_item')
  169. return False
  170. return feed_item
  171. def ingest_feed_item_media (feed_item: FeedItem) -> FeedItem:
  172. print('ingest_feed_item_media')
  173. if not feed_item.media:
  174. return feed_item
  175. with sqlite3.connect('.data/ingest.db') as db:
  176. with db.cursor() as cur:
  177. #cur.row_factory = sqlite3.Row
  178. for media_item in feed_item.media:
  179. # TODO import URL to files app and store that URL.
  180. # may want to support several URLs, so that offline LANs work.
  181. sql = 'insert into feed_item_media (feed_item_id, url, mimetype) values (?, ?, ?)'
  182. params = [
  183. feed_item.id,
  184. media_item.url,
  185. media_item.mimetype
  186. ]
  187. res = cur.execute(sql, params)
  188. if not res:
  189. print('could not ingest feed_item_media')
  190. return feed_item
  191. @api.post('/api/ingest/feed-item')
  192. def api_ingest_feed_item ():
  193. """
  194. Eventually other content sources with ingest here, and this will be the main DB.
  195. Via inigest_feed_Item and ingest_feed_item_media.
  196. They could be worker tasks. Work when submitted directly from browser extension.
  197. """
  198. print('api_ingest_feed_item')
  199. ingest_media = int(request.args.get('ingest_media', 1))
  200. feed_item = request.args.get('feed_item') # FIXME might want to use post body/form
  201. feed_item = from_dict(data_class=FeedItem, data=feed_item)
  202. fi_i_res = ingest_feed_item(feed_item)
  203. if ingest_media: # and fi_i_res(blocking=True, timeout=5):
  204. fi_i_media_res = ingest_feed_item_media(feed_item)
  205. #fi_i_media_res(blocking=True)
  206. return 'ok'
  207. @api.get('/health')
  208. def get_health ():
  209. return 'ok'