Commit 881970a2 by Cliff Dyer Committed by GitHub

Merge pull request #14458 from edx/beryl/celeryutils

Use celeryutils in edx-platform
parents 4926a6b2 be8a898e
......@@ -9,11 +9,12 @@ from django.contrib.auth.models import User
from django.core.exceptions import ValidationError
from django.db.utils import DatabaseError
from logging import getLogger
import newrelic.agent
from celery_utils.logged_task import LoggedTask
from celery_utils.persist_on_failure import PersistOnFailureTask
from courseware.model_data import get_score
from lms.djangoapps.course_blocks.api import get_course_blocks
import newrelic.agent
from openedx.core.djangoapps.celery_utils.persist_on_failure import PersistOnFailureTask
from opaque_keys.edx.keys import UsageKey
from opaque_keys.edx.locator import CourseLocator
from submissions import api as sub_api
......@@ -36,7 +37,14 @@ KNOWN_RETRY_ERRORS = (DatabaseError, ValidationError) # Errors we expect occasi
RECALCULATE_GRADE_DELAY = 2 # in seconds, to prevent excessive _has_db_updated failures. See TNL-6424.
@task(bind=True, base=PersistOnFailureTask, default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY)
class _BaseTask(PersistOnFailureTask, LoggedTask):  # pylint: disable=abstract-method
    """
    Abstract base for the tasks in this module.

    Mixes the failure-persistence behaviour of PersistOnFailureTask with
    the task-invocation logging of LoggedTask.
    """
    # Marked abstract so this base class is not itself treated as a task.
    abstract = True
@task(bind=True, base=_BaseTask, default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY)
def recalculate_subsection_grade_v3(self, **kwargs):
"""
Latest version of the recalculate_subsection_grade task. See docstring
......
......@@ -2161,7 +2161,7 @@ INSTALLED_APPS = (
# Customized celery tasks, including persisting failed tasks so they can
# be retried
'openedx.core.djangoapps.celery_utils',
'celery_utils',
# Ability to detect and special-case crawler behavior
'openedx.core.djangoapps.crawlers',
......
"""
Management command to delete records of failed tasks that have been resolved.
"""
from datetime import timedelta
import logging
from textwrap import dedent
from django.core.management.base import BaseCommand
from django.utils.timezone import now
from ...models import FailedTask
log = logging.getLogger(__name__)


class Command(BaseCommand):
    """
    Delete records of FailedTasks that have been resolved
    """
    # NOTE: the class docstring above is reused verbatim as the command's
    # help text, so it is user-facing output.
    help = dedent(__doc__).strip()

    def add_arguments(self, parser):
        """
        Register this command's options on the given parser.

        The parser uses argparse syntax; see
        https://docs.python.org/3/library/argparse.html.

        Options:
            --dry-run: only log what would be deleted.
            --task-name / -t: limit cleanup to one task name.
            --age / -a: minimum number of days since resolution (default 30).
        """
        option_specs = (
            (
                ('--dry-run',),
                dict(
                    action='store_true',
                    default=False,
                    help="Output what we're going to do, but don't actually do it."
                ),
            ),
            (
                ('--task-name', '-t'),
                dict(
                    default=None,
                    help=u"Restrict cleanup to tasks matching the given task-name.",
                ),
            ),
            (
                ('--age', '-a'),
                dict(
                    type=int,
                    default=30,
                    help=u"Only delete tasks that have been resolved for at least the specified number of days",
                ),
            ),
        )
        for flags, settings in option_specs:
            parser.add_argument(*flags, **settings)

    def handle(self, *args, **options):
        """
        Delete (or, with --dry-run, just list) resolved FailedTask records.

        Only records whose ``datetime_resolved`` is older than ``age`` days
        are selected, optionally narrowed to a single ``task_name``.
        """
        resolved = FailedTask.objects.filter(
            datetime_resolved__lt=now() - timedelta(days=options['age'])
        )
        wanted_name = options['task_name']
        if wanted_name is not None:
            resolved = resolved.filter(task_name=wanted_name)

        log.info(u'Cleaning up {} tasks'.format(resolved.count()))

        if not options['dry_run']:
            resolved.delete()
            return

        # Dry run: report the matching records instead of removing them.
        listing = u'\n '.join(
            u'{!r}, resolved {}'.format(record, record.datetime_resolved) for record in resolved
        )
        log.info(u"Tasks to clean up:\n{}".format(listing))
"""
Management command to reapply failed tasks that have not yet been resolved.
"""
import logging
from textwrap import dedent
from django.core.management.base import BaseCommand
from ...models import FailedTask
log = logging.getLogger(__name__)


class Command(BaseCommand):
    """
    Reapply tasks that failed previously.
    """
    # NOTE: the class docstring above is reused verbatim as the command's
    # help text, so it is user-facing output.
    help = dedent(__doc__).strip()

    def add_arguments(self, parser):
        """
        Register this command's single option, --task-name / -t.

        The parser uses argparse syntax; see
        https://docs.python.org/3/library/argparse.html.
        """
        parser.add_argument(
            '--task-name', '-t',
            action='store',
            default=None,
            help=u"Restrict reapplied tasks to those matching the given task-name."
        )

    def handle(self, *args, **options):
        """
        Re-enqueue every unresolved FailedTask, once per distinct task_id.

        When --task-name is given, only records with that task name are
        considered.
        """
        pending = FailedTask.objects.filter(datetime_resolved=None)
        wanted_name = options['task_name']
        if wanted_name is not None:
            pending = pending.filter(task_name=wanted_name)

        log.info(u'Reapplying {} tasks'.format(pending.count()))
        log.debug(u'Reapplied tasks: {}'.format(list(pending)))

        # Several records may share a task_id; reapply only the first one seen.
        already_reapplied = set()
        for failed_task in pending:
            if failed_task.task_id not in already_reapplied:
                already_reapplied.add(failed_task.task_id)
                failed_task.reapply()
"""
Test management command to cleanup resolved tasks.
"""
from datetime import timedelta
import ddt
from django.test import TestCase
from django.core.management import call_command
from django.utils.timezone import now
from openedx.core.djangolib.testing.utils import skip_unless_lms
from .... import models
# Time helpers for building fixtures relative to the command's default
# 30-day age threshold.
DAY = timedelta(days=1)
MONTH_AGO = now() - (30 * DAY)


@ddt.ddt
@skip_unless_lms
class TestCleanupResolvedTasksCommand(TestCase):
    """
    Tests for the cleanup_resolved_tasks management command.
    """

    def setUp(self):
        """
        Create four FailedTask rows: one resolved more than a month ago,
        one resolved less than a month ago, one unresolved, and one old
        resolved row belonging to a different task name.
        """
        # (task_name, datetime_resolved, task_id)
        fixture_rows = (
            (u'task', MONTH_AGO - DAY, u'old'),
            (u'task', MONTH_AGO + DAY, u'new'),
            (u'task', None, u'unresolved'),
            (u'other', MONTH_AGO - DAY, u'other'),
        )
        self.failed_tasks = [
            models.FailedTask.objects.create(
                task_name=task_name,
                datetime_resolved=resolved_at,
                task_id=task_id,
            )
            for task_name, resolved_at, task_id in fixture_rows
        ]
        super(TestCleanupResolvedTasksCommand, self).setUp()

    @ddt.data(
        ([], {u'new', u'unresolved'}),
        ([u'--task-name=task'], {u'new', u'unresolved', u'other'}),
        ([u'--age=0'], {u'unresolved'}),
        ([u'--age=0', u'--task-name=task'], {u'unresolved', u'other'}),
        ([u'--dry-run'], {u'old', u'new', u'unresolved', u'other'}),
    )
    @ddt.unpack
    def test_call_command(self, args, remaining_task_ids):
        """
        Run the command with ``args`` and check which task_ids survive.
        """
        call_command(u'cleanup_resolved_tasks', *args)
        surviving_ids = models.FailedTask.objects.values_list('task_id', flat=True)
        self.assertEqual(remaining_task_ids, set(surviving_ids))
"""
Test management command to reapply failed tasks.
"""
from collections import Counter
from datetime import datetime
import celery
from django.test import TestCase
from django.core.management import call_command
import mock
from openedx.core.djangolib.testing.utils import skip_unless_lms
from .... import models, persist_on_failure
@skip_unless_lms
class TestReapplyTaskCommand(TestCase):
    """
    Tests for the reapply_tasks management command.
    """

    # Explicit celery names under which the two test tasks are registered.
    fallible_task_name = (
        u'openedx.core.djangoapps.celery_utils.management.commands.tests.test_reapply_tasks.fallible_task'
    )
    passing_task_name = u'openedx.core.djangoapps.celery_utils.management.commands.tests.test_reapply_tasks.other_task'

    @classmethod
    def setUpClass(cls):
        """
        Define the two celery tasks used by the tests and store them on
        the class before the regular class-level setup runs.
        """
        @celery.task(base=persist_on_failure.PersistOnFailureTask, name=cls.fallible_task_name)
        def fallible_task(error_message=None):
            """
            Fail with ValueError when given an error message; otherwise succeed.
            """
            if error_message:
                raise ValueError(error_message)

        @celery.task(base=persist_on_failure.PersistOnFailureTask, name=cls.passing_task_name)
        def passing_task():
            """
            Task that never fails.
            """
            return 5

        cls.fallible_task = fallible_task
        cls.passing_task = passing_task
        super(TestReapplyTaskCommand, cls).setUpClass()

    def setUp(self):
        """
        Create three unresolved FailedTask rows: one whose kwargs make the
        fallible task fail again, one whose kwargs let it succeed, and one
        for the always-passing task.
        """
        # (task_name, task_id, kwargs, exc)
        fixture_rows = (
            (self.fallible_task_name, u'fail_again', {"error_message": "Err, yo!"}, u'UhOhError().'),
            (self.fallible_task_name, u'will_succeed', {}, u'NetworkErrorMaybe?()'),
            (self.passing_task_name, u'other_task', {}, u'RaceCondition()'),
        )
        self.failed_tasks = [
            models.FailedTask.objects.create(
                task_name=task_name,
                task_id=task_id,
                args=[],
                kwargs=task_kwargs,
                exc=exc_repr,
            )
            for task_name, task_id, task_kwargs, exc_repr in fixture_rows
        ]
        super(TestReapplyTaskCommand, self).setUp()

    def _assert_resolved(self, task_object):
        """
        Assert that the record's datetime_resolved has been set to a
        datetime, i.e. the reapplied task succeeded and was marked resolved.
        """
        self.assertIsInstance(task_object.datetime_resolved, datetime)

    def _assert_unresolved(self, task_object):
        """
        Assert that the record's datetime_resolved is still None, i.e. the
        task has not been marked resolved.
        """
        self.assertIsNone(task_object.datetime_resolved)

    def test_call_command(self):
        """
        Without arguments every unresolved task is reapplied.
        """
        call_command(u'reapply_tasks')
        fetch = models.FailedTask.objects.get
        self._assert_unresolved(fetch(task_id=u'fail_again'))
        self._assert_resolved(fetch(task_id=u'will_succeed'))
        self._assert_resolved(fetch(task_id=u'other_task'))

    def test_call_command_with_specified_task(self):
        """
        With --task-name only records for that task name are reapplied.
        """
        call_command(u'reapply_tasks', u'--task-name={}'.format(self.fallible_task_name))
        fetch = models.FailedTask.objects.get
        self._assert_unresolved(fetch(task_id=u'fail_again'))
        self._assert_resolved(fetch(task_id=u'will_succeed'))
        self._assert_unresolved(fetch(task_id=u'other_task'))

    def test_duplicate_tasks(self):
        """
        Two records sharing a task_id trigger a single reapply, and both
        records end up resolved.
        """
        models.FailedTask.objects.create(
            task_name=self.fallible_task_name,
            task_id=u'will_succeed',
            args=[],
            kwargs={},
            exc=u'AlsoThisOtherError()',
        )
        # Verify that only one task got run for this task_id.
        real_apply_async = self.fallible_task.apply_async
        with mock.patch.object(self.fallible_task, u'apply_async', wraps=real_apply_async) as mock_apply:
            call_command(u'reapply_tasks')
            task_id_counts = Counter(call[2][u'task_id'] for call in mock_apply.mock_calls)
            self.assertEqual(task_id_counts[u'will_succeed'], 1)
        # Verify that both tasks matching that task_id are resolved.
        will_succeed_tasks = models.FailedTask.objects.filter(task_id=u'will_succeed').all()
        self.assertEqual(len(will_succeed_tasks), 2)
        for duplicate in will_succeed_tasks:
            self._assert_resolved(duplicate)
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import django.utils.timezone
import jsonfield.fields
import model_utils.fields
class Migration(migrations.Migration):
    """
    Create the FailedTask table and its composite (task_name, exc) index.
    """

    # No dependencies are declared for this migration.
    dependencies = []

    operations = [
        migrations.CreateModel(
            name='FailedTask',
            fields=[
                (
                    'id',
                    models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True),
                ),
                (
                    'created',
                    model_utils.fields.AutoCreatedField(
                        default=django.utils.timezone.now, verbose_name='created', editable=False
                    ),
                ),
                (
                    'modified',
                    model_utils.fields.AutoLastModifiedField(
                        default=django.utils.timezone.now, verbose_name='modified', editable=False
                    ),
                ),
                ('task_name', models.CharField(max_length=255)),
                ('task_id', models.CharField(max_length=255, db_index=True)),
                ('args', jsonfield.fields.JSONField(blank=True)),
                ('kwargs', jsonfield.fields.JSONField(blank=True)),
                ('exc', models.CharField(max_length=255)),
                (
                    'datetime_resolved',
                    models.DateTimeField(default=None, null=True, db_index=True, blank=True),
                ),
            ],
        ),
        migrations.AlterIndexTogether(
            name='failedtask',
            index_together={('task_name', 'exc')},
        ),
    ]
"""
Models to support persistent tasks.
"""
import logging
from celery import current_app
from django.db import models
from jsonfield import JSONField
from model_utils.models import TimeStampedModel
from . import tasks
log = logging.getLogger(__name__)


class FailedTask(TimeStampedModel):
    """
    Database record of a celery task invocation that failed.

    Stores what is needed to run the task again (name, id, args, kwargs),
    the repr of the exception it failed with, and when the failure was
    resolved (None while it is still outstanding).
    """
    task_name = models.CharField(max_length=255)
    task_id = models.CharField(max_length=255, db_index=True)
    args = JSONField(blank=True)
    kwargs = JSONField(blank=True)
    exc = models.CharField(max_length=255)
    datetime_resolved = models.DateTimeField(blank=True, null=True, default=None, db_index=True)

    class Meta(object):
        index_together = [
            (u'task_name', u'exc'),
        ]

    def reapply(self):
        """
        Enqueue a new celery task with the failed task's name, id and arguments.

        A ``mark_resolved`` signature for this task_id is attached as the
        ``link`` callback of the new invocation.

        Raises:
            TypeError: if this record has already been resolved.
        """
        if self.datetime_resolved is not None:
            raise TypeError(u'Cannot reapply a resolved task: {}'.format(self))
        log.info(u'Reapplying failed task: {}'.format(self))
        # Look the task up by name in the current celery app's registry.
        celery_task = current_app.tasks[self.task_name]
        celery_task.apply_async(
            self.args,
            self.kwargs,
            task_id=self.task_id,
            link=tasks.mark_resolved.si(self.task_id)
        )

    def __unicode__(self):
        """
        Return a one-line summary: task name, args, kwargs and resolution state.
        """
        if self.datetime_resolved is None:
            resolution = u"not resolved"
        else:
            resolution = "resolved"
        template = u'FailedTask: {task_name}, args={args}, kwargs={kwargs} ({resolution})'
        return template.format(
            task_name=self.task_name,
            args=self.args,
            kwargs=self.kwargs,
            resolution=resolution,
        )
"""
Celery utility code for persistent tasks.
"""
from celery import Task
from .models import FailedTask
class PersistOnFailureTask(Task):  # pylint: disable=abstract-method
    """
    Celery Task base class that records a FailedTask row whenever the
    task fails.
    """

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        Persist a FailedTask record for this failure, then defer to the
        parent class's on_failure.

        No new record is written when an unresolved record with the same
        task_id already exists.
        """
        unresolved_records = FailedTask.objects.filter(task_id=task_id, datetime_resolved=None)
        if not unresolved_records.exists():
            record_fields = dict(
                task_name=_truncate_to_field(FailedTask, 'task_name', self.name),
                task_id=task_id,  # Fixed length UUID: No need to truncate
                args=args,
                kwargs=kwargs,
                exc=_truncate_to_field(FailedTask, 'exc', repr(exc)),
            )
            FailedTask.objects.create(**record_fields)
        super(PersistOnFailureTask, self).on_failure(exc, task_id, args, kwargs, einfo)
def _truncate_to_field(model, field_name, value):
    """
    Shorten ``value`` so that it fits in the named field of ``model``.

    Data that is too big for the field would cause the insert to fail, so
    overlong values are cut in the middle and joined with ``...`` (valuable
    information often shows up at the end as well as at the beginning).

    Arguments:
        model: model class whose ``_meta.get_field`` is used to look up the field.
        field_name: name of the field whose ``max_length`` bounds the result.
        value: the string to store.

    Returns:
        ``value`` unchanged when it already fits or the field declares no
        ``max_length``; otherwise a string of exactly ``max_length``
        characters.  When the field is too short to hold the separator plus
        the leading half, the value is simply cut off at ``max_length``.
    """
    field = model._meta.get_field(field_name)  # pylint: disable=protected-access
    max_length = field.max_length
    if max_length is None or len(value) <= max_length:
        return value

    sep = u'...'
    head_len = max_length // 2
    # Characters left for the tail once the head and separator are placed.
    tail_len = max_length - head_len - len(sep)
    if tail_len < 0:
        # Not even head + separator fit: a plain cut is the only way to
        # honour the field's limit.
        return value[:max_length]

    head = value[:head_len]
    # Slice from an explicit start index so that tail_len == 0 yields ''.
    tail = value[len(value) - tail_len:]
    return sep.join([head, tail])
"""
Celery tasks that support the utils in this module.
"""
from celery import task
from django.utils.timezone import now
@task
def mark_resolved(task_id):
    """
    Stamp every unresolved FailedTask record with the given task_id as
    resolved at the current time.
    """
    # Imported inside the task to resolve circular imports.
    from .models import FailedTask

    unresolved = FailedTask.objects.filter(task_id=task_id, datetime_resolved=None)
    unresolved.update(datetime_resolved=now())
u"""
Testing persistent tasks
"""
from __future__ import print_function
from celery import task
from django.test import TestCase
import six
from openedx.core.djangolib.testing.utils import skip_unless_lms
from ..models import FailedTask
from ..persist_on_failure import PersistOnFailureTask
@skip_unless_lms
class PersistOnFailureTaskTestCase(TestCase):
    """
    Check that PersistOnFailureTask writes the expected FailedTask record
    when a task fails, and nothing when it succeeds.
    """

    @classmethod
    def setUpClass(cls):
        """
        Define the celery task under test and store it on the class.
        """
        # The function name is significant: the task name asserted in
        # test_exampletask_with_failure ends in ".exampletask".
        @task(base=PersistOnFailureTask)
        def exampletask(message=None):
            """
            Raise ValueError(message) when a message is given; otherwise do nothing.
            """
            if message:
                raise ValueError(message)

        cls.exampletask = exampletask
        super(PersistOnFailureTaskTestCase, cls).setUpClass()

    def test_exampletask_without_failure(self):
        """
        A successful run leaves the FailedTask table empty.
        """
        outcome = self.exampletask.delay()
        outcome.wait()
        self.assertEqual(outcome.status, u'SUCCESS')
        self.assertFalse(FailedTask.objects.exists())

    def test_exampletask_with_failure(self):
        """
        A failing run stores the task name, arguments and exception repr.
        """
        outcome = self.exampletask.delay(message=u'The example task failed')
        with self.assertRaises(ValueError):
            outcome.wait()
        self.assertEqual(outcome.status, u'FAILURE')
        record = FailedTask.objects.get()
        # Assert that we get the kind of data we expect
        expected_name = u'openedx.core.djangoapps.celery_utils.tests.test_persist_on_failure.exampletask'
        self.assertEqual(record.task_name, expected_name)
        self.assertEqual(record.args, [])
        self.assertEqual(record.kwargs, {u'message': u'The example task failed'})
        self.assertEqual(record.exc, u"ValueError(u'The example task failed',)")
        self.assertIsNone(record.datetime_resolved)

    def test_persists_when_called_with_wrong_args(self):
        """
        A call with arguments the task does not accept is still recorded,
        with the args and kwargs as they were passed.
        """
        outcome = self.exampletask.delay(15, u'2001-03-04', err=True)
        with self.assertRaises(TypeError):
            outcome.wait()
        self.assertEqual(outcome.status, u'FAILURE')
        record = FailedTask.objects.get()
        self.assertEqual(record.args, [15, u'2001-03-04'])
        self.assertEqual(record.kwargs, {u'err': True})

    def test_persists_with_overlength_field(self):
        """
        An exception repr longer than the exc field is truncated in the
        middle, keeping both the beginning and the end.
        """
        overlong_message = u''.join(u'%03d' % number for number in six.moves.range(100))
        outcome = self.exampletask.delay(message=overlong_message)
        with self.assertRaises(ValueError):
            outcome.wait()
        stored_exc = FailedTask.objects.get().exc
        # Length is max field length
        self.assertEqual(len(stored_exc), 255)
        # Ellipses are put in the middle
        self.assertEqual(u'037...590', stored_exc[124:133])
        # The beginning of the input is captured
        self.assertEqual(stored_exc[:11], u"ValueError(")
        # The end of the input is captured
        self.assertEqual(stored_exc[-9:], u"098099',)")
......@@ -91,6 +91,7 @@ git+https://github.com/edx/xblock-utils.git@v1.0.3#egg=xblock-utils==1.0.3
git+https://github.com/edx/edx-user-state-client.git@1.0.1#egg=edx-user-state-client==1.0.1
git+https://github.com/edx/xblock-lti-consumer.git@v1.1.2#egg=lti_consumer-xblock==1.1.2
git+https://github.com/edx/edx-proctoring.git@0.17.0#egg=edx-proctoring==0.17.0
git+https://github.com/edx/edx-celeryutils.git@v0.1.0#egg=edx-celeryutils==0.1.0
# Third Party XBlocks
-e git+https://github.com/mitodl/edx-sga@172a90fd2738f8142c10478356b2d9ed3e55334a#egg=edx-sga
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment