Commit 691d6872 by Adam Committed by GitHub

Merge pull request #15294 from edx/adam/add-precedes-to-coursegraph-3

write courses to neo4j in background tasks (EDUCATOR-583)
parents 0b90c60b 51abe89a
......@@ -498,3 +498,6 @@ AFFILIATE_COOKIE_NAME = ENV_TOKENS.get('AFFILIATE_COOKIE_NAME', AFFILIATE_COOKIE
############## Settings for Studio Context Sensitive Help ##############
HELP_TOKENS_BOOKS = ENV_TOKENS.get('HELP_TOKENS_BOOKS', HELP_TOKENS_BOOKS)
############## Settings for CourseGraph ############################
COURSEGRAPH_JOB_QUEUE = ENV_TOKENS.get('COURSEGRAPH_JOB_QUEUE', LOW_PRIORITY_QUEUE)
......@@ -1332,3 +1332,6 @@ COURSE_CATALOG_API_URL = None
# Queue to use for updating persistent grades
RECALCULATE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE
############## Settings for CourseGraph ############################
COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE
......@@ -1002,3 +1002,6 @@ COURSES_API_CACHE_TIMEOUT = ENV_TOKENS.get('COURSES_API_CACHE_TIMEOUT', COURSES_
# Add an ICP license for serving content in China if your organization is registered to do so
ICP_LICENSE = ENV_TOKENS.get('ICP_LICENSE', None)
############## Settings for CourseGraph ############################
COURSEGRAPH_JOB_QUEUE = ENV_TOKENS.get('COURSEGRAPH_JOB_QUEUE', LOW_PRIORITY_QUEUE)
......@@ -3190,3 +3190,6 @@ COURSE_ENROLLMENT_MODES = {
############## Settings for the Discovery App ######################
COURSES_API_CACHE_TIMEOUT = 3600 # Value is in seconds
############## Settings for CourseGraph ############################
COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE
......@@ -12,3 +12,5 @@ class CoursegraphConfig(AppConfig):
AppConfig for courseware app
"""
name = 'openedx.core.djangoapps.coursegraph'
from . import tasks # pylint: disable=unused-variable
......@@ -7,334 +7,12 @@ from __future__ import unicode_literals, print_function
import logging
from django.core.management.base import BaseCommand
from django.utils import six, timezone
from opaque_keys.edx.keys import CourseKey
from py2neo import Graph, Node, Relationship, authenticate, NodeSelector
from py2neo.compat import integer, string, unicode as neo4j_unicode
from request_cache.middleware import RequestCache
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES
from django.utils import six
from openedx.core.djangoapps.content.course_structures.models import CourseStructure
from openedx.core.djangoapps.coursegraph.tasks import ModuleStoreSerializer
log = logging.getLogger(__name__)
# When testing locally, neo4j's bolt logger was noisy, so we'll only have it
# emit logs if there's an error.
bolt_log = logging.getLogger('neo4j.bolt') # pylint: disable=invalid-name
bolt_log.setLevel(logging.ERROR)
PRIMITIVE_NEO4J_TYPES = (integer, string, neo4j_unicode, float, bool)
class ModuleStoreSerializer(object):
    """
    Class with functionality to serialize a modulestore into subgraphs,
    one graph per course.
    """

    def __init__(self, course_keys):
        """
        Arguments:
            course_keys: the CourseKeys of the courses this serializer
                will dump.
        """
        self.course_keys = course_keys

    @classmethod
    def create(cls, courses=None, skip=None):
        """
        Sets the object's course_keys attribute from the `courses` parameter.
        If that parameter isn't furnished, loads all course_keys from the
        modulestore.
        Filters out course_keys in the `skip` parameter, if provided.

        Arguments:
            courses: A list of string serializations of course keys.
                For example, ["course-v1:org+course+run"].
            skip: Also a list of string serializations of course keys.

        Returns:
            a ModuleStoreSerializer instance
        """
        if courses:
            course_keys = [CourseKey.from_string(course.strip()) for course in courses]
        else:
            course_keys = [
                course.id for course in modulestore().get_course_summaries()
            ]
        if skip is not None:
            skip_keys = {CourseKey.from_string(course.strip()) for course in skip}
            course_keys = [course_key for course_key in course_keys if course_key not in skip_keys]
        return cls(course_keys)

    @staticmethod
    def serialize_item(item):
        """
        Arguments:
            item: an XBlock

        Returns:
            fields: a dictionary of an XBlock's field names and values
            block_type: the name of the XBlock's type (i.e. 'course'
                or 'problem')
        """
        # convert all fields to a dict and filter out parent and children field
        fields = dict(
            (field, field_value.read_from(item))
            for (field, field_value) in six.iteritems(item.fields)
            if field not in ['parent', 'children']
        )

        course_key = item.scope_ids.usage_id.course_key
        block_type = item.scope_ids.block_type

        # set or reset some defaults
        fields['edited_on'] = six.text_type(getattr(item, 'edited_on', ''))
        fields['display_name'] = item.display_name_with_default
        fields['org'] = course_key.org
        fields['course'] = course_key.course
        fields['run'] = course_key.run
        fields['course_key'] = six.text_type(course_key)
        fields['location'] = six.text_type(item.location)
        fields['block_type'] = block_type
        fields['detached'] = block_type in DETACHED_XBLOCK_TYPES

        if block_type == 'course':
            # prune the checklists field
            fields.pop('checklists', None)

            # record the time this command was run
            fields['time_last_dumped_to_neo4j'] = six.text_type(timezone.now())

        return fields, block_type

    def serialize_course(self, course_id):
        """
        Serializes a course into py2neo Nodes and Relationships

        Arguments:
            course_id: CourseKey of the course we want to serialize

        Returns:
            nodes: a list of py2neo Node objects
            relationships: a list of py2neo Relationships objects
        """
        # create a location to node mapping we'll need later for
        # writing relationships
        location_to_node = {}
        items = modulestore().get_items(course_id)

        # create nodes
        for item in items:
            fields, block_type = self.serialize_item(item)
            coerced_fields = {
                field_name: self.coerce_types(value)
                for field_name, value in six.iteritems(fields)
            }
            location_to_node[item.location] = Node(block_type, 'item', **coerced_fields)

        # create relationships
        relationships = []
        for item in items:
            # the parent is the same for every child, so look it up once
            parent_node = location_to_node.get(item.location)
            previous_child_node = None
            for index, child in enumerate(item.get_children()):
                child_node = location_to_node.get(child.location)
                if parent_node is None or child_node is None:
                    # no node was created for one end of the relationship,
                    # so there is nothing to index or link
                    continue

                # only touch the child once we know it exists; assigning
                # into a missing (None) node would raise a TypeError
                child_node["index"] = index
                relationships.append(
                    Relationship(parent_node, "PARENT_OF", child_node)
                )
                if previous_child_node is not None:
                    relationships.append(
                        Relationship(previous_child_node, "PRECEDES", child_node)
                    )
                previous_child_node = child_node

        nodes = list(location_to_node.values())
        return nodes, relationships

    @staticmethod
    def coerce_types(value):
        """
        Arguments:
            value: the value of an xblock's field

        Returns: either the value, a text version of the value, or, if the
            value is a list, a list where each element is converted to text.
        """
        coerced_value = value
        if isinstance(value, list):
            coerced_value = [six.text_type(element) for element in coerced_value]

        # if it's not one of the types that neo4j accepts,
        # just convert it to text
        elif not isinstance(value, PRIMITIVE_NEO4J_TYPES):
            coerced_value = six.text_type(value)

        return coerced_value

    @staticmethod
    def add_to_transaction(neo4j_entities, transaction):
        """
        Calls `transaction.create` on every entity, in order.

        Arguments:
            neo4j_entities: a list of Nodes or Relationships
            transaction: a neo4j transaction
        """
        for entity in neo4j_entities:
            transaction.create(entity)

    @staticmethod
    def get_command_last_run(course_key, graph):
        """
        This information is stored on the course node of a course in neo4j

        Arguments:
            course_key: a CourseKey
            graph: a py2neo Graph

        Returns: The datetime that the command was last run, converted into
            text, or None, if there's no record of this command last being run.
        """
        selector = NodeSelector(graph)
        course_node = selector.select(
            "course",
            course_key=six.text_type(course_key)
        ).first()

        last_this_command_was_run = None
        if course_node:
            last_this_command_was_run = course_node['time_last_dumped_to_neo4j']

        return last_this_command_was_run

    @staticmethod
    def get_course_last_published(course_key):
        """
        We use the CourseStructure table to get when this course was last
        published.

        Arguments:
            course_key: a CourseKey

        Returns: The datetime the course was last published at, converted into
            text, or None, if there's no record of the last time this course
            was published.
        """
        try:
            structure = CourseStructure.objects.get(course_id=course_key)
            course_last_published_date = six.text_type(structure.modified)
        except CourseStructure.DoesNotExist:
            course_last_published_date = None

        return course_last_published_date

    def should_dump_course(self, course_key, graph):
        """
        Only dump the course if it's been changed since the last time it's been
        dumped.

        Arguments:
            course_key: a CourseKey object.
            graph: a py2neo Graph object.

        Returns: bool of whether this course should be dumped to neo4j.
        """
        last_this_command_was_run = self.get_command_last_run(course_key, graph)
        course_last_published_date = self.get_course_last_published(course_key)

        # if we don't have a record of the last time this command was run,
        # we should serialize the course and dump it
        if last_this_command_was_run is None:
            return True

        # if we've serialized the course recently and we have no published
        # events, we will not dump it, and so we can skip serializing it
        # again here
        if last_this_command_was_run and course_last_published_date is None:
            return False

        # otherwise, serialize and dump the course if the command was run
        # before the course's last published event (both values are text
        # renderings of datetimes, so this is a string comparison)
        return last_this_command_was_run < course_last_published_date

    def dump_course_to_neo4j(self, course_key, graph):
        """
        Serializes a course and writes it to neo4j.

        Any exception raised while writing is logged and the transaction is
        rolled back; the exception is not re-raised.

        Arguments:
            course_key: course key for the course to be exported
            graph: py2neo graph object
        """
        nodes, relationships = self.serialize_course(course_key)
        log.info(
            "%d nodes and %d relationships in %s",
            len(nodes),
            len(relationships),
            course_key,
        )

        transaction = graph.begin()
        course_string = six.text_type(course_key)
        try:
            # first, delete existing course
            # NOTE(review): the course key is interpolated straight into the
            # Cypher text; consider a query parameter if keys can ever come
            # from an untrusted source.
            transaction.run(
                "MATCH (n:item) WHERE n.course_key='{}' DETACH DELETE n".format(
                    course_string
                )
            )

            # now, re-add it
            self.add_to_transaction(nodes, transaction)
            self.add_to_transaction(relationships, transaction)
            transaction.commit()

        except Exception:  # pylint: disable=broad-except
            log.exception(
                "Error trying to dump course %s to neo4j, rolling back",
                course_string
            )
            transaction.rollback()

    def dump_courses_to_neo4j(self, graph, override_cache=False):
        """
        Method that iterates through a list of courses in a modulestore,
        serializes them, and writes them to neo4j.

        Arguments:
            graph: py2neo graph object
            override_cache: serialize the courses even if they've been
                recently serialized

        Returns: two lists of course key strings--one of the courses for
            which a dump was attempted (write errors are logged by
            dump_course_to_neo4j, not reported here) and one of the courses
            that were skipped because they hadn't changed.
        """
        total_number_of_courses = len(self.course_keys)

        submitted_courses = []
        skipped_courses = []

        for index, course_key in enumerate(self.course_keys):
            # first, clear the request cache to prevent memory leaks
            RequestCache.clear_request_cache()

            log.info(
                "Now exporting %s to neo4j: course %d of %d total courses",
                course_key,
                index + 1,
                total_number_of_courses,
            )

            if not (override_cache or self.should_dump_course(course_key, graph)):
                log.info("skipping dumping %s, since it hasn't changed", course_key)
                skipped_courses.append(six.text_type(course_key))
            else:
                self.dump_course_to_neo4j(course_key, graph)
                submitted_courses.append(six.text_type(course_key))

        return submitted_courses, skipped_courses
class Command(BaseCommand):
"""
......@@ -377,33 +55,11 @@ class Command(BaseCommand):
Iterates through each course, serializes them into graphs, and saves
those graphs to neo4j.
"""
host = options['host']
https_port = options['https_port']
http_port = options['http_port']
secure = options['secure']
neo4j_user = options['user']
neo4j_password = options['password']
authenticate(
"{host}:{port}".format(host=host, port=https_port if secure else http_port),
neo4j_user,
neo4j_password,
)
graph = Graph(
bolt=True,
password=neo4j_password,
user=neo4j_user,
https_port=https_port,
http_port=http_port,
host=host,
secure=secure,
)
mss = ModuleStoreSerializer.create(options['courses'], options['skip'])
submitted_courses, skipped_courses = mss.dump_courses_to_neo4j(
graph, override_cache=options['override']
options, override_cache=options['override']
)
log.info(
......@@ -413,7 +69,7 @@ class Command(BaseCommand):
)
if not submitted_courses:
print("No courses exported to neo4j at all!")
print("No courses submitted for export to neo4j at all!")
return
if submitted_courses:
......
......@@ -14,12 +14,18 @@ from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
from openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j import (
ModuleStoreSerializer,
ModuleStoreSerializer
)
from openedx.core.djangoapps.coursegraph.management.commands.tests.utils import (
MockGraph,
MockNodeSelector,
)
from openedx.core.djangoapps.coursegraph.tasks import (
serialize_item,
serialize_course,
coerce_types,
should_dump_course,
)
from openedx.core.djangoapps.content.course_structures.signals import (
listen_for_course_publish
)
......@@ -109,8 +115,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
Tests for the dump to neo4j management command
"""
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.Graph')
@ddt.data(1, 2)
def test_dump_specific_courses(self, number_of_courses, mock_graph_class, mock_selector_class):
"""
......@@ -134,8 +140,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
number_rollbacks=0
)
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.Graph')
def test_dump_skip_course(self, mock_graph_class, mock_selector_class):
"""
Test that you can skip courses.
......@@ -160,8 +166,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
number_rollbacks=0,
)
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.Graph')
def test_dump_skip_beats_specifying(self, mock_graph_class, mock_selector_class):
"""
Test that if you skip and specify the same course, you'll skip it.
......@@ -187,8 +193,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
number_rollbacks=0,
)
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.Graph')
def test_dump_all_courses(self, mock_graph_class, mock_selector_class):
"""
Test if you don't specify which courses to dump, then you'll dump
......@@ -229,7 +235,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
"""
Tests the serialize_item method.
"""
fields, label = self.mss.serialize_item(self.course)
fields, label = serialize_item(self.course)
self.assertEqual(label, "course")
self.assertIn("edited_on", fields.keys())
self.assertIn("display_name", fields.keys())
......@@ -246,7 +252,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
"""
Tests the serialize_course method.
"""
nodes, relationships = self.mss.serialize_course(self.course.id)
nodes, relationships = serialize_course(self.course.id)
self.assertEqual(len(nodes), 9)
# the course has 7 "PARENT_OF" relationships and 3 "PRECEDES"
self.assertEqual(len(relationships), 10)
......@@ -282,7 +288,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
Returns:
A tuple of the string representations of those XBlocks' locations.
"""
return (unicode(xblock1.location), unicode(xblock2.location))
return (six.text_type(xblock1.location), six.text_type(xblock2.location))
def assertBlockPairIsRelationship(self, xblock1, xblock2, relationships, relationship_type):
"""
......@@ -306,7 +312,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
"""
Tests that two nodes that should have a precedes relationship have it.
"""
__, relationships = self.mss.serialize_course(self.course.id)
__, relationships = serialize_course(self.course.id)
self.assertBlockPairIsRelationship(self.video, self.video2, relationships, "PRECEDES")
self.assertBlockPairIsNotRelationship(self.video2, self.video, relationships, "PRECEDES")
self.assertBlockPairIsNotRelationship(self.vertical, self.video, relationships, "PRECEDES")
......@@ -316,7 +322,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
"""
Test that two nodes that should have a parent_of relationship have it.
"""
__, relationships = self.mss.serialize_course(self.course.id)
__, relationships = serialize_course(self.course.id)
self.assertBlockPairIsRelationship(self.vertical, self.video, relationships, "PARENT_OF")
self.assertBlockPairIsRelationship(self.vertical, self.html, relationships, "PARENT_OF")
self.assertBlockPairIsRelationship(self.course, self.chapter, relationships, "PARENT_OF")
......@@ -328,7 +334,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
"""
Test that we add index values on nodes
"""
nodes, relationships = self.mss.serialize_course(self.course.id)
nodes, relationships = serialize_course(self.course.id)
# the html node should have 0 index, and the problem should have 1
html_nodes = [node for node in nodes if node['block_type'] == 'html']
......@@ -359,19 +365,22 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
"""
Tests the coerce_types helper
"""
coerced_value = self.mss.coerce_types(original_value)
coerced_value = coerce_types(original_value)
self.assertEqual(coerced_value, coerced_expected)
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector')
def test_dump_to_neo4j(self, mock_selector_class):
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph')
def test_dump_to_neo4j(self, mock_graph_constructor, mock_selector_class):
"""
Tests the dump_to_neo4j method works against a mock
py2neo Graph
"""
mock_graph = MockGraph()
mock_graph_constructor.return_value = mock_graph
mock_selector_class.return_value = MockNodeSelector(mock_graph)
mock_credentials = mock.Mock()
submitted, skipped = self.mss.dump_courses_to_neo4j(mock_graph)
submitted, skipped = self.mss.dump_courses_to_neo4j(mock_credentials)
self.assertCourseDump(
mock_graph,
......@@ -386,16 +395,19 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
self.assertEqual(len(mock_graph.nodes), 11)
self.assertItemsEqual(submitted, self.course_strings)
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector')
def test_dump_to_neo4j_rollback(self, mock_selector_class):
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph')
def test_dump_to_neo4j_rollback(self, mock_graph_constructor, mock_selector_class):
"""
Tests that the the dump_to_neo4j method handles the case where there's
an exception trying to write to the neo4j database.
"""
mock_graph = MockGraph(transaction_errors=True)
mock_graph_constructor.return_value = mock_graph
mock_selector_class.return_value = MockNodeSelector(mock_graph)
mock_credentials = mock.Mock()
submitted, skipped = self.mss.dump_courses_to_neo4j(mock_graph)
submitted, skipped = self.mss.dump_courses_to_neo4j(mock_credentials)
self.assertCourseDump(
mock_graph,
......@@ -406,50 +418,64 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
self.assertItemsEqual(submitted, self.course_strings)
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph')
@ddt.data((True, 2), (False, 0))
@ddt.unpack
def test_dump_to_neo4j_cache(self, override_cache, expected_number_courses, mock_selector_class):
def test_dump_to_neo4j_cache(
self,
override_cache,
expected_number_courses,
mock_graph_constructor,
mock_selector_class,
):
"""
Tests the caching mechanism and override to make sure we only publish
recently updated courses.
"""
mock_graph = MockGraph()
mock_graph_constructor.return_value = mock_graph
mock_selector_class.return_value = MockNodeSelector(mock_graph)
mock_credentials = mock.Mock()
# run once to warm the cache
self.mss.dump_courses_to_neo4j(
mock_graph, override_cache=override_cache
mock_credentials, override_cache=override_cache
)
# when run the second time, only dump courses if the cache override
# is enabled
submitted, __ = self.mss.dump_courses_to_neo4j(
mock_graph, override_cache=override_cache
mock_credentials, override_cache=override_cache
)
self.assertEqual(len(submitted), expected_number_courses)
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector')
def test_dump_to_neo4j_published(self, mock_selector_class):
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph')
def test_dump_to_neo4j_published(self, mock_graph_constructor, mock_selector_class):
"""
Tests that we only dump those courses that have been published after
the last time the command was been run.
"""
mock_graph = MockGraph()
mock_graph_constructor.return_value = mock_graph
mock_selector_class.return_value = MockNodeSelector(mock_graph)
mock_credentials = mock.Mock()
# run once to warm the cache
submitted, skipped = self.mss.dump_courses_to_neo4j(mock_graph)
submitted, skipped = self.mss.dump_courses_to_neo4j(mock_credentials)
self.assertEqual(len(submitted), len(self.course_strings))
# simulate one of the courses being published
listen_for_course_publish(None, self.course.id)
# make sure only the published course was dumped
submitted, __ = self.mss.dump_courses_to_neo4j(mock_graph)
submitted, __ = self.mss.dump_courses_to_neo4j(mock_credentials)
self.assertEqual(len(submitted), 1)
self.assertEqual(submitted[0], unicode(self.course.id))
self.assertEqual(submitted[0], six.text_type(self.course.id))
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.get_course_last_published')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.get_command_last_run')
@ddt.data(
(six.text_type(datetime(2016, 3, 30)), six.text_type(datetime(2016, 3, 31)), True),
(six.text_type(datetime(2016, 3, 31)), six.text_type(datetime(2016, 3, 30)), False),
......@@ -458,17 +484,23 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
(None, None, True),
)
@ddt.unpack
def test_should_dump_course(self, last_command_run, last_course_published, should_dump):
def test_should_dump_course(
self,
last_command_run,
last_course_published,
should_dump,
mock_get_command_last_run,
mock_get_course_last_published,
):
"""
Tests whether a course should be dumped given the last time it was
dumped and the last time it was published.
"""
mss = ModuleStoreSerializer.create()
mss.get_command_last_run = lambda course_key, graph: last_command_run
mss.get_course_last_published = lambda course_key: last_course_published
mock_get_command_last_run.return_value = last_command_run
mock_get_course_last_published.return_value = last_course_published
mock_course_key = mock.Mock
mock_graph = mock.Mock()
self.assertEqual(
mss.should_dump_course(mock_course_key, mock_graph),
should_dump_course(mock_course_key, mock_graph),
should_dump,
)
"""
This file contains a celery task, and the helper functions it relies on,
for exporting the modulestore to neo4j, a graph database.
"""
from __future__ import unicode_literals, print_function
import logging
from celery import task
from django.conf import settings
from django.utils import six, timezone
from opaque_keys.edx.keys import CourseKey
from py2neo import Graph, Node, Relationship, authenticate, NodeSelector
from py2neo.compat import integer, string, unicode as neo4j_unicode
from request_cache.middleware import RequestCache
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES
from openedx.core.djangoapps.content.course_structures.models import CourseStructure
log = logging.getLogger(__name__)
celery_log = logging.getLogger('edx.celery.task')
# When testing locally, neo4j's bolt logger was noisy, so we'll only have it
# emit logs if there's an error.
bolt_log = logging.getLogger('neo4j.bolt') # pylint: disable=invalid-name
bolt_log.setLevel(logging.ERROR)
PRIMITIVE_NEO4J_TYPES = (integer, string, neo4j_unicode, float, bool)
def serialize_item(item):
    """
    Flatten a single XBlock into the properties stored on its neo4j node.

    Args:
        item: an XBlock

    Returns:
        fields: a dictionary of an XBlock's field names and values
        block_type: the name of the XBlock's type (i.e. 'course'
            or 'problem')
    """
    # read every declared field off the block, leaving out the two
    # structural ones ('parent' and 'children')
    structural_fields = ['parent', 'children']
    fields = {
        name: declared_field.read_from(item)
        for name, declared_field in six.iteritems(item.fields)
        if name not in structural_fields
    }

    usage_course_key = item.scope_ids.usage_id.course_key
    block_type = item.scope_ids.block_type

    # set or reset some defaults
    fields['edited_on'] = six.text_type(getattr(item, 'edited_on', ''))
    fields['display_name'] = item.display_name_with_default
    fields['org'] = usage_course_key.org
    fields['course'] = usage_course_key.course
    fields['run'] = usage_course_key.run
    fields['course_key'] = six.text_type(usage_course_key)
    fields['location'] = six.text_type(item.location)
    fields['block_type'] = block_type
    fields['detached'] = block_type in DETACHED_XBLOCK_TYPES

    if block_type != 'course':
        return fields, block_type

    # course blocks only: drop the checklists field, if present ...
    fields.pop('checklists', None)
    # ... and stamp the node with the time it was serialized
    fields['time_last_dumped_to_neo4j'] = six.text_type(timezone.now())

    return fields, block_type
def coerce_types(value):
    """
    Convert an XBlock field value into something that can be stored as a
    neo4j property.

    Args:
        value: the value of an xblock's field

    Returns: either the value, a text version of the value, or, if the
        value is a list, a list where each element is converted to text.
    """
    # lists are converted element by element
    if isinstance(value, list):
        return [six.text_type(member) for member in value]

    # values of a type listed in PRIMITIVE_NEO4J_TYPES pass through untouched
    if isinstance(value, PRIMITIVE_NEO4J_TYPES):
        return value

    # anything else is stored as its text representation
    return six.text_type(value)
def add_to_transaction(neo4j_entities, transaction):
    """
    Queue every entity for creation on the given transaction, in order.

    Args:
        neo4j_entities: a list of Nodes or Relationships
        transaction: a neo4j transaction
    """
    for neo4j_entity in neo4j_entities:
        transaction.create(neo4j_entity)
def get_command_last_run(course_key, graph):
    """
    Look up when a course was last dumped. This information is stored on
    the course node of a course in neo4j.

    Args:
        course_key: a CourseKey
        graph: a py2neo Graph

    Returns: The datetime that the command was last run, converted into
        text, or None, if there's no record of this command last being run.
    """
    node_selector = NodeSelector(graph)
    matching_nodes = node_selector.select(
        "course",
        course_key=six.text_type(course_key)
    )
    course_node = matching_nodes.first()

    # no course node in the graph means the course was never dumped
    if not course_node:
        return None

    return course_node['time_last_dumped_to_neo4j']
def get_course_last_published(course_key):
    """
    Find out when a course was last published, using the `modified` value
    of its row in the CourseStructure table.

    Args:
        course_key: a CourseKey

    Returns: The datetime the course was last published at, converted into
        text, or None, if there's no record of the last time this course
        was published.
    """
    try:
        course_structure = CourseStructure.objects.get(course_id=course_key)
        return six.text_type(course_structure.modified)
    except CourseStructure.DoesNotExist:
        # no CourseStructure row for this course
        return None
def serialize_course(course_id):
    """
    Serializes a course into py2neo Nodes and Relationships

    Every item returned by the modulestore for the course becomes a Node
    labelled with its block type and 'item'. Each parent/child pair gets a
    "PARENT_OF" relationship, consecutive children of the same parent get
    a "PRECEDES" relationship, and each child node is given an "index"
    property holding its position among its parent's children.

    Args:
        course_id: CourseKey of the course we want to serialize

    Returns:
        nodes: a list of py2neo Node objects
        relationships: a list of py2neo Relationships objects
    """
    # create a location to node mapping we'll need later for
    # writing relationships
    location_to_node = {}
    items = modulestore().get_items(course_id)

    # create nodes
    for item in items:
        fields, block_type = serialize_item(item)
        coerced_fields = {
            field_name: coerce_types(value)
            for field_name, value in six.iteritems(fields)
        }
        location_to_node[item.location] = Node(block_type, 'item', **coerced_fields)

    # create relationships
    relationships = []
    for item in items:
        # the parent is the same for every child, so look it up once
        parent_node = location_to_node.get(item.location)
        previous_child_node = None
        for index, child in enumerate(item.get_children()):
            child_node = location_to_node.get(child.location)
            if parent_node is None or child_node is None:
                # no node was created for one end of the relationship,
                # so there is nothing to index or link
                continue

            # only touch the child once we know it exists; assigning into
            # a missing (None) node would raise a TypeError
            child_node["index"] = index
            relationships.append(
                Relationship(parent_node, "PARENT_OF", child_node)
            )
            if previous_child_node is not None:
                relationships.append(
                    Relationship(
                        previous_child_node,
                        "PRECEDES",
                        child_node,
                    )
                )
            previous_child_node = child_node

    nodes = list(location_to_node.values())
    return nodes, relationships
def should_dump_course(course_key, graph):
    """
    Decide whether a course needs to be dumped: only dump it if it has
    changed since the last time it was dumped.

    Args:
        course_key: a CourseKey object.
        graph: a py2neo Graph object.

    Returns: bool of whether this course should be dumped to neo4j.
    """
    dumped_at = get_command_last_run(course_key, graph)
    published_at = get_course_last_published(course_key)

    if dumped_at is None:
        # no record of a previous dump: serialize the course and dump it
        needs_dump = True
    elif dumped_at and published_at is None:
        # dumped before and no publish event on record: nothing new to
        # dump, so serializing it again can be skipped
        needs_dump = False
    else:
        # dump only if the last dump happened before the course's last
        # published event
        needs_dump = dumped_at < published_at

    return needs_dump
@task(routing_key=settings.COURSEGRAPH_JOB_QUEUE)
def dump_course_to_neo4j(course_key_string, credentials):
    """
    Celery task that serializes a course and writes it to neo4j.

    The course's existing `item` nodes are deleted and the freshly
    serialized nodes and relationships are created in a single transaction.
    If anything goes wrong while writing, the error is logged and the
    transaction is rolled back; the exception is not re-raised.

    Arguments:
        course_key_string: string serialization of the course key for the
            course to be exported
        credentials (dict): the necessary credentials to connect
            to neo4j and create a py2neo `Graph` object
    """
    # NOTE(review): `credentials` travels as a task argument -- confirm
    # that is acceptable for the neo4j password.
    course_key = CourseKey.from_string(course_key_string)
    course_string = six.text_type(course_key)
    delete_existing_course = "MATCH (n:item) WHERE n.course_key='{}' DETACH DELETE n".format(
        course_string
    )

    nodes, relationships = serialize_course(course_key)
    celery_log.info(
        "Now dumping %s to neo4j: %d nodes and %d relationships",
        course_key,
        len(nodes),
        len(relationships),
    )

    transaction = authenticate_and_create_graph(credentials).begin()
    try:
        # first, delete existing course
        transaction.run(delete_existing_course)

        # now, re-add it: nodes first, then the relationships between them
        for entities in (nodes, relationships):
            add_to_transaction(entities, transaction)
        transaction.commit()
        celery_log.info("Completed dumping %s to neo4j", course_key)

    except Exception:  # pylint: disable=broad-except
        celery_log.exception(
            "Error trying to dump course %s to neo4j, rolling back",
            course_string
        )
        transaction.rollback()
class ModuleStoreSerializer(object):
    """
    Class with functionality to serialize a modulestore into subgraphs,
    one graph per course.
    """

    def __init__(self, course_keys):
        """
        Args:
            course_keys: the CourseKeys of the courses this serializer
                will submit for export.
        """
        self.course_keys = course_keys

    @classmethod
    def create(cls, courses=None, skip=None):
        """
        Sets the object's course_keys attribute from the `courses` parameter.
        If that parameter isn't furnished, loads all course_keys from the
        modulestore.
        Filters out course_keys in the `skip` parameter, if provided.

        Args:
            courses: A list of string serializations of course keys.
                For example, ["course-v1:org+course+run"].
            skip: Also a list of string serializations of course keys.

        Returns:
            a ModuleStoreSerializer instance
        """
        if courses:
            course_keys = [CourseKey.from_string(course.strip()) for course in courses]
        else:
            course_keys = [
                course.id for course in modulestore().get_course_summaries()
            ]
        if skip is not None:
            # a set keeps the membership test below constant-time
            skip_keys = {CourseKey.from_string(course.strip()) for course in skip}
            course_keys = [course_key for course_key in course_keys if course_key not in skip_keys]
        return cls(course_keys)

    def dump_courses_to_neo4j(self, credentials, override_cache=False):
        """
        Method that iterates through a list of courses in a modulestore
        and submits a task per course to serialize it and write it to neo4j.

        Arguments:
            credentials (dict): the necessary credentials to connect
                to neo4j and create a py2neo `Graph` object
            override_cache: serialize the courses even if they've been
                recently serialized

        Returns: two lists of course key strings--one of the courses for
            which an export task was submitted (the tasks run separately,
            so this says nothing about whether the write succeeded) and
            one of the courses that were skipped because they hadn't
            changed.
        """
        total_number_of_courses = len(self.course_keys)

        submitted_courses = []
        skipped_courses = []

        # this graph is only used to check when each course was last dumped;
        # each task creates its own from `credentials`
        graph = authenticate_and_create_graph(credentials)

        for index, course_key in enumerate(self.course_keys):
            # first, clear the request cache to prevent memory leaks
            RequestCache.clear_request_cache()

            log.info(
                "Now submitting %s for export to neo4j: course %d of %d total courses",
                course_key,
                index + 1,
                total_number_of_courses,
            )

            if not (override_cache or should_dump_course(course_key, graph)):
                log.info("skipping submitting %s, since it hasn't changed", course_key)
                skipped_courses.append(six.text_type(course_key))
                continue

            dump_course_to_neo4j.apply_async(
                args=[six.text_type(course_key), credentials],
            )
            submitted_courses.append(six.text_type(course_key))

        return submitted_courses, skipped_courses
def authenticate_and_create_graph(credentials):
    """
    Authenticate with neo4j, then build the py2neo graph object used to
    talk to it.

    Arguments:
        credentials (dict): a dictionary of credentials used to authenticate,
            and then create, a py2neo graph object. The keys read are
            'host', 'https_port', 'http_port', 'secure', 'user' and
            'password'.

    Returns: a py2neo `Graph` object.
    """
    host = credentials['host']
    https_port = credentials['https_port']
    http_port = credentials['http_port']
    secure = credentials['secure']
    neo4j_user = credentials['user']
    neo4j_password = credentials['password']

    # authenticate against the https port for secure connections and the
    # http port otherwise
    if secure:
        port = https_port
    else:
        port = http_port
    host_port = "{host}:{port}".format(host=host, port=port)
    authenticate(host_port, neo4j_user, neo4j_password)

    return Graph(
        bolt=True,
        password=neo4j_password,
        user=neo4j_user,
        https_port=https_port,
        http_port=http_port,
        host=host,
        secure=secure,
    )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment