test_tasks.py 21.5 KB
Newer Older
1
# -*- coding: utf-8 -*-
2 3 4 5 6 7 8 9
"""
Unit tests for LMS instructor-initiated background tasks.

Runs bulk course email tasks against a patched mail connection to validate
that code paths actually work.

"""
import json
10 11
from itertools import chain, cycle, repeat
from smtplib import SMTPAuthenticationError, SMTPConnectError, SMTPDataError, SMTPServerDisconnected
12
from uuid import uuid4
13 14

from boto.exception import AWSConnectionError
15
from boto.ses.exceptions import (
16
    SESAddressBlacklistedError,
17
    SESAddressNotVerifiedError,
18
    SESDailyQuotaExceededError,
19
    SESDomainEndsWithDotError,
20 21
    SESDomainNotConfirmedError,
    SESIdentityNotVerifiedError,
22
    SESIllegalAddressError,
23 24
    SESLocalAddressCharacterError,
    SESMaxSendingRateExceededError
25
)
26
from celery.states import FAILURE, SUCCESS  # pylint: disable=no-name-in-module, import-error
27 28
from django.conf import settings
from django.core.management import call_command
29 30
from mock import Mock, patch
from nose.plugins.attrib import attr
31
from opaque_keys.edx.locator import CourseLocator
32

33
from bulk_email.models import SEND_TO_LEARNERS, SEND_TO_MYSELF, SEND_TO_STAFF, CourseEmail, Optout
34
from bulk_email.tasks import _get_course_email_context
35
from lms.djangoapps.instructor_task.models import InstructorTask
36 37
from lms.djangoapps.instructor_task.subtasks import SubtaskStatus, update_subtask_status
from lms.djangoapps.instructor_task.tasks import send_bulk_course_email
38
from lms.djangoapps.instructor_task.tests.factories import InstructorTaskFactory
39 40
from lms.djangoapps.instructor_task.tests.test_base import InstructorTaskCourseTestCase
from xmodule.modulestore.tests.factories import CourseFactory
41 42 43 44 45 46 47


class TestTaskFailure(Exception):
    """Dummy exception type for use in unit tests."""


48 49 50 51 52 53 54 55 56 57 58 59
def my_update_subtask_status(entry_id, current_task_id, new_subtask_status):
    """
    Update a subtask's status unless a later retry has already recorded one.

    In "eager" mode (used by tests) a retry is called and runs to completion
    before control returns to the code that invoked it.  When the retries
    eventually end in failure (e.g. because the maximum number of retries was
    attempted), the eager code returns the error for each retry as it is
    popped off the stack, so the earlier attempts try to write their status
    *after* the later ones already have.  Those late updates are ignored here.

    This should not be an issue in production, where status is updated before
    a task is retried, and is then updated afterwards if the retry fails.

    Arguments:
        entry_id: primary key of the InstructorTask whose `subtasks` JSON is read.
        current_task_id: key of the subtask inside the stored 'status' dict.
        new_subtask_status: status object to store; only its retry count is
            inspected here before delegating to `update_subtask_status`.
    """
    task_entry = InstructorTask.objects.get(pk=entry_id)
    stored_statuses = json.loads(task_entry.subtasks)['status']
    stored_status = SubtaskStatus.from_dict(stored_statuses[current_task_id])

    # A stored retry count above the incoming one means a later retry already
    # wrote its result; drop this stale update.
    if stored_status.get_retry_count() > new_subtask_status.get_retry_count():
        return
    update_subtask_status(entry_id, current_task_id, new_subtask_status)


76
@attr(shard=3)
@patch('bulk_email.models.html_to_text', Mock(return_value='Mocking CourseEmail.text_message', autospec=True))
class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
    """
    Tests instructor task that send bulk email.

    Every test that actually sends mail patches `bulk_email.tasks.get_connection`
    and drives the outcome of each send through the `side_effect` of the mocked
    connection's `send_messages`.
    """

    def setUp(self):
        """Create the course and an instructor, and load the email template fixture."""
        super(TestBulkEmailInstructorTask, self).setUp()
        self.initialize_course()
        self.instructor = self.create_instructor('instructor')

        # load initial content (since we don't run migrations as part of tests):
        call_command("loaddata", "course_email_template.json")

    def _create_input_entry(self, course_id=None):
        """
        Creates a InstructorTask entry for testing.

        Overrides the base class version in that this creates CourseEmail.

        Arguments:
            course_id: course to attach the email and task to; defaults to
                `self.course.id` when falsy.

        Returns the created InstructorTask, whose `task_input` is the JSON
        `{'email_id': <id of the new CourseEmail>}`.
        """
        targets = [SEND_TO_MYSELF, SEND_TO_STAFF, SEND_TO_LEARNERS]
        course_id = course_id or self.course.id
        course_email = CourseEmail.create(
            course_id, self.instructor, targets, "Test Subject", "<p>This is a test message</p>"
        )
        task_input = {'email_id': course_email.id}
        task_id = str(uuid4())
        instructor_task = InstructorTaskFactory.create(
            course_id=course_id,
            requester=self.instructor,
            task_input=json.dumps(task_input),
            task_key='dummy value',
            task_id=task_id,
        )
        return instructor_task

    def _run_task_with_mock_celery(self, task_class, entry_id, task_id):
        """
        Apply `task_class` with `[entry_id, {}]` under `task_id` and return its result.

        Mock was not needed for some tests, testing to see if it's needed at all;
        despite the method name, nothing is mocked here.
        """
        task_args = [entry_id, {}]
        return task_class.apply(task_args, task_id=task_id).get()

    def test_email_missing_current_task(self):
        """Calling the task function directly (not through `apply`) raises ValueError."""
        task_entry = self._create_input_entry()
        with self.assertRaises(ValueError):
            send_bulk_course_email(task_entry.id, {})

    def test_email_undefined_course(self):
        """Check that we fail when passing in a course that doesn't exist."""
        task_entry = self._create_input_entry(course_id=CourseLocator("bogus", "course", "id"))
        with self.assertRaises(ValueError):
            self._run_task_with_mock_celery(send_bulk_course_email, task_entry.id, task_entry.task_id)

    def test_bad_task_id_on_update(self):
        """A status update that carries an unknown subtask id surfaces as ValueError."""
        task_entry = self._create_input_entry()

        def dummy_update_subtask_status(entry_id, _current_task_id, new_subtask_status):
            """Passes a bad value for task_id to test update_subtask_status"""
            bogus_task_id = "this-is-bogus"
            update_subtask_status(entry_id, bogus_task_id, new_subtask_status)

        with self.assertRaises(ValueError):
            with patch('bulk_email.tasks.update_subtask_status', dummy_update_subtask_status):
                send_bulk_course_email(task_entry.id, {})

    def _create_students(self, num_students):
        """Create students for testing; returns them as a list named robot0..robotN-1."""
        # `range` (rather than the Python-2-only `xrange`) keeps this runnable on 2 and 3.
        return [self.create_student('robot%d' % i) for i in range(num_students)]

    def _assert_single_subtask_status(self, entry, succeeded, failed=0, skipped=0, retried_nomax=0, retried_withmax=0):
        """
        Compare counts with 'subtasks' entry in InstructorTask table.

        Asserts that `entry.subtasks` describes exactly one subtask, that the
        subtask-level totals mark it succeeded when `succeeded > 0` and failed
        otherwise, and that the per-subtask counters equal the given values.
        """
        subtask_info = json.loads(entry.subtasks)
        # verify subtask-level counts:
        self.assertEqual(subtask_info.get('total'), 1)
        self.assertEqual(subtask_info.get('succeeded'), 1 if succeeded > 0 else 0)
        self.assertEqual(subtask_info.get('failed'), 0 if succeeded > 0 else 1)
        # verify individual subtask status:
        subtask_status_info = subtask_info.get('status')
        # Materialise the keys: a Python 3 dict view cannot be indexed.
        task_id_list = list(subtask_status_info)
        self.assertEqual(len(task_id_list), 1)
        task_id = task_id_list[0]
        subtask_status = subtask_status_info.get(task_id)
        # Single-argument parenthesised print emits the same text on Python 2 and 3.
        print("Testing subtask status: {}".format(subtask_status))
        self.assertEqual(subtask_status.get('task_id'), task_id)
        self.assertEqual(subtask_status.get('attempted'), succeeded + failed)
        self.assertEqual(subtask_status.get('succeeded'), succeeded)
        self.assertEqual(subtask_status.get('skipped'), skipped)
        self.assertEqual(subtask_status.get('failed'), failed)
        self.assertEqual(subtask_status.get('retried_nomax'), retried_nomax)
        self.assertEqual(subtask_status.get('retried_withmax'), retried_withmax)
        self.assertEqual(subtask_status.get('state'), SUCCESS if succeeded > 0 else FAILURE)

    def _test_run_with_task(
            self, task_class, action_name, total, succeeded,
            failed=0, skipped=0, retried_nomax=0, retried_withmax=0):
        """
        Run a task and check the number of emails processed.

        Verifies the value returned by the task, the `task_output` stored on the
        InstructorTask row, and the single subtask's status.  Returns the
        re-fetched InstructorTask entry.
        """
        task_entry = self._create_input_entry()
        parent_status = self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id)

        # check return value
        self.assertEqual(parent_status.get('total'), total)
        self.assertEqual(parent_status.get('action_name'), action_name)

        # compare with task_output entry in InstructorTask table:
        entry = InstructorTask.objects.get(id=task_entry.id)
        status = json.loads(entry.task_output)
        self.assertEqual(status.get('attempted'), succeeded + failed)
        self.assertEqual(status.get('succeeded'), succeeded)
        self.assertEqual(status.get('skipped'), skipped)
        self.assertEqual(status.get('failed'), failed)
        self.assertEqual(status.get('total'), total)
        self.assertEqual(status.get('action_name'), action_name)
        self.assertGreater(status.get('duration_ms'), 0)
        # The parent task is expected to report SUCCESS even when its subtask failed.
        self.assertEqual(entry.task_state, SUCCESS)
        self._assert_single_subtask_status(entry, succeeded, failed, skipped, retried_nomax, retried_withmax)
        return entry

    def test_successful(self):
        """Every email in a single-subtask batch is sent successfully."""
        # Select number of emails to fit into a single subtask.
        num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            get_conn.return_value.send_messages.side_effect = cycle([None])
            self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails)

    def test_successful_twice(self):
        """Re-submitting an already completed task reports the old result and sends nothing."""
        # Select number of emails to fit into a single subtask.
        num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            get_conn.return_value.send_messages.side_effect = cycle([None])
            task_entry = self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails)

        # submit the same task a second time, and confirm that it is not run again.
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            # Any attempt to send would raise, and would show up as failures below.
            get_conn.return_value.send_messages.side_effect = cycle([Exception("This should not happen!")])
            parent_status = self._run_task_with_mock_celery(send_bulk_course_email, task_entry.id, task_entry.task_id)
        self.assertEqual(parent_status.get('total'), num_emails)
        self.assertEqual(parent_status.get('succeeded'), num_emails)
        self.assertEqual(parent_status.get('failed'), 0)

    def test_unactivated_user(self):
        """A student whose account is inactive is left out of the recipient total."""
        # Select number of emails to fit into a single subtask.
        num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
        # We also send email to the instructor:
        students = self._create_students(num_emails - 1)
        # mark a student as not yet having activated their email:
        student = students[0]
        student.is_active = False
        student.save()
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            get_conn.return_value.send_messages.side_effect = cycle([None])
            self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails - 1, num_emails - 1)

    def test_skipped(self):
        """Students who opted out of course email are counted as skipped."""
        # Select number of emails to fit into a single subtask.
        num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
        # We also send email to the instructor:
        students = self._create_students(num_emails - 1)
        # have every fourth student optout.  Slicing the student list (which is
        # one shorter than num_emails) avoids indexing past its end when
        # num_emails % 4 == 1; for every other value it selects the same students.
        opted_out_students = students[::4]
        expected_skipped = len(opted_out_students)
        expected_succeeds = num_emails - expected_skipped
        # mark some students as opting out
        for student in opted_out_students:
            Optout.objects.create(user=student, course_id=self.course.id)
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            get_conn.return_value.send_messages.side_effect = cycle([None])
            self._test_run_with_task(
                send_bulk_course_email, 'emailed', num_emails, expected_succeeds, skipped=expected_skipped
            )

    def _test_email_address_failures(self, exception):
        """Test that celery handles bad address errors by failing and not retrying."""
        # Select number of emails to fit into a single subtask.
        num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        # The first send of every group of four fails: ceil(num_emails / 4) failures.
        expected_fails = int((num_emails + 3) / 4.0)
        expected_succeeds = num_emails - expected_fails
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            # have every fourth email fail due to some address failure:
            get_conn.return_value.send_messages.side_effect = cycle([exception, None, None, None])
            self._test_run_with_task(
                send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails
            )

    def test_smtp_blacklisted_user(self):
        # Test that celery handles permanent SMTPDataErrors by failing and not retrying.
        self._test_email_address_failures(SMTPDataError(554, "Email address is blacklisted"))

    def test_ses_blacklisted_user(self):
        # Test that celery handles SES blacklisted-address errors by failing and not retrying.
        self._test_email_address_failures(SESAddressBlacklistedError(554, "Email address is blacklisted"))

    def test_ses_illegal_address(self):
        # Test that celery handles SES illegal-address errors by failing and not retrying.
        self._test_email_address_failures(SESIllegalAddressError(554, "Email address is illegal"))

    def test_ses_local_address_character_error(self):
        # Test that celery handles SES local-address character errors by failing and not retrying.
        self._test_email_address_failures(SESLocalAddressCharacterError(554, "Email address contains a bad character"))

    def test_ses_domain_ends_with_dot(self):
        # Test that celery handles SES domain-ends-with-dot errors by failing and not retrying.
        self._test_email_address_failures(SESDomainEndsWithDotError(554, "Email address ends with a dot"))

    def _test_retry_after_limited_retry_error(self, exception):
        """Test that celery handles connection failures by retrying."""
        # If we want the batch to succeed, we need to send fewer emails
        # than the max retries, so that the max is not triggered.
        num_emails = settings.BULK_EMAIL_MAX_RETRIES
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        expected_fails = 0
        expected_succeeds = num_emails
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            # Have every other mail attempt fail due to disconnection.
            get_conn.return_value.send_messages.side_effect = cycle([exception, None])
            self._test_run_with_task(
                send_bulk_course_email,
                'emailed',
                num_emails,
                expected_succeeds,
                failed=expected_fails,
                retried_withmax=num_emails
            )

    def _test_max_retry_limit_causes_failure(self, exception):
        """Test that celery can hit a maximum number of retries."""
        # Doesn't really matter how many recipients, since we expect
        # to fail on the first.
        num_emails = 10
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        expected_fails = num_emails
        expected_succeeds = 0
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            # always fail to connect, triggering repeated retries until limit is hit:
            get_conn.return_value.send_messages.side_effect = cycle([exception])
            # Swap in the retry-count-aware updater so stale updates from
            # earlier (eagerly unwound) retries are ignored.
            with patch('bulk_email.tasks.update_subtask_status', my_update_subtask_status):
                self._test_run_with_task(
                    send_bulk_course_email,
                    'emailed',
                    num_emails,
                    expected_succeeds,
                    failed=expected_fails,
                    retried_withmax=(settings.BULK_EMAIL_MAX_RETRIES + 1)
                )

    def test_retry_after_smtp_disconnect(self):
        self._test_retry_after_limited_retry_error(SMTPServerDisconnected(425, "Disconnecting"))

    def test_max_retry_after_smtp_disconnect(self):
        self._test_max_retry_limit_causes_failure(SMTPServerDisconnected(425, "Disconnecting"))

    def test_retry_after_smtp_connect_error(self):
        self._test_retry_after_limited_retry_error(SMTPConnectError(424, "Bad Connection"))

    def test_max_retry_after_smtp_connect_error(self):
        self._test_max_retry_limit_causes_failure(SMTPConnectError(424, "Bad Connection"))

    def test_retry_after_aws_connect_error(self):
        self._test_retry_after_limited_retry_error(
            AWSConnectionError("Unable to provide secure connection through proxy")
        )

    def test_max_retry_after_aws_connect_error(self):
        self._test_max_retry_limit_causes_failure(
            AWSConnectionError("Unable to provide secure connection through proxy")
        )

    def test_retry_after_general_error(self):
        self._test_retry_after_limited_retry_error(Exception("This is some random exception."))

    def test_max_retry_after_general_error(self):
        self._test_max_retry_limit_causes_failure(Exception("This is some random exception."))

    def _test_retry_after_unlimited_retry_error(self, exception):
        """Test that celery handles throttling failures by retrying."""
        num_emails = 8
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        expected_fails = 0
        expected_succeeds = num_emails
        # Note that because celery in eager mode will call retries synchronously,
        # each retry will increase the stack depth.  It turns out that there is a
        # maximum depth at which a RuntimeError is raised ("maximum recursion depth
        # exceeded").  The maximum recursion depth is 90, so
        # num_emails * expected_retries < 90.
        expected_retries = 10
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            # Cycle through N throttling errors followed by a success.
            get_conn.return_value.send_messages.side_effect = cycle(
                chain(repeat(exception, expected_retries), [None])
            )
            self._test_run_with_task(
                send_bulk_course_email,
                'emailed',
                num_emails,
                expected_succeeds,
                failed=expected_fails,
                retried_nomax=(expected_retries * num_emails)
            )

    def test_retry_after_smtp_throttling_error(self):
        self._test_retry_after_unlimited_retry_error(SMTPDataError(455, "Throttling: Sending rate exceeded"))

    def test_retry_after_ses_throttling_error(self):
        self._test_retry_after_unlimited_retry_error(
            SESMaxSendingRateExceededError(455, "Throttling: Sending rate exceeded")
        )

    def _test_immediate_failure(self, exception):
        """Test that the whole batch fails, with no retries recorded, on a non-retryable error."""
        # Doesn't really matter how many recipients, since we expect
        # to fail on the first.
        num_emails = 10
        # We also send email to the instructor:
        self._create_students(num_emails - 1)
        expected_fails = num_emails
        expected_succeeds = 0
        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            # always raise on send; both retry counters are asserted to stay at 0:
            get_conn.return_value.send_messages.side_effect = cycle([exception])
            self._test_run_with_task(
                send_bulk_course_email,
                'emailed',
                num_emails,
                expected_succeeds,
                failed=expected_fails,
            )

    def test_failure_on_unhandled_smtp(self):
        self._test_immediate_failure(SMTPAuthenticationError(403, "That password doesn't work!"))

    def test_failure_on_ses_quota_exceeded(self):
        self._test_immediate_failure(SESDailyQuotaExceededError(403, "You're done for the day!"))

    def test_failure_on_ses_address_not_verified(self):
        self._test_immediate_failure(SESAddressNotVerifiedError(403, "Who *are* you?"))

    def test_failure_on_ses_identity_not_verified(self):
        self._test_immediate_failure(SESIdentityNotVerifiedError(403, "May I please see an ID!"))

    def test_failure_on_ses_domain_not_confirmed(self):
        self._test_immediate_failure(SESDomainNotConfirmedError(403, "You're out of bounds!"))

    def test_bulk_emails_with_unicode_course_image_name(self):
        # Test bulk email with unicode characters in course image name
        course_image = u'在淡水測試.jpg'
        self.course = CourseFactory.create(course_image=course_image)

        num_emails = 2
        # We also send email to the instructor:
        self._create_students(num_emails - 1)

        with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
            get_conn.return_value.send_messages.side_effect = cycle([None])
            self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails)

    def test_get_course_email_context_has_correct_keys(self):
        """The email template context for a course exposes every expected key."""
        result = _get_course_email_context(self.course)
        self.assertIn('course_title', result)
        self.assertIn('course_root', result)
        self.assertIn('course_language', result)
        self.assertIn('course_url', result)
        self.assertIn('course_image_url', result)
        self.assertIn('course_end_date', result)
        self.assertIn('account_settings_url', result)
        self.assertIn('email_settings_url', result)
        self.assertIn('platform_name', result)