Commit 562f0e31 by Calen Pennington

Add bulk operations to split modulestore

parent f731d5fe
......@@ -54,20 +54,16 @@ class DuplicateItemError(Exception):
self, Exception.__str__(self, *args, **kwargs)
)
class VersionConflictError(Exception):
"""
The caller asked for either draft or published head and gave a version which conflicted with it.
"""
def __init__(self, requestedLocation, currentHeadVersionGuid):
super(VersionConflictError, self).__init__()
self.requestedLocation = requestedLocation
self.currentHeadVersionGuid = currentHeadVersionGuid
def __str__(self, *args, **kwargs):
"""
Print requested and current head info
"""
return u'Requested {} but {} is current head'.format(self.requestedLocation, self.currentHeadVersionGuid)
super(VersionConflictError, self).__init__(u'Requested {}, but current head is {}'.format(
requestedLocation,
currentHeadVersionGuid
))
class DuplicateCourseError(Exception):
......
......@@ -17,6 +17,7 @@ import sys
import logging
import copy
import re
import threading
from uuid import uuid4
from bson.son import SON
......@@ -439,7 +440,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase):
"""
Prevent updating the meta-data inheritance cache for the given course
"""
if not hasattr(self.ignore_write_events_on_courses.courses):
if not hasattr(self.ignore_write_events_on_courses, 'courses'):
self.ignore_write_events_on_courses.courses = set()
self.ignore_write_events_on_courses.courses.add(course_id)
......@@ -449,18 +450,18 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase):
Restart updating the meta-data inheritance cache for the given course.
Refresh the meta-data inheritance cache now since it was temporarily disabled.
"""
if not hasattr(self.ignore_write_events_on_courses.courses):
if not hasattr(self.ignore_write_events_on_courses, 'courses'):
return
if course_id in self.ignore_write_events_on_courses:
self.ignore_write_events_on_courses.remove(course_id)
if course_id in self.ignore_write_events_on_courses.courses:
self.ignore_write_events_on_courses.courses.remove(course_id)
self.refresh_cached_metadata_inheritance_tree(course_id)
def _is_bulk_write_in_progress(self, course_id):
"""
Returns whether a bulk write operation is in progress for the given course.
"""
if not hasattr(self.ignore_write_events_on_courses.courses):
if not hasattr(self.ignore_write_events_on_courses, 'courses'):
return False
course_id = course_id.for_branch(None)
......
......@@ -55,24 +55,25 @@ class SplitMigrator(object):
new_run = source_course_key.run
new_course_key = CourseLocator(new_org, new_course, new_run, branch=ModuleStoreEnum.BranchName.published)
new_fields = self._get_fields_translate_references(original_course, new_course_key, None)
if fields:
new_fields.update(fields)
new_course = self.split_modulestore.create_course(
new_org, new_course, new_run, user_id,
fields=new_fields,
master_branch=ModuleStoreEnum.BranchName.published,
skip_auto_publish=True,
**kwargs
)
with self.split_modulestore.bulk_write_operations(new_course.id):
self._copy_published_modules_to_course(
new_course, original_course.location, source_course_key, user_id, **kwargs
with self.split_modulestore.bulk_write_operations(new_course_key):
new_fields = self._get_fields_translate_references(original_course, new_course_key, None)
if fields:
new_fields.update(fields)
new_course = self.split_modulestore.create_course(
new_org, new_course, new_run, user_id,
fields=new_fields,
master_branch=ModuleStoreEnum.BranchName.published,
skip_auto_publish=True,
**kwargs
)
# create a new version for the drafts
with self.split_modulestore.bulk_write_operations(new_course.id):
self._add_draft_modules_to_course(new_course.location, source_course_key, user_id, **kwargs)
with self.split_modulestore.bulk_write_operations(new_course.id):
self._copy_published_modules_to_course(
new_course, original_course.location, source_course_key, user_id, **kwargs
)
# create a new version for the drafts
self._add_draft_modules_to_course(new_course.location, source_course_key, user_id, **kwargs)
return new_course.id
......@@ -101,7 +102,6 @@ class SplitMigrator(object):
fields=self._get_fields_translate_references(
module, course_version_locator, new_course.location.block_id
),
continue_version=True,
skip_auto_publish=True,
**kwargs
)
......@@ -109,7 +109,7 @@ class SplitMigrator(object):
index_info = self.split_modulestore.get_course_index_info(course_version_locator)
versions = index_info['versions']
versions[ModuleStoreEnum.BranchName.draft] = versions[ModuleStoreEnum.BranchName.published]
self.split_modulestore.update_course_index(index_info)
self.split_modulestore.update_course_index(course_version_locator, index_info)
# clean up orphans in published version: in old mongo, parents pointed to the union of their published and draft
# children which meant some pointers were to non-existent locations in 'direct'
......
......@@ -2,7 +2,7 @@ import sys
import logging
from xblock.runtime import KvsFieldData
from xblock.fields import ScopeIds
from opaque_keys.edx.locator import BlockUsageLocator, LocalId, CourseLocator
from opaque_keys.edx.locator import BlockUsageLocator, LocalId, CourseLocator, DefinitionLocator
from xmodule.mako_module import MakoDescriptorSystem
from xmodule.error_module import ErrorDescriptor
from xmodule.errortracker import exc_info_to_str
......@@ -10,6 +10,7 @@ from xmodule.modulestore.split_mongo import encode_key_for_mongo
from ..exceptions import ItemNotFoundError
from .split_mongo_kvs import SplitMongoKVS
from fs.osfs import OSFS
from .definition_lazy_loader import DefinitionLazyLoader
log = logging.getLogger(__name__)
......@@ -120,9 +121,24 @@ class CachingDescriptorSystem(MakoDescriptorSystem):
self.course_entry['org'] = course_entry_override['org']
self.course_entry['course'] = course_entry_override['course']
self.course_entry['run'] = course_entry_override['run']
# most likely a lazy loader or the id directly
definition = json_data.get('definition', {})
definition_id = self.modulestore.definition_locator(definition)
definition_id = json_data.get('definition')
block_type = json_data['category']
if definition_id is not None and not json_data.get('definition_loaded', False):
definition_loader = DefinitionLazyLoader(
self.modulestore, block_type, definition_id,
lambda fields: self.modulestore.convert_references_to_keys(
course_key, self.load_block_type(block_type),
fields, self.course_entry['structure']['blocks'],
)
)
else:
definition_loader = None
# If no definition id is provide, generate an in-memory id
if definition_id is None:
definition_id = LocalId()
# If no usage id is provided, generate an in-memory id
if block_id is None:
......@@ -130,7 +146,7 @@ class CachingDescriptorSystem(MakoDescriptorSystem):
block_locator = BlockUsageLocator(
course_key,
block_type=json_data.get('category'),
block_type=block_type,
block_id=block_id,
)
......@@ -138,7 +154,7 @@ class CachingDescriptorSystem(MakoDescriptorSystem):
block_locator.course_key, class_, json_data.get('fields', {}), self.course_entry['structure']['blocks'],
)
kvs = SplitMongoKVS(
definition,
definition_loader,
converted_fields,
json_data.get('_inherited_settings'),
**kwargs
......@@ -148,7 +164,7 @@ class CachingDescriptorSystem(MakoDescriptorSystem):
try:
module = self.construct_xblock_from_class(
class_,
ScopeIds(None, json_data.get('category'), definition_id, block_locator),
ScopeIds(None, block_type, definition_id, block_locator),
field_data,
)
except Exception:
......@@ -174,7 +190,7 @@ class CachingDescriptorSystem(MakoDescriptorSystem):
module.previous_version = edit_info.get('previous_version')
module.update_version = edit_info.get('update_version')
module.source_version = edit_info.get('source_version', None)
module.definition_locator = definition_id
module.definition_locator = DefinitionLocator(block_type, definition_id)
# decache any pending field settings
module.save()
......
from opaque_keys.edx.locator import DefinitionLocator
from bson import SON
class DefinitionLazyLoader(object):
......@@ -24,3 +25,9 @@ class DefinitionLazyLoader(object):
loader pointer with the result so as not to fetch more than once
"""
return self.modulestore.db_connection.get_definition(self.definition_locator.definition_id)
def as_son(self):
return SON((
('category', self.definition_locator.block_type),
('definition', self.definition_locator.definition_id)
))
......@@ -76,6 +76,12 @@ class MongoConnection(object):
"""
self.structures.update({'_id': structure['_id']}, structure)
def upsert_structure(self, structure):
"""
Update the db record for structure, creating that record if it doesn't already exist
"""
self.structures.update({'_id': structure['_id']}, structure, upsert=True)
def get_course_index(self, key, ignore_case=False):
"""
Get the course_index from the persistence mechanism whose id is the given key
......@@ -101,13 +107,21 @@ class MongoConnection(object):
"""
self.course_index.insert(course_index)
def update_course_index(self, course_index):
def update_course_index(self, course_index, from_index=None):
"""
Update the db record for course_index
Update the db record for course_index.
Arguments:
from_index: If set, only update an index if it matches the one specified in `from_index`.
"""
self.course_index.update(
son.SON([('org', course_index['org']), ('course', course_index['course']), ('run', course_index['run'])]),
course_index
from_index or son.SON([
('org', course_index['org']),
('course', course_index['course']),
('run', course_index['run'])
]),
course_index,
upsert=False,
)
def delete_course_index(self, course_index):
......
......@@ -298,4 +298,12 @@ def check_mongo_calls(mongo_store, num_finds=0, num_sends=None):
finally:
map(lambda wrap_patch: wrap_patch.stop(), wrap_patches)
call_count = sum([find_wrap.call_count for find_wrap in find_wraps])
assert_equal(call_count, num_finds)
assert_equal(
call_count,
num_finds,
"Expected {} calls, {} were made. Calls: {}".format(
num_finds,
call_count,
[find_wrap.call_args_list for find_wrap in find_wraps]
)
)
......@@ -127,15 +127,16 @@ class TestMixedModuleStore(unittest.TestCase):
Create a course w/ one item in the persistence store using the given course & item location.
"""
# create course
self.course = self.store.create_course(course_key.org, course_key.course, course_key.run, self.user_id)
if isinstance(self.course.id, CourseLocator):
self.course_locations[self.MONGO_COURSEID] = self.course.location
else:
self.assertEqual(self.course.id, course_key)
with self.store.bulk_write_operations(course_key):
self.course = self.store.create_course(course_key.org, course_key.course, course_key.run, self.user_id)
if isinstance(self.course.id, CourseLocator):
self.course_locations[self.MONGO_COURSEID] = self.course.location
else:
self.assertEqual(self.course.id, course_key)
# create chapter
chapter = self.store.create_child(self.user_id, self.course.location, 'chapter', block_id='Overview')
self.writable_chapter_location = chapter.location
# create chapter
chapter = self.store.create_child(self.user_id, self.course.location, 'chapter', block_id='Overview')
self.writable_chapter_location = chapter.location
def _create_block_hierarchy(self):
"""
......@@ -188,8 +189,9 @@ class TestMixedModuleStore(unittest.TestCase):
create_sub_tree(block, tree)
setattr(self, block_info.field_name, block.location)
for tree in trees:
create_sub_tree(self.course, tree)
with self.store.bulk_write_operations(self.course.id):
for tree in trees:
create_sub_tree(self.course, tree)
def _course_key_from_string(self, string):
"""
......@@ -349,10 +351,9 @@ class TestMixedModuleStore(unittest.TestCase):
)
# draft: 2 to look in draft and then published and then 5 for updating ancestors.
# split: 3 to get the course structure & the course definition (show_calculator is scope content)
# before the change. 1 during change to refetch the definition. 3 afterward (b/c it calls get_item to return the "new" object).
# split: 1 for the course index, 1 for the course structure before the change, 1 for the structure after the change
# 2 sends to update index & structure (calculator is a setting field)
@ddt.data(('draft', 7, 5), ('split', 6, 2))
@ddt.data(('draft', 7, 5), ('split', 3, 2))
@ddt.unpack
def test_update_item(self, default_ms, max_find, max_send):
"""
......@@ -434,7 +435,7 @@ class TestMixedModuleStore(unittest.TestCase):
component = self.store.publish(component.location, self.user_id)
self.assertFalse(self.store.has_changes(component))
@ddt.data(('draft', 7, 2), ('split', 13, 4))
@ddt.data(('draft', 7, 2), ('split', 2, 4))
@ddt.unpack
def test_delete_item(self, default_ms, max_find, max_send):
"""
......@@ -453,7 +454,7 @@ class TestMixedModuleStore(unittest.TestCase):
with self.assertRaises(ItemNotFoundError):
self.store.get_item(self.writable_chapter_location)
@ddt.data(('draft', 8, 2), ('split', 13, 4))
@ddt.data(('draft', 8, 2), ('split', 2, 4))
@ddt.unpack
def test_delete_private_vertical(self, default_ms, max_find, max_send):
"""
......@@ -499,7 +500,7 @@ class TestMixedModuleStore(unittest.TestCase):
self.assertFalse(self.store.has_item(leaf_loc))
self.assertNotIn(vert_loc, course.children)
@ddt.data(('draft', 4, 1), ('split', 5, 2))
@ddt.data(('draft', 4, 1), ('split', 1, 2))
@ddt.unpack
def test_delete_draft_vertical(self, default_ms, max_find, max_send):
"""
......@@ -579,7 +580,7 @@ class TestMixedModuleStore(unittest.TestCase):
xml_store.create_course("org", "course", "run", self.user_id)
# draft is 2 to compute inheritance
# split is 3 b/c it gets the definition to check whether wiki is set
# split is 3 (one for the index, one for the definition to check if the wiki is set, and one for the course structure
@ddt.data(('draft', 2, 0), ('split', 3, 0))
@ddt.unpack
def test_get_course(self, default_ms, max_find, max_send):
......@@ -884,7 +885,7 @@ class TestMixedModuleStore(unittest.TestCase):
mongo_store = self.store._get_modulestore_for_courseid(self._course_key_from_string(self.MONGO_COURSEID))
with check_mongo_calls(mongo_store, max_find, max_send):
found_orphans = self.store.get_orphans(self.course_locations[self.MONGO_COURSEID].course_key)
self.assertEqual(set(found_orphans), set(orphan_locations))
self.assertItemsEqual(found_orphans, orphan_locations)
@ddt.data('draft')
def test_create_item_from_parent_location(self, default_ms):
......@@ -953,7 +954,9 @@ class TestMixedModuleStore(unittest.TestCase):
self.assertEqual(len(self.store.get_courses_for_wiki('edX.simple.2012_Fall')), 0)
self.assertEqual(len(self.store.get_courses_for_wiki('no_such_wiki')), 0)
@ddt.data(('draft', 2, 6), ('split', 7, 2))
# Split takes 1 query to read the course structure, deletes all of the entries in memory, and loads the module from an in-memory cache
# Only writes the course structure back to the database once
@ddt.data(('draft', 2, 6), ('split', 1, 1))
@ddt.unpack
def test_unpublish(self, default_ms, max_find, max_send):
"""
......
......@@ -287,9 +287,9 @@ class CourseComparisonTest(unittest.TestCase):
self.assertEqual(expected_item.has_children, actual_item.has_children)
if expected_item.has_children:
expected_children = [
(course1_item_child.location.block_type, course1_item_child.location.block_id)
(expected_item_child.location.block_type, expected_item_child.location.block_id)
# get_children() rather than children to strip privates from public parents
for course1_item_child in expected_item.get_children()
for expected_item_child in expected_item.get_children()
]
actual_children = [
(item_child.location.block_type, item_child.location.block_id)
......
......@@ -91,7 +91,10 @@ def test_lib(options):
}
if test_id:
lib = '/'.join(test_id.split('/')[0:3])
if '/' in test_id:
lib = '/'.join(test_id.split('/')[0:3])
else:
lib = 'common/lib/' + test_id.split('.')[0]
opts['test_id'] = test_id
lib_tests = [suites.LibTestSuite(lib, **opts)]
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment