"""
This module contains celery task functions for handling the management of subtasks.
"""
from time import time
import json
from uuid import uuid4
import math

from celery.utils.log import get_task_logger
from celery.states import SUCCESS, READY_STATES, RETRY
from dogapi import dog_stats_api

from django.db import transaction, DatabaseError
from django.core.cache import cache

from instructor_task.models import InstructorTask, PROGRESS, QUEUING

TASK_LOG = get_task_logger(__name__)

# Lock expiration should be long enough to allow a subtask to complete.
SUBTASK_LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes

# Number of times to retry if a subtask update encounters a lock on the InstructorTask.
# (These are recursive retries, so don't make this number too large.)
MAX_DATABASE_LOCK_RETRIES = 5


class DuplicateTaskException(Exception):
    """Exception indicating that a task already exists or has already completed."""
    pass


def _get_number_of_subtasks(total_num_items, items_per_query, items_per_task):
    """
    Determines number of subtasks that would be generated by _generate_items_for_subtask.

    This needs to be calculated before a query is executed so that the list of all subtasks can be
    stored in the InstructorTask before any subtasks are started.

    The number of subtask_id values returned by this should match the number of chunks returned
    by the _generate_items_for_subtask generator.
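
    For illustration (hypothetical numbers): with total_num_items=10, items_per_query=6 and
    items_per_task=3, the first query covers 6 items (2 subtasks) and the second covers the
    remaining 4 items (2 subtasks), so this returns 4:

        _get_number_of_subtasks(10, 6, 3)  # == 4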
    """
    total_num_tasks = 0
    num_queries = int(math.ceil(float(total_num_items) / float(items_per_query)))
    num_items_remaining = total_num_items
    for _ in range(num_queries):
        num_items_this_query = min(num_items_remaining, items_per_query)
        num_items_remaining -= num_items_this_query
        num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task)))
        total_num_tasks += num_tasks_this_query

    return total_num_tasks


def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, total_num_subtasks, items_per_query, items_per_task):
    """
    Generates a chunk of "items" that should be passed into a subtask.

    Arguments:
        `item_queryset` : a query set that defines the "items" that should be passed to subtasks.
        `item_fields` : the fields that should be included in the dict that is returned.
            These are in addition to the 'pk' field.
        `total_num_items` : the result of item_queryset.count().
        `total_num_subtasks` : number of subtasks to be generated (as computed by _get_number_of_subtasks).
        `items_per_query` : size of chunks to break the query operation into.
        `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.

    Returns:  yields a list of dicts, where each dict contains the fields in `item_fields`, plus the 'pk' field.

    Warning:  if the algorithm here changes, the _get_number_of_subtasks() function should similarly be changed.
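
    For illustration (hypothetical numbers; `queryset` and the 'email' field below are placeholders,
    not part of this module): with 10 items, items_per_query=6 and items_per_task=3, this yields
    four lists of item dicts with sizes 3, 3, 2 and 2:

        for chunk in _generate_items_for_subtask(queryset, ['email'], 10, 4, 6, 3):
            print len(chunk)  # 3, 3, 2, 2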
    """
    num_queries = int(math.ceil(float(total_num_items) / float(items_per_query)))
    last_pk = item_queryset.order_by('pk')[0].pk - 1
    num_items_queued = 0
    available_num_subtasks = total_num_subtasks
    all_item_fields = list(item_fields)
    all_item_fields.append('pk')

    for query_number in range(num_queries):
        # In case total_num_items has increased since it was initially calculated,
        # include all remaining items in the last query.
        item_sublist = item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)
        if query_number < num_queries - 1:
            item_sublist = list(item_sublist[:items_per_query])
        else:
            item_sublist = list(item_sublist)

        last_pk = item_sublist[-1]['pk']
        num_items_this_query = len(item_sublist)

        # In case total_num_items has increased since it was initially calculated, just distribute
        # the extra items among the available subtasks.
        num_tasks_this_query = min(available_num_subtasks, int(math.ceil(float(num_items_this_query) / float(items_per_task))))
        available_num_subtasks -= num_tasks_this_query

        chunk = int(math.ceil(float(num_items_this_query) / float(num_tasks_this_query)))
        for i in range(num_tasks_this_query):
            items_for_task = item_sublist[i * chunk:i * chunk + chunk]
            yield items_for_task

        num_items_queued += num_items_this_query

    # Because queueing does not happen in one transaction, the number of items in the queryset may change
    # from the initial count.  For example, if the queryset is of the CourseEnrollment model, students may
    # enroll or unenroll while queueing is in progress.  The purpose of the original count is to estimate the
    # number of subtasks needed to perform the requested task.
    if num_items_queued != total_num_items:
        TASK_LOG.info("Number of items generated by chunking %s not equal to original total %s", num_items_queued, total_num_items)


class SubtaskStatus(object):
    """
    Class for tracking the status of a subtask.

    SubtaskStatus values are:

      'task_id' : id of subtask.  This is used to pass task information across retries.
      'attempted' : number of attempts -- should equal succeeded plus failed
      'succeeded' : number that succeeded in processing
      'skipped' : number that were not processed.
      'failed' : number that failed during processing
      'retried_nomax' : number of times the subtask has been retried for conditions that
          should not have a maximum count applied
      'retried_withmax' : number of times the subtask has been retried for conditions that
          should have a maximum count applied
      'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)

    Object is not JSON-serializable, so to_dict and from_dict methods are provided so that
    it can be passed as a serializable argument to tasks (and be reconstituted within such tasks).
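
    A minimal round-trip sketch (values are illustrative):

        status = SubtaskStatus.create(task_id, state=RETRY, retried_withmax=1)
        as_json = json.dumps(status.to_dict())
        restored = SubtaskStatus.from_dict(json.loads(as_json))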

    In future, we may want to include specific error information
    indicating the reason for failure.
    Also, we should count up "not attempted" separately from attempted/failed.
    """

    def __init__(self, task_id, attempted=None, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
        """Construct a SubtaskStatus object."""
        self.task_id = task_id
        if attempted is not None:
            self.attempted = attempted
        else:
            self.attempted = succeeded + failed
        self.succeeded = succeeded
        self.failed = failed
        self.skipped = skipped
        self.retried_nomax = retried_nomax
        self.retried_withmax = retried_withmax
        self.state = state if state is not None else QUEUING

    @classmethod
    def from_dict(cls, d):
        """Construct a SubtaskStatus object from a dict representation."""
        options = dict(d)
        task_id = options['task_id']
        del options['task_id']
        return cls.create(task_id, **options)

    @classmethod
    def create(cls, task_id, **options):
        """Construct a SubtaskStatus object."""
        return cls(task_id, **options)

    def to_dict(self):
        """
        Output a dict representation of a SubtaskStatus object.

        Use for creating a JSON-serializable representation for use by tasks.
        """
        return self.__dict__

    def increment(self, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
        """
        Update the result of a subtask with additional results.

        Kwarg arguments are added to the existing values.
        The exception is `state`, which, if specified, overrides the existing value.
        """
        self.attempted += (succeeded + failed)
        self.succeeded += succeeded
        self.failed += failed
        self.skipped += skipped
        self.retried_nomax += retried_nomax
        self.retried_withmax += retried_withmax
        if state is not None:
            self.state = state

    def get_retry_count(self):
        """Returns the number of retries of any kind."""
        return self.retried_nomax + self.retried_withmax

    def __repr__(self):
        """Return print representation of a SubtaskStatus object."""
        return 'SubtaskStatus<%r>' % (self.to_dict(),)

    def __unicode__(self):
        """Return unicode version of a SubtaskStatus object representation."""
        return unicode(repr(self))


def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
    """
    Store initial subtask information to InstructorTask object.

    The InstructorTask's "task_output" field is initialized.  This is a JSON-serialized dict.
    Counters for 'attempted', 'succeeded', 'failed', 'skipped' keys are initialized to zero,
    as is the 'duration_ms' value.  A 'start_time' is stored for later duration calculations,
    and the total number of "things to do" is set, so the user can be told how much needs to be
    done overall.  The `action_name` is also stored, to help with constructing more readable
    task_progress messages.

    The InstructorTask's "subtasks" field is also initialized.  This is also a JSON-serialized dict.
    Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of
    subtasks.  'Total' is set here to the total number, while the other three are initialized to zero.
    Once the counters for 'succeeded' and 'failed' match the 'total', the subtasks are done and
    the InstructorTask's "status" will be changed to SUCCESS.

    The "subtasks" field also contains a 'status' key that contains a dict storing status
    information for each subtask.  The value for each subtask (keyed by its task_id)
    is its subtask status, as defined by SubtaskStatus.to_dict().
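
    For illustration, with two hypothetical subtask ids the "subtasks" dict initialized below
    would look roughly like (before JSON serialization):

        {
            'total': 2,
            'succeeded': 0,
            'failed': 0,
            'status': {
                '<subtask-id-1>': SubtaskStatus.create('<subtask-id-1>').to_dict(),
                '<subtask-id-2>': SubtaskStatus.create('<subtask-id-2>').to_dict(),
            }
        }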

    This information needs to be set up in the InstructorTask before any of the subtasks start
    running.  If not, there is a chance that the subtasks could complete before the parent task
    is done creating subtasks.  Doing so also simplifies the save() here, as it avoids the need
    for locking.

    Monitoring code should assume that if an InstructorTask has subtask information, the status
    stored in the InstructorTask object should be relied upon, rather than the status stored in the
    corresponding AsyncResult.
    """
    task_progress = {
        'action_name': action_name,
        'attempted': 0,
        'failed': 0,
        'skipped': 0,
        'succeeded': 0,
        'total': total_num,
        'duration_ms': int(0),
        'start_time': time()
    }
    entry.task_output = InstructorTask.create_output_for_success(task_progress)
    entry.task_state = PROGRESS

    # Write out the subtasks information.
    num_subtasks = len(subtask_id_list)
    # Note that it may not be necessary to store the initial value with all those zeroes!
    # Write out as a dict, so it will go more smoothly into json.
    subtask_status = {subtask_id: (SubtaskStatus.create(subtask_id)).to_dict() for subtask_id in subtask_id_list}
    subtask_dict = {
        'total': num_subtasks,
        'succeeded': 0,
        'failed': 0,
        'status': subtask_status
    }
    entry.subtasks = json.dumps(subtask_dict)

    # and save the entry immediately, before any subtasks actually start work:
    entry.save_now()
    return task_progress


def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_queryset, item_fields, items_per_query, items_per_task):
    """
    Generates and queues subtasks to each execute a chunk of "items" generated by a queryset.

    Arguments:
        `entry` : the InstructorTask object for which subtasks are being queued.
        `action_name` : a past-tense verb that can be used for constructing readable status messages.
        `create_subtask_fcn` : a function of two arguments that constructs the desired kind of subtask object.
            Arguments are the list of items to be processed by this subtask, and a SubtaskStatus
            object reflecting initial status (and containing the subtask's id).
        `item_queryset` : a query set that defines the "items" that should be passed to subtasks.
        `item_fields` : the fields that should be included in the dict that is returned.
            These are in addition to the 'pk' field.
        `items_per_query` : size of chunks to break the query operation into.
        `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.

    Returns:  the task progress as stored in the InstructorTask object.
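
    A hypothetical caller (the `make_email_subtask` helper and the enrollment queryset below are
    illustrative, not defined in this module):

        progress = queue_subtasks_for_query(
            entry,
            'emailed',
            make_email_subtask,  # returns a celery subtask for the given (items, subtask_status)
            CourseEnrollment.objects.filter(course_id=course_id, is_active=True),
            ['email'],
            items_per_query=10000,
            items_per_task=100,
        )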

    """
    task_id = entry.task_id
    total_num_items = item_queryset.count()

    # Calculate the number of tasks that will be created, and create a list of ids for each task.
    total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_query, items_per_task)
    subtask_id_list = [str(uuid4()) for _ in range(total_num_subtasks)]

    # Update the InstructorTask with information about the subtasks we've defined.
    TASK_LOG.info("Task %s: updating InstructorTask %s with subtask info for %s subtasks to process %s items.",
                  task_id, entry.id, total_num_subtasks, total_num_items)  # pylint: disable=E1101
    progress = initialize_subtask_info(entry, action_name, total_num_items, subtask_id_list)

    # Construct a generator that will return the recipients to use for each subtask.
    # Pass in the desired fields to fetch for each recipient.
    item_generator = _generate_items_for_subtask(
        item_queryset,
        item_fields,
        total_num_items,
        total_num_subtasks,
        items_per_query,
        items_per_task
    )

    # Now create the subtasks, and start them running.
    TASK_LOG.info("Task %s: creating %s subtasks to process %s items.",
                  task_id, total_num_subtasks, total_num_items)
    num_subtasks = 0
    for item_list in item_generator:
        subtask_id = subtask_id_list[num_subtasks]
        num_subtasks += 1
        subtask_status = SubtaskStatus.create(subtask_id)
        new_subtask = create_subtask_fcn(item_list, subtask_status)
        new_subtask.apply_async()

    # Subtasks have been queued so no exceptions should be raised after this point.

    # Return the task progress as stored in the InstructorTask object.
    return progress


def _acquire_subtask_lock(task_id):
    """
    Mark the specified task_id as being in progress.

    This is used to make sure that the same task is not worked on by more than one worker
    at the same time.  This can occur when tasks are requeued by Celery in response to
    loss of connection to the task broker.  Most of the time, such duplicate tasks are
    run sequentially, but they can overlap in processing as well.

    Returns True if the task_id was not already locked; False if it was.
    """
    # cache.add fails if the key already exists
    key = "subtask-{}".format(task_id)
    succeeded = cache.add(key, 'true', SUBTASK_LOCK_EXPIRE)
    if not succeeded:
        TASK_LOG.warning("task_id '%s': already locked.  Contains value '%s'", task_id, cache.get(key))
    return succeeded


def _release_subtask_lock(task_id):
    """
    Mark the specified task_id as no longer being in progress.

    This is most important to permit a task to be retried.
    """
    # According to Celery task cookbook, "Memcache delete is very slow, but we have
    # to use it to take advantage of using add() for atomic locking."
    key = "subtask-{}".format(task_id)
    cache.delete(key)


def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
    """
    Confirms that the current subtask is known to the InstructorTask and hasn't already been completed.

    Problems can occur when the parent task has been run twice, resulting in duplicate
    subtasks being created for the same InstructorTask entry.  This can happen when Celery
    loses its connection to its broker, and any current tasks get requeued.

    If a parent task gets requeued, then the same InstructorTask may have a different set of
    subtasks defined (to do the same thing), so the subtasks from the first queuing would not
    be known to the InstructorTask.  We raise an exception in this case.

    If a subtask gets requeued, then the first time the subtask runs it should run fine to completion.
    However, we want to prevent it from running again, so we check here to see what the existing
    subtask's status is.  If it is complete, we raise an exception.  We also take a lock on the task,
    so that we can detect if another worker has started work but has not yet completed that work.
    The other worker is allowed to finish, and this raises an exception.

    Raises a DuplicateTaskException exception if it's not a task that should be run.

    If this succeeds, it requires that update_subtask_status() is called to release the lock on the
    task.
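
    A rough sketch of the expected call pattern (the surrounding task body is illustrative):

        subtask_status = SubtaskStatus.from_dict(subtask_status_dict)
        check_subtask_is_valid(entry_id, current_task_id, subtask_status)
        try:
            pass  # ... perform the subtask's work, updating subtask_status as it goes ...
        finally:
            update_subtask_status(entry_id, current_task_id, subtask_status)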
    """
    # Confirm that the InstructorTask actually defines subtasks.
    entry = InstructorTask.objects.get(pk=entry_id)
    if len(entry.subtasks) == 0:
        format_str = "Unexpected task_id '{}': unable to find subtasks of instructor task '{}': rejecting task {}"
        msg = format_str.format(current_task_id, entry, new_subtask_status)
        TASK_LOG.warning(msg)
        dog_stats_api.increment('instructor_task.subtask.duplicate.nosubtasks', tags=[_statsd_tag(entry.course_id)])
        raise DuplicateTaskException(msg)

    # Confirm that the InstructorTask knows about this particular subtask.
    subtask_dict = json.loads(entry.subtasks)
    subtask_status_info = subtask_dict['status']
    if current_task_id not in subtask_status_info:
        format_str = "Unexpected task_id '{}': unable to find status for subtask of instructor task '{}': rejecting task {}"
        msg = format_str.format(current_task_id, entry, new_subtask_status)
        TASK_LOG.warning(msg)
        dog_stats_api.increment('instructor_task.subtask.duplicate.unknown', tags=[_statsd_tag(entry.course_id)])
        raise DuplicateTaskException(msg)

    # Confirm that the InstructorTask doesn't think that this subtask has already been
    # performed successfully.
    subtask_status = SubtaskStatus.from_dict(subtask_status_info[current_task_id])
    subtask_state = subtask_status.state
    if subtask_state in READY_STATES:
        format_str = "Unexpected task_id '{}': already completed - status {} for subtask of instructor task '{}': rejecting task {}"
        msg = format_str.format(current_task_id, subtask_status, entry, new_subtask_status)
        TASK_LOG.warning(msg)
        dog_stats_api.increment('instructor_task.subtask.duplicate.completed', tags=[_statsd_tag(entry.course_id)])
        raise DuplicateTaskException(msg)

    # Confirm that the InstructorTask doesn't think that this subtask is already being
    # retried by another task.
    if subtask_state == RETRY:
        # Check to see if the input number of retries is less than the recorded number.
        # If so, then this is an earlier version of the task, and a duplicate.
        new_retry_count = new_subtask_status.get_retry_count()
        current_retry_count = subtask_status.get_retry_count()
        if new_retry_count < current_retry_count:
            format_str = "Unexpected task_id '{}': already retried - status {} for subtask of instructor task '{}': rejecting task {}"
            msg = format_str.format(current_task_id, subtask_status, entry, new_subtask_status)
            TASK_LOG.warning(msg)
            dog_stats_api.increment('instructor_task.subtask.duplicate.retried', tags=[_statsd_tag(entry.course_id)])
            raise DuplicateTaskException(msg)

    # Now we are ready to start working on this.  Try to lock it.
    # If it fails, then it means that another worker is already in the
    # middle of working on this.
    if not _acquire_subtask_lock(current_task_id):
        format_str = "Unexpected task_id '{}': already being executed - for subtask of instructor task '{}'"
        msg = format_str.format(current_task_id, entry)
        TASK_LOG.warning(msg)
        dog_stats_api.increment('instructor_task.subtask.duplicate.locked', tags=[_statsd_tag(entry.course_id)])
        raise DuplicateTaskException(msg)


def update_subtask_status(entry_id, current_task_id, new_subtask_status, retry_count=0):
    """
    Update the status of the subtask in the parent InstructorTask object tracking its progress.

    Because select_for_update is used to lock the InstructorTask object while it is being updated,
    multiple subtasks updating at the same time may time out while waiting for the lock.
    The actual update operation is surrounded by a try/except/else that permits the update to be
    retried if the transaction times out.

    The subtask lock acquired in the call to check_subtask_is_valid() is released here, only when
    the attempting of retries has concluded.
    """
    try:
        _update_subtask_status(entry_id, current_task_id, new_subtask_status)
    except DatabaseError:
        # If we fail, try again recursively.
        retry_count += 1
        if retry_count < MAX_DATABASE_LOCK_RETRIES:
            TASK_LOG.info("Retrying to update status for subtask %s of instructor task %d with status %s:  retry %d",
                          current_task_id, entry_id, new_subtask_status, retry_count)
            dog_stats_api.increment('instructor_task.subtask.retry_after_failed_update')
            update_subtask_status(entry_id, current_task_id, new_subtask_status, retry_count)
        else:
            TASK_LOG.info("Failed to update status after %d retries for subtask %s of instructor task %d with status %s",
                          retry_count, current_task_id, entry_id, new_subtask_status)
            dog_stats_api.increment('instructor_task.subtask.failed_after_update_retries')
            raise
    finally:
        # Only release the lock on the subtask when we're done trying to update it.
        # Note that this will be called each time a recursive call to update_subtask_status()
        # returns.  Fortunately, it's okay to release a lock that has already been released.
        _release_subtask_lock(current_task_id)


@transaction.commit_manually
def _update_subtask_status(entry_id, current_task_id, new_subtask_status):
    """
    Update the status of the subtask in the parent InstructorTask object tracking its progress.

    Uses select_for_update to lock the InstructorTask object while it is being updated.
    The operation is surrounded by a try/except/else that permits the manual transaction to be
    committed on completion, or rolled back on error.

    The InstructorTask's "task_output" field is updated.  This is a JSON-serialized dict.
    Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `new_subtask_status`
    into the corresponding values in the InstructorTask's task_output.  Also updates the 'duration_ms'
    value with the current interval since the original InstructorTask started.  Note that this
    value is only approximate, since the subtask may be running on a different server than the
    original task, so is subject to clock skew.

    The InstructorTask's "subtasks" field is also updated.  This is also a JSON-serialized dict.
    Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of
    subtasks.  'Total' is expected to have been set at the time the subtasks were created.
    The other three counters are incremented depending on the value of `status`.  Once the counters
    for 'succeeded' and 'failed' match the 'total', the subtasks are done and the InstructorTask's
    "status" is changed to SUCCESS.

    The "subtasks" field also contains a 'status' key that contains a dict storing status
    information for each subtask.  At the moment, the value for each subtask (keyed by its task_id)
    is the value of SubtaskStatus.to_dict(), but this could be expanded in future to store information
    about failure messages, progress made, etc.
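
    For illustration (hypothetical numbers): if task_output currently holds
    {'attempted': 200, 'succeeded': 190, 'failed': 10, 'skipped': 0, ...} and a completed subtask
    reports attempted=50, succeeded=48, failed=2, skipped=0, the accumulated totals become
    250, 238, 12 and 0 respectively.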
    """
    TASK_LOG.info("Preparing to update status for subtask %s for instructor task %d with status %s",
                  current_task_id, entry_id, new_subtask_status)

    try:
        entry = InstructorTask.objects.select_for_update().get(pk=entry_id)
        subtask_dict = json.loads(entry.subtasks)
        subtask_status_info = subtask_dict['status']
        if current_task_id not in subtask_status_info:
            # unexpected error -- raise an exception
            format_str = "Unexpected task_id '{}': unable to update status for subtask of instructor task '{}'"
            msg = format_str.format(current_task_id, entry_id)
            TASK_LOG.warning(msg)
            raise ValueError(msg)

        # Update status:
        subtask_status_info[current_task_id] = new_subtask_status.to_dict()

        # Update the parent task progress.
        # Set the estimate of duration, but only if it
        # increases.  Clock skew between time() returned by different machines
        # may result in non-monotonic values for duration.
        task_progress = json.loads(entry.task_output)
        start_time = task_progress['start_time']
        prev_duration = task_progress['duration_ms']
        new_duration = int((time() - start_time) * 1000)
        task_progress['duration_ms'] = max(prev_duration, new_duration)

        # Update counts only when subtask is done.
        # In future, we can make this more responsive by updating status
        # between retries, by comparing counts that change from previous
        # retry.
        new_state = new_subtask_status.state
        if new_subtask_status is not None and new_state in READY_STATES:
            for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
                task_progress[statname] += getattr(new_subtask_status, statname)

        # Figure out if we're actually done (i.e. this is the last task to complete).
        # This is easier if we just maintain a counter, rather than scanning the
        # entire subtask status dict.
        if new_state == SUCCESS:
            subtask_dict['succeeded'] += 1
        elif new_state in READY_STATES:
            subtask_dict['failed'] += 1
        num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']

        # If we're done with the last task, update the parent status to indicate that.
        # At present, we mark the task as having succeeded.  In future, we should see
        # if there was a catastrophic failure that occurred, and figure out how to
        # report that here.
        if num_remaining <= 0:
            entry.task_state = SUCCESS
        entry.subtasks = json.dumps(subtask_dict)
        entry.task_output = InstructorTask.create_output_for_success(task_progress)

        TASK_LOG.debug("about to save....")
        entry.save()
        TASK_LOG.info("Task output updated to %s for subtask %s of instructor task %d",
                      entry.task_output, current_task_id, entry_id)
    except Exception:
        TASK_LOG.exception("Unexpected error while updating InstructorTask.")
        transaction.rollback()
        dog_stats_api.increment('instructor_task.subtask.update_exception')
        raise
    else:
        TASK_LOG.debug("about to commit....")
        transaction.commit()


def _statsd_tag(course_id):
    """
    Calculate the tag we will use for DataDog.
    """
    tag = unicode(course_id).encode('utf-8')
    return tag[:200]