Commit c11a9f05 by John Eskew

Merge pull request #6590 from edx/jeskew/optimize_split_asset_search

Optimize large amounts of asset metadata in the Split modulestore.

Other thumb from @dmitchell  in PR: https://github.com/edx/edx-platform/pull/6456
parents 6be35e0f 6de1137f
......@@ -9,6 +9,7 @@ import json
import datetime
from uuid import uuid4
from pytz import UTC
from collections import namedtuple, defaultdict
import collections
from contextlib import contextmanager
......@@ -36,7 +37,6 @@ log = logging.getLogger('edx.modulestore')
new_contract('CourseKey', CourseKey)
new_contract('AssetKey', AssetKey)
new_contract('AssetMetadata', AssetMetadata)
new_contract('SortedListWithKey', SortedListWithKey)
class ModuleStoreEnum(object):
......@@ -276,27 +276,65 @@ class BulkOperationsMixin(object):
return self._get_bulk_ops_record(course_key, ignore_case).active
class ModuleStoreAssetInterface(object):
class IncorrectlySortedList(Exception):
"""
The methods for accessing assets and their metadata
Thrown when calling find() on a SortedAssetList not sorted by filename.
"""
@contract(asset_list='SortedListWithKey', asset_id='AssetKey')
def _find_asset_in_list(self, asset_list, asset_id):
"""
Given a asset list that's a SortedListWithKey, find the index of a particular asset.
pass
class SortedAssetList(SortedListWithKey):
"""
List of assets that is sorted based on an asset attribute.
"""
def __init__(self, **kwargs):
self.filename_sort = False
key_func = kwargs.get('key', None)
if key_func is None:
kwargs['key'] = itemgetter('filename')
self.filename_sort = True
super(SortedAssetList, self).__init__(**kwargs)
@contract(asset_id=AssetKey)
def find(self, asset_id):
"""
Find the index of a particular asset in the list. This method is only functional for lists
sorted by filename. If the list is sorted on any other key, find() raises a
Returns: Index of asset, if found. None if not found.
"""
# Don't attempt to find an asset by filename in a list that's not sorted by filename.
if not self.filename_sort:
raise IncorrectlySortedList()
# See if this asset already exists by checking the external_filename.
# Studio doesn't currently support using multiple course assets with the same filename.
# So use the filename as the unique identifier.
idx = None
idx_left = asset_list.bisect_left({'filename': asset_id.path})
idx_right = asset_list.bisect_right({'filename': asset_id.path})
idx_left = self.bisect_left({'filename': asset_id.path})
idx_right = self.bisect_right({'filename': asset_id.path})
if idx_left != idx_right:
# Asset was found in the list.
idx = idx_left
return idx
@contract(asset_md=AssetMetadata)
def insert_or_update(self, asset_md):
"""
Insert asset metadata if asset is not present. Update asset metadata if asset is already present.
"""
metadata_to_insert = asset_md.to_storable()
asset_idx = self.find(asset_md.asset_id)
if asset_idx is None:
# Add new metadata sorted into the list.
self.add(metadata_to_insert)
else:
# Replace existing metadata.
self[asset_idx] = metadata_to_insert
class ModuleStoreAssetBase(object):
"""
The methods for accessing assets and their metadata
"""
def _find_course_asset(self, asset_key):
"""
Returns same as _find_course_assets plus the index to the given asset or None. Does not convert
......@@ -311,11 +349,11 @@ class ModuleStoreAssetInterface(object):
- the index of asset in list (None if asset does not exist)
"""
course_assets = self._find_course_assets(asset_key.course_key)
all_assets = SortedListWithKey([], key=itemgetter('filename'))
all_assets = SortedAssetList(iterable=[])
# Assets should be pre-sorted, so add them efficiently without sorting.
# extend() will raise a ValueError if the passed-in list is not sorted.
all_assets.extend(course_assets.setdefault(asset_key.block_type, []))
idx = self._find_asset_in_list(all_assets, asset_key)
idx = all_assets.find(asset_key)
return course_assets, idx
......@@ -334,9 +372,8 @@ class ModuleStoreAssetInterface(object):
if asset_idx is None:
return None
info = asset_key.block_type
mdata = AssetMetadata(asset_key, asset_key.path, **kwargs)
all_assets = course_assets[info]
all_assets = course_assets[asset_key.asset_type]
mdata.from_storable(all_assets[asset_idx])
return mdata
......@@ -364,7 +401,7 @@ class ModuleStoreAssetInterface(object):
course_assets = self._find_course_assets(course_key)
# Determine the proper sort - with defaults of ('displayname', SortOrder.ascending).
key_func = itemgetter('filename')
key_func = None
sort_order = ModuleStoreEnum.SortOrder.ascending
if sort:
if sort[0] == 'uploadDate':
......@@ -374,12 +411,12 @@ class ModuleStoreAssetInterface(object):
if asset_type is None:
# Add assets of all types to the sorted list.
all_assets = SortedListWithKey([], key=key_func)
all_assets = SortedAssetList(iterable=[], key=key_func)
for asset_type, val in course_assets.iteritems():
all_assets.update(val)
else:
# Add assets of a single type to the sorted list.
all_assets = SortedListWithKey(course_assets.get(asset_type, []), key=key_func)
all_assets = SortedAssetList(iterable=course_assets.get(asset_type, []), key=key_func)
num_assets = len(all_assets)
start_idx = start
......@@ -405,10 +442,32 @@ class ModuleStoreAssetInterface(object):
return ret_assets
class ModuleStoreAssetWriteInterface(ModuleStoreAssetInterface):
class ModuleStoreAssetWriteInterface(ModuleStoreAssetBase):
"""
The write operations for assets and asset metadata
"""
def _save_assets_by_type(self, course_key, asset_metadata_list, course_assets, user_id, import_only):
"""
Common private method that saves/updates asset metadata items in the internal modulestore
structure used to store asset metadata items.
"""
# Lazily create a sorted list if not already created.
assets_by_type = defaultdict(lambda: SortedAssetList(iterable=course_assets.get(asset_type, [])))
for asset_md in asset_metadata_list:
if asset_md.asset_id.course_key != course_key:
# pylint: disable=logging-format-interpolation
log.warning("Asset's course {} does not match other assets for course {} - not saved.".format(
asset_md.asset_id.course_key, course_key
))
continue
if not import_only:
asset_md.update({'edited_by': user_id, 'edited_on': datetime.datetime.now(UTC)})
asset_type = asset_md.asset_id.asset_type
all_assets = assets_by_type[asset_type]
all_assets.insert_or_update(asset_md)
return assets_by_type
@contract(asset_metadata='AssetMetadata')
def save_asset_metadata(self, asset_metadata, user_id, import_only):
"""
......@@ -485,7 +544,7 @@ class ModuleStoreAssetWriteInterface(ModuleStoreAssetInterface):
# pylint: disable=abstract-method
class ModuleStoreRead(ModuleStoreAssetInterface):
class ModuleStoreRead(ModuleStoreAssetBase):
"""
An abstract interface for a database backend that stores XModuleDescriptor
instances and extends read-only functionality
......
......@@ -26,8 +26,6 @@ from mongodb_proxy import MongoProxy, autoretry_read
from path import path
from pytz import UTC
from contracts import contract, new_contract
from operator import itemgetter
from sortedcontainers import SortedListWithKey
from importlib import import_module
from opaque_keys.edx.keys import UsageKey, CourseKey, AssetKey
......@@ -1535,34 +1533,14 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
user_id (int|long): user ID saving the asset metadata
import_only (bool): True if edited_on/by data should remain unchanged.
"""
course_assets = self._find_course_assets(asset_metadata_list[0].asset_id.course_key)
changed_asset_types = set()
assets_by_type = {}
for asset_md in asset_metadata_list:
asset_type = asset_md.asset_id.asset_type
changed_asset_types.add(asset_type)
# Lazily create a sorted list if not already created.
if asset_type not in assets_by_type:
assets_by_type[asset_type] = SortedListWithKey(course_assets.get(asset_type, []), key=itemgetter('filename'))
all_assets = assets_by_type[asset_type]
asset_idx = self._find_asset_in_list(assets_by_type[asset_type], asset_md.asset_id)
if not import_only:
asset_md.update({'edited_by': user_id, 'edited_on': datetime.now(UTC)})
# Translate metadata to Mongo format.
metadata_to_insert = asset_md.to_storable()
if asset_idx is None:
# Add new metadata sorted into the list.
all_assets.add(metadata_to_insert)
else:
# Replace existing metadata.
all_assets[asset_idx] = metadata_to_insert
course_key = asset_metadata_list[0].asset_id.course_key
course_assets = self._find_course_assets(course_key)
assets_by_type = self._save_assets_by_type(course_key, asset_metadata_list, course_assets, user_id, import_only)
# Build an update set with potentially multiple embedded fields.
updates_by_type = {}
for asset_type in changed_asset_types:
updates_by_type[self._make_mongo_asset_key(asset_type)] = assets_by_type[asset_type].as_list()
for asset_type, assets in assets_by_type.iteritems():
updates_by_type[self._make_mongo_asset_key(asset_type)] = assets.as_list()
# Update the document.
self.asset_collection.update(
......
......@@ -73,7 +73,7 @@ from opaque_keys.edx.locator import (
from xmodule.modulestore.exceptions import InsufficientSpecificationError, VersionConflictError, DuplicateItemError, \
DuplicateCourseError
from xmodule.modulestore import (
inheritance, ModuleStoreWriteBase, ModuleStoreEnum, BulkOpsRecord, BulkOperationsMixin
inheritance, ModuleStoreWriteBase, ModuleStoreEnum, BulkOpsRecord, BulkOperationsMixin, SortedAssetList
)
from ..exceptions import ItemNotFoundError
......@@ -2416,27 +2416,6 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
"""
return self._lookup_course(course_key).structure.get('assets', {})
def _find_course_asset(self, asset_key):
"""
Return the raw dict of assets type as well as the index to the one being sought from w/in
it's subvalue (or None)
"""
assets = self._lookup_course(asset_key.course_key).structure.get('assets', {})
return assets, self._lookup_course_asset(assets, asset_key)
def _lookup_course_asset(self, structure, asset_key):
"""
Find the course asset in the structure or return None if it does not exist
"""
# See if this asset already exists by checking the external_filename.
# Studio doesn't currently support using multiple course assets with the same filename.
# So use the filename as the unique identifier.
accessor = asset_key.block_type
for idx, asset in enumerate(structure.setdefault(accessor, [])):
if asset['filename'] == asset_key.path:
return idx
return None
def _update_course_assets(self, user_id, asset_key, update_function):
"""
A wrapper for functions wanting to manipulate assets. Gets and versions the structure,
......@@ -2450,12 +2429,17 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
original_structure = self._lookup_course(asset_key.course_key).structure
index_entry = self._get_index_if_valid(asset_key.course_key)
new_structure = self.version_structure(asset_key.course_key, original_structure, user_id)
course_assets = new_structure.setdefault('assets', {})
asset_idx = self._lookup_course_asset(new_structure.setdefault('assets', {}), asset_key)
asset_type = asset_key.asset_type
all_assets = SortedAssetList(iterable=[])
# Assets should be pre-sorted, so add them efficiently without sorting.
# extend() will raise a ValueError if the passed-in list is not sorted.
all_assets.extend(course_assets.setdefault(asset_type, []))
asset_idx = all_assets.find(asset_key)
new_structure['assets'][asset_key.block_type] = update_function(
new_structure['assets'][asset_key.block_type], asset_idx
)
all_assets_updated = update_function(all_assets, asset_idx)
new_structure['assets'][asset_type] = all_assets_updated.as_list()
# update index if appropriate and structures
self.update_structure(asset_key.course_key, new_structure)
......@@ -2466,13 +2450,12 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
def save_asset_metadata_list(self, asset_metadata_list, user_id, import_only=False):
"""
A wrapper for functions wanting to manipulate assets. Gets and versions the structure,
passes the mutable array for all asset types as well as the idx to the function for it to
update, then persists the changed data back into the course.
The update function can raise an exception if it doesn't want to actually do the commit. The
surrounding method probably should catch that exception.
Saves a list of AssetMetadata to the modulestore. The list can be composed of multiple
asset types. This method is optimized for multiple inserts at once - it only re-saves the structure
at the end of all saves/updates.
"""
# Determine course key to use in bulk operation. Use the first asset assuming that
# all assets will be for the same course.
asset_key = asset_metadata_list[0].asset_id
course_key = asset_key.course_key
......@@ -2480,20 +2463,14 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
original_structure = self._lookup_course(course_key).structure
index_entry = self._get_index_if_valid(course_key)
new_structure = self.version_structure(course_key, original_structure, user_id)
course_assets = new_structure.setdefault('assets', {})
# Add all asset metadata to the structure at once.
for asset_metadata in asset_metadata_list:
metadata_to_insert = asset_metadata.to_storable()
asset_md_key = asset_metadata.asset_id
asset_idx = self._lookup_course_asset(new_structure.setdefault('assets', {}), asset_md_key)
assets_by_type = self._save_assets_by_type(
course_key, asset_metadata_list, course_assets, user_id, import_only
)
all_assets = new_structure['assets'][asset_md_key.asset_type]
if asset_idx is None:
all_assets.append(metadata_to_insert)
else:
all_assets[asset_idx] = metadata_to_insert
new_structure['assets'][asset_md_key.asset_type] = all_assets
for asset_type, assets in assets_by_type.iteritems():
new_structure['assets'][asset_type] = assets.as_list()
# update index if appropriate and structures
self.update_structure(course_key, new_structure)
......@@ -2504,21 +2481,9 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
def save_asset_metadata(self, asset_metadata, user_id, import_only=False):
"""
The guts of saving a new or updated asset
Saves or updates a single asset. Simply makes it a list and calls the list save above.
"""
metadata_to_insert = asset_metadata.to_storable()
def _internal_method(all_assets, asset_idx):
"""
Either replace the existing entry or add a new one
"""
if asset_idx is None:
all_assets.append(metadata_to_insert)
else:
all_assets[asset_idx] = metadata_to_insert
return all_assets
return self._update_course_assets(user_id, asset_metadata.asset_id, _internal_method)
return self.save_asset_metadata_list([asset_metadata, ], user_id, import_only)
@contract(asset_key='AssetKey', attr_dict=dict)
def set_asset_metadata_attrs(self, asset_key, attr_dict, user_id):
......
......@@ -528,12 +528,19 @@ class DraftVersioningModuleStore(SplitMongoModuleStore, ModuleStoreDraftAndPubli
"""
Updates both the published and draft branches
"""
asset_key = asset_metadata_list[0].asset_id
asset_metadata_list[0].asset_id = self._map_revision_to_branch(asset_key, ModuleStoreEnum.RevisionOption.published_only)
# if one call gets an exception, don't do the other call but pass on the exception
# Convert each asset key to the proper branch before saving.
asset_keys = [asset_md.asset_id for asset_md in asset_metadata_list]
for asset_md in asset_metadata_list:
asset_key = asset_md.asset_id
asset_md.asset_id = self._map_revision_to_branch(asset_key, ModuleStoreEnum.RevisionOption.published_only)
super(DraftVersioningModuleStore, self).save_asset_metadata_list(asset_metadata_list, user_id, import_only)
asset_metadata_list[0].asset_id = self._map_revision_to_branch(asset_key, ModuleStoreEnum.RevisionOption.draft_only)
for asset_md in asset_metadata_list:
asset_key = asset_md.asset_id
asset_md.asset_id = self._map_revision_to_branch(asset_key, ModuleStoreEnum.RevisionOption.draft_only)
super(DraftVersioningModuleStore, self).save_asset_metadata_list(asset_metadata_list, user_id, import_only)
# Change each asset key back to its original state.
for k in asset_keys:
asset_md.asset_id = k
def _find_course_asset(self, asset_key):
return super(DraftVersioningModuleStore, self)._find_course_asset(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment