Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-video-pipeline
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-video-pipeline
Commits
112d11fd
Commit
112d11fd
authored
Oct 12, 2017
by
muhammad-ammar
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add management command to create oauth client
parent
15e1f50b
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
350 additions
and
0 deletions
+350
-0
.coveragerc
+1
-0
VEDA_OS01/management/__init__.py
+0
-0
VEDA_OS01/management/commands/__init__.py
+0
-0
VEDA_OS01/management/commands/create_oauth_client.py
+143
-0
VEDA_OS01/management/commands/tests/__init__.py
+0
-0
VEDA_OS01/management/commands/tests/test_create_oauth_client.py
+206
-0
No files found.
.coveragerc
View file @
112d11fd
...
...
@@ -21,6 +21,7 @@ omit =
VEDA_OS01/tests/*
VEDA_OS01/migrations/*
VEDA_OS01/admin.py
VEDA_OS01/management/commands/tests/*
...
...
VEDA_OS01/management/__init__.py
0 → 100644
View file @
112d11fd
VEDA_OS01/management/commands/__init__.py
0 → 100644
View file @
112d11fd
VEDA_OS01/management/commands/create_oauth_client.py
0 → 100644
View file @
112d11fd
"""
Management command used to create an OAuth client in the database.
"""
from
django.contrib.auth
import
get_user_model
from
django.core.exceptions
import
ValidationError
from
django.core.management.base
import
BaseCommand
,
CommandError
from
django.core.validators
import
URLValidator
from
oauth2_provider.models
import
Application
CLIENT_TYPES
=
[
client_type
[
0
]
for
client_type
in
Application
.
CLIENT_TYPES
]
GRANT_TYPES
=
[
grant_type
[
0
]
for
grant_type
in
Application
.
GRANT_TYPES
]
class
Command
(
BaseCommand
):
"""
create_oauth_client command class
"""
help
=
'Create a new OAuth Application Client. Outputs a serialized representation of the newly-created Client.'
def
add_arguments
(
self
,
parser
):
super
(
Command
,
self
)
.
add_arguments
(
parser
)
# Required positional arguments.
parser
.
add_argument
(
'client_id'
,
help
=
"String to assign as the Client ID."
)
parser
.
add_argument
(
'client_type'
,
help
=
"Client type."
)
parser
.
add_argument
(
'authorization_grant_type'
,
help
=
"Authorization flows available to the Application."
)
# Optional options.
parser
.
add_argument
(
'-u'
,
'--username'
,
help
=
"Username of a user to associate with the Client."
)
parser
.
add_argument
(
'-r'
,
'--redirect_uris'
,
help
=
"Comma separated redirect URIs."
)
parser
.
add_argument
(
'-n'
,
'--client_name'
,
help
=
"String to assign as the Client name."
)
parser
.
add_argument
(
'-s'
,
'--client_secret'
,
help
=
"String to assign as the Client secret. Should not be shared."
)
parser
.
add_argument
(
'-a'
,
'--skip_authorization'
,
action
=
'store_true'
,
default
=
False
,
help
=
"Skip authorization for trusted applications."
)
def
handle
(
self
,
*
args
,
**
options
):
self
.
_clean_required_args
(
options
[
'client_id'
],
options
[
'client_type'
],
options
[
'authorization_grant_type'
])
self
.
_parse_options
(
options
)
# Check if client ID is already in use. If so, fetch existing Client and update fields.
Application
.
objects
.
update_or_create
(
client_id
=
self
.
fields
.
pop
(
'client_id'
),
defaults
=
self
.
fields
)
def
_clean_required_args
(
self
,
client_id
,
client_type
,
grant_type
):
"""
Validate and clean the command's arguments.
Arguments:
client_id (str): Client Id.
client_type (str): Client Type.
grant_type (str): Grant Type
Raises:
CommandError, if the arguments have invalid values.
"""
client_id
=
client_id
and
client_id
.
strip
()
if
not
client_id
:
raise
CommandError
(
'Client id provided is invalid.'
)
client_type
=
client_type
.
lower
()
if
client_type
not
in
CLIENT_TYPES
:
raise
CommandError
(
'Client type provided is invalid. Please use one of {}.'
.
format
(
CLIENT_TYPES
))
grant_type
=
grant_type
.
lower
()
if
grant_type
not
in
GRANT_TYPES
:
raise
CommandError
(
'Grant type provided is invalid. Please use one of {}.'
.
format
(
GRANT_TYPES
))
self
.
fields
=
{
# pylint: disable=attribute-defined-outside-init
'client_id'
:
client_id
,
'client_type'
:
client_type
,
'authorization_grant_type'
:
grant_type
,
}
def
_parse_options
(
self
,
options
):
"""
Parse the command's options.
Arguments:
options (dict): Options with which the command was called.
Raises:
CommandError, if user does not exist or redirect_uris is invalid
"""
for
key
in
(
'username'
,
'client_name'
,
'client_secret'
,
'redirect_uris'
,
'skip_authorization'
):
value
=
options
.
get
(
key
)
# replace argument names to match Application model field names
if
key
==
'username'
:
key
=
'user_id'
elif
key
==
'client_name'
:
key
=
'name'
if
value
is
not
None
:
self
.
fields
[
key
]
=
value
username
=
self
.
fields
.
get
(
'user_id'
)
if
username
is
not
None
:
try
:
user_model
=
get_user_model
()
self
.
fields
[
'user_id'
]
=
user_model
.
objects
.
get
(
username
=
username
)
.
id
except
user_model
.
DoesNotExist
:
raise
CommandError
(
'User matching the provided username does not exist.'
)
uris
=
self
.
fields
.
get
(
'redirect_uris'
)
if
uris
is
not
None
:
uris
=
uris
.
split
(
','
)
for
uri
in
uris
:
try
:
URLValidator
()(
uri
)
except
ValidationError
:
raise
CommandError
(
'URIs provided are invalid. Please provide valid redirect URIs.'
)
VEDA_OS01/management/commands/tests/__init__.py
0 → 100644
View file @
112d11fd
VEDA_OS01/management/commands/tests/test_create_oauth_client.py
0 → 100644
View file @
112d11fd
"""
Tests of the create_oauth_client management command.
"""
from
itertools
import
product
import
ddt
from
django.contrib.auth
import
get_user_model
from
django.core.management
import
call_command
from
django.core.management.base
import
CommandError
from
django.test
import
TestCase
from
oauth2_provider.models
import
Application
from
VEDA_OS01.management.commands.create_oauth_client
import
CLIENT_TYPES
,
GRANT_TYPES
USER_ID
=
1
USERNAME
=
'username'
CLIENT_ID
=
'cliend_id101'
REDIRECT_URI
=
'https://www.example.com/o/token'
OPTIONAL_COMMAND_ARGS
=
(
'username'
,
'redirect_uris'
,
'client_name'
,
'client_secret'
,
'skip_authorization'
)
@ddt.ddt
class
CreateOauthAppClientTests
(
TestCase
):
"""
Management command test class.
"""
def
setUp
(
self
):
super
(
CreateOauthAppClientTests
,
self
)
.
setUp
()
user_model
=
get_user_model
()
self
.
user
=
user_model
.
objects
.
create
(
username
=
USERNAME
)
def
_call_command
(
self
,
args
,
options
=
None
):
"""
Call the command.
"""
if
options
is
None
:
options
=
{}
call_command
(
'create_oauth_client'
,
*
args
,
**
options
)
def
assert_client_created
(
self
,
args
,
options
):
"""
Verify that the Client was created.
"""
application
=
Application
.
objects
.
get
()
for
index
,
attr
in
enumerate
((
'client_id'
,
'client_type'
,
'authorization_grant_type'
)):
self
.
assertEqual
(
args
[
index
],
getattr
(
application
,
attr
))
username
=
options
.
get
(
'username'
)
if
username
is
not
None
:
get_user_model
()
.
objects
.
get
(
username
=
username
)
client_name
=
options
.
get
(
'client_name'
)
if
client_name
is
not
None
:
self
.
assertEqual
(
client_name
,
application
.
name
)
for
attr
in
(
'client_secret'
,
'redirect_uris'
,
'skip_authorization'
):
value
=
options
.
get
(
attr
)
if
value
is
not
None
:
self
.
assertEqual
(
value
,
getattr
(
application
,
attr
))
# Generate all valid argument and options combinations
@ddt.data
(
*
product
(
# Generate all valid argument combinations
product
(
(
CLIENT_ID
,),
(
t
for
t
in
CLIENT_TYPES
),
(
g
for
g
in
GRANT_TYPES
),
),
# Generate all valid option combinations
(
dict
(
zip
(
OPTIONAL_COMMAND_ARGS
,
p
))
for
p
in
product
(
(
USERNAME
,
None
),
(
REDIRECT_URI
,
None
),
(
'client_name'
,
None
),
(
'client_secret'
,
None
),
(
True
,
False
)
)
)
))
@ddt.unpack
def
test_client_creation
(
self
,
args
,
options
):
"""
Verify that the command creates a Client when given valid arguments and options.
"""
self
.
_call_command
(
args
,
options
)
self
.
assert_client_created
(
args
,
options
)
@ddt.data
(
((
CLIENT_ID
,),
'too few arguments'
),
((
CLIENT_ID
,
REDIRECT_URI
,
CLIENT_TYPES
[
0
],
CLIENT_TYPES
[
1
]),
'unrecognized arguments'
),
)
@ddt.unpack
def
test_argument_cardinality
(
self
,
args
,
err_msg
):
"""
Verify that the command fails when given an incorrect number of arguments.
"""
with
self
.
assertRaises
(
CommandError
)
as
exc
:
self
.
_call_command
(
args
,
{})
self
.
assertIn
(
err_msg
,
exc
.
exception
.
message
)
@ddt.data
(
{
'client_id'
:
''
,
},
{
'client_id'
:
' '
,
},
{
'client_id'
:
None
,
}
)
@ddt.unpack
def
test_client_id_validation
(
self
,
client_id
):
"""
Verify that the command fails when the provided client id is invalid.
"""
with
self
.
assertRaises
(
CommandError
)
as
exc
:
self
.
_call_command
((
client_id
,
CLIENT_TYPES
[
0
],
GRANT_TYPES
[
0
]))
self
.
assertEqual
(
'Client id provided is invalid.'
,
exc
.
exception
.
message
)
def
test_client_type_validation
(
self
):
"""
Verify that the command fails when the provided client type is invalid.
"""
with
self
.
assertRaises
(
CommandError
)
as
exc
:
self
.
_call_command
((
CLIENT_ID
,
'invalid_client_type'
,
GRANT_TYPES
[
0
]))
self
.
assertEqual
(
'Client type provided is invalid. Please use one of {}.'
.
format
(
CLIENT_TYPES
),
exc
.
exception
.
message
)
def
test_grant_type_validation
(
self
):
"""
Verify that the command fails when the provided grant type is invalid.
"""
with
self
.
assertRaises
(
CommandError
)
as
exc
:
self
.
_call_command
((
CLIENT_ID
,
CLIENT_TYPES
[
0
],
'invalid_grant_type'
))
self
.
assertEqual
(
'Grant type provided is invalid. Please use one of {}.'
.
format
(
GRANT_TYPES
),
exc
.
exception
.
message
)
def
test_username_validation
(
self
):
"""
Verify that the command fails when the provided username is invalid.
"""
with
self
.
assertRaises
(
CommandError
)
as
exc
:
self
.
_call_command
(
(
CLIENT_ID
,
CLIENT_TYPES
[
0
],
GRANT_TYPES
[
0
]),
{
'username'
:
'invalid'
}
)
self
.
assertEqual
(
'User matching the provided username does not exist.'
,
exc
.
exception
.
message
)
def
test_url_validation
(
self
):
"""
Verify that the command fails when the provided URLs are invalid.
"""
args
=
CLIENT_ID
,
CLIENT_TYPES
[
0
],
GRANT_TYPES
[
0
]
with
self
.
assertRaises
(
CommandError
)
as
exc
:
self
.
_call_command
(
args
,
{
'redirect_uris'
:
'invalide uri'
})
self
.
assertEqual
(
'URIs provided are invalid. Please provide valid redirect URIs.'
,
exc
.
exception
.
message
)
def
test_idempotency
(
self
):
"""
Verify that the command can be run repeatedly with the same client id, without any ill effects.
"""
args
=
[
CLIENT_ID
,
CLIENT_TYPES
[
0
],
GRANT_TYPES
[
0
]]
options
=
{
'username'
:
'username'
,
'client_secret'
:
'client_secret'
,
'client_name'
:
'client_name'
,
'redirect_uris'
:
'https://www.example.com/o/token'
,
'skip_authorization'
:
True
}
self
.
_call_command
(
args
,
options
)
self
.
assert_client_created
(
args
,
options
)
# Verify that the command is idempotent.
self
.
_call_command
(
args
,
options
)
self
.
assert_client_created
(
args
,
options
)
# Verify that attributes are updated if the command is run with the same client ID,
# but with other options varying.
options
[
'client_secret'
]
=
'another-secret'
options
[
'skip_authorization'
]
=
False
self
.
_call_command
(
args
,
options
)
self
.
assert_client_created
(
args
,
options
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment