Commit 10cd8301 by Matjaz Gregoric Committed by GitHub

Merge pull request #43 from open-craft/mtyaka/task-model

Store scheduled forum digest tasks in the database.
parents 937797a9 fcaa006c
"""
django test runner requires a models.py file. there is nothing to see here.
Database models for notifier.
"""
from datetime import datetime, timedelta
from django.db import models
class ForumDigestTask(models.Model):
    """
    Synchronization record for forum digest scheduling.

    Scheduler instances record the (from_dt, to_dt) time slice they are about
    to process in this table; the unique constraint on that pair prevents
    multiple scheduler instances from scheduling duplicate forum digest tasks.
    """
    from_dt = models.DateTimeField(help_text="Beginning of time slice for which to send forum digests.")
    to_dt = models.DateTimeField(help_text="End of time slice for which to send forum digests.")
    node = models.CharField(max_length=255, blank=True, help_text="Name of node that scheduled the task.")
    created = models.DateTimeField(auto_now_add=True, help_text="Time at which the task was scheduled.")

    class Meta:
        # One task record per time slice — this is the synchronization point.
        unique_together = (('from_dt', 'to_dt'),)

    @classmethod
    def prune_old_tasks(cls, day_limit):
        """
        Delete all tasks older than `day_limit` days from the database.
        """
        # NOTE: naive UTC datetimes are used throughout this model (utcnow
        # vs. auto_now_add) — consistent as long as USE_TZ is off.
        cutoff = datetime.utcnow() - timedelta(days=day_limit)
        cls.objects.filter(created__lt=cutoff).delete()
......@@ -100,6 +100,8 @@ FORUM_DIGEST_TASK_MAX_RETRIES = 2
# seconds to wait before retrying a failed forum digest task
FORUM_DIGEST_TASK_RETRY_DELAY = 300
# set the interval (in minutes) at which the top-level digest task is triggered
FORUM_DIGEST_TASK_INTERVAL = int(os.getenv('FORUM_DIGEST_TASK_INTERVAL', 1440))
# number of days to keep forum digest task entries in the database before they are deleted
FORUM_DIGEST_TASK_GC_DAYS = int(os.getenv('FORUM_DIGEST_TASK_GC_DAYS', 30))
LOGGING = {
......
......@@ -4,6 +4,7 @@ Celery tasks for generating and sending digest emails.
from contextlib import closing
from datetime import datetime, timedelta
import logging
import platform
import requests
from boto.ses.exceptions import SESMaxSendingRateExceededError
......@@ -13,6 +14,7 @@ from django.core.mail import EmailMultiAlternatives
from notifier.connection_wrapper import get_connection
from notifier.digest import render_digest
from notifier.models import ForumDigestTask
from notifier.pull import generate_digest_content, CommentsServiceException
from notifier.user import get_digest_subscribers, UserServiceException
......@@ -107,9 +109,13 @@ def _time_slice(minutes, now=None):
dt_start = dt_end - timedelta(minutes=minutes)
return (dt_start, dt_end)
@celery.task(max_retries=settings.DAILY_TASK_MAX_RETRIES, default_retry_delay=settings.DAILY_TASK_RETRY_DELAY)
def do_forums_digests():
@celery.task(
bind=True,
max_retries=settings.DAILY_TASK_MAX_RETRIES,
default_retry_delay=settings.DAILY_TASK_RETRY_DELAY
)
def do_forums_digests(self):
def batch_digest_subscribers():
batch = []
for v in get_digest_subscribers():
......@@ -122,7 +128,26 @@ def do_forums_digests():
from_dt, to_dt = _time_slice(settings.FORUM_DIGEST_TASK_INTERVAL)
logger.info("Beginning forums digest task: from_dt=%s to_dt=%s", from_dt, to_dt)
# Remove old tasks from the database so that the table doesn't keep growing forever.
ForumDigestTask.prune_old_tasks(settings.FORUM_DIGEST_TASK_GC_DAYS)
if self.request.retries == 0:
task, created = ForumDigestTask.objects.get_or_create(
from_dt=from_dt,
to_dt=to_dt,
defaults={'node': platform.node()}
)
if created:
logger.info("Beginning forums digest task: from_dt=%s to_dt=%s", from_dt, to_dt)
else:
logger.info(
"Forums digest task already scheduled by '%s'; skipping: from_dt=%s to_dt=%s",
task.node, from_dt, to_dt
)
return
else:
logger.info("Retrying forums digest task: from_dt=%s to_dt=%s", from_dt, to_dt)
try:
for user_batch in batch_digest_subscribers():
generate_and_send_digests.delay(user_batch, from_dt, to_dt)
......
......@@ -4,6 +4,7 @@ from contextlib import nested
import datetime
import json
from os.path import dirname, join
import platform
from boto.ses.exceptions import SESMaxSendingRateExceededError
from django.conf import settings
......@@ -12,6 +13,7 @@ from django.test import TestCase
from django.test.utils import override_settings
from mock import patch, Mock
from notifier.models import ForumDigestTask
from notifier.tasks import generate_and_send_digests, do_forums_digests
from notifier.pull import process_cs_response, CommentsServiceException
from notifier.user import UserServiceException, DIGEST_NOTIFICATION_PREFERENCE_KEY
......@@ -204,3 +206,83 @@ class TasksTestCase(TestCase):
# should have raised
self.fail("task did not give up after exactly 3 attempts")
@override_settings(FORUM_DIGEST_TASK_BATCH_SIZE=10)
def test_do_forums_digests_creates_database_entry(self):
    """
    Running the digest task records one ForumDigestTask row carrying the
    time slice and the name of the node that scheduled it.
    """
    start_dt = datetime.datetime.utcnow()
    end_dt = start_dt + datetime.timedelta(days=1)
    with nested(
        patch('notifier.tasks.get_digest_subscribers', return_value=(usern(n) for n in xrange(11))),
        patch('notifier.tasks.generate_and_send_digests'),
        patch('notifier.tasks._time_slice', return_value=(start_dt, end_dt))
    ) as (p, t, ts):
        # No task rows exist before the run.
        self.assertEqual(ForumDigestTask.objects.count(), 0)
        result = do_forums_digests.delay()
        self.assertTrue(result.successful())
        # Exactly one row was created, matching the patched time slice.
        self.assertEqual(ForumDigestTask.objects.count(), 1)
        record = ForumDigestTask.objects.all()[0]
        self.assertEqual(record.from_dt, start_dt)
        self.assertEqual(record.to_dt, end_dt)
        self.assertEqual(record.node, platform.node())
@override_settings(FORUM_DIGEST_TASK_BATCH_SIZE=10)
def test_do_forums_digests_already_scheduled(self):
    """
    A digest run is dispatched once per time slice: rerunning the task for
    a slice that is already recorded in the database is a no-op, while a
    new slice triggers dispatch again.
    """
    dt1 = datetime.datetime.utcnow()
    dt2 = dt1 + datetime.timedelta(days=1)
    dt3 = dt2 + datetime.timedelta(days=1)

    def run_task(time_slice):
        # Run do_forums_digests with collaborators patched out; return the
        # generate_and_send_digests mock so callers can inspect dispatching.
        with nested(
            patch('notifier.tasks.get_digest_subscribers', return_value=(usern(n) for n in xrange(10))),
            patch('notifier.tasks._time_slice', return_value=time_slice),
            patch('notifier.tasks.generate_and_send_digests')
        ) as (_gs, _ts, mock_send):
            task_result = do_forums_digests.delay()
            self.assertTrue(task_result.successful())
            return mock_send

    # Scheduling the task for the first time sends the digests:
    self.assertEqual(run_task((dt1, dt2)).delay.call_count, 1)
    # Scheduling the task with the same time slice again does nothing:
    self.assertEqual(run_task((dt1, dt2)).delay.call_count, 0)
    # Scheduling the task with a different time slice sends the digests:
    self.assertEqual(run_task((dt2, dt3)).delay.call_count, 1)
@override_settings(FORUM_DIGEST_TASK_GC_DAYS=5)
def test_do_forums_digests_prunes_old_tasks(self):
    """
    The digest task deletes ForumDigestTask rows older than
    FORUM_DIGEST_TASK_GC_DAYS days before scheduling new work.

    NOTE: previously named test_do_forums_digests_creates_database_entry,
    which collided with the earlier test of the same name — the duplicate
    definition silently shadowed the first test so it never ran.
    """
    now = datetime.datetime.utcnow()
    # Populate the database with four ForumDigestTask objects (1, 2, 7, and 10 days old).
    for days in [1, 2, 7, 10]:
        dt = now - datetime.timedelta(days=days)
        from_dt = dt - datetime.timedelta(days=1)
        task = ForumDigestTask.objects.create(from_dt=from_dt, to_dt=dt, node='some-node')
        # Bypass the field's auto_now_add by forcing the update via the query manager.
        ForumDigestTask.objects.filter(pk=task.pk).update(created=dt)
    with nested(
        patch('notifier.tasks.get_digest_subscribers', return_value=(usern(n) for n in xrange(11))),
        patch('notifier.tasks.generate_and_send_digests'),
    ) as (p, t):
        # Two of the tasks that we created above are older than 5 days.
        five_days_ago = now - datetime.timedelta(days=5)
        self.assertEqual(ForumDigestTask.objects.filter(created__lt=five_days_ago).count(), 2)
        task_result = do_forums_digests.delay()
        self.assertTrue(task_result.successful())
        # The two tasks that are older than 5 days should be removed.
        self.assertEqual(ForumDigestTask.objects.filter(created__lt=five_days_ago).count(), 0)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment