Commit fbee3014 by David Ormsbee

A first cut at storing data differently.

parent 396c7de8
......@@ -19,6 +19,10 @@ class VisibilityTransformer(BlockStructureTransformer):
)
@classmethod
def collect_2(cls, block_cache_unit):
pass
@classmethod
def collect(cls, block_structure):
"""
Collects any information that's necessary to execute this transformer's
......
"""
This module deals with the storage of data associated with the Block
Transforms framework.
# GOALS
A number of the classes in this module only make sense in the context of the
following goals:
1. Data gathered in the collect() phase should be stored by Transformer.
Motivation:
* Transformers can be used in various combinations for different end points.
For example, in serving the course structure to something like Insights,
we need not apply any student-specific transforms like permissions or user
partitions.
* We expect to add new Transformers from time to time. When deploying a new
Transformer, we should be able to run the collect() phase on all courses
for just that one Transformer, rather than having to re-process every
Transformer for every course.
* We want to be able to track collected data size by Transformer.
2. Keep the stored data as small as possible.
Motivation:
* The large size of course structure documents in Split-Mongo has been a
performance headache.
* Many uses for this framework are "a mile wide, an inch deep" types of
queries on the course, where we might need to hit thousands of XBlock
nodes, but only want a small handful of fields.
* We eventually want to be able to plug LMS FieldData into this system,
meaning it has to be fast and light when serving single blocks.
3. Avoid modulestore access during the transform() phase.
Motivation:
* It's slow.
* It's complex.
* It makes it harder to test.
* We want to kill it: https://openedx.atlassian.net/wiki/x/moKp
Storing things naively on a per-block basis was prohibitively expensive in terms of
space consumption, resulting in about 1K/block uncompressed, and 250 bytes/block
compressed. On a 4K block course, that was 4MB of data -- and that's with a
limited number of transformers. In many cases, the block keys themselves were
larger than the data collected for them. Also, splitting things on a per-block
basis meant that compression was far less efficient, since the collected data
often has really low cardinality across blocks (e.g. due dates, boolean fields,
problem types).
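To make the difference concrete, here is a minimal sketch (field names and
values invented for illustration, not the real storage format):

    # Naive per-block storage repeats the keys and field names everywhere:
    per_block = {
        'block_key_1': {'graded': False, 'due': None},
        'block_key_2': {'graded': False, 'due': None},
        'block_key_3': {'graded': True, 'due': '2015-06-01'},
    }

    # Columnar storage keeps the block keys once, in sorted order, with one
    # list of values per field. Low-cardinality columns compress well.
    block_keys = sorted(per_block)
    columns = {
        'graded': [per_block[key]['graded'] for key in block_keys],
        'due': [per_block[key]['due'] for key in block_keys],
    }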
# DATA OVERVIEW
Collected data often falls into one of a handful of categories:
1. Present for all blocks, with different data
- e.g. display_name
2. Present for all blocks, but limited to a narrow range of possible values
- e.g. category
3. Only applicable to a small subset of blocks, and null everywhere else
- e.g. weight, video-related fields, etc.
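For example, one column per category over the same five blocks might look
like this (field names and values invented for illustration):

    values = {
        'display_name': ['Intro', 'Week 1', 'Quiz 1', 'Lecture', 'Recap'],
        'category': ['chapter', 'chapter', 'problem', 'video', 'html'],
        'weight': [None, None, 1.0, None, None],
    }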
# HIGH LEVEL DESIGN
We start with a `BlockCacheUnit` (BCU). This represents all the data that a
given set of Transformers needs for a given set of blocks in the course.
BlockCacheUnit:
* has the BlockStructure (which holds parent/child relations)
* keeps track of BlockFieldValues
* bundles together a CollectedData for each Transformer as needed
CollectedData:
* has a reference to the BCU's structure
* has an xblock_field_values (BlockFieldValues)
* has a transformer_field_values (BlockFieldValues)
* has free-form transformer_data
BlockFieldValues:
* uses a BlockIndexMapping
* stores the values for many fields
* can't instantiate/serialize itself; the BCU does that, since the BCU holds
  the BlockIndexMapping
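A rough sketch of the transform phase, assuming a populated BCU (`user_info`
and `transformers` come from the caller):

    for transformer in transformers:
        collected_data = bcu.data_for_transform(transformer)
        transformer.transform(user_info, collected_data)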
"""
import itertools
class BlockCacheUnit(object):
"""
Responsibilities:
1. Manage storage, retrieval, and caching of all Transformer-collected data.
2. Assemble CollectedData for a given Transformer during transform()
# And honestly, the storage part of #1 is probably going to get farmed off.
A BCU has three parts:
1. A BlockStructure (mutable)
2. 0-n CollectedData from Transformers (immutable)
3. 0-n BlockFieldValues for XBlock fields (mutable)
What we really want is to give a packaged view to a particular Transformer
that needs a certain set of things:
    the structure
    the data collected for its transform
    the XBlock fields it requested
When we're figuring out what BCU to instantiate, we need:
    the root and depth
    the list of Transformers
    the list of XBlock fields (combined from all Transformers)
Open question: is a Transformer allowed to ask for another Transformer's data?
Args:
    structure (BlockStructure)
    xblock_field_values (BlockFieldValues)
    transformers_to_field_values ({transformer name: BlockFieldValues})
    transformers_to_data ({transformer name: top-level transformer data})
"""
def __init__(self, structure, xblock_field_values, transformers_to_field_values, transformers_to_data):
# BlockStructure holding relations for all block keys in this
# BlockCacheUnit. This will mutate between transforms.
self._structure = structure
self._mapping = BlockIndexMapping(structure.get_block_keys())
# BlockFieldValues with all XBlock fields we're aware of. This will mutate
# between transforms.
self._xblock_field_values = xblock_field_values
# {transformer.name() -> BlockFieldValues}. Each entry is private to a
# Transformer.
self._transformers_to_field_values = transformers_to_field_values
# Top level data for each transform. {transformer.name() -> data}
self._transformers_to_data = transformers_to_data
def data_for_transform(self, transformer):
    """
    Give a Transformer a CollectedData with just the information it's
    supposed to have access to during the transform phase.
    """
    return CollectedData(
        self._structure,
        self._xblock_field_values.slice_by_fields(transformer.xblock_fields),
        self._transformers_to_field_values[transformer.name],
        self._transformers_to_data[transformer.name]
    )
def data_for_collect(self, transformer):
    """
    Give a Transformer a CollectedData for use during the collect phase.
    The structure and XBlock field values are wrapped read-only; the
    Transformer's own field values and top-level data remain writable.
    """
    ro_structure = ReadOnlyWrapper(
        self._structure,
        ['add_relation', 'prune', 'remove_block', 'remove_block_if']
    )
    ro_xblock_field_values = ReadOnlyWrapper(
        self._xblock_field_values.slice_by_fields(transformer.xblock_fields),
        ['set']
    )
    return CollectedData(
        ro_structure,
        ro_xblock_field_values,
        self._transformers_to_field_values[transformer.name],
        self._transformers_to_data[transformer.name]
    )
def set_transformer_data(self, transformer, data):
"""What to do with the version information? We need it somewhere."""
self._transformers_to_data[transformer.name] = data
def init_field_values_for_transformer(self, transformer, fields):
blank_field_values = BlockFieldValues.create_blank(self._mapping, fields)
self._transformers_to_field_values[transformer.name] = blank_field_values
return blank_field_values
def subset(self, new_root_key):
"""
Strategy for this:
1. Ask the BlockStructure for sub_structure(new_root_key)
2. Have it essentially create a new version of itself by copying
all the relations as they exist, doing DFS with cycle guard
3. Use that as the basis for a new set of block keys.
4. Create a BlockFieldValues.slice_by_keys()
5. Preserve the _transformers_to_data
"""
pass
@classmethod
def load_from_modulestore(cls, modulestore, transformers, root_key, version):
"""
Given a modulestore and a set of transformers, populate all the data
needed to construct a BCU that we can then pass through various
Transformer collect() methods.
This method will completely initialize the BlockCacheUnit from the modulestore.
"""
with modulestore.bulk_operations(root_key.course_key):
# Initialize the structure
root_xblock = modulestore.get_item(root_key, depth=None)
structure = BlockStructure.load_from_xblock(root_xblock)
# Now the XBlock Fields
index_mapping = BlockIndexMapping(structure.get_block_keys())
requested_xblock_fields = set(
    itertools.chain.from_iterable(tfmr.xblock_fields for tfmr in transformers)
)
# Fetch each block once, rather than once per requested field.
blocks_by_key = {
    block_key: modulestore.get_item(block_key)
    for block_key in index_mapping
}
xblock_field_values = BlockFieldValues(
    index_mapping,
    {
        field: [
            getattr(blocks_by_key[block_key], field, None)
            for block_key in index_mapping
        ]
        for field in requested_xblock_fields
    }
)
return cls(structure, xblock_field_values, {}, {})
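# Illustrative collect-phase flow (a sketch; `modulestore`, `transformers`,
# and `course_usage_key` are assumed to come from the caller):
#
#     bcu = BlockCacheUnit.load_from_modulestore(
#         modulestore, transformers, course_usage_key, version=None
#     )
#     for transformer in transformers:
#         transformer.collect_2(bcu)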
class CollectedData(object):
    """
    This represents all the data that a Transformer will write out during its
    collect() phase and later read during its transform() phase. It essentially
    serves as a limited view of data residing in a BlockCacheUnit. Transformers
    should call `BlockCacheUnit.data_for_collect()` or `data_for_transform()`
    instead of instantiating this class directly.
Because this class represents a Transformer's data usage, it gives us a
good place to log how much data each Transformer is using and when it's
asking for it.
"""
def __init__(self, structure, xblock_field_values, field_values, data):
"""
"""
self._structure = structure
self._xblock_field_values = xblock_field_values
self._field_values = field_values
self._data = data
@property
def structure(self):
return self._structure
@property
def xblock_field_values(self):
return self._xblock_field_values
@property
def field_values(self):
return self._field_values
@property
def data(self):
return self._data
class WriteError(Exception):
pass
class ReadOnlyWrapper(object):
"""
Basically a proxy class that intercepts method calls that would alter state.
"""
def __init__(self, real_obj, write_methods):
self._real_obj = real_obj
self._write_methods = set(write_methods)
def __getattribute__(self, name):
    # Use object.__getattribute__ for our own attributes to avoid
    # infinite recursion (this method intercepts all attribute access).
    if name in ('_real_obj', '_write_methods'):
        return object.__getattribute__(self, name)
    if name in self._write_methods:
        raise WriteError("Cannot use {} method during collect phase.".format(name))
    return getattr(self._real_obj, name)
def __repr__(self):
return "ReadOnlyWrapper({!r}, {!r})".format(self._real_obj, self._write_methods)
def __unicode__(self):
return u"ReadOnlyWrapper on {}".format(self._real_obj)
class BlockFieldValues(object):
"""
"""
def __init__(self, block_index_mapping, fields_to_value_lists):
self._block_index_mapping = block_index_mapping
self._fields_to_value_lists = fields_to_value_lists
@classmethod
def create_blank(cls, block_index_mapping, fields):
return cls(
block_index_mapping,
{
field: [None for __ in xrange(len(block_index_mapping))]
for field in fields
}
)
@property
def fields(self):
"""Sorted list of fields available in this BlockFieldValues."""
return sorted(self._fields_to_value_lists.keys())
def slice_by_fields(self, fields):
"""
Return a cheaply sliced BlockFieldValues with a subset of the data.
Note that a sliced BlockFieldValues still points to the same lists as
the original it was taken from. This is mostly for helping folks to keep
track of their real field dependencies explicitly -- so that we can give
a Transformer precisely what it declared it needed, and error early if
it tries to access something it hasn't asked for. It is *not* a way to
make a safe copy.
"""
return BlockFieldValues(
self._block_index_mapping,
{
field: value_list
for field, value_list in self._fields_to_value_lists.items()
if field in fields
}
)
def slice_by_keys(self, new_index_mapping):
    """
    Return a BlockFieldValues restricted to the block keys in
    `new_index_mapping`. Not yet implemented; needed by
    BlockCacheUnit.subset().
    """
    raise NotImplementedError
def get(self, field, block_key):
index = self._block_index_mapping.index_for(block_key)
value_list = self._value_list_for_field(field)
return value_list[index]
def set(self, field, block_key, value):
index = self._block_index_mapping.index_for(block_key)
value_list = self._value_list_for_field(field)
value_list[index] = value
def __getitem__(self, block_key):
"""Return a dict of field names to values."""
index = self._block_index_mapping.index_for(block_key)
return {
field: value_list[index]
for field, value_list
in self._fields_to_value_lists.items()
}
def _value_list_for_field(self, field):
try:
return self._fields_to_value_lists[field]
except KeyError:
raise KeyError(
"{} has no field '{}' (fields: {})".format(self, field, self.fields)
)
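# Illustrative usage (a sketch; `mapping` and `block_key` are assumed to
# already exist, with `block_key` present in the mapping):
#
#     values = BlockFieldValues.create_blank(mapping, ['due', 'graded'])
#     values.set('graded', block_key, True)
#     values.get('graded', block_key)  # True
#     values[block_key]                # {'due': None, 'graded': True}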
class BlockIndexMapping(object):
"""
Given a list of block_keys, this class will hold a mapping of
block_key -> list index.
It is expected that instances of this are shared among many objects, and you
should *not* mutate any data structures after __init__().
"""
def __init__(self, block_keys):
    self._ordered_block_keys = sorted(block_keys)
    self._block_keys_to_indexes = {
        block_key: index
        for index, block_key in enumerate(self._ordered_block_keys)
    }
def __iter__(self):
return iter(self._ordered_block_keys)
def index_for(self, key):
    try:
        return self._block_keys_to_indexes[key]
    except KeyError:
        raise KeyError("{} has no index mapping for key {}".format(self, key))
"""
...
.
"""
from block_structure import BlockStructureFactory
from transformer import BlockStructureTransformers
def get_blocks(root_block_key, transformers, user_info):
# Load the cached block structure. This will first try to find the exact
# block in ephemeral storage, then fall back to the root course block it
# belongs to in ephemeral storage, and then fall back to the root course
# block stored in permanent storage.
bcu = BlockCacheUnit.load(root_block_key, transformers)
# Note that each transform may mutate the structure (e.g. removing blocks
# the user should not see).
for transformer in transformers:
    collected_data = bcu.data_for_transform(transformer)
    transformer.transform(user_info, collected_data)
return bcu.structure
def get_blocks(cache, modulestore, user_info, root_block_key, transformers):
unregistered_transformers = BlockStructureTransformers.find_unregistered(transformers)
if unregistered_transformers:
......
......@@ -11,8 +11,6 @@ from transformer import BlockStructureTransformers
logger = getLogger(__name__) # pylint: disable=C0103
TRANSFORMER_VERSION_KEY = '_version'
class BlockRelations(object):
def __init__(self):
......@@ -32,6 +30,9 @@ class BlockStructure(object):
def __iter__(self):
return self.topological_traversal()
def __contains__(self, usage_key):
return usage_key in self._block_relations
def add_relation(self, parent_key, child_key):
self._add_relation(self._block_relations, parent_key, child_key)
......@@ -88,58 +89,6 @@ class BlockStructure(object):
_ = block_relations[block_key]
class BlockData(object):
def __init__(self):
# dictionary mapping xblock field names to their values.
self._xblock_fields = {}
# dictionary mapping transformers' IDs to their collected data.
self._transformer_data = defaultdict(dict)
class BlockStructureBlockData(BlockStructure):
"""
A sub-class of BlockStructure that encapsulates data captured about the blocks.
"""
def __init__(self, root_block_key):
super(BlockStructureBlockData, self).__init__(root_block_key)
# dictionary mapping usage keys to BlockData
self._block_data_map = defaultdict(BlockData)
# dictionary mapping transformer IDs to block-structure-wide transformer data
self._transformer_data = defaultdict(dict)
def get_xblock_field(self, usage_key, field_name, default=None):
block_data = self._block_data_map.get(usage_key)
return block_data._xblock_fields.get(field_name, default) if block_data else default
def get_transformer_data(self, transformer, key, default=None):
return self._transformer_data.get(transformer.name(), {}).get(key, default)
def set_transformer_data(self, transformer, key, value):
self._transformer_data[transformer.name()][key] = value
def get_transformer_data_version(self, transformer):
return self.get_transformer_data(transformer, TRANSFORMER_VERSION_KEY, 0)
def get_transformer_block_data(self, usage_key, transformer, key=None, default=None):
block_data = self._block_data_map.get(usage_key)
if not block_data:
return default
else:
transformer_data = block_data._transformer_data.get(transformer.name(), {})
if key:
return transformer_data.get(key, default)
else:
return transformer_data or default
def set_transformer_block_data(self, usage_key, transformer, key, value):
self._block_data_map[usage_key]._transformer_data[transformer.name()][key] = value
def remove_transformer_block_data(self, usage_key, transformer, key):
self._block_data_map[usage_key]._transformer_data.get(transformer.name(), {}).pop(key, None)
def remove_block(self, usage_key, keep_descendants):
children = self._block_relations[usage_key].children
parents = self._block_relations[usage_key].parents
......@@ -168,48 +117,10 @@ class BlockStructureBlockData(BlockStructure):
return True
list(self.topological_traversal(predicate=predicate, **kwargs))
class BlockStructureCollectedData(BlockStructureBlockData):
"""
A sub-class of BlockStructure that encapsulates information about the blocks during the collect phase.
"""
def __init__(self, root_block_key):
super(BlockStructureCollectedData, self).__init__(root_block_key)
self._xblock_map = {} # dict[UsageKey: XBlock]
self._requested_xblock_fields = set()
def request_xblock_fields(self, *field_names):
self._requested_xblock_fields.update(set(field_names))
def collect_requested_xblock_fields(self):
if not self._requested_xblock_fields:
return
for xblock in self._xblock_map.itervalues():
for field_name in self._requested_xblock_fields:
self._set_xblock_field(xblock, field_name)
def _set_xblock_field(self, xblock, field_name):
if hasattr(xblock, field_name):
self._block_data_map[xblock.location]._xblock_fields[field_name] = getattr(xblock, field_name)
def add_xblock(self, xblock):
self._xblock_map[xblock.location] = xblock
def get_xblock(self, usage_key):
return self._xblock_map[usage_key]
def add_transformer(self, transformer):
if transformer.VERSION == 0:
raise Exception('VERSION attribute is not set on transformer {0}.'.format(transformer.name()))
self.set_transformer_data(transformer, TRANSFORMER_VERSION_KEY, transformer.VERSION)
class BlockStructureFactory(object):
@classmethod
def create_from_modulestore(cls, root_block_key, modulestore):
block_structure = BlockStructureCollectedData(root_block_key)
def load_from_xblock(cls, root_xblock):
root_block_key = root_xblock.location
block_structure = BlockStructure(root_block_key)
blocks_visited = set()
def build_block_structure(xblock):
......@@ -224,73 +135,6 @@ class BlockStructureFactory(object):
block_structure.add_relation(xblock.location, child.location)
build_block_structure(child)
root_xblock = modulestore.get_item(root_block_key, depth=None)
build_block_structure(root_xblock)
return block_structure
@classmethod
def serialize_to_cache(cls, block_structure, cache):
data_to_cache = (
block_structure._block_relations,
block_structure._transformer_data,
block_structure._block_data_map
)
zp_data_to_cache = zpickle(data_to_cache)
cache.set(
cls._encode_root_cache_key(block_structure.root_block_key),
zp_data_to_cache
)
logger.debug(
"Wrote BlockStructure {} to cache, size: {}".format(
block_structure.root_block_key, len(zp_data_to_cache)
)
)
@classmethod
def create_from_cache(cls, root_block_key, cache):
"""
Returns:
BlockStructure, if the block structure is in the cache, and
NoneType otherwise.
"""
zp_data_from_cache = cache.get(cls._encode_root_cache_key(root_block_key))
if not zp_data_from_cache:
return None
logger.debug(
"Read BlockStructure {} from cache, size: {}".format(
root_block_key, len(zp_data_from_cache)
)
)
block_relations, transformer_data, block_data_map = zunpickle(zp_data_from_cache)
block_structure = BlockStructureBlockData(root_block_key)
block_structure._block_relations = block_relations
block_structure._transformer_data = transformer_data
block_structure._block_data_map = block_data_map
transformer_issues = {}
for transformer in BlockStructureTransformers.get_registered_transformers():
cached_transformer_version = block_structure.get_transformer_data_version(transformer)
if transformer.VERSION != cached_transformer_version:
transformer_issues[transformer.name()] = "version: {}, cached: {}".format(
transformer.VERSION,
cached_transformer_version,
)
if transformer_issues:
    logger.info(
        "Collected data for the following transformers has issues:\n{}.".format(
            '\n'.join(t_name + ": " + t_value for t_name, t_value in transformer_issues.iteritems())
        )
    )
return None
return block_structure
@classmethod
def remove_from_cache(cls, root_block_key, cache):
cache.delete(cls._encode_root_cache_key(root_block_key))
# TODO also remove all block data?
@classmethod
def _encode_root_cache_key(cls, root_block_key):
return "root.key." + unicode(root_block_key)
"""
Tests for block_cache.py
def transform(user_info, structure, collected_data):
field_values = collected_data.xblock_field_values
"""
from mock import patch
from unittest import TestCase
from opaque_keys.edx.keys import CourseKey, UsageKey
from opaque_keys.edx.locator import CourseLocator, LibraryLocator, BlockUsageLocator
from .test_utils import (
MockModulestoreFactory, MockCache, MockUserInfo, MockTransformer, ChildrenMapTestMixin
)
from ..block_cache import get_blocks
from ..bcu import BlockCacheUnit, BlockFieldValues, BlockIndexMapping, CollectedData
TEST_COURSE_KEY = CourseLocator(org="BCU", course="Fast", run="101")
class TestBlockCacheUnit(TestCase):
def setUp(self):
    # TODO: build real fixtures -- these argument names are placeholders
    # that are not yet defined.
    self.bcu = BlockCacheUnit(
        block_structure,
        xblock_field_values,
        transformers_to_field_values,
        transformers_to_data,
    )
class TestBlockFieldValues(TestCase):
def setUp(self):
self.mapping = BlockIndexMapping(
make_locators(TEST_COURSE_KEY, chapter=2, vertical=2, html=5, problem=5)
)
self.field_values = BlockFieldValues(
self.mapping,
{
'block_id': [key.block_id for key in self.mapping],
'block_type': [key.block_type for key in self.mapping],
'has_score': [key.block_type == 'problem' for key in self.mapping],
'horribly_named': [key.block_type == 'vertical' for key in self.mapping],
}
)
def test_get(self):
# Check some values we expect
self.assertEqual(
'chapter',
self.field_values.get('block_type', BlockUsageLocator(TEST_COURSE_KEY, 'chapter', 'chapter_0'))
)
self.assertEqual(
'html_4',
self.field_values.get('block_id', BlockUsageLocator(TEST_COURSE_KEY, 'html', 'html_4'))
)
self.assertTrue(
self.field_values.get('horribly_named', BlockUsageLocator(TEST_COURSE_KEY, 'vertical', 'vertical_1'))
)
self.assertEqual(
self.field_values[BlockUsageLocator(TEST_COURSE_KEY, 'problem', 'problem_4')],
{
'block_type': 'problem',
'block_id': 'problem_4',
'has_score': True,
'horribly_named': False,
}
)
# Make sure we throw key errors for non-existent fields or block keys
with self.assertRaises(KeyError):
self.field_values.get('no_such_field', BlockUsageLocator(TEST_COURSE_KEY, 'html', 'html_1'))
with self.assertRaises(KeyError):
self.field_values.get('block_id', TEST_COURSE_KEY)
def test_slice_by_fields(self):
self.assertEqual(
['block_id', 'block_type', 'has_score', 'horribly_named'],
self.field_values.fields
)
chapter_key = BlockUsageLocator(TEST_COURSE_KEY, 'chapter', 'chapter_0')
empty = self.field_values.slice_by_fields([])
self.assertEqual([], empty.fields)
self.assertEqual({}, empty[chapter_key])
grading = self.field_values.slice_by_fields(['block_id', 'has_score'])
self.assertEqual(
{'block_id': 'chapter_0', 'has_score': False},
grading[chapter_key]
)
self.assertEqual('chapter_0', grading.get('block_id', chapter_key))
# Now test mutation -- these are supposed to point to the same underlying
# lists (or XBlock field mutations wouldn't carry across Transformers)
self.assertFalse(grading.get('has_score', chapter_key))
self.assertFalse(self.field_values.get('has_score', chapter_key))
grading.set('has_score', chapter_key, True)
self.assertTrue(grading.get('has_score', chapter_key))
self.assertTrue(self.field_values.get('has_score', chapter_key))
class TestBlockIndexMapping(TestCase):
def setUp(self):
self.locators = make_locators(
TEST_COURSE_KEY, chapter=2, vertical=3, html=5, problem=5, video=5
)
self.mapping = BlockIndexMapping(self.locators)
def test_locator_ordering(self):
"""Locators should iterate in sorted order."""
sorted_locators = sorted(self.locators)
self.assertEqual(sorted_locators, list(self.mapping))
def test_index_lookup(self):
self.assertEqual(0, self.mapping.index_for(BlockUsageLocator(TEST_COURSE_KEY, 'chapter', 'chapter_0')))
self.assertEqual(2, self.mapping.index_for(BlockUsageLocator(TEST_COURSE_KEY, 'course', '2015')))
self.assertEqual(20, self.mapping.index_for(BlockUsageLocator(TEST_COURSE_KEY, 'video', 'video_4')))
with self.assertRaises(KeyError):
self.mapping.index_for(TEST_COURSE_KEY)
def make_locators(course_key, **block_types_to_qty):
    """Make a course root locator, plus `qty` locators per block type."""
    locators = [BlockUsageLocator(course_key, 'course', '2015')]
    for block_type, qty in block_types_to_qty.items():
        for i in xrange(qty):
            block_id = "{}_{}".format(block_type, i)
            locators.append(BlockUsageLocator(course_key, block_type, block_id))
    return locators
"""
...
How many things affect a Transformer's output?
1. Collected Data for this Transformer (both its own and any dependent XBlock fields)
2. Block Structure as it exists at this point in the chain (others could have modified it for their own reasons)
3. User info
4. Time
5. ???
"""
from abc import abstractmethod
from openedx.core.lib.api.plugins import PluginManager
......