Commit b610215e by Eric Fischer

Get alternate queues from ENV_TOKENS

parent d2b6c9eb
...@@ -87,12 +87,6 @@ CELERY_QUEUES = { ...@@ -87,12 +87,6 @@ CELERY_QUEUES = {
DEFAULT_PRIORITY_QUEUE: {} DEFAULT_PRIORITY_QUEUE: {}
} }
# Setup alternate queues, to allow access to cross-process workers
ALTERNATE_QUEUE_ENVS = os.environ.get('ALTERNATE_WORKER_QUEUES', '').split()
ALTERNATE_QUEUES = [
DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.')
for alternate in ALTERNATE_QUEUE_ENVS
]
CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT) CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT)
############# NON-SECURE ENV CONFIG ############################## ############# NON-SECURE ENV CONFIG ##############################
...@@ -365,10 +359,17 @@ BROKER_URL = "{0}://{1}:{2}@{3}/{4}".format(CELERY_BROKER_TRANSPORT, ...@@ -365,10 +359,17 @@ BROKER_URL = "{0}://{1}:{2}@{3}/{4}".format(CELERY_BROKER_TRANSPORT,
CELERY_BROKER_HOSTNAME, CELERY_BROKER_HOSTNAME,
CELERY_BROKER_VHOST) CELERY_BROKER_VHOST)
# Allow CELERY_QUEUES to be overwritten before adding alternates # Allow CELERY_QUEUES to be overwritten by ENV_TOKENS,
ENV_CELERY_QUEUES = ENV_TOKENS.get('CELERY_QUEUES', None) ENV_CELERY_QUEUES = ENV_TOKENS.get('CELERY_QUEUES', None)
if ENV_CELERY_QUEUES: if ENV_CELERY_QUEUES:
CELERY_QUEUES = {queue: {} for queue in ENV_CELERY_QUEUES} CELERY_QUEUES = {queue: {} for queue in ENV_CELERY_QUEUES}
# Then add alternate environment queues
ALTERNATE_QUEUE_ENVS = ENV_TOKENS.get('ALTERNATE_WORKER_QUEUES', '').split()
ALTERNATE_QUEUES = [
DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.')
for alternate in ALTERNATE_QUEUE_ENVS
]
CELERY_QUEUES.update( CELERY_QUEUES.update(
{ {
alternate: {} alternate: {}
......
...@@ -98,12 +98,6 @@ CELERY_QUEUES = { ...@@ -98,12 +98,6 @@ CELERY_QUEUES = {
HIGH_MEM_QUEUE: {}, HIGH_MEM_QUEUE: {},
} }
# Setup alternate queues, to allow access to cross-process workers
ALTERNATE_QUEUE_ENVS = os.environ.get('ALTERNATE_WORKER_QUEUES', '').split()
ALTERNATE_QUEUES = [
DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.')
for alternate in ALTERNATE_QUEUE_ENVS
]
CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT) CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT)
# If we're a worker on the high_mem queue, set ourselves to die after processing # If we're a worker on the high_mem queue, set ourselves to die after processing
...@@ -268,10 +262,17 @@ BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = ENV_TOKENS.get('BULK_EMAIL_ROUTING_KEY_SMALL ...@@ -268,10 +262,17 @@ BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = ENV_TOKENS.get('BULK_EMAIL_ROUTING_KEY_SMALL
# Queue to use for updating persistent grades # Queue to use for updating persistent grades
RECALCULATE_GRADES_ROUTING_KEY = ENV_TOKENS.get('RECALCULATE_GRADES_ROUTING_KEY', LOW_PRIORITY_QUEUE) RECALCULATE_GRADES_ROUTING_KEY = ENV_TOKENS.get('RECALCULATE_GRADES_ROUTING_KEY', LOW_PRIORITY_QUEUE)
# Allow CELERY_QUEUES to be overwritten before adding alternates # Allow CELERY_QUEUES to be overwritten by ENV_TOKENS,
ENV_CELERY_QUEUES = ENV_TOKENS.get('CELERY_QUEUES', None) ENV_CELERY_QUEUES = ENV_TOKENS.get('CELERY_QUEUES', None)
if ENV_CELERY_QUEUES: if ENV_CELERY_QUEUES:
CELERY_QUEUES = {queue: {} for queue in ENV_CELERY_QUEUES} CELERY_QUEUES = {queue: {} for queue in ENV_CELERY_QUEUES}
# Then add alternate environment queues
ALTERNATE_QUEUE_ENVS = ENV_TOKENS.get('ALTERNATE_WORKER_QUEUES', '').split()
ALTERNATE_QUEUES = [
DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.')
for alternate in ALTERNATE_QUEUE_ENVS
]
CELERY_QUEUES.update( CELERY_QUEUES.update(
{ {
alternate: {} alternate: {}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment