Commit 4d230d62 by Calen Pennington

Move the core of the bin enumeration code into resolvers.py

parent 2e4e479f
......@@ -4,20 +4,24 @@ import logging
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.sites.models import Site
from django.contrib.staticfiles.templatetags.staticfiles import static
from django.core.urlresolvers import reverse
from django.db.models import F, Min, Q
from django.utils.formats import dateformat, get_format
from edx_ace.recipient_resolver import RecipientResolver
from edx_ace.utils.date import serialize
from edx_ace.recipient import Recipient
from edx_ace.utils.date import serialize, deserialize
from courseware.date_summary import verified_upgrade_deadline_link, verified_upgrade_link_is_valid
from openedx.core.djangoapps.monitoring_utils import set_custom_metric, function_trace
from openedx.core.djangoapps.schedules.config import COURSE_UPDATE_WAFFLE_FLAG
from openedx.core.djangoapps.schedules.exceptions import CourseUpdateDoesNotExist
from openedx.core.djangoapps.schedules.models import Schedule, ScheduleConfig
from openedx.core.djangoapps.schedules.message_type import ScheduleMessageType
from openedx.core.djangoapps.schedules.utils import PrefixedDebugLoggerMixin
from openedx.core.djangoapps.schedules.template_context import (
absolute_url,
......@@ -186,6 +190,12 @@ def get_schedules_with_target_date_by_bin_and_orgs(schedule_date_field, current_
return schedules
class RecurringNudge(ScheduleMessageType):
def __init__(self, day, *args, **kwargs):
super(RecurringNudge, self).__init__(*args, **kwargs)
self.name = "recurringnudge_day{}".format(day)
class ScheduleStartResolver(BinnedSchedulesBaseResolver):
"""
Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset``.
......@@ -198,6 +208,56 @@ class ScheduleStartResolver(BinnedSchedulesBaseResolver):
self.log_prefix = 'Scheduled Nudge'
def _annotate_for_monitoring(message_type, site, bin_num, target_day_str, day_offset):
# This identifies the type of message being sent, for example: schedules.recurring_nudge3.
set_custom_metric('message_name', '{0}.{1}'.format(
message_type.app_label, message_type.name))
# The domain name of the site we are sending the message for.
set_custom_metric('site', site.domain)
# This is the "bin" of data being processed. We divide up the work into chunks so that we don't tie up celery
# workers for too long. This could help us identify particular bins that are problematic.
set_custom_metric('bin', bin_num)
# The date we are processing data for.
set_custom_metric('target_day', target_day_str)
# The number of days relative to the current date to process data for.
set_custom_metric('day_offset', day_offset)
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', message_type.uuid)
def recurring_nudge_schedule_bin(
async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = RecurringNudge(abs(day_offset))
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
for (user, language, context) in _recurring_nudge_schedules_for_bin(
site,
current_datetime,
target_datetime,
bin_num,
org_list,
exclude_orgs
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
)
with function_trace('enqueue_send_task'):
async_send_task.apply_async(
(site_id, str(msg)), retry=False)
def _recurring_nudge_schedules_for_bin(site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False):
schedules = get_schedules_with_target_date_by_bin_and_orgs(
......@@ -241,6 +301,9 @@ def _get_datetime_beginning_of_day(dt):
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
class UpgradeReminder(ScheduleMessageType):
pass
class UpgradeReminderResolver(BinnedSchedulesBaseResolver):
"""
Send a message to all users whose verified upgrade deadline is at ``self.current_date`` + ``day_offset``.
......@@ -253,6 +316,38 @@ class UpgradeReminderResolver(BinnedSchedulesBaseResolver):
self.log_prefix = 'Upgrade Reminder'
def upgrade_reminder_schedule_bin(
async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = UpgradeReminder()
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num,target_day_str, day_offset)
for (user, language, context) in _upgrade_reminder_schedules_for_bin(
site,
current_datetime,
target_datetime,
bin_num,
org_list,
exclude_orgs
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
)
with function_trace('enqueue_send_task'):
async_send_task.apply_async(
(site_id, str(msg)), retry=False)
def _upgrade_reminder_schedules_for_bin(site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False):
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='upgrade_deadline',
......@@ -320,6 +415,10 @@ def _get_link_to_purchase_verified_certificate(a_user, a_schedule):
return verified_upgrade_deadline_link(a_user, enrollment.course)
class CourseUpdate(ScheduleMessageType):
pass
class CourseUpdateResolver(BinnedSchedulesBaseResolver):
"""
Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset`` and the
......@@ -333,6 +432,40 @@ class CourseUpdateResolver(BinnedSchedulesBaseResolver):
self.log_prefix = 'Course Update'
def course_update_schedule_bin(
async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = CourseUpdate()
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
for (user, language, context) in _course_update_schedules_for_bin(
site,
current_datetime,
target_datetime,
day_offset,
bin_num,
org_list,
exclude_orgs
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
)
with function_trace('enqueue_send_task'):
async_send_task.apply_async(
(site_id, str(msg)), retry=False)
def _course_update_schedules_for_bin(site, current_datetime, target_datetime, day_offset, bin_num, org_list,
exclude_orgs=False):
week_num = abs(day_offset) / 7
......
......@@ -16,10 +16,9 @@ from edx_ace.recipient import Recipient
from edx_ace.utils.date import deserialize
from opaque_keys.edx.keys import CourseKey
from openedx.core.djangoapps.monitoring_utils import set_custom_metric, function_trace
from openedx.core.djangoapps.monitoring_utils import set_custom_metric
from openedx.core.djangoapps.schedules.models import Schedule, ScheduleConfig
from openedx.core.djangoapps.schedules.message_type import ScheduleMessageType
from openedx.core.djangoapps.schedules import resolvers
......@@ -50,12 +49,6 @@ def update_course_schedules(self, **kwargs):
raise self.retry(kwargs=kwargs, exc=exc)
class RecurringNudge(ScheduleMessageType):
def __init__(self, day, *args, **kwargs):
super(RecurringNudge, self).__init__(*args, **kwargs)
self.name = "recurringnudge_day{}".format(day)
@task(ignore_result=True, routing_key=ROUTING_KEY)
def _recurring_nudge_schedule_send(site_id, msg_str):
site = Site.objects.get(pk=site_id)
......@@ -76,85 +69,32 @@ def _recurring_nudge_schedule_send(site_id, msg_str):
def recurring_nudge_schedule_bin(
site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = RecurringNudge(abs(day_offset))
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
for (user, language, context) in resolvers._recurring_nudge_schedules_for_bin(
site,
current_datetime,
target_datetime,
return resolvers.recurring_nudge_schedule_bin(
_recurring_nudge_schedule_send,
site_id,
target_day_str,
day_offset,
bin_num,
org_list,
exclude_orgs
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
)
with function_trace('enqueue_send_task'):
_recurring_nudge_schedule_send.apply_async((site_id, str(msg)), retry=False)
def _annotate_for_monitoring(message_type, site, bin_num, target_day_str, day_offset):
# This identifies the type of message being sent, for example: schedules.recurring_nudge3.
set_custom_metric('message_name', '{0}.{1}'.format(message_type.app_label, message_type.name))
# The domain name of the site we are sending the message for.
set_custom_metric('site', site.domain)
# This is the "bin" of data being processed. We divide up the work into chunks so that we don't tie up celery
# workers for too long. This could help us identify particular bins that are problematic.
set_custom_metric('bin', bin_num)
# The date we are processing data for.
set_custom_metric('target_day', target_day_str)
# The number of days relative to the current date to process data for.
set_custom_metric('day_offset', day_offset)
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', message_type.uuid)
class UpgradeReminder(ScheduleMessageType):
pass
exclude_orgs=exclude_orgs,
override_recipient_email=override_recipient_email,
)
@task(ignore_result=True, routing_key=ROUTING_KEY)
def upgrade_reminder_schedule_bin(
site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = UpgradeReminder()
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
for (user, language, context) in resolvers._upgrade_reminder_schedules_for_bin(
site,
current_datetime,
target_datetime,
return resolvers.upgrade_reminder_schedule_bin(
_upgrade_reminder_schedule_send,
site_id,
target_day_str,
day_offset,
bin_num,
org_list,
exclude_orgs
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
)
with function_trace('enqueue_send_task'):
_upgrade_reminder_schedule_send.apply_async((site_id, str(msg)), retry=False)
exclude_orgs=exclude_orgs,
override_recipient_email=override_recipient_email,
)
@task(ignore_result=True, routing_key=ROUTING_KEY)
def _upgrade_reminder_schedule_send(site_id, msg_str):
......@@ -166,41 +106,20 @@ def _upgrade_reminder_schedule_send(site_id, msg_str):
ace.send(msg)
class CourseUpdate(ScheduleMessageType):
pass
@task(ignore_result=True, routing_key=ROUTING_KEY)
def course_update_schedule_bin(
site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = CourseUpdate()
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
for (user, language, context) in resolvers._course_update_schedules_for_bin(
site,
current_datetime,
target_datetime,
return resolvers.course_update_schedule_bin(
_course_update_schedule_send,
site_id,
target_day_str,
day_offset,
bin_num,
org_list,
exclude_orgs
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
)
with function_trace('enqueue_send_task'):
_course_update_schedule_send.apply_async((site_id, str(msg)), retry=False)
exclude_orgs=exclude_orgs,
override_recipient_email=override_recipient_email,
)
@task(ignore_result=True, routing_key=ROUTING_KEY)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment