Commit fcaa006c by Matjaz Gregoric

Store scheduled forum digest tasks in the database.

This avoids scheduling the same digest job more than once (for example, when
multiple notifier schedulers are running at the same time).
parent 937797a9
""" """
django test runner requires a models.py file. there is nothing to see here. Database models for notifier.
""" """
from datetime import datetime, timedelta
from django.db import models
class ForumDigestTask(models.Model):
    """
    Synchronization record for forum digest scheduling.

    One row is claimed per (from_dt, to_dt) time slice; the unique constraint
    on that pair guarantees that only a single notifier scheduler instance can
    schedule the digest task for a given slice, so concurrent schedulers never
    produce duplicate digest jobs.
    """
    from_dt = models.DateTimeField(help_text="Beginning of time slice for which to send forum digests.")
    to_dt = models.DateTimeField(help_text="End of time slice for which to send forum digests.")
    node = models.CharField(max_length=255, blank=True, help_text="Name of node that scheduled the task.")
    created = models.DateTimeField(auto_now_add=True, help_text="Time at which the task was scheduled.")

    class Meta:
        unique_together = (('from_dt', 'to_dt'),)

    @classmethod
    def prune_old_tasks(cls, day_limit):
        """
        Remove every task record created more than `day_limit` days ago.
        """
        cutoff = datetime.utcnow() - timedelta(days=day_limit)
        cls.objects.filter(created__lt=cutoff).delete()
...@@ -100,6 +100,8 @@ FORUM_DIGEST_TASK_MAX_RETRIES = 2 ...@@ -100,6 +100,8 @@ FORUM_DIGEST_TASK_MAX_RETRIES = 2
FORUM_DIGEST_TASK_RETRY_DELAY = 300 FORUM_DIGEST_TASK_RETRY_DELAY = 300
# set the interval (in minutes) at which the top-level digest task is triggered # set the interval (in minutes) at which the top-level digest task is triggered
FORUM_DIGEST_TASK_INTERVAL = int(os.getenv('FORUM_DIGEST_TASK_INTERVAL', 1440)) FORUM_DIGEST_TASK_INTERVAL = int(os.getenv('FORUM_DIGEST_TASK_INTERVAL', 1440))
# number of days to keep forum digest task entries in the database before they are deleted
FORUM_DIGEST_TASK_GC_DAYS = int(os.getenv('FORUM_DIGEST_TASK_GC_DAYS', 30))
LOGGING = { LOGGING = {
......
...@@ -4,6 +4,7 @@ Celery tasks for generating and sending digest emails. ...@@ -4,6 +4,7 @@ Celery tasks for generating and sending digest emails.
from contextlib import closing from contextlib import closing
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
import platform
import requests import requests
from boto.ses.exceptions import SESMaxSendingRateExceededError from boto.ses.exceptions import SESMaxSendingRateExceededError
...@@ -13,6 +14,7 @@ from django.core.mail import EmailMultiAlternatives ...@@ -13,6 +14,7 @@ from django.core.mail import EmailMultiAlternatives
from notifier.connection_wrapper import get_connection from notifier.connection_wrapper import get_connection
from notifier.digest import render_digest from notifier.digest import render_digest
from notifier.models import ForumDigestTask
from notifier.pull import generate_digest_content, CommentsServiceException from notifier.pull import generate_digest_content, CommentsServiceException
from notifier.user import get_digest_subscribers, UserServiceException from notifier.user import get_digest_subscribers, UserServiceException
...@@ -107,8 +109,12 @@ def _time_slice(minutes, now=None): ...@@ -107,8 +109,12 @@ def _time_slice(minutes, now=None):
dt_start = dt_end - timedelta(minutes=minutes) dt_start = dt_end - timedelta(minutes=minutes)
return (dt_start, dt_end) return (dt_start, dt_end)
@celery.task(max_retries=settings.DAILY_TASK_MAX_RETRIES, default_retry_delay=settings.DAILY_TASK_RETRY_DELAY) @celery.task(
def do_forums_digests(): bind=True,
max_retries=settings.DAILY_TASK_MAX_RETRIES,
default_retry_delay=settings.DAILY_TASK_RETRY_DELAY
)
def do_forums_digests(self):
def batch_digest_subscribers(): def batch_digest_subscribers():
batch = [] batch = []
...@@ -122,7 +128,26 @@ def do_forums_digests(): ...@@ -122,7 +128,26 @@ def do_forums_digests():
from_dt, to_dt = _time_slice(settings.FORUM_DIGEST_TASK_INTERVAL) from_dt, to_dt = _time_slice(settings.FORUM_DIGEST_TASK_INTERVAL)
# Remove old tasks from the database so that the table doesn't keep growing forever.
ForumDigestTask.prune_old_tasks(settings.FORUM_DIGEST_TASK_GC_DAYS)
if self.request.retries == 0:
task, created = ForumDigestTask.objects.get_or_create(
from_dt=from_dt,
to_dt=to_dt,
defaults={'node': platform.node()}
)
if created:
logger.info("Beginning forums digest task: from_dt=%s to_dt=%s", from_dt, to_dt) logger.info("Beginning forums digest task: from_dt=%s to_dt=%s", from_dt, to_dt)
else:
logger.info(
"Forums digest task already scheduled by '%s'; skipping: from_dt=%s to_dt=%s",
task.node, from_dt, to_dt
)
return
else:
logger.info("Retrying forums digest task: from_dt=%s to_dt=%s", from_dt, to_dt)
try: try:
for user_batch in batch_digest_subscribers(): for user_batch in batch_digest_subscribers():
generate_and_send_digests.delay(user_batch, from_dt, to_dt) generate_and_send_digests.delay(user_batch, from_dt, to_dt)
......
...@@ -4,6 +4,7 @@ from contextlib import nested ...@@ -4,6 +4,7 @@ from contextlib import nested
import datetime import datetime
import json import json
from os.path import dirname, join from os.path import dirname, join
import platform
from boto.ses.exceptions import SESMaxSendingRateExceededError from boto.ses.exceptions import SESMaxSendingRateExceededError
from django.conf import settings from django.conf import settings
...@@ -12,6 +13,7 @@ from django.test import TestCase ...@@ -12,6 +13,7 @@ from django.test import TestCase
from django.test.utils import override_settings from django.test.utils import override_settings
from mock import patch, Mock from mock import patch, Mock
from notifier.models import ForumDigestTask
from notifier.tasks import generate_and_send_digests, do_forums_digests from notifier.tasks import generate_and_send_digests, do_forums_digests
from notifier.pull import process_cs_response, CommentsServiceException from notifier.pull import process_cs_response, CommentsServiceException
from notifier.user import UserServiceException, DIGEST_NOTIFICATION_PREFERENCE_KEY from notifier.user import UserServiceException, DIGEST_NOTIFICATION_PREFERENCE_KEY
...@@ -204,3 +206,83 @@ class TasksTestCase(TestCase): ...@@ -204,3 +206,83 @@ class TasksTestCase(TestCase):
# should have raised # should have raised
self.fail("task did not give up after exactly 3 attempts") self.fail("task did not give up after exactly 3 attempts")
@override_settings(FORUM_DIGEST_TASK_BATCH_SIZE=10)
def test_do_forums_digests_creates_database_entry(self):
    """A successful digest run records one ForumDigestTask row for its time slice."""
    slice_start = datetime.datetime.utcnow()
    slice_end = slice_start + datetime.timedelta(days=1)
    subscribers = (usern(n) for n in xrange(11))
    with nested(
        patch('notifier.tasks.get_digest_subscribers', return_value=subscribers),
        patch('notifier.tasks.generate_and_send_digests'),
        patch('notifier.tasks._time_slice', return_value=(slice_start, slice_end))
    ) as (p, t, ts):
        self.assertEqual(ForumDigestTask.objects.count(), 0)
        result = do_forums_digests.delay()
        self.assertTrue(result.successful())
        # Exactly one row, carrying the slice bounds and this node's name.
        self.assertEqual(ForumDigestTask.objects.count(), 1)
        record = ForumDigestTask.objects.all()[0]
        self.assertEqual(record.from_dt, slice_start)
        self.assertEqual(record.to_dt, slice_end)
        self.assertEqual(record.node, platform.node())
@override_settings(FORUM_DIGEST_TASK_BATCH_SIZE=10)
def test_do_forums_digests_already_scheduled(self):
    """Only the first scheduling of a given time slice fans out digest jobs."""
    base = datetime.datetime.utcnow()
    day = datetime.timedelta(days=1)
    slice_a = (base, base + day)
    slice_b = (base + day, base + 2 * day)

    def run_digest_task(time_slice):
        # Run do_forums_digests with patched collaborators and return the
        # generate_and_send_digests mock so the caller can inspect fan-out.
        with nested(
            patch('notifier.tasks.get_digest_subscribers',
                  return_value=(usern(n) for n in xrange(10))),
            patch('notifier.tasks._time_slice', return_value=time_slice),
            patch('notifier.tasks.generate_and_send_digests')
        ) as (_subs, _slice, send_mock):
            result = do_forums_digests.delay()
            self.assertTrue(result.successful())
            return send_mock

    # Scheduling the task for the first time sends the digests:
    self.assertEqual(run_digest_task(slice_a).delay.call_count, 1)
    # Scheduling the task with the same time slice again does nothing:
    self.assertEqual(run_digest_task(slice_a).delay.call_count, 0)
    # Scheduling the task with a different time slice sends the digests:
    self.assertEqual(run_digest_task(slice_b).delay.call_count, 1)
@override_settings(FORUM_DIGEST_TASK_GC_DAYS=5)
def test_do_forums_digests_prunes_old_tasks(self):
    """
    Running the digest task deletes ForumDigestTask entries older than
    FORUM_DIGEST_TASK_GC_DAYS days.
    """
    # NOTE: renamed from test_do_forums_digests_creates_database_entry — that
    # name collided with an earlier test method in this class, so Python kept
    # only one of the two definitions and the other was silently never run.
    now = datetime.datetime.utcnow()
    # Populate the database with four ForumDigestTask objects (1, 2, 7, and 10 days old).
    for days in [1, 2, 7, 10]:
        dt = now - datetime.timedelta(days=days)
        from_dt = dt - datetime.timedelta(days=1)
        task = ForumDigestTask.objects.create(from_dt=from_dt, to_dt=dt, node='some-node')
        # Bypass the field's auto_now_add by forcing the update via the query manager.
        ForumDigestTask.objects.filter(pk=task.pk).update(created=dt)
    with nested(
        patch('notifier.tasks.get_digest_subscribers', return_value=(usern(n) for n in xrange(11))),
        patch('notifier.tasks.generate_and_send_digests'),
    ) as (p, t):
        # Two of the tasks that we created above are older than 5 days.
        five_days_ago = now - datetime.timedelta(days=5)
        self.assertEqual(ForumDigestTask.objects.filter(created__lt=five_days_ago).count(), 2)
        task_result = do_forums_digests.delay()
        self.assertTrue(task_result.successful())
        # The two tasks that are older than 5 days should be removed.
        self.assertEqual(ForumDigestTask.objects.filter(created__lt=five_days_ago).count(), 0)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment