Commit 89518d79 by Nimisha Asthagiri Committed by GitHub

Merge pull request #14719 from edx/neem/block-structure-waffle-in-task

Have generate_course_blocks pass with_storage to celery tasks
parents bb31724f e83182c7
......@@ -33,4 +33,5 @@ class Router(AlternateEnvironmentRouter):
"""
return {
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms',
}
......@@ -41,7 +41,7 @@ class ContentStoreImportTest(SignalDisconnectTestMixin, ModuleStoreTestCase):
self.client.login(username=self.user.username, password=self.user_password)
# block_structure.update_course_in_cache cannot succeed in tests, as it needs to be run async on an lms worker
self.task_patcher = patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache')
self.task_patcher = patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2')
self._mock_lms_task = self.task_patcher.start()
def tearDown(self):
......
......@@ -2,12 +2,17 @@
This module contains various configuration settings via
waffle switches for the Block Structure framework.
"""
import logging
from openedx.core.djangolib.waffle_utils import is_switch_enabled
from request_cache.middleware import request_cached
from request_cache.middleware import request_cached, RequestCache, func_call_cache_key
from .models import BlockStructureConfiguration
log = logging.getLogger(__name__)
INVALIDATE_CACHE_ON_PUBLISH = u'invalidate_cache_on_publish'
STORAGE_BACKING_FOR_CACHE = u'storage_backing_for_cache'
RAISE_ERROR_WHEN_NOT_FOUND = u'raise_error_when_not_found'
......@@ -23,6 +28,19 @@ def is_enabled(setting_name):
return is_switch_enabled(bs_waffle_name)
def enable_for_current_request(setting_name):
"""
Enables the given block_structure setting for the
duration of the current request.
"""
cache_key = func_call_cache_key(
is_switch_enabled.request_cached_contained_func,
_bs_waffle_switch_name(setting_name),
)
RequestCache.get_request_cache().data[cache_key] = True
log.warning(u'BlockStructure: Config %s is enabled for current request.', setting_name)
@request_cached
def num_versions_to_keep():
"""
......
......@@ -7,7 +7,7 @@ from django.core.management.base import BaseCommand
from xmodule.modulestore.django import modulestore
import openedx.core.djangoapps.content.block_structure.api as api
from openedx.core.djangoapps.content.block_structure.config import _bs_waffle_switch_name, STORAGE_BACKING_FOR_CACHE
from openedx.core.djangoapps.content.block_structure.config import STORAGE_BACKING_FOR_CACHE, enable_for_current_request
import openedx.core.djangoapps.content.block_structure.tasks as tasks
import openedx.core.djangoapps.content.block_structure.store as store
from openedx.core.lib.command_utils import (
......@@ -15,8 +15,6 @@ from openedx.core.lib.command_utils import (
validate_dependent_option,
parse_course_keys,
)
from request_cache.middleware import RequestCache, func_call_cache_key
from openedx.core.djangolib.waffle_utils import is_switch_enabled
log = logging.getLogger(__name__)
......@@ -100,9 +98,9 @@ class Command(BaseCommand):
self._set_log_levels(options)
log.critical(u'STARTED generating Course Blocks for %d courses.', len(course_keys))
log.critical(u'BlockStructure: STARTED generating Course Blocks for %d courses.', len(course_keys))
self._generate_course_blocks(options, course_keys)
log.critical(u'FINISHED generating Course Blocks for %d courses.', len(course_keys))
log.critical(u'BlockStructure: FINISHED generating Course Blocks for %d courses.', len(course_keys))
def _set_log_levels(self, options):
"""
......@@ -129,16 +127,14 @@ class Command(BaseCommand):
Generates course blocks for the given course_keys per the given options.
"""
if options.get('with_storage'):
self._enable_storage()
enable_for_current_request(STORAGE_BACKING_FOR_CACHE)
for course_key in course_keys:
try:
log.info(u'STARTED generating Course Blocks for course: %s.', course_key)
self._generate_for_course(options, course_key)
log.info(u'FINISHED generating Course Blocks for course: %s.', course_key)
except Exception as ex: # pylint: disable=broad-except
log.exception(
u'An error occurred while generating course blocks for %s: %s',
u'BlockStructure: An error occurred while generating course blocks for %s: %s',
unicode(course_key),
ex.message,
)
......@@ -148,20 +144,15 @@ class Command(BaseCommand):
Generates course blocks for the given course_key per the given options.
"""
if options.get('enqueue_task'):
action = tasks.update_course_in_cache if options.get('force_update') else tasks.get_course_in_cache
action = tasks.update_course_in_cache_v2 if options.get('force_update') else tasks.get_course_in_cache_v2
task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {}
action.apply_async([unicode(course_key)], **task_options)
result = action.apply_async(
kwargs=dict(course_id=unicode(course_key), with_storage=options.get('with_storage')),
**task_options
)
log.info(u'BlockStructure: ENQUEUED generating for course: %s, task_id: %s.', course_key, result.id)
else:
log.info(u'BlockStructure: STARTED generating for course: %s.', course_key)
action = api.update_course_in_cache if options.get('force_update') else api.get_course_in_cache
action(course_key)
def _enable_storage(self):
"""
Enables storage backing by setting the waffle's cached value to True.
"""
cache_key = func_call_cache_key(
is_switch_enabled.request_cached_contained_func,
_bs_waffle_switch_name(STORAGE_BACKING_FOR_CACHE),
)
RequestCache.get_request_cache().data[cache_key] = True
log.warning(u'STORAGE_BACKING_FOR_CACHE is enabled.')
log.info(u'BlockStructure: FINISHED generating for course: %s.', course_key)
......@@ -107,20 +107,20 @@ class TestGenerateCourseBlocks(ModuleStoreTestCase):
command_options['routing_key'] = routing_key
with patch(
'lms.djangoapps.course_blocks.management.commands.generate_course_blocks.tasks'
'openedx.core.djangoapps.content.block_structure.management.commands.generate_course_blocks.tasks'
) as mock_tasks:
with patch(
'lms.djangoapps.course_blocks.management.commands.generate_course_blocks.api'
'openedx.core.djangoapps.content.block_structure.management.commands.generate_course_blocks.api'
) as mock_api:
self.command.handle(**command_options)
self.assertEqual(
mock_tasks.update_course_in_cache.apply_async.call_count,
mock_tasks.update_course_in_cache_v2.apply_async.call_count,
self.num_courses if enqueue_task and force_update else 0,
)
self.assertEqual(
mock_tasks.get_course_in_cache.apply_async.call_count,
mock_tasks.get_course_in_cache_v2.apply_async.call_count,
self.num_courses if enqueue_task and not force_update else 0,
)
......@@ -134,14 +134,17 @@ class TestGenerateCourseBlocks(ModuleStoreTestCase):
)
if enqueue_task:
task_action = mock_tasks.update_course_in_cache if force_update else mock_tasks.get_course_in_cache
if force_update:
task_action = mock_tasks.update_course_in_cache_v2
else:
task_action = mock_tasks.get_course_in_cache_v2
task_options = task_action.apply_async.call_args[1]
if routing_key:
self.assertEquals(task_options['routing_key'], routing_key)
else:
self.assertNotIn('routing_key', task_options)
@patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log')
@patch('openedx.core.djangoapps.content.block_structure.management.commands.generate_course_blocks.log')
def test_not_found_key(self, mock_log):
self.command.handle(courses=['fake/course/id'])
self.assertTrue(mock_log.exception.called)
......
......@@ -10,11 +10,11 @@ from opaque_keys.edx.locator import LibraryLocator
from . import config
from .api import clear_course_from_cache
from .tasks import update_course_in_cache
from .tasks import update_course_in_cache_v2
@receiver(SignalHandler.course_published)
def _listen_for_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument
def _update_block_structure_on_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument
"""
Catches the signal that a course has been published in the module
store and creates/updates the corresponding cache entry.
......@@ -26,14 +26,14 @@ def _listen_for_course_publish(sender, course_key, **kwargs): # pylint: disable
if config.is_enabled(config.INVALIDATE_CACHE_ON_PUBLISH):
clear_course_from_cache(course_key)
update_course_in_cache.apply_async(
[unicode(course_key)],
update_course_in_cache_v2.apply_async(
kwargs=dict(course_id=unicode(course_key)),
countdown=settings.BLOCK_STRUCTURES_SETTINGS['COURSE_PUBLISH_TASK_DELAY'],
)
@receiver(SignalHandler.course_deleted)
def _listen_for_course_delete(sender, course_key, **kwargs): # pylint: disable=unused-argument
def _delete_block_structure_on_course_delete(sender, course_key, **kwargs): # pylint: disable=unused-argument
"""
Catches the signal that a course has been deleted from the
module store and invalidates the corresponding cache entry if one
......
......@@ -13,6 +13,7 @@ from opaque_keys.edx.keys import CourseKey
from xmodule.modulestore.exceptions import ItemNotFoundError
from openedx.core.djangoapps.content.block_structure import api
from openedx.core.djangoapps.content.block_structure.config import STORAGE_BACKING_FOR_CACHE, enable_for_current_request
log = logging.getLogger('edx.celery.task')
......@@ -21,53 +22,101 @@ RETRY_TASKS = (ItemNotFoundError, TypeError, ValInternalError)
NO_RETRY_TASKS = (XMLSyntaxError, LoncapaProblemError, UnicodeEncodeError)
@task(
default_retry_delay=settings.BLOCK_STRUCTURES_SETTINGS['TASK_DEFAULT_RETRY_DELAY'],
max_retries=settings.BLOCK_STRUCTURES_SETTINGS['TASK_MAX_RETRIES'],
bind=True,
)
def block_structure_task(**kwargs):
"""
Decorator for block structure tasks.
"""
return task(
default_retry_delay=settings.BLOCK_STRUCTURES_SETTINGS['TASK_DEFAULT_RETRY_DELAY'],
max_retries=settings.BLOCK_STRUCTURES_SETTINGS['TASK_MAX_RETRIES'],
bind=True,
**kwargs
)
@block_structure_task()
def update_course_in_cache_v2(self, **kwargs):
"""
Updates the course blocks (mongo -> BlockStructure) for the specified course.
Keyword Arguments:
course_id (string) - The string serialized value of the course key.
with_storage (boolean) - Whether or not storage backing should be
enabled for the generated block structure(s).
"""
_update_course_in_cache(self, **kwargs)
@block_structure_task()
def update_course_in_cache(self, course_id):
"""
Updates the course blocks (in the database) for the specified course.
Updates the course blocks (mongo -> BlockStructure) for the specified course.
"""
_update_course_in_cache(self, course_id=course_id)
def _update_course_in_cache(self, **kwargs):
"""
Updates the course blocks (mongo -> BlockStructure) for the specified course.
"""
if kwargs.get('with_storage'):
enable_for_current_request(STORAGE_BACKING_FOR_CACHE)
_call_and_retry_if_needed(self, api.update_course_in_cache, **kwargs)
@block_structure_task()
def get_course_in_cache_v2(self, **kwargs):
"""
Gets the course blocks for the specified course, updating the cache if needed.
Keyword Arguments:
course_id (string) - The string serialized value of the course key.
with_storage (boolean) - Whether or not storage backing should be
enabled for any generated block structure(s).
"""
_call_and_retry_if_needed(course_id, api.update_course_in_cache, update_course_in_cache, self.request.id)
_get_course_in_cache(self, **kwargs)
@task(
default_retry_delay=settings.BLOCK_STRUCTURES_SETTINGS['TASK_DEFAULT_RETRY_DELAY'],
max_retries=settings.BLOCK_STRUCTURES_SETTINGS['TASK_MAX_RETRIES'],
bind=True,
)
@block_structure_task()
def get_course_in_cache(self, course_id):
"""
Gets the course blocks for the specified course, updating the cache if needed.
"""
_call_and_retry_if_needed(course_id, api.get_course_in_cache, get_course_in_cache, self.request.id)
_get_course_in_cache(self, course_id=course_id)
def _get_course_in_cache(self, **kwargs):
"""
Gets the course blocks for the specified course, updating the cache if needed.
"""
if kwargs.get('with_storage'):
enable_for_current_request(STORAGE_BACKING_FOR_CACHE)
_call_and_retry_if_needed(self, api.get_course_in_cache, **kwargs)
def _call_and_retry_if_needed(course_id, api_method, task_method, task_id):
def _call_and_retry_if_needed(self, api_method, **kwargs):
"""
Calls the given api_method with the given course_id, retrying task_method upon failure.
"""
try:
course_key = CourseKey.from_string(course_id)
course_key = CourseKey.from_string(kwargs['course_id'])
api_method(course_key)
except NO_RETRY_TASKS as exc:
except NO_RETRY_TASKS:
# Known unrecoverable errors
log.exception(
"update_course_in_cache encountered unrecoverable error in course {}, task_id {}".format(
course_id,
task_id
)
"BlockStructure: %s encountered unrecoverable error in course %s, task_id %s",
self.__name__,
kwargs.get('course_id'),
self.request.id,
)
raise
except RETRY_TASKS as exc:
log.exception("%s encountered expected error, retrying.", task_method.__name__)
raise task_method.retry(args=[course_id], exc=exc)
log.exception("%s encountered expected error, retrying.", self.__name__)
raise self.retry(kwargs=kwargs, exc=exc)
except Exception as exc: # pylint: disable=broad-except
log.exception(
"%s encountered unknown error. Retry #%d",
task_method.__name__,
task_method.request.retries,
"BlockStructure: %s encountered unknown error in course %s, task_id %s. Retry #%d",
self.__name__,
kwargs.get('course_id'),
self.request.id,
self.request.retries,
)
raise task_method.retry(args=[course_id], exc=exc)
raise self.retry(kwargs=kwargs, exc=exc)
......@@ -11,7 +11,7 @@ from xmodule.modulestore.tests.factories import CourseFactory
from ..api import get_block_structure_manager
from ..config import INVALIDATE_CACHE_ON_PUBLISH
from ..signals import _listen_for_course_publish
from ..signals import _update_block_structure_on_course_publish
from .helpers import is_course_in_block_structure_cache, override_config_setting
......@@ -76,7 +76,7 @@ class CourseBlocksSignalTest(ModuleStoreTestCase):
(LibraryLocator(org='org', course='course'), False),
)
@ddt.unpack
@patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache.apply_async')
@patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2.apply_async')
def test_update_only_for_courses(self, key, expect_update_called, mock_update):
_listen_for_course_publish(sender=None, course_key=key)
_update_block_structure_on_course_publish(sender=None, course_key=key)
self.assertEqual(mock_update.called, expect_update_called)
......@@ -6,19 +6,19 @@ from mock import patch
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from ..tasks import update_course_in_cache
from ..tasks import update_course_in_cache_v2
class UpdateCourseInCacheTaskTest(ModuleStoreTestCase):
"""
Ensures that the update_course_in_cache task runs as expected.
"""
@patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache.retry')
@patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2.retry')
@patch('openedx.core.djangoapps.content.block_structure.api.update_course_in_cache')
def test_retry_on_error(self, mock_update, mock_retry):
"""
Ensures that tasks will be retried if IntegrityErrors are encountered.
"""
mock_update.side_effect = Exception("WHAMMY")
update_course_in_cache.apply(args=["invalid_course_key raises exception 12345 meow"])
update_course_in_cache_v2.apply(kwargs=dict(course_id="invalid_course_key raises exception 12345 meow"))
self.assertTrue(mock_retry.called)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment