Commit c5c9554d by Chris Rossi Committed by Diana Huang

Test LinkedinAPI

parent cf98cb63
......@@ -16,26 +16,28 @@ class LinkedinAPI(object):
"""
Encapsulates the LinkedIn API.
"""
def __init__(self):
def __init__(self, command):
config = getattr(settings, "LINKEDIN_API", None)
if not config:
raise CommandError("LINKEDIN_API is not configured")
self.config = config
try:
self.tokens = LinkedInToken.objects.get()
self.token = LinkedInToken.objects.get()
except LinkedInToken.DoesNotExist:
self.tokens = None
self.token = None
self.command = command
self.state = str(uuid.uuid4())
def http_error(self, error, message):
"""
Handle an unexpected HTTP response.
"""
print "!!ERROR!!"
print error
print error.read()
stderr = self.command.stderr
stderr.write("!!ERROR!!")
stderr.write(error)
stderr.write(error.read())
raise CommandError(message)
def authorization_url(self):
......@@ -57,53 +59,56 @@ class LinkedinAPI(object):
assert query['state'][0] == self.state, (query['state'][0], self.state)
return query['code'][0]
def get_access_token(self, code):
"""
Given an authorization code, get an access token.
"""
def access_token_url(self, code):
config = self.config
url = ("https://www.linkedin.com/uas/oauth2/accessToken"
"?grant_type=authorization_code"
"&code=%s&redirect_uri=%s&client_id=%s&client_secret=%s" % (
code, config['REDIRECT_URI'], config['CLIENT_ID'],
config['CLIENT_SECRET']))
return ("https://www.linkedin.com/uas/oauth2/accessToken"
"?grant_type=authorization_code"
"&code=%s&redirect_uri=%s&client_id=%s&client_secret=%s" % (
code, config['REDIRECT_URI'], config['CLIENT_ID'],
config['CLIENT_SECRET']))
def call_json_api(self, url):
try:
response = urllib2.urlopen(url).read()
request = urllib2.Request(url, headers={'x-li-format': 'json'})
response = urllib2.urlopen(request).read()
return json.loads(response)
except urllib2.HTTPError, error:
self.http_error(error, "Unable to retrieve access token")
self.http_error(error, "Error calling LinkedIn API")
access_token = json.loads(response)['access_token']
def get_access_token(self, code):
"""
Given an authorization code, get an access token.
"""
response = self.call_json_api(self.access_token_url(code))
access_token = response['access_token']
try:
tokens = LinkedInToken.objects.get()
tokens.access_token = access_token
tokens.authorization_code = code
token = LinkedInToken.objects.get()
token.access_token = access_token
except LinkedInToken.DoesNotExist:
tokens = LinkedInToken(access_token=access_token)
tokens.save()
self.tokens = tokens
token = LinkedInToken(access_token=access_token)
token.save()
self.token = token
return access_token
def batch(self, emails):
"""
Get the LinkedIn status for a batch of emails.
"""
if self.tokens is None:
def require_token(self):
if self.token is None:
raise CommandError(
"You must log in to LinkedIn in order to use this script. "
"Please use the 'login' command to log in to LinkedIn.")
emails = list(emails) # realize generator since we traverse twice
def batch_url(self, emails):
self.require_token()
queries = ','.join(("email=" + email for email in emails))
url = "https://api.linkedin.com/v1/people::(%s):(id)" % queries
url += "?oauth2_access_token=%s" % self.tokens.access_token
request = urllib2.Request(url, headers={'x-li-format': 'json'})
try:
response = urllib2.urlopen(request).read()
values = json.loads(response)['values']
accounts = set(value['_key'][6:] for value in values)
return (email in accounts for email in emails)
except urllib2.HTTPError, error:
self.http_error(error, "Unable to access People API")
return (True for email in emails)
url += "?oauth2_access_token=%s" % self.token.access_token
return url
def batch(self, emails):
"""
Get the LinkedIn status for a batch of emails.
"""
emails = list(emails) # realize generator since we traverse twice
response = self.call_json_api(self.batch_url(emails))
accounts = set(value['_key'][6:] for value in response['values'])
return (email in accounts for email in emails)
......@@ -75,7 +75,7 @@ class Command(BaseCommand):
"""
Check users.
"""
api = LinkedinAPI()
api = LinkedinAPI(self)
recheck = options.pop('recheck', False)
force = options.pop('force', False)
if force:
......
......@@ -19,7 +19,7 @@ class Command(BaseCommand):
def handle(self, *args, **options):
"""
"""
api = LinkedinAPI()
api = LinkedinAPI(self)
print "Let's log into your LinkedIn account."
print "Start by visiting this url:"
print api.authorization_url()
......
......@@ -43,7 +43,7 @@ class Command(BaseCommand):
def __init__(self):
super(BaseCommand, self).__init__()
self.api = LinkedinAPI()
self.api = LinkedinAPI(self)
def handle(self, *args, **options):
whitelist = self.api.config.get('EMAIL_WHITELIST')
......
import mock
import StringIO
from django.core.management.base import CommandError
from django.test import TestCase
from linkedin.management.commands import LinkedinAPI
from linkedin.models import LinkedInToken
class LinkedinAPITests(TestCase):
def setUp(self):
patcher = mock.patch('linkedin.management.commands.uuid.uuid4')
uuid4 = patcher.start()
uuid4.return_value = '0000-0000'
self.addCleanup(patcher.stop)
def make_one(self):
return LinkedinAPI(DummyCommand())
@mock.patch('django.conf.settings.LINKEDIN_API', None)
def test_ctor_no_api_config(self):
with self.assertRaises(CommandError):
self.make_one()
def test_ctor_no_token(self):
api = self.make_one()
self.assertEqual(api.token, None)
def test_ctor_with_token(self):
token = LinkedInToken()
token.save()
api = self.make_one()
self.assertEqual(api.token, token)
def test_http_error(self):
api = self.make_one()
with self.assertRaises(CommandError):
api.http_error(DummyHTTPError(), "That didn't work")
self.assertEqual(
api.command.stderr.getvalue(),
"!!ERROR!!"
"HTTPError OMG!"
"OMG OHNOES!")
def test_authorization_url(self):
api = self.make_one()
self.assertEqual(
api.authorization_url(),
'https://www.linkedin.com/uas/oauth2/authorization?'
'response_type=code&client_id=12345&state=0000-0000&'
'redirect_uri=http://bar.foo')
def test_get_authorization_code(self):
fut = self.make_one().get_authorization_code
self.assertEqual(
fut('http://foo.bar/?state=0000-0000&code=54321'), '54321')
def test_access_token_url(self):
fut = self.make_one().access_token_url
self.assertEqual(
fut('54321'),
'https://www.linkedin.com/uas/oauth2/accessToken?'
'grant_type=authorization_code&code=54321&'
'redirect_uri=http://bar.foo&client_id=12345&client_secret=SECRET')
def test_get_access_token(self):
api = self.make_one()
api.call_json_api = mock.Mock(return_value={'access_token': '777'})
self.assertEqual(api.get_access_token('54321'), '777')
token = LinkedInToken.objects.get()
self.assertEqual(token.access_token, '777')
def test_get_access_token_overwrite_previous(self):
LinkedInToken(access_token='888').save()
api = self.make_one()
api.call_json_api = mock.Mock(return_value={'access_token': '777'})
self.assertEqual(api.get_access_token('54321'), '777')
token = LinkedInToken.objects.get()
self.assertEqual(token.access_token, '777')
def test_require_token_no_token(self):
fut = self.make_one().require_token
with self.assertRaises(CommandError):
fut()
def test_require_token(self):
LinkedInToken().save()
fut = self.make_one().require_token
fut()
def test_batch_url(self):
LinkedInToken(access_token='777').save()
fut = self.make_one().batch_url
emails = ['foo@bar', 'bar@foo']
self.assertEquals(
fut(emails),
'https://api.linkedin.com/v1/people::(email=foo@bar,email=bar@foo):'
'(id)?oauth2_access_token=777')
def test_batch(self):
LinkedInToken(access_token='777').save()
api = self.make_one()
api.call_json_api = mock.Mock(return_value={
'values': [{'_key': 'email=bar@foo'}]})
emails = ['foo@bar', 'bar@foo']
self.assertEqual(list(api.batch(emails)), [False, True])
class DummyCommand(object):
def __init__(self):
self.stderr = StringIO.StringIO()
class DummyHTTPError(object):
def __str__(self):
return 'HTTPError OMG!'
def read(self):
return 'OMG OHNOES!'
......@@ -260,6 +260,9 @@ LTI_PORT = 8765
############################ LinkedIn Integration #############################
INSTALLED_APPS += ('linkedin',)
LINKEDIN_API = {
'CLIENT_ID': '12345',
'CLIENT_SECRET': 'SECRET',
'REDIRECT_URI': 'http://bar.foo',
'COMPANY_NAME': 'edX',
'COMPANY_ID': '0000000',
'EMAIL_FROM': 'The Team <team@test.foo>',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment