File size: 9,975 Bytes
ab4488b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import logging
from typing import Any, Optional, cast
from azure.core.exceptions import ClientAuthenticationError

from azure.core.credentials import (
    AccessToken,
    AccessTokenInfo,
    TokenRequestOptions,
    SupportsTokenInfo,
    TokenCredential,
    TokenProvider,
)
from .. import CredentialUnavailableError
from .._internal import within_credential_chain

_LOGGER = logging.getLogger(__name__)


def _get_error_message(history):
    attempts = []
    for credential, error in history:
        if error:
            attempts.append("{}: {}".format(credential.__class__.__name__, error))
        else:
            attempts.append(credential.__class__.__name__)
    return """
Attempted credentials:\n\t{}""".format(
        "\n\t".join(attempts)
    )


class ChainedTokenCredential:
    """A sequence of credentials that is itself a credential.

    Its :func:`get_token` method calls ``get_token`` on each credential in the sequence, in order, returning the first
    valid token received. For more information, see
    https://aka.ms/azsdk/python/identity/credential-chains#chainedtokencredential-overview.

    :param credentials: credential instances to form the chain
    :type credentials: ~azure.core.credentials.TokenCredential

    .. admonition:: Example:

        .. literalinclude:: ../samples/credential_creation_code_snippets.py
            :start-after: [START create_chained_token_credential]
            :end-before: [END create_chained_token_credential]
            :language: python
            :dedent: 4
            :caption: Create a ChainedTokenCredential.
    """

    def __init__(self, *credentials: TokenProvider) -> None:
        """Store the chain of credentials.

        :param credentials: credential instances to try, in the order given.
        :raises ValueError: no credentials were supplied.
        """
        if not credentials:
            raise ValueError("at least one credential is required")

        # The credential that most recently produced a token; None until a request succeeds.
        self._successful_credential: Optional[TokenProvider] = None
        self.credentials = credentials

    def __enter__(self) -> "ChainedTokenCredential":
        """Enter the context of every credential in the chain, in order, and return this instance."""
        for credential in self.credentials:
            credential.__enter__()  # type: ignore
        return self

    def __exit__(self, *args: Any) -> None:
        """Exit the context of every credential in the chain, forwarding ``args`` unchanged to each."""
        for credential in self.credentials:
            credential.__exit__(*args)  # type: ignore

    def close(self) -> None:
        """Close the transport session of each credential in the chain."""
        self.__exit__()

    def _build_failure_message(self, history: list) -> str:
        """Compose the error text used when no credential in the chain yielded a token.

        :param history: ``(credential, error message)`` pairs, one per credential that was tried.
        :type history: list
        :return: a message naming this class, listing each attempt, and pointing at the troubleshooting guide.
        :rtype: str
        """
        return (
            self.__class__.__name__
            + " failed to retrieve a token from the included credentials."
            + _get_error_message(history)
            + "\nTo mitigate this issue, please refer to the troubleshooting guidelines here at "
            "https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot."
        )

    def get_token(
        self,
        *scopes: str,
        claims: Optional[str] = None,
        tenant_id: Optional[str] = None,
        enable_cae: bool = False,
        **kwargs: Any,
    ) -> AccessToken:
        """Request a token from each chained credential, in order, returning the first token received.

        If no credential provides a token, raises :class:`azure.core.exceptions.ClientAuthenticationError`
        with an error message from each credential. A credential raising ``CredentialUnavailableError`` is
        skipped; any other exception from a credential stops the chain at that credential.

        This method is called automatically by Azure SDK clients.

        :param str scopes: desired scopes for the access token. This method requires at least one scope.
            For more information about scopes, see
            https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
        :keyword str claims: additional claims required in the token, such as those returned in a resource provider's
            claims challenge following an authorization failure.
        :keyword str tenant_id: optional tenant to include in the token request.
        :keyword bool enable_cae: indicates whether to enable Continuous Access Evaluation (CAE) for the requested
            token. Defaults to False.

        :return: An access token with the desired scopes.
        :rtype: ~azure.core.credentials.AccessToken
        :raises ~azure.core.exceptions.ClientAuthenticationError: no credential in the chain provided a token
        """

        within_credential_chain.set(True)
        history = []
        try:
            for credential in self.credentials:
                try:
                    # Prioritize "get_token". Fall back to "get_token_info" if not available.
                    if hasattr(credential, "get_token"):
                        token = cast(TokenCredential, credential).get_token(
                            *scopes, claims=claims, tenant_id=tenant_id, enable_cae=enable_cae, **kwargs
                        )
                    else:
                        # Only truthy claims/tenant_id are forwarded; enable_cae is always forwarded.
                        options: TokenRequestOptions = {}
                        if claims:
                            options["claims"] = claims
                        if tenant_id:
                            options["tenant_id"] = tenant_id
                        options["enable_cae"] = enable_cae
                        token_info = cast(SupportsTokenInfo, credential).get_token_info(*scopes, options=options)
                        token = AccessToken(token_info.token, token_info.expires_on)

                    _LOGGER.info(
                        "%s acquired a token from %s", self.__class__.__name__, credential.__class__.__name__
                    )
                    self._successful_credential = credential
                    return token

                except CredentialUnavailableError as ex:
                    # credential didn't attempt authentication because it lacks required data or state -> continue
                    history.append((credential, ex.message))
                except Exception as ex:  # pylint: disable=broad-except
                    # credential failed to authenticate, or something unexpectedly raised -> break
                    history.append((credential, str(ex)))
                    _LOGGER.debug(
                        '%s.get_token failed: %s raised unexpected error "%s"',
                        self.__class__.__name__,
                        credential.__class__.__name__,
                        ex,
                        exc_info=True,
                    )
                    break
        finally:
            # Always clear the flag, including when a BaseException (e.g. KeyboardInterrupt) escapes a credential.
            # NOTE(review): this clears to False rather than restoring the prior value, so a chain nested inside
            # another chain clears the outer chain's flag too -- confirm whether nesting needs to be supported.
            within_credential_chain.set(False)

        message = self._build_failure_message(history)
        _LOGGER.warning(message)
        raise ClientAuthenticationError(message=message)

    def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] = None) -> AccessTokenInfo:
        """Request a token from each chained credential, in order, returning the first token received.

        If no credential provides a token, raises :class:`azure.core.exceptions.ClientAuthenticationError`
        with an error message from each credential. A credential raising ``CredentialUnavailableError`` is
        skipped; any other exception from a credential stops the chain at that credential.

        This is an alternative to `get_token` to enable certain scenarios that require additional properties
        on the token. This method is called automatically by Azure SDK clients.

        :param str scopes: desired scopes for the access token. This method requires at least one scope.
            For more information about scopes, see https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
        :keyword options: A dictionary of options for the token request. Unknown options will be ignored. Optional.
        :paramtype options: ~azure.core.credentials.TokenRequestOptions

        :rtype: AccessTokenInfo
        :return: An AccessTokenInfo instance containing information about the token.

        :raises ~azure.core.exceptions.ClientAuthenticationError: no credential in the chain provided a token.
        """
        within_credential_chain.set(True)
        history = []
        options = options or {}
        try:
            for credential in self.credentials:
                try:
                    # Prioritize "get_token_info". Fall back to "get_token" if not available.
                    if hasattr(credential, "get_token_info"):
                        token_info = cast(SupportsTokenInfo, credential).get_token_info(*scopes, options=options)
                    else:
                        if options.get("pop"):
                            # Raised inside the try so this credential is recorded as unavailable and skipped.
                            raise CredentialUnavailableError(
                                "Proof of possession arguments are not supported for this credential."
                            )
                        token = cast(TokenCredential, credential).get_token(*scopes, **options)
                        token_info = AccessTokenInfo(token=token.token, expires_on=token.expires_on)

                    _LOGGER.info(
                        "%s acquired a token from %s", self.__class__.__name__, credential.__class__.__name__
                    )
                    self._successful_credential = credential
                    return token_info
                except CredentialUnavailableError as ex:
                    # credential didn't attempt authentication because it lacks required data or state -> continue
                    history.append((credential, ex.message))
                except Exception as ex:  # pylint: disable=broad-except
                    # credential failed to authenticate, or something unexpectedly raised -> break
                    history.append((credential, str(ex)))
                    _LOGGER.debug(
                        '%s.get_token_info failed: %s raised unexpected error "%s"',
                        self.__class__.__name__,
                        credential.__class__.__name__,
                        ex,
                        exc_info=True,
                    )
                    break
        finally:
            # Always clear the flag, including when a BaseException (e.g. KeyboardInterrupt) escapes a credential.
            # NOTE(review): this clears to False rather than restoring the prior value, so a chain nested inside
            # another chain clears the outer chain's flag too -- confirm whether nesting needs to be supported.
            within_credential_chain.set(False)

        message = self._build_failure_message(history)
        _LOGGER.warning(message)
        raise ClientAuthenticationError(message=message)