"""
Integration Tests for LMS instructor-initiated background tasks.

Runs tasks on answers to course problems to validate that code
paths actually work.

"""
from collections import namedtuple
import ddt
import json
import logging
from mock import patch
from nose.plugins.attrib import attr
import textwrap

from celery.states import SUCCESS, FAILURE
from django.contrib.auth.models import User
from django.core.urlresolvers import reverse

from openedx.core.djangoapps.util.testing import TestConditionalContent
from capa.tests.response_xml_factory import (CodeResponseXMLFactory,
                                             CustomResponseXMLFactory)
from xmodule.modulestore.tests.factories import ItemFactory
from xmodule.modulestore import ModuleStoreEnum

from courseware.model_data import StudentModule

from lms.djangoapps.instructor_task.api import (
    submit_rescore_problem_for_all_students,
    submit_rescore_problem_for_student,
    submit_reset_problem_attempts_for_all_students,
    submit_delete_problem_state_for_all_students
)
from lms.djangoapps.instructor_task.models import InstructorTask
from lms.djangoapps.instructor_task.tasks_helper import upload_grades_csv
from lms.djangoapps.instructor_task.tests.test_base import (
    InstructorTaskModuleTestCase,
    TestReportMixin,
    OPTION_1,
    OPTION_2,
)
from capa.responsetypes import StudentInputError
from lms.djangoapps.grades.new.course_grade import CourseGradeFactory
from openedx.core.lib.url_utils import quote_slashes


log = logging.getLogger(__name__)


class TestIntegrationTask(InstructorTaskModuleTestCase):
    """
    Base class to provide general methods used for "integration" testing of particular tasks.
    """

    def _assert_task_failure(self, entry_id, task_type, problem_url_name, expected_message):
        """Confirm that expected values are stored in InstructorTask on task failure."""
        instructor_task = InstructorTask.objects.get(id=entry_id)
        self.assertEqual(instructor_task.task_state, FAILURE)
        self.assertEqual(instructor_task.requester.username, 'instructor')
        self.assertEqual(instructor_task.task_type, task_type)
        task_input = json.loads(instructor_task.task_input)
        self.assertNotIn('student', task_input)
        self.assertEqual(
            task_input['problem_url'],
            InstructorTaskModuleTestCase.problem_location(problem_url_name).to_deprecated_string()
        )
        status = json.loads(instructor_task.task_output)
        self.assertEqual(status['exception'], 'ZeroDivisionError')
        self.assertEqual(status['message'], expected_message)
        # check status returned:
        status = InstructorTaskModuleTestCase.get_task_status(instructor_task.task_id)
        self.assertEqual(status['message'], expected_message)


@attr(shard=3)
@ddt.ddt
class TestRescoringTask(TestIntegrationTask):
    """
    Integration-style tests for rescoring problems in a background task.

    Exercises real problems with a minimum of patching.
    """

    def setUp(self):
        super(TestRescoringTask, self).setUp()

        self.initialize_course()
        self.create_instructor('instructor')
        self.user1 = self.create_student('u1')
        self.user2 = self.create_student('u2')
        self.user3 = self.create_student('u3')
        self.user4 = self.create_student('u4')
        self.users = [self.user1, self.user2, self.user3, self.user4]
        self.logout()

        # set up test user for performing test operations
        self.setup_user()

    def render_problem(self, username, problem_url_name):
        """
        Use ajax interface to request html for a problem.
        """
        # make sure the requested user is logged in, so that the ajax call renders
        # the problem for that user's state:
        self.login_username(username)
        # make ajax call:
        modx_url = reverse('xblock_handler', kwargs={
            'course_id': self.course.id.to_deprecated_string(),
            'usage_id': quote_slashes(InstructorTaskModuleTestCase.problem_location(problem_url_name).to_deprecated_string()),
            'handler': 'xmodule_handler',
            'suffix': 'problem_get',
        })
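        # POST with an empty payload; the 'problem_get' dispatch simply renders the
        # problem HTML for the currently logged-in user.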
        resp = self.client.post(modx_url, {})
        return resp

    def check_state(self, user, descriptor, expected_score, expected_max_score, expected_attempts=1):
        """
        Check that the StudentModule state contains the expected values.

        The student module is found for the test course, given the `user` and problem `descriptor`.

        Values checked include the number of attempts, the score, and the max score for a problem.
        """
        module = self.get_student_module(user.username, descriptor)
        self.assertEqual(module.grade, expected_score)
        self.assertEqual(module.max_grade, expected_max_score)
        state = json.loads(module.state)
        attempts = state['attempts']
        self.assertEqual(attempts, expected_attempts)
        if attempts > 0:
            self.assertIn('correct_map', state)
            self.assertIn('student_answers', state)
            self.assertGreater(len(state['correct_map']), 0)
            self.assertGreater(len(state['student_answers']), 0)

        # assume only one problem in the subsection and the grades
        # are in sync.
        expected_subsection_grade = expected_score

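        # Recompute the course grade and confirm that the earned score for the Homework
        # subsection matches the expected problem score.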
        course_grade = CourseGradeFactory().create(user, self.course)
        self.assertEqual(
            course_grade.graded_subsections_by_format['Homework'][self.problem_section.location].graded_total.earned,
            expected_subsection_grade,
        )

    def submit_rescore_all_student_answers(self, instructor, problem_url_name, only_if_higher=False):
        """Submits the particular problem for rescoring"""
        return submit_rescore_problem_for_all_students(
            self.create_task_request(instructor),
            InstructorTaskModuleTestCase.problem_location(problem_url_name),
            only_if_higher,
        )

    def submit_rescore_one_student_answer(self, instructor, problem_url_name, student, only_if_higher=False):
        """Submits the particular problem for rescoring for a particular student"""
        return submit_rescore_problem_for_student(
            self.create_task_request(instructor),
            InstructorTaskModuleTestCase.problem_location(problem_url_name),
            student,
            only_if_higher,
        )

    def verify_rescore_results(self, problem_edit, new_expected_scores, new_expected_max, rescore_if_higher):
        """
        Common helper to verify that the results of rescoring, first for a single
        student and then for all students, are as expected.
        """
        # get descriptor:
        problem_url_name = 'H1P1'
        self.define_option_problem(problem_url_name)
        location = InstructorTaskModuleTestCase.problem_location(problem_url_name)
        descriptor = self.module_store.get_item(location)

        # first store answers for each of the separate users:
        self.submit_student_answer('u1', problem_url_name, [OPTION_1, OPTION_1])
        self.submit_student_answer('u2', problem_url_name, [OPTION_1, OPTION_2])
        self.submit_student_answer('u3', problem_url_name, [OPTION_2, OPTION_1])
        self.submit_student_answer('u4', problem_url_name, [OPTION_2, OPTION_2])

        # verify each user's grade
        expected_original_scores = (2, 1, 1, 0)
        expected_original_max = 2
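        # OPTION_1 is the correct answer for both responses, so each user's score is
        # simply the number of OPTION_1 answers they submitted.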
        for i, user in enumerate(self.users):
            self.check_state(user, descriptor, expected_original_scores[i], expected_original_max)

        # update the data in the problem definition so the answer changes.
        self.redefine_option_problem(problem_url_name, **problem_edit)

        # confirm that simply rendering the problem again does not change the grade
        self.render_problem('u1', problem_url_name)
        self.check_state(self.user1, descriptor, expected_original_scores[0], expected_original_max)

        # rescore the problem for only one student -- only that student's grade should change:
        self.submit_rescore_one_student_answer('instructor', problem_url_name, self.user1, rescore_if_higher)
        self.check_state(self.user1, descriptor, new_expected_scores[0], new_expected_max)
        for i, user in enumerate(self.users[1:], start=1):  # everyone other than user1
            self.check_state(user, descriptor, expected_original_scores[i], expected_original_max)

        # rescore the problem for all students
        self.submit_rescore_all_student_answers('instructor', problem_url_name, rescore_if_higher)
        for i, user in enumerate(self.users):
            self.check_state(user, descriptor, new_expected_scores[i], new_expected_max)

    RescoreTestData = namedtuple('RescoreTestData', 'edit, new_expected_scores, new_expected_max')
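    # `edit` holds the keyword overrides passed to redefine_option_problem;
    # `new_expected_scores` are the expected per-user scores (u1..u4) after rescoring;
    # `new_expected_max` is the expected max score after the edit.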

    @ddt.data(
        RescoreTestData(edit=dict(correct_answer=OPTION_2), new_expected_scores=(0, 1, 1, 2), new_expected_max=2),
        RescoreTestData(edit=dict(num_inputs=2), new_expected_scores=(2, 1, 1, 0), new_expected_max=4),
        RescoreTestData(edit=dict(num_inputs=4), new_expected_scores=(2, 1, 1, 0), new_expected_max=8),
        RescoreTestData(edit=dict(num_responses=4), new_expected_scores=(2, 1, 1, 0), new_expected_max=4),
        RescoreTestData(edit=dict(num_inputs=2, num_responses=4), new_expected_scores=(2, 1, 1, 0), new_expected_max=8),
    )
    @ddt.unpack
    def test_rescoring_option_problem(self, problem_edit, new_expected_scores, new_expected_max):
        """
        Run rescore scenario on option problem.
        Verify rescoring updates grade after content change.
        Original problem definition has:
            num_inputs = 1
            num_responses = 2
            correct_answer = OPTION_1
        """
        self.verify_rescore_results(
            problem_edit, new_expected_scores, new_expected_max, rescore_if_higher=False,
        )

    @ddt.data(
        RescoreTestData(edit=dict(), new_expected_scores=(2, 1, 1, 0), new_expected_max=2),
        RescoreTestData(edit=dict(correct_answer=OPTION_2), new_expected_scores=(2, 1, 1, 2), new_expected_max=2),
        RescoreTestData(edit=dict(num_inputs=2), new_expected_scores=(2, 1, 1, 0), new_expected_max=2),
    )
    @ddt.unpack
    def test_rescoring_if_higher(self, problem_edit, new_expected_scores, new_expected_max):
        self.verify_rescore_results(
            problem_edit, new_expected_scores, new_expected_max, rescore_if_higher=True,
        )

    def test_rescoring_failure(self):
        """Simulate a failure in rescoring a problem"""
        problem_url_name = 'H1P1'
        self.define_option_problem(problem_url_name)
        self.submit_student_answer('u1', problem_url_name, [OPTION_1, OPTION_1])

        expected_message = "bad things happened"
        with patch('capa.capa_problem.LoncapaProblem.rescore_existing_answers') as mock_rescore:
            mock_rescore.side_effect = ZeroDivisionError(expected_message)
            instructor_task = self.submit_rescore_all_student_answers('instructor', problem_url_name)
        self._assert_task_failure(instructor_task.id, 'rescore_problem', problem_url_name, expected_message)

    def test_rescoring_bad_unicode_input(self):
        """Generate a real failure in rescoring a problem, with an answer including unicode"""
        # At one point, the student answers that resulted in StudentInputErrors were being
        # persisted (even though they were not counted as an attempt).  That is not possible
        # now, so it's harder to generate a test for how such input is handled.
        problem_url_name = 'H1P1'
        # set up an option problem -- doesn't matter really what problem it is, but we need
        # it to have an answer.
        self.define_option_problem(problem_url_name)
        self.submit_student_answer('u1', problem_url_name, [OPTION_1, OPTION_1])

        # return an input error as if it were a numerical response, with an embedded unicode character:
        expected_message = u"Could not interpret '2/3\u03a9' as a number"
        with patch('capa.capa_problem.LoncapaProblem.rescore_existing_answers') as mock_rescore:
            mock_rescore.side_effect = StudentInputError(expected_message)
            instructor_task = self.submit_rescore_all_student_answers('instructor', problem_url_name)

        # check instructor_task returned
        instructor_task = InstructorTask.objects.get(id=instructor_task.id)
        self.assertEqual(instructor_task.task_state, SUCCESS)
        self.assertEqual(instructor_task.requester.username, 'instructor')
        self.assertEqual(instructor_task.task_type, 'rescore_problem')
        task_input = json.loads(instructor_task.task_input)
        self.assertNotIn('student', task_input)
        self.assertEqual(
            task_input['problem_url'],
            InstructorTaskModuleTestCase.problem_location(problem_url_name).to_deprecated_string()
        )
        status = json.loads(instructor_task.task_output)
        self.assertEqual(status['attempted'], 1)
        self.assertEqual(status['succeeded'], 0)
        self.assertEqual(status['total'], 1)

    def define_code_response_problem(self, problem_url_name):
        """
        Define an arbitrary code-response problem.

        We'll end up mocking its evaluation later.
        """
        factory = CodeResponseXMLFactory()
        grader_payload = json.dumps({"grader": "ps04/grade_square.py"})
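        # The grader payload is opaque here; it would only matter to the external
        # grader, whose queue interface gets mocked out in the test below.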
        problem_xml = factory.build_xml(initial_display="def square(x):",
                                        answer_display="answer",
                                        grader_payload=grader_payload,
                                        num_responses=2)
        ItemFactory.create(parent_location=self.problem_section.location,
                           category="problem",
                           display_name=str(problem_url_name),
                           data=problem_xml)

    def test_rescoring_code_problem(self):
        """Run rescore scenario on problem with code submission"""
        problem_url_name = 'H1P2'
        self.define_code_response_problem(problem_url_name)
        # we fully create the CodeResponse problem, but just pretend that we're queuing it:
        with patch('capa.xqueue_interface.XQueueInterface.send_to_queue') as mock_send_to_queue:
            mock_send_to_queue.return_value = (0, "Successfully queued")
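            # send_to_queue returns an (error, message) pair; an error of 0 means the
            # submission was accepted by the queue.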
            self.submit_student_answer('u1', problem_url_name, ["answer1", "answer2"])

        instructor_task = self.submit_rescore_all_student_answers('instructor', problem_url_name)

        instructor_task = InstructorTask.objects.get(id=instructor_task.id)
        self.assertEqual(instructor_task.task_state, FAILURE)
        status = json.loads(instructor_task.task_output)
        self.assertEqual(status['exception'], 'NotImplementedError')
        self.assertEqual(status['message'], "Problem's definition does not support rescoring.")

        status = InstructorTaskModuleTestCase.get_task_status(instructor_task.task_id)
        self.assertEqual(status['message'], "Problem's definition does not support rescoring.")

    def define_randomized_custom_response_problem(self, problem_url_name, redefine=False):
        """
        Defines a custom response problem that uses a random value to determine correctness.

        The generated answer is also returned as the `msg`, so that the test can later
        submit it as the correct answer.

        If the `redefine` flag is set, the definition of correctness is changed (from
        equals to not-equals).
        """
        factory = CustomResponseXMLFactory()
        script = textwrap.dedent("""
                def check_func(expect, answer_given):
                    expected = str(random.randint(0, 100))
                    return {'ok': answer_given %s expected, 'msg': expected}
            """ % ('!=' if redefine else '=='))
        problem_xml = factory.build_xml(script=script, cfn="check_func", expect="42", num_responses=1)
        if redefine:
            descriptor = self.module_store.get_item(
                InstructorTaskModuleTestCase.problem_location(problem_url_name)
            )
            descriptor.data = problem_xml
            with self.module_store.branch_setting(ModuleStoreEnum.Branch.draft_preferred, descriptor.location.course_key):
                self.module_store.update_item(descriptor, self.user.id)
                self.module_store.publish(descriptor.location, self.user.id)
        else:
            # Use "per-student" rerandomization so that check-problem can be called more than once.
            # Using "always" means we cannot check a problem twice, but we want to call once to get the
            # correct answer, and call a second time with that answer to confirm it's graded as correct.
            # Per-student rerandomization will at least generate different seeds for different users, so
            # we get a little more test coverage.
            ItemFactory.create(parent_location=self.problem_section.location,
                               category="problem",
                               display_name=str(problem_url_name),
                               data=problem_xml,
                               metadata={"rerandomize": "per_student"})

    def test_rescoring_randomized_problem(self):
        """Run rescore scenario on custom problem that uses randomize"""
        # First define the custom response problem:
        problem_url_name = 'H1P1'
        self.define_randomized_custom_response_problem(problem_url_name)
        location = InstructorTaskModuleTestCase.problem_location(problem_url_name)
        descriptor = self.module_store.get_item(location)
        # run with more than one user
        for user in self.users:
            # first render the problem, so that a seed will be created for this user
            self.render_problem(user.username, problem_url_name)
            # submit a bogus answer, in order to get the problem to tell us its real answer
            dummy_answer = "1000"
            self.submit_student_answer(user.username, problem_url_name, [dummy_answer, dummy_answer])
            # we should have gotten the problem wrong, since we're way out of range:
            self.check_state(user, descriptor, 0, 1, expected_attempts=1)
            # dig the correct answer out of the problem's message
            module = self.get_student_module(user.username, descriptor)
            state = json.loads(module.state)
            correct_map = state['correct_map']
            log.info("Correct Map: %s", correct_map)
            # only one response, so pull it out:
            answer = correct_map.values()[0]['msg']
            self.submit_student_answer(user.username, problem_url_name, [answer, answer])
            # we should now get the problem right, with a second attempt:
            self.check_state(user, descriptor, 1, 1, expected_attempts=2)

        # redefine the problem (as stored in Mongo) so that the definition of correct changes
        self.define_randomized_custom_response_problem(problem_url_name, redefine=True)
        # confirm that simply rendering the problem again does not result in a change
        # in the grade (or the attempts):
        self.render_problem('u1', problem_url_name)
        self.check_state(self.user1, descriptor, 1, 1, expected_attempts=2)

        # rescore the problem for only one student -- only that student's grade should change
        # (and none of the attempts):
        self.submit_rescore_one_student_answer('instructor', problem_url_name, User.objects.get(username='u1'))
        for user in self.users:
            expected_score = 0 if user.username == 'u1' else 1
            self.check_state(user, descriptor, expected_score, 1, expected_attempts=2)

        # rescore the problem for all students
        self.submit_rescore_all_student_answers('instructor', problem_url_name)

        # all grades should change to being wrong (with no change in attempts)
        for user in self.users:
            self.check_state(user, descriptor, 0, 1, expected_attempts=2)


class TestResetAttemptsTask(TestIntegrationTask):
    """
    Integration-style tests for resetting problem attempts in a background task.

    Exercises real problems with a minimum of patching.
    """
    userlist = ['u1', 'u2', 'u3', 'u4']

    def setUp(self):
        super(TestResetAttemptsTask, self).setUp()
        self.initialize_course()
        self.create_instructor('instructor')
        for username in self.userlist:
            self.create_student(username)
        self.logout()

    def get_num_attempts(self, username, descriptor):
        """returns number of attempts stored for `username` on problem `descriptor` for test course"""
        module = self.get_student_module(username, descriptor)
        state = json.loads(module.state)
        return state['attempts']

    def reset_problem_attempts(self, instructor, location):
        """Submits the current problem for resetting"""
        return submit_reset_problem_attempts_for_all_students(self.create_task_request(instructor),
                                                              location)

    def test_reset_attempts_on_problem(self):
        """Run reset-attempts scenario on option problem"""
        # get descriptor:
        problem_url_name = 'H1P1'
        self.define_option_problem(problem_url_name)
        location = InstructorTaskModuleTestCase.problem_location(problem_url_name)
        descriptor = self.module_store.get_item(location)
        num_attempts = 3
        # first store answers for each of the separate users:
        for _ in range(num_attempts):
            for username in self.userlist:
                self.submit_student_answer(username, problem_url_name, [OPTION_1, OPTION_1])

        for username in self.userlist:
            self.assertEqual(self.get_num_attempts(username, descriptor), num_attempts)

        self.reset_problem_attempts('instructor', location)

        for username in self.userlist:
            self.assertEqual(self.get_num_attempts(username, descriptor), 0)

    def test_reset_failure(self):
        """Simulate a failure in resetting attempts on a problem"""
        problem_url_name = 'H1P1'
        location = InstructorTaskModuleTestCase.problem_location(problem_url_name)
        self.define_option_problem(problem_url_name)
        self.submit_student_answer('u1', problem_url_name, [OPTION_1, OPTION_1])

        expected_message = "bad things happened"
        with patch('courseware.models.StudentModule.save') as mock_save:
            mock_save.side_effect = ZeroDivisionError(expected_message)
            instructor_task = self.reset_problem_attempts('instructor', location)
        self._assert_task_failure(instructor_task.id, 'reset_problem_attempts', problem_url_name, expected_message)

    def test_reset_non_problem(self):
        """confirm that a non-problem can still be successfully reset"""
        location = self.problem_section.location
        instructor_task = self.reset_problem_attempts('instructor', location)
        instructor_task = InstructorTask.objects.get(id=instructor_task.id)
        self.assertEqual(instructor_task.task_state, SUCCESS)


class TestDeleteProblemTask(TestIntegrationTask):
    """
    Integration-style tests for deleting problem state in a background task.

    Exercises real problems with a minimum of patching.
    """
    userlist = ['u1', 'u2', 'u3', 'u4']

    def setUp(self):
        super(TestDeleteProblemTask, self).setUp()

        self.initialize_course()
        self.create_instructor('instructor')
        for username in self.userlist:
            self.create_student(username)
        self.logout()

    def delete_problem_state(self, instructor, location):
        """Submits the current problem for deletion"""
        return submit_delete_problem_state_for_all_students(self.create_task_request(instructor), location)

    def test_delete_problem_state(self):
        """Run delete-state scenario on option problem"""
        # get descriptor:
        problem_url_name = 'H1P1'
        self.define_option_problem(problem_url_name)
        location = InstructorTaskModuleTestCase.problem_location(problem_url_name)
        descriptor = self.module_store.get_item(location)
        # first store answers for each of the separate users:
        for username in self.userlist:
            self.submit_student_answer(username, problem_url_name, [OPTION_1, OPTION_1])
        # confirm that state exists:
        for username in self.userlist:
            self.assertIsNotNone(self.get_student_module(username, descriptor))
        # run delete task:
        self.delete_problem_state('instructor', location)
        # confirm that no state can be found:
        for username in self.userlist:
            with self.assertRaises(StudentModule.DoesNotExist):
                self.get_student_module(username, descriptor)

    def test_delete_failure(self):
        """Simulate a failure in deleting state of a problem"""
        problem_url_name = 'H1P1'
        location = InstructorTaskModuleTestCase.problem_location(problem_url_name)
        self.define_option_problem(problem_url_name)
        self.submit_student_answer('u1', problem_url_name, [OPTION_1, OPTION_1])

        expected_message = "bad things happened"
        with patch('courseware.models.StudentModule.delete') as mock_delete:
            mock_delete.side_effect = ZeroDivisionError(expected_message)
            instructor_task = self.delete_problem_state('instructor', location)
        self._assert_task_failure(instructor_task.id, 'delete_problem_state', problem_url_name, expected_message)

    def test_delete_non_problem(self):
        """confirm that a non-problem can still be successfully deleted"""
        location = self.problem_section.location
        instructor_task = self.delete_problem_state('instructor', location)
        instructor_task = InstructorTask.objects.get(id=instructor_task.id)
        self.assertEqual(instructor_task.task_state, SUCCESS)


class TestGradeReportConditionalContent(TestReportMixin, TestConditionalContent, TestIntegrationTask):
    """
    Test grade report in cases where there are problems contained within split tests.
    """

    def verify_csv_task_success(self, task_result):
        """
        Verify that all students were successfully graded by
        `upload_grades_csv`.

        Arguments:
            task_result (dict): Return value of `upload_grades_csv`.
        """
        self.assertDictContainsSubset({'attempted': 2, 'succeeded': 2, 'failed': 0}, task_result)

    def verify_grades_in_csv(self, students_grades, ignore_other_columns=False):
        """
        Verify that the grades CSV contains the expected grades data.

        Arguments:
            students_grades (iterable): An iterable of dictionaries,
                where each dict maps a student to another dict
                representing their grades we expect to see in the CSV.
                For example: [{student_a: {'grade': 1.0, 'HW': 1.0}}]
        """
        def merge_dicts(*dicts):
            """
            Return the union of dicts

            Arguments:
                dicts: tuple of dicts
            """
            return dict([item for d in dicts for item in d.items()])

        def user_partition_group(user):
            """Return a dict having single key with value equals to students group in partition"""
            group_config_hdr_tpl = 'Experiment Group ({})'
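            # This must match the experiment-group column header that the grade report
            # generates for the partition.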
            return {
                group_config_hdr_tpl.format(self.partition.name): self.partition.scheme.get_group_for_user(
                    self.course.id, user, self.partition, track_function=None
                ).name
            }

        self.verify_rows_in_csv(
            [
                merge_dicts(
                    {'Student ID': str(student.id), 'Username': student.username, 'Email': student.email},
                    grades,
                    user_partition_group(student)
                )
                for student_grades in students_grades for student, grades in student_grades.iteritems()
            ],
            ignore_other_columns=ignore_other_columns,
        )

    def test_both_groups_problems(self):
        """
        Verify that grade export works when each group in the user
        partition receives a (different) problem.  Each user's grade on their
        particular problem should show up in the grade report.
        """
        problem_a_url = 'problem_a_url'
        problem_b_url = 'problem_b_url'
        self.define_option_problem(problem_a_url, parent=self.vertical_a)
        self.define_option_problem(problem_b_url, parent=self.vertical_b)
        # student A will get 100%, student B will get 50% because
        # OPTION_1 is the correct option, and OPTION_2 is the
        # incorrect option
        self.submit_student_answer(self.student_a.username, problem_a_url, [OPTION_1, OPTION_1])
        self.submit_student_answer(self.student_b.username, problem_b_url, [OPTION_1, OPTION_2])

        with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'):
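            # _get_current_task is patched so upload_grades_csv can record progress
            # without running inside a real Celery task.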
            result = upload_grades_csv(None, None, self.course.id, None, 'graded')
            self.verify_csv_task_success(result)
            self.verify_grades_in_csv(
                [
                    {
                        self.student_a: {
                            u'Grade': '1.0',
                            u'Homework': '1.0',
                        }
                    },
                    {
                        self.student_b: {
                            u'Grade': '0.5',
                            u'Homework': '0.5',
                        }
                    },
                ],
                ignore_other_columns=True,
            )

    def test_one_group_problem(self):
        """
        Verify that grade export works when only Group A of the user
        partition receives a problem.  We expect to see a column for
        the homework where student_a's entry includes their grade, and
        student_b's entry shows a grade of 0.0 with 'Not Available' for the homework.
        """
        problem_a_url = 'problem_a_url'
        self.define_option_problem(problem_a_url, parent=self.vertical_a)

        self.submit_student_answer(self.student_a.username, problem_a_url, [OPTION_1, OPTION_1])

        with patch('lms.djangoapps.instructor_task.tasks_helper._get_current_task'):
            result = upload_grades_csv(None, None, self.course.id, None, 'graded')
            self.verify_csv_task_success(result)
            self.verify_grades_in_csv(
                [
                    {
                        self.student_a: {
                            u'Grade': '1.0',
                            u'Homework': '1.0',
                        },
                    },
                    {
                        self.student_b: {
                            u'Grade': '0.0',
                            u'Homework': u'Not Available',
                        }
                    },
                ],
                ignore_other_columns=True
            )