Commit ca28796a by David Ormsbee

Merge pull request #1345 from MITx/feature/apenney/pearson-merge

Pearson export/import code. (No merging before 25th Jan)
parents 72c9d9f7 a5477855
import csv import csv
import os
from collections import OrderedDict from collections import OrderedDict
from datetime import datetime from datetime import datetime
from os.path import isdir
from optparse import make_option from optparse import make_option
from django.core.management.base import BaseCommand from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from student.models import TestCenterUser from student.models import TestCenterUser
class Command(BaseCommand): class Command(BaseCommand):
CSV_TO_MODEL_FIELDS = OrderedDict([ CSV_TO_MODEL_FIELDS = OrderedDict([
...@@ -37,39 +39,48 @@ class Command(BaseCommand): ...@@ -37,39 +39,48 @@ class Command(BaseCommand):
("LastUpdate", "user_updated_at"), # in UTC, so same as what we store ("LastUpdate", "user_updated_at"), # in UTC, so same as what we store
]) ])
# define defaults, even thought 'store_true' shouldn't need them.
# (call_command will set None as default value for all options that don't have one,
# so one cannot rely on presence/absence of flags in that world.)
option_list = BaseCommand.option_list + ( option_list = BaseCommand.option_list + (
make_option( make_option('--dest-from-settings',
'--dump_all',
action='store_true', action='store_true',
dest='dump_all', dest='dest-from-settings',
), default=False,
help='Retrieve the destination to export to from django.'),
make_option('--destination',
action='store',
dest='destination',
default=None,
help='Where to store the exported files')
) )
args = '<output_file_or_dir>' def handle(self, **options):
help = """
Export user demographic information from TestCenterUser model into a tab delimited
text file with a format that Pearson expects.
"""
def handle(self, *args, **kwargs):
if len(args) < 1:
print Command.help
return
# update time should use UTC in order to be comparable to the user_updated_at # update time should use UTC in order to be comparable to the user_updated_at
# field # field
uploaded_at = datetime.utcnow() uploaded_at = datetime.utcnow()
# if specified destination is an existing directory, then # if specified destination is an existing directory, then
# create a filename for it automatically. If it doesn't exist, # create a filename for it automatically. If it doesn't exist,
# or exists as a file, then we will just write to it. # then we will create the directory.
# Name will use timestamp -- this is UTC, so it will look funny, # Name will use timestamp -- this is UTC, so it will look funny,
# but it should at least be consistent with the other timestamps # but it should at least be consistent with the other timestamps
# used in the system. # used in the system.
dest = args[0] if 'dest-from-settings' in options and options['dest-from-settings']:
if isdir(dest): if 'LOCAL_EXPORT' in settings.PEARSON:
destfile = os.path.join(dest, uploaded_at.strftime("cdd-%Y%m%d-%H%M%S.dat")) dest = settings.PEARSON['LOCAL_EXPORT']
else: else:
destfile = dest raise CommandError('--dest-from-settings was enabled but the'
'PEARSON[LOCAL_EXPORT] setting was not set.')
elif 'destination' in options and options['destination']:
dest = options['destination']
else:
raise CommandError('--destination or --dest-from-settings must be used')
if not os.path.isdir(dest):
os.makedirs(dest)
destfile = os.path.join(dest, uploaded_at.strftime("cdd-%Y%m%d-%H%M%S.dat"))
# strings must be in latin-1 format. CSV parser will # strings must be in latin-1 format. CSV parser will
# otherwise convert unicode objects to ascii. # otherwise convert unicode objects to ascii.
...@@ -79,7 +90,7 @@ class Command(BaseCommand): ...@@ -79,7 +90,7 @@ class Command(BaseCommand):
else: else:
return value return value
dump_all = kwargs['dump_all'] # dump_all = options['dump_all']
with open(destfile, "wb") as outfile: with open(destfile, "wb") as outfile:
writer = csv.DictWriter(outfile, writer = csv.DictWriter(outfile,
...@@ -89,7 +100,7 @@ class Command(BaseCommand): ...@@ -89,7 +100,7 @@ class Command(BaseCommand):
extrasaction='ignore') extrasaction='ignore')
writer.writeheader() writer.writeheader()
for tcu in TestCenterUser.objects.order_by('id'): for tcu in TestCenterUser.objects.order_by('id'):
if dump_all or tcu.needs_uploading: if tcu.needs_uploading: # or dump_all
record = dict((csv_field, ensure_encoding(getattr(tcu, model_field))) record = dict((csv_field, ensure_encoding(getattr(tcu, model_field)))
for csv_field, model_field for csv_field, model_field
in Command.CSV_TO_MODEL_FIELDS.items()) in Command.CSV_TO_MODEL_FIELDS.items())
...@@ -97,6 +108,3 @@ class Command(BaseCommand): ...@@ -97,6 +108,3 @@ class Command(BaseCommand):
writer.writerow(record) writer.writerow(record)
tcu.uploaded_at = uploaded_at tcu.uploaded_at = uploaded_at
tcu.save() tcu.save()
import csv import csv
import os
from collections import OrderedDict from collections import OrderedDict
from datetime import datetime from datetime import datetime
from os.path import isdir, join
from optparse import make_option from optparse import make_option
from django.core.management.base import BaseCommand from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from student.models import TestCenterRegistration, ACCOMMODATION_REJECTED_CODE
from student.models import TestCenterRegistration
class Command(BaseCommand): class Command(BaseCommand):
...@@ -23,48 +25,57 @@ class Command(BaseCommand): ...@@ -23,48 +25,57 @@ class Command(BaseCommand):
("LastUpdate", "user_updated_at"), # in UTC, so same as what we store ("LastUpdate", "user_updated_at"), # in UTC, so same as what we store
]) ])
args = '<output_file_or_dir>'
help = """
Export user registration information from TestCenterRegistration model into a tab delimited
text file with a format that Pearson expects.
"""
option_list = BaseCommand.option_list + ( option_list = BaseCommand.option_list + (
make_option( make_option('--dest-from-settings',
'--dump_all', action='store_true',
dest='dest-from-settings',
default=False,
help='Retrieve the destination to export to from django.'),
make_option('--destination',
action='store',
dest='destination',
default=None,
help='Where to store the exported files'),
make_option('--dump_all',
action='store_true', action='store_true',
dest='dump_all', dest='dump_all',
default=False,
), ),
make_option( make_option('--force_add',
'--force_add',
action='store_true', action='store_true',
dest='force_add', dest='force_add',
default=False,
), ),
) )
def handle(self, **options):
def handle(self, *args, **kwargs):
if len(args) < 1:
print Command.help
return
# update time should use UTC in order to be comparable to the user_updated_at # update time should use UTC in order to be comparable to the user_updated_at
# field # field
uploaded_at = datetime.utcnow() uploaded_at = datetime.utcnow()
# if specified destination is an existing directory, then # if specified destination is an existing directory, then
# create a filename for it automatically. If it doesn't exist, # create a filename for it automatically. If it doesn't exist,
# or exists as a file, then we will just write to it. # then we will create the directory.
# Name will use timestamp -- this is UTC, so it will look funny, # Name will use timestamp -- this is UTC, so it will look funny,
# but it should at least be consistent with the other timestamps # but it should at least be consistent with the other timestamps
# used in the system. # used in the system.
dest = args[0] if 'dest-from-settings' in options and options['dest-from-settings']:
if isdir(dest): if 'LOCAL_EXPORT' in settings.PEARSON:
destfile = join(dest, uploaded_at.strftime("ead-%Y%m%d-%H%M%S.dat")) dest = settings.PEARSON['LOCAL_EXPORT']
else:
raise CommandError('--dest-from-settings was enabled but the'
'PEARSON[LOCAL_EXPORT] setting was not set.')
elif 'destination' in options and options['destination']:
dest = options['destination']
else: else:
destfile = dest raise CommandError('--destination or --dest-from-settings must be used')
dump_all = kwargs['dump_all'] if not os.path.isdir(dest):
os.makedirs(dest)
destfile = os.path.join(dest, uploaded_at.strftime("ead-%Y%m%d-%H%M%S.dat"))
dump_all = options['dump_all']
with open(destfile, "wb") as outfile: with open(destfile, "wb") as outfile:
writer = csv.DictWriter(outfile, writer = csv.DictWriter(outfile,
...@@ -81,13 +92,11 @@ class Command(BaseCommand): ...@@ -81,13 +92,11 @@ class Command(BaseCommand):
record["LastUpdate"] = record["LastUpdate"].strftime("%Y/%m/%d %H:%M:%S") record["LastUpdate"] = record["LastUpdate"].strftime("%Y/%m/%d %H:%M:%S")
record["EligibilityApptDateFirst"] = record["EligibilityApptDateFirst"].strftime("%Y/%m/%d") record["EligibilityApptDateFirst"] = record["EligibilityApptDateFirst"].strftime("%Y/%m/%d")
record["EligibilityApptDateLast"] = record["EligibilityApptDateLast"].strftime("%Y/%m/%d") record["EligibilityApptDateLast"] = record["EligibilityApptDateLast"].strftime("%Y/%m/%d")
if kwargs['force_add']: if record["Accommodations"] == ACCOMMODATION_REJECTED_CODE:
record["Accommodations"] = ""
if options['force_add']:
record['AuthorizationTransactionType'] = 'Add' record['AuthorizationTransactionType'] = 'Add'
writer.writerow(record) writer.writerow(record)
tcr.uploaded_at = uploaded_at tcr.uploaded_at = uploaded_at
tcr.save() tcr.save()
import csv
from zipfile import ZipFile, is_zipfile
from time import strptime, strftime
from collections import OrderedDict
from datetime import datetime
from os.path import isdir
from optparse import make_option
from dogapi import dog_http_api, dog_stats_api
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
from student.models import TestCenterUser, TestCenterRegistration
class Command(BaseCommand):
dog_http_api.api_key = settings.DATADOG_API
args = '<input zip file>'
help = """
Import Pearson confirmation files and update TestCenterUser
and TestCenterRegistration tables with status.
"""
@staticmethod
def datadog_error(string, tags):
dog_http_api.event("Pearson Import", string, alert_type='error', tags=[tags])
def handle(self, *args, **kwargs):
if len(args) < 1:
print Command.help
return
source_zip = args[0]
if not is_zipfile(source_zip):
error = "Input file is not a zipfile: \"{}\"".format(source_zip)
Command.datadog_error(error, source_zip)
raise CommandError(error)
# loop through all files in zip, and process them based on filename prefix:
with ZipFile(source_zip, 'r') as zipfile:
for fileinfo in zipfile.infolist():
with zipfile.open(fileinfo) as zipentry:
if fileinfo.filename.startswith("eac-"):
self.process_eac(zipentry)
elif fileinfo.filename.startswith("vcdc-"):
self.process_vcdc(zipentry)
else:
error = "Unrecognized confirmation file type\"{}\" in confirmation zip file \"{}\"".format(fileinfo.filename, zipfile)
Command.datadog_error(error, source_zip)
raise CommandError(error)
def process_eac(self, eacfile):
print "processing eac"
reader = csv.DictReader(eacfile, delimiter="\t")
for row in reader:
client_authorization_id = row['ClientAuthorizationID']
if not client_authorization_id:
if row['Status'] == 'Error':
Command.datadog_error("Error in EAD file processing ({}): {}".format(row['Date'], row['Message']), eacfile.name)
else:
Command.datadog_error("Encountered bad record: {}".format(row), eacfile.name)
else:
try:
registration = TestCenterRegistration.objects.get(client_authorization_id=client_authorization_id)
Command.datadog_error("Found authorization record for user {}".format(registration.testcenter_user.user.username), eacfile)
# now update the record:
registration.upload_status = row['Status']
registration.upload_error_message = row['Message']
try:
registration.processed_at = strftime('%Y-%m-%d %H:%M:%S', strptime(row['Date'], '%Y/%m/%d %H:%M:%S'))
except ValueError as ve:
Command.datadog_error("Bad Date value found for {}: message {}".format(client_authorization_id, ve), eacfile.name)
# store the authorization Id if one is provided. (For debugging)
if row['AuthorizationID']:
try:
registration.authorization_id = int(row['AuthorizationID'])
except ValueError as ve:
Command.datadog_error("Bad AuthorizationID value found for {}: message {}".format(client_authorization_id, ve), eacfile.name)
registration.confirmed_at = datetime.utcnow()
registration.save()
except TestCenterRegistration.DoesNotExist:
Command.datadog_error("Failed to find record for client_auth_id {}".format(client_authorization_id), eacfile.name)
def process_vcdc(self, vcdcfile):
print "processing vcdc"
reader = csv.DictReader(vcdcfile, delimiter="\t")
for row in reader:
client_candidate_id = row['ClientCandidateID']
if not client_candidate_id:
if row['Status'] == 'Error':
Command.datadog_error("Error in CDD file processing ({}): {}".format(row['Date'], row['Message']), vcdcfile.name)
else:
Command.datadog_error("Encountered bad record: {}".format(row), vcdcfile.name)
else:
try:
tcuser = TestCenterUser.objects.get(client_candidate_id=client_candidate_id)
Command.datadog_error("Found demographics record for user {}".format(tcuser.user.username), vcdcfile.name)
# now update the record:
tcuser.upload_status = row['Status']
tcuser.upload_error_message = row['Message']
try:
tcuser.processed_at = strftime('%Y-%m-%d %H:%M:%S', strptime(row['Date'], '%Y/%m/%d %H:%M:%S'))
except ValueError as ve:
Command.datadog_error("Bad Date value found for {}: message {}".format(client_candidate_id, ve), vcdcfile.name)
# store the candidate Id if one is provided. (For debugging)
if row['CandidateID']:
try:
tcuser.candidate_id = int(row['CandidateID'])
except ValueError as ve:
Command.datadog_error("Bad CandidateID value found for {}: message {}".format(client_candidate_id, ve), vcdcfile.name)
tcuser.confirmed_at = datetime.utcnow()
tcuser.save()
except TestCenterUser.DoesNotExist:
Command.datadog_error(" Failed to find record for client_candidate_id {}".format(client_candidate_id), vcdcfile.name)
...@@ -71,6 +71,12 @@ class Command(BaseCommand): ...@@ -71,6 +71,12 @@ class Command(BaseCommand):
dest='ignore_registration_dates', dest='ignore_registration_dates',
help='find exam info for course based on exam_series_code, even if the exam is not active.' help='find exam info for course based on exam_series_code, even if the exam is not active.'
), ),
make_option(
'--create_dummy_exam',
action='store_true',
dest='create_dummy_exam',
help='create dummy exam info for course, even if course exists'
),
) )
args = "<student_username course_id>" args = "<student_username course_id>"
help = "Create or modify a TestCenterRegistration entry for a given Student" help = "Create or modify a TestCenterRegistration entry for a given Student"
...@@ -98,7 +104,10 @@ class Command(BaseCommand): ...@@ -98,7 +104,10 @@ class Command(BaseCommand):
except TestCenterUser.DoesNotExist: except TestCenterUser.DoesNotExist:
raise CommandError("User \"{}\" does not have an existing demographics record".format(username)) raise CommandError("User \"{}\" does not have an existing demographics record".format(username))
# check to see if a course_id was specified, and use information from that: # get an "exam" object. Check to see if a course_id was specified, and use information from that:
exam = None
create_dummy_exam = 'create_dummy_exam' in our_options and our_options['create_dummy_exam']
if not create_dummy_exam:
try: try:
course = course_from_id(course_id) course = course_from_id(course_id)
if 'ignore_registration_dates' in our_options: if 'ignore_registration_dates' in our_options:
...@@ -107,6 +116,8 @@ class Command(BaseCommand): ...@@ -107,6 +116,8 @@ class Command(BaseCommand):
else: else:
exam = course.current_test_center_exam exam = course.current_test_center_exam
except ItemNotFoundError: except ItemNotFoundError:
pass
else:
# otherwise use explicit values (so we don't have to define a course): # otherwise use explicit values (so we don't have to define a course):
exam_name = "Dummy Placeholder Name" exam_name = "Dummy Placeholder Name"
exam_info = { 'Exam_Series_Code': our_options['exam_series_code'], exam_info = { 'Exam_Series_Code': our_options['exam_series_code'],
...@@ -120,7 +131,7 @@ class Command(BaseCommand): ...@@ -120,7 +131,7 @@ class Command(BaseCommand):
our_options['eligibility_appointment_date_last'] = strftime("%Y-%m-%d", exam.last_eligible_appointment_date) our_options['eligibility_appointment_date_last'] = strftime("%Y-%m-%d", exam.last_eligible_appointment_date)
if exam is None: if exam is None:
raise CommandError("Exam for course_id {%s} does not exist".format(course_id)) raise CommandError("Exam for course_id {} does not exist".format(course_id))
exam_code = exam.exam_series_code exam_code = exam.exam_series_code
......
from optparse import make_option from optparse import make_option
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand, CommandError
from student.models import TestCenterUser, TestCenterUserForm from student.models import TestCenterUser, TestCenterUserForm
...@@ -161,15 +161,16 @@ class Command(BaseCommand): ...@@ -161,15 +161,16 @@ class Command(BaseCommand):
if form.is_valid(): if form.is_valid():
form.update_and_save() form.update_and_save()
else: else:
errorlist = []
if (len(form.errors) > 0): if (len(form.errors) > 0):
print "Field Form errors encountered:" errorlist.append("Field Form errors encountered:")
for fielderror in form.errors: for fielderror in form.errors:
print "Field Form Error: %s" % fielderror errorlist.append("Field Form Error: {}".format(fielderror))
if (len(form.non_field_errors()) > 0): if (len(form.non_field_errors()) > 0):
print "Non-field Form errors encountered:" errorlist.append("Non-field Form errors encountered:")
for nonfielderror in form.non_field_errors: for nonfielderror in form.non_field_errors:
print "Non-field Form Error: %s" % nonfielderror errorlist.append("Non-field Form Error: {}".format(nonfielderror))
raise CommandError("\n".join(errorlist))
else: else:
print "No changes necessary to make to existing user's demographics." print "No changes necessary to make to existing user's demographics."
......
import os
from optparse import make_option
from stat import S_ISDIR
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.management import call_command
from dogapi import dog_http_api, dog_stats_api
import paramiko
import boto
dog_http_api.api_key = settings.DATADOG_API
class Command(BaseCommand):
help = """
This command handles the importing and exporting of student records for
Pearson. It uses some other Django commands to export and import the
files and then uploads over SFTP to Pearson and stuffs the entry in an
S3 bucket for archive purposes.
Usage: django-admin.py pearson-transfer --mode [import|export|both]
"""
option_list = BaseCommand.option_list + (
make_option('--mode',
action='store',
dest='mode',
default='both',
choices=('import', 'export', 'both'),
help='mode is import, export, or both'),
)
def handle(self, **options):
if not hasattr(settings, 'PEARSON'):
raise CommandError('No PEARSON entries in auth/env.json.')
# check settings needed for either import or export:
for value in ['SFTP_HOSTNAME', 'SFTP_USERNAME', 'SFTP_PASSWORD', 'S3_BUCKET']:
if value not in settings.PEARSON:
raise CommandError('No entry in the PEARSON settings'
'(env/auth.json) for {0}'.format(value))
for value in ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY']:
if not hasattr(settings, value):
raise CommandError('No entry in the AWS settings'
'(env/auth.json) for {0}'.format(value))
# check additional required settings for import and export:
if options['mode'] in ('export', 'both'):
for value in ['LOCAL_EXPORT','SFTP_EXPORT']:
if value not in settings.PEARSON:
raise CommandError('No entry in the PEARSON settings'
'(env/auth.json) for {0}'.format(value))
# make sure that the import directory exists or can be created:
source_dir = settings.PEARSON['LOCAL_EXPORT']
if not os.path.isdir(source_dir):
os.makedirs(source_dir)
if options['mode'] in ('import', 'both'):
for value in ['LOCAL_IMPORT','SFTP_IMPORT']:
if value not in settings.PEARSON:
raise CommandError('No entry in the PEARSON settings'
'(env/auth.json) for {0}'.format(value))
# make sure that the import directory exists or can be created:
dest_dir = settings.PEARSON['LOCAL_IMPORT']
if not os.path.isdir(dest_dir):
os.makedirs(dest_dir)
def sftp(files_from, files_to, mode, deleteAfterCopy=False):
with dog_stats_api.timer('pearson.{0}'.format(mode), tags='sftp'):
try:
t = paramiko.Transport((settings.PEARSON['SFTP_HOSTNAME'], 22))
t.connect(username=settings.PEARSON['SFTP_USERNAME'],
password=settings.PEARSON['SFTP_PASSWORD'])
sftp = paramiko.SFTPClient.from_transport(t)
if mode == 'export':
try:
sftp.chdir(files_to)
except IOError:
raise CommandError('SFTP destination path does not exist: {}'.format(files_to))
for filename in os.listdir(files_from):
sftp.put(files_from + '/' + filename, filename)
if deleteAfterCopy:
os.remove(os.path.join(files_from, filename))
else:
try:
sftp.chdir(files_from)
except IOError:
raise CommandError('SFTP source path does not exist: {}'.format(files_from))
for filename in sftp.listdir('.'):
# skip subdirectories
if not S_ISDIR(sftp.stat(filename).st_mode):
sftp.get(filename, files_to + '/' + filename)
# delete files from sftp server once they are successfully pulled off:
if deleteAfterCopy:
sftp.remove(filename)
except:
dog_http_api.event('pearson {0}'.format(mode),
'sftp uploading failed',
alert_type='error')
raise
finally:
sftp.close()
t.close()
def s3(files_from, bucket, mode, deleteAfterCopy=False):
with dog_stats_api.timer('pearson.{0}'.format(mode), tags='s3'):
try:
for filename in os.listdir(files_from):
source_file = os.path.join(files_from, filename)
# use mode as name of directory into which to write files
dest_file = os.path.join(mode, filename)
upload_file_to_s3(bucket, source_file, dest_file)
if deleteAfterCopy:
os.remove(files_from + '/' + filename)
except:
dog_http_api.event('pearson {0}'.format(mode),
's3 archiving failed')
raise
def upload_file_to_s3(bucket, source_file, dest_file):
"""
Upload file to S3
"""
s3 = boto.connect_s3(settings.AWS_ACCESS_KEY_ID,
settings.AWS_SECRET_ACCESS_KEY)
from boto.s3.key import Key
b = s3.get_bucket(bucket)
k = Key(b)
k.key = "{filename}".format(filename=dest_file)
k.set_contents_from_filename(source_file)
def export_pearson():
options = { 'dest-from-settings' : True }
call_command('pearson_export_cdd', **options)
call_command('pearson_export_ead', **options)
mode = 'export'
sftp(settings.PEARSON['LOCAL_EXPORT'], settings.PEARSON['SFTP_EXPORT'], mode, deleteAfterCopy = False)
s3(settings.PEARSON['LOCAL_EXPORT'], settings.PEARSON['S3_BUCKET'], mode, deleteAfterCopy=True)
def import_pearson():
mode = 'import'
try:
sftp(settings.PEARSON['SFTP_IMPORT'], settings.PEARSON['LOCAL_IMPORT'], mode, deleteAfterCopy = True)
s3(settings.PEARSON['LOCAL_IMPORT'], settings.PEARSON['S3_BUCKET'], mode, deleteAfterCopy=False)
except Exception as e:
dog_http_api.event('Pearson Import failure', str(e))
raise e
else:
for filename in os.listdir(settings.PEARSON['LOCAL_IMPORT']):
filepath = os.path.join(settings.PEARSON['LOCAL_IMPORT'], filename)
call_command('pearson_import_conf_zip', filepath)
os.remove(filepath)
# actually do the work!
if options['mode'] in ('export', 'both'):
export_pearson()
if options['mode'] in ('import', 'both'):
import_pearson()
'''
Created on Jan 17, 2013
@author: brian
'''
import logging
import os
from tempfile import mkdtemp
import cStringIO
import sys
from django.test import TestCase
from django.core.management import call_command
from nose.plugins.skip import SkipTest
from student.models import User, TestCenterRegistration, TestCenterUser, get_testcenter_registration
log = logging.getLogger(__name__)
def create_tc_user(username):
user = User.objects.create_user(username, '{}@edx.org'.format(username), 'fakepass')
options = {
'first_name' : 'TestFirst',
'last_name' : 'TestLast',
'address_1' : 'Test Address',
'city' : 'TestCity',
'state' : 'Alberta',
'postal_code' : 'A0B 1C2',
'country' : 'CAN',
'phone' : '252-1866',
'phone_country_code' : '1',
}
call_command('pearson_make_tc_user', username, **options)
return TestCenterUser.objects.get(user=user)
def create_tc_registration(username, course_id = 'org1/course1/term1', exam_code = 'exam1', accommodation_code = None):
options = { 'exam_series_code' : exam_code,
'eligibility_appointment_date_first' : '2013-01-01T00:00',
'eligibility_appointment_date_last' : '2013-12-31T23:59',
'accommodation_code' : accommodation_code,
}
call_command('pearson_make_tc_registration', username, course_id, **options)
user = User.objects.get(username=username)
registrations = get_testcenter_registration(user, course_id, exam_code)
return registrations[0]
def create_multiple_registrations(prefix='test'):
username1 = '{}_multiple1'.format(prefix)
create_tc_user(username1)
create_tc_registration(username1)
create_tc_registration(username1, course_id = 'org1/course2/term1')
create_tc_registration(username1, exam_code = 'exam2')
username2 = '{}_multiple2'.format(prefix)
create_tc_user(username2)
create_tc_registration(username2)
username3 = '{}_multiple3'.format(prefix)
create_tc_user(username3)
create_tc_registration(username3, course_id = 'org1/course2/term1')
username4 = '{}_multiple4'.format(prefix)
create_tc_user(username4)
create_tc_registration(username4, exam_code = 'exam2')
def get_command_error_text(*args, **options):
stderr_string = None
old_stderr = sys.stderr
sys.stderr = cStringIO.StringIO()
try:
call_command(*args, **options)
except SystemExit, why1:
# The goal here is to catch CommandError calls.
# But these are actually translated into nice messages,
# and sys.exit(1) is then called. For testing, we
# want to catch what sys.exit throws, and get the
# relevant text either from stdout or stderr.
if (why1.message > 0):
stderr_string = sys.stderr.getvalue()
else:
raise why1
except Exception, why:
raise why
finally:
sys.stderr = old_stderr
if stderr_string is None:
raise Exception("Expected call to {} to fail, but it succeeded!".format(args[0]))
return stderr_string
def get_error_string_for_management_call(*args, **options):
stdout_string = None
old_stdout = sys.stdout
old_stderr = sys.stderr
sys.stdout = cStringIO.StringIO()
sys.stderr = cStringIO.StringIO()
try:
call_command(*args, **options)
except SystemExit, why1:
# The goal here is to catch CommandError calls.
# But these are actually translated into nice messages,
# and sys.exit(1) is then called. For testing, we
# want to catch what sys.exit throws, and get the
# relevant text either from stdout or stderr.
if (why1.message == 1):
stdout_string = sys.stdout.getvalue()
stderr_string = sys.stderr.getvalue()
else:
raise why1
except Exception, why:
raise why
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
if stdout_string is None:
raise Exception("Expected call to {} to fail, but it succeeded!".format(args[0]))
return stdout_string, stderr_string
def get_file_info(dirpath):
filelist = os.listdir(dirpath)
print 'Files found: {}'.format(filelist)
numfiles = len(filelist)
if numfiles == 1:
filepath = os.path.join(dirpath, filelist[0])
with open(filepath, 'r') as cddfile:
filecontents = cddfile.readlines()
numlines = len(filecontents)
return filepath, numlines
else:
raise Exception("Expected to find a single file in {}, but found {}".format(dirpath,filelist))
class PearsonTestCase(TestCase):
'''
Base class for tests running Pearson-related commands
'''
import_dir = mkdtemp(prefix="import")
export_dir = mkdtemp(prefix="export")
def assertErrorContains(self, error_message, expected):
self.assertTrue(error_message.find(expected) >= 0, 'error message "{}" did not contain "{}"'.format(error_message, expected))
def tearDown(self):
def delete_temp_dir(dirname):
if os.path.exists(dirname):
for filename in os.listdir(dirname):
os.remove(os.path.join(dirname, filename))
os.rmdir(dirname)
# clean up after any test data was dumped to temp directory
delete_temp_dir(self.import_dir)
delete_temp_dir(self.export_dir)
# and clean up the database:
# TestCenterUser.objects.all().delete()
# TestCenterRegistration.objects.all().delete()
class PearsonCommandTestCase(PearsonTestCase):
def test_missing_demographic_fields(self):
# We won't bother to test all details of form validation here.
# It is enough to show that it works here, but deal with test cases for the form
# validation in the student tests, not these management tests.
username = 'baduser'
User.objects.create_user(username, '{}@edx.org'.format(username), 'fakepass')
options = {}
error_string = get_command_error_text('pearson_make_tc_user', username, **options)
self.assertTrue(error_string.find('Field Form errors encountered:') >= 0)
self.assertTrue(error_string.find('Field Form Error: city') >= 0)
self.assertTrue(error_string.find('Field Form Error: first_name') >= 0)
self.assertTrue(error_string.find('Field Form Error: last_name') >= 0)
self.assertTrue(error_string.find('Field Form Error: country') >= 0)
self.assertTrue(error_string.find('Field Form Error: phone_country_code') >= 0)
self.assertTrue(error_string.find('Field Form Error: phone') >= 0)
self.assertTrue(error_string.find('Field Form Error: address_1') >= 0)
self.assertErrorContains(error_string, 'Field Form Error: address_1')
def test_create_good_testcenter_user(self):
testcenter_user = create_tc_user("test1")
self.assertIsNotNone(testcenter_user)
def test_create_good_testcenter_registration(self):
username = 'test1'
create_tc_user(username)
registration = create_tc_registration(username)
self.assertIsNotNone(registration)
def test_cdd_missing_option(self):
error_string = get_command_error_text('pearson_export_cdd', **{})
self.assertErrorContains(error_string, 'Error: --destination or --dest-from-settings must be used')
def test_ead_missing_option(self):
error_string = get_command_error_text('pearson_export_ead', **{})
self.assertErrorContains(error_string, 'Error: --destination or --dest-from-settings must be used')
def test_export_single_cdd(self):
# before we generate any tc_users, we expect there to be nothing to output:
options = { 'dest-from-settings' : True }
with self.settings(PEARSON={ 'LOCAL_EXPORT' : self.export_dir }):
call_command('pearson_export_cdd', **options)
(filepath, numlines) = get_file_info(self.export_dir)
self.assertEquals(numlines, 1, "Expect cdd file to have no non-header lines")
os.remove(filepath)
# generating a tc_user should result in a line in the output
username = 'test_single_cdd'
create_tc_user(username)
call_command('pearson_export_cdd', **options)
(filepath, numlines) = get_file_info(self.export_dir)
self.assertEquals(numlines, 2, "Expect cdd file to have one non-header line")
os.remove(filepath)
# output after registration should not have any entries again.
call_command('pearson_export_cdd', **options)
(filepath, numlines) = get_file_info(self.export_dir)
self.assertEquals(numlines, 1, "Expect cdd file to have no non-header lines")
os.remove(filepath)
# if we modify the record, then it should be output again:
user_options = { 'first_name' : 'NewTestFirst', }
call_command('pearson_make_tc_user', username, **user_options)
call_command('pearson_export_cdd', **options)
(filepath, numlines) = get_file_info(self.export_dir)
self.assertEquals(numlines, 2, "Expect cdd file to have one non-header line")
os.remove(filepath)
def test_export_single_ead(self):
# before we generate any registrations, we expect there to be nothing to output:
options = { 'dest-from-settings' : True }
with self.settings(PEARSON={ 'LOCAL_EXPORT' : self.export_dir }):
call_command('pearson_export_ead', **options)
(filepath, numlines) = get_file_info(self.export_dir)
self.assertEquals(numlines, 1, "Expect ead file to have no non-header lines")
os.remove(filepath)
# generating a registration should result in a line in the output
username = 'test_single_ead'
create_tc_user(username)
create_tc_registration(username)
call_command('pearson_export_ead', **options)
(filepath, numlines) = get_file_info(self.export_dir)
self.assertEquals(numlines, 2, "Expect ead file to have one non-header line")
os.remove(filepath)
# output after registration should not have any entries again.
call_command('pearson_export_ead', **options)
(filepath, numlines) = get_file_info(self.export_dir)
self.assertEquals(numlines, 1, "Expect ead file to have no non-header lines")
os.remove(filepath)
# if we modify the record, then it should be output again:
create_tc_registration(username, accommodation_code='EQPMNT')
call_command('pearson_export_ead', **options)
(filepath, numlines) = get_file_info(self.export_dir)
self.assertEquals(numlines, 2, "Expect ead file to have one non-header line")
os.remove(filepath)
def test_export_multiple(self):
create_multiple_registrations("export")
with self.settings(PEARSON={ 'LOCAL_EXPORT' : self.export_dir }):
options = { 'dest-from-settings' : True }
call_command('pearson_export_cdd', **options)
(filepath, numlines) = get_file_info(self.export_dir)
self.assertEquals(numlines, 5, "Expect cdd file to have four non-header lines: total was {}".format(numlines))
os.remove(filepath)
call_command('pearson_export_ead', **options)
(filepath, numlines) = get_file_info(self.export_dir)
self.assertEquals(numlines, 7, "Expect ead file to have six non-header lines: total was {}".format(numlines))
os.remove(filepath)
# def test_bad_demographic_option(self):
# username = 'nonuser'
# output_string, stderrmsg = get_error_string_for_management_call('pearson_make_tc_user', username, **{'--garbage' : None })
# print stderrmsg
# self.assertErrorContains(stderrmsg, 'Unexpected option')
#
# def test_missing_demographic_user(self):
# username = 'nonuser'
# output_string, error_string = get_error_string_for_management_call('pearson_make_tc_user', username, **{})
# self.assertErrorContains(error_string, 'User matching query does not exist')
# credentials for a test SFTP site:
SFTP_HOSTNAME = 'ec2-23-20-150-101.compute-1.amazonaws.com'
SFTP_USERNAME = 'pearsontest'
SFTP_PASSWORD = 'password goes here'
S3_BUCKET = 'edx-pearson-archive'
AWS_ACCESS_KEY_ID = 'put yours here'
AWS_SECRET_ACCESS_KEY = 'put yours here'
class PearsonTransferTestCase(PearsonTestCase):
'''
Class for tests running Pearson transfers
'''
def test_transfer_config(self):
with self.settings(DATADOG_API='FAKE_KEY'):
# TODO: why is this failing with the wrong error message?!
stderrmsg = get_command_error_text('pearson_transfer', **{'mode' : 'garbage'})
self.assertErrorContains(stderrmsg, 'Error: No PEARSON entries')
with self.settings(DATADOG_API='FAKE_KEY'):
stderrmsg = get_command_error_text('pearson_transfer')
self.assertErrorContains(stderrmsg, 'Error: No PEARSON entries')
with self.settings(DATADOG_API='FAKE_KEY',
PEARSON={'LOCAL_EXPORT' : self.export_dir,
'LOCAL_IMPORT' : self.import_dir }):
stderrmsg = get_command_error_text('pearson_transfer')
self.assertErrorContains(stderrmsg, 'Error: No entry in the PEARSON settings')
def test_transfer_export_missing_dest_dir(self):
raise SkipTest()
create_multiple_registrations('export_missing_dest')
with self.settings(DATADOG_API='FAKE_KEY',
PEARSON={'LOCAL_EXPORT' : self.export_dir,
'SFTP_EXPORT' : 'this/does/not/exist',
'SFTP_HOSTNAME' : SFTP_HOSTNAME,
'SFTP_USERNAME' : SFTP_USERNAME,
'SFTP_PASSWORD' : SFTP_PASSWORD,
'S3_BUCKET' : S3_BUCKET,
},
AWS_ACCESS_KEY_ID = AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY = AWS_SECRET_ACCESS_KEY):
options = { 'mode' : 'export'}
stderrmsg = get_command_error_text('pearson_transfer', **options)
self.assertErrorContains(stderrmsg, 'Error: SFTP destination path does not exist')
def test_transfer_export(self):
raise SkipTest()
create_multiple_registrations("transfer_export")
with self.settings(DATADOG_API='FAKE_KEY',
PEARSON={'LOCAL_EXPORT' : self.export_dir,
'SFTP_EXPORT' : 'results/topvue',
'SFTP_HOSTNAME' : SFTP_HOSTNAME,
'SFTP_USERNAME' : SFTP_USERNAME,
'SFTP_PASSWORD' : SFTP_PASSWORD,
'S3_BUCKET' : S3_BUCKET,
},
AWS_ACCESS_KEY_ID = AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY = AWS_SECRET_ACCESS_KEY):
options = { 'mode' : 'export'}
# call_command('pearson_transfer', **options)
# # confirm that the export directory is still empty:
# self.assertEqual(len(os.listdir(self.export_dir)), 0, "expected export directory to be empty")
def test_transfer_import_missing_source_dir(self):
raise SkipTest()
create_multiple_registrations('import_missing_src')
with self.settings(DATADOG_API='FAKE_KEY',
PEARSON={'LOCAL_IMPORT' : self.import_dir,
'SFTP_IMPORT' : 'this/does/not/exist',
'SFTP_HOSTNAME' : SFTP_HOSTNAME,
'SFTP_USERNAME' : SFTP_USERNAME,
'SFTP_PASSWORD' : SFTP_PASSWORD,
'S3_BUCKET' : S3_BUCKET,
},
AWS_ACCESS_KEY_ID = AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY = AWS_SECRET_ACCESS_KEY):
options = { 'mode' : 'import'}
stderrmsg = get_command_error_text('pearson_transfer', **options)
self.assertErrorContains(stderrmsg, 'Error: SFTP source path does not exist')
def test_transfer_import(self):
raise SkipTest()
create_multiple_registrations('import_missing_src')
with self.settings(DATADOG_API='FAKE_KEY',
PEARSON={'LOCAL_IMPORT' : self.import_dir,
'SFTP_IMPORT' : 'results',
'SFTP_HOSTNAME' : SFTP_HOSTNAME,
'SFTP_USERNAME' : SFTP_USERNAME,
'SFTP_PASSWORD' : SFTP_PASSWORD,
'S3_BUCKET' : S3_BUCKET,
},
AWS_ACCESS_KEY_ID = AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY = AWS_SECRET_ACCESS_KEY):
options = { 'mode' : 'import'}
call_command('pearson_transfer', **options)
self.assertEqual(len(os.listdir(self.import_dir)), 0, "expected import directory to be empty")
...@@ -428,6 +428,10 @@ class TestCenterRegistration(models.Model): ...@@ -428,6 +428,10 @@ class TestCenterRegistration(models.Model):
# TODO: figure out if this should really go in the database (with a default value). # TODO: figure out if this should really go in the database (with a default value).
return 1 return 1
@property
def needs_uploading(self):
return self.uploaded_at is None or self.uploaded_at < self.user_updated_at
@classmethod @classmethod
def create(cls, testcenter_user, exam, accommodation_request): def create(cls, testcenter_user, exam, accommodation_request):
registration = cls(testcenter_user = testcenter_user) registration = cls(testcenter_user = testcenter_user)
...@@ -550,6 +554,10 @@ def get_testcenter_registration(user, course_id, exam_series_code): ...@@ -550,6 +554,10 @@ def get_testcenter_registration(user, course_id, exam_series_code):
return [] return []
return TestCenterRegistration.objects.filter(testcenter_user=tcu, course_id=course_id, exam_series_code=exam_series_code) return TestCenterRegistration.objects.filter(testcenter_user=tcu, course_id=course_id, exam_series_code=exam_series_code)
# nosetests thinks that anything with _test_ in the name is a test.
# Correct this (https://nose.readthedocs.org/en/latest/finding_tests.html)
get_testcenter_registration.__test__ = False
def unique_id_for_user(user): def unique_id_for_user(user):
""" """
Return a unique id for a user, suitable for inserting into Return a unique id for a user, suitable for inserting into
......
...@@ -3,3 +3,4 @@ ...@@ -3,3 +3,4 @@
-e git://github.com/MITx/django-pipeline.git#egg=django-pipeline -e git://github.com/MITx/django-pipeline.git#egg=django-pipeline
-e git://github.com/MITx/django-wiki.git@e2e84558#egg=django-wiki -e git://github.com/MITx/django-wiki.git@e2e84558#egg=django-wiki
-e git://github.com/dementrock/pystache_custom.git@776973740bdaad83a3b029f96e415a7d1e8bec2f#egg=pystache_custom-dev -e git://github.com/dementrock/pystache_custom.git@776973740bdaad83a3b029f96e415a7d1e8bec2f#egg=pystache_custom-dev
-e git://github.com/MITx/dogapi.git@003a4fc9#egg=dogapi
...@@ -88,3 +88,9 @@ PEER_GRADING_INTERFACE = AUTH_TOKENS.get('PEER_GRADING_INTERFACE', PEER_GRADING_ ...@@ -88,3 +88,9 @@ PEER_GRADING_INTERFACE = AUTH_TOKENS.get('PEER_GRADING_INTERFACE', PEER_GRADING_
PEARSON_TEST_USER = "pearsontest" PEARSON_TEST_USER = "pearsontest"
PEARSON_TEST_PASSWORD = AUTH_TOKENS.get("PEARSON_TEST_PASSWORD") PEARSON_TEST_PASSWORD = AUTH_TOKENS.get("PEARSON_TEST_PASSWORD")
# Pearson hash for import/export
PEARSON = AUTH_TOKENS.get("PEARSON")
# Datadog for events!
DATADOG_API = AUTH_TOKENS.get("DATADOG_API")
...@@ -58,4 +58,4 @@ factory_boy ...@@ -58,4 +58,4 @@ factory_boy
Shapely==1.2.16 Shapely==1.2.16
ipython==0.13.1 ipython==0.13.1
xmltodict==0.4.1 xmltodict==0.4.1
paramiko==1.9.0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment