Commit c04f4401 by Nimisha Asthagiri

Update generate_course_blocks management command to enqueue tasks

parent 43c5cb2a
""" """
Command to load course blocks. Command to load course blocks.
""" """
from collections import defaultdict
import logging import logging
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
from xmodule.modulestore.django import modulestore from xmodule.modulestore.django import modulestore
from openedx.core.djangoapps.content.block_structure.api import get_course_in_cache, update_course_in_cache import openedx.core.djangoapps.content.block_structure.api as api
import openedx.core.djangoapps.content.block_structure.tasks as tasks
import openedx.core.lib.block_structure.cache as cache
from openedx.core.lib.command_utils import (
get_mutually_exclusive_required_option,
validate_dependent_option,
parse_course_keys,
)
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -29,159 +33,109 @@ class Command(BaseCommand): ...@@ -29,159 +33,109 @@ class Command(BaseCommand):
Entry point for subclassed commands to add custom arguments. Entry point for subclassed commands to add custom arguments.
""" """
parser.add_argument( parser.add_argument(
'--all', '--courses',
help='Generate course blocks for all or specified courses.', dest='courses',
action='store_true', nargs='+',
default=False, help='Generate course blocks for the list of courses provided.',
) )
parser.add_argument( parser.add_argument(
'--dags', '--all_courses',
help='Find and log DAGs for all or specified courses.', help='Generate course blocks for all courses, given the requested start and end indices.',
action='store_true', action='store_true',
default=False, default=False,
) )
parser.add_argument( parser.add_argument(
'--force', '--enqueue_task',
help='Force update of the course blocks for the requested courses.', help='Enqueue the tasks for asynchronous computation.',
action='store_true', action='store_true',
default=False, default=False,
) )
parser.add_argument( parser.add_argument(
'--verbose', '--routing_key',
help='Enable verbose logging.', dest='routing_key',
help='Routing key to use for asynchronous computation.',
)
parser.add_argument(
'--force_update',
help='Force update of the course blocks for the requested courses.',
action='store_true', action='store_true',
default=False, default=False,
) )
parser.add_argument( parser.add_argument(
'--start', '--start_index',
help='Starting index of course.', help='Starting index of course list.',
default=0, default=0,
type=int, type=int,
) )
parser.add_argument( parser.add_argument(
'--end', '--end_index',
help='Ending index of course.', help='Ending index of course list.',
default=0, default=0,
type=int, type=int,
) )
def handle(self, *args, **options): def handle(self, *args, **options):
if options.get('all'): courses_mode = get_mutually_exclusive_required_option(options, 'courses', 'all_courses')
course_keys = [course.id for course in modulestore().get_course_summaries()] validate_dependent_option(options, 'routing_key', 'enqueue_task')
if options.get('start'): validate_dependent_option(options, 'start_index', 'all_courses')
end = options.get('end') or len(course_keys) validate_dependent_option(options, 'end_index', 'all_courses')
course_keys = course_keys[options['start']:end]
else:
if len(args) < 1:
raise CommandError('At least one course or --all must be specified.')
try:
course_keys = [CourseKey.from_string(arg) for arg in args]
except InvalidKeyError:
raise CommandError('Invalid key specified.')
log.info('Generating course blocks for %d courses.', len(course_keys))
if options.get('verbose'): if courses_mode == 'all_courses':
log.setLevel(logging.DEBUG) course_keys = [course.id for course in modulestore().get_course_summaries()]
if options.get('start_index'):
end = options.get('end_index') or len(course_keys)
course_keys = course_keys[options['start_index']:end]
else: else:
log.setLevel(logging.CRITICAL) course_keys = parse_course_keys(options['courses'])
dag_info = _DAGInfo()
for course_key in course_keys:
try:
if options.get('force'):
block_structure = update_course_in_cache(course_key)
else:
block_structure = get_course_in_cache(course_key)
if options.get('dags'):
self._find_and_log_dags(block_structure, course_key, dag_info)
except Exception as ex: # pylint: disable=broad-except
log.exception(
'An error occurred while generating course blocks for %s: %s',
unicode(course_key),
ex.message,
)
log.info('Finished generating course blocks.') self._set_log_levels(options)
if options.get('dags'): log.warning('STARTED generating Course Blocks for %d courses.', len(course_keys))
log.critical('DAG data: %s', unicode(dag_info)) self._generate_course_blocks(options, course_keys)
log.warning('FINISHED generating Course Blocks for %d courses.', len(course_keys))
def _find_and_log_dags(self, block_structure, course_key, dag_info): def _set_log_levels(self, options):
""" """
Finds all DAGs within the given block structure. Sets logging levels for this module and the block structure
cache module, based on the given the options.
Arguments:
BlockStructureBlockData - The block structure in which to find DAGs.
""" """
for block_key in block_structure.get_block_keys(): if options.get('verbosity') == 0:
parents = block_structure.get_parents(block_key) log_level = logging.CRITICAL
if len(parents) > 1: elif options.get('verbosity') == 1:
dag_info.on_dag_found(course_key, block_key) log_level = logging.WARNING
log.warning( else:
'DAG alert - %s has multiple parents: %s.', log_level = logging.INFO
unicode(block_key),
[unicode(parent) for parent in parents],
)
class PrettyDefaultDict(defaultdict):
"""
Wraps defaultdict to provide a better string representation.
"""
__repr__ = dict.__repr__
class _DAGBlockTypeInfo(object):
"""
Class for aggregated DAG data for a specific block type.
"""
def __init__(self):
self.num_of_dag_blocks = 0
def __repr__(self):
return repr(vars(self))
class _DAGCourseInfo(object): if options.get('verbosity') < 3:
""" cache_log_level = logging.CRITICAL
Class for aggregated DAG data for a specific course run. else:
""" cache_log_level = logging.INFO
def __init__(self):
self.num_of_dag_blocks = 0
self.dag_data_by_block_type = PrettyDefaultDict(_DAGBlockTypeInfo)
def __repr__(self): log.setLevel(log_level)
return repr(vars(self)) cache.logger.setLevel(cache_log_level)
def on_dag_found(self, block_key): def _generate_course_blocks(self, options, course_keys):
""" """
Updates DAG collected data for the given block. Generates course blocks for the given course_keys per the given options.
""" """
self.num_of_dag_blocks += 1
self.dag_data_by_block_type[block_key.category].num_of_dag_blocks += 1
class _DAGInfo(object): for course_key in course_keys:
""" try:
Class for aggregated DAG data. log.info('STARTED generating Course Blocks for course: %s.', course_key)
"""
def __init__(self):
self.total_num_of_dag_blocks = 0
self.total_num_of_dag_courses = 0
self.dag_data_by_course = PrettyDefaultDict(_DAGCourseInfo)
self.dag_data_by_block_type = PrettyDefaultDict(_DAGBlockTypeInfo)
def __repr__(self): if options.get('enqueue_task'):
return repr(vars(self)) action = tasks.update_course_in_cache if options.get('force_update') else tasks.get_course_in_cache
task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {}
action.apply_async([unicode(course_key)], **task_options)
else:
action = api.update_course_in_cache if options.get('force_update') else api.get_course_in_cache
action(course_key)
def on_dag_found(self, course_key, block_key): log.info('FINISHED generating Course Blocks for course: %s.', course_key)
""" except Exception as ex: # pylint: disable=broad-except
Updates DAG collected data for the given block. log.exception(
""" 'An error occurred while generating course blocks for %s: %s',
self.total_num_of_dag_blocks += 1 unicode(course_key),
if course_key not in self.dag_data_by_course: ex.message,
self.total_num_of_dag_courses += 1 )
self.dag_data_by_course[unicode(course_key)].on_dag_found(block_key)
self.dag_data_by_block_type[block_key.category].num_of_dag_blocks += 1
""" """
Tests for generate_course_blocks management command. Tests for generate_course_blocks management command.
""" """
import ddt
from django.core.management.base import CommandError from django.core.management.base import CommandError
import itertools
from mock import patch from mock import patch
from xmodule.modulestore import ModuleStoreEnum from xmodule.modulestore import ModuleStoreEnum
...@@ -11,86 +13,140 @@ from .. import generate_course_blocks ...@@ -11,86 +13,140 @@ from .. import generate_course_blocks
from openedx.core.djangoapps.content.block_structure.tests.helpers import is_course_in_block_structure_cache from openedx.core.djangoapps.content.block_structure.tests.helpers import is_course_in_block_structure_cache
@ddt.ddt
class TestGenerateCourseBlocks(ModuleStoreTestCase): class TestGenerateCourseBlocks(ModuleStoreTestCase):
""" """
Tests generate course blocks management command. Tests generate course blocks management command.
""" """
num_courses = 2
def setUp(self): def setUp(self):
""" """
Create courses in modulestore. Create courses in modulestore.
""" """
super(TestGenerateCourseBlocks, self).setUp() super(TestGenerateCourseBlocks, self).setUp()
self.course_1 = CourseFactory.create() self.courses = [CourseFactory.create() for _ in range(self.num_courses)]
self.course_2 = CourseFactory.create() self.course_keys = [course.id for course in self.courses]
self.command = generate_course_blocks.Command() self.command = generate_course_blocks.Command()
def _assert_courses_not_in_block_cache(self, *courses): def _assert_courses_not_in_block_cache(self, *course_keys):
""" """
Assert courses don't exist in the course block cache. Assert courses don't exist in the course block cache.
""" """
for course_key in courses: for course_key in course_keys:
self.assertFalse(is_course_in_block_structure_cache(course_key, self.store)) self.assertFalse(is_course_in_block_structure_cache(course_key, self.store))
def _assert_courses_in_block_cache(self, *courses): def _assert_courses_in_block_cache(self, *course_keys):
""" """
Assert courses exist in course block cache. Assert courses exist in course block cache.
""" """
for course_key in courses: for course_key in course_keys:
self.assertTrue(is_course_in_block_structure_cache(course_key, self.store)) self.assertTrue(is_course_in_block_structure_cache(course_key, self.store))
def test_generate_all(self): def _assert_message_presence_in_logs(self, message, mock_log, expected_presence=True):
self._assert_courses_not_in_block_cache(self.course_1.id, self.course_2.id) """
self.command.handle(all=True) Asserts that the logger was called with the given message.
self._assert_courses_in_block_cache(self.course_1.id, self.course_2.id) """
message_present = any([message in call_args[0][0] for call_args in mock_log.warning.call_args_list])
if expected_presence:
self.assertTrue(message_present)
else:
self.assertFalse(message_present)
@ddt.data(True, False)
def test_all_courses(self, force_update):
self._assert_courses_not_in_block_cache(*self.course_keys)
self.command.handle(all_courses=True)
self._assert_courses_in_block_cache(*self.course_keys)
with patch( with patch(
'openedx.core.lib.block_structure.factory.BlockStructureFactory.create_from_modulestore' 'openedx.core.lib.block_structure.factory.BlockStructureFactory.create_from_modulestore'
) as mock_update_from_store: ) as mock_update_from_store:
self.command.handle(all=True) self.command.handle(all_courses=True, force_update=force_update)
mock_update_from_store.assert_not_called() self.assertEqual(mock_update_from_store.call_count, self.num_courses if force_update else 0)
def test_one_course(self):
self._assert_courses_not_in_block_cache(*self.course_keys)
self.command.handle(courses=[unicode(self.course_keys[0])])
self._assert_courses_in_block_cache(self.course_keys[0])
self._assert_courses_not_in_block_cache(*self.course_keys[1:])
@ddt.data(
*itertools.product(
(True, False),
(True, False),
('route_1', None),
)
)
@ddt.unpack
def test_enqueue(self, enqueue_task, force_update, routing_key):
command_options = dict(all_courses=True, enqueue_task=enqueue_task, force_update=force_update)
if enqueue_task and routing_key:
command_options['routing_key'] = routing_key
def test_generate_force(self):
self._assert_courses_not_in_block_cache(self.course_1.id, self.course_2.id)
self.command.handle(all=True)
self._assert_courses_in_block_cache(self.course_1.id, self.course_2.id)
with patch( with patch(
'openedx.core.lib.block_structure.factory.BlockStructureFactory.create_from_modulestore' 'lms.djangoapps.course_blocks.management.commands.generate_course_blocks.tasks'
) as mock_update_from_store: ) as mock_tasks:
self.command.handle(all=True, force=True) with patch(
mock_update_from_store.assert_called() 'lms.djangoapps.course_blocks.management.commands.generate_course_blocks.api'
) as mock_api:
def test_generate_one(self): self.command.handle(**command_options)
self._assert_courses_not_in_block_cache(self.course_1.id, self.course_2.id)
self.command.handle(unicode(self.course_1.id))
self._assert_courses_in_block_cache(self.course_1.id)
self._assert_courses_not_in_block_cache(self.course_2.id)
@patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log') self.assertEqual(
def test_generate_no_dags(self, mock_log): mock_tasks.update_course_in_cache.apply_async.call_count,
self.command.handle(dags=True, all=True) self.num_courses if enqueue_task and force_update else 0,
self.assertEquals(mock_log.warning.call_count, 0) )
self.assertEqual(
mock_tasks.get_course_in_cache.apply_async.call_count,
self.num_courses if enqueue_task and not force_update else 0,
)
@patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log') self.assertEqual(
def test_generate_with_dags(self, mock_log): mock_api.update_course_in_cache.call_count,
with self.store.branch_setting(ModuleStoreEnum.Branch.draft_preferred): self.num_courses if not enqueue_task and force_update else 0,
item1 = ItemFactory.create(parent=self.course_1) )
item2 = ItemFactory.create(parent=item1) self.assertEqual(
item3 = ItemFactory.create(parent=item1) mock_api.get_course_in_cache.call_count,
item2.children.append(item3.location) self.num_courses if not enqueue_task and not force_update else 0,
self.store.update_item(item2, ModuleStoreEnum.UserID.mgmt_command) )
self.store.publish(self.course_1.location, ModuleStoreEnum.UserID.mgmt_command)
if enqueue_task:
self.command.handle(dags=True, all=True) task_action = mock_tasks.update_course_in_cache if force_update else mock_tasks.get_course_in_cache
self.assertEquals(mock_log.warning.call_count, 1) task_options = task_action.apply_async.call_args[1]
if routing_key:
self.assertEquals(task_options['routing_key'], routing_key)
else:
self.assertNotIn('routing_key', task_options)
@patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log') @patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log')
def test_not_found_key(self, mock_log): def test_not_found_key(self, mock_log):
self.command.handle('fake/course/id', all=False) self.command.handle(courses=['fake/course/id'])
self.assertTrue(mock_log.exception.called) self.assertTrue(mock_log.exception.called)
def test_invalid_key(self): def test_invalid_key(self):
with self.assertRaises(CommandError): with self.assertRaises(CommandError):
self.command.handle('not/found', all=False) self.command.handle(courses=['not/found'])
def test_no_params(self): def test_no_params(self):
with self.assertRaises(CommandError): with self.assertRaises(CommandError):
self.command.handle(all=False) self.command.handle(all_courses=False)
def test_no_course_mode(self):
with self.assertRaisesMessage(CommandError, 'Either --courses or --all_courses must be specified.'):
self.command.handle()
def test_both_course_modes(self):
with self.assertRaisesMessage(CommandError, 'Both --courses and --all_courses cannot be specified.'):
self.command.handle(all_courses=True, courses=['some/course/key'])
@ddt.data(
('routing_key', 'enqueue_task'),
('start_index', 'all_courses'),
('end_index', 'all_courses'),
)
@ddt.unpack
def test_dependent_options_error(self, dependent_option, depending_on_option):
expected_error_message = 'Option --{} requires option --{}.'.format(dependent_option, depending_on_option)
options = {dependent_option: 1, depending_on_option: False, 'courses': ['some/course/key']}
with self.assertRaisesMessage(CommandError, expected_error_message):
self.command.handle(**options)
...@@ -8,9 +8,7 @@ from textwrap import dedent ...@@ -8,9 +8,7 @@ from textwrap import dedent
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand, CommandError
from django.db.models import Count from django.db.models import Count
from opaque_keys import InvalidKeyError from openedx.core.lib.command_utils import get_mutually_exclusive_required_option, parse_course_keys
from opaque_keys.edx.keys import CourseKey
from lms.djangoapps.grades.models import PersistentSubsectionGrade, PersistentCourseGrade from lms.djangoapps.grades.models import PersistentSubsectionGrade, PersistentCourseGrade
...@@ -73,8 +71,8 @@ class Command(BaseCommand): ...@@ -73,8 +71,8 @@ class Command(BaseCommand):
modified_start = None modified_start = None
modified_end = None modified_end = None
run_mode = self._get_mutually_exclusive_option(options, 'delete', 'dry_run') run_mode = get_mutually_exclusive_required_option(options, 'delete', 'dry_run')
courses_mode = self._get_mutually_exclusive_option(options, 'courses', 'all_courses') courses_mode = get_mutually_exclusive_required_option(options, 'courses', 'all_courses')
if options.get('modified_start'): if options.get('modified_start'):
modified_start = datetime.strptime(options['modified_start'], DATE_FORMAT) modified_start = datetime.strptime(options['modified_start'], DATE_FORMAT)
...@@ -85,10 +83,7 @@ class Command(BaseCommand): ...@@ -85,10 +83,7 @@ class Command(BaseCommand):
modified_end = datetime.strptime(options['modified_end'], DATE_FORMAT) modified_end = datetime.strptime(options['modified_end'], DATE_FORMAT)
if courses_mode == 'courses': if courses_mode == 'courses':
try: course_keys = parse_course_keys(options['courses'])
course_keys = [CourseKey.from_string(course_key_string) for course_key_string in options['courses']]
except InvalidKeyError as error:
raise CommandError('Invalid key specified: {}'.format(error.message))
log.info("reset_grade: Started in %s mode!", run_mode) log.info("reset_grade: Started in %s mode!", run_mode)
...@@ -135,16 +130,3 @@ class Command(BaseCommand): ...@@ -135,16 +130,3 @@ class Command(BaseCommand):
grade_model_class.__name__, grade_model_class.__name__,
total_for_all_courses, total_for_all_courses,
) )
def _get_mutually_exclusive_option(self, options, option_1, option_2):
"""
Validates that exactly one of the 2 given options is specified.
Returns the name of the found option.
"""
if not options.get(option_1) and not options.get(option_2):
raise CommandError('Either --{} or --{} must be specified.'.format(option_1, option_2))
if options.get(option_1) and options.get(option_2):
raise CommandError('Both --{} and --{} cannot be specified.'.format(option_1, option_2))
return option_1 if options.get(option_1) else option_2
...@@ -29,17 +29,37 @@ def update_course_in_cache(course_id): ...@@ -29,17 +29,37 @@ def update_course_in_cache(course_id):
""" """
Updates the course blocks (in the database) for the specified course. Updates the course blocks (in the database) for the specified course.
""" """
_call_and_retry_if_needed(course_id, api.update_course_in_cache, update_course_in_cache)
@task(
default_retry_delay=settings.BLOCK_STRUCTURES_SETTINGS['BLOCK_STRUCTURES_TASK_DEFAULT_RETRY_DELAY'],
max_retries=settings.BLOCK_STRUCTURES_SETTINGS['BLOCK_STRUCTURES_TASK_MAX_RETRIES'],
)
def get_course_in_cache(course_id):
"""
Gets the course blocks for the specified course, updating the cache if needed.
"""
_call_and_retry_if_needed(course_id, api.get_course_in_cache, get_course_in_cache)
def _call_and_retry_if_needed(course_id, api_method, task_method):
"""
Calls the given api_method with the given course_id, retrying task_method upon failure.
"""
try: try:
course_key = CourseKey.from_string(course_id) course_key = CourseKey.from_string(course_id)
api.update_course_in_cache(course_key) api_method(course_key)
except NO_RETRY_TASKS as exc: except NO_RETRY_TASKS as exc:
# Known unrecoverable errors # Known unrecoverable errors
raise raise
except RETRY_TASKS as exc: except RETRY_TASKS as exc:
log.exception("update_course_in_cache encounted expected error, retrying.") log.exception("%s encountered expected error, retrying.", task_method.__name__)
raise update_course_in_cache.retry(args=[course_id], exc=exc) raise task_method.retry(args=[course_id], exc=exc)
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
log.exception("update_course_in_cache encounted unknown error. Retry #{}".format( log.exception(
update_course_in_cache.request.retries, "%s encountered unknown error. Retry #%d",
)) task_method.__name__,
raise update_course_in_cache.retry(args=[course_id], exc=exc) task_method.request.retries,
)
raise task_method.retry(args=[course_id], exc=exc)
"""
Useful utilities for management commands.
"""
from django.core.management.base import CommandError
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
def get_mutually_exclusive_required_option(options, option_1, option_2):
"""
Validates that exactly one of the 2 given options is specified.
Returns the name of the found option.
"""
validate_mutually_exclusive_option(options, option_1, option_2)
if not options.get(option_1) and not options.get(option_2):
raise CommandError('Either --{} or --{} must be specified.'.format(option_1, option_2))
return option_1 if options.get(option_1) else option_2
def validate_mutually_exclusive_option(options, option_1, option_2):
"""
Validates that both of the 2 given options are not specified.
"""
if options.get(option_1) and options.get(option_2):
raise CommandError('Both --{} and --{} cannot be specified.'.format(option_1, option_2))
def validate_dependent_option(options, dependent_option, depending_on_option):
"""
Validates that option_1 is specified if dependent_option is specified.
"""
if options.get(dependent_option) and not options.get(depending_on_option):
raise CommandError('Option --{} requires option --{}.'.format(dependent_option, depending_on_option))
def parse_course_keys(course_key_strings):
"""
Parses and returns a list of CourseKey objects from the given
list of course key strings.
"""
try:
return [CourseKey.from_string(course_key_string) for course_key_string in course_key_strings]
except InvalidKeyError as error:
raise CommandError('Invalid key specified: {}'.format(error.message))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment