Commit 784ef042 by Nimisha Asthagiri Committed by GitHub

Merge pull request #14359 from edx/beryl/command_generate_cb_tasks

Update generate_course_blocks management command to enqueue tasks.
parents 200e8568 c04f4401
"""
Command to load course blocks.
"""
from collections import defaultdict
import logging
from django.core.management.base import BaseCommand, CommandError
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
from django.core.management.base import BaseCommand
from xmodule.modulestore.django import modulestore
from openedx.core.djangoapps.content.block_structure.api import get_course_in_cache, update_course_in_cache
import openedx.core.djangoapps.content.block_structure.api as api
import openedx.core.djangoapps.content.block_structure.tasks as tasks
import openedx.core.lib.block_structure.cache as cache
from openedx.core.lib.command_utils import (
get_mutually_exclusive_required_option,
validate_dependent_option,
parse_course_keys,
)
log = logging.getLogger(__name__)
......@@ -29,159 +33,109 @@ class Command(BaseCommand):
Entry point for subclassed commands to add custom arguments.
"""
parser.add_argument(
'--all',
help='Generate course blocks for all or specified courses.',
action='store_true',
default=False,
'--courses',
dest='courses',
nargs='+',
help='Generate course blocks for the list of courses provided.',
)
parser.add_argument(
'--dags',
help='Find and log DAGs for all or specified courses.',
'--all_courses',
help='Generate course blocks for all courses, given the requested start and end indices.',
action='store_true',
default=False,
)
parser.add_argument(
'--force',
help='Force update of the course blocks for the requested courses.',
'--enqueue_task',
help='Enqueue the tasks for asynchronous computation.',
action='store_true',
default=False,
)
parser.add_argument(
'--verbose',
help='Enable verbose logging.',
'--routing_key',
dest='routing_key',
help='Routing key to use for asynchronous computation.',
)
parser.add_argument(
'--force_update',
help='Force update of the course blocks for the requested courses.',
action='store_true',
default=False,
)
parser.add_argument(
'--start',
help='Starting index of course.',
'--start_index',
help='Starting index of course list.',
default=0,
type=int,
)
parser.add_argument(
'--end',
help='Ending index of course.',
'--end_index',
help='Ending index of course list.',
default=0,
type=int,
)
def handle(self, *args, **options):
if options.get('all'):
course_keys = [course.id for course in modulestore().get_course_summaries()]
if options.get('start'):
end = options.get('end') or len(course_keys)
course_keys = course_keys[options['start']:end]
else:
if len(args) < 1:
raise CommandError('At least one course or --all must be specified.')
try:
course_keys = [CourseKey.from_string(arg) for arg in args]
except InvalidKeyError:
raise CommandError('Invalid key specified.')
log.info('Generating course blocks for %d courses.', len(course_keys))
courses_mode = get_mutually_exclusive_required_option(options, 'courses', 'all_courses')
validate_dependent_option(options, 'routing_key', 'enqueue_task')
validate_dependent_option(options, 'start_index', 'all_courses')
validate_dependent_option(options, 'end_index', 'all_courses')
if options.get('verbose'):
log.setLevel(logging.DEBUG)
if courses_mode == 'all_courses':
course_keys = [course.id for course in modulestore().get_course_summaries()]
if options.get('start_index'):
end = options.get('end_index') or len(course_keys)
course_keys = course_keys[options['start_index']:end]
else:
log.setLevel(logging.CRITICAL)
dag_info = _DAGInfo()
for course_key in course_keys:
try:
if options.get('force'):
block_structure = update_course_in_cache(course_key)
else:
block_structure = get_course_in_cache(course_key)
if options.get('dags'):
self._find_and_log_dags(block_structure, course_key, dag_info)
except Exception as ex: # pylint: disable=broad-except
log.exception(
'An error occurred while generating course blocks for %s: %s',
unicode(course_key),
ex.message,
)
course_keys = parse_course_keys(options['courses'])
log.info('Finished generating course blocks.')
self._set_log_levels(options)
if options.get('dags'):
log.critical('DAG data: %s', unicode(dag_info))
log.warning('STARTED generating Course Blocks for %d courses.', len(course_keys))
self._generate_course_blocks(options, course_keys)
log.warning('FINISHED generating Course Blocks for %d courses.', len(course_keys))
def _find_and_log_dags(self, block_structure, course_key, dag_info):
def _set_log_levels(self, options):
"""
Finds all DAGs within the given block structure.
Arguments:
BlockStructureBlockData - The block structure in which to find DAGs.
Sets logging levels for this module and the block structure
cache module, based on the given options.
"""
for block_key in block_structure.get_block_keys():
parents = block_structure.get_parents(block_key)
if len(parents) > 1:
dag_info.on_dag_found(course_key, block_key)
log.warning(
'DAG alert - %s has multiple parents: %s.',
unicode(block_key),
[unicode(parent) for parent in parents],
)
class PrettyDefaultDict(defaultdict):
"""
Wraps defaultdict to provide a better string representation.
"""
__repr__ = dict.__repr__
class _DAGBlockTypeInfo(object):
"""
Class for aggregated DAG data for a specific block type.
"""
def __init__(self):
self.num_of_dag_blocks = 0
def __repr__(self):
return repr(vars(self))
if options.get('verbosity') == 0:
log_level = logging.CRITICAL
elif options.get('verbosity') == 1:
log_level = logging.WARNING
else:
log_level = logging.INFO
class _DAGCourseInfo(object):
"""
Class for aggregated DAG data for a specific course run.
"""
def __init__(self):
self.num_of_dag_blocks = 0
self.dag_data_by_block_type = PrettyDefaultDict(_DAGBlockTypeInfo)
if options.get('verbosity') < 3:
cache_log_level = logging.CRITICAL
else:
cache_log_level = logging.INFO
def __repr__(self):
return repr(vars(self))
log.setLevel(log_level)
cache.logger.setLevel(cache_log_level)
def on_dag_found(self, block_key):
def _generate_course_blocks(self, options, course_keys):
"""
Updates DAG collected data for the given block.
Generates course blocks for the given course_keys per the given options.
"""
self.num_of_dag_blocks += 1
self.dag_data_by_block_type[block_key.category].num_of_dag_blocks += 1
class _DAGInfo(object):
"""
Class for aggregated DAG data.
"""
def __init__(self):
self.total_num_of_dag_blocks = 0
self.total_num_of_dag_courses = 0
self.dag_data_by_course = PrettyDefaultDict(_DAGCourseInfo)
self.dag_data_by_block_type = PrettyDefaultDict(_DAGBlockTypeInfo)
for course_key in course_keys:
try:
log.info('STARTED generating Course Blocks for course: %s.', course_key)
def __repr__(self):
return repr(vars(self))
if options.get('enqueue_task'):
action = tasks.update_course_in_cache if options.get('force_update') else tasks.get_course_in_cache
task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {}
action.apply_async([unicode(course_key)], **task_options)
else:
action = api.update_course_in_cache if options.get('force_update') else api.get_course_in_cache
action(course_key)
def on_dag_found(self, course_key, block_key):
"""
Updates DAG collected data for the given block.
"""
self.total_num_of_dag_blocks += 1
if course_key not in self.dag_data_by_course:
self.total_num_of_dag_courses += 1
self.dag_data_by_course[unicode(course_key)].on_dag_found(block_key)
self.dag_data_by_block_type[block_key.category].num_of_dag_blocks += 1
log.info('FINISHED generating Course Blocks for course: %s.', course_key)
except Exception as ex: # pylint: disable=broad-except
log.exception(
'An error occurred while generating course blocks for %s: %s',
unicode(course_key),
ex.message,
)
"""
Tests for generate_course_blocks management command.
"""
import ddt
from django.core.management.base import CommandError
import itertools
from mock import patch
from xmodule.modulestore import ModuleStoreEnum
......@@ -11,86 +13,140 @@ from .. import generate_course_blocks
from openedx.core.djangoapps.content.block_structure.tests.helpers import is_course_in_block_structure_cache
@ddt.ddt
class TestGenerateCourseBlocks(ModuleStoreTestCase):
"""
Tests generate course blocks management command.
"""
num_courses = 2
def setUp(self):
"""
Create courses in modulestore.
"""
super(TestGenerateCourseBlocks, self).setUp()
self.course_1 = CourseFactory.create()
self.course_2 = CourseFactory.create()
self.courses = [CourseFactory.create() for _ in range(self.num_courses)]
self.course_keys = [course.id for course in self.courses]
self.command = generate_course_blocks.Command()
def _assert_courses_not_in_block_cache(self, *courses):
def _assert_courses_not_in_block_cache(self, *course_keys):
"""
Assert courses don't exist in the course block cache.
"""
for course_key in courses:
for course_key in course_keys:
self.assertFalse(is_course_in_block_structure_cache(course_key, self.store))
def _assert_courses_in_block_cache(self, *courses):
def _assert_courses_in_block_cache(self, *course_keys):
"""
Assert courses exist in course block cache.
"""
for course_key in courses:
for course_key in course_keys:
self.assertTrue(is_course_in_block_structure_cache(course_key, self.store))
def test_generate_all(self):
self._assert_courses_not_in_block_cache(self.course_1.id, self.course_2.id)
self.command.handle(all=True)
self._assert_courses_in_block_cache(self.course_1.id, self.course_2.id)
def _assert_message_presence_in_logs(self, message, mock_log, expected_presence=True):
"""
Asserts that the logger was called with the given message.
"""
message_present = any([message in call_args[0][0] for call_args in mock_log.warning.call_args_list])
if expected_presence:
self.assertTrue(message_present)
else:
self.assertFalse(message_present)
@ddt.data(True, False)
def test_all_courses(self, force_update):
self._assert_courses_not_in_block_cache(*self.course_keys)
self.command.handle(all_courses=True)
self._assert_courses_in_block_cache(*self.course_keys)
with patch(
'openedx.core.lib.block_structure.factory.BlockStructureFactory.create_from_modulestore'
) as mock_update_from_store:
self.command.handle(all=True)
mock_update_from_store.assert_not_called()
self.command.handle(all_courses=True, force_update=force_update)
self.assertEqual(mock_update_from_store.call_count, self.num_courses if force_update else 0)
def test_one_course(self):
self._assert_courses_not_in_block_cache(*self.course_keys)
self.command.handle(courses=[unicode(self.course_keys[0])])
self._assert_courses_in_block_cache(self.course_keys[0])
self._assert_courses_not_in_block_cache(*self.course_keys[1:])
@ddt.data(
*itertools.product(
(True, False),
(True, False),
('route_1', None),
)
)
@ddt.unpack
def test_enqueue(self, enqueue_task, force_update, routing_key):
command_options = dict(all_courses=True, enqueue_task=enqueue_task, force_update=force_update)
if enqueue_task and routing_key:
command_options['routing_key'] = routing_key
def test_generate_force(self):
self._assert_courses_not_in_block_cache(self.course_1.id, self.course_2.id)
self.command.handle(all=True)
self._assert_courses_in_block_cache(self.course_1.id, self.course_2.id)
with patch(
'openedx.core.lib.block_structure.factory.BlockStructureFactory.create_from_modulestore'
) as mock_update_from_store:
self.command.handle(all=True, force=True)
mock_update_from_store.assert_called()
'lms.djangoapps.course_blocks.management.commands.generate_course_blocks.tasks'
) as mock_tasks:
with patch(
'lms.djangoapps.course_blocks.management.commands.generate_course_blocks.api'
) as mock_api:
def test_generate_one(self):
self._assert_courses_not_in_block_cache(self.course_1.id, self.course_2.id)
self.command.handle(unicode(self.course_1.id))
self._assert_courses_in_block_cache(self.course_1.id)
self._assert_courses_not_in_block_cache(self.course_2.id)
self.command.handle(**command_options)
@patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log')
def test_generate_no_dags(self, mock_log):
self.command.handle(dags=True, all=True)
self.assertEquals(mock_log.warning.call_count, 0)
self.assertEqual(
mock_tasks.update_course_in_cache.apply_async.call_count,
self.num_courses if enqueue_task and force_update else 0,
)
self.assertEqual(
mock_tasks.get_course_in_cache.apply_async.call_count,
self.num_courses if enqueue_task and not force_update else 0,
)
@patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log')
def test_generate_with_dags(self, mock_log):
with self.store.branch_setting(ModuleStoreEnum.Branch.draft_preferred):
item1 = ItemFactory.create(parent=self.course_1)
item2 = ItemFactory.create(parent=item1)
item3 = ItemFactory.create(parent=item1)
item2.children.append(item3.location)
self.store.update_item(item2, ModuleStoreEnum.UserID.mgmt_command)
self.store.publish(self.course_1.location, ModuleStoreEnum.UserID.mgmt_command)
self.command.handle(dags=True, all=True)
self.assertEquals(mock_log.warning.call_count, 1)
self.assertEqual(
mock_api.update_course_in_cache.call_count,
self.num_courses if not enqueue_task and force_update else 0,
)
self.assertEqual(
mock_api.get_course_in_cache.call_count,
self.num_courses if not enqueue_task and not force_update else 0,
)
if enqueue_task:
task_action = mock_tasks.update_course_in_cache if force_update else mock_tasks.get_course_in_cache
task_options = task_action.apply_async.call_args[1]
if routing_key:
self.assertEquals(task_options['routing_key'], routing_key)
else:
self.assertNotIn('routing_key', task_options)
@patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log')
def test_not_found_key(self, mock_log):
self.command.handle('fake/course/id', all=False)
self.command.handle(courses=['fake/course/id'])
self.assertTrue(mock_log.exception.called)
def test_invalid_key(self):
with self.assertRaises(CommandError):
self.command.handle('not/found', all=False)
self.command.handle(courses=['not/found'])
def test_no_params(self):
with self.assertRaises(CommandError):
self.command.handle(all=False)
self.command.handle(all_courses=False)
def test_no_course_mode(self):
with self.assertRaisesMessage(CommandError, 'Either --courses or --all_courses must be specified.'):
self.command.handle()
def test_both_course_modes(self):
with self.assertRaisesMessage(CommandError, 'Both --courses and --all_courses cannot be specified.'):
self.command.handle(all_courses=True, courses=['some/course/key'])
@ddt.data(
('routing_key', 'enqueue_task'),
('start_index', 'all_courses'),
('end_index', 'all_courses'),
)
@ddt.unpack
def test_dependent_options_error(self, dependent_option, depending_on_option):
expected_error_message = 'Option --{} requires option --{}.'.format(dependent_option, depending_on_option)
options = {dependent_option: 1, depending_on_option: False, 'courses': ['some/course/key']}
with self.assertRaisesMessage(CommandError, expected_error_message):
self.command.handle(**options)
......@@ -8,9 +8,7 @@ from textwrap import dedent
from django.core.management.base import BaseCommand, CommandError
from django.db.models import Count
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
from openedx.core.lib.command_utils import get_mutually_exclusive_required_option, parse_course_keys
from lms.djangoapps.grades.models import PersistentSubsectionGrade, PersistentCourseGrade
......@@ -73,8 +71,8 @@ class Command(BaseCommand):
modified_start = None
modified_end = None
run_mode = self._get_mutually_exclusive_option(options, 'delete', 'dry_run')
courses_mode = self._get_mutually_exclusive_option(options, 'courses', 'all_courses')
run_mode = get_mutually_exclusive_required_option(options, 'delete', 'dry_run')
courses_mode = get_mutually_exclusive_required_option(options, 'courses', 'all_courses')
if options.get('modified_start'):
modified_start = datetime.strptime(options['modified_start'], DATE_FORMAT)
......@@ -85,10 +83,7 @@ class Command(BaseCommand):
modified_end = datetime.strptime(options['modified_end'], DATE_FORMAT)
if courses_mode == 'courses':
try:
course_keys = [CourseKey.from_string(course_key_string) for course_key_string in options['courses']]
except InvalidKeyError as error:
raise CommandError('Invalid key specified: {}'.format(error.message))
course_keys = parse_course_keys(options['courses'])
log.info("reset_grade: Started in %s mode!", run_mode)
......@@ -135,16 +130,3 @@ class Command(BaseCommand):
grade_model_class.__name__,
total_for_all_courses,
)
def _get_mutually_exclusive_option(self, options, option_1, option_2):
"""
Validates that exactly one of the 2 given options is specified.
Returns the name of the found option.
"""
if not options.get(option_1) and not options.get(option_2):
raise CommandError('Either --{} or --{} must be specified.'.format(option_1, option_2))
if options.get(option_1) and options.get(option_2):
raise CommandError('Both --{} and --{} cannot be specified.'.format(option_1, option_2))
return option_1 if options.get(option_1) else option_2
......@@ -29,17 +29,37 @@ def update_course_in_cache(course_id):
"""
Updates the course blocks (in the database) for the specified course.
"""
_call_and_retry_if_needed(course_id, api.update_course_in_cache, update_course_in_cache)
@task(
    default_retry_delay=settings.BLOCK_STRUCTURES_SETTINGS['BLOCK_STRUCTURES_TASK_DEFAULT_RETRY_DELAY'],
    max_retries=settings.BLOCK_STRUCTURES_SETTINGS['BLOCK_STRUCTURES_TASK_MAX_RETRIES'],
)
def get_course_in_cache(course_id):
    """
    Task that gets the course blocks for the course identified by
    course_id, updating the cache if needed.

    The work is delegated to _call_and_retry_if_needed, which is handed
    this task so that it can be retried upon failure.
    """
    _call_and_retry_if_needed(
        course_id=course_id,
        api_method=api.get_course_in_cache,
        task_method=get_course_in_cache,
    )
def _call_and_retry_if_needed(course_id, api_method, task_method):
"""
Calls the given api_method with the given course_id, retrying task_method upon failure.
"""
try:
course_key = CourseKey.from_string(course_id)
api.update_course_in_cache(course_key)
api_method(course_key)
except NO_RETRY_TASKS as exc:
# Known unrecoverable errors
raise
except RETRY_TASKS as exc:
log.exception("update_course_in_cache encounted expected error, retrying.")
raise update_course_in_cache.retry(args=[course_id], exc=exc)
log.exception("%s encountered expected error, retrying.", task_method.__name__)
raise task_method.retry(args=[course_id], exc=exc)
except Exception as exc: # pylint: disable=broad-except
log.exception("update_course_in_cache encounted unknown error. Retry #{}".format(
update_course_in_cache.request.retries,
))
raise update_course_in_cache.retry(args=[course_id], exc=exc)
log.exception(
"%s encountered unknown error. Retry #%d",
task_method.__name__,
task_method.request.retries,
)
raise task_method.retry(args=[course_id], exc=exc)
"""
Useful utilities for management commands.
"""
from django.core.management.base import CommandError
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
def get_mutually_exclusive_required_option(options, option_1, option_2):
    """
    Returns the name of whichever of the 2 given options is set.

    Exactly one of the options must have a truthy value in options.

    Raises:
        CommandError: if both options are set (raised by
            validate_mutually_exclusive_option) or if neither is set.
    """
    # Reject the "both specified" case first, so its error takes precedence.
    validate_mutually_exclusive_option(options, option_1, option_2)

    if options.get(option_1):
        return option_1
    if options.get(option_2):
        return option_2
    raise CommandError('Either --{} or --{} must be specified.'.format(option_1, option_2))
def validate_mutually_exclusive_option(options, option_1, option_2):
    """
    Ensures that the 2 given options are not both set.

    Raises:
        CommandError: if both options have truthy values in options.
    """
    both_specified = bool(options.get(option_1) and options.get(option_2))
    if not both_specified:
        return
    raise CommandError('Both --{} and --{} cannot be specified.'.format(option_1, option_2))
def validate_dependent_option(options, dependent_option, depending_on_option):
    """
    Ensures that depending_on_option is set whenever dependent_option is set.

    Raises:
        CommandError: if dependent_option has a truthy value in options
            but depending_on_option does not.
    """
    if not options.get(dependent_option):
        # Nothing to enforce when the dependent option is unset (or falsy).
        return
    if options.get(depending_on_option):
        return
    raise CommandError('Option --{} requires option --{}.'.format(dependent_option, depending_on_option))
def parse_course_keys(course_key_strings):
    """
    Converts the given course key strings into CourseKey objects.

    Returns:
        A list of CourseKey objects, in the same order as the input.

    Raises:
        CommandError: if any of the given strings is not a valid course key.
    """
    course_keys = []
    try:
        for key_string in course_key_strings:
            course_keys.append(CourseKey.from_string(key_string))
    except InvalidKeyError as error:
        raise CommandError('Invalid key specified: {}'.format(error.message))
    return course_keys
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment