Commit e9e56973 by Piotr Mitros

Merge pull request #7 from edx/stkomarov-fixes

Stkomarov fixes
parents 507b403a 4f7a5a66
......@@ -35,7 +35,7 @@ nosetests.xml
.mr.developer.cfg
.project
.pydevproject
db.sql
*db.sql
# Backups
*~
......
......@@ -80,6 +80,24 @@ the test module will be at:
http://127.0.0.1:8000/static/index.html
Running periodic tasks
-------------------------------------
Periodic tasks (which are scheduled with core.decorators.cron)
rely on Celery for execution. It is the responsibility of the
client django project to ensure Celery is configured and running.
To configure, add the following to settings.py of your django
project:
from edinsights.celerysettings import *
To start celery, run from your django project
python manage.py celery worker -B
Only tasks located in files named "tasks.py" located in the main
directory of your django project or installed django app will
be scheduled.
Building on top of the framework
--------------------------------
......
# Celery configuration to be imported by client django projects (see README:
# "from edinsights.celerysettings import *").
# required for queuing new tasks but does not store results
BROKER_URL = 'mongodb://localhost/celery'
# required for storing results (might be unnecessary)
CELERY_RESULT_BACKEND = 'mongodb://localhost/celeryresult'
# Expire stored task results so the result backend does not grow
# without bound.
CELERY_TASK_RESULT_EXPIRES = 60 * 60 #1 hour
\ No newline at end of file
......@@ -21,6 +21,8 @@ from django.core.cache import cache
from django.conf import settings
from celery.task import PeriodicTask, periodic_task
from util import optional_parameter_call
from util import default_optional_kwargs
import registry
from registry import event_handlers, request_handlers
......@@ -95,10 +97,13 @@ def query(category = None, name = None, description = None, args = None):
return query_factory
def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymongo.database.Database'>", "<class 'fs.osfs.OSFS'>"]):
def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymongo.database.Database'>", "<class 'fs.osfs.OSFS'>"], key_override=None):
''' Call function only if we do not have the results for its execution already
We ignore parameters of type pymongo.database.Database and fs.osfs.OSFS. These
will be different per call, but function identically.
key_override: use this as a cache key instead of computing a key from the
function signature. Useful for testing.
'''
def isuseful(a, ignores):
if str(type(a)) in ignores:
......@@ -112,19 +117,24 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
# this is just for SOA queries, but may break
# down if this were to be used as a generic
# memoization framework
m = hashlib.new("md4")
s = str({'uniquifier': 'anevt.memoize',
'name' : f.__name__,
'module' : f.__module__,
'args': [a for a in args if isuseful(a, ignores)],
'kwargs': kwargs})
m.update(s)
key = m.hexdigest()
if key_override is not None:
key = key_override
else:
m = hashlib.new("md4")
s = str({'uniquifier': 'anevt.memoize',
'name' : f.__name__,
'module' : f.__module__,
'args': [a for a in args if isuseful(a, ignores)],
'kwargs': kwargs})
m.update(s)
key = m.hexdigest()
# Check if we've cached the computation, or are in the
# process of computing it
cached = cache.get(key)
if cached:
#print "Cache hit", key
# print "Cache hit", f.__name__, key
# If we're already computing it, wait to finish
# computation
while cached == 'Processing':
......@@ -136,7 +146,7 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
results = cached
if not cached:
#print "Cache miss", key
# print "Cache miss",f.__name__, key
# HACK: There's a slight race condition here, where we
# might recompute twice.
cache.set(key, 'Processing', timeout)
......@@ -154,19 +164,23 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
return decorator(wrap_function,f)
return view_factory
def cron(period, params=None):
def cron(run_every, params=None):
''' Run command periodically
Unknown whether or how well this works.
The task scheduler process (typically celery beat) needs to be started
manually by the client module with:
python manage.py celery worker -B --loglevel=INFO
Celery beat will automatically add tasks from files named 'tasks.py'
'''
def factory(f):
@periodic_task(run_every=period, name=f.__name__)
def run():
import edinsights.core.views
mongodb = core.views.get_mongo(f)
fs = core.views.get_filesystem(f)
f(fs, mongodb, params)
return decorator(run,f)
@periodic_task(run_every=run_every, name=f.__name__)
def run(func=None):
if func:
result = optional_parameter_call(func, default_optional_kwargs, params)
else:
result = optional_parameter_call(f, default_optional_kwargs, params)
return result
return decorator(run, f)
return factory
def event_property(name=None, description=None):
......
......@@ -258,6 +258,9 @@ def get_embed(t, config = None):
return single_embed(t)
class djobject():
## djobject, you should ignore. Use view and query objects directly.
## Combining the two in this way is probably not a good abstraction
## (I could be wrong; just current intuition).
def __init__(self, baseurl = None, headers = {}):
self.view = single_embed('view', baseurl = baseurl, headers = headers)
self.query = single_embed('query', baseurl = baseurl, headers = headers)
......@@ -275,7 +278,7 @@ if __name__ == "__main__":
'policy' : { 'total_user_count' : 'allow',
'user_count' : 'allow',
'dash' : 'deny',
'page_count' : ['user'] }
'page_count' : ['user'] } # List of parameters to lock down
}
context = { 'user' : 'bob',
......
''' This, together with decorators.py, is the entire API intended to
be used by plug-in modules.
All of this should go through queries, not directly through the
DBs. I wrote this, and later realized it would break abstractions
in not-great ways. We may need to re-add it for performance eventually.
'''
from util import get_cache, get_filesystem, get_database
......@@ -7,18 +11,28 @@ from util import get_cache, get_filesystem, get_database
def get_replica_database(module):
''' Get a read-replica database of a different module. At
present, not a read-replica, but this will change in the
future. '''
future.
This is a bad idea, and should be removed in the future'''
print 'deprecated'
get_database(module)
def get_replica_filesystem(module):
''' Get a read-replica filesystem of a different module. At
present, not a read-replica, but this will change in the
future. '''
future.
This is a bad idea, and should be removed in the future'''
print 'deprecated'
get_filesystem(module)
def get_replica_cache(module):
''' Get a read-replica cache of a different module. At
present, not a read-replica, but this will change in the
future. '''
future.
This is a bad idea, and should be removed in the future'''
print 'deprecated'
return get_cache(module)
......@@ -7,9 +7,9 @@ Replace this with more appropriate tests for your application.
import time
from django.test import TestCase
from decorators import memoize_query
class SimpleTest(TestCase):
def test_basic_addition(self):
"""
......@@ -19,26 +19,27 @@ class SimpleTest(TestCase):
def __init__(self, arg):
TestCase.__init__(self, arg)
self.calls = 0
self.memoize_calls = 0
def test_memoize(self):
self.calls = 0
self.memoize_calls = 0
return
@memoize_query(0.05)
def double_trouble(x):
self.calls = self.calls + 1
self.memoize_calls = self.memoize_calls + 1
return 2*x
self.assertEqual(double_trouble(2), 4)
self.assertEqual(double_trouble(4), 8)
self.assertEqual(double_trouble(2), 4)
self.assertEqual(double_trouble(4), 8)
self.assertEqual(self.calls, 2)
self.assertEqual(self.memoize_calls, 2)
time.sleep(0.1)
self.assertEqual(double_trouble(2), 4)
self.assertEqual(double_trouble(4), 8)
self.assertEqual(double_trouble(2), 4)
self.assertEqual(double_trouble(4), 8)
self.assertEqual(self.calls, 4)
self.assertEqual(self.memoize_calls, 4)
def test_auth(self):
''' Inject a dummy settings.DJA_AUTH into auth.
......@@ -88,3 +89,7 @@ class SimpleTest(TestCase):
for url in urls:
response = c.get(url)
self.assertEqual(response.status_code, 200)
......@@ -8,7 +8,7 @@ from django.core.cache import cache
from edinsights.modulefs import modulefs
connection = MongoClient()
connection = MongoClient() # TODO: Parameter setting for Mongos over the network
def import_view_modules():
'''
......
......@@ -97,6 +97,7 @@ def handle_event(sender, **kwargs):
This is not a view, but it is the moral equivalent.
'''
# Handle strings, lists, and dictionaries
# TODO handle errors if not valid json
msg = kwargs['msg']
if isinstance(msg,str) or isinstance(msg,unicode):
msg = json.loads(msg)
......
# The django cache is used by core.memoize_query to store results temporarily.
# Because periodic tasks are run from a separate process, and
# because periodic tasks could also be memoized, the
# django cache backend has to be visible across processes.
# Most django.cache.core.backends would work except for LocMemCache
# (which keeps entries per-process).
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.filebased.FileBasedCache',
        # files in /tmp older than TMPTIME specified in /etc/default/rcS
        # are erased automatically on reboot.
        # Make sure the specified directory in LOCATION is writeable by apache
        'LOCATION': '/tmp/django_cache/',
        'TIMEOUT': 60*60, #one hour
        'OPTIONS' : {
            # limit on the number of cached entries before culling
            'MAX_ENTRIES' : 100
        }
    }
}
\ No newline at end of file
......@@ -2,6 +2,8 @@ import django
from django.db import models
import datetime
from django.utils import timezone
## Create your models here.
#class StudentBookAccesses(models.Model):
# username = models.CharField(max_length=500, unique=True) # TODO: Should not have max_length
......@@ -20,7 +22,7 @@ class FSExpirations(models.Model):
''' May be used instead of the constructor to create a new expiration.
Automatically applies timedelta and saves to DB.
'''
expiration_time = datetime.datetime.now() + datetime.timedelta(days, seconds)
expiration_time = timezone.now() + timezone.timedelta(days, seconds)
# If object exists, update it
objects = cls.objects.filter(module = module, filename = filename)
......@@ -47,7 +49,9 @@ class FSExpirations(models.Model):
@classmethod
def expired(cls):
''' Returns a list of expired objects '''
return cls.objects.filter(expires=True, expiration__lte = datetime.datetime.now())
expiration_lte = timezone.now()
return cls.objects.filter(expires=True, expiration__lte = expiration_lte)
class Meta:
unique_together = (("module","filename"))
......
Overview
========
The purpose of this module is to unit test periodic tasks
created with edinsights.core.decorators.cron
The module does not provide any additional functionality.
Despite the name of the module, your periodic tasks
do NOT have to be inside this module. They can be
located in any tasks.py file in any django app
directory.
Running Tests
=============
Because testing periodic tasks is slow (~20s), they
are excluded from testing by default.
To test the module, add it to INSTALLED_APPS in settings.py
To run the tests:
python manage.py test periodic
# This module provides tests for periodic tasks using core.decorators.cron
from edinsights.core.decorators import view
from edinsights.periodic.tasks import big_computation
@view()
def big_computation_visualizer():
    """ Expose the (memoized) result of big_computation as an HTML view.

    Used by the test_cron_and_memoize_and_view test case, which expects
    the response body "<html>FAKERESULT</html>".
    """
    return "<html>%s</html>" % big_computation()
import tempfile
import time
from django.core.cache import cache
from edinsights.core.decorators import memoize_query, cron
from django.utils.timezone import timedelta
@cron(run_every=timedelta(seconds=1))
def test_cron_task():
    """ Minimal periodic task used by the test_cron test case.

    Appends one wall-clock timestamp per execution to a counter file in
    the system temp directory, so the test process can verify that the
    scheduler (celery beat) actually ran the task.

    Defined at module level because celery's decorator support for
    methods and nested functions is experimental.
    """
    counter_path = tempfile.gettempdir() + '/' + 'test_cron_task_counter'
    # one line per call: the test counts lines and parses the last one
    with open(counter_path, 'a') as counter_file:
        counter_file.write(str(time.time()) + '\n')
@cron(run_every=timedelta(seconds=1)) # cron decorators should go on top
@memoize_query(60, key_override='test_cron_memoize_unique_cache_key')
def test_cron_memoize_task():
    """ Memoized periodic task used by the test_cron_and_memoize test case.

    Appends one wall-clock timestamp per *actual* execution to a counter
    file in the system temp directory; because the result is memoized
    under a fixed cache key, only the first scheduler run should reach
    this body.

    Defined at module level because celery's decorator support for
    methods and nested functions is experimental. The cron decorator
    must sit above all other decorators.
    """
    counter_path = tempfile.gettempdir() + '/' + 'test_cron_memoize_task'
    # one line per non-cached execution
    with open(counter_path, 'a') as counter_file:
        counter_file.write(str(time.time()) + '\n')
    return 42
@cron(run_every=timedelta(seconds=1)) # cron decorators should go on top
@memoize_query(cache_time=60, key_override='big_computation_key')
def big_computation():
    """ Stand-in for an expensive scheduled computation.

    Records a timestamp per *actual* execution so tests can check that
    views are served the memoized result rather than a recomputation,
    and returns a sentinel value for big_computation_visualizer to embed.
    """
    counter_path = tempfile.gettempdir() + '/' + 'big_computation_counter'
    # one line per non-cached execution
    with open(counter_path, 'a') as counter_file:
        counter_file.write(str(time.time()) + '\n')
    return "FAKERESULT"
\ No newline at end of file
import tempfile
import time
from django.test import TestCase
from django.test.client import Client
from django.core.cache import cache
def count_timestamps(tempfilename):
    """ Read back the timestamps a periodic task appended to a counter
    file in the system temp directory.

    tempfilename: bare file name (no directory) of the counter file.

    Returns a tuple (ncalls, last_call) where ncalls is the number of
    recorded executions and last_call is the float timestamp of the most
    recent one. Returns (0, None) when the file is empty — i.e. the task
    never ran — instead of raising IndexError on timestamps[-1].
    """
    with open(tempfile.gettempdir() + '/' + tempfilename, 'r') as temp_file:
        timestamps = temp_file.readlines()
    if not timestamps:
        # Task was never executed; report that explicitly so the caller's
        # assertions fail with a clear count rather than a crash.
        return 0, None
    ncalls = len(timestamps)
    last_call = float(timestamps[-1].rstrip())
    return ncalls, last_call
def run_celery_beat(seconds=3, verbose=False):
    """ Run the task scheduler (celery worker with embedded beat) as a
    child process for the given number of seconds, then terminate it.

    seconds: how long to let the scheduler run.
    verbose: when False, the worker's stdout/stderr are discarded.
    """
    import os
    from subprocess import Popen
    from time import sleep
    command = ['python', 'manage.py', 'celery', 'worker', '-B', '--loglevel=INFO',]
    with open(os.devnull, 'w') as devnull:
        if verbose:
            output_kwargs = {}
        else:
            output_kwargs = {'stdout': devnull, 'stderr': devnull}
        worker = Popen(command, **output_kwargs)
        # give time to celery beat to execute the scheduled tasks
        print("running periodic tasks for %s seconds... " % seconds)
        sleep(seconds)
        worker.terminate()
class SimpleTest(TestCase):
    """ End-to-end tests for periodic tasks created with the @cron decorator.

    Each test shares state with the scheduler child process (celery beat)
    through a counter file in the system temp directory: the task under
    test appends one timestamp per execution, and the test asserts on the
    number and recency of those timestamps.
    """
    def __init__(self, arg):
        TestCase.__init__(self, arg)

    def test_cron(self):
        """ Test that periodic tasks are scheduled and run
        """
        # truncate the file used as a counter of test_cron_task calls
        # the file is used to share state between the test process and
        # the scheduler process (celery beat)
        with open(tempfile.gettempdir() + '/' + 'test_cron_task_counter', 'w') as temp_file:
            pass
        run_celery_beat(seconds=3,verbose=False)
        # verify number of calls and time of last call
        ncalls, last_call = count_timestamps('test_cron_task_counter')
        # a 1-second task running for ~3 seconds should fire at least twice
        self.assertGreaterEqual(ncalls,2)
        # delta is deliberately loose: we only care that the task ran
        # recently, not about scheduler timing precision
        self.assertAlmostEqual(last_call, time.time(), delta=100)

    def test_cron_and_memoize(self):
        """ Test that periodic tasks are scheduled and run, and the results
        are cached.
        """
        # truncate the file used as a counter of test_cron_task calls
        # the file is used to share state between the test process and
        # the scheduler process (celery beat)
        with open(tempfile.gettempdir() + '/' + 'test_cron_memoize_task', 'w') as temp_file:
            pass
        # clear the cache from any previous executions of this test
        cache.delete('test_cron_memoize_unique_cache_key')
        run_celery_beat(seconds=3,verbose=False)
        ncalls, last_call = count_timestamps('test_cron_memoize_task')
        self.assertEqual(ncalls,1) # after the first call all subsequent calls should be cached
        self.assertAlmostEqual(last_call, time.time(), delta=100)

    def test_cron_and_memoize_and_view(self):
        """ Test that periodic tasks are scheduled, run, cached, and the
        cached results are available to @view
        """
        # truncate the file used as a counter of big_computation calls
        # the file is used to share state between the test process and
        # the scheduler process (celery beat)
        with open(tempfile.gettempdir() + '/' + 'big_computation_counter', 'w') as temp_file:
            pass
        # delete cache from previous executions of this unit test
        cache.delete('big_computation_key')
        run_celery_beat(seconds=3, verbose=False)
        ncalls_before, lastcall_before = count_timestamps('big_computation_counter')
        self.assertEqual(ncalls_before,1) # after the first call all subsequent calls should be cached
        c = Client()
        status_code = c.get('/view/big_computation_visualizer').status_code
        content = c.get('/view/big_computation_visualizer').content
        self.assertEqual(status_code, 200)
        self.assertEqual(content, "<html>FAKERESULT</html>")
        # ensure big_computation was not called and the cached result was used
        # by the execution of c.get('/view...')
        ncalls_after, lastcall_after = count_timestamps('big_computation_counter')
        self.assertEqual(ncalls_before, ncalls_after)
        self.assertEqual(lastcall_before, lastcall_after)
\ No newline at end of file
......@@ -6,12 +6,6 @@ import datetime
# DJOBJECT_CONFIG = [{}, {'baseurl' : 'http://127.0.0.1:9022/'}]
DJ_REQUIRED_APPS = ( 'djeventstream.httphandler',
'djcelery',
'south',
'core',
'modulefs',
'modules',)
# Types of parameters that queries and views can take.
# This is not properly used yet.
......@@ -26,10 +20,6 @@ TIME_BETWEEN_DATA_REGENERATION = datetime.timedelta(minutes=1)
INSTALLED_ANALYTICS_MODULES = ('modules.testmodule',)
#Initialize celery
import djcelery
djcelery.setup_loader()
SNS_SUBSCRIPTIONS = []
#### Default Django settings below.
......@@ -54,14 +44,7 @@ DATABASES = {
}
}
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': 'analytics-experiments'
}
}
# Hosts/domain names that are valid for this site; required if DEBUG is False
# Hosts/domain names that are for this site; required if DEBUG is False
# See https://docs.djangoproject.com/en/1.5/ref/settings/#allowed-hosts
ALLOWED_HOSTS = []
......@@ -69,7 +52,7 @@ ALLOWED_HOSTS = []
# http://en.wikipedia.org/wiki/List_of_tz_zones_by_name
# although not all choices may be available on all operating systems.
# In a Windows environment this must be set to your system time zone.
TIME_ZONE = 'America/Chicago'
TIME_ZONE = 'America/New_York'
# Language code for this installation. All choices can be found here:
# http://www.i18nguy.com/unicode/language-identifiers.html
......@@ -151,6 +134,14 @@ TEMPLATE_DIRS = (
# Don't forget to use absolute paths, not relative paths.
)
DJ_REQUIRED_APPS = ( 'djeventstream.httphandler',
'djcelery',
'south',
'core',
'modulefs',
'modules',
'periodic',)
INSTALLED_APPS = (
'django.contrib.auth',
'django.contrib.contenttypes',
......@@ -192,3 +183,22 @@ LOGGING = {
},
}
}
# # By default timezone-related warnings do not display the location in code
# # where they occurred. The code below will turn these warnings into
# # exceptions with stack trace so that one can identify the offending code.
# # Uncomment to turn timezone warnings into exceptions
# import warnings
# warnings.filterwarnings(
# 'error', r"DateTimeField received a naive datetime",
# RuntimeWarning, r'django\.db\.models\.fields')
#initialize celery
import djcelery
djcelery.setup_loader()
#import the settings for celery from the edinsights module
from edinsights.celerysettings_dev import *
# import django cache settings
from edinsights.djangocachesettings_dev import *
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment