Commit 80af538d by Don Mitchell

URLencode $ and . in location.name when used as key

parent d15bfb67
......@@ -9,6 +9,7 @@ from xmodule.modulestore.exceptions import InvalidLocationError, ItemNotFoundErr
from xmodule.modulestore.locator import BlockUsageLocator
from xmodule.modulestore.mongo import draft
from xmodule.modulestore import Location
import urllib
class LocMapperStore(object):
......@@ -149,7 +150,7 @@ class LocMapperStore(object):
else:
branch = entry['draft_branch']
usage_id = entry['block_map'].get(location.name)
usage_id = entry['block_map'].get(self._encode_for_mongo(location.name))
if usage_id is None:
if add_entry_if_missing:
usage_id = self._add_to_block_map(location, location_id, entry['block_map'])
......@@ -206,7 +207,7 @@ class LocMapperStore(object):
candidate['_id']['org'],
candidate['_id']['course'],
category,
old_name,
self._decode_from_mongo(old_name),
revision)
return None
......@@ -241,13 +242,14 @@ class LocMapperStore(object):
# turn maps from cursor to list
map_list = list(maps)
encoded_location_name = self._encode_for_mongo(location.name)
# check whether there's already a usage_id for this location (and it agrees w/ any passed in or found)
for map_entry in map_list:
if (location.name in map_entry['block_map'] and
location.category in map_entry['block_map'][location.name]):
if (encoded_location_name in map_entry['block_map'] and
location.category in map_entry['block_map'][encoded_location_name]):
if usage_id is None:
usage_id = map_entry['block_map'][location.name][location.category]
elif usage_id != map_entry['block_map'][location.name][location.category]:
usage_id = map_entry['block_map'][encoded_location_name][location.category]
elif usage_id != map_entry['block_map'][encoded_location_name][location.category]:
raise DuplicateItemError(usage_id, self, 'location_map')
computed_usage_id = usage_id
......@@ -256,8 +258,8 @@ class LocMapperStore(object):
for map_entry in map_list:
if computed_usage_id is None:
computed_usage_id = self._add_to_block_map(location, location_id, map_entry['block_map'])
elif (location.name not in map_entry['block_map'] or
location.category not in map_entry['block_map'][location.name]):
elif (encoded_location_name not in map_entry['block_map'] or
location.category not in map_entry['block_map'][encoded_location_name]):
alt_usage_id = self._verify_uniqueness(computed_usage_id, map_entry['block_map'])
if alt_usage_id != computed_usage_id:
if usage_id is not None:
......@@ -271,7 +273,7 @@ class LocMapperStore(object):
True
)
map_entry['block_map'].setdefault(location.name, {})[location.category] = computed_usage_id
map_entry['block_map'].setdefault(encoded_location_name, {})[location.category] = computed_usage_id
self.location_map.update({'_id': map_entry['_id']}, {'$set': {'block_map': map_entry['block_map']}})
return computed_usage_id
......@@ -293,10 +295,11 @@ class LocMapperStore(object):
location_id = self._interpret_location_course_id(old_course_id, location)
maps = self.location_map.find(location_id)
encoded_location_name = self._encode_for_mongo(location.name)
for map_entry in maps:
# handle noop of renaming to same name
if (location.name in map_entry['block_map'] and
map_entry['block_map'][location.name].get(location.category) == usage_id):
if (encoded_location_name in map_entry['block_map'] and
map_entry['block_map'][encoded_location_name].get(location.category) == usage_id):
continue
alt_usage_id = self._verify_uniqueness(usage_id, map_entry['block_map'])
if alt_usage_id != usage_id:
......@@ -307,8 +310,8 @@ class LocMapperStore(object):
else:
raise DuplicateItemError(usage_id, self, 'location_map')
if location.category in map_entry['block_map'].setdefault(location.name, {}):
map_entry['block_map'][location.name][location.category] = usage_id
if location.category in map_entry['block_map'].setdefault(encoded_location_name, {}):
map_entry['block_map'][encoded_location_name][location.category] = usage_id
self.location_map.update({'_id': map_entry['_id']}, {'$set': {'block_map': map_entry['block_map']}})
return usage_id
......@@ -323,12 +326,13 @@ class LocMapperStore(object):
location_id = self._interpret_location_course_id(old_course_id, location)
maps = self.location_map.find(location_id)
encoded_location_name = self._encode_for_mongo(location.name)
for map_entry in maps:
if location.category in map_entry['block_map'].setdefault(location.name, {}):
if len(map_entry['block_map'][location.name]) == 1:
del map_entry['block_map'][location.name]
if location.category in map_entry['block_map'].setdefault(encoded_location_name, {}):
if len(map_entry['block_map'][encoded_location_name]) == 1:
del map_entry['block_map'][encoded_location_name]
else:
del map_entry['block_map'][location.name][location.category]
del map_entry['block_map'][encoded_location_name][location.category]
self.location_map.update({'_id': map_entry['_id']}, {'$set': {'block_map': map_entry['block_map']}})
def _add_to_block_map(self, location, location_id, block_map):
......@@ -341,7 +345,8 @@ class LocMapperStore(object):
usage_id = self._verify_uniqueness(location.category + location.name[:3], block_map)
else:
usage_id = location.name
block_map.setdefault(location.name, {})[location.category] = usage_id
encoded_location_name = self._encode_for_mongo(location.name)
block_map.setdefault(encoded_location_name, {})[location.category] = usage_id
self.location_map.update(location_id, {'$set': {'block_map': block_map}})
return usage_id
......@@ -386,3 +391,20 @@ class LocMapperStore(object):
name += str(randint(0, 9))
return self._verify_uniqueness(name, block_map)
return name
def _encode_for_mongo(self, fieldname):
"""
Fieldnames in mongo cannot have periods nor dollar signs. So encode them.
:param fieldname: an atomic field name. Note, don't pass structured paths as it will flatten them
"""
for char in [".", "$"]:
fieldname = fieldname.replace(char, '%{:02x}'.format(ord(char)))
return fieldname
def _decode_from_mongo(self, fieldname):
"""
The inverse of _encode_for_mongo
:param fieldname: with period and dollar escaped
"""
return urllib.unquote(fieldname)
......@@ -324,6 +324,24 @@ class TestLocationMapper(unittest.TestCase):
prob_location = loc_mapper().translate_locator_to_location(prob_locator)
self.assertEqual(prob_location, Location('i4x', org, course, 'problem', 'abc123'))
def test_special_chars(self):
"""
Test locations which have special characters
"""
# afaik, location.check_list prevents $ in all fields
org = 'foo.org.edu'
course = 'bar.course-4'
name = 'baz.run_4-3'
old_style_course_id = '{}/{}/{}'.format(org, course, name)
location = Location('i4x', org, course, 'course', name)
prob_locator = loc_mapper().translate_location(
old_style_course_id,
location,
add_entry_if_missing=True
)
reverted_location = loc_mapper().translate_locator_to_location(prob_locator)
self.assertEqual(location, reverted_location)
def test_add_block(self):
"""
Test add_block_location_translator(location, old_course_id=None, usage_id=None)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment