Commit de06dda3 by Mike Chen

added capaxmlconverter app to convert xml into json format

parent 7f6f04df
from lxml import etree
import logging
import json
from StringIO import StringIO
import re
class CapaXMLConverter(object):
    """Convert a CAPA problem XML file into a JSON-serialisable dict.

    The result of ``convert_xml_file`` has two lists: ``scripts`` (one entry
    per ``<script>`` element) and ``contents`` (text blocks, images and
    response elements, appended as they are encountered while parsing).
    Which tags are handled, and how their attributes are copied, is driven by
    the tag lists and lookup tables built in ``__init__``.
    """

    # ``basestring`` exists only on Python 2; fall back to ``str`` so the
    # "is this a piece of text or an element?" checks run on either version.
    try:
        _string_types = basestring
    except NameError:
        _string_types = str

    class CapaXMLConverterError(Exception):
        """Base class for converter errors; carries a message in ``msg``."""

        def __init__(self, msg):
            self.msg = msg
            # Pass the message up so ``args`` and ``repr`` are populated too.
            super(CapaXMLConverter.CapaXMLConverterError, self).__init__(msg)

        def __str__(self):
            return self.msg

    class TagNotConsumedError(CapaXMLConverterError):
        """Error type for a tag that was not consumed."""
        pass

    class InvalidNestingError(CapaXMLConverterError):
        """Error type for invalidly nested elements."""
        pass

    def __init__(self, logger=None):
        """Build the tag tables; use ``logger`` or a module-named logger."""
        self.logger = logger if logger else logging.getLogger(__name__)

        self.end_of_text_tags = ['text', 'endouttext']
        # Tags that open a group collecting child responses.
        self.group_tags = ['multiplechoiceresponse', 'choiceresponse',
                           'truefalseresponse']
        # Tags converted into one output section each.
        self.response_tags = ['choice', 'numericalresponse', 'formularesponse', 'stringresponse',
                              'schematicresponse', 'symbolicresponse', 'customresponse']
        self.grouped_tags = ['choice']
        self.script_tags = ['script']
        # Tags whose nested <text> becomes the section's own 'text' field.
        self.text_included_tags = ['choice', 'schematicresponse', 'customresponse']
        # Tags whose attributes are copied into the currently open section.
        self.attr_copy_tags = ['responseparam']

        # XML tag -> value of the 'type' field in the output.
        self.type_map = {
            'script': 'script',
            'choice': 'choice',
            'multiplechoiceresponse': 'multiple_choice',
            'choiceresponse': 'multiple_choice',
            'truefalseresponse': 'true_false',
            'numericalresponse': 'numerical',
            'stringresponse': 'string',
            'formularesponse': 'formula',
            'schematicresponse': 'schematic',
            'symbolicresponse': 'symbolic',
            'customresponse': 'custom',
            'img': 'image',
            'answer': 'script',
        }
        # Group tag -> name of the list field holding the group's children.
        self.group_children_field = {
            'multiplechoiceresponse': 'choices',
            'choiceresponse': 'choices',
            'truefalseresponse': 'choices',
        }
        # Tag -> hook called as hook(element, out) when the element ends.
        self.element_post_hooks = {
            'schematicresponse': self.process_schematicresponse,
            'formularesponse': self.process_formularesponse,
            'customresponse': self.process_customresponse,
        }
        self.all_tags = self.group_tags + self.response_tags + self.script_tags

        # Per-tag attribute copy rules, applied by ``copy_attribute``.
        # Rule keys: 'src' (XML attribute), 'dest' (output key), optional
        # 'wrapper' (value transform), 'required' and 'default'.
        # Rules under '*' apply to every tag.
        self.attribute_rules = {
            '*': [
            ],
            'script': [
                {
                    'src': 'type',
                    'dest': 'language',
                    'wrapper': lambda x: x.split('/')[1].strip(),
                }
            ],
            'customresponse': [
            ],
            'choice': [
                {
                    'src': 'correct',
                    'dest': 'correct',
                    'wrapper': lambda x: x == 'true',
                    'required': True
                },
                {
                    'src': 'location',
                    'dest': 'location',
                },
                {
                    'dest': 'text',
                    'default': '',
                },
            ],
            'answer': [
                {
                    'src': 'type',
                    'dest': 'language',
                    'wrapper': lambda x: x.split('/')[1].strip(),
                }
            ],
            'img': [
                {
                    'src': 'src',
                    'dest': 'url',
                }
            ],
            'numericalresponse': [
                {
                    'src': 'answer',
                    'dest': 'answer',
                }
            ],
            'stringresponse': [
                {
                    'src': 'answer',
                    'dest': 'answer',
                },
            ],
            'schematicresponse': [
                {
                    'src': 'answer',
                    'dest': 'answer',
                }
            ],
            'responseparam': [
                {
                    'src': 'default',
                    'dest': 'tolerance',
                }
            ],
            'formularesponse': [
                {
                    'src': 'samples',
                    'dest': '_samples_',
                }
            ],
        }
        super(CapaXMLConverter, self).__init__()

    def build_from_element(self, element):
        """Return a new output dict for ``element`` with 'type', '_tag_' and copied attributes.

        Raises KeyError when the element's tag has no entry in ``type_map``.
        """
        return self.copy_attribute(element, {'type': self.type_map[element.tag], '_tag_': element.tag})

    def copy_attribute(self, element, out):
        """Apply the attribute rules for ``element.tag`` to ``out`` and return ``out``.

        A rule with 'src' copies that attribute (through 'wrapper' if given)
        when the attribute is present or the rule is 'required'; otherwise its
        'default', if any, is stored. A rule without 'src' only stores its
        'default'.
        """
        rules = self.attribute_rules['*'] + self.attribute_rules.get(element.tag, [])
        for rule in rules:
            if 'src' in rule:
                if rule.get('required', False) or rule['src'] in element.keys():
                    # A required-but-missing attribute yields None here, which
                    # is still handed to the wrapper.
                    val = element.get(rule['src'])
                    if 'wrapper' in rule:
                        val = rule['wrapper'](val)
                    out[rule['dest']] = val
                elif 'default' in rule:
                    out[rule['dest']] = rule['default']
            elif 'dest' in rule and 'default' in rule:
                out[rule['dest']] = rule['default']
        return out

    def pretty_print(self, data):
        """Print ``data`` as JSON indented by two spaces."""
        print(json.dumps(data, indent=2))

    def dict_del_key(self, d, k):
        """Delete key ``k`` from ``d`` and every nested dict, in place; return ``d``.

        ``d`` may be a dict or a list; nested dicts and lists are walked
        recursively.
        """
        if isinstance(d, dict) and k in d:
            del d[k]
        values = d.values() if isinstance(d, dict) else d
        for v in values:
            if isinstance(v, (dict, list)):
                self.dict_del_key(v, k)
        return d

    def feed_post_process_hook(self, element, out):
        """Run the post hook registered for ``element.tag``, if any, and return the result."""
        hook = self.element_post_hooks.get(element.tag)
        if hook is not None:
            out = hook(element, out)
        return out

    def merge_adjacent_string(self, l):
        """Return a copy of ``l`` with runs of adjacent strings concatenated.

        An empty (falsy) ``l`` is returned unchanged.
        """
        if not l:
            return l
        merged = [l[0]]
        for item in l[1:]:
            if isinstance(merged[-1], self._string_types) and isinstance(item, self._string_types):
                merged[-1] += item
            else:
                merged.append(item)
        return merged

    def iterate_element(self, element, ):
        """Yield the element's leading text, then each child followed by its tail text.

        Empty or missing text and tails are skipped.
        """
        if element.text:
            yield element.text
        for el in element:
            yield el
            if el.tail:
                yield el.tail

    def split_element_on_tag(self, element, tag):
        """Split the element's content into strings and child elements named ``tag``.

        Children with any other tag are serialised back to markup (without
        their tail) and merged with the neighbouring text.
        """
        parts = []
        for part in self.iterate_element(element):
            if isinstance(part, self._string_types) or part.tag == tag:
                parts.append(part)
            else:
                parts.append(etree.tostring(part, with_tail=False))
        return self.merge_adjacent_string(parts)

    def picky_center_element_format(self, center):
        """Convert a ``<center>`` holding only images into a list of image dicts.

        Accepts a sequence of ``<img>`` elements, each optionally followed by
        text that becomes the image's 'title'. Whitespace-only text is
        ignored. Returns None when anything else is found: another tag, or
        text that does not directly follow an image.
        """
        pending_img = None  # image seen but not yet emitted; it may still get a title
        ret = []
        for el in self.iterate_element(center):
            if isinstance(el, self._string_types):
                if not el.strip():
                    continue
                if pending_img is None:
                    # Text with no image in front of it.
                    return None
                img = self.build_from_element(pending_img)
                img['title'] = el
                ret.append(img)
                pending_img = None
            elif el.tag == "img":
                if pending_img is not None:
                    # The previous image had no title; emit it as-is.
                    ret.append(self.build_from_element(pending_img))
                # Always track the new image; otherwise it would be lost and
                # the previous one emitted a second time at the end.
                pending_img = el
            else:
                return None
        if pending_img is not None:
            ret.append(self.build_from_element(pending_img))
        return ret

    def convert_xml_file(self, filename):
        """Parse the problem file ``filename`` and return the converted dict.

        Returns ``{'scripts': [...], 'contents': [...]}`` with every internal
        '_tag_' key removed.
        """
        out = {'scripts': [], 'contents': []}
        # Parser state: text captured for the enclosing section, the section
        # (and, once seen, the 'group') currently being built, and whether we
        # are inside a tag listed in text_included_tags.
        temp = {'text': '', 'section': {}, 'in_text': False, 'embedded_text': False}

        # Rewrite <startouttext /> ... <endouttext /> into <text> ... </text>
        # so the pair parses as one ordinary element.
        with open(filename, 'r') as f:
            problem_text = f.read()
        problem_text = re.sub(r"startouttext\s*/", "text", problem_text)
        problem_text = re.sub(r"endouttext\s*/", "/text", problem_text)
        stream = StringIO(problem_text)

        for event, element in etree.iterparse(stream, events=('start', 'end')):
            self.logger.debug("%s %s", "entering" if event == "start" else "leaving", element)
            if event == 'start':
                if element.tag in self.text_included_tags:
                    temp['embedded_text'] = True
                if element.tag == 'text':
                    temp['in_text'] = True
                    if temp['embedded_text']:
                        # The text is part of another element,
                        # e.g. <choice><text>Choice A</text></choice>.
                        t = etree.tostring(element).replace('<text>', '').replace('</text>', '').replace('\n', '')
                        temp['text'] = t
                    else:
                        # Standalone text: split around <center> elements and
                        # check whether they hold images, e.g.
                        # ['See the diagram below. \n', <center>...</center>, "What's .. ?"]
                        for x in self.split_element_on_tag(element, 'center'):
                            if isinstance(x, self._string_types):
                                out["contents"].append({'type': 'text', 'text': x})
                            else:
                                # picky_center_element_format returns a list of
                                # {'type': 'image', ...} dicts if the <center>
                                # holds only images, otherwise None.
                                ret = self.picky_center_element_format(x)
                                if ret is None:
                                    out["contents"].append({'type': 'text', 'text': etree.tostring(x)})
                                else:
                                    for el in ret:
                                        out["contents"].append(el)
                    # NOTE(review): placement reconstructed from a source that
                    # lost its indentation; confirm clear() belongs after both
                    # branches.
                    element.clear()
                # A response group: start collecting its children.
                elif element.tag in self.group_tags:
                    temp['group'] = self.build_from_element(element)
                    temp['group'][self.group_children_field[element.tag]] = []
                # An individual response or script element.
                elif element.tag in self.response_tags + self.script_tags:
                    temp['section'] = self.build_from_element(element)
            elif event == "end":
                if element.tag in self.text_included_tags:
                    temp['embedded_text'] = False
                    temp['section']['text'] = temp['text']
                    temp['text'] = ''
                # Copy this element's attributes into the open section.
                if element.tag in self.attr_copy_tags:
                    self.copy_attribute(element, temp['section'])
                # A script: keep its text as 'code' and file it under scripts.
                if element.tag in self.script_tags:
                    temp['section']['code'] = element.text
                    out['scripts'].append(self.feed_post_process_hook(element, temp['section']))
                    temp['section'] = None
                elif element.tag in self.response_tags:
                    if temp.get('group', None):
                        # Inside a group: the response becomes one of its children.
                        temp['group'][self.group_children_field[temp['group']['_tag_']]].append(
                            self.feed_post_process_hook(element, temp['section']))
                    else:
                        out['contents'].append(self.feed_post_process_hook(element, temp['section']))
                    temp['section'] = None
                elif element.tag in self.group_tags:
                    out['contents'].append(self.feed_post_process_hook(element, temp['group']))
                    temp['group'] = None
        return self.dict_del_key(out, '_tag_')

    def _attach_answer(self, element, out):
        """Store the element's ``<answer>`` child, if present, under ``out['answer']``."""
        answer_el = element.find('answer')
        if answer_el is None:
            # No <answer> child: leave the section untouched.
            return out
        answer = self.build_from_element(answer_el)
        answer['code'] = answer_el.text
        out['answer'] = answer
        return out

    def process_customresponse(self, element, out):
        """Post hook for ``<customresponse>``: attach its ``<answer>`` child as a script dict."""
        return self._attach_answer(element, out)

    def process_schematicresponse(self, element, out):
        """Post hook for ``<schematicresponse>``: attach its ``<answer>`` child as a script dict."""
        return self._attach_answer(element, out)

    def process_formularesponse(self, element, out):
        """Post hook for ``<formularesponse>``; currently returns ``out`` unchanged."""
        return out
###
### One-off script for importing courseware from XML format
###
from django.core.management.base import BaseCommand, CommandError
import json
from capaconverter import CapaXMLConverter
class Command(BaseCommand):
    """Management command that converts CAPA problem XML files to JSON and prints them."""

    help = \
        '''Convert the given CAPA problem XML file(s) to JSON and print the result'''

    # Sample problem used when no path is given on the command line.
    DEFAULT_PROBLEM_FILE = "/Users/ccp/code/mitx_all/data/6.002x/problems/HW3ID1.xml"

    def handle(self, *args, **options):
        """Convert each path in ``args`` (or the default sample) and print it as indented JSON."""
        self.converter = CapaXMLConverter()
        for path in (args or (self.DEFAULT_PROBLEM_FILE,)):
            print(json.dumps(self.converter.convert_xml_file(path), indent=2))
#!/usr/bin/env python
from django.utils import unittest
import logging
import os.path
from __init__ import CapaXMLConverter
import json
import sys
from lxml import etree
class CapaXMLConverterTestCase(unittest.TestCase):
    """Tests for CapaXMLConverter helpers, plus a bulk conversion of sample problems."""

    def setUp(self):
        """Create a converter with debug logging and collect the sample ``*.xml`` files."""
        self.converter = CapaXMLConverter()
        self.converter.logger.setLevel(logging.DEBUG)
        # Sample problems live in a "problems" folder next to this file.
        self.problems_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), "problems")
        logging.info("Testing problems from folder %s", self.problems_folder)
        self.problem_files = [os.path.join(self.problems_folder, filename)
                              for filename in os.listdir(self.problems_folder)
                              if filename.endswith(".xml")]
        logging.info("Found %d lon-CAPA XML files. ", len(self.problem_files))

    def test_center(self):
        """<center> with only images (and optional titles) converts; anything else gives None."""
        xml = '<center><img src="/aa" /></center>'
        elements = self.converter.picky_center_element_format(etree.fromstring(xml))
        self.assertEqual(elements, [{'url': '/aa', '_tag_': 'img', 'type': 'image'}])
        xml = '<center><img src="/aa" />title</center>'
        elements = self.converter.picky_center_element_format(etree.fromstring(xml))
        self.assertEqual(elements, [{'url': '/aa', '_tag_': 'img', 'type': 'image', 'title': 'title'}])
        xml = '<center><img src="/aa" />title<input /></center>'
        elements = self.converter.picky_center_element_format(etree.fromstring(xml))
        self.assertEqual(elements, None)

    def test_iterator(self):
        """iterate_element yields the text, each child and each child's tail in order."""
        xml = """<text>In this problem we will investigate a fun idea called "duality."
<br />
Consider the series circuit in the diagram shown.
<center>
<img src="/static/images/circuits/duality.gif" />
</center>
We are given device parameters \(V=$V\)V, \(R_1=$R1\Omega\), and \(R_2=$R2\Omega\).
All of the unknown voltages and currents are labeled in associated reference
directions. Solve this circuit for the unknowns and enter them into
the boxes given.
<br />
The value (in Volts) of \(v_1\) is: </text>"""
        elements = list(self.converter.iterate_element(etree.fromstring(xml)))
        # text, <br>, tail, <center>, tail, <br>, tail
        self.assertEqual(7, len(elements))

    def test_xmls(self):
        """Convert every sample problem and write the result next to it as ``.json``."""
        for filepath in self.problem_files:
            try:
                out = self.converter.convert_xml_file(filepath)
            except BaseException:
                # Name the offending file, then let the original error propagate.
                print("Failed to convert file %s" % filepath)
                raise
            # Swap only the extension; str.replace would also rewrite any
            # ".xml" occurring earlier in the path.
            json_path = os.path.splitext(filepath)[0] + ".json"
            with open(json_path, "w") as f:
                json.dump(out, f, indent=2)


if __name__ == '__main__':
    unittest.main()
\ No newline at end of file
###
### One-off script for importing courseware from XML format
###
from django.core.management.base import BaseCommand, CommandError
import json
from lxml import etree
class CapaXMLConverter(object):
    """Streaming converter from CAPA problem XML to a JSON-serialisable dict."""

    def convert_from_xml(self, filename):
        """Parse ``filename`` and return ``{'scripts': [], 'contents': [...]}``.

        ``contents`` receives 'paragraph' entries for text closed by
        ``<endouttext>`` and one entry per multiple-choice / true-false group,
        each holding its choices or statements.

        Raises ValueError when a ``<choice>`` appears outside a response group.
        """
        out = {'scripts': [], 'contents': []}
        # Parser state: text accumulated so far, the choice/statement being
        # built, and the enclosing response group (None when outside one).
        temp = {'text': '', 'response': None, 'group': None}
        with open(filename, "r") as f:
            for event, element in etree.iterparse(f, events=("start", "end")):
                # Accumulate text; a <br> becomes a blank line.
                if event == "start" and element.tag == "br":
                    temp['text'] += '\n\n'
                elif event == "start" and element.text:
                    temp['text'] += element.text
                elif event == "end" and element.tail:
                    temp['text'] += element.tail

                if event == "start":
                    if element.tag == 'multiplechoiceresponse':
                        temp['group'] = {'type': 'multiple_choice', 'choices': []}
                    elif element.tag == 'truefalseresponse':
                        temp['group'] = {'type': 'true_false', 'statements': []}
                    elif element.tag == "choice":
                        if temp['group'] is None:
                            # Previously surfaced as a bare KeyError/TypeError.
                            raise ValueError("<choice> found outside of a response group")
                        if temp['group']['type'] == 'multiple_choice':
                            temp['response'] = {'type': 'choice', 'text': '', 'correct': element.get('correct') == "true"}
                        elif temp['group']['type'] == 'true_false':
                            temp['response'] = {'type': 'statement', 'text': '', 'correct': element.get('correct') == "true"}
                elif event == "end":
                    if element.tag == "endouttext":
                        # The text belongs to the open choice if there is one,
                        # otherwise it is a standalone paragraph.
                        if temp['response']:
                            temp['response']['text'] += temp['text'].strip()
                        else:
                            out['contents'].append({'type': 'paragraph', 'text': temp['text'].strip()})
                        temp['text'] = ''
                    elif element.tag in ["multiplechoiceresponse", "truefalseresponse"]:
                        out['contents'].append(temp['group'])
                        temp['group'] = None
                    elif element.tag == "choice":
                        if temp['group'] is None:
                            raise ValueError("<choice> found outside of a response group")
                        if temp['group']['type'] == 'true_false':
                            temp['group']['statements'].append(temp['response'])
                        elif temp['group']['type'] == 'multiple_choice':
                            temp['group']['choices'].append(temp['response'])
                        temp['response'] = None
        return out
class Command(BaseCommand):
    """Management command that converts CAPA problem XML files to JSON and prints them."""

    help = \
        '''Convert the given CAPA problem XML file(s) to JSON and print the result'''

    # Sample problem used when no path is given on the command line.
    DEFAULT_PROBLEM_FILE = "/Users/ccp/code/mitx_all/data/6.002x/problems/multichoice.xml"

    def handle(self, *args, **options):
        """Convert each path in ``args`` (or the default sample) and print it as indented JSON."""
        self.converter = CapaXMLConverter()
        for path in (args or (self.DEFAULT_PROBLEM_FILE,)):
            print(json.dumps(self.converter.convert_from_xml(path), indent=2))
...@@ -311,6 +311,7 @@ INSTALLED_APPS = ( ...@@ -311,6 +311,7 @@ INSTALLED_APPS = (
'contentstore', 'contentstore',
'github_sync', 'github_sync',
'student', # misleading name due to sharing with lms 'student', # misleading name due to sharing with lms
'capaconverter',
# For asset pipelining # For asset pipelining
'pipeline', 'pipeline',
......
- scripts: scripts:
- type: 'script' - type: 'script'
language: 'python' language: 'python'
code: 'print "Hello world!"' code: 'print "Hello world!"'
- contents: contents:
- type: 'text' - type: 'text'
text: 'This is a sample paragraph. The linebreaks here should matter..?' text: 'This is a sample paragraph. The linebreaks here matter.\n\n'
- type: 'linebreaks' - type: 'multiple_choice'
count: 2
- type: 'multiple_choice'
randomize: true randomize: true
choices: choices:
- type: 'choice' - type: 'choice'
...@@ -19,7 +17,7 @@ ...@@ -19,7 +17,7 @@
- type: 'choice' - type: 'choice'
text: 'Choice C' text: 'Choice C'
correct: true correct: true
- type: 'true_false' - type: 'true_false'
statements: statements:
- type: 'statement' - type: 'statement'
text: 'Sun revolves around Earth. ' text: 'Sun revolves around Earth. '
...@@ -27,12 +25,12 @@ ...@@ -27,12 +25,12 @@
- type: 'statement' - type: 'statement'
text: 'This is a true statement. ' text: 'This is a true statement. '
correct: true correct: true
- type: 'string' - type: 'string'
answer: 'banana' answer: 'banana'
- type: 'numerical' - type: 'numerical'
tolerance: '5%' tolerance: '5%'
answer: 6 answer: 6
- type: 'formula' - type: 'formula'
answer: '-A*(RF/RS)' answer: '-A*(RF/RS)'
samples: 10 samples: 10
tolerance: '5%' tolerance: '5%'
...@@ -52,7 +50,7 @@ ...@@ -52,7 +50,7 @@
- type: 'variable' - type: 'variable'
symbol: 'T' symbol: 'T'
range: '1-3' range: '1-3'
- type: 'custom' - type: 'custom'
script: script:
- type: 'script' - type: 'script'
language: 'python' language: 'python'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment