File size: 10,986 Bytes
ab4488b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import abc
import os
import sys
from typing import cast, Any, Dict, Optional

from azure.core.credentials import AccessToken, TokenRequestOptions, AccessTokenInfo
from azure.core.exceptions import ClientAuthenticationError
from .._exceptions import CredentialUnavailableError
from .._constants import AzureAuthorityHosts, AZURE_VSCODE_CLIENT_ID, EnvironmentVariables
from .._internal import normalize_authority, validate_tenant_id, within_dac
from .._internal.aad_client import AadClient, AadClientBase
from .._internal.get_token_mixin import GetTokenMixin
from .._internal.decorators import log_get_token

if sys.platform.startswith("win"):
    from .._internal.win_vscode_adapter import get_refresh_token, get_user_settings
elif sys.platform.startswith("darwin"):
    from .._internal.macos_vscode_adapter import get_refresh_token, get_user_settings
else:
    from .._internal.linux_vscode_adapter import get_refresh_token, get_user_settings


class _VSCodeCredentialBase(abc.ABC):
    def __init__(self, **kwargs: Any) -> None:
        super(_VSCodeCredentialBase, self).__init__()

        user_settings = get_user_settings()
        self._cloud = user_settings.get("azure.cloud", "AzureCloud")
        self._refresh_token = None
        self._unavailable_reason = ""

        self._client = kwargs.get("_client")
        if not self._client:
            self._initialize(user_settings, **kwargs)
        if not (self._client or self._unavailable_reason):
            self._unavailable_reason = "Initialization failed"

    @abc.abstractmethod
    def _get_client(self, **kwargs: Any) -> AadClientBase:
        pass

    def _get_refresh_token(self) -> str:
        if not self._refresh_token:
            self._refresh_token = get_refresh_token(self._cloud)
            if not self._refresh_token:
                message = (
                    "Failed to get Azure user details from Visual Studio Code. "
                    "Currently, the VisualStudioCodeCredential only works with the Azure "
                    "Account extension version 0.9.11 and earlier. A long-term fix is in "
                    "progress, see https://github.com/Azure/azure-sdk-for-python/issues/25713"
                )
                raise CredentialUnavailableError(message=message)
        return self._refresh_token

    def _initialize(self, vscode_user_settings: Dict, **kwargs: Any) -> None:
        """Build a client from kwargs merged with VS Code user settings.

        The first stable version of this credential defaulted to Public Cloud and the "organizations"
        tenant when it failed to read VS Code user settings. That behavior is preserved here.

        :param dict vscode_user_settings: VS Code user settings
        """

        # Precedence for authority:
        #  1) VisualStudioCodeCredential(authority=...)
        #  2) $AZURE_AUTHORITY_HOST
        #  3) authority matching VS Code's "azure.cloud" setting
        #  4) default: Public Cloud
        authority = kwargs.pop("authority", None) or os.environ.get(EnvironmentVariables.AZURE_AUTHORITY_HOST)
        if not authority:
            # the application didn't specify an authority, so we figure it out from VS Code settings
            if self._cloud == "AzureCloud":
                authority = AzureAuthorityHosts.AZURE_PUBLIC_CLOUD
            elif self._cloud == "AzureChinaCloud":
                authority = AzureAuthorityHosts.AZURE_CHINA
            elif self._cloud == "AzureUSGovernment":
                authority = AzureAuthorityHosts.AZURE_GOVERNMENT
            else:
                # If the value is anything else ("AzureCustomCloud" is the only other known value),
                # we need the user to provide the authority because VS Code has no setting for it and
                # we can't guess confidently.
                self._unavailable_reason = (
                    'VS Code is configured to use a custom cloud. Set keyword argument "authority"'
                    + ' with the Microsoft Entra endpoint for cloud "{}"'.format(self._cloud)
                )
                return

        # Precedence for tenant ID:
        #  1) VisualStudioCodeCredential(tenant_id=...)
        #  2) "azure.tenant" in VS Code user settings
        #  3) default: organizations
        tenant_id = kwargs.pop("tenant_id", None) or vscode_user_settings.get("azure.tenant", "organizations")
        validate_tenant_id(tenant_id)
        if tenant_id.lower() == "adfs":
            self._unavailable_reason = "VisualStudioCodeCredential authentication unavailable. ADFS is not supported."
            return

        self._client = self._get_client(
            authority=normalize_authority(authority), client_id=AZURE_VSCODE_CLIENT_ID, tenant_id=tenant_id, **kwargs
        )


class VisualStudioCodeCredential(_VSCodeCredentialBase, GetTokenMixin):
    """Authenticates as the Azure user signed in to Visual Studio Code via the 'Azure Account' extension.

    It's a `known issue <https://github.com/Azure/azure-sdk-for-python/issues/23249>`_ that this credential doesn't
    work with `Azure Account extension <https://marketplace.visualstudio.com/items?itemName=ms-vscode.azure-account>`_
    versions newer than **0.9.11**. A long-term fix to this problem is in progress. In the meantime, consider
    authenticating with :class:`AzureCliCredential`.

    :keyword str authority: Authority of a Microsoft Entra endpoint, for example "login.microsoftonline.com".
        This argument is required for a custom cloud and usually unnecessary otherwise. Defaults to the authority
        matching the "Azure: Cloud" setting in VS Code's user settings or, when that setting has no value, the
        authority for Azure Public Cloud.
    :keyword str tenant_id: ID of the tenant the credential should authenticate in. Defaults to the "Azure: Tenant"
        setting in VS Code's user settings or, when that setting has no value, the "organizations" tenant, which
        supports only Microsoft Entra work or school accounts.
    :keyword List[str] additionally_allowed_tenants: Specifies tenants in addition to the specified "tenant_id"
        for which the credential may acquire tokens. Add the wildcard value "*" to allow the credential to
        acquire tokens for any tenant the application can access.
    """

    def __enter__(self) -> "VisualStudioCodeCredential":
        if self._client:
            self._client.__enter__()
        return self

    def __exit__(self, *args: Any) -> None:
        if self._client:
            self._client.__exit__(*args)

    def close(self) -> None:
        """Close the credential's transport session."""
        self.__exit__()

    @log_get_token
    def get_token(
        self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, **kwargs: Any
    ) -> AccessToken:
        """Request an access token for `scopes` as the user currently signed in to Visual Studio Code.

        This method is called automatically by Azure SDK clients.

        :param str scopes: desired scopes for the access token. This method requires at least one scope.
            For more information about scopes, see
            https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
        :keyword str claims: additional claims required in the token, such as those returned in a resource provider's
            claims challenge following an authorization failure.
        :keyword str tenant_id: optional tenant to include in the token request.

        :return: An access token with the desired scopes.
        :rtype: ~azure.core.credentials.AccessToken
        :raises ~azure.identity.CredentialUnavailableError: the credential cannot retrieve user details from Visual
          Studio Code
        """
        if self._unavailable_reason:
            error_message = (
                self._unavailable_reason + "\n"
                "Visit https://aka.ms/azsdk/python/identity/vscodecredential/troubleshoot"
                " to troubleshoot this issue."
            )
            raise CredentialUnavailableError(message=error_message)
        if within_dac.get():
            try:
                token = super().get_token(*scopes, claims=claims, tenant_id=tenant_id, **kwargs)
                return token
            except ClientAuthenticationError as ex:
                raise CredentialUnavailableError(message=ex.message) from ex
        return super().get_token(*scopes, claims=claims, tenant_id=tenant_id, **kwargs)

    def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] = None) -> AccessTokenInfo:
        """Request an access token for `scopes` as the user currently signed in to Visual Studio Code.

        This is an alternative to `get_token` to enable certain scenarios that require additional properties
        on the token. This method is called automatically by Azure SDK clients.

        :param str scopes: desired scopes for the access token. This method requires at least one scope.
            For more information about scopes, see https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
        :keyword options: A dictionary of options for the token request. Unknown options will be ignored. Optional.
        :paramtype options: ~azure.core.credentials.TokenRequestOptions

        :rtype: AccessTokenInfo
        :return: An AccessTokenInfo instance containing information about the token.
        :raises ~azure.identity.CredentialUnavailableError: the credential cannot retrieve user details from Visual
          Studio Code.
        """
        if self._unavailable_reason:
            error_message = (
                self._unavailable_reason + "\n"
                "Visit https://aka.ms/azsdk/python/identity/vscodecredential/troubleshoot"
                " to troubleshoot this issue."
            )
            raise CredentialUnavailableError(message=error_message)
        if within_dac.get():
            try:
                token = super().get_token_info(*scopes, options=options)
                return token
            except ClientAuthenticationError as ex:
                raise CredentialUnavailableError(message=ex.message) from ex
        return super().get_token_info(*scopes, options=options)

    def _acquire_token_silently(self, *scopes: str, **kwargs: Any) -> Optional[AccessTokenInfo]:
        self._client = cast(AadClient, self._client)
        return self._client.get_cached_access_token(scopes, **kwargs)

    def _request_token(self, *scopes: str, **kwargs: Any) -> AccessTokenInfo:
        refresh_token = self._get_refresh_token()
        self._client = cast(AadClient, self._client)
        return self._client.obtain_token_by_refresh_token(scopes, refresh_token, **kwargs)

    def _get_client(self, **kwargs: Any) -> AadClient:
        return AadClient(**kwargs)