import os
import json
import hashlib
import time
import shutil

import requests

CACHE_PATH = '.data'
CACHE_QUOTA = 5 * 8192  # total cache budget in bytes (40960)


def find_cache_usage():
    """Return the total size in bytes of the cached files, excluding .headers sidecars."""
    usage = 0
    with os.scandir(CACHE_PATH) as sd:
        for entry in sd:
            if entry.name.endswith('.headers'):
                continue
            usage = usage + entry.stat().st_size

    return usage


def find_oldest_file():
    """Return a DirEntry for the least recently accessed file in the cache path,
    or None if the cache is empty. .headers sidecar files are ignored."""
    oldest = None
    with os.scandir(CACHE_PATH) as sd:
        for entry in sd:
            if entry.name.endswith('.headers'):
                continue
            atime = os.path.getatime(entry.path)
            if oldest is None or atime < oldest[1]:
                oldest = (entry, atime)

    return oldest[0] if oldest else None
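
# Design note (our reading of the code, not stated in the source): eviction
# order is least-recently-accessed, keyed on atime rather than mtime, so that
# reading a cached file keeps it alive in the cache.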


def purge_cache(request_free_bytes=0):
    """Delete the least recently accessed files from the cache until at least
    request_free_bytes are available under CACHE_QUOTA.

    The request is first reduced by any headroom already free under the quota,
    so nothing is deleted when enough space is already available (in particular,
    a request of 0 deletes nothing unless the cache is over quota).

    Returns False if not enough bytes could be freed.
    """
    usage = find_cache_usage()

    # Reduce the request by the headroom still available under the quota.
    request_free_bytes = request_free_bytes - (CACHE_QUOTA - usage)
    bytes_freed = 0
    oldest_file = find_oldest_file()
    while oldest_file and bytes_freed < request_free_bytes:
        file_size = oldest_file.stat().st_size
        os.remove(oldest_file.path)
        if os.path.exists(oldest_file.path + '.headers'):
            os.remove(oldest_file.path + '.headers')
        bytes_freed = bytes_freed + file_size

        oldest_file = find_oldest_file()

    if bytes_freed < request_free_bytes:
        return False

    return True
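
# A worked example of the quota arithmetic above (illustrative values): with
# CACHE_QUOTA = 40960 and usage = 32768 there are 8192 bytes of headroom, so
# purge_cache(request_free_bytes=8192) reduces the request to 0 and deletes
# nothing, while purge_cache(request_free_bytes=8193) still needs 1 byte and
# deletes the least recently accessed file.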


def filename_for_url(url):
    """Map a URL to a stable cache filename: the MD5 hex digest of the URL."""
    return hashlib.md5(url.encode('utf-8')).hexdigest()


def write_response(url, cache_filename, resp):
    """Write the response body to the cache and its headers to a .headers sidecar."""
    with open(CACHE_PATH + '/' + cache_filename + '.headers', 'wt', encoding='utf-8') as f:
        headers = dict(resp.headers)
        headers['X-Request-URL'] = url
        headers['X-Cache-Filename'] = cache_filename
        json.dump(headers, f, indent=2)

    with open(CACHE_PATH + '/' + cache_filename, 'wb') as f:
        f.write(resp.content)
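
# For a cache file named <md5-of-url>, the sidecar <md5-of-url>.headers holds
# the response headers as JSON plus the two bookkeeping keys added above,
# roughly (illustrative values):
#
# {
#   "Content-Type": "application/octet-stream",
#   "Content-Length": "8192",
#   "X-Request-URL": "https://example.com/data.bin",
#   "X-Cache-Filename": "<md5 hex digest of the URL>"
# }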


def request_url(url):
    # TODO: add auth, e.g. for S3
    # idea: pick up credentials from something like .netrc
    return requests.get(url)
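

# A minimal sketch of the .netrc idea noted above; request_url_with_netrc is a
# hypothetical helper (the name is ours), not wired into fetch_file. It assumes
# HTTP basic auth and a standard ~/.netrc file.
def request_url_with_netrc(url):
    import netrc
    from urllib.parse import urlparse

    auth = None
    try:
        # authenticators() returns a (login, account, password) tuple or None.
        creds = netrc.netrc().authenticators(urlparse(url).hostname)
        if creds:
            auth = (creds[0], creds[2])
    except FileNotFoundError:
        # No ~/.netrc present; fall back to an unauthenticated request.
        pass

    return requests.get(url, auth=auth)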


def fetch_file(url):
    """Return the path to a locally cached copy of url, downloading it if needed."""
    cache_filename = filename_for_url(url)

    if os.path.exists(CACHE_PATH + '/' + cache_filename):
        # TODO: check expiration
        return CACHE_PATH + '/' + cache_filename

    resp = request_url(url)

    # Content-Length arrives as a string and may be absent; normalize to an int.
    content_length = int(resp.headers.get('Content-Length', 0))

    if content_length == 0:
        content_length = len(resp.content)
        print(f'WARNING: Content-Length = 0, url = {url}, content len = {content_length}')

    purge_cache(request_free_bytes=content_length)

    write_response(url, cache_filename, resp)

    return CACHE_PATH + '/' + cache_filename
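
# Example usage (hypothetical URL): the first call downloads and caches the
# file, later calls return the cached path without touching the network.
#
#   path = fetch_file('https://example.com/data.bin')
#   with open(path, 'rb') as f:
#       data = f.read()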


def main():
    if os.path.exists(CACHE_PATH):
        shutil.rmtree(CACHE_PATH)

    os.mkdir(CACHE_PATH)

    test_purge_cache()

    shutil.rmtree(CACHE_PATH)
    os.mkdir(CACHE_PATH)

    test_write_quota()

    shutil.rmtree(CACHE_PATH)


def test_purge_cache():
    # Create 5 files, each 8 KB, with distinct access times.
    for i in range(0, 5):
        with open(f'{CACHE_PATH}/{i}.txt', 'wb') as f:
            buf = os.urandom(8192)
            f.write(buf)
        time.sleep(1)

    # Reading 0.txt refreshes its atime, so 1.txt becomes the oldest. Note this
    # relies on the filesystem updating atime (it fails on noatime mounts).
    with open(f'{CACHE_PATH}/0.txt', 'rb') as f:
        f.read()

    oldest_file = find_oldest_file()
    print(f'oldest_file: {oldest_file.name}')
    assert oldest_file.name == '1.txt'

    purge_cache(request_free_bytes=8192)
    oldest_file = find_oldest_file()
    print(f'oldest_file: {oldest_file.name}')
    assert oldest_file.name == '2.txt'

    # A request of 0 is covered by the headroom just freed, so nothing is deleted.
    purge_cache(request_free_bytes=0)
    oldest_file = find_oldest_file()
    print(f'oldest_file: {oldest_file.name}')
    assert oldest_file.name == '2.txt'

    purge_cache(request_free_bytes=2*8192)
    oldest_file = find_oldest_file()
    print(f'oldest_file: {oldest_file.name}')
    assert oldest_file.name == '3.txt'

    purge_cache(request_free_bytes=3*8192)
    oldest_file = find_oldest_file()
    print(f'oldest_file: {oldest_file.name}')
    assert oldest_file.name == '4.txt'

    purge_cache(request_free_bytes=4*8192)
    oldest_file = find_oldest_file()
    print(f'oldest_file: {oldest_file.name}')
    assert oldest_file.name == '0.txt'

    purge_cache(request_free_bytes=5*8192)
    oldest_file = find_oldest_file()
    assert oldest_file is None

    return True


def test_write_quota():
    # Create 5 files, each 8 KB, with distinct access times.
    for i in range(0, 5):
        with open(f'{CACHE_PATH}/{i}.txt', 'wb') as f:
            buf = os.urandom(8192)
            f.write(buf)

        time.sleep(1)

    purge_cache(request_free_bytes=2*8192)
    oldest_file = find_oldest_file()
    print(f'oldest_file: {oldest_file.name}')
    assert oldest_file.name == '2.txt'

    # The 2 * 8192 bytes just freed cover this request, so nothing is deleted.
    purge_cache(request_free_bytes=2*8192)
    oldest_file = find_oldest_file()
    print(f'oldest_file: {oldest_file.name}')
    assert oldest_file.name == '2.txt'

    # One byte over the available headroom forces another deletion.
    purge_cache(request_free_bytes=2*8192 + 1)
    oldest_file = find_oldest_file()
    print(f'oldest_file: {oldest_file.name}')
    assert oldest_file.name == '3.txt'


if __name__ == '__main__':
    main()