Commit 7c0709ce by Calen Pennington

Merge pull request #8322 from cpennington/document-read-metrics

Add metrics around all split-mongo database operations measuring coun…
parents 8f016414 94185e9c
"""
Segregation of pymongo functions from the data modeling mechanisms for split modulestore.
"""
import datetime
import math
import re
from contextlib import contextmanager
from time import time

import pymongo
import pytz
# Import this just to export it
from pymongo.errors import DuplicateKeyError  # pylint: disable=unused-import
from contracts import check, new_contract

import dogstats_wrapper as dog_stats_api
from mongodb_proxy import autoretry_read, MongoProxy
from xmodule.exceptions import HeartbeatFailure
from xmodule.modulestore import BlockData
from xmodule.modulestore.split_mongo import BlockKey

new_contract('BlockData', BlockData)
def round_power_2(value):
"""
Return value rounded up to the nearest power of 2.
"""
if value == 0:
return 0
return math.pow(2, math.ceil(math.log(value, 2)))
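# Illustrative examples of the bucketing (values invented for this note):
#   round_power_2(0)  -> 0
#   round_power_2(3)  -> 4.0
#   round_power_2(17) -> 32.0
# Bucketing keeps the set of tag values small, so nearby sizes aggregate
# under the same tag instead of producing one tag per distinct size.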
class Tagger(object):
"""
An object used by :class:`QueryTimer` to allow timed code blocks
to add measurements and tags to the timer.
"""
def __init__(self):
self.added_tags = []
self.measures = []
def measure(self, name, size):
"""
Record a measurement of the timed data. This would be something to
indicate the size of the value being timed.
Arguments:
name: The name of the measurement.
size (float): The size of the measurement.
"""
self.measures.append((name, size))
def tag(self, **kwargs):
"""
Add tags to the timer.
Arguments:
**kwargs: Each keyword is treated as a tag name, and the
value of the argument is the tag value.
"""
self.added_tags.extend(kwargs.items())
@property
def tags(self):
"""
Return all tags for this (this includes any tags added with :meth:`tag`,
and also all of the added measurements, bucketed into powers of 2).
"""
return [
'{}:{}'.format(name, round_power_2(size))
for name, size in self.measures
] + [
'{}:{}'.format(name, value)
for name, value in self.added_tags
]
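# A minimal sketch of Tagger in isolation (instances are normally created by
# QueryTimer.timer below; the values here are invented):
#
#   tagger = Tagger()
#   tagger.measure('blocks', 17)       # histogram measurement, also tagged
#   tagger.tag(block_type='course')    # verbatim tag
#   tagger.tags == ['blocks:32.0', 'block_type:course']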
class QueryTimer(object):
"""
An object that allows timing a block of code while also recording measurements
about that code.
"""
def __init__(self, metric_base, sample_rate=1):
"""
Arguments:
metric_base: The prefix to be used for all queries captured
with this :class:`QueryTimer`.
"""
self._metric_base = metric_base
self._sample_rate = sample_rate
@contextmanager
def timer(self, metric_name, course_context):
"""
Contextmanager which acts as a timer for the metric ``metric_name``,
but which also yields a :class:`Tagger` object that allows the timed block
of code to add tags and quantity measurements. Tags are added verbatim to the
        timer output. Measurements are recorded as histogram metrics in their
        own right, and also as bucketed tags on the timer measurement.
Arguments:
metric_name: The name used to aggregate all of these metrics.
course_context: The course which the query is being made for.
"""
tagger = Tagger()
metric_name = "{}.{}".format(self._metric_base, metric_name)
start = time()
try:
yield tagger
finally:
end = time()
tags = tagger.tags
tags.append('course:{}'.format(course_context))
for name, size in tagger.measures:
dog_stats_api.histogram(
'{}.{}'.format(metric_name, name),
size,
timestamp=end,
                    # Drop this measurement's own bucketed tag from its histogram.
                    tags=[tag for tag in tags if not tag.startswith('{}:'.format(name))],
sample_rate=self._sample_rate,
)
dog_stats_api.histogram(
'{}.duration'.format(metric_name),
end - start,
timestamp=end,
tags=tags,
sample_rate=self._sample_rate,
)
dog_stats_api.increment(
metric_name,
timestamp=end,
tags=tags,
sample_rate=self._sample_rate,
)
TIMER = QueryTimer(__name__, 0.001)
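# At a sample rate of 0.001, roughly one in a thousand timed operations is
# reported to the metrics backend. The metric prefix is this module's dotted
# path (``__name__``), so every operation below appears under that namespace.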
def structure_from_mongo(structure, course_context=None):
"""
Converts the 'blocks' key from a list [block_data] to a map
{BlockKey: block_data}.
Converts 'root' from [block_type, block_id] to BlockKey.
Converts 'blocks.*.fields.children' from [[block_type, block_id]] to [BlockKey].
N.B. Does not convert any other ReferenceFields (because we don't know which fields they are at this level).
Arguments:
structure: The document structure to convert
course_context (CourseKey): For metrics gathering, the CourseKey
for the course that this data is being processed for.
"""
    with TIMER.timer('structure_from_mongo', course_context) as tagger:
        tagger.measure('blocks', len(structure['blocks']))

        check('seq[2]', structure['root'])
        check('list(dict)', structure['blocks'])
        for block in structure['blocks']:
            if 'children' in block['fields']:
                check('list(list[2])', block['fields']['children'])

        structure['root'] = BlockKey(*structure['root'])
        new_blocks = {}
        for block in structure['blocks']:
            if 'children' in block['fields']:
                block['fields']['children'] = [BlockKey(*child) for child in block['fields']['children']]
            new_blocks[BlockKey(block['block_type'], block.pop('block_id'))] = BlockData(**block)
        structure['blocks'] = new_blocks

        return structure
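# Sketch of the conversion performed above (field contents invented for
# illustration):
#
#   {'root': ['course', 'course'],
#    'blocks': [{'block_type': 'course', 'block_id': 'course', 'fields': {}}]}
#
# becomes
#
#   {'root': BlockKey(type='course', id='course'),
#    'blocks': {BlockKey('course', 'course'): BlockData(...)}}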
def structure_to_mongo(structure, course_context=None):
"""
Converts the 'blocks' key from a map {BlockKey: block_data} to
    a list [block_data], inserting BlockKey.type as 'block_type'
    and BlockKey.id as 'block_id'.

    Doesn't convert 'root', since namedtuples can be inserted
    directly into mongo.
    """
    with TIMER.timer('structure_to_mongo', course_context) as tagger:
        tagger.measure('blocks', len(structure['blocks']))

        check('BlockKey', structure['root'])
        check('dict(BlockKey: BlockData)', structure['blocks'])
        for block in structure['blocks'].itervalues():
            if 'children' in block.fields:
                check('list(BlockKey)', block.fields['children'])

        new_structure = dict(structure)
        new_structure['blocks'] = []

        for block_key, block in structure['blocks'].iteritems():
            new_block = dict(block.to_storable())
            new_block.setdefault('block_type', block_key.type)
            new_block['block_id'] = block_key.id
            new_structure['blocks'].append(new_block)

        return new_structure
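# structure_to_mongo is the inverse of structure_from_mongo above: BlockKey
# dict keys are flattened back into 'block_type'/'block_id' entries on each
# block document. It builds and returns a shallow copy, so the caller's
# top-level structure dict is left unmodified.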
class MongoConnection(object):
@@ -121,34 +254,54 @@ class MongoConnection(object):
else:
raise HeartbeatFailure("Can't connect to {}".format(self.database.name))
    def get_structure(self, key, course_context=None):
        """
        Get the structure from the persistence mechanism whose id is the given key
        """
        with TIMER.timer("get_structure", course_context) as tagger_get_structure:
            with TIMER.timer("get_structure.find_one", course_context) as tagger_find_one:
                doc = self.structures.find_one({'_id': key})
                tagger_find_one.measure("blocks", len(doc['blocks']))
                tagger_get_structure.measure("blocks", len(doc['blocks']))
            return structure_from_mongo(doc, course_context)
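    # The nested timers above mean a single structure fetch reports both the
    # overall operation and the find_one call on its own. Assuming this module
    # is imported as xmodule.modulestore.split_mongo.mongo_connection, the
    # duration metrics would be named:
    #   ...mongo_connection.get_structure.duration
    #   ...mongo_connection.get_structure.find_one.duration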
@autoretry_read()
    def find_structures_by_id(self, ids, course_context=None):
        """
        Return all structures that are specified in ``ids``.

        Arguments:
            ids (list): A list of structure ids
        """
        with TIMER.timer("find_structures_by_id", course_context) as tagger:
            tagger.measure("requested_ids", len(ids))
            docs = [
                structure_from_mongo(structure, course_context)
                for structure in self.structures.find({'_id': {'$in': ids}})
            ]
            tagger.measure("structures", len(docs))
            return docs
@autoretry_read()
    def find_structures_derived_from(self, ids, course_context=None):
        """
        Return all structures that were immediately derived from a structure listed in ``ids``.

        Arguments:
            ids (list): A list of structure ids
        """
        with TIMER.timer("find_structures_derived_from", course_context) as tagger:
            tagger.measure("base_ids", len(ids))
            docs = [
                structure_from_mongo(structure, course_context)
                for structure in self.structures.find({'previous_version': {'$in': ids}})
            ]
            tagger.measure("structures", len(docs))
            return docs
@autoretry_read()
    def find_ancestor_structures(self, original_version, block_key, course_context=None):
        """
        Find all structures that originated from ``original_version`` that contain ``block_key``.

        Arguments:
            original_version (str or ObjectID): The id of a structure
            block_key (BlockKey): The id of the block in question
        """
        with TIMER.timer("find_ancestor_structures", course_context) as tagger:
            docs = [
                structure_from_mongo(structure, course_context)
                for structure in self.structures.find({
                    'original_version': original_version,
                    'blocks': {
                        '$elemMatch': {
                            'block_id': block_key.id,
                            'block_type': block_key.type,
                            'edit_info.update_version': {
                                '$exists': True,
                            },
                        },
                    },
                })
            ]
            tagger.measure("structures", len(docs))
            return docs
    def insert_structure(self, structure, course_context=None):
        """
        Insert a new structure into the database.
        """
        with TIMER.timer("insert_structure", course_context) as tagger:
            tagger.measure("blocks", len(structure["blocks"]))
            self.structures.insert(structure_to_mongo(structure, course_context))
def get_course_index(self, key, ignore_case=False):
"""
Get the course_index from the persistence mechanism whose id is the given key
"""
        with TIMER.timer("get_course_index", key):
            if ignore_case:
                query = {
                    key_attr: re.compile(u'^{}$'.format(re.escape(getattr(key, key_attr))), re.IGNORECASE)
                    for key_attr in ('org', 'course', 'run')
                }
            else:
                query = {
                    key_attr: getattr(key, key_attr)
                    for key_attr in ('org', 'course', 'run')
                }
            return self.course_index.find_one(query)
def find_matching_course_indexes(self, branch=None, search_targets=None, org_target=None, course_context=None):
"""
Find the course_index matching particular conditions.
@@ -205,75 +364,89 @@
            org_target: If specified, this is an ORG filter so that only course_indexes
                are returned for the specified ORG
        """
        with TIMER.timer("find_matching_course_indexes", course_context):
            query = {}
            if branch is not None:
                query['versions.{}'.format(branch)] = {'$exists': True}

            if search_targets:
                for key, value in search_targets.iteritems():
                    query['search_targets.{}'.format(key)] = value

            if org_target:
                query['org'] = org_target

            return self.course_index.find(query)
    def insert_course_index(self, course_index, course_context=None):
        """
        Create the course_index in the db
        """
        with TIMER.timer("insert_course_index", course_context):
            course_index['last_update'] = datetime.datetime.now(pytz.utc)
            self.course_index.insert(course_index)
    def update_course_index(self, course_index, from_index=None, course_context=None):
        """
        Update the db record for course_index.

        Arguments:
            from_index: If set, only update an index if it matches the one specified in `from_index`.
        """
        with TIMER.timer("update_course_index", course_context):
            if from_index:
                query = {"_id": from_index["_id"]}
                # last_update not only tells us when this course was last updated but also helps
                # prevent collisions
                if 'last_update' in from_index:
                    query['last_update'] = from_index['last_update']
            else:
                query = {
                    'org': course_index['org'],
                    'course': course_index['course'],
                    'run': course_index['run'],
                }
            course_index['last_update'] = datetime.datetime.now(pytz.utc)
            self.course_index.update(query, course_index, upsert=False)
def delete_course_index(self, course_key):
"""
Delete the course_index from the persistence mechanism whose id is the given course_index
"""
query = {
key_attr: getattr(course_key, key_attr)
for key_attr in ('org', 'course', 'run')
}
return self.course_index.remove(query)
with TIMER.timer("delete_course_index", course_key):
query = {
key_attr: getattr(course_key, key_attr)
for key_attr in ('org', 'course', 'run')
}
return self.course_index.remove(query)
    def get_definition(self, key, course_context=None):
        """
        Get the definition from the persistence mechanism whose id is the given key
        """
        with TIMER.timer("get_definition", course_context) as tagger:
            definition = self.definitions.find_one({'_id': key})
            tagger.measure("fields", len(definition['fields']))
            tagger.tag(block_type=definition['block_type'])
            return definition
    def get_definitions(self, definitions, course_context=None):
        """
        Retrieve all definitions listed in `definitions`.
        """
        with TIMER.timer("get_definitions", course_context) as tagger:
            tagger.measure('definitions', len(definitions))
            definitions = self.definitions.find({'_id': {'$in': definitions}})
            return definitions
    def insert_definition(self, definition, course_context=None):
        """
        Create the definition in the db
        """
        with TIMER.timer("insert_definition", course_context) as tagger:
            tagger.measure('fields', len(definition['fields']))
            tagger.tag(block_type=definition['block_type'])
            self.definitions.insert(definition)
def ensure_indexes(self):
"""
        ...

@@ -127,6 +127,7 @@ class SplitBulkWriteRecord(BulkOpsRecord):
self.modules = defaultdict(dict)
self.definitions = {}
self.definitions_in_db = set()
self.course_key = None
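        # The course key is recorded here so that the db_connection calls made
        # when this bulk operation closes can pass it as the course_context
        # used to tag their metrics.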
# TODO: This needs to track which branches have actually been modified/versioned,
# so that copying one branch to another doesn't update the original branch.
@@ -228,6 +229,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
bulk_write_record.initial_index = self.db_connection.get_course_index(course_key)
# Ensure that any edits to the index don't pollute the initial_index
bulk_write_record.index = copy.deepcopy(bulk_write_record.initial_index)
bulk_write_record.course_key = course_key
def _end_outermost_bulk_operation(self, bulk_write_record, structure_key, emit_signals=True):
"""
@@ -241,7 +243,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
dirty = True
try:
                    self.db_connection.insert_structure(bulk_write_record.structures[_id], bulk_write_record.course_key)
except DuplicateKeyError:
# We may not have looked up this structure inside this bulk operation, and thus
# didn't realize that it was already in the database. That's OK, the store is
@@ -252,7 +254,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
dirty = True
try:
                    self.db_connection.insert_definition(bulk_write_record.definitions[_id], bulk_write_record.course_key)
except DuplicateKeyError:
# We may not have looked up this definition inside this bulk operation, and thus
# didn't realize that it was already in the database. That's OK, the store is
@@ -263,9 +265,13 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
dirty = True
if bulk_write_record.initial_index is None:
                self.db_connection.insert_course_index(bulk_write_record.index, bulk_write_record.course_key)
            else:
                self.db_connection.update_course_index(
                    bulk_write_record.index,
                    from_index=bulk_write_record.initial_index,
                    course_context=bulk_write_record.course_key
                )
if dirty and emit_signals:
self.send_bulk_published_signal(bulk_write_record, structure_key)
@@ -294,7 +300,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
if bulk_write_record.active:
bulk_write_record.index = index_entry
else:
            self.db_connection.insert_course_index(index_entry, course_key)
def update_course_index(self, course_key, updated_index_entry):
"""
@@ -308,7 +314,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
if bulk_write_record.active:
bulk_write_record.index = updated_index_entry
else:
            self.db_connection.update_course_index(updated_index_entry, course_key)
def get_structure(self, course_key, version_guid):
bulk_write_record = self._get_bulk_ops_record(course_key)
@@ -317,7 +323,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
# The structure hasn't been loaded from the db yet, so load it
if structure is None:
                structure = self.db_connection.get_structure(version_guid, course_key)
bulk_write_record.structures[version_guid] = structure
if structure is not None:
bulk_write_record.structures_in_db.add(version_guid)
@@ -326,7 +332,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
else:
# cast string to ObjectId if necessary
version_guid = course_key.as_object_id(version_guid)
            return self.db_connection.get_structure(version_guid, course_key)
def update_structure(self, course_key, structure):
"""
@@ -338,7 +344,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
if bulk_write_record.active:
bulk_write_record.structures[structure['_id']] = structure
else:
            self.db_connection.insert_structure(structure, course_key)
def get_cached_block(self, course_key, version_guid, block_id):
"""
@@ -387,7 +393,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
# The definition hasn't been loaded from the db yet, so load it
if definition is None:
                definition = self.db_connection.get_definition(definition_guid, course_key)
bulk_write_record.definitions[definition_guid] = definition
if definition is not None:
bulk_write_record.definitions_in_db.add(definition_guid)
@@ -396,7 +402,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
else:
# cast string to ObjectId if necessary
definition_guid = course_key.as_object_id(definition_guid)
            return self.db_connection.get_definition(definition_guid, course_key)
def get_definitions(self, course_key, ids):
"""
@@ -424,7 +430,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
if len(ids):
# Query the db for the definitions.
            defs_from_db = self.db_connection.get_definitions(list(ids), course_key)
# Add the retrieved definitions to the cache.
bulk_write_record.definitions.update({d.get('_id'): d for d in defs_from_db})
definitions.extend(defs_from_db)
@@ -439,7 +445,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
if bulk_write_record.active:
bulk_write_record.definitions[definition['_id']] = definition
else:
            self.db_connection.insert_definition(definition, course_key)
def version_structure(self, course_key, structure, user_id):
"""
@@ -1206,7 +1212,7 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
'edited_on': course['edited_on']
}
    def get_definition_history_info(self, definition_locator, course_context=None):
"""
Because xblocks doesn't give a means to separate the definition's meta information from
the usage xblock's, this method will get that info for the definition
@@ -1220,7 +1226,7 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
# The supplied locator is of the wrong type, so it can't possibly be stored in this modulestore.
raise ItemNotFoundError(definition_locator)
        definition = self.db_connection.get_definition(definition_locator.definition_id, course_context)
if definition is None:
return None
return definition['edit_info']
        ...

@@ -55,7 +55,9 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
# Reading a structure when no bulk operation is active should just call
# through to the db_connection
result = self.bulk.get_structure(self.course_key, version_guid)
        self.assertConnCalls(
            call.get_structure(self.course_key.as_object_id(version_guid), self.course_key)
        )
self.assertEqual(result, self.conn.get_structure.return_value)
self.assertCacheNotCleared()
@@ -64,7 +66,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
# call through to the db_connection. It should also clear the
# system cache
self.bulk.update_structure(self.course_key, self.structure)
        self.assertConnCalls(call.insert_structure(self.structure, self.course_key))
self.clear_cache.assert_called_once_with(self.structure['_id'])
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
@@ -72,14 +74,19 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
# Reading a definition when no bulk operation is active should just call
# through to the db_connection
result = self.bulk.get_definition(self.course_key, version_guid)
        self.assertConnCalls(
            call.get_definition(
                self.course_key.as_object_id(version_guid),
                self.course_key
            )
        )
self.assertEqual(result, self.conn.get_definition.return_value)
def test_no_bulk_write_definition(self):
# Writing a definition when no bulk operation is active should just
# call through to the db_connection.
self.bulk.update_definition(self.course_key, self.definition)
        self.assertConnCalls(call.insert_definition(self.definition, self.course_key))
@ddt.data(True, False)
def test_no_bulk_read_index(self, ignore_case):
@@ -94,7 +101,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
# Writing a course index when no bulk operation is active should just call
# through to the db_connection
self.bulk.insert_course_index(self.course_key, self.index_entry)
        self.assertConnCalls(call.insert_course_index(self.index_entry, self.course_key))
self.assertCacheNotCleared()
def test_out_of_order_end(self):
@@ -109,7 +116,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.bulk.insert_course_index(self.course_key, self.index_entry)
self.assertConnCalls()
self.bulk._end_bulk_operation(self.course_key)
        self.conn.insert_course_index.assert_called_once_with(self.index_entry, self.course_key)
def test_write_updated_index_on_close(self):
old_index = {'this': 'is', 'an': 'old index'}
@@ -119,7 +126,11 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.bulk.insert_course_index(self.course_key, self.index_entry)
self.assertConnCalls()
self.bulk._end_bulk_operation(self.course_key)
        self.conn.update_course_index.assert_called_once_with(
            self.index_entry,
            from_index=old_index,
            course_context=self.course_key,
        )
def test_write_structure_on_close(self):
self.conn.get_course_index.return_value = None
@@ -128,7 +139,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.bulk.update_structure(self.course_key, self.structure)
self.assertConnCalls()
self.bulk._end_bulk_operation(self.course_key)
        self.assertConnCalls(call.insert_structure(self.structure, self.course_key))
def test_write_multiple_structures_on_close(self):
self.conn.get_course_index.return_value = None
@@ -140,7 +151,10 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.assertConnCalls()
self.bulk._end_bulk_operation(self.course_key)
self.assertItemsEqual(
            [
                call.insert_structure(self.structure, self.course_key),
                call.insert_structure(other_structure, self.course_key)
            ],
self.conn.mock_calls
)
@@ -154,10 +168,11 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.assertConnCalls()
self.bulk._end_bulk_operation(self.course_key)
self.assertConnCalls(
            call.insert_definition(self.definition, self.course_key),
            call.update_course_index(
                {'versions': {self.course_key.branch: self.definition['_id']}},
                from_index=original_index,
                course_context=self.course_key
)
)
@@ -173,11 +188,12 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.bulk._end_bulk_operation(self.course_key)
self.assertItemsEqual(
[
                call.insert_definition(self.definition, self.course_key),
                call.insert_definition(other_definition, self.course_key),
                call.update_course_index(
                    {'versions': {'a': self.definition['_id'], 'b': other_definition['_id']}},
                    from_index=original_index,
                    course_context=self.course_key,
)
],
self.conn.mock_calls
@@ -190,7 +206,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.bulk.update_definition(self.course_key, self.definition)
self.assertConnCalls()
self.bulk._end_bulk_operation(self.course_key)
        self.assertConnCalls(call.insert_definition(self.definition, self.course_key))
def test_write_multiple_definitions_on_close(self):
self.conn.get_course_index.return_value = None
@@ -202,7 +218,10 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.assertConnCalls()
self.bulk._end_bulk_operation(self.course_key)
self.assertItemsEqual(
            [
                call.insert_definition(self.definition, self.course_key),
                call.insert_definition(other_definition, self.course_key)
            ],
self.conn.mock_calls
)
@@ -216,10 +235,11 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.assertConnCalls()
self.bulk._end_bulk_operation(self.course_key)
self.assertConnCalls(
            call.insert_structure(self.structure, self.course_key),
            call.update_course_index(
                {'versions': {self.course_key.branch: self.structure['_id']}},
                from_index=original_index,
                course_context=self.course_key,
)
)
@@ -235,11 +255,12 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
self.bulk._end_bulk_operation(self.course_key)
self.assertItemsEqual(
[
                call.insert_structure(self.structure, self.course_key),
                call.insert_structure(other_structure, self.course_key),
                call.update_course_index(
                    {'versions': {'a': self.structure['_id'], 'b': other_structure['_id']}},
                    from_index=original_index,
                    course_context=self.course_key,
)
],
self.conn.mock_calls
@@ -408,7 +429,7 @@ class TestBulkWriteMixinFindMethods(TestBulkWriteMixin):
results = self.bulk.get_definitions(self.course_key, search_ids)
definitions_gotten = list(set(search_ids) - set(active_ids))
if len(definitions_gotten) > 0:
            self.conn.get_definitions.assert_called_once_with(definitions_gotten, self.course_key)
else:
# If no definitions to get, then get_definitions() should *not* have been called.
self.assertEquals(self.conn.get_definitions.call_count, 0)
@@ -677,8 +698,12 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin):
index_copy['versions']['draft'] = index['versions']['published']
self.bulk.update_course_index(self.course_key, index_copy)
self.bulk._end_bulk_operation(self.course_key)
        self.conn.insert_structure.assert_called_once_with(published_structure, self.course_key)
        self.conn.update_course_index.assert_called_once_with(
            index_copy,
            from_index=self.conn.get_course_index.return_value,
            course_context=self.course_key,
        )
self.conn.get_course_index.assert_called_once_with(self.course_key)