Commit 199468ec by Calen Pennington

DRY up the Schedule*.run task methods

parent 0f102323
...@@ -90,7 +90,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -90,7 +90,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
@ddt.data(1, 10, 100) @ddt.data(1, 10, 100)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(tasks, '_recurring_nudge_schedule_send') @patch.object(tasks.ScheduleRecurringNudge, 'async_send_task')
def test_schedule_bin(self, schedule_count, mock_schedule_send, mock_ace): def test_schedule_bin(self, schedule_count, mock_schedule_send, mock_ace):
schedules = [ schedules = [
ScheduleFactory.create( ScheduleFactory.create(
...@@ -117,7 +117,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -117,7 +117,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
self.assertEqual(mock_schedule_send.apply_async.call_count, schedule_count) self.assertEqual(mock_schedule_send.apply_async.call_count, schedule_count)
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, '_recurring_nudge_schedule_send') @patch.object(tasks.ScheduleRecurringNudge, 'async_send_task')
def test_no_course_overview(self, mock_schedule_send): def test_no_course_overview(self, mock_schedule_send):
schedule = ScheduleFactory.create( schedule = ScheduleFactory.create(
start=datetime.datetime(2017, 8, 3, 20, 34, 30, tzinfo=pytz.UTC), start=datetime.datetime(2017, 8, 3, 20, 34, 30, tzinfo=pytz.UTC),
...@@ -143,7 +143,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -143,7 +143,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
# is null. # is null.
self.assertEqual(mock_schedule_send.apply_async.call_count, 0) self.assertEqual(mock_schedule_send.apply_async.call_count, 0)
@patch.object(tasks, '_recurring_nudge_schedule_send') @patch.object(tasks.ScheduleRecurringNudge, 'async_send_task')
def test_send_after_course_end(self, mock_schedule_send): def test_send_after_course_end(self, mock_schedule_send):
user1 = UserFactory.create(id=resolvers.RECURRING_NUDGE_NUM_BINS) user1 = UserFactory.create(id=resolvers.RECURRING_NUDGE_NUM_BINS)
...@@ -173,23 +173,21 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -173,23 +173,21 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
def test_enqueue_disabled(self, mock_ace): @patch.object(tasks.ScheduleUpgradeReminder, 'apply_async')
def test_enqueue_disabled(self, mock_ace, mock_apply_async):
ScheduleConfigFactory.create(site=self.site_config.site, enqueue_recurring_nudge=False) ScheduleConfigFactory.create(site=self.site_config.site, enqueue_recurring_nudge=False)
mock_schedule_bin = Mock()
current_datetime = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) current_datetime = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
tasks.ScheduleRecurringNudge.enqueue( tasks.ScheduleRecurringNudge.enqueue(
self.site_config.site, self.site_config.site,
current_datetime, current_datetime,
mock_schedule_bin,
3 3
) )
self.assertFalse(mock_schedule_bin.called) self.assertFalse(mock_apply_async.called)
self.assertFalse(mock_schedule_bin.apply_async.called)
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(tasks, '_recurring_nudge_schedule_send') @patch.object(tasks.ScheduleRecurringNudge, 'async_send_task')
@ddt.data( @ddt.data(
((['filtered_org'], False, 1)), ((['filtered_org'], False, 1)),
((['filtered_org'], True, 2)) ((['filtered_org'], True, 2))
...@@ -237,7 +235,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -237,7 +235,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(tasks, '_recurring_nudge_schedule_send') @patch.object(tasks.ScheduleRecurringNudge, 'async_send_task')
def test_multiple_enrollments(self, mock_schedule_send, mock_ace): def test_multiple_enrollments(self, mock_schedule_send, mock_ace):
user = UserFactory.create() user = UserFactory.create()
schedules = [ schedules = [
...@@ -287,7 +285,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -287,7 +285,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
sent_messages = [] sent_messages = []
with self.settings(TEMPLATES=self._get_template_overrides()): with self.settings(TEMPLATES=self._get_template_overrides()):
with patch.object(tasks, '_recurring_nudge_schedule_send') as mock_schedule_send: with patch.object(tasks.ScheduleRecurringNudge, 'async_send_task') as mock_schedule_send:
mock_schedule_send.apply_async = lambda args, *_a, **_kw: sent_messages.append(args) mock_schedule_send.apply_async = lambda args, *_a, **_kw: sent_messages.append(args)
with self.assertNumQueries(NUM_QUERIES_WITH_MATCHES, table_blacklist=WAFFLE_TABLES): with self.assertNumQueries(NUM_QUERIES_WITH_MATCHES, table_blacklist=WAFFLE_TABLES):
...@@ -339,7 +337,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -339,7 +337,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
schedule.enrollment.course.org schedule.enrollment.course.org
] ]
sent_messages = self._stub_sender_and_collect_sent_messages(bin_task=tasks.ScheduleRecurringNudge, sent_messages = self._stub_sender_and_collect_sent_messages(bin_task=tasks.ScheduleRecurringNudge,
stubbed_send_task=patch.object(tasks, '_recurring_nudge_schedule_send'), stubbed_send_task=patch.object(tasks.ScheduleRecurringNudge, 'async_send_task'),
bin_task_params=bin_task_parameters) bin_task_params=bin_task_parameters)
self.assertEqual(len(sent_messages), 1) self.assertEqual(len(sent_messages), 1)
...@@ -371,7 +369,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -371,7 +369,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
schedule.enrollment.course.org schedule.enrollment.course.org
] ]
sent_messages = self._stub_sender_and_collect_sent_messages(bin_task=tasks.ScheduleRecurringNudge, sent_messages = self._stub_sender_and_collect_sent_messages(bin_task=tasks.ScheduleRecurringNudge,
stubbed_send_task=patch.object(tasks, '_recurring_nudge_schedule_send'), stubbed_send_task=patch.object(tasks.ScheduleRecurringNudge, 'async_send_task'),
bin_task_params=bin_task_parameters) bin_task_params=bin_task_parameters)
self.assertEqual(len(sent_messages), 1) self.assertEqual(len(sent_messages), 1)
...@@ -410,7 +408,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -410,7 +408,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
schedule.enrollment.course.org schedule.enrollment.course.org
] ]
sent_messages = self._stub_sender_and_collect_sent_messages(bin_task=tasks.ScheduleRecurringNudge, sent_messages = self._stub_sender_and_collect_sent_messages(bin_task=tasks.ScheduleRecurringNudge,
stubbed_send_task=patch.object(tasks, '_recurring_nudge_schedule_send'), stubbed_send_task=patch.object(tasks.ScheduleRecurringNudge, 'async_send_task'),
bin_task_params=bin_task_parameters) bin_task_params=bin_task_parameters)
self.assertEqual(len(sent_messages), 1) self.assertEqual(len(sent_messages), 1)
......
...@@ -94,7 +94,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -94,7 +94,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
@ddt.data(1, 10, 100) @ddt.data(1, 10, 100)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(tasks, '_upgrade_reminder_schedule_send') @patch.object(tasks.ScheduleUpgradeReminder, 'async_send_task')
def test_schedule_bin(self, schedule_count, mock_schedule_send, mock_ace): def test_schedule_bin(self, schedule_count, mock_schedule_send, mock_ace):
schedules = [ schedules = [
ScheduleFactory.create( ScheduleFactory.create(
...@@ -122,7 +122,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -122,7 +122,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
self.assertEqual(mock_schedule_send.apply_async.call_count, schedule_count) self.assertEqual(mock_schedule_send.apply_async.call_count, schedule_count)
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, '_upgrade_reminder_schedule_send') @patch.object(tasks.ScheduleUpgradeReminder, 'async_send_task')
def test_no_course_overview(self, mock_schedule_send): def test_no_course_overview(self, mock_schedule_send):
schedule = ScheduleFactory.create( schedule = ScheduleFactory.create(
...@@ -157,22 +157,21 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -157,22 +157,21 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
def test_enqueue_disabled(self, mock_ace): @patch.object(tasks.ScheduleUpgradeReminder, 'apply_async')
def test_enqueue_disabled(self, mock_ace, mock_apply_async):
ScheduleConfigFactory.create(site=self.site_config.site, enqueue_upgrade_reminder=False) ScheduleConfigFactory.create(site=self.site_config.site, enqueue_upgrade_reminder=False)
mock_schedule_bin = Mock()
current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
tasks.ScheduleUpgradeReminder.enqueue( tasks.ScheduleUpgradeReminder.enqueue(
self.site_config.site, self.site_config.site,
current_day, current_day,
day_offset=3, day_offset=3,
) )
self.assertFalse(mock_schedule_bin.called) self.assertFalse(mock_apply_async.called)
self.assertFalse(mock_schedule_bin.apply_async.called)
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(tasks, '_upgrade_reminder_schedule_send') @patch.object(tasks.ScheduleUpgradeReminder, 'async_send_task')
@ddt.data( @ddt.data(
((['filtered_org'], False, 1)), ((['filtered_org'], False, 1)),
((['filtered_org'], True, 2)) ((['filtered_org'], True, 2))
...@@ -220,7 +219,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -220,7 +219,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(tasks, '_upgrade_reminder_schedule_send') @patch.object(tasks.ScheduleUpgradeReminder, 'async_send_task')
def test_multiple_enrollments(self, mock_schedule_send, mock_ace): def test_multiple_enrollments(self, mock_schedule_send, mock_ace):
user = UserFactory.create() user = UserFactory.create()
schedules = [ schedules = [
...@@ -283,7 +282,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -283,7 +282,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
sent_messages = [] sent_messages = []
with self.settings(TEMPLATES=self._get_template_overrides()): with self.settings(TEMPLATES=self._get_template_overrides()):
with patch.object(tasks, '_upgrade_reminder_schedule_send') as mock_schedule_send: with patch.object(tasks.ScheduleUpgradeReminder, 'async_send_task') as mock_schedule_send:
mock_schedule_send.apply_async = lambda args, *_a, **_kw: sent_messages.append(args) mock_schedule_send.apply_async = lambda args, *_a, **_kw: sent_messages.append(args)
# we execute one query per course to see if it's opted out of dynamic upgrade deadlines, however, # we execute one query per course to see if it's opted out of dynamic upgrade deadlines, however,
......
...@@ -50,28 +50,14 @@ def update_course_schedules(self, **kwargs): ...@@ -50,28 +50,14 @@ def update_course_schedules(self, **kwargs):
raise self.retry(kwargs=kwargs, exc=exc) raise self.retry(kwargs=kwargs, exc=exc)
@task(ignore_result=True, routing_key=ROUTING_KEY)
def _recurring_nudge_schedule_send(site_id, msg_str):
site = Site.objects.get(pk=site_id)
if not ScheduleConfig.current(site).deliver_recurring_nudge:
LOG.debug('Recurring Nudge: Message delivery disabled for site %s', site.domain)
return
msg = Message.from_string(msg_str)
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', msg.send_uuid)
# A unique identifier for this particular message.
set_custom_metric('uuid', msg.uuid)
LOG.debug('Recurring Nudge: Sending message = %s', msg_str)
ace.send(msg)
class ScheduleMessageBaseTask(Task): class ScheduleMessageBaseTask(Task):
ignore_result = True ignore_result = True
routing_key = ROUTING_KEY routing_key = ROUTING_KEY
num_bins = resolvers.DEFAULT_NUM_BINS num_bins = resolvers.DEFAULT_NUM_BINS
enqueue_config_var = None # define in subclass enqueue_config_var = None # define in subclass
log_prefix = None log_prefix = None
resolver = None # define in subclass
async_send_task = None # define in subclass
@classmethod @classmethod
def log_debug(cls, message, *args, **kwargs): def log_debug(cls, message, *args, **kwargs):
...@@ -145,17 +131,11 @@ class ScheduleMessageBaseTask(Task): ...@@ -145,17 +131,11 @@ class ScheduleMessageBaseTask(Task):
return exclude_orgs, org_list return exclude_orgs, org_list
class ScheduleRecurringNudge(ScheduleMessageBaseTask):
num_bins = resolvers.RECURRING_NUDGE_NUM_BINS
enqueue_config_var = 'enqueue_recurring_nudge'
log_prefix = 'Scheduled Nudge'
def run( def run(
self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
): ):
return resolvers.ScheduleStartResolver().schedule_bin( return self.resolver().schedule_bin(
_recurring_nudge_schedule_send, self.async_send_task,
site_id, site_id,
target_day_str, target_day_str,
day_offset, day_offset,
...@@ -166,25 +146,30 @@ class ScheduleRecurringNudge(ScheduleMessageBaseTask): ...@@ -166,25 +146,30 @@ class ScheduleRecurringNudge(ScheduleMessageBaseTask):
) )
class ScheduleUpgradeReminder(ScheduleMessageBaseTask): @task(ignore_result=True, routing_key=ROUTING_KEY)
num_bins = resolvers.UPGRADE_REMINDER_NUM_BINS def _recurring_nudge_schedule_send(site_id, msg_str):
enqueue_config_var = 'enqueue_upgrade_reminder' site = Site.objects.get(pk=site_id)
log_prefix = 'Course Update' if not ScheduleConfig.current(site).deliver_recurring_nudge:
LOG.debug(
'Recurring Nudge: Message delivery disabled for site %s', site.domain)
return
msg = Message.from_string(msg_str)
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', msg.send_uuid)
# A unique identifier for this particular message.
set_custom_metric('uuid', msg.uuid)
LOG.debug('Recurring Nudge: Sending message = %s', msg_str)
ace.send(msg)
class ScheduleRecurringNudge(ScheduleMessageBaseTask):
num_bins = resolvers.RECURRING_NUDGE_NUM_BINS
enqueue_config_var = 'enqueue_recurring_nudge'
log_prefix = 'Scheduled Nudge'
resolver = resolvers.ScheduleStartResolver
async_send_task = _recurring_nudge_schedule_send
def run(
self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
return resolvers.UpgradeReminderResolver().schedule_bin(
_upgrade_reminder_schedule_send,
site_id,
target_day_str,
day_offset,
bin_num,
org_list,
exclude_orgs=exclude_orgs,
override_recipient_email=override_recipient_email,
)
@task(ignore_result=True, routing_key=ROUTING_KEY) @task(ignore_result=True, routing_key=ROUTING_KEY)
def _upgrade_reminder_schedule_send(site_id, msg_str): def _upgrade_reminder_schedule_send(site_id, msg_str):
...@@ -193,27 +178,20 @@ def _upgrade_reminder_schedule_send(site_id, msg_str): ...@@ -193,27 +178,20 @@ def _upgrade_reminder_schedule_send(site_id, msg_str):
return return
msg = Message.from_string(msg_str) msg = Message.from_string(msg_str)
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', msg.send_uuid)
# A unique identifier for this particular message.
set_custom_metric('uuid', msg.uuid)
ace.send(msg) ace.send(msg)
class ScheduleCourseUpdate(ScheduleMessageBaseTask): class ScheduleUpgradeReminder(ScheduleMessageBaseTask):
num_bins = resolvers.COURSE_UPDATE_NUM_BINS num_bins = resolvers.UPGRADE_REMINDER_NUM_BINS
enqueue_config_var = 'enqueue_course_update' enqueue_config_var = 'enqueue_upgrade_reminder'
log_prefix = 'Course Update' log_prefix = 'Course Update'
resolver = resolvers.UpgradeReminderResolver
async_send_task = _upgrade_reminder_schedule_send
def run(
self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
return resolvers.CourseUpdateResolver().schedule_bin(
_course_update_schedule_send,
site_id,
target_day_str,
day_offset,
bin_num,
org_list,
exclude_orgs=exclude_orgs,
override_recipient_email=override_recipient_email,
)
@task(ignore_result=True, routing_key=ROUTING_KEY) @task(ignore_result=True, routing_key=ROUTING_KEY)
...@@ -223,4 +201,16 @@ def _course_update_schedule_send(site_id, msg_str): ...@@ -223,4 +201,16 @@ def _course_update_schedule_send(site_id, msg_str):
return return
msg = Message.from_string(msg_str) msg = Message.from_string(msg_str)
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', msg.send_uuid)
# A unique identifier for this particular message.
set_custom_metric('uuid', msg.uuid)
ace.send(msg) ace.send(msg)
class ScheduleCourseUpdate(ScheduleMessageBaseTask):
num_bins = resolvers.COURSE_UPDATE_NUM_BINS
enqueue_config_var = 'enqueue_course_update'
log_prefix = 'Course Update'
resolver = resolvers.CourseUpdateResolver
async_send_task = _course_update_schedule_send
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment