# routers.py
"""
Custom routers used by both lms and cms when routing tasks to worker queues.

For more, see http://celery.readthedocs.io/en/latest/userguide/routing.html#routers
"""
from abc import ABCMeta, abstractproperty
from django.conf import settings
8 9 10
import logging

log = logging.getLogger(__name__)
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35


class AlternateEnvironmentRouter(object):
    """
    A custom Router class for use in routing celery tasks to non-default queues.

    Subclasses supply ``alternate_env_tasks``, a mapping from task name to the
    name of the alternate worker environment whose queue should run that task.
    """
    # this is an abstract base class, implementations must provide alternate_env_tasks
    # NOTE(review): ``__metaclass__`` is only honoured by Python 2; on Python 3 it is a
    # plain class attribute, so the abstract property is not enforced there -- confirm
    # which interpreter this module targets before changing it.
    __metaclass__ = ABCMeta

    @abstractproperty
    def alternate_env_tasks(self):
        """
        Defines the task -> alternate worker environment queue to be used when routing.

        Subclasses must override this property with their own specific mappings.
        """
        return {}

    def route_for_task(self, task, args=None, kwargs=None):  # pylint: disable=unused-argument
        """
        Celery-defined method allowing for custom routing logic.

        Looks ``task`` up in ``alternate_env_tasks``; when an alternate
        environment is configured for it, returns the result of
        ``ensure_queue_env`` for that environment.

        If None is returned from this method, default routing logic is used.
        """
        alternate_env = self.alternate_env_tasks.get(task, None)

        if 'update_course_in_cache' in task:
            # CELERY_QUEUES may be missing (or None); fall back to an empty
            # mapping so this diagnostic log line can never break routing.
            configured_queues = getattr(settings, 'CELERY_QUEUES', None) or {}
            log.info("TNL-5408: task={task}, args={args}, alternate_env={alt_env}, queues={queues}".format(
                task=task,
                args=args,
                alt_env=alternate_env,
                queues=configured_queues.keys()
            ))

        if alternate_env:
            return self.ensure_queue_env(alternate_env)
        return None

    def ensure_queue_env(self, desired_env):
        """
        Helper method to get the desired type of queue.

        Returns the first entry yielded by iterating ``settings.CELERY_QUEUES``
        that contains ``.<desired_env>.`` as a substring.

        If no such queue is defined (including when ``CELERY_QUEUES`` itself is
        missing or empty), None is returned and default routing logic is used.
        """
        # Treat a missing/None setting as "no queues" instead of iterating None.
        queues = getattr(settings, 'CELERY_QUEUES', None) or ()
        # Build the marker once rather than once per queue.
        env_marker = '.{}.'.format(desired_env)
        return next(
            (queue for queue in queues if env_marker in queue),
            None
        )