Commit 21a1c2f2 by Brian Wilson

Implement gpg encryption tasks.

* ContextManager method
* Example MultiOutputReducerTask to encrypt all output files by org.
* Unit test for ContextManager method.

Change-Id: I45fbc83e0d066fd8a0c9e7bb792bcb1fb50e5110
parent f694e7fe
"""
Tasks for performing encryption on export files.
"""
from contextlib import contextmanager
import logging
import tempfile
import gnupg
import luigi
import yaml
from edx.analytics.tasks.mapreduce import MultiOutputMapReduceJobTask
from edx.analytics.tasks.url import get_target_from_url, url_path_join, ExternalURL
from edx.analytics.tasks.util.tempdir import make_temp_directory
log = logging.getLogger(__name__)
@contextmanager
def make_encrypted_file(output_file, key_file_targets, recipients):
    """
    Creates a file object to be written to, whose contents will afterwards be encrypted.

    Arguments:
        output_file: a file object, already open for writing, that receives the
            encrypted bytes once the body of the `with` statement completes.
        key_file_targets: targets whose contents are imported as keys into a
            throwaway GPG home directory before encrypting.
        recipients: recipient identifiers handed to GPG for the encryption.

    Yields:
        A temporary file object.  Whatever is written to it is encrypted and
        copied to `output_file` when the `with` body finishes without raising.
    """
    with make_temp_directory(prefix="encrypt") as temp_dir:
        # Use temp directory to hold gpg keys.
        gpg = gnupg.GPG(gnupghome=temp_dir)
        _import_key_files(gpg, key_file_targets)

        # Create a temp file to contain the unencrypted output, in the same temp directory.
        # delete=False keeps the file on disk after it is closed, so that it can be
        # reopened for reading below; it goes away when the temp directory is removed.
        with tempfile.NamedTemporaryFile(dir=temp_dir, delete=False) as temp_input_file:
            temp_input_filepath = temp_input_file.name
            yield temp_input_file

        # Encryption produces a second file in the same temp directory.
        temp_encrypted_filepath = "{filepath}.gpg".format(filepath=temp_input_filepath)

        # The temp file was written in binary mode (the NamedTemporaryFile default),
        # so read it back in binary mode too: GPG must see the exact bytes written.
        with open(temp_input_filepath, 'rb') as temp_input_file:
            _encrypt_file(gpg, temp_input_file, temp_encrypted_filepath, recipients)
        _copy_file_to_open_file(temp_encrypted_filepath, output_file)
def _import_key_files(gpg_instance, key_file_targets):
    """
    Import the contents of every key-file target into the given GPG instance.

    Each target is opened for reading and its full contents are passed to
    `gpg_instance.import_keys`.  Importing writes files in the home directory
    of the instance.
    """
    for target in key_file_targets:
        log.info("Importing keyfile from %s", target.path)
        with target.open('r') as key_stream:
            key_material = key_stream.read()
            gpg_instance.import_keys(key_material)
def _encrypt_file(gpg_instance, input_file, encrypted_filepath, recipients):
    """
    Encrypt a file object open for reading, writing the result to a path.

    The encrypted output goes to `encrypted_filepath`; nothing is returned.
    """
    encryption_options = {
        'always_trust': True,
        'output': encrypted_filepath,
        # Ask for raw output rather than ASCII-armored text.
        'armor': False,
    }
    gpg_instance.encrypt_file(input_file, recipients, **encryption_options)
def _copy_file_to_open_file(filepath, output_file):
    """
    Copies a filepath to a file object already opened for writing.

    The source is opened in binary mode so that its bytes reach `output_file`
    unchanged; the data copied here is non-armored GPG output, which is not
    text and must not go through newline translation or text decoding.

    Arguments:
        filepath: path of the file to read.
        output_file: object with a `write` method; it is left open.
    """
    with open(filepath, 'rb') as src_file:
        # read() returns an empty string at end of file, which stops the iteration.
        for transfer_buffer in iter(lambda: src_file.read(1024), b''):
            output_file.write(transfer_buffer)
class FakeEventExportWithEncryptionTask(MultiOutputMapReduceJobTask):
    """Example class to demonstrate use of encryption of files for export from multi-output."""

    # URL of the input data; wrapped in an ExternalURL requirement.
    source = luigi.Parameter()
    # URL of a YAML file with a top-level 'organizations' mapping; each
    # organization entry supplies a 'recipient'.
    config = luigi.Parameter()

    # TODO: these parameters could be moved into the config file.
    # Location under which key files are looked up, one per recipient name.
    gpg_key_dir = luigi.Parameter()
    # Optional additional recipient appended to every organization's recipients.
    gpg_master_key = luigi.Parameter(default=None)

    def init_reducer(self):
        """Load the organization configuration so the reducer can look up recipients."""
        self._get_organization_info()

    def requires(self):
        """Require both the source data and the configuration file."""
        return {
            'source': ExternalURL(self.source),
            'config': ExternalURL(self.config),
        }

    def requires_local(self):
        """Return the configuration-file requirement."""
        return self.requires()['config']

    def requires_hadoop(self):
        """Return the source-data requirement."""
        return self.requires()['source']

    def mapper(self, line):
        """Emit every input line under a single hard-coded (org_id, server_id) key."""
        org_id = "edx"
        server_id = "prod-edxapp-011"
        yield (org_id, server_id), line

    def extra_modules(self):
        """Return the third-party modules this task uses (gnupg and yaml)."""
        return [gnupg, yaml]

    def output_path_for_key(self, key):
        """Build the output path for a (org_id, server_id) key."""
        org_id, server_id = key
        return url_path_join(self.output_root, org_id, server_id, 'tracking.log.gpg')

    def multi_output_reducer(self, key, values, output_file):
        """Write all values for a key, one per line, encrypted for the organization's recipients."""
        org_id, _server_id = key
        recipients = self._get_recipients(org_id)
        # Each recipient's key file is expected under gpg_key_dir, named after the recipient.
        key_file_targets = [
            get_target_from_url(url_path_join(self.gpg_key_dir, recipient))
            for recipient in recipients
        ]
        with make_encrypted_file(output_file, key_file_targets, recipients) as encrypted_output_file:
            for value in values:
                encrypted_output_file.write(value)
                encrypted_output_file.write('\n')

    def _get_organization_info(self):
        """Get the organization configuration from the configuration yaml file."""
        with self.input()['config'].open() as config_input:
            # NOTE: safe_load is used instead of yaml.load because the file comes from
            # an external URL, and yaml.load can construct arbitrary Python objects.
            # The configuration read here only needs plain mappings and strings.
            config_data = yaml.safe_load(config_input)
        self.organizations = config_data['organizations']  # pylint: disable=attribute-defined-outside-init

    def _get_recipients(self, org_id):
        """
        Get the correct recipients for the specified organization.

        Returns a list holding the organization's configured recipient, followed
        by the master key when one was supplied.  Raises KeyError if the
        organization (or its 'recipient' entry) is missing from the configuration.
        """
        recipients = [self.organizations[org_id]['recipient']]
        if self.gpg_master_key is not None:
            recipients.append(self.gpg_master_key)
        return recipients
"""Tests of utilities to encrypt files."""
import gnupg
import tempfile
from edx.analytics.tasks.encrypt import make_encrypted_file, _import_key_files
from edx.analytics.tasks.tests import unittest
from edx.analytics.tasks.url import get_target_from_url
from edx.analytics.tasks.url import url_path_join
from edx.analytics.tasks.util.tempdir import make_temp_directory
class MakeEncryptedFileTest(unittest.TestCase):
    """Test make_encrypted_file context manager."""

    def get_decrypted_data(self, input_file, key_file_target):
        """
        Decrypts the contents of an open input file and returns the result.

        The key from `key_file_target` is imported into a fresh GPG home
        directory that is discarded afterwards.
        """
        with make_temp_directory(prefix="decrypt") as temp_dir:
            # Use temp directory to hold gpg keys.
            gpg_instance = gnupg.GPG(gnupghome=temp_dir)
            _import_key_files(gpg_instance, [key_file_target])
            decrypted_data = gpg_instance.decrypt_file(input_file, always_trust=True)
            return decrypted_data

    def test_make_encrypted_file(self):
        """Round-trip: encrypt lines for a recipient, decrypt with the private key, compare."""
        recipient = 'daemon@edx.org'
        gpg_key_dir = 'gpg-keys'
        key_file_targets = [get_target_from_url(url_path_join(gpg_key_dir, recipient))]
        values = ['this', 'is', 'a', 'test']
        with tempfile.NamedTemporaryFile() as output_file:
            with make_encrypted_file(output_file, key_file_targets, [recipient]) as encrypted_output_file:
                for value in values:
                    encrypted_output_file.write(value)
                    encrypted_output_file.write('\n')

            # Rewind so that decryption reads the encrypted bytes from the start.
            output_file.seek(0)

            # Decrypt the file and compare.
            recipient_private_key = 'insecure_secret.key'
            key_file_target = get_target_from_url(url_path_join(gpg_key_dir, recipient_private_key))
            decrypted_data = self.get_decrypted_data(output_file, key_file_target)
            # assertEqual replaces the deprecated assertEquals alias.
            self.assertEqual(values, str(decrypted_data).strip().split('\n'))
...@@ -282,7 +282,6 @@ class UsersPerCountryReportTestCase(unittest.TestCase): ...@@ -282,7 +282,6 @@ class UsersPerCountryReportTestCase(unittest.TestCase):
self.assertTrue(line.startswith('0.')) self.assertTrue(line.startswith('0.'))
class UsersPerCountryReportWorkflowTestCase(BaseUserLocationEventTestCase): class UsersPerCountryReportWorkflowTestCase(BaseUserLocationEventTestCase):
"""Tests of UsersPerCountryReportWorkflow.""" """Tests of UsersPerCountryReportWorkflow."""
......
"""Utility methods to help with temp-file management."""
import atexit
from contextlib import contextmanager
import os.path
import shutil
import tempfile
@contextmanager
def make_temp_directory(suffix='', prefix='tmp', dir=None):  # pylint: disable=redefined-builtin
    """
    Yield the path of a freshly created temp directory, removing it afterwards.

    The arguments are passed straight through to `tempfile.mkdtemp`.  The
    directory and everything in it is deleted when the `with` block ends,
    whether or not it raised.
    """
    scratch_path = tempfile.mkdtemp(suffix, prefix, dir)

    def _remove_scratch():
        """Remove the temp directory tree if it is still present."""
        if not os.path.exists(scratch_path):
            return
        shutil.rmtree(scratch_path)

    # Second line of defence: also attempt removal at interpreter exit, in case
    # the generator is never resumed to reach the finally clause.
    atexit.register(_remove_scratch)

    try:
        yield scratch_path
    finally:
        _remove_scratch()
...@@ -9,6 +9,8 @@ pandas==0.13.0 ...@@ -9,6 +9,8 @@ pandas==0.13.0
pbr==0.5.23 pbr==0.5.23
pygeoip==0.3.1 pygeoip==0.3.1
python-cjson==1.0.5 python-cjson==1.0.5
python-gnupg==0.3.6
pyyaml==3.10
stevedore==0.14.1 stevedore==0.14.1
tornado==3.1.1 tornado==3.1.1
git+https://github.com/spotify/luigi.git@a33756c781b9bf7e51384f0eb19d6a25050ef136#egg=luigi git+https://github.com/spotify/luigi.git@a33756c781b9bf7e51384f0eb19d6a25050ef136#egg=luigi
...@@ -33,6 +33,7 @@ edx.analytics.tasks = ...@@ -33,6 +33,7 @@ edx.analytics.tasks =
dump-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseTask dump-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseTask
export-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseAfterImportWorkflow export-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseAfterImportWorkflow
last-country = edx.analytics.tasks.user_location:LastCountryForEachUser last-country = edx.analytics.tasks.user_location:LastCountryForEachUser
encrypt-exports = edx.analytics.tasks.encrypt:FakeEventExportWithEncryptionTask
mapreduce.engine = mapreduce.engine =
hadoop = edx.analytics.tasks.mapreduce:MapReduceJobRunner hadoop = edx.analytics.tasks.mapreduce:MapReduceJobRunner
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment