Commit 69d76b69 by Adam Palay

write courses to neo4j in background tasks (EDUCATOR-583)

move tasks into tasks.py

add more logging
parent 14fed7a7
...@@ -498,3 +498,6 @@ AFFILIATE_COOKIE_NAME = ENV_TOKENS.get('AFFILIATE_COOKIE_NAME', AFFILIATE_COOKIE ...@@ -498,3 +498,6 @@ AFFILIATE_COOKIE_NAME = ENV_TOKENS.get('AFFILIATE_COOKIE_NAME', AFFILIATE_COOKIE
############## Settings for Studio Context Sensitive Help ############## ############## Settings for Studio Context Sensitive Help ##############
HELP_TOKENS_BOOKS = ENV_TOKENS.get('HELP_TOKENS_BOOKS', HELP_TOKENS_BOOKS) HELP_TOKENS_BOOKS = ENV_TOKENS.get('HELP_TOKENS_BOOKS', HELP_TOKENS_BOOKS)
############## Settings for CourseGraph ############################
COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE
...@@ -1315,3 +1315,6 @@ COURSE_CATALOG_API_URL = None ...@@ -1315,3 +1315,6 @@ COURSE_CATALOG_API_URL = None
# Queue to use for updating persistent grades # Queue to use for updating persistent grades
RECALCULATE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE RECALCULATE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE
############## Settings for CourseGraph ############################
COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE
...@@ -999,3 +999,6 @@ COURSES_API_CACHE_TIMEOUT = ENV_TOKENS.get('COURSES_API_CACHE_TIMEOUT', COURSES_ ...@@ -999,3 +999,6 @@ COURSES_API_CACHE_TIMEOUT = ENV_TOKENS.get('COURSES_API_CACHE_TIMEOUT', COURSES_
# Add an ICP license for serving content in China if your organization is registered to do so # Add an ICP license for serving content in China if your organization is registered to do so
ICP_LICENSE = ENV_TOKENS.get('ICP_LICENSE', None) ICP_LICENSE = ENV_TOKENS.get('ICP_LICENSE', None)
############## Settings for CourseGraph ############################
COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE
...@@ -3190,3 +3190,6 @@ COURSE_ENROLLMENT_MODES = { ...@@ -3190,3 +3190,6 @@ COURSE_ENROLLMENT_MODES = {
############## Settings for the Discovery App ###################### ############## Settings for the Discovery App ######################
COURSES_API_CACHE_TIMEOUT = 3600 # Value is in seconds COURSES_API_CACHE_TIMEOUT = 3600 # Value is in seconds
############## Settings for CourseGraph ############################
COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE
...@@ -12,3 +12,5 @@ class CoursegraphConfig(AppConfig): ...@@ -12,3 +12,5 @@ class CoursegraphConfig(AppConfig):
AppConfig for courseware app AppConfig for courseware app
""" """
name = 'openedx.core.djangoapps.coursegraph' name = 'openedx.core.djangoapps.coursegraph'
from . import tasks # pylint: disable=unused-variable
...@@ -7,334 +7,12 @@ from __future__ import unicode_literals, print_function ...@@ -7,334 +7,12 @@ from __future__ import unicode_literals, print_function
import logging import logging
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.utils import six, timezone from django.utils import six
from opaque_keys.edx.keys import CourseKey
from py2neo import Graph, Node, Relationship, authenticate, NodeSelector
from py2neo.compat import integer, string, unicode as neo4j_unicode
from request_cache.middleware import RequestCache
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES
from openedx.core.djangoapps.content.course_structures.models import CourseStructure from openedx.core.djangoapps.coursegraph.tasks import ModuleStoreSerializer
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# When testing locally, neo4j's bolt logger was noisy, so we'll only have it
# emit logs if there's an error.
bolt_log = logging.getLogger('neo4j.bolt') # pylint: disable=invalid-name
bolt_log.setLevel(logging.ERROR)
PRIMITIVE_NEO4J_TYPES = (integer, string, neo4j_unicode, float, bool)
class ModuleStoreSerializer(object):
"""
Class with functionality to serialize a modulestore into subgraphs,
one graph per course.
"""
def __init__(self, course_keys):
self.course_keys = course_keys
@classmethod
def create(cls, courses=None, skip=None):
"""
Sets the object's course_keys attribute from the `courses` parameter.
If that parameter isn't furnished, loads all course_keys from the
modulestore.
Filters out course_keys in the `skip` parameter, if provided.
Arguments:
courses: A list of string serializations of course keys.
For example, ["course-v1:org+course+run"].
skip: Also a list of string serializations of course keys.
Returns:
a ModulestoreSerializer instance
"""
if courses:
course_keys = [CourseKey.from_string(course.strip()) for course in courses]
else:
course_keys = [
course.id for course in modulestore().get_course_summaries()
]
if skip is not None:
skip_keys = set([CourseKey.from_string(course.strip()) for course in skip])
course_keys = [course_key for course_key in course_keys if course_key not in skip_keys]
return cls(course_keys)
@staticmethod
def serialize_item(item):
"""
Arguments:
item: an XBlock
Returns:
fields: a dictionary of an XBlock's field names and values
block_type: the name of the XBlock's type (i.e. 'course'
or 'problem')
"""
# convert all fields to a dict and filter out parent and children field
fields = dict(
(field, field_value.read_from(item))
for (field, field_value) in six.iteritems(item.fields)
if field not in ['parent', 'children']
)
course_key = item.scope_ids.usage_id.course_key
block_type = item.scope_ids.block_type
# set or reset some defaults
fields['edited_on'] = six.text_type(getattr(item, 'edited_on', ''))
fields['display_name'] = item.display_name_with_default
fields['org'] = course_key.org
fields['course'] = course_key.course
fields['run'] = course_key.run
fields['course_key'] = six.text_type(course_key)
fields['location'] = six.text_type(item.location)
fields['block_type'] = block_type
fields['detached'] = block_type in DETACHED_XBLOCK_TYPES
if block_type == 'course':
# prune the checklists field
if 'checklists' in fields:
del fields['checklists']
# record the time this command was run
fields['time_last_dumped_to_neo4j'] = six.text_type(timezone.now())
return fields, block_type
def serialize_course(self, course_id):
"""
Serializes a course into py2neo Nodes and Relationships
Arguments:
course_id: CourseKey of the course we want to serialize
Returns:
nodes: a list of py2neo Node objects
relationships: a list of py2neo Relationships objects
"""
# create a location to node mapping we'll need later for
# writing relationships
location_to_node = {}
items = modulestore().get_items(course_id)
# create nodes
for item in items:
fields, block_type = self.serialize_item(item)
for field_name, value in six.iteritems(fields):
fields[field_name] = self.coerce_types(value)
node = Node(block_type, 'item', **fields)
location_to_node[item.location] = node
# create relationships
relationships = []
for item in items:
previous_child_node = None
for index, child_loc in enumerate(item.get_children()):
parent_node = location_to_node.get(item.location)
child_node = location_to_node.get(child_loc.location)
child_node["index"] = index
if parent_node is not None and child_node is not None:
relationship = Relationship(parent_node, "PARENT_OF", child_node)
relationships.append(relationship)
if previous_child_node:
ordering_relationship = Relationship(
previous_child_node, "PRECEDES", child_node
)
relationships.append(ordering_relationship)
previous_child_node = child_node
nodes = location_to_node.values()
return nodes, relationships
@staticmethod
def coerce_types(value):
"""
Arguments:
value: the value of an xblock's field
Returns: either the value, a text version of the value, or, if the
value is a list, a list where each element is converted to text.
"""
coerced_value = value
if isinstance(value, list):
coerced_value = [six.text_type(element) for element in coerced_value]
# if it's not one of the types that neo4j accepts,
# just convert it to text
elif not isinstance(value, PRIMITIVE_NEO4J_TYPES):
coerced_value = six.text_type(value)
return coerced_value
@staticmethod
def add_to_transaction(neo4j_entities, transaction):
"""
Arguments:
neo4j_entities: a list of Nodes or Relationships
transaction: a neo4j transaction
"""
for entity in neo4j_entities:
transaction.create(entity)
@staticmethod
def get_command_last_run(course_key, graph):
"""
This information is stored on the course node of a course in neo4j
Arguments:
course_key: a CourseKey
graph: a py2neo Graph
Returns: The datetime that the command was last run, converted into
text, or None, if there's no record of this command last being run.
"""
selector = NodeSelector(graph)
course_node = selector.select(
"course",
course_key=six.text_type(course_key)
).first()
last_this_command_was_run = None
if course_node:
last_this_command_was_run = course_node['time_last_dumped_to_neo4j']
return last_this_command_was_run
@staticmethod
def get_course_last_published(course_key):
"""
We use the CourseStructure table to get when this course was last
published.
Arguments:
course_key: a CourseKey
Returns: The datetime the course was last published at, converted into
text, or None, if there's no record of the last time this course
was published.
"""
try:
structure = CourseStructure.objects.get(course_id=course_key)
course_last_published_date = six.text_type(structure.modified)
except CourseStructure.DoesNotExist:
course_last_published_date = None
return course_last_published_date
def should_dump_course(self, course_key, graph):
"""
Only dump the course if it's been changed since the last time it's been
dumped.
Arguments:
course_key: a CourseKey object.
graph: a py2neo Graph object.
Returns: bool of whether this course should be dumped to neo4j.
"""
last_this_command_was_run = self.get_command_last_run(course_key, graph)
course_last_published_date = self.get_course_last_published(course_key)
# if we don't have a record of the last time this command was run,
# we should serialize the course and dump it
if last_this_command_was_run is None:
return True
# if we've serialized the course recently and we have no published
# events, we will not dump it, and so we can skip serializing it
# again here
if last_this_command_was_run and course_last_published_date is None:
return False
# otherwise, serialize and dump the course if the command was run
# before the course's last published event
return last_this_command_was_run < course_last_published_date
def dump_course_to_neo4j(self, course_key, graph):
"""
Serializes a course and writes it to neo4j.
Arguments:
course_key: course key for the course to be exported
graph: py2neo graph object
"""
nodes, relationships = self.serialize_course(course_key)
log.info(
"%d nodes and %d relationships in %s",
len(nodes),
len(relationships),
course_key,
)
transaction = graph.begin()
course_string = six.text_type(course_key)
try:
# first, delete existing course
transaction.run(
"MATCH (n:item) WHERE n.course_key='{}' DETACH DELETE n".format(
course_string
)
)
# now, re-add it
self.add_to_transaction(nodes, transaction)
self.add_to_transaction(relationships, transaction)
transaction.commit()
except Exception: # pylint: disable=broad-except
log.exception(
"Error trying to dump course %s to neo4j, rolling back",
course_string
)
transaction.rollback()
def dump_courses_to_neo4j(self, graph, override_cache=False):
"""
Method that iterates through a list of courses in a modulestore,
serializes them, then submits tasks to write them to neo4j.
Arguments:
graph: py2neo graph object
override_cache: serialize the courses even if they'be been recently
serialized
Returns: two lists--one of the courses that were successfully written
to neo4j and one of courses that were not.
"""
total_number_of_courses = len(self.course_keys)
submitted_courses = []
skipped_courses = []
for index, course_key in enumerate(self.course_keys):
# first, clear the request cache to prevent memory leaks
RequestCache.clear_request_cache()
log.info(
"Now exporting %s to neo4j: course %d of %d total courses",
course_key,
index + 1,
total_number_of_courses,
)
if not (override_cache or self.should_dump_course(course_key, graph)):
log.info("skipping dumping %s, since it hasn't changed", course_key)
skipped_courses.append(unicode(course_key))
else:
self.dump_course_to_neo4j(course_key, graph)
submitted_courses.append(unicode(course_key))
return submitted_courses, skipped_courses
class Command(BaseCommand): class Command(BaseCommand):
""" """
...@@ -377,33 +55,11 @@ class Command(BaseCommand): ...@@ -377,33 +55,11 @@ class Command(BaseCommand):
Iterates through each course, serializes them into graphs, and saves Iterates through each course, serializes them into graphs, and saves
those graphs to neo4j. those graphs to neo4j.
""" """
host = options['host']
https_port = options['https_port']
http_port = options['http_port']
secure = options['secure']
neo4j_user = options['user']
neo4j_password = options['password']
authenticate(
"{host}:{port}".format(host=host, port=https_port if secure else http_port),
neo4j_user,
neo4j_password,
)
graph = Graph(
bolt=True,
password=neo4j_password,
user=neo4j_user,
https_port=https_port,
http_port=http_port,
host=host,
secure=secure,
)
mss = ModuleStoreSerializer.create(options['courses'], options['skip']) mss = ModuleStoreSerializer.create(options['courses'], options['skip'])
submitted_courses, skipped_courses = mss.dump_courses_to_neo4j( submitted_courses, skipped_courses = mss.dump_courses_to_neo4j(
graph, override_cache=options['override'] options, override_cache=options['override']
) )
log.info( log.info(
...@@ -413,7 +69,7 @@ class Command(BaseCommand): ...@@ -413,7 +69,7 @@ class Command(BaseCommand):
) )
if not submitted_courses: if not submitted_courses:
print("No courses exported to neo4j at all!") print("No courses submitted for export to neo4j at all!")
return return
if submitted_courses: if submitted_courses:
......
...@@ -14,12 +14,18 @@ from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase ...@@ -14,12 +14,18 @@ from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
from openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j import ( from openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j import (
ModuleStoreSerializer, ModuleStoreSerializer
) )
from openedx.core.djangoapps.coursegraph.management.commands.tests.utils import ( from openedx.core.djangoapps.coursegraph.management.commands.tests.utils import (
MockGraph, MockGraph,
MockNodeSelector, MockNodeSelector,
) )
from openedx.core.djangoapps.coursegraph.tasks import (
serialize_item,
serialize_course,
coerce_types,
should_dump_course,
)
from openedx.core.djangoapps.content.course_structures.signals import ( from openedx.core.djangoapps.content.course_structures.signals import (
listen_for_course_publish listen_for_course_publish
) )
...@@ -109,8 +115,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase): ...@@ -109,8 +115,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
Tests for the dump to neo4j management command Tests for the dump to neo4j management command
""" """
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph') @mock.patch('openedx.core.djangoapps.coursegraph.tasks.Graph')
@ddt.data(1, 2) @ddt.data(1, 2)
def test_dump_specific_courses(self, number_of_courses, mock_graph_class, mock_selector_class): def test_dump_specific_courses(self, number_of_courses, mock_graph_class, mock_selector_class):
""" """
...@@ -134,8 +140,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase): ...@@ -134,8 +140,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
number_rollbacks=0 number_rollbacks=0
) )
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph') @mock.patch('openedx.core.djangoapps.coursegraph.tasks.Graph')
def test_dump_skip_course(self, mock_graph_class, mock_selector_class): def test_dump_skip_course(self, mock_graph_class, mock_selector_class):
""" """
Test that you can skip courses. Test that you can skip courses.
...@@ -160,8 +166,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase): ...@@ -160,8 +166,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
number_rollbacks=0, number_rollbacks=0,
) )
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph') @mock.patch('openedx.core.djangoapps.coursegraph.tasks.Graph')
def test_dump_skip_beats_specifying(self, mock_graph_class, mock_selector_class): def test_dump_skip_beats_specifying(self, mock_graph_class, mock_selector_class):
""" """
Test that if you skip and specify the same course, you'll skip it. Test that if you skip and specify the same course, you'll skip it.
...@@ -187,8 +193,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase): ...@@ -187,8 +193,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
number_rollbacks=0, number_rollbacks=0,
) )
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph') @mock.patch('openedx.core.djangoapps.coursegraph.tasks.Graph')
def test_dump_all_courses(self, mock_graph_class, mock_selector_class): def test_dump_all_courses(self, mock_graph_class, mock_selector_class):
""" """
Test if you don't specify which courses to dump, then you'll dump Test if you don't specify which courses to dump, then you'll dump
...@@ -229,7 +235,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -229,7 +235,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
""" """
Tests the serialize_item method. Tests the serialize_item method.
""" """
fields, label = self.mss.serialize_item(self.course) fields, label = serialize_item(self.course)
self.assertEqual(label, "course") self.assertEqual(label, "course")
self.assertIn("edited_on", fields.keys()) self.assertIn("edited_on", fields.keys())
self.assertIn("display_name", fields.keys()) self.assertIn("display_name", fields.keys())
...@@ -246,7 +252,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -246,7 +252,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
""" """
Tests the serialize_course method. Tests the serialize_course method.
""" """
nodes, relationships = self.mss.serialize_course(self.course.id) nodes, relationships = serialize_course(self.course.id)
self.assertEqual(len(nodes), 9) self.assertEqual(len(nodes), 9)
# the course has 7 "PARENT_OF" relationships and 3 "PRECEDES" # the course has 7 "PARENT_OF" relationships and 3 "PRECEDES"
self.assertEqual(len(relationships), 10) self.assertEqual(len(relationships), 10)
...@@ -282,7 +288,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -282,7 +288,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
Returns: Returns:
A tuple of the string representations of those XBlocks' locations. A tuple of the string representations of those XBlocks' locations.
""" """
return (unicode(xblock1.location), unicode(xblock2.location)) return (six.text_type(xblock1.location), six.text_type(xblock2.location))
def assertBlockPairIsRelationship(self, xblock1, xblock2, relationships, relationship_type): def assertBlockPairIsRelationship(self, xblock1, xblock2, relationships, relationship_type):
""" """
...@@ -306,7 +312,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -306,7 +312,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
""" """
Tests that two nodes that should have a precedes relationship have it. Tests that two nodes that should have a precedes relationship have it.
""" """
__, relationships = self.mss.serialize_course(self.course.id) __, relationships = serialize_course(self.course.id)
self.assertBlockPairIsRelationship(self.video, self.video2, relationships, "PRECEDES") self.assertBlockPairIsRelationship(self.video, self.video2, relationships, "PRECEDES")
self.assertBlockPairIsNotRelationship(self.video2, self.video, relationships, "PRECEDES") self.assertBlockPairIsNotRelationship(self.video2, self.video, relationships, "PRECEDES")
self.assertBlockPairIsNotRelationship(self.vertical, self.video, relationships, "PRECEDES") self.assertBlockPairIsNotRelationship(self.vertical, self.video, relationships, "PRECEDES")
...@@ -316,7 +322,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -316,7 +322,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
""" """
Test that two nodes that should have a parent_of relationship have it. Test that two nodes that should have a parent_of relationship have it.
""" """
__, relationships = self.mss.serialize_course(self.course.id) __, relationships = serialize_course(self.course.id)
self.assertBlockPairIsRelationship(self.vertical, self.video, relationships, "PARENT_OF") self.assertBlockPairIsRelationship(self.vertical, self.video, relationships, "PARENT_OF")
self.assertBlockPairIsRelationship(self.vertical, self.html, relationships, "PARENT_OF") self.assertBlockPairIsRelationship(self.vertical, self.html, relationships, "PARENT_OF")
self.assertBlockPairIsRelationship(self.course, self.chapter, relationships, "PARENT_OF") self.assertBlockPairIsRelationship(self.course, self.chapter, relationships, "PARENT_OF")
...@@ -328,7 +334,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -328,7 +334,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
""" """
Test that we add index values on nodes Test that we add index values on nodes
""" """
nodes, relationships = self.mss.serialize_course(self.course.id) nodes, relationships = serialize_course(self.course.id)
# the html node should have 0 index, and the problem should have 1 # the html node should have 0 index, and the problem should have 1
html_nodes = [node for node in nodes if node['block_type'] == 'html'] html_nodes = [node for node in nodes if node['block_type'] == 'html']
...@@ -359,19 +365,22 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -359,19 +365,22 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
""" """
Tests the coerce_types helper Tests the coerce_types helper
""" """
coerced_value = self.mss.coerce_types(original_value) coerced_value = coerce_types(original_value)
self.assertEqual(coerced_value, coerced_expected) self.assertEqual(coerced_value, coerced_expected)
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
def test_dump_to_neo4j(self, mock_selector_class): @mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph')
def test_dump_to_neo4j(self, mock_graph_constructor, mock_selector_class):
""" """
Tests the dump_to_neo4j method works against a mock Tests the dump_to_neo4j method works against a mock
py2neo Graph py2neo Graph
""" """
mock_graph = MockGraph() mock_graph = MockGraph()
mock_graph_constructor.return_value = mock_graph
mock_selector_class.return_value = MockNodeSelector(mock_graph) mock_selector_class.return_value = MockNodeSelector(mock_graph)
mock_credentials = mock.Mock()
submitted, skipped = self.mss.dump_courses_to_neo4j(mock_graph) submitted, skipped = self.mss.dump_courses_to_neo4j(mock_credentials)
self.assertCourseDump( self.assertCourseDump(
mock_graph, mock_graph,
...@@ -386,16 +395,19 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -386,16 +395,19 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
self.assertEqual(len(mock_graph.nodes), 11) self.assertEqual(len(mock_graph.nodes), 11)
self.assertItemsEqual(submitted, self.course_strings) self.assertItemsEqual(submitted, self.course_strings)
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
def test_dump_to_neo4j_rollback(self, mock_selector_class): @mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph')
def test_dump_to_neo4j_rollback(self, mock_graph_constructor, mock_selector_class):
""" """
Tests that the the dump_to_neo4j method handles the case where there's Tests that the the dump_to_neo4j method handles the case where there's
an exception trying to write to the neo4j database. an exception trying to write to the neo4j database.
""" """
mock_graph = MockGraph(transaction_errors=True) mock_graph = MockGraph(transaction_errors=True)
mock_graph_constructor.return_value = mock_graph
mock_selector_class.return_value = MockNodeSelector(mock_graph) mock_selector_class.return_value = MockNodeSelector(mock_graph)
mock_credentials = mock.Mock()
submitted, skipped = self.mss.dump_courses_to_neo4j(mock_graph) submitted, skipped = self.mss.dump_courses_to_neo4j(mock_credentials)
self.assertCourseDump( self.assertCourseDump(
mock_graph, mock_graph,
...@@ -406,50 +418,64 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -406,50 +418,64 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
self.assertItemsEqual(submitted, self.course_strings) self.assertItemsEqual(submitted, self.course_strings)
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph')
@ddt.data((True, 2), (False, 0)) @ddt.data((True, 2), (False, 0))
@ddt.unpack @ddt.unpack
def test_dump_to_neo4j_cache(self, override_cache, expected_number_courses, mock_selector_class): def test_dump_to_neo4j_cache(
self,
override_cache,
expected_number_courses,
mock_graph_constructor,
mock_selector_class,
):
""" """
Tests the caching mechanism and override to make sure we only publish Tests the caching mechanism and override to make sure we only publish
recently updated courses. recently updated courses.
""" """
mock_graph = MockGraph() mock_graph = MockGraph()
mock_graph_constructor.return_value = mock_graph
mock_selector_class.return_value = MockNodeSelector(mock_graph) mock_selector_class.return_value = MockNodeSelector(mock_graph)
mock_credentials = mock.Mock()
# run once to warm the cache # run once to warm the cache
self.mss.dump_courses_to_neo4j( self.mss.dump_courses_to_neo4j(
mock_graph, override_cache=override_cache mock_credentials, override_cache=override_cache
) )
# when run the second time, only dump courses if the cache override # when run the second time, only dump courses if the cache override
# is enabled # is enabled
submitted, __ = self.mss.dump_courses_to_neo4j( submitted, __ = self.mss.dump_courses_to_neo4j(
mock_graph, override_cache=override_cache mock_credentials, override_cache=override_cache
) )
self.assertEqual(len(submitted), expected_number_courses) self.assertEqual(len(submitted), expected_number_courses)
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
def test_dump_to_neo4j_published(self, mock_selector_class): @mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph')
def test_dump_to_neo4j_published(self, mock_graph_constructor, mock_selector_class):
""" """
Tests that we only dump those courses that have been published after Tests that we only dump those courses that have been published after
the last time the command was been run. the last time the command was been run.
""" """
mock_graph = MockGraph() mock_graph = MockGraph()
mock_graph_constructor.return_value = mock_graph
mock_selector_class.return_value = MockNodeSelector(mock_graph) mock_selector_class.return_value = MockNodeSelector(mock_graph)
mock_credentials = mock.Mock()
# run once to warm the cache # run once to warm the cache
submitted, skipped = self.mss.dump_courses_to_neo4j(mock_graph) submitted, skipped = self.mss.dump_courses_to_neo4j(mock_credentials)
self.assertEqual(len(submitted), len(self.course_strings)) self.assertEqual(len(submitted), len(self.course_strings))
# simulate one of the courses being published # simulate one of the courses being published
listen_for_course_publish(None, self.course.id) listen_for_course_publish(None, self.course.id)
# make sure only the published course was dumped # make sure only the published course was dumped
submitted, __ = self.mss.dump_courses_to_neo4j(mock_graph) submitted, __ = self.mss.dump_courses_to_neo4j(mock_credentials)
self.assertEqual(len(submitted), 1) self.assertEqual(len(submitted), 1)
self.assertEqual(submitted[0], unicode(self.course.id)) self.assertEqual(submitted[0], six.text_type(self.course.id))
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.get_course_last_published')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.get_command_last_run')
@ddt.data( @ddt.data(
(six.text_type(datetime(2016, 3, 30)), six.text_type(datetime(2016, 3, 31)), True), (six.text_type(datetime(2016, 3, 30)), six.text_type(datetime(2016, 3, 31)), True),
(six.text_type(datetime(2016, 3, 31)), six.text_type(datetime(2016, 3, 30)), False), (six.text_type(datetime(2016, 3, 31)), six.text_type(datetime(2016, 3, 30)), False),
...@@ -458,17 +484,23 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -458,17 +484,23 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
(None, None, True), (None, None, True),
) )
@ddt.unpack @ddt.unpack
def test_should_dump_course(self, last_command_run, last_course_published, should_dump): def test_should_dump_course(
self,
last_command_run,
last_course_published,
should_dump,
mock_get_command_last_run,
mock_get_course_last_published,
):
""" """
Tests whether a course should be dumped given the last time it was Tests whether a course should be dumped given the last time it was
dumped and the last time it was published. dumped and the last time it was published.
""" """
mss = ModuleStoreSerializer.create() mock_get_command_last_run.return_value = last_command_run
mss.get_command_last_run = lambda course_key, graph: last_command_run mock_get_course_last_published.return_value = last_course_published
mss.get_course_last_published = lambda course_key: last_course_published
mock_course_key = mock.Mock mock_course_key = mock.Mock
mock_graph = mock.Mock() mock_graph = mock.Mock()
self.assertEqual( self.assertEqual(
mss.should_dump_course(mock_course_key, mock_graph), should_dump_course(mock_course_key, mock_graph),
should_dump, should_dump,
) )
"""
This file contains a management command for exporting the modulestore to
neo4j, a graph database.
"""
from __future__ import unicode_literals, print_function
import logging
from celery import task
from django.conf import settings
from django.utils import six, timezone
from opaque_keys.edx.keys import CourseKey
from py2neo import Graph, Node, Relationship, authenticate, NodeSelector
from py2neo.compat import integer, string, unicode as neo4j_unicode
from request_cache.middleware import RequestCache
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES
from openedx.core.djangoapps.content.course_structures.models import CourseStructure
log = logging.getLogger(__name__)
celery_log = logging.getLogger('edx.celery.task')
# When testing locally, neo4j's bolt logger was noisy, so we'll only have it
# emit logs if there's an error.
bolt_log = logging.getLogger('neo4j.bolt') # pylint: disable=invalid-name
bolt_log.setLevel(logging.ERROR)
PRIMITIVE_NEO4J_TYPES = (integer, string, neo4j_unicode, float, bool)
def serialize_item(item):
"""
Args:
item: an XBlock
Returns:
fields: a dictionary of an XBlock's field names and values
block_type: the name of the XBlock's type (i.e. 'course'
or 'problem')
"""
# convert all fields to a dict and filter out parent and children field
fields = dict(
(field, field_value.read_from(item))
for (field, field_value) in six.iteritems(item.fields)
if field not in ['parent', 'children']
)
course_key = item.scope_ids.usage_id.course_key
block_type = item.scope_ids.block_type
# set or reset some defaults
fields['edited_on'] = six.text_type(getattr(item, 'edited_on', ''))
fields['display_name'] = item.display_name_with_default
fields['org'] = course_key.org
fields['course'] = course_key.course
fields['run'] = course_key.run
fields['course_key'] = six.text_type(course_key)
fields['location'] = six.text_type(item.location)
fields['block_type'] = block_type
fields['detached'] = block_type in DETACHED_XBLOCK_TYPES
if block_type == 'course':
# prune the checklists field
if 'checklists' in fields:
del fields['checklists']
# record the time this command was run
fields['time_last_dumped_to_neo4j'] = six.text_type(timezone.now())
return fields, block_type
def coerce_types(value):
"""
Args:
value: the value of an xblock's field
Returns: either the value, a text version of the value, or, if the
value is a list, a list where each element is converted to text.
"""
coerced_value = value
if isinstance(value, list):
coerced_value = [six.text_type(element) for element in coerced_value]
# if it's not one of the types that neo4j accepts,
# just convert it to text
elif not isinstance(value, PRIMITIVE_NEO4J_TYPES):
coerced_value = six.text_type(value)
return coerced_value
def add_to_transaction(neo4j_entities, transaction):
"""
Args:
neo4j_entities: a list of Nodes or Relationships
transaction: a neo4j transaction
"""
for entity in neo4j_entities:
transaction.create(entity)
def get_command_last_run(course_key, graph):
"""
This information is stored on the course node of a course in neo4j
Args:
course_key: a CourseKey
graph: a py2neo Graph
Returns: The datetime that the command was last run, converted into
text, or None, if there's no record of this command last being run.
"""
selector = NodeSelector(graph)
course_node = selector.select(
"course",
course_key=six.text_type(course_key)
).first()
last_this_command_was_run = None
if course_node:
last_this_command_was_run = course_node['time_last_dumped_to_neo4j']
return last_this_command_was_run
def get_course_last_published(course_key):
"""
We use the CourseStructure table to get when this course was last
published.
Args:
course_key: a CourseKey
Returns: The datetime the course was last published at, converted into
text, or None, if there's no record of the last time this course
was published.
"""
try:
structure = CourseStructure.objects.get(course_id=course_key)
course_last_published_date = six.text_type(structure.modified)
except CourseStructure.DoesNotExist:
course_last_published_date = None
return course_last_published_date
def serialize_course(course_id):
"""
Serializes a course into py2neo Nodes and Relationships
Args:
course_id: CourseKey of the course we want to serialize
Returns:
nodes: a list of py2neo Node objects
relationships: a list of py2neo Relationships objects
"""
# create a location to node mapping we'll need later for
# writing relationships
location_to_node = {}
items = modulestore().get_items(course_id)
# create nodes
for item in items:
fields, block_type = serialize_item(item)
for field_name, value in six.iteritems(fields):
fields[field_name] = coerce_types(value)
node = Node(block_type, 'item', **fields)
location_to_node[item.location] = node
# create relationships
relationships = []
for item in items:
previous_child_node = None
for index, child_loc in enumerate(item.get_children()):
parent_node = location_to_node.get(item.location)
child_node = location_to_node.get(child_loc.location)
child_node["index"] = index
if parent_node is not None and child_node is not None:
relationship = Relationship(parent_node, "PARENT_OF", child_node)
relationships.append(relationship)
if previous_child_node:
ordering_relationship = Relationship(
previous_child_node,
"PRECEDES",
child_node,
)
relationships.append(ordering_relationship)
previous_child_node = child_node
nodes = location_to_node.values()
return nodes, relationships
def should_dump_course(course_key, graph):
"""
Only dump the course if it's been changed since the last time it's been
dumped.
Args:
course_key: a CourseKey object.
graph: a py2neo Graph object.
Returns: bool of whether this course should be dumped to neo4j.
"""
last_this_command_was_run = get_command_last_run(course_key, graph)
course_last_published_date = get_course_last_published(course_key)
# if we don't have a record of the last time this command was run,
# we should serialize the course and dump it
if last_this_command_was_run is None:
return True
# if we've serialized the course recently and we have no published
# events, we will not dump it, and so we can skip serializing it
# again here
if last_this_command_was_run and course_last_published_date is None:
return False
# otherwise, serialize and dump the course if the command was run
# before the course's last published event
return last_this_command_was_run < course_last_published_date
@task
def dump_course_to_neo4j(course_key_string, credentials):
"""
Serializes a course and writes it to neo4j.
Arguments:
course_key: course key for the course to be exported
credentials (dict): the necessary credentials to connect
to neo4j and create a py2neo `Graph` obje
"""
course_key = CourseKey.from_string(course_key_string)
nodes, relationships = serialize_course(course_key)
celery_log.info(
"Now dumping %s to neo4j: %d nodes and %d relationships",
course_key,
len(nodes),
len(relationships),
)
graph = authenticate_and_create_graph(credentials)
transaction = graph.begin()
course_string = six.text_type(course_key)
try:
# first, delete existing course
transaction.run(
"MATCH (n:item) WHERE n.course_key='{}' DETACH DELETE n".format(
course_string
)
)
# now, re-add it
add_to_transaction(nodes, transaction)
add_to_transaction(relationships, transaction)
transaction.commit()
celery_log.info("Completed dumping %s to neo4j", course_key)
except Exception: # pylint: disable=broad-except
celery_log.exception(
"Error trying to dump course %s to neo4j, rolling back",
course_string
)
transaction.rollback()
class ModuleStoreSerializer(object):
"""
Class with functionality to serialize a modulestore into subgraphs,
one graph per course.
"""
def __init__(self, course_keys):
self.course_keys = course_keys
@classmethod
def create(cls, courses=None, skip=None):
"""
Sets the object's course_keys attribute from the `courses` parameter.
If that parameter isn't furnished, loads all course_keys from the
modulestore.
Filters out course_keys in the `skip` parameter, if provided.
Args:
courses: A list of string serializations of course keys.
For example, ["course-v1:org+course+run"].
skip: Also a list of string serializations of course keys.
"""
if courses:
course_keys = [CourseKey.from_string(course.strip()) for course in courses]
else:
course_keys = [
course.id for course in modulestore().get_course_summaries()
]
if skip is not None:
skip_keys = [CourseKey.from_string(course.strip()) for course in skip]
course_keys = [course_key for course_key in course_keys if course_key not in skip_keys]
return cls(course_keys)
def dump_courses_to_neo4j(self, credentials, override_cache=False):
"""
Method that iterates through a list of courses in a modulestore,
serializes them, then submits tasks to write them to neo4j.
Arguments:
credentials (dict): the necessary credentials to connect
to neo4j and create a py2neo `Graph` object
override_cache: serialize the courses even if they'be been recently
serialized
Returns: two lists--one of the courses that were successfully written
to neo4j and one of courses that were not.
"""
total_number_of_courses = len(self.course_keys)
submitted_courses = []
skipped_courses = []
graph = authenticate_and_create_graph(credentials)
for index, course_key in enumerate(self.course_keys):
# first, clear the request cache to prevent memory leaks
RequestCache.clear_request_cache()
log.info(
"Now submitting %s for export to neo4j: course %d of %d total courses",
course_key,
index + 1,
total_number_of_courses,
)
if not (override_cache or should_dump_course(course_key, graph)):
log.info("skipping submitting %s, since it hasn't changed", course_key)
skipped_courses.append(six.text_type(course_key))
continue
dump_course_to_neo4j.apply_async(
args=[six.text_type(course_key), credentials],
routing_key=settings.COURSEGRAPH_JOB_QUEUE,
)
submitted_courses.append(six.text_type(course_key))
return submitted_courses, skipped_courses
def authenticate_and_create_graph(credentials):
"""
This function authenticates with neo4j and creates a py2neo graph object
Arguments:
credentials (dict): a dictionary of credentials used to authenticate,
and then create, a py2neo graph object.
Returns: a py2neo `Graph` object.
"""
host = credentials['host']
https_port = credentials['https_port']
http_port = credentials['http_port']
secure = credentials['secure']
neo4j_user = credentials['user']
neo4j_password = credentials['password']
authenticate(
"{host}:{port}".format(
host=host, port=https_port if secure else http_port
),
neo4j_user,
neo4j_password,
)
graph = Graph(
bolt=True,
password=neo4j_password,
user=neo4j_user,
https_port=https_port,
http_port=http_port,
host=host,
secure=secure,
)
return graph
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment