tasks.py 33.1 KB
Newer Older
1 2 3 4
"""
This module contains celery task functions for handling the sending of bulk email
to a course.
"""
5
import re
6
import random
7
import json
8 9
from time import sleep

10
from dogapi import dog_stats_api
11 12
from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError, SMTPException
from boto.ses.exceptions import (
13 14 15 16
    SESAddressNotVerifiedError,
    SESIdentityNotVerifiedError,
    SESDomainNotConfirmedError,
    SESAddressBlacklistedError,
17 18
    SESDailyQuotaExceededError,
    SESMaxSendingRateExceededError,
19
    SESDomainEndsWithDotError,
20
    SESLocalAddressCharacterError,
21
    SESIllegalAddressError,
22
)
23
from boto.exception import AWSConnectionError
24

25
from celery import task, current_task
26
from celery.utils.log import get_task_logger
27 28
from celery.states import SUCCESS, FAILURE, RETRY
from celery.exceptions import RetryTaskError
29

30
from django.conf import settings
31
from django.contrib.auth.models import User
32
from django.core.mail import EmailMultiAlternatives, get_connection
33
from django.core.urlresolvers import reverse
34

35 36
from bulk_email.models import (
    CourseEmail, Optout, CourseEmailTemplate,
37
    SEND_TO_MYSELF, SEND_TO_ALL, TO_OPTIONS,
38
)
39
from courseware.courses import get_course, course_image_url
40
from student.roles import CourseStaffRole, CourseInstructorRole
41 42
from instructor_task.models import InstructorTask
from instructor_task.subtasks import (
43
    SubtaskStatus,
44
    queue_subtasks_for_query,
45
    check_subtask_is_valid,
46
    update_subtask_status,
47
)
48
from util.query import use_read_replica_if_available
49

50
# Module-level logger obtained from Celery's task-logger factory, named after this module.
log = get_task_logger(__name__)
51 52


53 54
# Errors indicating that an individual email could not be sent.  The recipient
# is simply counted as a failure; the rest of the batch is unaffected.
55 56 57 58 59 60
# Caught per recipient inside the send loop of _send_course_email(): the error is
# logged, the recipient is counted as failed, and the loop moves on to the next address.
SINGLE_EMAIL_FAILURE_ERRORS = (
    SESAddressBlacklistedError,  # Recipient's email address has been temporarily blacklisted.
    SESDomainEndsWithDotError,  # Recipient's email address' domain ends with a period/dot.
    SESIllegalAddressError,  # Raised when an illegal address is encountered.
    SESLocalAddressCharacterError,  # An address contained a control or whitespace character.
)
61

62 63
# Exceptions that, if caught, should cause the task to be re-tried.
# These errors will be caught a limited number of times before the task fails.
64 65 66 67 68
# Caught around the whole send loop in _send_course_email(); each occurrence bumps
# the subtask's `retried_withmax` counter and resubmits the remaining recipients.
LIMITED_RETRY_ERRORS = (
    SMTPConnectError,  # Error while establishing the connection to the SMTP server.
    SMTPServerDisconnected,  # The SMTP server disconnected unexpectedly.
    AWSConnectionError,  # boto connection-level error -- presumably transient; confirm against boto.exception.
)
69

70 71 72 73
# Errors that indicate that a mailing task should be retried without limit.
# An example is if email is being sent too quickly, but may succeed if sent
# more slowly.  When caught by a task, it triggers an exponential backoff and retry.
# Retries happen continuously until the email is sent.
74 75 76
# Note that the SMTPDataErrors here are only those within the 4xx range.
# Those not in this range (i.e. in the 5xx range) are treated as hard failures
# and thus like SINGLE_EMAIL_FAILURE_ERRORS.
77 78 79 80
# Caught around the whole send loop in _send_course_email(); each occurrence bumps
# the subtask's `retried_nomax` counter and resubmits with skip_retry_max=True.
INFINITE_RETRY_ERRORS = (
    SESMaxSendingRateExceededError,  # Your account's requests/second limit has been exceeded.
    SMTPDataError,  # Only 4xx codes get this far: the send loop counts any other code as a single failed recipient.
)
81

82 83 84
# Errors that are known to indicate an inability to send any more emails,
# and should therefore not be retried.  For example, exceeding a quota for emails.
# Also, any SMTP errors that are not explicitly enumerated above.
85 86 87 88 89 90 91
# Caught around the whole send loop in _send_course_email(): every recipient still
# pending is counted as failed and the subtask is put in the FAILURE state.
BULK_EMAIL_FAILURE_ERRORS = (
    SESAddressNotVerifiedError,  # Raised when a "Reply-To" address has not been validated in SES yet.
    SESIdentityNotVerifiedError,  # Raised when an identity has not been verified in SES yet.
    SESDomainNotConfirmedError,  # Raised when domain ownership is not confirmed for DKIM.
    SESDailyQuotaExceededError,  # 24-hour allotment of outbound email has been exceeded.
    # Base class of the smtplib errors.  It works as a catch-all only because the
    # handlers for the two retry tuples are tried before this one.
    SMTPException,
)
92

93

94
def _get_recipient_queryset(user_id, to_option, course_id, course_location):
    """
    Build the queryset of users who should receive a bulk email.

    `to_option` must be one of the values in TO_OPTIONS:
      * SEND_TO_MYSELF selects only the user with id `user_id`.
      * SEND_TO_ALL selects the course staff and instructors plus every active
        user with an active enrollment in `course_id`.
      * Any other valid option selects the course staff and instructors only.

    `course_location` is accepted for interface compatibility and is not used.

    Users that fall into more than one category appear only once in the result.
    The returned queryset is routed through `use_read_replica_if_available`.

    Raises an Exception if `to_option` is not in TO_OPTIONS.
    """
    if to_option not in TO_OPTIONS:
        log.error("Unexpected bulk email TO_OPTION found: %s", to_option)
        raise Exception("Unexpected bulk email TO_OPTION found: {0}".format(to_option))

    if to_option == SEND_TO_MYSELF:
        return use_read_replica_if_available(User.objects.filter(id=user_id))

    # Everyone holding either the staff or the instructor role on the course.
    course_team = (
        CourseStaffRole(course_id).users_with_role() |
        CourseInstructorRole(course_id).users_with_role()
    ).distinct()
    if to_option != SEND_TO_ALL:
        return use_read_replica_if_available(course_team)

    # Students must have activated their accounts, which serves as verification
    # that the email address they supplied is valid.
    active_enrollees = User.objects.filter(
        is_active=True,
        courseenrollment__course_id=course_id,
        courseenrollment__is_active=True
    )

    # Rather than running DISTINCT over (team | enrollees), which forces the db
    # to create a temporary table, fetch the ids of team members who are not
    # actively enrolled and OR a plain id filter onto the enrollment query.
    team_not_enrolled = use_read_replica_if_available(
        course_team.exclude(
            courseenrollment__course_id=course_id, courseenrollment__is_active=True
        )
    )
    team_not_enrolled_ids = [member.id for member in team_not_enrolled]
    everyone = active_enrollees | User.objects.filter(id__in=team_not_enrolled_ids)

    # The combined query can be large, so use the read replica for it too.
    return use_read_replica_if_available(everyone)
135 136


137
def _get_course_email_context(course):
    """
    Build the template context shared by every recipient of a course email.

    Returns a dict with the keys 'course_title', 'course_url', 'course_image_url',
    'account_settings_url' and 'platform_name'.  All URLs are absolute https URLs
    on settings.SITE_NAME.
    """
    def _absolute_url(path):
        """Prefix a site-relative path with the https scheme and the site name."""
        return 'https://{}{}'.format(settings.SITE_NAME, path)

    course_key_string = course.id.to_deprecated_string()
    return {
        'course_title': course.display_name,
        'course_url': _absolute_url(reverse('course_root', kwargs={'course_id': course_key_string})),
        'course_image_url': _absolute_url(course_image_url(course)),
        'account_settings_url': _absolute_url(reverse('dashboard')),
        'platform_name': settings.PLATFORM_NAME,
    }


def perform_delegate_email_batches(entry_id, course_id, task_input, action_name):
    """
    Split a course email into batches of recipients and queue one subtask per batch.

    Looks up the InstructorTask `entry_id` and the CourseEmail named by
    `task_input['email_id']`, works out the recipient queryset, and hands it to
    `queue_subtasks_for_query` in chunks of settings.BULK_EMAIL_EMAILS_PER_TASK.

    Returns the progress value produced by `queue_subtasks_for_query`, or the
    previously stored task output if subtasks were already defined for this entry.

    Raises ValueError if `course_id` disagrees with the course of the entry or of
    the email, or if the course cannot be found.  Re-raises CourseEmail.DoesNotExist
    if the email is missing.
    """
    entry = InstructorTask.objects.get(pk=entry_id)
    # Inputs for this task that live on the entry.
    requester_id = entry.requester.id
    task_id = entry.task_id

    def _require_same_course(other_course_id, format_msg):
        """Log and raise ValueError when `other_course_id` differs from `course_id`."""
        if course_id != other_course_id:
            log.warning(u"Task %s: " + format_msg, task_id, course_id, other_course_id)
            raise ValueError(format_msg % (course_id, other_course_id))

    # Perfunctory check: the course id is passed explicitly only for the convenience
    # of other task code that does not need the entry_id.
    _require_same_course(
        entry.course_id,
        u"Course id conflict: explicit value %r does not match task value %r",
    )

    # Fetch the CourseEmail.
    email_id = task_input['email_id']
    try:
        email_obj = CourseEmail.objects.get(id=email_id)
    except CourseEmail.DoesNotExist:
        # The view function should have committed the CourseEmail before the task
        # was submitted, so it is expected to exist by now.
        log.warning(u"Task %s: Failed to get CourseEmail with id %s", task_id, email_id)
        raise

    # If the connection is lost while a task is being queued, the same task can be
    # called again and would queue a whole new raft of subtasks.  When subtasks are
    # already defined we therefore return the stored output right away.  No exception
    # is raised, so the task keeps whatever status it had been marked with before.
    already_queued = len(entry.subtasks) > 0 and len(entry.task_output) > 0
    if already_queued:
        log.warning(u"Task %s has already been processed for email %s!  InstructorTask = %s", task_id, email_id, entry)
        return json.loads(entry.task_output)

    # Sanity check that the email belongs to the same course as the task.
    _require_same_course(
        email_obj.course_id,
        u"Course id conflict: explicit value %r does not match email value %r",
    )

    # Fetch the course object.
    course = get_course(course_id)
    if course is None:
        msg = u"Task %s: course not found: %s"
        log.error(msg, task_id, course_id)
        raise ValueError(msg % (task_id, course_id))

    # Values that are passed unchanged to every subtask.
    to_option = email_obj.to_option
    global_email_context = _get_course_email_context(course)

    def _create_send_email_subtask(to_list, initial_subtask_status):
        """Creates a subtask to send email to a given recipient list."""
        subtask_args = (
            entry_id,
            email_id,
            to_list,
            global_email_context,
            initial_subtask_status.to_dict(),
        )
        return send_course_email.subtask(
            subtask_args,
            task_id=initial_subtask_status.task_id,
            routing_key=settings.BULK_EMAIL_ROUTING_KEY,
        )

    recipient_qset = _get_recipient_queryset(requester_id, to_option, course_id, course.location)
    recipient_fields = ['profile__name', 'email']

    log.info(u"Task %s: Preparing to queue subtasks for sending emails for course %s, email %s, to_option %s",
             task_id, course_id, email_id, to_option)

    # The value returned here is stored in the parent task's AsyncResult as its
    # "result", and the AsyncResult is then marked as SUCCEEDED.  That is acceptable:
    # the InstructorTask holds the "real" status and monitoring code should use it.
    return queue_subtasks_for_query(
        entry,
        action_name,
        _create_send_email_subtask,
        recipient_qset,
        recipient_fields,
        settings.BULK_EMAIL_EMAILS_PER_TASK,
    )
253 254


255
@task(default_retry_delay=settings.BULK_EMAIL_DEFAULT_RETRY_DELAY, max_retries=settings.BULK_EMAIL_MAX_RETRIES)  # pylint: disable=E1102
def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status_dict):
    """
    Celery subtask that emails one batch of recipients.

    Arguments:
      * `entry_id`: id of the InstructorTask on which progress is recorded.
      * `email_id`: id of the CourseEmail to send.
      * `to_list`: list of recipients, each a dict with the keys:
        - 'profile__name': full name of the User.
        - 'email': email address of the User.
        - 'pk': primary key of the User.
      * `global_email_context`: dict of template values that are specific to this
        email but identical for all of its recipients.  It does not include 'name'
        and 'email'; those come from the entries of `to_list`.
      * `subtask_status_dict`: dict form of the subtask's current status, with keys:

        'task_id' : id of subtask.  This is used to pass task information across retries.
        'attempted' : number of attempts -- should equal succeeded plus failed
        'succeeded' : number that succeeded in processing
        'skipped' : number that were not processed.
        'failed' : number that failed during processing
        'retried_nomax' : number of retries for conditions that have no maximum count
        'retried_withmax' : number of retries for conditions that have a maximum count
        'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)

        Most values are zero on the initial call, but may differ when the task is
        invoked as part of a retry.

    Recipients found in the Optout table are skipped.  Messages are multi-part
    (plain text and html).  The InstructorTask is updated with the resulting counts.

    Returns the final subtask status as a dict, so that Celery can serialize it.
    """
    subtask_status = SubtaskStatus.from_dict(subtask_status_dict)
    subtask_id = subtask_status.task_id
    # Taken up front: this is the count used if the send fails unexpectedly below.
    recipient_count = len(to_list)
    log.info(u"Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s",
             email_id, recipient_count, subtask_id, entry_id, global_email_context, subtask_status)

    # Confirm that this subtask is known to the InstructorTask entry and has not
    # already been completed; a failure here raises and fails the subtask at once.
    # Duplicate subtasks can exist when the parent task was run twice, e.g. after
    # Celery lost its connection to the broker and requeued its current tasks.
    # perform_delegate_email_batches() tries to catch a resubmitted parent, but this
    # check is a second line of defence, and it also covers the case where Celery
    # runs this very subtask twice for the same reason.
    check_subtask_is_valid(entry_id, subtask_id, subtask_status)

    failure = None
    final_status = None
    try:
        course_title = global_email_context['course_title']
        with dog_stats_api.timer('course_email.single_task.time.overall', tags=[_statsd_tag(course_title)]):
            final_status, failure = _send_course_email(
                entry_id,
                email_id,
                to_list,
                global_email_context,
                subtask_status,
            )
    except Exception:
        # Unexpected exception.  Record the failure on the entry before propagating it.
        log.exception("Send-email task %s for email %s: failed unexpectedly!", subtask_id, email_id)
        # There is no way to know how far the send got, so every recipient is
        # counted as failed.  That at least keeps the counts consistent.
        subtask_status.increment(failed=recipient_count, state=FAILURE)
        update_subtask_status(entry_id, subtask_id, subtask_status)
        raise

    if failure is not None:
        if isinstance(failure, RetryTaskError):
            # A RetryTaskError has to be handed back to Celery for the retry to happen.
            # Progress made before the retry condition is assumed to have been
            # recorded already when the retry was submitted, so this only logs.
            log.warning("Send-email task %s for email %s: being retried", subtask_id, email_id)
        else:
            log.error("Send-email task %s for email %s: failed: %s", subtask_id, email_id, failure)
            update_subtask_status(entry_id, subtask_id, final_status)
        raise failure  # pylint: disable=E0702

    # Success: store the progress on the InstructorTask.
    log.info("Send-email task %s for email %s: succeeded", subtask_id, email_id)
    update_subtask_status(entry_id, subtask_id, final_status)

    # Return the status in a form that Celery can serialize into JSON.
    log.info("Send-email task %s for email %s: returning status %s", subtask_id, email_id, final_status)
    return final_status.to_dict()
348

349

350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380
def _filter_optouts_from_recipients(to_list, course_id):
    """
    Drop recipients who have opted out of email for the given course.

    `to_list` is a list of recipient dicts carrying at least 'pk' and 'email'.

    Returns a tuple of the filtered recipient list and the number of distinct
    opted-out email addresses that were found.
    """
    recipient_pks = [recipient['pk'] for recipient in to_list]
    opted_out_emails = set(
        Optout.objects.filter(
            course_id=course_id,
            user__in=recipient_pks
        ).values_list('user__email', flat=True)
    )
    # The caller computes this only on the first attempt; the number is assumed
    # not to change on retries, so it is not recalculated then.
    num_optout = len(opted_out_emails)
    still_subscribed = [
        recipient for recipient in to_list
        if recipient['email'] not in opted_out_emails
    ]
    return still_subscribed, num_optout


def _get_source_address(course_id, course_title):
    """
    Build the 'from' header value used for a course's bulk emails.

    Each course gets its own display name and address, e.g.

        "COURSE_TITLE" Course Staff <coursenum-no-reply@courseupdates.edx.org>

    `course_id` must expose a `course` attribute (the course number);
    `course_title` is the display name of the course.
    """
    # The display name is wrapped in double quotes, so remove any it contains.
    display_title = re.sub(r'"', '', course_title)

    # Make the course number usable inside an email address: every character other
    # than a word character, period or dash is replaced by '_'.
    safe_course_num = re.sub(r"[^\w.-]", '_', course_id.course)

    return u'"{0}" Course Staff <{1}-{2}>'.format(
        display_title,
        safe_course_num,
        settings.BULK_EMAIL_DEFAULT_FROM_EMAIL
    )


392
def _send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status):
    """
    Performs the email sending task.

    Sends an email to a list of recipients.

    Inputs are:
      * `entry_id`: id of the InstructorTask object to which progress should be recorded.
      * `email_id`: id of the CourseEmail model that is to be emailed.
      * `to_list`: list of recipients.  Each is represented as a dict with the following keys:
        - 'profile__name': full name of User.
        - 'email': email address of User.
        - 'pk': primary key of User model.
      * `global_email_context`: dict containing values that are unique for this email but the same
        for all recipients of this email.  This dict is to be used to fill in slots in email
        template.  It does not include 'name' and 'email', which will be provided by the to_list.
      * `subtask_status` : object of class SubtaskStatus representing current status.
        It is updated in place as recipients are processed.

    Sends to all addresses contained in to_list that are not also in the Optout table.
    Emails are sent multi-part, in both plain text and html.

    Returns a tuple of two values:
      * First value is a SubtaskStatus object which represents current progress at the end of this call.

      * Second value is None on success.  Otherwise it is the exception that ended the send:
        either the fatal error itself, in which case the recipients that were not sent to have
        already been added to the 'failed' count, or whatever `_submit_for_retry` returned
        when a retry was requested.

    Raises CourseEmail.DoesNotExist if no CourseEmail with `email_id` exists.
    """
    # Get information from current task's request:
    task_id = subtask_status.task_id

    try:
        course_email = CourseEmail.objects.get(id=email_id)
    except CourseEmail.DoesNotExist:
        log.exception("Task %s: could not find email id:%s to send.", task_id, email_id)
        raise

    # Exclude optouts (if not a retry):
    # Note that we don't have to do the optout logic at all if this is a retry,
    # because we have presumably already performed the optout logic on the first
    # attempt.  Anyone on the to_list on a retry has already passed the filter
    # that existed at that time, and we don't need to keep checking for changes
    # in the Optout list.
    if subtask_status.get_retry_count() == 0:
        to_list, num_optout = _filter_optouts_from_recipients(to_list, course_email.course_id)
        subtask_status.increment(skipped=num_optout)

    course_title = global_email_context['course_title']
    subject = "[" + course_title + "] " + course_email.subject
    from_addr = _get_source_address(course_email.course_id, course_title)

    course_email_template = CourseEmailTemplate.get_template()

    # Bind the name before entering the try block.  If get_connection() itself raises,
    # the `finally` clause must not fail on an unbound local, because that would
    # replace the retry result returned by the handlers below.
    connection = None
    try:
        connection = get_connection()
        connection.open()

        # Define context values to use in all course emails:
        email_context = {'name': '', 'email': ''}
        email_context.update(global_email_context)

        while to_list:
            # Update context with user-specific values from the user at the end of the list.
            # At the end of processing this user, they will be popped off of the to_list.
            # That way, the to_list will always contain the recipients remaining to be emailed.
            # This is convenient for retries, which will need to send to those who haven't
            # yet been emailed, but not send to those who have already been sent to.
            current_recipient = to_list[-1]
            email = current_recipient['email']
            email_context['email'] = email
            email_context['name'] = current_recipient['profile__name']

            # Construct message content using templates and context:
            plaintext_msg = course_email_template.render_plaintext(course_email.text_message, email_context)
            html_msg = course_email_template.render_htmltext(course_email.html_message, email_context)

            # Create email:
            email_msg = EmailMultiAlternatives(
                subject,
                plaintext_msg,
                from_addr,
                [email],
                connection=connection
            )
            email_msg.attach_alternative(html_msg, 'text/html')

            # Throttle if we have gotten the rate limiter.  This is not very high-tech,
            # but if a task has been retried for rate-limiting reasons, then we sleep
            # for a period of time between all emails within this task.  Choice of
            # the value depends on the number of workers that might be sending email in
            # parallel, and what the SES throttle rate is.
            if subtask_status.retried_nomax > 0:
                sleep(settings.BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS)

            try:
                log.debug('Email with id %s to be sent to %s', email_id, email)

                with dog_stats_api.timer('course_email.single_send.time.overall', tags=[_statsd_tag(course_title)]):
                    connection.send_messages([email_msg])

            except SMTPDataError as exc:
                # According to SMTP spec, we'll retry error codes in the 4xx range.  5xx range indicates hard failure.
                if 400 <= exc.smtp_code < 500:
                    # Re-raise (keeping the original traceback) so that the outer handler
                    # catches the exception and retries the entire task.
                    raise
                else:
                    # This will fall through and not retry the message.
                    log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc.smtp_error)
                    dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
                    subtask_status.increment(failed=1)

            except SINGLE_EMAIL_FAILURE_ERRORS as exc:
                # This will fall through and not retry the message.
                log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc)
                dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
                subtask_status.increment(failed=1)

            else:
                dog_stats_api.increment('course_email.sent', tags=[_statsd_tag(course_title)])
                if settings.BULK_EMAIL_LOG_SENT_EMAILS:
                    log.info('Email with id %s sent to %s', email_id, email)
                else:
                    log.debug('Email with id %s sent to %s', email_id, email)
                subtask_status.increment(succeeded=1)

            # Pop the user that was emailed off the end of the list only once they have
            # successfully been processed.  (That way, if there were a failure that
            # needed to be retried, the user is still on the list.)
            to_list.pop()

    except INFINITE_RETRY_ERRORS as exc:
        dog_stats_api.increment('course_email.infinite_retry', tags=[_statsd_tag(course_title)])
        # Increment the "retried_nomax" counter, update other counters with progress to date,
        # and set the state to RETRY:
        subtask_status.increment(retried_nomax=1, state=RETRY)
        return _submit_for_retry(
            entry_id, email_id, to_list, global_email_context, exc, subtask_status, skip_retry_max=True
        )

    except LIMITED_RETRY_ERRORS as exc:
        # Errors caught here cause the email to be retried.  The entire task is actually retried
        # without popping the current recipient off of the existing list.
        # Errors caught are those that indicate a temporary condition that might succeed on retry.
        dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)])
        # Increment the "retried_withmax" counter, update other counters with progress to date,
        # and set the state to RETRY:
        subtask_status.increment(retried_withmax=1, state=RETRY)
        return _submit_for_retry(
            entry_id, email_id, to_list, global_email_context, exc, subtask_status, skip_retry_max=False
        )

    except BULK_EMAIL_FAILURE_ERRORS as exc:
        dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
        num_pending = len(to_list)
        log.exception('Task %s: email with id %d caused send_course_email task to fail with "fatal" exception.  %d emails unsent.',
                      task_id, email_id, num_pending)
        # Update counters with progress to date, counting unsent emails as failures,
        # and set the state to FAILURE:
        subtask_status.increment(failed=num_pending, state=FAILURE)
        return subtask_status, exc

    except Exception as exc:
        # Errors caught here cause the email to be retried.  The entire task is actually retried
        # without popping the current recipient off of the existing list.
        # These are unexpected errors.  Since they might be due to a temporary condition that might
        # succeed on retry, we give them a retry.
        dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)])
        log.exception('Task %s: email with id %d caused send_course_email task to fail with unexpected exception.  Generating retry.',
                      task_id, email_id)
        # Increment the "retried_withmax" counter, update other counters with progress to date,
        # and set the state to RETRY:
        subtask_status.increment(retried_withmax=1, state=RETRY)
        return _submit_for_retry(
            entry_id, email_id, to_list, global_email_context, exc, subtask_status, skip_retry_max=False
        )

    else:
        # All went well.  Update counters with progress to date,
        # and set the state to SUCCESS:
        subtask_status.increment(state=SUCCESS)
        # Successful completion is marked by an exception value of None.
        return subtask_status, None
    finally:
        # Clean up at the end.  There is nothing to close if get_connection() failed.
        if connection is not None:
            connection.close()
576 577


578
def _get_current_task():
    """
    Return Celery's `current_task`.

    `celery.current_task` gives access to the top of Celery's stack of tasks.
    Mocking `current_task` directly doesn't seem to work in tests, so production
    code goes through this thin wrapper, which tests can mock instead.
    """
    return current_task


591
def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_status, skip_retry_max=False):
    """
    Helper function to requeue a task for retry, using the new version of arguments provided.

    Inputs are the same as for running a task, plus two extra indicating the state at the time of retry.
    These include the `current_exception` that the task encountered that is causing the retry attempt,
    and the `subtask_status` that is to be returned.  A third extra argument `skip_retry_max`
    indicates whether the current retry should be subject to a maximum test.

    Returns a tuple of two values:
      * First value is the `subtask_status` object that was passed in (updated in place on
        failure).  Its `to_dict()` form is what gets handed to the retried task.  It tracks:

        'task_id' : id of subtask.  This is used to pass task information across retries.
        'attempted' : number of attempts -- should equal succeeded plus failed
        'succeeded' : number that succeeded in processing
        'skipped' : number that were not processed.
        'failed' : number that failed during processing
        'retried_nomax' : number of times the subtask has been retried for conditions that
            should not have a maximum count applied
        'retried_withmax' : number of times the subtask has been retried for conditions that
            should have a maximum count applied
        'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)

      * Second value is an exception returned by the innards of the method.  If the retry was
        successfully submitted, this value will be the RetryTaskError that retry() raises.
        Otherwise, it is whatever exception retry() raised instead (expected to be the
        current_exception passed in).
    """
    task_id = subtask_status.task_id
    log.info("Task %s: Successfully sent to %s users; failed to send to %s users (and skipped %s users)",
             task_id, subtask_status.succeeded, subtask_status.failed, subtask_status.skipped)

    # Calculate time until we retry this task (in seconds):
    # The value for max_retries is increased by the number of times an "infinite-retry" exception
    # has been retried.  We want the regular retries to trigger max-retry checking, but not these
    # special retries.  So we count them separately.
    max_retries = _get_current_task().max_retries + subtask_status.retried_nomax
    base_delay = _get_current_task().default_retry_delay
    if skip_retry_max:
        # once we reach five retries, don't increase the countdown further.
        retry_index = min(subtask_status.retried_nomax, 5)
        exception_type = 'sending-rate'
        # if we have a cap, after all, apply it now:
        if hasattr(settings, 'BULK_EMAIL_INFINITE_RETRY_CAP'):
            retry_cap = settings.BULK_EMAIL_INFINITE_RETRY_CAP + subtask_status.retried_withmax
            max_retries = min(max_retries, retry_cap)
    else:
        retry_index = subtask_status.retried_withmax
        exception_type = 'transient'

    # Exponential backoff: the delay doubles with each retry.
    # Skew the new countdown value by a random factor, so that not all
    # retries are deferred by the same amount.
    countdown = ((2 ** retry_index) * base_delay) * random.uniform(.75, 1.25)

    log.warning('Task %s: email with id %d not delivered due to %s error %s, retrying send to %d recipients in %s seconds (with max_retry=%s)',
                task_id, email_id, exception_type, current_exception, len(to_list), countdown, max_retries)

    # we make sure that we update the InstructorTask with the current subtask status
    # *before* actually calling retry(), to be sure that there is no race
    # condition between this update and the update made by the retried task.
    update_subtask_status(entry_id, task_id, subtask_status)

    # Now attempt the retry.  If it succeeds, it raises a RetryTaskError (we ask
    # for that with throw=True) that needs to be returned back to Celery.  If it
    # fails, we return the exception that was raised instead.
    try:
        send_course_email.retry(
            args=[
                entry_id,
                email_id,
                to_list,
                global_email_context,
                subtask_status.to_dict(),
            ],
            exc=current_exception,
            countdown=countdown,
            max_retries=max_retries,
            throw=True,
        )
    except RetryTaskError as retry_error:
        # If the retry call is successful, update with the current progress:
        log.exception(u'Task %s: email with id %d caused send_course_email task to retry.',
                      task_id, email_id)
        return subtask_status, retry_error
    except Exception as retry_exc:
        # If there are no more retries, because the maximum has been reached,
        # we expect the original exception to be raised.  We catch it here
        # (and put it in retry_exc just in case it's different, but it shouldn't be),
        # and update status as if it were any other failure.  That means that
        # the recipients still in the to_list are counted as failures.
        log.exception(u'Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s',
                      task_id, email_id, [recipient['email'] for recipient in to_list])
        num_failed = len(to_list)
        # Keyword arguments only: the status object is already the receiver of
        # the call and must not be passed again as a positional argument.
        subtask_status.increment(failed=num_failed, state=FAILURE)
        return subtask_status, retry_exc
685 686


687 688 689 690
def _statsd_tag(course_title):
    """
    Calculate the tag we will use for DataDog.

    The tag is "course_email:<course_title>", encoded as UTF-8 and limited to
    at most 200 bytes.  Truncation never leaves a partial multi-byte character
    at the end, so the result is always valid UTF-8 (it may therefore be a few
    bytes shorter than 200 when the cut lands inside a character).
    """
    tag = u"course_email:{0}".format(course_title).encode('utf-8')
    # Slicing encoded bytes can split a multi-byte character; decoding with
    # 'ignore' drops the dangling partial sequence before re-encoding.
    return tag[:200].decode('utf-8', 'ignore').encode('utf-8')