"""
A registry for content sources that work in terms of the View Model (view_model.py).

Generally a source returns a CollectionPage or individual items. At present
many sources return a List of Maps, because the design is being discovered and
solidified as it makes sense, rather than designed up front.

May end up similar to Android's ContentProvider (found later); I was thinking
about using a content:// URI scheme.
https://developer.android.com/reference/android/content/ContentProvider

Could also be similar to a Cocoon Generator:
https://cocoon.apache.org/1363_1_1.html

Later processing in Python:
https://www.innuy.com/blog/build-data-pipeline-python/
https://www.bonobo-project.org/
"""
import inspect
import re
import time
from functools import lru_cache
def get_ttl_hash(seconds=3600):
    """Return a value that is constant within each `seconds`-long time window.

    Intended as an extra cache-key argument so `lru_cache`d results
    effectively expire after roughly `seconds`.
    """
    bucket = time.time() / seconds
    return round(bucket)
class ContentSystem:
    """
    Registry of content-source functions keyed by content-ID prefix.

    A source is registered under an ID prefix (e.g. 'twitter:tweet:') plus a
    regex for the remainder of the ID; `get_content` resolves a full content
    ID to the highest-weight matching source and calls it. 'got_content'
    hooks fire whenever a source returns truthy content.
    """

    def __init__(self):
        # source_id -> [id_prefix, content_source_fn, id_pattern, source_id, weight]
        self.content_sources = {}
        # hook_type -> list of [hook_fn, extra_args, extra_kwargs]
        self.hooks = {}

    def register_content_source(self, id_prefix, content_source_fn, id_pattern=r'(\d+)', source_id=None, weight=None):
        """
        Register `content_source_fn` to serve content IDs starting with `id_prefix`.

        `id_pattern` is matched with re.fullmatch against the ID remainder
        after the prefix; its groups become the source function's arguments
        (named groups as kwargs, otherwise positional groups as args).
        `source_id` defaults to 'module:function'. Earlier registrations get
        a higher default `weight` and therefore win resolution.
        """
        if not source_id:
            source_id = f'{inspect.getmodule(content_source_fn).__name__}:{content_source_fn.__name__}'

        if weight is None:
            # Earlier registrations take priority by default.
            weight = 1000 - len(self.content_sources)

        print(f'register_content_source: {id_prefix}: {source_id} with ID pattern {id_pattern} (weight={weight})')
        self.content_sources[source_id] = [id_prefix, content_source_fn, id_pattern, source_id, weight]

    @lru_cache(maxsize=1024)  # NOTE: mutating return value mutates cached value
    def find_content_id_args(self, id_pattern, content_id):
        """
        Extract (args, kwargs) for a source function from `content_id`.

        Returns ([], {}) when `id_pattern` does not fullmatch. Named groups
        are returned as kwargs; otherwise positional groups become args.
        """
        id_args = re.fullmatch(id_pattern, content_id)
        if not id_args:
            return [], {}

        args = []
        kwargs = id_args.groupdict()
        if not kwargs:
            args = id_args.groups()

        return args, kwargs

    def resolve_content_source(self, content_id, content_source_id=None, *extra_args, **extra_kwargs):
        """
        Resolve possible content sources given the parameters used for get_content.

        Returns a generator of (content_source_fn, args, kwargs); typically
        only the first yielded entry is used. Allows content modules to
        determine whether sources are available without fetching content.
        """
        # Highest weight first; index 4 of the registration record is weight.
        source_ids = sorted(
            self.content_sources,
            key=lambda sid: self.content_sources[sid][4],
            reverse=True,
        )

        for source_id in source_ids:
            if content_source_id and source_id != content_source_id:
                continue

            [id_prefix, content_source_fn, id_pattern, source_id, weight] = self.content_sources[source_id]
            if not content_id.startswith(id_prefix):
                continue

            source_content_id = content_id[len(id_prefix):]

            # HACK: a prefix without a trailing ':' is not a namespace and
            # must match the full content ID exactly (no remainder).
            if not id_prefix.endswith(':') and source_content_id:
                continue

            print(f'get_content (id={source_content_id}) from source {source_id}, resolves to {source_content_id} ( weight={weight})')

            args, kwargs = self.find_content_id_args(id_pattern, source_content_id)

            # HACK: a namespace prefix requires the remainder to actually
            # match the ID pattern.
            if id_prefix.endswith(':') and not args and not kwargs:
                continue

            if extra_args:
                args += extra_args

            if extra_kwargs:
                # Pattern-derived kwargs win over caller-supplied extras.
                kwargs = {**extra_kwargs, **kwargs}

            # If we're calling a bulk source and only get back partial
            # results, we'd want to remove the found content IDs and merge
            # until we find them all -- yet we don't want intelligence about
            # the type of content returned.
            # Idea: class BulkResponse(dict): pass
            yield content_source_fn, args, kwargs

    def _default_ttl_hash(self, seconds=60):
        # Same time bucketing as module-level get_ttl_hash(), computed at
        # call time. (The original code used a parameter default, which is
        # evaluated once at class-definition time -- so the cache key never
        # changed and cached content never expired.)
        return round(time.time() / seconds)

    def get_content(self, content_id, content_source_id=None, ttl_hash=None, *extra_args, **extra_kwargs):
        """
        Fetch `content_id` from the first matching source (or None).

        `ttl_hash` only participates in the cache key; when None, a
        60-second time bucket is used so cached results expire.
        NOTE: mutating the return value mutates the cached value.
        """
        if ttl_hash is None:
            ttl_hash = self._default_ttl_hash()
        return self._get_content_cached(content_id, content_source_id, ttl_hash, *extra_args, **extra_kwargs)

    @lru_cache(maxsize=64)
    def _get_content_cached(self, content_id, content_source_id, ttl_hash, *extra_args, **extra_kwargs):
        # `ttl_hash` is intentionally unused: it exists to expire cache entries.
        print(f'get_content {content_id}')
        for content_source_fn, args, kwargs in self.resolve_content_source(
                content_id,
                content_source_id=content_source_id,
                *extra_args,
                **extra_kwargs):

            content = content_source_fn(*args, **kwargs)

            if content:
                self.invoke_hooks('got_content', content_id, content)

            # First matching source wins, even if it returned falsy content.
            return content

    def get_all_content(self, content_ids, enable_bulk_fetch=False, ttl_hash=None):
        """
        Get content from all sources, using a grouping call if possible.

        Returns a map of content_id to result; the caller needs the
        intelligence to merge and paginate.

        Naive implementation is one get_content call per ID, but we need to
        figure out a way to pass a list of IDs and pagination per source;
        for example a list of 100+ Tweet IDs and 100+ YT videos from a
        Swipe file.

        `content_ids` must be hashable (e.g. a tuple) because results are
        cached. NOTE: mutating the return value mutates the cached value.
        """
        if ttl_hash is None:
            ttl_hash = self._default_ttl_hash()
        return self._get_all_content_cached(content_ids, enable_bulk_fetch, ttl_hash)

    @lru_cache(maxsize=8)  # NOTE: mutating return value mutates cached value
    def _get_all_content_cached(self, content_ids, enable_bulk_fetch, ttl_hash):
        # `ttl_hash` is only part of the cache key.
        return self.get_all_content2(content_ids, enable_bulk_fetch=enable_bulk_fetch)

    def get_all_content2(self, content_collection_ids, content_args=None, enable_bulk_fetch=False):
        """
        Fetch a list of collection IDs; `content_args` maps collection ID to
        an (args, kwargs) pair forwarded to that ID's source function.

        Interleaving the next page of a source into existing results is an
        open problem: gracefully degraded, we could simply get the next page
        at the end of all pages and then view older content. We also need
        intelligence about content types (e.g. CollectionPage) -- see the
        feeds facade for an example of merging one page. Keeping feed items
        in a DB and serving in order seems to be becoming the way to go;
        client-side merging (e.g. HTMx inserting nodes above) might be
        jarring to the reader, so make it optional: append all new or merge.
        """

        bulk_prefixes = {
            #'twitter:tweet:': 'twitter:tweets',
            #'youtube:video:': 'youtube:videos',
        }
        bulk_requests = {}  # bulk content ID -> list of member content IDs

        result = {}

        for content_id in content_collection_ids:

            # Route IDs with a known bulk prefix into one grouped request.
            is_bulk = False
            if enable_bulk_fetch:
                for bulk_prefix, bulk_content_id in bulk_prefixes.items():
                    if content_id.startswith(bulk_prefix):
                        bulk_requests.setdefault(bulk_content_id, []).append(content_id)
                        # TODO: respect a max batch size per content source.
                        is_bulk = True

            if is_bulk:
                continue

            if content_args and content_id in content_args:
                extra_args, extra_kwargs = content_args[content_id]
            else:
                extra_args, extra_kwargs = [], {}

            # Explicit placeholders for content_source_id and ttl_hash so
            # extra_args reach the source function. (Previously extra_args
            # were passed directly and the first one was silently consumed
            # as content_source_id.)
            result[content_id] = self.get_content(content_id, None, None, *extra_args, **extra_kwargs)

        for bulk_content_id, content_ids in bulk_requests.items():
            print(f'bulk: {bulk_content_id}, content_ids: {content_ids}')

            # Tuple so the ID list is hashable for the content cache.
            bulk_response = self.get_content(bulk_content_id, content_ids=tuple(content_ids))

            print(f'bulk_response: {bulk_response}')

            # We're not supposed to be smart about the get_content response
            # type... bulk sources return a map by convention, which is
            # better than iterating something else.
            if bulk_response:
                for content_id, content in bulk_response.items():
                    if content:
                        self.invoke_hooks('got_content', content_id, content)

                result.update(bulk_response)

        return result

    def register_hook(self, hook_type, hook_fn, *extra_args, **extra_kwargs):
        """Register `hook_fn` for `hook_type`; extras are appended on invocation."""
        self.hooks.setdefault(hook_type, []).append([hook_fn, extra_args, extra_kwargs])

    def invoke_hooks(self, hook_type, *args, **kwargs):
        """Call every hook registered for `hook_type` with args + registration extras.

        Registration-time kwargs are overridden by invocation kwargs.
        Exceptions from hooks propagate to the caller.
        """
        if hook_type not in self.hooks:
            return

        for hook, extra_args, extra_kwargs in self.hooks[hook_type]:
            hook_args = args
            hook_kwargs = kwargs
            if extra_args:
                hook_args = args + extra_args
            if extra_kwargs:
                hook_kwargs = {**extra_kwargs, **hook_kwargs}

            hook(*hook_args, **hook_kwargs)
class ObjectCache:
    """
    Sketch of a SQLite-backed cache for content responses.

    Holds SQL statements for a `content` table keyed by
    (provider, id, dt, args). `put` and `get` are not implemented yet;
    only the database path is stored.
    """

    # Table schema: one row per (provider, id, timestamp, args) fetch.
    create_stmt = """
    create table content (
        provider text,
        id text,
        dt datetime,
        args text, -- could hash
        type text,
        data blob,
        unique (provider, id, dt, args)
    )
    """

    # Insert a freshly fetched blob, timestamped server-side.
    insert_stmt = """
    INSERT INTO content (dt, provider, id, args, type, data)
    VALUES (current_timestamp, ?, ?, ?, ?, ?)
    """

    # Latest row per (provider, id, args); caller supplies {where_sql}.
    select_latest_stmt = """
    SELECT * from content
    WHERE {where_sql}
    GROUP BY provider, id, dt, args
    HAVING dt = max(dt)
    """

    def __init__(self, db_path):
        # Path to the SQLite database file; no connection is opened yet.
        self.db_path = db_path

    def put(self, key, value):
        # TODO: persist via insert_stmt.
        pass

    def get(self, key):
        # TODO: look up the latest row via select_latest_stmt.
        pass
# The app was coded before we turned this into a class...
# so we proxy calls with the old interface to this default instance.
DEFAULT = ContentSystem()


def reset():
    """Replace the shared DEFAULT instance, dropping registrations and caches."""
    # `global` is required here: without it this rebound a function-local
    # name and the shared instance was never actually replaced.
    global DEFAULT
    print('compat resetting content system')
    DEFAULT = ContentSystem()


def register_content_source(id_prefix, content_source_fn, id_pattern=r'(\d+)', source_id=None, weight=None):
    """Proxy to DEFAULT.register_content_source, forwarding all arguments."""
    print('compat register_content_source')
    # `weight` is forwarded too -- it was silently dropped before.
    return DEFAULT.register_content_source(id_prefix, content_source_fn, id_pattern, source_id, weight)


def get_content(content_id, content_source_id=None, *extra_args, **extra_kwargs):
    """Proxy to DEFAULT.get_content."""
    print('compat get_content')
    return DEFAULT.get_content(content_id, content_source_id, *extra_args, **extra_kwargs)


def get_all_content(content_ids, enable_bulk_fetch=False):
    """Proxy to DEFAULT.get_all_content."""
    print('compat get_all_content')
    return DEFAULT.get_all_content(content_ids, enable_bulk_fetch=enable_bulk_fetch)


def register_hook(hook_type, hook_fn, *extra_args, **extra_kwargs):
    """Proxy to DEFAULT.register_hook."""
    print('compat register_hook')
    return DEFAULT.register_hook(hook_type, hook_fn, *extra_args, **extra_kwargs)


def invoke_hooks(hook_type, *args, **kwargs):
    """Proxy to DEFAULT.invoke_hooks."""
    print('compat invoke_hooks')
    return DEFAULT.invoke_hooks(hook_type, *args, **kwargs)
|