Commit 4f7a5a66 by Steve Komarov

more periodic tests

parent 77487d8f
......@@ -97,10 +97,13 @@ def query(category = None, name = None, description = None, args = None):
return query_factory
def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymongo.database.Database'>", "<class 'fs.osfs.OSFS'>"]):
def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymongo.database.Database'>", "<class 'fs.osfs.OSFS'>"], key_override=None):
''' Call function only if we do not have the results for its execution already
We ignore parameters of type pymongo.database.Database and fs.osfs.OSFS. These
will be different per call, but function identically.
key_override: use this as a cache key instead of computing a key from the
function signature. Useful for testing.
'''
def isuseful(a, ignores):
if str(type(a)) in ignores:
......@@ -114,14 +117,19 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
# this is just for SOA queries, but may break
# down if this were to be used as a generic
# memoization framework
m = hashlib.new("md4")
s = str({'uniquifier': 'anevt.memoize',
'name' : f.__name__,
'module' : f.__module__,
'args': [a for a in args if isuseful(a, ignores)],
'kwargs': kwargs})
m.update(s)
key = m.hexdigest()
if key_override is not None:
key = key_override
else:
m = hashlib.new("md4")
s = str({'uniquifier': 'anevt.memoize',
'name' : f.__name__,
'module' : f.__module__,
'args': [a for a in args if isuseful(a, ignores)],
'kwargs': kwargs})
m.update(s)
key = m.hexdigest()
# Check if we've cached the computation, or are in the
# process of computing it
cached = cache.get(key)
......@@ -164,11 +172,15 @@ def cron(run_every, params=None):
python manage.py celery worker -B --loglevel=INFO
Celery beat will automatically add tasks from files named 'tasks.py'
'''
def factory(func):
@periodic_task(run_every=run_every, name=func.__name__)
def run():
optional_parameter_call(func, default_optional_kwargs, params)
return decorator(run,func)
def factory(f):
@periodic_task(run_every=run_every, name=f.__name__)
def run(func=None):
if func:
result = optional_parameter_call(func, default_optional_kwargs, params)
else:
result = optional_parameter_call(f, default_optional_kwargs, params)
return result
return decorator(run, f)
return factory
def event_property(name=None, description=None):
......
......@@ -4,64 +4,11 @@ when you run "manage.py test".
Replace this with more appropriate tests for your application.
"""
import time, tempfile
import time
from django.test import TestCase
from decorators import memoize_query
from decorators import memoize_query,cron
from django.utils.timezone import timedelta
from celery.task import periodic_task
@cron(run_every=timedelta(seconds=1))
def test_cron_task():
""" Simple task that gets executed by the scheduler (celery beat).
The test case test_cron verifies that the execution
has taken place.
Defined outside of the SimpleTest class because current support of celery decorators
for methods and nested functions is experimental.
"""
with open(tempfile.gettempdir() + '/' + 'test_cron_task_counter', 'a') as temp_file:
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
@cron(run_every=timedelta(seconds=1)) # cron decorators should go on top
@memoize_query(60)
def test_cron_memoize_task():
""" Simple task that gets executed by the scheduler (celery beat).
The test case test_cron_and_memoize verifies that the execution
has taken place.
Defined outside of the SimpleTest class because current support of celery decorators
for methods and nested functions is experimental.
The cron decorator should precede all other decorators
"""
with open(tempfile.gettempdir() + '/' + 'test_cron_memoize_task', 'a') as temp_file:
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
return 42
def run_celery_beat(seconds=3, verbose=False):
""" Runs the task scheduler celery beat for the specified number of seconds as a child process
"""
import os
with open(os.devnull, 'w') as devnull:
from subprocess import Popen
command = ['python', 'manage.py', 'celery', 'worker', '-B', '--loglevel=INFO', '--settings=testsettings',]
if verbose:
suppress_output_args = {}
else:
suppress_output_args = {'stdout':devnull, 'stderr':devnull}
celery_beat_process = Popen(command, **suppress_output_args)
# give time to celery beat to execute test_cron_task
from time import sleep
print "running periodic tasks for %s seconds... " % seconds
sleep(seconds)
celery_beat_process.terminate()
class SimpleTest(TestCase):
def test_basic_addition(self):
......@@ -74,7 +21,6 @@ class SimpleTest(TestCase):
TestCase.__init__(self, arg)
self.memoize_calls = 0
def test_memoize(self):
self.memoize_calls = 0
return
......@@ -144,45 +90,6 @@ class SimpleTest(TestCase):
response = c.get(url)
self.assertEqual(response.status_code, 200)
def test_cron(self):
""" Test that periodic tasks are scheduled and run
"""
# truncate the file used as a counter of test_cron_task calls
# the file is used to share state between the test process and
# the scheduler process (celery beat)
with open(tempfile.gettempdir() + '/' + 'test_cron_task_counter', 'w') as temp_file:
pass
run_celery_beat(seconds=3,verbose=False)
# verify number of calls and time of last call
with open(tempfile.gettempdir() + '/' + 'test_cron_task_counter', 'r') as temp_file:
timestamps = temp_file.readlines()
ncalls = len(timestamps)
self.assertGreaterEqual(ncalls,2)
last_call = float(timestamps[-1].rstrip())
self.assertAlmostEqual(last_call, time.time(), delta=100)
def test_cron_and_memoize(self):
""" Test that periodic tasks are scheduled and run
"""
# truncate the file used as a counter of test_cron_task calls
# the file is used to share state between the test process and
# the scheduler process (celery beat)
with open(tempfile.gettempdir() + '/' + 'test_cron_memoize_task', 'w') as temp_file:
pass
run_celery_beat(seconds=3,verbose=False)
# verify number of calls and time of last call
with open(tempfile.gettempdir() + '/' + 'test_cron_memoize_task', 'r') as temp_file:
timestamps = temp_file.readlines()
ncalls = len(timestamps)
self.assertEqual(ncalls,1) # after the first call all subsequent calls should be cached
last_call = float(timestamps[-1].rstrip())
self.assertAlmostEqual(last_call, time.time(), delta=100)
# The django cache is used by core.memoize_query to store results temporarily.
# Because periodic tasks are run from a separate process, and
# because periodic tasks could also be memoized, the
# django cache backend has to be visible across processes.
# Most django.cache.core.backends would work except for LocMemCache
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.filebased.FileBasedCache',
# files in /tmp older than TMPTIME specified in /etc/default/rcS
# are erased automatically on reboot.
# Make sure the specified directory in LOCATION is writeable by apache
'LOCATION': '/tmp/django_cache/',
'TIMEOUT': 60*60, #one hour
'OPTIONS' : {
'MAX_ENTRIES' : 100
}
}
}
\ No newline at end of file
Overview
========
The purpose of this module is to unit test periodic tasks
created with edinsights.core.decorators.cron
The module does not provide any additional functionallity
Despite the name of the module, your periodic tasks
do NOT have to be inside this module. They can be
located in any tasks.py file in any django app
directory.
Running Tests
=============
Because testing periodic tasks is slow (~20s) they
are excluded from testing by default.
To test the module, add it to INSTALLED_APPS in settings.py
To run the tests:
python manage.py test periodic
# This module provides tests for periodic tasks using core.decorators.cron
from edinsights.core.decorators import view
from edinsights.periodic.tasks import big_computation
@view()
def big_computation_visualizer():
return "<html>%s</html>" % big_computation()
import tempfile
import time
from django.core.cache import cache
from edinsights.core.decorators import memoize_query, cron
from django.utils.timezone import timedelta
@cron(run_every=timedelta(seconds=1))
def test_cron_task():
""" Simple task that gets executed by the scheduler (celery beat).
The test case test_cron verifies that the execution
has taken place.
Defined outside of the SimpleTest class because current support of celery decorators
for methods and nested functions is experimental.
"""
with open(tempfile.gettempdir() + '/' + 'test_cron_task_counter', 'a') as temp_file:
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
@cron(run_every=timedelta(seconds=1)) # cron decorators should go on top
@memoize_query(60, key_override='test_cron_memoize_unique_cache_key')
def test_cron_memoize_task():
""" Simple task that gets executed by the scheduler (celery beat).
The test case test_cron_and_memoize verifies that the execution
has taken place.
Defined outside of the SimpleTest class because current support of celery decorators
for methods and nested functions is experimental.
The cron decorator should precede all other decorators
"""
with open(tempfile.gettempdir() + '/' + 'test_cron_memoize_task', 'a') as temp_file:
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
return 42
@cron(run_every=timedelta(seconds=1)) # cron decorators should go on top
@memoize_query(cache_time=60, key_override='big_computation_key')
def big_computation():
# time.sleep(seconds=10)
with open(tempfile.gettempdir() + '/' + 'big_computation_counter', 'a') as temp_file:
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
return "FAKERESULT"
\ No newline at end of file
import tempfile
import time
from django.test import TestCase
from django.test.client import Client
from django.core.cache import cache
def count_timestamps(tempfilename):
with open(tempfile.gettempdir() + '/' + tempfilename, 'r') as temp_file:
timestamps = temp_file.readlines()
ncalls = len(timestamps)
last_call = float(timestamps[-1].rstrip())
return ncalls, last_call
def run_celery_beat(seconds=3, verbose=False):
""" Runs the task scheduler celery beat for the specified number of seconds as a child process
"""
import os
with open(os.devnull, 'w') as devnull:
from subprocess import Popen
command = ['python', 'manage.py', 'celery', 'worker', '-B', '--loglevel=INFO',]
if verbose:
suppress_output_args = {}
else:
suppress_output_args = {'stdout':devnull, 'stderr':devnull}
celery_beat_process = Popen(command, **suppress_output_args)
# give time to celery beat to execute test_cron_task
from time import sleep
print "running periodic tasks for %s seconds... " % seconds
sleep(seconds)
celery_beat_process.terminate()
class SimpleTest(TestCase):
def __init__(self, arg):
TestCase.__init__(self, arg)
def test_cron(self):
""" Test that periodic tasks are scheduled and run
"""
# truncate the file used as a counter of test_cron_task calls
# the file is used to share state between the test process and
# the scheduler process (celery beat)
with open(tempfile.gettempdir() + '/' + 'test_cron_task_counter', 'w') as temp_file:
pass
run_celery_beat(seconds=3,verbose=False)
# verify number of calls and time of last call
ncalls, last_call = count_timestamps('test_cron_task_counter')
self.assertGreaterEqual(ncalls,2)
self.assertAlmostEqual(last_call, time.time(), delta=100)
def test_cron_and_memoize(self):
""" Test that periodic tasks are scheduled and run, and the results
are cached.
"""
# truncate the file used as a counter of test_cron_task calls
# the file is used to share state between the test process and
# the scheduler process (celery beat)
with open(tempfile.gettempdir() + '/' + 'test_cron_memoize_task', 'w') as temp_file:
pass
# clear the cache from any previous executions of this test
cache.delete('test_cron_memoize_unique_cache_key')
run_celery_beat(seconds=3,verbose=False)
ncalls, last_call = count_timestamps('test_cron_memoize_task')
self.assertEqual(ncalls,1) # after the first call all subsequent calls should be cached
self.assertAlmostEqual(last_call, time.time(), delta=100)
def test_cron_and_memoize_and_view(self):
""" Test that periodic tasks are scheduled, run, cached, and the
cached results are available to @view
"""
# truncate the file used as a counter of big_computation calls
# the file is used to share state between the test process and
# the scheduler process (celery beat)
with open(tempfile.gettempdir() + '/' + 'big_computation_counter', 'w') as temp_file:
pass
# delete cache from previous executions of this unit test
cache.delete('big_computation_key')
run_celery_beat(seconds=3, verbose=False)
ncalls_before, lastcall_before = count_timestamps('big_computation_counter')
self.assertEqual(ncalls_before,1) # after the first call all subsequent calls should be cached
c = Client()
status_code = c.get('/view/big_computation_visualizer').status_code
content = c.get('/view/big_computation_visualizer').content
self.assertEqual(status_code, 200)
self.assertEqual(content, "<html>FAKERESULT</html>")
# ensure big_computation was not called and the cached result was used
# by the execution of c.get('/view...')
ncalls_after, lastcall_after = count_timestamps('big_computation_counter')
self.assertEqual(ncalls_before, ncalls_after)
self.assertEqual(lastcall_before, lastcall_after)
\ No newline at end of file
......@@ -44,18 +44,7 @@ DATABASES = {
}
}
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': 'analytics-experiments',
'TIMEOUT': 60*60, #one hour
'OPTIONS' : {
'MAX_ENTRIES' : 100
}
}
}
# Hosts/domain names that are valid for this site; required if DEBUG is False
# Hosts/domain names that are for this site; required if DEBUG is False
# See https://docs.djangoproject.com/en/1.5/ref/settings/#allowed-hosts
ALLOWED_HOSTS = []
......@@ -150,7 +139,8 @@ DJ_REQUIRED_APPS = ( 'djeventstream.httphandler',
'south',
'core',
'modulefs',
'modules',)
'modules',
'periodic',)
INSTALLED_APPS = (
'django.contrib.auth',
......@@ -203,4 +193,12 @@ LOGGING = {
# 'error', r"DateTimeField received a naive datetime",
# RuntimeWarning, r'django\.db\.models\.fields')
from celerysettings import *
\ No newline at end of file
#initialize celery
import djcelery
djcelery.setup_loader()
#import the settings for celery from the edinsights module
from edinsights.celerysettings_dev import *
# import django cache settings
from edinsights.djangocachesettings_dev import *
\ No newline at end of file
# This settings file is used for testing cron (periodic tasks)
from settings import *
CELERY_IMPORTS = ()
# CELERY_IMPORTS += ('core.testtasks',)
CELERY_IMPORTS += ('core.tests',)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment