"""
A registry for content sources that work in terms of the View Model (view_model.py).

Generally a source returns a CollectionPage or individual items.

At present many sources return a List of Maps because the design is being
discovered and solidified as it makes sense, rather than big design up front.

May end up similar to Android's ContentProvider, found later. I was thinking
about using the content:// URI scheme.

https://developer.android.com/reference/android/content/ContentProvider

Could also be similar to a Cocoon Generator

https://cocoon.apache.org/1363_1_1.html

Later processing in Python:

https://www.innuy.com/blog/build-data-pipeline-python/
https://www.bonobo-project.org/
"""

import re
import inspect
from functools import lru_cache
import time


def get_ttl_hash(seconds=3600):
    """Return the same value within a `seconds`-long time period.

    The value is the current epoch time divided by `seconds` and rounded, so
    it only changes once per period; intended as a cache-busting argument.
    """
    return round(time.time() / seconds)


class ContentSystem:
    """Registry of content sources and hooks.

    Content IDs are matched against registered ID prefixes; the remainder of
    the ID is parsed with the source's regex and the captured groups are
    passed to the source function.
    """

    def __init__(self):
        # source_id -> [id_prefix, content_source_fn, id_pattern, source_id, weight]
        self.content_sources = {}
        # hook_type -> list of [hook_fn, extra_args, extra_kwargs]
        self.hooks = {}

    def register_content_source(self, id_prefix, content_source_fn, id_pattern=r'(\d+)', source_id=None, weight=None):
        """Register `content_source_fn` as a source for IDs starting with `id_prefix`.

        id_pattern -- regex that must fully match the part of the content ID
                      after the prefix; its groups become the call arguments.
        source_id  -- registry key; defaults to '<module name>:<function name>'.
                      Registering the same source_id again replaces the entry.
        weight     -- higher weights are tried first; defaults to
                      1000 minus the number of sources already registered,
                      so earlier registrations win by default.
        """
        if not source_id:
            source_id = f'{inspect.getmodule(content_source_fn).__name__}:{content_source_fn.__name__}'

        if weight is None:
            weight = 1000 - len(self.content_sources)

        print(f'register_content_source: {id_prefix}: {source_id} with ID pattern {id_pattern} (weight={weight})')

        self.content_sources[source_id] = [id_prefix, content_source_fn, id_pattern, source_id, weight]

    @lru_cache(maxsize=1024)  # NOTE: mutating return value mutates cached value
    def find_content_id_args(self, id_pattern, content_id):
        """Parse `content_id` with `id_pattern` and return `(args, kwargs)`.

        Returns `([], {})` when the pattern does not fully match. When the
        pattern has named groups they are returned as kwargs (and args is
        empty); otherwise the positional groups are returned as args.

        The result is cached, so callers must treat it as read-only.
        """
        id_args = re.fullmatch(id_pattern, content_id)
        if not id_args:
            return [], {}

        args = []
        kwargs = id_args.groupdict()
        if not kwargs:
            args = id_args.groups()

        return args, kwargs

    def resolve_content_source(self, content_id, content_source_id=None, *extra_args, **extra_kwargs):
        """
        Resolve possible content sources given the parameters used for get_content.

        Returns a generator of `(content_source_fn, args, kwargs)`; typically
        the first will be used.

        Allows for content modules to determine if sources are available,
        without fetching the content itself.
        """

        #source_ids = list(self.content_sources.keys())
        #source_ids.sort(key=lambda id_prefix: len(id_prefix), reverse=True)

        # Highest weight first; index 4 of a registry entry is the weight.
        source_ids = sorted(
            self.content_sources,
            key=lambda sid: self.content_sources[sid][4],
            reverse=True,
        )

        #print(source_ids)

        for source_id in source_ids:
            if content_source_id and source_id != content_source_id:
                continue

            id_prefix, content_source_fn, id_pattern, source_id, weight = self.content_sources[source_id]

            if not content_id.startswith(id_prefix):
                continue

            source_content_id = content_id[len(id_prefix):]

            # HACK: a prefix without a trailing ':' is an exact ID, so
            # anything left over after the prefix means it is not a match.
            if not id_prefix.endswith(':') and source_content_id:
                continue

            print(f'get_content (id={content_id}) from source {source_id}, resolves to {source_content_id} ( weight={weight})')

            args, kwargs = self.find_content_id_args(id_pattern, source_content_id)

            # HACK: a ':'-terminated prefix requires the pattern to capture something.
            if id_prefix.endswith(':') and not args and not kwargs:
                continue

            # Build fresh containers rather than extending in place: the
            # values from find_content_id_args are shared via its cache.
            args = tuple(args) + tuple(extra_args)
            # Values parsed from the ID take precedence over extra kwargs.
            kwargs = {**extra_kwargs, **kwargs}

            # if we're calling a bulk source and only get back partial results...
            # we'd want to remove the found content IDs and merge until
            # we find them all...
            # yet we don't want intelligence about the type of content returned.
            # idea: class BulkResponse(dict): pass
            yield content_source_fn, args, kwargs

    #@lru_cache(maxsize=64)
    def get_content(self, content_id, content_source_id=None, ttl_hash=get_ttl_hash(60), *extra_args, **extra_kwargs):
        """
        Return the first truthy result from the sources resolved for `content_id`.

        Sources are tried in resolution order; a source returning a falsy
        value falls through to the next one. The 'got_content' hooks are
        invoked with (content_id, content) before returning. Returns None
        when no source produces content.

        `ttl_hash` is not used by the body; it only matters if the lru_cache
        decorator above is re-enabled. Its default is evaluated once, when
        the class is defined.

        NOTE: if caching is re-enabled, mutating the return value mutates
        the cached value.
        """

        print(f'get_content {content_id}')

        # content_source_id is passed positionally: passing it by keyword
        # alongside *extra_args would bind it twice and raise TypeError.
        for content_source_fn, args, kwargs in self.resolve_content_source(
                content_id,
                content_source_id,
                *extra_args,
                **extra_kwargs):

            content = content_source_fn(*args, **kwargs)

            if content:
                self.invoke_hooks('got_content', content_id, content)

                return content

    #@lru_cache(maxsize=8) # NOTE: mutating return value mutates cached value
    def get_all_content(self, content_ids, enable_bulk_fetch=False, ttl_hash=get_ttl_hash(60)):
        """
        Get content from all sources, using a grouping call if possible.

        Returns a map of content ID to result; the caller needs to have the
        intelligence to merge and paginate.

        Naive implementation is to just make one call to get_content per ID,
        but we need to figure out a way to pass a list of IDs and pagination
        per source; for example a list of 100+ Tweet IDs and 100+ YT videos
        from a Swipe file.
        """

        return self.get_all_content2(content_ids, enable_bulk_fetch=enable_bulk_fetch)

    def get_all_content2(self, content_collection_ids, content_args=None, enable_bulk_fetch=False):
        """
        Takes a list of collection IDs and content_args is a map of
        (args, kwargs) keyed by collection ID.

        We could just use keys from content_args with empty values but that's
        a little confusing.

        Interleaving the next page of a source into existing results is a
        problem.

        Gracefully degraded could simply get the next page at the end of all
        pages and then view older content.

        We also need intelligence about content types, meaning perhaps some
        lambdas pass in. Eg. CollectionPage.

        See feeds facade for an example of merging one page.

        Seems like keeping feed items in a DB is becoming the way to go,
        serving things in order.

        Client side content merging might work to insert nodes above, eg. HTMx.

        Might be jarring to reader, so make optional. Append all new or merge.

        Cache feed between requests on disk, merge in memory, send
        merge/append result.
        """

        # ID prefix -> content ID of the bulk source that handles it.
        bulk_prefixes = {
            #'twitter:tweet:': 'twitter:tweets',
            #'youtube:video:': 'youtube:videos',
        }
        bulk_requests = {}

        result = {}

        for content_id in content_collection_ids:

            is_bulk = False

            if enable_bulk_fetch:
                for bulk_prefix, bulk_content_id in bulk_prefixes.items():
                    if content_id.startswith(bulk_prefix):
                        bulk_requests.setdefault(bulk_content_id, []).append(content_id)

                        # max size for a content source...

                        is_bulk = True

            if is_bulk:
                continue

            if content_args and content_id in content_args:
                extra_args, extra_kwargs = content_args[content_id]
            else:
                extra_args, extra_kwargs = [], {}

            # NOTE: positional extra_args bind to get_content's
            # content_source_id and ttl_hash parameters before anything
            # reaches the source function; use kwargs for source arguments.
            result[content_id] = self.get_content(content_id, *extra_args, **extra_kwargs)

        for bulk_content_id, content_ids in bulk_requests.items():
            print(f'bulk: {bulk_content_id}, content_ids: {content_ids}')

            bulk_response = self.get_content(bulk_content_id, content_ids=tuple(content_ids))  # FIXME me=... workaround, provide bulk id in args map

            print(f'bulk_response: {bulk_response}')

            # we're not supposed to be smart about get_content response type...
            # does it return a map by convention? better than iterating something else.
            if bulk_response:
                for content_id, content in bulk_response.items():
                    if content:
                        self.invoke_hooks('got_content', content_id, content)

                result.update(bulk_response)

        return result

    def register_hook(self, hook_type, hook_fn, *extra_args, **extra_kwargs):
        """Register `hook_fn` for `hook_type`.

        `extra_args` are appended to the positional arguments of every
        invocation; `extra_kwargs` supply keyword values that the invocation's
        own keyword arguments override.
        """
        if hook_type not in self.hooks:
            self.hooks[hook_type] = []

        self.hooks[hook_type].append([hook_fn, extra_args, extra_kwargs])

    def invoke_hooks(self, hook_type, *args, **kwargs):
        """Call every hook registered for `hook_type`, in registration order.

        Does nothing when no hook is registered for the type. Exceptions
        raised by a hook propagate to the caller.
        """
        if hook_type not in self.hooks:
            return

        for hook, extra_args, extra_kwargs in self.hooks[hook_type]:
            hook_args = args
            hook_kwargs = kwargs
            if extra_args:
                hook_args = args + extra_args
            if extra_kwargs:
                hook_kwargs = {**extra_kwargs, **hook_kwargs}

            hook(*hook_args, **hook_kwargs)
            #try:
            #    hook(*args, **kwargs)
            #except TypeError as e:
            #    print ('tried to call a hook with wrong args. no problem')
            #    continue


class ObjectCache:
    """Placeholder for a database-backed content cache.

    Only the SQL statements and the DB path are defined so far; `put` and
    `get` are unimplemented stubs that return None.
    """

    create_stmt = """
    create table content (
        provider text,
        id text,
        dt datetime,
        args text, -- could hash
        type text,
        data blob,
        unique (provider, id, dt, args)
    )
    """

    insert_stmt = """
    INSERT INTO content (dt, provider, id, args, type, data)
    VALUES (current_timestamp, ?, ?, ?, ?, ?)
    """

    # `where_sql` is a str.format placeholder to be filled in by the caller.
    select_latest_stmt = """
    SELECT * from content
    WHERE {where_sql}
    GROUP BY provider, id, dt, args
    HAVING dt = max(dt)
    """

    def __init__(self, db_path):
        self.db_path = db_path

    def put(self, key, value):
        """Not implemented yet; currently a no-op."""
        pass

    def get(self, key):
        """Not implemented yet; currently always returns None."""
        pass


# The app was coded before we turned this into a class...
# so we proxy calls with the old interface to this default instance.
DEFAULT = ContentSystem()


def reset():
    """Replace the module-level default ContentSystem with a fresh, empty one."""
    # Without the global declaration the assignment would only bind a local
    # name and the shared instance would never be replaced.
    global DEFAULT
    print ('compat resetting content system')
    DEFAULT = ContentSystem()


def register_content_source(id_prefix, content_source_fn, id_pattern=r'(\d+)', source_id=None, weight=None):
    """Proxy to `DEFAULT.register_content_source`, forwarding every argument including `weight`."""
    print('compat register_content_source')
    return DEFAULT.register_content_source(
        id_prefix,
        content_source_fn,
        id_pattern,
        source_id=source_id,
        weight=weight,
    )


def get_content(content_id, content_source_id=None, *extra_args, **extra_kwargs):
    """Proxy to `DEFAULT.get_content`.

    NOTE: `ContentSystem.get_content` declares `ttl_hash` as its third
    positional parameter, so the first value in `extra_args` binds to
    `ttl_hash` rather than reaching the content source. Prefer keyword
    arguments for values meant for the source function.
    """
    print('compat get_content')
    return DEFAULT.get_content(content_id, content_source_id, *extra_args, **extra_kwargs)


def get_all_content(content_ids, enable_bulk_fetch=False):
    """Proxy to `DEFAULT.get_all_content`."""
    print('compat get_all_content')
    return DEFAULT.get_all_content(content_ids, enable_bulk_fetch=enable_bulk_fetch)


def register_hook(hook_type, hook_fn, *extra_args, **extra_kwargs):
    """Proxy to `DEFAULT.register_hook`."""
    print('compat register_hook')
    return DEFAULT.register_hook(hook_type, hook_fn, *extra_args, **extra_kwargs)


def invoke_hooks(hook_type, *args, **kwargs):
    """Proxy to `DEFAULT.invoke_hooks`."""
    print('compat invoke_hooks')
    return DEFAULT.invoke_hooks(hook_type, *args, **kwargs)