Commit df7c2715 by J. Cliff Dyer Committed by J. Cliff Dyer

Create infrastructure for reapplying tasks

and cleaning up tasks that have been resolved.

TNL-6252
parent 7d7a1982
......@@ -11,7 +11,7 @@ from logging import getLogger
from courseware.model_data import get_score
from lms.djangoapps.course_blocks.api import get_course_blocks
from openedx.core.djangoapps.celery_utils.task import PersistOnFailureTask
from openedx.core.djangoapps.celery_utils.persist_on_failure import PersistOnFailureTask
from opaque_keys.edx.keys import UsageKey
from opaque_keys.edx.locator import CourseLocator
from submissions import api as sub_api
......@@ -19,8 +19,6 @@ from student.models import anonymous_id_for_user
from track.event_transaction_utils import (
set_event_transaction_type,
set_event_transaction_id,
get_event_transaction_type,
get_event_transaction_id
)
from util.date_utils import from_timestamp
from xmodule.modulestore.django import modulestore
......
"""
Management command to delete records of FailedTasks that have been resolved.
"""
from datetime import timedelta
import logging
from textwrap import dedent
from django.core.management.base import BaseCommand
from django.utils.timezone import now
from ...models import FailedTask
log = logging.getLogger(__name__)
class Command(BaseCommand):
    """
    Delete records of FailedTasks that have been resolved
    """
    # NOTE: the class docstring above is reused (dedented and stripped) as the
    # ``help`` attribute, so editing it changes the command's help text.
    help = dedent(__doc__).strip()

    def add_arguments(self, parser):
        """
        Register the command-line options for this command.

        ``parser`` uses the argparse API; see
        https://docs.python.org/3/library/argparse.html.
        """
        parser.add_argument(
            '--dry-run',
            default=False,
            action='store_true',
            help="Output what we're going to do, but don't actually do it."
        )
        parser.add_argument(
            '--task-name', '-t',
            help=u"Restrict cleanup to tasks matching the given task-name.",
            default=None,
        )
        parser.add_argument(
            '--age', '-a',
            default=30,
            type=int,
            help=u"Only delete tasks that have been resolved for at least the specified number of days",
        )

    def handle(self, *args, **options):
        """
        Delete (or, with ``--dry-run``, only list) FailedTask records whose
        ``datetime_resolved`` is older than ``--age`` days, optionally
        restricted to a single ``--task-name``.
        """
        cutoff = now() - timedelta(days=options['age'])
        resolved = FailedTask.objects.filter(datetime_resolved__lt=cutoff)

        task_name = options['task_name']
        if task_name is not None:
            resolved = resolved.filter(task_name=task_name)

        log.info(u'Cleaning up {} tasks'.format(resolved.count()))

        if not options['dry_run']:
            resolved.delete()
            return

        # Dry run: report what would have been removed and leave the table alone.
        descriptions = [
            u'{!r}, resolved {}'.format(task, task.datetime_resolved)
            for task in resolved
        ]
        log.info(u"Tasks to clean up:\n{}".format(u'\n '.join(descriptions)))
"""
Management command to reapply tasks that failed previously.
"""
import logging
from textwrap import dedent
from django.core.management.base import BaseCommand
from ...models import FailedTask
log = logging.getLogger(__name__)
class Command(BaseCommand):
    """
    Reapply tasks that failed previously.
    """
    # NOTE: the class docstring above is reused (dedented and stripped) as the
    # ``help`` attribute, so editing it changes the command's help text.
    help = dedent(__doc__).strip()

    def add_arguments(self, parser):
        """
        Register the command-line options for this command.

        ``parser`` uses the argparse API; see
        https://docs.python.org/3/library/argparse.html.
        """
        parser.add_argument(
            '--task-name', '-t',
            default=None,
            action='store',
            help=u"Restrict reapplied tasks to those matching the given task-name."
        )

    def handle(self, *args, **options):
        """
        Reapply every unresolved FailedTask, optionally restricted to a single
        ``--task-name``.

        Several records may share one ``task_id``; only the first record seen
        for each id is reapplied.
        """
        pending = FailedTask.objects.filter(datetime_resolved=None)

        name_filter = options['task_name']
        if name_filter is not None:
            pending = pending.filter(task_name=name_filter)

        log.info(u'Reapplying {} tasks'.format(pending.count()))
        log.debug(u'Reapplied tasks: {}'.format(list(pending)))

        reapplied_ids = set()
        for failed_task in pending:
            if failed_task.task_id not in reapplied_ids:
                reapplied_ids.add(failed_task.task_id)
                failed_task.reapply()
"""
Test management command to cleanup resolved tasks.
"""
from datetime import timedelta
import ddt
from django.test import TestCase
from django.core.management import call_command
from django.utils.timezone import now
from openedx.core.djangolib.testing.utils import skip_unless_lms
from .... import models
# One-day step used to place fixtures just before or just after a cutoff.
DAY = timedelta(days=1)
# Thirty days before the moment this module is imported; 30 is also the
# default value of the cleanup command's --age option.
MONTH_AGO = now() - (30 * DAY)
@ddt.ddt
@skip_unless_lms
class TestCleanupResolvedTasksCommand(TestCase):
    """
    Test cleanup_resolved_tasks management command.
    """

    def setUp(self):
        """
        Create four FailedTask records: one resolved more than a month ago,
        one resolved less than a month ago, one unresolved, and one old
        resolved record with a different task name.
        """
        # (task_name, datetime_resolved, task_id)
        fixtures = (
            (u'task', MONTH_AGO - DAY, u'old'),
            (u'task', MONTH_AGO + DAY, u'new'),
            (u'task', None, u'unresolved'),
            (u'other', MONTH_AGO - DAY, u'other'),
        )
        self.failed_tasks = [
            models.FailedTask.objects.create(
                task_name=task_name,
                datetime_resolved=resolved_at,
                task_id=task_id,
            )
            for task_name, resolved_at, task_id in fixtures
        ]
        super(TestCleanupResolvedTasksCommand, self).setUp()

    @ddt.data(
        ([], {u'new', u'unresolved'}),
        ([u'--task-name=task'], {u'new', u'unresolved', u'other'}),
        ([u'--age=0'], {u'unresolved'}),
        ([u'--age=0', u'--task-name=task'], {u'unresolved', u'other'}),
        ([u'--dry-run'], {u'old', u'new', u'unresolved', u'other'}),
    )
    @ddt.unpack
    def test_call_command(self, args, remaining_task_ids):
        """
        Run the command with ``args`` and check that exactly the records
        identified by ``remaining_task_ids`` are left in the table.
        """
        call_command(u'cleanup_resolved_tasks', *args)
        surviving_ids = models.FailedTask.objects.values_list('task_id', flat=True)
        self.assertEqual(remaining_task_ids, set(surviving_ids))
"""
Test management command to reapply failed tasks.
"""
from collections import Counter
from datetime import datetime
import celery
from django.test import TestCase
from django.core.management import call_command
import mock
from openedx.core.djangolib.testing.utils import skip_unless_lms
from .... import models, persist_on_failure
@skip_unless_lms
class TestReapplyTaskCommand(TestCase):
    """
    Test reapply_task management command.
    """

    fallible_task_name = (
        u'openedx.core.djangoapps.celery_utils.management.commands.tests.test_reapply_tasks.fallible_task'
    )
    passing_task_name = u'openedx.core.djangoapps.celery_utils.management.commands.tests.test_reapply_tasks.other_task'

    @classmethod
    def setUpClass(cls):
        """
        Register the two celery tasks the tests reapply and keep them on the
        class so individual tests can patch them.
        """
        @celery.task(base=persist_on_failure.PersistOnFailureTask, name=cls.fallible_task_name)
        def fallible_task(error_message=None):
            """
            Raise ValueError when given an error message; otherwise succeed.
            """
            if error_message:
                raise ValueError(error_message)

        @celery.task(base=persist_on_failure.PersistOnFailureTask, name=cls.passing_task_name)
        def passing_task():
            """
            A task that never fails.
            """
            return 5

        cls.fallible_task = fallible_task
        cls.passing_task = passing_task
        super(TestReapplyTaskCommand, cls).setUpClass()

    def setUp(self):
        """
        Create three unresolved FailedTask records: one that fails again when
        reapplied, one that succeeds, and one belonging to the passing task.
        """
        # (task_name, task_id, kwargs, exc)
        fixtures = (
            (self.fallible_task_name, u'fail_again', {"error_message": "Err, yo!"}, u'UhOhError().'),
            (self.fallible_task_name, u'will_succeed', {}, u'NetworkErrorMaybe?()'),
            (self.passing_task_name, u'other_task', {}, u'RaceCondition()'),
        )
        self.failed_tasks = [
            models.FailedTask.objects.create(
                task_name=task_name,
                task_id=task_id,
                args=[],
                kwargs=task_kwargs,
                exc=exc,
            )
            for task_name, task_id, task_kwargs, exc in fixtures
        ]
        super(TestReapplyTaskCommand, self).setUp()

    def _assert_resolved(self, task_object):
        """
        Fail unless the record carries a resolution timestamp, i.e. the
        reapplied task completed and was marked resolved.
        """
        self.assertIsInstance(task_object.datetime_resolved, datetime)

    def _assert_unresolved(self, task_object):
        """
        Fail if the record has been marked resolved; ``datetime_resolved``
        must still be None.
        """
        self.assertIsNone(task_object.datetime_resolved)

    def test_call_command(self):
        """
        Without filters, every unresolved task is reapplied.
        """
        call_command(u'reapply_tasks')
        fetch = models.FailedTask.objects.get
        self._assert_unresolved(fetch(task_id=u'fail_again'))
        self._assert_resolved(fetch(task_id=u'will_succeed'))
        self._assert_resolved(fetch(task_id=u'other_task'))

    def test_call_command_with_specified_task(self):
        """
        With --task-name, records for other tasks are left untouched.
        """
        task_name_option = u'--task-name={}'.format(self.fallible_task_name)
        call_command(u'reapply_tasks', task_name_option)
        fetch = models.FailedTask.objects.get
        self._assert_unresolved(fetch(task_id=u'fail_again'))
        self._assert_resolved(fetch(task_id=u'will_succeed'))
        self._assert_unresolved(fetch(task_id=u'other_task'))

    def test_duplicate_tasks(self):
        """
        Two records sharing a task_id trigger a single reapplication, and
        both records end up resolved.
        """
        models.FailedTask.objects.create(
            task_name=self.fallible_task_name,
            task_id=u'will_succeed',
            args=[],
            kwargs={},
            exc=u'AlsoThisOtherError()',
        )

        # Verify that only one task got run for this task_id.
        patcher = mock.patch.object(
            self.fallible_task,
            u'apply_async',
            wraps=self.fallible_task.apply_async,
        )
        with patcher as mock_apply:
            call_command(u'reapply_tasks')
            task_id_counts = Counter(call[2][u'task_id'] for call in mock_apply.mock_calls)
            self.assertEqual(task_id_counts[u'will_succeed'], 1)

        # Verify that both tasks matching that task_id are resolved.
        will_succeed_tasks = models.FailedTask.objects.filter(task_id=u'will_succeed').all()
        self.assertEqual(len(will_succeed_tasks), 2)
        for task_object in will_succeed_tasks:
            self._assert_resolved(task_object)
......@@ -2,10 +2,17 @@
Models to support persistent tasks.
"""
import logging
from celery import current_app
from django.db import models
from jsonfield import JSONField
from model_utils.models import TimeStampedModel
from . import tasks
log = logging.getLogger(__name__)
class FailedTask(TimeStampedModel):
"""
......@@ -23,6 +30,21 @@ class FailedTask(TimeStampedModel):
(u'task_name', u'exc'),
]
def reapply(self):
    """
    Queue a fresh celery task using this record's task name, arguments and
    task_id, chaining ``mark_resolved`` so a successful run marks the
    matching records as resolved.

    Raises TypeError if this record is already resolved.
    """
    if self.datetime_resolved is not None:
        raise TypeError(u'Cannot reapply a resolved task: {}'.format(self))

    log.info(u'Reapplying failed task: {}'.format(self))

    # Look the task up by its registered name in the current celery app.
    celery_task = current_app.tasks[self.task_name]
    on_success = tasks.mark_resolved.si(self.task_id)
    celery_task.apply_async(
        self.args,
        self.kwargs,
        task_id=self.task_id,
        link=on_success,
    )
def __unicode__(self):
return u'FailedTask: {task_name}, args={args}, kwargs={kwargs} ({resolution})'.format(
task_name=self.task_name,
......
......@@ -7,8 +7,7 @@ from celery import Task
from .models import FailedTask
# pylint: disable=abstract-method
class PersistOnFailureTask(Task):
class PersistOnFailureTask(Task): # pylint: disable=abstract-method
"""
Custom Celery Task base class that persists task data on failure.
"""
......@@ -17,13 +16,14 @@ class PersistOnFailureTask(Task):
"""
If the task fails, persist a record of the task.
"""
FailedTask.objects.create(
task_name=_truncate_to_field(FailedTask, 'task_name', self.name),
task_id=task_id, # Fixed length UUID: No need to truncate
args=args,
kwargs=kwargs,
exc=_truncate_to_field(FailedTask, 'exc', repr(exc)),
)
if not FailedTask.objects.filter(task_id=task_id, datetime_resolved=None).exists():
FailedTask.objects.create(
task_name=_truncate_to_field(FailedTask, 'task_name', self.name),
task_id=task_id, # Fixed length UUID: No need to truncate
args=args,
kwargs=kwargs,
exc=_truncate_to_field(FailedTask, 'exc', repr(exc)),
)
super(PersistOnFailureTask, self).on_failure(exc, task_id, args, kwargs, einfo)
......
"""
Celery tasks that support the utils in this module.
"""
from celery import task
from django.utils.timezone import now
@task
def mark_resolved(task_id):
    """
    Stamp every still-unresolved FailedTask record with the given task_id
    as resolved at the current time.
    """
    # Imported at call time rather than module level: models imports this
    # module, so a top-level import here would be circular.
    from . import models

    unresolved = models.FailedTask.objects.filter(task_id=task_id, datetime_resolved=None)
    unresolved.update(datetime_resolved=now())
......@@ -10,7 +10,7 @@ import six
from openedx.core.djangolib.testing.utils import skip_unless_lms
from ..models import FailedTask
from ..task import PersistOnFailureTask
from ..persist_on_failure import PersistOnFailureTask
@skip_unless_lms
......@@ -47,7 +47,7 @@ class PersistOnFailureTaskTestCase(TestCase):
# Assert that we get the kind of data we expect
self.assertEqual(
failed_task_object.task_name,
u'openedx.core.djangoapps.celery_utils.tests.test_task.exampletask'
u'openedx.core.djangoapps.celery_utils.tests.test_persist_on_failure.exampletask'
)
self.assertEqual(failed_task_object.args, [])
self.assertEqual(failed_task_object.kwargs, {u'message': u'The example task failed'})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment