Commit fdc6d915 by Eric Fischer Committed by GitHub

update_in_cache on lms worker (#12689)

This commit "undoes"a previous hotfix, and allows a cms course_publish
signal to trigger a block_structure update_course_in_cache task, which
is run on an lms worker queue.

Changes:
    -exposes ALTERNATE_QUEUE_ENVS
    -adds routing layer in celery.py
    -moves prior dev_with_worker settings file to devstack_with_worker
    -moves course_block api functionality into openedx/core/djangoapps/content/block_structure
parent d544340f
......@@ -5,12 +5,10 @@ and auto discover tasks in all installed django apps.
Taken from: http://celery.readthedocs.org/en/latest/django/first-steps-with-django.html
"""
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
from openedx.core.lib.celery.routers import AlternateEnvironmentRouter
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
......@@ -21,3 +19,18 @@ APP = Celery('proj')
# pickle the object when using Windows.
APP.config_from_object('django.conf:settings')
APP.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
class Router(AlternateEnvironmentRouter):
"""
An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues.
"""
@property
def alternate_env_tasks(self):
"""
Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
"""
return {
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms',
}
......@@ -172,8 +172,8 @@ class ContentStoreImportTest(SignalDisconnectTestMixin, ModuleStoreTestCase):
# we try to refresh the inheritance tree for each update_item in the import
with check_exact_number_of_calls(store, 'refresh_cached_metadata_inheritance_tree', 28):
# _get_cached_metadata_inheritance_tree should be called only once
with check_exact_number_of_calls(store, '_get_cached_metadata_inheritance_tree', 1):
# _get_cached_metadata_inheritance_tree should be called twice (once for import, once on publish)
with check_exact_number_of_calls(store, '_get_cached_metadata_inheritance_tree', 2):
# with bulk-edit in progress, the inheritance tree should be recomputed only at the end of the import
# NOTE: On Jenkins, with memcache enabled, the number of calls here is only 1.
......
......@@ -87,6 +87,21 @@ CELERY_QUEUES = {
DEFAULT_PRIORITY_QUEUE: {}
}
# Setup alternate queues, to allow access to cross-process workers
ALTERNATE_QUEUE_ENVS = os.environ.get('ALTERNATE_WORKER_QUEUES', '').split()
ALTERNATE_QUEUES = [
DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.')
for alternate in ALTERNATE_QUEUE_ENVS
]
CELERY_QUEUES.update(
{
alternate: {}
for alternate in ALTERNATE_QUEUES
if alternate not in CELERY_QUEUES.keys()
}
)
CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT)
############# NON-SECURE ENV CONFIG ##############################
# Things like server locations, ports, etc.
with open(CONFIG_ROOT / CONFIG_PREFIX + "env.json") as env_file:
......
"""
This config file follows the dev enviroment, but adds the
This config file follows the devstack enviroment, but adds the
requirement of a celery worker running in the background to process
celery tasks.
The worker can be executed using:
When testing locally, run lms/cms with this settings file as well, to test queueing
of tasks onto the appropriate workers.
django_admin.py celery worker
In two separate processes on devstack:
paver devstack studio --settings=devstack_with_worker
./manage.py cms celery worker --settings=devstack_with_worker
"""
# We intentionally define lots of variables that aren't used, and
# want to import all variables from base settings files
# pylint: disable=wildcard-import, unused-wildcard-import
from cms.envs.devstack import *
from dev import *
################################# CELERY ######################################
# Requires a separate celery worker
# Require a separate celery worker
CELERY_ALWAYS_EAGER = False
# Use django db as the broker and result store
BROKER_URL = 'django://'
INSTALLED_APPS += ('djcelery.transport', )
CELERY_RESULT_BACKEND = 'database'
DJKOMBU_POLLING_INTERVAL = 1.0
# Disable transaction management because we are using a worker. Views
# that request a task and wait for the result will deadlock otherwise.
for database_name in DATABASES:
......
......@@ -5,12 +5,10 @@ and auto discover tasks in all installed django apps.
Taken from: http://celery.readthedocs.org/en/latest/django/first-steps-with-django.html
"""
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
from openedx.core.lib.celery.routers import AlternateEnvironmentRouter
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
......@@ -21,3 +19,16 @@ APP = Celery('proj')
# pickle the object when using Windows.
APP.config_from_object('django.conf:settings')
APP.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
class Router(AlternateEnvironmentRouter):
"""
An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues.
"""
@property
def alternate_env_tasks(self):
"""
Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
"""
return {}
......@@ -4,7 +4,7 @@ Tests for Blocks api.py
from django.test.client import RequestFactory
from course_blocks.tests.helpers import EnableTransformerRegistryMixin
from openedx.core.djangoapps.content.block_structure.tests.helpers import EnableTransformerRegistryMixin
from student.tests.factories import UserFactory
from xmodule.modulestore import ModuleStoreEnum
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
......
......@@ -6,8 +6,8 @@ from django.http import Http404, QueryDict
from urllib import urlencode
from rest_framework.exceptions import PermissionDenied
from course_blocks.tests.helpers import EnableTransformerRegistryMixin
from opaque_keys.edx.locator import CourseLocator
from openedx.core.djangoapps.content.block_structure.tests.helpers import EnableTransformerRegistryMixin
from openedx.core.djangoapps.util.test_forms import FormTestMixin
from student.models import CourseEnrollment
from student.tests.factories import UserFactory, CourseEnrollmentFactory
......
......@@ -3,7 +3,7 @@ Tests for Course Blocks serializers
"""
from mock import MagicMock
from course_blocks.tests.helpers import EnableTransformerRegistryMixin
from openedx.core.djangoapps.content.block_structure.tests.helpers import EnableTransformerRegistryMixin
from openedx.core.lib.block_structure.transformers import BlockStructureTransformers
from student.tests.factories import UserFactory
from xmodule.modulestore import ModuleStoreEnum
......
......@@ -7,8 +7,8 @@ from string import join
from urllib import urlencode
from urlparse import urlunparse
from course_blocks.tests.helpers import EnableTransformerRegistryMixin
from opaque_keys.edx.locator import CourseLocator
from openedx.core.djangoapps.content.block_structure.tests.helpers import EnableTransformerRegistryMixin
from student.models import CourseEnrollment
from student.tests.factories import AdminFactory, CourseEnrollmentFactory, UserFactory
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
......
......@@ -22,6 +22,3 @@ instead (https://openedx.atlassian.net/browse/MA-1019). We have
introduced this redundancy in the short-term as an incremental
implementation approach, reducing risk with initial release of this app.
"""
# Importing signals is necessary to activate the course publish/delete signal handlers.
from . import signals
"""
API entry point to the course_blocks app with top-level
get_course_blocks and clear_course_from_cache functions.
get_course_blocks function.
"""
from django.core.cache import cache
from openedx.core.djangoapps.content.block_structure.api import get_block_structure_manager
from openedx.core.lib.block_structure.manager import BlockStructureManager
from openedx.core.lib.block_structure.transformers import BlockStructureTransformers
from xmodule.modulestore.django import modulestore
......@@ -60,59 +61,7 @@ def get_course_blocks(
transformers = BlockStructureTransformers(COURSE_BLOCK_ACCESS_TRANSFORMERS)
transformers.usage_info = CourseUsageInfo(starting_block_usage_key.course_key, user)
return _get_block_structure_manager(starting_block_usage_key.course_key).get_transformed(
return get_block_structure_manager(starting_block_usage_key.course_key).get_transformed(
transformers,
starting_block_usage_key,
)
def get_course_in_cache(course_key):
"""
A higher order function implemented on top of the
block_structure.get_collected function that returns the block
structure in the cache for the given course_key.
Returns:
BlockStructureBlockData - The collected block structure,
starting at root_block_usage_key.
"""
return _get_block_structure_manager(course_key).get_collected()
def update_course_in_cache(course_key):
"""
A higher order function implemented on top of the
block_structure.updated_collected function that updates the block
structure in the cache for the given course_key.
"""
return _get_block_structure_manager(course_key).update_collected()
def clear_course_from_cache(course_key):
"""
A higher order function implemented on top of the
block_structure.clear_block_cache function that clears the block
structure from the cache for the given course_key.
Note: See Note in get_course_blocks. Even after MA-1604 is
implemented, this implementation should still be valid since the
entire block structure of the course is cached, even though
arbitrary access to an intermediate block will be supported.
"""
_get_block_structure_manager(course_key).clear()
def _get_block_structure_manager(course_key):
"""
Returns the manager for managing Block Structures for the given course.
"""
store = modulestore()
course_usage_key = store.make_course_usage_key(course_key)
return BlockStructureManager(course_usage_key, store, _get_cache())
def _get_cache():
"""
Returns the storage for caching Block Structures.
"""
return cache
......@@ -8,7 +8,7 @@ from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
from xmodule.modulestore.django import modulestore
from ...api import get_course_in_cache, update_course_in_cache
from openedx.core.djangoapps.content.block_structure.api import get_course_in_cache, update_course_in_cache
log = logging.getLogger(__name__)
......
......@@ -8,7 +8,7 @@ from xmodule.modulestore import ModuleStoreEnum
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
from .. import generate_course_blocks
from ....tests.helpers import is_course_in_block_structure_cache
from openedx.core.djangoapps.content.block_structure.tests.helpers import is_course_in_block_structure_cache
class TestGenerateCourseBlocks(ModuleStoreTestCase):
......
......@@ -4,9 +4,10 @@ Tests for ContentLibraryTransformer.
import mock
from student.tests.factories import CourseEnrollmentFactory
from openedx.core.djangoapps.content.block_structure.api import clear_course_from_cache
from openedx.core.lib.block_structure.transformers import BlockStructureTransformers
from ...api import get_course_blocks, clear_course_from_cache
from ...api import get_course_blocks
from ..library_content import ContentLibraryTransformer
from .helpers import CourseStructureTestCase
......
......@@ -5,8 +5,8 @@ from django.core.urlresolvers import reverse
from mock import Mock
from . import BaseTestXmodule
from course_api.blocks.tests.helpers import deserialize_usage_key
from course_blocks.tests.helpers import EnableTransformerRegistryMixin
from courseware.module_render import get_module_for_descriptor_internal
from openedx.core.djangoapps.content.block_structure.tests.helpers import EnableTransformerRegistryMixin
from student.tests.factories import UserFactory, CourseEnrollmentFactory
from xmodule.discussion_module import DiscussionModule
from xmodule.modulestore import ModuleStoreEnum
......
......@@ -10,9 +10,9 @@ from student.tests.factories import UserFactory
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
from xmodule.modulestore.tests.factories import check_mongo_calls
from lms.djangoapps.course_blocks.api import _get_cache
from lms.djangoapps.course_blocks.api import get_course_blocks
from lms.djangoapps.course_blocks.transformers.tests.helpers import CourseStructureTestCase
from openedx.core.djangoapps.content.block_structure.api import get_cache
from ..transformers.grades import GradesTransformer
......@@ -245,6 +245,6 @@ class MultiProblemModulestoreAccessTestCase(CourseStructureTestCase, SharedModul
}
)
blocks = self.build_course(course)
_get_cache().clear()
get_cache().clear()
with check_mongo_calls(2):
get_course_blocks(self.student, blocks[u'course'].location, self.transformers)
......@@ -97,6 +97,21 @@ CELERY_QUEUES = {
HIGH_MEM_QUEUE: {},
}
# Setup alternate queues, to allow access to cross-process workers
ALTERNATE_QUEUE_ENVS = os.environ.get('ALTERNATE_WORKER_QUEUES', '').split()
ALTERNATE_QUEUES = [
DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.')
for alternate in ALTERNATE_QUEUE_ENVS
]
CELERY_QUEUES.update(
{
alternate: {}
for alternate in ALTERNATE_QUEUES
if alternate not in CELERY_QUEUES.keys()
}
)
CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT)
# If we're a worker on the high_mem queue, set ourselves to die after processing
# one request to avoid having memory leaks take down the worker server. This env
# var is set in /etc/init/edx-workers.conf -- this should probably be replaced
......
"""
This config file follows the dev enviroment, but adds the
This config file follows the devstack enviroment, but adds the
requirement of a celery worker running in the background to process
celery tasks.
The worker can be executed using:
When testing locally, run lms/cms with this settings file as well, to test queueing
of tasks onto the appropriate workers.
django_admin.py celery worker
In two separate processes on devstack:
paver devstack lms --settings=devstack_with_worker
./manage.py lms celery worker --settings=devstack_with_worker
"""
# We intentionally define lots of variables that aren't used, and
# want to import all variables from base settings files
# pylint: disable=wildcard-import, unused-wildcard-import
from lms.envs.devstack import *
from lms.envs.dev import *
################################# CELERY ######################################
# Requires a separate celery worker
# Require a separate celery worker
CELERY_ALWAYS_EAGER = False
# Use django db as the broker and result store
BROKER_URL = 'django://'
INSTALLED_APPS += ('djcelery.transport', )
CELERY_RESULT_BACKEND = 'database'
DJKOMBU_POLLING_INTERVAL = 1.0
# Disable transaction management because we are using a worker. Views
# that request a task and wait for the result will deadlock otherwise.
for database_name in DATABASES:
......
......@@ -2,3 +2,4 @@
Setup the signals on startup.
"""
import openedx.core.djangoapps.content.course_structures.signals
import openedx.core.djangoapps.content.block_structure.signals
"""
This code exists in openedx/core/djangoapp because it needs access to django signaling mechanisms
Most of the underlying functionality is implemented in openedx/core/lib/block_structure/
"""
"""
Higher order functions built on the BlockStructureManager to interact with a django cache.
"""
from django.core.cache import cache
from openedx.core.lib.block_structure.manager import BlockStructureManager
from xmodule.modulestore.django import modulestore
def get_course_in_cache(course_key):
"""
A higher order function implemented on top of the
block_structure.get_collected function that returns the block
structure in the cache for the given course_key.
Returns:
BlockStructureBlockData - The collected block structure,
starting at root_block_usage_key.
"""
return get_block_structure_manager(course_key).get_collected()
def update_course_in_cache(course_key):
"""
A higher order function implemented on top of the
block_structure.updated_collected function that updates the block
structure in the cache for the given course_key.
"""
return get_block_structure_manager(course_key).update_collected()
def clear_course_from_cache(course_key):
"""
A higher order function implemented on top of the
block_structure.clear_block_cache function that clears the block
structure from the cache for the given course_key.
Note: See Note in get_course_blocks. Even after MA-1604 is
implemented, this implementation should still be valid since the
entire block structure of the course is cached, even though
arbitrary access to an intermediate block will be supported.
"""
get_block_structure_manager(course_key).clear()
def get_block_structure_manager(course_key):
"""
Returns the manager for managing Block Structures for the given course.
"""
store = modulestore()
course_usage_key = store.make_course_usage_key(course_key)
return BlockStructureManager(course_usage_key, store, get_cache())
def get_cache():
"""
Returns the storage for caching Block Structures.
"""
return cache
......@@ -5,13 +5,12 @@ import logging
from celery.task import task
from opaque_keys.edx.keys import CourseKey
from . import api
from openedx.core.djangoapps.content.block_structure import api
log = logging.getLogger('edx.celery.task')
@task()
@task
def update_course_in_cache(course_key):
"""
Updates the course blocks (in the database) for the specified course.
......
......@@ -4,7 +4,7 @@ Helpers for Course Blocks tests.
from openedx.core.lib.block_structure.cache import BlockStructureCache
from openedx.core.lib.block_structure.transformer_registry import TransformerRegistry
from ..api import _get_cache
from ..api import get_cache
class EnableTransformerRegistryMixin(object):
......@@ -30,4 +30,4 @@ def is_course_in_block_structure_cache(course_key, store):
Returns whether the given course is in the Block Structure cache.
"""
course_usage_key = store.make_course_usage_key(course_key)
return BlockStructureCache(_get_cache()).get(course_usage_key) is not None
return BlockStructureCache(get_cache()).get(course_usage_key) is not None
......@@ -6,8 +6,7 @@ from xmodule.modulestore.exceptions import ItemNotFoundError
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from ..api import get_course_blocks, _get_block_structure_manager
from ..transformers.visibility import VisibilityTransformer
from ..api import get_block_structure_manager
from .helpers import is_course_in_block_structure_cache, EnableTransformerRegistryMixin
......@@ -22,26 +21,30 @@ class CourseBlocksSignalTest(EnableTransformerRegistryMixin, ModuleStoreTestCase
self.course = CourseFactory.create()
self.course_usage_key = self.store.make_course_usage_key(self.course.id)
def test_course_publish(self):
# course is not visible to staff only
self.assertFalse(self.course.visible_to_staff_only)
orig_block_structure = get_course_blocks(self.user, self.course_usage_key)
self.assertFalse(
VisibilityTransformer.get_visible_to_staff_only(orig_block_structure, self.course_usage_key)
def test_course_update(self):
test_display_name = "Lightsabers 101"
# Course exists in cache initially
bs_manager = get_block_structure_manager(self.course.id)
orig_block_structure = bs_manager.get_collected()
self.assertTrue(is_course_in_block_structure_cache(self.course.id, self.store))
self.assertNotEqual(
test_display_name,
orig_block_structure.get_xblock_field(self.course_usage_key, 'display_name')
)
# course becomes visible to staff only
self.course.visible_to_staff_only = True
self.course.display_name = test_display_name
self.store.update_item(self.course, self.user.id)
updated_block_structure = get_course_blocks(self.user, self.course_usage_key)
self.assertTrue(
VisibilityTransformer.get_visible_to_staff_only(updated_block_structure, self.course_usage_key)
# Cached version of course has been updated
updated_block_structure = bs_manager.get_collected()
self.assertEqual(
test_display_name,
updated_block_structure.get_xblock_field(self.course_usage_key, 'display_name')
)
def test_course_delete(self):
get_course_blocks(self.user, self.course_usage_key)
bs_manager = _get_block_structure_manager(self.course.id)
bs_manager = get_block_structure_manager(self.course.id)
self.assertIsNotNone(bs_manager.get_collected())
self.assertTrue(is_course_in_block_structure_cache(self.course.id, self.store))
......
......@@ -96,13 +96,3 @@ def update_course_structure(course_key):
structure_model.structure_json = structure_json
structure_model.discussion_id_map_json = discussion_id_map_json
structure_model.save()
# TODO (TNL-4630) For temporary hotfix to delete the block_structure cache.
# Should be moved to proper location.
from django.core.cache import cache
from openedx.core.lib.block_structure.manager import BlockStructureManager
store = modulestore()
course_usage_key = store.make_course_usage_key(course_key)
block_structure_manager = BlockStructureManager(course_usage_key, store, cache)
block_structure_manager.clear()
"""
Custom routers used by both lms and cms when routing tasks to worker queues.
For more, see http://celery.readthedocs.io/en/latest/userguide/routing.html#routers
"""
from abc import ABCMeta, abstractproperty
from django.conf import settings
class AlternateEnvironmentRouter(object):
"""
A custom Router class for use in routing celery tasks to non-default queues.
"""
# this is an abstract base class, implementations must provide alternate_env_tasks
__metaclass__ = ABCMeta
@abstractproperty
def alternate_env_tasks(self):
"""
Defines the task -> alternate worker environment queue to be used when routing.
Subclasses must override this property with their own specific mappings.
"""
return {}
def route_for_task(self, task, args=None, kwargs=None): # pylint: disable=unused-argument
"""
Celery-defined method allowing for custom routing logic.
If None is returned from this method, default routing logic is used.
"""
alternate_env = self.alternate_env_tasks.get(task, None)
if alternate_env:
return self.ensure_queue_env(alternate_env)
return None
def ensure_queue_env(self, desired_env):
"""
Helper method to get the desired type of queue.
If no such queue is defined, default routing logic is used.
"""
queues = getattr(settings, 'CELERY_QUEUES', None)
return next(
(
queue
for queue in queues
if '.{}.'.format(desired_env) in queue
),
None
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment