Commit 1accff9b by Calen Pennington

DRY up more of tasks.py code

parent 887191b1
...@@ -13,7 +13,6 @@ from django.utils.formats import dateformat, get_format ...@@ -13,7 +13,6 @@ from django.utils.formats import dateformat, get_format
from edx_ace.recipient_resolver import RecipientResolver from edx_ace.recipient_resolver import RecipientResolver
from edx_ace.recipient import Recipient from edx_ace.recipient import Recipient
from edx_ace.utils.date import serialize
from courseware.date_summary import verified_upgrade_deadline_link, verified_upgrade_link_is_valid from courseware.date_summary import verified_upgrade_deadline_link, verified_upgrade_link_is_valid
from openedx.core.djangoapps.monitoring_utils import function_trace, set_custom_metric from openedx.core.djangoapps.monitoring_utils import function_trace, set_custom_metric
...@@ -69,8 +68,6 @@ class BinnedSchedulesBaseResolver(PrefixedDebugLoggerMixin, RecipientResolver): ...@@ -69,8 +68,6 @@ class BinnedSchedulesBaseResolver(PrefixedDebugLoggerMixin, RecipientResolver):
self.current_datetime = self.target_datetime - datetime.timedelta(days=self.day_offset) self.current_datetime = self.target_datetime - datetime.timedelta(days=self.day_offset)
def send(self, msg_type): def send(self, msg_type):
_annotate_for_monitoring(msg_type, self.site, self.bin_num, self.target_datetime, self.day_offset)
for (user, language, context) in self.schedules_for_bin(): for (user, language, context) in self.schedules_for_bin():
msg = msg_type.personalize( msg = msg_type.personalize(
Recipient( Recipient(
...@@ -152,23 +149,6 @@ class BinnedSchedulesBaseResolver(PrefixedDebugLoggerMixin, RecipientResolver): ...@@ -152,23 +149,6 @@ class BinnedSchedulesBaseResolver(PrefixedDebugLoggerMixin, RecipientResolver):
return schedules return schedules
def _annotate_for_monitoring(message_type, site, bin_num, target_datetime, day_offset):
# This identifies the type of message being sent, for example: schedules.recurring_nudge3.
set_custom_metric('message_name', '{0}.{1}'.format(
message_type.app_label, message_type.name))
# The domain name of the site we are sending the message for.
set_custom_metric('site', site.domain)
# This is the "bin" of data being processed. We divide up the work into chunks so that we don't tie up celery
# workers for too long. This could help us identify particular bins that are problematic.
set_custom_metric('bin', bin_num)
# The date we are processing data for.
set_custom_metric('target_day', serialize(target_datetime))
# The number of days relative to the current date to process data for.
set_custom_metric('day_offset', day_offset)
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', message_type.uuid)
class ScheduleStartResolver(BinnedSchedulesBaseResolver): class ScheduleStartResolver(BinnedSchedulesBaseResolver):
""" """
Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset``. Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset``.
...@@ -251,8 +231,7 @@ def _add_upsell_button_information_to_template_context(user, schedule, template_ ...@@ -251,8 +231,7 @@ def _add_upsell_button_information_to_template_context(user, schedule, template_
enrollment = schedule.enrollment enrollment = schedule.enrollment
course = enrollment.course course = enrollment.course
verified_upgrade_link = _get_link_to_purchase_verified_certificate( verified_upgrade_link = _get_verified_upgrade_link(user, schedule)
user, schedule)
has_verified_upgrade_link = verified_upgrade_link is not None has_verified_upgrade_link = verified_upgrade_link is not None
if has_verified_upgrade_link: if has_verified_upgrade_link:
...@@ -269,12 +248,10 @@ def _add_upsell_button_information_to_template_context(user, schedule, template_ ...@@ -269,12 +248,10 @@ def _add_upsell_button_information_to_template_context(user, schedule, template_
template_context['show_upsell'] = has_verified_upgrade_link template_context['show_upsell'] = has_verified_upgrade_link
def _get_link_to_purchase_verified_certificate(a_user, a_schedule): def _get_verified_upgrade_link(user, schedule):
enrollment = a_schedule.enrollment enrollment = schedule.enrollment
if enrollment.dynamic_upgrade_deadline is None or not verified_upgrade_link_is_valid(enrollment): if enrollment.dynamic_upgrade_deadline is not None and verified_upgrade_link_is_valid(enrollment):
return None return verified_upgrade_deadline_link(user, enrollment.course)
return verified_upgrade_deadline_link(a_user, enrollment.course)
class CourseUpdateResolver(BinnedSchedulesBaseResolver): class CourseUpdateResolver(BinnedSchedulesBaseResolver):
...@@ -291,13 +268,11 @@ class CourseUpdateResolver(BinnedSchedulesBaseResolver): ...@@ -291,13 +268,11 @@ class CourseUpdateResolver(BinnedSchedulesBaseResolver):
schedules = self.get_schedules_with_target_date_by_bin_and_orgs( schedules = self.get_schedules_with_target_date_by_bin_and_orgs(
order_by='enrollment__course', order_by='enrollment__course',
) )
LOG.debug('Course Update: Query = %r', schedules.query.sql_with_params())
for schedule in schedules: for schedule in schedules:
enrollment = schedule.enrollment enrollment = schedule.enrollment
try: try:
week_summary = get_course_week_summary( week_summary = get_course_week_summary(enrollment.course_id, week_num)
enrollment.course_id, week_num)
except CourseUpdateDoesNotExist: except CourseUpdateDoesNotExist:
continue continue
...@@ -307,7 +282,6 @@ class CourseUpdateResolver(BinnedSchedulesBaseResolver): ...@@ -307,7 +282,6 @@ class CourseUpdateResolver(BinnedSchedulesBaseResolver):
template_context = get_base_template_context(self.site) template_context = get_base_template_context(self.site)
template_context.update({ template_context.update({
'student_name': user.profile.name, 'student_name': user.profile.name,
'user_personal_address': user.profile.name if user.profile.name else user.username,
'course_name': schedule.enrollment.course.display_name, 'course_name': schedule.enrollment.course.display_name,
'course_url': absolute_url(self.site, reverse('course_root', args=[str(schedule.enrollment.course_id)])), 'course_url': absolute_url(self.site, reverse('course_root', args=[str(schedule.enrollment.course_id)])),
'week_num': week_num, 'week_num': week_num,
......
...@@ -30,6 +30,11 @@ KNOWN_RETRY_ERRORS = ( # Errors we expect occasionally that could resolve on re ...@@ -30,6 +30,11 @@ KNOWN_RETRY_ERRORS = ( # Errors we expect occasionally that could resolve on re
) )
RECURRING_NUDGE_LOG_PREFIX = 'Recurring Nudge'
UPGRADE_REMINDER_LOG_PREFIX = 'Upgrade Reminder'
COURSE_UPDATE_LOG_PREFIX = 'Course Update'
@task(bind=True, default_retry_delay=30, routing_key=ROUTING_KEY) @task(bind=True, default_retry_delay=30, routing_key=ROUTING_KEY)
def update_course_schedules(self, **kwargs): def update_course_schedules(self, **kwargs):
course_key = CourseKey.from_string(kwargs['course_id']) course_key = CourseKey.from_string(kwargs['course_id'])
...@@ -65,8 +70,7 @@ class ScheduleMessageBaseTask(Task): ...@@ -65,8 +70,7 @@ class ScheduleMessageBaseTask(Task):
current_date = resolvers._get_datetime_beginning_of_day(current_date) current_date = resolvers._get_datetime_beginning_of_day(current_date)
if not cls.is_enqueue_enabled(site): if not cls.is_enqueue_enabled(site):
cls.log_debug( cls.log_debug('Message queuing disabled for site %s', site.domain)
'Message queuing disabled for site %s', site.domain)
return return
exclude_orgs, org_list = cls.get_course_org_filter(site) exclude_orgs, org_list = cls.get_course_org_filter(site)
...@@ -132,9 +136,11 @@ class ScheduleMessageBaseTask(Task): ...@@ -132,9 +136,11 @@ class ScheduleMessageBaseTask(Task):
self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
): ):
msg_type = self.make_message_type(day_offset) msg_type = self.make_message_type(day_offset)
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
return self.resolver( return self.resolver(
self.async_send_task, self.async_send_task,
Site.objects.get(id=site_id), site,
deserialize(target_day_str), deserialize(target_day_str),
day_offset, day_offset,
bin_num, bin_num,
...@@ -144,30 +150,43 @@ class ScheduleMessageBaseTask(Task): ...@@ -144,30 +150,43 @@ class ScheduleMessageBaseTask(Task):
).send(msg_type) ).send(msg_type)
def make_message_type(self, day_offset): def make_message_type(self, day_offset):
raise NotImplementedError() raise NotImplementedError
@task(ignore_result=True, routing_key=ROUTING_KEY) @task(ignore_result=True, routing_key=ROUTING_KEY)
def _recurring_nudge_schedule_send(site_id, msg_str): def _recurring_nudge_schedule_send(site_id, msg_str):
site = Site.objects.get(pk=site_id) _schedule_send(
if not ScheduleConfig.current(site).deliver_recurring_nudge: msg_str,
LOG.debug( site_id,
'Recurring Nudge: Message delivery disabled for site %s', site.domain) 'deliver_recurring_nudge',
return RECURRING_NUDGE_LOG_PREFIX,
)
msg = Message.from_string(msg_str)
# A unique identifier for this batch of messages being sent. @task(ignore_result=True, routing_key=ROUTING_KEY)
set_custom_metric('send_uuid', msg.send_uuid) def _upgrade_reminder_schedule_send(site_id, msg_str):
# A unique identifier for this particular message. _schedule_send(
set_custom_metric('uuid', msg.uuid) msg_str,
LOG.debug('Recurring Nudge: Sending message = %s', msg_str) site_id,
ace.send(msg) 'deliver_upgrade_reminder',
UPGRADE_REMINDER_LOG_PREFIX,
)
@task(ignore_result=True, routing_key=ROUTING_KEY)
def _course_update_schedule_send(site_id, msg_str):
_schedule_send(
msg_str,
site_id,
'deliver_course_update',
COURSE_UPDATE_LOG_PREFIX,
)
class ScheduleRecurringNudge(ScheduleMessageBaseTask): class ScheduleRecurringNudge(ScheduleMessageBaseTask):
num_bins = resolvers.RECURRING_NUDGE_NUM_BINS num_bins = resolvers.RECURRING_NUDGE_NUM_BINS
enqueue_config_var = 'enqueue_recurring_nudge' enqueue_config_var = 'enqueue_recurring_nudge'
log_prefix = 'Scheduled Nudge' log_prefix = RECURRING_NUDGE_LOG_PREFIX
resolver = resolvers.ScheduleStartResolver resolver = resolvers.ScheduleStartResolver
async_send_task = _recurring_nudge_schedule_send async_send_task = _recurring_nudge_schedule_send
...@@ -175,24 +194,10 @@ class ScheduleRecurringNudge(ScheduleMessageBaseTask): ...@@ -175,24 +194,10 @@ class ScheduleRecurringNudge(ScheduleMessageBaseTask):
return message_types.RecurringNudge(abs(day_offset)) return message_types.RecurringNudge(abs(day_offset))
@task(ignore_result=True, routing_key=ROUTING_KEY)
def _upgrade_reminder_schedule_send(site_id, msg_str):
site = Site.objects.get(pk=site_id)
if not ScheduleConfig.current(site).deliver_upgrade_reminder:
return
msg = Message.from_string(msg_str)
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', msg.send_uuid)
# A unique identifier for this particular message.
set_custom_metric('uuid', msg.uuid)
ace.send(msg)
class ScheduleUpgradeReminder(ScheduleMessageBaseTask): class ScheduleUpgradeReminder(ScheduleMessageBaseTask):
num_bins = resolvers.UPGRADE_REMINDER_NUM_BINS num_bins = resolvers.UPGRADE_REMINDER_NUM_BINS
enqueue_config_var = 'enqueue_upgrade_reminder' enqueue_config_var = 'enqueue_upgrade_reminder'
log_prefix = 'Course Update' log_prefix = UPGRADE_REMINDER_LOG_PREFIX
resolver = resolvers.UpgradeReminderResolver resolver = resolvers.UpgradeReminderResolver
async_send_task = _upgrade_reminder_schedule_send async_send_task = _upgrade_reminder_schedule_send
...@@ -200,27 +205,51 @@ class ScheduleUpgradeReminder(ScheduleMessageBaseTask): ...@@ -200,27 +205,51 @@ class ScheduleUpgradeReminder(ScheduleMessageBaseTask):
return message_types.UpgradeReminder() return message_types.UpgradeReminder()
@task(ignore_result=True, routing_key=ROUTING_KEY)
def _course_update_schedule_send(site_id, msg_str):
site = Site.objects.get(pk=site_id)
if not ScheduleConfig.current(site).deliver_course_update:
return
msg = Message.from_string(msg_str)
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', msg.send_uuid)
# A unique identifier for this particular message.
set_custom_metric('uuid', msg.uuid)
ace.send(msg)
class ScheduleCourseUpdate(ScheduleMessageBaseTask): class ScheduleCourseUpdate(ScheduleMessageBaseTask):
num_bins = resolvers.COURSE_UPDATE_NUM_BINS num_bins = resolvers.COURSE_UPDATE_NUM_BINS
enqueue_config_var = 'enqueue_course_update' enqueue_config_var = 'enqueue_course_update'
log_prefix = 'Course Update' log_prefix = COURSE_UPDATE_LOG_PREFIX
resolver = resolvers.CourseUpdateResolver resolver = resolvers.CourseUpdateResolver
async_send_task = _course_update_schedule_send async_send_task = _course_update_schedule_send
def make_message_type(self, day_offset): def make_message_type(self, day_offset):
return message_types.CourseUpdate() return message_types.CourseUpdate()
def _schedule_send(msg_str, site_id, delivery_config_var, log_prefix):
if _is_delivery_enabled(site_id, delivery_config_var, log_prefix):
msg = Message.from_string(msg_str)
_annonate_send_task_for_monitoring(msg)
LOG.debug('%s: Sending message = %s', log_prefix, msg_str)
ace.send(msg)
def _is_delivery_enabled(site_id, delivery_config_var, log_prefix):
site = Site.objects.get(pk=site_id)
if getattr(ScheduleConfig.current(site), delivery_config_var, False):
return True
else:
LOG.debug('%s: Message delivery disabled for site %s', log_prefix, site.domain)
def _annotate_for_monitoring(message_type, site, bin_num, target_day_str, day_offset):
# This identifies the type of message being sent, for example: schedules.recurring_nudge3.
set_custom_metric('message_name', '{0}.{1}'.format(message_type.app_label, message_type.name))
# The domain name of the site we are sending the message for.
set_custom_metric('site', site.domain)
# This is the "bin" of data being processed. We divide up the work into chunks so that we don't tie up celery
# workers for too long. This could help us identify particular bins that are problematic.
set_custom_metric('bin', bin_num)
# The date we are processing data for.
set_custom_metric('target_day', target_day_str)
# The number of days relative to the current date to process data for.
set_custom_metric('day_offset', day_offset)
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', message_type.uuid)
def _annonate_send_task_for_monitoring(msg):
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', msg.send_uuid)
# A unique identifier for this particular message.
set_custom_metric('uuid', msg.uuid)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment