Commit 0eb9c343 by Usman Khalid

Merge pull request #2396 from edx/usman/lms2090-instructor-task-transactions

instructor_task: Distribute extra items among subtasks of last query.
parents 19674a6f 95a112a3
......@@ -51,7 +51,7 @@ def _get_number_of_subtasks(total_num_items, items_per_query, items_per_task):
return total_num_tasks
def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_query, items_per_task):
def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, total_num_subtasks, items_per_query, items_per_task):
"""
Generates a chunk of "items" that should be passed into a subtask.
......@@ -68,15 +68,29 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, ite
Warning: if the algorithm here changes, the _get_number_of_subtasks() method should similarly be changed.
"""
num_queries = int(math.ceil(float(total_num_items) / float(items_per_query)))
last_pk = item_queryset[0].pk - 1
last_pk = item_queryset.order_by('pk')[0].pk - 1
num_items_queued = 0
available_num_subtasks = total_num_subtasks
all_item_fields = list(item_fields)
all_item_fields.append('pk')
for _ in range(num_queries):
item_sublist = list(item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)[:items_per_query])
for query_number in range(num_queries):
# In case total_num_items has increased since it was initially calculated
# include all remaining items in last query.
item_sublist = item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)
if query_number < num_queries - 1:
item_sublist = list(item_sublist[:items_per_query])
else:
item_sublist = list(item_sublist)
last_pk = item_sublist[-1]['pk']
num_items_this_query = len(item_sublist)
num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task)))
# In case total_num_items has increased since it was initially calculated just distribute the extra
# items among the available subtasks.
num_tasks_this_query = min(available_num_subtasks, int(math.ceil(float(num_items_this_query) / float(items_per_task))))
available_num_subtasks -= num_tasks_this_query
chunk = int(math.ceil(float(num_items_this_query) / float(num_tasks_this_query)))
for i in range(num_tasks_this_query):
items_for_task = item_sublist[i * chunk:i * chunk + chunk]
......@@ -84,11 +98,12 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, ite
num_items_queued += num_items_this_query
# Sanity check: we expect the chunking to be properly summing to the original count:
# Because queueing does not happen in one transaction the number of items in the queryset may change
# from the initial count. For example if the queryset is of the CourseEnrollment model students may
# enroll or unenroll while queueing is in progress. The purpose of the original count is to estimate the
# number of subtasks needed to perform the requested task.
if num_items_queued != total_num_items:
error_msg = "Number of items generated by chunking {} not equal to original total {}".format(num_items_queued, total_num_items)
TASK_LOG.error(error_msg)
raise ValueError(error_msg)
TASK_LOG.info("Number of items generated by chunking %s not equal to original total %s", num_items_queued, total_num_items)
class SubtaskStatus(object):
......@@ -278,6 +293,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
item_queryset,
item_fields,
total_num_items,
total_num_subtasks,
items_per_query,
items_per_task
)
......@@ -293,13 +309,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
new_subtask = create_subtask_fcn(item_list, subtask_status)
new_subtask.apply_async()
# Sanity check: we expect the subtask to be properly summing to the original count:
if num_subtasks != len(subtask_id_list):
task_id = entry.task_id
error_fmt = "Task {}: number of tasks generated {} not equal to original total {}"
error_msg = error_fmt.format(task_id, num_subtasks, len(subtask_id_list))
TASK_LOG.error(error_msg)
raise ValueError(error_msg)
# Subtasks have been queued so no exceptions should be raised after this point.
# Return the task progress as stored in the InstructorTask object.
return progress
......
......@@ -28,7 +28,7 @@ from instructor_task.views import instructor_task_status
TEST_COURSE_ORG = 'edx'
TEST_COURSE_NAME = 'Test Course'
TEST_COURSE_NAME = 'test course'
TEST_COURSE_NUMBER = '1.23x'
TEST_SECTION_NAME = "Problem"
TEST_COURSE_ID = 'edx/1.23x/test_course'
......
"""
Unit tests for instructor_task subtasks.
"""
from uuid import uuid4
from mock import Mock, patch
from student.models import CourseEnrollment
from instructor_task.subtasks import queue_subtasks_for_query
from instructor_task.tests.factories import InstructorTaskFactory
from instructor_task.tests.test_base import InstructorTaskCourseTestCase
class TestSubtasks(InstructorTaskCourseTestCase):
    """
    Tests of the chunking performed by queue_subtasks_for_query() when the
    number of enrolled students changes while subtasks are being queued.
    """

    def setUp(self):
        super(TestSubtasks, self).setUp()
        self.initialize_course()

    def _enroll_students_in_course(self, course_id, num_students):
        """Create and enroll some students in the course."""
        # NOTE(review): course_id is currently unused here -- create_student()
        # appears to enroll in self.course; confirm against the test base class.
        for _ in range(num_students):
            random_id = uuid4().hex[:8]
            self.create_student(username='student{0}'.format(random_id))

    def _queue_subtasks(self, create_subtask_fcn, items_per_query, items_per_task, initial_count, extra_count):
        """
        Queue subtasks while enrolling more students into course in the middle of the process.

        Arguments:
            create_subtask_fcn: callable passed through to queue_subtasks_for_query();
                a Mock in these tests, so its call_args_list records each subtask's items.
            items_per_query: maximum number of enrollments fetched per database query.
            items_per_task: target number of enrollments handled by each subtask.
            initial_count: number of students enrolled before queueing starts.
            extra_count: number of students enrolled mid-queueing, injected by
                patching initialize_subtask_info (called after the initial count
                of items has already been computed).
        """
        task_id = str(uuid4())
        instructor_task = InstructorTaskFactory.create(
            course_id=self.course.id,
            task_id=task_id,
            task_key='dummy_task_key',
            task_type='bulk_course_email',
        )
        self._enroll_students_in_course(self.course.id, initial_count)
        task_queryset = CourseEnrollment.objects.filter(course_id=self.course.id)

        def initialize_subtask_info(*args):  # pylint: disable=unused-argument
            """Instead of initializing subtask info enroll some more students into course."""
            self._enroll_students_in_course(self.course.id, extra_count)
            return {}

        with patch('instructor_task.subtasks.initialize_subtask_info') as mock_initialize_subtask_info:
            mock_initialize_subtask_info.side_effect = initialize_subtask_info
            queue_subtasks_for_query(
                entry=instructor_task,
                action_name='action_name',
                create_subtask_fcn=create_subtask_fcn,
                item_queryset=task_queryset,
                item_fields=[],
                items_per_query=items_per_query,
                items_per_task=items_per_task,
            )

    def _assert_subtask_item_counts(self, mock_create_subtask_fcn, expected_counts):
        """Assert the number of subtasks queued and the number of items given to each."""
        calls = mock_create_subtask_fcn.call_args_list
        # Also pin the total number of subtasks, so extra/missing subtasks fail loudly.
        self.assertEqual(len(calls), len(expected_counts))
        for call, expected_count in zip(calls, expected_counts):
            # call[0][0] is the item_list positional argument of create_subtask_fcn.
            self.assertEqual(len(call[0][0]), expected_count)

    def test_queue_subtasks_for_query1(self):
        """Test queue_subtasks_for_query() if in last query the subtasks only need to accommodate < items_per_tasks items."""
        mock_create_subtask_fcn = Mock()
        self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 8, 1)
        # 8 + 1 = 9 items total, evenly chunked 3 per subtask.
        self._assert_subtask_item_counts(mock_create_subtask_fcn, [3, 3, 3])

    def test_queue_subtasks_for_query2(self):
        """Test queue_subtasks_for_query() if in last query the subtasks need to accommodate > items_per_task items."""
        mock_create_subtask_fcn = Mock()
        self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 8, 3)
        # 8 + 3 = 11 items total: the extra items are absorbed by the last subtask.
        self._assert_subtask_item_counts(mock_create_subtask_fcn, [3, 3, 5])

    def test_queue_subtasks_for_query3(self):
        """Test queue_subtasks_for_query() if in last query the number of items available > items_per_query."""
        mock_create_subtask_fcn = Mock()
        self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 11, 3)
        # 11 + 3 = 14 items total: the extras are spread over the last two subtasks.
        self._assert_subtask_item_counts(mock_create_subtask_fcn, [3, 3, 4, 4])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.