Commit 3eaa178f by Steve Komarov

added force_memoize with tests and comments

parent 6d9c0ecb
...@@ -97,20 +97,30 @@ def query(category = None, name = None, description = None, args = None): ...@@ -97,20 +97,30 @@ def query(category = None, name = None, description = None, args = None):
return query_factory return query_factory
def mq_force_memoize(func): def mq_force_memoize(func):
"""
Forces memoization for a function func that has been decorated by
@memoize_query. This means that it will always redo the computation
and store the results in cache, regardless of whether a cached result
already exists.
"""
if hasattr(func, 'force_memoize'): if hasattr(func, 'force_memoize'):
print "FORCING MEMOIZE"
return func.force_memoize return func.force_memoize
else: else:
print "not forcing memoize"
return func return func
def mq_force_retrieve(func): def mq_force_retrieve(func):
"""
Forces retrieval from cache for a function func that has been decorated by
@memoize_query. This means that it will try to get the result from cache.
If the result is not available in cache, it will throw an exception instead
of computing the result.
"""
if hasattr(func, 'force_retrieve'): if hasattr(func, 'force_retrieve'):
print "FORCING RETRIEVE"
return func.force_retrieve return func.force_retrieve
else: else:
print "not forcing retrieve"
return func return func
...@@ -120,9 +130,8 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong ...@@ -120,9 +130,8 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
will be different per call, but function identically. will be different per call, but function identically.
key_override: use this as a cache key instead of computing a key from the key_override: use this as a cache key instead of computing a key from the
function signature. Useful for testing. function signature.
''' '''
print "in memoize query"
# Helper functions # Helper functions
def isuseful(a, ignores): def isuseful(a, ignores):
...@@ -131,11 +140,16 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong ...@@ -131,11 +140,16 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
return True return True
def make_cache_key(f, args, kwargs): def make_cache_key(f, args, kwargs):
# Assumption: dict gets dumped in same order """
# Arguments are serializable. This is okay since Makes a cache key out of the function name and passed arguments
# this is just for SOA queries, but may break
# down if this were to be used as a generic Assumption: dict gets dumped in same order
# memoization framework Arguments are serializable. This is okay since
this is just for SOA queries, but may break
down if this were to be used as a generic
memoization framework
"""
m = hashlib.new("md4") m = hashlib.new("md4")
s = str({'uniquifier': 'anevt.memoize', s = str({'uniquifier': 'anevt.memoize',
'name' : f.__name__, 'name' : f.__name__,
...@@ -150,6 +164,10 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong ...@@ -150,6 +164,10 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
return key return key
def compute_and_cache(f, key, args, kwargs): def compute_and_cache(f, key, args, kwargs):
"""
Runs f and stores the results in cache
"""
# HACK: There's a slight race condition here, where we # HACK: There's a slight race condition here, where we
# might recompute twice. # might recompute twice.
cache.set(key, 'Processing', timeout) cache.set(key, 'Processing', timeout)
...@@ -166,6 +184,9 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong ...@@ -166,6 +184,9 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
def get_from_cache_if_possible(f, key): def get_from_cache_if_possible(f, key):
"""
Tries to retrieve the result from cache, otherwise returns None
"""
cached = cache.get(key) cached = cache.get(key)
# If we're already computing it, wait to finish # If we're already computing it, wait to finish
# computation # computation
...@@ -179,16 +200,17 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong ...@@ -179,16 +200,17 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
return results return results
def factory(f): def factory(f):
print "in factory"
def opmode_default(f, *args, **kwargs): def opmode_default(f, *args, **kwargs):
print "in opmode_default" # Get the result from cache if possible, otherwise recompute
# and store in cache
key = make_cache_key(f, args, kwargs) key = make_cache_key(f, args, kwargs)
results = get_from_cache_if_possible(f, key) results = get_from_cache_if_possible(f, key)
if results: if results:
print "Cache hit %s %s" % (f.__name__, key) #print "Cache hit %s %s" % (f.__name__, key)
pass pass
else: else:
print "Cache miss %s %s" % (f.__name__, key) #print "Cache miss %s %s" % (f.__name__, key)
results = compute_and_cache(f,key, args, kwargs) results = compute_and_cache(f,key, args, kwargs)
return results return results
...@@ -196,22 +218,22 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong ...@@ -196,22 +218,22 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
# Recompute and store in cache, regardless of whether key # Recompute and store in cache, regardless of whether key
# is in cache. # is in cache.
key = make_cache_key(f, args, kwargs) key = make_cache_key(f, args, kwargs)
print "Forcing memoize %s %s " % (f.__name__, key) # print "Forcing memoize %s %s " % (f.__name__, key)
results = compute_and_cache(f, key, args, kwargs) results = compute_and_cache(f, key, args, kwargs)
return results return results
def opmode_forceretrieve(*args, **kwargs): def opmode_forceretrieve(*args, **kwargs):
# Retrieve from cache if possible otherwise throw an exception # Retrieve from cache if possible otherwise throw an exception
key = make_cache_key(f, args, kwargs) key = make_cache_key(f, args, kwargs)
print "Forcing retrieve %s %s " % (f.__name__, key) # print "Forcing retrieve %s %s " % (f.__name__, key)
results = get_from_cache_if_possible(f, key) results = get_from_cache_if_possible(f, key)
if not results: if not results:
raise KeyError('key %s not found in cache' % key) # TODO better exception class? raise KeyError('key %s not found in cache' % key) # TODO better exception class?
return results return results
decfun = decorator(opmode_default,f) decfun = decorator(opmode_default,f)
decfun.force_memoize = opmode_forcememoize decfun.force_memoize = opmode_forcememoize # activated by mq_force_memoize
decfun.force_retrieve = opmode_forceretrieve decfun.force_retrieve = opmode_forceretrieve # activated by mq_force_retrieve
return decfun return decfun
return factory return factory
...@@ -227,28 +249,22 @@ def cron(run_every, force_memoize=False, params={}): ...@@ -227,28 +249,22 @@ def cron(run_every, force_memoize=False, params={}):
python manage.py celery worker -B --loglevel=INFO python manage.py celery worker -B --loglevel=INFO
Celery beat will automatically add tasks from files named 'tasks.py' Celery beat will automatically add tasks from files named 'tasks.py'
''' '''
print "in cron"
def factory(f): def factory(f):
print "in factory"
@periodic_task(run_every=run_every, name=f.__name__) @periodic_task(run_every=run_every, name=f.__name__)
def run(func=None, *args, **kw): def run(func=None, *args, **kw):
# if the call originated from the periodic_task decorator # if the call originated from the periodic_task decorator
# func will be None. If the call originated from the rest of # func will be None. If the call originated from the rest of
# the code, func will be the same as f # the code, func will be the same as f
called_as_periodic = True if func is None else False called_as_periodic = True if func is None else False
if called_as_periodic: if called_as_periodic:
print "called as periodic" #print "called as periodic"
if force_memoize: if force_memoize:
func = mq_force_memoize(f) func = mq_force_memoize(f)
else: else:
func = f func = f
else: else:
#called from code #print "called from code"
print "called from code"
func = f func = f
result = optional_parameter_call(func, default_optional_kwargs, params) result = optional_parameter_call(func, default_optional_kwargs, params)
......
import tempfile import tempfile
import time import time
from django.core.cache import cache
from edinsights.core.decorators import memoize_query, cron from edinsights.core.decorators import memoize_query, cron
from django.utils.timezone import timedelta from django.utils.timezone import timedelta
...@@ -10,14 +8,16 @@ def timestamp_to_tempfile(filename): ...@@ -10,14 +8,16 @@ def timestamp_to_tempfile(filename):
with open(tempfile.gettempdir() + '/' + filename, 'a') as temp_file: with open(tempfile.gettempdir() + '/' + filename, 'a') as temp_file:
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
# Test tasks are defined in tasks.py files. Other files could also be
# included using CELERY_IMPORTS. Avoid using @cron with nested functions and
# methods (the support of @periodic_task for these is experimental)
# The @cron decorator should precede all other decorators
@cron(run_every=timedelta(seconds=1)) @cron(run_every=timedelta(seconds=1))
def test_cron_task(params={}): def test_cron_task():
""" Simple task that gets executed by the scheduler (celery beat). """ Simple task that gets executed by the scheduler (celery beat).
The test case test_cron verifies that the execution tested by: tests.SimpleTest.test_cron
has taken place.
Defined outside of the SimpleTest class because current support of celery decorators
for methods and nested functions is experimental.
""" """
timestamp_to_tempfile('test_cron_task_counter') timestamp_to_tempfile('test_cron_task_counter')
...@@ -25,14 +25,13 @@ def test_cron_task(params={}): ...@@ -25,14 +25,13 @@ def test_cron_task(params={}):
@cron(run_every=timedelta(seconds=1), force_memoize=False) # cron decorators should go on top @cron(run_every=timedelta(seconds=1), force_memoize=False) # cron decorators should go on top
@memoize_query(60, key_override='test_cron_memoize_unique_cache_key') @memoize_query(60, key_override='test_cron_memoize_unique_cache_key')
def test_cron_memoize_task(): def test_cron_memoize_task():
""" Simple task that gets executed by the scheduler (celery beat). """
The test case test_cron_and_memoize verifies that the execution Simple task that gets executed by the scheduler (celery beat).
has taken place. Combines periodic tasks and memoization, with force_memoize=False.
This means that the periodic task will return cached results if possible.
Defined outside of the SimpleTest class because current support of celery decorators This scenario is probably not what you want.
for methods and nested functions is experimental.
The cron decorator should precede all other decorators tested by: tests.SimpleTest.test_cron_and_memoize
""" """
timestamp_to_tempfile('test_cron_memoize_task') timestamp_to_tempfile('test_cron_memoize_task')
return 42 return 42
...@@ -42,9 +41,13 @@ def test_cron_memoize_task(): ...@@ -42,9 +41,13 @@ def test_cron_memoize_task():
@memoize_query(cache_time=60, key_override='big_computation_key') @memoize_query(cache_time=60, key_override='big_computation_key')
def big_computation(): def big_computation():
""" """
Simple task that gets executed by the scheduler (celery beat) and also by @view
Combines periodic tasks and memoization, with force_memoize=False. Combines periodic tasks and memoization, with force_memoize=False.
This means that the periodic task will return cached results if possible. This means that the periodic task will return cached results if possible.
This scenario is probably not what you want. This scenario is probably not what you want.
tested by: tests.SimpleTest.test_cron_and_memoize_and_view
""" """
timestamp_to_tempfile('big_computation_counter') timestamp_to_tempfile('big_computation_counter')
return "FAKERESULT" return "FAKERESULT"
...@@ -54,11 +57,17 @@ def big_computation(): ...@@ -54,11 +57,17 @@ def big_computation():
@memoize_query(cache_time=60, key_override='big_computation_key_withfm') @memoize_query(cache_time=60, key_override='big_computation_key_withfm')
def big_computation_withfm(): def big_computation_withfm():
""" """
Simple task that gets executed by the scheduler (celery beat) and also by @view
Combines periodic tasks and memoization, with force_memoize=True. Combines periodic tasks and memoization, with force_memoize=True.
This means that the task will redo the computation regardless of This means that the task will redo the computation regardless of
whether the result was already in the cache when it is called from the whether the result was already in the cache when it is called from the
task scheduler. If the task is called from code, it will return the cached task scheduler. If the task is called from code, it will return the cached
result. This scenario is probably what you want. result. This scenario is probably what you want.
tested by: tests.SimpleTest.test_cron_and_memoize_and_view_with_forcememoize
""" """
timestamp_to_tempfile('big_computation_withfm_counter') timestamp_to_tempfile('big_computation_withfm_counter')
return "FAKERESULTFM" return "FAKERESULTFM"
# TODO put every task in its own file, and use CELERY_IMPORTS to run
# individual tasks instead of all tasks at the same time for each test
...@@ -14,8 +14,12 @@ def count_timestamps(tempfilename): ...@@ -14,8 +14,12 @@ def count_timestamps(tempfilename):
last_call = float(timestamps[-1].rstrip()) last_call = float(timestamps[-1].rstrip())
return ncalls, last_call return ncalls, last_call
def truncate_tempfile(filename): def truncate_tempfile(tempfilename):
with open(tempfile.gettempdir() + '/' + filename, 'w') as temp_file: """
Truncates the file used to share state between the test process
and the scheduler process (celery beat).
"""
with open(tempfile.gettempdir() + '/' + tempfilename, 'w') as temp_file:
pass pass
def run_celery_beat(seconds=3, verbose=False): def run_celery_beat(seconds=3, verbose=False):
...@@ -38,6 +42,7 @@ def run_celery_beat(seconds=3, verbose=False): ...@@ -38,6 +42,7 @@ def run_celery_beat(seconds=3, verbose=False):
sleep(seconds) sleep(seconds)
celery_beat_process.terminate() celery_beat_process.terminate()
class SimpleTest(TestCase): class SimpleTest(TestCase):
def __init__(self, arg): def __init__(self, arg):
...@@ -46,35 +51,30 @@ class SimpleTest(TestCase): ...@@ -46,35 +51,30 @@ class SimpleTest(TestCase):
def test_cron(self): def test_cron(self):
""" Test that periodic tasks are scheduled and run """ Test that periodic tasks are scheduled and run
tests: tasks.test_cron_task
""" """
# truncate the file used as a counter of test_cron_task calls
# the file is used to share state between the test process and
# the scheduler process (celery beat)
truncate_tempfile('test_cron_task_counter')
truncate_tempfile('test_cron_task_counter')
run_celery_beat(seconds=3,verbose=True) run_celery_beat(seconds=3,verbose=True)
# verify number of calls and time of last call # verify number of calls and time of last call
ncalls, last_call = count_timestamps('test_cron_task_counter') ncalls, last_call = count_timestamps('test_cron_task_counter')
self.assertGreaterEqual(ncalls,2) self.assertGreaterEqual(ncalls, 2)
self.assertAlmostEqual(last_call, time.time(), delta=100) self.assertAlmostEqual(last_call, time.time(), delta=100)
def test_cron_and_memoize(self): def test_cron_and_memoize(self):
""" Test that periodic tasks are scheduled and run, and the results """ Test that periodic tasks are scheduled and run, and the results
are cached. are cached.
"""
# truncate the file used as a counter of test_cron_task calls tests: tasks.test_cron_memoize_task
# the file is used to share state between the test process and """
# the scheduler process (celery beat)
truncate_tempfile('test_cron_memoize_task') truncate_tempfile('test_cron_memoize_task')
# clear the cache from any previous executions of this test # clear the cache from any previous executions of this test
cache.delete('test_cron_memoize_unique_cache_key') cache.delete('test_cron_memoize_unique_cache_key')
run_celery_beat(seconds=3,verbose=True) run_celery_beat(seconds=3,verbose=True)
ncalls, last_call = count_timestamps('test_cron_memoize_task') ncalls, last_call = count_timestamps('test_cron_memoize_task')
self.assertEqual(ncalls,1) # after the first call all subsequent calls should be cached self.assertEqual(ncalls,1) # after the first call all subsequent calls should be cached
self.assertAlmostEqual(last_call, time.time(), delta=100) self.assertAlmostEqual(last_call, time.time(), delta=100)
...@@ -82,11 +82,10 @@ class SimpleTest(TestCase): ...@@ -82,11 +82,10 @@ class SimpleTest(TestCase):
def test_cron_and_memoize_and_view(self): def test_cron_and_memoize_and_view(self):
""" Test that periodic tasks are scheduled, run, cached, and the """ Test that periodic tasks are scheduled, run, cached, and the
cached results are available to @view cached results are available to @view
tests: tasks.big_computation
""" """
# truncate the file used as a counter of big_computation calls
# the file is used to share state between the test process and
# the scheduler process (celery beat)
truncate_tempfile('big_computation_counter') truncate_tempfile('big_computation_counter')
# delete cache from previous executions of this unit test # delete cache from previous executions of this unit test
...@@ -112,9 +111,9 @@ class SimpleTest(TestCase): ...@@ -112,9 +111,9 @@ class SimpleTest(TestCase):
def test_cron_and_memoize_and_view_with_forcememoize(self): def test_cron_and_memoize_and_view_with_forcememoize(self):
""" Test that periodic tasks are scheduled, run, and cached, and the """ Test that periodic tasks are scheduled, run, and cached, and the
cached results are available to @view. If the task is executed from cached results are available to @view. If the task is executed from
the scheduler (as a periodic task) the computation will be redone and the scheduler (as a periodic task) the computation should be redone and
the new result will be stored in cache. If the task is executed from code the new result should be stored in cache. If the task is executed from code
(e.g. from a @view handler) the result from cache is returned. (e.g. from a @view or @query handler) the result from cache should be returned.
Tests task: tasks.big_computation_withfm Tests task: tasks.big_computation_withfm
""" """
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment