Commit 1a17b31a by Calen Pennington

Make definitions and structures truly append-only

parent ef23c19b
@@ -4,6 +4,10 @@ Segregation of pymongo functions from the data modeling mechanisms for split mod
 import re
 import pymongo
 import time
+# Import this just to export it
+from pymongo.errors import DuplicateKeyError  # pylint: disable=unused-import
 from contracts import check
 from functools import wraps
 from pymongo.errors import AutoReconnect
@@ -182,11 +186,11 @@ class MongoConnection(object):
             }
         })]

-    def upsert_structure(self, structure):
+    def insert_structure(self, structure):
         """
-        Update the db record for structure, creating that record if it doesn't already exist
+        Insert a new structure into the database.
         """
-        self.structures.update({'_id': structure['_id']}, structure_to_mongo(structure), upsert=True)
+        self.structures.insert(structure_to_mongo(structure))

     @autoretry_read()
     def get_course_index(self, key, ignore_case=False):
@@ -274,11 +278,11 @@ class MongoConnection(object):
         """
         return self.definitions.find({'$in': {'_id': definitions}})

-    def upsert_definition(self, definition):
+    def insert_definition(self, definition):
         """
         Create the definition in the db
         """
-        self.definitions.update({'_id': definition['_id']}, definition, upsert=True)
+        self.definitions.insert(definition)

     def ensure_indexes(self):
         """
...
@@ -75,7 +75,7 @@ from xmodule.modulestore import (
 from ..exceptions import ItemNotFoundError
 from .caching_descriptor_system import CachingDescriptorSystem
-from xmodule.modulestore.split_mongo.mongo_connection import MongoConnection
+from xmodule.modulestore.split_mongo.mongo_connection import MongoConnection, DuplicateKeyError
 from xmodule.modulestore.split_mongo import BlockKey, CourseEnvelope
 from xmodule.error_module import ErrorDescriptor
 from collections import defaultdict
@@ -226,10 +226,22 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
         """
         # If the content is dirty, then update the database
         for _id in bulk_write_record.structures.viewkeys() - bulk_write_record.structures_in_db:
-            self.db_connection.upsert_structure(bulk_write_record.structures[_id])
+            try:
+                self.db_connection.insert_structure(bulk_write_record.structures[_id])
+            except DuplicateKeyError:
+                # We may not have looked up this structure inside this bulk operation, and thus
+                # didn't realize that it was already in the database. That's OK, the store is
+                # append only, so if it's already been written, we can just keep going.
+                log.debug("Attempted to insert duplicate structure %s", _id)

         for _id in bulk_write_record.definitions.viewkeys() - bulk_write_record.definitions_in_db:
-            self.db_connection.upsert_definition(bulk_write_record.definitions[_id])
+            try:
+                self.db_connection.insert_definition(bulk_write_record.definitions[_id])
+            except DuplicateKeyError:
+                # We may not have looked up this definition inside this bulk operation, and thus
+                # didn't realize that it was already in the database. That's OK, the store is
+                # append only, so if it's already been written, we can just keep going.
+                log.debug("Attempted to insert duplicate definition %s", _id)

         if bulk_write_record.index is not None and bulk_write_record.index != bulk_write_record.initial_index:
             if bulk_write_record.initial_index is None:
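Swallowing `DuplicateKeyError` here is only sound because structures and definitions are immutable once written: a duplicate `_id` implies the identical document is already stored. A condensed, self-contained sketch of the idiom (the `db_connection` and `log` arguments are stand-ins for the real objects):

```python
from pymongo.errors import DuplicateKeyError

def flush_new_structures(db_connection, structures, structures_in_db, log):
    """Append-only flush: insert anything not known to be in the db,
    ignoring duplicates that a parallel writer already committed."""
    for _id in set(structures) - set(structures_in_db):
        try:
            db_connection.insert_structure(structures[_id])
        except DuplicateKeyError:
            # Same _id implies same content in an append-only store,
            # so the existing document is already what we wanted.
            log.debug("Attempted to insert duplicate structure %s", _id)
```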
@@ -295,7 +307,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
         if bulk_write_record.active:
             bulk_write_record.structures[structure['_id']] = structure
         else:
-            self.db_connection.upsert_structure(structure)
+            self.db_connection.insert_structure(structure)

     def get_definition(self, course_key, definition_guid):
         """
@@ -323,7 +335,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
         definition_guid = course_key.as_object_id(definition_guid)
         return self.db_connection.get_definition(definition_guid)

-    def get_definitions(self, ids):
+    def get_definitions(self, course_key, ids):
         """
         Return all definitions that specified in ``ids``.
@@ -331,13 +343,16 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
         the cached version will be preferred.

         Arguments:
+            course_key (:class:`.CourseKey`): The course that these definitions are being loaded
+                for (to respect bulk operations).
             ids (list): A list of definition ids
         """
         definitions = []
         ids = set(ids)

-        for _, record in self._active_records:
-            for definition in record.definitions.values():
+        bulk_write_record = self._get_bulk_ops_record(course_key)
+        if bulk_write_record.active:
+            for definition in bulk_write_record.definitions.values():
                 definition_id = definition.get('_id')
                 if definition_id in ids:
                     ids.remove(definition_id)
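Scoping the in-memory lookup to the one course's bulk record, instead of scanning every active record, matches the data model and avoids pulling pending definitions from unrelated courses. A simplified, self-contained sketch of the new flow, with the bulk record and connection passed in explicitly; the database fallback at the end is paraphrased from what the tests assert:

```python
def get_definitions(bulk_record, db_connection, ids):
    """Simplified sketch: prefer pending definitions from the active
    bulk-operation record, then hit the database for the rest."""
    definitions = []
    ids = set(ids)

    if bulk_record.active:
        for definition in bulk_record.definitions.values():
            definition_id = definition.get('_id')
            if definition_id in ids:
                ids.remove(definition_id)  # already have a pending copy
                definitions.append(definition)

    # Anything left was never touched in this bulk operation.
    definitions.extend(db_connection.get_definitions(list(ids)))
    return definitions
```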
@@ -356,7 +371,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
         if bulk_write_record.active:
             bulk_write_record.definitions[definition['_id']] = definition
         else:
-            self.db_connection.upsert_definition(definition)
+            self.db_connection.insert_definition(definition)

     def version_structure(self, course_key, structure, user_id):
         """
@@ -596,10 +611,13 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
         if not lazy:
             # Load all descendants by id
-            descendent_definitions = self.get_definitions([
-                block['definition']
-                for block in new_module_data.itervalues()
-            ])
+            descendent_definitions = self.get_definitions(
+                course_key,
+                [
+                    block['definition']
+                    for block in new_module_data.itervalues()
+                ]
+            )
             # turn into a map
             definitions = {definition['_id']: definition
                            for definition in descendent_definitions}
@@ -1163,16 +1181,17 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
             new_def_data = self._serialize_fields(old_definition['block_type'], new_def_data)
             if needs_saved():
-                # new id to create new version
-                old_definition['_id'] = ObjectId()
-                old_definition['fields'] = new_def_data
-                old_definition['edit_info']['edited_by'] = user_id
-                old_definition['edit_info']['edited_on'] = datetime.datetime.now(UTC)
+                # Do a deep copy so that we don't corrupt the cached version of the definition
+                new_definition = copy.deepcopy(old_definition)
+                new_definition['_id'] = ObjectId()
+                new_definition['fields'] = new_def_data
+                new_definition['edit_info']['edited_by'] = user_id
+                new_definition['edit_info']['edited_on'] = datetime.datetime.now(UTC)
                 # previous version id
-                old_definition['edit_info']['previous_version'] = definition_locator.definition_id
-                old_definition['schema_version'] = self.SCHEMA_VERSION
-                self.update_definition(course_key, old_definition)
-                return DefinitionLocator(old_definition['block_type'], old_definition['_id']), True
+                new_definition['edit_info']['previous_version'] = definition_locator.definition_id
+                new_definition['schema_version'] = self.SCHEMA_VERSION
+                self.update_definition(course_key, new_definition)
+                return DefinitionLocator(new_definition['block_type'], new_definition['_id']), True
             else:
                 return definition_locator, False
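The deep copy is the point of this hunk: `old_definition` may be the very object cached in an active bulk operation, so mutating it in place would rewrite history for every other reader of that cache. Reduced to its core, the versioning idiom looks like this (a sketch with illustrative names; the real code uses a timezone-aware `datetime.datetime.now(UTC)` and chains to `definition_locator.definition_id`):

```python
import copy
import datetime

from bson.objectid import ObjectId

def new_version_of(old_definition, new_fields, user_id):
    """Create a fresh, separately-identified version of a definition
    without touching the (possibly cached) original."""
    new_definition = copy.deepcopy(old_definition)
    new_definition['_id'] = ObjectId()  # new identity == new version
    new_definition['fields'] = new_fields
    new_definition['edit_info']['edited_by'] = user_id
    new_definition['edit_info']['edited_on'] = datetime.datetime.utcnow()
    # Chain back to the version we copied from.
    new_definition['edit_info']['previous_version'] = old_definition['_id']
    return new_definition
```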
@@ -1482,7 +1501,7 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
         if block_fields is not None:
             root_block['fields'].update(self._serialize_fields(root_category, block_fields))
         if definition_fields is not None:
-            definition = self.get_definition(locator, root_block['definition'])
+            definition = copy.deepcopy(self.get_definition(locator, root_block['definition']))
             definition['fields'].update(definition_fields)
             definition['edit_info']['previous_version'] = definition['_id']
             definition['edit_info']['edited_by'] = user_id
@@ -1839,8 +1858,9 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
         """
         # get the destination's index, and source and destination structures.
         with self.bulk_operations(source_course):
-            with self.bulk_operations(destination_course):
-                source_structure = self._lookup_course(source_course).structure
+            source_structure = self._lookup_course(source_course).structure
+            with self.bulk_operations(destination_course):
                 index_entry = self.get_course_index(destination_course)
                 if index_entry is None:
                     # brand new course
...
@@ -64,7 +64,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
         # call through to the db_connection. It should also clear the
         # system cache
         self.bulk.update_structure(self.course_key, self.structure)
-        self.assertConnCalls(call.upsert_structure(self.structure))
+        self.assertConnCalls(call.insert_structure(self.structure))
         self.clear_cache.assert_called_once_with(self.structure['_id'])

     @ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
@@ -79,7 +79,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
         # Writing a definition when no bulk operation is active should just
         # call through to the db_connection.
         self.bulk.update_definition(self.course_key, self.definition)
-        self.assertConnCalls(call.upsert_definition(self.definition))
+        self.assertConnCalls(call.insert_definition(self.definition))

     @ddt.data(True, False)
     def test_no_bulk_read_index(self, ignore_case):
@@ -128,7 +128,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
         self.bulk.update_structure(self.course_key, self.structure)
         self.assertConnCalls()
         self.bulk._end_bulk_operation(self.course_key)
-        self.assertConnCalls(call.upsert_structure(self.structure))
+        self.assertConnCalls(call.insert_structure(self.structure))

     def test_write_multiple_structures_on_close(self):
         self.conn.get_course_index.return_value = None
@@ -140,7 +140,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
         self.assertConnCalls()
         self.bulk._end_bulk_operation(self.course_key)
         self.assertItemsEqual(
-            [call.upsert_structure(self.structure), call.upsert_structure(other_structure)],
+            [call.insert_structure(self.structure), call.insert_structure(other_structure)],
             self.conn.mock_calls
         )
@@ -154,7 +154,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
         self.assertConnCalls()
         self.bulk._end_bulk_operation(self.course_key)
         self.assertConnCalls(
-            call.upsert_definition(self.definition),
+            call.insert_definition(self.definition),
             call.update_course_index(
                 {'versions': {self.course_key.branch: self.definition['_id']}},
                 from_index=original_index
@@ -173,8 +173,8 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
         self.bulk._end_bulk_operation(self.course_key)
         self.assertItemsEqual(
             [
-                call.upsert_definition(self.definition),
-                call.upsert_definition(other_definition),
+                call.insert_definition(self.definition),
+                call.insert_definition(other_definition),
                 call.update_course_index(
                     {'versions': {'a': self.definition['_id'], 'b': other_definition['_id']}},
                     from_index=original_index
@@ -190,7 +190,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
         self.bulk.update_definition(self.course_key, self.definition)
         self.assertConnCalls()
         self.bulk._end_bulk_operation(self.course_key)
-        self.assertConnCalls(call.upsert_definition(self.definition))
+        self.assertConnCalls(call.insert_definition(self.definition))

     def test_write_multiple_definitions_on_close(self):
         self.conn.get_course_index.return_value = None
@@ -202,7 +202,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
         self.assertConnCalls()
         self.bulk._end_bulk_operation(self.course_key)
         self.assertItemsEqual(
-            [call.upsert_definition(self.definition), call.upsert_definition(other_definition)],
+            [call.insert_definition(self.definition), call.insert_definition(other_definition)],
             self.conn.mock_calls
         )
@@ -216,7 +216,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
         self.assertConnCalls()
         self.bulk._end_bulk_operation(self.course_key)
         self.assertConnCalls(
-            call.upsert_structure(self.structure),
+            call.insert_structure(self.structure),
             call.update_course_index(
                 {'versions': {self.course_key.branch: self.structure['_id']}},
                 from_index=original_index
@@ -235,8 +235,8 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
         self.bulk._end_bulk_operation(self.course_key)
         self.assertItemsEqual(
             [
-                call.upsert_structure(self.structure),
-                call.upsert_structure(other_structure),
+                call.insert_structure(self.structure),
+                call.insert_structure(other_structure),
                 call.update_course_index(
                     {'versions': {'a': self.structure['_id'], 'b': other_structure['_id']}},
                     from_index=original_index
@@ -397,13 +397,12 @@ class TestBulkWriteMixinFindMethods(TestBulkWriteMixin):
         active_definition = lambda _id: {'active': 'definition', '_id': _id}
         db_definitions = [db_definition(_id) for _id in db_ids if _id not in active_ids]

+        self.bulk._begin_bulk_operation(self.course_key)
         for n, _id in enumerate(active_ids):
-            course_key = CourseLocator('org', 'course', 'run{}'.format(n))
-            self.bulk._begin_bulk_operation(course_key)
-            self.bulk.update_definition(course_key, active_definition(_id))
+            self.bulk.update_definition(self.course_key, active_definition(_id))

         self.conn.get_definitions.return_value = db_definitions
-        results = self.bulk.get_definitions(search_ids)
+        results = self.bulk.get_definitions(self.course_key, search_ids)
         self.conn.get_definitions.assert_called_once_with(list(set(search_ids) - set(active_ids)))
         for _id in active_ids:
             if _id in search_ids:
@@ -669,7 +668,7 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin):
         index_copy['versions']['draft'] = index['versions']['published']
         self.bulk.update_course_index(self.course_key, index_copy)
         self.bulk._end_bulk_operation(self.course_key)
-        self.conn.upsert_structure.assert_called_once_with(published_structure)
+        self.conn.insert_structure.assert_called_once_with(published_structure)
         self.conn.update_course_index.assert_called_once_with(index_copy, from_index=self.conn.get_course_index.return_value)
         self.conn.get_course_index.assert_called_once_with(self.course_key)
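One behavior the renamed assertions above do not directly pin down is the duplicate-swallowing path. A hedged sketch of what such a test could look like, in the mock-based style of this suite (the test class and its wiring are assumptions, not part of this commit):

```python
from unittest import TestCase

from mock import MagicMock
from pymongo.errors import DuplicateKeyError


class TestAppendOnlyFlush(TestCase):
    """Hypothetical test pinning down the duplicate-swallowing behavior."""

    def test_duplicate_insert_is_swallowed(self):
        conn, log = MagicMock(), MagicMock()
        conn.insert_structure.side_effect = DuplicateKeyError('E11000')

        structures = {'abc': {'_id': 'abc'}}
        # Mirror the flush loop from _end_bulk_operation above: a
        # DuplicateKeyError must be logged, never propagated.
        for _id in set(structures):
            try:
                conn.insert_structure(structures[_id])
            except DuplicateKeyError:
                log.debug("Attempted to insert duplicate structure %s", _id)

        log.debug.assert_called_once_with(
            "Attempted to insert duplicate structure %s", 'abc'
        )
```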
...