tasks.py 32.4 KB
Newer Older
1 2 3 4
"""
This module contains celery task functions for handling the sending of bulk email
to a course.
"""
5
import re
6
import random
7
import json
8 9
from time import sleep

10
from dogapi import dog_stats_api
11 12
from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError, SMTPException
from boto.ses.exceptions import (
13 14 15 16
    SESAddressNotVerifiedError,
    SESIdentityNotVerifiedError,
    SESDomainNotConfirmedError,
    SESAddressBlacklistedError,
17 18
    SESDailyQuotaExceededError,
    SESMaxSendingRateExceededError,
19
    SESDomainEndsWithDotError,
20
    SESLocalAddressCharacterError,
21
    SESIllegalAddressError,
22
)
23
from boto.exception import AWSConnectionError
24

25
from celery import task, current_task
26
from celery.utils.log import get_task_logger
27 28
from celery.states import SUCCESS, FAILURE, RETRY
from celery.exceptions import RetryTaskError
29

30
from django.conf import settings
31
from django.contrib.auth.models import User
32
from django.core.mail import EmailMultiAlternatives, get_connection
33
from django.core.urlresolvers import reverse
34

35 36
from bulk_email.models import (
    CourseEmail, Optout, CourseEmailTemplate,
37
    SEND_TO_MYSELF, SEND_TO_ALL, TO_OPTIONS,
38
)
39
from courseware.courses import get_course, course_image_url
40
from student.roles import CourseStaffRole, CourseInstructorRole
41 42
from instructor_task.models import InstructorTask
from instructor_task.subtasks import (
43
    SubtaskStatus,
44
    queue_subtasks_for_query,
45
    check_subtask_is_valid,
46
    update_subtask_status,
47
)
48

49
# Module-level Celery task logger; every function in this module logs through it.
log = get_task_logger(__name__)
50 51


52 53
# Errors that indicate that an individual email has failed to be sent, and
# should just be treated as a failure for that one recipient.
54 55 56 57 58 59
# Caught per recipient inside the send loop of _send_course_email(): the
# recipient is counted as failed and the loop moves on to the next one.
SINGLE_EMAIL_FAILURE_ERRORS = (
    SESAddressBlacklistedError,  # Recipient's email address has been temporarily blacklisted.
    SESDomainEndsWithDotError,  # The domain of the recipient's email address ends with a period/dot.
    SESIllegalAddressError,  # Raised when an illegal address is encountered.
    SESLocalAddressCharacterError,  # An address contained a control or whitespace character.
)
60

61 62
# Exceptions that, if caught, should cause the task to be re-tried.
# These errors will be caught a limited number of times before the task fails.
63 64 65 66 67
# Caught around the whole send loop of _send_course_email(): the subtask's
# "retried_withmax" counter is incremented and the remaining recipients are
# resubmitted with skip_retry_max=False.
LIMITED_RETRY_ERRORS = (
    SMTPConnectError,  # smtplib: error while establishing the connection to the server.
    SMTPServerDisconnected,  # smtplib: the server disconnected unexpectedly.
    AWSConnectionError,  # boto: presumably a failure to connect to AWS -- confirm against boto docs.
)
68

69 70 71 72
# Errors that indicate that a mailing task should be retried without limit.
# An example is if email is being sent too quickly, but may succeed if sent
# more slowly.  When caught by a task, it triggers an exponential backoff and retry.
# Retries happen continuously until the email is sent.
73 74 75
# Note that the SMTPDataErrors here are only those within the 4xx range.
# Those not in this range (i.e. in the 5xx range) are treated as hard failures
# and thus like SINGLE_EMAIL_FAILURE_ERRORS.
76 77 78 79
# Caught around the whole send loop of _send_course_email(): the subtask's
# "retried_nomax" counter is incremented and the remaining recipients are
# resubmitted with skip_retry_max=True.
INFINITE_RETRY_ERRORS = (
    SESMaxSendingRateExceededError,  # Your account's requests/second limit has been exceeded.
    SMTPDataError,  # Only 4xx codes get here; the send loop handles other codes as single failures.
)
80

81 82 83
# Errors that are known to indicate an inability to send any more emails,
# and should therefore not be retried.  For example, exceeding a quota for emails.
# Also, any SMTP errors that are not explicitly enumerated above.
84 85 86 87 88 89 90
# Caught around the whole send loop of _send_course_email(), after the two
# retry tuples: all still-pending recipients are counted as failed and the
# subtask is put in the FAILURE state.
BULK_EMAIL_FAILURE_ERRORS = (
    SESAddressNotVerifiedError,  # Raised when a "Reply-To" address has not been validated in SES yet.
    SESIdentityNotVerifiedError,  # Raised when an identity has not been verified in SES yet.
    SESDomainNotConfirmedError,  # Raised when domain ownership is not confirmed for DKIM.
    SESDailyQuotaExceededError,  # 24-hour allotment of outbound email has been exceeded.
    SMTPException,  # Catch-all for SMTP errors not matched by an earlier, more specific handler.
)
91

92

93
def _get_recipient_queryset(user_id, to_option, course_id, course_location):
    """
    Build the queryset of Users that should receive a bulk email.

    The set of recipients depends on `to_option`, which must be one of TO_OPTIONS:

      * SEND_TO_MYSELF: only the user identified by `user_id`.
      * SEND_TO_ALL: course staff and instructors, plus every active user with an
        active enrollment in `course_id`.
      * any other valid option: course staff and instructors only.

    Users that fall into more than one category appear only once.  The result is
    always ordered by primary key.

    Raises an Exception if `to_option` is not in TO_OPTIONS.
    """
    if to_option not in TO_OPTIONS:
        log.error("Unexpected bulk email TO_OPTION found: %s", to_option)
        raise Exception("Unexpected bulk email TO_OPTION found: {0}".format(to_option))

    # Sending to oneself needs no role/enrollment lookups and no deduping.
    if to_option == SEND_TO_MYSELF:
        return User.objects.filter(id=user_id).order_by('pk')

    # Staff and instructors are included for every remaining option.
    course_staff = CourseStaffRole(course_location).users_with_role()
    course_instructors = CourseInstructorRole(course_location).users_with_role()
    recipients = course_staff | course_instructors

    if to_option == SEND_TO_ALL:
        # Students must have activated their accounts, which serves as
        # verification that the email address they provided is valid.
        enrolled_students = User.objects.filter(
            is_active=True,
            courseenrollment__course_id=course_id,
            courseenrollment__is_active=True
        )
        recipients = recipients | enrolled_students

    return recipients.distinct().order_by('pk')


127
def _get_course_email_context(course):
    """
    Build the template context shared by every recipient of a course email.

    The returned dict holds the course title, absolute https URLs (based on
    settings.SITE_NAME) for the course root, the course image and the dashboard,
    and the platform name.
    """
    def _absolute_url(path):
        """Prefix a site-relative path with the https scheme and the site name."""
        return 'https://{}{}'.format(settings.SITE_NAME, path)

    course_root_url = _absolute_url(reverse('course_root', kwargs={'course_id': course.id}))
    course_image = _absolute_url(course_image_url(course))
    return {
        'course_title': course.display_name,
        'course_url': course_root_url,
        'course_image_url': course_image,
        'account_settings_url': _absolute_url(reverse('dashboard')),
        'platform_name': settings.PLATFORM_NAME,
    }


def perform_delegate_email_batches(entry_id, course_id, task_input, action_name):
    """
    Split a course email into batches of recipients and queue one subtask per batch.

    Inputs are:
      * `entry_id`: primary key of the InstructorTask that tracks this email.
      * `course_id`: id of the course; must match both the InstructorTask and the CourseEmail.
      * `task_input`: dict whose 'email_id' entry identifies the CourseEmail to send.
      * `action_name`: passed through unchanged to queue_subtasks_for_query().

    Recipients are fetched settings.BULK_EMAIL_EMAILS_PER_QUERY at a time and grouped
    into subtasks of at most settings.BULK_EMAIL_EMAILS_PER_TASK recipients.

    Returns the progress value produced by queue_subtasks_for_query(), or the
    previously stored progress if subtasks were already defined for this entry.

    Raises ValueError on a course id mismatch or if the course cannot be found,
    and CourseEmail.DoesNotExist if the email record is missing.
    """
    instructor_task = InstructorTask.objects.get(pk=entry_id)
    # Pull the values needed below out of the InstructorTask entry.
    requester_id = instructor_task.requester.id
    task_id = instructor_task.task_id

    # Perfunctory check: course_id is passed explicitly only for the convenience
    # of other task code that doesn't need the entry_id.
    if course_id != instructor_task.course_id:
        raise ValueError(
            "Course id conflict: explicit value {} does not match task value {}".format(
                course_id, instructor_task.course_id
            )
        )

    # Look up the CourseEmail.  It should have been committed by the view
    # function before the task was submitted and reached this point.
    email_id = task_input['email_id']
    try:
        course_email = CourseEmail.objects.get(id=email_id)
    except CourseEmail.DoesNotExist:
        log.warning("Task %s: Failed to get CourseEmail with id %s", task_id, email_id)
        raise

    # If subtasks were already defined, this task is being run a second time --
    # which seems to happen when the connection is lost while a task is being
    # queued.  Queueing a whole new raft of subtasks would send duplicates, so
    # just hand back the stored progress.  No exception is raised, so that the
    # task keeps whatever status it had been marked with before.
    already_queued = len(instructor_task.subtasks) > 0 and len(instructor_task.task_output) > 0
    if already_queued:
        log.warning("Task %s has already been processed for email %s!  InstructorTask = %s", task_id, email_id, instructor_task)
        return json.loads(instructor_task.task_output)

    # Sanity check: the email must belong to the same course as the task.
    if course_id != course_email.course_id:
        raise ValueError(
            "Course id conflict: explicit value {} does not match email value {}".format(
                course_id, course_email.course_id
            )
        )

    # Load the course itself.
    try:
        course = get_course(course_id)
    except ValueError:
        log.exception("Task %s: course not found: %s", task_id, course_id)
        raise

    # Values that are handed to every subtask.
    to_option = course_email.to_option
    global_email_context = _get_course_email_context(course)

    def _create_send_email_subtask(to_list, initial_subtask_status):
        """Creates a subtask to send email to a given recipient list."""
        subtask_id = initial_subtask_status.task_id
        subtask_args = (
            entry_id,
            email_id,
            to_list,
            global_email_context,
            initial_subtask_status.to_dict(),
        )
        return send_course_email.subtask(
            subtask_args,
            task_id=subtask_id,
            routing_key=settings.BULK_EMAIL_ROUTING_KEY,
        )

    recipients = _get_recipient_queryset(requester_id, to_option, course_id, course.location)
    fields_per_recipient = ['profile__name', 'email']

    log.info("Task %s: Preparing to queue subtasks for sending emails for course %s, email %s, to_option %s",
             task_id, course_id, email_id, to_option)

    # The value returned here becomes the "result" stored in the parent task's
    # AsyncResult, which will then be marked as SUCCEEDED.  That's okay: the
    # InstructorTask holds the "real" status, and monitoring code should be
    # using that instead.
    return queue_subtasks_for_query(
        instructor_task,
        action_name,
        _create_send_email_subtask,
        recipients,
        fields_per_recipient,
        settings.BULK_EMAIL_EMAILS_PER_QUERY,
        settings.BULK_EMAIL_EMAILS_PER_TASK
    )
241 242


243
@task(default_retry_delay=settings.BULK_EMAIL_DEFAULT_RETRY_DELAY, max_retries=settings.BULK_EMAIL_MAX_RETRIES)  # pylint: disable=E1102
def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status_dict):
    """
    Celery subtask that emails one batch of recipients and records the outcome.

    Inputs are:
      * `entry_id`: id of the InstructorTask object to which progress should be recorded.
      * `email_id`: id of the CourseEmail model that is to be emailed.
      * `to_list`: list of recipients.  Each is represented as a dict with the following keys:
        - 'profile__name': full name of User.
        - 'email': email address of User.
        - 'pk': primary key of User model.
      * `global_email_context`: dict of values that are specific to this email but identical
        for all of its recipients.  It fills in slots in the email template, and does not
        include 'name' and 'email', which come from the to_list.
      * `subtask_status_dict` : dict form of a SubtaskStatus, describing current status.  Keys are:

        'task_id' : id of subtask.  This is used to pass task information across retries.
        'attempted' : number of attempts -- should equal succeeded plus failed
        'succeeded' : number that succeeded in processing
        'skipped' : number that were not processed.
        'failed' : number that failed during processing
        'retried_nomax' : number of times the subtask has been retried for conditions that
            should not have a maximum count applied
        'retried_withmax' : number of times the subtask has been retried for conditions that
            should have a maximum count applied
        'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)

        Most values are zero on the initial call, but may differ when the task is
        invoked as part of a retry.

    The actual sending is delegated to _send_course_email().  On success, the
    InstructorTask is updated and the final status is returned as a dict.  On a
    retry or a failure, the corresponding exception is raised.
    """
    subtask_status = SubtaskStatus.from_dict(subtask_status_dict)
    current_task_id = subtask_status.task_id
    num_to_send = len(to_list)
    log.info("Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s",
             email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status)

    # Make sure this subtask is actually known to the InstructorTask entry, and
    # has not already been completed.  The check throws an exception otherwise,
    # which should fail this subtask immediately.  Unknown subtasks can appear
    # when the parent task ran twice and created duplicate subtasks; this
    # subtask can itself be run twice.  Both happen when Celery loses its
    # connection to its broker and requeues current tasks.  We hope to catch the
    # parent case in perform_delegate_email_batches(), but check here as well.
    check_subtask_is_valid(entry_id, current_task_id, subtask_status)

    send_exception = None
    new_subtask_status = None
    try:
        timer_tags = [_statsd_tag(global_email_context['course_title'])]
        with dog_stats_api.timer('course_email.single_task.time.overall', tags=timer_tags):
            new_subtask_status, send_exception = _send_course_email(
                entry_id,
                email_id,
                to_list,
                global_email_context,
                subtask_status,
            )
    except Exception:
        # Something really unexpected happened.  Try to record the failure on
        # the entry before failing.  We don't know how far the task got in
        # emailing, so every recipient is counted as failed; that at least
        # keeps the counts consistent.
        log.exception("Send-email task %s for email %s: failed unexpectedly!", current_task_id, email_id)
        subtask_status.increment(failed=num_to_send, state=FAILURE)
        update_subtask_status(entry_id, current_task_id, subtask_status)
        raise

    if isinstance(send_exception, RetryTaskError):
        # A retry was requested, so the RetryTaskError has to reach Celery.
        # Progress made before the retry condition is assumed to have been
        # recorded before the retry call was made, so we only log here.
        log.warning("Send-email task %s for email %s: being retried", current_task_id, email_id)
        raise send_exception  # pylint: disable=E0702

    if send_exception is not None:
        # Fatal error: store the final counts, then propagate the error.
        log.error("Send-email task %s for email %s: failed: %s", current_task_id, email_id, send_exception)
        update_subtask_status(entry_id, current_task_id, new_subtask_status)
        raise send_exception  # pylint: disable=E0702

    # Success: update the InstructorTask object that is storing the progress.
    log.info("Send-email task %s for email %s: succeeded", current_task_id, email_id)
    update_subtask_status(entry_id, current_task_id, new_subtask_status)

    # Hand the status back in a form that Celery can serialize into JSON.
    log.info("Send-email task %s for email %s: returning status %s", current_task_id, email_id, new_subtask_status)
    return new_subtask_status.to_dict()
336

337

338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
def _filter_optouts_from_recipients(to_list, course_id):
    """
    Remove recipients who have opted out of email for the given course.

    `to_list` is a list of recipient dicts with (at least) 'pk' and 'email' keys.

    Returns a tuple of the filtered recipient list (a new list) and the number
    of opted-out email addresses that were found.
    """
    recipient_pks = [recipient['pk'] for recipient in to_list]
    optout_emails = set(
        Optout.objects.filter(
            course_id=course_id,
            user__in=recipient_pks
        ).values_list('user__email', flat=True)
    )
    # This count is only computed the first time the optouts are calculated.
    # The number is assumed not to change on retries, so there is no need to
    # work it out each time.
    num_optout = len(optout_emails)
    remaining = [recipient for recipient in to_list if recipient['email'] not in optout_emails]
    return remaining, num_optout


def _get_source_address(course_id, course_title):
    """
    Build the 'from-address' to use for emails sent on behalf of a course.

    Each course gets its own from name and address, e.g.

        "COURSE_TITLE" Course Staff <coursenum-no-reply@courseupdates.edx.org>

    Double quotes are stripped from the title, and the address is the sanitized
    course number joined by a dash to settings.BULK_EMAIL_DEFAULT_FROM_EMAIL.
    """
    title_without_quotes = re.sub(r'"', '', course_title)

    # The course_id is assumed to be in the form 'org/course_num/run', so the
    # course number is its second component.  Any character other than a word
    # character, period or dash is replaced with '_' so that the result can be
    # used in an email address.
    raw_course_num = course_id.split('/')[1]
    safe_course_num = re.compile(r"[^\w.-]").sub('_', raw_course_num)

    return '"{0}" Course Staff <{1}-{2}>'.format(
        title_without_quotes, safe_course_num, settings.BULK_EMAIL_DEFAULT_FROM_EMAIL
    )


381
def _send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status):
    """
    Performs the email sending task.

    Sends an email to a list of recipients.

    Inputs are:
      * `entry_id`: id of the InstructorTask object to which progress should be recorded.
      * `email_id`: id of the CourseEmail model that is to be emailed.
      * `to_list`: list of recipients.  Each is represented as a dict with the following keys:
        - 'profile__name': full name of User.
        - 'email': email address of User.
        - 'pk': primary key of User model.
        Recipients are popped off the end of this list as they are processed, so
        on exit it holds only those who have not been processed yet.
      * `global_email_context`: dict containing values that are unique for this email but the same
        for all recipients of this email.  This dict is to be used to fill in slots in email
        template.  It does not include 'name' and 'email', which will be provided by the to_list.
      * `subtask_status` : object of class SubtaskStatus representing current status.
        It is updated in place with the counts of sent, failed and skipped emails.

    Sends to all addresses contained in to_list that are not also in the Optout table.
    Emails are sent multi-part, in both plain text and html.

    Returns a tuple of two values:
      * First value is a SubtaskStatus object which represents current progress at the end of this call.

      * Second value is None on success.  Otherwise it is an exception: either the
        result of submitting the subtask for retry (see _submit_for_retry), or a fatal
        error, in which case the recipients that were not sent to have already been
        added to the 'failed' count.

    Raises CourseEmail.DoesNotExist if no CourseEmail has id `email_id`.
    """
    # Get information from current task's request:
    task_id = subtask_status.task_id

    try:
        course_email = CourseEmail.objects.get(id=email_id)
    except CourseEmail.DoesNotExist:
        log.exception("Task %s: could not find email id:%s to send.", task_id, email_id)
        raise

    # Exclude optouts (if not a retry):
    # Note that we don't have to do the optout logic at all if this is a retry,
    # because we have presumably already performed the optout logic on the first
    # attempt.  Anyone on the to_list on a retry has already passed the filter
    # that existed at that time, and we don't need to keep checking for changes
    # in the Optout list.
    if subtask_status.get_retry_count() == 0:
        to_list, num_optout = _filter_optouts_from_recipients(to_list, course_email.course_id)
        subtask_status.increment(skipped=num_optout)

    course_title = global_email_context['course_title']
    subject = "[" + course_title + "] " + course_email.subject
    from_addr = _get_source_address(course_email.course_id, course_title)

    course_email_template = CourseEmailTemplate.get_template()

    # Bound before the try block so that the finally clause can tell whether a
    # connection was ever obtained.  Without this, a failure in get_connection()
    # would make the cleanup raise an UnboundLocalError that replaces the
    # result of the exception handlers below.
    connection = None
    try:
        connection = get_connection()
        connection.open()

        # Define context values to use in all course emails:
        email_context = {'name': '', 'email': ''}
        email_context.update(global_email_context)

        while to_list:
            # Update context with user-specific values from the user at the end of the list.
            # At the end of processing this user, they will be popped off of the to_list.
            # That way, the to_list will always contain the recipients remaining to be emailed.
            # This is convenient for retries, which will need to send to those who haven't
            # yet been emailed, but not send to those who have already been sent to.
            current_recipient = to_list[-1]
            email = current_recipient['email']
            email_context['email'] = email
            email_context['name'] = current_recipient['profile__name']

            # Construct message content using templates and context:
            plaintext_msg = course_email_template.render_plaintext(course_email.text_message, email_context)
            html_msg = course_email_template.render_htmltext(course_email.html_message, email_context)

            # Create email:
            email_msg = EmailMultiAlternatives(
                subject,
                plaintext_msg,
                from_addr,
                [email],
                connection=connection
            )
            email_msg.attach_alternative(html_msg, 'text/html')

            # Throttle if we have gotten the rate limiter.  This is not very high-tech,
            # but if a task has been retried for rate-limiting reasons, then we sleep
            # for a period of time between all emails within this task.  Choice of
            # the value depends on the number of workers that might be sending email in
            # parallel, and what the SES throttle rate is.
            if subtask_status.retried_nomax > 0:
                sleep(settings.BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS)

            try:
                log.debug('Email with id %s to be sent to %s', email_id, email)

                with dog_stats_api.timer('course_email.single_send.time.overall', tags=[_statsd_tag(course_title)]):
                    connection.send_messages([email_msg])

            except SMTPDataError as exc:
                # According to SMTP spec, we'll retry error codes in the 4xx range.  5xx range indicates hard failure.
                if 400 <= exc.smtp_code < 500:
                    # Re-raise (keeping the original traceback) so that the outer
                    # handler catches the exception and retries the entire task.
                    raise
                else:
                    # This will fall through and not retry the message.
                    log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc.smtp_error)
                    dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
                    subtask_status.increment(failed=1)

            except SINGLE_EMAIL_FAILURE_ERRORS as exc:
                # This will fall through and not retry the message.
                log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc)
                dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
                subtask_status.increment(failed=1)

            else:
                dog_stats_api.increment('course_email.sent', tags=[_statsd_tag(course_title)])
                if settings.BULK_EMAIL_LOG_SENT_EMAILS:
                    log.info('Email with id %s sent to %s', email_id, email)
                else:
                    log.debug('Email with id %s sent to %s', email_id, email)
                subtask_status.increment(succeeded=1)

            # Pop the user that was emailed off the end of the list only once they have
            # successfully been processed.  (That way, if there were a failure that
            # needed to be retried, the user is still on the list.)
            to_list.pop()

    except INFINITE_RETRY_ERRORS as exc:
        dog_stats_api.increment('course_email.infinite_retry', tags=[_statsd_tag(course_title)])
        # Increment the "retried_nomax" counter, update other counters with progress to date,
        # and set the state to RETRY:
        subtask_status.increment(retried_nomax=1, state=RETRY)
        return _submit_for_retry(
            entry_id, email_id, to_list, global_email_context, exc, subtask_status, skip_retry_max=True
        )

    except LIMITED_RETRY_ERRORS as exc:
        # Errors caught here cause the email to be retried.  The entire task is actually retried
        # without popping the current recipient off of the existing list.
        # Errors caught are those that indicate a temporary condition that might succeed on retry.
        dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)])
        # Increment the "retried_withmax" counter, update other counters with progress to date,
        # and set the state to RETRY:
        subtask_status.increment(retried_withmax=1, state=RETRY)
        return _submit_for_retry(
            entry_id, email_id, to_list, global_email_context, exc, subtask_status, skip_retry_max=False
        )

    except BULK_EMAIL_FAILURE_ERRORS as exc:
        dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
        num_pending = len(to_list)
        log.exception('Task %s: email with id %d caused send_course_email task to fail with "fatal" exception.  %d emails unsent.',
                      task_id, email_id, num_pending)
        # Update counters with progress to date, counting unsent emails as failures,
        # and set the state to FAILURE:
        subtask_status.increment(failed=num_pending, state=FAILURE)
        return subtask_status, exc

    except Exception as exc:
        # Errors caught here cause the email to be retried.  The entire task is actually retried
        # without popping the current recipient off of the existing list.
        # These are unexpected errors.  Since they might be due to a temporary condition that might
        # succeed on retry, we give them a retry.
        dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)])
        log.exception('Task %s: email with id %d caused send_course_email task to fail with unexpected exception.  Generating retry.',
                      task_id, email_id)
        # Increment the "retried_withmax" counter, update other counters with progress to date,
        # and set the state to RETRY:
        subtask_status.increment(retried_withmax=1, state=RETRY)
        return _submit_for_retry(
            entry_id, email_id, to_list, global_email_context, exc, subtask_status, skip_retry_max=False
        )

    else:
        # All went well.  Update counters with progress to date,
        # and set the state to SUCCESS:
        subtask_status.increment(state=SUCCESS)
        # Successful completion is marked by an exception value of None.
        return subtask_status, None
    finally:
        # Clean up at the end.  There is nothing to close if get_connection()
        # itself failed.
        if connection is not None:
            connection.close()
565 566


567
def _get_current_task():
    """
    Return celery's `current_task`.

    This thin wrapper exists only as a hook for tests.  Mocking
    celery.current_task directly doesn't seem to work when running tests
    without Celery, so tests mock this function instead, while production
    code gets the real `current_task` from it.
    """
    return current_task


580
def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_status, skip_retry_max=False):
    """
    Helper function to requeue a task for retry, using the new version of arguments provided.

    Inputs are the same as for running a task, plus two extra indicating the state at the time of retry.
    These include the `current_exception` that the task encountered that is causing the retry attempt,
    and the `subtask_status` that is to be returned.  A third extra argument `skip_retry_max`
    indicates whether the current retry should be subject to a maximum test.

    The delay before the retry grows exponentially with the number of previous
    retries of the same kind, and is skewed by a random factor.

    Returns a tuple of two values:
      * First value is the SubtaskStatus object passed in, which represents current progress.
        Its dict form (as passed on to the retried task) has these keys:

        'task_id' : id of subtask.  This is used to pass task information across retries.
        'attempted' : number of attempts -- should equal succeeded plus failed
        'succeeded' : number that succeeded in processing
        'skipped' : number that were not processed.
        'failed' : number that failed during processing
        'retried_nomax' : number of times the subtask has been retried for conditions that
            should not have a maximum count applied
        'retried_withmax' : number of times the subtask has been retried for conditions that
            should have a maximum count applied
        'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)

      * Second value is an exception returned by the innards of the method.  If the retry was
        successfully submitted, this value will be the RetryTaskError that retry() raises.
        Otherwise, it (ought to be) the current_exception passed in, and the recipients
        still in `to_list` have been counted as failed.
    """
    task_id = subtask_status.task_id
    log.info("Task %s: Successfully sent to %s users; failed to send to %s users (and skipped %s users)",
             task_id, subtask_status.succeeded, subtask_status.failed, subtask_status.skipped)

    # Calculate time until we retry this task (in seconds):
    # The value for max_retries is increased by the number of times an "infinite-retry" exception
    # has been retried.  We want the regular retries to trigger max-retry checking, but not these
    # special retries.  So we count them separately.
    max_retries = _get_current_task().max_retries + subtask_status.retried_nomax
    base_delay = _get_current_task().default_retry_delay
    if skip_retry_max:
        # once we reach five retries, don't increase the countdown further.
        retry_index = min(subtask_status.retried_nomax, 5)
        exception_type = 'sending-rate'
        # if we have a cap, after all, apply it now:
        if hasattr(settings, 'BULK_EMAIL_INFINITE_RETRY_CAP'):
            retry_cap = settings.BULK_EMAIL_INFINITE_RETRY_CAP + subtask_status.retried_withmax
            max_retries = min(max_retries, retry_cap)
    else:
        retry_index = subtask_status.retried_withmax
        exception_type = 'transient'

    # Skew the new countdown value by a random factor, so that not all
    # retries are deferred by the same amount.
    countdown = ((2 ** retry_index) * base_delay) * random.uniform(.75, 1.25)

    log.warning('Task %s: email with id %d not delivered due to %s error %s, retrying send to %d recipients in %s seconds (with max_retry=%s)',
                task_id, email_id, exception_type, current_exception, len(to_list), countdown, max_retries)

    # we make sure that we update the InstructorTask with the current subtask status
    # *before* actually calling retry(), to be sure that there is no race
    # condition between this update and the update made by the retried task.
    update_subtask_status(entry_id, task_id, subtask_status)

    # Now attempt the retry.  If it succeeds, it raises a RetryTaskError that
    # needs to be returned back to Celery.  If it fails, we return the existing
    # exception.
    try:
        send_course_email.retry(
            args=[
                entry_id,
                email_id,
                to_list,
                global_email_context,
                subtask_status.to_dict(),
            ],
            exc=current_exception,
            countdown=countdown,
            max_retries=max_retries,
            throw=True,
        )
    except RetryTaskError as retry_error:
        # If the retry call is successful, update with the current progress:
        log.exception('Task %s: email with id %d caused send_course_email task to retry.',
                      task_id, email_id)
        return subtask_status, retry_error
    except Exception as retry_exc:
        # If there are no more retries, because the maximum has been reached,
        # we expect the original exception to be raised.  We catch it here
        # (and put it in retry_exc just in case it's different, but it shouldn't be),
        # and update status as if it were any other failure.  That means that
        # the recipients still in the to_list are counted as failures.
        log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s',
                      task_id, email_id, [i['email'] for i in to_list])
        num_failed = len(to_list)
        # increment() is called with keyword arguments only, as everywhere else
        # in this module; the status object must not be passed as a counter.
        subtask_status.increment(failed=num_failed, state=FAILURE)
        return subtask_status, retry_exc
674 675


676 677 678 679 680 681
def _statsd_tag(course_title):
    """
    Calculate the tag we will use for DataDog.
    """
    tag = "course_email:{0}".format(course_title)
    return tag[:200]