Commit 74e70119 by John Eskew

Use a common MongoDB connection function.

Add some missing @autoretry_read() decorators.
Change to PyMongo 3.x-compatible syntax.
parent 86227e77
"""
MongoDB/GridFS-level code for the contentstore.
"""
import os
import json
import pymongo import pymongo
import gridfs import gridfs
from gridfs.errors import NoFile from gridfs.errors import NoFile
from xmodule.contentstore.content import XASSET_LOCATION_TAG
import logging
from .content import StaticContent, ContentStore, StaticContentStream
from xmodule.exceptions import NotFoundError
from fs.osfs import OSFS from fs.osfs import OSFS
import os
import json
from bson.son import SON from bson.son import SON
from mongodb_proxy import autoretry_read
from opaque_keys.edx.keys import AssetKey from opaque_keys.edx.keys import AssetKey
from xmodule.contentstore.content import XASSET_LOCATION_TAG
from xmodule.exceptions import NotFoundError
from xmodule.modulestore.django import ASSET_IGNORE_REGEX from xmodule.modulestore.django import ASSET_IGNORE_REGEX
from xmodule.util.misc import escape_invalid_characters from xmodule.util.misc import escape_invalid_characters
from xmodule.mongo_connection import connect_to_mongodb
from .content import StaticContent, ContentStore, StaticContentStream
class MongoContentStore(ContentStore): class MongoContentStore(ContentStore):
"""
# pylint: disable=unused-argument MongoDB-backed ContentStore.
def __init__(self, host, db, port=27017, user=None, password=None, bucket='fs', collection=None, **kwargs): """
# pylint: disable=unused-argument, bad-continuation
def __init__(
self, host, db,
port=27017, tz_aware=True, user=None, password=None, bucket='fs', collection=None, **kwargs
):
""" """
Establish the connection with the mongo backend and connect to the collections Establish the connection with the mongo backend and connect to the collections
:param collection: ignores but provided for consistency w/ other doc_store_config patterns :param collection: ignores but provided for consistency w/ other doc_store_config patterns
""" """
logging.debug('Using MongoDB for static content serving at host={0} port={1} db={2}'.format(host, port, db)) # GridFS will throw an exception if the Database is wrapped in a MongoProxy. So don't wrap it.
# The appropriate methods below are marked as autoretry_read - those methods will handle
# Remove the replicaSet parameter. # the AutoReconnect errors.
kwargs.pop('replicaSet', None) proxy = False
mongo_db = connect_to_mongodb(
_db = pymongo.database.Database( db, host,
pymongo.MongoClient( port=port, tz_aware=tz_aware, user=user, password=password, proxy=proxy, **kwargs
host=host,
port=port,
document_class=dict,
**kwargs
),
db
) )
if user is not None and password is not None: self.fs = gridfs.GridFS(mongo_db, bucket) # pylint: disable=invalid-name
_db.authenticate(user, password)
self.fs = gridfs.GridFS(_db, bucket)
self.fs_files = _db[bucket + ".files"] # the underlying collection GridFS uses self.fs_files = mongo_db[bucket + ".files"] # the underlying collection GridFS uses
def close_connections(self): def close_connections(self):
""" """
...@@ -86,11 +84,15 @@ class MongoContentStore(ContentStore): ...@@ -86,11 +84,15 @@ class MongoContentStore(ContentStore):
return content return content
def delete(self, location_or_id): def delete(self, location_or_id):
"""
Delete an asset.
"""
if isinstance(location_or_id, AssetKey): if isinstance(location_or_id, AssetKey):
location_or_id, _ = self.asset_db_key(location_or_id) location_or_id, _ = self.asset_db_key(location_or_id)
# Deletes of non-existent files are considered successful # Deletes of non-existent files are considered successful
self.fs.delete(location_or_id) self.fs.delete(location_or_id)
@autoretry_read()
def find(self, location, throw_on_not_found=True, as_stream=False): def find(self, location, throw_on_not_found=True, as_stream=False):
content_id, __ = self.asset_db_key(location) content_id, __ = self.asset_db_key(location)
...@@ -206,6 +208,7 @@ class MongoContentStore(ContentStore): ...@@ -206,6 +208,7 @@ class MongoContentStore(ContentStore):
self.fs_files.remove(query) self.fs_files.remove(query)
return assets_to_delete return assets_to_delete
@autoretry_read()
def _get_all_content_for_course(self, def _get_all_content_for_course(self,
course_key, course_key,
get_thumbnails=False, get_thumbnails=False,
...@@ -288,6 +291,7 @@ class MongoContentStore(ContentStore): ...@@ -288,6 +291,7 @@ class MongoContentStore(ContentStore):
if not result.get('updatedExisting', True): if not result.get('updatedExisting', True):
raise NotFoundError(asset_db_key) raise NotFoundError(asset_db_key)
@autoretry_read()
def get_attrs(self, location): def get_attrs(self, location):
""" """
Gets all of the attributes associated with the given asset. Note, returns even built in attrs Gets all of the attributes associated with the given asset. Note, returns even built in attrs
......
...@@ -22,7 +22,7 @@ from uuid import uuid4 ...@@ -22,7 +22,7 @@ from uuid import uuid4
from bson.son import SON from bson.son import SON
from datetime import datetime from datetime import datetime
from fs.osfs import OSFS from fs.osfs import OSFS
from mongodb_proxy import MongoProxy, autoretry_read from mongodb_proxy import autoretry_read
from path import Path as path from path import Path as path
from pytz import UTC from pytz import UTC
from contracts import contract, new_contract from contracts import contract, new_contract
...@@ -43,6 +43,7 @@ from xmodule.error_module import ErrorDescriptor ...@@ -43,6 +43,7 @@ from xmodule.error_module import ErrorDescriptor
from xmodule.errortracker import null_error_tracker, exc_info_to_str from xmodule.errortracker import null_error_tracker, exc_info_to_str
from xmodule.exceptions import HeartbeatFailure from xmodule.exceptions import HeartbeatFailure
from xmodule.mako_module import MakoDescriptorSystem from xmodule.mako_module import MakoDescriptorSystem
from xmodule.mongo_connection import connect_to_mongodb
from xmodule.modulestore import ModuleStoreWriteBase, ModuleStoreEnum, BulkOperationsMixin, BulkOpsRecord from xmodule.modulestore import ModuleStoreWriteBase, ModuleStoreEnum, BulkOperationsMixin, BulkOpsRecord
from xmodule.modulestore.draft_and_published import ModuleStoreDraftAndPublished, DIRECT_ONLY_CATEGORIES from xmodule.modulestore.draft_and_published import ModuleStoreDraftAndPublished, DIRECT_ONLY_CATEGORIES
from xmodule.modulestore.edit_info import EditInfoRuntimeMixin from xmodule.modulestore.edit_info import EditInfoRuntimeMixin
...@@ -558,22 +559,16 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo ...@@ -558,22 +559,16 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
""" """
Create & open the connection, authenticate, and provide pointers to the collection Create & open the connection, authenticate, and provide pointers to the collection
""" """
# Remove the replicaSet parameter. # Set a write concern of 1, which makes writes complete successfully to the primary
kwargs.pop('replicaSet', None) # only before returning. Also makes pymongo report write errors.
kwargs['w'] = 1
self.database = MongoProxy( self.database = connect_to_mongodb(
pymongo.database.Database( db, host,
pymongo.MongoClient( port=port, tz_aware=tz_aware, user=user, password=password,
host=host, retry_wait_time=retry_wait_time, **kwargs
port=port,
tz_aware=tz_aware,
document_class=dict,
**kwargs
),
db
),
wait_time=retry_wait_time
) )
self.collection = self.database[collection] self.collection = self.database[collection]
# Collection which stores asset metadata. # Collection which stores asset metadata.
...@@ -581,14 +576,8 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo ...@@ -581,14 +576,8 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
asset_collection = self.DEFAULT_ASSET_COLLECTION_NAME asset_collection = self.DEFAULT_ASSET_COLLECTION_NAME
self.asset_collection = self.database[asset_collection] self.asset_collection = self.database[asset_collection]
if user is not None and password is not None:
self.database.authenticate(user, password)
do_connection(**doc_store_config) do_connection(**doc_store_config)
# Force mongo to report errors, at the expense of performance
self.collection.write_concern = {'w': 1}
if default_class is not None: if default_class is not None:
module_path, _, class_name = default_class.rpartition('.') module_path, _, class_name = default_class.rpartition('.')
class_ = getattr(import_module(module_path), class_name) class_ = getattr(import_module(module_path), class_name)
...@@ -1012,6 +1001,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo ...@@ -1012,6 +1001,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
) )
return [course for course in base_list if not isinstance(course, ErrorDescriptor)] return [course for course in base_list if not isinstance(course, ErrorDescriptor)]
@autoretry_read()
def _find_one(self, location): def _find_one(self, location):
'''Look for a given location in the collection. If the item is not present, raise '''Look for a given location in the collection. If the item is not present, raise
ItemNotFoundError. ItemNotFoundError.
...@@ -1052,6 +1042,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo ...@@ -1052,6 +1042,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
except ItemNotFoundError: except ItemNotFoundError:
return None return None
@autoretry_read()
def has_course(self, course_key, ignore_case=False, **kwargs): def has_course(self, course_key, ignore_case=False, **kwargs):
""" """
Returns the course_id of the course if it was found, else None Returns the course_id of the course if it was found, else None
...@@ -1073,7 +1064,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo ...@@ -1073,7 +1064,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
course_query[key] = re.compile(r"(?i)^{}$".format(course_query[key])) course_query[key] = re.compile(r"(?i)^{}$".format(course_query[key]))
else: else:
course_query = {'_id': location.to_deprecated_son()} course_query = {'_id': location.to_deprecated_son()}
course = self.collection.find_one(course_query, fields={'_id': True}) course = self.collection.find_one(course_query, projection={'_id': True})
if course: if course:
return SlashSeparatedCourseKey(course['_id']['org'], course['_id']['course'], course['_id']['name']) return SlashSeparatedCourseKey(course['_id']['org'], course['_id']['course'], course['_id']['name'])
else: else:
...@@ -1234,7 +1225,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo ...@@ -1234,7 +1225,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
('_id.course', re.compile(u'^{}$'.format(course_id.course), re.IGNORECASE)), ('_id.course', re.compile(u'^{}$'.format(course_id.course), re.IGNORECASE)),
('_id.category', 'course'), ('_id.category', 'course'),
]) ])
courses = self.collection.find(course_search_location, fields=('_id')) courses = self.collection.find(course_search_location, projection={'_id': True})
if courses.count() > 0: if courses.count() > 0:
raise DuplicateCourseError(course_id, courses[0]['_id']) raise DuplicateCourseError(course_id, courses[0]['_id'])
......
...@@ -23,10 +23,11 @@ except ImportError: ...@@ -23,10 +23,11 @@ except ImportError:
import dogstats_wrapper as dog_stats_api import dogstats_wrapper as dog_stats_api
from contracts import check, new_contract from contracts import check, new_contract
from mongodb_proxy import autoretry_read, MongoProxy from mongodb_proxy import autoretry_read
from xmodule.exceptions import HeartbeatFailure from xmodule.exceptions import HeartbeatFailure
from xmodule.modulestore import BlockData from xmodule.modulestore import BlockData
from xmodule.modulestore.split_mongo import BlockKey from xmodule.modulestore.split_mongo import BlockKey
from xmodule.mongo_connection import connect_to_mongodb
new_contract('BlockData', BlockData) new_contract('BlockData', BlockData)
...@@ -287,37 +288,20 @@ class MongoConnection(object): ...@@ -287,37 +288,20 @@ class MongoConnection(object):
""" """
Create & open the connection, authenticate, and provide pointers to the collections Create & open the connection, authenticate, and provide pointers to the collections
""" """
if kwargs.get('replicaSet') is None: # Set a write concern of 1, which makes writes complete successfully to the primary
kwargs.pop('replicaSet', None) # only before returning. Also makes pymongo report write errors.
mongo_class = pymongo.MongoClient kwargs['w'] = 1
else:
mongo_class = pymongo.MongoReplicaSetClient
_client = mongo_class(
host=host,
port=port,
tz_aware=tz_aware,
**kwargs
)
self.database = MongoProxy(
pymongo.database.Database(_client, db),
wait_time=retry_wait_time
)
if user is not None and password is not None: self.database = connect_to_mongodb(
self.database.authenticate(user, password) db, host,
port=port, tz_aware=tz_aware, user=user, password=password,
retry_wait_time=retry_wait_time, **kwargs
)
self.course_index = self.database[collection + '.active_versions'] self.course_index = self.database[collection + '.active_versions']
self.structures = self.database[collection + '.structures'] self.structures = self.database[collection + '.structures']
self.definitions = self.database[collection + '.definitions'] self.definitions = self.database[collection + '.definitions']
# every app has write access to the db (v having a flag to indicate r/o v write)
# Force mongo to report errors, at the expense of performance
# pymongo docs suck but explanation:
# http://api.mongodb.org/java/2.10.1/com/mongodb/WriteConcern.html
self.course_index.write_concern = {'w': 1}
self.structures.write_concern = {'w': 1}
self.definitions.write_concern = {'w': 1}
def heartbeat(self): def heartbeat(self):
""" """
Check that the db is reachable. Check that the db is reachable.
......
"""
Common MongoDB connection functions.
"""
import pymongo
from mongodb_proxy import MongoProxy
# pylint: disable=bad-continuation
def connect_to_mongodb(
    db, host,
    port=27017, tz_aware=True, user=None, password=None,
    retry_wait_time=0.1, proxy=True, **kwargs
):
    """
    Returns a MongoDB Database connection, optionally wrapped in a proxy. The proxy
    handles AutoReconnect errors by retrying read operations, since these exceptions
    typically indicate a temporary step-down condition for MongoDB.

    Arguments:
        db: name of the database to open on the client.
        host: forwarded to the client class as ``host``.
        port: forwarded to the client class as ``port``.
        tz_aware: forwarded to the client class as ``tz_aware``.
        user: username to authenticate with; used only together with ``password``.
        password: password to authenticate with; used only together with ``user``.
        retry_wait_time: forwarded to MongoProxy as ``wait_time`` when ``proxy`` is true.
        proxy: when true, wrap the Database in a MongoProxy. Pass False when the
            caller needs the bare Database object.
        **kwargs: any additional options, forwarded to the client class. A truthy
            ``replicaSet`` selects MongoReplicaSetClient; a falsy one is discarded.

    Returns:
        The pymongo Database, or a MongoProxy wrapping it when ``proxy`` is true.
    """
    # The MongoReplicaSetClient class is deprecated in PyMongo 3.x, in favor of using
    # the MongoClient class for all connections. Update/simplify this code when using
    # PyMongo 3.x.
    if kwargs.get('replicaSet'):
        # Enable reading from secondary nodes in the MongoDB replicaset by using the
        # MongoReplicaSetClient class.
        # The 'replicaSet' parameter in kwargs is required for secondary reads.
        # The read_preference should be set to a proper value, like SECONDARY_PREFERRED.
        mongo_client_class = pymongo.MongoReplicaSetClient
    else:
        # No usable 'replicaSet' in kwargs - so no secondary reads.
        # The key may still be present with an empty value (None or ''), e.g. when it
        # comes straight from a settings dict. Drop it rather than forwarding it to
        # MongoClient: it carries no information, and a non-string value is presumably
        # rejected by the client's option validation under PyMongo 2.x (TODO confirm).
        kwargs.pop('replicaSet', None)
        mongo_client_class = pymongo.MongoClient

    mongo_conn = pymongo.database.Database(
        mongo_client_class(
            host=host,
            port=port,
            tz_aware=tz_aware,
            document_class=dict,
            **kwargs
        ),
        db
    )

    if proxy:
        mongo_conn = MongoProxy(
            mongo_conn,
            wait_time=retry_wait_time
        )

    # If credentials were provided, authenticate the user. Both values are required;
    # supplying only one of them skips authentication entirely.
    if user is not None and password is not None:
        mongo_conn.authenticate(user, password)

    return mongo_conn
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment