Commit 89485dfa by Usman Khalid

Merge pull request #1 from edx/usman/convert-to-decorator

Replaces Executable class with autoretry_read decorator.
parents 63205d28 25706879
...@@ -14,127 +14,107 @@ Copyright 2013 Gustav Arngarden ...@@ -14,127 +14,107 @@ Copyright 2013 Gustav Arngarden
limitations under the License. limitations under the License.
""" """
import time from functools import wraps
import logging
import pymongo import pymongo
import time
log = logging.getLogger(__name__)
def get_methods(*objs): MONGO_METHODS_NEEDING_RETRY = {
return set( pymongo.collection.Collection: [
attr 'aggregate', 'ensure_index', 'find', 'group', 'inline_map_reduce', 'map_reduce', 'parallel_scan'
for obj in objs ],
for attr in dir(obj) }
if not attr.startswith('_')
and hasattr(getattr(obj, attr), '__call__')
) def autoretry_read(wait=0.1, tries=5):
try:
# will fail to import from older versions of pymongo
from pymongo import MongoClient, MongoReplicaSetClient
except ImportError:
MongoClient, MongoReplicaSetClient = None, None
EXECUTABLE_MONGO_METHODS = get_methods(pymongo.collection.Collection,
pymongo.database.Database,
pymongo.Connection,
pymongo.ReplicaSetConnection,
MongoClient, MongoReplicaSetClient,
pymongo)
class Executable:
""" Wrap a MongoDB-method and handle AutoReconnect-exceptions
using the safe_mongocall decorator.
""" """
Automatically retry a read-only method in the case of a pymongo
AutoReconnect exception.
def __init__(self, method, logger, wait_time=None): This decorator can/should be used around methods which iterate mongo cursors.
self.method = method
self.logger = logger
self.wait_time = wait_time or 60
def __call__(self, *args, **kwargs): See http://emptysqua.re/blog/save-the-monkey-reliably-writing-to-mongodb/
""" Automatic handling of AutoReconnect-exceptions. for a discussion of this technique.
""" """
start = time.time() def decorate(func): # pylint: disable=missing-docstring
i = 0 @wraps(func)
while True: def wrapper(*args, **kwargs): # pylint: disable=missing-docstring
for attempt in xrange(tries):
try: try:
return self.method(*args, **kwargs) return func(*args, **kwargs)
except pymongo.errors.AutoReconnect: except pymongo.errors.AutoReconnect:
end = time.time() log.exception('Attempt {0}'.format(attempt))
delta = end - start # Reraise if we failed on our last attempt
if delta >= self.wait_time: if attempt == tries - 1:
break raise
self.logger.warning('AutoReconnecting, try %d (%.1f seconds)'
% (i, delta))
time.sleep(pow(2, i))
i += 1
# Try one more time, but this time, if it fails, let the
# exception bubble up to the caller.
return self.method(*args, **kwargs)
def __dir__(self): if wait:
return dir(self.method) time.sleep(wait)
return wrapper
def __str__(self): return decorate
return self.method.__str__()
def __repr__(self):
return self.method.__repr__()
class MongoProxy: class MongoProxy:
""" Proxy for MongoDB connection. """
Proxy for MongoDB connection.
Methods that are executable, i.e find, insert etc, get wrapped in an Methods that are executable, i.e find, insert etc, get wrapped in an
Executable-instance that handles AutoReconnect-exceptions transparently. Executable-instance that handles AutoReconnect-exceptions transparently.
""" """
def __init__(self, conn, logger=None, wait_time=None): def __init__(self, proxied_object, wait_time=None, methods_needing_retry=None):
""" conn is an ordinary MongoDB-connection.
""" """
if logger is None: proxied_object is an ordinary MongoDB-connection.
import logging """
logger = logging.getLogger(__name__) self.proxied_object = proxied_object
self.conn = conn
self.logger = logger
self.wait_time = wait_time self.wait_time = wait_time
self.methods_needing_retry = methods_needing_retry or MONGO_METHODS_NEEDING_RETRY
def __getitem__(self, key): def __getitem__(self, key):
""" Create and return proxy around the method in the connection
named "key".
""" """
item = self.conn[key] Create and return proxy around attribute "key" if it is a method.
Otherwise just return the attribute.
"""
item = self.proxied_object[key]
if hasattr(item, '__call__'): if hasattr(item, '__call__'):
return MongoProxy(item, self.logger, self.wait_time) return MongoProxy(item, self.wait_time)
return item return item
def __getattr__(self, key): def __setitem__(self, key, value):
""" If key is the name of an executable method in the MongoDB connection, self.proxied_object[key] = value
for instance find or insert, wrap this method in Executable-class that
handles AutoReconnect-Exception.
""" def __delitem__(self, key):
del self.proxied_object[key]
def __len__(self):
return len(self.proxied_object)
attr = getattr(self.conn, key) def __getattr__(self, key):
"""
If key is the name of an executable method in the MongoDB connection,
for instance find, wrap this method in the autoretry_read decorator
that handles AutoReconnect exceptions. Otherwise wrap it in a MongoProxy object.
"""
attr = getattr(self.proxied_object, key)
if hasattr(attr, '__call__'): if hasattr(attr, '__call__'):
if key in EXECUTABLE_MONGO_METHODS: attributes_for_class = self.methods_needing_retry.get(self.proxied_object.__class__, [])
return Executable(attr, self.logger, self.wait_time) if key in attributes_for_class:
return autoretry_read(self.wait_time)(attr)
else: else:
return MongoProxy(attr, self.logger, self.wait_time) return MongoProxy(attr, self.wait_time)
return attr return attr
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
return self.conn(*args, **kwargs) return self.proxied_object(*args, **kwargs)
def __dir__(self): def __dir__(self):
return dir(self.conn) return dir(self.proxied_object)
def __str__(self): def __str__(self):
return self.conn.__str__() return self.proxied_object.__str__()
def __repr__(self): def __repr__(self):
return self.conn.__repr__() return self.proxied_object.__repr__()
def __nonzero__(self): def __nonzero__(self):
return True return True
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment