"""
Workflow models are intended to track which step the student is in during the
assessment process. The submission state is not explicitly tracked because
the assessment workflow only begins after a submission has been created.

NOTE: We've switched to migrations, so if you make any edits to this file, you
need to then generate a matching migration for it using:

    ./manage.py schemamigration openassessment.workflow --auto

"""
import logging
import importlib

from django.conf import settings
from django.db import models, transaction, DatabaseError
from django.dispatch import receiver
from django_extensions.db.fields import UUIDField
from django.utils.timezone import now
from model_utils import Choices
from model_utils.models import StatusModel, TimeStampedModel

from submissions import api as sub_api
from openassessment.assessment.signals import assessment_complete_signal
from .errors import AssessmentApiLoadError, AssessmentWorkflowError, AssessmentWorkflowInternalError


logger = logging.getLogger('openassessment.workflow.models')

# To encapsulate the workflow API from the assessment API,
# we use dependency injection.  The Django settings define
# a dictionary mapping assessment step names to the Python module path
# that implements the corresponding assessment API.
# For backwards compatibility, we provide a default configuration as well
DEFAULT_ASSESSMENT_API_DICT = {
    'peer': 'openassessment.assessment.api.peer',
    'self': 'openassessment.assessment.api.self',
    'training': 'openassessment.assessment.api.student_training',
    'ai': 'openassessment.assessment.api.ai',
}
ASSESSMENT_API_DICT = getattr(
    settings, 'ORA2_ASSESSMENTS',
    DEFAULT_ASSESSMENT_API_DICT
)


class AssessmentWorkflow(TimeStampedModel, StatusModel):
    """Tracks the open-ended assessment status of a student submission.

    It's important to note that although we track the status as an explicit
    field here, it is not the canonical status. This is because the
    determination of what we need to do in order to be "done" is specified by
    the OpenAssessmentBlock problem definition and can change. So every time
    we are asked where the student is, we have to query the peer, self, and
    later other assessment APIs with the latest requirements (e.g. "number of
    submissions you have to assess = 5"). The "status" field on this model is
    an after the fact recording of the last known state of that information so
    we can search easily.
    """
    # Materialize as a list so ``STEPS + STATUSES`` works whether ``keys()``
    # returns a list (Python 2) or a view (Python 3).
    STEPS = list(ASSESSMENT_API_DICT.keys())

    STATUSES = [
        "waiting",  # User has done all necessary assessment but hasn't been
                    # graded yet -- we're waiting for assessments of their
                    # submission by others.
        "done",  # Complete
        "cancelled",  # User submission has been cancelled.
    ]

    STATUS_VALUES = STEPS + STATUSES

    STATUS = Choices(*STATUS_VALUES)  # implicit "status" field

    # For now, we use a simple scoring mechanism:
    # Once a student has completed all assessments,
    # we search assessment APIs
    # in priority order until one of the APIs provides a score.
    # We then use that score as the student's overall score.
    # This Django setting is a list of assessment steps (defined in `settings.ORA2_ASSESSMENTS`)
    # in descending priority order.
    DEFAULT_ASSESSMENT_SCORE_PRIORITY = ['peer', 'self', 'ai']
    ASSESSMENT_SCORE_PRIORITY = getattr(
        settings, 'ORA2_ASSESSMENT_SCORE_PRIORITY',
        DEFAULT_ASSESSMENT_SCORE_PRIORITY
    )

    submission_uuid = models.CharField(max_length=36, db_index=True, unique=True)
    uuid = UUIDField(version=1, db_index=True, unique=True)

    # These values are used to find workflows for a particular item
    # in a course without needing to look up the submissions for that item.
    # Because submissions are immutable, we can safely duplicate the values
    # here without violating data integrity.
    course_id = models.CharField(max_length=255, blank=False, db_index=True)
    item_id = models.CharField(max_length=255, blank=False, db_index=True)

    class Meta:
        ordering = ["-created"]
        # TODO: In migration, need a non-unique index on (course_id, item_id, status)

    def __init__(self, *args, **kwargs):
        """
        Initialize the model, and make sure the class-level step/status
        configuration includes a leading 'staff' step.

        NOTE: this mutates class attributes (STEPS, STATUS_VALUES, STATUS and
        ASSESSMENT_SCORE_PRIORITY) the first time any instance is built; the
        membership checks make repeated calls no-ops.
        """
        super(AssessmentWorkflow, self).__init__(*args, **kwargs)
        if 'staff' not in AssessmentWorkflow.STEPS:
            AssessmentWorkflow.STEPS = ['staff'] + list(AssessmentWorkflow.STEPS)
            AssessmentWorkflow.STATUS_VALUES = AssessmentWorkflow.STEPS + AssessmentWorkflow.STATUSES
            AssessmentWorkflow.STATUS = Choices(*AssessmentWorkflow.STATUS_VALUES)

        if 'staff' not in AssessmentWorkflow.ASSESSMENT_SCORE_PRIORITY:
            AssessmentWorkflow.ASSESSMENT_SCORE_PRIORITY = (
                ['staff'] + list(AssessmentWorkflow.ASSESSMENT_SCORE_PRIORITY)
            )

    @classmethod
    @transaction.atomic
    def start_workflow(cls, submission_uuid, step_names, on_init_params):
        """
        Start a new workflow.

        Args:
            submission_uuid (str): The UUID of the submission associated with this workflow.
            step_names (list): The names of the assessment steps in the workflow.
                A 'staff' step is prepended automatically if not present.
            on_init_params (dict): The parameters to pass to each assessment module
                on init.  Keys are the assessment step names.

        Returns:
            AssessmentWorkflow

        Raises:
            SubmissionNotFoundError
            SubmissionRequestError
            SubmissionInternalError
            DatabaseError
            Assessment-module specific errors
        """
        submission_dict = sub_api.get_submission_and_student(submission_uuid)

        staff_auto_added = False
        if 'staff' not in step_names:
            staff_auto_added = True
            step_names = ['staff'] + list(step_names)

        # Create the workflow and step models in the database
        # For now, set the status to waiting; we'll modify it later
        # based on the first step in the workflow.
        workflow = cls.objects.create(
            submission_uuid=submission_uuid,
            status=AssessmentWorkflow.STATUS.waiting,
            course_id=submission_dict['student_item']['course_id'],
            item_id=submission_dict['student_item']['item_id']
        )
        workflow_steps = [
            AssessmentWorkflowStep(
                workflow=workflow, name=step, order_num=i
            )
            for i, step in enumerate(step_names)
        ]
        workflow.steps.add(*workflow_steps)

        # Initialize the assessment APIs
        has_started_first_step = False
        for step in workflow_steps:
            api = step.api()

            if api is not None:
                # Initialize the assessment module
                # We do this for every assessment module
                on_init_func = getattr(api, 'on_init', lambda submission_uuid, **params: None)
                on_init_func(submission_uuid, **on_init_params.get(step.name, {}))

                # If we auto-added a staff step, it is optional and should be marked complete immediately
                if step.name == "staff" and staff_auto_added:
                    step.assessment_completed_at = now()
                    step.save()

                # For the first valid step, update the workflow status
                # and notify the assessment module that it's being started
                if not has_started_first_step:
                    # Update the workflow
                    workflow.status = step.name
                    workflow.save()

                    # Notify the assessment module that it's being started
                    on_start_func = getattr(api, 'on_start', lambda submission_uuid: None)
                    on_start_func(submission_uuid)

                    # Remember that we've already started the first step
                    has_started_first_step = True

        # Update the workflow (in case some of the assessment modules are automatically complete)
        # We do NOT pass in requirements, on the assumption that any assessment module
        # that accepts requirements would NOT automatically complete.
        workflow.update_from_assessments(None)

        # Return the newly created workflow
        return workflow

    @property
    def score(self):
        """Latest score for the submission we're tracking.

        Returns:
            score (dict): The latest score for this workflow, or None if the workflow is incomplete.
        """
        score = None
        if self.status == self.STATUS.done:
            score = sub_api.get_latest_score_for_submission(self.submission_uuid)
        return score

    def status_details(self):
        """
        Summarize the completion state of every step in this workflow.

        Returns:
            dict: Maps each step name to a dict with two booleans:
                "complete" -- the step's submitter_completed_at is set;
                "graded" -- the step's assessment_completed_at is set.
        """
        status_dict = {}
        steps = self._get_steps()
        for step in steps:
            status_dict[step.name] = {
                "complete": step.is_submitter_complete(),
                "graded": step.is_assessment_complete(),
            }
        return status_dict

    def get_score(self, assessment_requirements, step_for_name):
        """Iterate through the assessment APIs in priority order
         and return the first reported score.

        The first step present in `step_for_name` (in ASSESSMENT_SCORE_PRIORITY
        order) whose API defines `get_score` decides the result, with one
        exception: a missing staff score falls through to the next step unless
        the staff requirements mark it as 'required'.

        Args:
            assessment_requirements (dict): Dictionary passed to the assessment API.
                This defines the requirements for each assessment step; the APIs
                can refer to this to decide whether the requirements have been
                met.  Note that the requirements could change if the author
                updates the problem definition.
            step_for_name (dict): a key value pair for step name: step

        Returns:
             score dict, or None if no score was found.
        """
        score = None
        for assessment_step_name in self.ASSESSMENT_SCORE_PRIORITY:

            # Check if the problem contains this assessment type
            assessment_step = step_for_name.get(assessment_step_name)

            # Query the corresponding assessment API for a score
            # If we find one, then stop looking
            if assessment_step is not None:

                # Check if the assessment API defines a score function at all
                get_score_func = getattr(assessment_step.api(), 'get_score', None)
                if get_score_func is not None:
                    if assessment_requirements is None:
                        step_requirements = None
                    else:
                        step_requirements = assessment_requirements.get(assessment_step_name, {})
                    score = get_score_func(self.submission_uuid, step_requirements)
                    if assessment_step_name == self.STATUS.staff and score is None:
                        if step_requirements and step_requirements.get('required', False):
                            break  # A staff score was not found, and one is required. Return None
                        continue  # A staff score was not found, but it is not required, so try the next type of score
                    break

        return score

    def update_from_assessments(self, assessment_requirements):
        """Query assessment APIs and change our status if appropriate.

        If the status is done, we do nothing. Once something is done, we never
        move back to any other status.

        If an assessment API says that our submitter's requirements are met,
        then move to the next assessment.  For example, in peer assessment,
        if the submitter we're tracking has assessed the required number
        of submissions, they're allowed to continue.

        If the submitter has finished all the assessments, then we change
        their status to `waiting`.

        If we're in the `waiting` status, and an assessment API says it can score
        this submission, then we record the score in the submissions API and move
        our `status` to `done`.

        By convention, if `assessment_requirements` is `None`, then assessment
        modules that need requirements should automatically say that they're incomplete.
        This allows us to update the workflow even when we don't know the
        current state of the problem.  For example, if we're updating the workflow
        at the completion of an asynchronous call, we won't necessarily know
        the current state of the problem, but we would still want to update assessments
        that don't have any requirements.

        A cancelled workflow is never updated. A new staff score is recorded
        (and all steps marked assessment-complete) even when the workflow is
        already done.

        Args:
            assessment_requirements (dict): Dictionary passed to the assessment API.
                This defines the requirements for each assessment step; the APIs
                can refer to this to decide whether the requirements have been
                met.  Note that the requirements could change if the author
                updates the problem definition.
        """
        if self.status == self.STATUS.cancelled:
            return

        # Update our AssessmentWorkflowStep models with the latest from our APIs
        steps = self._get_steps()
        step_for_name = {step.name: step for step in steps}

        new_staff_score = self.get_score(assessment_requirements, {'staff': step_for_name.get('staff', None)})
        if new_staff_score:
            # new_staff_score is just the most recent staff score, it may already be recorded in sub_api
            old_score = sub_api.get_latest_score_for_submission(self.submission_uuid)
            if (
                not old_score or  # There is no recorded score
                not old_score.get('staff_id') or  # The recorded score is not a staff score
                old_score['points_earned'] != new_staff_score['points_earned']  # Previous staff score doesn't match
            ):
                # Set the staff score using submissions api, and log that fact
                self.set_staff_score(new_staff_score)
                self.save()
                logger.info((
                    u"Workflow for submission UUID {uuid} has updated score using staff assessment."
                ).format(uuid=self.submission_uuid))

                # Update the assessment_completed_at field for all steps
                # All steps are considered "assessment complete", as the staff score will override all
                for step in steps:
                    step.assessment_completed_at = now()
                    step.save()

        if self.status == self.STATUS.done:
            return

        # Go through each step and update its status.
        for step in steps:
            step.update(self.submission_uuid, assessment_requirements)

        # Fetch name of the first step that the submitter hasn't yet completed.
        new_status = next(
            (step.name for step in steps if step.submitter_completed_at is None),
            self.STATUS.waiting  # if nothing's left to complete, we're waiting
        )

        # If the submitter is beginning the next assessment, notify the
        # appropriate assessment API.
        new_step = step_for_name.get(new_status)
        if new_step is not None:
            on_start_func = getattr(new_step.api(), 'on_start', None)
            if on_start_func is not None:
                on_start_func(self.submission_uuid)

        # If the submitter has done all they need to do, let's check to see if
        # all steps have been fully assessed (i.e. we can score it).
        if (new_status == self.STATUS.waiting and
                all(step.assessment_completed_at for step in steps)):

            score = self.get_score(assessment_requirements, step_for_name)
            # If we found a score, then we're done
            if score is not None:
                # Only set the score if it's not a staff score, in which case it will have already been set above
                if score.get("staff_id") is None:
                    self.set_score(score)
                new_status = self.STATUS.done

        # Finally save our changes if the status has changed
        if self.status != new_status:
            self.status = new_status
            self.save()
            logger.info((
                u"Workflow for submission UUID {uuid} has updated status to {status}"
            ).format(uuid=self.submission_uuid, status=new_status))

    def _get_steps(self):
        """
        Simple helper function for retrieving all the steps in the given
        Workflow.

        Side effects: if the workflow has no staff step, one is created at
        order_num 0 (already marked assessment-complete) and every existing
        step is shifted down by one and saved.  If no recognized steps exist
        at all, default staff -> peer -> self steps are created.

        Returns:
            list of AssessmentWorkflowStep whose names are in
            AssessmentWorkflow.STEPS (or all steps, in the fallback case).
        """
        # A staff step must always be available, to allow for staff overrides
        try:
            self.steps.get(name=self.STATUS.staff)
        except AssessmentWorkflowStep.DoesNotExist:
            # Make room for the staff step at the front.  The new order must be
            # persisted; otherwise the staff step would share order_num 0 with
            # the existing first step and the ordering would be ambiguous.
            for step in list(self.steps.all()):
                step.order_num += 1
                step.save()
            self.steps.add(
                AssessmentWorkflowStep(
                    name=self.STATUS.staff,
                    order_num=0,
                    assessment_completed_at=now(),
                )
            )

        # Do not return steps that are not recognized in the AssessmentWorkflow.
        steps = list(self.steps.filter(name__in=AssessmentWorkflow.STEPS))
        if not steps:
            # If no steps exist for this AssessmentWorkflow, assume
            # peer -> self for backwards compatibility, with an optional staff override
            self.steps.add(
                AssessmentWorkflowStep(name=self.STATUS.staff, order_num=0, assessment_completed_at=now()),
                AssessmentWorkflowStep(name=self.STATUS.peer, order_num=1),
                AssessmentWorkflowStep(name=self.STATUS.self, order_num=2)
            )
            steps = list(self.steps.all())

        return steps

    def set_staff_score(self, score, is_override=False, reason=None):
        """
        Set a staff score for the workflow.

        Allows for staff scores to be set on a submission, with annotations to provide an audit trail if needed.
        This method can be used for both required staff grading, and staff overrides.

        Any existing score for the student item is reset first, then the new
        score is recorded with a "staff_defined" annotation.

        Args:
            score (dict): A dict containing 'points_earned', 'points_possible', and 'staff_id'.
            is_override (bool): Optionally True if staff is overriding a previous score.
                NOTE: currently not read by this method.
            reason (string): An optional parameter specifying the reason for the staff grade. A default value
                will be used in the event that this parameter is not provided.
        """
        annotation_type = "staff_defined"
        if reason is None:
            reason = "A staff member has defined the score for this submission"
        sub_dict = sub_api.get_submission_and_student(self.submission_uuid)
        sub_api.reset_score(
            sub_dict['student_item']['student_id'],
            self.course_id,
            self.item_id
        )
        sub_api.set_score(
            self.submission_uuid,
            score["points_earned"],
            score["points_possible"],
            annotation_creator=score["staff_id"],
            annotation_type=annotation_type,
            annotation_reason=reason
        )

    def set_score(self, score):
        """
        Set a score for the workflow.

        Scores are persisted via the Submissions API, separate from the Workflow
        Data. Score is associated with the same submission_uuid as this workflow.
        Nothing is written if a staff assessment already exists for the
        submission.

        Args:
            score (dict): A dict containing 'points_earned' and
                'points_possible'.
        """
        if not self.staff_score_exists():
            sub_api.set_score(
                self.submission_uuid,
                score["points_earned"],
                score["points_possible"]
            )

    def staff_score_exists(self):
        """
        Check if a staff score exists for this submission.

        Returns:
            bool: True if the staff step's API reports a latest assessment for
            this submission, False otherwise (including when the API has no
            `get_latest_assessment`).
        """
        steps = self._get_steps()
        step_for_name = {step.name: step for step in steps}
        staff_step = step_for_name.get("staff")
        if staff_step is not None:
            get_latest_func = getattr(staff_step.api(), 'get_latest_assessment', None)
            if get_latest_func is not None:
                staff_assessment = get_latest_func(self.submission_uuid)
                if staff_assessment is not None:
                    return True
        return False

    def cancel(self, assessment_requirements):
        """
        Cancel workflow for all steps.

        Set the points earned to 0 and workflow status to cancelled.

        Args:
            assessment_requirements (dict): Dictionary that currently looks like:
                `{"peer": {"must_grade": <int>, "must_be_graded_by": <int>}}`
                `must_grade` is the number of assessments a student must complete.
                `must_be_graded_by` is the number of assessments a submission must
                receive to be scored. `must_grade` should be greater than
                `must_be_graded_by` to ensure that everyone will get scored.
                The intention is to eventually pass in more assessment sequence
                specific requirements in this dict.
        """
        steps = self._get_steps()
        step_for_name = {step.name: step for step in steps}

        # Cancel the workflow for each step.
        for step in steps:
            on_cancel_func = getattr(step.api(), 'on_cancel', None)
            if on_cancel_func is not None:
                on_cancel_func(self.submission_uuid)

        score = self.get_score(assessment_requirements, step_for_name)

        # Set the points_earned to 0.
        if score is not None:
            score['points_earned'] = 0
            self.set_score(score)

        # Save status if it is not cancelled.
        if self.status != self.STATUS.cancelled:
            self.status = self.STATUS.cancelled
            self.save()
            logger.info(
                u"Workflow for submission UUID {uuid} has updated status to {status}".format(
                    uuid=self.submission_uuid, status=self.STATUS.cancelled
                )
            )

    @classmethod
    def cancel_workflow(cls, submission_uuid, comments, cancelled_by_id, assessment_requirements):
        """
        Add an entry in AssessmentWorkflowCancellation table for a AssessmentWorkflow.

        AssessmentWorkflow which has been cancelled is no longer included in the
        peer grading pool.

        Args:
            submission_uuid (str): The UUID of the workflow's submission.
            comments (str): The reason for cancellation.
            cancelled_by_id (str): The ID of the user who cancelled the peer workflow.
            assessment_requirements (dict): Dictionary that currently looks like:
            `{"peer": {"must_grade": <int>, "must_be_graded_by": <int>}}`
            `must_grade` is the number of assessments a student must complete.
            `must_be_graded_by` is the number of assessments a submission must
            receive to be scored. `must_grade` should be greater than
            `must_be_graded_by` to ensure that everyone will get scored.
            The intention is to eventually pass in more assessment sequence
            specific requirements in this dict.

        Raises:
            AssessmentWorkflowError: No unique workflow exists for the submission.
            AssessmentWorkflowInternalError: A database error occurred.
        """
        try:
            workflow = cls.objects.get(submission_uuid=submission_uuid)
            AssessmentWorkflowCancellation.create(workflow=workflow, comments=comments, cancelled_by_id=cancelled_by_id)
            # Cancel the related step's workflow.
            workflow.cancel(assessment_requirements)
        except (cls.DoesNotExist, cls.MultipleObjectsReturned):
            error_message = u"Error finding workflow for submission UUID {}.".format(submission_uuid)
            logger.exception(error_message)
            raise AssessmentWorkflowError(error_message)
        except DatabaseError:
            error_message = u"Error creating assessment workflow cancellation for submission UUID {}.".format(
                submission_uuid)
            logger.exception(error_message)
            raise AssessmentWorkflowInternalError(error_message)

    @classmethod
    def get_by_submission_uuid(cls, submission_uuid):
        """
        Retrieve the Assessment Workflow associated with the given submission UUID.

        Args:
            submission_uuid (str): The string representation of the UUID belonging
                to the associated Assessment Workflow.

        Returns:
            workflow (AssessmentWorkflow): The assessment workflow associated with
                this submission UUID, or None if no workflow exists for it.

        Raises:
            AssessmentWorkflowError: Thrown when a database error occurs while
                looking up the workflow.
        """
        try:
            return cls.objects.get(submission_uuid=submission_uuid)
        except cls.DoesNotExist:
            return None
        except DatabaseError as exc:
            message = u"Error finding workflow for submission UUID {} due to error: {}.".format(submission_uuid, exc)
            logger.exception(message)
            raise AssessmentWorkflowError(message)

    @property
    def is_cancelled(self):
        """
        Check if assessment workflow is cancelled.

        Returns:
            True/False: whether any AssessmentWorkflowCancellation row
            references this workflow.
        """
        return self.cancellations.exists()


class AssessmentWorkflowStep(models.Model):
    """An individual step in the overall workflow process.

    Similar caveats apply to this class as apply to `AssessmentWorkflow`. What
    we're storing in the database is usually but not always current information.
    In particular, if the problem definition has changed the requirements for a
    particular step in the workflow, then what is in the database will be out of
    sync until someone views this problem again (which will trigger a workflow
    update to occur).
    """
    workflow = models.ForeignKey(AssessmentWorkflow, related_name="steps")
    name = models.CharField(max_length=20)
    submitter_completed_at = models.DateTimeField(default=None, null=True)
    assessment_completed_at = models.DateTimeField(default=None, null=True)
    order_num = models.PositiveIntegerField()

    class Meta:
        ordering = ["workflow", "order_num"]

    def is_submitter_complete(self):
        """Return True if the submitter has completed this step."""
        return self.submitter_completed_at is not None

    def is_assessment_complete(self):
        """Return True if this step has finished assessing the submission."""
        return self.assessment_completed_at is not None

    def api(self):
        """
        Returns an API associated with this workflow step. If no API is
        associated with this workflow step, None is returned.

        This relies on Django settings to map step names to
        the assessment API implementation.

        Raises:
            AssessmentApiLoadError: The configured module path cannot be imported.
        """
        # We retrieve the settings in-line here (rather than using the
        # top-level constant), so that @override_settings will work
        # in the test suite.
        api_path = getattr(
            settings, 'ORA2_ASSESSMENTS', DEFAULT_ASSESSMENT_API_DICT
        ).get(self.name)

        # Staff API should always be available
        if self.name == 'staff' and not api_path:
            api_path = 'openassessment.assessment.api.staff'

        if api_path is not None:
            try:
                return importlib.import_module(api_path)
            except (ImportError, ValueError):
                raise AssessmentApiLoadError(self.name, api_path)
        else:
            # It's possible for the database to contain steps for APIs
            # that are not configured -- for example, if a new assessment
            # type is added, then the code is rolled back.
            msg = (
                u"No assessment configured for '{name}'.  "
                u"Check the ORA2_ASSESSMENTS Django setting."
            ).format(name=self.name)
            logger.warning(msg)
            return None

    def update(self, submission_uuid, assessment_requirements):
        """
        Updates the AssessmentWorkflowStep models with the requirements
        specified from the Workflow API.

        Intended for internal use by update_from_assessments(). See
        update_from_assessments() documentation for more details.

        Steps whose API (or missing API) does not define
        `submitter_is_finished` / `assessment_is_finished` are treated as
        finished for that check.
        """
        # Once a step is completed, it will not be revisited based on updated requirements.
        step_changed = False
        if assessment_requirements is None:
            step_reqs = None
        else:
            step_reqs = assessment_requirements.get(self.name, {})

        # Resolve the API once; each call re-reads settings and, for an
        # unconfigured step, would log the same warning again.
        api = self.api()

        def default_finished(submission_uuid, step_reqs):
            return True

        submitter_finished = getattr(api, 'submitter_is_finished', default_finished)
        assessment_finished = getattr(api, 'assessment_is_finished', default_finished)

        # Has the user completed their obligations for this step?
        if (not self.is_submitter_complete() and
                submitter_finished(submission_uuid, step_reqs)):
            self.submitter_completed_at = now()
            step_changed = True

        # Has the step received a score?
        if (not self.is_assessment_complete() and
                assessment_finished(submission_uuid, step_reqs)):
            self.assessment_completed_at = now()
            step_changed = True

        if step_changed:
            self.save()


@receiver(assessment_complete_signal)
def update_workflow_async(sender, **kwargs):
    """
    Register a receiver for the update workflow signal
    This allows asynchronous processes to update the workflow

    Errors are logged and swallowed so a failing update never propagates
    back to the code that sent the signal.

    Args:
        sender (object): Not used

    Keyword Arguments:
        submission_uuid (str): The UUID of the submission associated
            with the workflow being updated.

    Returns:
        None
    """
    submission_uuid = kwargs.get('submission_uuid')
    if submission_uuid is None:
        logger.error("Update workflow signal called without a submission UUID")
        return

    try:
        workflow = AssessmentWorkflow.objects.get(submission_uuid=submission_uuid)
        workflow.update_from_assessments(None)
    except AssessmentWorkflow.DoesNotExist:
        msg = u"Could not retrieve workflow for submission with UUID {}".format(submission_uuid)
        logger.exception(msg)
    except DatabaseError:
        msg = (
            u"Database error occurred while updating "
            u"the workflow for submission UUID {}"
        ).format(submission_uuid)
        logger.exception(msg)
    except Exception:
        # Best-effort boundary: log anything else, but let
        # KeyboardInterrupt / SystemExit propagate.
        msg = (
            u"Unexpected error occurred while updating the workflow "
            u"for submission UUID {}"
        ).format(submission_uuid)
        logger.exception(msg)


class AssessmentWorkflowCancellation(models.Model):
    """Model for tracking cancellations of assessment workflow.

    It is created when a staff member requests removal of a submission
    from the peer grading pool.
    """
    workflow = models.ForeignKey(AssessmentWorkflow, related_name='cancellations')
    comments = models.TextField(max_length=10000)
    cancelled_by_id = models.CharField(max_length=40, db_index=True)

    created_at = models.DateTimeField(default=now, db_index=True)

    class Meta:
        ordering = ["created_at", "id"]

    def __repr__(self):
        return (
            "AssessmentWorkflowCancellation(workflow={0.workflow}, "
            "comments={0.comments}, cancelled_by_id={0.cancelled_by_id}, "
            "created_at={0.created_at})"
        ).format(self)

    def __unicode__(self):
        return repr(self)

    @classmethod
    def create(cls, workflow, comments, cancelled_by_id):
        """
        Create a new AssessmentWorkflowCancellation object.

        Args:
            workflow (AssessmentWorkflow): The cancelled workflow.
            comments (unicode): The reason for cancellation.
            cancelled_by_id (unicode): The ID of the user who cancelled the workflow.

        Returns:
            AssessmentWorkflowCancellation
        """
        cancellation_params = {
            'workflow': workflow,
            'comments': comments,
            'cancelled_by_id': cancelled_by_id,
        }
        return cls.objects.create(**cancellation_params)

    @classmethod
    def get_latest_workflow_cancellation(cls, submission_uuid):
        """
        Get the latest AssessmentWorkflowCancellation for a submission's workflow.

        Args:
            submission_uuid (str): The UUID of the workflow's submission.

        Returns:
            AssessmentWorkflowCancellation or None
        """
        workflow_cancellations = cls.objects.filter(workflow__submission_uuid=submission_uuid).order_by("-created_at")
        # A single LIMIT 1 query instead of exists() followed by [0].
        latest = list(workflow_cancellations[:1])
        return latest[0] if latest else None