Commit 95a112a3 by Usman Khalid

instructor_task: Distribute extra items among subtasks of last query.

When creating an instructor task total_num_items may change between the
time it and the number of subtasks is calculated and the time the
subtasks are actually queued (all of this cannot happen in one transaction).
In such a case the extra items are distributed among the subtasks of the
last query.

LMS-2090
parent 0f8919a6
......@@ -51,7 +51,7 @@ def _get_number_of_subtasks(total_num_items, items_per_query, items_per_task):
return total_num_tasks
def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_query, items_per_task):
def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, total_num_subtasks, items_per_query, items_per_task):
"""
Generates a chunk of "items" that should be passed into a subtask.
......@@ -68,15 +68,29 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, ite
Warning: if the algorithm here changes, the _get_number_of_subtasks() method should similarly be changed.
"""
num_queries = int(math.ceil(float(total_num_items) / float(items_per_query)))
last_pk = item_queryset[0].pk - 1
last_pk = item_queryset.order_by('pk')[0].pk - 1
num_items_queued = 0
available_num_subtasks = total_num_subtasks
all_item_fields = list(item_fields)
all_item_fields.append('pk')
for _ in range(num_queries):
item_sublist = list(item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)[:items_per_query])
for query_number in range(num_queries):
# In case total_num_items has increased since it was initially calculated
# include all remaining items in last query.
item_sublist = item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)
if query_number < num_queries - 1:
item_sublist = list(item_sublist[:items_per_query])
else:
item_sublist = list(item_sublist)
last_pk = item_sublist[-1]['pk']
num_items_this_query = len(item_sublist)
num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task)))
# In case total_num_items has increased since it was initially calculated just distribute the extra
# items among the available subtasks.
num_tasks_this_query = min(available_num_subtasks, int(math.ceil(float(num_items_this_query) / float(items_per_task))))
available_num_subtasks -= num_tasks_this_query
chunk = int(math.ceil(float(num_items_this_query) / float(num_tasks_this_query)))
for i in range(num_tasks_this_query):
items_for_task = item_sublist[i * chunk:i * chunk + chunk]
......@@ -84,11 +98,12 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, ite
num_items_queued += num_items_this_query
# Sanity check: we expect the chunking to be properly summing to the original count:
# Because queueing does not happen in one transaction the number of items in the queryset may change
# from the initial count. For example if the queryset is of the CourseEnrollment model students may
# enroll or unenroll while queueing is in progress. The purpose of the original count is to estimate the
# number of subtasks needed to perform the requested task.
if num_items_queued != total_num_items:
error_msg = "Number of items generated by chunking {} not equal to original total {}".format(num_items_queued, total_num_items)
TASK_LOG.error(error_msg)
raise ValueError(error_msg)
TASK_LOG.info("Number of items generated by chunking %s not equal to original total %s", num_items_queued, total_num_items)
class SubtaskStatus(object):
......@@ -278,6 +293,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
item_queryset,
item_fields,
total_num_items,
total_num_subtasks,
items_per_query,
items_per_task
)
......@@ -293,13 +309,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
new_subtask = create_subtask_fcn(item_list, subtask_status)
new_subtask.apply_async()
# Sanity check: we expect the subtask to be properly summing to the original count:
if num_subtasks != len(subtask_id_list):
task_id = entry.task_id
error_fmt = "Task {}: number of tasks generated {} not equal to original total {}"
error_msg = error_fmt.format(task_id, num_subtasks, len(subtask_id_list))
TASK_LOG.error(error_msg)
raise ValueError(error_msg)
# Subtasks have been queued so no exceptions should be raised after this point.
# Return the task progress as stored in the InstructorTask object.
return progress
......
......@@ -28,7 +28,7 @@ from instructor_task.views import instructor_task_status
TEST_COURSE_ORG = 'edx'
TEST_COURSE_NAME = 'Test Course'
TEST_COURSE_NAME = 'test course'
TEST_COURSE_NUMBER = '1.23x'
TEST_SECTION_NAME = "Problem"
TEST_COURSE_ID = 'edx/1.23x/test_course'
......
"""
Unit tests for instructor_task subtasks.
"""
from uuid import uuid4
from mock import Mock, patch
from student.models import CourseEnrollment
from instructor_task.subtasks import queue_subtasks_for_query
from instructor_task.tests.factories import InstructorTaskFactory
from instructor_task.tests.test_base import InstructorTaskCourseTestCase
class TestSubtasks(InstructorTaskCourseTestCase):
"""Tests for subtasks."""
def setUp(self):
super(TestSubtasks, self).setUp()
self.initialize_course()
def _enroll_students_in_course(self, course_id, num_students):
"""Create and enroll some students in the course."""
for _ in range(num_students):
random_id = uuid4().hex[:8]
self.create_student(username='student{0}'.format(random_id))
def _queue_subtasks(self, create_subtask_fcn, items_per_query, items_per_task, initial_count, extra_count):
"""Queue subtasks while enrolling more students into course in the middle of the process."""
task_id = str(uuid4())
instructor_task = InstructorTaskFactory.create(
course_id=self.course.id,
task_id=task_id,
task_key='dummy_task_key',
task_type='bulk_course_email',
)
self._enroll_students_in_course(self.course.id, initial_count)
task_queryset = CourseEnrollment.objects.filter(course_id=self.course.id)
def initialize_subtask_info(*args): # pylint: disable=unused-argument
"""Instead of initializing subtask info enroll some more students into course."""
self._enroll_students_in_course(self.course.id, extra_count)
return {}
with patch('instructor_task.subtasks.initialize_subtask_info') as mock_initialize_subtask_info:
mock_initialize_subtask_info.side_effect = initialize_subtask_info
queue_subtasks_for_query(
entry=instructor_task,
action_name='action_name',
create_subtask_fcn=create_subtask_fcn,
item_queryset=task_queryset,
item_fields=[],
items_per_query=items_per_query,
items_per_task=items_per_task,
)
def test_queue_subtasks_for_query1(self):
"""Test queue_subtasks_for_query() if in last query the subtasks only need to accommodate < items_per_tasks items."""
mock_create_subtask_fcn = Mock()
self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 8, 1)
# Check number of items for each subtask
mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list
self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3)
self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3)
self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 3)
def test_queue_subtasks_for_query2(self):
"""Test queue_subtasks_for_query() if in last query the subtasks need to accommodate > items_per_task items."""
mock_create_subtask_fcn = Mock()
self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 8, 3)
# Check number of items for each subtask
mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list
self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3)
self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3)
self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 5)
def test_queue_subtasks_for_query3(self):
"""Test queue_subtasks_for_query() if in last query the number of items available > items_per_query."""
mock_create_subtask_fcn = Mock()
self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 11, 3)
# Check number of items for each subtask
mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list
self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3)
self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3)
self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 4)
self.assertEqual(len(mock_create_subtask_fcn_args[3][0][0]), 4)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment