Commit cb113dea by Don Mitchell

Separate all db ops from modulestore ops

parent bc4ebfdc
......@@ -88,7 +88,7 @@ class SplitMigrator(object):
index_info = self.split_modulestore.get_course_index_info(course_version_locator)
versions = index_info['versions']
versions['draft'] = versions['published']
self.split_modulestore.update_course_index(course_version_locator, {'versions': versions}, update_versions=True)
self.split_modulestore.update_course_index(index_info)
# clean up orphans in published version: in old mongo, parents pointed to the union of their published and draft
# children which meant some pointers were to non-existent locations in 'direct'
......
......@@ -22,5 +22,4 @@ class DefinitionLazyLoader(object):
Fetch the definition. Note, the caller should replace this lazy
loader pointer with the result so as not to fetch more than once
"""
return self.modulestore.definitions.find_one(
{'_id': self.definition_locator.definition_id})
return self.modulestore.db_connection.get_definition(self.definition_locator.definition_id)
"""
Segregation of pymongo functions from the data modeling mechanisms for split modulestore.
"""
import pymongo
class MongoConnection(object):
"""
Segregation of pymongo functions from the data modeling mechanisms for split modulestore.
"""
def __init__(
self, db, collection, host, port=27017, tz_aware=True, user=None, password=None, **kwargs
):
"""
Create & open the connection, authenticate, and provide pointers to the collections
"""
self.database = pymongo.database.Database(
pymongo.MongoClient(
host=host,
port=port,
tz_aware=tz_aware,
**kwargs
),
db
)
if user is not None and password is not None:
self.database.authenticate(user, password)
self.course_index = self.database[collection + '.active_versions']
self.structures = self.database[collection + '.structures']
self.definitions = self.database[collection + '.definitions']
# every app has write access to the db (v having a flag to indicate r/o v write)
# Force mongo to report errors, at the expense of performance
# pymongo docs suck but explanation:
# http://api.mongodb.org/java/2.10.1/com/mongodb/WriteConcern.html
self.course_index.write_concern = {'w': 1}
self.structures.write_concern = {'w': 1}
self.definitions.write_concern = {'w': 1}
def get_structure(self, key):
"""
Get the structure from the persistence mechanism whose id is the given key
"""
return self.structures.find_one({'_id': key})
def find_matching_structures(self, query):
"""
Find the structure matching the query. Right now the query must be a legal mongo query
:param query: a mongo-style query of {key: [value|{$in ..}|..], ..}
"""
return self.structures.find(query)
def insert_structure(self, structure):
"""
Create the structure in the db
"""
self.structures.insert(structure)
def update_structure(self, structure):
"""
Update the db record for structure
"""
self.structures.update({'_id': structure['_id']}, structure)
def get_course_index(self, key):
"""
Get the course_index from the persistence mechanism whose id is the given key
"""
return self.course_index.find_one({'_id': key})
def find_matching_course_indexes(self, query):
"""
Find the course_index matching the query. Right now the query must be a legal mongo query
:param query: a mongo-style query of {key: [value|{$in ..}|..], ..}
"""
return self.course_index.find(query)
def insert_course_index(self, course_index):
"""
Create the course_index in the db
"""
self.course_index.insert(course_index)
def update_course_index(self, course_index):
"""
Update the db record for course_index
"""
self.course_index.update({'_id': course_index['_id']}, course_index)
def delete_course_index(self, key):
"""
Delete the course_index from the persistence mechanism whose id is the given key
"""
return self.course_index.remove({'_id': key})
def get_definition(self, key):
"""
Get the definition from the persistence mechanism whose id is the given key
"""
return self.definitions.find_one({'_id': key})
def find_matching_definitions(self, query):
"""
Find the definitions matching the query. Right now the query must be a legal mongo query
:param query: a mongo-style query of {key: [value|{$in ..}|..], ..}
"""
return self.definitions.find(query)
def insert_definition(self, definition):
"""
Create the definition in the db
"""
self.definitions.insert(definition)
import threading
import datetime
import logging
import pymongo
import re
from importlib import import_module
from path import path
......@@ -21,6 +20,7 @@ from .caching_descriptor_system import CachingDescriptorSystem
from xblock.fields import Scope
from xblock.runtime import Mixologist
from bson.objectid import ObjectId
from xmodule.modulestore.split_mongo.mongo_connection import MongoConnection
log = logging.getLogger(__name__)
#==============================================================================
......@@ -49,7 +49,6 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
A Mongodb backed ModuleStore supporting versions, inheritance,
and sharing.
"""
# pylint: disable=W0201
def __init__(self, doc_store_config, fs_root, render_template,
default_class=None,
error_tracker=null_error_tracker,
......@@ -62,44 +61,13 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
super(SplitMongoModuleStore, self).__init__(**kwargs)
self.loc_mapper = loc_mapper
def do_connection(
db, collection, host, port=27017, tz_aware=True, user=None, password=None, **kwargs
):
"""
Create & open the connection, authenticate, and provide pointers to the collections
"""
self.db = pymongo.database.Database(
pymongo.MongoClient(
host=host,
port=port,
tz_aware=tz_aware,
**kwargs
),
db
)
if user is not None and password is not None:
self.db.authenticate(user, password)
self.course_index = self.db[collection + '.active_versions']
self.structures = self.db[collection + '.structures']
self.definitions = self.db[collection + '.definitions']
do_connection(**doc_store_config)
self.db_connection = MongoConnection(**doc_store_config)
self.db = self.db_connection.database
# Code review question: How should I expire entries?
# _add_cache could use a lru mechanism to control the cache size?
self.thread_cache = threading.local()
# every app has write access to the db (v having a flag to indicate r/o v write)
# Force mongo to report errors, at the expense of performance
# pymongo docs suck but explanation:
# http://api.mongodb.org/java/2.10.1/com/mongodb/WriteConcern.html
self.course_index.write_concern = {'w': 1}
self.structures.write_concern = {'w': 1}
self.definitions.write_concern = {'w': 1}
if default_class is not None:
module_path, _, class_name = default_class.rpartition('.')
class_ = getattr(import_module(module_path), class_name)
......@@ -138,7 +106,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
block['definition'] = DefinitionLazyLoader(self, block['definition'])
else:
# Load all descendants by id
descendent_definitions = self.definitions.find({
descendent_definitions = self.db_connection.find_matching_definitions({
'_id': {'$in': [block['definition']
for block in new_module_data.itervalues()]}})
# turn into a map
......@@ -226,7 +194,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
if course_locator.course_id is not None and course_locator.branch is not None:
# use the course_id
index = self.course_index.find_one({'_id': course_locator.course_id})
index = self.db_connection.get_course_index(course_locator.course_id)
if index is None:
raise ItemNotFoundError(course_locator)
if course_locator.branch not in index['versions']:
......@@ -241,7 +209,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
# cast string to ObjectId if necessary
version_guid = course_locator.as_object_id(version_guid)
entry = self.structures.find_one({'_id': version_guid})
entry = self.db_connection.get_structure(version_guid)
# b/c more than one course can use same structure, the 'course_id' and 'branch' are not intrinsic to structure
# and the one assoc'd w/ it by another fetch may not be the one relevant to this fetch; so,
......@@ -269,7 +237,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
if qualifiers is None:
qualifiers = {}
qualifiers.update({"versions.{}".format(branch): {"$exists": True}})
matching = self.course_index.find(qualifiers)
matching = self.db_connection.find_matching_course_indexes(qualifiers)
# collect ids and then query for those
version_guids = []
......@@ -279,7 +247,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
version_guids.append(version_guid)
id_version_map[version_guid] = structure['_id']
course_entries = self.structures.find({'_id': {'$in': version_guids}})
course_entries = self.db_connection.find_matching_structures({'_id': {'$in': version_guids}})
# get the block for the course element (s/b the root)
result = []
......@@ -455,7 +423,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
"""
if course_locator.course_id is None:
return None
index = self.course_index.find_one({'_id': course_locator.course_id})
index = self.db_connection.get_course_index(course_locator.course_id)
return index
# TODO figure out a way to make this info accessible from the course descriptor
......@@ -487,7 +455,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
'edited_on': when the change was made
}
"""
definition = self.definitions.find_one({'_id': definition_locator.definition_id})
definition = self.db_connection.get_definition(definition_locator.definition_id)
if definition is None:
return None
return definition['edit_info']
......@@ -509,14 +477,14 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
# TODO if depth is significant, it may make sense to get all that have the same original_version
# and reconstruct the subtree from version_guid
next_entries = self.structures.find({'previous_version' : version_guid})
next_entries = self.db_connection.find_matching_structures({'previous_version' : version_guid})
# must only scan cursor's once
next_versions = [struct for struct in next_entries]
result = {version_guid: [CourseLocator(version_guid=struct['_id']) for struct in next_versions]}
depth = 1
while depth < version_history_depth and len(next_versions) > 0:
depth += 1
next_entries = self.structures.find({'previous_version':
next_entries = self.db_connection.find_matching_structures({'previous_version':
{'$in': [struct['_id'] for struct in next_versions]}})
next_versions = [struct for struct in next_entries]
for course_structure in next_versions:
......@@ -537,7 +505,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
course_struct = self._lookup_course(block_locator.version_agnostic())['structure']
usage_id = block_locator.usage_id
update_version_field = 'blocks.{}.edit_info.update_version'.format(usage_id)
all_versions_with_block = self.structures.find({'original_version': course_struct['original_version'],
all_versions_with_block = self.db_connection.find_matching_structures({'original_version': course_struct['original_version'],
update_version_field: {'$exists': True}})
# find (all) root versions and build map previous: [successors]
possible_roots = []
......@@ -596,7 +564,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
"original_version": new_id,
}
}
self.definitions.insert(document)
self.db_connection.insert_definition(document)
definition_locator = DefinitionLocator(new_id)
return definition_locator
......@@ -618,7 +586,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
# if this looks in cache rather than fresh fetches, then it will probably not detect
# actual change b/c the descriptor and cache probably point to the same objects
old_definition = self.definitions.find_one({'_id': definition_locator.definition_id})
old_definition = self.db_connection.get_definition(definition_locator.definition_id)
if old_definition is None:
raise ItemNotFoundError(definition_locator.url())
......@@ -630,7 +598,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
old_definition['edit_info']['edited_on'] = datetime.datetime.now(UTC)
# previous version id
old_definition['edit_info']['previous_version'] = definition_locator.definition_id
self.definitions.insert(old_definition)
self.db_connection.insert_definition(old_definition)
return DefinitionLocator(old_definition['_id']), True
else:
return definition_locator, False
......@@ -657,7 +625,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
:param course_blocks: the current list of blocks.
:param category:
"""
existing_uses = self.course_index.find({"_id": {"$regex": id_root}})
existing_uses = self.db_connection.find_matching_course_indexes({"_id": {"$regex": id_root}})
if existing_uses.count() > 0:
max_found = 0
matcher = re.compile(id_root + r'(\d+)')
......@@ -779,11 +747,11 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
parent['edit_info']['update_version'] = new_id
if continue_version:
# db update
self.structures.update({'_id': new_id}, new_structure)
self.db_connection.update_structure(new_structure)
# clear cache so things get refetched and inheritance recomputed
self._clear_cache(new_id)
else:
self.structures.insert(new_structure)
self.db_connection.insert_structure(new_structure)
# update the index entry if appropriate
if index_entry is not None:
......@@ -856,7 +824,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
'original_version': definition_id,
}
}
self.definitions.insert(definition_entry)
self.db_connection.insert_definition(definition_entry)
new_id = ObjectId()
draft_structure = {
......@@ -880,7 +848,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
}
}
}
self.structures.insert(draft_structure)
self.db_connection.insert_structure(draft_structure)
if versions_dict is None:
versions_dict = {master_branch: new_id}
......@@ -898,20 +866,20 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
if block_fields is not None:
root_block['fields'].update(block_fields)
if definition_fields is not None:
definition = self.definitions.find_one({'_id': root_block['definition']})
definition = self.db_connection.get_definition(root_block['definition'])
definition['fields'].update(definition_fields)
definition['edit_info']['previous_version'] = definition['_id']
definition['edit_info']['edited_by'] = user_id
definition['edit_info']['edited_on'] = datetime.datetime.now(UTC)
definition['_id'] = ObjectId()
self.definitions.insert(definition)
self.db_connection.insert_definition(definition)
root_block['definition'] = definition['_id']
root_block['edit_info']['edited_on'] = datetime.datetime.now(UTC)
root_block['edit_info']['edited_by'] = user_id
root_block['edit_info']['previous_version'] = root_block['edit_info'].get('update_version')
root_block['edit_info']['update_version'] = new_id
self.structures.insert(draft_structure)
self.db_connection.insert_structure(draft_structure)
versions_dict[master_branch] = new_id
# create the index entry
......@@ -926,7 +894,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
'edited_by': user_id,
'edited_on': datetime.datetime.now(UTC),
'versions': versions_dict}
self.course_index.insert(index_entry)
self.db_connection.insert_course_index(index_entry)
return self.get_course(CourseLocator(course_id=new_id, branch=master_branch))
def update_item(self, descriptor, user_id, force=False):
......@@ -978,7 +946,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
'previous_version': block_data['edit_info']['update_version'],
'update_version': new_id,
}
self.structures.insert(new_structure)
self.db_connection.insert_structure(new_structure)
# update the index entry if appropriate
if index_entry is not None:
self._update_head(index_entry, descriptor.location.branch, new_id)
......@@ -1016,7 +984,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
is_updated = self._persist_subdag(xblock, user_id, new_structure['blocks'], new_id)
if is_updated:
self.structures.insert(new_structure)
self.db_connection.insert_structure(new_structure)
# update the index entry if appropriate
if index_entry is not None:
......@@ -1115,31 +1083,18 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
'''Deprecated, use update_item.'''
raise NotImplementedError('use update_item')
def update_course_index(self, course_locator, new_values_dict, update_versions=False):
def update_course_index(self, updated_index_entry):
"""
Change the given course's index entry for the given fields. new_values_dict
should be a subset of the dict returned by get_course_index_info.
It cannot include '_id' (will raise IllegalArgument).
Provide update_versions=True if you intend this to replace the versions hash.
Note, this operation can be dangerous and break running courses.
Change the given course's index entry.
If the dict includes versions and not update_versions, it will raise an exception.
If the dict includes edited_on or edited_by, it will raise an exception
Note, this operation can be dangerous and break running courses.
Does not return anything useful.
"""
# TODO how should this log the change? edited_on and edited_by for this entry
# has the semantic of who created the course and when; so, changing those will lose
# that information.
if '_id' in new_values_dict:
raise ValueError("Cannot override _id")
if 'edited_on' in new_values_dict or 'edited_by' in new_values_dict:
raise ValueError("Cannot set edited_on or edited_by")
if not update_versions and 'versions' in new_values_dict:
raise ValueError("Cannot override versions without setting update_versions")
self.course_index.update({'_id': course_locator.course_id},
{'$set': new_values_dict})
self.db_connection.update_course_index(updated_index_entry)
def delete_item(self, usage_locator, user_id, delete_children=False, force=False):
"""
......@@ -1182,7 +1137,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
remove_subtree(usage_locator.usage_id)
# update index if appropriate and structures
self.structures.insert(new_structure)
self.db_connection.insert_structure(new_structure)
result = CourseLocator(version_guid=new_id)
......@@ -1204,11 +1159,11 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
:param course_id: uses course_id rather than locator to emphasize its global effect
"""
index = self.course_index.find_one({'_id': course_id})
index = self.db_connection.get_course_index(course_id)
if index is None:
raise ItemNotFoundError(course_id)
# this is the only real delete in the system. should it do something else?
self.course_index.remove(index['_id'])
self.db_connection.delete_course_index(index['_id'])
def get_errored_courses(self):
"""
......@@ -1296,7 +1251,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
block['fields']["children"] = [
usage_id for usage_id in block['fields']["children"] if usage_id in original_structure['blocks']
]
self.structures.update({'_id': original_structure['_id']}, original_structure)
self.db_connection.update_structure(original_structure)
# clear cache again b/c inheritance may be wrong over orphans
self._clear_cache(original_structure['_id'])
......@@ -1379,7 +1334,7 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
else:
return None
else:
index_entry = self.course_index.find_one({'_id': locator.course_id})
index_entry = self.db_connection.get_course_index(locator.course_id)
is_head = (
locator.version_guid is None or
index_entry['versions'][locator.branch] == locator.version_guid
......@@ -1424,9 +1379,8 @@ class SplitMongoModuleStore(ModuleStoreWriteBase):
:param course_locator:
:param new_id:
"""
self.course_index.update(
{"_id": index_entry["_id"]},
{"$set": {"versions.{}".format(branch): new_id}})
index_entry['versions'][branch] = new_id
self.db_connection.update_course_index(index_entry)
def _partition_fields_by_scope(self, category, fields):
"""
......
......@@ -59,9 +59,9 @@ class TestMigration(unittest.TestCase):
dbref = self.loc_mapper.db
dbref.drop_collection(self.loc_mapper.location_map)
split_db = self.split_mongo.db
split_db.drop_collection(split_db.course_index)
split_db.drop_collection(split_db.structures)
split_db.drop_collection(split_db.definitions)
split_db.drop_collection(self.split_mongo.db_connection.course_index)
split_db.drop_collection(self.split_mongo.db_connection.structures)
split_db.drop_collection(self.split_mongo.db_connection.definitions)
# old_mongo doesn't give a db attr, but all of the dbs are the same
dbref.drop_collection(self.old_mongo.collection)
......
......@@ -1018,41 +1018,29 @@ class TestCourseCreation(SplitModuleTest):
Test changing the org, pretty id, etc of a course. Test that it doesn't allow changing the id, etc.
"""
locator = CourseLocator(course_id="GreekHero", branch='draft')
modulestore().update_course_index(locator, {'org': 'funkyU'})
course_info = modulestore().get_course_index_info(locator)
course_info['org'] = 'funkyU'
modulestore().update_course_index(course_info)
course_info = modulestore().get_course_index_info(locator)
self.assertEqual(course_info['org'], 'funkyU')
modulestore().update_course_index(locator, {'org': 'moreFunky', 'prettyid': 'Ancient Greek Demagods'})
course_info['org'] = 'moreFunky'
course_info['prettyid'] = 'Ancient Greek Demagods'
modulestore().update_course_index(course_info)
course_info = modulestore().get_course_index_info(locator)
self.assertEqual(course_info['org'], 'moreFunky')
self.assertEqual(course_info['prettyid'], 'Ancient Greek Demagods')
self.assertRaises(ValueError, modulestore().update_course_index, locator, {'_id': 'funkygreeks'})
with self.assertRaises(ValueError):
modulestore().update_course_index(
locator,
{'edited_on': datetime.datetime.now(UTC)}
)
with self.assertRaises(ValueError):
modulestore().update_course_index(
locator,
{'edited_by': 'sneak'}
)
self.assertRaises(ValueError, modulestore().update_course_index, locator,
{'versions': {'draft': self.GUID_D1}})
# an allowed but not necessarily recommended way to revert the draft version
versions = course_info['versions']
versions['draft'] = self.GUID_D1
modulestore().update_course_index(locator, {'versions': versions}, update_versions=True)
modulestore().update_course_index(course_info)
course = modulestore().get_course(locator)
self.assertEqual(str(course.location.version_guid), self.GUID_D1)
# an allowed but not recommended way to publish a course
versions['published'] = self.GUID_D1
modulestore().update_course_index(locator, {'versions': versions}, update_versions=True)
modulestore().update_course_index(course_info)
course = modulestore().get_course(CourseLocator(course_id=locator.course_id, branch="published"))
self.assertEqual(str(course.location.version_guid), self.GUID_D1)
......@@ -1068,9 +1056,9 @@ class TestCourseCreation(SplitModuleTest):
self.assertEqual(new_course.location.usage_id, 'top')
self.assertEqual(new_course.category, 'chapter')
# look at db to verify
db_structure = modulestore().structures.find_one({
'_id': new_course.location.as_object_id(new_course.location.version_guid)
})
db_structure = modulestore().db_connection.get_structure(
new_course.location.as_object_id(new_course.location.version_guid)
)
self.assertIsNotNone(db_structure, "Didn't find course")
self.assertNotIn('course', db_structure['blocks'])
self.assertIn('top', db_structure['blocks'])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment