# -*- coding: utf-8 -*- """ Unit tests for LMS instructor-initiated background tasks helper functions. - Tests that CSV grade report generation works with unicode emails. - Tests all of the existing reports. """ import os import shutil from datetime import datetime import urllib import ddt from freezegun import freeze_time from mock import Mock, patch from nose.plugins.attrib import attr import tempfile import unicodecsv from django.core.urlresolvers import reverse from django.test.utils import override_settings from capa.tests.response_xml_factory import MultipleChoiceResponseXMLFactory from certificates.models import CertificateStatuses, GeneratedCertificate from certificates.tests.factories import GeneratedCertificateFactory, CertificateWhitelistFactory from course_modes.models import CourseMode from courseware.tests.factories import InstructorFactory from lms.djangoapps.instructor_task.tests.test_base import ( InstructorTaskCourseTestCase, TestReportMixin, InstructorTaskModuleTestCase ) from openedx.core.djangoapps.course_groups.models import CourseUserGroupPartitionGroup, CohortMembership from django.conf import settings from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase from pytz import UTC from student.tests.factories import CourseEnrollmentFactory, UserFactory from openedx.core.djangoapps.course_groups.tests.helpers import CohortFactory import openedx.core.djangoapps.user_api.course_tag.api as course_tag_api from openedx.core.djangoapps.user_api.partition_schemes import RandomUserPartitionScheme from shoppingcart.models import Order, PaidCourseRegistration, CourseRegistrationCode, Invoice, \ CourseRegistrationCodeInvoiceItem, InvoiceTransaction, Coupon from student.tests.factories import UserFactory, CourseModeFactory from student.models import CourseEnrollment, CourseEnrollmentAllowed, ManualEnrollmentAudit, ALLOWEDTOENROLL_TO_ENROLLED from lms.djangoapps.verify_student.tests.factories import SoftwareSecurePhotoVerificationFactory from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory from xmodule.partitions.partitions import Group, UserPartition from lms.djangoapps.instructor_task.models import ReportStore from survey.models import SurveyForm, SurveyAnswer from lms.djangoapps.instructor_task.tasks_helper import ( cohort_students_and_upload, upload_problem_responses_csv, upload_grades_csv, upload_problem_grade_report, upload_students_csv, upload_may_enroll_csv, upload_enrollment_report, upload_exec_summary_report, upload_course_survey_report, generate_students_certificates, upload_ora2_data, UPDATE_STATUS_FAILED, UPDATE_STATUS_SUCCEEDED, ) from instructor_analytics.basic import UNAVAILABLE from openedx.core.djangoapps.util.testing import ContentGroupTestCase, TestConditionalContent from teams.tests.factories import CourseTeamFactory, CourseTeamMembershipFactory class InstructorGradeReportTestCase(TestReportMixin, InstructorTaskCourseTestCase): """ Base class for grade report tests. """ def _verify_cell_data_for_user(self, username, course_id, column_header, expected_cell_content): """ Verify cell data in the grades CSV for a particular user. """ with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_grades_csv(None, None, course_id, None, 'graded') self.assertDictContainsSubset({'attempted': 2, 'succeeded': 2, 'failed': 0}, result) report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD') report_csv_filename = report_store.links_for(course_id)[0][0] report_path = report_store.path_to(course_id, report_csv_filename) with report_store.storage.open(report_path) as csv_file: for row in unicodecsv.DictReader(csv_file): if row.get('username') == username: self.assertEqual(row[column_header], expected_cell_content) @ddt.ddt class TestInstructorGradeReport(InstructorGradeReportTestCase): """ Tests that CSV grade report generation works. """ def setUp(self): super(TestInstructorGradeReport, self).setUp() self.course = CourseFactory.create() @ddt.data([u'student@example.com', u'ni\xf1o@example.com']) def test_unicode_emails(self, emails): """ Test that students with unicode characters in emails is handled. """ for i, email in enumerate(emails): self.create_student('student{0}'.format(i), email) self.current_task = Mock() self.current_task.update_state = Mock() with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task') as mock_current_task: mock_current_task.return_value = self.current_task result = upload_grades_csv(None, None, self.course.id, None, 'graded') num_students = len(emails) self.assertDictContainsSubset({'attempted': num_students, 'succeeded': num_students, 'failed': 0}, result) @patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task') @patch('lms.djangoapps.instructor_task.tasks_helper.iterate_grades_for') def test_grading_failure(self, mock_iterate_grades_for, _mock_current_task): """ Test that any grading errors are properly reported in the progress dict and uploaded to the report store. """ # mock an error response from `iterate_grades_for` mock_iterate_grades_for.return_value = [ (self.create_student('username', 'student@example.com'), {}, 'Cannot grade student') ] result = upload_grades_csv(None, None, self.course.id, None, 'graded') self.assertDictContainsSubset({'attempted': 1, 'succeeded': 0, 'failed': 1}, result) report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD') self.assertTrue(any('grade_report_err' in item[0] for item in report_store.links_for(self.course.id))) def test_cohort_data_in_grading(self): """ Test that cohort data is included in grades csv if cohort configuration is enabled for course. """ cohort_groups = ['cohort 1', 'cohort 2'] course = CourseFactory.create(cohort_config={'cohorted': True, 'auto_cohort': True, 'auto_cohort_groups': cohort_groups}) user_1 = 'user_1' user_2 = 'user_2' CourseEnrollment.enroll(UserFactory.create(username=user_1), course.id) CourseEnrollment.enroll(UserFactory.create(username=user_2), course.id) # In auto cohorting a group will be assigned to a user only when user visits a problem # In grading calculation we only add a group in csv if group is already assigned to # user rather than creating a group automatically at runtime self._verify_cell_data_for_user(user_1, course.id, 'Cohort Name', '') self._verify_cell_data_for_user(user_2, course.id, 'Cohort Name', '') def test_unicode_cohort_data_in_grading(self): """ Test that cohorts can contain unicode characters. """ course = CourseFactory.create(cohort_config={'cohorted': True}) # Create users and manually assign cohorts user1 = UserFactory.create(username='user1') user2 = UserFactory.create(username='user2') CourseEnrollment.enroll(user1, course.id) CourseEnrollment.enroll(user2, course.id) professor_x = u'ÞrÖfessÖr X' magneto = u'MàgnëtÖ' cohort1 = CohortFactory(course_id=course.id, name=professor_x) cohort2 = CohortFactory(course_id=course.id, name=magneto) membership1 = CohortMembership(course_user_group=cohort1, user=user1) membership1.save() membership2 = CohortMembership(course_user_group=cohort2, user=user2) membership2.save() self._verify_cell_data_for_user(user1.username, course.id, 'Cohort Name', professor_x) self._verify_cell_data_for_user(user2.username, course.id, 'Cohort Name', magneto) def test_unicode_user_partitions(self): """ Test that user partition groups can contain unicode characters. """ user_groups = [u'ÞrÖfessÖr X', u'MàgnëtÖ'] user_partition = UserPartition( 0, 'x_man', 'X Man', [ Group(0, user_groups[0]), Group(1, user_groups[1]) ] ) # Create course with group configurations self.initialize_course( course_factory_kwargs={ 'user_partitions': [user_partition] } ) _groups = [group.name for group in self.course.user_partitions[0].groups] self.assertEqual(_groups, user_groups) def test_cohort_scheme_partition(self): """ Test that cohort-schemed user partitions are ignored in the grades export. """ # Set up a course with 'cohort' and 'random' user partitions. cohort_scheme_partition = UserPartition( 0, 'Cohort-schemed Group Configuration', 'Group Configuration based on Cohorts', [Group(0, 'Group A'), Group(1, 'Group B')], scheme_id='cohort' ) experiment_group_a = Group(2, u'Expériment Group A') experiment_group_b = Group(3, u'Expériment Group B') experiment_partition = UserPartition( 1, u'Content Expériment Configuration', u'Group Configuration for Content Expériments', [experiment_group_a, experiment_group_b], scheme_id='random' ) course = CourseFactory.create( cohort_config={'cohorted': True}, user_partitions=[cohort_scheme_partition, experiment_partition] ) # Create user_a and user_b which are enrolled in the course # and assigned to experiment_group_a and experiment_group_b, # respectively. user_a = UserFactory.create(username='user_a') user_b = UserFactory.create(username='user_b') CourseEnrollment.enroll(user_a, course.id) CourseEnrollment.enroll(user_b, course.id) course_tag_api.set_course_tag( user_a, course.id, RandomUserPartitionScheme.key_for_partition(experiment_partition), experiment_group_a.id ) course_tag_api.set_course_tag( user_b, course.id, RandomUserPartitionScheme.key_for_partition(experiment_partition), experiment_group_b.id ) # Assign user_a to a group in the 'cohort'-schemed user # partition (by way of a cohort) to verify that the user # partition group does not show up in the "Experiment Group" # cell. cohort_a = CohortFactory.create(course_id=course.id, name=u'Cohørt A', users=[user_a]) CourseUserGroupPartitionGroup( course_user_group=cohort_a, partition_id=cohort_scheme_partition.id, group_id=cohort_scheme_partition.groups[0].id ).save() # Verify that we see user_a and user_b in their respective # content experiment groups, and that we do not see any # content groups. experiment_group_message = u'Experiment Group ({content_experiment})' self._verify_cell_data_for_user( user_a.username, course.id, experiment_group_message.format( content_experiment=experiment_partition.name ), experiment_group_a.name ) self._verify_cell_data_for_user( user_b.username, course.id, experiment_group_message.format( content_experiment=experiment_partition.name ), experiment_group_b.name ) # Make sure cohort info is correct. cohort_name_header = 'Cohort Name' self._verify_cell_data_for_user( user_a.username, course.id, cohort_name_header, cohort_a.name ) self._verify_cell_data_for_user( user_b.username, course.id, cohort_name_header, u'Default Group', ) @patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task') @patch('lms.djangoapps.instructor_task.tasks_helper.iterate_grades_for') def test_unicode_in_csv_header(self, mock_iterate_grades_for, _mock_current_task): """ Tests that CSV grade report works if unicode in headers. """ # mock a response from `iterate_grades_for` mock_iterate_grades_for.return_value = [ ( self.create_student('username', 'student@example.com'), {'section_breakdown': [{'label': u'\u8282\u540e\u9898 01'}], 'percent': 0, 'grade': None}, 'Cannot grade student' ) ] result = upload_grades_csv(None, None, self.course.id, None, 'graded') self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result) class TestTeamGradeReport(InstructorGradeReportTestCase): """ Test that teams appear correctly in the grade report when it is enabled for the course. """ def setUp(self): super(TestTeamGradeReport, self).setUp() self.course = CourseFactory.create(teams_configuration={ 'max_size': 2, 'topics': [{'topic-id': 'topic', 'name': 'Topic', 'description': 'A Topic'}] }) self.student1 = UserFactory.create() CourseEnrollment.enroll(self.student1, self.course.id) self.student2 = UserFactory.create() CourseEnrollment.enroll(self.student2, self.course.id) def test_team_in_grade_report(self): self._verify_cell_data_for_user(self.student1.username, self.course.id, 'Team Name', '') def test_correct_team_name_in_grade_report(self): team1 = CourseTeamFactory.create(course_id=self.course.id) CourseTeamMembershipFactory.create(team=team1, user=self.student1) team2 = CourseTeamFactory.create(course_id=self.course.id) CourseTeamMembershipFactory.create(team=team2, user=self.student2) self._verify_cell_data_for_user(self.student1.username, self.course.id, 'Team Name', team1.name) self._verify_cell_data_for_user(self.student2.username, self.course.id, 'Team Name', team2.name) def test_team_deleted(self): team1 = CourseTeamFactory.create(course_id=self.course.id) membership1 = CourseTeamMembershipFactory.create(team=team1, user=self.student1) team2 = CourseTeamFactory.create(course_id=self.course.id) CourseTeamMembershipFactory.create(team=team2, user=self.student2) team1.delete() membership1.delete() self._verify_cell_data_for_user(self.student1.username, self.course.id, 'Team Name', '') self._verify_cell_data_for_user(self.student2.username, self.course.id, 'Team Name', team2.name) class TestProblemResponsesReport(TestReportMixin, InstructorTaskCourseTestCase): """ Tests that generation of CSV files listing student answers to a given problem works. """ def setUp(self): super(TestProblemResponsesReport, self).setUp() self.course = CourseFactory.create() def test_success(self): task_input = {'problem_location': ''} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): with patch('lms.djangoapps.instructor_task.tasks_helper.list_problem_responses') as patched_data_source: patched_data_source.return_value = [ {'username': 'user0', 'state': u'state0'}, {'username': 'user1', 'state': u'state1'}, {'username': 'user2', 'state': u'state2'}, ] result = upload_problem_responses_csv(None, None, self.course.id, task_input, 'calculated') report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD') links = report_store.links_for(self.course.id) self.assertEquals(len(links), 1) self.assertDictContainsSubset({'attempted': 3, 'succeeded': 3, 'failed': 0}, result) @ddt.ddt @patch.dict('django.conf.settings.FEATURES', {'ENABLE_PAID_COURSE_REGISTRATION': True}) class TestInstructorDetailedEnrollmentReport(TestReportMixin, InstructorTaskCourseTestCase): """ Tests that CSV detailed enrollment generation works. """ def setUp(self): super(TestInstructorDetailedEnrollmentReport, self).setUp() self.course = CourseFactory.create() CourseModeFactory.create( course_id=self.course.id, min_price=50, mode_slug=CourseMode.DEFAULT_SHOPPINGCART_MODE_SLUG ) # create testing invoice 1 self.instructor = InstructorFactory(course_key=self.course.id) self.sale_invoice_1 = Invoice.objects.create( total_amount=1234.32, company_name='Test1', company_contact_name='TestName', company_contact_email='Test@company.com', recipient_name='Testw', recipient_email='test1@test.com', customer_reference_number='2Fwe23S', internal_reference="A", course_id=self.course.id, is_valid=True ) self.invoice_item = CourseRegistrationCodeInvoiceItem.objects.create( invoice=self.sale_invoice_1, qty=1, unit_price=1234.32, course_id=self.course.id ) def test_success(self): self.create_student('student', 'student@example.com') task_input = {'features': []} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_enrollment_report(None, None, self.course.id, task_input, 'generating_enrollment_report') self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result) def test_student_paid_course_enrollment_report(self): """ test to check the paid user enrollment csv report status and enrollment source. """ student = UserFactory() student_cart = Order.get_cart_for_user(student) PaidCourseRegistration.add_to_order(student_cart, self.course.id) student_cart.purchase() task_input = {'features': []} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_enrollment_report(None, None, self.course.id, task_input, 'generating_enrollment_report') self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result) self._verify_cell_data_in_csv(student.username, 'Enrollment Source', 'Credit Card - Individual') self._verify_cell_data_in_csv(student.username, 'Payment Status', 'purchased') def test_student_manually_enrolled_in_detailed_enrollment_source(self): """ test to check the manually enrolled user enrollment report status and enrollment source. """ student = UserFactory() enrollment = CourseEnrollment.enroll(student, self.course.id) ManualEnrollmentAudit.create_manual_enrollment_audit( self.instructor, student.email, ALLOWEDTOENROLL_TO_ENROLLED, 'manually enrolling unenrolled user', enrollment ) task_input = {'features': []} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_enrollment_report(None, None, self.course.id, task_input, 'generating_enrollment_report') enrollment_source = u'manually enrolled by username: {username}'.format( username=self.instructor.username) self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result) self._verify_cell_data_in_csv(student.username, 'Enrollment Source', enrollment_source) self._verify_cell_data_in_csv( student.username, 'Manual (Un)Enrollment Reason', 'manually enrolling unenrolled user' ) self._verify_cell_data_in_csv(student.username, 'Payment Status', 'TBD') def test_student_used_enrollment_code_for_course_enrollment(self): """ test to check the user enrollment source and payment status in the enrollment detailed report """ student = UserFactory() self.client.login(username=student.username, password='test') student_cart = Order.get_cart_for_user(student) paid_course_reg_item = PaidCourseRegistration.add_to_order(student_cart, self.course.id) # update the quantity of the cart item paid_course_reg_item resp = self.client.post(reverse('shoppingcart.views.update_user_cart'), {'ItemId': paid_course_reg_item.id, 'qty': '4'}) self.assertEqual(resp.status_code, 200) student_cart.purchase() course_reg_codes = CourseRegistrationCode.objects.filter(order=student_cart) redeem_url = reverse('register_code_redemption', args=[course_reg_codes[0].code]) response = self.client.get(redeem_url) self.assertEquals(response.status_code, 200) # check button text self.assertIn('Activate Course Enrollment', response.content) response = self.client.post(redeem_url) self.assertEquals(response.status_code, 200) task_input = {'features': []} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_enrollment_report(None, None, self.course.id, task_input, 'generating_enrollment_report') self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result) self._verify_cell_data_in_csv(student.username, 'Enrollment Source', 'Used Registration Code') self._verify_cell_data_in_csv(student.username, 'Payment Status', 'purchased') def test_student_used_invoice_unpaid_enrollment_code_for_course_enrollment(self): """ test to check the user enrollment source and payment status in the enrollment detailed report """ student = UserFactory() self.client.login(username=student.username, password='test') course_registration_code = CourseRegistrationCode( code='abcde', course_id=self.course.id.to_deprecated_string(), created_by=self.instructor, invoice=self.sale_invoice_1, invoice_item=self.invoice_item, mode_slug=CourseMode.DEFAULT_SHOPPINGCART_MODE_SLUG ) course_registration_code.save() redeem_url = reverse('register_code_redemption', args=['abcde']) response = self.client.get(redeem_url) self.assertEquals(response.status_code, 200) # check button text self.assertIn('Activate Course Enrollment', response.content) response = self.client.post(redeem_url) self.assertEquals(response.status_code, 200) task_input = {'features': []} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_enrollment_report(None, None, self.course.id, task_input, 'generating_enrollment_report') self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result) self._verify_cell_data_in_csv(student.username, 'Enrollment Source', 'Used Registration Code') self._verify_cell_data_in_csv(student.username, 'Payment Status', 'Invoice Outstanding') def test_student_used_invoice_paid_enrollment_code_for_course_enrollment(self): """ test to check the user enrollment source and payment status in the enrollment detailed report """ student = UserFactory() self.client.login(username=student.username, password='test') invoice_transaction = InvoiceTransaction( invoice=self.sale_invoice_1, amount=self.sale_invoice_1.total_amount, status='completed', created_by=self.instructor, last_modified_by=self.instructor ) invoice_transaction.save() course_registration_code = CourseRegistrationCode( code='abcde', course_id=self.course.id.to_deprecated_string(), created_by=self.instructor, invoice=self.sale_invoice_1, invoice_item=self.invoice_item, mode_slug=CourseMode.DEFAULT_SHOPPINGCART_MODE_SLUG ) course_registration_code.save() redeem_url = reverse('register_code_redemption', args=['abcde']) response = self.client.get(redeem_url) self.assertEquals(response.status_code, 200) # check button text self.assertIn('Activate Course Enrollment', response.content) response = self.client.post(redeem_url) self.assertEquals(response.status_code, 200) task_input = {'features': []} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_enrollment_report(None, None, self.course.id, task_input, 'generating_enrollment_report') self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result) self._verify_cell_data_in_csv(student.username, 'Enrollment Source', 'Used Registration Code') self._verify_cell_data_in_csv(student.username, 'Payment Status', 'Invoice Paid') def _verify_cell_data_in_csv(self, username, column_header, expected_cell_content): """ Verify that the last ReportStore CSV contains the expected content. """ report_store = ReportStore.from_config(config_name='FINANCIAL_REPORTS') report_csv_filename = report_store.links_for(self.course.id)[0][0] report_path = report_store.path_to(self.course.id, report_csv_filename) with report_store.storage.open(report_path) as csv_file: # Expand the dict reader generator so we don't lose it's content for row in unicodecsv.DictReader(csv_file): if row.get('Username') == username: self.assertEqual(row[column_header], expected_cell_content) @ddt.ddt class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): """ Test that the problem CSV generation works. """ def setUp(self): super(TestProblemGradeReport, self).setUp() self.initialize_course() # Add unicode data to CSV even though unicode usernames aren't # technically possible in openedx. self.student_1 = self.create_student(u'üser_1') self.student_2 = self.create_student(u'üser_2') self.csv_header_row = [u'Student ID', u'Email', u'Username', u'Final Grade'] @patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task') def test_no_problems(self, _get_current_task): """ Verify that we see no grade information for a course with no graded problems. """ result = upload_problem_grade_report(None, None, self.course.id, None, 'graded') self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) self.verify_rows_in_csv([ dict(zip( self.csv_header_row, [unicode(self.student_1.id), self.student_1.email, self.student_1.username, '0.0'] )), dict(zip( self.csv_header_row, [unicode(self.student_2.id), self.student_2.email, self.student_2.username, '0.0'] )) ]) @patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task') def test_single_problem(self, _get_current_task): vertical = ItemFactory.create( parent_location=self.problem_section.location, category='vertical', metadata={'graded': True}, display_name='Problem Vertical' ) self.define_option_problem(u'Problem1', parent=vertical) self.submit_student_answer(self.student_1.username, u'Problem1', ['Option 1']) result = upload_problem_grade_report(None, None, self.course.id, None, 'graded') self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) problem_name = u'Homework 1: Problem - Problem1' header_row = self.csv_header_row + [problem_name + ' (Earned)', problem_name + ' (Possible)'] self.verify_rows_in_csv([ dict(zip( header_row, [ unicode(self.student_1.id), self.student_1.email, self.student_1.username, '0.01', '1.0', '2.0'] )), dict(zip( header_row, [ unicode(self.student_2.id), self.student_2.email, self.student_2.username, '0.0', '0.0', '2' ] )) ]) @patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task') @patch('lms.djangoapps.instructor_task.tasks_helper.iterate_grades_for') @ddt.data(u'Cannot grade student', '') def test_grading_failure(self, error_message, mock_iterate_grades_for, _mock_current_task): """ Test that any grading errors are properly reported in the progress dict and uploaded to the report store. """ # mock an error response from `iterate_grades_for` student = self.create_student(u'username', u'student@example.com') mock_iterate_grades_for.return_value = [ (student, {}, error_message) ] result = upload_problem_grade_report(None, None, self.course.id, None, 'graded') self.assertDictContainsSubset({'attempted': 1, 'succeeded': 0, 'failed': 1}, result) report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD') self.assertTrue(any('grade_report_err' in item[0] for item in report_store.links_for(self.course.id))) self.verify_rows_in_csv([ { u'Student ID': unicode(student.id), u'Email': student.email, u'Username': student.username, u'error_msg': error_message if error_message else "Unknown error" } ]) @attr(shard=3) class TestProblemReportSplitTestContent(TestReportMixin, TestConditionalContent, InstructorTaskModuleTestCase): """ Test the problem report on a course that has split tests. """ OPTION_1 = 'Option 1' OPTION_2 = 'Option 2' def setUp(self): super(TestProblemReportSplitTestContent, self).setUp() self.problem_a_url = u'problem_a_url' self.problem_b_url = u'problem_b_url' self.define_option_problem(self.problem_a_url, parent=self.vertical_a) self.define_option_problem(self.problem_b_url, parent=self.vertical_b) def test_problem_grade_report(self): """ Test that we generate the correct grade report when dealing with A/B tests. In order to verify that the behavior of the grade report is correct, we submit answers for problems that the student won't have access to. A/B tests won't restrict access to the problems, but it should not show up in that student's course tree when generating the grade report, hence the N/A's in the grade report. """ # student A will get 100%, student B will get 50% because # OPTION_1 is the correct option, and OPTION_2 is the # incorrect option self.submit_student_answer(self.student_a.username, self.problem_a_url, [self.OPTION_1, self.OPTION_1]) self.submit_student_answer(self.student_a.username, self.problem_b_url, [self.OPTION_1, self.OPTION_1]) self.submit_student_answer(self.student_b.username, self.problem_a_url, [self.OPTION_1, self.OPTION_2]) self.submit_student_answer(self.student_b.username, self.problem_b_url, [self.OPTION_1, self.OPTION_2]) with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_problem_grade_report(None, None, self.course.id, None, 'graded') self.assertDictContainsSubset( {'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result ) problem_names = [u'Homework 1: Problem - problem_a_url', u'Homework 1: Problem - problem_b_url'] header_row = [u'Student ID', u'Email', u'Username', u'Final Grade'] for problem in problem_names: header_row += [problem + ' (Earned)', problem + ' (Possible)'] self.verify_rows_in_csv([ dict(zip( header_row, [ unicode(self.student_a.id), self.student_a.email, self.student_a.username, u'1.0', u'2.0', u'2.0', u'N/A', u'N/A' ] )), dict(zip( header_row, [ unicode(self.student_b.id), self.student_b.email, self.student_b.username, u'0.5', u'N/A', u'N/A', u'1.0', u'2.0' ] )) ]) def test_problem_grade_report_valid_columns_order(self): """ Test that in the CSV grade report columns are placed in the proper order """ grader_num = 7 self.course = CourseFactory.create( grading_policy={ "GRADER": [{ "type": "Homework %d" % i, "min_count": 1, "drop_count": 0, "short_label": "HW %d" % i, "weight": 1.0 } for i in xrange(1, grader_num)] } ) # Create users self.student_a = UserFactory.create(username='student_a', email='student_a@example.com') CourseEnrollmentFactory.create(user=self.student_a, course_id=self.course.id) self.student_b = UserFactory.create(username='student_b', email='student_b@example.com') CourseEnrollmentFactory.create(user=self.student_b, course_id=self.course.id) problem_vertical_list = [] for i in xrange(1, grader_num): chapter_name = 'Chapter %d' % i problem_section_name = 'Problem section %d' % i problem_section_format = 'Homework %d' % i problem_vertical_name = 'Problem Unit %d' % i chapter = ItemFactory.create(parent_location=self.course.location, display_name=chapter_name) # Add a sequence to the course to which the problems can be added problem_section = ItemFactory.create(parent_location=chapter.location, category='sequential', metadata={'graded': True, 'format': problem_section_format}, display_name=problem_section_name) # Create a vertical problem_vertical = ItemFactory.create( parent_location=problem_section.location, category='vertical', display_name=problem_vertical_name ) problem_vertical_list.append(problem_vertical) problem_names = [] for i in xrange(1, grader_num): problem_url = 'test_problem_%d' % i self.define_option_problem(problem_url, parent=problem_vertical_list[i - 1]) title = 'Homework %d 1: Problem section %d - %s' % (i, i, problem_url) problem_names.append(title) header_row = [u'Student ID', u'Email', u'Username', u'Final Grade'] for problem in problem_names: header_row += [problem + ' (Earned)', problem + ' (Possible)'] with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): upload_problem_grade_report(None, None, self.course.id, None, 'graded') self.assertEquals(self.get_csv_row_with_headers(), header_row) class TestProblemReportCohortedContent(TestReportMixin, ContentGroupTestCase, InstructorTaskModuleTestCase): """ Test the problem report on a course that has cohorted content. """ def setUp(self): super(TestProblemReportCohortedContent, self).setUp() # construct cohorted problems to work on. self.add_course_content() vertical = ItemFactory.create( parent_location=self.problem_section.location, category='vertical', metadata={'graded': True}, display_name='Problem Vertical' ) self.define_option_problem( u"Problem0", parent=vertical, group_access={self.course.user_partitions[0].id: [self.course.user_partitions[0].groups[0].id]} ) self.define_option_problem( u"Problem1", parent=vertical, group_access={self.course.user_partitions[0].id: [self.course.user_partitions[0].groups[1].id]} ) def _format_user_grade(self, header_row, user, grade): """ Helper method that format the user grade Args: header_row(list): header row of csv containing Student ID, Email, Username etc user(object): Django user object grade(list): Users' grade list """ return dict(zip( header_row, [ unicode(user.id), user.email, user.username, ] + grade )) def test_cohort_content(self): self.submit_student_answer(self.alpha_user.username, u'Problem0', ['Option 1', 'Option 1']) resp = self.submit_student_answer(self.alpha_user.username, u'Problem1', ['Option 1', 'Option 1']) self.assertEqual(resp.status_code, 404) resp = self.submit_student_answer(self.beta_user.username, u'Problem0', ['Option 1', 'Option 2']) self.assertEqual(resp.status_code, 404) self.submit_student_answer(self.beta_user.username, u'Problem1', ['Option 1', 'Option 2']) with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_problem_grade_report(None, None, self.course.id, None, 'graded') self.assertDictContainsSubset( {'action_name': 'graded', 'attempted': 4, 'succeeded': 4, 'failed': 0}, result ) problem_names = [u'Homework 1: Problem - Problem0', u'Homework 1: Problem - Problem1'] header_row = [u'Student ID', u'Email', u'Username', u'Final Grade'] for problem in problem_names: header_row += [problem + ' (Earned)', problem + ' (Possible)'] user_grades = [ {'user': self.staff_user, 'grade': [u'0.0', u'N/A', u'N/A', u'N/A', u'N/A']}, {'user': self.alpha_user, 'grade': [u'1.0', u'2.0', u'2.0', u'N/A', u'N/A']}, {'user': self.beta_user, 'grade': [u'0.5', u'N/A', u'N/A', u'1.0', u'2.0']}, {'user': self.non_cohorted_user, 'grade': [u'0.0', u'N/A', u'N/A', u'N/A', u'N/A']}, ] # Verify generated grades and expected grades match expected_grades = [self._format_user_grade(header_row, **user_grade) for user_grade in user_grades] self.verify_rows_in_csv(expected_grades) @ddt.ddt class TestExecutiveSummaryReport(TestReportMixin, InstructorTaskCourseTestCase): """ Tests that Executive Summary report generation works. """ def setUp(self): super(TestExecutiveSummaryReport, self).setUp() self.course = CourseFactory.create() CourseModeFactory.create( course_id=self.course.id, min_price=50, mode_slug=CourseMode.DEFAULT_SHOPPINGCART_MODE_SLUG ) self.instructor = InstructorFactory(course_key=self.course.id) self.student1 = UserFactory() self.student2 = UserFactory() self.student1_cart = Order.get_cart_for_user(self.student1) self.student2_cart = Order.get_cart_for_user(self.student2) self.sale_invoice_1 = Invoice.objects.create( total_amount=1234.32, company_name='Test1', company_contact_name='TestName', company_contact_email='Test@company.com', recipient_name='Testw', recipient_email='test1@test.com', customer_reference_number='2Fwe23S', internal_reference="A", course_id=self.course.id, is_valid=True ) InvoiceTransaction.objects.create( invoice=self.sale_invoice_1, amount=self.sale_invoice_1.total_amount, status='completed', created_by=self.instructor, last_modified_by=self.instructor ) self.invoice_item = CourseRegistrationCodeInvoiceItem.objects.create( invoice=self.sale_invoice_1, qty=10, unit_price=1234.32, course_id=self.course.id ) for i in range(5): coupon = Coupon( code='coupon{0}'.format(i), description='test_description', course_id=self.course.id, percentage_discount='{0}'.format(i), created_by=self.instructor, is_active=True, ) coupon.save() def test_successfully_generate_executive_summary_report(self): """ Test that successfully generates the executive summary report. """ task_input = {'features': []} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_exec_summary_report( None, None, self.course.id, task_input, 'generating executive summary report' ) ReportStore.from_config(config_name='FINANCIAL_REPORTS') self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result) def students_purchases(self): """ Students purchases the courses using enrollment and coupon codes. """ self.client.login(username=self.student1.username, password='test') paid_course_reg_item = PaidCourseRegistration.add_to_order(self.student1_cart, self.course.id) # update the quantity of the cart item paid_course_reg_item resp = self.client.post(reverse('shoppingcart.views.update_user_cart'), { 'ItemId': paid_course_reg_item.id, 'qty': '4' }) self.assertEqual(resp.status_code, 200) # apply the coupon code to the item in the cart resp = self.client.post(reverse('shoppingcart.views.use_code'), {'code': 'coupon1'}) self.assertEqual(resp.status_code, 200) self.student1_cart.purchase() course_reg_codes = CourseRegistrationCode.objects.filter(order=self.student1_cart) redeem_url = reverse('register_code_redemption', args=[course_reg_codes[0].code]) response = self.client.get(redeem_url) self.assertEquals(response.status_code, 200) # check button text self.assertIn('Activate Course Enrollment', response.content) response = self.client.post(redeem_url) self.assertEquals(response.status_code, 200) self.client.login(username=self.student2.username, password='test') PaidCourseRegistration.add_to_order(self.student2_cart, self.course.id) # apply the coupon code to the item in the cart resp = self.client.post(reverse('shoppingcart.views.use_code'), {'code': 'coupon1'}) self.assertEqual(resp.status_code, 200) self.student2_cart.purchase() @patch.dict('django.conf.settings.FEATURES', {'ENABLE_PAID_COURSE_REGISTRATION': True}) def test_generate_executive_summary_report(self): """ test to generate executive summary report and then test the report authenticity. """ self.students_purchases() task_input = {'features': []} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_exec_summary_report( None, None, self.course.id, task_input, 'generating executive summary report' ) report_store = ReportStore.from_config(config_name='FINANCIAL_REPORTS') expected_data = [ 'Gross Revenue Collected', '$1481.82', 'Gross Revenue Pending', '$0.00', 'Average Price per Seat', '$296.36', 'Number of seats purchased using coupon codes', '<td>2</td>' ] self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result) self._verify_html_file_report(report_store, expected_data) def _verify_html_file_report(self, report_store, expected_data): """ Verify grade report data. """ report_html_filename = report_store.links_for(self.course.id)[0][0] report_path = report_store.path_to(self.course.id, report_html_filename) with report_store.storage.open(report_path) as html_file: html_file_data = html_file.read() for data in expected_data: self.assertIn(data, html_file_data) @ddt.ddt class TestCourseSurveyReport(TestReportMixin, InstructorTaskCourseTestCase): """ Tests that Course Survey report generation works. """ def setUp(self): super(TestCourseSurveyReport, self).setUp() self.course = CourseFactory.create() self.question1 = "question1" self.question2 = "question2" self.question3 = "question3" self.answer1 = "answer1" self.answer2 = "answer2" self.answer3 = "answer3" self.student1 = UserFactory() self.student2 = UserFactory() self.test_survey_name = 'TestSurvey' self.test_form = '<input name="field1"></input>' self.survey_form = SurveyForm.create(self.test_survey_name, self.test_form) self.survey1 = SurveyAnswer.objects.create(user=self.student1, form=self.survey_form, course_key=self.course.id, field_name=self.question1, field_value=self.answer1) self.survey2 = SurveyAnswer.objects.create(user=self.student1, form=self.survey_form, course_key=self.course.id, field_name=self.question2, field_value=self.answer2) self.survey3 = SurveyAnswer.objects.create(user=self.student2, form=self.survey_form, course_key=self.course.id, field_name=self.question1, field_value=self.answer3) self.survey4 = SurveyAnswer.objects.create(user=self.student2, form=self.survey_form, course_key=self.course.id, field_name=self.question2, field_value=self.answer2) self.survey5 = SurveyAnswer.objects.create(user=self.student2, form=self.survey_form, course_key=self.course.id, field_name=self.question3, field_value=self.answer1) def test_successfully_generate_course_survey_report(self): """ Test that successfully generates the course survey report. """ task_input = {'features': []} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_course_survey_report( None, None, self.course.id, task_input, 'generating course survey report' ) self.assertDictContainsSubset({'attempted': 2, 'succeeded': 2, 'failed': 0}, result) @patch.dict('django.conf.settings.FEATURES', {'ENABLE_PAID_COURSE_REGISTRATION': True}) def test_generate_course_survey_report(self): """ test to generate course survey report and then test the report authenticity. """ task_input = {'features': []} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_course_survey_report( None, None, self.course.id, task_input, 'generating course survey report' ) report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD') header_row = ",".join(['User ID', 'User Name', 'Email', self.question1, self.question2, self.question3]) student1_row = ",".join([ str(self.student1.id), # pylint: disable=no-member self.student1.username, self.student1.email, self.answer1, self.answer2 ]) student2_row = ",".join([ str(self.student2.id), # pylint: disable=no-member self.student2.username, self.student2.email, self.answer3, self.answer2, self.answer1 ]) expected_data = [header_row, student1_row, student2_row] self.assertDictContainsSubset({'attempted': 2, 'succeeded': 2, 'failed': 0}, result) self._verify_csv_file_report(report_store, expected_data) def _verify_csv_file_report(self, report_store, expected_data): """ Verify course survey data. """ report_csv_filename = report_store.links_for(self.course.id)[0][0] report_path = report_store.path_to(self.course.id, report_csv_filename) with report_store.storage.open(report_path) as csv_file: csv_file_data = csv_file.read() for data in expected_data: self.assertIn(data, csv_file_data) @ddt.ddt class TestStudentReport(TestReportMixin, InstructorTaskCourseTestCase): """ Tests that CSV student profile report generation works. """ def setUp(self): super(TestStudentReport, self).setUp() self.course = CourseFactory.create() def test_success(self): self.create_student('student', 'student@example.com') task_input = {'features': []} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_students_csv(None, None, self.course.id, task_input, 'calculated') report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD') links = report_store.links_for(self.course.id) self.assertEquals(len(links), 1) self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result) @ddt.data([u'student', u'student\xec']) def test_unicode_usernames(self, students): """ Test that students with unicode characters in their usernames are handled. """ for i, student in enumerate(students): self.create_student(username=student, email='student{0}@example.com'.format(i)) self.current_task = Mock() self.current_task.update_state = Mock() task_input = { 'features': [ 'id', 'username', 'name', 'email', 'language', 'location', 'year_of_birth', 'gender', 'level_of_education', 'mailing_address', 'goals' ] } with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task') as mock_current_task: mock_current_task.return_value = self.current_task result = upload_students_csv(None, None, self.course.id, task_input, 'calculated') # This assertion simply confirms that the generation completed with no errors num_students = len(students) self.assertDictContainsSubset({'attempted': num_students, 'succeeded': num_students, 'failed': 0}, result) class TestTeamStudentReport(TestReportMixin, InstructorTaskCourseTestCase): "Test the student report when including teams information. " def setUp(self): super(TestTeamStudentReport, self).setUp() self.course = CourseFactory.create(teams_configuration={ 'max_size': 2, 'topics': [{'topic-id': 'topic', 'name': 'Topic', 'description': 'A Topic'}] }) self.student1 = UserFactory.create() CourseEnrollment.enroll(self.student1, self.course.id) self.student2 = UserFactory.create() CourseEnrollment.enroll(self.student2, self.course.id) def _generate_and_verify_teams_column(self, username, expected_team): """ Run the upload_students_csv task and verify that the correct team was added to the CSV. """ current_task = Mock() current_task.update_state = Mock() task_input = { 'features': [ 'id', 'username', 'name', 'email', 'language', 'location', 'year_of_birth', 'gender', 'level_of_education', 'mailing_address', 'goals', 'team' ] } with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task') as mock_current_task: mock_current_task.return_value = current_task result = upload_students_csv(None, None, self.course.id, task_input, 'calculated') self.assertDictContainsSubset({'attempted': 2, 'succeeded': 2, 'failed': 0}, result) report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD') report_csv_filename = report_store.links_for(self.course.id)[0][0] report_path = report_store.path_to(self.course.id, report_csv_filename) with report_store.storage.open(report_path) as csv_file: for row in unicodecsv.DictReader(csv_file): if row.get('username') == username: self.assertEqual(row['team'], expected_team) def test_team_column_no_teams(self): self._generate_and_verify_teams_column(self.student1.username, UNAVAILABLE) self._generate_and_verify_teams_column(self.student2.username, UNAVAILABLE) def test_team_column_with_teams(self): team1 = CourseTeamFactory.create(course_id=self.course.id) CourseTeamMembershipFactory.create(team=team1, user=self.student1) team2 = CourseTeamFactory.create(course_id=self.course.id) CourseTeamMembershipFactory.create(team=team2, user=self.student2) self._generate_and_verify_teams_column(self.student1.username, team1.name) self._generate_and_verify_teams_column(self.student2.username, team2.name) def test_team_column_with_deleted_team(self): team1 = CourseTeamFactory.create(course_id=self.course.id) membership1 = CourseTeamMembershipFactory.create(team=team1, user=self.student1) team2 = CourseTeamFactory.create(course_id=self.course.id) CourseTeamMembershipFactory.create(team=team2, user=self.student2) team1.delete() membership1.delete() self._generate_and_verify_teams_column(self.student1.username, UNAVAILABLE) self._generate_and_verify_teams_column(self.student2.username, team2.name) @ddt.ddt class TestListMayEnroll(TestReportMixin, InstructorTaskCourseTestCase): """ Tests that generation of CSV files containing information about students who may enroll in a given course (but have not signed up for it yet) works. """ def _create_enrollment(self, email): "Factory method for creating CourseEnrollmentAllowed objects." return CourseEnrollmentAllowed.objects.create( email=email, course_id=self.course.id ) def setUp(self): super(TestListMayEnroll, self).setUp() self.course = CourseFactory.create() def test_success(self): self._create_enrollment('user@example.com') task_input = {'features': []} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_may_enroll_csv(None, None, self.course.id, task_input, 'calculated') report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD') links = report_store.links_for(self.course.id) self.assertEquals(len(links), 1) self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result) def test_unicode_email_addresses(self): """ Test handling of unicode characters in email addresses of students who may enroll in a course. """ enrollments = [u'student@example.com', u'ni\xf1o@example.com'] for email in enrollments: self._create_enrollment(email) task_input = {'features': ['email']} with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): result = upload_may_enroll_csv(None, None, self.course.id, task_input, 'calculated') # This assertion simply confirms that the generation completed with no errors num_enrollments = len(enrollments) self.assertDictContainsSubset({'attempted': num_enrollments, 'succeeded': num_enrollments, 'failed': 0}, result) class MockDefaultStorage(object): """Mock django's DefaultStorage""" def __init__(self): pass def open(self, file_name): """Mock out DefaultStorage.open with standard python open""" return open(file_name) @patch('lms.djangoapps.instructor_task.tasks_helper.DefaultStorage', new=MockDefaultStorage) class TestCohortStudents(TestReportMixin, InstructorTaskCourseTestCase): """ Tests that bulk student cohorting works. """ def setUp(self): super(TestCohortStudents, self).setUp() self.course = CourseFactory.create() self.cohort_1 = CohortFactory(course_id=self.course.id, name='Cohort 1') self.cohort_2 = CohortFactory(course_id=self.course.id, name='Cohort 2') self.student_1 = self.create_student(username=u'student_1\xec', email='student_1@example.com') self.student_2 = self.create_student(username='student_2', email='student_2@example.com') self.csv_header_row = ['Cohort Name', 'Exists', 'Students Added', 'Students Not Found'] def _cohort_students_and_upload(self, csv_data): """ Call `cohort_students_and_upload` with a file generated from `csv_data`. """ with tempfile.NamedTemporaryFile() as temp_file: temp_file.write(csv_data.encode('utf-8')) temp_file.flush() with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): return cohort_students_and_upload(None, None, self.course.id, {'file_name': temp_file.name}, 'cohorted') def test_username(self): result = self._cohort_students_and_upload( u'username,email,cohort\n' u'student_1\xec,,Cohort 1\n' u'student_2,,Cohort 2' ) self.assertDictContainsSubset({'total': 2, 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) self.verify_rows_in_csv( [ dict(zip(self.csv_header_row, ['Cohort 1', 'True', '1', ''])), dict(zip(self.csv_header_row, ['Cohort 2', 'True', '1', ''])), ], verify_order=False ) def test_email(self): result = self._cohort_students_and_upload( 'username,email,cohort\n' ',student_1@example.com,Cohort 1\n' ',student_2@example.com,Cohort 2' ) self.assertDictContainsSubset({'total': 2, 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) self.verify_rows_in_csv( [ dict(zip(self.csv_header_row, ['Cohort 1', 'True', '1', ''])), dict(zip(self.csv_header_row, ['Cohort 2', 'True', '1', ''])), ], verify_order=False ) def test_username_and_email(self): result = self._cohort_students_and_upload( u'username,email,cohort\n' u'student_1\xec,student_1@example.com,Cohort 1\n' u'student_2,student_2@example.com,Cohort 2' ) self.assertDictContainsSubset({'total': 2, 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) self.verify_rows_in_csv( [ dict(zip(self.csv_header_row, ['Cohort 1', 'True', '1', ''])), dict(zip(self.csv_header_row, ['Cohort 2', 'True', '1', ''])), ], verify_order=False ) def test_prefer_email(self): """ Test that `cohort_students_and_upload` greedily prefers 'email' over 'username' when identifying the user. This means that if a correct email is present, an incorrect or non-matching username will simply be ignored. """ result = self._cohort_students_and_upload( u'username,email,cohort\n' u'student_1\xec,student_1@example.com,Cohort 1\n' # valid username and email u'Invalid,student_2@example.com,Cohort 2' # invalid username, valid email ) self.assertDictContainsSubset({'total': 2, 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) self.verify_rows_in_csv( [ dict(zip(self.csv_header_row, ['Cohort 1', 'True', '1', ''])), dict(zip(self.csv_header_row, ['Cohort 2', 'True', '1', ''])), ], verify_order=False ) def test_non_existent_user(self): result = self._cohort_students_and_upload( 'username,email,cohort\n' 'Invalid,,Cohort 1\n' 'student_2,also_fake@bad.com,Cohort 2' ) self.assertDictContainsSubset({'total': 2, 'attempted': 2, 'succeeded': 0, 'failed': 2}, result) self.verify_rows_in_csv( [ dict(zip(self.csv_header_row, ['Cohort 1', 'True', '0', 'Invalid'])), dict(zip(self.csv_header_row, ['Cohort 2', 'True', '0', 'also_fake@bad.com'])), ], verify_order=False ) def test_non_existent_cohort(self): result = self._cohort_students_and_upload( 'username,email,cohort\n' ',student_1@example.com,Does Not Exist\n' 'student_2,,Cohort 2' ) self.assertDictContainsSubset({'total': 2, 'attempted': 2, 'succeeded': 1, 'failed': 1}, result) self.verify_rows_in_csv( [ dict(zip(self.csv_header_row, ['Does Not Exist', 'False', '0', ''])), dict(zip(self.csv_header_row, ['Cohort 2', 'True', '1', ''])), ], verify_order=False ) def test_too_few_commas(self): """ A CSV file may be malformed and lack traling commas at the end of a row. In this case, those cells take on the value None by the CSV parser. Make sure we handle None values appropriately. i.e.: header_1,header_2,header_3 val_1,val_2,val_3 <- good row val_1,, <- good row val_1 <- bad row; no trailing commas to indicate empty rows """ result = self._cohort_students_and_upload( u'username,email,cohort\n' u'student_1\xec,\n' u'student_2' ) self.assertDictContainsSubset({'total': 2, 'attempted': 2, 'succeeded': 0, 'failed': 2}, result) self.verify_rows_in_csv( [ dict(zip(self.csv_header_row, ['', 'False', '0', ''])), ], verify_order=False ) def test_only_header_row(self): result = self._cohort_students_and_upload( u'username,email,cohort' ) self.assertDictContainsSubset({'total': 0, 'attempted': 0, 'succeeded': 0, 'failed': 0}, result) self.verify_rows_in_csv([]) def test_carriage_return(self): """ Test that we can handle carriage returns in our file. """ result = self._cohort_students_and_upload( u'username,email,cohort\r' u'student_1\xec,,Cohort 1\r' u'student_2,,Cohort 2' ) self.assertDictContainsSubset({'total': 2, 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) self.verify_rows_in_csv( [ dict(zip(self.csv_header_row, ['Cohort 1', 'True', '1', ''])), dict(zip(self.csv_header_row, ['Cohort 2', 'True', '1', ''])), ], verify_order=False ) def test_carriage_return_line_feed(self): """ Test that we can handle carriage returns and line feeds in our file. """ result = self._cohort_students_and_upload( u'username,email,cohort\r\n' u'student_1\xec,,Cohort 1\r\n' u'student_2,,Cohort 2' ) self.assertDictContainsSubset({'total': 2, 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) self.verify_rows_in_csv( [ dict(zip(self.csv_header_row, ['Cohort 1', 'True', '1', ''])), dict(zip(self.csv_header_row, ['Cohort 2', 'True', '1', ''])), ], verify_order=False ) def test_move_users_to_new_cohort(self): membership1 = CohortMembership(course_user_group=self.cohort_1, user=self.student_1) membership1.save() membership2 = CohortMembership(course_user_group=self.cohort_2, user=self.student_2) membership2.save() result = self._cohort_students_and_upload( u'username,email,cohort\n' u'student_1\xec,,Cohort 2\n' u'student_2,,Cohort 1' ) self.assertDictContainsSubset({'total': 2, 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) self.verify_rows_in_csv( [ dict(zip(self.csv_header_row, ['Cohort 1', 'True', '1', ''])), dict(zip(self.csv_header_row, ['Cohort 2', 'True', '1', ''])), ], verify_order=False ) def test_move_users_to_same_cohort(self): membership1 = CohortMembership(course_user_group=self.cohort_1, user=self.student_1) membership1.save() membership2 = CohortMembership(course_user_group=self.cohort_2, user=self.student_2) membership2.save() result = self._cohort_students_and_upload( u'username,email,cohort\n' u'student_1\xec,,Cohort 1\n' u'student_2,,Cohort 2' ) self.assertDictContainsSubset({'total': 2, 'attempted': 2, 'skipped': 2, 'failed': 0}, result) self.verify_rows_in_csv( [ dict(zip(self.csv_header_row, ['Cohort 1', 'True', '0', ''])), dict(zip(self.csv_header_row, ['Cohort 2', 'True', '0', ''])), ], verify_order=False ) @ddt.ddt @patch('lms.djangoapps.instructor_task.tasks_helper.DefaultStorage', new=MockDefaultStorage) class TestGradeReportEnrollmentAndCertificateInfo(TestReportMixin, InstructorTaskModuleTestCase): """ Test that grade report has correct user enrolment, verification, and certificate information. """ def setUp(self): super(TestGradeReportEnrollmentAndCertificateInfo, self).setUp() self.initialize_course() self.create_problem() self.columns_to_check = [ 'Enrollment Track', 'Verification Status', 'Certificate Eligible', 'Certificate Delivered', 'Certificate Type' ] def create_problem(self, problem_display_name='test_problem', parent=None): """ Create a multiple choice response problem. """ if parent is None: parent = self.problem_section factory = MultipleChoiceResponseXMLFactory() args = {'choices': [False, True, False]} problem_xml = factory.build_xml(**args) ItemFactory.create( parent_location=parent.location, parent=parent, category="problem", display_name=problem_display_name, data=problem_xml ) def user_is_embargoed(self, user, is_embargoed): """ Set a users emabargo state. """ user_profile = UserFactory(username=user.username, email=user.email).profile user_profile.allow_certificate = not is_embargoed user_profile.save() def _verify_csv_data(self, username, expected_data): """ Verify grade report data. """ with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'): upload_grades_csv(None, None, self.course.id, None, 'graded') report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD') report_csv_filename = report_store.links_for(self.course.id)[0][0] report_path = report_store.path_to(self.course.id, report_csv_filename) with report_store.storage.open(report_path) as csv_file: for row in unicodecsv.DictReader(csv_file): if row.get('username') == username: csv_row_data = [row[column] for column in self.columns_to_check] self.assertEqual(csv_row_data, expected_data) def _create_user_data(self, user_enroll_mode, has_passed, whitelisted, is_embargoed, verification_status, certificate_status, certificate_mode): """ Create user data to be used during grade report generation. """ user = self.create_student('u1', mode=user_enroll_mode) if has_passed: self.submit_student_answer('u1', 'test_problem', ['choice_1']) CertificateWhitelistFactory.create(user=user, course_id=self.course.id, whitelist=whitelisted) self.user_is_embargoed(user, is_embargoed) if user_enroll_mode in CourseMode.VERIFIED_MODES: SoftwareSecurePhotoVerificationFactory.create(user=user, status=verification_status) GeneratedCertificateFactory.create( user=user, course_id=self.course.id, status=certificate_status, mode=certificate_mode ) return user @ddt.data( ( 'verified', False, False, False, 'approved', 'notpassing', 'honor', ['verified', 'ID Verified', 'N', 'N', 'N/A'] ), ( 'verified', False, True, False, 'approved', 'downloadable', 'verified', ['verified', 'ID Verified', 'Y', 'Y', 'verified'] ), ( 'honor', True, True, True, 'approved', 'restricted', 'honor', ['honor', 'N/A', 'N', 'N', 'N/A'] ), ( 'verified', True, True, False, 'must_retry', 'downloadable', 'honor', ['verified', 'Not ID Verified', 'Y', 'Y', 'honor'] ), ) @ddt.unpack def test_grade_report_enrollment_and_certificate_info( self, user_enroll_mode, has_passed, whitelisted, is_embargoed, verification_status, certificate_status, certificate_mode, expected_output ): user = self._create_user_data( user_enroll_mode, has_passed, whitelisted, is_embargoed, verification_status, certificate_status, certificate_mode ) self._verify_csv_data(user.username, expected_output) @attr(shard=3) @ddt.ddt @override_settings(CERT_QUEUE='test-queue') class TestCertificateGeneration(InstructorTaskModuleTestCase): """ Test certificate generation task works. """ ENABLED_CACHES = ['default', 'mongo_metadata_inheritance', 'loc_cache'] def setUp(self): super(TestCertificateGeneration, self).setUp() self.initialize_course() def test_certificate_generation_for_students(self): """ Verify that certificates generated for all eligible students enrolled in a course. """ # create 10 students students = self._create_students(10) # mark 2 students to have certificates generated already for student in students[:2]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.downloadable, mode='honor' ) # white-list 5 students for student in students[2:7]: CertificateWhitelistFactory.create(user=student, course_id=self.course.id, whitelist=True) task_input = {'student_set': None} expected_results = { 'action_name': 'certificates generated', 'total': 10, 'attempted': 8, 'succeeded': 5, 'failed': 3, 'skipped': 2 } with self.assertNumQueries(166): self.assertCertificatesGenerated(task_input, expected_results) expected_results = { 'action_name': 'certificates generated', 'total': 10, 'attempted': 0, 'succeeded': 0, 'failed': 0, 'skipped': 10 } with self.assertNumQueries(3): self.assertCertificatesGenerated(task_input, expected_results) @ddt.data( CertificateStatuses.downloadable, CertificateStatuses.generating, CertificateStatuses.notpassing, CertificateStatuses.audit_passing, ) def test_certificate_generation_all_whitelisted(self, status): """ Verify that certificates are generated for all white-listed students, whether or not they already had certs generated for them. """ students = self._create_students(5) # whitelist 3 for student in students[:3]: CertificateWhitelistFactory.create( user=student, course_id=self.course.id, whitelist=True ) # generate certs for 2 for student in students[:2]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=status, ) task_input = {'student_set': 'all_whitelisted'} # only certificates for the 3 whitelisted students should have been run expected_results = { 'action_name': 'certificates generated', 'total': 3, 'attempted': 3, 'succeeded': 3, 'failed': 0, 'skipped': 0 } self.assertCertificatesGenerated(task_input, expected_results) # the first 3 students (who were whitelisted) have passing # certificate statuses for student in students[:3]: self.assertIn( GeneratedCertificate.certificate_for_student(student, self.course.id).status, CertificateStatuses.PASSED_STATUSES ) # The last 2 students still don't have certs for student in students[3:]: self.assertIsNone( GeneratedCertificate.certificate_for_student(student, self.course.id) ) @ddt.data( (CertificateStatuses.downloadable, 2), (CertificateStatuses.generating, 2), (CertificateStatuses.notpassing, 4), (CertificateStatuses.audit_passing, 4), ) @ddt.unpack def test_certificate_generation_whitelisted_not_generated(self, status, expected_certs): """ Verify that certificates are generated only for those students who do not have `downloadable` or `generating` certificates. """ # create 5 students students = self._create_students(5) # mark 2 students to have certificates generated already for student in students[:2]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=status, ) # white-list 4 students for student in students[:4]: CertificateWhitelistFactory.create( user=student, course_id=self.course.id, whitelist=True ) task_input = {'student_set': 'whitelisted_not_generated'} # certificates should only be generated for the whitelisted students # who do not yet have passing certificates. expected_results = { 'action_name': 'certificates generated', 'total': expected_certs, 'attempted': expected_certs, 'succeeded': expected_certs, 'failed': 0, 'skipped': 0 } self.assertCertificatesGenerated( task_input, expected_results ) # the first 4 students have passing certificate statuses since # they either were whitelisted or had one before for student in students[:4]: self.assertIn( GeneratedCertificate.certificate_for_student(student, self.course.id).status, CertificateStatuses.PASSED_STATUSES ) # The last student still doesn't have a cert self.assertIsNone( GeneratedCertificate.certificate_for_student(students[4], self.course.id) ) def test_certificate_generation_specific_student(self): """ Tests generating a certificate for a specific student. """ student = self.create_student(username="Hamnet", email="ham@ardenforest.co.uk") CertificateWhitelistFactory.create(user=student, course_id=self.course.id, whitelist=True) task_input = { 'student_set': 'specific_student', 'specific_student_id': student.id } expected_results = { 'action_name': 'certificates generated', 'total': 1, 'attempted': 1, 'succeeded': 1, 'failed': 0, 'skipped': 0, } self.assertCertificatesGenerated(task_input, expected_results) def test_specific_student_not_enrolled(self): """ Tests generating a certificate for a specific student if that student is not enrolled in the course. """ student = self.create_student(username="jacques", email="antlers@ardenforest.co.uk") task_input = { 'student_set': 'specific_student', 'specific_student_id': student.id } expected_results = { 'action_name': 'certificates generated', 'total': 1, 'attempted': 1, 'succeeded': 0, 'failed': 1, 'skipped': 0, } self.assertCertificatesGenerated(task_input, expected_results) def test_certificate_regeneration_for_statuses_to_regenerate(self): """ Verify that certificates are regenerated for all eligible students enrolled in a course whose generated certificate statuses lies in the list 'statuses_to_regenerate' given in task_input. """ # create 10 students students = self._create_students(10) # mark 2 students to have certificates generated already for student in students[:2]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.downloadable, mode='honor' ) # mark 3 students to have certificates generated with status 'error' for student in students[2:5]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.error, mode='honor' ) # mark 6th students to have certificates generated with status 'deleted' for student in students[5:6]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.deleted, mode='honor' ) # white-list 7 students for student in students[:7]: CertificateWhitelistFactory.create(user=student, course_id=self.course.id, whitelist=True) # Certificates should be regenerated for students having generated certificates with status # 'downloadable' or 'error' which are total of 5 students in this test case task_input = {'statuses_to_regenerate': [CertificateStatuses.downloadable, CertificateStatuses.error]} expected_results = { 'action_name': 'certificates generated', 'total': 10, 'attempted': 5, 'succeeded': 5, 'failed': 0, 'skipped': 5 } self.assertCertificatesGenerated( task_input, expected_results ) def test_certificate_regeneration_with_expected_failures(self): """ Verify that certificates are regenerated for all eligible students enrolled in a course whose generated certificate statuses lies in the list 'statuses_to_regenerate' given in task_input. """ # Default grade for students default_grade = '-1' # create 10 students students = self._create_students(10) # mark 2 students to have certificates generated already for student in students[:2]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.downloadable, mode='honor', grade=default_grade ) # mark 3 students to have certificates generated with status 'error' for student in students[2:5]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.error, mode='honor', grade=default_grade ) # mark 6th students to have certificates generated with status 'deleted' for student in students[5:6]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.deleted, mode='honor', grade=default_grade ) # mark rest of the 4 students with having generated certificates with status 'generating' # These students are not added in white-list and they have not completed grades so certificate generation # for these students should fail other than the one student that has been added to white-list # so from these students 3 failures and 1 success for student in students[6:]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.generating, mode='honor', grade=default_grade ) # white-list 7 students for student in students[:7]: CertificateWhitelistFactory.create(user=student, course_id=self.course.id, whitelist=True) # Regenerated certificates for students having generated certificates with status # 'deleted' or 'generating' task_input = {'statuses_to_regenerate': [CertificateStatuses.deleted, CertificateStatuses.generating]} expected_results = { 'action_name': 'certificates generated', 'total': 10, 'attempted': 5, 'succeeded': 2, 'failed': 3, 'skipped': 5 } self.assertCertificatesGenerated(task_input, expected_results) generated_certificates = GeneratedCertificate.eligible_certificates.filter( user__in=students, course_id=self.course.id, mode='honor' ) certificate_statuses = [generated_certificate.status for generated_certificate in generated_certificates] certificate_grades = [generated_certificate.grade for generated_certificate in generated_certificates] # Verify from results from database # Certificates are being generated for 2 white-listed students that had statuses in 'deleted'' and 'generating' self.assertEqual(certificate_statuses.count(CertificateStatuses.generating), 2) # 5 students are skipped that had Certificate Status 'downloadable' and 'error' self.assertEqual(certificate_statuses.count(CertificateStatuses.downloadable), 2) self.assertEqual(certificate_statuses.count(CertificateStatuses.error), 3) # grades will be '0.0' as students are either white-listed or ending in error self.assertEqual(certificate_grades.count('0.0'), 5) # grades will be '-1' for students that were skipped self.assertEqual(certificate_grades.count(default_grade), 5) def test_certificate_regeneration_with_existing_unavailable_status(self): """ Verify that certificates are regenerated for all eligible students enrolled in a course whose generated certificate status lies in the list 'statuses_to_regenerate' given in task_input. but the 'unavailable' status is not touched if it is not in the 'statuses_to_regenerate' list. """ # Default grade for students default_grade = '-1' # create 10 students students = self._create_students(10) # mark 2 students to have certificates generated already for student in students[:2]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.downloadable, mode='honor', grade=default_grade ) # mark 3 students to have certificates generated with status 'error' for student in students[2:5]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.error, mode='honor', grade=default_grade ) # mark 2 students to have generated certificates with status 'unavailable' for student in students[5:7]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.unavailable, mode='honor', grade=default_grade ) # mark 3 students to have generated certificates with status 'generating' for student in students[7:]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.generating, mode='honor', grade=default_grade ) # white-list all students for student in students[:]: CertificateWhitelistFactory.create(user=student, course_id=self.course.id, whitelist=True) # Regenerated certificates for students having generated certificates with status # 'downloadable', 'error' or 'generating' task_input = { 'statuses_to_regenerate': [ CertificateStatuses.downloadable, CertificateStatuses.error, CertificateStatuses.generating ] } expected_results = { 'action_name': 'certificates generated', 'total': 10, 'attempted': 8, 'succeeded': 8, 'failed': 0, 'skipped': 2 } self.assertCertificatesGenerated( task_input, expected_results ) generated_certificates = GeneratedCertificate.eligible_certificates.filter( user__in=students, course_id=self.course.id, mode='honor' ) certificate_statuses = [generated_certificate.status for generated_certificate in generated_certificates] certificate_grades = [generated_certificate.grade for generated_certificate in generated_certificates] # Verify from results from database # Certificates are being generated for 8 students that had statuses in 'downloadable', 'error' and 'generating' self.assertEqual(certificate_statuses.count(CertificateStatuses.generating), 8) # 2 students are skipped that had Certificate Status 'unavailable' self.assertEqual(certificate_statuses.count(CertificateStatuses.unavailable), 2) # grades will be '0.0' as students are white-listed and have not completed any tasks self.assertEqual(certificate_grades.count('0.0'), 8) # grades will be '-1' for students that have not been processed self.assertEqual(certificate_grades.count(default_grade), 2) # Verify that students with status 'unavailable were skipped unavailable_certificates = \ [cert for cert in generated_certificates if cert.status == CertificateStatuses.unavailable and cert.grade == default_grade] self.assertEquals(len(unavailable_certificates), 2) def test_certificate_regeneration_for_students(self): """ Verify that certificates are regenerated for all students passed in task_input. """ # create 10 students students = self._create_students(10) # mark 2 students to have certificates generated already for student in students[:2]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.downloadable, mode='honor' ) # mark 3 students to have certificates generated with status 'error' for student in students[2:5]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.error, mode='honor' ) # mark 6th students to have certificates generated with status 'deleted' for student in students[5:6]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.deleted, mode='honor' ) # mark 7th students to have certificates generated with status 'norpassing' for student in students[6:7]: GeneratedCertificateFactory.create( user=student, course_id=self.course.id, status=CertificateStatuses.notpassing, mode='honor' ) # white-list 7 students for student in students[:7]: CertificateWhitelistFactory.create(user=student, course_id=self.course.id, whitelist=True) # Certificates should be regenerated for students having generated certificates with status # 'downloadable' or 'error' which are total of 5 students in this test case task_input = {'student_set': "all_whitelisted"} expected_results = { 'action_name': 'certificates generated', 'total': 7, 'attempted': 7, 'succeeded': 7, 'failed': 0, 'skipped': 0, } self.assertCertificatesGenerated(task_input, expected_results) def assertCertificatesGenerated(self, task_input, expected_results): """ Generate certificates for the given task_input and compare with expected_results. """ current_task = Mock() current_task.update_state = Mock() with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task') as mock_current_task: mock_current_task.return_value = current_task with patch('capa.xqueue_interface.XQueueInterface.send_to_queue') as mock_queue: mock_queue.return_value = (0, "Successfully queued") result = generate_students_certificates( None, None, self.course.id, task_input, 'certificates generated' ) self.assertDictContainsSubset( expected_results, result ) def _create_students(self, number_of_students): """ Create Students for course. """ return [ self.create_student( username='student_{}'.format(index), email='student_{}@example.com'.format(index) ) for index in xrange(number_of_students) ] class TestInstructorOra2Report(SharedModuleStoreTestCase): """ Tests that ORA2 response report generation works. """ @classmethod def setUpClass(cls): super(TestInstructorOra2Report, cls).setUpClass() cls.course = CourseFactory.create() def setUp(self): super(TestInstructorOra2Report, self).setUp() self.current_task = Mock() self.current_task.update_state = Mock() def tearDown(self): super(TestInstructorOra2Report, self).tearDown() if os.path.exists(settings.GRADES_DOWNLOAD['ROOT_PATH']): shutil.rmtree(settings.GRADES_DOWNLOAD['ROOT_PATH']) def test_report_fails_if_error(self): with patch( 'lms.djangoapps.instructor_task.tasks_helper.OraAggregateData.collect_ora2_data' ) as mock_collect_data: mock_collect_data.side_effect = KeyError with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task') as mock_current_task: mock_current_task.return_value = self.current_task response = upload_ora2_data(None, None, self.course.id, None, 'generated') self.assertEqual(response, UPDATE_STATUS_FAILED) @freeze_time('2001-01-01 00:00:00') def test_report_stores_results(self): test_header = ['field1', 'field2'] test_rows = [['row1_field1', 'row1_field2'], ['row2_field1', 'row2_field2']] with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task') as mock_current_task: mock_current_task.return_value = self.current_task with patch( 'lms.djangoapps.instructor_task.tasks_helper.OraAggregateData.collect_ora2_data' ) as mock_collect_data: mock_collect_data.return_value = (test_header, test_rows) with patch( 'lms.djangoapps.instructor_task.models.DjangoStorageReportStore.store_rows' ) as mock_store_rows: return_val = upload_ora2_data(None, None, self.course.id, None, 'generated') # pylint: disable=maybe-no-member timestamp_str = datetime.now(UTC).strftime('%Y-%m-%d-%H%M') course_id_string = urllib.quote(self.course.id.to_deprecated_string().replace('/', '_')) filename = u'{}_ORA_data_{}.csv'.format(course_id_string, timestamp_str) self.assertEqual(return_val, UPDATE_STATUS_SUCCEEDED) mock_store_rows.assert_called_once_with(self.course.id, filename, [test_header] + test_rows)