Commit 8e5f08a8 by Steve Komarov

added ignore_memoize attribute, removed unnecessary default_optional_kwargs,…

added ignore_memoize attribute, removed unnecessary default_optional_kwargs, added clear_cache operation mode for memoize_query, fixed minor bugs
parent 68b5db6a
...@@ -12,16 +12,13 @@ import inspect ...@@ -12,16 +12,13 @@ import inspect
import logging import logging
import time import time
from decorator import decorator from decorator import decorator
from django.core.cache import cache from django.core.cache import cache
from django.conf import settings from django.conf import settings
from celery.task import PeriodicTask, periodic_task from celery.task import periodic_task
from util import optional_parameter_call from util import optional_parameter_call
from util import default_optional_kwargs
import registry import registry
from registry import event_handlers, request_handlers from registry import event_handlers, request_handlers
...@@ -101,10 +98,17 @@ def query(category = None, name = None, description = None, args = None): ...@@ -101,10 +98,17 @@ def query(category = None, name = None, description = None, args = None):
class MemoizeNotInCacheError(Exception): class MemoizeNotInCacheError(Exception):
""" Raised when using use_fromcache and the requested cache key is not in
the cache
"""
pass pass
class MemoizeAttributeError(Exception): class MemoizeAttributeError(Exception):
""" Raised when requesting to use one of a function's following attributes:
force_memoize, from_cache, clear_cache, but the function does not have the
requested attribute because it was not decorated by @memoize_query
"""
pass pass
...@@ -143,29 +147,19 @@ def use_clearcache(func): ...@@ -143,29 +147,19 @@ def use_clearcache(func):
raise MemoizeAttributeError("Function %s does not have attribute %s" % raise MemoizeAttributeError("Function %s does not have attribute %s" %
func.__name__, "clear_cache") func.__name__, "clear_cache")
# classes to ignore when creating a cache key def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = ()):
from pymongo.database import Database
import fs.osfs
from core.util import CacheHelper
import django.core.cache
DEFAULT_IGNORES = (Database, fs.osfs, CacheHelper, django.core.cache)
def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = DEFAULT_IGNORES):
''' Call function only if we do not have the results for its execution already ''' Call function only if we do not have the results for its execution already
We ignore parameters of type pymongo.database.Database and fs.osfs.OSFS. These
will be different per call, but function identically.
key_override: use this as a cache key instead of computing a key from the ignores: a list of classes to ignore when creating a cache key. Arguments
function signature. having a memoize_ignore attribute set to True are automatically ignored.
''' '''
# Helper functions # Helper functions
def isuseful(a): def isuseful(a):
if hasattr(a, 'memoize_ignore') and a.memoize_ignore is True: if hasattr(a, 'memoize_ignore') and a.memoize_ignore is True and not isinstance(a, ignores):
return False return False
return True return True
def make_cache_key(f, args, kwargs): def make_cache_key(f, args, kwargs):
""" """
Makes a cache key out of the function name and passed arguments Makes a cache key out of the function name and passed arguments
...@@ -190,12 +184,16 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = DEFAULT_IGNORES) ...@@ -190,12 +184,16 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = DEFAULT_IGNORES)
def compute_and_cache(f, key, args, kwargs): def compute_and_cache(f, key, args, kwargs):
""" """
Runs f and stores the results in cache Runs f and stores the results in cache
"""
# HACK: There's a slight race condition here, where we HACK: There's slight race condition here, where we might recompute twice
# might recompute twice. """
if cache.get(key) is None:
# While processing the request set the cache value to a unique
# string that cannot be mistaken for an actual result. Do this
# only if there was not cached result already -- this will allow
# callers to access the old cached value while we compute the new.
cache.set(key, 'Processing-14c44a51-31a6-4ba0-aed5-a52164ce4613', timeout)
cache.set(key, 'Processing', timeout)
function_argspec = inspect.getargspec(f) function_argspec = inspect.getargspec(f)
if function_argspec.varargs or function_argspec.args: if function_argspec.varargs or function_argspec.args:
if function_argspec.keywords: if function_argspec.keywords:
...@@ -215,7 +213,7 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = DEFAULT_IGNORES) ...@@ -215,7 +213,7 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = DEFAULT_IGNORES)
cached = cache.get(key) cached = cache.get(key)
# If we're already computing it, wait to finish # If we're already computing it, wait to finish
# computation # computation
while cached == 'Processing': while cached == 'Processing-14c44a51-31a6-4ba0-aed5-a52164ce4613':
cached = cache.get(key) cached = cache.get(key)
time.sleep(0.1) time.sleep(0.1)
# At this point, cached should be the result of the # At this point, cached should be the result of the
...@@ -226,31 +224,30 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = DEFAULT_IGNORES) ...@@ -226,31 +224,30 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = DEFAULT_IGNORES)
def factory(f): def factory(f):
def operationmode_default(f, *args, **kwargs):
def operation_mode_default(f, *args, **kwargs): """ Get he result from cache if possible, otherwise recompute
# Get he result from cache if possible, otherwise recompute and store in cache
# and store in cache """
key = make_cache_key(f, args, kwargs) key = make_cache_key(f, args, kwargs)
results = get_from_cache_if_possible(f, key) results = get_from_cache_if_possible(f, key)
if results: if results:
#print "Cache hit %s %s" % (f.__name__, key) # we got the results from cache, do not recompute
pass pass
else: else:
#print "Cache miss %s %s" % (f.__name__, key)
results = compute_and_cache(f,key, args, kwargs) results = compute_and_cache(f,key, args, kwargs)
return results return results
def operation_mode_forcememoize(*args, **kwargs): def operationmode_forcememoize(*args, **kwargs):
# Recompute and store in cache, regardless of whether key """ Recompute and store in cache, regardless of whether key
# is in cache. is in cache.
"""
key = make_cache_key(f, args, kwargs) key = make_cache_key(f, args, kwargs)
# print "Forcing memoize %s %s " % (f.__name__, key)
results = compute_and_cache(f, key, args, kwargs) results = compute_and_cache(f, key, args, kwargs)
return results return results
def operation_mode_fromcache(*args, **kwargs): def operationmode_fromcache(*args, **kwargs):
# Retrieve from cache if possible otherwise throw an exception """ Retrieve from cache if possible otherwise throw an exception
# print "Forcing retrieve %s %s " % (f.__name__, key) """
key = make_cache_key(f, args, kwargs) key = make_cache_key(f, args, kwargs)
results = get_from_cache_if_possible(f, key) results = get_from_cache_if_possible(f, key)
if not results: if not results:
...@@ -262,9 +259,9 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = DEFAULT_IGNORES) ...@@ -262,9 +259,9 @@ def memoize_query(cache_time = 60*4, timeout = 60*15, ignores = DEFAULT_IGNORES)
return cache.delete(key) return cache.delete(key)
decfun = decorator(operation_mode_default,f) decfun = decorator(operationmode_default,f)
decfun.force_memoize = operation_mode_forcememoize # activated by use_forcememoize decfun.force_memoize = operationmode_forcememoize # activated by use_forcememoize
decfun.from_cache = operation_mode_fromcache # activated by use_fromcache decfun.from_cache = operationmode_fromcache # activated by use_fromcache
decfun.clear_cache = operation_mode_clearcache decfun.clear_cache = operation_mode_clearcache
return decfun return decfun
return factory return factory
...@@ -284,27 +281,29 @@ def cron(run_every, force_memoize=False, params={}): ...@@ -284,27 +281,29 @@ def cron(run_every, force_memoize=False, params={}):
def factory(f): def factory(f):
@periodic_task(run_every=run_every, name=f.__name__) @periodic_task(run_every=run_every, name=f.__name__)
def run(func=None, *args, **kw): def run(func=None, *args, **kw):
# This function can be called from two distinct places. It can be """ Executes the function decorated by @cron
# called by the task scheduler (due to @periodic_task),
# in which case func will be None. This function can be called from two distinct places. It can be
# It can also be called as a result of calling the function we called by the task scheduler (due to @periodic_task),
# are currently decorating with @cron. In this case func will be in which case func will be None.
# the same as f.
It can also be called as a result of calling the function we
are currently decorating with @cron. In this case func will be
the same as f.
"""
# Was it called from the task scheduler? # Was it called from the task scheduler?
called_as_periodic = True if func is None else False called_as_periodic = True if func is None else False
if called_as_periodic: if called_as_periodic:
#print "called as periodic"
if force_memoize: if force_memoize:
func = use_forcememoize(f) func = use_forcememoize(f)
else: else:
func = f func = f
else: else:
#print "called from code"
func = f func = f
result = optional_parameter_call(func, default_optional_kwargs, params) result = optional_parameter_call(func, params)
return result return result
return decorator(run, f) return decorator(run, f)
......
...@@ -6,7 +6,7 @@ log=logging.getLogger(__name__) ...@@ -6,7 +6,7 @@ log=logging.getLogger(__name__)
event_handlers = [] event_handlers = []
request_handlers = {'view':{}, 'query':{}} request_handlers = {'view':{}, 'query':{}}
from edinsights.core.views import default_optional_kwargs from edinsights.core.util import default_optional_kwargs
funcskips = default_optional_kwargs.keys()+['params'] # params are additional GET/POST parameters funcskips = default_optional_kwargs.keys()+['params'] # params are additional GET/POST parameters
def register_handler(cls, category, name, description, f, args): def register_handler(cls, category, name, description, f, args):
...@@ -124,6 +124,6 @@ def handle_request(cls, name, **kwargs): ...@@ -124,6 +124,6 @@ def handle_request(cls, name, **kwargs):
else: else:
arglist = inspect.getargspec(handler).args arglist = inspect.getargspec(handler).args
from util import optional_parameter_call, default_optional_kwargs from util import optional_parameter_call
return optional_parameter_call(handler, default_optional_kwargs, kwargs, arglist) return optional_parameter_call(handler, kwargs, arglist)
...@@ -104,7 +104,14 @@ def get_query(f): ...@@ -104,7 +104,14 @@ def get_query(f):
return get_embed('query', config = embed_config) return get_embed('query', config = embed_config)
def optional_parameter_call(function, optional_kwargs, passed_kwargs, arglist = None): default_optional_kwargs = {'fs' : get_filesystem,
'mongodb' : get_mongo,
'cache' : get_cache,
# 'analytics' : get_djobject,
'view' : get_view,
'query' : get_query}
def optional_parameter_call(function, passed_kwargs, arglist = None):
''' Calls a function with parameters: ''' Calls a function with parameters:
passed_kwargs are input parameters the function must take. passed_kwargs are input parameters the function must take.
Format: Dictionary mapping keywords to arguments. Format: Dictionary mapping keywords to arguments.
...@@ -133,10 +140,12 @@ def optional_parameter_call(function, optional_kwargs, passed_kwargs, arglist = ...@@ -133,10 +140,12 @@ def optional_parameter_call(function, optional_kwargs, passed_kwargs, arglist =
for arg in arglist: for arg in arglist:
# This order is important for security. We don't want users # This order is important for security. We don't want users
# being able to pass in 'fs' or 'db' and having that take # being able to pass in 'fs' or 'db' and having that take
# precedence. # precedence.
if arg in optional_kwargs:
args[arg] = optional_kwargs[arg](function) global default_optional_kwargs
#ignore default arguments in memoize if arg in default_optional_kwargs:
args[arg] = default_optional_kwargs[arg](function)
#ignore default arguments in memoize when building cache key
args[arg].memoize_ignore = True args[arg].memoize_ignore = True
elif arg in passed_kwargs: elif arg in passed_kwargs:
args[arg] = passed_kwargs[arg] args[arg] = passed_kwargs[arg]
...@@ -144,10 +153,3 @@ def optional_parameter_call(function, optional_kwargs, passed_kwargs, arglist = ...@@ -144,10 +153,3 @@ def optional_parameter_call(function, optional_kwargs, passed_kwargs, arglist =
raise TypeError("Missing argument needed for handler ", arg) raise TypeError("Missing argument needed for handler ", arg)
return function(**args) return function(**args)
default_optional_kwargs = {'fs' : get_filesystem,
'mongodb' : get_mongo,
'cache' : get_cache,
# 'analytics' : get_djobject,
'view' : get_view,
'query' : get_query}
...@@ -13,7 +13,6 @@ from django.conf import settings ...@@ -13,7 +13,6 @@ from django.conf import settings
from djeventstream.signals import event_received from djeventstream.signals import event_received
from registry import event_handlers, request_handlers from registry import event_handlers, request_handlers
from util import default_optional_kwargs
import auth import auth
import util import util
...@@ -124,12 +123,12 @@ def handle_event(sender, **kwargs): ...@@ -124,12 +123,12 @@ def handle_event(sender, **kwargs):
if not batch: ## Message was a list of events, but handler cannot batch events if not batch: ## Message was a list of events, but handler cannot batch events
for event in msg: for event in msg:
try: try:
optional_parameter_call(event_func, default_optional_kwargs, {'events':[event]}) optional_parameter_call(event_func, {'events':[event]})
except: except:
handle_event_exception(e['function']) handle_event_exception(e['function'])
else: ## Message was a list of events, and handler can batch events else: ## Message was a list of events, and handler can batch events
try: try:
optional_parameter_call(event_func, default_optional_kwargs, {'events':msg}) optional_parameter_call(event_func, {'events':msg})
except: except:
handle_event_exception(e['function']) handle_event_exception(e['function'])
......
...@@ -14,13 +14,11 @@ def timestamp_to_tempfile(filename): ...@@ -14,13 +14,11 @@ def timestamp_to_tempfile(filename):
# methods(the support of @periodic_task for these is experimental) # methods(the support of @periodic_task for these is experimental)
# The @cron decorator should precede all other decorators # The @cron decorator should precede all other decorators
@cron(run_every=timedelta(seconds=1)) @cron(run_every=timedelta(seconds=1))
def test_cron_task(): def test_cron_task():
""" Simple task that gets executed by the scheduler (celery beat). """ Simple task that gets executed by the scheduler (celery beat).
tested by: tests.SimpleTest.test_cron tested by: tests.SimpleTest.test_cron
""" """
timestamp_to_tempfile('test_cron_task_counter') timestamp_to_tempfile('test_cron_task_counter')
...@@ -35,7 +33,6 @@ def test_cron_memoize_task(fs): ...@@ -35,7 +33,6 @@ def test_cron_memoize_task(fs):
tested by: tests.SimpleTest.test_cron_and_memoize tested by: tests.SimpleTest.test_cron_and_memoize
""" """
timestamp_to_tempfile('test_cron_memoize_task') timestamp_to_tempfile('test_cron_memoize_task')
return 42 return 42
......
...@@ -73,9 +73,7 @@ class SimpleTest(TestCase): ...@@ -73,9 +73,7 @@ class SimpleTest(TestCase):
truncate_tempfile('test_cron_memoize_task') truncate_tempfile('test_cron_memoize_task')
# clear the cache from any previous executions of this test # clear the cache from any previous executions of this test
# cache.delete('test_cron_memoize_unique_cache_key')
from tasks import test_cron_memoize_task from tasks import test_cron_memoize_task
use_clearcache(test_cron_memoize_task)() use_clearcache(test_cron_memoize_task)()
run_celery_beat(seconds=3,verbose=False) run_celery_beat(seconds=3,verbose=False)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment