12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635 |
- # -*- coding: utf-8 -*-
- # Unit tests for cache framework
- # Uses whatever cache backend is set in the test settings file.
- from __future__ import with_statement, absolute_import
- import hashlib
- import os
- import re
- import tempfile
- import time
- import warnings
- from django.conf import settings
- from django.core import management
- from django.core.cache import get_cache, DEFAULT_CACHE_ALIAS
- from django.core.cache.backends.base import (CacheKeyWarning,
- InvalidCacheBackendError)
- from django.http import HttpResponse, HttpRequest, QueryDict
- from django.middleware.cache import (FetchFromCacheMiddleware,
- UpdateCacheMiddleware, CacheMiddleware)
- from django.template import Template
- from django.template.response import TemplateResponse
- from django.test import TestCase, RequestFactory
- from django.test.utils import (get_warnings_state, restore_warnings_state,
- override_settings)
- from django.utils import timezone, translation, unittest
- from django.utils.cache import (patch_vary_headers, get_cache_key,
- learn_cache_key, patch_cache_control, patch_response_headers)
- from django.views.decorators.cache import cache_page
- from .models import Poll, expensive_calculation
- # functions/classes for complex data type tests
def f():
    # Module-level function stored as a cache value by the data-type tests.
    return 42
class C:
    # Bare class stored as a cache value by the data-type tests.

    def m(n):
        # ``n`` occupies the position normally named ``self``.
        return 24
class DummyCacheTests(unittest.TestCase):
    # The dummy backend gets its own test case: every expectation below is
    # that a write leaves no trace, which is the opposite of what the tests
    # shared by the other backends check.
    backend_name = 'django.core.cache.backends.dummy.DummyCache'

    def setUp(self):
        # Resolve the backend from its dotted path before each test.
        self.cache = get_cache(self.backend_name)

    def test_simple(self):
        "Dummy cache backend ignores cache set calls"
        cache = self.cache
        cache.set("key", "value")
        self.assertEqual(cache.get("key"), None)

    def test_add(self):
        "Add doesn't do anything in dummy cache backend"
        cache = self.cache
        cache.add("addkey1", "value")
        second_add = cache.add("addkey1", "newvalue")
        # The repeated add still reports True, yet nothing can be read back.
        self.assertEqual(second_add, True)
        self.assertEqual(cache.get("addkey1"), None)

    def test_non_existent(self):
        "Non-existent keys aren't found in the dummy cache backend"
        cache = self.cache
        self.assertEqual(cache.get("does_not_exist"), None)
        self.assertEqual(cache.get("does_not_exist", "bang!"), "bang!")

    def test_get_many(self):
        "get_many returns nothing for the dummy cache backend"
        cache = self.cache
        for letter in ('a', 'b', 'c', 'd'):
            cache.set(letter, letter)
        for wanted in (['a', 'c', 'd'], ['a', 'b', 'e']):
            self.assertEqual(cache.get_many(wanted), {})

    def test_delete(self):
        "Cache deletion is transparently ignored on the dummy cache backend"
        cache = self.cache
        cache.set("key1", "spam")
        cache.set("key2", "eggs")
        self.assertEqual(cache.get("key1"), None)
        cache.delete("key1")
        for key in ("key1", "key2"):
            self.assertEqual(cache.get(key), None)

    def test_has_key(self):
        "The has_key method doesn't ever return True for the dummy cache backend"
        cache = self.cache
        cache.set("hello1", "goodbye1")
        for key in ("hello1", "goodbye1"):
            self.assertEqual(cache.has_key(key), False)

    def test_in(self):
        "The in operator doesn't ever return True for the dummy cache backend"
        cache = self.cache
        cache.set("hello2", "goodbye2")
        for key in ("hello2", "goodbye2"):
            self.assertEqual(key in cache, False)

    def test_incr(self):
        "Dummy cache values can't be incremented"
        cache = self.cache
        cache.set('answer', 42)
        # Both the key that was just "set" and an unknown key must raise.
        for key in ('answer', 'does_not_exist'):
            self.assertRaises(ValueError, cache.incr, key)

    def test_decr(self):
        "Dummy cache values can't be decremented"
        cache = self.cache
        cache.set('answer', 42)
        for key in ('answer', 'does_not_exist'):
            self.assertRaises(ValueError, cache.decr, key)

    def test_data_types(self):
        "All data types are ignored equally by the dummy cache"
        payload = {
            'string': 'this is a string',
            'int': 42,
            'list': [1, 2, 3, 4],
            'tuple': (1, 2, 3, 4),
            'dict': {'A': 1, 'B': 2},
            'function': f,
            'class': C,
        }
        self.cache.set("stuff", payload)
        self.assertEqual(self.cache.get("stuff"), None)

    def test_expiration(self):
        "Expiration has no effect on the dummy cache"
        cache = self.cache
        for name in ('expire1', 'expire2', 'expire3'):
            cache.set(name, 'very quickly', 1)
        # Wait past the one-second timeout before reading anything back.
        time.sleep(2)
        self.assertEqual(cache.get("expire1"), None)
        cache.add("expire2", "newvalue")
        self.assertEqual(cache.get("expire2"), None)
        self.assertEqual(cache.has_key("expire3"), False)

    def test_unicode(self):
        "Unicode values are ignored by the dummy cache"
        samples = {
            u'ascii': u'ascii_value',
            u'unicode_ascii': u'Iñtërnâtiônàlizætiøn1',
            u'Iñtërnâtiônàlizætiøn': u'Iñtërnâtiônàlizætiøn2',
            u'ascii2': {u'x': 1},
        }
        for name, content in samples.items():
            self.cache.set(name, content)
            self.assertEqual(self.cache.get(name), None)

    def test_set_many(self):
        "set_many does nothing for the dummy cache backend"
        # Smoke test only: both call forms must be accepted without error.
        cache = self.cache
        cache.set_many({'a': 1, 'b': 2})
        cache.set_many({'a': 1, 'b': 2}, timeout=2, version='1')

    def test_delete_many(self):
        "delete_many does nothing for the dummy cache backend"
        # Smoke test only: the call must be accepted without error.
        self.cache.delete_many(['a', 'b'])

    def test_clear(self):
        "clear does nothing for the dummy cache backend"
        # Smoke test only: the call must be accepted without error.
        self.cache.clear()

    def test_incr_version(self):
        "Dummy cache versions can't be incremented"
        cache = self.cache
        cache.set('answer', 42)
        for key in ('answer', 'does_not_exist'):
            self.assertRaises(ValueError, cache.incr_version, key)

    def test_decr_version(self):
        "Dummy cache versions can't be decremented"
        cache = self.cache
        cache.set('answer', 42)
        for key in ('answer', 'does_not_exist'):
            self.assertRaises(ValueError, cache.decr_version, key)
- class BaseCacheTests(object):
- # A common set of tests to apply to all cache backends
def test_simple(self):
    # A value written with set() is read back unchanged by get().
    cache = self.cache
    cache.set("key", "value")
    self.assertEqual(cache.get("key"), "value")
def test_add(self):
    # add() stores a key only once; a repeat returns False and keeps the
    # first value.
    cache = self.cache
    cache.add("addkey1", "value")
    second_add = cache.add("addkey1", "newvalue")
    self.assertEqual(second_add, False)
    self.assertEqual(cache.get("addkey1"), "value")
def test_prefix(self):
    # The same key written through the plain and the prefixed cache must
    # not collide, even though both share one backend.
    plain = self.cache
    plain.set('somekey', 'value')

    prefixed = self.prefix_cache
    # The write above must be invisible through the prefixed cache.
    self.assertFalse(prefixed.has_key('somekey'))

    prefixed.set('somekey', 'value2')
    self.assertEqual(plain.get('somekey'), 'value')
    self.assertEqual(prefixed.get('somekey'), 'value2')
def test_non_existent(self):
    # A missing key yields None, or the caller's default when one is given.
    cache = self.cache
    self.assertEqual(cache.get("does_not_exist"), None)
    self.assertEqual(cache.get("does_not_exist", "bang!"), "bang!")
def test_get_many(self):
    # get_many() returns a dict of the requested keys that exist.
    cache = self.cache
    for letter in ('a', 'b', 'c', 'd'):
        cache.set(letter, letter)
    self.assertEqual(cache.get_many(['a', 'c', 'd']), {'a': 'a', 'c': 'c', 'd': 'd'})
    # 'e' was never stored, so it is absent from the result.
    self.assertEqual(cache.get_many(['a', 'b', 'e']), {'a': 'a', 'b': 'b'})
def test_delete(self):
    # delete() removes exactly the named key and leaves others alone.
    cache = self.cache
    cache.set("key1", "spam")
    cache.set("key2", "eggs")
    self.assertEqual(cache.get("key1"), "spam")
    cache.delete("key1")
    self.assertEqual(cache.get("key1"), None)
    self.assertEqual(cache.get("key2"), "eggs")
def test_has_key(self):
    # has_key() is True for a stored key and False for a string that was
    # only ever used as a value.
    cache = self.cache
    cache.set("hello1", "goodbye1")
    for key, present in (("hello1", True), ("goodbye1", False)):
        self.assertEqual(cache.has_key(key), present)
def test_in(self):
    # ``key in cache`` gives the same answers as has_key().
    cache = self.cache
    cache.set("hello2", "goodbye2")
    for key, present in (("hello2", True), ("goodbye2", False)):
        self.assertEqual(key in cache, present)
def test_incr(self):
    # incr() returns the new value and stores it; a missing key raises.
    cache = self.cache
    cache.set('answer', 41)
    # First without a delta, then with an explicit delta of 10.
    for extra_args, expected in (((), 42), ((10,), 52)):
        self.assertEqual(cache.incr('answer', *extra_args), expected)
        self.assertEqual(cache.get('answer'), expected)
    self.assertRaises(ValueError, cache.incr, 'does_not_exist')
def test_decr(self):
    # decr() returns the new value and stores it; a missing key raises.
    cache = self.cache
    cache.set('answer', 43)
    # First without a delta, then with an explicit delta of 10.
    for extra_args, expected in (((), 42), ((10,), 32)):
        self.assertEqual(cache.decr('answer', *extra_args), expected)
        self.assertEqual(cache.get('answer'), expected)
    self.assertRaises(ValueError, cache.decr, 'does_not_exist')
def test_data_types(self):
    # A dict mixing strings, numbers, containers, a function and a class
    # must come back equal to what was stored.
    payload = {
        'string': 'this is a string',
        'int': 42,
        'list': [1, 2, 3, 4],
        'tuple': (1, 2, 3, 4),
        'dict': {'A': 1, 'B': 2},
        'function': f,
        'class': C,
    }
    cache = self.cache
    cache.set("stuff", payload)
    self.assertEqual(cache.get("stuff"), payload)
def test_cache_read_for_model_instance(self):
    # Reading a model instance back from the cache must not call the
    # callable used as a field default again.
    expensive_calculation.num_runs = 0
    Poll.objects.all().delete()
    original = Poll.objects.create(question="Well?")
    self.assertEqual(Poll.objects.count(), 1)
    expected_pub_date = original.pub_date

    self.cache.set('question', original)
    restored = self.cache.get('question')
    self.assertEqual(restored.pub_date, expected_pub_date)
    # One run in total is expected for the whole round trip.
    self.assertEqual(expensive_calculation.num_runs, 1)
def test_cache_write_for_model_instance_with_deferred(self):
    # Writing a deferred queryset to the cache must not call the callable
    # used as a field default again.
    expensive_calculation.num_runs = 0
    Poll.objects.all().delete()
    # The created instance itself is not needed, only the row.
    Poll.objects.create(question="What?")
    self.assertEqual(expensive_calculation.num_runs, 1)

    deferred_polls = Poll.objects.all().defer('question')
    self.assertEqual(deferred_polls.count(), 1)
    self.assertEqual(expensive_calculation.num_runs, 1)

    self.cache.set('deferred_queryset', deferred_polls)
    # The run counter must be unchanged by the cache write.
    self.assertEqual(expensive_calculation.num_runs, 1)
def test_cache_read_for_model_instance_with_deferred(self):
    # Reading a deferred queryset back from the cache must not call the
    # callable used as a field default again.
    expensive_calculation.num_runs = 0
    Poll.objects.all().delete()
    # The created instance itself is not needed, only the row.
    Poll.objects.create(question="What?")
    self.assertEqual(expensive_calculation.num_runs, 1)

    deferred_polls = Poll.objects.all().defer('question')
    self.assertEqual(deferred_polls.count(), 1)
    self.cache.set('deferred_queryset', deferred_polls)
    self.assertEqual(expensive_calculation.num_runs, 1)

    runs_before_read = expensive_calculation.num_runs
    # Only the side effect of the read matters; its result is discarded.
    self.cache.get('deferred_queryset')
    self.assertEqual(expensive_calculation.num_runs, runs_before_read)
def test_expiration(self):
    # Entries written with a one-second timeout are gone two seconds later.
    cache = self.cache
    for name in ('expire1', 'expire2', 'expire3'):
        cache.set(name, 'very quickly', 1)
    time.sleep(2)

    self.assertEqual(cache.get("expire1"), None)
    # An expired key counts as absent, so add() is expected to succeed.
    cache.add("expire2", "newvalue")
    self.assertEqual(cache.get("expire2"), "newvalue")
    self.assertEqual(cache.has_key("expire3"), False)
def test_unicode(self):
    # Unicode keys and values round-trip through set, add and set_many.
    samples = {
        u'ascii': u'ascii_value',
        u'unicode_ascii': u'Iñtërnâtiônàlizætiøn1',
        u'Iñtërnâtiônàlizætiøn': u'Iñtërnâtiônàlizætiøn2',
        u'ascii2': {u'x': 1},
    }
    cache = self.cache

    # Round trip through `set`
    for name, content in samples.items():
        cache.set(name, content)
        self.assertEqual(cache.get(name), content)

    # Round trip through `add`; each key is deleted first so add() can store it
    for name, content in samples.items():
        cache.delete(name)
        cache.add(name, content)
        self.assertEqual(cache.get(name), content)

    # Round trip through `set_many`, again starting from deleted keys
    for name in samples:
        cache.delete(name)
    cache.set_many(samples)
    for name, content in samples.items():
        self.assertEqual(cache.get(name), content)
def test_binary_string(self):
    # zlib-compressed (binary) data must survive set, add and set_many.
    from zlib import compress, decompress
    value = 'value_to_be_compressed'
    compressed_value = compress(value)
    cache = self.cache

    def check_round_trip(key):
        # The cached bytes must match and still decompress to the input.
        compressed_result = cache.get(key)
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result))

    # Via set
    cache.set('binary1', compressed_value)
    check_round_trip('binary1')

    # Via add
    cache.add('binary1-add', compressed_value)
    check_round_trip('binary1-add')

    # Via set_many
    cache.set_many({'binary1-set_many': compressed_value})
    check_round_trip('binary1-set_many')
def test_set_many(self):
    # set_many() stores every key/value pair of the dict it is given.
    cache = self.cache
    cache.set_many({"key1": "spam", "key2": "eggs"})
    for key, expected in (("key1", "spam"), ("key2", "eggs")):
        self.assertEqual(cache.get(key), expected)
def test_set_many_expiration(self):
    # The second positional argument of set_many() is the timeout; with a
    # one-second timeout both keys are gone after two seconds.
    cache = self.cache
    cache.set_many({"key1": "spam", "key2": "eggs"}, 1)
    time.sleep(2)
    for key in ("key1", "key2"):
        self.assertEqual(cache.get(key), None)
def test_delete_many(self):
    # delete_many() removes the listed keys and nothing else.
    cache = self.cache
    for key, value in (("key1", "spam"), ("key2", "eggs"), ("key3", "ham")):
        cache.set(key, value)
    cache.delete_many(["key1", "key2"])
    self.assertEqual(cache.get("key1"), None)
    self.assertEqual(cache.get("key2"), None)
    self.assertEqual(cache.get("key3"), "ham")
def test_clear(self):
    # clear() leaves none of the previously stored keys readable.
    cache = self.cache
    cache.set("key1", "spam")
    cache.set("key2", "eggs")
    cache.clear()
    for key in ("key1", "key2"):
        self.assertEqual(cache.get(key), None)
def test_long_timeout(self):
    '''
    Using a timeout greater than 30 days makes memcached think
    it is an absolute expiration timestamp instead of a relative
    offset. Test that we honour this convention. Refs #12399.
    '''
    cache = self.cache
    over_thirty_days = 60*60*24*30 + 1  # 30 days + 1 second

    # The value must be readable right after each kind of write.
    cache.set('key1', 'eggs', over_thirty_days)
    self.assertEqual(cache.get('key1'), 'eggs')

    cache.add('key2', 'ham', over_thirty_days)
    self.assertEqual(cache.get('key2'), 'ham')

    cache.set_many({'key3': 'sausage', 'key4': 'lobster bisque'}, over_thirty_days)
    self.assertEqual(cache.get('key3'), 'sausage')
    self.assertEqual(cache.get('key4'), 'lobster bisque')
def test_float_timeout(self):
    # A non-integer timeout must be accepted and the value stored.
    cache = self.cache
    cache.set("key1", "spam", 100.2)
    self.assertEqual(cache.get("key1"), "spam")
def perform_cull_test(self, initial_count, final_count):
    """Fill the cache and assert how many of the written keys survive.

    Kept as a utility rather than a test because only some backends cull,
    and the number of survivors differs between them.

    Keys ``cull1`` .. ``cull<initial_count - 1>`` are written with a
    1000-second timeout; ``final_count`` is the number of those keys
    that must still be present afterwards.
    """
    keys = ['cull%d' % i for i in range(1, initial_count)]
    # Writing this many entries is meant to overflow the cache and cull it.
    for key in keys:
        self.cache.set(key, 'value', 1000)
    # Probe every written key, in write order, and keep those still present.
    survivors = [key for key in keys if self.cache.has_key(key)]
    self.assertEqual(len(survivors), final_count)
def test_invalid_keys(self):
    """
    All the builtin backends (except memcached, see below) should warn on
    keys that would be refused by memcached. This encourages portable
    caching code without making it too difficult to use production backends
    with more liberal key rules. Refs #6447.
    """
    # Swap in a key function that returns the key untouched, standing in
    # for a custom ``make_key``; with the default one the warnings below
    # would never be produced.
    def passthrough(key, *args):
        return key

    saved_key_func = self.cache.key_func
    self.cache.key_func = passthrough

    # Escalate CacheKeyWarning to an exception for the duration of the
    # checks so that assertRaises can observe it.
    saved_warnings = get_warnings_state()
    warnings.simplefilter("error", CacheKeyWarning)
    try:
        offending_keys = (
            'key with spaces',  # whitespace/control characters are refused
            'a' * 251,          # one character over the 250 length limit
        )
        for bad_key in offending_keys:
            self.assertRaises(CacheKeyWarning, self.cache.set, bad_key, 'value')
    finally:
        restore_warnings_state(saved_warnings)
        self.cache.key_func = saved_key_func
def test_cache_versioning_get_set(self):
    # get/set honour the cache's default version (1 for ``cache``, 2 for
    # ``v2_cache``) unless an explicit ``version`` is passed.
    lookups = (
        (self.cache, {}),
        (self.cache, {'version': 1}),
        (self.cache, {'version': 2}),
        (self.v2_cache, {}),
        (self.v2_cache, {'version': 1}),
        (self.v2_cache, {'version': 2}),
    )
    # Expected results, in lookup order, for a value stored under each version.
    stored_under_v1 = (42, 42, None, None, 42, None)
    stored_under_v2 = (None, None, 42, 42, None, 42)

    def check(key, expected_values):
        for (cache, kwargs), expected in zip(lookups, expected_values):
            self.assertEqual(cache.get(key, **kwargs), expected)

    # set, using default version = 1
    self.cache.set('answer1', 42)
    check('answer1', stored_under_v1)

    # set, default version = 1, but manually override version = 2
    self.cache.set('answer2', 42, version=2)
    check('answer2', stored_under_v2)

    # v2 set, using default version = 2
    self.v2_cache.set('answer3', 42)
    check('answer3', stored_under_v2)

    # v2 set, default version = 2, but manually override version = 1
    self.v2_cache.set('answer4', 42, version=1)
    check('answer4', stored_under_v1)
def test_cache_versioning_add(self):
    # add() only refuses a key that already exists under the same version.
    def expect(key, at_v1, at_v2):
        self.assertEqual(self.cache.get(key, version=1), at_v1)
        self.assertEqual(self.cache.get(key, version=2), at_v2)

    # add, default version = 1, but manually override version = 2
    self.cache.add('answer1', 42, version=2)
    expect('answer1', None, 42)
    self.cache.add('answer1', 37, version=2)
    expect('answer1', None, 42)
    self.cache.add('answer1', 37, version=1)
    expect('answer1', 37, 42)

    # v2 add, using default version = 2
    self.v2_cache.add('answer2', 42)
    expect('answer2', None, 42)
    self.v2_cache.add('answer2', 37)
    expect('answer2', None, 42)
    self.v2_cache.add('answer2', 37, version=1)
    expect('answer2', 37, 42)

    # v2 add, default version = 2, but manually override version = 1
    self.v2_cache.add('answer3', 42, version=1)
    expect('answer3', 42, None)
    self.v2_cache.add('answer3', 37, version=1)
    expect('answer3', 42, None)
    self.v2_cache.add('answer3', 37)
    expect('answer3', 42, 37)
def test_cache_versioning_has_key(self):
    # has_key() sees a key only under the version it was stored with.
    self.cache.set('answer1', 42)

    checks = (
        (self.cache, {}, self.assertTrue),
        (self.cache, {'version': 1}, self.assertTrue),
        (self.cache, {'version': 2}, self.assertFalse),
        (self.v2_cache, {}, self.assertFalse),
        (self.v2_cache, {'version': 1}, self.assertTrue),
        (self.v2_cache, {'version': 2}, self.assertFalse),
    )
    for cache, kwargs, verify in checks:
        verify(cache.has_key('answer1', **kwargs))
def test_cache_versioning_delete(self):
    # delete() removes a key under one version only: the cache's default
    # version, or the explicit ``version`` when given.
    def store_both(key):
        self.cache.set(key, 37, version=1)
        self.cache.set(key, 42, version=2)

    def expect(key, at_v1, at_v2):
        self.assertEqual(self.cache.get(key, version=1), at_v1)
        self.assertEqual(self.cache.get(key, version=2), at_v2)

    store_both('answer1')
    self.cache.delete('answer1')
    expect('answer1', None, 42)

    store_both('answer2')
    self.cache.delete('answer2', version=2)
    expect('answer2', 37, None)

    store_both('answer3')
    self.v2_cache.delete('answer3')
    expect('answer3', 37, None)

    store_both('answer4')
    self.v2_cache.delete('answer4', version=1)
    expect('answer4', None, 42)
def test_cache_versioning_incr_decr(self):
    # incr()/decr() change the value under one version only: the cache's
    # default version, or the explicit ``version`` when given.
    def store_both(key):
        self.cache.set(key, 37, version=1)
        self.cache.set(key, 42, version=2)

    def expect(key, at_v1, at_v2):
        self.assertEqual(self.cache.get(key, version=1), at_v1)
        self.assertEqual(self.cache.get(key, version=2), at_v2)

    store_both('answer1')
    self.cache.incr('answer1')
    expect('answer1', 38, 42)
    self.cache.decr('answer1')
    expect('answer1', 37, 42)

    store_both('answer2')
    self.cache.incr('answer2', version=2)
    expect('answer2', 37, 43)
    self.cache.decr('answer2', version=2)
    expect('answer2', 37, 42)

    store_both('answer3')
    self.v2_cache.incr('answer3')
    expect('answer3', 37, 43)
    self.v2_cache.decr('answer3')
    expect('answer3', 37, 42)

    store_both('answer4')
    self.v2_cache.incr('answer4', version=1)
    expect('answer4', 38, 42)
    self.v2_cache.decr('answer4', version=1)
    expect('answer4', 37, 42)
def test_cache_versioning_get_set_many(self):
    # set_many/get_many honour the cache's default version (1 for
    # ``cache``, 2 for ``v2_cache``) unless an explicit ``version`` is passed.
    lookups = (
        (self.cache, {}),
        (self.cache, {'version': 1}),
        (self.cache, {'version': 2}),
        (self.v2_cache, {}),
        (self.v2_cache, {'version': 1}),
        (self.v2_cache, {'version': 2}),
    )
    # Which lookups, in the order above, can see data stored under each version.
    visible_under_v1 = (True, True, False, False, True, False)
    visible_under_v2 = (False, False, True, True, False, True)

    def check(ford, arthur, visibility):
        for (cache, kwargs), visible in zip(lookups, visibility):
            expected = {ford: 37, arthur: 42} if visible else {}
            self.assertEqual(cache.get_many([ford, arthur], **kwargs), expected)

    # set, using default version = 1
    self.cache.set_many({'ford1': 37, 'arthur1': 42})
    check('ford1', 'arthur1', visible_under_v1)

    # set, default version = 1, but manually override version = 2
    self.cache.set_many({'ford2': 37, 'arthur2': 42}, version=2)
    check('ford2', 'arthur2', visible_under_v2)

    # v2 set, using default version = 2
    self.v2_cache.set_many({'ford3': 37, 'arthur3': 42})
    check('ford3', 'arthur3', visible_under_v2)

    # v2 set, default version = 2, but manually override version = 1
    self.v2_cache.set_many({'ford4': 37, 'arthur4': 42}, version=1)
    check('ford4', 'arthur4', visible_under_v1)
def test_incr_version(self):
    # incr_version() moves a value to the next version and returns the new
    # version number; a missing key raises ValueError.
    unversioned, v1, v2, v3 = {}, {'version': 1}, {'version': 2}, {'version': 3}

    def check(cache, key, expectations):
        for kwargs, expected in expectations:
            self.assertEqual(cache.get(key, **kwargs), expected)

    self.cache.set('answer', 42, version=2)
    check(self.cache, 'answer',
          [(unversioned, None), (v1, None), (v2, 42), (v3, None)])
    self.assertEqual(self.cache.incr_version('answer', version=2), 3)
    check(self.cache, 'answer',
          [(unversioned, None), (v1, None), (v2, None), (v3, 42)])

    # Same again through the cache whose default version is 2.
    self.v2_cache.set('answer2', 42)
    check(self.v2_cache, 'answer2',
          [(unversioned, 42), (v1, None), (v2, 42), (v3, None)])
    self.assertEqual(self.v2_cache.incr_version('answer2'), 3)
    check(self.v2_cache, 'answer2',
          [(unversioned, None), (v1, None), (v2, None), (v3, 42)])

    self.assertRaises(ValueError, self.cache.incr_version, 'does_not_exist')
def test_decr_version(self):
    # decr_version() is expected to move a stored value to the previous
    # version number and hand back that new version.
    versions = (1, 2)

    # Default cache (default version 1): value stored explicitly at version 2.
    plain = self.cache
    plain.set('answer', 42, version=2)
    self.assertEqual(plain.get('answer'), None)
    for number, expected in zip(versions, (None, 42)):
        self.assertEqual(plain.get('answer', version=number), expected)

    self.assertEqual(plain.decr_version('answer', version=2), 1)
    self.assertEqual(plain.get('answer'), 42)
    for number, expected in zip(versions, (42, None)):
        self.assertEqual(plain.get('answer', version=number), expected)

    # Cache whose default version is 2: no explicit version argument needed.
    versioned = self.v2_cache
    versioned.set('answer2', 42)
    self.assertEqual(versioned.get('answer2'), 42)
    for number, expected in zip(versions, (None, 42)):
        self.assertEqual(versioned.get('answer2', version=number), expected)

    self.assertEqual(versioned.decr_version('answer2'), 1)
    self.assertEqual(versioned.get('answer2'), None)
    for number, expected in zip(versions, (42, None)):
        self.assertEqual(versioned.get('answer2', version=number), expected)

    # Decrementing a key that was never stored must raise.
    self.assertRaises(ValueError, plain.decr_version, 'does_not_exist', version=2)
def test_custom_key_func(self):
    # Two caches with different key functions aren't visible to each other
    default_keyed = self.cache
    custom_by_callable = self.custom_key_cache
    custom_by_path = self.custom_key_cache2

    # Written through the default key function: only the default cache sees it.
    default_keyed.set('answer1', 42)
    for backend, expected in (
        (default_keyed, 42),
        (custom_by_callable, None),
        (custom_by_path, None),
    ):
        self.assertEqual(backend.get('answer1'), expected)

    # Written through the custom key function: both custom-keyed caches see
    # it, the default cache does not.
    custom_by_callable.set('answer2', 42)
    for backend, expected in (
        (default_keyed, None),
        (custom_by_callable, 42),
        (custom_by_path, 42),
    ):
        self.assertEqual(backend.get('answer2'), expected)
def custom_key_func(key, key_prefix, version):
    """Build a cache key of the form ``CUSTOM-<key_prefix>-<version>-<key>``."""
    pieces = [key_prefix, str(version), key]
    return 'CUSTOM-' + '-'.join(pieces)
class DBCacheTests(unittest.TestCase, BaseCacheTests):
    """Run the shared cache tests against the database cache backend."""

    backend_name = 'django.core.cache.backends.db.DatabaseCache'

    def setUp(self):
        # Spaces are used in the table name to ensure quoting/escaping is working
        self._table_name = 'test cache table'
        management.call_command(
            'createcachetable',
            self._table_name,
            verbosity=0,
            interactive=False,
        )

        def table_cache(**params):
            # Every cache in this test case is backed by the same table.
            return get_cache(self.backend_name, LOCATION=self._table_name, **params)

        self.cache = table_cache(OPTIONS={'MAX_ENTRIES': 30})
        self.prefix_cache = table_cache(KEY_PREFIX='cacheprefix')
        self.v2_cache = table_cache(VERSION=2)
        self.custom_key_cache = table_cache(KEY_FUNCTION=custom_key_func)
        self.custom_key_cache2 = table_cache(
            KEY_FUNCTION='regressiontests.cache.tests.custom_key_func')

    def tearDown(self):
        # Drop the table created in setUp; the name is quoted because it
        # contains spaces.
        from django.db import connection
        quoted_table = connection.ops.quote_name(self._table_name)
        connection.cursor().execute('DROP TABLE %s' % quoted_table)

    def test_cull(self):
        self.perform_cull_test(50, 29)

    def test_zero_cull(self):
        options = {'MAX_ENTRIES': 30, 'CULL_FREQUENCY': 0}
        self.cache = get_cache(
            self.backend_name, LOCATION=self._table_name, OPTIONS=options)
        self.perform_cull_test(50, 18)

    def test_old_initialization(self):
        # Same configuration as test_zero_cull, expressed as a URI string.
        uri = 'db://%s?max_entries=30&cull_frequency=0' % self._table_name
        self.cache = get_cache(uri)
        self.perform_cull_test(50, 18)
class LocMemCacheTests(unittest.TestCase, BaseCacheTests):
    """Run the shared cache tests against the local-memory cache backend."""

    backend_name = 'django.core.cache.backends.locmem.LocMemCache'

    def setUp(self):
        backend = self.backend_name
        limited = {'MAX_ENTRIES': 30}
        self.cache = get_cache(backend, OPTIONS=limited)
        self.prefix_cache = get_cache(backend, KEY_PREFIX='cacheprefix')
        self.v2_cache = get_cache(backend, VERSION=2)
        self.custom_key_cache = get_cache(
            backend, OPTIONS={'MAX_ENTRIES': 30}, KEY_FUNCTION=custom_key_func)
        self.custom_key_cache2 = get_cache(
            backend, OPTIONS={'MAX_ENTRIES': 30},
            KEY_FUNCTION='regressiontests.cache.tests.custom_key_func')

        # LocMem requires a hack to make the other caches
        # share a data store with the 'normal' cache.
        siblings = (
            self.prefix_cache,
            self.v2_cache,
            self.custom_key_cache,
            self.custom_key_cache2,
        )
        for sibling in siblings:
            sibling._cache = self.cache._cache
            sibling._expire_info = self.cache._expire_info

    def tearDown(self):
        self.cache.clear()

    def test_cull(self):
        self.perform_cull_test(50, 29)

    def test_zero_cull(self):
        options = {'MAX_ENTRIES': 30, 'CULL_FREQUENCY': 0}
        self.cache = get_cache(self.backend_name, OPTIONS=options)
        self.perform_cull_test(50, 19)

    def test_old_initialization(self):
        # Same configuration as test_zero_cull, expressed as a URI string.
        self.cache = get_cache('locmem://?max_entries=30&cull_frequency=0')
        self.perform_cull_test(50, 19)

    def test_multiple_caches(self):
        "Check that multiple locmem caches are isolated"
        same_location = get_cache(self.backend_name)
        different_location = get_cache(self.backend_name, LOCATION='other')

        self.cache.set('value1', 42)
        self.assertEqual(same_location.get('value1'), 42)
        self.assertEqual(different_location.get('value1'), None)
# The memcached backend isn't guaranteed to be available.
# To exercise it, the test settings file needs a cache backend
# setting that points at your memcache server.
class MemcachedCacheTests(unittest.TestCase, BaseCacheTests):
    """Run the shared cache tests against the memcached backend."""

    backend_name = 'django.core.cache.backends.memcached.MemcachedCache'

    def setUp(self):
        # Reuse the server location configured for the default cache alias.
        location = settings.CACHES[DEFAULT_CACHE_ALIAS]['LOCATION']
        backend = self.backend_name
        self.cache = get_cache(backend, LOCATION=location)
        self.prefix_cache = get_cache(
            backend, LOCATION=location, KEY_PREFIX='cacheprefix')
        self.v2_cache = get_cache(backend, LOCATION=location, VERSION=2)
        self.custom_key_cache = get_cache(
            backend, LOCATION=location, KEY_FUNCTION=custom_key_func)
        self.custom_key_cache2 = get_cache(
            backend, LOCATION=location,
            KEY_FUNCTION='regressiontests.cache.tests.custom_key_func')

    def tearDown(self):
        self.cache.clear()

    def test_invalid_keys(self):
        """
        On memcached, we don't introduce a duplicate key validation
        step (for speed reasons), we just let the memcached API
        library raise its own exception on bad keys. Refs #6447.

        In order to be memcached-API-library agnostic, we only assert
        that a generic exception of some kind is raised.
        """
        store = self.cache.set
        # memcached does not allow whitespace or control characters in keys
        self.assertRaises(Exception, store, 'key with spaces', 'value')
        # memcached limits key length to 250
        self.assertRaises(Exception, store, 'a' * 251, 'value')


# Skip the whole test case unless the default cache is a memcached backend.
MemcachedCacheTests = unittest.skipUnless(
    settings.CACHES[DEFAULT_CACHE_ALIAS]['BACKEND'].startswith(
        'django.core.cache.backends.memcached.'),
    "memcached not available",
)(MemcachedCacheTests)
class FileBasedCacheTests(unittest.TestCase, BaseCacheTests):
    """
    Specific test cases for the file-based cache.
    """
    backend_name = 'django.core.cache.backends.filebased.FileBasedCache'

    def setUp(self):
        self.dirname = tempfile.mkdtemp()

        def dir_cache(**params):
            # Every cache in this test case stores into the same directory.
            return get_cache(self.backend_name, LOCATION=self.dirname, **params)

        self.cache = dir_cache(OPTIONS={'MAX_ENTRIES': 30})
        self.prefix_cache = dir_cache(KEY_PREFIX='cacheprefix')
        self.v2_cache = dir_cache(VERSION=2)
        self.custom_key_cache = dir_cache(KEY_FUNCTION=custom_key_func)
        self.custom_key_cache2 = dir_cache(
            KEY_FUNCTION='regressiontests.cache.tests.custom_key_func')

    def tearDown(self):
        self.cache.clear()

    def _expected_cache_file(self, name):
        """Return the path where these tests expect the entry for *name*.

        The path is ``<dirname>/<h[:2]>/<h[2:4]>/<h[4:]>`` where ``h`` is the
        MD5 hex digest of the full cache key.
        """
        digest = hashlib.md5(self.cache.make_key(name)).hexdigest()
        return os.path.join(self.dirname, digest[:2], digest[2:4], digest[4:])

    def test_hashing(self):
        """Test that keys are hashed into subdirectories correctly"""
        self.cache.set("foo", "bar")
        self.assertTrue(os.path.exists(self._expected_cache_file("foo")))

    def test_subdirectory_removal(self):
        """
        Make sure that the created subdirectories are correctly removed when empty.
        """
        self.cache.set("foo", "bar")
        entry_path = self._expected_cache_file("foo")
        self.assertTrue(os.path.exists(entry_path))

        self.cache.delete("foo")
        inner_dir = os.path.dirname(entry_path)
        outer_dir = os.path.dirname(inner_dir)
        for removed in (entry_path, inner_dir, outer_dir):
            self.assertFalse(os.path.exists(removed))

    def test_cull(self):
        self.perform_cull_test(50, 29)

    def test_old_initialization(self):
        # Same entry limit as setUp, expressed as a URI string.
        self.cache = get_cache('file://%s?max_entries=30' % self.dirname)
        self.perform_cull_test(50, 29)
class CustomCacheKeyValidationTests(unittest.TestCase):
    """
    Tests for the ability to mixin a custom ``validate_key`` method to
    a custom cache backend that otherwise inherits from a builtin
    backend, and override the default key validation. Refs #6447.
    """

    def test_custom_key_validation(self):
        liberal_cache = get_cache('regressiontests.cache.liberal_backend://')

        # this key is both longer than 250 characters, and has spaces
        long_spaced_key = 'some key with spaces' * 15
        stored = 'a value'

        liberal_cache.set(long_spaced_key, stored)
        self.assertEqual(liberal_cache.get(long_spaced_key), stored)
class GetCacheTests(unittest.TestCase):
    """Checks for get_cache() lookups by URI, alias and dotted backend path."""

    def test_simple(self):
        # Aliased so the module-level default cache does not shadow any local.
        from django.core.cache import cache as default_cache
        from django.core.cache.backends.locmem import LocMemCache

        # A URI-style backend string yields the matching backend class.
        self.assertIsInstance(get_cache('locmem://'), LocMemCache)

        # The module-level ``cache`` object has the same class as the cache
        # returned for the 'default' alias.
        self.assertIsInstance(default_cache, get_cache('default').__class__)

        # Extra keyword arguments configure a backend given by dotted path.
        dummy = get_cache(
            'django.core.cache.backends.dummy.DummyCache', TIMEOUT=120)
        self.assertEqual(dummy.default_timeout, 120)

        # An unknown backend name is rejected.
        self.assertRaises(InvalidCacheBackendError, get_cache, 'does_not_exist')
class CacheUtils(TestCase):
    """TestCase for django.utils.cache functions."""

    def setUp(self):
        self.path = '/cache/test/'
        self.cache = get_cache('default')

    def tearDown(self):
        self.cache.clear()

    def _get_request(self, path, method='GET'):
        """Return a bare HttpRequest for ``/cache/<path>`` using *method*."""
        req = HttpRequest()
        req.META = {'SERVER_NAME': 'testserver', 'SERVER_PORT': 80}
        req.method = method
        req.path = req.path_info = "/cache/%s" % path
        return req

    def test_patch_vary_headers(self):
        # Each case: (initial Vary, headers to patch in, resulting Vary).
        cases = (
            (None, ('Accept-Encoding',), 'Accept-Encoding'),
            ('Accept-Encoding', ('accept-encoding',), 'Accept-Encoding'),
            ('Accept-Encoding', ('ACCEPT-ENCODING',), 'Accept-Encoding'),
            ('Cookie', ('Accept-Encoding',), 'Cookie, Accept-Encoding'),
            ('Cookie, Accept-Encoding', ('Accept-Encoding',), 'Cookie, Accept-Encoding'),
            ('Cookie, Accept-Encoding', ('Accept-Encoding', 'cookie'), 'Cookie, Accept-Encoding'),
            (None, ('Accept-Encoding', 'COOKIE'), 'Accept-Encoding, COOKIE'),
            ('Cookie,     Accept-Encoding', ('Accept-Encoding', 'cookie'), 'Cookie, Accept-Encoding'),
            ('Cookie    ,     Accept-Encoding', ('Accept-Encoding', 'cookie'), 'Cookie, Accept-Encoding'),
        )
        for before, additions, after in cases:
            resp = HttpResponse()
            if before is not None:
                resp['Vary'] = before
            patch_vary_headers(resp, additions)
            self.assertEqual(resp['Vary'], after)

    def test_get_cache_key(self):
        req = self._get_request(self.path)
        resp = HttpResponse()
        local_prefix = 'localprefix'

        # Expect None if no headers have been set yet.
        self.assertEqual(get_cache_key(req), None)

        # Set headers to an empty list.
        learn_cache_key(req, resp)
        self.assertEqual(
            get_cache_key(req),
            'views.decorators.cache.cache_page.settingsprefix.GET.a8c87a3d8c44853d7f79474f7ffe4ad5.d41d8cd98f00b204e9800998ecf8427e')

        # Verify that a specified key_prefix is taken into account.
        learn_cache_key(req, resp, key_prefix=local_prefix)
        self.assertEqual(
            get_cache_key(req, key_prefix=local_prefix),
            'views.decorators.cache.cache_page.localprefix.GET.a8c87a3d8c44853d7f79474f7ffe4ad5.d41d8cd98f00b204e9800998ecf8427e')

    def test_get_cache_key_with_query(self):
        req = self._get_request(self.path + '?test=1')
        resp = HttpResponse()

        # Expect None if no headers have been set yet.
        self.assertEqual(get_cache_key(req), None)

        # Set headers to an empty list.
        learn_cache_key(req, resp)

        # Verify that the querystring is taken into account.
        self.assertEqual(
            get_cache_key(req),
            'views.decorators.cache.cache_page.settingsprefix.GET.bd889c5a59603af44333ed21504db3cd.d41d8cd98f00b204e9800998ecf8427e')

    def test_learn_cache_key(self):
        req = self._get_request(self.path, 'HEAD')
        resp = HttpResponse()
        resp['Vary'] = 'Pony'

        # Make sure that the Vary header is added to the key hash
        learn_cache_key(req, resp)
        self.assertEqual(
            get_cache_key(req),
            'views.decorators.cache.cache_page.settingsprefix.GET.a8c87a3d8c44853d7f79474f7ffe4ad5.d41d8cd98f00b204e9800998ecf8427e')

    def test_patch_cache_control(self):
        # Each case: (initial Cache-Control, kwargs to patch_cache_control,
        # expected Cache-Control parts).
        cases = (
            (None, {'private': True}, set(['private'])),

            # Test whether private/public attributes are mutually exclusive
            ('private', {'private': True}, set(['private'])),
            ('private', {'public': True}, set(['public'])),
            ('public', {'public': True}, set(['public'])),
            ('public', {'private': True}, set(['private'])),
            ('must-revalidate,max-age=60,private', {'public': True},
             set(['must-revalidate', 'max-age=60', 'public'])),
            ('must-revalidate,max-age=60,public', {'private': True},
             set(['must-revalidate', 'max-age=60', 'private'])),
            ('must-revalidate,max-age=60', {'public': True},
             set(['must-revalidate', 'max-age=60', 'public'])),
        )

        # Directives are compared as a set, so their order is not checked.
        delimiter = re.compile(r'\s*,\s*')

        for before, directives, expected_parts in cases:
            resp = HttpResponse()
            if before is not None:
                resp['Cache-Control'] = before
            patch_cache_control(resp, **directives)
            actual_parts = set(delimiter.split(resp['Cache-Control']))
            self.assertEqual(actual_parts, expected_parts)
# Apply the settings the CacheUtils tests rely on: a local-memory default
# cache, the 'settingsprefix' middleware key prefix, and i18n switched off.
CacheUtils = override_settings(
    CACHE_MIDDLEWARE_KEY_PREFIX='settingsprefix',
    CACHE_MIDDLEWARE_SECONDS=1,
    CACHES=dict(
        default=dict(
            BACKEND='django.core.cache.backends.locmem.LocMemCache',
        ),
    ),
    USE_I18N=False,
)(CacheUtils)

# The same tests again, with a KEY_PREFIX configured on the default cache.
PrefixedCacheUtils = override_settings(
    CACHES=dict(
        default=dict(
            BACKEND='django.core.cache.backends.locmem.LocMemCache',
            KEY_PREFIX='cacheprefix',
        ),
    ),
)(CacheUtils)
class CacheHEADTest(TestCase):
    """Check that HEAD requests are answered from the cache middleware."""

    def setUp(self):
        self.path = '/cache/test/'
        self.cache = get_cache('default')

    def tearDown(self):
        self.cache.clear()

    def _get_request(self, method):
        """Return a bare HttpRequest for ``self.path`` using *method*."""
        req = HttpRequest()
        req.META = {'SERVER_NAME': 'testserver', 'SERVER_PORT': 80}
        req.method = method
        req.path = req.path_info = self.path
        return req

    def _get_request_cache(self, method):
        """Return a request flagged so UpdateCacheMiddleware will cache its response."""
        req = self._get_request(method)
        req._cache_update_cache = True
        return req

    def _set_cache(self, request, msg):
        """Push a response with body *msg* through UpdateCacheMiddleware."""
        resp = HttpResponse()
        resp.content = msg
        return UpdateCacheMiddleware().process_response(request, resp)

    def _assert_head_hits_cache(self, priming_method):
        """Cache a response via *priming_method*, then fetch it with HEAD."""
        body = 'test content'
        self._set_cache(self._get_request_cache(priming_method), body)

        head_request = self._get_request('HEAD')
        cached = FetchFromCacheMiddleware().process_request(head_request)
        self.assertNotEqual(cached, None)
        self.assertEqual(body, cached.content)

    def test_head_caches_correctly(self):
        self._assert_head_hits_cache('HEAD')

    def test_head_with_cached_get(self):
        self._assert_head_hits_cache('GET')
# Apply the settings the CacheHEADTest tests rely on: a local-memory default
# cache, a 60 second middleware timeout and the 'test' key prefix.
CacheHEADTest = override_settings(
    CACHE_MIDDLEWARE_SECONDS=60,
    CACHE_MIDDLEWARE_KEY_PREFIX='test',
    CACHES=dict(
        default=dict(
            BACKEND='django.core.cache.backends.locmem.LocMemCache',
        ),
    ),
)(CacheHEADTest)
class CacheI18nTest(TestCase):
    """Check how language and time zone settings feed into cache keys."""

    def setUp(self):
        self.path = '/cache/test/'
        self.cache = get_cache('default')

    def tearDown(self):
        self.cache.clear()

    def _get_request(self, method='GET'):
        """Return a bare HttpRequest for ``self.path`` using *method*."""
        request = HttpRequest()
        request.META = {
            'SERVER_NAME': 'testserver',
            'SERVER_PORT': 80,
        }
        request.method = method
        request.path = request.path_info = self.path
        return request

    def _get_request_cache(self, query_string=None):
        """Return a GET request flagged so its response will be cached.

        When *query_string* is given it is stored in ``META['QUERY_STRING']``
        and parsed into ``request.GET``.
        """
        request = HttpRequest()
        request.META = {
            'SERVER_NAME': 'testserver',
            'SERVER_PORT': 80,
        }
        if query_string:
            request.META['QUERY_STRING'] = query_string
            request.GET = QueryDict(query_string)
        request.path = request.path_info = self.path
        request._cache_update_cache = True
        request.method = 'GET'
        request.session = {}
        return request

    @override_settings(USE_I18N=True, USE_L10N=False, USE_TZ=False)
    def test_cache_key_i18n_translation(self):
        request = self._get_request()
        lang = translation.get_language()
        response = HttpResponse()
        key = learn_cache_key(request, response)
        self.assertIn(lang, key, "Cache keys should include the language name when translation is active")
        # The key looked up afterwards must match the key that was learned.
        key2 = get_cache_key(request)
        self.assertEqual(key, key2)

    @override_settings(USE_I18N=False, USE_L10N=True, USE_TZ=False)
    def test_cache_key_i18n_formatting(self):
        request = self._get_request()
        lang = translation.get_language()
        response = HttpResponse()
        key = learn_cache_key(request, response)
        self.assertIn(lang, key, "Cache keys should include the language name when formatting is active")
        # The key looked up afterwards must match the key that was learned.
        key2 = get_cache_key(request)
        self.assertEqual(key, key2)

    @override_settings(USE_I18N=False, USE_L10N=False, USE_TZ=True)
    def test_cache_key_i18n_timezone(self):
        request = self._get_request()
        tz = timezone.get_current_timezone_name()
        response = HttpResponse()
        key = learn_cache_key(request, response)
        self.assertIn(tz, key, "Cache keys should include the time zone name when time zones are active")
        # The key looked up afterwards must match the key that was learned.
        key2 = get_cache_key(request)
        self.assertEqual(key, key2)

    @override_settings(USE_I18N=False, USE_L10N=False)
    def test_cache_key_no_i18n(self):
        request = self._get_request()
        lang = translation.get_language()
        tz = timezone.get_current_timezone_name()
        response = HttpResponse()
        key = learn_cache_key(request, response)
        self.assertNotIn(lang, key, "Cache keys shouldn't include the language name when i18n isn't active")
        self.assertNotIn(tz, key, "Cache keys shouldn't include the time zone name when i18n isn't active")

    @override_settings(
        CACHE_MIDDLEWARE_KEY_PREFIX="test",
        CACHE_MIDDLEWARE_SECONDS=60,
        USE_ETAGS=True,
        USE_I18N=True,
    )
    def test_middleware(self):
        def set_cache(request, lang, msg):
            # Activate *lang*, then cache a response with body *msg*.
            translation.activate(lang)
            response = HttpResponse()
            response.content = msg
            return UpdateCacheMiddleware().process_response(request, response)

        # set_cache() and the checks below activate languages; the finally
        # clause guarantees the language is reset even when an assertion
        # fails, so the active language cannot leak into later tests.
        try:
            # cache with non empty request.GET
            request = self._get_request_cache(query_string='foo=bar&other=true')
            get_cache_data = FetchFromCacheMiddleware().process_request(request)
            # first access, cache must return None
            self.assertEqual(get_cache_data, None)
            response = HttpResponse()
            content = 'Check for cache with QUERY_STRING'
            response.content = content
            UpdateCacheMiddleware().process_response(request, response)
            get_cache_data = FetchFromCacheMiddleware().process_request(request)
            # cache must return content
            self.assertNotEqual(get_cache_data, None)
            self.assertEqual(get_cache_data.content, content)
            # different QUERY_STRING, cache must be empty
            request = self._get_request_cache(query_string='foo=bar&somethingelse=true')
            get_cache_data = FetchFromCacheMiddleware().process_request(request)
            self.assertEqual(get_cache_data, None)

            # i18n tests
            en_message = "Hello world!"
            es_message = "Hola mundo!"

            request = self._get_request_cache()
            set_cache(request, 'en', en_message)
            get_cache_data = FetchFromCacheMiddleware().process_request(request)
            # Check that we can recover the cache
            self.assertNotEqual(get_cache_data, None)
            self.assertEqual(get_cache_data.content, en_message)
            # Check that we use etags
            self.assertTrue(get_cache_data.has_header('ETag'))
            # Check that we can disable etags
            with self.settings(USE_ETAGS=False):
                request._cache_update_cache = True
                set_cache(request, 'en', en_message)
                get_cache_data = FetchFromCacheMiddleware().process_request(request)
                self.assertFalse(get_cache_data.has_header('ETag'))
            # change the session language and set content
            request = self._get_request_cache()
            set_cache(request, 'es', es_message)
            # change again the language
            translation.activate('en')
            # retrieve the content from cache
            get_cache_data = FetchFromCacheMiddleware().process_request(request)
            self.assertEqual(get_cache_data.content, en_message)
            # change again the language
            translation.activate('es')
            get_cache_data = FetchFromCacheMiddleware().process_request(request)
            self.assertEqual(get_cache_data.content, es_message)
        finally:
            # reset the language
            translation.deactivate()
# Apply the settings the CacheI18nTest tests rely on: a local-memory default
# cache, the 'settingsprefix' key prefix, and English/Spanish as languages.
CacheI18nTest = override_settings(
    CACHE_MIDDLEWARE_KEY_PREFIX='settingsprefix',
    CACHES=dict(
        default=dict(
            BACKEND='django.core.cache.backends.locmem.LocMemCache',
        ),
    ),
    LANGUAGES=(
        ('en', 'English'),
        ('es', 'Spanish'),
    ),
)(CacheI18nTest)

# The same tests again, with a KEY_PREFIX configured on the default cache.
PrefixedCacheI18nTest = override_settings(
    CACHES=dict(
        default=dict(
            BACKEND='django.core.cache.backends.locmem.LocMemCache',
            KEY_PREFIX='cacheprefix',
        ),
    ),
)(CacheI18nTest)
def hello_world_view(request, value):
    """Return an HttpResponse whose body is ``Hello World <value>``."""
    body = 'Hello World %s' % value
    return HttpResponse(body)
class CacheMiddlewareTest(TestCase):
    """Exercise CacheMiddleware as middleware and through cache_page."""

    def setUp(self):
        self.factory = RequestFactory()
        self.default_cache = get_cache('default')
        self.other_cache = get_cache('other')

    def tearDown(self):
        self.default_cache.clear()
        self.other_cache.clear()

    def test_constructor(self):
        """
        Ensure the constructor is correctly distinguishing between usage of CacheMiddleware as
        Middleware vs. usage of CacheMiddleware as view decorator and setting attributes
        appropriately.
        """
        # If no arguments are passed in construction, it's being used as middleware.
        middleware = CacheMiddleware()

        # Now test object attributes against the CACHE_MIDDLEWARE_* settings
        # overridden for this test case.
        self.assertEqual(middleware.cache_timeout, 30)
        self.assertEqual(middleware.key_prefix, 'middlewareprefix')
        self.assertEqual(middleware.cache_alias, 'other')
        self.assertEqual(middleware.cache_anonymous_only, False)

        # If arguments are being passed in construction, it's being used as a decorator.
        # First, test with "defaults":
        as_view_decorator = CacheMiddleware(cache_alias=None, key_prefix=None)

        self.assertEqual(as_view_decorator.cache_timeout, 300)  # Timeout value for 'default' cache, i.e. 300
        self.assertEqual(as_view_decorator.key_prefix, '')
        self.assertEqual(as_view_decorator.cache_alias, 'default')  # Value of DEFAULT_CACHE_ALIAS from django.core.cache
        self.assertEqual(as_view_decorator.cache_anonymous_only, False)

        # Next, test with custom values:
        as_view_decorator_with_custom = CacheMiddleware(
            cache_anonymous_only=True, cache_timeout=60,
            cache_alias='other', key_prefix='foo')

        self.assertEqual(as_view_decorator_with_custom.cache_timeout, 60)
        self.assertEqual(as_view_decorator_with_custom.key_prefix, 'foo')
        self.assertEqual(as_view_decorator_with_custom.cache_alias, 'other')
        self.assertEqual(as_view_decorator_with_custom.cache_anonymous_only, True)

    def test_middleware(self):
        middleware = CacheMiddleware()
        prefix_middleware = CacheMiddleware(key_prefix='prefix1')
        timeout_middleware = CacheMiddleware(cache_timeout=1)

        request = self.factory.get('/view/')

        # Put the request through the request middleware
        result = middleware.process_request(request)
        self.assertEqual(result, None)

        response = hello_world_view(request, '1')

        # Now put the response through the response middleware
        response = middleware.process_response(request, response)

        # Repeating the request should result in a cache hit
        result = middleware.process_request(request)
        self.assertNotEqual(result, None)
        self.assertEqual(result.content, 'Hello World 1')

        # The same request through a different middleware won't hit
        result = prefix_middleware.process_request(request)
        self.assertEqual(result, None)

        # The same request with a timeout _will_ hit
        result = timeout_middleware.process_request(request)
        self.assertNotEqual(result, None)
        self.assertEqual(result.content, 'Hello World 1')

    @override_settings(CACHE_MIDDLEWARE_ANONYMOUS_ONLY=True)
    def test_cache_middleware_anonymous_only_wont_cause_session_access(self):
        """ The cache middleware shouldn't cause a session access due to
        CACHE_MIDDLEWARE_ANONYMOUS_ONLY if nothing else has accessed the
        session. Refs 13283 """

        from django.contrib.sessions.middleware import SessionMiddleware
        from django.contrib.auth.middleware import AuthenticationMiddleware

        middleware = CacheMiddleware()
        session_middleware = SessionMiddleware()
        auth_middleware = AuthenticationMiddleware()

        request = self.factory.get('/view_anon/')

        # Put the request through the request middleware
        session_middleware.process_request(request)
        auth_middleware.process_request(request)
        result = middleware.process_request(request)
        self.assertEqual(result, None)

        response = hello_world_view(request, '1')

        # Now put the response through the response middleware
        session_middleware.process_response(request, response)
        response = middleware.process_response(request, response)

        self.assertEqual(request.session.accessed, False)

    @override_settings(CACHE_MIDDLEWARE_ANONYMOUS_ONLY=True)
    def test_cache_middleware_anonymous_only_with_cache_page(self):
        """CACHE_MIDDLEWARE_ANONYMOUS_ONLY should still be effective when used
        with the cache_page decorator: the response to a request from an
        authenticated user should not be cached."""

        request = self.factory.get('/view_anon/')

        class MockAuthenticatedUser(object):
            def is_authenticated(self):
                return True

        class MockAccessedSession(object):
            accessed = True

        request.user = MockAuthenticatedUser()
        request.session = MockAccessedSession()

        response = cache_page(hello_world_view)(request, '1')

        self.assertFalse("Cache-Control" in response)

    def test_view_decorator(self):
        # decorate the same view with different cache decorators
        default_view = cache_page(hello_world_view)
        default_with_prefix_view = cache_page(key_prefix='prefix1')(hello_world_view)

        explicit_default_view = cache_page(cache='default')(hello_world_view)
        explicit_default_with_prefix_view = cache_page(cache='default', key_prefix='prefix1')(hello_world_view)

        other_view = cache_page(cache='other')(hello_world_view)
        other_with_prefix_view = cache_page(cache='other', key_prefix='prefix2')(hello_world_view)
        other_with_timeout_view = cache_page(3, cache='other', key_prefix='prefix3')(hello_world_view)

        request = self.factory.get('/view/')

        def assert_served(view, value, served_value):
            # Call *view* with *value* and check that the body was rendered
            # for *served_value*: equal values mean a cache miss (fresh
            # render), different values mean an earlier response was served.
            response = view(request, value)
            self.assertEqual(response.content, 'Hello World %s' % served_value)

        # Request the view once
        assert_served(default_view, '1', '1')

        # Request again -- hit the cache
        assert_served(default_view, '2', '1')

        # Requesting the same view with the explicit cache should yield the same result
        assert_served(explicit_default_view, '3', '1')

        # Requesting with a prefix will hit a different cache key
        assert_served(explicit_default_with_prefix_view, '4', '4')

        # Hitting the same view again gives a cache hit
        assert_served(explicit_default_with_prefix_view, '5', '4')

        # And going back to the implicit cache will hit the same cache
        assert_served(default_with_prefix_view, '6', '4')

        # Requesting from an alternate cache won't hit cache
        assert_served(other_view, '7', '7')

        # But a repeated hit will hit cache
        assert_served(other_view, '8', '7')

        # And prefixing the alternate cache yields yet another cache entry
        assert_served(other_with_prefix_view, '9', '9')

        # Request from the alternate cache with a new prefix and a custom timeout
        assert_served(other_with_timeout_view, '10', '10')

        # But if we wait a couple of seconds...
        time.sleep(2)

        # ... the default cache will still hit
        assert_served(default_view, '11', '1')

        # ... the default cache with a prefix will still hit
        assert_served(default_with_prefix_view, '12', '4')

        # ... the explicit default cache will still hit
        assert_served(explicit_default_view, '13', '1')

        # ... the explicit default cache with a prefix will still hit
        assert_served(explicit_default_with_prefix_view, '14', '4')

        # .. but a rapidly expiring cache won't hit
        assert_served(other_view, '15', '15')

        # .. even if it has a prefix
        assert_served(other_with_prefix_view, '16', '16')

        # ... but a view with a custom timeout will still hit
        assert_served(other_with_timeout_view, '17', '10')

        # And if we wait a few more seconds
        time.sleep(2)

        # the custom timeout cache will miss
        assert_served(other_with_timeout_view, '18', '18')
# Apply the settings the CacheMiddlewareTest tests rely on: the middleware
# is pointed at the 'other' cache alias, which has its own location and a
# TIMEOUT of '1'.
CacheMiddlewareTest = override_settings(
    CACHE_MIDDLEWARE_ALIAS='other',
    CACHE_MIDDLEWARE_KEY_PREFIX='middlewareprefix',
    CACHE_MIDDLEWARE_SECONDS=30,
    CACHE_MIDDLEWARE_ANONYMOUS_ONLY=False,
    CACHES=dict(
        default=dict(
            BACKEND='django.core.cache.backends.locmem.LocMemCache',
        ),
        other=dict(
            BACKEND='django.core.cache.backends.locmem.LocMemCache',
            LOCATION='other',
            TIMEOUT='1',
        ),
    ),
)(CacheMiddlewareTest)
class TestWithTemplateResponse(TestCase):
    """
    Tests various headers w/ TemplateResponse.

    Most are probably redundant since they manipulate the same object
    anyway but the Etag header is 'special' because it relies on the
    content being complete (which is not necessarily always the case
    with a TemplateResponse)
    """

    def setUp(self):
        """Store the path used by the cache-key tests and the 'default' cache."""
        self.path = '/cache/test/'
        self.cache = get_cache('default')

    def tearDown(self):
        """Clear the cache obtained in setUp."""
        self.cache.clear()

    def _get_request(self, path, method='GET'):
        """
        Build a bare HttpRequest for ``"/cache/%s" % path``.

        Only SERVER_NAME and SERVER_PORT are placed in META. ``path`` is
        interpolated as given, so a value that already starts with
        '/cache/' ends up nested under a second '/cache/' prefix; the
        expected keys asserted in this class were produced with that
        behaviour, so it must not be "fixed" without updating them.
        """
        request = HttpRequest()
        request.META = {
            'SERVER_NAME': 'testserver',
            'SERVER_PORT': 80,
        }
        request.method = method
        request.path = request.path_info = "/cache/%s" % path
        return request

    def _get_response(self):
        """Return a new, not yet rendered TemplateResponse with fixed content."""
        return TemplateResponse(HttpResponse(), Template("This is a test"))

    def test_patch_vary_headers(self):
        """patch_vary_headers() yields the expected Vary header for each case."""
        headers = (
            # Initial vary, new headers, resulting vary.
            (None, ('Accept-Encoding',), 'Accept-Encoding'),
            ('Accept-Encoding', ('accept-encoding',), 'Accept-Encoding'),
            ('Accept-Encoding', ('ACCEPT-ENCODING',), 'Accept-Encoding'),
            ('Cookie', ('Accept-Encoding',), 'Cookie, Accept-Encoding'),
            ('Cookie, Accept-Encoding', ('Accept-Encoding',), 'Cookie, Accept-Encoding'),
            ('Cookie, Accept-Encoding', ('Accept-Encoding', 'cookie'), 'Cookie, Accept-Encoding'),
            (None, ('Accept-Encoding', 'COOKIE'), 'Accept-Encoding, COOKIE'),
            # NOTE(review): this row is identical to an earlier one; it may
            # have been a whitespace variant originally -- confirm.
            ('Cookie, Accept-Encoding', ('Accept-Encoding', 'cookie'), 'Cookie, Accept-Encoding'),
            ('Cookie , Accept-Encoding', ('Accept-Encoding', 'cookie'), 'Cookie, Accept-Encoding'),
        )
        for initial_vary, newheaders, resulting_vary in headers:
            # A fresh response per case so Vary values never carry over.
            response = self._get_response()
            if initial_vary is not None:
                response['Vary'] = initial_vary
            patch_vary_headers(response, newheaders)
            self.assertEqual(response['Vary'], resulting_vary)

    def test_get_cache_key(self):
        """get_cache_key() is None until learn_cache_key() ran; prefix is honoured."""
        request = self._get_request(self.path)
        response = self._get_response()
        key_prefix = 'localprefix'
        # Expect None if no headers have been set yet.
        self.assertIsNone(get_cache_key(request))
        # Set headers to an empty list.
        learn_cache_key(request, response)
        self.assertEqual(get_cache_key(request), 'views.decorators.cache.cache_page.settingsprefix.GET.a8c87a3d8c44853d7f79474f7ffe4ad5.d41d8cd98f00b204e9800998ecf8427e')
        # Verify that a specified key_prefix is taken into account.
        learn_cache_key(request, response, key_prefix=key_prefix)
        self.assertEqual(get_cache_key(request, key_prefix=key_prefix), 'views.decorators.cache.cache_page.localprefix.GET.a8c87a3d8c44853d7f79474f7ffe4ad5.d41d8cd98f00b204e9800998ecf8427e')

    def test_get_cache_key_with_query(self):
        """A path carrying '?test=1' produces a different learned cache key."""
        request = self._get_request(self.path + '?test=1')
        response = self._get_response()
        # Expect None if no headers have been set yet.
        self.assertIsNone(get_cache_key(request))
        # Set headers to an empty list.
        learn_cache_key(request, response)
        # Verify that the querystring is taken into account.
        self.assertEqual(get_cache_key(request), 'views.decorators.cache.cache_page.settingsprefix.GET.bd889c5a59603af44333ed21504db3cd.d41d8cd98f00b204e9800998ecf8427e')

    @override_settings(USE_ETAGS=False)
    def test_without_etag(self):
        """With USE_ETAGS off, no ETag appears before or after rendering."""
        response = self._get_response()
        self.assertFalse(response.has_header('ETag'))
        patch_response_headers(response)
        self.assertFalse(response.has_header('ETag'))
        response = response.render()
        self.assertFalse(response.has_header('ETag'))

    @override_settings(USE_ETAGS=True)
    def test_with_etag(self):
        """With USE_ETAGS on, the ETag is only present once rendered."""
        response = self._get_response()
        self.assertFalse(response.has_header('ETag'))
        # Patching an unrendered response must not add the header yet...
        patch_response_headers(response)
        self.assertFalse(response.has_header('ETag'))
        # ...it shows up only after the content has been rendered.
        response = response.render()
        self.assertTrue(response.has_header('ETag'))
# Wrap TestWithTemplateResponse so that all of its tests run with a single
# local-memory 'default' cache, I18N switched off, and the 'settingsprefix'
# middleware key prefix that the expected cache keys in that class contain.
TestWithTemplateResponse = override_settings(
    USE_I18N=False,
    CACHES=dict(
        default=dict(
            BACKEND='django.core.cache.backends.locmem.LocMemCache',
        ),
    ),
    CACHE_MIDDLEWARE_SECONDS=1,
    CACHE_MIDDLEWARE_KEY_PREFIX='settingsprefix',
)(TestWithTemplateResponse)
class TestEtagWithAdmin(TestCase):
    """Check the admin index's ETag header with USE_ETAGS off and then on."""

    # See https://code.djangoproject.com/ticket/16003
    urls = "regressiontests.admin_views.urls"

    def test_admin(self):
        """The admin index returns 200 and has an ETag only when USE_ETAGS is set."""
        # Same order as before: first with ETags disabled, then enabled.
        for etags_enabled in (False, True):
            with self.settings(USE_ETAGS=etags_enabled):
                response = self.client.get('/test_admin/admin/')
                self.assertEqual(response.status_code, 200)
                if etags_enabled:
                    self.assertTrue(response.has_header('ETag'))
                else:
                    self.assertFalse(response.has_header('ETag'))
|