GraphRag
/
graphrag-ollama
/lib
/python3.12
/site-packages
/azure
/identity
/_credentials
/chained.py
# ------------------------------------ | |
# Copyright (c) Microsoft Corporation. | |
# Licensed under the MIT License. | |
# ------------------------------------ | |
import logging | |
from typing import Any, Optional, cast | |
from azure.core.exceptions import ClientAuthenticationError | |
from azure.core.credentials import ( | |
AccessToken, | |
AccessTokenInfo, | |
TokenRequestOptions, | |
SupportsTokenInfo, | |
TokenCredential, | |
TokenProvider, | |
) | |
from .. import CredentialUnavailableError | |
from .._internal import within_credential_chain | |
_LOGGER = logging.getLogger(__name__) | |
def _get_error_message(history): | |
attempts = [] | |
for credential, error in history: | |
if error: | |
attempts.append("{}: {}".format(credential.__class__.__name__, error)) | |
else: | |
attempts.append(credential.__class__.__name__) | |
return """ | |
Attempted credentials:\n\t{}""".format( | |
"\n\t".join(attempts) | |
) | |
class ChainedTokenCredential: | |
"""A sequence of credentials that is itself a credential. | |
Its :func:`get_token` method calls ``get_token`` on each credential in the sequence, in order, returning the first | |
valid token received. For more information, see | |
https://aka.ms/azsdk/python/identity/credential-chains#chainedtokencredential-overview. | |
:param credentials: credential instances to form the chain | |
:type credentials: ~azure.core.credentials.TokenCredential | |
.. admonition:: Example: | |
.. literalinclude:: ../samples/credential_creation_code_snippets.py | |
:start-after: [START create_chained_token_credential] | |
:end-before: [END create_chained_token_credential] | |
:language: python | |
:dedent: 4 | |
:caption: Create a ChainedTokenCredential. | |
""" | |
def __init__(self, *credentials: TokenProvider) -> None: | |
if not credentials: | |
raise ValueError("at least one credential is required") | |
self._successful_credential: Optional[TokenProvider] = None | |
self.credentials = credentials | |
def __enter__(self) -> "ChainedTokenCredential": | |
for credential in self.credentials: | |
credential.__enter__() # type: ignore | |
return self | |
def __exit__(self, *args: Any) -> None: | |
for credential in self.credentials: | |
credential.__exit__(*args) # type: ignore | |
def close(self) -> None: | |
"""Close the transport session of each credential in the chain.""" | |
self.__exit__() | |
def get_token( | |
self, | |
*scopes: str, | |
claims: Optional[str] = None, | |
tenant_id: Optional[str] = None, | |
enable_cae: bool = False, | |
**kwargs: Any, | |
) -> AccessToken: | |
"""Request a token from each chained credential, in order, returning the first token received. | |
If no credential provides a token, raises :class:`azure.core.exceptions.ClientAuthenticationError` | |
with an error message from each credential. | |
This method is called automatically by Azure SDK clients. | |
:param str scopes: desired scopes for the access token. This method requires at least one scope. | |
For more information about scopes, see | |
https://learn.microsoft.com/entra/identity-platform/scopes-oidc. | |
:keyword str claims: additional claims required in the token, such as those returned in a resource provider's | |
claims challenge following an authorization failure. | |
:keyword str tenant_id: optional tenant to include in the token request. | |
:keyword bool enable_cae: indicates whether to enable Continuous Access Evaluation (CAE) for the requested | |
token. Defaults to False. | |
:return: An access token with the desired scopes. | |
:rtype: ~azure.core.credentials.AccessToken | |
:raises ~azure.core.exceptions.ClientAuthenticationError: no credential in the chain provided a token | |
""" | |
within_credential_chain.set(True) | |
history = [] | |
for credential in self.credentials: | |
try: | |
# Prioritize "get_token". Fall back to "get_token_info" if not available. | |
if hasattr(credential, "get_token"): | |
token = cast(TokenCredential, credential).get_token( | |
*scopes, claims=claims, tenant_id=tenant_id, enable_cae=enable_cae, **kwargs | |
) | |
else: | |
options: TokenRequestOptions = {} | |
if claims: | |
options["claims"] = claims | |
if tenant_id: | |
options["tenant_id"] = tenant_id | |
options["enable_cae"] = enable_cae | |
token_info = cast(SupportsTokenInfo, credential).get_token_info(*scopes, options=options) | |
token = AccessToken(token_info.token, token_info.expires_on) | |
_LOGGER.info("%s acquired a token from %s", self.__class__.__name__, credential.__class__.__name__) | |
self._successful_credential = credential | |
within_credential_chain.set(False) | |
return token | |
except CredentialUnavailableError as ex: | |
# credential didn't attempt authentication because it lacks required data or state -> continue | |
history.append((credential, ex.message)) | |
except Exception as ex: # pylint: disable=broad-except | |
# credential failed to authenticate, or something unexpectedly raised -> break | |
history.append((credential, str(ex))) | |
_LOGGER.debug( | |
'%s.get_token failed: %s raised unexpected error "%s"', | |
self.__class__.__name__, | |
credential.__class__.__name__, | |
ex, | |
exc_info=True, | |
) | |
break | |
within_credential_chain.set(False) | |
attempts = _get_error_message(history) | |
message = ( | |
self.__class__.__name__ | |
+ " failed to retrieve a token from the included credentials." | |
+ attempts | |
+ "\nTo mitigate this issue, please refer to the troubleshooting guidelines here at " | |
"https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot." | |
) | |
_LOGGER.warning(message) | |
raise ClientAuthenticationError(message=message) | |
def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] = None) -> AccessTokenInfo: | |
"""Request a token from each chained credential, in order, returning the first token received. | |
If no credential provides a token, raises :class:`azure.core.exceptions.ClientAuthenticationError` | |
with an error message from each credential. | |
This is an alternative to `get_token` to enable certain scenarios that require additional properties | |
on the token. This method is called automatically by Azure SDK clients. | |
:param str scopes: desired scopes for the access token. This method requires at least one scope. | |
For more information about scopes, see https://learn.microsoft.com/entra/identity-platform/scopes-oidc. | |
:keyword options: A dictionary of options for the token request. Unknown options will be ignored. Optional. | |
:paramtype options: ~azure.core.credentials.TokenRequestOptions | |
:rtype: AccessTokenInfo | |
:return: An AccessTokenInfo instance containing information about the token. | |
:raises ~azure.core.exceptions.ClientAuthenticationError: no credential in the chain provided a token. | |
""" | |
within_credential_chain.set(True) | |
history = [] | |
options = options or {} | |
for credential in self.credentials: | |
try: | |
# Prioritize "get_token_info". Fall back to "get_token" if not available. | |
if hasattr(credential, "get_token_info"): | |
token_info = cast(SupportsTokenInfo, credential).get_token_info(*scopes, options=options) | |
else: | |
if options.get("pop"): | |
raise CredentialUnavailableError( | |
"Proof of possession arguments are not supported for this credential." | |
) | |
token = cast(TokenCredential, credential).get_token(*scopes, **options) | |
token_info = AccessTokenInfo(token=token.token, expires_on=token.expires_on) | |
_LOGGER.info("%s acquired a token from %s", self.__class__.__name__, credential.__class__.__name__) | |
self._successful_credential = credential | |
within_credential_chain.set(False) | |
return token_info | |
except CredentialUnavailableError as ex: | |
# credential didn't attempt authentication because it lacks required data or state -> continue | |
history.append((credential, ex.message)) | |
except Exception as ex: # pylint: disable=broad-except | |
# credential failed to authenticate, or something unexpectedly raised -> break | |
history.append((credential, str(ex))) | |
_LOGGER.debug( | |
'%s.get_token_info failed: %s raised unexpected error "%s"', | |
self.__class__.__name__, | |
credential.__class__.__name__, | |
ex, | |
exc_info=True, | |
) | |
break | |
within_credential_chain.set(False) | |
attempts = _get_error_message(history) | |
message = ( | |
self.__class__.__name__ | |
+ " failed to retrieve a token from the included credentials." | |
+ attempts | |
+ "\nTo mitigate this issue, please refer to the troubleshooting guidelines here at " | |
"https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot." | |
) | |
_LOGGER.warning(message) | |
raise ClientAuthenticationError(message=message) | |