Commit a55724d1 by Calen Pennington

Prevent unicode data from being injected into a Location using the _replace function

parent 46109bb3
......@@ -40,6 +40,21 @@ INVALID_HTML_CHARS = re.compile(r"[^\w-]")
_LocationBase = namedtuple('LocationBase', 'tag org course category name revision')
def _check_location_part(val, regexp):
"""
Check that `regexp` doesn't match inside `val`. If it does, raise an exception
Args:
val (string): The value to check
regexp (re.RegexObject): The regular expression specifying invalid characters
Raises:
InvalidLocationError: Raised if any invalid character is found in `val`
"""
if val is not None and regexp.search(val) is not None:
raise InvalidLocationError("Invalid characters in {!r}.".format(val))
class Location(_LocationBase):
'''
Encodes a location.
......@@ -145,7 +160,6 @@ class Location(_LocationBase):
Components may be set to None, which may be interpreted in some contexts
to mean wildcard selection.
"""
if (org is None and course is None and category is None and name is None and revision is None):
location = loc_or_tag
else:
......@@ -161,23 +175,18 @@ class Location(_LocationBase):
check_list(list_)
def check_list(list_):
def check(val, regexp):
if val is not None and regexp.search(val) is not None:
log.debug('invalid characters val=%r, list_=%r', val, list_)
raise InvalidLocationError("Invalid characters in {!r}.".format(val))
list_ = list(list_)
for val in list_[:4] + [list_[5]]:
check(val, INVALID_CHARS)
_check_location_part(val, INVALID_CHARS)
# names allow colons
check(list_[4], INVALID_CHARS_NAME)
_check_location_part(list_[4], INVALID_CHARS_NAME)
if isinstance(location, Location):
return location
elif isinstance(location, basestring):
match = URL_RE.match(location)
if match is None:
log.debug("location %r doesn't match URL" % location)
log.debug("location %r doesn't match URL", location)
raise InvalidLocationError(location)
groups = match.groupdict()
check_dict(groups)
......@@ -249,6 +258,18 @@ class Location(_LocationBase):
return "/".join([self.org, self.course, self.name])
def _replace(self, **kwargs):
"""
Return a new :class:`Location` with values replaced
by the values specified in `**kwargs`
"""
for name, value in kwargs.iteritems():
if name == 'name':
_check_location_part(value, INVALID_CHARS_NAME)
else:
_check_location_part(value, INVALID_CHARS)
return super(Location, self)._replace(**kwargs)
def replace(self, **kwargs):
'''
Expose a public method for replacing location elements
......
from nose.tools import assert_equals, assert_raises, assert_not_equals # pylint: disable=E0611
import ddt
from unittest import TestCase
from xmodule.modulestore import Location
from xmodule.modulestore.exceptions import InvalidLocationError
# Pairs for testing the clean* functions.
# The first item in the tuple is the input string.
# The second item in the tuple is what the result of
# replacement should be.
GENERAL_PAIRS = [
('', ''),
(' ', '_'),
('abc,', 'abc_'),
('ab fg!@//\\aj', 'ab_fg_aj'),
(u"ab\xA9", "ab_"), # no unicode allowed for now
]
def check_string_roundtrip(url):
assert_equals(url, Location(url).url())
assert_equals(url, str(Location(url)))
def test_string_roundtrip():
check_string_roundtrip("tag://org/course/category/name")
check_string_roundtrip("tag://org/course/category/name@revision")
@ddt.ddt
class TestLocations(TestCase):
@ddt.data(
"tag://org/course/category/name",
"tag://org/course/category/name@revision"
)
def test_string_roundtrip(self, url):
self.assertEquals(url, Location(url).url())
self.assertEquals(url, str(Location(url)))
input_dict = {
@ddt.data(
{
'tag': 'tag',
'course': 'course',
'category': 'category',
'name': 'name',
'org': 'org'
}
also_valid_dict = {
},
{
'tag': 'tag',
'course': 'course',
'category': 'category',
'name': 'name:more_name',
'org': 'org'
}
input_list = ['tag', 'org', 'course', 'category', 'name']
input_str = "tag://org/course/category/name"
input_str_rev = "tag://org/course/category/name@revision"
valid = (input_list, input_dict, input_str, input_str_rev, also_valid_dict)
},
['tag', 'org', 'course', 'category', 'name'],
"tag://org/course/category/name",
"tag://org/course/category/name@revision",
u"tag://org/course/category/name",
u"tag://org/course/category/name@revision",
)
def test_is_valid(self, loc):
self.assertTrue(Location.is_valid(loc))
invalid_dict = {
@ddt.data(
{
'tag': 'tag',
'course': 'course',
'category': 'category',
'name': 'name@more_name',
'org': 'org'
}
invalid_dict2 = {
},
{
'tag': 'tag',
'course': 'course',
'category': 'category',
'name': 'name ', # extra space
'org': 'org'
}
invalid = ("foo", ["foo"], ["foo", "bar"],
},
"foo",
["foo"],
["foo", "bar"],
["foo", "bar", "baz", "blat:blat", "foo:bar"], # ':' ok in name, not in category
"tag://org/course/category/name with spaces@revision",
"tag://org/course/category/name/with/slashes@revision",
invalid_dict,
invalid_dict2)
def test_is_valid():
for v in valid:
assert_equals(Location.is_valid(v), True)
for v in invalid:
assert_equals(Location.is_valid(v), False)
u"tag://org/course/category/name\xae", # No non-ascii characters for now
u"tag://org/course/category\xae/name", # No non-ascii characters for now
)
def test_is_invalid(self, loc):
self.assertFalse(Location.is_valid(loc))
def test_dict(self):
input_dict = {
'tag': 'tag',
'course': 'course',
'category': 'category',
'name': 'name',
'org': 'org'
}
def test_dict():
assert_equals("tag://org/course/category/name", Location(input_dict).url())
assert_equals(dict(revision=None, **input_dict), Location(input_dict).dict())
self.assertEquals("tag://org/course/category/name", Location(input_dict).url())
self.assertEquals(dict(revision=None, **input_dict), Location(input_dict).dict())
input_dict['revision'] = 'revision'
assert_equals("tag://org/course/category/name@revision", Location(input_dict).url())
assert_equals(input_dict, Location(input_dict).dict())
self.assertEquals("tag://org/course/category/name@revision", Location(input_dict).url())
self.assertEquals(input_dict, Location(input_dict).dict())
def test_list():
assert_equals("tag://org/course/category/name", Location(input_list).url())
assert_equals(input_list + [None], Location(input_list).list())
def test_list(self):
input_list = ['tag', 'org', 'course', 'category', 'name']
self.assertEquals("tag://org/course/category/name", Location(input_list).url())
self.assertEquals(input_list + [None], Location(input_list).list())
input_list.append('revision')
assert_equals("tag://org/course/category/name@revision", Location(input_list).url())
assert_equals(input_list, Location(input_list).list())
self.assertEquals("tag://org/course/category/name@revision", Location(input_list).url())
self.assertEquals(input_list, Location(input_list).list())
def test_location():
def test_location(self):
input_list = ['tag', 'org', 'course', 'category', 'name']
assert_equals("tag://org/course/category/name", Location(Location(input_list)).url())
self.assertEquals("tag://org/course/category/name", Location(Location(input_list)).url())
def test_none():
assert_equals([None] * 6, Location(None).list())
def test_invalid_locations():
assert_raises(InvalidLocationError, Location, "foo")
assert_raises(InvalidLocationError, Location, ["foo", "bar"])
assert_raises(InvalidLocationError, Location, ["foo", "bar", "baz", "blat/blat", "foo"])
assert_raises(InvalidLocationError, Location, ["foo", "bar", "baz", "blat", "foo/bar"])
assert_raises(InvalidLocationError, Location, "tag://org/course/category/name with spaces@revision")
assert_raises(InvalidLocationError, Location, "tag://org/course/category/name/revision")
def test_none(self):
self.assertEquals([None] * 6, Location(None).list())
@ddt.data(
"foo",
["foo", "bar"],
["foo", "bar", "baz", "blat/blat", "foo"],
["foo", "bar", "baz", "blat", "foo/bar"],
"tag://org/course/category/name with spaces@revision",
"tag://org/course/category/name/revision",
)
def test_invalid_locations(self, loc):
with self.assertRaises(InvalidLocationError):
Location(loc)
def test_equality():
assert_equals(
def test_equality(self):
self.assertEquals(
Location('tag', 'org', 'course', 'category', 'name'),
Location('tag', 'org', 'course', 'category', 'name')
)
assert_not_equals(
self.assertNotEquals(
Location('tag', 'org', 'course', 'category', 'name1'),
Location('tag', 'org', 'course', 'category', 'name')
)
# All the cleaning functions should do the same thing with these
general_pairs = [('', ''),
(' ', '_'),
('abc,', 'abc_'),
('ab fg!@//\\aj', 'ab_fg_aj'),
(u"ab\xA9", "ab_"), # no unicode allowed for now
]
def test_clean():
pairs = general_pairs + [
@ddt.data(
('a:b', 'a_b'), # no colons in non-name components
('a-b', 'a-b'), # dashes ok
('a.b', 'a.b'), # dot ok
]
for input, output in pairs:
assert_equals(Location.clean(input), output)
*GENERAL_PAIRS
)
def test_clean(self, pair):
self.assertEquals(Location.clean(pair[0]), pair[1])
def test_clean_for_url_name():
pairs = general_pairs + [
@ddt.data(
('a:b', 'a:b'), # colons ok in names
('a-b', 'a-b'), # dashes ok in names
('a.b', 'a.b'), # dot ok in names
]
for input, output in pairs:
assert_equals(Location.clean_for_url_name(input), output)
*GENERAL_PAIRS
)
def test_clean_for_url_name(self, pair):
self.assertEquals(Location.clean_for_url_name(pair[0]), pair[1])
def test_clean_for_html():
pairs = general_pairs + [
@ddt.data(
("a:b", "a_b"), # no colons for html use
("a-b", "a-b"), # dashes ok (though need to be replaced in various use locations. ugh.)
('a.b', 'a_b'), # no dots.
]
for input, output in pairs:
assert_equals(Location.clean_for_html(input), output)
*GENERAL_PAIRS
)
def test_clean_for_html(self, pair):
self.assertEquals(Location.clean_for_html(pair[0]), pair[1])
def test_html_id():
def test_html_id(self):
loc = Location("tag://org/course/cat/name:more_name@rev")
assert_equals(loc.html_id(), "tag-org-course-cat-name_more_name-rev")
self.assertEquals(loc.html_id(), "tag-org-course-cat-name_more_name-rev")
def test_course_id():
def test_course_id(self):
loc = Location('i4x', 'mitX', '103', 'course', 'test2')
assert_equals('mitX/103/test2', loc.course_id)
self.assertEquals('mitX/103/test2', loc.course_id)
loc = Location('i4x', 'mitX', '103', '_not_a_course', 'test2')
with assert_raises(InvalidLocationError):
loc.course_id
with self.assertRaises(InvalidLocationError):
loc.course_id # pylint: disable=pointless-statement
def test_replacement(self):
self.assertEquals(
Location('t://o/c/c/n@r')._replace(name='new_name'),
Location('t://o/c/c/new_name@r'),
)
with self.assertRaises(InvalidLocationError):
Location('t://o/c/c/n@r')._replace(name=u'name\xae')
@ddt.data('org', 'course', 'category', 'name', 'revision')
def test_immutable(self, attr):
loc = Location('t://o/c/c/n@r')
with self.assertRaises(AttributeError):
setattr(loc, attr, attr)
......@@ -205,11 +205,11 @@ class ImportSystem(XMLParsingSystem, MakoDescriptorSystem):
descriptor.data_dir = course_dir
xmlstore.modules[course_id][descriptor.location] = descriptor
xmlstore.modules[course_id][descriptor.scope_ids.usage_id] = descriptor
if descriptor.has_children:
for child in descriptor.get_children():
parent_tracker.add_parent(child.location, descriptor.location)
parent_tracker.add_parent(child.scope_ids.usage_id, descriptor.scope_ids.usage_id)
# After setting up the descriptor, save any changes that we have
# made to attributes on the descriptor to the underlying KeyValueStore.
......@@ -412,8 +412,8 @@ class XMLModuleStore(ModuleStoreReadBase):
if course_descriptor is not None and not isinstance(course_descriptor, ErrorDescriptor):
self.courses[course_dir] = course_descriptor
self._location_errors[course_descriptor.location] = errorlog
self.parent_trackers[course_descriptor.id].make_known(course_descriptor.location)
self._location_errors[course_descriptor.scope_ids.usage_id] = errorlog
self.parent_trackers[course_descriptor.id].make_known(course_descriptor.scope_ids.usage_id)
else:
# Didn't load course. Instead, save the errors elsewhere.
self.errored_courses[course_dir] = errorlog
......@@ -570,7 +570,7 @@ class XMLModuleStore(ModuleStoreReadBase):
html = f.read().decode('utf-8')
# tabs are referenced in policy.json through a 'slug' which is just the filename without the .html suffix
slug = os.path.splitext(os.path.basename(filepath))[0]
loc = Location('i4x', course_descriptor.location.org, course_descriptor.location.course, category, slug)
loc = course_descriptor.scope_ids.usage_id._replace(category=category, name=slug)
module = system.construct_xblock_from_class(
HtmlDescriptor,
# We're loading a descriptor, so student_id is meaningless
......@@ -588,7 +588,7 @@ class XMLModuleStore(ModuleStoreReadBase):
module.display_name = tab['name']
module.data_dir = course_dir
module.save()
self.modules[course_descriptor.id][module.location] = module
self.modules[course_descriptor.id][module.scope_ids.usage_id] = module
except Exception, e:
logging.exception("Failed to load %s. Skipping... \
Exception: %s", filepath, unicode(e))
......
......@@ -310,9 +310,7 @@ def import_module(
source_course_location, dest_course_location, allow_not_found=False,
do_import_static=True):
logging.debug('processing import of module {url}...'.format(
url=module.location.url()
))
logging.debug('processing import of module {}...'.format(module.location.url()))
content = {}
for field in module.fields.values():
......
......@@ -93,7 +93,6 @@ transifex-client==0.9.1
# Used for testing
coverage==3.7
ddt==0.4.0
factory_boy==2.2.1
mock==1.0.1
nosexcover==1.0.7
......
......@@ -22,3 +22,6 @@
-e git+https://github.com/edx/django-waffle.git@823a102e48#egg=django-waffle
-e git+https://github.com/edx/event-tracking.git@f0211d702d#egg=event-tracking
-e git+https://github.com/edx/bok-choy.git@bc6f1adbe439618162079f1004b2b3db3b6f8916#egg=bok_choy
# Move back to upstream release once https://github.com/txels/ddt/pull/13 is merged
-e git+https://github.com/edx/ddt.git@9e8010b8777aa40b848fdb76de6e60081616325a#egg=ddt
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment