"""
Provide tests for sysadmin dashboard feature in sysadmin.py
"""
import glob
import os
import re
import shutil
import unittest
from util.date_utils import get_time_display, DEFAULT_DATE_TIME_FORMAT

from django.conf import settings
from django.contrib.auth.hashers import check_password
from django.contrib.auth.models import User
from django.core.urlresolvers import reverse
from django.test.client import Client
from django.test.utils import override_settings
from django.utils.timezone import utc as UTC
from django.utils.translation import ugettext as _
import mongoengine
from opaque_keys.edx.locations import SlashSeparatedCourseKey

from xmodule.modulestore.tests.django_utils import (
    TEST_DATA_MOCK_MODULESTORE, TEST_DATA_XML_MODULESTORE
)
from dashboard.models import CourseImportLog
from dashboard.sysadmin import Users
from dashboard.git_import import GitImportError
from external_auth.models import ExternalAuthMap
from student.roles import CourseStaffRole, GlobalStaff
from student.tests.factories import UserFactory
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.mongo_connection import MONGO_PORT_NUM, MONGO_HOST
from xmodule.modulestore.xml import XMLModuleStore


TEST_MONGODB_LOG = {
    'host': MONGO_HOST,
    'port': MONGO_PORT_NUM,
    'user': '',
    'password': '',
    'db': 'test_xlog',
}

FEATURES_WITH_SSL_AUTH = settings.FEATURES.copy()
FEATURES_WITH_SSL_AUTH['AUTH_USE_CERTIFICATES'] = True


class SysadminBaseTestCase(ModuleStoreTestCase):
    """
    Base class with common methods used in XML and Mongo tests
    """

    TEST_REPO = 'https://github.com/mitocw/edx4edx_lite.git'
    TEST_BRANCH = 'testing_do_not_delete'
    TEST_BRANCH_COURSE = SlashSeparatedCourseKey('MITx', 'edx4edx_branch', 'edx4edx')

    def setUp(self):
        """Setup test case by adding primary user."""
        super(SysadminBaseTestCase, self).setUp(create_user=False)
        self.user = UserFactory.create(username='test_user',
                                       email='test_user+sysadmin@edx.org',
                                       password='foo')
        self.client = Client()

    def _setstaff_login(self):
        """Makes the test user staff and logs them in"""
        GlobalStaff().add_users(self.user)
        self.client.login(username=self.user.username, password='foo')

    def _add_edx4edx(self, branch=None):
        """Adds the edx4edx sample course"""
        post_dict = {'repo_location': self.TEST_REPO, 'action': 'add_course', }
        if branch:
            post_dict['repo_branch'] = branch
        return self.client.post(reverse('sysadmin_courses'), post_dict)

    def _rm_edx4edx(self):
        """Deletes the sample course from the XML store"""
        def_ms = modulestore()
        course_path = '{0}/edx4edx_lite'.format(
            os.path.abspath(settings.DATA_DIR))
        try:
            # using XML store
            course = def_ms.courses.get(course_path, None)
        except AttributeError:
            # Using mongo store
            course = def_ms.get_course(SlashSeparatedCourseKey('MITx', 'edx4edx', 'edx4edx'))

        # Delete git loaded course
        response = self.client.post(
            reverse('sysadmin_courses'),
            {
                'course_id': course.id.to_deprecated_string(),
                'action': 'del_course',
            }
        )
        self.addCleanup(self._rm_glob, '{0}_deleted_*'.format(course_path))

        return response

    def _rm_glob(self, path):
        """
        Create a shell expansion of passed in parameter and iteratively
        remove them.  Must only expand to directories.
        """
        for path in glob.glob(path):
            shutil.rmtree(path)

    def _mkdir(self, path):
        """
        Create directory and add the cleanup for it.
        """
        os.mkdir(path)
        self.addCleanup(shutil.rmtree, path)


@unittest.skipUnless(settings.FEATURES.get('ENABLE_SYSADMIN_DASHBOARD'),
                     "ENABLE_SYSADMIN_DASHBOARD not set")
@override_settings(GIT_IMPORT_WITH_XMLMODULESTORE=True)
class TestSysadmin(SysadminBaseTestCase):
    """
    Test sysadmin dashboard features using XMLModuleStore
    """
    MODULESTORE = TEST_DATA_XML_MODULESTORE

    def test_staff_access(self):
        """Test access controls."""

        test_views = ['sysadmin', 'sysadmin_courses', 'sysadmin_staffing', ]
        for view in test_views:
            response = self.client.get(reverse(view))
            self.assertEqual(response.status_code, 302)

        self.user.is_staff = False
        self.user.save()

        logged_in = self.client.login(username=self.user.username,
                                      password='foo')
        self.assertTrue(logged_in)

        for view in test_views:
            response = self.client.get(reverse(view))
            self.assertEqual(response.status_code, 404)

        response = self.client.get(reverse('gitlogs'))
        self.assertEqual(response.status_code, 404)

        self.user.is_staff = True
        self.user.save()

        self.client.logout()
        self.client.login(username=self.user.username, password='foo')

        for view in test_views:
            response = self.client.get(reverse(view))
            self.assertTrue(response.status_code, 200)

        response = self.client.get(reverse('gitlogs'))
        self.assertTrue(response.status_code, 200)

    def test_user_mod(self):
        """Create and delete a user"""

        self._setstaff_login()

        self.client.login(username=self.user.username, password='foo')

        # Create user tests

        # No uname
        response = self.client.post(reverse('sysadmin'),
                                    {'action': 'create_user',
                                     'student_fullname': 'blah',
                                     'student_password': 'foozor', })
        self.assertIn('Must provide username', response.content.decode('utf-8'))
        # no full name
        response = self.client.post(reverse('sysadmin'),
                                    {'action': 'create_user',
                                     'student_uname': 'test_cuser+sysadmin@edx.org',
                                     'student_password': 'foozor', })
        self.assertIn('Must provide full name', response.content.decode('utf-8'))

        # Test create valid user
        self.client.post(reverse('sysadmin'),
                         {'action': 'create_user',
                          'student_uname': 'test_cuser+sysadmin@edx.org',
                          'student_fullname': 'test cuser',
                          'student_password': 'foozor', })

        self.assertIsNotNone(
            User.objects.get(username='test_cuser+sysadmin@edx.org',
                             email='test_cuser+sysadmin@edx.org'))

        # login as new user to confirm
        self.assertTrue(self.client.login(
            username='test_cuser+sysadmin@edx.org', password='foozor'))

        self.client.logout()
        self.client.login(username=self.user.username, password='foo')

        # Delete user tests

        # Try no username
        response = self.client.post(reverse('sysadmin'),
                                    {'action': 'del_user', })
        self.assertIn('Must provide username', response.content.decode('utf-8'))

        # Try bad usernames
        response = self.client.post(reverse('sysadmin'),
                                    {'action': 'del_user',
                                     'student_uname': 'flabbergast@example.com',
                                     'student_fullname': 'enigma jones', })
        self.assertIn('Cannot find user with email address', response.content.decode('utf-8'))

        response = self.client.post(reverse('sysadmin'),
                                    {'action': 'del_user',
                                     'student_uname': 'flabbergast',
                                     'student_fullname': 'enigma jones', })
        self.assertIn('Cannot find user with username', response.content.decode('utf-8'))

        self.client.post(reverse('sysadmin'),
                         {'action': 'del_user',
                          'student_uname': 'test_cuser+sysadmin@edx.org',
                          'student_fullname': 'test cuser', })

        self.assertEqual(0, len(User.objects.filter(
            username='test_cuser+sysadmin@edx.org',
            email='test_cuser+sysadmin@edx.org')))

        self.assertEqual(1, len(User.objects.all()))

    def test_user_csv(self):
        """Download and validate user CSV"""

        num_test_users = 100
        self._setstaff_login()

        # Stuff full of users to test streaming
        for user_num in xrange(num_test_users):
            Users().create_user('testingman_with_long_name{}'.format(user_num),
                                'test test')

        response = self.client.post(reverse('sysadmin'),
                                    {'action': 'download_users', })

        self.assertIn('attachment', response['Content-Disposition'])
        self.assertEqual('text/csv', response['Content-Type'])
        self.assertIn('test_user', response.content)
        self.assertTrue(num_test_users + 2, len(response.content.splitlines()))

        # Clean up
        User.objects.filter(
            username__startswith='testingman_with_long_name').delete()

    @override_settings(FEATURES=FEATURES_WITH_SSL_AUTH)
    def test_authmap_repair(self):
        """Run authmap check and repair"""

        self._setstaff_login()

        Users().create_user('test0', 'test test')
        # Will raise exception, so no assert needed
        eamap = ExternalAuthMap.objects.get(external_name='test test')
        mitu = User.objects.get(username='test0')

        self.assertTrue(check_password(eamap.internal_password, mitu.password))
        mitu.set_password('not autogenerated')
        mitu.save()
        self.assertFalse(check_password(eamap.internal_password, mitu.password))

        # Create really non user AuthMap
        ExternalAuthMap(external_id='ll',
                        external_domain='ll',
                        external_credentials='{}',
                        external_email='a@b.c',
                        external_name='c',
                        internal_password='').save()

        response = self.client.post(reverse('sysadmin'),
                                    {'action': 'repair_eamap', })

        self.assertIn('{0} test0'.format('Failed in authenticating'),
                      response.content)
        self.assertIn('fixed password', response.content.decode('utf-8'))

        self.assertTrue(self.client.login(username='test0',
                                          password=eamap.internal_password))

        # Check for all OK
        self._setstaff_login()
        response = self.client.post(reverse('sysadmin'),
                                    {'action': 'repair_eamap', })
        self.assertIn('All ok!', response.content.decode('utf-8'))

    def test_xml_course_add_delete(self):
        """add and delete course from xml module store"""

        self._setstaff_login()

        # Try bad git repo
        response = self.client.post(reverse('sysadmin_courses'), {
            'repo_location': 'github.com/mitocw/edx4edx_lite',
            'action': 'add_course', })
        self.assertIn(_("The git repo location should end with '.git', "
                        "and be a valid url"), response.content.decode('utf-8'))

        response = self.client.post(reverse('sysadmin_courses'), {
            'repo_location': 'http://example.com/not_real.git',
            'action': 'add_course', })
        self.assertIn('Unable to clone or pull repository',
                      response.content.decode('utf-8'))
        # Create git loaded course
        response = self._add_edx4edx()

        def_ms = modulestore()
        self.assertIn('xml', str(def_ms.__class__))
        course = def_ms.courses.get('{0}/edx4edx_lite'.format(
            os.path.abspath(settings.DATA_DIR)), None)
        self.assertIsNotNone(course)

        # Delete a course
        self._rm_edx4edx()
        course = def_ms.courses.get('{0}/edx4edx_lite'.format(
            os.path.abspath(settings.DATA_DIR)), None)
        self.assertIsNone(course)

        # Load a bad git branch
        response = self._add_edx4edx('asdfasdfasdf')
        self.assertIn(GitImportError.REMOTE_BRANCH_MISSING,
                      response.content.decode('utf-8'))

        # Load a course from a git branch
        self._add_edx4edx(self.TEST_BRANCH)
        course = def_ms.courses.get('{0}/edx4edx_lite'.format(
            os.path.abspath(settings.DATA_DIR)), None)
        self.assertIsNotNone(course)
        self.assertEqual(self.TEST_BRANCH_COURSE, course.id)
        self._rm_edx4edx()

        # Try and delete a non-existent course
        response = self.client.post(reverse('sysadmin_courses'),
                                    {'course_id': 'foobar/foo/blah',
                                     'action': 'del_course', })
        self.assertIn('Error - cannot get course with ID',
                      response.content.decode('utf-8'))

    @override_settings(GIT_IMPORT_WITH_XMLMODULESTORE=False)
    def test_xml_safety_flag(self):
        """Make sure the settings flag to disable xml imports is working"""

        self._setstaff_login()
        response = self._add_edx4edx()
        self.assertIn('GIT_IMPORT_WITH_XMLMODULESTORE', response.content)

        def_ms = modulestore()
        course = def_ms.courses.get('{0}/edx4edx_lite'.format(
            os.path.abspath(settings.DATA_DIR)), None)
        self.assertIsNone(course)

    def test_git_pull(self):
        """Make sure we can pull"""

        self._setstaff_login()

        response = self._add_edx4edx()
        response = self._add_edx4edx()
        self.assertIn(_("The course {0} already exists in the data directory! "
                        "(reloading anyway)").format('edx4edx_lite'),
                      response.content.decode('utf-8'))
        self._rm_edx4edx()

    def test_staff_csv(self):
        """Download and validate staff CSV"""

        self._setstaff_login()
        self._add_edx4edx()

        def_ms = modulestore()
        course = def_ms.get_course(SlashSeparatedCourseKey('MITx', 'edx4edx', 'edx4edx'))
        CourseStaffRole(course.id).add_users(self.user)

        response = self.client.post(reverse('sysadmin_staffing'),
                                    {'action': 'get_staff_csv', })
        self.assertIn('attachment', response['Content-Disposition'])
        self.assertEqual('text/csv', response['Content-Type'])
        columns = ['course_id', 'role', 'username',
                   'email', 'full_name', ]
        self.assertIn(','.join('"' + c + '"' for c in columns),
                      response.content)

        self._rm_edx4edx()

    def test_enrollment_page(self):
        """
        Adds a course and makes sure that it shows up on the staffing and
        enrollment page
        """

        self._setstaff_login()
        self._add_edx4edx()
        response = self.client.get(reverse('sysadmin_staffing'))
        self.assertIn('edx4edx', response.content)
        self._rm_edx4edx()


@override_settings(MONGODB_LOG=TEST_MONGODB_LOG)
@unittest.skipUnless(settings.FEATURES.get('ENABLE_SYSADMIN_DASHBOARD'),
                     "ENABLE_SYSADMIN_DASHBOARD not set")
class TestSysAdminMongoCourseImport(SysadminBaseTestCase):
    """
    Check that importing into the mongo module store works
    """

    @classmethod
    def tearDownClass(cls):
        """Delete mongo log entries after test."""
        super(TestSysAdminMongoCourseImport, cls).tearDownClass()
        try:
            mongoengine.connect(TEST_MONGODB_LOG['db'])
            CourseImportLog.objects.all().delete()
        except mongoengine.connection.ConnectionError:
            pass

    def _setstaff_login(self):
        """
        Makes the test user staff and logs them in
        """

        self.user.is_staff = True
        self.user.save()

        self.client.login(username=self.user.username, password='foo')

    def test_missing_repo_dir(self):
        """
        Ensure that we handle a missing repo dir
        """

        self._setstaff_login()

        if os.path.isdir(getattr(settings, 'GIT_REPO_DIR')):
            shutil.rmtree(getattr(settings, 'GIT_REPO_DIR'))

        # Create git loaded course
        response = self._add_edx4edx()
        self.assertIn(GitImportError.NO_DIR,
                      response.content.decode('UTF-8'))

    def test_mongo_course_add_delete(self):
        """
        This is the same as TestSysadmin.test_xml_course_add_delete,
        but it uses a mongo store
        """

        self._setstaff_login()
        self._mkdir(getattr(settings, 'GIT_REPO_DIR'))

        def_ms = modulestore()
        self.assertFalse(isinstance(def_ms, XMLModuleStore))

        self._add_edx4edx()
        course = def_ms.get_course(SlashSeparatedCourseKey('MITx', 'edx4edx', 'edx4edx'))
        self.assertIsNotNone(course)

        self._rm_edx4edx()
        course = def_ms.get_course(SlashSeparatedCourseKey('MITx', 'edx4edx', 'edx4edx'))
        self.assertIsNone(course)

    def test_course_info(self):
        """
        Check to make sure we are getting git info for courses
        """
        # Regex of first 3 columns of course information table row for
        # test course loaded from git. Would not have sha1 if
        # git_info_for_course failed.
        table_re = re.compile(r"""
            <tr>\s+
            <td>edX\sAuthor\sCourse</td>\s+  # expected test git course name
            <td>MITx/edx4edx/edx4edx</td>\s+  # expected test git course_id
            <td>[a-fA-F\d]{40}</td>  # git sha1 hash
        """, re.VERBOSE)

        self._setstaff_login()
        self._mkdir(getattr(settings, 'GIT_REPO_DIR'))

        # Make sure we don't have any git hashes on the page
        response = self.client.get(reverse('sysadmin_courses'))
        self.assertNotRegexpMatches(response.content, table_re)

        # Now add the course and make sure it does match
        response = self._add_edx4edx()
        self.assertRegexpMatches(response.content, table_re)

    def test_gitlogs(self):
        """
        Create a log entry and make sure it exists
        """

        self._setstaff_login()
        self._mkdir(getattr(settings, 'GIT_REPO_DIR'))

        self._add_edx4edx()
        response = self.client.get(reverse('gitlogs'))

        # Check that our earlier import has a log with a link to details
        self.assertIn('/gitlogs/MITx/edx4edx/edx4edx', response.content)

        response = self.client.get(
            reverse('gitlogs_detail', kwargs={
                'course_id': 'MITx/edx4edx/edx4edx'}))

        self.assertIn('======&gt; IMPORTING course',
                      response.content)

        self._rm_edx4edx()

    def test_gitlog_date(self):
        """
        Make sure the date is timezone-aware and being converted/formatted
        properly.
        """

        tz_names = [
            'America/New_York',  # UTC - 5
            'Asia/Pyongyang',    # UTC + 9
            'Europe/London',     # UTC
            'Canada/Yukon',      # UTC - 8
            'Europe/Moscow',     # UTC + 4
        ]
        tz_format = DEFAULT_DATE_TIME_FORMAT

        self._setstaff_login()
        self._mkdir(getattr(settings, 'GIT_REPO_DIR'))

        self._add_edx4edx()
        date = CourseImportLog.objects.first().created.replace(tzinfo=UTC)

        for timezone in tz_names:
            with (override_settings(TIME_ZONE=timezone)):
                date_text = get_time_display(date, tz_format, settings.TIME_ZONE)
                response = self.client.get(reverse('gitlogs'))
                self.assertIn(date_text, response.content.decode('UTF-8'))

        self._rm_edx4edx()

    def test_gitlog_bad_course(self):
        """
        Make sure we gracefully handle courses that don't exist.
        """
        self._setstaff_login()
        response = self.client.get(
            reverse('gitlogs_detail', kwargs={
                'course_id': 'Not/Real/Testing'}))
        self.assertEqual(404, response.status_code)

    def test_gitlog_no_logs(self):
        """
        Make sure the template behaves well when rendered despite there not being any logs.
        (This is for courses imported using methods other than the git_add_course command)
        """

        self._setstaff_login()
        self._mkdir(getattr(settings, 'GIT_REPO_DIR'))

        self._add_edx4edx()

        # Simulate a lack of git import logs
        import_logs = CourseImportLog.objects.all()
        import_logs.delete()

        response = self.client.get(
            reverse('gitlogs_detail', kwargs={
                'course_id': 'MITx/edx4edx/edx4edx'
            })
        )
        self.assertIn(
            'No git import logs have been recorded for this course.',
            response.content
        )

        self._rm_edx4edx()

    def test_gitlog_courseteam_access(self):
        """
        Ensure course team users are allowed to access only their own course.
        """

        self._mkdir(getattr(settings, 'GIT_REPO_DIR'))

        self._setstaff_login()
        self._add_edx4edx()
        self.user.is_staff = False
        self.user.save()
        logged_in = self.client.login(username=self.user.username,
                                      password='foo')
        response = self.client.get(reverse('gitlogs'))
        # Make sure our non privileged user doesn't have access to all logs
        self.assertEqual(response.status_code, 404)
        # Or specific logs
        response = self.client.get(reverse('gitlogs_detail', kwargs={
            'course_id': 'MITx/edx4edx/edx4edx'
        }))
        self.assertEqual(response.status_code, 404)

        # Add user as staff in course team
        def_ms = modulestore()
        course = def_ms.get_course(SlashSeparatedCourseKey('MITx', 'edx4edx', 'edx4edx'))
        CourseStaffRole(course.id).add_users(self.user)

        self.assertTrue(CourseStaffRole(course.id).has_user(self.user))
        logged_in = self.client.login(username=self.user.username,
                                      password='foo')
        self.assertTrue(logged_in)

        response = self.client.get(
            reverse('gitlogs_detail', kwargs={
                'course_id': 'MITx/edx4edx/edx4edx'
            }))
        self.assertIn('======&gt; IMPORTING course',
                      response.content)

        self._rm_edx4edx()