Commit 5fa11047 by Usman Khalid

Wrapped Split MongoConnection in MongoProxy.

PLAT-71
parent 4a969f9f
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
Segregation of pymongo functions from the data modeling mechanisms for split modulestore. Segregation of pymongo functions from the data modeling mechanisms for split modulestore.
""" """
import re import re
from mongodb_proxy import autoretry_read, MongoProxy
import pymongo import pymongo
import time import time
...@@ -68,50 +69,28 @@ def structure_to_mongo(structure): ...@@ -68,50 +69,28 @@ def structure_to_mongo(structure):
return new_structure return new_structure
def autoretry_read(wait=0.1, retries=5):
"""
Automatically retry a read-only method in the case of a pymongo
AutoReconnect exception.
See http://emptysqua.re/blog/save-the-monkey-reliably-writing-to-mongodb/
for a discussion of this technique.
"""
def decorate(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
for attempt in xrange(retries):
try:
return fn(*args, **kwargs)
break
except AutoReconnect:
# Reraise if we failed on our last attempt
if attempt == retries - 1:
raise
if wait:
time.sleep(wait)
return wrapper
return decorate
class MongoConnection(object): class MongoConnection(object):
""" """
Segregation of pymongo functions from the data modeling mechanisms for split modulestore. Segregation of pymongo functions from the data modeling mechanisms for split modulestore.
""" """
def __init__( def __init__(
self, db, collection, host, port=27017, tz_aware=True, user=None, password=None, asset_collection=None, **kwargs self, db, collection, host, port=27017, tz_aware=True, user=None, password=None,
asset_collection=None, retry_wait_time=0.1, **kwargs
): ):
""" """
Create & open the connection, authenticate, and provide pointers to the collections Create & open the connection, authenticate, and provide pointers to the collections
""" """
self.database = pymongo.database.Database( self.database = MongoProxy(
pymongo.MongoClient( pymongo.database.Database(
host=host, pymongo.MongoClient(
port=port, host=host,
tz_aware=tz_aware, port=port,
**kwargs tz_aware=tz_aware,
**kwargs
),
db
), ),
db wait_time=retry_wait_time
) )
# Remove when adding official Split support for asset metadata storage. # Remove when adding official Split support for asset metadata storage.
...@@ -142,7 +121,6 @@ class MongoConnection(object): ...@@ -142,7 +121,6 @@ class MongoConnection(object):
else: else:
raise HeartbeatFailure("Can't connect to {}".format(self.database.name)) raise HeartbeatFailure("Can't connect to {}".format(self.database.name))
@autoretry_read()
def get_structure(self, key): def get_structure(self, key):
""" """
Get the structure from the persistence mechanism whose id is the given key Get the structure from the persistence mechanism whose id is the given key
...@@ -195,7 +173,6 @@ class MongoConnection(object): ...@@ -195,7 +173,6 @@ class MongoConnection(object):
""" """
self.structures.insert(structure_to_mongo(structure)) self.structures.insert(structure_to_mongo(structure))
@autoretry_read()
def get_course_index(self, key, ignore_case=False): def get_course_index(self, key, ignore_case=False):
""" """
Get the course_index from the persistence mechanism whose id is the given key Get the course_index from the persistence mechanism whose id is the given key
...@@ -212,7 +189,6 @@ class MongoConnection(object): ...@@ -212,7 +189,6 @@ class MongoConnection(object):
} }
return self.course_index.find_one(query) return self.course_index.find_one(query)
@autoretry_read()
def find_matching_course_indexes(self, branch=None, search_targets=None): def find_matching_course_indexes(self, branch=None, search_targets=None):
""" """
Find the course_index matching particular conditions. Find the course_index matching particular conditions.
...@@ -271,14 +247,12 @@ class MongoConnection(object): ...@@ -271,14 +247,12 @@ class MongoConnection(object):
'run': course_index['run'], 'run': course_index['run'],
}) })
@autoretry_read()
def get_definition(self, key): def get_definition(self, key):
""" """
Get the definition from the persistence mechanism whose id is the given key Get the definition from the persistence mechanism whose id is the given key
""" """
return self.definitions.find_one({'_id': key}) return self.definitions.find_one({'_id': key})
@autoretry_read()
def get_definitions(self, definitions): def get_definitions(self, definitions):
""" """
Retrieve all definitions listed in `definitions`. Retrieve all definitions listed in `definitions`.
......
...@@ -56,6 +56,7 @@ import datetime ...@@ -56,6 +56,7 @@ import datetime
import logging import logging
from contracts import contract, new_contract from contracts import contract, new_contract
from importlib import import_module from importlib import import_module
from mongodb_proxy import autoretry_read
from path import path from path import path
from pytz import UTC from pytz import UTC
from bson.objectid import ObjectId from bson.objectid import ObjectId
...@@ -72,7 +73,6 @@ from xmodule.modulestore.exceptions import InsufficientSpecificationError, Versi ...@@ -72,7 +73,6 @@ from xmodule.modulestore.exceptions import InsufficientSpecificationError, Versi
from xmodule.modulestore import ( from xmodule.modulestore import (
inheritance, ModuleStoreWriteBase, ModuleStoreEnum, BulkOpsRecord, BulkOperationsMixin inheritance, ModuleStoreWriteBase, ModuleStoreEnum, BulkOpsRecord, BulkOperationsMixin
) )
from xmodule.modulestore.mongodb_proxy import autoretry_read
from ..exceptions import ItemNotFoundError from ..exceptions import ItemNotFoundError
from .caching_descriptor_system import CachingDescriptorSystem from .caching_descriptor_system import CachingDescriptorSystem
......
...@@ -70,9 +70,9 @@ class SplitWMongoCourseBoostrapper(unittest.TestCase): ...@@ -70,9 +70,9 @@ class SplitWMongoCourseBoostrapper(unittest.TestCase):
Remove the test collections, close the db connection Remove the test collections, close the db connection
""" """
split_db = self.split_mongo.db split_db = self.split_mongo.db
split_db.drop_collection(split_db.course_index) split_db.drop_collection(split_db.course_index.proxied_object)
split_db.drop_collection(split_db.structures) split_db.drop_collection(split_db.structures.proxied_object)
split_db.drop_collection(split_db.definitions) split_db.drop_collection(split_db.definitions.proxied_object)
def tear_down_mongo(self): def tear_down_mongo(self):
""" """
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment