Source code for falcon_authentication.backends

# -*- coding: utf-8 -*-

from __future__ import absolute_import
from __future__ import division

import base64
from datetime import timedelta, datetime
from functools import partial

import falcon

try:
    # This is an optional dependency. To use JWTAuthBackend be sure to add
    # [backend-jwt] to your falcon-authentication requirement.
    # See https://www.python.org/dev/peps/pep-0508/#extras
    import jwt
except ImportError:
    pass

try:
    # This is an optional dependency. To use AuthBackend be sure to add
    # [backend-hawk] to your falcon-authentication requirement.
    # See https://www.python.org/dev/peps/pep-0508/#extras
    import mohawk
except ImportError:
    pass

from falcon_authentication.serializer import ExtendedJSONEncoder


class BackendAuthenticationFailure(falcon.HTTPUnauthorized):
    """
    Raised when the authentication backend fails to authenticate the request.
    """
    def __init__(self, backend, *args, **kwargs):
        super(BackendAuthenticationFailure, self).__init__(*args, **kwargs)
        self.backend = backend


class BackendNotApplicable(BackendAuthenticationFailure):
    """
    Raised when the authentication backend would fail because the request data indicates that a
    different backend should be applied.
    """


class UserNotFound(BackendNotApplicable):
    """
    Raised when the authentication backend would fail because the user record indicated by
    the credentials could not be found.
    """


class AuthBackend(object):
    """
    Base Class for all authentication backends. If successfully authenticated must
    return the authenticated `user` object. In case authorization header is
    not set properly or there is a credential mismatch, results in an
    `BackendAuthenticationFailure` exception with proper description of the issue.

    Args:
        user_loader(function, required): A callback function that is called with the
            falcon req, resp and resource objects as well as any relevant data from
            the extracted from the `Authorization` header. It should return the user
            object identified from the `Authorization` header, or `None` if the user
            is not found. The arguments passed to `user_loader` will vary depending
            on the AuthBackend type.
    """

    def __init__(self, user_loader):
        self.user_loader = user_loader

    def load_user(self, req, resp, resource, *args, **kwargs):
        """
        Invoke the provided `user_loader()` function to allow the app to retrieve
        the user record. If no such record is found, raise a `BackendNotApplicable`
        exception to indicate that another AuthBackend may be more viable.

        This is to account for cases where an application may require the use of
        multiple AuthBackends of the same type and the users may be namespaced
        somehow. We wouldn't want to preclude another AuthBackend of the same type
        from attempting authentication.
        """
        user = self.user_loader(req, resp, resource, *args, **kwargs)
        if not user:
            # We raise the less severe "not applicable" error here to allow other
            # AuthBackends a shot (if any). It will still result in a 401 if no
            # other backend can authenticate the user.
            raise UserNotFound(
                backend=self,
                description='User not found for provided credentials')
        return user

    def parse_auth_token_from_request(self, auth_header):
        """
        Parses and returns Auth token from the request header. Raises
        `BackendNotApplicable` exception with proper error message
        """
        if not auth_header:
            raise BackendNotApplicable(
                backend=self,
                description='Missing Authorization Header')

        parts = auth_header.split()

        if parts[0].lower() != self.auth_header_prefix.lower():
            raise BackendNotApplicable(
                backend=self,
                description='Invalid Authorization Header: '
                            'Must start with {0}'.format(self.auth_header_prefix))

        elif len(parts) == 1:
            raise BackendNotApplicable(
                backend=self,
                description='Invalid Authorization Header: Token Missing')
        elif len(parts) > 2:
            raise BackendNotApplicable(
                backend=self,
                description='Invalid Authorization Header: Contains extra content')

        return parts[1]

    def authenticate(self, req, resp, resource):
        """
        Authenticate the request and return the authenticated user. Must raise an
        a `BackendAuthenticationFailure` exception if authentication fails. It is
        preferred that it raise an `BackendNotApplicable` exception if it's determined
        that the provided credentials cannot be handled by this backend.
        """
        raise NotImplementedError(".authenticate() must be overridden.")

    def get_auth_token(self, user_payload):
        """
        Returns a authentication token created using the provided user details

        Args:
            user_payload(dict, required): A `dict` containing required information
                to create authentication token
        """
        raise NotImplementedError("Must be overridden")

    def get_auth_header(self, user_payload):
        """
        Returns the value for authorization header
        Args:
            user_payload(dict, required): A `dict` containing required information
                to create authentication token
        """
        auth_token = self.get_auth_token(user_payload)
        return '{auth_header_prefix} {auth_token}'.format(
            auth_header_prefix=self.auth_header_prefix, auth_token=auth_token
        )


[docs]class NoneAuthBackend(AuthBackend): """ Dummy authentication backend. This backend does not perform any authentication check. It can be used with the MultiAuthBackend in order to provide a fallback for an unauthenticated user. Args: user_loader(function, required): A callback function that is called without any arguments and returns an `unauthenticated user`. """
[docs] def authenticate(self, req, resp, resource): return { 'user': self.load_user(req, resp, resource), }
[docs]class BasicAuthBackend(AuthBackend): """ Implements `HTTP Basic Authentication <http://tools.ietf.org/html/rfc2617>`__ Clients should authenticate by passing the `base64` encoded credentials `username:password` in the `Authorization` HTTP header, prepended with the string specified in the setting `auth_header_prefix`. For example: Authorization: BASIC ZGZkZmY6ZGZkZ2RkZg== Args: user_loader(function, required): A callback function that is called with the user credentials (username and password) extracted from the `Authorization` header. Returns an `authenticated user` if user exists matching the credentials or return `None` to indicate if no user found or credentials mismatch. auth_header_prefix(string, optional): A prefix that is used with the bases64 encoded credentials in the `Authorization` header. Default is ``basic`` """ def __init__(self, user_loader, auth_header_prefix='Basic'): super(BasicAuthBackend, self).__init__(user_loader) self.auth_header_prefix = auth_header_prefix def _extract_credentials(self, req): auth = req.get_header('Authorization') token = self.parse_auth_token_from_request(auth_header=auth) try: token = base64.b64decode(token).decode('utf-8') except Exception: raise BackendNotApplicable( backend=self, description='Invalid Authorization Header: Unable to decode credentials') try: username, password = token.split(':', 1) except ValueError: raise BackendNotApplicable( backend=self, description='Invalid Authorization: Unable to decode credentials') return username, password
[docs] def authenticate(self, req, resp, resource): """ Extract basic auth token from request `authorization` header, deocode the token, verifies the username/password and return either a ``user`` object if successful else raise an `BackendAuthenticationFailure` exception """ username, password = self._extract_credentials(req) return { 'user': self.load_user(req, resp, resource, username, password), }
[docs] def get_auth_token(self, user_payload): """ Extracts username, password from the `user_payload` and encode the credentials `username:password` in `base64` form """ username = user_payload.get('username') or None password = user_payload.get('password') or None if not username or not password: raise ValueError('`user_payload` must contain both username and password') token = '{username}:{password}'.format( username=username, password=password).encode('utf-8') token_b64 = base64.b64encode(token).decode('utf-8', 'ignore') return '{auth_header_prefix} {token_b64}'.format( auth_header_prefix=self.auth_header_prefix, token_b64=token_b64)
[docs]class TokenAuthBackend(BasicAuthBackend): """ Implements Simple Token Based Authentication. Clients should authenticate by passing the token key in the "Authorization" HTTP header, prepended with the string "Token ". For example: Authorization: Token 401f7ac837da42b97f613d789819ff93537bee6a Args: user_loader(function, required): A callback function that is called with the token extracted from the `Authorization` header. Returns an `authenticated user` if user exists matching the credentials or return `None` to indicate if no user found or credentials mismatch. auth_header_prefix(string, optional): A prefix that is used with the token in the `Authorization` header. Default is ``basic`` """ def __init__(self, user_loader, auth_header_prefix='Token'): super(TokenAuthBackend, self).__init__(user_loader, auth_header_prefix) def _extract_credentials(self, req): auth = req.get_header('Authorization') return self.parse_auth_token_from_request(auth_header=auth)
[docs] def authenticate(self, req, resp, resource): token = self._extract_credentials(req) return { 'user': self.load_user(req, resp, resource, token), }
[docs] def get_auth_token(self, user_payload): """ Extracts token from the `user_payload` """ token = user_payload.get('token') or None if not token: raise ValueError('`user_payload` must provide api token') return '{auth_header_prefix} {token}'.format( auth_header_prefix=self.auth_header_prefix, token=token)
[docs]class JWTAuthBackend(AuthBackend): """ Token based authentication using the `JSON Web Token standard <https://jwt.io/introduction/>`__ Clients should authenticate by passing the token key in the `Authorization` HTTP header, prepended with the string specified in the setting `auth_header_prefix`. For example: Authorization: JWT eyJhbGciOiAiSFMyNTYiLCAidHlwIj Args: user_loader(function, required): A callback function that is called with the decoded `jwt payload` extracted from the `Authorization` header. Returns an `authenticated user` if user exists matching the credentials or return `None` to indicate if no user found or credentials mismatch. secrey_key(string, required): A secure key that was used to encode and create the `jwt token` from a dictionary payload algorithm(string, optional): Specifies the algorithm that was used to for cryptographic signing. Default is ``HS256`` which stands for HMAC using SHA-256 hash algorithm. Other supported algorithms can be found `here <http://pyjwt.readthedocs.io/en/latest/algorithms.html>`__ auth_header_prefix(string, optional): A prefix that is used with the bases64 encoded credentials in the `Authorization` header. Default is ``jwt`` leeway(int, optional): Specifies the timedelta in seconds that is allowed as leeway while validating `expiration time` / `nbf(not before) claim` /`iat (issued at) claim` which is in past but not very far. For example, if you have a JWT payload with an expiration time set to 30 seconds after creation but you know that sometimes you will process it after 30 seconds, you can set a leeway of 10 seconds in order to have some margin. Default is ``0 seconds`` expiration_delta(int, optional): Specifies the timedelta in seconds that will be added to current time to set the expiration for the token. Default is ``1 day(24 * 60 * 60 seconds)`` audience(string, optional): Specifies the string that will be specified as value of ``aud`` field in the jwt payload. It will also be checked agains the ``aud`` field while decoding. issuer(string, optional): Specifies the string that will be specified as value of ``iss`` field in the jwt payload. It will also be checked agains the ``iss`` field while decoding. """ def __init__(self, user_loader, secret_key, algorithm='HS256', auth_header_prefix='jwt', leeway=0, expiration_delta=24 * 60 * 60, audience=None, issuer=None, verify_claims=None, required_claims=None): try: jwt except NameError: raise ImportError('Optional dependency falcon-authentication[backend-jwt] not installed') super(JWTAuthBackend, self).__init__(user_loader) self.secret_key = secret_key self.algorithm = algorithm self.leeway = timedelta(seconds=leeway) self.auth_header_prefix = auth_header_prefix self.expiration_delta = timedelta(seconds=expiration_delta) self.audience = audience self.issuer = issuer self.verify_claims = verify_claims or ['signature', 'exp', 'nbf', 'iat'] self.required_claims = required_claims or ['exp', 'iat', 'nbf'] if 'aud' in self.verify_claims and not audience: raise ValueError('Audience parameter must be provided if ' '`aud` claim needs to be verified') if 'iss' in self.verify_claims and not issuer: raise ValueError('Issuer parameter must be provided if ' '`iss` claim needs to be verified') def _decode_jwt_token(self, req): # Decodes the jwt token into a payload token = self.parse_auth_token_from_request(auth_header=req.get_header('Authorization')) options = dict(('verify_' + claim, True) for claim in self.verify_claims) options.update( dict(('require_' + claim, True) for claim in self.required_claims) ) try: payload = jwt.decode(jwt=token, key=self.secret_key, options=options, algorithms=[self.algorithm], issuer=self.issuer, audience=self.audience, leeway=self.leeway) except jwt.InvalidTokenError as ex: raise BackendAuthenticationFailure( backend=self, description=str(ex)) return payload
[docs] def authenticate(self, req, resp, resource): """ Extract auth token from request `authorization` header, decode jwt token, verify configured claims and return either a ``user`` object if successful else raise an `BackendAuthenticationFailure` exception. """ payload = self._decode_jwt_token(req) return { 'user': self.load_user(req, resp, resource, payload), }
[docs] def get_auth_token(self, user_payload): """ Create a JWT authentication token from ``user_payload`` Args: user_payload(dict, required): A `dict` containing required information to create authentication token """ now = datetime.utcnow() payload = { 'user': user_payload } if 'iat' in self.verify_claims: payload['iat'] = now if 'nbf' in self.verify_claims: payload['nbf'] = now + self.leeway if 'exp' in self.verify_claims: payload['exp'] = now + self.expiration_delta if self.audience is not None: payload['aud'] = self.audience if self.issuer is not None: payload['iss'] = self.issuer return jwt.encode( payload, self.secret_key, algorithm=self.algorithm, json_encoder=ExtendedJSONEncoder).decode('utf-8')
[docs]class HawkAuthBackend(AuthBackend): """ Holder-Of-Key Authentication Scheme defined by `Hawk <https://github.com/hueniverse/hawk>`__ Clients should authenticate by passing a Hawk-formatted header as the `Authorization` HTTP header. For example: Authorization: Hawk id="dh37fgj492je", ts="1353832234", nonce="j4h3g2", ext="some-app-ext-data", mac="6R4rV5iE+NPoym+WwjeHzjAGXUtLNIxmo1vpMofpLAE=" Args: user_loader(function, required): A callback function that is called with the `id` value extracted from the `Hawk` header. Returns an `authenticated user` if the user matching the credentials exists or returns `None` to indicate if no user was found. receiver_kwargs(dict, optional): A dictionary of arguments to be passed through to the Receiver. One must provide the `credentials_map` function for the purposes of looking up a user's credentials from their user id (the same value passed to `user_loader()`). See the `docs <https://mohawk.readthedocs.io/en/latest/usage.html#receiving-a-request>`__ for further details. """ def __init__(self, user_loader, credentials_loader, receiver_kwargs=None): try: mohawk except NameError: raise ImportError('Optional dependency falcon-authentication[backend-hawk] not installed') super(HawkAuthBackend, self).__init__(user_loader) self.auth_header_prefix = 'Hawk' self.load_credentials = credentials_loader self.receiver_kwargs = receiver_kwargs or {}
[docs] def parse_auth_token_from_request(self, auth_header): """ Parses and returns the Hawk Authorization header if it is present and well-formed. Raises `BackendNotApplicable` exception with proper error message """ if not auth_header: raise BackendNotApplicable( backend=self, description='Missing Authorization Header') try: auth_header_prefix, _ = auth_header.split(' ', 1) except ValueError: raise BackendNotApplicable( backend=self, description='Invalid Authorization Header: Missing Scheme or Parameters') if auth_header_prefix.lower() != self.auth_header_prefix.lower(): raise BackendNotApplicable( backend=self, description='Invalid Authorization Header: ' 'Must start with {0}'.format(self.auth_header_prefix)) return auth_header
[docs] def credentials_map(self, req, resp, resource, user_id): """ Look up the user from the application and allow the application to extract/generate Hawk credentials from the user object. It then drops the user object into the credentials map as a way of memoizing this object for fast retrieval once authentication succeeds (we want to avoid another round trip to the backing datastore to get the user again). """ user = self.load_user(req, resp, resource, user_id) credentials = self.load_credentials(req, resp, resource, user) credentials['user'] = user return credentials
[docs] def authenticate(self, req, resp, resource): request_header = self.parse_auth_token_from_request(req.get_header('Authorization')) try: # Validate the Authorization header contents and lookup the user's credentials # via the provided `credentials_map` function. receiver = mohawk.Receiver( credentials_map=partial(self.credentials_map, req, resp, resource), request_header=request_header, method=req.method, url=req.forwarded_uri, content=req.context.get('body'), content_type=req.get_header('Content-Type'), **self.receiver_kwargs) except mohawk.exc.HawkFail as ex: raise BackendAuthenticationFailure( backend=self, description='{0}({1!s})'.format(ex.__class__.__name__, ex), challenges=( [getattr(ex, 'www_authenticate')] if hasattr(ex, 'www_authenticate') else [])) # The authentication was successful, return the previously retrieved user now return { 'user': receiver.resource.credentials['user'], 'receiver': receiver, }
[docs]class MultiAuthBackend(AuthBackend): """ A backend which takes two or more ``AuthBackend`` as inputs and successfully authenticates if any of them succeeds else raises a `BackendNotApplicable` exception. Args: backends(list[AuthBackend], required): A list of `AuthBackend` to be used in order to authenticate the user. early_exit(bool, optional): If early_exit is True, the iteration through the list of backends will stop upon the first non-`BackendNotApplicable` `BackendAuthenticationFailure` exception it encounters. Otherwise, it will treate all `falcon.HTTPUnauthorized` exceptions the same: just move on to the next backend in the list. Default is False. """ def __init__(self, *backends, **kwargs): if len(backends) <= 1: raise ValueError('Invalid authentication backend. Must pass more than one backend') for backend in backends: if not isinstance(backend, AuthBackend): raise ValueError( ( 'Invalid authentication backend {0}.' ' Must inherit falcon.auth.backends.AuthBackend' ) .format(backend.__class__.__name__) ) self.backends = backends # Examine kwargs since python 2.7 doesn't support keyword args after *args self.early_exit = kwargs.get('early_exit', False) @staticmethod def _append_challenges(challenges, exception): """ Extract any WWW-Authenticate headers from `exception` and append them to `challenges` """ assert isinstance(exception, falcon.HTTPUnauthorized) www_authenticate = exception.headers.get('WWW-Authenticate') if www_authenticate: challenges.append(www_authenticate)
[docs] def authenticate(self, req, resp, resource): challenges = [] for backend in self.backends: try: results = backend.authenticate(req, resp, resource) # Use setdefault() here to accomodate nested MultiAuthBackends, # though it's unclear when that strategy would be advisable. results.setdefault('backend', backend) return results except BackendNotApplicable as ex: # This backend didn't understand the Authorization header # (or other attributes of the request) or could not find the user # indicated by the credentials provided on the request and so # declined to handle authentication. If no other backend attempts # to authenticate the request and fails with a challenge, we # collect any challenges presented by these skipped backends and # send them back on the 401 response as the WWW-Authenticate header. self._append_challenges(challenges, ex) except BackendAuthenticationFailure as ex: if self.early_exit: # We're operating in the more strict (and thus more optimized) # mode. End authentication now, since we believe no other # backend will apply. raise self._append_challenges(challenges, ex) except falcon.HTTPUnauthorized as ex: # Unspeciallized falcon.HTTPUnauthorized will always allow iteration # to continue. self._append_challenges(challenges, ex) # Any other exceptions will cause authentication (and possibly # the entire request) to fail, most likely with a 5XX rather than # 401. else: raise BackendNotApplicable( backend=self, description='Authentication Failed', challenges=challenges)
[docs] def get_auth_token(self, user_payload): for backend in self.backends: try: return backend.get_auth_token(user_payload) except Exception: pass return None