Commit e22fb097 by Steve Komarov

fixed periodic tasks/added celerysettings config

parent 06fed7ab
...@@ -35,7 +35,7 @@ nosetests.xml ...@@ -35,7 +35,7 @@ nosetests.xml
.mr.developer.cfg .mr.developer.cfg
.project .project
.pydevproject .pydevproject
db.sql *db.sql
# Backups # Backups
*~ *~
......
# Message broker: required for queuing new tasks, but does not store results.
BROKER_URL = 'mongodb://localhost/celery'
# Result backend: required for storing task results (might be unnecessary).
CELERY_RESULT_BACKEND = 'mongodb://localhost/celeryresult'
# Expiry of stored task results; the value is 60 * 60, presumably in seconds.
CELERY_TASK_RESULT_EXPIRES = 60 * 60 #1 hour
\ No newline at end of file
...@@ -154,17 +154,20 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong ...@@ -154,17 +154,20 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
return decorator(wrap_function,f) return decorator(wrap_function,f)
return view_factory return view_factory
def cron(run_every, params=None):
    ''' Run command periodically.

    run_every is handed to periodic_task as its run_every argument.
    params is passed through unchanged as the third argument of the
    decorated function, which is invoked as f(fs, mongodb, params).

    The task scheduler process (typically celery beat) needs to be started
    manually by the client module with:
        python manage.py celery worker -B --loglevel=INFO
    Celery beat will automatically add tasks from files named 'tasks.py'
    '''
    def factory(f):
        task_name = f.__name__

        @periodic_task(run_every=run_every, name=task_name)
        def run():
            # The import happens when the task runs, not at decoration time.
            from edinsights import core
            helpers = core.util
            database = helpers.get_mongo(f)
            filesystem = helpers.get_filesystem(f)
            f(filesystem, database, params)

        return decorator(run, f)

    return factory
......
...@@ -258,6 +258,9 @@ def get_embed(t, config = None): ...@@ -258,6 +258,9 @@ def get_embed(t, config = None):
return single_embed(t) return single_embed(t)
class djobject(): class djobject():
## djobject, you should ignore. Use view and query objects directly.
## Combining the two in this way is probably not a good abstraction
## (I could be wrong; just current intuition).
def __init__(self, baseurl = None, headers = {}): def __init__(self, baseurl = None, headers = {}):
self.view = single_embed('view', baseurl = baseurl, headers = headers) self.view = single_embed('view', baseurl = baseurl, headers = headers)
self.query = single_embed('query', baseurl = baseurl, headers = headers) self.query = single_embed('query', baseurl = baseurl, headers = headers)
...@@ -275,7 +278,7 @@ if __name__ == "__main__": ...@@ -275,7 +278,7 @@ if __name__ == "__main__":
'policy' : { 'total_user_count' : 'allow', 'policy' : { 'total_user_count' : 'allow',
'user_count' : 'allow', 'user_count' : 'allow',
'dash' : 'deny', 'dash' : 'deny',
'page_count' : ['user'] } 'page_count' : ['user'] } # List of parameters to lock down
} }
context = { 'user' : 'bob', context = { 'user' : 'bob',
......
''' This, together with decorators.py, is the entire API intended to ''' This, together with decorators.py, is the entire API intended to
be used by plug-in modules. be used by plug-in modules.
All of this should go through queries, not directly through the
DBs. I wrote this, and later realized it would break abstractions
in not great ways. We may need to re-add for performance eventually?
''' '''
from util import get_cache, get_filesystem, get_database from util import get_cache, get_filesystem, get_database
...@@ -7,18 +11,28 @@ from util import get_cache, get_filesystem, get_database ...@@ -7,18 +11,28 @@ from util import get_cache, get_filesystem, get_database
def get_replica_database(module):
    ''' Get a read-replica database of a different module. At
    present, not a read-replica, but this will change in the
    future.

    This is a bad idea, and should be removed in the future.

    Prints 'deprecated' on every call and returns whatever
    get_database(module) returns.
    '''
    print('deprecated')
    # The lookup result used to be discarded, so callers always got None.
    return get_database(module)
def get_replica_filesystem(module):
    ''' Get a read-replica filesystem of a different module. At
    present, not a read-replica, but this will change in the
    future.

    This is a bad idea, and should be removed in the future.

    Prints 'deprecated' on every call and returns whatever
    get_filesystem(module) returns.
    '''
    print('deprecated')
    # The lookup result used to be discarded, so callers always got None.
    return get_filesystem(module)
def get_replica_cache(module):
    ''' Get a read-replica cache of a different module. At
    present, not a read-replica, but this will change in the
    future.

    This is a bad idea, and should be removed in the future.

    Prints 'deprecated' on every call and returns whatever
    get_cache(module) returns.
    '''
    print('deprecated')
    replica_cache = get_cache(module)
    return replica_cache
...@@ -4,11 +4,29 @@ when you run "manage.py test". ...@@ -4,11 +4,29 @@ when you run "manage.py test".
Replace this with more appropriate tests for your application. Replace this with more appropriate tests for your application.
""" """
import time import time, tempfile
from django.test import TestCase from django.test import TestCase
from decorators import memoize_query from decorators import memoize_query,cron
from django.utils.timezone import timedelta
from celery.task import periodic_task
@cron(run_every=timedelta(seconds=1))
def test_cron_task(*args):
    """ Periodic task that gets executed by the scheduler (celery beat).

    Every call appends one timestamp line to the file
    'test_cron_task_counter' in the system temp directory. The test case
    test_cron defined below reads that file to verify that the execution
    has taken place.

    Defined outside of the SimpleTest class because current support of
    celery decorators for methods and nested functions is experimental.
    """
    counter_path = tempfile.gettempdir() + '/' + 'test_cron_task_counter'
    with open(counter_path, 'a') as counter_file:
        # one timestamp line per call
        counter_file.write('%s\n' % str(time.time()))
class SimpleTest(TestCase): class SimpleTest(TestCase):
def test_basic_addition(self): def test_basic_addition(self):
...@@ -19,26 +37,27 @@ class SimpleTest(TestCase): ...@@ -19,26 +37,27 @@ class SimpleTest(TestCase):
def __init__(self, arg):
    ''' Initialise the test case and reset the counter that
    test_memoize uses to count calls of the memoized function. '''
    super(SimpleTest, self).__init__(arg)
    self.memoize_calls = 0
def test_memoize(self):
    ''' Check that memoize_query caches results and later recomputes them.

    The wrapped function is called twice for each of two arguments, and
    its body must have run only once per argument. After a sleep longer
    than the 0.05 passed to memoize_query, the same calls must run the
    body once more per argument.
    '''
    self.memoize_calls = 0

    @memoize_query(0.05)
    def double_trouble(x):
        self.memoize_calls += 1
        return 2*x

    def check_two_rounds():
        # Two identical rounds: 2 -> 4, then 4 -> 8.
        for _ in range(2):
            self.assertEqual(double_trouble(2), 4)
            self.assertEqual(double_trouble(4), 8)

    check_two_rounds()
    self.assertEqual(self.memoize_calls, 2)

    time.sleep(0.1)

    check_two_rounds()
    self.assertEqual(self.memoize_calls, 4)
def test_auth(self): def test_auth(self):
''' Inject a dummy settings.DJA_AUTH into auth. ''' Inject a dummy settings.DJA_AUTH into auth.
...@@ -88,3 +107,37 @@ class SimpleTest(TestCase): ...@@ -88,3 +107,37 @@ class SimpleTest(TestCase):
for url in urls: for url in urls:
response = c.get(url) response = c.get(url)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
def test_cron(self):
    """ Test that periodic tasks are scheduled and run.

    Starts a celery worker with an embedded scheduler (celery beat) as a
    child process, lets it run for a few seconds, stops it, and then
    checks the timestamps that test_cron_task appended to the counter
    file in the system temp directory.
    """
    import os
    from subprocess import Popen

    counter_path = tempfile.gettempdir() + '/' + 'test_cron_task_counter'

    # truncate the file used as a counter of test_cron_task calls
    # the file is used to share state between the test process and
    # the scheduler process (celery beat)
    with open(counter_path, 'w'):
        pass

    command = ['python', 'manage.py', 'celery', 'worker', '-B',
               '--loglevel=INFO', '--settings=testsettings']
    sleep_duration = 3

    with open(os.devnull, 'w') as devnull:
        # suppressing stdout and stderr, remove if more debug info is needed
        celery_beat_process = Popen(command, stdout=devnull, stderr=devnull)
        try:
            # give time to celery beat to execute test_cron_task
            print("Sleeping for %s seconds... " % sleep_duration)
            time.sleep(sleep_duration)
        finally:
            # Always stop the worker, even if the wait above is interrupted,
            # and reap it so no stray child process outlives the test.
            celery_beat_process.terminate()
            celery_beat_process.wait()

    # verify number of calls and time of last call
    with open(counter_path, 'r') as temp_file:
        timestamps = temp_file.readlines()
    ncalls = len(timestamps)
    self.assertGreaterEqual(ncalls, 2)
    last_call = float(timestamps[-1].rstrip())
    self.assertAlmostEqual(last_call, time.time(), delta=10)
...@@ -8,7 +8,7 @@ from django.core.cache import cache ...@@ -8,7 +8,7 @@ from django.core.cache import cache
from edinsights.modulefs import modulefs from edinsights.modulefs import modulefs
connection = MongoClient() connection = MongoClient() # TODO: Parameter setting for Mongos over the network
def import_view_modules(): def import_view_modules():
''' '''
......
...@@ -97,6 +97,7 @@ def handle_event(sender, **kwargs): ...@@ -97,6 +97,7 @@ def handle_event(sender, **kwargs):
This is not a view, but it is the moral equivalent. This is not a view, but it is the moral equivalent.
''' '''
# Handle strings, lists, and dictionaries # Handle strings, lists, and dictionaries
# TODO handle errors if not valid json
msg = kwargs['msg'] msg = kwargs['msg']
if isinstance(msg,str) or isinstance(msg,unicode): if isinstance(msg,str) or isinstance(msg,unicode):
msg = json.loads(msg) msg = json.loads(msg)
......
...@@ -2,6 +2,8 @@ import django ...@@ -2,6 +2,8 @@ import django
from django.db import models from django.db import models
import datetime import datetime
from django.utils import timezone
## Create your models here. ## Create your models here.
#class StudentBookAccesses(models.Model): #class StudentBookAccesses(models.Model):
# username = models.CharField(max_length=500, unique=True) # TODO: Should not have max_length # username = models.CharField(max_length=500, unique=True) # TODO: Should not have max_length
...@@ -20,7 +22,7 @@ class FSExpirations(models.Model): ...@@ -20,7 +22,7 @@ class FSExpirations(models.Model):
''' May be used instead of the constructor to create a new expiration. ''' May be used instead of the constructor to create a new expiration.
Automatically applies timedelta and saves to DB. Automatically applies timedelta and saves to DB.
''' '''
expiration_time = datetime.datetime.now() + datetime.timedelta(days, seconds) expiration_time = timezone.now() + timezone.timedelta(days, seconds)
# If object exists, update it # If object exists, update it
objects = cls.objects.filter(module = module, filename = filename) objects = cls.objects.filter(module = module, filename = filename)
...@@ -47,7 +49,9 @@ class FSExpirations(models.Model): ...@@ -47,7 +49,9 @@ class FSExpirations(models.Model):
@classmethod
def expired(cls):
    ''' Returns the objects that have expires set to True and whose
    expiration is at or before the current time (timezone.now()). '''
    return cls.objects.filter(expires=True, expiration__lte=timezone.now())
class Meta: class Meta:
unique_together = (("module","filename")) unique_together = (("module","filename"))
......
...@@ -6,12 +6,6 @@ import datetime ...@@ -6,12 +6,6 @@ import datetime
# DJOBJECT_CONFIG = [{}, {'baseurl' : 'http://127.0.0.1:9022/'}] # DJOBJECT_CONFIG = [{}, {'baseurl' : 'http://127.0.0.1:9022/'}]
DJ_REQUIRED_APPS = ( 'djeventstream.httphandler',
'djcelery',
'south',
'core',
'modulefs',
'modules',)
# Types of parameters that queries and views can take. # Types of parameters that queries and views can take.
# This is not properly used yet. # This is not properly used yet.
...@@ -26,10 +20,6 @@ TIME_BETWEEN_DATA_REGENERATION = datetime.timedelta(minutes=1) ...@@ -26,10 +20,6 @@ TIME_BETWEEN_DATA_REGENERATION = datetime.timedelta(minutes=1)
INSTALLED_ANALYTICS_MODULES = ('modules.testmodule',) INSTALLED_ANALYTICS_MODULES = ('modules.testmodule',)
#Initialize celery
import djcelery
djcelery.setup_loader()
SNS_SUBSCRIPTIONS = [] SNS_SUBSCRIPTIONS = []
#### Default Django settings below. #### Default Django settings below.
...@@ -151,6 +141,13 @@ TEMPLATE_DIRS = ( ...@@ -151,6 +141,13 @@ TEMPLATE_DIRS = (
# Don't forget to use absolute paths, not relative paths. # Don't forget to use absolute paths, not relative paths.
) )
DJ_REQUIRED_APPS = ( 'djeventstream.httphandler',
'djcelery',
'south',
'core',
'modulefs',
'modules',)
INSTALLED_APPS = ( INSTALLED_APPS = (
'django.contrib.auth', 'django.contrib.auth',
'django.contrib.contenttypes', 'django.contrib.contenttypes',
...@@ -192,3 +189,11 @@ LOGGING = { ...@@ -192,3 +189,11 @@ LOGGING = {
}, },
} }
} }
##uncomment to provide full location of code that uses naive datetime
#import warnings
#warnings.filterwarnings(
# 'error', r"DateTimeField received a naive datetime",
# RuntimeWarning, r'django\.db\.models\.fields')
from celerysettings import *
\ No newline at end of file
# This settings file is used for testing cron (periodic tasks).
# It starts from the regular settings and then sets CELERY_IMPORTS.
from settings import *
CELERY_IMPORTS = ()
# CELERY_IMPORTS += ('core.testtasks',)
# NOTE(review): core.tests is presumably the module that defines the
# test_cron_task periodic task; confirm against the test module's location.
CELERY_IMPORTS += ('core.tests',)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment