@@ -0,0 +1,237 @@
+import os
+import json
+import hashlib
+import time
+import shutil
+import requests
+def find_cache_usage ():
+ usage = 0
+ with os.scandir(CACHE_PATH) as sd:
+ for entry in sd:
+ if entry.name.endswith('.headers'):
+ continue
+ usage = usage + entry.stat().st_size
+ return usage
+def find_oldest_file ():
+ """
+ Returns a DirEntry for the oldest file in the cache path.
+ """
+ oldest_file = None
+ with os.scandir(CACHE_PATH) as sd:
+ for entry in sd:
+ if entry.name.endswith('.headers'):
+ continue
+ ## print(entry.stat())
+ osatime = os.path.getatime(entry.path)
+ ## print(f'{entry.name} atime = {entry.stat().st_atime}, osatime = {osatime}')
+ #if not oldest_file or entry.stat().st_atime < oldest_file.stat().st_atime:
+ if not oldest_file or osatime < oldest_file[1]:
+ oldest_file = [entry, osatime]
+ if oldest_file:
+ return oldest_file[0]
+def purge_cache (request_free_bytes=0):
+ """
+ Deletes the oldest files from the cache until enough bytes have been freed.
+ Returns false if not enough bytes could be freed.
+ Deletes nothing if request bytes freed is 0
+ """
+ usage = find_cache_usage()
+ request_free_bytes = request_free_bytes - (CACHE_QUOTA - usage)
+ bytes_freed = 0
+ oldest_file = find_oldest_file()
+ while oldest_file and bytes_freed < request_free_bytes:
+ file_size = oldest_file.stat().st_size
+ ## print(f'purge_cache: deleting {oldest_file.name}')
+ os.remove(oldest_file.path)
+ if os.path.exists(oldest_file.path + '.headers'):
+ os.remove(oldest_file.path + '.headers')
+ bytes_freed = bytes_freed + file_size
+ oldest_file = find_oldest_file()
+ if request_free_bytes < bytes_freed:
+ return False
+ return True
+def filename_for_url (url):
+ filename = hashlib.md5(url.encode('utf-8')).hexdigest()
+ return filename
+def write_response (url, cache_filename, resp):
+ with open(CACHE_PATH + '/' + cache_filename + '.headers', 'wt', encoding='utf-8') as f:
+ headers = dict(resp.headers)
+ headers['X-Request-URL'] = url
+ headers['X-Cache-Filename'] = cache_filename
+ json.dump(headers, f, indent=2)
+ with open(CACHE_PATH + '/' + cache_filename, 'wb') as f:
+ f.write(resp.content)
+def request_url (url):
+ # add auth like S3
+ # idea: credentials like .netrc
+ return requests.get(url)
+def fetch_file (url):
+ cache_filename = filename_for_url(url)
+ if os.path.exists(CACHE_PATH + '/' + cache_filename):
+ # check expiration
+ return CACHE_PATH + '/' + cache_filename
+ resp = request_url(url)
+ content_length = resp.headers.get('Content-Length', 0)
+ if content_length == 0:
+ content_length = len(resp.content)
+ print(f'WARNING: Content-Length = 0, url = {url}, content len = {content_length}')
+ purge_cache(request_free_bytes=content_length)
+ write_response(url, cache_filename, resp)
+ return CACHE_PATH + '/' + cache_filename
+def main ():
+ if os.path.exists(CACHE_PATH):
+ shutil.rmtree(CACHE_PATH)
+ os.mkdir(CACHE_PATH)
+ test_purge_cache()
+ shutil.rmtree(CACHE_PATH)
+ os.mkdir(CACHE_PATH)
+ test_write_quota()
+ shutil.rmtree(CACHE_PATH)
+def test_purge_cache ():
+ # create 3 files, each 8KB
+ for i in range(0, 5):
+ with open(f'{CACHE_PATH}/{i}.txt', 'wb') as f:
+ buf = os.urandom(8192)
+ f.write(buf)
+ time.sleep(1)
+ with open(f'{CACHE_PATH}/0.txt', 'rb') as f:
+ f.read()
+ oldest_file = find_oldest_file()
+ print(f'oldest_file: {oldest_file.name}')
+ assert oldest_file.name == '1.txt'
+ purge_cache(request_free_bytes=8192)
+ oldest_file = find_oldest_file()
+ print(f'oldest_file: {oldest_file.name}')
+ assert oldest_file.name == '2.txt'
+ purge_cache(request_free_bytes=0)
+ oldest_file = find_oldest_file()
+ print(f'oldest_file: {oldest_file.name}')
+ assert oldest_file.name == '2.txt'
+ purge_cache(request_free_bytes=2*8192)
+ oldest_file = find_oldest_file()
+ print(f'oldest_file: {oldest_file.name}')
+ assert oldest_file.name == '3.txt'
+ purge_cache(request_free_bytes=3*8192)
+ oldest_file = find_oldest_file()
+ print(f'oldest_file: {oldest_file.name}')
+ assert oldest_file.name == '4.txt'
+ purge_cache(request_free_bytes=4*8192)
+ oldest_file = find_oldest_file()
+ print(f'oldest_file: {oldest_file.name}')
+ assert oldest_file.name == '0.txt'
+ purge_cache(request_free_bytes=5*8192)
+ oldest_file = find_oldest_file()
+ assert oldest_file == None
+ # open the first file
+ # purge cache
+ # assert first 2 files exists
+ # purge cache
+ # assert only second file exists
+ # clear file
+ return True
+def test_write_quota ():
+ for i in range(0, 5):
+ with open(f'{CACHE_PATH}/{i}.txt', 'wb') as f:
+ buf = os.urandom(8192)
+ f.write(buf)
+ time.sleep(1)
+ purge_cache(request_free_bytes=2*8192)
+ oldest_file = find_oldest_file()
+ print(f'oldest_file: {oldest_file.name}')
+ assert oldest_file.name == '2.txt'
+ purge_cache(request_free_bytes=2*8192)
+ oldest_file = find_oldest_file()
+ print(f'oldest_file: {oldest_file.name}')
+ assert oldest_file.name == '2.txt'
+ purge_cache(request_free_bytes=2*8192 + 1)
+ oldest_file = find_oldest_file()
+ print(f'oldest_file: {oldest_file.name}')
+ assert oldest_file.name == '3.txt'
+# Script entry point: runs main(), which deletes and recreates CACHE_PATH
+# around the self-tests, so any existing cache contents are lost.
+if __name__ == '__main__':
+ main()