Commit da58ad2a by Adam Palay

only dump courses to neo4j if they've been updated since the last time they were dumped

improvements to the command line interface for caching
parent 01a0f6d6
...@@ -878,6 +878,9 @@ INSTALLED_APPS = ( ...@@ -878,6 +878,9 @@ INSTALLED_APPS = (
'openedx.core.djangoapps.content.course_structures.apps.CourseStructuresConfig', 'openedx.core.djangoapps.content.course_structures.apps.CourseStructuresConfig',
'openedx.core.djangoapps.content.block_structure.apps.BlockStructureConfig', 'openedx.core.djangoapps.content.block_structure.apps.BlockStructureConfig',
# Coursegraph
'openedx.core.djangoapps.coursegraph.apps.CoursegraphConfig',
# Credit courses # Credit courses
'openedx.core.djangoapps.credit', 'openedx.core.djangoapps.credit',
......
...@@ -2034,6 +2034,9 @@ INSTALLED_APPS = ( ...@@ -2034,6 +2034,9 @@ INSTALLED_APPS = (
'openedx.core.djangoapps.content.block_structure.apps.BlockStructureConfig', 'openedx.core.djangoapps.content.block_structure.apps.BlockStructureConfig',
'lms.djangoapps.course_blocks', 'lms.djangoapps.course_blocks',
# Coursegraph
'openedx.core.djangoapps.coursegraph.apps.CoursegraphConfig',
# Old course structure API # Old course structure API
'course_structure_api', 'course_structure_api',
......
"""
Coursegraph Application Configuration
Signal handlers are connected here.
"""
from django.apps import AppConfig
class CoursegraphConfig(AppConfig):
"""
AppConfig for courseware app
"""
name = 'openedx.core.djangoapps.coursegraph'
def ready(self):
"""
Import signals on startup
"""
from openedx.core.djangoapps.coursegraph import signals # pylint: disable=unused-variable
...@@ -2,18 +2,23 @@ ...@@ -2,18 +2,23 @@
This file contains a management command for exporting the modulestore to This file contains a management command for exporting the modulestore to
neo4j, a graph database. neo4j, a graph database.
""" """
from __future__ import unicode_literals from __future__ import unicode_literals, print_function
import logging import logging
from django.conf import settings from django.core.management.base import BaseCommand
from django.core.management.base import BaseCommand, CommandError
from django.utils import six from django.utils import six
from opaque_keys.edx.keys import CourseKey
from py2neo import Graph, Node, Relationship, authenticate from py2neo import Graph, Node, Relationship, authenticate
from py2neo.compat import integer, string, unicode as neo4j_unicode from py2neo.compat import integer, string, unicode as neo4j_unicode
from request_cache.middleware import RequestCache from request_cache.middleware import RequestCache
from xmodule.modulestore.django import modulestore from xmodule.modulestore.django import modulestore
from opaque_keys.edx.keys import CourseKey
from openedx.core.djangoapps.coursegraph.utils import (
CommandLastRunCache,
CourseLastPublishedCache,
)
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -25,13 +30,17 @@ bolt_log.setLevel(logging.ERROR) ...@@ -25,13 +30,17 @@ bolt_log.setLevel(logging.ERROR)
ITERABLE_NEO4J_TYPES = (tuple, list, set, frozenset) ITERABLE_NEO4J_TYPES = (tuple, list, set, frozenset)
PRIMITIVE_NEO4J_TYPES = (integer, string, neo4j_unicode, float, bool) PRIMITIVE_NEO4J_TYPES = (integer, string, neo4j_unicode, float, bool)
COMMAND_LAST_RUN_CACHE = CommandLastRunCache()
COURSE_LAST_PUBLISHED_CACHE = CourseLastPublishedCache()
class ModuleStoreSerializer(object): class ModuleStoreSerializer(object):
""" """
Class with functionality to serialize a modulestore into subgraphs, Class with functionality to serialize a modulestore into subgraphs,
one graph per course. one graph per course.
""" """
def load_course_keys(self, courses=None):
def __init__(self, courses=None):
""" """
Sets the object's course_keys attribute from the `courses` parameter. Sets the object's course_keys attribute from the `courses` parameter.
If that parameter isn't furnished, loads all course_keys from the If that parameter isn't furnished, loads all course_keys from the
...@@ -148,7 +157,6 @@ class ModuleStoreSerializer(object): ...@@ -148,7 +157,6 @@ class ModuleStoreSerializer(object):
return coerced_value return coerced_value
@staticmethod @staticmethod
def add_to_transaction(neo4j_entities, transaction): def add_to_transaction(neo4j_entities, transaction):
""" """
...@@ -159,12 +167,40 @@ class ModuleStoreSerializer(object): ...@@ -159,12 +167,40 @@ class ModuleStoreSerializer(object):
for entity in neo4j_entities: for entity in neo4j_entities:
transaction.create(entity) transaction.create(entity)
@staticmethod
def should_dump_course(course_key):
"""
Only dump the course if it's been changed since the last time it's been
dumped.
:param course_key: a CourseKey object.
:return: bool. Whether or not this course should be dumped to neo4j.
"""
last_this_command_was_run = COMMAND_LAST_RUN_CACHE.get(course_key)
last_course_had_published_event = COURSE_LAST_PUBLISHED_CACHE.get(
course_key
)
# if we have no record of this course being serialized, serialize it
if last_this_command_was_run is None:
return True
# if we've serialized the course recently and we have no published
# events, we can skip re-serializing it
if last_this_command_was_run and last_course_had_published_event is None:
return False
# otherwise, serialize if the command was run before the course's last
# published event
return last_this_command_was_run < last_course_had_published_event
def dump_courses_to_neo4j(self, graph): def dump_courses_to_neo4j(self, graph, override_cache=False):
""" """
Parameters Parameters
---------- ----------
graph: py2neo graph object graph: py2neo graph object
override_cache: serialize the courses even if they'be been recently
serialized
Returns two lists: one of the courses that were successfully written Returns two lists: one of the courses that were successfully written
to neo4j, and one of courses that were not. to neo4j, and one of courses that were not.
...@@ -185,6 +221,11 @@ class ModuleStoreSerializer(object): ...@@ -185,6 +221,11 @@ class ModuleStoreSerializer(object):
index + 1, index + 1,
total_number_of_courses, total_number_of_courses,
) )
if not (override_cache or self.should_dump_course(course_key)):
log.info("skipping dumping %s, since it hasn't changed", course_key)
continue
nodes, relationships = self.serialize_course(course_key) nodes, relationships = self.serialize_course(course_key)
log.info( log.info(
"%d nodes and %d relationships in %s", "%d nodes and %d relationships in %s",
...@@ -217,6 +258,7 @@ class ModuleStoreSerializer(object): ...@@ -217,6 +258,7 @@ class ModuleStoreSerializer(object):
unsuccessful_courses.append(course_string) unsuccessful_courses.append(course_string)
else: else:
COMMAND_LAST_RUN_CACHE.set(course_key)
successful_courses.append(course_string) successful_courses.append(course_string)
return successful_courses, unsuccessful_courses return successful_courses, unsuccessful_courses
...@@ -228,20 +270,34 @@ class Command(BaseCommand): ...@@ -228,20 +270,34 @@ class Command(BaseCommand):
Takes the following named arguments: Takes the following named arguments:
host: the host of the neo4j server host: the host of the neo4j server
port: the port on the server that accepts https requests https_port: the port on the neo4j server that accepts https requests
http_port: the port on the neo4j server that accepts http requests
secure: if set, connects to server over https, otherwise uses http
user: the username for the neo4j user user: the username for the neo4j user
password: the user's password password: the user's password
courses: list of course key strings to serialize. If not specified, all
courses in the modulestore are serialized.
override: if true, dump all--or all specified--courses, regardless of when
they were last dumped. If false, or not set, only dump those courses that
were updated since the last time the command was run.
Example usage: Example usage:
python manage.py lms dump_to_neo4j --host localhost --port 7473 \ python manage.py lms dump_to_neo4j --host localhost --https_port 7473 \
--user user --password password --settings=aws --secure --user user --password password --settings=aws
""" """
def add_arguments(self, parser): def add_arguments(self, parser):
parser.add_argument('--host', type=unicode) parser.add_argument('--host', type=unicode)
parser.add_argument('--port', type=int) parser.add_argument('--https_port', type=int, default=7473)
parser.add_argument('--http_port', type=int, default=7474)
parser.add_argument('--secure', action='store_true')
parser.add_argument('--user', type=unicode) parser.add_argument('--user', type=unicode)
parser.add_argument('--password', type=unicode) parser.add_argument('--password', type=unicode)
parser.add_argument('--courses', type=unicode, nargs='*') parser.add_argument('--courses', type=unicode, nargs='*')
parser.add_argument(
'--override',
action='store_true',
help='dump all--or all specified--courses, ignoring cache',
)
def handle(self, *args, **options): # pylint: disable=unused-argument def handle(self, *args, **options): # pylint: disable=unused-argument
""" """
...@@ -249,12 +305,14 @@ class Command(BaseCommand): ...@@ -249,12 +305,14 @@ class Command(BaseCommand):
those graphs to neo4j. those graphs to neo4j.
""" """
host = options['host'] host = options['host']
port = options['port'] https_port = options['https_port']
http_port = options['http_port']
secure = options['secure']
neo4j_user = options['user'] neo4j_user = options['user']
neo4j_password = options['password'] neo4j_password = options['password']
authenticate( authenticate(
"{host}:{port}".format(host=host, port=port), "{host}:{port}".format(host=host, port=https_port if secure else http_port),
neo4j_user, neo4j_user,
neo4j_password, neo4j_password,
) )
...@@ -263,20 +321,26 @@ class Command(BaseCommand): ...@@ -263,20 +321,26 @@ class Command(BaseCommand):
bolt=True, bolt=True,
password=neo4j_password, password=neo4j_password,
user=neo4j_user, user=neo4j_user,
https_port=port, https_port=https_port,
http_port=http_port,
host=host, host=host,
secure=True secure=secure,
) )
mss = ModuleStoreSerializer() mss = ModuleStoreSerializer(options['courses'])
mss.load_course_keys(options['courses'])
successful_courses, unsuccessful_courses = mss.dump_courses_to_neo4j(
graph, override_cache=options['override']
)
successful_courses, unsuccessful_courses = mss.dump_courses_to_neo4j(graph) if not successful_courses and not unsuccessful_courses:
print("No courses exported to neo4j at all!")
return
if successful_courses: if successful_courses:
print( print(
"These courses exported to neo4j successfully:\n\t" + "These courses exported to neo4j successfully:\n\t" +
"\n\t".join(successful_courses) "\n\t".join(successful_courses)
) )
else: else:
print("No courses exported to neo4j successfully.") print("No courses exported to neo4j successfully.")
......
...@@ -4,18 +4,21 @@ Tests for the dump_to_neo4j management command. ...@@ -4,18 +4,21 @@ Tests for the dump_to_neo4j management command.
""" """
from __future__ import unicode_literals from __future__ import unicode_literals
from datetime import datetime
import ddt import ddt
import mock import mock
from courseware.management.commands.dump_to_neo4j import (
ModuleStoreSerializer,
ITERABLE_NEO4J_TYPES,
)
from django.core.management import call_command from django.core.management import call_command
from django.core.management.base import CommandError
from django.utils import six from django.utils import six
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
from openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j import (
ModuleStoreSerializer,
ITERABLE_NEO4J_TYPES,
)
from openedx.core.djangoapps.coursegraph.signals import _listen_for_course_publish
class TestDumpToNeo4jCommandBase(SharedModuleStoreTestCase): class TestDumpToNeo4jCommandBase(SharedModuleStoreTestCase):
""" """
...@@ -44,7 +47,7 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase): ...@@ -44,7 +47,7 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
Tests for the dump to neo4j management command Tests for the dump to neo4j management command
""" """
@mock.patch('courseware.management.commands.dump_to_neo4j.Graph') @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph')
@ddt.data(1, 2) @ddt.data(1, 2)
def test_dump_specific_courses(self, number_of_courses, mock_graph_class): def test_dump_specific_courses(self, number_of_courses, mock_graph_class):
""" """
...@@ -59,7 +62,7 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase): ...@@ -59,7 +62,7 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
'dump_to_neo4j', 'dump_to_neo4j',
courses=self.course_strings[:number_of_courses], courses=self.course_strings[:number_of_courses],
host='mock_host', host='mock_host',
port=7473, http_port=7474,
user='mock_user', user='mock_user',
password='mock_password', password='mock_password',
) )
...@@ -68,7 +71,7 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase): ...@@ -68,7 +71,7 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
self.assertEqual(mock_transaction.commit.call_count, number_of_courses) self.assertEqual(mock_transaction.commit.call_count, number_of_courses)
self.assertEqual(mock_transaction.commit.rollback.call_count, 0) self.assertEqual(mock_transaction.commit.rollback.call_count, 0)
@mock.patch('courseware.management.commands.dump_to_neo4j.Graph') @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph')
def test_dump_all_courses(self, mock_graph_class): def test_dump_all_courses(self, mock_graph_class):
""" """
Test if you don't specify which courses to dump, then you'll dump Test if you don't specify which courses to dump, then you'll dump
...@@ -82,7 +85,7 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase): ...@@ -82,7 +85,7 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
call_command( call_command(
'dump_to_neo4j', 'dump_to_neo4j',
host='mock_host', host='mock_host',
port=7473, http_port=7474,
user='mock_user', user='mock_user',
password='mock_password', password='mock_password',
) )
...@@ -97,13 +100,17 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -97,13 +100,17 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
""" """
Tests for the ModuleStoreSerializer Tests for the ModuleStoreSerializer
""" """
@classmethod
def setUpClass(cls):
"""Any ModuleStore course/content operations can go here."""
super(TestModuleStoreSerializer, cls).setUpClass()
cls.mss = ModuleStoreSerializer()
def test_serialize_item(self): def test_serialize_item(self):
""" """
Tests the serialize_item method. Tests the serialize_item method.
""" """
mss = ModuleStoreSerializer() fields, label = self.mss.serialize_item(self.course)
mss.load_course_keys()
fields, label = mss.serialize_item(self.course)
self.assertEqual(label, "course") self.assertEqual(label, "course")
self.assertIn("edited_on", fields.keys()) self.assertIn("edited_on", fields.keys())
self.assertIn("display_name", fields.keys()) self.assertIn("display_name", fields.keys())
...@@ -117,9 +124,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -117,9 +124,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
""" """
Tests the serialize_course method. Tests the serialize_course method.
""" """
mss = ModuleStoreSerializer() nodes, relationships = self.mss.serialize_course(
mss.load_course_keys()
nodes, relationships = mss.serialize_course(
self.course.id self.course.id
) )
self.assertEqual(len(nodes), 9) self.assertEqual(len(nodes), 9)
...@@ -154,7 +159,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -154,7 +159,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
""" """
Tests the coerce_types helper for the neo4j base types Tests the coerce_types helper for the neo4j base types
""" """
coerced_value = ModuleStoreSerializer().coerce_types(original_value) coerced_value = self.mss.coerce_types(original_value)
self.assertEqual(coerced_value, coerced_expected) self.assertEqual(coerced_value, coerced_expected)
def test_dump_to_neo4j(self): def test_dump_to_neo4j(self):
...@@ -166,10 +171,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -166,10 +171,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
mock_transaction = mock.Mock() mock_transaction = mock.Mock()
mock_graph.begin.return_value = mock_transaction mock_graph.begin.return_value = mock_transaction
mss = ModuleStoreSerializer() successful, unsuccessful = self.mss.dump_courses_to_neo4j(mock_graph)
mss.load_course_keys()
successful, unsuccessful = mss.dump_courses_to_neo4j(mock_graph)
self.assertEqual(mock_graph.begin.call_count, 2) self.assertEqual(mock_graph.begin.call_count, 2)
self.assertEqual(mock_transaction.commit.call_count, 2) self.assertEqual(mock_transaction.commit.call_count, 2)
...@@ -193,9 +195,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -193,9 +195,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
mock_graph.begin.return_value = mock_transaction mock_graph.begin.return_value = mock_transaction
mock_transaction.run.side_effect = ValueError('Something went wrong!') mock_transaction.run.side_effect = ValueError('Something went wrong!')
mss = ModuleStoreSerializer() successful, unsuccessful = self.mss.dump_courses_to_neo4j(mock_graph)
mss.load_course_keys()
successful, unsuccessful = mss.dump_courses_to_neo4j(mock_graph)
self.assertEqual(mock_graph.begin.call_count, 2) self.assertEqual(mock_graph.begin.call_count, 2)
self.assertEqual(mock_transaction.commit.call_count, 0) self.assertEqual(mock_transaction.commit.call_count, 0)
...@@ -203,3 +203,76 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): ...@@ -203,3 +203,76 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
self.assertEqual(len(successful), 0) self.assertEqual(len(successful), 0)
self.assertItemsEqual(unsuccessful, self.course_strings) self.assertItemsEqual(unsuccessful, self.course_strings)
@ddt.data(
(True, 2),
(False, 0),
)
@ddt.unpack
def test_dump_to_neo4j_cache(self, override_cache, expected_number_courses):
"""
Tests the caching mechanism and override to make sure we only publish
recently updated courses.
"""
mock_graph = mock.Mock()
# run once to warm the cache
successful, unsuccessful = self.mss.dump_courses_to_neo4j(mock_graph)
self.assertEqual(len(successful + unsuccessful), len(self.course_strings))
# when run the second time, only dump courses if the cache override
# is enabled
successful, unsuccessful = self.mss.dump_courses_to_neo4j(
mock_graph, override_cache=override_cache
)
self.assertEqual(len(successful + unsuccessful), expected_number_courses)
def test_dump_to_neo4j_published(self):
"""
Tests that we only dump those courses that have been published after
the last time the command was been run.
"""
mock_graph = mock.Mock()
# run once to warm the cache
successful, unsuccessful = self.mss.dump_courses_to_neo4j(mock_graph)
self.assertEqual(len(successful + unsuccessful), len(self.course_strings))
# simulate one of the courses being published
_listen_for_course_publish(None, self.course.id)
# make sure only the published course was dumped
successful, unsuccessful = self.mss.dump_courses_to_neo4j(mock_graph)
self.assertEqual(len(unsuccessful), 0)
self.assertEqual(len(successful), 1)
self.assertEqual(successful[0], unicode(self.course.id))
@ddt.data(
(datetime(2016, 3, 30), datetime(2016, 3, 31), True),
(datetime(2016, 3, 31), datetime(2016, 3, 30), False),
(datetime(2016, 3, 31), None, False),
(None, datetime(2016, 3, 30), True),
(None, None, True),
)
@ddt.unpack
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.COMMAND_LAST_RUN_CACHE')
@mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.COURSE_LAST_PUBLISHED_CACHE')
def test_should_dump_course(
self,
last_command_run,
last_course_published,
should_dump,
mock_course_last_published_cache,
mock_command_last_run_cache,
):
"""
Tests whether a course should be dumped given the last time it was
dumped and the last time it was published.
"""
mock_command_last_run_cache.get.return_value = last_command_run
mock_course_last_published_cache.get.return_value = last_course_published
mock_course_key = mock.Mock
self.assertEqual(
self.mss.should_dump_course(mock_course_key),
should_dump
)
"""
Signal handlers for the CourseGraph application
"""
from django.dispatch.dispatcher import receiver
from xmodule.modulestore.django import SignalHandler
from openedx.core.djangoapps.coursegraph.utils import CourseLastPublishedCache
@receiver(SignalHandler.course_published)
def _listen_for_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument
"""
Register when the course was published on a course publish event
"""
CourseLastPublishedCache().set(course_key)
"""
Tests for coursegraph's signal handler on course publish
"""
from __future__ import unicode_literals
from opaque_keys.edx.keys import CourseKey
from openedx.core.djangoapps.coursegraph.signals import _listen_for_course_publish
from openedx.core.djangoapps.coursegraph.utils import CourseLastPublishedCache
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase
class TestCourseGraphSignalHandler(CacheIsolationTestCase):
"""
Tests for the course publish course handler
"""
ENABLED_CACHES = ['default']
def test_cache_set_on_course_publish(self):
"""
Tests that the last published cache is set on course publish
"""
course_key = CourseKey.from_string('course-v1:org+course+run')
last_published_cache = CourseLastPublishedCache()
self.assertIsNone(last_published_cache.get(course_key))
_listen_for_course_publish(None, course_key)
self.assertIsNotNone(last_published_cache.get(course_key))
"""
Helpers for the CourseGraph app
"""
from django.core.cache import cache
from django.utils import timezone
class TimeRecordingCacheBase(object):
"""
A base class for caching the current time for some key.
"""
# cache_prefix should be defined in children classes
cache_prefix = None
_cache = cache
def _key(self, course_key):
"""
Make a cache key from the prefix and a course_key
:param course_key: CourseKey object
:return: a cache key
"""
return self.cache_prefix + unicode(course_key)
def get(self, course_key):
"""
Gets the time value associated with the CourseKey.
:param course_key: a CourseKey object.
:return: the time the key was last set.
"""
return self._cache.get(self._key(course_key))
def set(self, course_key):
"""
Sets the current time for a CourseKey key.
:param course_key: a CourseKey object.
"""
return self._cache.set(self._key(course_key), timezone.now())
class CourseLastPublishedCache(TimeRecordingCacheBase):
"""
Used to record the last time that a course had a publish event run on it.
"""
cache_prefix = u'course_last_published'
class CommandLastRunCache(TimeRecordingCacheBase):
"""
Used to record the last time that the dump_to_neo4j command was run on a
course.
"""
cache_prefix = u'dump_to_neo4j_command_last_run'
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment