Commit e354efe9 by Mushtaq Ali

Re-encrypt transcript credentials - EDUCATOR-1490

parent 60c7bdc9
PEP8_THRESHOLD=46
PYLINT_THRESHOLD=761
PEP8_THRESHOLD=45
PYLINT_THRESHOLD=760
production-requirements:
pip install -r requirements.txt
......
"""
Management command used to re-encrypt transcript credentials data with new fernet key.
"""
import logging
from cryptography.fernet import InvalidToken
from django.core.management.base import BaseCommand
from django.db import transaction
from VEDA_OS01.models import TranscriptCredentials
from VEDA_OS01.utils import invalidate_fernet_cached_properties
LOGGER = logging.getLogger(__name__)
class Command(BaseCommand):
"""
Re-encrypt trancript credentials command class.
"""
help = 'Re-encrypts transcript credentials with new fernet key.'
def handle(self, *args, **options):
"""
handle method for command class.
"""
LOGGER.info('[Transcript credentials re-encryption] Process started.')
# Invalidate cached properties so that we get the latest keys
invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key', 'api_secret'])
try:
with transaction.atomic():
# Call save on each credentials record so that re-encryption can be be performed on fernet fields.
for transcript_credential in TranscriptCredentials.objects.all():
transcript_credential.save()
LOGGER.info('[Transcript credentials re-encryption] Process completed.')
except InvalidToken:
LOGGER.exception(
'[Transcript credentials re-encryption] No valid fernet key present to decrypt. Process halted.'
)
"""
Tests of the re_encrypt_transcript_credentials management command.
"""
from cryptography.fernet import InvalidToken
from django.conf import settings
from django.core.management import call_command
from django.test import TestCase, override_settings
from mock import patch
from VEDA_OS01.models import TranscriptCredentials, TranscriptProvider
from VEDA_OS01.utils import invalidate_fernet_cached_properties
OLD_FERNET_KEYS_LIST = ['test-ferent-key']
class ReEncryptTranscriptCredentialsTests(TestCase):
"""
Management command test class.
"""
def setUp(self):
"""
Test setup.
"""
self.credentials_data = {
'org': 'MAx',
'provider': TranscriptProvider.THREE_PLAY,
'api_key': 'test-key',
'api_secret': 'test-secret'
}
TranscriptCredentials.objects.create(**self.credentials_data)
def tearDown(self):
"""
Test teardown.
"""
# Invalidate here so that every new test would have FERNET KEYS from test environment.
invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key', 'api_secret'])
def verify_access_credentials(self):
"""
Fetches a record to check if we are able to get encrypted data.
Accessing object that is not able to be decrypted, would throw InvalidToken error.
"""
TranscriptCredentials.objects.get(
org=self.credentials_data['org'], provider=self.credentials_data['provider']
)
@patch('VEDA_OS01.management.commands.re_encrypt_transcript_credentials.LOGGER')
def test_reencrypt_transcript_credentials(self, mock_logger):
"""
Test transcript credentials are re-encrypted correctly.
"""
# Verify fernet keys.
self.assertEqual(settings.FERNET_KEYS, OLD_FERNET_KEYS_LIST)
# Verify we are able to access the record.
self.verify_access_credentials()
# Add a new key to the set
new_keys_set = ['new-fernet-key'] + settings.FERNET_KEYS
with override_settings(FERNET_KEYS=new_keys_set):
self.assertEqual(settings.FERNET_KEYS, new_keys_set)
# Run re-encryption process.
call_command('re_encrypt_transcript_credentials')
# Verify logging.
mock_logger.info.assert_called_with('[Transcript credentials re-encryption] Process completed.')
# Verify we are able to access the record.
self.verify_access_credentials()
@patch('VEDA_OS01.management.commands.re_encrypt_transcript_credentials.LOGGER')
def test_reencrypt_transcript_credentials_invalid_keys(self, mock_logger):
"""
Test transcript credentials would not be re-encrypted if an decryption key is not provided with which
data was encypted before.
"""
# Verify fernet keys.
self.assertEqual(settings.FERNET_KEYS, OLD_FERNET_KEYS_LIST)
# Verify we are able to access the record.
self.verify_access_credentials()
# Modify key set so that old key is not presnet in the key list. Note that now we are not providing
# a decryption key for data to be decrypted.
new_keys_set = ['new-fernet-key']
with override_settings(FERNET_KEYS=new_keys_set):
self.assertEqual(settings.FERNET_KEYS, new_keys_set)
# Run re-encryption process.
call_command('re_encrypt_transcript_credentials')
# Verify logging.
mock_logger.info.assert_called_with('[Transcript credentials re-encryption] Process started.')
mock_logger.exception.assert_called_with(
'[Transcript credentials re-encryption] No valid fernet key present to decrypt. Process halted.'
)
# Verify we are not able to access the record, we should get an error due to decryption key not present.
with self.assertRaises(InvalidToken):
self.verify_access_credentials()
""" Model tests """
from cryptography.fernet import InvalidToken
from django.conf import settings
from django.test import override_settings
from django.test.testcases import TransactionTestCase
from cryptography.fernet import InvalidToken
from VEDA_OS01.models import TranscriptCredentials, TranscriptProvider
from VEDA_OS01.utils import invalidate_fernet_cached_properties
class TranscriptCredentialsModelTest(TransactionTestCase):
......@@ -27,29 +29,8 @@ class TranscriptCredentialsModelTest(TransactionTestCase):
"""
Test teardown.
"""
# Invalidate here so that evry new test would have FERNET KEYS from tests.py initially.
self.invalidate_fernet_cached_properties()
def invalidate_fernet_cached_properties(self):
"""
Invalidates transcript credential fernet field's cached properties.
"""
def invalidate_fernet_cached_property(field_name):
"""
Invalidates fernet fields cached properties.
"""
field = TranscriptCredentials._meta.get_field(field_name)
if field.keys:
del field.keys
if field.fernet_keys:
del field.fernet_keys
if field.fernet:
del field.fernet
invalidate_fernet_cached_property('api_key')
invalidate_fernet_cached_property('api_secret')
# Invalidate here so that every new test would have FERNET KEYS from tests.py initially.
invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key', 'api_secret'])
def test_decrypt(self):
"""
......@@ -72,7 +53,7 @@ class TranscriptCredentialsModelTest(TransactionTestCase):
new_keys_set = ['new-fernet-key'] + settings.FERNET_KEYS
# Invalidate cached properties so that we get the latest keys
self.invalidate_fernet_cached_properties()
invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key', 'api_secret'])
with override_settings(FERNET_KEYS=new_keys_set):
self.assertEqual(settings.FERNET_KEYS, new_keys_set)
......@@ -92,7 +73,7 @@ class TranscriptCredentialsModelTest(TransactionTestCase):
new_keys_set = ['new-fernet-key']
# Invalidate cached properties so that we get the latest keys
self.invalidate_fernet_cached_properties()
invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key', 'api_secret'])
with override_settings(FERNET_KEYS=new_keys_set):
self.assertEqual(settings.FERNET_KEYS, new_keys_set)
......
"""
Tests common utils
"""
from unittest import TestCase
from ddt import data, ddt, unpack
from django.conf import settings
from django.test import override_settings
from mock import MagicMock, Mock
from unittest import TestCase
from VEDA_OS01 import utils
from VEDA_OS01.models import TranscriptCredentials
OLD_FERNET_KEYS_LIST = ['test-ferent-key']
@ddt
......@@ -29,3 +36,28 @@ class UtilTests(TestCase):
# Assert the status and call to edx-val api method.
self.assertEqual(val_api_client.update_video_status.called, update_val_status)
self.assertEqual(video.transcript_status, status)
def test_invalidate_fernet_cached_properties(self):
"""
Tests that fernet field properties are properly invalidated.
"""
def verify_model_field_keys(model, field_name, expected_keys_list):
"""
Verifies cached property keys has expected keys list.
"""
field = model._meta.get_field(field_name)
# Verify keys are properly set and fetched.
self.assertEqual(field.keys, expected_keys_list)
self.assertEqual(settings.FERNET_KEYS, OLD_FERNET_KEYS_LIST)
verify_model_field_keys(TranscriptCredentials, 'api_key', OLD_FERNET_KEYS_LIST)
# Invalidate cached properties.
utils.invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key'])
# Prepend a new key.
new_keys_set = ['new-fernet-key'] + settings.FERNET_KEYS
with override_settings(FERNET_KEYS=new_keys_set):
self.assertEqual(settings.FERNET_KEYS, new_keys_set)
verify_model_field_keys(TranscriptCredentials, 'api_key', new_keys_set)
......@@ -39,3 +39,21 @@ def update_video_status(val_api_client, video, status):
# update edx-video-pipeline's video status
video.transcript_status = status
video.save()
def invalidate_fernet_cached_properties(model, fields):
"""
Invalidates transcript credential fernet field's cached properties.
Arguments:
model (class): Model class containing fernet fields.
fields (list): A list of fernet fields whose cache is to be invalidated.
"""
for field_name in fields:
try:
field = model._meta.get_field(field_name)
del field.keys
del field.fernet_keys
del field.fernet
except AttributeError:
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment