File size: 9,975 Bytes
ab4488b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import logging
from typing import Any, Optional, cast
from azure.core.exceptions import ClientAuthenticationError
from azure.core.credentials import (
AccessToken,
AccessTokenInfo,
TokenRequestOptions,
SupportsTokenInfo,
TokenCredential,
TokenProvider,
)
from .. import CredentialUnavailableError
from .._internal import within_credential_chain
_LOGGER = logging.getLogger(__name__)
def _get_error_message(history):
attempts = []
for credential, error in history:
if error:
attempts.append("{}: {}".format(credential.__class__.__name__, error))
else:
attempts.append(credential.__class__.__name__)
return """
Attempted credentials:\n\t{}""".format(
"\n\t".join(attempts)
)
class ChainedTokenCredential:
"""A sequence of credentials that is itself a credential.
Its :func:`get_token` method calls ``get_token`` on each credential in the sequence, in order, returning the first
valid token received. For more information, see
https://aka.ms/azsdk/python/identity/credential-chains#chainedtokencredential-overview.
:param credentials: credential instances to form the chain
:type credentials: ~azure.core.credentials.TokenCredential
.. admonition:: Example:
.. literalinclude:: ../samples/credential_creation_code_snippets.py
:start-after: [START create_chained_token_credential]
:end-before: [END create_chained_token_credential]
:language: python
:dedent: 4
:caption: Create a ChainedTokenCredential.
"""
def __init__(self, *credentials: TokenProvider) -> None:
if not credentials:
raise ValueError("at least one credential is required")
self._successful_credential: Optional[TokenProvider] = None
self.credentials = credentials
def __enter__(self) -> "ChainedTokenCredential":
for credential in self.credentials:
credential.__enter__() # type: ignore
return self
def __exit__(self, *args: Any) -> None:
for credential in self.credentials:
credential.__exit__(*args) # type: ignore
def close(self) -> None:
"""Close the transport session of each credential in the chain."""
self.__exit__()
def get_token(
self,
*scopes: str,
claims: Optional[str] = None,
tenant_id: Optional[str] = None,
enable_cae: bool = False,
**kwargs: Any,
) -> AccessToken:
"""Request a token from each chained credential, in order, returning the first token received.
If no credential provides a token, raises :class:`azure.core.exceptions.ClientAuthenticationError`
with an error message from each credential.
This method is called automatically by Azure SDK clients.
:param str scopes: desired scopes for the access token. This method requires at least one scope.
For more information about scopes, see
https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
:keyword str claims: additional claims required in the token, such as those returned in a resource provider's
claims challenge following an authorization failure.
:keyword str tenant_id: optional tenant to include in the token request.
:keyword bool enable_cae: indicates whether to enable Continuous Access Evaluation (CAE) for the requested
token. Defaults to False.
:return: An access token with the desired scopes.
:rtype: ~azure.core.credentials.AccessToken
:raises ~azure.core.exceptions.ClientAuthenticationError: no credential in the chain provided a token
"""
within_credential_chain.set(True)
history = []
for credential in self.credentials:
try:
# Prioritize "get_token". Fall back to "get_token_info" if not available.
if hasattr(credential, "get_token"):
token = cast(TokenCredential, credential).get_token(
*scopes, claims=claims, tenant_id=tenant_id, enable_cae=enable_cae, **kwargs
)
else:
options: TokenRequestOptions = {}
if claims:
options["claims"] = claims
if tenant_id:
options["tenant_id"] = tenant_id
options["enable_cae"] = enable_cae
token_info = cast(SupportsTokenInfo, credential).get_token_info(*scopes, options=options)
token = AccessToken(token_info.token, token_info.expires_on)
_LOGGER.info("%s acquired a token from %s", self.__class__.__name__, credential.__class__.__name__)
self._successful_credential = credential
within_credential_chain.set(False)
return token
except CredentialUnavailableError as ex:
# credential didn't attempt authentication because it lacks required data or state -> continue
history.append((credential, ex.message))
except Exception as ex: # pylint: disable=broad-except
# credential failed to authenticate, or something unexpectedly raised -> break
history.append((credential, str(ex)))
_LOGGER.debug(
'%s.get_token failed: %s raised unexpected error "%s"',
self.__class__.__name__,
credential.__class__.__name__,
ex,
exc_info=True,
)
break
within_credential_chain.set(False)
attempts = _get_error_message(history)
message = (
self.__class__.__name__
+ " failed to retrieve a token from the included credentials."
+ attempts
+ "\nTo mitigate this issue, please refer to the troubleshooting guidelines here at "
"https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot."
)
_LOGGER.warning(message)
raise ClientAuthenticationError(message=message)
def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] = None) -> AccessTokenInfo:
"""Request a token from each chained credential, in order, returning the first token received.
If no credential provides a token, raises :class:`azure.core.exceptions.ClientAuthenticationError`
with an error message from each credential.
This is an alternative to `get_token` to enable certain scenarios that require additional properties
on the token. This method is called automatically by Azure SDK clients.
:param str scopes: desired scopes for the access token. This method requires at least one scope.
For more information about scopes, see https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
:keyword options: A dictionary of options for the token request. Unknown options will be ignored. Optional.
:paramtype options: ~azure.core.credentials.TokenRequestOptions
:rtype: AccessTokenInfo
:return: An AccessTokenInfo instance containing information about the token.
:raises ~azure.core.exceptions.ClientAuthenticationError: no credential in the chain provided a token.
"""
within_credential_chain.set(True)
history = []
options = options or {}
for credential in self.credentials:
try:
# Prioritize "get_token_info". Fall back to "get_token" if not available.
if hasattr(credential, "get_token_info"):
token_info = cast(SupportsTokenInfo, credential).get_token_info(*scopes, options=options)
else:
if options.get("pop"):
raise CredentialUnavailableError(
"Proof of possession arguments are not supported for this credential."
)
token = cast(TokenCredential, credential).get_token(*scopes, **options)
token_info = AccessTokenInfo(token=token.token, expires_on=token.expires_on)
_LOGGER.info("%s acquired a token from %s", self.__class__.__name__, credential.__class__.__name__)
self._successful_credential = credential
within_credential_chain.set(False)
return token_info
except CredentialUnavailableError as ex:
# credential didn't attempt authentication because it lacks required data or state -> continue
history.append((credential, ex.message))
except Exception as ex: # pylint: disable=broad-except
# credential failed to authenticate, or something unexpectedly raised -> break
history.append((credential, str(ex)))
_LOGGER.debug(
'%s.get_token_info failed: %s raised unexpected error "%s"',
self.__class__.__name__,
credential.__class__.__name__,
ex,
exc_info=True,
)
break
within_credential_chain.set(False)
attempts = _get_error_message(history)
message = (
self.__class__.__name__
+ " failed to retrieve a token from the included credentials."
+ attempts
+ "\nTo mitigate this issue, please refer to the troubleshooting guidelines here at "
"https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot."
)
_LOGGER.warning(message)
raise ClientAuthenticationError(message=message)
|