Commit 6d9c0ecb by Steve Komarov

force_memoize works properly, needs refactoring and readme

parent 43cf2c81
...@@ -99,18 +99,18 @@ def query(category = None, name = None, description = None, args = None): ...@@ -99,18 +99,18 @@ def query(category = None, name = None, description = None, args = None):
def mq_force_memoize(func): def mq_force_memoize(func):
if hasattr(func, 'force_memoize'): if hasattr(func, 'force_memoize'):
print "it has it" print "FORCING MEMOIZE"
return func.force_memoize return func.force_memoize
else: else:
print "it has not" print "not forcing memoize"
return func return func
def mq_force_retrieve(func): def mq_force_retrieve(func):
if hasattr(func, 'force_retrieve'): if hasattr(func, 'force_retrieve'):
print "it has it" print "FORCING RETRIEVE"
return func.force_retrieve return func.force_retrieve
else: else:
print "it has not" print "not forcing retrieve"
return func return func
...@@ -201,9 +201,9 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong ...@@ -201,9 +201,9 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
return results return results
def opmode_forceretrieve(*args, **kwargs): def opmode_forceretrieve(*args, **kwargs):
# Retrieve from cache if possible otherwise exception # Retrieve from cache if possible otherwise throw an exception
print "Forcing retrieve %s %s " % (f.__name__, key)
key = make_cache_key(f, args, kwargs) key = make_cache_key(f, args, kwargs)
print "Forcing retrieve %s %s " % (f.__name__, key)
results = get_from_cache_if_possible(f, key) results = get_from_cache_if_possible(f, key)
if not results: if not results:
raise KeyError('key %s not found in cache' % key) # TODO better exception class? raise KeyError('key %s not found in cache' % key) # TODO better exception class?
...@@ -218,6 +218,10 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong ...@@ -218,6 +218,10 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
def cron(run_every, force_memoize=False, params={}): def cron(run_every, force_memoize=False, params={}):
''' Run command periodically ''' Run command periodically
force_memoize: if the function being decorated is also decorated by
@memoize_query, setting this to True will redo the computation
regardless of whether the results of the computation already exist in cache
The task scheduler process (typically celery beat) needs to be started The task scheduler process (typically celery beat) needs to be started
manually by the client module with: manually by the client module with:
python manage.py celery worker -B --loglevel=INFO python manage.py celery worker -B --loglevel=INFO
...@@ -231,13 +235,22 @@ def cron(run_every, force_memoize=False, params={}): ...@@ -231,13 +235,22 @@ def cron(run_every, force_memoize=False, params={}):
@periodic_task(run_every=run_every, name=f.__name__) @periodic_task(run_every=run_every, name=f.__name__)
def run(func=None, *args, **kw): def run(func=None, *args, **kw):
if not func: # if the call originated from the periodic_task decorator
#called as a periodic task... func will be None # func will be None. If the call originated from the rest of
# the code, func will be the same as f
called_as_periodic = True if func is None else False
if called_as_periodic:
print "called as periodic"
if force_memoize:
func = mq_force_memoize(f)
else:
func = f
else:
#called from code
print "called from code"
func = f func = f
if force_memoize:
func = mq_force_memoize(func)
result = optional_parameter_call(func, default_optional_kwargs, params) result = optional_parameter_call(func, default_optional_kwargs, params)
return result return result
......
# This module provides tests for periodic tasks using core.decorators.cron # This module provides tests for periodic tasks using core.decorators.cron
from edinsights.core.decorators import view from edinsights.core.decorators import view, mq_force_retrieve
from edinsights.periodic.tasks import big_computation from edinsights.periodic.tasks import big_computation, big_computation_withfm
@view() @view()
def big_computation_visualizer(): def big_computation_visualizer():
return "<html>%s</html>" % big_computation() return "<html>%s</html>" % mq_force_retrieve(big_computation)()
@view()
def big_computation_visualizer_withfm():
return "<html>%s</html>" % mq_force_retrieve(big_computation_withfm)()
...@@ -6,6 +6,10 @@ from django.core.cache import cache ...@@ -6,6 +6,10 @@ from django.core.cache import cache
from edinsights.core.decorators import memoize_query, cron from edinsights.core.decorators import memoize_query, cron
from django.utils.timezone import timedelta from django.utils.timezone import timedelta
def timestamp_to_tempfile(filename):
with open(tempfile.gettempdir() + '/' + filename, 'a') as temp_file:
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
@cron(run_every=timedelta(seconds=1)) @cron(run_every=timedelta(seconds=1))
def test_cron_task(params={}): def test_cron_task(params={}):
""" Simple task that gets executed by the scheduler (celery beat). """ Simple task that gets executed by the scheduler (celery beat).
...@@ -15,11 +19,10 @@ def test_cron_task(params={}): ...@@ -15,11 +19,10 @@ def test_cron_task(params={}):
Defined outside of the SimpleTest class because current support of celery decorators Defined outside of the SimpleTest class because current support of celery decorators
for methods and nested functions is experimental. for methods and nested functions is experimental.
""" """
with open(tempfile.gettempdir() + '/' + 'test_cron_task_counter', 'a') as temp_file: timestamp_to_tempfile('test_cron_task_counter')
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
@cron(run_every=timedelta(seconds=1), force_memoize=True) # cron decorators should go on top @cron(run_every=timedelta(seconds=1), force_memoize=False) # cron decorators should go on top
@memoize_query(60, key_override='test_cron_memoize_unique_cache_key') @memoize_query(60, key_override='test_cron_memoize_unique_cache_key')
def test_cron_memoize_task(): def test_cron_memoize_task():
""" Simple task that gets executed by the scheduler (celery beat). """ Simple task that gets executed by the scheduler (celery beat).
...@@ -31,18 +34,31 @@ def test_cron_memoize_task(): ...@@ -31,18 +34,31 @@ def test_cron_memoize_task():
The cron decorator should precede all other decorators The cron decorator should precede all other decorators
""" """
timestamp_to_tempfile('test_cron_memoize_task')
with open(tempfile.gettempdir() + '/' + 'test_cron_memoize_task', 'a') as temp_file:
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
return 42 return 42
@cron(run_every=timedelta(seconds=1), force_memoize=True) # cron decorators should go on top
@cron(run_every=timedelta(seconds=1), force_memoize=False) # cron decorators should go on top
@memoize_query(cache_time=60, key_override='big_computation_key') @memoize_query(cache_time=60, key_override='big_computation_key')
def big_computation(): def big_computation():
# time.sleep(seconds=10) """
Combines periodic tasks and memoization, with force_memoize=False.
This means that the periodic task will return cached results if possible.
This scenario is probably not what you want.
"""
timestamp_to_tempfile('big_computation_counter')
return "FAKERESULT"
with open(tempfile.gettempdir() + '/' + 'big_computation_counter', 'a') as temp_file:
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
return "FAKERESULT" @cron(run_every=timedelta(seconds=1), force_memoize=True) # cron decorators should go on top
\ No newline at end of file @memoize_query(cache_time=60, key_override='big_computation_key_withfm')
def big_computation_withfm():
"""
Combines periodic tasks and memoization, with force_memoize=True.
This means that the task will redo the computation regardless of
whether the result was already in the cache when it is called from the
task scheduler. If the task is called from code, it will return the cached
result. This scenario is probably what you want.
"""
timestamp_to_tempfile('big_computation_withfm_counter')
return "FAKERESULTFM"
...@@ -14,6 +14,10 @@ def count_timestamps(tempfilename): ...@@ -14,6 +14,10 @@ def count_timestamps(tempfilename):
last_call = float(timestamps[-1].rstrip()) last_call = float(timestamps[-1].rstrip())
return ncalls, last_call return ncalls, last_call
def truncate_tempfile(filename):
with open(tempfile.gettempdir() + '/' + filename, 'w') as temp_file:
pass
def run_celery_beat(seconds=3, verbose=False): def run_celery_beat(seconds=3, verbose=False):
""" Runs the task scheduler celery beat for the specified number of seconds as a child process """ Runs the task scheduler celery beat for the specified number of seconds as a child process
""" """
...@@ -39,19 +43,18 @@ class SimpleTest(TestCase): ...@@ -39,19 +43,18 @@ class SimpleTest(TestCase):
def __init__(self, arg): def __init__(self, arg):
TestCase.__init__(self, arg) TestCase.__init__(self, arg)
def test_cron(self): def test_cron(self):
""" Test that periodic tasks are scheduled and run """ Test that periodic tasks are scheduled and run
""" """
# truncate the file used as a counter of test_cron_task calls # truncate the file used as a counter of test_cron_task calls
# the file is used to share state between the test process and # the file is used to share state between the test process and
# the scheduler process (celery beat) # the scheduler process (celery beat)
with open(tempfile.gettempdir() + '/' + 'test_cron_task_counter', 'w') as temp_file: truncate_tempfile('test_cron_task_counter')
pass
run_celery_beat(seconds=3,verbose=True) run_celery_beat(seconds=3,verbose=True)
# verify number of calls and time of last call # verify number of calls and time of last call
ncalls, last_call = count_timestamps('test_cron_task_counter') ncalls, last_call = count_timestamps('test_cron_task_counter')
self.assertGreaterEqual(ncalls,2) self.assertGreaterEqual(ncalls,2)
self.assertAlmostEqual(last_call, time.time(), delta=100) self.assertAlmostEqual(last_call, time.time(), delta=100)
...@@ -65,8 +68,7 @@ class SimpleTest(TestCase): ...@@ -65,8 +68,7 @@ class SimpleTest(TestCase):
# truncate the file used as a counter of test_cron_task calls # truncate the file used as a counter of test_cron_task calls
# the file is used to share state between the test process and # the file is used to share state between the test process and
# the scheduler process (celery beat) # the scheduler process (celery beat)
with open(tempfile.gettempdir() + '/' + 'test_cron_memoize_task', 'w') as temp_file: truncate_tempfile('test_cron_memoize_task')
pass
# clear the cache from any previous executions of this test # clear the cache from any previous executions of this test
cache.delete('test_cron_memoize_unique_cache_key') cache.delete('test_cron_memoize_unique_cache_key')
...@@ -85,8 +87,7 @@ class SimpleTest(TestCase): ...@@ -85,8 +87,7 @@ class SimpleTest(TestCase):
# truncate the file used as a counter of big_computation calls # truncate the file used as a counter of big_computation calls
# the file is used to share state between the test process and # the file is used to share state between the test process and
# the scheduler process (celery beat) # the scheduler process (celery beat)
with open(tempfile.gettempdir() + '/' + 'big_computation_counter', 'w') as temp_file: truncate_tempfile('big_computation_counter')
pass
# delete cache from previous executions of this unit test # delete cache from previous executions of this unit test
cache.delete('big_computation_key') cache.delete('big_computation_key')
...@@ -106,4 +107,32 @@ class SimpleTest(TestCase): ...@@ -106,4 +107,32 @@ class SimpleTest(TestCase):
# by the execution of c.get('/view...') # by the execution of c.get('/view...')
ncalls_after, lastcall_after = count_timestamps('big_computation_counter') ncalls_after, lastcall_after = count_timestamps('big_computation_counter')
self.assertEqual(ncalls_before, ncalls_after) self.assertEqual(ncalls_before, ncalls_after)
self.assertEqual(lastcall_before, lastcall_after)
def test_cron_and_memoize_and_view_with_forcememoize(self):
""" Test that periodic tasks are scheduled, run, and cached, and the
cached results are available to @view. If the task is executed from
the scheduler (as a periodic task) the computation will be redone and
the new result will be stored in cache. If the task is executed from code
(e.g. from a @view handler) the result from cache is returned.
Tests task: tasks.big_computation_withfm
"""
truncate_tempfile('big_computation_withfm_counter')
cache.delete('big_computation_key_withfm')
run_celery_beat(seconds=3, verbose=True)
ncalls_before, lastcall_before = count_timestamps('big_computation_withfm_counter')
self.assertGreaterEqual(ncalls_before,2)
self.assertAlmostEqual(lastcall_before, time.time(),delta=100)
c = Client()
status_code = c.get('/view/big_computation_visualizer_withfm').status_code
content = c.get('/view/big_computation_visualizer_withfm').content
self.assertEqual(status_code, 200)
self.assertEqual(content, "<html>FAKERESULTFM</html>")
ncalls_after, lastcall_after = count_timestamps('big_computation_withfm_counter')
self.assertEqual(ncalls_before, ncalls_after)
self.assertEqual(lastcall_before, lastcall_after) self.assertEqual(lastcall_before, lastcall_after)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment