Commit 22a8ce39 by Adam Palay

add tracking for memory usage in creating subtasks (LMS-6488)

parent 12e2d442
......@@ -5,6 +5,8 @@ from time import time
import json
from uuid import uuid4
import math
import psutil
from contextlib import contextmanager
from celery.utils.log import get_task_logger
from celery.states import SUCCESS, READY_STATES, RETRY
......@@ -39,10 +41,43 @@ def _get_number_of_subtasks(total_num_items, items_per_task):
The number of subtask_id values returned by this should match the number of chunks returned
by the generate_items_for_subtask generator.
"""
return int(math.ceil(float(total_num_items) / float(items_per_task)))
num_subtasks, remainder = divmod(total_num_items, items_per_task)
if remainder:
num_subtasks += 1
return num_subtasks
def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_task, total_num_subtasks):
@contextmanager
def track_memory_usage(metric, course_id):
"""
Context manager to track how much memory (in bytes) a given process uses.
Metrics will look like: 'course_email.subtask_generation.memory.rss'
or 'course_email.subtask_generation.memory.vms'.
"""
memory_types = ['rss', 'vms']
process = psutil.Process()
baseline_memory_info = process.get_memory_info()
baseline_usages = [getattr(baseline_memory_info, memory_type) for memory_type in memory_types]
yield
for memory_type, baseline_usage in zip(memory_types, baseline_usages):
total_memory_info = process.get_memory_info()
total_usage = getattr(total_memory_info, memory_type)
memory_used = total_usage - baseline_usage
dog_stats_api.increment(
metric + "." + memory_type,
memory_used,
tags=["course_id:{}".format(course_id)],
)
def _generate_items_for_subtask(
item_queryset,
item_fields,
total_num_items,
items_per_task,
total_num_subtasks,
course_id,
):
"""
Generates a chunk of "items" that should be passed into a subtask.
......@@ -53,6 +88,7 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, ite
`total_num_items` : the result of item_queryset.count().
`items_per_query` : size of chunks to break the query operation into.
`items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
`course_id` : course_id of the course. Only needed for the track_memory_usage context manager.
Returns: yields a list of dicts, where each dict contains the fields in `item_fields`, plus the 'pk' field.
......@@ -64,18 +100,20 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, ite
num_subtasks = 0
items_for_task = []
for item in item_queryset.values(*all_item_fields).iterator():
if len(items_for_task) == items_per_task and num_subtasks < total_num_subtasks - 1:
yield items_for_task
num_items_queued += items_per_task
items_for_task = []
num_subtasks += 1
items_for_task.append(item)
# yield remainder items for task, if any
if items_for_task:
yield items_for_task
num_items_queued += len(items_for_task)
with track_memory_usage('course_email.subtask_generation.memory', course_id):
for item in item_queryset.values(*all_item_fields).iterator():
if len(items_for_task) == items_per_task and num_subtasks < total_num_subtasks - 1:
yield items_for_task
num_items_queued += items_per_task
items_for_task = []
num_subtasks += 1
items_for_task.append(item)
# yield remainder items for task, if any
if items_for_task:
yield items_for_task
num_items_queued += len(items_for_task)
# Note, depending on what kind of DB is used, it's possible for the queryset
# we iterate over to change in the course of the query. Therefore it's
......@@ -269,19 +307,20 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
# Construct a generator that will return the recipients to use for each subtask.
# Pass in the desired fields to fetch for each recipient.
item_generator = _generate_items_for_subtask(
item_list_generator = _generate_items_for_subtask(
item_queryset,
item_fields,
total_num_items,
items_per_task,
total_num_subtasks,
entry.course_id,
)
# Now create the subtasks, and start them running.
TASK_LOG.info("Task %s: creating %s subtasks to process %s items.",
task_id, total_num_subtasks, total_num_items)
num_subtasks = 0
for item_list in item_generator:
for item_list in item_list_generator:
subtask_id = subtask_id_list[num_subtasks]
num_subtasks += 1
subtask_status = SubtaskStatus.create(subtask_id)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment