django-oidc-provider/oidc_provider/tests/test_authorize_endpoint.py
Wojciech Bartosiak a829726be8 Merge develop to v0.5.x (#179)
* Log create_uri_response exceptions to logger.exception

* Support grant type password - basics

* Add tests for Resource Owner Password Credentials Flow

* Password Grant - Response according to specification

* Better tests for errors, disable grant type password by default

* Add documentation for grant type password

* User authentication failure to return 403

* Add id_token to response

* skipping consent only works for confidential clients

* fix URI fragment

Example of a URL that did not work: `http://localhost:8100/#/auth/callback/`

* OIDC_POST_END_SESSION_HOOK + tests

* Explicit function naming

* Remove print statements

* No need for semicolons, this is Python

* Update CHANGELOG.md

* fixed logger message

* Improved `exp` value calculation

* rename OIDC_POST_END_SESSION_HOOK to OIDC_AFTER_END_SESSION_HOOK

* added docs for OIDC_AFTER_END_SESSION_HOOK

* Replaces `LOGIN_URL` with `OIDC_LOGIN_URL`
so users can use a different login path for their OIDC requests.

* Adds a setting variable for custom template paths

* Updates documentation

* Fixed bad try/except/finally block

* Adds test for OIDC_TEMPLATES settings

* Determine value for op_browser_state from session_key or default

* Do not use cookie for browser_state. It may not yet be there

* Add docs on new setting

OIDC_UNAUTHENTICATED_SESSION_MANAGEMENT_KEY

* Fix compatibility for older versions of Django

* solved merging typo for missing @property
2017-05-05 05:19:57 +02:00

551 lines
20 KiB
Python

try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode
try:
from urllib.parse import parse_qs, urlsplit
except ImportError:
from urlparse import parse_qs, urlsplit
import uuid
from mock import patch, mock
from django.contrib.auth.models import AnonymousUser
from django.core.management import call_command
from django.core.urlresolvers import reverse
from django.test import (
RequestFactory,
override_settings,
)
from django.test import TestCase
from jwkest.jwt import JWT
from oidc_provider import settings
from oidc_provider.tests.app.utils import (
create_fake_user,
create_fake_client,
FAKE_CODE_CHALLENGE,
is_code_valid,
)
from oidc_provider.views import AuthorizeView
from oidc_provider.lib.endpoints.authorize import AuthorizeEndpoint
class AuthorizeEndpointMixin(object):
    """
    Helper shared by the test cases to send requests to the authorize view.

    Classes using this mixin must set ``self.factory`` and ``self.user``
    before calling ``_auth_request`` (both are read by it).
    """

    def _auth_request(self, method, data=None, is_user_authenticated=False):
        """
        Build an Authorization Request and run it through ``AuthorizeView``.

        :param method: ``'get'`` or ``'post'`` (case-insensitive).
        :param data: request parameters; sent as the query string for GET
            and as the request body for POST. Defaults to no parameters.
        :param is_user_authenticated: when true the request is made as
            ``self.user``, otherwise as an ``AnonymousUser``.
        :returns: the response produced by ``AuthorizeView``.
        :raises Exception: if ``method`` is neither GET nor POST.
        """
        # A ``{}`` default would be a single dict shared by every call,
        # so use ``None`` as the sentinel and build a fresh dict here.
        if data is None:
            data = {}

        url = reverse('oidc_provider:authorize')
        method = method.lower()

        if method == 'get':
            # urlencode() encodes spaces as '+'; the request is sent with
            # '%20' instead.
            query_str = urlencode(data).replace('+', '%20')
            if query_str:
                url += '?' + query_str
            request = self.factory.get(url)
        elif method == 'post':
            request = self.factory.post(url, data=data)
        else:
            raise Exception('Method unsupported for an Authorization Request.')

        # Simulate that the user is logged.
        request.user = self.user if is_user_authenticated else AnonymousUser()

        return AuthorizeView.as_view()(request)
class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
    """
    Test cases for Authorize Endpoint using Code Flow.
    """

    def setUp(self):
        """
        Run the ``creatersakey`` command and create the fake user and the
        fake ``code`` clients (with different ``is_public`` and
        ``require_consent`` options) used by the tests.
        """
        call_command('creatersakey')
        self.factory = RequestFactory()
        self.user = create_fake_user()
        self.client = create_fake_client(response_type='code')
        self.client_with_no_consent = create_fake_client(response_type='code', require_consent=False)
        self.client_public = create_fake_client(response_type='code', is_public=True)
        self.client_public_with_no_consent = create_fake_client(response_type='code', is_public=True, require_consent=False)
        self.state = uuid.uuid4().hex
        self.nonce = uuid.uuid4().hex

    def test_missing_parameters(self):
        """
        If the request fails due to a missing, invalid, or mismatching
        redirection URI, or if the client identifier is missing or invalid,
        the authorization server SHOULD inform the resource owner of the error.

        See: https://tools.ietf.org/html/rfc6749#section-4.1.2.1
        """
        response = self._auth_request('get')

        self.assertEqual(response.status_code, 200)
        # The error must be rendered, i.e. the body is not empty.
        self.assertTrue(response.content)

    def test_invalid_response_type(self):
        """
        The OP informs the RP by using the Error Response parameters defined
        in Section 4.1.2.1 of OAuth 2.0.

        See: http://openid.net/specs/openid-connect-core-1_0.html#AuthError
        """
        # Create an authorize request with an unsupported response_type.
        data = {
            'client_id': self.client.client_id,
            'response_type': 'something_wrong',
            'redirect_uri': self.client.default_redirect_uri,
            'scope': 'openid email',
            'state': self.state,
        }

        response = self._auth_request('get', data)

        self.assertEqual(response.status_code, 302)
        self.assertTrue(response.has_header('Location'))

        # Should be an 'error' component in query.
        self.assertIn('error=', response['Location'])

    def test_user_not_logged(self):
        """
        The Authorization Server attempts to Authenticate the End-User by
        redirecting to the login view.

        See: http://openid.net/specs/openid-connect-core-1_0.html#Authenticates
        """
        data = {
            'client_id': self.client.client_id,
            'response_type': 'code',
            'redirect_uri': self.client.default_redirect_uri,
            'scope': 'openid email',
            'state': self.state,
        }

        response = self._auth_request('get', data)

        # Check if user was redirected to the login view.
        self.assertIn(settings.get('OIDC_LOGIN_URL'), response['Location'])

    def test_user_consent_inputs(self):
        """
        Once the End-User is authenticated, the Authorization Server MUST
        obtain an authorization decision before releasing information to
        the Client.

        See: http://openid.net/specs/openid-connect-core-1_0.html#Consent
        """
        data = {
            'client_id': self.client.client_id,
            'response_type': 'code',
            'redirect_uri': self.client.default_redirect_uri,
            'scope': 'openid email',
            'state': self.state,
            # PKCE parameters.
            'code_challenge': FAKE_CODE_CHALLENGE,
            'code_challenge_method': 'S256',
        }

        response = self._auth_request('get', data, is_user_authenticated=True)

        # Check if hidden inputs exists in the form,
        # also if their values are valid.
        input_html = '<input name="{0}" type="hidden" value="{1}" />'

        to_check = {
            'client_id': self.client.client_id,
            'redirect_uri': self.client.default_redirect_uri,
            'response_type': 'code',
            'code_challenge': FAKE_CODE_CHALLENGE,
            'code_challenge_method': 'S256',
        }

        # Decode the body once rather than on every iteration.
        content = response.content.decode('utf-8')

        for key, value in to_check.items():
            self.assertIn(input_html.format(key, value), content,
                          msg='Hidden input for "' + key + '" fails.')

    def test_user_consent_response(self):
        """
        First,
        if the user denied the consent we must ensure that
        the error response parameters are added to the query component
        of the Redirection URI.

        Second,
        if the user allow the RP then the server MUST return
        the parameters defined in Section 4.1.2 of OAuth 2.0 [RFC6749]
        by adding them as query parameters to the redirect_uri.
        """
        data = {
            'client_id': self.client.client_id,
            'redirect_uri': self.client.default_redirect_uri,
            'response_type': 'code',
            'scope': 'openid email',
            'state': self.state,
            # PKCE parameters.
            'code_challenge': FAKE_CODE_CHALLENGE,
            'code_challenge_method': 'S256',
        }

        response = self._auth_request('post', data, is_user_authenticated=True)

        # Because user doesn't allow app, SHOULD exists an error parameter
        # in the query.
        self.assertIn('error=', response['Location'], msg='error param is missing in query.')
        self.assertIn('access_denied', response['Location'], msg='"access_denied" code is missing in query.')

        # Simulate user authorization.
        data['allow'] = 'Accept'  # Will be the value of the button.

        response = self._auth_request('post', data, is_user_authenticated=True)

        is_code_ok = is_code_valid(url=response['Location'],
                                   user=self.user,
                                   client=self.client)
        self.assertEqual(is_code_ok, True,
                         msg='Code returned is invalid.')

        # Check if the state is returned.
        state = (response['Location'].split('state='))[1].split('&')[0]
        self.assertEqual(state, self.state, msg='State change or is missing.')

    def test_user_consent_skipped(self):
        """
        If users previously gave consent to some client (for a specific
        list of scopes) and because they might be prompted for the same
        authorization multiple times, the server skip it.
        """
        data = {
            'client_id': self.client_with_no_consent.client_id,
            'redirect_uri': self.client_with_no_consent.default_redirect_uri,
            'response_type': 'code',
            'scope': 'openid email',
            'state': self.state,
            'allow': 'Accept',
        }

        # First request: the user accepts through the consent form.
        response = self._auth_request('post', data, is_user_authenticated=True)

        self.assertIn('code', response['Location'], msg='Code is missing in the returned url.')

        response = self._auth_request('post', data, is_user_authenticated=True)

        is_code_ok = is_code_valid(url=response['Location'],
                                   user=self.user,
                                   client=self.client_with_no_consent)
        self.assertEqual(is_code_ok, True, msg='Code returned is invalid.')

        # Without 'allow' a plain GET must still return a valid code,
        # i.e. the consent step is skipped.
        del data['allow']
        response = self._auth_request('get', data, is_user_authenticated=True)

        is_code_ok = is_code_valid(url=response['Location'],
                                   user=self.user,
                                   client=self.client_with_no_consent)
        self.assertEqual(is_code_ok, True, msg='Code returned is invalid or missing.')

    def test_response_uri_is_properly_constructed(self):
        """
        Send an accepted request whose redirect_uri already carries a query
        string.
        """
        data = {
            'client_id': self.client.client_id,
            'redirect_uri': self.client.default_redirect_uri + "?redirect_state=xyz",
            'response_type': 'code',
            'scope': 'openid email',
            'state': self.state,
            'allow': 'Accept',
        }

        # TODO: assert on the returned Location. For now this only checks
        # that the request can be processed without raising.
        self._auth_request('post', data, is_user_authenticated=True)

    def test_public_client_auto_approval(self):
        """
        It's recommended not auto-approving requests for non-confidential clients.
        """
        data = {
            'client_id': self.client_public_with_no_consent.client_id,
            'response_type': 'code',
            'redirect_uri': self.client_public_with_no_consent.default_redirect_uri,
            'scope': 'openid email',
            'state': self.state,
        }

        response = self._auth_request('get', data, is_user_authenticated=True)

        self.assertIn('Request for Permission', response.content.decode('utf-8'))

    def test_prompt_parameter(self):
        """
        Specifies whether the Authorization Server prompts the End-User for reauthentication and consent.

        See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
        """
        data = {
            'client_id': self.client.client_id,
            'response_type': self.client.response_type,
            'redirect_uri': self.client.default_redirect_uri,
            'scope': 'openid email',
            'state': self.state,
        }

        data['prompt'] = 'none'

        response = self._auth_request('get', data)

        # An error is returned if an End-User is not already authenticated.
        self.assertIn('login_required', response['Location'])

        response = self._auth_request('get', data, is_user_authenticated=True)

        # An error is returned if the Client does not have pre-configured consent for the requested Claims.
        self.assertIn('interaction_required', response['Location'])
class AuthorizationImplicitFlowTestCase(TestCase, AuthorizeEndpointMixin):
    """
    Test cases for Authorization Endpoint using Implicit Flow.
    """

    def setUp(self):
        """
        Run the ``creatersakey`` command and create the fake user and the
        fake clients for the ``id_token token`` and ``id_token`` response
        types (each with and without ``is_public=True``).
        """
        call_command('creatersakey')
        self.factory = RequestFactory()
        self.user = create_fake_user()
        self.client = create_fake_client(response_type='id_token token')
        self.client_public = create_fake_client(response_type='id_token token', is_public=True)
        self.client_no_access = create_fake_client(response_type='id_token')
        self.client_public_no_access = create_fake_client(response_type='id_token', is_public=True)
        self.state = uuid.uuid4().hex
        self.nonce = uuid.uuid4().hex

    def _get_id_token_payload(self, location):
        """
        Return the payload of the ``id_token`` found in the fragment of
        the given redirect ``location``.
        """
        fragment = parse_qs(urlsplit(location).fragment)
        return JWT().unpack(fragment["id_token"][0].encode('utf-8')).payload()

    def test_missing_nonce(self):
        """
        The `nonce` parameter is REQUIRED if you use the Implicit Flow.
        """
        data = {
            'client_id': self.client.client_id,
            'response_type': self.client.response_type,
            'redirect_uri': self.client.default_redirect_uri,
            'scope': 'openid email',
            'state': self.state,
        }

        response = self._auth_request('get', data, is_user_authenticated=True)

        self.assertIn('#error=invalid_request', response['Location'])

    def test_idtoken_token_response(self):
        """
        Implicit client requesting `id_token token` receives both id token
        and access token as the result of the authorization request.
        """
        data = {
            'client_id': self.client.client_id,
            'redirect_uri': self.client.default_redirect_uri,
            'response_type': self.client.response_type,
            'scope': 'openid email',
            'state': self.state,
            'nonce': self.nonce,
            'allow': 'Accept',
        }

        response = self._auth_request('post', data, is_user_authenticated=True)

        self.assertIn('access_token', response['Location'])
        self.assertIn('id_token', response['Location'])

        # same for public client
        # (plain values: a trailing comma here would turn each one into
        # a 1-tuple)
        data['client_id'] = self.client_public.client_id
        data['redirect_uri'] = self.client_public.default_redirect_uri
        data['response_type'] = self.client_public.response_type

        response = self._auth_request('post', data, is_user_authenticated=True)

        self.assertIn('access_token', response['Location'])
        self.assertIn('id_token', response['Location'])

    def test_idtoken_response(self):
        """
        Implicit client requesting `id_token` receives
        only an id token as the result of the authorization request.
        """
        data = {
            'client_id': self.client_no_access.client_id,
            'redirect_uri': self.client_no_access.default_redirect_uri,
            'response_type': self.client_no_access.response_type,
            'scope': 'openid email',
            'state': self.state,
            'nonce': self.nonce,
            'allow': 'Accept',
        }

        response = self._auth_request('post', data, is_user_authenticated=True)

        self.assertNotIn('access_token', response['Location'])
        self.assertIn('id_token', response['Location'])

        # same for public client
        # (plain values: a trailing comma here would turn each one into
        # a 1-tuple)
        data['client_id'] = self.client_public_no_access.client_id
        data['redirect_uri'] = self.client_public_no_access.default_redirect_uri
        data['response_type'] = self.client_public_no_access.response_type

        response = self._auth_request('post', data, is_user_authenticated=True)

        self.assertNotIn('access_token', response['Location'])
        self.assertIn('id_token', response['Location'])

    def test_idtoken_token_at_hash(self):
        """
        Implicit client requesting `id_token token` receives
        `at_hash` in `id_token`.
        """
        data = {
            'client_id': self.client.client_id,
            'redirect_uri': self.client.default_redirect_uri,
            'response_type': self.client.response_type,
            'scope': 'openid email',
            'state': self.state,
            'nonce': self.nonce,
            'allow': 'Accept',
        }

        response = self._auth_request('post', data, is_user_authenticated=True)

        self.assertIn('id_token', response['Location'])

        # obtain `id_token` portion of Location
        id_token = self._get_id_token_payload(response['Location'])

        self.assertIn('at_hash', id_token)

    def test_idtoken_at_hash(self):
        """
        Implicit client requesting `id_token` should not receive
        `at_hash` in `id_token`.
        """
        data = {
            'client_id': self.client_no_access.client_id,
            'redirect_uri': self.client_no_access.default_redirect_uri,
            'response_type': self.client_no_access.response_type,
            'scope': 'openid email',
            'state': self.state,
            'nonce': self.nonce,
            'allow': 'Accept',
        }

        response = self._auth_request('post', data, is_user_authenticated=True)

        self.assertIn('id_token', response['Location'])

        # obtain `id_token` portion of Location
        id_token = self._get_id_token_payload(response['Location'])

        self.assertNotIn('at_hash', id_token)
class AuthorizationHybridFlowTestCase(TestCase, AuthorizeEndpointMixin):
    """
    Test cases for Authorization Endpoint using Hybrid Flow.
    """

    def setUp(self):
        """
        Create the fake user, the `code id_token token` client and the
        request parameters shared by every test in this class.
        """
        call_command('creatersakey')
        self.factory = RequestFactory()
        self.user = create_fake_user()

        hybrid_client = create_fake_client(response_type='code id_token token', is_public=True)
        self.client_code_idtoken_token = hybrid_client

        self.state = uuid.uuid4().hex
        self.nonce = uuid.uuid4().hex

        # Base data for the auth request.
        self.data = dict([
            ('client_id', hybrid_client.client_id),
            ('redirect_uri', hybrid_client.default_redirect_uri),
            ('response_type', hybrid_client.response_type),
            ('scope', 'openid email'),
            ('state', self.state),
            ('nonce', self.nonce),
            ('allow', 'Accept'),
        ])

    def test_code_idtoken_token_response(self):
        """
        Client requesting `code id_token token` receives the code, the
        id token, the access token and the state in the fragment of the
        redirect, and the returned code is valid.
        """
        response = self._auth_request('post', self.data, is_user_authenticated=True)
        location = response['Location']

        # Everything is expected after the '#' of the redirect uri.
        for expected in ('#', 'access_token', 'id_token', 'state', 'code'):
            self.assertIn(expected, location)

        # Validate code.
        self.assertEqual(
            is_code_valid(url=location,
                          user=self.user,
                          client=self.client_code_idtoken_token),
            True,
            msg='Code returned is invalid.')

    @override_settings(OIDC_TOKEN_EXPIRE=36000)
    def test_access_token_expiration(self):
        """
        With OIDC_TOKEN_EXPIRE overridden to 36000 seconds (ten hours),
        the redirect must carry `expires_in=36000`.
        """
        location = self._auth_request('post', self.data, is_user_authenticated=True)['Location']

        self.assertIn('expires_in=36000', location)
class TestCreateResponseURI(TestCase):
    """
    Test cases for ``AuthorizeEndpoint.create_response_uri``.
    """

    def setUp(self):
        """
        Build a POST request to the authorize url, made by a fake user for
        a fake `code` client, and keep it in ``self.request``.
        """
        fake_user = create_fake_user()
        fake_client = create_fake_client(response_type='code', is_public=True)

        # Base data to create a uri response
        params = {
            'client_id': fake_client.client_id,
            'redirect_uri': fake_client.default_redirect_uri,
            'response_type': fake_client.response_type,
        }

        self.request = RequestFactory().post(reverse('oidc_provider:authorize'), data=params)
        self.request.user = fake_user

    @patch('oidc_provider.lib.endpoints.authorize.create_code')
    @patch('oidc_provider.lib.endpoints.authorize.logger.exception')
    def test_create_response_uri_logs_to_error(self, log_exception, create_code):
        """
        When creating the response uri fails, the exception must be
        re-raised and also reported through ``logger.exception`` so that
        it shows up in the error log.
        """
        error = Exception("Something went wrong!")
        # Make code creation fail inside create_response_uri().
        create_code.side_effect = error

        endpoint = AuthorizeEndpoint(self.request)
        endpoint.validate_params()

        self.assertRaises(Exception, endpoint.create_response_uri)

        log_exception.assert_called_once_with('[Authorize] Error when trying to create response uri: %s', error)

    @override_settings(OIDC_SESSION_MANAGEMENT_ENABLE=True)
    def test_create_response_uri_generates_session_state_if_session_management_enabled(self):
        """
        With OIDC_SESSION_MANAGEMENT_ENABLE on, the response uri must
        contain a `session_state` parameter.
        """
        # RequestFactory doesn't support sessions, so we mock it
        self.request.session = mock.Mock(session_key=None)

        endpoint = AuthorizeEndpoint(self.request)
        endpoint.validate_params()

        self.assertIn('session_state=', endpoint.create_response_uri())