Commit 3672c9a2 by Calen Pennington

Make find_matching_* work during split bulk operations

parent e7ce4106
...@@ -205,9 +205,10 @@ class TemplateTests(unittest.TestCase): ...@@ -205,9 +205,10 @@ class TemplateTests(unittest.TestCase):
data="<problem></problem>" data="<problem></problem>"
) )
# course root only updated 2x # The draft course root has 2 revisions: the published revision, and then the subsequent
# changes to the draft revision
version_history = self.split_store.get_block_generations(test_course.location) version_history = self.split_store.get_block_generations(test_course.location)
# create course causes 2 versions for the time being; skip the first. # Base calculations on the draft revision, not the initial published revision
version_history = version_history.children[0] version_history = version_history.children[0]
self.assertEqual(version_history.locator.version_guid, test_course.location.version_guid) self.assertEqual(version_history.locator.version_guid, test_course.location.version_guid)
self.assertEqual(len(version_history.children), 1) self.assertEqual(len(version_history.children), 1)
......
...@@ -57,24 +57,36 @@ class MongoConnection(object): ...@@ -57,24 +57,36 @@ class MongoConnection(object):
""" """
return self.structures.find_one({'_id': key}) return self.structures.find_one({'_id': key})
def find_matching_structures(self, query): def find_structures_by_id(self, ids):
""" """
Find the structure matching the query. Right now the query must be a legal mongo query Return all structures that specified in ``ids``.
:param query: a mongo-style query of {key: [value|{$in ..}|..], ..}
Arguments:
ids (list): A list of structure ids
""" """
return self.structures.find(query) return self.structures.find({'_id': {'$in': ids}})
def insert_structure(self, structure): def find_structures_derived_from(self, ids):
""" """
Create the structure in the db Return all structures that were immediately derived from a structure listed in ``ids``.
Arguments:
ids (list): A list of structure ids
""" """
self.structures.insert(structure) return self.structures.find({'previous_version': {'$in': ids}})
def update_structure(self, structure): def find_ancestor_structures(self, original_version, block_id):
""" """
Update the db record for structure Find all structures that originated from ``original_version`` that contain ``block_id``.
Arguments:
original_version (str or ObjectID): The id of a structure
block_id (str): The id of the block in question
""" """
self.structures.update({'_id': structure['_id']}, structure) return self.structures.find({
'original_version': original_version,
'blocks.{}.edit_info.update_version'.format(block_id): {'$exists': True}
})
def upsert_structure(self, structure): def upsert_structure(self, structure):
""" """
...@@ -94,11 +106,23 @@ class MongoConnection(object): ...@@ -94,11 +106,23 @@ class MongoConnection(object):
]) ])
) )
def find_matching_course_indexes(self, query): def find_matching_course_indexes(self, branch=None, search_targets=None):
""" """
Find the course_index matching the query. Right now the query must be a legal mongo query Find the course_index matching particular conditions.
:param query: a mongo-style query of {key: [value|{$in ..}|..], ..}
Arguments:
branch: If specified, this branch must exist in the returned courses
search_targets: If specified, this must be a dictionary specifying field values
that must exist in the search_targets of the returned courses
""" """
query = son.SON()
if branch is not None:
query['versions.{}'.format(branch)] = {'$exists': True}
if search_targets:
for key, value in search_targets.iteritems():
query['search_targets.{}'.format(key)] = value
return self.course_index.find(query) return self.course_index.find(query)
def insert_course_index(self, course_index): def insert_course_index(self, course_index):
......
...@@ -107,28 +107,57 @@ EXCLUDE_ALL = '*' ...@@ -107,28 +107,57 @@ EXCLUDE_ALL = '*'
class BulkWriteRecord(object): class BulkWriteRecord(object):
def __init__(self): def __init__(self):
self.active_count = 0 self._active_count = 0
self.dirty_branches = set()
self.initial_index = None self.initial_index = None
self.index = None self.index = None
self._structures = {} self.structures = {}
self.structures_in_db = set()
def set_structure(self, branch, structure): # This stores the set of branches for whom version_structure
if self.index is not None: # has been called
self.index['versions'][branch] = structure['_id'] self.dirty_branches = set()
self._structures[branch] = structure
self.dirty_branches.add(branch) @property
def active(self):
"""
Return whether this bulk write is active.
"""
return self._active_count > 0
def nest(self):
"""
Record another level of nesting of this bulk write operation
"""
self._active_count += 1
def unnest(self):
"""
Record the completion of a level of nesting of the bulk write operation
"""
self._active_count -= 1
def get_structure(self, branch): @property
return self._structures.get(branch) def is_root(self):
"""
Return whether the bulk write is at the root (first) level of nesting
"""
return self._active_count == 1
def structure_for_branch(self, branch):
return self.structures.get(self.index.get('versions', {}).get(branch))
def set_structure_for_branch(self, branch, structure):
self.index.get('versions', {})[branch] = structure['_id']
self.structures[structure['_id']] = structure
self.dirty_branches.add(branch)
def __repr__(self): def __repr__(self):
return u"BulkWriteRecord<{}, {}, {}, {}, {}>".format( return u"BulkWriteRecord<{!r}, {!r}, {!r}, {!r}, {!r}>".format(
self.active_count, self._active_count,
self.dirty_branches,
self.initial_index, self.initial_index,
self.index, self.index,
self._structures, self.structures,
self.structures_in_db,
) )
...@@ -177,6 +206,15 @@ class BulkWriteMixin(object): ...@@ -177,6 +206,15 @@ class BulkWriteMixin(object):
# If nothing org/course/run aren't set, use a bulk record that is identified just by the version_guid # If nothing org/course/run aren't set, use a bulk record that is identified just by the version_guid
return self._active_bulk_writes.records[course_key.replace(org=None, course=None, run=None, branch=None)] return self._active_bulk_writes.records[course_key.replace(org=None, course=None, run=None, branch=None)]
@property
def _active_records(self):
"""
Yield all active (CourseLocator, BulkWriteRecord) tuples.
"""
for course_key, record in getattr(self._active_bulk_writes, 'records', {}).iteritems():
if record.active:
yield (course_key, record)
def _clear_bulk_write_record(self, course_key): def _clear_bulk_write_record(self, course_key):
if not isinstance(course_key, CourseLocator): if not isinstance(course_key, CourseLocator):
raise TypeError('{!r} is not a CourseLocator'.format(course_key)) raise TypeError('{!r} is not a CourseLocator'.format(course_key))
...@@ -194,11 +232,13 @@ class BulkWriteMixin(object): ...@@ -194,11 +232,13 @@ class BulkWriteMixin(object):
Begin a bulk write operation on course_key. Begin a bulk write operation on course_key.
""" """
bulk_write_record = self._get_bulk_write_record(course_key) bulk_write_record = self._get_bulk_write_record(course_key)
bulk_write_record.active_count += 1
if bulk_write_record.active_count > 1: # Increment the number of active bulk operations (bulk operations
return # on the same course can be nested)
bulk_write_record.nest()
# If this is the highest level bulk operation, then initialize it
if bulk_write_record.is_root:
bulk_write_record.initial_index = self.db_connection.get_course_index(course_key) bulk_write_record.initial_index = self.db_connection.get_course_index(course_key)
# Ensure that any edits to the index don't pollute the initial_index # Ensure that any edits to the index don't pollute the initial_index
bulk_write_record.index = copy.deepcopy(bulk_write_record.initial_index) bulk_write_record.index = copy.deepcopy(bulk_write_record.initial_index)
...@@ -209,23 +249,20 @@ class BulkWriteMixin(object): ...@@ -209,23 +249,20 @@ class BulkWriteMixin(object):
""" """
# If no bulk write is active, return # If no bulk write is active, return
bulk_write_record = self._get_bulk_write_record(course_key) bulk_write_record = self._get_bulk_write_record(course_key)
if bulk_write_record.active_count == 0: if not bulk_write_record.active:
return return
bulk_write_record.active_count -= 1 bulk_write_record.unnest()
# If more than one nested bulk write is active, decrement and continue # If this wasn't the outermost context, then don't close out the
if bulk_write_record.active_count > 0: # bulk write operation.
if bulk_write_record.active:
return return
# If this is the last active bulk write, and the content is dirty, # This is the last active bulk write. If the content is dirty,
# then mark it as inactive, and update the database # then update the database
if bulk_write_record.dirty_branches: for _id in bulk_write_record.structures.viewkeys() - bulk_write_record.structures_in_db:
for branch in bulk_write_record.dirty_branches: self.db_connection.upsert_structure(bulk_write_record.structures[_id])
try:
self.db_connection.insert_structure(bulk_write_record.get_structure(branch))
except DuplicateKeyError:
pass # The structure already exists, so we don't have to write it out again
if bulk_write_record.index is not None and bulk_write_record.index != bulk_write_record.initial_index: if bulk_write_record.index is not None and bulk_write_record.index != bulk_write_record.initial_index:
if bulk_write_record.initial_index is None: if bulk_write_record.initial_index is None:
...@@ -239,7 +276,7 @@ class BulkWriteMixin(object): ...@@ -239,7 +276,7 @@ class BulkWriteMixin(object):
""" """
Return whether a bulk write is active on `course_key`. Return whether a bulk write is active on `course_key`.
""" """
return self._get_bulk_write_record(course_key, ignore_case).active_count > 0 return self._get_bulk_write_record(course_key, ignore_case).active
def get_course_index(self, course_key, ignore_case=False): def get_course_index(self, course_key, ignore_case=False):
""" """
...@@ -252,7 +289,7 @@ class BulkWriteMixin(object): ...@@ -252,7 +289,7 @@ class BulkWriteMixin(object):
def insert_course_index(self, course_key, index_entry): def insert_course_index(self, course_key, index_entry):
bulk_write_record = self._get_bulk_write_record(course_key) bulk_write_record = self._get_bulk_write_record(course_key)
if bulk_write_record.active_count > 0: if bulk_write_record.active:
bulk_write_record.index = index_entry bulk_write_record.index = index_entry
else: else:
self.db_connection.insert_course_index(index_entry) self.db_connection.insert_course_index(index_entry)
...@@ -266,21 +303,22 @@ class BulkWriteMixin(object): ...@@ -266,21 +303,22 @@ class BulkWriteMixin(object):
Does not return anything useful. Does not return anything useful.
""" """
bulk_write_record = self._get_bulk_write_record(course_key) bulk_write_record = self._get_bulk_write_record(course_key)
if bulk_write_record.active_count > 0: if bulk_write_record.active:
bulk_write_record.index = updated_index_entry bulk_write_record.index = updated_index_entry
else: else:
self.db_connection.update_course_index(updated_index_entry) self.db_connection.update_course_index(updated_index_entry)
def get_structure(self, course_key, version_guid): def get_structure(self, course_key, version_guid):
bulk_write_record = self._get_bulk_write_record(course_key) bulk_write_record = self._get_bulk_write_record(course_key)
if bulk_write_record.active_count > 0: if bulk_write_record.active:
structure = bulk_write_record.get_structure(course_key.branch) structure = bulk_write_record.structures.get(version_guid)
# The structure hasn't been loaded from the db yet, so load it # The structure hasn't been loaded from the db yet, so load it
if structure is None: if structure is None:
structure_id = bulk_write_record.index['versions'][course_key.branch] structure = self.db_connection.get_structure(version_guid)
structure = self.db_connection.get_structure(structure_id) bulk_write_record.structures[version_guid] = structure
bulk_write_record._structures[course_key.branch] = structure if structure is not None:
bulk_write_record.structures_in_db.add(version_guid)
return structure return structure
else: else:
...@@ -289,24 +327,26 @@ class BulkWriteMixin(object): ...@@ -289,24 +327,26 @@ class BulkWriteMixin(object):
return self.db_connection.get_structure(version_guid) return self.db_connection.get_structure(version_guid)
def update_structure(self, course_key, structure): def update_structure(self, course_key, structure):
"""
Update a course structure, respecting the current bulk operation status
(no data will be written to the database if a bulk operation is active.)
"""
self._clear_cache(structure['_id']) self._clear_cache(structure['_id'])
bulk_write_record = self._get_bulk_write_record(course_key) bulk_write_record = self._get_bulk_write_record(course_key)
if bulk_write_record.active_count > 0: if bulk_write_record.active:
bulk_write_record.set_structure(course_key.branch, structure) bulk_write_record.structures[structure['_id']] = structure
else: else:
self.db_connection.upsert_structure(structure) self.db_connection.upsert_structure(structure)
def version_structure(self, course_key, structure, user_id): def version_structure(self, course_key, structure, user_id):
""" """
Copy the structure and update the history info (edited_by, edited_on, previous_version) Copy the structure and update the history info (edited_by, edited_on, previous_version)
:param structure:
:param user_id:
""" """
bulk_write_record = self._get_bulk_write_record(course_key) bulk_write_record = self._get_bulk_write_record(course_key)
# If we have an active bulk write, and it's already been edited, then just use that structure # If we have an active bulk write, and it's already been edited, then just use that structure
if bulk_write_record.active_count > 0 and course_key.branch in bulk_write_record.dirty_branches: if bulk_write_record.active and course_key.branch in bulk_write_record.dirty_branches:
return bulk_write_record.get_structure(course_key.branch) return bulk_write_record.structure_for_branch(course_key.branch)
# Otherwise, make a new structure # Otherwise, make a new structure
new_structure = copy.deepcopy(structure) new_structure = copy.deepcopy(structure)
...@@ -317,11 +357,132 @@ class BulkWriteMixin(object): ...@@ -317,11 +357,132 @@ class BulkWriteMixin(object):
new_structure['schema_version'] = self.SCHEMA_VERSION new_structure['schema_version'] = self.SCHEMA_VERSION
# If we're in a bulk write, update the structure used there, and mark it as dirty # If we're in a bulk write, update the structure used there, and mark it as dirty
if bulk_write_record.active_count > 0: if bulk_write_record.active:
bulk_write_record.set_structure(course_key.branch, new_structure) bulk_write_record.set_structure_for_branch(course_key.branch, new_structure)
return new_structure return new_structure
def version_block(self, block_info, user_id, update_version):
"""
Update the block_info dictionary based on it having been edited
"""
if block_info['edit_info'].get('update_version') == update_version:
return
block_info['edit_info'] = {
'edited_on': datetime.datetime.now(UTC),
'edited_by': user_id,
'previous_version': block_info['edit_info']['update_version'],
'update_version': update_version,
}
def find_matching_course_indexes(self, branch=None, search_targets=None):
"""
Find the course_indexes which have the specified branch and search_targets.
"""
indexes = self.db_connection.find_matching_course_indexes(branch, search_targets)
for _, record in self._active_records:
if branch and branch not in record.index.get('versions', {}):
continue
if search_targets:
if any(
'search_targets' not in record.index or
field not in record.index['search_targets'] or
record.index['search_targets'][field] != value
for field, value in search_targets.iteritems()
):
continue
indexes.append(record.index)
return indexes
def find_structures_by_id(self, ids):
"""
Return all structures that specified in ``ids``.
If a structure with the same id is in both the cache and the database,
the cached version will be preferred.
Arguments:
ids (list): A list of structure ids
"""
structures = []
ids = set(ids)
for _, record in self._active_records:
for structure in record.structures.values():
structure_id = structure.get('_id')
if structure_id in ids:
ids.remove(structure_id)
structures.append(structure)
structures.extend(self.db_connection.find_structures_by_id(list(ids)))
return structures
def find_structures_derived_from(self, ids):
"""
Return all structures that were immediately derived from a structure listed in ``ids``.
Arguments:
ids (list): A list of structure ids
"""
found_structure_ids = set()
structures = []
for _, record in self._active_records:
for structure in record.structures.values():
if structure.get('previous_version') in ids:
structures.append(structure)
if '_id' in structure:
found_structure_ids.add(structure['_id'])
structures.extend(
structure
for structure in self.db_connection.find_structures_derived_from(ids)
if structure['_id'] not in found_structure_ids
)
return structures
def find_ancestor_structures(self, original_version, block_id):
"""
Find all structures that originated from ``original_version`` that contain ``block_id``.
Any structure found in the cache will be preferred to a structure with the same id from the database.
Arguments:
original_version (str or ObjectID): The id of a structure
block_id (str): The id of the block in question
"""
found_structure_ids = set()
structures = []
for _, record in self._active_records:
for structure in record.structures.values():
if 'original_version' not in structure:
continue
if structure['original_version'] != original_version:
continue
if block_id not in structure.get('blocks', {}):
continue
if 'update_version' not in structure['blocks'][block_id].get('edit_info', {}):
continue
structures.append(structure)
found_structure_ids.add(structure['_id'])
structures.extend(
structure
for structure in self.db_connection.find_ancestor_structures(original_version, block_id)
if structure['_id'] not in found_structure_ids
)
return structures
class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
""" """
...@@ -555,10 +716,7 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): ...@@ -555,10 +716,7 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
:param branch: the branch for which to return courses. :param branch: the branch for which to return courses.
:param qualifiers: an optional dict restricting which elements should match :param qualifiers: an optional dict restricting which elements should match
''' '''
if qualifiers is None: matching_indexes = self.find_matching_course_indexes(branch)
qualifiers = {}
qualifiers.update({"versions.{}".format(branch): {"$exists": True}})
matching_indexes = self.db_connection.find_matching_course_indexes(qualifiers)
# collect ids and then query for those # collect ids and then query for those
version_guids = [] version_guids = []
...@@ -568,7 +726,7 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): ...@@ -568,7 +726,7 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
version_guids.append(version_guid) version_guids.append(version_guid)
id_version_map[version_guid] = course_index id_version_map[version_guid] = course_index
matching_structures = self.db_connection.find_matching_structures({'_id': {'$in': version_guids}}) matching_structures = self.find_structures_by_id(version_guids)
# get the blocks for each course index (s/b the root) # get the blocks for each course index (s/b the root)
result = [] result = []
...@@ -834,15 +992,14 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): ...@@ -834,15 +992,14 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
# TODO if depth is significant, it may make sense to get all that have the same original_version # TODO if depth is significant, it may make sense to get all that have the same original_version
# and reconstruct the subtree from version_guid # and reconstruct the subtree from version_guid
next_entries = self.db_connection.find_matching_structures({'previous_version': version_guid}) next_entries = self.find_structures_derived_from([version_guid])
# must only scan cursor's once # must only scan cursor's once
next_versions = [struct for struct in next_entries] next_versions = [struct for struct in next_entries]
result = {version_guid: [CourseLocator(version_guid=struct['_id']) for struct in next_versions]} result = {version_guid: [CourseLocator(version_guid=struct['_id']) for struct in next_versions]}
depth = 1 depth = 1
while depth < version_history_depth and len(next_versions) > 0: while depth < version_history_depth and len(next_versions) > 0:
depth += 1 depth += 1
next_entries = self.db_connection.find_matching_structures({'previous_version': next_entries = self.find_structures_derived_from([struct['_id'] for struct in next_versions])
{'$in': [struct['_id'] for struct in next_versions]}})
next_versions = [struct for struct in next_entries] next_versions = [struct for struct in next_entries]
for course_structure in next_versions: for course_structure in next_versions:
result.setdefault(course_structure['previous_version'], []).append( result.setdefault(course_structure['previous_version'], []).append(
...@@ -861,12 +1018,9 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): ...@@ -861,12 +1018,9 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
# course_agnostic means we don't care if the head and version don't align, trust the version # course_agnostic means we don't care if the head and version don't align, trust the version
course_struct = self._lookup_course(block_locator.course_key.course_agnostic())['structure'] course_struct = self._lookup_course(block_locator.course_key.course_agnostic())['structure']
block_id = block_locator.block_id block_id = block_locator.block_id
update_version_field = 'blocks.{}.edit_info.update_version'.format(block_id) all_versions_with_block = self.find_ancestor_structures(
all_versions_with_block = self.db_connection.find_matching_structures( original_version=course_struct['original_version'],
{ block_id=block_id
'original_version': course_struct['original_version'],
update_version_field: {'$exists': True},
}
) )
# find (all) root versions and build map {previous: {successors}..} # find (all) root versions and build map {previous: {successors}..}
possible_roots = [] possible_roots = []
...@@ -1063,12 +1217,6 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): ...@@ -1063,12 +1217,6 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
new_id = new_structure['_id'] new_id = new_structure['_id']
edit_info = {
'edited_on': datetime.datetime.now(UTC),
'edited_by': user_id,
'previous_version': None,
'update_version': new_id,
}
# generate usage id # generate usage id
if block_id is not None: if block_id is not None:
if encode_key_for_mongo(block_id) in new_structure['blocks']: if encode_key_for_mongo(block_id) in new_structure['blocks']:
...@@ -1081,12 +1229,13 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): ...@@ -1081,12 +1229,13 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
block_fields = partitioned_fields.get(Scope.settings, {}) block_fields = partitioned_fields.get(Scope.settings, {})
if Scope.children in partitioned_fields: if Scope.children in partitioned_fields:
block_fields.update(partitioned_fields[Scope.children]) block_fields.update(partitioned_fields[Scope.children])
self._update_block_in_structure(new_structure, new_block_id, { self._update_block_in_structure(new_structure, new_block_id, self._new_block(
"category": block_type, user_id,
"definition": definition_locator.definition_id, block_type,
"fields": self._serialize_fields(block_type, block_fields), block_fields,
'edit_info': edit_info, definition_locator.definition_id,
}) new_id,
))
self.update_structure(course_key, new_structure) self.update_structure(course_key, new_structure)
...@@ -1145,12 +1294,7 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): ...@@ -1145,12 +1294,7 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
if parent['edit_info']['update_version'] != new_structure['_id']: if parent['edit_info']['update_version'] != new_structure['_id']:
# if the parent hadn't been previously changed in this bulk transaction, indicate that it's # if the parent hadn't been previously changed in this bulk transaction, indicate that it's
# part of the bulk transaction # part of the bulk transaction
parent['edit_info'] = { self.version_block(parent, user_id, new_structure['_id'])
'edited_on': datetime.datetime.now(UTC),
'edited_by': user_id,
'previous_version': parent['edit_info']['update_version'],
'update_version': new_structure['_id'],
}
# db update # db update
self.update_structure(parent_usage_key.course_key, new_structure) self.update_structure(parent_usage_key.course_key, new_structure)
...@@ -1412,12 +1556,7 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): ...@@ -1412,12 +1556,7 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
block_data["fields"] = settings block_data["fields"] = settings
new_id = new_structure['_id'] new_id = new_structure['_id']
block_data['edit_info'] = { self.version_block(block_data, user_id, new_id)
'edited_on': datetime.datetime.now(UTC),
'edited_by': user_id,
'previous_version': block_data['edit_info']['update_version'],
'update_version': new_id,
}
self.update_structure(course_key, new_structure) self.update_structure(course_key, new_structure)
# update the index entry if appropriate # update the index entry if appropriate
if index_entry is not None: if index_entry is not None:
...@@ -1567,18 +1706,22 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): ...@@ -1567,18 +1706,22 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
block_fields['children'] = children block_fields['children'] = children
if is_updated: if is_updated:
previous_version = None if is_new else structure_blocks[encoded_block_id]['edit_info'].get('update_version') if is_new:
structure_blocks[encoded_block_id] = { block_info = self._new_block(
"category": xblock.category, user_id,
"definition": xblock.definition_locator.definition_id, xblock.category,
"fields": block_fields, block_fields,
'edit_info': { xblock.definition_locator.definition_id,
'previous_version': previous_version, new_id,
'update_version': new_id, raw=True
'edited_by': user_id, )
'edited_on': datetime.datetime.now(UTC) else:
} block_info = structure_blocks[encoded_block_id]
} block_info['fields'] = block_fields
block_info['definition'] = xblock.definition_locator.definition_id
self.version_block(block_info, user_id, new_id)
structure_blocks[encoded_block_id] = block_info
return is_updated return is_updated
...@@ -2188,8 +2331,8 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): ...@@ -2188,8 +2331,8 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
Returns: list of branch-agnostic course_keys Returns: list of branch-agnostic course_keys
""" """
entries = self.db_connection.find_matching_course_indexes( entries = self.find_matching_course_indexes(
{'search_targets.{}'.format(field_name): field_value} search_targets={field_name: field_value}
) )
return [ return [
CourseLocator(entry['org'], entry['course'], entry['run']) # Branch agnostic CourseLocator(entry['org'], entry['course'], entry['run']) # Branch agnostic
......
...@@ -103,7 +103,7 @@ class DraftVersioningModuleStore(ModuleStoreDraftAndPublished, SplitMongoModuleS ...@@ -103,7 +103,7 @@ class DraftVersioningModuleStore(ModuleStoreDraftAndPublished, SplitMongoModuleS
def create_item( def create_item(
self, user_id, course_key, block_type, block_id=None, self, user_id, course_key, block_type, block_id=None,
definition_locator=None, fields=None, definition_locator=None, fields=None,
force=False, continue_version=False, skip_auto_publish=False, **kwargs force=False, skip_auto_publish=False, **kwargs
): ):
""" """
See :py:meth `ModuleStoreDraftAndPublished.create_item` See :py:meth `ModuleStoreDraftAndPublished.create_item`
...@@ -113,7 +113,7 @@ class DraftVersioningModuleStore(ModuleStoreDraftAndPublished, SplitMongoModuleS ...@@ -113,7 +113,7 @@ class DraftVersioningModuleStore(ModuleStoreDraftAndPublished, SplitMongoModuleS
item = super(DraftVersioningModuleStore, self).create_item( item = super(DraftVersioningModuleStore, self).create_item(
user_id, course_key, block_type, block_id=block_id, user_id, course_key, block_type, block_id=block_id,
definition_locator=definition_locator, fields=fields, definition_locator=definition_locator, fields=fields,
force=force, continue_version=continue_version, **kwargs force=force, **kwargs
) )
if not skip_auto_publish: if not skip_auto_publish:
self._auto_publish_no_children(item.location, item.location.category, user_id, **kwargs) self._auto_publish_no_children(item.location, item.location.category, user_id, **kwargs)
...@@ -121,13 +121,13 @@ class DraftVersioningModuleStore(ModuleStoreDraftAndPublished, SplitMongoModuleS ...@@ -121,13 +121,13 @@ class DraftVersioningModuleStore(ModuleStoreDraftAndPublished, SplitMongoModuleS
def create_child( def create_child(
self, user_id, parent_usage_key, block_type, block_id=None, self, user_id, parent_usage_key, block_type, block_id=None,
fields=None, continue_version=False, **kwargs fields=None, **kwargs
): ):
parent_usage_key = self._map_revision_to_branch(parent_usage_key) parent_usage_key = self._map_revision_to_branch(parent_usage_key)
with self.bulk_write_operations(parent_usage_key.course_key): with self.bulk_write_operations(parent_usage_key.course_key):
item = super(DraftVersioningModuleStore, self).create_child( item = super(DraftVersioningModuleStore, self).create_child(
user_id, parent_usage_key, block_type, block_id=block_id, user_id, parent_usage_key, block_type, block_id=block_id,
fields=fields, continue_version=continue_version, **kwargs fields=fields, **kwargs
) )
self._auto_publish_no_children(parent_usage_key, item.location.category, user_id, **kwargs) self._auto_publish_no_children(parent_usage_key, item.location.category, user_id, **kwargs)
return item return item
......
...@@ -16,6 +16,7 @@ class TestBulkWriteMixin(unittest.TestCase): ...@@ -16,6 +16,7 @@ class TestBulkWriteMixin(unittest.TestCase):
self.bulk.SCHEMA_VERSION = 1 self.bulk.SCHEMA_VERSION = 1
self.clear_cache = self.bulk._clear_cache = Mock(name='_clear_cache') self.clear_cache = self.bulk._clear_cache = Mock(name='_clear_cache')
self.conn = self.bulk.db_connection = MagicMock(name='db_connection', spec=MongoConnection) self.conn = self.bulk.db_connection = MagicMock(name='db_connection', spec=MongoConnection)
self.conn.get_course_index.return_value = {'initial': 'index'}
self.course_key = CourseLocator('org', 'course', 'run-a') self.course_key = CourseLocator('org', 'course', 'run-a')
self.course_key_b = CourseLocator('org', 'course', 'run-b') self.course_key_b = CourseLocator('org', 'course', 'run-b')
...@@ -112,7 +113,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): ...@@ -112,7 +113,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.bulk.update_structure(self.course_key, self.structure) self.bulk.update_structure(self.course_key, self.structure)
self.assertConnCalls() self.assertConnCalls()
self.bulk._end_bulk_write_operation(self.course_key) self.bulk._end_bulk_write_operation(self.course_key)
self.assertConnCalls(call.insert_structure(self.structure)) self.assertConnCalls(call.upsert_structure(self.structure))
def test_write_multiple_structures_on_close(self): def test_write_multiple_structures_on_close(self):
self.conn.get_course_index.return_value = None self.conn.get_course_index.return_value = None
...@@ -124,7 +125,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): ...@@ -124,7 +125,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.assertConnCalls() self.assertConnCalls()
self.bulk._end_bulk_write_operation(self.course_key) self.bulk._end_bulk_write_operation(self.course_key)
self.assertItemsEqual( self.assertItemsEqual(
[call.insert_structure(self.structure), call.insert_structure(other_structure)], [call.upsert_structure(self.structure), call.upsert_structure(other_structure)],
self.conn.mock_calls self.conn.mock_calls
) )
...@@ -134,10 +135,11 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): ...@@ -134,10 +135,11 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.bulk._begin_bulk_write_operation(self.course_key) self.bulk._begin_bulk_write_operation(self.course_key)
self.conn.reset_mock() self.conn.reset_mock()
self.bulk.update_structure(self.course_key, self.structure) self.bulk.update_structure(self.course_key, self.structure)
self.bulk.insert_course_index(self.course_key, {'versions': {self.course_key.branch: self.structure['_id']}})
self.assertConnCalls() self.assertConnCalls()
self.bulk._end_bulk_write_operation(self.course_key) self.bulk._end_bulk_write_operation(self.course_key)
self.assertConnCalls( self.assertConnCalls(
call.insert_structure(self.structure), call.upsert_structure(self.structure),
call.update_course_index( call.update_course_index(
{'versions': {self.course_key.branch: self.structure['_id']}}, {'versions': {self.course_key.branch: self.structure['_id']}},
from_index=original_index from_index=original_index
...@@ -152,12 +154,12 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): ...@@ -152,12 +154,12 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.bulk.update_structure(self.course_key.replace(branch='a'), self.structure) self.bulk.update_structure(self.course_key.replace(branch='a'), self.structure)
other_structure = {'another': 'structure', '_id': ObjectId()} other_structure = {'another': 'structure', '_id': ObjectId()}
self.bulk.update_structure(self.course_key.replace(branch='b'), other_structure) self.bulk.update_structure(self.course_key.replace(branch='b'), other_structure)
self.assertConnCalls() self.bulk.insert_course_index(self.course_key, {'versions': {'a': self.structure['_id'], 'b': other_structure['_id']}})
self.bulk._end_bulk_write_operation(self.course_key) self.bulk._end_bulk_write_operation(self.course_key)
self.assertItemsEqual( self.assertItemsEqual(
[ [
call.insert_structure(self.structure), call.upsert_structure(self.structure),
call.insert_structure(other_structure), call.upsert_structure(other_structure),
call.update_course_index( call.update_course_index(
{'versions': {'a': self.structure['_id'], 'b': other_structure['_id']}}, {'versions': {'a': self.structure['_id'], 'b': other_structure['_id']}},
from_index=original_index from_index=original_index
...@@ -180,6 +182,225 @@ class TestBulkWriteMixinClosedAfterPrevTransaction(TestBulkWriteMixinClosed, Tes ...@@ -180,6 +182,225 @@ class TestBulkWriteMixinClosedAfterPrevTransaction(TestBulkWriteMixinClosed, Tes
@ddt.ddt @ddt.ddt
class TestBulkWriteMixinFindMethods(TestBulkWriteMixin):
"""
Tests of BulkWriteMixin methods for finding many structures or indexes
"""
def test_no_bulk_find_matching_course_indexes(self):
branch = Mock(name='branch')
search_targets = MagicMock(name='search_targets')
self.conn.find_matching_course_indexes.return_value = [Mock(name='result')]
result = self.bulk.find_matching_course_indexes(branch, search_targets)
self.assertConnCalls(call.find_matching_course_indexes(branch, search_targets))
self.assertEqual(result, self.conn.find_matching_course_indexes.return_value)
self.assertCacheNotCleared()
@ddt.data(
(None, None, [], []),
(
'draft',
None,
[{'versions': {'draft': '123'}}],
[
{'versions': {'published': '123'}},
{}
],
),
(
'draft',
{'f1': 'v1'},
[{'versions': {'draft': '123'}, 'search_targets': {'f1': 'v1'}}],
[
{'versions': {'draft': '123'}, 'search_targets': {'f1': 'value2'}},
{'versions': {'published': '123'}, 'search_targets': {'f1': 'v1'}},
{'search_targets': {'f1': 'v1'}},
{'versions': {'draft': '123'}},
],
),
(
None,
{'f1': 'v1'},
[
{'versions': {'draft': '123'}, 'search_targets': {'f1': 'v1'}},
{'versions': {'published': '123'}, 'search_targets': {'f1': 'v1'}},
{'search_targets': {'f1': 'v1'}},
],
[
{'versions': {'draft': '123'}, 'search_targets': {'f1': 'v2'}},
{'versions': {'draft': '123'}, 'search_targets': {'f2': 'v1'}},
{'versions': {'draft': '123'}},
],
),
(
None,
{'f1': 'v1', 'f2': 2},
[
{'search_targets': {'f1': 'v1', 'f2': 2}},
{'search_targets': {'f1': 'v1', 'f2': 2}},
],
[
{'versions': {'draft': '123'}, 'search_targets': {'f1': 'v1'}},
{'search_targets': {'f1': 'v1'}},
{'versions': {'draft': '123'}, 'search_targets': {'f1': 'v2'}},
{'versions': {'draft': '123'}},
],
),
)
@ddt.unpack
def test_find_matching_course_indexes(self, branch, search_targets, matching, unmatching):
db_indexes = [Mock(name='from_db')]
for n, index in enumerate(matching + unmatching):
course_key = CourseLocator('org', 'course', 'run{}'.format(n))
self.bulk._begin_bulk_write_operation(course_key)
self.bulk.insert_course_index(course_key, index)
expected = matching + db_indexes
self.conn.find_matching_course_indexes.return_value = db_indexes
result = self.bulk.find_matching_course_indexes(branch, search_targets)
self.assertItemsEqual(result, expected)
for item in unmatching:
self.assertNotIn(item, result)
def test_no_bulk_find_structures_by_id(self):
ids = [Mock(name='id')]
self.conn.find_structures_by_id.return_value = [MagicMock(name='result')]
result = self.bulk.find_structures_by_id(ids)
self.assertConnCalls(call.find_structures_by_id(ids))
self.assertEqual(result, self.conn.find_structures_by_id.return_value)
self.assertCacheNotCleared()
@ddt.data(
([], [], []),
([1, 2, 3], [1, 2], [1, 2]),
([1, 2, 3], [1], [1, 2]),
([1, 2, 3], [], [1, 2]),
)
@ddt.unpack
def test_find_structures_by_id(self, search_ids, active_ids, db_ids):
db_structure = lambda _id: {'db': 'structure', '_id': _id}
active_structure = lambda _id: {'active': 'structure', '_id': _id}
db_structures = [db_structure(_id) for _id in db_ids if _id not in active_ids]
for n, _id in enumerate(active_ids):
course_key = CourseLocator('org', 'course', 'run{}'.format(n))
self.bulk._begin_bulk_write_operation(course_key)
self.bulk.update_structure(course_key, active_structure(_id))
self.conn.find_structures_by_id.return_value = db_structures
results = self.bulk.find_structures_by_id(search_ids)
self.conn.find_structures_by_id.assert_called_once_with(list(set(search_ids) - set(active_ids)))
for _id in active_ids:
if _id in search_ids:
self.assertIn(active_structure(_id), results)
else:
self.assertNotIn(active_structure(_id), results)
for _id in db_ids:
if _id in search_ids and _id not in active_ids:
self.assertIn(db_structure(_id), results)
else:
self.assertNotIn(db_structure(_id), results)
def test_no_bulk_find_structures_derived_from(self):
ids = [Mock(name='id')]
self.conn.find_structures_derived_from.return_value = [MagicMock(name='result')]
result = self.bulk.find_structures_derived_from(ids)
self.assertConnCalls(call.find_structures_derived_from(ids))
self.assertEqual(result, self.conn.find_structures_derived_from.return_value)
self.assertCacheNotCleared()
@ddt.data(
# Test values are:
# - previous_versions to search for
# - documents in the cache with $previous_version.$_id
# - documents in the db with $previous_version.$_id
([], [], []),
(['1', '2', '3'], ['1.a', '1.b', '2.c'], ['1.a', '2.c']),
(['1', '2', '3'], ['1.a'], ['1.a', '2.c']),
(['1', '2', '3'], [], ['1.a', '2.c']),
(['1', '2', '3'], ['4.d'], ['1.a', '2.c']),
)
@ddt.unpack
def test_find_structures_derived_from(self, search_ids, active_ids, db_ids):
def db_structure(_id):
previous, _, current = _id.partition('.')
return {'db': 'structure', 'previous_version': previous, '_id': current}
def active_structure(_id):
previous, _, current = _id.partition('.')
return {'active': 'structure', 'previous_version': previous, '_id': current}
db_structures = [db_structure(_id) for _id in db_ids]
active_structures = []
for n, _id in enumerate(active_ids):
course_key = CourseLocator('org', 'course', 'run{}'.format(n))
self.bulk._begin_bulk_write_operation(course_key)
structure = active_structure(_id)
self.bulk.update_structure(course_key, structure)
active_structures.append(structure)
self.conn.find_structures_derived_from.return_value = db_structures
results = self.bulk.find_structures_derived_from(search_ids)
self.conn.find_structures_derived_from.assert_called_once_with(search_ids)
for structure in active_structures:
if structure['previous_version'] in search_ids:
self.assertIn(structure, results)
else:
self.assertNotIn(structure, results)
for structure in db_structures:
if (
structure['previous_version'] in search_ids and # We're searching for this document
not any(active.endswith(structure['_id']) for active in active_ids) # This document doesn't match any active _ids
):
self.assertIn(structure, results)
else:
self.assertNotIn(structure, results)
def test_no_bulk_find_ancestor_structures(self):
original_version = Mock(name='original_version')
block_id = Mock(name='block_id')
self.conn.find_ancestor_structures.return_value = [MagicMock(name='result')]
result = self.bulk.find_ancestor_structures(original_version, block_id)
self.assertConnCalls(call.find_ancestor_structures(original_version, block_id))
self.assertEqual(result, self.conn.find_ancestor_structures.return_value)
self.assertCacheNotCleared()
@ddt.data(
# Test values are:
# - original_version
# - block_id
# - matching documents in the cache
# - non-matching documents in the cache
# - expected documents returned from the db
# - unexpected documents returned from the db
('ov', 'bi', [{'original_version': 'ov', 'blocks': {'bi': {'edit_info': {'update_version': 'foo'}}}}], [], [], []),
('ov', 'bi', [{'original_version': 'ov', 'blocks': {'bi': {'edit_info': {'update_version': 'foo'}}}, '_id': 'foo'}], [], [], [{'_id': 'foo'}]),
('ov', 'bi', [], [{'blocks': {'bi': {'edit_info': {'update_version': 'foo'}}}}], [], []),
('ov', 'bi', [], [{'original_version': 'ov'}], [], []),
('ov', 'bi', [], [], [{'original_version': 'ov', 'blocks': {'bi': {'edit_info': {'update_version': 'foo'}}}}], []),
(
'ov',
'bi',
[{'original_version': 'ov', 'blocks': {'bi': {'edit_info': {'update_version': 'foo'}}}}],
[],
[{'original_version': 'ov', 'blocks': {'bi': {'edit_info': {'update_version': 'bar'}}}}],
[]
),
)
@ddt.unpack
def test_find_ancestor_structures(self, original_version, block_id, active_match, active_unmatch, db_match, db_unmatch):
for structure in active_match + active_unmatch + db_match + db_unmatch:
structure.setdefault('_id', ObjectId())
for n, structure in enumerate(active_match + active_unmatch):
course_key = CourseLocator('org', 'course', 'run{}'.format(n))
self.bulk._begin_bulk_write_operation(course_key)
self.bulk.update_structure(course_key, structure)
self.conn.find_ancestor_structures.return_value = db_match + db_unmatch
results = self.bulk.find_ancestor_structures(original_version, block_id)
self.conn.find_ancestor_structures.assert_called_once_with(original_version, block_id)
self.assertItemsEqual(active_match + db_match, results)
@ddt.ddt
class TestBulkWriteMixinOpen(TestBulkWriteMixin): class TestBulkWriteMixinOpen(TestBulkWriteMixin):
""" """
Tests of the bulk write mixin when bulk write operations are open Tests of the bulk write mixin when bulk write operations are open
...@@ -210,6 +431,7 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin): ...@@ -210,6 +431,7 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin):
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId()) @ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
def test_read_structure_after_write_no_db(self, version_guid): def test_read_structure_after_write_no_db(self, version_guid):
# Reading a structure that's already been written shouldn't hit the db at all # Reading a structure that's already been written shouldn't hit the db at all
self.structure['_id'] = version_guid
self.bulk.update_structure(self.course_key, self.structure) self.bulk.update_structure(self.course_key, self.structure)
result = self.bulk.get_structure(self.course_key, version_guid) result = self.bulk.get_structure(self.course_key, version_guid)
self.assertEquals(self.conn.get_structure.call_count, 0) self.assertEquals(self.conn.get_structure.call_count, 0)
...@@ -219,7 +441,8 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin): ...@@ -219,7 +441,8 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin):
def test_read_structure_after_write_after_read(self, version_guid): def test_read_structure_after_write_after_read(self, version_guid):
# Reading a structure that's been updated after being pulled from the db should # Reading a structure that's been updated after being pulled from the db should
# still get the updated value # still get the updated value
result = self.bulk.get_structure(self.course_key, version_guid) self.structure['_id'] = version_guid
self.bulk.get_structure(self.course_key, version_guid)
self.bulk.update_structure(self.course_key, self.structure) self.bulk.update_structure(self.course_key, self.structure)
result = self.bulk.get_structure(self.course_key, version_guid) result = self.bulk.get_structure(self.course_key, version_guid)
self.assertEquals(self.conn.get_structure.call_count, 1) self.assertEquals(self.conn.get_structure.call_count, 1)
...@@ -278,6 +501,23 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin): ...@@ -278,6 +501,23 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin):
self.structure['_id'] self.structure['_id']
) )
def test_copy_branch_versions(self):
# Directly updating an index so that the draft branch points to the published index
# version should work, and should only persist a single structure
self.maxDiff = None
published_structure = {'published': 'structure', '_id': ObjectId()}
self.bulk.update_structure(self.course_key, published_structure)
index = {'versions': {'published': published_structure['_id']}}
self.bulk.insert_course_index(self.course_key, index)
index_copy = copy.deepcopy(index)
index_copy['versions']['draft'] = index['versions']['published']
self.bulk.update_course_index(self.course_key, index_copy)
self.bulk._end_bulk_write_operation(self.course_key)
self.conn.upsert_structure.assert_called_once_with(published_structure)
self.conn.update_course_index.assert_called_once_with(index_copy, from_index=self.conn.get_course_index.return_value)
self.conn.get_course_index.assert_called_once_with(self.course_key)
class TestBulkWriteMixinOpenAfterPrevTransaction(TestBulkWriteMixinOpen, TestBulkWriteMixinPreviousTransaction): class TestBulkWriteMixinOpenAfterPrevTransaction(TestBulkWriteMixinOpen, TestBulkWriteMixinPreviousTransaction):
""" """
Test that operations on with an open transaction aren't affected by a previously executed transaction Test that operations on with an open transaction aren't affected by a previously executed transaction
......
...@@ -332,7 +332,8 @@ class TestTOC(ModuleStoreTestCase): ...@@ -332,7 +332,8 @@ class TestTOC(ModuleStoreTestCase):
self.toy_loc, self.request.user, self.toy_course, depth=2) self.toy_loc, self.request.user, self.toy_course, depth=2)
@ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 7, 0)) # TODO: LMS-11220: Document why split find count is 21
@ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 21, 0))
@ddt.unpack @ddt.unpack
def test_toc_toy_from_chapter(self, default_ms, num_finds, num_sends): def test_toc_toy_from_chapter(self, default_ms, num_finds, num_sends):
with self.store.default_store(default_ms): with self.store.default_store(default_ms):
...@@ -359,7 +360,8 @@ class TestTOC(ModuleStoreTestCase): ...@@ -359,7 +360,8 @@ class TestTOC(ModuleStoreTestCase):
for toc_section in expected: for toc_section in expected:
self.assertIn(toc_section, actual) self.assertIn(toc_section, actual)
@ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 7, 0)) # TODO: LMS-11220: Document why split find count is 21
@ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 21, 0))
@ddt.unpack @ddt.unpack
def test_toc_toy_from_section(self, default_ms, num_finds, num_sends): def test_toc_toy_from_section(self, default_ms, num_finds, num_sends):
with self.store.default_store(default_ms): with self.store.default_store(default_ms):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment