Commit 88d2ff24 by Calen Pennington Committed by GitHub

Merge pull request #16293 from edx/cale/refactor-upgrade-email-resolvers

Cale/refactor upgrade email resolvers
parents 5cf016cf 58bff7ed
...@@ -8,7 +8,7 @@ from openedx.core.djangoapps.schedules.utils import PrefixedDebugLoggerMixin ...@@ -8,7 +8,7 @@ from openedx.core.djangoapps.schedules.utils import PrefixedDebugLoggerMixin
class SendEmailBaseCommand(PrefixedDebugLoggerMixin, BaseCommand): class SendEmailBaseCommand(PrefixedDebugLoggerMixin, BaseCommand):
resolver_class = None # define in subclass async_send_task = None # define in subclass
def add_arguments(self, parser): def add_arguments(self, parser):
parser.add_argument( parser.add_argument(
...@@ -23,20 +23,27 @@ class SendEmailBaseCommand(PrefixedDebugLoggerMixin, BaseCommand): ...@@ -23,20 +23,27 @@ class SendEmailBaseCommand(PrefixedDebugLoggerMixin, BaseCommand):
parser.add_argument('site_domain_name') parser.add_argument('site_domain_name')
def handle(self, *args, **options): def handle(self, *args, **options):
resolver = self.make_resolver(*args, **options) self.log_debug('Args = %r', options)
self.send_emails(resolver, *args, **options)
def make_resolver(self, *args, **options):
current_date = datetime.datetime( current_date = datetime.datetime(
*[int(x) for x in options['date'].split('-')], *[int(x) for x in options['date'].split('-')],
tzinfo=pytz.UTC tzinfo=pytz.UTC
) )
self.log_debug('Args = %r', options)
self.log_debug('Current date = %s', current_date.isoformat()) self.log_debug('Current date = %s', current_date.isoformat())
site = Site.objects.get(domain__iexact=options['site_domain_name']) site = Site.objects.get(domain__iexact=options['site_domain_name'])
self.log_debug('Running for site %s', site.domain) self.log_debug('Running for site %s', site.domain)
return self.resolver_class(site, current_date)
def send_emails(self, resolver, *args, **options): override_recipient_email = options.get('override_recipient_email')
pass # define in subclass self.send_emails(site, current_date, override_recipient_email)
def send_emails(self, *args, **kwargs):
raise NotImplementedError
def enqueue(self, day_offset, site, current_date, override_recipient_email=None):
self.async_send_task.enqueue(
site,
current_date,
day_offset,
override_recipient_email,
)
from openedx.core.djangoapps.schedules.management.commands import SendEmailBaseCommand from openedx.core.djangoapps.schedules.management.commands import SendEmailBaseCommand
from openedx.core.djangoapps.schedules.resolvers import CourseUpdateResolver from openedx.core.djangoapps.schedules.tasks import ScheduleCourseUpdate
class Command(SendEmailBaseCommand): class Command(SendEmailBaseCommand):
resolver_class = CourseUpdateResolver async_send_task = ScheduleCourseUpdate
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(Command, self).__init__(*args, **kwargs) super(Command, self).__init__(*args, **kwargs)
self.log_prefix = 'Upgrade Reminder' self.log_prefix = 'Upgrade Reminder'
def send_emails(self, resolver, *args, **options): def send_emails(self, *args, **kwargs):
for day_offset in xrange(-7, -77, -7): for day_offset in xrange(-7, -77, -7):
resolver.send(day_offset, options.get('override_recipient_email')) self.enqueue(day_offset, *args, **kwargs)
from openedx.core.djangoapps.schedules.management.commands import SendEmailBaseCommand from openedx.core.djangoapps.schedules.management.commands import SendEmailBaseCommand
from openedx.core.djangoapps.schedules.resolvers import ScheduleStartResolver from openedx.core.djangoapps.schedules.tasks import ScheduleRecurringNudge
class Command(SendEmailBaseCommand): class Command(SendEmailBaseCommand):
resolver_class = ScheduleStartResolver async_send_task = ScheduleRecurringNudge
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(Command, self).__init__(*args, **kwargs) super(Command, self).__init__(*args, **kwargs)
self.log_prefix = 'Scheduled Nudge' self.log_prefix = 'Scheduled Nudge'
def send_emails(self, resolver, *args, **options): def send_emails(self, *args, **kwargs):
for day_offset in (-3, -10): for day_offset in (-3, -10):
resolver.send(day_offset, options.get('override_recipient_email')) self.enqueue(day_offset, *args, **kwargs)
from openedx.core.djangoapps.schedules.management.commands import SendEmailBaseCommand from openedx.core.djangoapps.schedules.management.commands import SendEmailBaseCommand
from openedx.core.djangoapps.schedules.resolvers import UpgradeReminderResolver from openedx.core.djangoapps.schedules.tasks import ScheduleUpgradeReminder
class Command(SendEmailBaseCommand): class Command(SendEmailBaseCommand):
resolver_class = UpgradeReminderResolver async_send_task = ScheduleUpgradeReminder
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(Command, self).__init__(*args, **kwargs) super(Command, self).__init__(*args, **kwargs)
self.log_prefix = 'Upgrade Reminder' self.log_prefix = 'Upgrade Reminder'
def send_emails(self, resolver, *args, **options): def send_emails(self, *args, **kwargs):
resolver.send(2, options.get('override_recipient_email')) self.enqueue(2, *args, **kwargs)
...@@ -18,23 +18,13 @@ from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_un ...@@ -18,23 +18,13 @@ from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_un
class TestSendEmailBaseCommand(CacheIsolationTestCase): class TestSendEmailBaseCommand(CacheIsolationTestCase):
def setUp(self): def setUp(self):
self.command = SendEmailBaseCommand() self.command = SendEmailBaseCommand()
self.site = SiteFactory()
def test_init_resolver_class(self):
assert self.command.resolver_class is None
def test_make_resolver(self):
with patch.object(self.command, 'resolver_class') as resolver_class:
example_site = SiteFactory(domain='example.com')
self.command.make_resolver(site_domain_name='example.com', date='2017-09-29')
resolver_class.assert_called_once_with(
example_site,
datetime.datetime(2017, 9, 29, tzinfo=pytz.UTC)
)
def test_handle(self): def test_handle(self):
with patch.object(self.command, 'make_resolver') as make_resolver:
make_resolver.return_value = 'resolver'
with patch.object(self.command, 'send_emails') as send_emails: with patch.object(self.command, 'send_emails') as send_emails:
self.command.handle(date='2017-09-29') self.command.handle(site_domain_name=self.site.domain, date='2017-09-29')
make_resolver.assert_called_once_with(date='2017-09-29') send_emails.assert_called_once_with(
send_emails.assert_called_once_with('resolver', date='2017-09-29') self.site,
datetime.datetime(2017, 9, 29, tzinfo=pytz.UTC),
None
)
...@@ -60,35 +60,37 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -60,35 +60,37 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
DynamicUpgradeDeadlineConfiguration.objects.create(enabled=True) DynamicUpgradeDeadlineConfiguration.objects.create(enabled=True)
@patch.object(nudge.Command, 'resolver_class') @patch.object(nudge.Command, 'async_send_task')
def test_handle(self, mock_resolver): def test_handle(self, mock_send):
test_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) test_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
nudge.Command().handle(date='2017-08-01', site_domain_name=self.site_config.site.domain) nudge.Command().handle(date='2017-08-01', site_domain_name=self.site_config.site.domain)
mock_resolver.assert_called_with(self.site_config.site, test_day)
for day in (-3, -10): for day in (-3, -10):
mock_resolver().send.assert_any_call(day, None) mock_send.enqueue.assert_any_call(
self.site_config.site,
test_day,
day,
None
)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(resolvers.ScheduleStartResolver, 'async_send_task') def test_resolver_send(self, mock_ace):
def test_resolver_send(self, mock_schedule_bin, mock_ace):
current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
nudge.ScheduleStartResolver(self.site_config.site, current_day).send(-3) with patch.object(tasks.ScheduleRecurringNudge, 'apply_async') as mock_apply_async:
tasks.ScheduleRecurringNudge.enqueue(self.site_config.site, current_day, -3)
test_day = current_day + datetime.timedelta(days=-3) test_day = current_day + datetime.timedelta(days=-3)
self.assertFalse(mock_schedule_bin.called) mock_apply_async.assert_any_call(
mock_schedule_bin.apply_async.assert_any_call(
(self.site_config.site.id, serialize(test_day), -3, 0, [], True, None), (self.site_config.site.id, serialize(test_day), -3, 0, [], True, None),
retry=False, retry=False,
) )
mock_schedule_bin.apply_async.assert_any_call( mock_apply_async.assert_any_call(
(self.site_config.site.id, serialize(test_day), -3, tasks.RECURRING_NUDGE_NUM_BINS - 1, [], True, None), (self.site_config.site.id, serialize(test_day), -3, resolvers.RECURRING_NUDGE_NUM_BINS - 1, [], True, None),
retry=False, retry=False,
) )
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@ddt.data(1, 10, 100) @ddt.data(1, 10, 100)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(tasks, '_recurring_nudge_schedule_send') @patch.object(tasks.ScheduleRecurringNudge, 'async_send_task')
def test_schedule_bin(self, schedule_count, mock_schedule_send, mock_ace): def test_schedule_bin(self, schedule_count, mock_schedule_send, mock_ace):
schedules = [ schedules = [
ScheduleFactory.create( ScheduleFactory.create(
...@@ -97,25 +99,25 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -97,25 +99,25 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
) for i in range(schedule_count) ) for i in range(schedule_count)
] ]
bins_in_use = frozenset((s.enrollment.user.id % tasks.RECURRING_NUDGE_NUM_BINS) for s in schedules) bins_in_use = frozenset((s.enrollment.user.id % resolvers.RECURRING_NUDGE_NUM_BINS) for s in schedules)
test_datetime = datetime.datetime(2017, 8, 3, 18, tzinfo=pytz.UTC) test_datetime = datetime.datetime(2017, 8, 3, 18, tzinfo=pytz.UTC)
test_datetime_str = serialize(test_datetime) test_datetime_str = serialize(test_datetime)
for b in range(tasks.RECURRING_NUDGE_NUM_BINS): for b in range(resolvers.RECURRING_NUDGE_NUM_BINS):
expected_queries = NUM_QUERIES_NO_MATCHING_SCHEDULES expected_queries = NUM_QUERIES_NO_MATCHING_SCHEDULES
if b in bins_in_use: if b in bins_in_use:
# to fetch course modes for valid schedules # to fetch course modes for valid schedules
expected_queries += NUM_COURSE_MODES_QUERIES expected_queries += NUM_COURSE_MODES_QUERIES
with self.assertNumQueries(expected_queries, table_blacklist=WAFFLE_TABLES): with self.assertNumQueries(expected_queries, table_blacklist=WAFFLE_TABLES):
tasks.recurring_nudge_schedule_bin( tasks.ScheduleRecurringNudge.delay(
self.site_config.site.id, target_day_str=test_datetime_str, day_offset=-3, bin_num=b, self.site_config.site.id, target_day_str=test_datetime_str, day_offset=-3, bin_num=b,
org_list=[schedules[0].enrollment.course.org], org_list=[schedules[0].enrollment.course.org],
) )
self.assertEqual(mock_schedule_send.apply_async.call_count, schedule_count) self.assertEqual(mock_schedule_send.apply_async.call_count, schedule_count)
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, '_recurring_nudge_schedule_send') @patch.object(tasks.ScheduleRecurringNudge, 'async_send_task')
def test_no_course_overview(self, mock_schedule_send): def test_no_course_overview(self, mock_schedule_send):
schedule = ScheduleFactory.create( schedule = ScheduleFactory.create(
start=datetime.datetime(2017, 8, 3, 20, 34, 30, tzinfo=pytz.UTC), start=datetime.datetime(2017, 8, 3, 20, 34, 30, tzinfo=pytz.UTC),
...@@ -126,9 +128,9 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -126,9 +128,9 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
test_datetime = datetime.datetime(2017, 8, 3, 20, tzinfo=pytz.UTC) test_datetime = datetime.datetime(2017, 8, 3, 20, tzinfo=pytz.UTC)
test_datetime_str = serialize(test_datetime) test_datetime_str = serialize(test_datetime)
for b in range(tasks.RECURRING_NUDGE_NUM_BINS): for b in range(resolvers.RECURRING_NUDGE_NUM_BINS):
with self.assertNumQueries(NUM_QUERIES_NO_MATCHING_SCHEDULES, table_blacklist=WAFFLE_TABLES): with self.assertNumQueries(NUM_QUERIES_NO_MATCHING_SCHEDULES, table_blacklist=WAFFLE_TABLES):
tasks.recurring_nudge_schedule_bin( tasks.ScheduleRecurringNudge.delay(
self.site_config.site.id, target_day_str=test_datetime_str, day_offset=-3, bin_num=b, self.site_config.site.id, target_day_str=test_datetime_str, day_offset=-3, bin_num=b,
org_list=[schedule.enrollment.course.org], org_list=[schedule.enrollment.course.org],
) )
...@@ -141,9 +143,9 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -141,9 +143,9 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
# is null. # is null.
self.assertEqual(mock_schedule_send.apply_async.call_count, 0) self.assertEqual(mock_schedule_send.apply_async.call_count, 0)
@patch.object(tasks, '_recurring_nudge_schedule_send') @patch.object(tasks.ScheduleRecurringNudge, 'async_send_task')
def test_send_after_course_end(self, mock_schedule_send): def test_send_after_course_end(self, mock_schedule_send):
user1 = UserFactory.create(id=tasks.RECURRING_NUDGE_NUM_BINS) user1 = UserFactory.create(id=resolvers.RECURRING_NUDGE_NUM_BINS)
schedule = ScheduleFactory.create( schedule = ScheduleFactory.create(
start=datetime.datetime(2017, 8, 3, 20, 34, 30, tzinfo=pytz.UTC), start=datetime.datetime(2017, 8, 3, 20, 34, 30, tzinfo=pytz.UTC),
...@@ -155,7 +157,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -155,7 +157,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
test_datetime = datetime.datetime(2017, 8, 3, 20, tzinfo=pytz.UTC) test_datetime = datetime.datetime(2017, 8, 3, 20, tzinfo=pytz.UTC)
test_datetime_str = serialize(test_datetime) test_datetime_str = serialize(test_datetime)
tasks.recurring_nudge_schedule_bin.apply_async( tasks.ScheduleRecurringNudge.apply_async(
self.site_config.site.id, target_day_str=test_datetime_str, day_offset=-3, bin_num=0, self.site_config.site.id, target_day_str=test_datetime_str, day_offset=-3, bin_num=0,
org_list=[schedule.enrollment.course.org], org_list=[schedule.enrollment.course.org],
) )
...@@ -171,18 +173,21 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -171,18 +173,21 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(resolvers.ScheduleStartResolver, 'async_send_task') @patch.object(tasks.ScheduleUpgradeReminder, 'apply_async')
def test_enqueue_disabled(self, mock_schedule_bin, mock_ace): def test_enqueue_disabled(self, mock_ace, mock_apply_async):
ScheduleConfigFactory.create(site=self.site_config.site, enqueue_recurring_nudge=False) ScheduleConfigFactory.create(site=self.site_config.site, enqueue_recurring_nudge=False)
current_datetime = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) current_datetime = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
nudge.ScheduleStartResolver(self.site_config.site, current_datetime).send(3) tasks.ScheduleRecurringNudge.enqueue(
self.assertFalse(mock_schedule_bin.called) self.site_config.site,
self.assertFalse(mock_schedule_bin.apply_async.called) current_datetime,
3
)
self.assertFalse(mock_apply_async.called)
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(tasks, '_recurring_nudge_schedule_send') @patch.object(tasks.ScheduleRecurringNudge, 'async_send_task')
@ddt.data( @ddt.data(
((['filtered_org'], False, 1)), ((['filtered_org'], False, 1)),
((['filtered_org'], True, 2)) ((['filtered_org'], True, 2))
...@@ -199,8 +204,8 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -199,8 +204,8 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
for config in (limited_config, unlimited_config): for config in (limited_config, unlimited_config):
ScheduleConfigFactory.create(site=config.site) ScheduleConfigFactory.create(site=config.site)
user1 = UserFactory.create(id=tasks.RECURRING_NUDGE_NUM_BINS) user1 = UserFactory.create(id=resolvers.RECURRING_NUDGE_NUM_BINS)
user2 = UserFactory.create(id=tasks.RECURRING_NUDGE_NUM_BINS * 2) user2 = UserFactory.create(id=resolvers.RECURRING_NUDGE_NUM_BINS * 2)
ScheduleFactory.create( ScheduleFactory.create(
start=datetime.datetime(2017, 8, 3, 17, 44, 30, tzinfo=pytz.UTC), start=datetime.datetime(2017, 8, 3, 17, 44, 30, tzinfo=pytz.UTC),
...@@ -221,7 +226,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -221,7 +226,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
test_datetime = datetime.datetime(2017, 8, 3, 17, tzinfo=pytz.UTC) test_datetime = datetime.datetime(2017, 8, 3, 17, tzinfo=pytz.UTC)
test_datetime_str = serialize(test_datetime) test_datetime_str = serialize(test_datetime)
with self.assertNumQueries(NUM_QUERIES_WITH_MATCHES, table_blacklist=WAFFLE_TABLES): with self.assertNumQueries(NUM_QUERIES_WITH_MATCHES, table_blacklist=WAFFLE_TABLES):
tasks.recurring_nudge_schedule_bin( tasks.ScheduleRecurringNudge.delay(
limited_config.site.id, target_day_str=test_datetime_str, day_offset=-3, bin_num=0, limited_config.site.id, target_day_str=test_datetime_str, day_offset=-3, bin_num=0,
org_list=org_list, exclude_orgs=exclude_orgs, org_list=org_list, exclude_orgs=exclude_orgs,
) )
...@@ -230,7 +235,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -230,7 +235,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(tasks, '_recurring_nudge_schedule_send') @patch.object(tasks.ScheduleRecurringNudge, 'async_send_task')
def test_multiple_enrollments(self, mock_schedule_send, mock_ace): def test_multiple_enrollments(self, mock_schedule_send, mock_ace):
user = UserFactory.create() user = UserFactory.create()
schedules = [ schedules = [
...@@ -245,9 +250,9 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -245,9 +250,9 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
test_datetime = datetime.datetime(2017, 8, 3, 19, 44, 30, tzinfo=pytz.UTC) test_datetime = datetime.datetime(2017, 8, 3, 19, 44, 30, tzinfo=pytz.UTC)
test_datetime_str = serialize(test_datetime) test_datetime_str = serialize(test_datetime)
with self.assertNumQueries(NUM_QUERIES_WITH_MATCHES, table_blacklist=WAFFLE_TABLES): with self.assertNumQueries(NUM_QUERIES_WITH_MATCHES, table_blacklist=WAFFLE_TABLES):
tasks.recurring_nudge_schedule_bin( tasks.ScheduleRecurringNudge.delay(
self.site_config.site.id, target_day_str=test_datetime_str, day_offset=-3, self.site_config.site.id, target_day_str=test_datetime_str, day_offset=-3,
bin_num=user.id % tasks.RECURRING_NUDGE_NUM_BINS, bin_num=user.id % resolvers.RECURRING_NUDGE_NUM_BINS,
org_list=[schedules[0].enrollment.course.org], org_list=[schedules[0].enrollment.course.org],
) )
self.assertEqual(mock_schedule_send.apply_async.call_count, 1) self.assertEqual(mock_schedule_send.apply_async.call_count, 1)
...@@ -280,11 +285,11 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -280,11 +285,11 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
sent_messages = [] sent_messages = []
with self.settings(TEMPLATES=self._get_template_overrides()): with self.settings(TEMPLATES=self._get_template_overrides()):
with patch.object(tasks, '_recurring_nudge_schedule_send') as mock_schedule_send: with patch.object(tasks.ScheduleRecurringNudge, 'async_send_task') as mock_schedule_send:
mock_schedule_send.apply_async = lambda args, *_a, **_kw: sent_messages.append(args) mock_schedule_send.apply_async = lambda args, *_a, **_kw: sent_messages.append(args)
with self.assertNumQueries(NUM_QUERIES_WITH_MATCHES, table_blacklist=WAFFLE_TABLES): with self.assertNumQueries(NUM_QUERIES_WITH_MATCHES, table_blacklist=WAFFLE_TABLES):
tasks.recurring_nudge_schedule_bin( tasks.ScheduleRecurringNudge.delay(
self.site_config.site.id, target_day_str=test_datetime_str, day_offset=day, self.site_config.site.id, target_day_str=test_datetime_str, day_offset=day,
bin_num=self._calculate_bin_for_user(user), org_list=[schedules[0].enrollment.course.org], bin_num=self._calculate_bin_for_user(user), org_list=[schedules[0].enrollment.course.org],
) )
...@@ -331,8 +336,8 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -331,8 +336,8 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
user, user,
schedule.enrollment.course.org schedule.enrollment.course.org
] ]
sent_messages = self._stub_sender_and_collect_sent_messages(bin_task=tasks.recurring_nudge_schedule_bin, sent_messages = self._stub_sender_and_collect_sent_messages(bin_task=tasks.ScheduleRecurringNudge,
stubbed_send_task=patch.object(tasks, '_recurring_nudge_schedule_send'), stubbed_send_task=patch.object(tasks.ScheduleRecurringNudge, 'async_send_task'),
bin_task_params=bin_task_parameters) bin_task_params=bin_task_parameters)
self.assertEqual(len(sent_messages), 1) self.assertEqual(len(sent_messages), 1)
...@@ -363,8 +368,8 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -363,8 +368,8 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
user, user,
schedule.enrollment.course.org schedule.enrollment.course.org
] ]
sent_messages = self._stub_sender_and_collect_sent_messages(bin_task=tasks.recurring_nudge_schedule_bin, sent_messages = self._stub_sender_and_collect_sent_messages(bin_task=tasks.ScheduleRecurringNudge,
stubbed_send_task=patch.object(tasks, '_recurring_nudge_schedule_send'), stubbed_send_task=patch.object(tasks.ScheduleRecurringNudge, 'async_send_task'),
bin_task_params=bin_task_parameters) bin_task_params=bin_task_parameters)
self.assertEqual(len(sent_messages), 1) self.assertEqual(len(sent_messages), 1)
...@@ -402,8 +407,8 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -402,8 +407,8 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
user, user,
schedule.enrollment.course.org schedule.enrollment.course.org
] ]
sent_messages = self._stub_sender_and_collect_sent_messages(bin_task=tasks.recurring_nudge_schedule_bin, sent_messages = self._stub_sender_and_collect_sent_messages(bin_task=tasks.ScheduleRecurringNudge,
stubbed_send_task=patch.object(tasks, '_recurring_nudge_schedule_send'), stubbed_send_task=patch.object(tasks.ScheduleRecurringNudge, 'async_send_task'),
bin_task_params=bin_task_parameters) bin_task_params=bin_task_parameters)
self.assertEqual(len(sent_messages), 1) self.assertEqual(len(sent_messages), 1)
...@@ -418,7 +423,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -418,7 +423,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
mock_schedule_send.apply_async = lambda args, *_a, **_kw: sent_messages.append(args) mock_schedule_send.apply_async = lambda args, *_a, **_kw: sent_messages.append(args)
bin_task( bin_task.delay(
self.site_config.site.id, self.site_config.site.id,
target_day_str=bin_task_params[0], target_day_str=bin_task_params[0],
day_offset=bin_task_params[1], day_offset=bin_task_params[1],
...@@ -434,7 +439,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -434,7 +439,7 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
return templates_override return templates_override
def _calculate_bin_for_user(self, user): def _calculate_bin_for_user(self, user):
return user.id % tasks.RECURRING_NUDGE_NUM_BINS return user.id % resolvers.RECURRING_NUDGE_NUM_BINS
def _contains_upsell_attribute(self, msg_attr): def _contains_upsell_attribute(self, msg_attr):
msg = Message.from_string(msg_attr) msg = Message.from_string(msg_attr)
......
...@@ -63,36 +63,38 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -63,36 +63,38 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
DynamicUpgradeDeadlineConfiguration.objects.create(enabled=True) DynamicUpgradeDeadlineConfiguration.objects.create(enabled=True)
@patch.object(reminder.Command, 'resolver_class') @patch.object(reminder.Command, 'async_send_task')
def test_handle(self, mock_resolver): def test_handle(self, mock_send):
test_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) test_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
reminder.Command().handle(date='2017-08-01', site_domain_name=self.site_config.site.domain) reminder.Command().handle(date='2017-08-01', site_domain_name=self.site_config.site.domain)
mock_resolver.assert_called_with(self.site_config.site, test_day) mock_send.enqueue.assert_called_with(
self.site_config.site,
mock_resolver().send.assert_any_call(2, None) test_day,
2,
None
)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(resolvers.UpgradeReminderResolver, 'async_send_task') def test_resolver_send(self, mock_ace):
def test_resolver_send(self, mock_schedule_bin, mock_ace):
current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
test_day = current_day + datetime.timedelta(days=2) test_day = current_day + datetime.timedelta(days=2)
ScheduleFactory.create(upgrade_deadline=datetime.datetime(2017, 8, 3, 15, 34, 30, tzinfo=pytz.UTC)) ScheduleFactory.create(upgrade_deadline=datetime.datetime(2017, 8, 3, 15, 34, 30, tzinfo=pytz.UTC))
reminder.UpgradeReminderResolver(self.site_config.site, current_day).send(2) with patch.object(tasks.ScheduleUpgradeReminder, 'apply_async') as mock_apply_async:
self.assertFalse(mock_schedule_bin.called) tasks.ScheduleUpgradeReminder.enqueue(self.site_config.site, current_day, 2)
mock_schedule_bin.apply_async.assert_any_call( mock_apply_async.assert_any_call(
(self.site_config.site.id, serialize(test_day), 2, 0, [], True, None), (self.site_config.site.id, serialize(test_day), 2, 0, [], True, None),
retry=False, retry=False,
) )
mock_schedule_bin.apply_async.assert_any_call( mock_apply_async.assert_any_call(
(self.site_config.site.id, serialize(test_day), 2, tasks.UPGRADE_REMINDER_NUM_BINS - 1, [], True, None), (self.site_config.site.id, serialize(test_day), 2, resolvers.UPGRADE_REMINDER_NUM_BINS - 1, [], True, None),
retry=False, retry=False,
) )
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@ddt.data(1, 10, 100) @ddt.data(1, 10, 100)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(tasks, '_upgrade_reminder_schedule_send') @patch.object(tasks.ScheduleUpgradeReminder, 'async_send_task')
def test_schedule_bin(self, schedule_count, mock_schedule_send, mock_ace): def test_schedule_bin(self, schedule_count, mock_schedule_send, mock_ace):
schedules = [ schedules = [
ScheduleFactory.create( ScheduleFactory.create(
...@@ -101,18 +103,18 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -101,18 +103,18 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
) for i in range(schedule_count) ) for i in range(schedule_count)
] ]
bins_in_use = frozenset((s.enrollment.user.id % tasks.UPGRADE_REMINDER_NUM_BINS) for s in schedules) bins_in_use = frozenset((s.enrollment.user.id % resolvers.UPGRADE_REMINDER_NUM_BINS) for s in schedules)
test_datetime = datetime.datetime(2017, 8, 3, 18, tzinfo=pytz.UTC) test_datetime = datetime.datetime(2017, 8, 3, 18, tzinfo=pytz.UTC)
test_datetime_str = serialize(test_datetime) test_datetime_str = serialize(test_datetime)
for b in range(tasks.UPGRADE_REMINDER_NUM_BINS): for b in range(resolvers.UPGRADE_REMINDER_NUM_BINS):
expected_queries = NUM_QUERIES_NO_MATCHING_SCHEDULES expected_queries = NUM_QUERIES_NO_MATCHING_SCHEDULES
if b in bins_in_use: if b in bins_in_use:
# to fetch course modes for valid schedules # to fetch course modes for valid schedules
expected_queries += NUM_COURSE_MODES_QUERIES expected_queries += NUM_COURSE_MODES_QUERIES
with self.assertNumQueries(expected_queries, table_blacklist=WAFFLE_TABLES): with self.assertNumQueries(expected_queries, table_blacklist=WAFFLE_TABLES):
tasks.upgrade_reminder_schedule_bin( tasks.ScheduleUpgradeReminder.delay(
self.site_config.site.id, target_day_str=test_datetime_str, day_offset=2, bin_num=b, self.site_config.site.id, target_day_str=test_datetime_str, day_offset=2, bin_num=b,
org_list=[schedules[0].enrollment.course.org], org_list=[schedules[0].enrollment.course.org],
) )
...@@ -120,7 +122,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -120,7 +122,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
self.assertEqual(mock_schedule_send.apply_async.call_count, schedule_count) self.assertEqual(mock_schedule_send.apply_async.call_count, schedule_count)
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, '_upgrade_reminder_schedule_send') @patch.object(tasks.ScheduleUpgradeReminder, 'async_send_task')
def test_no_course_overview(self, mock_schedule_send): def test_no_course_overview(self, mock_schedule_send):
schedule = ScheduleFactory.create( schedule = ScheduleFactory.create(
...@@ -131,9 +133,9 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -131,9 +133,9 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
test_datetime = datetime.datetime(2017, 8, 3, 20, tzinfo=pytz.UTC) test_datetime = datetime.datetime(2017, 8, 3, 20, tzinfo=pytz.UTC)
test_datetime_str = serialize(test_datetime) test_datetime_str = serialize(test_datetime)
for b in range(tasks.UPGRADE_REMINDER_NUM_BINS): for b in range(resolvers.UPGRADE_REMINDER_NUM_BINS):
with self.assertNumQueries(NUM_QUERIES_NO_MATCHING_SCHEDULES, table_blacklist=WAFFLE_TABLES): with self.assertNumQueries(NUM_QUERIES_NO_MATCHING_SCHEDULES, table_blacklist=WAFFLE_TABLES):
tasks.upgrade_reminder_schedule_bin( tasks.ScheduleUpgradeReminder.delay(
self.site_config.site.id, target_day_str=test_datetime_str, day_offset=2, bin_num=b, self.site_config.site.id, target_day_str=test_datetime_str, day_offset=2, bin_num=b,
org_list=[schedule.enrollment.course.org], org_list=[schedule.enrollment.course.org],
) )
...@@ -155,18 +157,21 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -155,18 +157,21 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(resolvers.UpgradeReminderResolver, 'async_send_task') @patch.object(tasks.ScheduleUpgradeReminder, 'apply_async')
def test_enqueue_disabled(self, mock_schedule_bin, mock_ace): def test_enqueue_disabled(self, mock_ace, mock_apply_async):
ScheduleConfigFactory.create(site=self.site_config.site, enqueue_upgrade_reminder=False) ScheduleConfigFactory.create(site=self.site_config.site, enqueue_upgrade_reminder=False)
current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
reminder.UpgradeReminderResolver(self.site_config.site, current_day).send(3) tasks.ScheduleUpgradeReminder.enqueue(
self.assertFalse(mock_schedule_bin.called) self.site_config.site,
self.assertFalse(mock_schedule_bin.apply_async.called) current_day,
day_offset=3,
)
self.assertFalse(mock_apply_async.called)
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(tasks, '_upgrade_reminder_schedule_send') @patch.object(tasks.ScheduleUpgradeReminder, 'async_send_task')
@ddt.data( @ddt.data(
((['filtered_org'], False, 1)), ((['filtered_org'], False, 1)),
((['filtered_org'], True, 2)) ((['filtered_org'], True, 2))
...@@ -183,8 +188,8 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -183,8 +188,8 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
for config in (limited_config, unlimited_config): for config in (limited_config, unlimited_config):
ScheduleConfigFactory.create(site=config.site) ScheduleConfigFactory.create(site=config.site)
user1 = UserFactory.create(id=tasks.UPGRADE_REMINDER_NUM_BINS) user1 = UserFactory.create(id=resolvers.UPGRADE_REMINDER_NUM_BINS)
user2 = UserFactory.create(id=tasks.UPGRADE_REMINDER_NUM_BINS * 2) user2 = UserFactory.create(id=resolvers.UPGRADE_REMINDER_NUM_BINS * 2)
ScheduleFactory.create( ScheduleFactory.create(
upgrade_deadline=datetime.datetime(2017, 8, 3, 17, 44, 30, tzinfo=pytz.UTC), upgrade_deadline=datetime.datetime(2017, 8, 3, 17, 44, 30, tzinfo=pytz.UTC),
...@@ -205,7 +210,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -205,7 +210,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
test_datetime = datetime.datetime(2017, 8, 3, 17, tzinfo=pytz.UTC) test_datetime = datetime.datetime(2017, 8, 3, 17, tzinfo=pytz.UTC)
test_datetime_str = serialize(test_datetime) test_datetime_str = serialize(test_datetime)
with self.assertNumQueries(NUM_QUERIES_WITH_MATCHES, table_blacklist=WAFFLE_TABLES): with self.assertNumQueries(NUM_QUERIES_WITH_MATCHES, table_blacklist=WAFFLE_TABLES):
tasks.upgrade_reminder_schedule_bin( tasks.ScheduleUpgradeReminder.delay(
limited_config.site.id, target_day_str=test_datetime_str, day_offset=2, bin_num=0, limited_config.site.id, target_day_str=test_datetime_str, day_offset=2, bin_num=0,
org_list=org_list, exclude_orgs=exclude_orgs, org_list=org_list, exclude_orgs=exclude_orgs,
) )
...@@ -214,7 +219,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -214,7 +219,7 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
self.assertFalse(mock_ace.send.called) self.assertFalse(mock_ace.send.called)
@patch.object(tasks, 'ace') @patch.object(tasks, 'ace')
@patch.object(tasks, '_upgrade_reminder_schedule_send') @patch.object(tasks.ScheduleUpgradeReminder, 'async_send_task')
def test_multiple_enrollments(self, mock_schedule_send, mock_ace): def test_multiple_enrollments(self, mock_schedule_send, mock_ace):
user = UserFactory.create() user = UserFactory.create()
schedules = [ schedules = [
...@@ -229,9 +234,9 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -229,9 +234,9 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
test_datetime = datetime.datetime(2017, 8, 3, 19, 44, 30, tzinfo=pytz.UTC) test_datetime = datetime.datetime(2017, 8, 3, 19, 44, 30, tzinfo=pytz.UTC)
test_datetime_str = serialize(test_datetime) test_datetime_str = serialize(test_datetime)
with self.assertNumQueries(NUM_QUERIES_WITH_MATCHES, table_blacklist=WAFFLE_TABLES): with self.assertNumQueries(NUM_QUERIES_WITH_MATCHES, table_blacklist=WAFFLE_TABLES):
tasks.upgrade_reminder_schedule_bin( tasks.ScheduleUpgradeReminder.delay(
self.site_config.site.id, target_day_str=test_datetime_str, day_offset=2, self.site_config.site.id, target_day_str=test_datetime_str, day_offset=2,
bin_num=user.id % tasks.UPGRADE_REMINDER_NUM_BINS, bin_num=user.id % resolvers.UPGRADE_REMINDER_NUM_BINS,
org_list=[schedules[0].enrollment.course.org], org_list=[schedules[0].enrollment.course.org],
) )
self.assertEqual(mock_schedule_send.apply_async.call_count, 1) self.assertEqual(mock_schedule_send.apply_async.call_count, 1)
...@@ -277,14 +282,14 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -277,14 +282,14 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
sent_messages = [] sent_messages = []
with self.settings(TEMPLATES=self._get_template_overrides()): with self.settings(TEMPLATES=self._get_template_overrides()):
with patch.object(tasks, '_upgrade_reminder_schedule_send') as mock_schedule_send: with patch.object(tasks.ScheduleUpgradeReminder, 'async_send_task') as mock_schedule_send:
mock_schedule_send.apply_async = lambda args, *_a, **_kw: sent_messages.append(args) mock_schedule_send.apply_async = lambda args, *_a, **_kw: sent_messages.append(args)
# we execute one query per course to see if it's opted out of dynamic upgrade deadlines, however, # we execute one query per course to see if it's opted out of dynamic upgrade deadlines, however,
# since we create a new course for each schedule in this test, we expect there to be one per message # since we create a new course for each schedule in this test, we expect there to be one per message
num_expected_queries = NUM_QUERIES_WITH_MATCHES + NUM_QUERIES_WITH_DEADLINE num_expected_queries = NUM_QUERIES_WITH_MATCHES + NUM_QUERIES_WITH_DEADLINE
with self.assertNumQueries(num_expected_queries, table_blacklist=WAFFLE_TABLES): with self.assertNumQueries(num_expected_queries, table_blacklist=WAFFLE_TABLES):
tasks.upgrade_reminder_schedule_bin( tasks.ScheduleUpgradeReminder.delay(
self.site_config.site.id, target_day_str=test_datetime_str, day_offset=day, self.site_config.site.id, target_day_str=test_datetime_str, day_offset=day,
bin_num=self._calculate_bin_for_user(user), bin_num=self._calculate_bin_for_user(user),
org_list=[schedules[0].enrollment.course.org], org_list=[schedules[0].enrollment.course.org],
...@@ -309,4 +314,4 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): ...@@ -309,4 +314,4 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
return templates_override return templates_override
def _calculate_bin_for_user(self, user): def _calculate_bin_for_user(self, user):
return user.id % tasks.RECURRING_NUDGE_NUM_BINS return user.id % resolvers.RECURRING_NUDGE_NUM_BINS
...@@ -9,3 +9,17 @@ class ScheduleMessageType(MessageType): ...@@ -9,3 +9,17 @@ class ScheduleMessageType(MessageType):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(ScheduleMessageType, self).__init__(*args, **kwargs) super(ScheduleMessageType, self).__init__(*args, **kwargs)
self.log_level = logging.DEBUG if DEBUG_MESSAGE_WAFFLE_FLAG.is_enabled() else None self.log_level = logging.DEBUG if DEBUG_MESSAGE_WAFFLE_FLAG.is_enabled() else None
class RecurringNudge(ScheduleMessageType):
def __init__(self, day, *args, **kwargs):
super(RecurringNudge, self).__init__(*args, **kwargs)
self.name = "recurringnudge_day{}".format(day)
class UpgradeReminder(ScheduleMessageType):
pass
class CourseUpdate(ScheduleMessageType):
pass
import datetime import datetime
from itertools import groupby
import logging
import attr
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.staticfiles.templatetags.staticfiles import static
from django.core.urlresolvers import reverse
from django.db.models import F, Q
from django.utils.formats import dateformat, get_format
from edx_ace.recipient_resolver import RecipientResolver from edx_ace.recipient_resolver import RecipientResolver
from edx_ace.utils.date import serialize from edx_ace.recipient import Recipient
from openedx.core.djangoapps.schedules.models import ScheduleConfig from courseware.date_summary import verified_upgrade_deadline_link, verified_upgrade_link_is_valid
from openedx.core.djangoapps.schedules.tasks import ( from openedx.core.djangoapps.monitoring_utils import function_trace, set_custom_metric
DEFAULT_NUM_BINS, from openedx.core.djangoapps.schedules.config import COURSE_UPDATE_WAFFLE_FLAG
RECURRING_NUDGE_NUM_BINS, from openedx.core.djangoapps.schedules.exceptions import CourseUpdateDoesNotExist
UPGRADE_REMINDER_NUM_BINS, from openedx.core.djangoapps.schedules.models import Schedule
COURSE_UPDATE_NUM_BINS,
recurring_nudge_schedule_bin,
upgrade_reminder_schedule_bin,
course_update_schedule_bin,
)
from openedx.core.djangoapps.schedules.utils import PrefixedDebugLoggerMixin from openedx.core.djangoapps.schedules.utils import PrefixedDebugLoggerMixin
from openedx.core.djangoapps.site_configuration.models import SiteConfiguration from openedx.core.djangoapps.schedules.template_context import (
absolute_url,
get_base_template_context
)
from request_cache.middleware import request_cached
from xmodule.modulestore.django import modulestore
LOG = logging.getLogger(__name__)
DEFAULT_NUM_BINS = 24
RECURRING_NUDGE_NUM_BINS = DEFAULT_NUM_BINS
UPGRADE_REMINDER_NUM_BINS = DEFAULT_NUM_BINS
COURSE_UPDATE_NUM_BINS = DEFAULT_NUM_BINS
@attr.s
class BinnedSchedulesBaseResolver(PrefixedDebugLoggerMixin, RecipientResolver): class BinnedSchedulesBaseResolver(PrefixedDebugLoggerMixin, RecipientResolver):
""" """
Starts num_bins number of async tasks, each of which sends emails to an equal group of learners. Identifies learners to send messages to, pulls all needed context and sends a message to each learner.
Note that for performance reasons, it actually enqueues a task to send the message instead of sending the message
directly.
Arguments: Arguments:
async_send_task -- celery task function that sends the message
site -- Site object that filtered Schedules will be a part of site -- Site object that filtered Schedules will be a part of
current_date -- datetime that will be used (with time zeroed-out) as the current date in the queries target_datetime -- datetime that the User's Schedule's schedule_date_field value should fall under
day_offset -- int number of days relative to the Schedule's schedule_date_field that we are targeting
bin_num -- int for selecting the bin of Users whose id % num_bins == bin_num
org_list -- list of course_org names (strings) that the returned Schedules must or must not be in
(default: None)
exclude_orgs -- boolean indicating whether the returned Schedules should exclude (True) the course_orgs in
org_list or strictly include (False) them (default: False)
override_recipient_email -- string email address that should receive all emails instead of the normal
recipient. (default: None)
Static attributes: Static attributes:
async_send_task -- celery task function which this resolver will call out to schedule_date_field -- the name of the model field that represents the date that offsets should be computed
relative to. For example, if this resolver finds schedules that started 7 days ago
this variable should be set to "start".
num_bins -- the int number of bins to split the users into num_bins -- the int number of bins to split the users into
enqueue_config_var -- the string field name of the config variable on ScheduleConfig to check before enqueuing
""" """
async_send_task = None # define in subclass async_send_task = attr.ib()
site = attr.ib()
target_datetime = attr.ib()
day_offset = attr.ib()
bin_num = attr.ib()
org_list = attr.ib()
exclude_orgs = attr.ib(default=False)
override_recipient_email = attr.ib(default=None)
schedule_date_field = None
num_bins = DEFAULT_NUM_BINS num_bins = DEFAULT_NUM_BINS
enqueue_config_var = None # define in subclass
def __init__(self, site, current_date, *args, **kwargs): def __attrs_post_init__(self):
super(BinnedSchedulesBaseResolver, self).__init__(*args, **kwargs) # TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
self.site = site self.current_datetime = self.target_datetime - datetime.timedelta(days=self.day_offset)
self.current_date = current_date.replace(hour=0, minute=0, second=0)
def send(self, day_offset, override_recipient_email=None): def send(self, msg_type):
if not self.is_enqueue_enabled(): for (user, language, context) in self.schedules_for_bin():
self.log_debug('Message queuing disabled for site %s', self.site.domain) msg = msg_type.personalize(
return Recipient(
user.username,
self.override_recipient_email or user.email,
),
language,
context,
)
with function_trace('enqueue_send_task'):
self.async_send_task.apply_async((self.site.id, str(msg)), retry=False)
exclude_orgs, org_list = self.get_course_org_filter() def get_schedules_with_target_date_by_bin_and_orgs(
self, order_by='enrollment__user__id'
):
"""
Returns Schedules with the target_date, related to Users whose id matches the bin_num, and filtered by org_list.
target_date = self.current_date + datetime.timedelta(days=day_offset) Arguments:
self.log_debug('Target date = %s', target_date.isoformat()) order_by -- string for field to sort the resulting Schedules by
for bin in range(self.num_bins): """
task_args = ( target_day = _get_datetime_beginning_of_day(self.target_datetime)
self.site.id, serialize(target_date), day_offset, bin, org_list, exclude_orgs, override_recipient_email, schedule_day_equals_target_day_filter = {
) 'courseenrollment__schedule__{}__gte'.format(self.schedule_date_field): target_day,
self.log_debug('Launching task with args = %r', task_args) 'courseenrollment__schedule__{}__lt'.format(self.schedule_date_field): target_day + datetime.timedelta(days=1),
self.async_send_task.apply_async( }
task_args, users = User.objects.filter(
retry=False, courseenrollment__is_active=True,
**schedule_day_equals_target_day_filter
).annotate(
id_mod=F('id') % self.num_bins
).filter(
id_mod=self.bin_num
) )
def is_enqueue_enabled(self): schedule_day_equals_target_day_filter = {
if self.enqueue_config_var: '{}__gte'.format(self.schedule_date_field): target_day,
return getattr(ScheduleConfig.current(self.site), self.enqueue_config_var) '{}__lt'.format(self.schedule_date_field): target_day + datetime.timedelta(days=1),
return False }
schedules = Schedule.objects.select_related(
'enrollment__user__profile',
'enrollment__course',
).prefetch_related(
'enrollment__course__modes'
).filter(
Q(enrollment__course__end__isnull=True) | Q(
enrollment__course__end__gte=self.current_datetime),
enrollment__user__in=users,
enrollment__is_active=True,
**schedule_day_equals_target_day_filter
).order_by(order_by)
if self.org_list is not None:
if self.exclude_orgs:
schedules = schedules.exclude(enrollment__course__org__in=self.org_list)
else:
schedules = schedules.filter(enrollment__course__org__in=self.org_list)
if "read_replica" in settings.DATABASES:
schedules = schedules.using("read_replica")
LOG.debug('Query = %r', schedules.query.sql_with_params())
with function_trace('schedule_query_set_evaluation'):
# This will run the query and cache all of the results in memory.
num_schedules = len(schedules)
# This should give us a sense of the volume of data being processed by each task.
set_custom_metric('num_schedules', num_schedules)
return schedules
def schedules_for_bin(self):
schedules = self.get_schedules_with_target_date_by_bin_and_orgs()
template_context = get_base_template_context(self.site)
for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user):
user_schedules = list(user_schedules)
course_id_strs = [str(schedule.enrollment.course_id) for schedule in user_schedules]
# This is used by the bulk email optout policy
template_context['course_ids'] = course_id_strs
first_schedule = user_schedules[0]
template_context.update(self.get_template_context(user, user_schedules))
# Information for including upsell messaging in template.
_add_upsell_button_information_to_template_context(
user, first_schedule, template_context)
def get_course_org_filter(self): yield (user, first_schedule.enrollment.course.language, template_context)
def get_template_context(self, user, user_schedules):
""" """
Given the configuration of sites, get the list of orgs that should be included or excluded from this send. Given a user and their schedules, build the context needed to render the template for this message.
Arguments:
user -- the User who will be receiving the message
user_schedules -- a list of Schedule objects representing all of their schedules that should be covered by
this message. For example, when a user enrolls in multiple courses on the same day, we
don't want to send them multiple reminder emails. Instead this list would have multiple
elements, allowing us to send a single message for all of the courses.
Returns: Returns:
tuple: Returns a tuple (exclude_orgs, org_list). If exclude_orgs is True, then org_list is a list of the dict: This dict must be JSON serializable (no datetime objects!). When rendering the message templates it
only orgs that should be included in this send. If exclude_orgs is False, then org_list is a list of it will be used as the template context. Note that it will also include several default values that
orgs that should be excluded from this send. All other orgs should be included. injected into all template contexts. See `get_base_template_context` for more information.
""" """
try: return {}
site_config = SiteConfiguration.objects.get(site_id=self.site.id)
org_list = site_config.get_value('course_org_filter')
exclude_orgs = False
if not org_list:
not_orgs = set()
for other_site_config in SiteConfiguration.objects.all():
other = other_site_config.get_value('course_org_filter')
if not isinstance(other, list):
if other is not None:
not_orgs.add(other)
else:
not_orgs.update(other)
org_list = list(not_orgs)
exclude_orgs = True
elif not isinstance(org_list, list):
org_list = [org_list]
except SiteConfiguration.DoesNotExist:
org_list = None
exclude_orgs = False
finally:
return exclude_orgs, org_list
class ScheduleStartResolver(BinnedSchedulesBaseResolver): class ScheduleStartResolver(BinnedSchedulesBaseResolver):
""" """
Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset``. Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset``.
""" """
async_send_task = recurring_nudge_schedule_bin log_prefix = 'Scheduled Nudge'
schedule_date_field = 'start'
num_bins = RECURRING_NUDGE_NUM_BINS num_bins = RECURRING_NUDGE_NUM_BINS
enqueue_config_var = 'enqueue_recurring_nudge'
def __init__(self, *args, **kwargs): def get_template_context(self, user, user_schedules):
super(ScheduleStartResolver, self).__init__(*args, **kwargs) first_schedule = user_schedules[0]
self.log_prefix = 'Scheduled Nudge' return {
'course_name': first_schedule.enrollment.course.display_name,
'course_url': absolute_url(
self.site, reverse('course_root', args=[str(first_schedule.enrollment.course_id)])
),
}
def _get_datetime_beginning_of_day(dt):
"""
Truncates hours, minutes, seconds, and microseconds to zero on given datetime.
"""
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
class UpgradeReminderResolver(BinnedSchedulesBaseResolver): class UpgradeReminderResolver(BinnedSchedulesBaseResolver):
""" """
Send a message to all users whose verified upgrade deadline is at ``self.current_date`` + ``day_offset``. Send a message to all users whose verified upgrade deadline is at ``self.current_date`` + ``day_offset``.
""" """
async_send_task = upgrade_reminder_schedule_bin log_prefix = 'Upgrade Reminder'
schedule_date_field = 'upgrade_deadline'
num_bins = UPGRADE_REMINDER_NUM_BINS num_bins = UPGRADE_REMINDER_NUM_BINS
enqueue_config_var = 'enqueue_upgrade_reminder'
def __init__(self, *args, **kwargs): def get_template_context(self, user, user_schedules):
super(UpgradeReminderResolver, self).__init__(*args, **kwargs) first_schedule = user_schedules[0]
self.log_prefix = 'Upgrade Reminder' return {
'course_links': [
{
'url': absolute_url(self.site, reverse('course_root', args=[str(s.enrollment.course_id)])),
'name': s.enrollment.course.display_name
} for s in user_schedules
],
'first_course_name': first_schedule.enrollment.course.display_name,
'cert_image': absolute_url(self.site, static('course_experience/images/verified-cert.png')),
}
def _add_upsell_button_information_to_template_context(user, schedule, template_context):
enrollment = schedule.enrollment
course = enrollment.course
verified_upgrade_link = _get_verified_upgrade_link(user, schedule)
has_verified_upgrade_link = verified_upgrade_link is not None
if has_verified_upgrade_link:
template_context['upsell_link'] = verified_upgrade_link
template_context['user_schedule_upgrade_deadline_time'] = dateformat.format(
enrollment.dynamic_upgrade_deadline,
get_format(
'DATE_FORMAT',
lang=course.language,
use_l10n=True
)
)
template_context['show_upsell'] = has_verified_upgrade_link
def _get_verified_upgrade_link(user, schedule):
enrollment = schedule.enrollment
if enrollment.dynamic_upgrade_deadline is not None and verified_upgrade_link_is_valid(enrollment):
return verified_upgrade_deadline_link(user, enrollment.course)
class CourseUpdateResolver(BinnedSchedulesBaseResolver): class CourseUpdateResolver(BinnedSchedulesBaseResolver):
...@@ -127,10 +271,47 @@ class CourseUpdateResolver(BinnedSchedulesBaseResolver): ...@@ -127,10 +271,47 @@ class CourseUpdateResolver(BinnedSchedulesBaseResolver):
Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset`` and the Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset`` and the
course has updates. course has updates.
""" """
async_send_task = course_update_schedule_bin log_prefix = 'Course Update'
schedule_date_field = 'start'
num_bins = COURSE_UPDATE_NUM_BINS num_bins = COURSE_UPDATE_NUM_BINS
enqueue_config_var = 'enqueue_course_update'
def __init__(self, *args, **kwargs): def schedules_for_bin(self):
super(CourseUpdateResolver, self).__init__(*args, **kwargs) week_num = abs(self.day_offset) / 7
self.log_prefix = 'Course Update' schedules = self.get_schedules_with_target_date_by_bin_and_orgs(
order_by='enrollment__course',
)
template_context = get_base_template_context(self.site)
for schedule in schedules:
enrollment = schedule.enrollment
try:
week_summary = get_course_week_summary(enrollment.course_id, week_num)
except CourseUpdateDoesNotExist:
continue
user = enrollment.user
course_id_str = str(enrollment.course_id)
template_context.update({
'student_name': user.profile.name,
'course_name': schedule.enrollment.course.display_name,
'course_url': absolute_url(
self.site, reverse('course_root', args=[str(schedule.enrollment.course_id)])
),
'week_num': week_num,
'week_summary': week_summary,
# This is used by the bulk email optout policy
'course_ids': [course_id_str],
})
yield (user, schedule.enrollment.course.language, template_context)
@request_cached
def get_course_week_summary(course_id, week_num):
if COURSE_UPDATE_WAFFLE_FLAG.is_enabled(course_id):
course = modulestore().get_course(course_id)
return course.week_summary(week_num)
else:
raise CourseUpdateDoesNotExist()
import datetime import datetime
from itertools import groupby
import logging import logging
from celery.task import task from celery.task import task, Task
from django.conf import settings from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.sites.models import Site from django.contrib.sites.models import Site
from django.contrib.staticfiles.templatetags.staticfiles import static
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
from django.db.models import F, Q
from django.db.utils import DatabaseError from django.db.utils import DatabaseError
from django.utils.formats import dateformat, get_format
from edx_ace import ace from edx_ace import ace
from edx_ace.message import Message from edx_ace.message import Message
from edx_ace.recipient import Recipient from edx_ace.utils.date import deserialize, serialize
from edx_ace.utils.date import deserialize
from opaque_keys.edx.keys import CourseKey from opaque_keys.edx.keys import CourseKey
from courseware.date_summary import verified_upgrade_deadline_link, verified_upgrade_link_is_valid from openedx.core.djangoapps.monitoring_utils import set_custom_metric
from openedx.core.djangoapps.monitoring_utils import set_custom_metric, function_trace from openedx.core.djangoapps.schedules import message_types
from request_cache.middleware import request_cached
from xmodule.modulestore.django import modulestore
from openedx.core.djangoapps.schedules.config import COURSE_UPDATE_WAFFLE_FLAG
from openedx.core.djangoapps.schedules.exceptions import CourseUpdateDoesNotExist
from openedx.core.djangoapps.schedules.message_type import ScheduleMessageType
from openedx.core.djangoapps.schedules.models import Schedule, ScheduleConfig from openedx.core.djangoapps.schedules.models import Schedule, ScheduleConfig
from openedx.core.djangoapps.schedules.template_context import absolute_url, get_base_template_context from openedx.core.djangoapps.schedules import resolvers
from openedx.core.djangoapps.site_configuration.models import SiteConfiguration
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
...@@ -38,10 +28,11 @@ KNOWN_RETRY_ERRORS = ( # Errors we expect occasionally that could resolve on re ...@@ -38,10 +28,11 @@ KNOWN_RETRY_ERRORS = ( # Errors we expect occasionally that could resolve on re
DatabaseError, DatabaseError,
ValidationError, ValidationError,
) )
DEFAULT_NUM_BINS = 24
RECURRING_NUDGE_NUM_BINS = DEFAULT_NUM_BINS
UPGRADE_REMINDER_NUM_BINS = DEFAULT_NUM_BINS RECURRING_NUDGE_LOG_PREFIX = 'Recurring Nudge'
COURSE_UPDATE_NUM_BINS = DEFAULT_NUM_BINS UPGRADE_REMINDER_LOG_PREFIX = 'Upgrade Reminder'
COURSE_UPDATE_LOG_PREFIX = 'Course Update'
@task(bind=True, default_retry_delay=30, routing_key=ROUTING_KEY) @task(bind=True, default_retry_delay=30, routing_key=ROUTING_KEY)
...@@ -61,392 +52,204 @@ def update_course_schedules(self, **kwargs): ...@@ -61,392 +52,204 @@ def update_course_schedules(self, **kwargs):
raise self.retry(kwargs=kwargs, exc=exc) raise self.retry(kwargs=kwargs, exc=exc)
class RecurringNudge(ScheduleMessageType): class ScheduleMessageBaseTask(Task):
def __init__(self, day, *args, **kwargs): ignore_result = True
super(RecurringNudge, self).__init__(*args, **kwargs) routing_key = ROUTING_KEY
self.name = "recurringnudge_day{}".format(day) num_bins = resolvers.DEFAULT_NUM_BINS
enqueue_config_var = None # define in subclass
log_prefix = None
@task(ignore_result=True, routing_key=ROUTING_KEY) resolver = None # define in subclass
def _recurring_nudge_schedule_send(site_id, msg_str): async_send_task = None # define in subclass
site = Site.objects.get(pk=site_id)
if not ScheduleConfig.current(site).deliver_recurring_nudge:
LOG.debug('Recurring Nudge: Message delivery disabled for site %s', site.domain)
return
msg = Message.from_string(msg_str) @classmethod
# A unique identifier for this batch of messages being sent. def log_debug(cls, message, *args, **kwargs):
set_custom_metric('send_uuid', msg.send_uuid) LOG.debug(cls.log_prefix + ': ' + message, *args, **kwargs)
# A unique identifier for this particular message.
set_custom_metric('uuid', msg.uuid)
LOG.debug('Recurring Nudge: Sending message = %s', msg_str)
ace.send(msg)
@classmethod
def enqueue(cls, site, current_date, day_offset, override_recipient_email=None):
current_date = resolvers._get_datetime_beginning_of_day(current_date)
@task(ignore_result=True, routing_key=ROUTING_KEY) if not cls.is_enqueue_enabled(site):
def recurring_nudge_schedule_bin( cls.log_debug('Message queuing disabled for site %s', site.domain)
site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, return
):
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = RecurringNudge(abs(day_offset))
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset) exclude_orgs, org_list = cls.get_course_org_filter(site)
for (user, language, context) in _recurring_nudge_schedules_for_bin( target_date = current_date + datetime.timedelta(days=day_offset)
site, cls.log_debug('Target date = %s', target_date.isoformat())
current_datetime, for bin in range(cls.num_bins):
target_datetime, task_args = (
bin_num, site.id,
serialize(target_date),
day_offset,
bin,
org_list, org_list,
exclude_orgs exclude_orgs,
): override_recipient_email,
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
) )
with function_trace('enqueue_send_task'): cls.log_debug('Launching task with args = %r', task_args)
_recurring_nudge_schedule_send.apply_async((site_id, str(msg)), retry=False) cls.apply_async(
task_args,
retry=False,
def _annotate_for_monitoring(message_type, site, bin_num, target_day_str, day_offset):
# This identifies the type of message being sent, for example: schedules.recurring_nudge3.
set_custom_metric('message_name', '{0}.{1}'.format(message_type.app_label, message_type.name))
# The domain name of the site we are sending the message for.
set_custom_metric('site', site.domain)
# This is the "bin" of data being processed. We divide up the work into chunks so that we don't tie up celery
# workers for too long. This could help us identify particular bins that are problematic.
set_custom_metric('bin', bin_num)
# The date we are processing data for.
set_custom_metric('target_day', target_day_str)
# The number of days relative to the current date to process data for.
set_custom_metric('day_offset', day_offset)
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', message_type.uuid)
def _recurring_nudge_schedules_for_bin(site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False):
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='start',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=RECURRING_NUDGE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
) )
for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user): @classmethod
user_schedules = list(user_schedules) def is_enqueue_enabled(cls, site):
course_id_strs = [str(schedule.enrollment.course_id) for schedule in user_schedules] if cls.enqueue_config_var:
return getattr(ScheduleConfig.current(site), cls.enqueue_config_var)
first_schedule = user_schedules[0] return False
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
'course_name': first_schedule.enrollment.course.display_name,
'course_url': absolute_url(site, reverse('course_root', args=[str(first_schedule.enrollment.course_id)])),
# This is used by the bulk email optout policy
'course_ids': course_id_strs,
})
# Information for including upsell messaging in template.
_add_upsell_button_information_to_template_context(user, first_schedule, template_context)
yield (user, first_schedule.enrollment.course.language, template_context)
class UpgradeReminder(ScheduleMessageType):
pass
@classmethod
def get_course_org_filter(cls, site):
"""
Given the configuration of sites, get the list of orgs that should be included or excluded from this send.
@task(ignore_result=True, routing_key=ROUTING_KEY) Returns:
def upgrade_reminder_schedule_bin( tuple: Returns a tuple (exclude_orgs, org_list). If exclude_orgs is True, then org_list is a list of the
site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, only orgs that should be included in this send. If exclude_orgs is False, then org_list is a list of
): orgs that should be excluded from this send. All other orgs should be included.
target_datetime = deserialize(target_day_str) """
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here try:
current_datetime = target_datetime - datetime.timedelta(days=day_offset) site_config = SiteConfiguration.objects.get(site_id=site.id)
msg_type = UpgradeReminder() org_list = site_config.get_value('course_org_filter')
exclude_orgs = False
if not org_list:
not_orgs = set()
for other_site_config in SiteConfiguration.objects.all():
other = other_site_config.get_value('course_org_filter')
if not isinstance(other, list):
if other is not None:
not_orgs.add(other)
else:
not_orgs.update(other)
org_list = list(not_orgs)
exclude_orgs = True
elif not isinstance(org_list, list):
org_list = [org_list]
except SiteConfiguration.DoesNotExist:
org_list = None
exclude_orgs = False
return exclude_orgs, org_list
def run(
self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
msg_type = self.make_message_type(day_offset)
site = Site.objects.get(id=site_id) site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset) _annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
return self.resolver(
for (user, language, context) in _upgrade_reminder_schedules_for_bin( self.async_send_task,
site, site,
current_datetime, deserialize(target_day_str),
target_datetime, day_offset,
bin_num, bin_num,
org_list, org_list,
exclude_orgs
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
)
with function_trace('enqueue_send_task'):
_upgrade_reminder_schedule_send.apply_async((site_id, str(msg)), retry=False)
@task(ignore_result=True, routing_key=ROUTING_KEY)
def _upgrade_reminder_schedule_send(site_id, msg_str):
site = Site.objects.get(pk=site_id)
if not ScheduleConfig.current(site).deliver_upgrade_reminder:
return
msg = Message.from_string(msg_str)
ace.send(msg)
def _upgrade_reminder_schedules_for_bin(site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False):
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='upgrade_deadline',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=RECURRING_NUDGE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs, exclude_orgs=exclude_orgs,
) override_recipient_email=override_recipient_email,
).send(msg_type)
for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user):
user_schedules = list(user_schedules)
course_id_strs = [str(schedule.enrollment.course_id) for schedule in user_schedules]
first_schedule = user_schedules[0]
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
'user_personal_address': user.profile.name if user.profile.name else user.username,
'course_links': [
{
'url': absolute_url(site, reverse('course_root', args=[str(s.enrollment.course_id)])),
'name': s.enrollment.course.display_name
} for s in user_schedules
],
'first_course_name': first_schedule.enrollment.course.display_name, def make_message_type(self, day_offset):
raise NotImplementedError
# This is used by the bulk email optout policy
'course_ids': course_id_strs,
'cert_image': absolute_url(site, static('course_experience/images/verified-cert.png')),
})
_add_upsell_button_information_to_template_context(user, first_schedule, template_context)
yield (user, first_schedule.enrollment.course.language, template_context)
class CourseUpdate(ScheduleMessageType):
pass
@task(ignore_result=True, routing_key=ROUTING_KEY) @task(ignore_result=True, routing_key=ROUTING_KEY)
def course_update_schedule_bin( def _recurring_nudge_schedule_send(site_id, msg_str):
site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, _schedule_send(
): msg_str,
target_datetime = deserialize(target_day_str) site_id,
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here 'deliver_recurring_nudge',
current_datetime = target_datetime - datetime.timedelta(days=day_offset) RECURRING_NUDGE_LOG_PREFIX,
msg_type = CourseUpdate() )
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
for (user, language, context) in _course_update_schedules_for_bin( @task(ignore_result=True, routing_key=ROUTING_KEY)
site, def _upgrade_reminder_schedule_send(site_id, msg_str):
current_datetime, _schedule_send(
target_datetime, msg_str,
day_offset, site_id,
bin_num, 'deliver_upgrade_reminder',
org_list, UPGRADE_REMINDER_LOG_PREFIX,
exclude_orgs
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
) )
with function_trace('enqueue_send_task'):
_course_update_schedule_send.apply_async((site_id, str(msg)), retry=False)
@task(ignore_result=True, routing_key=ROUTING_KEY) @task(ignore_result=True, routing_key=ROUTING_KEY)
def _course_update_schedule_send(site_id, msg_str): def _course_update_schedule_send(site_id, msg_str):
site = Site.objects.get(pk=site_id) _schedule_send(
if not ScheduleConfig.current(site).deliver_course_update: msg_str,
return site_id,
'deliver_course_update',
msg = Message.from_string(msg_str) COURSE_UPDATE_LOG_PREFIX,
ace.send(msg)
def _course_update_schedules_for_bin(site, current_datetime, target_datetime, day_offset, bin_num, org_list,
exclude_orgs=False):
week_num = abs(day_offset) / 7
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='start',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=COURSE_UPDATE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
order_by='enrollment__course',
) )
for schedule in schedules:
enrollment = schedule.enrollment
try:
week_summary = get_course_week_summary(enrollment.course_id, week_num)
except CourseUpdateDoesNotExist:
continue
user = enrollment.user
course_id_str = str(enrollment.course_id)
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
'user_personal_address': user.profile.name if user.profile.name else user.username,
'course_name': schedule.enrollment.course.display_name,
'course_url': absolute_url(site, reverse('course_root', args=[str(schedule.enrollment.course_id)])),
'week_num': week_num,
'week_summary': week_summary,
# This is used by the bulk email optout policy
'course_ids': [course_id_str],
})
yield (user, schedule.enrollment.course.language, template_context)
@request_cached
def get_course_week_summary(course_id, week_num):
if COURSE_UPDATE_WAFFLE_FLAG.is_enabled(course_id):
course = modulestore().get_course(course_id)
return course.week_summary(week_num)
else:
raise CourseUpdateDoesNotExist()
class ScheduleRecurringNudge(ScheduleMessageBaseTask):
num_bins = resolvers.RECURRING_NUDGE_NUM_BINS
enqueue_config_var = 'enqueue_recurring_nudge'
log_prefix = RECURRING_NUDGE_LOG_PREFIX
resolver = resolvers.ScheduleStartResolver
async_send_task = _recurring_nudge_schedule_send
def get_schedules_with_target_date_by_bin_and_orgs(schedule_date_field, current_datetime, target_datetime, bin_num, def make_message_type(self, day_offset):
num_bins=DEFAULT_NUM_BINS, org_list=None, exclude_orgs=False, return message_types.RecurringNudge(abs(day_offset))
order_by='enrollment__user__id'):
"""
Returns Schedules with the target_date, related to Users whose id matches the bin_num, and filtered by org_list.
Arguments:
schedule_date_field -- string field name to query on the User's Schedule model
current_datetime -- datetime that will be used as "right now" in the query
target_datetime -- datetime that the User's Schedule's schedule_date_field value should fall under
bin_num -- int for selecting the bin of Users whose id % num_bins == bin_num
num_bin -- int specifying the number of bins to separate the Users into (default: DEFAULT_NUM_BINS)
org_list -- list of course_org names (strings) that the returned Schedules must or must not be in (default: None)
exclude_orgs -- boolean indicating whether the returned Schedules should exclude (True) the course_orgs in org_list
or strictly include (False) them (default: False)
order_by -- string for field to sort the resulting Schedules by
"""
target_day = _get_datetime_beginning_of_day(target_datetime)
schedule_day_equals_target_day_filter = {
'courseenrollment__schedule__{}__gte'.format(schedule_date_field): target_day,
'courseenrollment__schedule__{}__lt'.format(schedule_date_field): target_day + datetime.timedelta(days=1),
}
users = User.objects.filter(
courseenrollment__is_active=True,
**schedule_day_equals_target_day_filter
).annotate(
id_mod=F('id') % num_bins
).filter(
id_mod=bin_num
)
schedule_day_equals_target_day_filter = {
'{}__gte'.format(schedule_date_field): target_day,
'{}__lt'.format(schedule_date_field): target_day + datetime.timedelta(days=1),
}
schedules = Schedule.objects.select_related(
'enrollment__user__profile',
'enrollment__course',
).prefetch_related(
'enrollment__course__modes'
).filter(
Q(enrollment__course__end__isnull=True) | Q(enrollment__course__end__gte=current_datetime),
enrollment__user__in=users,
enrollment__is_active=True,
**schedule_day_equals_target_day_filter
).order_by(order_by)
if org_list is not None:
if exclude_orgs:
schedules = schedules.exclude(enrollment__course__org__in=org_list)
else:
schedules = schedules.filter(enrollment__course__org__in=org_list)
if "read_replica" in settings.DATABASES:
schedules = schedules.using("read_replica")
LOG.debug('Query = %r', schedules.query.sql_with_params()) class ScheduleUpgradeReminder(ScheduleMessageBaseTask):
num_bins = resolvers.UPGRADE_REMINDER_NUM_BINS
enqueue_config_var = 'enqueue_upgrade_reminder'
log_prefix = UPGRADE_REMINDER_LOG_PREFIX
resolver = resolvers.UpgradeReminderResolver
async_send_task = _upgrade_reminder_schedule_send
with function_trace('schedule_query_set_evaluation'): def make_message_type(self, day_offset):
# This will run the query and cache all of the results in memory. return message_types.UpgradeReminder()
num_schedules = len(schedules)
# This should give us a sense of the volume of data being processed by each task.
set_custom_metric('num_schedules', num_schedules)
return schedules class ScheduleCourseUpdate(ScheduleMessageBaseTask):
num_bins = resolvers.COURSE_UPDATE_NUM_BINS
enqueue_config_var = 'enqueue_course_update'
log_prefix = COURSE_UPDATE_LOG_PREFIX
resolver = resolvers.CourseUpdateResolver
async_send_task = _course_update_schedule_send
def make_message_type(self, day_offset):
return message_types.CourseUpdate()
def _add_upsell_button_information_to_template_context(user, schedule, template_context):
enrollment = schedule.enrollment
course = enrollment.course
verified_upgrade_link = _get_link_to_purchase_verified_certificate(user, schedule) def _schedule_send(msg_str, site_id, delivery_config_var, log_prefix):
has_verified_upgrade_link = verified_upgrade_link is not None if _is_delivery_enabled(site_id, delivery_config_var, log_prefix):
msg = Message.from_string(msg_str)
if has_verified_upgrade_link: _annonate_send_task_for_monitoring(msg)
template_context['upsell_link'] = verified_upgrade_link LOG.debug('%s: Sending message = %s', log_prefix, msg_str)
template_context['user_schedule_upgrade_deadline_time'] = dateformat.format( ace.send(msg)
enrollment.dynamic_upgrade_deadline,
get_format(
'DATE_FORMAT',
lang=course.language,
use_l10n=True
)
)
template_context['show_upsell'] = has_verified_upgrade_link
def _is_delivery_enabled(site_id, delivery_config_var, log_prefix):
site = Site.objects.get(pk=site_id)
if getattr(ScheduleConfig.current(site), delivery_config_var, False):
return True
else:
LOG.debug('%s: Message delivery disabled for site %s', log_prefix, site.domain)
def _get_link_to_purchase_verified_certificate(a_user, a_schedule):
enrollment = a_schedule.enrollment
if enrollment.dynamic_upgrade_deadline is None or not verified_upgrade_link_is_valid(enrollment):
return None
return verified_upgrade_deadline_link(a_user, enrollment.course) def _annotate_for_monitoring(message_type, site, bin_num, target_day_str, day_offset):
# This identifies the type of message being sent, for example: schedules.recurring_nudge3.
set_custom_metric('message_name', '{0}.{1}'.format(message_type.app_label, message_type.name))
# The domain name of the site we are sending the message for.
set_custom_metric('site', site.domain)
# This is the "bin" of data being processed. We divide up the work into chunks so that we don't tie up celery
# workers for too long. This could help us identify particular bins that are problematic.
set_custom_metric('bin', bin_num)
# The date we are processing data for.
set_custom_metric('target_day', target_day_str)
# The number of days relative to the current date to process data for.
set_custom_metric('day_offset', day_offset)
# A unique identifier for this batch of messages being sent.
set_custom_metric('send_uuid', message_type.uuid)
def _get_datetime_beginning_of_day(dt): def _annonate_send_task_for_monitoring(msg):
""" # A unique identifier for this batch of messages being sent.
Truncates hours, minutes, seconds, and microseconds to zero on given datetime. set_custom_metric('send_uuid', msg.send_uuid)
""" # A unique identifier for this particular message.
return dt.replace(hour=0, minute=0, second=0, microsecond=0) set_custom_metric('uuid', msg.uuid)
...@@ -3,10 +3,10 @@ from unittest import skipUnless ...@@ -3,10 +3,10 @@ from unittest import skipUnless
import ddt import ddt
from django.conf import settings from django.conf import settings
from mock import patch from mock import patch, DEFAULT, Mock
from openedx.core.djangoapps.schedules.resolvers import BinnedSchedulesBaseResolver from openedx.core.djangoapps.schedules.tasks import ScheduleMessageBaseTask
from openedx.core.djangoapps.schedules.tasks import DEFAULT_NUM_BINS from openedx.core.djangoapps.schedules.resolvers import DEFAULT_NUM_BINS
from openedx.core.djangoapps.schedules.tests.factories import ScheduleConfigFactory from openedx.core.djangoapps.schedules.tests.factories import ScheduleConfigFactory
from openedx.core.djangoapps.site_configuration.tests.factories import SiteConfigurationFactory, SiteFactory from openedx.core.djangoapps.site_configuration.tests.factories import SiteConfigurationFactory, SiteFactory
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_unless_lms from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_unless_lms
...@@ -16,68 +16,61 @@ from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_un ...@@ -16,68 +16,61 @@ from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_un
@skip_unless_lms @skip_unless_lms
@skipUnless('openedx.core.djangoapps.schedules.apps.SchedulesConfig' in settings.INSTALLED_APPS, @skipUnless('openedx.core.djangoapps.schedules.apps.SchedulesConfig' in settings.INSTALLED_APPS,
"Can't test schedules if the app isn't installed") "Can't test schedules if the app isn't installed")
class TestBinnedSchedulesBaseResolver(CacheIsolationTestCase): class TestScheduleMessageBaseTask(CacheIsolationTestCase):
def setUp(self): def setUp(self):
super(TestBinnedSchedulesBaseResolver, self).setUp() super(TestScheduleMessageBaseTask, self).setUp()
self.site = SiteFactory.create() self.site = SiteFactory.create()
self.site_config = SiteConfigurationFactory.create(site=self.site) self.site_config = SiteConfigurationFactory.create(site=self.site)
self.schedule_config = ScheduleConfigFactory.create(site=self.site) self.schedule_config = ScheduleConfigFactory.create(site=self.site)
self.basetask = ScheduleMessageBaseTask
def setup_resolver(self, site=None, current_date=None):
if site is None:
site = self.site
if current_date is None:
current_date = datetime.datetime.now()
resolver = BinnedSchedulesBaseResolver(self.site, current_date)
return resolver
def test_init_site(self):
resolver = self.setup_resolver()
assert resolver.site == self.site
def test_init_current_date(self):
current_time = datetime.datetime.now()
resolver = self.setup_resolver(current_date=current_time)
current_date = current_time.replace(hour=0, minute=0, second=0)
assert resolver.current_date == current_date
def test_init_async_send_task(self):
resolver = self.setup_resolver()
assert resolver.async_send_task is None
def test_init_num_bins(self):
resolver = self.setup_resolver()
assert resolver.num_bins == DEFAULT_NUM_BINS
def test_send_enqueue_disabled(self): def test_send_enqueue_disabled(self):
resolver = self.setup_resolver() send = Mock(name='async_send_task')
resolver.is_enqueue_enabled = lambda: False with patch.multiple(
with patch.object(resolver, 'async_send_task') as send: self.basetask,
with patch.object(resolver, 'log_debug') as log_debug: is_enqueue_enabled=Mock(return_value=False),
resolver.send(day_offset=2) log_debug=DEFAULT,
log_debug.assert_called_once_with('Message queuing disabled for site %s', self.site.domain) run=send,
) as patches:
self.basetask.enqueue(
site=self.site,
current_date=datetime.datetime.now(),
day_offset=2
)
patches['log_debug'].assert_called_once_with(
'Message queuing disabled for site %s', self.site.domain)
send.apply_async.assert_not_called() send.apply_async.assert_not_called()
@ddt.data(0, 2, -3) @ddt.data(0, 2, -3)
def test_send_enqueue_enabled(self, day_offset): def test_send_enqueue_enabled(self, day_offset):
resolver = self.setup_resolver() send = Mock(name='async_send_task')
resolver.is_enqueue_enabled = lambda: True current_date = datetime.datetime.now()
resolver.get_course_org_filter = lambda: (False, None) with patch.multiple(
with patch.object(resolver, 'async_send_task') as send: self.basetask,
with patch.object(resolver, 'log_debug') as log_debug: is_enqueue_enabled=Mock(return_value=True),
resolver.send(day_offset=day_offset) get_course_org_filter=Mock(return_value=(False, None)),
target_date = resolver.current_date + datetime.timedelta(day_offset) log_debug=DEFAULT,
log_debug.assert_any_call('Target date = %s', target_date.isoformat()) run=send,
assert send.apply_async.call_count == DEFAULT_NUM_BINS ) as patches:
self.basetask.enqueue(
site=self.site,
current_date=current_date,
day_offset=day_offset
)
target_date = current_date.replace(hour=0, minute=0, second=0, microsecond=0) + \
datetime.timedelta(day_offset)
print(patches['log_debug'].mock_calls)
patches['log_debug'].assert_any_call(
'Target date = %s', target_date.isoformat())
assert send.call_count == DEFAULT_NUM_BINS
@ddt.data(True, False) @ddt.data(True, False)
def test_is_enqueue_enabled(self, enabled): def test_is_enqueue_enabled(self, enabled):
resolver = self.setup_resolver() with patch.object(self.basetask, 'enqueue_config_var', 'enqueue_recurring_nudge'):
resolver.enqueue_config_var = 'enqueue_recurring_nudge'
self.schedule_config.enqueue_recurring_nudge = enabled self.schedule_config.enqueue_recurring_nudge = enabled
self.schedule_config.save() self.schedule_config.save()
assert resolver.is_enqueue_enabled() == enabled assert self.basetask.is_enqueue_enabled(self.site) == enabled
@ddt.unpack @ddt.unpack
@ddt.data( @ddt.data(
...@@ -85,10 +78,9 @@ class TestBinnedSchedulesBaseResolver(CacheIsolationTestCase): ...@@ -85,10 +78,9 @@ class TestBinnedSchedulesBaseResolver(CacheIsolationTestCase):
(['course1', 'course2'], ['course1', 'course2']) (['course1', 'course2'], ['course1', 'course2'])
) )
def test_get_course_org_filter_include(self, course_org_filter, expected_org_list): def test_get_course_org_filter_include(self, course_org_filter, expected_org_list):
resolver = self.setup_resolver()
self.site_config.values['course_org_filter'] = course_org_filter self.site_config.values['course_org_filter'] = course_org_filter
self.site_config.save() self.site_config.save()
exclude_orgs, org_list = resolver.get_course_org_filter() exclude_orgs, org_list = self.basetask.get_course_org_filter(self.site)
assert not exclude_orgs assert not exclude_orgs
assert org_list == expected_org_list assert org_list == expected_org_list
...@@ -99,12 +91,9 @@ class TestBinnedSchedulesBaseResolver(CacheIsolationTestCase): ...@@ -99,12 +91,9 @@ class TestBinnedSchedulesBaseResolver(CacheIsolationTestCase):
(['course1', 'course2'], [u'course1', u'course2']) (['course1', 'course2'], [u'course1', u'course2'])
) )
def test_get_course_org_filter_exclude(self, course_org_filter, expected_org_list): def test_get_course_org_filter_exclude(self, course_org_filter, expected_org_list):
resolver = self.setup_resolver() SiteConfigurationFactory.create(
self.other_site = SiteFactory.create()
self.other_site_config = SiteConfigurationFactory.create(
site=self.other_site,
values={'course_org_filter': course_org_filter}, values={'course_org_filter': course_org_filter},
) )
exclude_orgs, org_list = resolver.get_course_org_filter() exclude_orgs, org_list = self.basetask.get_course_org_filter(self.site)
assert exclude_orgs assert exclude_orgs
self.assertItemsEqual(org_list, expected_org_list) self.assertItemsEqual(org_list, expected_org_list)
...@@ -6,8 +6,11 @@ LOG = logging.getLogger(__name__) ...@@ -6,8 +6,11 @@ LOG = logging.getLogger(__name__)
# TODO: consider using a LoggerAdapter instead of this mixin: # TODO: consider using a LoggerAdapter instead of this mixin:
# https://docs.python.org/2/library/logging.html#logging.LoggerAdapter # https://docs.python.org/2/library/logging.html#logging.LoggerAdapter
class PrefixedDebugLoggerMixin(object): class PrefixedDebugLoggerMixin(object):
log_prefix = None
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(PrefixedDebugLoggerMixin, self).__init__(*args, **kwargs) super(PrefixedDebugLoggerMixin, self).__init__(*args, **kwargs)
if self.log_prefix is None:
self.log_prefix = self.__class__.__name__ self.log_prefix = self.__class__.__name__
def log_debug(self, message, *args, **kwargs): def log_debug(self, message, *args, **kwargs):
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
# * @edx/ospr - to check licensing # * @edx/ospr - to check licensing
# * @edx/devops - to check system requirements # * @edx/devops - to check system requirements
attrs==17.2.0
beautifulsoup4==4.1.3 beautifulsoup4==4.1.3
beautifulsoup==3.2.1 beautifulsoup==3.2.1
bleach==1.4 bleach==1.4
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment