Commit 64887c68 by Chris Rossi Committed by Diana Huang

Flesh out findusers script, except for LinkedIn API call, with tests.

parent cc53aab4
import datetime
import pytz
import time
from django.contrib.auth.models import User
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
from optparse import make_option
FRIDAY = 4
def get_call_limits():
"""
Returns a tuple of: (max_checks, checks_per_call, time_between_calls)
Here are the parameters provided by LinkedIn:
Please note: in order to ensure a successful call, please run the calls
between Friday 6pm PST and Monday 5am PST.
During the week, calls are limited to very low volume (500 profiles/day)
and must be run after 6pm and before 5am. This should only be used to do
subsequent trigger emails. Please contact the developer support alias for
more information.
Use 80 emails per API call and 1 call per second.
"""
now = timezone.now().astimezone(pytz.timezone('US/Pacific'))
lastfriday = now
while lastfriday.weekday() != FRIDAY:
lastfriday -= datetime.timedelta(days=1)
safeharbor_begin = lastfriday.replace(hour=18, minute=0)
safeharbor_end = safeharbor_begin + datetime.timedelta(days=2, hours=11)
if safeharbor_begin < now < safeharbor_end:
return -1, 80, 1
elif now.hour >= 18 or now.hour < 5:
return 500, 80, 1
else:
return 0, 0, 0
class Command(BaseCommand):
args = ''
help = 'Checks LinkedIn for students that are on LinkedIn'
option_list = BaseCommand.option_list + (
make_option('--recheck',
action='store_true',
dest='recheck',
default=False,
help='Check users that have been checked in the past to see if '
'they have joined or left LinkedIn since the last check'),
)
def handle(self, *args, **options):
print "Hello World!"
api = LinkedinAPI()
recheck = options.pop('recheck', False)
max_checks, checks_per_call, time_between_calls = get_call_limits()
if not max_checks:
raise CommandError("No checks allowed during this time.")
check_users = []
for user in User.objects.all():
checked = (hasattr(user, 'linkedin') and
user.linkedin.has_linkedin_account is not None)
if recheck or not checked:
check_users.append(user)
if max_checks != -1 and len(check_users) > max_checks:
self.stderr.write(
"WARNING: limited to checking only %d users today." %
max_checks)
check_users = check_users[:max_checks]
batches = [check_users[i:i + checks_per_call]
for i in xrange(0, len(check_users), checks_per_call)]
def do_batch(batch):
emails = [u.email for u in batch]
for user, has_account in zip(batch, api.batch(emails)):
user.linkedin.has_linkedin_account = has_account
if batches:
do_batch(batches.pop(0))
for batch in batches:
time.sleep(time_between_calls)
do_batch(batch)
class LinkedinAPI(object):
def batch(self, emails):
pass
import datetime
import mock
import pytz
import StringIO
import unittest
from linkedin.management.commands import findusers
class FindUsersTests(unittest.TestCase):
@mock.patch('linkedin.management.commands.findusers.timezone')
def test_get_call_limits_in_safe_harbor(self, timezone):
fut = findusers.get_call_limits
tz = pytz.timezone('US/Eastern')
timezone.now.return_value = datetime.datetime(
2013, 12, 14, 0, 0, tzinfo=tz)
self.assertEqual(fut(), (-1, 80, 1))
timezone.now.return_value = datetime.datetime(
2013, 12, 13, 21, 1, tzinfo=tz)
self.assertEqual(fut(), (-1, 80, 1))
timezone.now.return_value = datetime.datetime(
2013, 12, 15, 7, 59, tzinfo=tz)
self.assertEqual(fut(), (-1, 80, 1))
@mock.patch('linkedin.management.commands.findusers.timezone')
def test_get_call_limits_in_business_hours(self, timezone):
fut = findusers.get_call_limits
tz = pytz.timezone('US/Eastern')
timezone.now.return_value = datetime.datetime(
2013, 12, 11, 11, 3, tzinfo=tz)
self.assertEqual(fut(), (0, 0, 0))
timezone.now.return_value = datetime.datetime(
2013, 12, 13, 20, 59, tzinfo=tz)
self.assertEqual(fut(), (0, 0, 0))
timezone.now.return_value = datetime.datetime(
2013, 12, 16, 8, 1, tzinfo=tz)
self.assertEqual(fut(), (0, 0, 0))
@mock.patch('linkedin.management.commands.findusers.timezone')
def test_get_call_limits_on_weeknights(self, timezone):
fut = findusers.get_call_limits
tz = pytz.timezone('US/Eastern')
timezone.now.return_value = datetime.datetime(
2013, 12, 11, 21, 3, tzinfo=tz)
self.assertEqual(fut(), (500, 80, 1))
timezone.now.return_value = datetime.datetime(
2013, 12, 11, 7, 59, tzinfo=tz)
self.assertEqual(fut(), (500, 80, 1))
@mock.patch('linkedin.management.commands.findusers.time')
@mock.patch('linkedin.management.commands.findusers.User')
@mock.patch('linkedin.management.commands.findusers.LinkedinAPI')
@mock.patch('linkedin.management.commands.findusers.get_call_limits')
def test_command_success_recheck_no_limits(
self, get_call_limits, API, User, time):
fut = findusers.Command().handle
get_call_limits.return_value = (-1, 6, 42)
api = API.return_value
users = [mock.Mock(email=i) for i in xrange(10)]
User.objects.all.return_value = users
def dummy_batch(emails):
return [email % 2 == 0 for email in emails]
api.batch = dummy_batch
fut(recheck=True)
time.sleep.assert_called_once_with(42)
self.assertEqual([u.linkedin.has_linkedin_account for u in users],
[i % 2 == 0 for i in xrange(10)])
@mock.patch('linkedin.management.commands.findusers.time')
@mock.patch('linkedin.management.commands.findusers.User')
@mock.patch('linkedin.management.commands.findusers.LinkedinAPI')
@mock.patch('linkedin.management.commands.findusers.get_call_limits')
def test_command_success_no_recheck_no_limits(
self, get_call_limits, API, User, time):
fut = findusers.Command().handle
get_call_limits.return_value = (-1, 6, 42)
api = API.return_value
users = [mock.Mock(email=i) for i in xrange(10)]
for user in users[:6]:
user.linkedin.has_linkedin_account = user.email % 2 == 0
for user in users[6:]:
user.linkedin.has_linkedin_account = None
User.objects.all.return_value = users
def dummy_batch(emails):
self.assertEqual(len(emails), 4)
return [email % 2 == 0 for email in emails]
api.batch = dummy_batch
fut()
time.sleep.assert_not_called()
self.assertEqual([u.linkedin.has_linkedin_account for u in users],
[i % 2 == 0 for i in xrange(10)])
@mock.patch('linkedin.management.commands.findusers.time')
@mock.patch('linkedin.management.commands.findusers.User')
@mock.patch('linkedin.management.commands.findusers.LinkedinAPI')
@mock.patch('linkedin.management.commands.findusers.get_call_limits')
def test_command_success_no_recheck_no_users(
self, get_call_limits, API, User, time):
fut = findusers.Command().handle
get_call_limits.return_value = (-1, 6, 42)
api = API.return_value
users = [mock.Mock(email=i) for i in xrange(10)]
for user in users:
user.linkedin.has_linkedin_account = user.email % 2 == 0
User.objects.all.return_value = users
def dummy_batch(emails):
self.assertTrue(False) # shouldn't be called
api.batch = dummy_batch
fut()
time.sleep.assert_not_called()
self.assertEqual([u.linkedin.has_linkedin_account for u in users],
[i % 2 == 0 for i in xrange(10)])
@mock.patch('linkedin.management.commands.findusers.time')
@mock.patch('linkedin.management.commands.findusers.User')
@mock.patch('linkedin.management.commands.findusers.LinkedinAPI')
@mock.patch('linkedin.management.commands.findusers.get_call_limits')
def test_command_success_recheck_with_limit(
self, get_call_limits, API, User, time):
command = findusers.Command()
command.stderr = StringIO.StringIO()
fut = command.handle
get_call_limits.return_value = (9, 6, 42)
api = API.return_value
users = [mock.Mock(email=i) for i in xrange(10)]
for user in users:
user.linkedin.has_linkedin_account = None
User.objects.all.return_value = users
def dummy_batch(emails):
return [email % 2 == 0 for email in emails]
api.batch = dummy_batch
fut()
time.sleep.assert_called_once_with(42)
self.assertEqual([u.linkedin.has_linkedin_account for u in users[:9]],
[i % 2 == 0 for i in xrange(9)])
self.assertEqual(users[9].linkedin.has_linkedin_account, None)
self.assertTrue(command.stderr.getvalue().startswith("WARNING"))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment