Commit af4e49dd by Brian Wilson

Make recipients optional when encrypting files.

If no recipients are defined, use all loaded keys.

Change-Id: I0e465f83ee2537d1aabf8608da1fc5b143374ce9
parent 21a1c2f2
......@@ -17,8 +17,15 @@ log = logging.getLogger(__name__)
@contextmanager
def make_encrypted_file(output_file, key_file_targets, recipients):
"""Creates a file object to be written to, whose contents will afterwards be encrypted."""
def make_encrypted_file(output_file, key_file_targets, recipients=None):
"""
Creates a file object to be written to, whose contents will afterwards be encrypted.
Parameters:
output_file: a file object, opened for writing.
key_file_targets: a list of luigi.Target objects defining the gpg public key files to be loaded.
recipients: an optional list of recipients to be loaded. If not specified, uses all loaded keys.
"""
with make_temp_directory(prefix="encrypt") as temp_dir:
# Use temp directory to hold gpg keys.
gpg = gnupg.GPG(gnupghome=temp_dir)
......@@ -31,6 +38,8 @@ def make_encrypted_file(output_file, key_file_targets, recipients):
# Encryption produces a second file in the same temp directory.
temp_encrypted_filepath = "{filepath}.gpg".format(filepath=temp_input_filepath)
if recipients is None:
recipients = [key['keyid'] for key in gpg.list_keys()]
with open(temp_input_filepath, 'r') as temp_input_file:
_encrypt_file(gpg, temp_input_file, temp_encrypted_filepath, recipients)
_copy_file_to_open_file(temp_encrypted_filepath, output_file)
......@@ -109,7 +118,7 @@ class FakeEventExportWithEncryptionTask(MultiOutputMapReduceJobTask):
org_id, _server_id = key
recipients = self._get_recipients(org_id)
key_file_targets = [get_target_from_url(url_path_join(self.gpg_key_dir, recipient)) for recipient in recipients]
with make_encrypted_file(output_file, key_file_targets, recipients) as encrypted_output_file:
with make_encrypted_file(output_file, key_file_targets) as encrypted_output_file:
for value in values:
encrypted_output_file.write(value)
encrypted_output_file.write('\n')
......
......@@ -13,6 +13,12 @@ from edx.analytics.tasks.util.tempdir import make_temp_directory
class MakeEncryptedFileTest(unittest.TestCase):
"""Test make_encrypted_file context manager."""
def setUp(self):
self.recipient = 'daemon@edx.org'
self.gpg_key_dir = 'gpg-keys'
self.recipient_private_key = 'insecure_secret.key'
self.key_file_targets = [get_target_from_url(url_path_join(self.gpg_key_dir, self.recipient))]
def get_decrypted_data(self, input_file, key_file_target):
"""Decrypts contents of input, and writes to output file object open for writing."""
with make_temp_directory(prefix="decrypt") as temp_dir:
......@@ -22,21 +28,30 @@ class MakeEncryptedFileTest(unittest.TestCase):
decrypted_data = gpg_instance.decrypt_file(input_file, always_trust=True)
return decrypted_data
def check_encrypted_data(self, encrypted_file, expected_values):
"""Decrypt the file and compare against expected values."""
key_file_target = get_target_from_url(url_path_join(self.gpg_key_dir, self.recipient_private_key))
decrypted_data = self.get_decrypted_data(encrypted_file, key_file_target)
self.assertEquals(str(decrypted_data).strip().split('\n'), expected_values)
def test_make_encrypted_file(self):
recipient = 'daemon@edx.org'
gpg_key_dir = 'gpg-keys'
key_file_targets = [get_target_from_url(url_path_join(gpg_key_dir, recipient))]
values = ['this', 'is', 'a', 'test']
with tempfile.NamedTemporaryFile() as output_file:
with make_encrypted_file(output_file, key_file_targets, [recipient]) as encrypted_output_file:
with make_encrypted_file(output_file, self.key_file_targets, [self.recipient]) as encrypted_output_file:
for value in values:
encrypted_output_file.write(value)
encrypted_output_file.write('\n')
output_file.seek(0)
self.check_encrypted_data(output_file, values)
# Decrypt the file and compare.
recipient_private_key = 'insecure_secret.key'
key_file_target = get_target_from_url(url_path_join(gpg_key_dir, recipient_private_key))
decrypted_data = self.get_decrypted_data(output_file, key_file_target)
self.assertEquals(values, str(decrypted_data).strip().split('\n'))
def test_make_encrypted_file_with_implied_recipients(self):
values = ['this', 'is', 'a', 'test']
with tempfile.NamedTemporaryFile() as output_file:
with make_encrypted_file(output_file, self.key_file_targets) as encrypted_output_file:
for value in values:
encrypted_output_file.write(value)
encrypted_output_file.write('\n')
output_file.seek(0)
self.check_encrypted_data(output_file, values)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment