Commit d2909e6e by Greg Price

Implement email digests for forum content

This is the initial implementation. See README for more details.
parent 5670f4ac
Jim Abramson <jsa@edx.org>
Greg Price <gprice@edx.org>
Marco Morales <marcotuts@gmail.com>
Change Log
----------
These are notable changes in notifier. This is a rolling list of changes,
in reverse chronological order (most recent first). Add your entries at or
near the top.
How To Contribute
=================
Contributions are very welcome. The easiest way is to fork this repo, and then
make a pull request from your fork. The first time you make a pull request, you
may be asked to sign a Contributor Agreement.
Be sure your pull request adds you to the AUTHORS file if you are not
already listed and adds an entry to the CHANGELOG for any non-trivial
code change.
This diff is collapsed. Click to expand it.
notifier
========
Part of `edX code`__.
__ http://code.edx.org/
Notifier
=======================
This is a django application for edX platform notifications.
It currently sends daily digests of new content to subscribed forum
users, with the goal of eventually supporting real-time and batched
notifications of various types of content across various channels
(e.g. SMS).
Getting Started
-------------------------------
To run tests: ``python manage.py test notifier``
To start the celery worker: ``python manage.py celery worker``
To run the nightly forums digest batch job (use --help to see
options): ``python manage.py forums_digest``
License
-------
The code in this repository is licensed under version 3 of the AGPL unless
otherwise noted.
Please see ``LICENSE.txt`` for details.
How to Contribute
-----------------
Contributions are very welcome. The easiest way is to fork this repo, and then
make a pull request from your fork. The first time you make a pull request, you
may be asked to sign a Contributor Agreement.
Please see ``CONTRIBUTING.rst`` for details.
Reporting Security Issues
-------------------------
Please do not report security issues in public. Please email security@edx.org
Mailing List and IRC Channel
----------------------------
You can discuss this code on the `edx-code Google Group`__ or in the
``edx-code`` IRC channel on Freenode.
__ https://groups.google.com/forum/#!forum/edx-code
#!/usr/bin/env python
"""Command-line entry point for Django's management utility."""
import os
import sys

if __name__ == "__main__":
    # fall back to the notifier settings module unless one is already set
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "notifier.settings")

    from django.core.management import execute_from_command_line

    execute_from_command_line(sys.argv)
import logging

from django.conf import settings
from dogapi import dog_stats_api

# module-level logger for startup messages
logger = logging.getLogger(__name__)

# Start the datadog stats client at import time, but only when an API key
# has been configured for this deployment.
if settings.DATADOG_API_KEY:
    logger.info("Initializing datadog")
    dog_stats_api.start(api_key=settings.DATADOG_API_KEY, statsd=True)
else:
    logger.info("No datadog API key found, skipping datadog init")

# silence chatty logging from urllib3 via requests
logging.getLogger("requests.packages.urllib3").setLevel(logging.WARN)
import logging
import time
from django.conf import settings
from django.core.mail import get_connection as dj_get_connection
from dogapi import dog_stats_api
logger = logging.getLogger(__name__)
class BackendWrapper(object):
    """A wrapper around Django's Email Backend, providing hooks
    for instrumentation and testing.
    """

    def __init__(self, backend):
        self._backend = backend
        logger.info("initialized connection wrapper with email backend: %s", backend)

    def send_messages(self, email_messages):
        """Send `email_messages` via the wrapped backend.

        Honors the EMAIL_REWRITE_RECIPIENT setting (when set, every
        message's recipient list is replaced with that single address),
        records timing/count metrics in datadog, and returns the number of
        messages the backend reports as sent.
        """
        # check settings hook for rewriting email recipient, act accordingly
        if settings.EMAIL_REWRITE_RECIPIENT:
            for message in email_messages:
                message.to = [settings.EMAIL_REWRITE_RECIPIENT]

        # send the messages, timing the underlying backend call
        t = time.time()
        msg_count = self._backend.send_messages(email_messages)
        elapsed = time.time() - t

        if msg_count > 0:
            logger.info('sent %s messages, elapsed: %.3fs' % (msg_count, elapsed))
            # report an average timing to datadog
            dog_stats_api.histogram('notifier.send.time', elapsed / msg_count)
            dog_stats_api.increment('notifier.send.count', msg_count)

        if msg_count != len(email_messages):
            # logger.warning(); .warn() is a deprecated alias
            logger.warning('send_messages() was called with %s messages but return value was %s',
                           len(email_messages), msg_count)

        return msg_count

    def close(self):
        # never raise Exceptions on close().
        # ("except X as e" replaces the Python-2-only "except X, e" form)
        try:
            self._backend.close()
        except Exception as e:
            logger.debug("self._backend.close() failed: %s", e)

    def __getattr__(self, a):
        # delegate anything we don't wrap straight to the real backend
        return getattr(self._backend, a)
def get_connection(*args, **kwargs):
    """Return a Django mail connection wrapped in a BackendWrapper.

    All arguments are forwarded to django.core.mail.get_connection.
    """
    backend = dj_get_connection(*args, **kwargs)
    return BackendWrapper(backend)
"""
General formatting and rendering helpers for digest notifications.
"""
import datetime
import logging
from django.conf import settings
from django.template.loader import get_template
from django.template import Context
from django.utils.html import strip_tags
from statsd import statsd
from notifier.user import UsernameCipher
# Display caps applied when building Digest objects (see classes below).
# maximum number of threads to display per course
MAX_COURSE_THREADS = 30
# maximum number of items (posts) to display per thread
MAX_THREAD_ITEMS = 10
# maximum number of characters to allow in thread title, before truncating
THREAD_TITLE_MAXLEN = 140
# maximum number of characters to allow in thread post, before truncating
THREAD_ITEM_MAXLEN = 140

# module-level logger for digest rendering
logger = logging.getLogger(__name__)
def _clean_markup(content):
    """
    Remove any unwanted markup from `content` prior to rendering in digest form.

    >>> # strip html inside a post
    >>> _clean_markup('Hello, <strong>World!</strong>')
    u'Hello, World!'
    >>> # unbalanced / malformed is OK
    >>> _clean_markup('Hello, <strong color="green"/>World!<script type="invalid>')
    u'Hello, World!'
    """
    # django's strip_tags tolerates unbalanced / malformed markup
    return strip_tags(content)
def _trunc(s, length):
"""
Formatting helper.
Truncate the string `s` to no more than `length`, using ellipsis and
without chopping words.
>>> _trunc("one two three", 13)
'one two three'
>>> _trunc("one two three", 12)
'one two...'
"""
s = s.strip()
if len(s) <= length:
# nothing to do
return s
# truncate, taking an extra -3 off the orig string for the ellipsis itself
return s[:length - 3].rsplit(' ', 1)[0].strip() + '...'
def _join_and(values):
"""
Formatting helper.
Join a list of strings, using the comma and "and" properly (assuming
English).
>>> _join_and([])
''
>>> _join_and(['spam'])
'spam'
>>> _join_and(['spam', 'eggs'])
'spam and eggs'
>>> _join_and(['spam', 'eggs', 'beans'])
'spam, eggs, and beans'
>>> _join_and(['spam', 'eggs', 'beans', 'cheese'])
'spam, eggs, beans, and cheese'
"""
if len(values) == 0:
return ''
elif len(values) == 1:
return values[0]
elif len(values) == 2:
return ' and '.join(values)
else:
values[-1] = 'and ' + values[-1]
return ', '.join(values)
def _get_course_title(course_id):
"""
Formatting helper.
Transform an edX course id (e.g. "MITx/6.002x/2012_Fall") into a string
suitable for use as a course title in digest notifications.
>>> _get_course_title("MITx/6.002x/2012_Fall")
'6.002x MITx'
"""
return ' '.join(reversed(course_id.split('/')[:2]))
def _get_course_url(course_id):
    """
    Formatting helper.

    Generate a click-through url for a given edX course id.

    >>> _get_course_url("MITx/6.002x/2012_Fall").replace(
    ...     settings.LMS_URL_BASE, "URL_BASE")
    'URL_BASE/courses/MITx/6.002x/2012_Fall/'
    """
    return '{base}/courses/{course}/'.format(
        base=settings.LMS_URL_BASE, course=course_id)
def _get_thread_url(course_id, thread_id, commentable_id):
    """
    Formatting helper.

    Generate a click-through url for a specific discussion thread in an edX
    course.
    """
    # course url already ends with a slash
    return '{}discussion/forum/{}/threads/{}'.format(
        _get_course_url(course_id), commentable_id, thread_id)
def _get_unsubscribe_url(username):
    """
    Formatting helper.

    Generate a click-through url to unsubscribe a user from digest
    notifications, using an encrypted token based on the username.
    """
    return '{base}/notification_prefs/unsubscribe/{token}/'.format(
        base=settings.LMS_URL_BASE, token=UsernameCipher.encrypt(username))
class Digest(object):
    """Top-level digest container: a user's courses, alphabetized by title."""

    def __init__(self, courses):
        # order courses by title, case-insensitively
        self.courses = sorted(courses, key=lambda course: course.title.lower())
class DigestCourse(object):
    """A single course's portion of a digest: title, url, and newest threads."""

    def __init__(self, course_id, threads):
        self.title = _get_course_title(course_id)
        self.url = _get_course_url(course_id)
        # total thread count; may exceed len(self.threads) because the
        # displayed list below is capped at MAX_COURSE_THREADS
        self.thread_count = len(threads)
        # keep only the most recently-updated threads, newest first
        self.threads = sorted(
            threads, key=lambda thread: thread.dt, reverse=True)[:MAX_COURSE_THREADS]
class DigestThread(object):
    """A single discussion thread in a digest, with its newest items."""

    def __init__(self, thread_id, course_id, commentable_id, title, items):
        self.title = _trunc(_clean_markup(title), THREAD_TITLE_MAXLEN)
        self.url = _get_thread_url(course_id, thread_id, commentable_id)
        # keep only the most recent posts, newest first
        self.items = sorted(
            items, key=lambda item: item.dt, reverse=True)[:MAX_THREAD_ITEMS]

    @property
    def dt(self):
        # a thread is as "recent" as its newest retained item
        return max(item.dt for item in self.items)
class DigestItem(object):
    """A single post (thread item) in a digest: cleaned body, author, timestamp."""

    def __init__(self, body, author, dt):
        # body is stripped of markup and truncated for display
        self.body = _trunc(_clean_markup(body), THREAD_ITEM_MAXLEN)
        self.author = author
        self.dt = dt
@statsd.timed('notifier.digest_render.elapsed')
def render_digest(user, digest, title, description):
    """
    Generate HTML and plaintext renderings of digest material, suitable for
    emailing.

    `user` should be a dictionary with the following keys: "id", "name",
    "email", and "username" (all values should be nonempty strings).  Note
    that "username" is required in order to build the unsubscribe link
    (the original docstring omitted it, though the code reads it).

    `digest` should be a Digest object as defined above in this module.

    `title` and `description` are brief strings to be displayed at the top
    of the email message.

    Returns two strings: (text_body, html_body).
    """
    logger.info("rendering email message: {user_id: %s}", user['id'])
    context = Context({
        'user': user,
        'digest': digest,
        'title': title,
        'description': description,
        'course_count': len(digest.courses),
        'course_names': _join_and([course.title for course in digest.courses]),
        'thread_count': sum(course.thread_count for course in digest.courses),
        'logo_image_url': "{}/static/images/header-logo.png".format(settings.LMS_URL_BASE),
        'unsubscribe_url': _get_unsubscribe_url(user['username'])
    })
    text = get_template('digest-email.txt').render(context)
    html = get_template('digest-email.html').render(context)
    return (text, html)
"""
"""
import datetime
import celery
from dateutil.parser import parse as date_parse
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.serializers.json import DjangoJSONEncoder
import json
import logging
from optparse import make_option
import pytz
import requests
import sys
from notifier.digest import render_digest, Digest, DigestCourse, DigestThread, DigestItem
from notifier.pull import generate_digest_content
from notifier.tasks import generate_and_send_digests
from notifier.user import get_digest_subscribers, get_user
logger = logging.getLogger(__name__)
class DigestJSONEncoder(DjangoJSONEncoder):
    """JSON encoder that serializes digest model objects via their __dict__."""

    def default(self, o):
        # digest model objects are plain attribute containers; anything else
        # (e.g. datetimes) is handled by the Django base encoder
        if not isinstance(o, (Digest, DigestCourse, DigestThread, DigestItem)):
            return super(DigestJSONEncoder, self).default(o)
        return o.__dict__
class Command(BaseCommand):
    """
    Management command to generate and send forum digest emails.

    By default, partitions all subscribed users into batches and queues a
    celery task (notifier.tasks.generate_and_send_digests) for each batch.
    The various --show-* options instead write intermediate data (users,
    content, or rendered bodies) to stdout for diagnostics, without sending
    anything.
    """
    option_list = BaseCommand.option_list + (
        make_option('--to_datetime',
                    action='store',
                    dest='to_datetime',
                    default=None,
                    help='datetime as of which to generate digest content, in ISO-8601 format (UTC). Defaults to today at midnight (UTC).'),
        make_option('--minutes',
                    action='store',
                    dest='minutes',
                    type='int',
                    default=1440,
                    help='number of minutes up to TO_DATETIME for which to generate digest content. Defaults to 1440 (one day).'),
        make_option('--users',
                    action='store',
                    dest='users_str',
                    default=None,
                    help='send digests for the specified users only (regardless of opt-out settings!)'),
        make_option('--show-content',
                    action='store_true',
                    dest='show_content',
                    default=None,
                    help='output the retrieved content only (don\'t send anything)'),
        make_option('--show-users',
                    action='store_true',
                    dest='show_users',
                    default=None,
                    help='output the retrieved users only (don\'t fetch content or send anything)'),
        make_option('--show-text',
                    action='store_true',
                    dest='show_text',
                    default=None,
                    help='output the rendered text body of the first user-digest generated, and exit (don\'t send anything)'),
        make_option('--show-html',
                    action='store_true',
                    dest='show_html',
                    default=None,
                    help='output the rendered html body of the first user-digest generated, and exit (don\'t send anything)'),
    )

    def get_specific_users(self, user_ids):
        """Fetch user dicts for the given ids from the user service.

        This makes an individual HTTP request for each user - it is only
        intended for use with small numbers of users (e.g. for diagnostic
        purposes).
        """
        users = []
        for user_id in user_ids:
            user = get_user(user_id)
            if user:
                users.append(user)
        return users

    def show_users(self, users):
        """Dump the user dicts to stdout as JSON."""
        json.dump(list(users), self.stdout)

    def show_content(self, users, from_dt, to_dt):
        """Dump the digest content for `users` to stdout as JSON."""
        all_content = generate_digest_content(
            (u['id'] for u in users), from_dt, to_dt)
        # use django's encoder; builtin one doesn't handle datetime objects
        json.dump(list(all_content), self.stdout, cls=DigestJSONEncoder)

    def show_rendered(self, fmt, users, from_dt, to_dt):
        """Render and print the digest body (`fmt` is 'text' or 'html') for
        the first user found."""
        def _fail(msg):
            logger.warning('could not show rendered %s: %s', fmt, msg)

        try:
            user = list(users)[0]
        except IndexError:
            # (was "except IndexError, e" - Python-2-only syntax, e unused)
            _fail('no users found')
            return
        try:
            # next() builtin instead of the Python-2-only .next() method
            user_id, digest = next(generate_digest_content(
                [user['id']], from_dt, to_dt))
        except StopIteration:
            _fail('no digests found')
            return
        text, html = render_digest(
            user, digest, settings.FORUM_DIGEST_EMAIL_TITLE, settings.FORUM_DIGEST_EMAIL_DESCRIPTION)
        # self.stdout.write replaces the Python-2-only "print >> self.stdout";
        # the trailing newline preserves print's behavior
        if fmt == 'text':
            self.stdout.write(text + '\n')
        elif fmt == 'html':
            self.stdout.write(html + '\n')

    def handle(self, *args, **options):
        """Entry point: dispatch according to options (see class docstring)."""
        # get user data
        if options.get('users_str') is not None:
            # explicitly-specified users
            user_ids = [v.strip() for v in options['users_str'].split(',')]
            users = self.get_specific_users(user_ids)
        else:
            # get all the users subscribed to notifications
            users = get_digest_subscribers()  # generator

        if options.get('show_users'):
            self.show_users(users)
            return

        # determine time window
        if options.get('to_datetime'):
            to_datetime = date_parse(options['to_datetime'])
        else:
            to_datetime = datetime.datetime.utcnow().replace(
                hour=0, minute=0, second=0)
        from_datetime = to_datetime - \
            datetime.timedelta(minutes=options['minutes'])

        if options.get('show_content'):
            self.show_content(users, from_datetime, to_datetime)
            return
        if options.get('show_text'):
            self.show_rendered('text', users, from_datetime, to_datetime)
            return
        if options.get('show_html'):
            self.show_rendered('html', users, from_datetime, to_datetime)
            return

        # invoke `tasks.generate_and_send_digests` via celery, in groups of
        # FORUM_DIGEST_TASK_BATCH_SIZE (the old comment said "10")
        def queue_digests(some_users):
            generate_and_send_digests.delay(
                some_users,
                from_datetime,
                to_datetime)

        user_batch = []
        for user in users:
            user_batch.append(user)
            if len(user_batch) == settings.FORUM_DIGEST_TASK_BATCH_SIZE:
                queue_digests(user_batch)
                user_batch = []
        # queue the remainder, if any
        if user_batch:
            queue_digests(user_batch)
"""
django test runner requires a models.py file. there is nothing to see here.
"""
"""
"""
from collections import namedtuple
import datetime
import logging
import sys
from dateutil.parser import parse as date_parse
from django.conf import settings
from dogapi import dog_stats_api
import requests
from notifier.digest import Digest, DigestCourse, DigestThread, DigestItem
logger = logging.getLogger(__name__)
class CommentsServiceException(Exception):
    """Raised when a request to the comments service fails."""
    pass
def _http_post(*a, **kw):
    """
    POST to the comments service, wrapping failures in
    CommentsServiceException.

    All arguments are passed straight through to requests.post (so a[0]
    is the URL).  Connection errors and HTTP 500 responses are re-raised
    as CommentsServiceException; any other response is returned as-is.
    """
    try:
        logger.debug('POST %s %s', a[0], kw)
        response = requests.post(*a, **kw)
    except requests.exceptions.ConnectionError, e:
        # Python 2 three-argument raise: re-raise with the original
        # traceback so the failure point inside requests is preserved
        _, msg, tb = sys.exc_info()
        raise CommentsServiceException, "comments service request failed: {}".format(msg), tb
    if response.status_code == 500:
        # NOTE(review): only status 500 is translated into an exception;
        # other 4xx/5xx responses are returned to the caller unchanged -
        # confirm that is intentional
        raise CommentsServiceException, "comments service HTTP Error 500: {}".format(response.reason)
    return response
class Parser(object):
    """Translate the comments service notifications payload into Digest objects."""

    @staticmethod
    def parse(payload):
        # payload maps user_id -> per-user content dict; yield lazily
        return ((uid, Parser.digest(uid, content))
                for uid, content in payload.iteritems())

    @staticmethod
    def digest(user_id, user_dict):
        # user_dict maps course_id -> per-course content dict
        courses = [Parser.course(cid, cdict)
                   for cid, cdict in user_dict.iteritems()]
        return Digest(courses)

    @staticmethod
    def course(course_id, course_dict):
        # course_dict maps thread_id -> thread content dict
        threads = [Parser.thread(tid, course_id, tdict)
                   for tid, tdict in course_dict.iteritems()]
        return DigestCourse(course_id, threads)

    @staticmethod
    def thread(thread_id, course_id, thread_dict):
        items = [Parser.item(idict) for idict in thread_dict["content"]]
        return DigestThread(
            thread_id,
            course_id,
            thread_dict["commentable_id"],
            thread_dict["title"],
            items)

    @staticmethod
    def item(item_dict):
        return DigestItem(
            item_dict["body"],
            item_dict["username"],
            date_parse(item_dict["updated_at"]))
def generate_digest_content(user_ids, from_dt, to_dt):
    """
    Function that calls the edX comments service API and yields a
    tuple of (user_id, digest) for each specified user that has >0
    discussion updates between the specified points in time.

    `user_ids` should be an iterable of edX user ids.

    `from_dt` and `to_dt` should be datetime.datetime objects representing
    the desired time window.

    In each yielded tuple, the `user_id` part will contain one of the values
    passed in `user_ids` and the `digest` part will contain a Digest object
    (see notifier.digest.Digest for structure details).

    The order in which user-digest results will be yielded is undefined, and
    if no updates are found for any user_id in the given time period, no
    user-digest tuple will be yielded for them (therefore, depending on the
    parameters passed, this function may not yield anything).
    """
    # set up and execute the API call
    api_url = settings.CS_URL_BASE + '/api/v1/notifications'
    # materialize the ids so we can both count them and join them; the
    # original code logged len() of the joined comma-string (character
    # count), not the number of users
    user_id_strs = [str(user_id) for user_id in user_ids]
    dt_format = '%Y-%m-%d %H:%M:%S%z'
    params = {
        'api_key': settings.CS_API_KEY,
    }
    data = {
        'user_ids': ','.join(user_id_strs),
        'from': from_dt.strftime(dt_format),
        'to': to_dt.strftime(dt_format)
    }
    with dog_stats_api.timer('notifier.comments_service.time'):
        logger.info('calling comments service to pull digests for %d user(s)',
                    len(user_id_strs))
        # NOTE(review): `.json` is a property on requests < 1.0; from
        # requests 1.0 on it is a method and would need to be called
        res = _http_post(api_url, params=params, data=data).json
    return Parser.parse(res)
from datetime import timedelta
import logging
import os
import platform
from celery.schedules import crontab
# Database configuration.  The notifier keeps almost no state of its own;
# sqlite is sufficient (used by the celery django transport / test runner).
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',  # Add 'postgresql_psycopg2', 'mysql', 'sqlite3' or 'oracle'.
        'NAME': 'notifier.db',  # Or path to database file if using sqlite3.
        'USER': '',  # Not used with sqlite3.
        'PASSWORD': '',  # Not used with sqlite3.
        'HOST': '',  # Set to empty string for localhost. Not used with sqlite3.
        'PORT': '',  # Set to empty string for default. Not used with sqlite3.
    }
}

INSTALLED_APPS = (
    'kombu.transport.django',  # celery broker via the django database
    'django_ses',              # Amazon SES email backend
    'djcelery',
    'notifier',
)

# tag used in log lines emitted by this service (see LOGGING below)
SERVICE_NAME = 'notifier'

# django coverage
TEST_RUNNER = 'django_coverage.coverage_runner.CoverageRunner'
# Misc. Notifier Formatting
# this will be joined with '@'+EMAIL_DOMAIN as the msg sender
# (the old comment said EMAILER_DOMAIN, which doesn't exist)
FORUM_DIGEST_EMAIL_SENDER = 'notifications'
FORUM_DIGEST_EMAIL_SUBJECT = 'edX.org Daily Discussion Digest'
FORUM_DIGEST_EMAIL_TITLE = 'Discussion Digest'
FORUM_DIGEST_EMAIL_DESCRIPTION = 'A digest of unread content from edX Course Discussions you are following.'

# Environment-specific settings

# Application Environment
NOTIFIER_ENV = os.getenv('NOTIFIER_ENV', 'Development')

# email backend settings: implementation chosen by the EMAIL_BACKEND env
# var ('console' is the safe development default)
EMAIL_BACKEND = {
    'console': 'django.core.mail.backends.console.EmailBackend',
    'ses': 'django_ses.SESBackend',
    'smtp': 'django.core.mail.backends.smtp.EmailBackend'
}[os.getenv('EMAIL_BACKEND', 'console')]
# The ideal setting for this is 1 / number_of_celery_workers * headroom,
# where headroom is a multiplier to underrun the send rate limit (e.g.
# 0.9 to keep 10% behind the per-second rate limit at any given moment).
AWS_SES_AUTO_THROTTLE = 0.9

# smtp backend connection settings
EMAIL_HOST = os.getenv('EMAIL_HOST', 'localhost')
# NOTE(review): when EMAIL_PORT comes from the environment it is a string,
# not an int (the default is an int) - confirm consumers accept both
EMAIL_PORT = os.getenv('EMAIL_PORT', 1025)
EMAIL_HOST_USER = os.getenv('EMAIL_HOST_USER')
EMAIL_HOST_PASSWORD = os.getenv('EMAIL_HOST_PASSWORD')

# email settings independent of backend
EMAIL_DOMAIN = os.getenv('EMAIL_DOMAIN', 'notifications.edx.org')
# when set, ALL outgoing mail is redirected to this single address
# (see notifier.connection_wrapper.BackendWrapper.send_messages)
EMAIL_REWRITE_RECIPIENT = os.getenv('EMAIL_REWRITE_RECIPIENT')

# secret key for generating unsub tokens
# this MUST be changed in production envs, and MUST match the LMS' secret key
SECRET_KEY = os.getenv('SECRET_KEY', '85920908f28904ed733fe576320db18cabd7b6cd')

# LMS links, images, etc
LMS_URL_BASE = os.getenv('LMS_URL_BASE', 'http://localhost:8000')

# Comments Service Endpoint, for digest pulls
CS_URL_BASE = os.getenv('CS_URL_BASE', 'http://localhost:4567')
CS_API_KEY = os.getenv('CS_API_KEY', 'PUT_YOUR_API_KEY_HERE')

# User Service Endpoint, for notification prefs
US_URL_BASE = os.getenv('US_URL_BASE', 'http://localhost:8000')
US_API_KEY = os.getenv('US_API_KEY', 'PUT_YOUR_API_KEY_HERE')
US_HTTP_AUTH_USER = os.getenv('US_HTTP_AUTH_USER', '')
US_HTTP_AUTH_PASS = os.getenv('US_HTTP_AUTH_PASS', '')
# page size used when enumerating subscribers from the user service
US_RESULT_PAGE_SIZE = 10

# Logging
LOG_FILE = os.getenv('LOG_FILE')
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')

# datadog
DATADOG_API_KEY = os.getenv('DATADOG_API_KEY')

# celery
import djcelery
djcelery.setup_loader()
BROKER_URL = os.getenv('BROKER_URL', 'django://')

# limit the frequency with which the forum digest task may run.
FORUM_DIGEST_TASK_RATE_LIMIT = "6/m"  # no more than every 10 seconds
# limit the size of user batches (cs service pulls / emails sent) per-task
FORUM_DIGEST_TASK_BATCH_SIZE = 50
# limit the number of times an individual task will be retried
FORUM_DIGEST_TASK_MAX_RETRIES = 2
# limit the minimum delay between retries of an individual task (in seconds)
FORUM_DIGEST_TASK_RETRY_DELAY = 300
# set the interval (in minutes) at which the top-level digest task is triggered
FORUM_DIGEST_TASK_INTERVAL = int(os.getenv('FORUM_DIGEST_TASK_INTERVAL', 1440))
# Dict-config logging: everything goes to stderr by default; rsyslog and
# file handlers are attached conditionally below.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'filters': {
    },
    'formatters': {
        'default': {
            'format': '%(asctime)s [%(levelname)s] [service_name={}] [%(module)s] %(message)s'.format(SERVICE_NAME)
        },
        'rsyslog': {
            # syslog-friendly format tagged with service / env / host
            'format': ("[service_variant={service_variant}]"
                       "[%(name)s][env:{logging_env}] %(levelname)s "
                       "[{hostname} %(process)d] [%(filename)s:%(lineno)d] "
                       "- %(message)s").format(
                service_variant=SERVICE_NAME,
                logging_env=NOTIFIER_ENV.lower(),
                hostname=platform.node().split(".")[0])
        }
    },
    'handlers': {
        'default': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'default'
        },
    },
    'loggers': {
        # root logger; level comes from the LOG_LEVEL env var
        '': {
            'handlers': ['default'],
            'level': LOG_LEVEL.upper(),
            'propagate': True
        },
    }
}
# keep celery from replacing our root logger configuration
CELERYD_HIJACK_ROOT_LOGGER = False

# optional rsyslog handler; any non-empty env value enables it
RSYSLOG_ENABLED = os.getenv('RSYSLOG_ENABLED', '')
if RSYSLOG_ENABLED:
    # `import logging` alone does not guarantee the `logging.handlers`
    # submodule is loaded; import it explicitly before referencing
    # SysLogHandler below (otherwise this branch can AttributeError)
    import logging.handlers
    LOGGING['handlers'].update({
        'rsyslog': {
            'level': 'INFO',
            'class': 'logging.handlers.SysLogHandler',
            'address': '/dev/log',
            'formatter': 'rsyslog',
            'facility': logging.handlers.SysLogHandler.LOG_LOCAL0,
        }
    })
    LOGGING['loggers']['']['handlers'].append('rsyslog')

# optional file handler, enabled by setting LOG_FILE
if LOG_FILE:
    LOGGING['handlers'].update({
        'file': {
            'level': 'DEBUG',
            'class': 'logging.FileHandler',
            'formatter': 'default',
            'filename': LOG_FILE
        },
    })
    LOGGING['loggers']['']['handlers'].append('file')
# all celery schedule computations happen in UTC
CELERY_TIMEZONE = 'UTC'

if FORUM_DIGEST_TASK_INTERVAL==1440:
    # in the production case, make the 24 hour cycle happen at a
    # predetermined time of day (midnight UTC).
    digest_schedule = crontab(minute=0, hour=0)
else:
    # in special (testing) cases, just let celerybeat manage a timedelta.
    digest_schedule = timedelta(minutes=FORUM_DIGEST_TASK_INTERVAL)

CELERYBEAT_SCHEDULE = {
    'send_digests': {
        'task': 'notifier.tasks.do_forums_digests',
        'schedule': digest_schedule,
    },
}

# retry policy for the top-level daily digest task (see tasks.do_forums_digests)
DAILY_TASK_MAX_RETRIES = 2
DAILY_TASK_RETRY_DELAY = 60

# Celery / RabbitMQ fine-tuning
# Don't use a connection pool, since connections are dropped by ELB.
BROKER_POOL_LIMIT = 0
BROKER_CONNECTION_TIMEOUT = 1
# When the broker is behind an ELB, use a heartbeat to refresh the
# connection and to detect if it has been dropped.
BROKER_HEARTBEAT = 10.0
BROKER_HEARTBEAT_CHECKRATE = 2
# Each worker should only fetch one message at a time
CELERYD_PREFETCH_MULTIPLIER = 1
"""
Celery tasks for generating and sending digest emails.
"""
from contextlib import closing
from datetime import datetime, timedelta
import logging
from boto.ses.exceptions import SESMaxSendingRateExceededError
import celery
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from notifier.connection_wrapper import get_connection
from notifier.digest import render_digest
from notifier.pull import generate_digest_content
from notifier.user import get_digest_subscribers, UserServiceException
logger = logging.getLogger(__name__)
@celery.task(rate_limit=settings.FORUM_DIGEST_TASK_RATE_LIMIT, max_retries=settings.FORUM_DIGEST_TASK_MAX_RETRIES)
def generate_and_send_digests(users, from_dt, to_dt):
    """
    This task generates and sends forum digest emails to multiple users in a
    single background operation.

    `users` is an iterable of dictionaries, as returned by the edx user_api
    (required keys are "id", "name", "username", and "email").

    `from_dt` and `to_dt` are datetime objects representing the start and end
    of the time window for which to generate a digest.
    """
    users_by_id = dict((str(u['id']), u) for u in users)
    messages = []
    with closing(get_connection()) as cx:
        for user_id, digest in generate_digest_content(users_by_id.keys(), from_dt, to_dt):
            user = users_by_id[user_id]
            # format the digest
            text, html = render_digest(
                user, digest, settings.FORUM_DIGEST_EMAIL_TITLE, settings.FORUM_DIGEST_EMAIL_DESCRIPTION)
            # build the message; sender address is SENDER@EMAIL_DOMAIN
            sender = '@'.join(
                [settings.FORUM_DIGEST_EMAIL_SENDER,
                 settings.EMAIL_DOMAIN])
            message = EmailMultiAlternatives(
                settings.FORUM_DIGEST_EMAIL_SUBJECT,
                text,
                sender,
                [user['email']])
            message.attach_alternative(html, "text/html")
            messages.append(message)
        if not messages:
            # nobody in this batch had any updates
            return
        try:
            cx.send_messages(messages)
        except SESMaxSendingRateExceededError as e:
            # we've tripped the per-second send rate limit. we generally
            # rely on the django_ses auto throttle to prevent this,
            # but in case we creep over, we can re-queue and re-try this task
            # - if and only if none of the messages in our batch were
            # sent yet.
            # this implementation is also non-ideal in that the data will be
            # fetched from the comments service again in the event of a retry.
            any_sent = any(
                getattr(message, 'extra_headers', {}).get('status') == 200
                for message in messages)
            if any_sent:
                # raise right away, since we don't support partial retry
                raise
            raise generate_and_send_digests.retry(exc=e)
def _time_slice(minutes, now=None):
"""
Returns the most recently-elapsed time slice of the specified length (in
minutes), as of the specified datetime (defaults to utcnow).
`minutes` must be greater than one, less than or equal to 1440, and a factor
of 1440 (so that no time slice spans across multiple days).
>>> _time_slice(1, datetime(2013, 1, 1, 0, 0))
(datetime.datetime(2012, 12, 31, 23, 59), datetime.datetime(2013, 1, 1, 0, 0))
>>> _time_slice(1, datetime(2013, 1, 1, 0, 1))
(datetime.datetime(2013, 1, 1, 0, 0), datetime.datetime(2013, 1, 1, 0, 1))
>>> _time_slice(1, datetime(2013, 1, 1, 1, 1))
(datetime.datetime(2013, 1, 1, 1, 0), datetime.datetime(2013, 1, 1, 1, 1))
>>> _time_slice(15, datetime(2013, 1, 1, 0))
(datetime.datetime(2012, 12, 31, 23, 45), datetime.datetime(2013, 1, 1, 0, 0))
>>> _time_slice(15, datetime(2013, 1, 1, 0, 14))
(datetime.datetime(2012, 12, 31, 23, 45), datetime.datetime(2013, 1, 1, 0, 0))
>>> _time_slice(15, datetime(2013, 1, 1, 0, 14, 59))
(datetime.datetime(2012, 12, 31, 23, 45), datetime.datetime(2013, 1, 1, 0, 0))
>>> _time_slice(15, datetime(2013, 1, 1, 0, 15, 0))
(datetime.datetime(2013, 1, 1, 0, 0), datetime.datetime(2013, 1, 1, 0, 15))
>>> _time_slice(1440, datetime(2013, 1, 1))
(datetime.datetime(2012, 12, 31, 0, 0), datetime.datetime(2013, 1, 1, 0, 0))
>>> _time_slice(1440, datetime(2013, 1, 1, 23, 59))
(datetime.datetime(2012, 12, 31, 0, 0), datetime.datetime(2013, 1, 1, 0, 0))
>>> e = None
>>> try:
... _time_slice(14, datetime(2013, 1, 2, 0, 0))
... except AssertionError, e:
... pass
...
>>> e is not None
True
"""
assert minutes > 0
assert minutes <= 1440
assert 1440 % minutes == 0
now = now or datetime.utcnow()
midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
minutes_since_midnight = (now - midnight).seconds / 60
dt_end = midnight + timedelta(minutes=(minutes_since_midnight / minutes) * minutes)
dt_start = dt_end - timedelta(minutes=minutes)
return (dt_start, dt_end)
@celery.task(max_retries=settings.DAILY_TASK_MAX_RETRIES, default_retry_delay=settings.DAILY_TASK_RETRY_DELAY)
def do_forums_digests():
    """
    Top-level periodic task: fan out generate_and_send_digests subtasks for
    all digest subscribers, in batches of FORUM_DIGEST_TASK_BATCH_SIZE,
    covering the most recent FORUM_DIGEST_TASK_INTERVAL-minute time slice.

    Retries (per the task's max_retries / default_retry_delay) if the user
    service fails while the subscriber list is being enumerated.
    """
    def batch_digest_subscribers():
        # group the subscriber stream into fixed-size batches
        batch = []
        for v in get_digest_subscribers():
            batch.append(v)
            if len(batch) == settings.FORUM_DIGEST_TASK_BATCH_SIZE:
                yield batch
                batch = []
        if batch:
            yield batch

    from_dt, to_dt = _time_slice(settings.FORUM_DIGEST_TASK_INTERVAL)
    logger.info("Beginning forums digest task: from_dt=%s to_dt=%s", from_dt, to_dt)
    try:
        for user_batch in batch_digest_subscribers():
            generate_and_send_digests.delay(user_batch, from_dt, to_dt)
    except UserServiceException as e:
        # ("except X as e" replaces the Python-2-only "except X, e" form)
        raise do_forums_digests.retry(exc=e)
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
"http://www.w3.org/TR/html4/loose.dtd">
<html lang="en">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta name="viewport" content="initial-scale=1.0"> <!-- So that mobile webkit will display zoomed in -->
<meta name="format-detection" content="telephone=no"> <!-- disable auto telephone linking in iOS -->
<title>{{ title|default:"edX Discussion Digest" }}</title>
<style type="text/css">
.ReadMsgBody { width: 100%; background-color: #ebebeb;}
.ExternalClass {width: 100%; background-color: #ebebeb;}
.ExternalClass, .ExternalClass p, .ExternalClass span, .ExternalClass font, .ExternalClass td, .ExternalClass div {line-height:100%;}
body {-webkit-text-size-adjust:none; -ms-text-size-adjust:none;}
body {margin:0; padding:0;}
table {border-spacing:0;}
table td {border-collapse:collapse;}
.yshortcuts a {border-bottom: none !important;}
/* Constrain email width for small screens */
@media screen and (max-width: 600px) {
table[class="container"] {
width: 95% !important;
}
}
/* Give content more room on mobile */
@media screen and (max-width: 480px) {
td[class="container-padding"] {
padding-left: 12px !important;
padding-right: 12px !important;
}
}
</style>
</head>
<body style="margin:0; padding:10px 0;" bgcolor="#ebebeb" leftmargin="0" topmargin="0" marginwidth="0" marginheight="0">
<br>
<!-- 100% wrapper (grey background) -->
<table border="0" width="100%" height="100%" cellpadding="0" cellspacing="0" bgcolor="#ebebeb">
<tr>
<td align="center" valign="top" bgcolor="#ebebeb" style="background-color: #ebebeb">
<!-- 600px container (white background) -->
<table border="0" width="600" cellpadding="0" cellspacing="0" class="container" bgcolor="#ffffff" style="border-radius: 5px;">
<tr>
<td align="left" class="container-padding" bgcolor="#ffffff" style="background-color: #ffffff; border-radius: 5px; box-shadow: 0 1px 2px rgba(0,0,0,0.2); padding-left: 30px; padding-right: 30px; font-size: 14px; line-height: 20px; font-family: Open Sans, Helvetica, sans-serif; color: #333333;">
<br>
<!-- ### BEGIN CONTENT ### -->
<table class="email-header" cellpadding="0" cellspacing="0" border="0">
<tbody>
<tr>
<td class="edx-logo" valign="middle">
<!-- TODO: is this the proper logo image? -->
<img src="{{ logo_image_url }}" alt="edX Logo" border="0" hspace="0" vspace="0" style="vertical-align:top;" class="logo">
</td>
<td valign="top" style="padding-left: 20px">
<p style="margin: 0; color: #5597DD">{{ title }}</p>
<p style="margin: 0; font-size: 12px; color: #777777">{{ description }}</p>
</td>
</tr>
</tbody>
</table>
<br>
Hi {{user.name}},
<br><br>
You have {{thread_count}} discussion thread{{ thread_count|pluralize }} with updates {% if course_count > 1 %}across{% else %}in{% endif %} {{course_names}}. The most recent highlights are shown below. As a reminder, you can turn off all discussion digests from any course's Discussion Home page.
<br><br>
{% for course in digest.courses %}
<table class="course-table" cellpadding="0" cellspacing="0" border="0" style="width:100%; margin-bottom: 30px;">
<tbody>
<tr>
<td class="course-name" valign="middle">
<div style="font-size: 16px; line-height: 24px;">
{{ course.title }}
</div>
</td>
</tr>
{% for thread in course.threads %}
<tr>
<td>
<table class="course-thread" cellpadding="0" cellspacing="0" border="0" style="width: 100%;">
<tbody>
<tr>
<td class="course-thread-title" valign="middle">
<a href="{{ thread.url }}" class="course-thread-link" style="display: block; border-bottom: 1px solid #cccccc; padding-bottom: 4px; margin-top: 15px; font-size: 16px; font-weight:bold; text-decoration: none; color: #5597DD">
<span>{{ thread.title|escape }}</span>
</a>
</td>
</tr>
{% for item in thread.items %}
<tr>
<td>
<br>
<div class="update-metadata"><span style="font-size: 12px; font-weight:bold; color:#aaaaaa">{{ item.author }}: </span><span style="font-style:italic; font-size: 12px; color: #aaaaaa;">on {{ item.dt }} UTC</span></div>
<div class="update-content">{{ item.body|escape }} </div>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endfor %}
<br><br>
<div class="unsubscribe-tools" style="border-radius:3px; padding:10px; background-color:#eeeeee; font-size:12px;">
If you would like to stop receiving these updates, you can turn off all Course Discussion digests from any course's Discussion Home page. You can also <a href="{{ unsubscribe_url }}" class="unsubscribe-link">quickly turn off these notifications from this email.</a>
</div>
<div class="physical-address" style="font-size: 10px; margin-top: 10px; text-align:center; color: #777777;"> edX, 11 Cambridge Center, Cambridge, MA 02142
</div>
<br>
<!-- ### END CONTENT ### -->
</td>
</tr>
</table>
<!--/600px container -->
</td>
</tr>
</table>
<!--/100% wrapper-->
<br>
<br>
</body>
</html>
* {{ title }} *
{{ description }}
---
Hi {{ user.name }},
You have {{thread_count}} discussion thread{{ thread_count|pluralize }} with updates {% if course_count > 1 %}across{% else %}in{% endif %} {{course_names}}. The most recent highlights are shown below. As a reminder, you can turn off all discussion digests from any course's Discussion Home page.
{% for course in digest.courses %}
[{{ course.title }}]
{% for thread in course.threads %}
{{ thread.title }}
---
{% for item in thread.items %}
{{ item.body }}
- {{ item.author }} on {{ item.dt }} UTC
{% endfor %}
{% endfor %}
{% endfor %}
To unsubscribe from this list, go to: {{ unsubscribe_url }}
To update your subscription preferences, go to: {{ preferences_url }}
"""
Explicitly load all the test modules so django's test runner finds them.
"""
import unittest
import doctest
# imports to pick up unit tests
from notifier.tests import test_pull
from notifier.tests import test_tasks
from notifier.tests import test_user
from notifier.tests import test_commands
# imports to pick up module doctests
from notifier import digest
from notifier import tasks
def add_unit_tests(suite, module):
suite.addTest(unittest.TestLoader().loadTestsFromModule(module))
def add_doc_tests(suite, module):
    """Collect `module`'s doctests into a sub-suite and append it to `suite`."""
    doc_suite = doctest.DocTestSuite(module)
    suite.addTest(doc_suite)
def suite():
    """Assemble the aggregate suite that django's test runner will execute."""
    combined = unittest.TestSuite()
    # (adder, module) pairs, kept in the original registration order.
    registrations = [
        (add_doc_tests, digest),          # digest
        (add_unit_tests, test_pull),      # pull
        (add_unit_tests, test_tasks),     # tasks
        (add_doc_tests, tasks),
        (add_unit_tests, test_user),      # user
        (add_unit_tests, test_commands),  # commands
    ]
    for register, module in registrations:
        register(combined, module)
    return combined
This source diff could not be displayed because it is too large. You can view the blob instead.
"""
"""
import datetime
import json
from os.path import dirname, join
from django.conf import settings
from django.test import TestCase
from django.test.utils import override_settings
from mock import patch, Mock
from notifier.management.commands import forums_digest
class CommandsTestCase(TestCase):
    """
    Tests for the forums_digest management command.

    NOTE(review): test_forums_digest is currently an empty stub (`pass`); the
    command is only exercised indirectly elsewhere -- consider invoking the
    command here and asserting on its effects.
    """
    # Run celery tasks eagerly with an in-memory broker so the command can be
    # executed synchronously inside the test process.
    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory',)
    def test_forums_digest(self):
        # TODO: call the management command and verify its behavior.
        pass
"""
"""
import datetime
import random
from dateutil.parser import parse as date_parse
from django.test import TestCase
from django.test.utils import override_settings
from mock import MagicMock, Mock, patch
from notifier.digest import _trunc, THREAD_ITEM_MAXLEN, \
_get_thread_url, _get_course_title, _get_course_url
from notifier.pull import Parser, generate_digest_content
class ParserTestCase(TestCase):
    """
    Tests for notifier.pull.Parser.

    The _item/_thread/_course/_digest/_payload staticmethods build raw
    comments-service-style payload fragments; the _check_* helpers assert a
    parsed object faithfully reflects a raw one, and the _find_raw_* helpers
    locate which raw object a parsed object came from (returning None if no
    match). Fix applied: the builders previously used mutable default
    arguments (`items=[]` etc.); they now use the `None` sentinel idiom.
    """
    @staticmethod
    def _item(v):
        """Build a raw forum item tagged with `v`, with a random 2013 timestamp."""
        return {
            "body": "body: %s" % v,
            "username": "user_%s" % v,
            "updated_at": (datetime.datetime(2013, 1, 1) +
                datetime.timedelta(
                    days=random.randint(0, 365),
                    seconds=random.randint(0, 86399)
                )
            ).isoformat()
        }

    @staticmethod
    def _thread(v, items=None):
        """Build a raw thread tagged with `v`, containing raw `items`."""
        return {
            "title": "title: %s" % v,
            "commentable_id": "commentable_id: %s" % v,
            "content": items if items is not None else []
        }

    @staticmethod
    def _course(threads=None):
        """Build a raw course: raw threads keyed by random synthetic ids."""
        return dict(('id-%s' % random.random(), thread)
                    for thread in (threads if threads is not None else []))

    @staticmethod
    def _digest(courses=None):
        """Build a raw digest: raw courses keyed by random synthetic ids."""
        return dict(('id-%s' % random.random(), course)
                    for course in (courses if courses is not None else []))

    @staticmethod
    def _payload(digests=None):
        """Build a raw payload: raw digests keyed by random synthetic ids."""
        return dict(('id-%s' % random.random(), digest)
                    for digest in (digests if digests is not None else []))

    def _check_item(self, raw_item, parsed_item):
        """Assert `parsed_item` is a correct parsing of `raw_item`."""
        self.assertEqual(parsed_item.body, _trunc(raw_item["body"], THREAD_ITEM_MAXLEN))
        self.assertEqual(parsed_item.author, raw_item["username"])
        self.assertEqual(parsed_item.dt, date_parse(raw_item["updated_at"]))

    def _find_raw_item(self, parsed_item, raw_items):
        """Return the raw item `parsed_item` was parsed from, or None."""
        for raw_item in raw_items:
            try:
                self._check_item(raw_item, parsed_item)
                return raw_item
            except AssertionError:
                pass

    def _check_thread(self, thread_id, course_id, raw_thread, parsed_thread):
        """Assert `parsed_thread` correctly parses `raw_thread` (title, url, items, ordering)."""
        self.assertEqual(parsed_thread.title, raw_thread["title"])
        self.assertEqual(parsed_thread.url, _get_thread_url(course_id, thread_id, raw_thread["commentable_id"]))
        # each parsed item is a correct parsing of some raw item
        for parsed_item in parsed_thread.items:
            self.assertIsNotNone(self._find_raw_item(parsed_item, raw_thread["content"]))
        # parsed items occur in reverse order by datetime
        dts = [parsed_item.dt for parsed_item in parsed_thread.items]
        self.assertEqual(dts, sorted(dts, reverse=True))

    def _find_raw_thread(self, parsed_thread, course_id, raw_threads):
        """Return the raw thread `parsed_thread` was parsed from, or None."""
        for thread_id, raw_thread in raw_threads.iteritems():
            try:
                self._check_thread(thread_id, course_id, raw_thread, parsed_thread)
                return raw_thread
            except AssertionError:
                pass

    def _check_course(self, course_id, raw_course, parsed_course):
        """Assert `parsed_course` correctly parses `raw_course` (title, url, threads, ordering)."""
        self.assertEqual(parsed_course.title, _get_course_title(course_id))
        self.assertEqual(parsed_course.url, _get_course_url(course_id))
        # each parsed thread is a correct parsing of some raw thread
        for parsed_thread in parsed_course.threads:
            self.assertIsNotNone(self._find_raw_thread(parsed_thread, course_id, raw_course))
        # parsed threads occur in reverse order by datetime
        dts = [parsed_thread.dt for parsed_thread in parsed_course.threads]
        self.assertEqual(dts, sorted(dts, reverse=True))

    def _find_raw_course(self, parsed_course, raw_courses):
        """Return the raw course `parsed_course` was parsed from, or None."""
        for course_id, raw_course in raw_courses.iteritems():
            try:
                self._check_course(course_id, raw_course, parsed_course)
                return raw_course
            except AssertionError:
                pass

    def _check_digest(self, user_id, raw_digest, parsed_digest):
        """Assert `parsed_digest` correctly parses `raw_digest` and is sorted by course title."""
        # each parsed course is a correct parsing of some raw course
        for parsed_course in parsed_digest.courses:
            self.assertIsNotNone(self._find_raw_course(parsed_course, raw_digest))
        # parsed courses occur sorted by title, case-insensitively
        lower_titles = [parsed_course.title.lower() for parsed_course in parsed_digest.courses]
        self.assertEqual(lower_titles, sorted(lower_titles))

    def _find_raw_digest(self, parsed_digest, raw_payload):
        """Return the raw digest `parsed_digest` was parsed from, or None."""
        for user_id, raw_digest in raw_payload.iteritems():
            try:
                self._check_digest(user_id, raw_digest, parsed_digest)
                return raw_digest
            except AssertionError:
                pass

    def test_item_simple(self):
        i = self._item("a")
        self._check_item(i, Parser.item(i))

    def test_thread_simple(self):
        t = self._thread("t", [self._item("a"), self._item("b"), self._item("c")])
        self._check_thread("some_thread_id", "some_course_id", t,
                           Parser.thread('some_thread_id', 'some_course_id', t))

    def test_course_simple(self):
        c = self._course([
            self._thread("t0", [self._item("a"), self._item("b"), self._item("c")]),
            self._thread("t1", [self._item("d"), self._item("e"), self._item("f")]),
            self._thread("t2", [self._item("g"), self._item("h"), self._item("i")]),
        ])
        self._check_course("some_course_id", c, Parser.course("some_course_id", c))

    def test_digest_simple(self):
        d = self._digest([
            self._course([
                self._thread("t00", [self._item("a"), self._item("b"), self._item("c")]),
                self._thread("t01", [self._item("d"), self._item("e"), self._item("f")]),
                self._thread("t02", [self._item("g"), self._item("h"), self._item("i")]),
            ]),
            self._course([
                self._thread("t10", [self._item("j"), self._item("k"), self._item("l")]),
                self._thread("t11", [self._item("m"), self._item("n"), self._item("o")]),
                self._thread("t12", [self._item("p"), self._item("q"), self._item("r")]),
            ]),
        ])
        self._check_digest("some_user_id", d, Parser.digest("some_user_id", d))

    def test_parse(self):
        p = self._payload([
            self._digest([
                self._course([
                    self._thread("t00", [self._item("a"), self._item("b"), self._item("c")]),
                    self._thread("t01", [self._item("d"), self._item("e"), self._item("f")]),
                    self._thread("t02", [self._item("g"), self._item("h"), self._item("i")]),
                ]),
                self._course([
                    self._thread("t10", [self._item("j"), self._item("k"), self._item("l")]),
                    self._thread("t11", [self._item("m"), self._item("n"), self._item("o")]),
                    self._thread("t12", [self._item("p"), self._item("q"), self._item("r")]),
                ]),
            ]),
            self._digest([
                self._course([
                    self._thread("t20", [self._item("A"), self._item("B"), self._item("C")]),
                    self._thread("t21", [self._item("D"), self._item("E"), self._item("F")]),
                    self._thread("t22", [self._item("G"), self._item("H"), self._item("I")]),
                ]),
                self._course([
                    self._thread("t30", [self._item("J"), self._item("K"), self._item("L")]),
                    self._thread("t31", [self._item("M"), self._item("N"), self._item("O")]),
                    self._thread("t32", [self._item("P"), self._item("Q"), self._item("R")]),
                ]),
            ]),
        ])
        digest_count = 0
        for user_id, parsed_digest in Parser.parse(p):
            #self._check_user(user_id, u, Parser.user(user_id, u))
            self.assertIsNotNone(self._find_raw_digest(parsed_digest, p))
            digest_count += 1
        self.assertEqual(digest_count, len(p))
class PullTestCase(TestCase):
    """
    Tests for notifier.pull.generate_digest_content.
    """
    @override_settings(
        CS_URL_BASE='*test_cs_url*', CS_API_KEY='*test_cs_key*'
    )
    def test_generate_digest_content(self):
        """
        Verify the comments service is POSTed the expected url, api key, and
        user/time-window payload, and that an empty service response makes the
        returned generator yield nothing.
        """
        # empty result
        from_dt = datetime.datetime(2013, 1, 1)
        to_dt = datetime.datetime(2013, 1, 2)
        with patch('requests.post', return_value=Mock(json={})) as p:
            g = generate_digest_content(["a", "b", "c"], from_dt, to_dt)
            expected_api_url = '*test_cs_url*/api/v1/notifications'
            expected_params = {
                'api_key': '*test_cs_key*',
            }
            expected_post_data = {
                'user_ids': 'a,b,c',
                'from': '2013-01-01 00:00:00',  # TODO tz offset
                'to': '2013-01-02 00:00:00'
            }
            p.assert_called_once_with(expected_api_url, params=expected_params, data=expected_post_data)
            # empty payload -> generator is immediately exhausted
            self.assertRaises(StopIteration, g.next)
        # TODO: single result
        # TODO: multiple result
"""
"""
from contextlib import nested
import datetime
import json
from os.path import dirname, join
import sys
import traceback
from boto.ses.exceptions import SESMaxSendingRateExceededError
from django.conf import settings
from django.core import mail as djmail
from django.test import TestCase
from django.test.utils import override_settings
from mock import patch, Mock
from notifier.tasks import generate_and_send_digests, do_forums_digests
from notifier.pull import Parser
from notifier.user import UserServiceException
# fixture data helper
def usern(n):
    """Return a fixture user dict (name/id/email/username) for user number `n`."""
    # def instead of an assigned lambda (PEP 8 E731); same callable interface.
    return {
        'name': 'user%d' % n,
        'id': n,
        'email': 'user%d@dummy.edu' % n,
        'username': 'user%d' % n,
    }
@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                   CELERY_ALWAYS_EAGER=True,
                   EMAIL_BACKEND='django.core.mail.backends.locmem.EmailBackend',
                   BROKER_BACKEND='memory',)
class TasksTestCase(TestCase):
    """
    Tests for the celery tasks in notifier.tasks.

    Tasks run eagerly (in-process) against an in-memory broker; outgoing mail
    is captured in django's locmem outbox.
    """
    def _check_message(self, user, digest, message):
        # Assert the outgoing message has the expected envelope (from, to,
        # subject), a text body plus an html alternative, and that every
        # course/thread/item from the digest appears in both bodies.
        actual_text = message.body
        actual_html, mime_type = message.alternatives[0]
        self.assertEqual(mime_type, 'text/html')
        self.assertEqual(message.from_email, '@'.join(
            [settings.FORUM_DIGEST_EMAIL_SENDER, settings.EMAIL_DOMAIN]))
        self.assertEqual(message.to, [user['email']])
        self.assertEqual(message.subject, settings.FORUM_DIGEST_EMAIL_SUBJECT)
        self.assertTrue(user['name'] in actual_text)
        self.assertTrue(settings.FORUM_DIGEST_EMAIL_TITLE in actual_text)
        self.assertTrue(settings.FORUM_DIGEST_EMAIL_DESCRIPTION in actual_text)
        self.assertTrue(user['name'] in actual_html)
        self.assertTrue(settings.FORUM_DIGEST_EMAIL_TITLE in actual_html)
        self.assertTrue(settings.FORUM_DIGEST_EMAIL_DESCRIPTION in actual_html)
        for course in digest.courses:
            self.assertTrue(course.title in actual_text)
            self.assertTrue(course.title in actual_html)
            for thread in course.threads:
                self.assertTrue(thread.title in actual_text)
                self.assertTrue(thread.title in actual_html)
                for item in thread.items:
                    self.assertTrue(item.body in actual_text)
                    self.assertTrue(item.body in actual_html)

    def test_generate_and_send_digests(self):
        """
        Happy path: one user with one digest -> exactly one well-formed email.
        """
        data = json.load(
            open(join(dirname(__file__), 'cs_notifications.result.json')))
        user_id, digest = Parser.parse(data).next()
        user = usern(10)
        with patch('notifier.tasks.generate_digest_content', return_value=[(user_id, digest)]) as p:
            # execute task
            task_result = generate_and_send_digests.delay(
                [user],
                datetime.datetime.now(),
                datetime.datetime.now())
            self.assertTrue(task_result.successful())
            # message was sent
            self.assertTrue(hasattr(djmail, 'outbox'))
            self.assertEqual(1, len(djmail.outbox))
            # message has expected to, from, subj, and content
            self._check_message(user, digest, djmail.outbox[0])

    @override_settings(EMAIL_REWRITE_RECIPIENT='rewritten-address@domain.org')
    def test_generate_and_send_digests_rewrite_recipient(self):
        """
        With EMAIL_REWRITE_RECIPIENT set, every outgoing message's recipient
        should be replaced by the configured address.
        """
        data = json.load(
            open(join(dirname(__file__), 'cs_notifications.result.json')))
        with patch('notifier.tasks.generate_digest_content', return_value=Parser.parse(data)) as p:
            # execute task
            task_result = generate_and_send_digests.delay(
                (usern(n) for n in xrange(2, 11)), datetime.datetime.now(), datetime.datetime.now())
            self.assertTrue(task_result.successful())
            # all messages were sent
            self.assertTrue(hasattr(djmail, 'outbox'))
            self.assertEqual(9, len(djmail.outbox))
            # all messages' email addresses were rewritten
            for message in djmail.outbox:
                self.assertEqual(message.to, ['rewritten-address@domain.org'])

    def test_generate_and_send_digests_retry_limit(self):
        """
        If SES keeps throttling, the task should retry up to
        FORUM_DIGEST_TASK_MAX_RETRIES times and then re-raise.
        """
        data = json.load(
            open(join(dirname(__file__), 'cs_notifications.result.json')))
        with patch('notifier.tasks.generate_digest_content', return_value=list(Parser.parse(data))) as p:
            # setting this here because override_settings doesn't seem to
            # work on celery task configuration decorators
            expected_num_tries = 1 + settings.FORUM_DIGEST_TASK_MAX_RETRIES
            mock_backend = Mock(name='mock_backend', send_messages=Mock(
                side_effect=SESMaxSendingRateExceededError(400, 'Throttling')))
            with patch('notifier.connection_wrapper.dj_get_connection', return_value=mock_backend) as p2:
                # execute task - should fail, retry twice and still fail, then
                # give up
                try:
                    task_result = generate_and_send_digests.delay(
                        [usern(n) for n in xrange(2, 11)], datetime.datetime.now(), datetime.datetime.now())
                except SESMaxSendingRateExceededError as e:
                    self.assertEqual(
                        mock_backend.send_messages.call_count,
                        expected_num_tries)
                else:
                    # should have raised
                    self.fail('task did not retry twice before giving up')

    @override_settings(FORUM_DIGEST_TASK_BATCH_SIZE=10)
    def test_do_forums_digests(self):
        """
        11 subscribers with batch size 10 -> two generate_and_send_digests
        fan-out calls, the last containing only the 11th user.
        """
        dt1 = datetime.datetime.utcnow()
        dt2 = dt1 + datetime.timedelta(days=1)
        with nested(
            patch('notifier.tasks.get_digest_subscribers', return_value=(usern(n) for n in xrange(11))),
            patch('notifier.tasks.generate_and_send_digests'),
            patch('notifier.tasks._time_slice', return_value=(dt1, dt2))
        ) as (p, t, ts):
            task_result = do_forums_digests.delay()
            self.assertTrue(task_result.successful())
            self.assertEqual(t.delay.call_count, 2)
            t.delay.assert_called_with([usern(10)], dt1, dt2)

    @override_settings(FORUM_DIGEST_TASK_BATCH_SIZE=10)
    def test_do_forums_digests_user_api_unavailable(self):
        """
        If the user service raises, do_forums_digests should retry
        DAILY_TASK_MAX_RETRIES times, then give up without sending anything.
        """
        # NOTE(review): dt1/dt2 are unused in this test (no _time_slice patch).
        dt1 = datetime.datetime.utcnow()
        dt2 = dt1 + datetime.timedelta(days=1)
        with nested(
            patch('notifier.tasks.get_digest_subscribers', side_effect=UserServiceException("could not connect!")),
            patch('notifier.tasks.generate_and_send_digests'),
        ) as (p, t):
            try:
                task_result = do_forums_digests.delay()
            except UserServiceException as e:
                self.assertEqual(p.call_count, settings.DAILY_TASK_MAX_RETRIES + 1)
                self.assertEqual(t.call_count, 0)
            else:
                # should have raised
                self.fail("task did not give up after exactly 3 attempts")
"""
"""
from django.conf import settings
from django.test import TestCase
from django.test.utils import override_settings
from mock import MagicMock, Mock, patch
from notifier.user import get_digest_subscribers, DIGEST_NOTIFICATION_PREFERENCE_KEY
TEST_API_KEY = 'ZXY123!@#$%'
# some shorthand to quickly generate fixture results
def mkuser(n):
    """Build a raw user dict, as embedded in user API results, for user `n`."""
    # def instead of an assigned lambda (PEP 8 E731); same callable interface.
    return {
        "id": n,
        "email": "email%d" % n,
        "name": "name%d" % n,
        "url": "url%d" % n,
        "username": "user%d" % n
    }
def mkresult(n):
    """Build a raw user_prefs API result row for user `n`."""
    # def instead of an assigned lambda (PEP 8 E731); same callable interface.
    return {
        "url": "url%d" % n,
        "user": mkuser(n),
        "key": DIGEST_NOTIFICATION_PREFERENCE_KEY,
        "value": "value%d" % n
    }
def mkexpected(d):
    """Project a raw result row `d` to the dict get_digest_subscribers yields."""
    # def instead of an assigned lambda (PEP 8 E731); same callable interface.
    return {
        "id": d["user"]["id"],
        "name": d["user"]["name"],
        "email": d["user"]["email"],
        "username": d["user"]["username"]
    }
@override_settings(US_API_KEY=TEST_API_KEY)
class UserTestCase(TestCase):
    """
    Tests for notifier.user.get_digest_subscribers: empty, single-page,
    multi-page, and HTTP basic auth behavior against a mocked requests.get.
    """
    def setUp(self):
        # Request components every test expects requests.get to be called with.
        self.expected_api_url = "test_server_url/user_api/v1/user_prefs/"
        self.expected_params = {"key":DIGEST_NOTIFICATION_PREFERENCE_KEY,
                                "page_size":3,
                                "page":1}
        self.expected_headers = {'X-EDX-API-Key': TEST_API_KEY}

    @override_settings(US_URL_BASE="test_server_url", US_RESULT_PAGE_SIZE=3)
    def test_get_digest_subscribers_empty(self):
        """
        An empty API result should yield no subscribers after one request.
        """
        # empty result
        expected_empty = {
            "count": 0,
            "next": None,
            "previous": None,
            "results": []
        }
        with patch('requests.get', return_value=Mock(json=expected_empty)) as p:
            res = list(get_digest_subscribers())
            p.assert_called_once_with(
                self.expected_api_url,
                params=self.expected_params,
                headers=self.expected_headers)
            self.assertEqual(0, len(res))

    @override_settings(US_URL_BASE="test_server_url", US_RESULT_PAGE_SIZE=3)
    def test_get_digest_subscribers_single_page(self):
        """
        One full page with no `next` link -> all users, one request.
        """
        # single page result
        expected_single = {
            "count": 3,
            "next": None,
            "previous": None,
            "results": [mkresult(1), mkresult(2), mkresult(3)]
        }
        with patch('requests.get', return_value=Mock(json=expected_single)) as p:
            res = list(get_digest_subscribers())
            p.assert_called_once_with(
                self.expected_api_url,
                params=self.expected_params,
                headers=self.expected_headers)
            self.assertEqual([
                mkexpected(mkresult(1)),
                mkexpected(mkresult(2)),
                mkexpected(mkresult(3))], res)

    @override_settings(US_URL_BASE="test_server_url", US_RESULT_PAGE_SIZE=3)
    def test_get_digest_subscribers_multi_page(self):
        """
        Paging: the generator should fetch lazily, issuing exactly one HTTP
        request per page and requesting page 2 only once page 1 is exhausted.
        """
        # multi page result
        expected_multi_p1 = {
            "count": 5,
            "next": "not none",
            "previous": None,
            "results": [mkresult(1), mkresult(2), mkresult(3)]
        }
        expected_multi_p2 = {
            "count": 5,
            "next": None,
            "previous": "not none",
            "results": [mkresult(4), mkresult(5)]
        }
        expected_pages = [expected_multi_p1, expected_multi_p2]
        # NOTE(review): side_effect is defined but never wired to the mock --
        # the test instead swaps m.json manually below. Consider removing it
        # or using it as the mock's side_effect.
        def side_effect(*a, **kw):
            return expected_pages.pop(0)
        m = Mock()
        with patch('requests.get', return_value=m) as p:
            res = []
            m.json = expected_multi_p1
            g = get_digest_subscribers()
            res.append(g.next())
            p.assert_called_once_with(
                self.expected_api_url,
                params=self.expected_params,
                headers=self.expected_headers)
            res.append(g.next())
            res.append(g.next()) # result 3, end of page
            self.assertEqual([
                mkexpected(mkresult(1)),
                mkexpected(mkresult(2)),
                mkexpected(mkresult(3))], res)
            # still should only have called requests.get() once
            self.assertEqual(1, p.call_count)
            p.reset_mock() # reset call count
            self.expected_params['page']=2
            m.json = expected_multi_p2
            self.assertEqual(mkexpected(mkresult(4)), g.next())
            p.assert_called_once_with(
                self.expected_api_url,
                params=self.expected_params,
                headers=self.expected_headers)
            self.assertEqual(mkexpected(mkresult(5)), g.next())
            self.assertEqual(1, p.call_count)
            self.assertRaises(StopIteration, g.next)

    @override_settings(US_URL_BASE="test_server_url", US_RESULT_PAGE_SIZE=3, US_HTTP_AUTH_USER='someuser', US_HTTP_AUTH_PASS='somepass')
    def test_get_digest_subscribers_basic_auth(self):
        """
        When basic-auth settings are configured, the request should carry
        the (user, pass) auth tuple.
        """
        # single page result
        expected_single = {
            "count": 3,
            "next": None,
            "previous": None,
            "results": [mkresult(1), mkresult(2), mkresult(3)]
        }
        with patch('requests.get', return_value=Mock(json=expected_single)) as p:
            res = list(get_digest_subscribers())
            p.assert_called_once_with(
                self.expected_api_url,
                params=self.expected_params,
                headers=self.expected_headers,
                auth=('someuser', 'somepass'))
            self.assertEqual([
                mkexpected(mkresult(1)),
                mkexpected(mkresult(2)),
                mkexpected(mkresult(3))], res)
"""
Functions for retrieving users and their digest-notification preferences from
the edX user service, plus username token encryption/decryption for one-click
unsubscribe links.
"""
from base64 import urlsafe_b64encode, urlsafe_b64decode
from hashlib import sha256
import logging
import sys
from Crypto.Cipher import AES
from Crypto import Random
from dogapi import dog_stats_api
from django.conf import settings
import requests
logger = logging.getLogger(__name__)
DIGEST_NOTIFICATION_PREFERENCE_KEY = 'notification_pref'
class UserServiceException(Exception):
    """Raised when the user service is unreachable or returns HTTP 500."""
    pass
def _headers():
    """Request headers carrying the edX user-service API key."""
    api_key = settings.US_API_KEY
    return {'X-EDX-API-Key': api_key}
def _auth():
    """
    Keyword arguments enabling HTTP basic auth on requests calls when
    US_HTTP_AUTH_USER is configured; an empty dict otherwise.
    """
    if not settings.US_HTTP_AUTH_USER:
        return {}
    return {'auth': (settings.US_HTTP_AUTH_USER, settings.US_HTTP_AUTH_PASS)}
def _http_get(*a, **kw):
    """
    GET via requests, translating connection failures and HTTP 500 responses
    into UserServiceException so callers (celery tasks) can retry uniformly.
    Other status codes are returned to the caller untouched.
    """
    try:
        logger.debug('GET {} {}'.format(a[0], kw))
        response = requests.get(*a, **kw)
    except requests.exceptions.ConnectionError, e:
        # Re-raise as UserServiceException, preserving the original traceback
        # (python 2 three-argument raise form).
        _, msg, tb = sys.exc_info()
        raise UserServiceException, "request failed: {}".format(msg), tb
    if response.status_code == 500:
        raise UserServiceException, "HTTP Error 500: {}".format(response.reason)
    return response
def get_digest_subscribers():
    """
    Generator function that calls the edX user API and yields a dict for each
    user opted in for digest notifications.

    Pages through the user_prefs endpoint lazily, one HTTP request per page.
    The yielded dicts have keys "id", "name", and "email" (all strings).
    """
    api_url = settings.US_URL_BASE + '/user_api/v1/user_prefs/'
    query = {
        'key': DIGEST_NOTIFICATION_PREFERENCE_KEY,
        'page_size': settings.US_RESULT_PAGE_SIZE,
        'page': 1,
    }
    logger.info('calling user api for digest subscribers')
    more_pages = True
    while more_pages:
        with dog_stats_api.timer('notifier.get_digest_subscribers.time'):
            page_data = _http_get(api_url, params=query, headers=_headers(), **_auth()).json
        for entry in page_data['results']:
            subscriber = entry['user']
            del subscriber['url']  # not used
            yield subscriber
        more_pages = page_data['next'] is not None
        query['page'] += 1
def get_user(user_id):
    """
    Look up a single user by id via the edX user API.

    Returns the user dict (with the unused "url" key removed) on HTTP 200,
    None on 404, and raises for any other response.
    """
    api_url = '{}/user_api/v1/users/{}/'.format(settings.US_URL_BASE, user_id)
    logger.info('calling user api for user %s', user_id)
    with dog_stats_api.timer('notifier.get_user.time'):
        response = _http_get(api_url, headers=_headers(), **_auth())
        status = response.status_code
        if status == 404:
            return None
        if status == 200:
            user = response.json
            del user['url']  # not used
            return user
        # Unexpected status: surface an HTTPError if one applies, otherwise
        # fail loudly with the raw status/reason.
        response.raise_for_status()
        raise Exception(
            'unhandled response from user service: %s %s' %
            (status, response.reason))
# implementation mirrors that in
# https://github.com/edx/edx-platform/blob/master/lms/djangoapps/notification_prefs/views.py
class UsernameCipher(object):
    """
    A transformation of a username to/from an opaque token

    The purpose of the token is to make one-click unsubscribe links that don't
    require the user to log in. To prevent users from unsubscribing other users,
    we must ensure the token cannot be computed by anyone who has this
    source code. The token must also be embeddable in a URL.

    Thus, we take the following steps to encode (and do the inverse to decode):
    1. Pad the UTF-8 encoding of the username with PKCS#7 padding to match the
       AES block length
    2. Generate a random AES block length initialization vector
    3. Use AES-256 (with a hash of settings.SECRET_KEY as the encryption key)
       in CBC mode to encrypt the username
    4. Prepend the IV to the encrypted value to allow for initialization of the
       decryption cipher
    5. base64url encode the result

    NOTE(review): UsernameDecryptionException is raised below but is not
    defined in this module's visible code -- confirm it is defined/imported.
    """
    @staticmethod
    def _get_aes_cipher(initialization_vector):
        # Key is the SHA-256 digest of SECRET_KEY: 32 bytes -> AES-256.
        hash_ = sha256()
        hash_.update(settings.SECRET_KEY)
        return AES.new(hash_.digest(), AES.MODE_CBC, initialization_vector)

    @staticmethod
    def _add_padding(input_str):
        """Return `input_str` with PKCS#7 padding added to match AES block length"""
        padding_len = AES.block_size - len(input_str) % AES.block_size
        return input_str + padding_len * chr(padding_len)

    @staticmethod
    def _remove_padding(input_str):
        """Return `input_str` with PKCS#7 padding trimmed to match AES block length"""
        # Last byte encodes the pad length; reject values outside 1..block_size
        # or padding that would consume the whole string.
        num_pad_bytes = ord(input_str[-1])
        if num_pad_bytes < 1 or num_pad_bytes > AES.block_size or num_pad_bytes >= len(input_str):
            raise UsernameDecryptionException("padding")
        return input_str[:-num_pad_bytes]

    @staticmethod
    def encrypt(username):
        # A fresh random IV per token: equal usernames yield distinct tokens.
        initialization_vector = Random.new().read(AES.block_size)
        aes_cipher = UsernameCipher._get_aes_cipher(initialization_vector)
        return urlsafe_b64encode(
            initialization_vector +
            aes_cipher.encrypt(UsernameCipher._add_padding(username.encode("utf-8")))
        )

    @staticmethod
    def decrypt(token):
        try:
            base64_decoded = urlsafe_b64decode(token)
        except TypeError:
            raise UsernameDecryptionException("base64url")
        if len(base64_decoded) < AES.block_size:
            raise UsernameDecryptionException("initialization_vector")
        # Token layout: IV (one AES block) followed by the AES-CBC ciphertext.
        initialization_vector = base64_decoded[:AES.block_size]
        aes_encrypted = base64_decoded[AES.block_size:]
        aes_cipher = UsernameCipher._get_aes_cipher(initialization_vector)
        try:
            decrypted = aes_cipher.decrypt(aes_encrypted)
        except ValueError:
            raise UsernameDecryptionException("aes")
        return UsernameCipher._remove_padding(decrypted)
Django==1.4.5
amqp==1.0.12
anyjson==0.3.3
autopep8==0.9.2
billiard==2.7.3.28
celery==3.0.19
coverage==3.6
django-celery==3.0.17
django-configurations==0.3
django-coverage==1.2.4
django-ses==0.4.1
dogapi==1.4.0
dogstatsd-python==0.2.1
kombu==2.5.11
logilab-astng==0.24.3
logilab-common==0.59.1
mock==1.0.1
pep8==1.4.6
pycrypto>=2.6
pylint==0.28.0
python-dateutil==2.1
pytz==2013b
requests==0.14.2
six==1.3.0
wsgiref==0.1.2
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment