Commit 1a682dac by Calen Pennington

Include caching definitions in split bulk operations

parent 40c3253d
...@@ -153,7 +153,10 @@ class CachingDescriptorSystem(MakoDescriptorSystem, EditInfoRuntimeMixin): ...@@ -153,7 +153,10 @@ class CachingDescriptorSystem(MakoDescriptorSystem, EditInfoRuntimeMixin):
if definition_id is not None and not json_data.get('definition_loaded', False): if definition_id is not None and not json_data.get('definition_loaded', False):
definition_loader = DefinitionLazyLoader( definition_loader = DefinitionLazyLoader(
self.modulestore, block_key.type, definition_id, self.modulestore,
course_key,
block_key.type,
definition_id,
lambda fields: self.modulestore.convert_references_to_keys( lambda fields: self.modulestore.convert_references_to_keys(
course_key, self.load_block_type(block_key.type), course_key, self.load_block_type(block_key.type),
fields, self.course_entry.structure['blocks'], fields, self.course_entry.structure['blocks'],
......
...@@ -8,13 +8,14 @@ class DefinitionLazyLoader(object): ...@@ -8,13 +8,14 @@ class DefinitionLazyLoader(object):
object doesn't force access during init but waits until client wants the object doesn't force access during init but waits until client wants the
definition. Only works if the modulestore is a split mongo store. definition. Only works if the modulestore is a split mongo store.
""" """
def __init__(self, modulestore, block_type, definition_id, field_converter): def __init__(self, modulestore, course_key, block_type, definition_id, field_converter):
""" """
Simple placeholder for yet-to-be-fetched data Simple placeholder for yet-to-be-fetched data
:param modulestore: the pymongo db connection with the definitions :param modulestore: the pymongo db connection with the definitions
:param definition_locator: the id of the record in the above to fetch :param definition_locator: the id of the record in the above to fetch
""" """
self.modulestore = modulestore self.modulestore = modulestore
self.course_key = course_key
self.definition_locator = DefinitionLocator(block_type, definition_id) self.definition_locator = DefinitionLocator(block_type, definition_id)
self.field_converter = field_converter self.field_converter = field_converter
...@@ -23,4 +24,4 @@ class DefinitionLazyLoader(object): ...@@ -23,4 +24,4 @@ class DefinitionLazyLoader(object):
Fetch the definition. Note, the caller should replace this lazy Fetch the definition. Note, the caller should replace this lazy
loader pointer with the result so as not to fetch more than once loader pointer with the result so as not to fetch more than once
""" """
return self.modulestore.db_connection.get_definition(self.definition_locator.definition_id) return self.modulestore.get_definition(self.course_key, self.definition_locator.definition_id)
...@@ -268,18 +268,17 @@ class MongoConnection(object): ...@@ -268,18 +268,17 @@ class MongoConnection(object):
return self.definitions.find_one({'_id': key}) return self.definitions.find_one({'_id': key})
@autoretry_read() @autoretry_read()
def find_matching_definitions(self, query): def get_definitions(self, definitions):
""" """
Find the definitions matching the query. Right now the query must be a legal mongo query Retrieve all definitions listed in `definitions`.
:param query: a mongo-style query of {key: [value|{$in ..}|..], ..}
""" """
return self.definitions.find(query) return self.definitions.find({'$in': {'_id': definitions}})
def insert_definition(self, definition): def upsert_definition(self, definition):
""" """
Create the definition in the db Create the definition in the db
""" """
self.definitions.insert(definition) self.definitions.update({'_id': definition['_id']}, definition, upsert=True)
def ensure_indexes(self): def ensure_indexes(self):
""" """
......
...@@ -21,6 +21,7 @@ class TestBulkWriteMixin(unittest.TestCase): ...@@ -21,6 +21,7 @@ class TestBulkWriteMixin(unittest.TestCase):
self.course_key = CourseLocator('org', 'course', 'run-a', branch='test') self.course_key = CourseLocator('org', 'course', 'run-a', branch='test')
self.course_key_b = CourseLocator('org', 'course', 'run-b', branch='test') self.course_key_b = CourseLocator('org', 'course', 'run-b', branch='test')
self.structure = {'this': 'is', 'a': 'structure', '_id': ObjectId()} self.structure = {'this': 'is', 'a': 'structure', '_id': ObjectId()}
self.definition = {'this': 'is', 'a': 'definition', '_id': ObjectId()}
self.index_entry = {'this': 'is', 'an': 'index'} self.index_entry = {'this': 'is', 'an': 'index'}
def assertConnCalls(self, *calls): def assertConnCalls(self, *calls):
...@@ -66,6 +67,20 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): ...@@ -66,6 +67,20 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.assertConnCalls(call.upsert_structure(self.structure)) self.assertConnCalls(call.upsert_structure(self.structure))
self.clear_cache.assert_called_once_with(self.structure['_id']) self.clear_cache.assert_called_once_with(self.structure['_id'])
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
def test_no_bulk_read_definition(self, version_guid):
# Reading a definition when no bulk operation is active should just call
# through to the db_connection
result = self.bulk.get_definition(self.course_key, version_guid)
self.assertConnCalls(call.get_definition(self.course_key.as_object_id(version_guid)))
self.assertEqual(result, self.conn.get_definition.return_value)
def test_no_bulk_write_definition(self):
# Writing a definition when no bulk operation is active should just
# call through to the db_connection.
self.bulk.update_definition(self.course_key, self.definition)
self.assertConnCalls(call.upsert_definition(self.definition))
@ddt.data(True, False) @ddt.data(True, False)
def test_no_bulk_read_index(self, ignore_case): def test_no_bulk_read_index(self, ignore_case):
# Reading a course index when no bulk operation is active should just call # Reading a course index when no bulk operation is active should just call
...@@ -129,6 +144,68 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): ...@@ -129,6 +144,68 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.conn.mock_calls self.conn.mock_calls
) )
def test_write_index_and_definition_on_close(self):
original_index = {'versions': {}}
self.conn.get_course_index.return_value = copy.deepcopy(original_index)
self.bulk._begin_bulk_operation(self.course_key)
self.conn.reset_mock()
self.bulk.update_definition(self.course_key, self.definition)
self.bulk.insert_course_index(self.course_key, {'versions': {self.course_key.branch: self.definition['_id']}})
self.assertConnCalls()
self.bulk._end_bulk_operation(self.course_key)
self.assertConnCalls(
call.upsert_definition(self.definition),
call.update_course_index(
{'versions': {self.course_key.branch: self.definition['_id']}},
from_index=original_index
)
)
def test_write_index_and_multiple_definitions_on_close(self):
original_index = {'versions': {'a': ObjectId(), 'b': ObjectId()}}
self.conn.get_course_index.return_value = copy.deepcopy(original_index)
self.bulk._begin_bulk_operation(self.course_key)
self.conn.reset_mock()
self.bulk.update_definition(self.course_key.replace(branch='a'), self.definition)
other_definition = {'another': 'definition', '_id': ObjectId()}
self.bulk.update_definition(self.course_key.replace(branch='b'), other_definition)
self.bulk.insert_course_index(self.course_key, {'versions': {'a': self.definition['_id'], 'b': other_definition['_id']}})
self.bulk._end_bulk_operation(self.course_key)
self.assertItemsEqual(
[
call.upsert_definition(self.definition),
call.upsert_definition(other_definition),
call.update_course_index(
{'versions': {'a': self.definition['_id'], 'b': other_definition['_id']}},
from_index=original_index
)
],
self.conn.mock_calls
)
def test_write_definition_on_close(self):
self.conn.get_course_index.return_value = None
self.bulk._begin_bulk_operation(self.course_key)
self.conn.reset_mock()
self.bulk.update_definition(self.course_key, self.definition)
self.assertConnCalls()
self.bulk._end_bulk_operation(self.course_key)
self.assertConnCalls(call.upsert_definition(self.definition))
def test_write_multiple_definitions_on_close(self):
self.conn.get_course_index.return_value = None
self.bulk._begin_bulk_operation(self.course_key)
self.conn.reset_mock()
self.bulk.update_definition(self.course_key.replace(branch='a'), self.definition)
other_definition = {'another': 'definition', '_id': ObjectId()}
self.bulk.update_definition(self.course_key.replace(branch='b'), other_definition)
self.assertConnCalls()
self.bulk._end_bulk_operation(self.course_key)
self.assertItemsEqual(
[call.upsert_definition(self.definition), call.upsert_definition(other_definition)],
self.conn.mock_calls
)
def test_write_index_and_structure_on_close(self): def test_write_index_and_structure_on_close(self):
original_index = {'versions': {}} original_index = {'versions': {}}
self.conn.get_course_index.return_value = copy.deepcopy(original_index) self.conn.get_course_index.return_value = copy.deepcopy(original_index)
...@@ -181,6 +258,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): ...@@ -181,6 +258,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
get_result = self.bulk.get_structure(self.course_key, version_result['_id']) get_result = self.bulk.get_structure(self.course_key, version_result['_id'])
self.assertEquals(version_result, get_result) self.assertEquals(version_result, get_result)
class TestBulkWriteMixinClosedAfterPrevTransaction(TestBulkWriteMixinClosed, TestBulkWriteMixinPreviousTransaction): class TestBulkWriteMixinClosedAfterPrevTransaction(TestBulkWriteMixinClosed, TestBulkWriteMixinPreviousTransaction):
""" """
Test that operations on with a closed transaction aren't affected by a previously executed transaction Test that operations on with a closed transaction aren't affected by a previously executed transaction
...@@ -307,6 +385,37 @@ class TestBulkWriteMixinFindMethods(TestBulkWriteMixin): ...@@ -307,6 +385,37 @@ class TestBulkWriteMixinFindMethods(TestBulkWriteMixin):
else: else:
self.assertNotIn(db_structure(_id), results) self.assertNotIn(db_structure(_id), results)
@ddt.data(
([], [], []),
([1, 2, 3], [1, 2], [1, 2]),
([1, 2, 3], [1], [1, 2]),
([1, 2, 3], [], [1, 2]),
)
@ddt.unpack
def test_get_definitions(self, search_ids, active_ids, db_ids):
db_definition = lambda _id: {'db': 'definition', '_id': _id}
active_definition = lambda _id: {'active': 'definition', '_id': _id}
db_definitions = [db_definition(_id) for _id in db_ids if _id not in active_ids]
for n, _id in enumerate(active_ids):
course_key = CourseLocator('org', 'course', 'run{}'.format(n))
self.bulk._begin_bulk_operation(course_key)
self.bulk.update_definition(course_key, active_definition(_id))
self.conn.get_definitions.return_value = db_definitions
results = self.bulk.get_definitions(search_ids)
self.conn.get_definitions.assert_called_once_with(list(set(search_ids) - set(active_ids)))
for _id in active_ids:
if _id in search_ids:
self.assertIn(active_definition(_id), results)
else:
self.assertNotIn(active_definition(_id), results)
for _id in db_ids:
if _id in search_ids and _id not in active_ids:
self.assertIn(db_definition(_id), results)
else:
self.assertNotIn(db_definition(_id), results)
def test_no_bulk_find_structures_derived_from(self): def test_no_bulk_find_structures_derived_from(self):
ids = [Mock(name='id')] ids = [Mock(name='id')]
self.conn.find_structures_derived_from.return_value = [MagicMock(name='result')] self.conn.find_structures_derived_from.return_value = [MagicMock(name='result')]
...@@ -456,6 +565,45 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin): ...@@ -456,6 +565,45 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin):
self.assertEquals(self.conn.get_structure.call_count, 1) self.assertEquals(self.conn.get_structure.call_count, 1)
self.assertEqual(result, self.structure) self.assertEqual(result, self.structure)
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
def test_read_definition_without_write_from_db(self, version_guid):
# Reading a definition before it's been written (while in bulk operation mode)
# returns the definition from the database
result = self.bulk.get_definition(self.course_key, version_guid)
self.assertEquals(self.conn.get_definition.call_count, 1)
self.assertEqual(result, self.conn.get_definition.return_value)
self.assertCacheNotCleared()
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
def test_read_definition_without_write_only_reads_once(self, version_guid):
# Reading the same definition multiple times shouldn't hit the database
# more than once
for _ in xrange(2):
result = self.bulk.get_definition(self.course_key, version_guid)
self.assertEquals(self.conn.get_definition.call_count, 1)
self.assertEqual(result, self.conn.get_definition.return_value)
self.assertCacheNotCleared()
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
def test_read_definition_after_write_no_db(self, version_guid):
# Reading a definition that's already been written shouldn't hit the db at all
self.definition['_id'] = version_guid
self.bulk.update_definition(self.course_key, self.definition)
result = self.bulk.get_definition(self.course_key, version_guid)
self.assertEquals(self.conn.get_definition.call_count, 0)
self.assertEqual(result, self.definition)
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
def test_read_definition_after_write_after_read(self, version_guid):
# Reading a definition that's been updated after being pulled from the db should
# still get the updated value
self.definition['_id'] = version_guid
self.bulk.get_definition(self.course_key, version_guid)
self.bulk.update_definition(self.course_key, self.definition)
result = self.bulk.get_definition(self.course_key, version_guid)
self.assertEquals(self.conn.get_definition.call_count, 1)
self.assertEqual(result, self.definition)
@ddt.data(True, False) @ddt.data(True, False)
def test_read_index_without_write_from_db(self, ignore_case): def test_read_index_without_write_from_db(self, ignore_case):
# Reading the index without writing to it should pull from the database # Reading the index without writing to it should pull from the database
......
...@@ -115,6 +115,7 @@ def toc_for_course(user, request, course, active_chapter, active_section, field_ ...@@ -115,6 +115,7 @@ def toc_for_course(user, request, course, active_chapter, active_section, field_
field_data_cache must include data from the course module and 2 levels of its descendents field_data_cache must include data from the course module and 2 levels of its descendents
''' '''
with modulestore().bulk_operations(course.id):
course_module = get_module_for_descriptor(user, request, course, field_data_cache, course.id) course_module = get_module_for_descriptor(user, request, course, field_data_cache, course.id)
if course_module is None: if course_module is None:
return None return None
......
...@@ -326,19 +326,29 @@ class TestTOC(ModuleStoreTestCase): ...@@ -326,19 +326,29 @@ class TestTOC(ModuleStoreTestCase):
self.request = factory.get(chapter_url) self.request = factory.get(chapter_url)
self.request.user = UserFactory() self.request.user = UserFactory()
self.modulestore = self.store._get_modulestore_for_courseid(self.course_key) self.modulestore = self.store._get_modulestore_for_courseid(self.course_key)
with self.modulestore.bulk_operations(self.course_key):
with check_mongo_calls(num_finds, num_sends): with check_mongo_calls(num_finds, num_sends):
self.toy_course = self.store.get_course(self.toy_loc, depth=2) self.toy_course = self.store.get_course(self.toy_loc, depth=2)
self.field_data_cache = FieldDataCache.cache_for_descriptor_descendents( self.field_data_cache = FieldDataCache.cache_for_descriptor_descendents(
self.toy_loc, self.request.user, self.toy_course, depth=2 self.toy_loc, self.request.user, self.toy_course, depth=2
) )
# TODO: LMS-11220: Document why split find count is 9 # Mongo makes 3 queries to load the course to depth 2:
# TODO: LMS-11220: Document why mongo find count is 4 # - 1 for the course
@ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 9, 0)) # - 1 for its children
# - 1 for its grandchildren
# Split makes 6 queries to load the course to depth 2:
# - load the structure
# - load 5 definitions
# Split makes 2 queries to render the toc:
# - it loads the active version at the start of the bulk operation
# - it loads the course definition for inheritance, because it's outside
# the bulk-operation marker that loaded the course descriptor
@ddt.data((ModuleStoreEnum.Type.mongo, 3, 0, 0), (ModuleStoreEnum.Type.split, 6, 0, 2))
@ddt.unpack @ddt.unpack
def test_toc_toy_from_chapter(self, default_ms, num_finds, num_sends): def test_toc_toy_from_chapter(self, default_ms, setup_finds, setup_sends, toc_finds):
with self.store.default_store(default_ms): with self.store.default_store(default_ms):
self.setup_modulestore(default_ms, num_finds, num_sends) self.setup_modulestore(default_ms, setup_finds, setup_sends)
expected = ([{'active': True, 'sections': expected = ([{'active': True, 'sections':
[{'url_name': 'Toy_Videos', 'display_name': u'Toy Videos', 'graded': True, [{'url_name': 'Toy_Videos', 'display_name': u'Toy Videos', 'graded': True,
'format': u'Lecture Sequence', 'due': None, 'active': False}, 'format': u'Lecture Sequence', 'due': None, 'active': False},
...@@ -354,20 +364,29 @@ class TestTOC(ModuleStoreTestCase): ...@@ -354,20 +364,29 @@ class TestTOC(ModuleStoreTestCase):
'format': '', 'due': None, 'active': False}], 'format': '', 'due': None, 'active': False}],
'url_name': 'secret:magic', 'display_name': 'secret:magic'}]) 'url_name': 'secret:magic', 'display_name': 'secret:magic'}])
with check_mongo_calls(0, 0): with check_mongo_calls(toc_finds, 0):
actual = render.toc_for_course( actual = render.toc_for_course(
self.request.user, self.request, self.toy_course, self.chapter, None, self.field_data_cache self.request.user, self.request, self.toy_course, self.chapter, None, self.field_data_cache
) )
for toc_section in expected: for toc_section in expected:
self.assertIn(toc_section, actual) self.assertIn(toc_section, actual)
# TODO: LMS-11220: Document why split find count is 9 # Mongo makes 3 queries to load the course to depth 2:
# TODO: LMS-11220: Document why mongo find count is 4 # - 1 for the course
@ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 9, 0)) # - 1 for its children
# - 1 for its grandchildren
# Split makes 6 queries to load the course to depth 2:
# - load the structure
# - load 5 definitions
# Split makes 2 queries to render the toc:
# - it loads the active version at the start of the bulk operation
# - it loads the course definition for inheritance, because it's outside
# the bulk-operation marker that loaded the course descriptor
@ddt.data((ModuleStoreEnum.Type.mongo, 3, 0, 0), (ModuleStoreEnum.Type.split, 6, 0, 2))
@ddt.unpack @ddt.unpack
def test_toc_toy_from_section(self, default_ms, num_finds, num_sends): def test_toc_toy_from_section(self, default_ms, setup_finds, setup_sends, toc_finds):
with self.store.default_store(default_ms): with self.store.default_store(default_ms):
self.setup_modulestore(default_ms, num_finds, num_sends) self.setup_modulestore(default_ms, setup_finds, setup_sends)
section = 'Welcome' section = 'Welcome'
expected = ([{'active': True, 'sections': expected = ([{'active': True, 'sections':
[{'url_name': 'Toy_Videos', 'display_name': u'Toy Videos', 'graded': True, [{'url_name': 'Toy_Videos', 'display_name': u'Toy Videos', 'graded': True,
...@@ -384,6 +403,7 @@ class TestTOC(ModuleStoreTestCase): ...@@ -384,6 +403,7 @@ class TestTOC(ModuleStoreTestCase):
'format': '', 'due': None, 'active': False}], 'format': '', 'due': None, 'active': False}],
'url_name': 'secret:magic', 'display_name': 'secret:magic'}]) 'url_name': 'secret:magic', 'display_name': 'secret:magic'}])
with check_mongo_calls(toc_finds, 0):
actual = render.toc_for_course(self.request.user, self.request, self.toy_course, self.chapter, section, self.field_data_cache) actual = render.toc_for_course(self.request.user, self.request, self.toy_course, self.chapter, section, self.field_data_cache)
for toc_section in expected: for toc_section in expected:
self.assertIn(toc_section, actual) self.assertIn(toc_section, actual)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment