Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-platform
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-platform
Commits
80b2f45a
Commit
80b2f45a
authored
Jan 17, 2013
by
Brian Wilson
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
move pearson_import to another branch
parent
4a59125b
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
0 additions
and
131 deletions
+0
-131
common/djangoapps/student/management/commands/pearson_import_conf_zip.py
+0
-131
No files found.
common/djangoapps/student/management/commands/pearson_import_conf_zip.py
deleted
100644 → 0
View file @
4a59125b
import
csv
from
zipfile
import
ZipFile
from
time
import
strptime
,
strftime
from
collections
import
OrderedDict
from
datetime
import
datetime
from
os.path
import
isdir
from
optparse
import
make_option
from
django.core.management.base
import
BaseCommand
,
CommandError
from
student.models
import
TestCenterUser
,
TestCenterRegistration
class
Command
(
BaseCommand
):
args
=
'<input zip file>'
help
=
"""
Import Pearson confirmation files and update TestCenterUser and TestCenterRegistration tables
with status.
"""
def
handle
(
self
,
*
args
,
**
kwargs
):
if
len
(
args
)
<
1
:
print
Command
.
help
return
source_zip
=
args
[
0
]
# TODO: check that it's a zip
# loop through all files in zip, and process them based on filename prefix:
with
ZipFile
(
source_zip
,
'r'
)
as
zipfile
:
for
fileinfo
in
zipfile
.
infolist
():
with
zipfile
.
open
(
fileinfo
)
as
zipentry
:
if
fileinfo
.
filename
.
startswith
(
"eac-"
):
self
.
process_eac
(
zipentry
)
elif
fileinfo
.
filename
.
startswith
(
"vcdc-"
):
self
.
process_vcdc
(
zipentry
)
else
:
raise
CommandError
(
"Unrecognized confirmation file type
\"
{}
\"
in confirmation zip file
\"
{}
\"
"
.
format
(
fileinfo
.
filename
,
zipfile
))
def
process_eac
(
self
,
eacfile
):
print
"processing eac"
reader
=
csv
.
DictReader
(
eacfile
,
delimiter
=
"
\t
"
)
for
row
in
reader
:
client_authorization_id
=
row
[
'ClientAuthorizationID'
]
if
client_authorization_id
is
not
None
:
try
:
registration
=
TestCenterRegistration
.
objects
.
get
(
client_authorization_id
=
client_authorization_id
)
print
"Found authorization record for user {}"
.
format
(
registration
.
testcenter_user
.
user
.
username
)
# now update the record:
registration
.
upload_status
=
row
[
'Status'
]
registration
.
upload_error_message
=
row
[
'Message'
]
try
:
registration
.
processed_at
=
strftime
(
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
,
strptime
(
row
[
'Date'
],
'
%
Y/
%
m/
%
d
%
H:
%
M:
%
S'
))
except
ValueError
as
ve
:
print
"Bad Date value found for {}: message {}"
.
format
(
client_authorization_id
,
ve
)
# store the Authorization Id if one is provided. (For debugging)
if
row
[
'AuthorizationID'
]:
try
:
registration
.
authorization_id
=
int
(
row
[
'AuthorizationID'
])
except
ValueError
as
ve
:
print
"Bad AuthorizationID value found for {}: message {}"
.
format
(
client_authorization_id
,
ve
)
registration
.
confirmed_at
=
datetime
.
utcnow
()
registration
.
save
()
except
TestCenterRegistration
.
DoesNotExist
:
print
" Failed to find record for client_auth_id {}"
.
format
(
client_authorization_id
)
def
process_vcdc
(
self
,
vcdcfile
):
print
"processing vcdc"
reader
=
csv
.
DictReader
(
vcdcfile
,
delimiter
=
"
\t
"
)
for
row
in
reader
:
client_candidate_id
=
row
[
'ClientCandidateID'
]
if
client_candidate_id
is
not
None
:
try
:
tcuser
=
TestCenterUser
.
objects
.
get
(
client_candidate_id
=
client_candidate_id
)
print
"Found demographics record for user {}"
.
format
(
tcuser
.
user
.
username
)
# now update the record:
tcuser
.
upload_status
=
row
[
'Status'
]
tcuser
.
upload_error_message
=
row
[
'Message'
]
try
:
tcuser
.
processed_at
=
strftime
(
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
,
strptime
(
row
[
'Date'
],
'
%
Y/
%
m/
%
d
%
H:
%
M:
%
S'
))
except
ValueError
as
ve
:
print
"Bad Date value found for {}: message {}"
.
format
(
client_candidate_id
,
ve
)
# store the Authorization Id if one is provided. (For debugging)
if
row
[
'CandidateID'
]:
try
:
tcuser
.
candidate_id
=
int
(
row
[
'CandidateID'
])
except
ValueError
as
ve
:
print
"Bad CandidateID value found for {}: message {}"
.
format
(
client_candidate_id
,
ve
)
tcuser
.
confirmed_at
=
datetime
.
utcnow
()
tcuser
.
save
()
except
TestCenterUser
.
DoesNotExist
:
print
" Failed to find record for client_candidate_id {}"
.
format
(
client_candidate_id
)
# def _try_parse_time(self, key):
# """
# Parse an optional metadata key containing a time: if present, complain
# if it doesn't parse.
# Return None if not present or invalid.
# """
# if key in self.exam_info:
# try:
# return parse_time(self.exam_info[key])
# except ValueError as e:
# msg = "Exam {0} in course {1} loaded with a bad exam_info key '{2}': '{3}'".format(self.exam_name, self.course_id, self.exam_info[key], e)
# log.warning(msg)
# return None
#
# with open(destfile, "wb") as outfile:
# writer = csv.DictWriter(outfile,
# Command.CSV_TO_MODEL_FIELDS,
# delimiter="\t",
# quoting=csv.QUOTE_MINIMAL,
# extrasaction='ignore')
# writer.writeheader()
# for tcu in TestCenterUser.objects.order_by('id'):
# if dump_all or tcu.needs_uploading:
# record = dict((csv_field, ensure_encoding(getattr(tcu, model_field)))
# for csv_field, model_field
# in Command.CSV_TO_MODEL_FIELDS.items())
# record["LastUpdate"] = record["LastUpdate"].strftime("%Y/%m/%d %H:%M:%S")
# writer.writerow(record)
# tcu.uploaded_at = uploaded_at
# tcu.save()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment