# -*- coding: utf-8 -*-
"""
Unit tests for LMS instructor-initiated background tasks.

Runs tasks on answers to course problems to validate that code
paths actually work.

"""
import json
from uuid import uuid4
11
from itertools import cycle, chain, repeat
12
from mock import patch, Mock
13
from nose.plugins.attrib import attr
14 15
from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError, SMTPAuthenticationError
from boto.ses.exceptions import (
16 17 18 19
    SESAddressNotVerifiedError,
    SESIdentityNotVerifiedError,
    SESDomainNotConfirmedError,
    SESAddressBlacklistedError,
20 21
    SESDailyQuotaExceededError,
    SESMaxSendingRateExceededError,
22
    SESDomainEndsWithDotError,
23
    SESLocalAddressCharacterError,
24
    SESIllegalAddressError,
25 26
)
from boto.exception import AWSConnectionError
27

28
from celery.states import SUCCESS, FAILURE  # pylint: disable=no-name-in-module, import-error
29 30 31 32

from django.conf import settings
from django.core.management import call_command

33 34
from xmodule.modulestore.tests.factories import CourseFactory

35
from bulk_email.models import CourseEmail, Optout, SEND_TO_ALL
36 37

from instructor_task.tasks import send_bulk_course_email
38
from instructor_task.subtasks import update_subtask_status, SubtaskStatus
39 40 41
from instructor_task.models import InstructorTask
from instructor_task.tests.test_base import InstructorTaskCourseTestCase
from instructor_task.tests.factories import InstructorTaskFactory
42
from opaque_keys.edx.locations import SlashSeparatedCourseKey
43 44 45 46 47 48 49


class TestTaskFailure(Exception):
    """Placeholder exception type for unit tests to raise deliberately."""


50 51 52 53 54 55 56 57 58 59 60 61
def my_update_subtask_status(entry_id, current_task_id, new_subtask_status):
    """
    Update a subtask's status, unless a retry has already recorded a later one.

    In "eager" mode (used by tests), a retry is called and run to completion
    before control returns to the code that invoked it.  The retry has therefore
    already written its results by the time the invoking code gets a chance to
    update the status.  If the retries eventually end in failure (e.g. because a
    maximum number of retries was attempted), the "eager" code returns the error
    for each retry as it is popped off the stack, and those later updates -- which
    stem from the earlier retries -- should simply be ignored.

    This should not be an issue in production, where status is updated before
    a task is retried, and is then updated afterwards if the retry fails.

    Arguments:
        entry_id: primary key of the InstructorTask whose subtasks are tracked.
        current_task_id: id of the subtask whose status is being reported.
        new_subtask_status: the status to record; it is only written if its
            retry count is at least the retry count already stored.
    """
    task_entry = InstructorTask.objects.get(pk=entry_id)
    stored_status = json.loads(task_entry.subtasks)['status'][current_task_id]
    stored_retries = SubtaskStatus.from_dict(stored_status).get_retry_count()
    if new_subtask_status.get_retry_count() < stored_retries:
        # A later retry has already written its results: drop this stale update.
        return
    update_subtask_status(entry_id, current_task_id, new_subtask_status)


78
@attr('shard_1')
# `autospec` is an option of `patch`, not of `Mock`: passing it to `Mock` would
# merely set a stray `autospec` attribute on the replacement object.
@patch('bulk_email.models.html_to_text', Mock(return_value='Mocking CourseEmail.text_message'))
class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
    """Tests for the instructor task that sends bulk course email."""

    def setUp(self):
        super(TestBulkEmailInstructorTask, self).setUp()
        self.initialize_course()
        self.instructor = self.create_instructor('instructor')

        # load initial content (since we don't run migrations as part of tests):
        call_command("loaddata", "course_email_template.json")

    def _create_input_entry(self, course_id=None):
        """
        Create an InstructorTask entry for testing.

        Overrides the base class version in that this also creates the
        CourseEmail whose id is passed as the task input.

        Arguments:
            course_id: course to create the email and the task entry for;
                falls back to `self.course.id` when not given.

        Returns the newly created InstructorTask.
        """
        course_id = course_id or self.course.id
        course_email = CourseEmail.create(
            course_id, self.instructor, SEND_TO_ALL, "Test Subject", "<p>This is a test message</p>"
        )
        task_input = {'email_id': course_email.id}
        return InstructorTaskFactory.create(
            course_id=course_id,
            requester=self.instructor,
            task_input=json.dumps(task_input),
            task_key='dummy value',
            task_id=str(uuid4()),
        )

    def _run_task_with_mock_celery(self, task_class, entry_id, task_id):
        """
        Apply `task_class` for the given entry and task ids and return its result.

        Despite the name, nothing is mocked here: the task is invoked through
        its own `apply()` and the result is fetched with `get()`.
        """
        task_args = [entry_id, {}]
        return task_class.apply(task_args, task_id=task_id).get()

    def test_email_missing_current_task(self):
        # Call the task function directly instead of going through apply();
        # the test expects this to raise a ValueError.
        task_entry = self._create_input_entry()
        with self.assertRaises(ValueError):
            send_bulk_course_email(task_entry.id, {})

    def test_email_undefined_course(self):
        # Check that we fail when passing in a course that doesn't exist.
        task_entry = self._create_input_entry(course_id=SlashSeparatedCourseKey("bogus", "course", "id"))
        with self.assertRaises(ValueError):
            self._run_task_with_mock_celery(send_bulk_course_email, task_entry.id, task_entry.task_id)

    def test_bad_task_id_on_update(self):
        task_entry = self._create_input_entry()

        def dummy_update_subtask_status(entry_id, _current_task_id, new_subtask_status):
            """Passes a bad value for task_id to test update_subtask_status"""
            bogus_task_id = "this-is-bogus"
            update_subtask_status(entry_id, bogus_task_id, new_subtask_status)

        with self.assertRaises(ValueError):
            with patch('bulk_email.tasks.update_subtask_status', dummy_update_subtask_status):
                send_bulk_course_email(task_entry.id, {})

    def _create_students(self, num_students):
        """Create `num_students` students named robot0, robot1, ... and return them as a list."""
        # `range` iterates identically to the Python-2-only `xrange` here.
        return [self.create_student('robot%d' % i) for i in range(num_students)]

    def _assert_single_subtask_status(self, entry, succeeded, failed=0, skipped=0, retried_nomax=0, retried_withmax=0):
        """
        Compare counts with 'subtasks' entry in InstructorTask table.

        Expects `entry` to record exactly one subtask, which is counted as
        succeeded when at least one email succeeded and as failed otherwise.
        """
        subtask_info = json.loads(entry.subtasks)
        # verify subtask-level counts:
        self.assertEqual(subtask_info.get('total'), 1)
        self.assertEqual(subtask_info.get('succeeded'), 1 if succeeded > 0 else 0)
        self.assertEqual(subtask_info.get('failed'), 0 if succeeded > 0 else 1)
        # verify individual subtask status:
        subtask_status_info = subtask_info.get('status')
        # Materialize the keys so they can be indexed on any Python version.
        task_id_list = list(subtask_status_info)
        self.assertEqual(len(task_id_list), 1)
        task_id = task_id_list[0]
        subtask_status = subtask_status_info.get(task_id)
        # Single-argument call form prints the same text on Python 2 and 3.
        print("Testing subtask status: {}".format(subtask_status))
        self.assertEqual(subtask_status.get('task_id'), task_id)
        self.assertEqual(subtask_status.get('attempted'), succeeded + failed)
        self.assertEqual(subtask_status.get('succeeded'), succeeded)
        self.assertEqual(subtask_status.get('skipped'), skipped)
        self.assertEqual(subtask_status.get('failed'), failed)
        self.assertEqual(subtask_status.get('retried_nomax'), retried_nomax)
        self.assertEqual(subtask_status.get('retried_withmax'), retried_withmax)
        self.assertEqual(subtask_status.get('state'), SUCCESS if succeeded > 0 else FAILURE)

    def _test_run_with_task(
            self, task_class, action_name, total, succeeded,
            failed=0, skipped=0, retried_nomax=0, retried_withmax=0):
        """
        Run a task and check the number of emails processed.

        Checks the value returned by the task, the `task_output` stored on the
        InstructorTask entry, and the status of the entry's single subtask.

        Returns the InstructorTask entry as re-read from the database.
        """
        task_entry = self._create_input_entry()
        parent_status = self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id)

        # check return value
        self.assertEqual(parent_status.get('total'), total)
        self.assertEqual(parent_status.get('action_name'), action_name)

        # compare with task_output entry in InstructorTask table:
        entry = InstructorTask.objects.get(id=task_entry.id)
        status = json.loads(entry.task_output)
        self.assertEqual(status.get('attempted'), succeeded + failed)
        self.assertEqual(status.get('succeeded'), succeeded)
        self.assertEqual(status.get('skipped'), skipped)
        self.assertEqual(status.get('failed'), failed)
        self.assertEqual(status.get('total'), total)
        self.assertEqual(status.get('action_name'), action_name)
        self.assertGreater(status.get('duration_ms'), 0)
        self.assertEqual(entry.task_state, SUCCESS)
        self._assert_single_subtask_status(entry, succeeded, failed, skipped, retried_nomax, retried_withmax)
        return entry

    def test_successful(self):
        # Select number of emails to fit into a single subtask.
        num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            get_conn.return_value.send_messages.side_effect = cycle([None])
            self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails)

    def test_successful_twice(self):
        # Select number of emails to fit into a single subtask.
        num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            get_conn.return_value.send_messages.side_effect = cycle([None])
            task_entry = self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails)

        # submit the same task a second time, and confirm that it is not run again.
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            get_conn.return_value.send_messages.side_effect = cycle([Exception("This should not happen!")])
            parent_status = self._run_task_with_mock_celery(send_bulk_course_email, task_entry.id, task_entry.task_id)
        self.assertEqual(parent_status.get('total'), num_emails)
        self.assertEqual(parent_status.get('succeeded'), num_emails)
        self.assertEqual(parent_status.get('failed'), 0)

    def test_unactivated_user(self):
        # Select number of emails to fit into a single subtask.
        num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
        # We also send email to the instructor:
        students = self._create_students(num_emails - 1)
        # mark a student as not yet having activated their email:
        student = students[0]
        student.is_active = False
        student.save()
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            get_conn.return_value.send_messages.side_effect = cycle([None])
            self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails - 1, num_emails - 1)

    def test_skipped(self):
        # Select number of emails to fit into a single subtask.
        num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
        # We also send email to the instructor:
        students = self._create_students(num_emails - 1)
        # Have every fourth student opt out.  The student list is sliced directly
        # because it holds one entry fewer than num_emails (the instructor is not
        # in it): indexing it with range(0, num_emails, 4) runs past the end
        # whenever num_emails % 4 == 1.
        opted_out = students[::4]
        for student in opted_out:
            Optout.objects.create(user=student, course_id=self.course.id)
        expected_skipped = len(opted_out)
        expected_succeeds = num_emails - expected_skipped
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            get_conn.return_value.send_messages.side_effect = cycle([None])
            self._test_run_with_task(
                send_bulk_course_email, 'emailed', num_emails, expected_succeeds, skipped=expected_skipped
            )

    def _test_email_address_failures(self, exception):
        """Test that celery handles bad address errors by failing and not retrying."""
        # Select number of emails to fit into a single subtask.
        num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        # The first of every four sends fails, i.e. ceil(num_emails / 4) failures.
        expected_fails = int((num_emails + 3) / 4.0)
        expected_succeeds = num_emails - expected_fails
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            # have every fourth email fail due to some address failure:
            get_conn.return_value.send_messages.side_effect = cycle([exception, None, None, None])
            self._test_run_with_task(
                send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails
            )

    def test_smtp_blacklisted_user(self):
        # Test that celery handles permanent SMTPDataErrors by failing and not retrying.
        self._test_email_address_failures(SMTPDataError(554, "Email address is blacklisted"))

    def test_ses_blacklisted_user(self):
        # Test that celery handles SES blacklisted-address errors by failing and not retrying.
        self._test_email_address_failures(SESAddressBlacklistedError(554, "Email address is blacklisted"))

    def test_ses_illegal_address(self):
        # Test that celery handles SES illegal-address errors by failing and not retrying.
        self._test_email_address_failures(SESIllegalAddressError(554, "Email address is illegal"))

    def test_ses_local_address_character_error(self):
        # Test that celery handles SES local-address-character errors by failing and not retrying.
        self._test_email_address_failures(SESLocalAddressCharacterError(554, "Email address contains a bad character"))

    def test_ses_domain_ends_with_dot(self):
        # Test that celery handles SES domain-ends-with-dot errors by failing and not retrying.
        self._test_email_address_failures(SESDomainEndsWithDotError(554, "Email address ends with a dot"))

    def _test_retry_after_limited_retry_error(self, exception):
        """Test that celery handles connection failures by retrying."""
        # If we want the batch to succeed, we need to send fewer emails
        # than the max retries, so that the max is not triggered.
        num_emails = settings.BULK_EMAIL_MAX_RETRIES
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        expected_fails = 0
        expected_succeeds = num_emails
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            # Have every other mail attempt fail due to disconnection.
            get_conn.return_value.send_messages.side_effect = cycle([exception, None])
            self._test_run_with_task(
                send_bulk_course_email,
                'emailed',
                num_emails,
                expected_succeeds,
                failed=expected_fails,
                retried_withmax=num_emails
            )

    def _test_max_retry_limit_causes_failure(self, exception):
        """Test that celery can hit a maximum number of retries."""
        # Doesn't really matter how many recipients, since we expect
        # to fail on the first.
        num_emails = 10
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        expected_fails = num_emails
        expected_succeeds = 0
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            # always fail to connect, triggering repeated retries until limit is hit:
            get_conn.return_value.send_messages.side_effect = cycle([exception])
            with patch('bulk_email.tasks.update_subtask_status', my_update_subtask_status):
                self._test_run_with_task(
                    send_bulk_course_email,
                    'emailed',
                    num_emails,
                    expected_succeeds,
                    failed=expected_fails,
                    retried_withmax=(settings.BULK_EMAIL_MAX_RETRIES + 1)
                )

    def test_retry_after_smtp_disconnect(self):
        self._test_retry_after_limited_retry_error(SMTPServerDisconnected(425, "Disconnecting"))

    def test_max_retry_after_smtp_disconnect(self):
        self._test_max_retry_limit_causes_failure(SMTPServerDisconnected(425, "Disconnecting"))

    def test_retry_after_smtp_connect_error(self):
        self._test_retry_after_limited_retry_error(SMTPConnectError(424, "Bad Connection"))

    def test_max_retry_after_smtp_connect_error(self):
        self._test_max_retry_limit_causes_failure(SMTPConnectError(424, "Bad Connection"))

    def test_retry_after_aws_connect_error(self):
        self._test_retry_after_limited_retry_error(
            AWSConnectionError("Unable to provide secure connection through proxy")
        )

    def test_max_retry_after_aws_connect_error(self):
        self._test_max_retry_limit_causes_failure(
            AWSConnectionError("Unable to provide secure connection through proxy")
        )

    def test_retry_after_general_error(self):
        self._test_retry_after_limited_retry_error(Exception("This is some random exception."))

    def test_max_retry_after_general_error(self):
        self._test_max_retry_limit_causes_failure(Exception("This is some random exception."))

    def _test_retry_after_unlimited_retry_error(self, exception):
        """Test that celery handles throttling failures by retrying."""
        num_emails = 8
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        expected_fails = 0
        expected_succeeds = num_emails
        # Note that because celery in eager mode will call retries synchronously,
        # each retry will increase the stack depth.  It turns out that there is a
        # maximum depth at which a RuntimeError is raised ("maximum recursion depth
        # exceeded").  The maximum recursion depth is 90, so
        # num_emails * expected_retries < 90.
        expected_retries = 10
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            # Cycle through N throttling errors followed by a success.
            get_conn.return_value.send_messages.side_effect = cycle(
                chain(repeat(exception, expected_retries), [None])
            )
            self._test_run_with_task(
                send_bulk_course_email,
                'emailed',
                num_emails,
                expected_succeeds,
                failed=expected_fails,
                retried_nomax=(expected_retries * num_emails)
            )

    def test_retry_after_smtp_throttling_error(self):
        self._test_retry_after_unlimited_retry_error(SMTPDataError(455, "Throttling: Sending rate exceeded"))

    def test_retry_after_ses_throttling_error(self):
        self._test_retry_after_unlimited_retry_error(
            SESMaxSendingRateExceededError(455, "Throttling: Sending rate exceeded")
        )

    def _test_immediate_failure(self, exception):
        """Test that an unrecoverable error fails every recipient without any retries."""
        # Doesn't really matter how many recipients, since we expect
        # to fail on the first.
        num_emails = 10
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        expected_fails = num_emails
        expected_succeeds = 0
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            # Every send raises; no retry counts are passed below, so the
            # status checks expect zero retries of either kind.
            get_conn.return_value.send_messages.side_effect = cycle([exception])
            self._test_run_with_task(
                send_bulk_course_email,
                'emailed',
                num_emails,
                expected_succeeds,
                failed=expected_fails,
            )

    def test_failure_on_unhandled_smtp(self):
        self._test_immediate_failure(SMTPAuthenticationError(403, "That password doesn't work!"))

    def test_failure_on_ses_quota_exceeded(self):
        self._test_immediate_failure(SESDailyQuotaExceededError(403, "You're done for the day!"))

    def test_failure_on_ses_address_not_verified(self):
        self._test_immediate_failure(SESAddressNotVerifiedError(403, "Who *are* you?"))

    def test_failure_on_ses_identity_not_verified(self):
        self._test_immediate_failure(SESIdentityNotVerifiedError(403, "May I please see an ID!"))

    def test_failure_on_ses_domain_not_confirmed(self):
        self._test_immediate_failure(SESDomainNotConfirmedError(403, "You're out of bounds!"))

    def test_bulk_emails_with_unicode_course_image_name(self):
        # Test bulk email with unicode characters in course image name
        course_image = u'在淡水測試.jpg'
        self.course = CourseFactory.create(course_image=course_image)

        num_emails = 1
        self._create_students(num_emails)

        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            get_conn.return_value.send_messages.side_effect = cycle([None])
            self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails)