Commit 43cf2c81 by Steve Komarov

Added force_memoize mode; tests still needed.

parent 4f7a5a66
...@@ -97,6 +97,23 @@ def query(category = None, name = None, description = None, args = None): ...@@ -97,6 +97,23 @@ def query(category = None, name = None, description = None, args = None):
return query_factory return query_factory
def mq_force_memoize(func):
    ''' Return the force-memoize variant of a memoized function.

    Functions wrapped by memoize_query() carry a `force_memoize`
    attribute (an opmode that recomputes and re-caches regardless of
    cache state). If `func` has that attribute, return it; otherwise
    return `func` unchanged, so plain (non-memoized) functions pass
    through transparently.
    '''
    if hasattr(func, 'force_memoize'):
        # Single-argument print() is valid and identical on both
        # Python 2 and Python 3 (the bare statement form is Py2-only).
        print("it has it")
        return func.force_memoize
    else:
        print("it has not")
        return func
def mq_force_retrieve(func):
    ''' Return the force-retrieve variant of a memoized function.

    Functions wrapped by memoize_query() carry a `force_retrieve`
    attribute (an opmode that only reads the cache and raises on a
    miss). If `func` has that attribute, return it; otherwise return
    `func` unchanged, so plain (non-memoized) functions pass through
    transparently.
    '''
    if hasattr(func, 'force_retrieve'):
        # Single-argument print() is valid and identical on both
        # Python 2 and Python 3 (the bare statement form is Py2-only).
        print("it has it")
        return func.force_retrieve
    else:
        print("it has not")
        return func
def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymongo.database.Database'>", "<class 'fs.osfs.OSFS'>"], key_override=None): def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymongo.database.Database'>", "<class 'fs.osfs.OSFS'>"], key_override=None):
''' Call function only if we do not have the results for its execution already ''' Call function only if we do not have the results for its execution already
We ignore parameters of type pymongo.database.Database and fs.osfs.OSFS. These We ignore parameters of type pymongo.database.Database and fs.osfs.OSFS. These
...@@ -105,66 +122,100 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong ...@@ -105,66 +122,100 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ["<class 'pymong
key_override: use this as a cache key instead of computing a key from the key_override: use this as a cache key instead of computing a key from the
function signature. Useful for testing. function signature. Useful for testing.
''' '''
print "in memoize query"
# Helper functions
def isuseful(a, ignores): def isuseful(a, ignores):
if str(type(a)) in ignores: if str(type(a)) in ignores:
return False return False
return True return True
def view_factory(f): def make_cache_key(f, args, kwargs):
def wrap_function(f, *args, **kwargs): # Assumption: dict gets dumped in same order
# Assumption: dict gets dumped in same order # Arguments are serializable. This is okay since
# Arguments are serializable. This is okay since # this is just for SOA queries, but may break
# this is just for SOA queries, but may break # down if this were to be used as a generic
# down if this were to be used as a generic # memoization framework
# memoization framework m = hashlib.new("md4")
s = str({'uniquifier': 'anevt.memoize',
if key_override is not None: 'name' : f.__name__,
key = key_override 'module' : f.__module__,
'args': [a for a in args if isuseful(a, ignores)],
'kwargs': kwargs})
m.update(s)
if key_override is not None:
key = key_override
else:
key = m.hexdigest()
return key
def compute_and_cache(f, key, args, kwargs):
# HACK: There's a slight race condition here, where we
# might recompute twice.
cache.set(key, 'Processing', timeout)
function_argspec = inspect.getargspec(f)
if function_argspec.varargs or function_argspec.args:
if function_argspec.keywords:
results = f(*args, **kwargs)
else: else:
m = hashlib.new("md4") results = f(*args)
s = str({'uniquifier': 'anevt.memoize', else:
'name' : f.__name__, results = f()
'module' : f.__module__, cache.set(key, results, cache_time)
'args': [a for a in args if isuseful(a, ignores)], return results
'kwargs': kwargs})
m.update(s)
key = m.hexdigest() def get_from_cache_if_possible(f, key):
cached = cache.get(key)
# Check if we've cached the computation, or are in the # If we're already computing it, wait to finish
# process of computing it # computation
while cached == 'Processing':
cached = cache.get(key) cached = cache.get(key)
if cached: time.sleep(0.1)
# print "Cache hit", f.__name__, key # At this point, cached should be the result of the
# If we're already computing it, wait to finish # cache line, unless we had a failure/timeout, in
# computation # which case, it is false
while cached == 'Processing': results = cached
cached = cache.get(key) return results
time.sleep(0.1)
# At this point, cached should be the result of the def factory(f):
# cache line, unless we had a failure/timeout, in print "in factory"
# which case, it is false def opmode_default(f, *args, **kwargs):
results = cached print "in opmode_default"
key = make_cache_key(f, args, kwargs)
if not cached: results = get_from_cache_if_possible(f, key)
# print "Cache miss",f.__name__, key if results:
# HACK: There's a slight race condition here, where we print "Cache hit %s %s" % (f.__name__, key)
# might recompute twice. pass
cache.set(key, 'Processing', timeout) else:
function_argspec = inspect.getargspec(f) print "Cache miss %s %s" % (f.__name__, key)
if function_argspec.varargs or function_argspec.args: results = compute_and_cache(f,key, args, kwargs)
if function_argspec.keywords: return results
results = f(*args, **kwargs)
else:
results = f(*args)
else:
results = f()
cache.set(key, results, cache_time)
def opmode_forcememoize(*args, **kwargs):
# Recompute and store in cache, regardless of whether key
# is in cache.
key = make_cache_key(f, args, kwargs)
print "Forcing memoize %s %s " % (f.__name__, key)
results = compute_and_cache(f, key, args, kwargs)
return results return results
return decorator(wrap_function,f)
return view_factory
def cron(run_every, params=None): def opmode_forceretrieve(*args, **kwargs):
# Retrieve from cache if possible otherwise exception
print "Forcing retrieve %s %s " % (f.__name__, key)
key = make_cache_key(f, args, kwargs)
results = get_from_cache_if_possible(f, key)
if not results:
raise KeyError('key %s not found in cache' % key) # TODO better exception class?
return results
decfun = decorator(opmode_default,f)
decfun.force_memoize = opmode_forcememoize
decfun.force_retrieve = opmode_forceretrieve
return decfun
return factory
def cron(run_every, force_memoize=False, params={}):
''' Run command periodically ''' Run command periodically
The task scheduler process (typically celery beat) needs to be started The task scheduler process (typically celery beat) needs to be started
...@@ -172,13 +223,23 @@ def cron(run_every, params=None): ...@@ -172,13 +223,23 @@ def cron(run_every, params=None):
python manage.py celery worker -B --loglevel=INFO python manage.py celery worker -B --loglevel=INFO
Celery beat will automatically add tasks from files named 'tasks.py' Celery beat will automatically add tasks from files named 'tasks.py'
''' '''
print "in cron"
def factory(f): def factory(f):
print "in factory"
@periodic_task(run_every=run_every, name=f.__name__) @periodic_task(run_every=run_every, name=f.__name__)
def run(func=None): def run(func=None, *args, **kw):
if func:
result = optional_parameter_call(func, default_optional_kwargs, params) if not func:
else: #called as a periodic task... func will be None
result = optional_parameter_call(f, default_optional_kwargs, params) func = f
if force_memoize:
func = mq_force_memoize(func)
result = optional_parameter_call(func, default_optional_kwargs, params)
return result return result
return decorator(run, f) return decorator(run, f)
return factory return factory
......
...@@ -104,7 +104,7 @@ def get_query(f): ...@@ -104,7 +104,7 @@ def get_query(f):
return get_embed('query', config = embed_config) return get_embed('query', config = embed_config)
def optional_parameter_call(function, optional_kwargs, passed_kwargs, arglist = None): def optional_parameter_call(function, optional_kwargs, passed_kwargs, arglist = None):
''' Calls a function with parameters: ''' Calls a function with parameters:
passed_kwargs are input parameters the function must take. passed_kwargs are input parameters the function must take.
Format: Dictionary mapping keywords to arguments. Format: Dictionary mapping keywords to arguments.
......
...@@ -7,7 +7,7 @@ from edinsights.core.decorators import memoize_query, cron ...@@ -7,7 +7,7 @@ from edinsights.core.decorators import memoize_query, cron
from django.utils.timezone import timedelta from django.utils.timezone import timedelta
@cron(run_every=timedelta(seconds=1)) @cron(run_every=timedelta(seconds=1))
def test_cron_task(): def test_cron_task(params={}):
""" Simple task that gets executed by the scheduler (celery beat). """ Simple task that gets executed by the scheduler (celery beat).
The test case test_cron verifies that the execution The test case test_cron verifies that the execution
has taken place. has taken place.
...@@ -19,7 +19,7 @@ def test_cron_task(): ...@@ -19,7 +19,7 @@ def test_cron_task():
temp_file.write(str(time.time()) + '\n') #write a timestamp for each call temp_file.write(str(time.time()) + '\n') #write a timestamp for each call
@cron(run_every=timedelta(seconds=1)) # cron decorators should go on top @cron(run_every=timedelta(seconds=1), force_memoize=True) # cron decorators should go on top
@memoize_query(60, key_override='test_cron_memoize_unique_cache_key') @memoize_query(60, key_override='test_cron_memoize_unique_cache_key')
def test_cron_memoize_task(): def test_cron_memoize_task():
""" Simple task that gets executed by the scheduler (celery beat). """ Simple task that gets executed by the scheduler (celery beat).
...@@ -37,7 +37,7 @@ def test_cron_memoize_task(): ...@@ -37,7 +37,7 @@ def test_cron_memoize_task():
return 42 return 42
@cron(run_every=timedelta(seconds=1)) # cron decorators should go on top @cron(run_every=timedelta(seconds=1), force_memoize=True) # cron decorators should go on top
@memoize_query(cache_time=60, key_override='big_computation_key') @memoize_query(cache_time=60, key_override='big_computation_key')
def big_computation(): def big_computation():
# time.sleep(seconds=10) # time.sleep(seconds=10)
......
...@@ -48,7 +48,7 @@ class SimpleTest(TestCase): ...@@ -48,7 +48,7 @@ class SimpleTest(TestCase):
with open(tempfile.gettempdir() + '/' + 'test_cron_task_counter', 'w') as temp_file: with open(tempfile.gettempdir() + '/' + 'test_cron_task_counter', 'w') as temp_file:
pass pass
run_celery_beat(seconds=3,verbose=False) run_celery_beat(seconds=3,verbose=True)
# verify number of calls and time of last call # verify number of calls and time of last call
...@@ -71,7 +71,7 @@ class SimpleTest(TestCase): ...@@ -71,7 +71,7 @@ class SimpleTest(TestCase):
# clear the cache from any previous executions of this test # clear the cache from any previous executions of this test
cache.delete('test_cron_memoize_unique_cache_key') cache.delete('test_cron_memoize_unique_cache_key')
run_celery_beat(seconds=3,verbose=False) run_celery_beat(seconds=3,verbose=True)
ncalls, last_call = count_timestamps('test_cron_memoize_task') ncalls, last_call = count_timestamps('test_cron_memoize_task')
self.assertEqual(ncalls,1) # after the first call all subsequent calls should be cached self.assertEqual(ncalls,1) # after the first call all subsequent calls should be cached
...@@ -91,7 +91,7 @@ class SimpleTest(TestCase): ...@@ -91,7 +91,7 @@ class SimpleTest(TestCase):
# delete cache from previous executions of this unit test # delete cache from previous executions of this unit test
cache.delete('big_computation_key') cache.delete('big_computation_key')
run_celery_beat(seconds=3, verbose=False) run_celery_beat(seconds=3, verbose=True)
ncalls_before, lastcall_before = count_timestamps('big_computation_counter') ncalls_before, lastcall_before = count_timestamps('big_computation_counter')
self.assertEqual(ncalls_before,1) # after the first call all subsequent calls should be cached self.assertEqual(ncalls_before,1) # after the first call all subsequent calls should be cached
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment