Commit 9db6ff8e by Piotr Mitros

Merge pull request #8 from edx/stkomarov-fixes

Fixes issues with periodic tasks. Adds necessary functionality to memoize to interact with periodic tasks. 
parents 76ab893e 8e5f08a8
...@@ -6,7 +6,7 @@ log=logging.getLogger(__name__) ...@@ -6,7 +6,7 @@ log=logging.getLogger(__name__)
event_handlers = [] event_handlers = []
request_handlers = {'view':{}, 'query':{}} request_handlers = {'view':{}, 'query':{}}
from edinsights.core.views import default_optional_kwargs from edinsights.core.util import default_optional_kwargs
funcskips = default_optional_kwargs.keys()+['params'] # params are additional GET/POST parameters funcskips = default_optional_kwargs.keys()+['params'] # params are additional GET/POST parameters
def register_handler(cls, category, name, description, f, args): def register_handler(cls, category, name, description, f, args):
...@@ -124,6 +124,6 @@ def handle_request(cls, name, **kwargs): ...@@ -124,6 +124,6 @@ def handle_request(cls, name, **kwargs):
else: else:
arglist = inspect.getargspec(handler).args arglist = inspect.getargspec(handler).args
from util import optional_parameter_call, default_optional_kwargs from util import optional_parameter_call
return optional_parameter_call(handler, default_optional_kwargs, kwargs, arglist) return optional_parameter_call(handler, kwargs, arglist)
...@@ -104,7 +104,14 @@ def get_query(f): ...@@ -104,7 +104,14 @@ def get_query(f):
return get_embed('query', config = embed_config) return get_embed('query', config = embed_config)
def optional_parameter_call(function, optional_kwargs, passed_kwargs, arglist = None): default_optional_kwargs = {'fs' : get_filesystem,
'mongodb' : get_mongo,
'cache' : get_cache,
# 'analytics' : get_djobject,
'view' : get_view,
'query' : get_query}
def optional_parameter_call(function, passed_kwargs, arglist = None):
''' Calls a function with parameters: ''' Calls a function with parameters:
passed_kwargs are input parameters the function must take. passed_kwargs are input parameters the function must take.
Format: Dictionary mapping keywords to arguments. Format: Dictionary mapping keywords to arguments.
...@@ -133,19 +140,16 @@ def optional_parameter_call(function, optional_kwargs, passed_kwargs, arglist = ...@@ -133,19 +140,16 @@ def optional_parameter_call(function, optional_kwargs, passed_kwargs, arglist =
for arg in arglist: for arg in arglist:
# This order is important for security. We don't want users # This order is important for security. We don't want users
# being able to pass in 'fs' or 'db' and having that take # being able to pass in 'fs' or 'db' and having that take
# precedence. # precedence.
if arg in optional_kwargs:
args[arg] = optional_kwargs[arg](function) global default_optional_kwargs
if arg in default_optional_kwargs:
args[arg] = default_optional_kwargs[arg](function)
#ignore default arguments in memoize when building cache key
args[arg].memoize_ignore = True
elif arg in passed_kwargs: elif arg in passed_kwargs:
args[arg] = passed_kwargs[arg] args[arg] = passed_kwargs[arg]
else: else:
raise TypeError("Missing argument needed for handler ", arg) raise TypeError("Missing argument needed for handler ", arg)
return function(**args) return function(**args)
default_optional_kwargs = {'fs' : get_filesystem,
'mongodb' : get_mongo,
'cache' : get_cache,
# 'analytics' : get_djobject,
'view' : get_view,
'query' : get_query}
...@@ -13,7 +13,6 @@ from django.conf import settings ...@@ -13,7 +13,6 @@ from django.conf import settings
from djeventstream.signals import event_received from djeventstream.signals import event_received
from registry import event_handlers, request_handlers from registry import event_handlers, request_handlers
from util import default_optional_kwargs
import auth import auth
import util import util
...@@ -124,12 +123,12 @@ def handle_event(sender, **kwargs): ...@@ -124,12 +123,12 @@ def handle_event(sender, **kwargs):
if not batch: ## Message was a list of events, but handler cannot batch events if not batch: ## Message was a list of events, but handler cannot batch events
for event in msg: for event in msg:
try: try:
optional_parameter_call(event_func, default_optional_kwargs, {'events':[event]}) optional_parameter_call(event_func, {'events':[event]})
except: except:
handle_event_exception(e['function']) handle_event_exception(e['function'])
else: ## Message was a list of events, and handler can batch events else: ## Message was a list of events, and handler can batch events
try: try:
optional_parameter_call(event_func, default_optional_kwargs, {'events':msg}) optional_parameter_call(event_func, {'events':msg})
except: except:
handle_event_exception(e['function']) handle_event_exception(e['function'])
......
# This module provides tests for periodic tasks using core.decorators.cron # This module provides tests for periodic tasks using core.decorators.cron
from edinsights.core.decorators import view from edinsights.core.decorators import view, use_fromcache, MemoizeNotInCacheError
from edinsights.periodic.tasks import big_computation from edinsights.periodic.tasks import big_computation
from edinsights.periodic.tasks import big_computation_withfm
#
@view() @view()
def big_computation_visualizer(): def big_computation_visualizer():
return "<html>%s</html>" % big_computation() return "<html>%s</html>" % use_fromcache(big_computation)()
@view()
def big_computation_visualizer_withfm():
try:
# returns instantly, does not perform computation if results are not
# in cache
result = use_fromcache(big_computation_withfm)()
except MemoizeNotInCacheError:
result = "The big computation has not been performed yet"
# alternatively you can display a "please wait" message
# and run big_computation_withfm() without force_retrieve
return "<html>%s</html>" % result
import tempfile import tempfile
import time import time
from django.core.cache import cache
from edinsights.core.decorators import memoize_query, cron from edinsights.core.decorators import memoize_query, cron
from django.utils.timezone import timedelta from django.utils.timezone import timedelta
def timestamp_to_tempfile(filename):
with open(tempfile.gettempdir() + '/' + filename, 'a') as temp_file:
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
# Test tasks are defined in tasks.py files. Other files could also be
# included using CELERY_IMPORTS. Avoid using @cron with nested functions and
# methods(the support of @periodic_task for these is experimental)
# The @cron decorator should precede all other decorators
@cron(run_every=timedelta(seconds=1)) @cron(run_every=timedelta(seconds=1))
def test_cron_task(): def test_cron_task():
""" Simple task that gets executed by the scheduler (celery beat). """ Simple task that gets executed by the scheduler (celery beat).
The test case test_cron verifies that the execution tested by: tests.SimpleTest.test_cron
has taken place.
Defined outside of the SimpleTest class because current support of celery decorators
for methods and nested functions is experimental.
""" """
with open(tempfile.gettempdir() + '/' + 'test_cron_task_counter', 'a') as temp_file: timestamp_to_tempfile('test_cron_task_counter')
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
@cron(run_every=timedelta(seconds=1)) # cron decorators should go on top @cron(run_every=timedelta(seconds=1), force_memoize=False) # cron decorators should go on top
@memoize_query(60, key_override='test_cron_memoize_unique_cache_key') @memoize_query(60)
def test_cron_memoize_task(): def test_cron_memoize_task(fs):
""" Simple task that gets executed by the scheduler (celery beat). """
The test case test_cron_and_memoize verifies that the execution Simple task that gets executed by the scheduler (celery beat).
has taken place. Combines periodic tasks and memoization, with force_memoize=False.
This means that the periodic task will return cached results if possible.
This scenario is probably not what you want.
Defined outside of the SimpleTest class because current support of celery decorators tested by: tests.SimpleTest.test_cron_and_memoize
for methods and nested functions is experimental. """
timestamp_to_tempfile('test_cron_memoize_task')
return 42
The cron decorator should precede all other decorators
@cron(run_every=timedelta(seconds=1), force_memoize=False) # cron decorators should go on top
@memoize_query(cache_time=60)
def big_computation():
""" """
Simple task that gets executed by the scheduler (celery beat) and also by @view
with open(tempfile.gettempdir() + '/' + 'test_cron_memoize_task', 'a') as temp_file: Combines periodic tasks and memoization, with force_memoize=False.
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call This means that the periodic task will return cached results if possible.
This scenario is probably not what you want.
return 42 tested by: tests.SimpleTest.test_cron_and_memoize_and_view
"""
timestamp_to_tempfile('big_computation_counter')
return "FAKERESULT"
@cron(run_every=timedelta(seconds=1)) # cron decorators should go on top
@memoize_query(cache_time=60, key_override='big_computation_key')
def big_computation():
# time.sleep(seconds=10)
with open(tempfile.gettempdir() + '/' + 'big_computation_counter', 'a') as temp_file: @cron(run_every=timedelta(seconds=1), force_memoize=True) # cron decorators should go on top
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call @memoize_query(cache_time=60)
def big_computation_withfm():
"""
Simple task that gets executed by the scheduler (celery beat) and also by @view
Combines periodic tasks and memoization, with force_memoize=True.
This means that the task will redo the computation regardless of
whether the result was already in the cache when it is called from the
task scheduler. If the task is called from code, it will return the cached
result. This scenario is probably what you want.
tested by: tests.SimpleTest.test_cron_and_memoize_and_view_with_forcememoize
"""
timestamp_to_tempfile('big_computation_withfm_counter')
return "FAKERESULTFM"
return "FAKERESULT" # TODO put every task in its own file, and use CELERY_IMPORTS to run
\ No newline at end of file # individual tasks instead of all tasks at the same time for each test
...@@ -5,7 +5,7 @@ import time ...@@ -5,7 +5,7 @@ import time
from django.test import TestCase from django.test import TestCase
from django.test.client import Client from django.test.client import Client
from django.core.cache import cache from django.core.cache import cache
from core.decorators import use_clearcache
def count_timestamps(tempfilename): def count_timestamps(tempfilename):
with open(tempfile.gettempdir() + '/' + tempfilename, 'r') as temp_file: with open(tempfile.gettempdir() + '/' + tempfilename, 'r') as temp_file:
...@@ -14,6 +14,14 @@ def count_timestamps(tempfilename): ...@@ -14,6 +14,14 @@ def count_timestamps(tempfilename):
last_call = float(timestamps[-1].rstrip()) last_call = float(timestamps[-1].rstrip())
return ncalls, last_call return ncalls, last_call
def truncate_tempfile(tempfilename):
"""
Truncates the file used to share state between the test process
and the scheduler process (celery beat).
"""
with open(tempfile.gettempdir() + '/' + tempfilename, 'w') as temp_file:
pass
def run_celery_beat(seconds=3, verbose=False): def run_celery_beat(seconds=3, verbose=False):
""" Runs the task scheduler celery beat for the specified number of seconds as a child process """ Runs the task scheduler celery beat for the specified number of seconds as a child process
""" """
...@@ -34,45 +42,41 @@ def run_celery_beat(seconds=3, verbose=False): ...@@ -34,45 +42,41 @@ def run_celery_beat(seconds=3, verbose=False):
sleep(seconds) sleep(seconds)
celery_beat_process.terminate() celery_beat_process.terminate()
class SimpleTest(TestCase): class SimpleTest(TestCase):
def __init__(self, arg): def __init__(self, arg):
TestCase.__init__(self, arg) TestCase.__init__(self, arg)
def test_cron(self): def test_cron(self):
""" Test that periodic tasks are scheduled and run """ Test that periodic tasks are scheduled and run
tests: tasks.test_cron_task
""" """
# truncate the file used as a counter of test_cron_task calls
# the file is used to share state between the test process and
# the scheduler process (celery beat)
with open(tempfile.gettempdir() + '/' + 'test_cron_task_counter', 'w') as temp_file:
pass
truncate_tempfile('test_cron_task_counter')
run_celery_beat(seconds=3,verbose=False) run_celery_beat(seconds=3,verbose=False)
# verify number of calls and time of last call # verify number of calls and time of last call
ncalls, last_call = count_timestamps('test_cron_task_counter') ncalls, last_call = count_timestamps('test_cron_task_counter')
self.assertGreaterEqual(ncalls,2) self.assertGreaterEqual(ncalls, 2)
self.assertAlmostEqual(last_call, time.time(), delta=100) self.assertAlmostEqual(last_call, time.time(), delta=100)
def test_cron_and_memoize(self): def test_cron_and_memoize(self):
""" Test that periodic tasks are scheduled and run, and the results """ Test that periodic tasks are scheduled and run, and the results
are cached. are cached.
"""
# truncate the file used as a counter of test_cron_task calls tests: tasks.test_cron_memoize_task
# the file is used to share state between the test process and """
# the scheduler process (celery beat) truncate_tempfile('test_cron_memoize_task')
with open(tempfile.gettempdir() + '/' + 'test_cron_memoize_task', 'w') as temp_file:
pass
# clear the cache from any previous executions of this test # clear the cache from any previous executions of this test
cache.delete('test_cron_memoize_unique_cache_key') from tasks import test_cron_memoize_task
use_clearcache(test_cron_memoize_task)()
run_celery_beat(seconds=3,verbose=False) run_celery_beat(seconds=3,verbose=False)
ncalls, last_call = count_timestamps('test_cron_memoize_task') ncalls, last_call = count_timestamps('test_cron_memoize_task')
self.assertEqual(ncalls,1) # after the first call all subsequent calls should be cached self.assertEqual(ncalls,1) # after the first call all subsequent calls should be cached
self.assertAlmostEqual(last_call, time.time(), delta=100) self.assertAlmostEqual(last_call, time.time(), delta=100)
...@@ -80,16 +84,15 @@ class SimpleTest(TestCase): ...@@ -80,16 +84,15 @@ class SimpleTest(TestCase):
def test_cron_and_memoize_and_view(self): def test_cron_and_memoize_and_view(self):
""" Test that periodic tasks are scheduled, run, cached, and the """ Test that periodic tasks are scheduled, run, cached, and the
cached results are available to @view cached results are available to @view
tests: tasks.big_computation
""" """
# truncate the file used as a counter of big_computation calls truncate_tempfile('big_computation_counter')
# the file is used to share state between the test process and
# the scheduler process (celery beat)
with open(tempfile.gettempdir() + '/' + 'big_computation_counter', 'w') as temp_file:
pass
# delete cache from previous executions of this unit test # delete cache from previous executions of this unit test
cache.delete('big_computation_key') from tasks import big_computation
use_clearcache(big_computation)()
run_celery_beat(seconds=3, verbose=False) run_celery_beat(seconds=3, verbose=False)
...@@ -106,4 +109,35 @@ class SimpleTest(TestCase): ...@@ -106,4 +109,35 @@ class SimpleTest(TestCase):
# by the execution of c.get('/view...') # by the execution of c.get('/view...')
ncalls_after, lastcall_after = count_timestamps('big_computation_counter') ncalls_after, lastcall_after = count_timestamps('big_computation_counter')
self.assertEqual(ncalls_before, ncalls_after) self.assertEqual(ncalls_before, ncalls_after)
self.assertEqual(lastcall_before, lastcall_after)
def test_cron_and_memoize_and_view_with_forcememoize(self):
""" Test that periodic tasks are scheduled, run, and cached, and the
cached results are available to @view. If the task is executed from
the scheduler (as a periodic task) the computation should be redone and
the new result should be stored in cache. If the task is executed from code
(e.g. from a @view or @query handler) the result from cache should be returned.
Tests task: tasks.big_computation_withfm
"""
truncate_tempfile('big_computation_withfm_counter')
from tasks import big_computation_withfm
use_clearcache(big_computation_withfm)()
run_celery_beat(seconds=3, verbose=False)
ncalls_before, lastcall_before = count_timestamps('big_computation_withfm_counter')
self.assertGreaterEqual(ncalls_before,2)
self.assertAlmostEqual(lastcall_before, time.time(),delta=100)
c = Client()
status_code = c.get('/view/big_computation_visualizer_withfm').status_code
content = c.get('/view/big_computation_visualizer_withfm').content
self.assertEqual(status_code, 200)
self.assertEqual(content, "<html>FAKERESULTFM</html>")
ncalls_after, lastcall_after = count_timestamps('big_computation_withfm_counter')
self.assertEqual(ncalls_before, ncalls_after)
self.assertEqual(lastcall_before, lastcall_after) self.assertEqual(lastcall_before, lastcall_after)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment