kdamevski's picture
Upload folder using huggingface_hub
1c60c6e
import datetime
import functools
import typing
import warnings
from types import TracebackType
import httpcore
from .__version__ import __version__
from ._auth import Auth, BasicAuth, FunctionAuth
from ._config import (
DEFAULT_LIMITS,
DEFAULT_MAX_REDIRECTS,
DEFAULT_TIMEOUT_CONFIG,
UNSET,
Limits,
Proxy,
Timeout,
UnsetType,
create_ssl_context,
)
from ._decoders import SUPPORTED_DECODERS
from ._exceptions import (
HTTPCORE_EXC_MAP,
InvalidURL,
RemoteProtocolError,
TooManyRedirects,
map_exceptions,
)
from ._models import URL, Cookies, Headers, QueryParams, Request, Response
from ._status_codes import codes
from ._transports.asgi import ASGITransport
from ._transports.wsgi import WSGITransport
from ._types import (
AuthTypes,
ByteStream,
CertTypes,
CookieTypes,
HeaderTypes,
ProxiesTypes,
QueryParamTypes,
RequestContent,
RequestData,
RequestFiles,
TimeoutTypes,
URLTypes,
VerifyTypes,
)
from ._utils import (
NetRCInfo,
Timer,
URLPattern,
get_environment_proxies,
get_logger,
same_origin,
warn_deprecated,
)
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Passed as `keepalive_expiry` to every httpcore connection pool and proxy
# created by the clients in this module (presumably seconds -- TODO confirm
# against httpcore).
KEEPALIVE_EXPIRY = 5.0
# Default `User-Agent` header value for client instances.
USER_AGENT = f"python-httpx/{__version__}"
# Default `Accept-Encoding` header value: every supported decoder name
# except "identity", joined with ", ".
ACCEPT_ENCODING = ", ".join(
    [key for key in SUPPORTED_DECODERS.keys() if key != "identity"]
)
class BaseClient:
def __init__(
    self,
    *,
    auth: AuthTypes = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
    max_redirects: int = DEFAULT_MAX_REDIRECTS,
    event_hooks: typing.Dict[str, typing.List[typing.Callable]] = None,
    base_url: URLTypes = "",
    trust_env: bool = True,
):
    """
    Store the configuration that is shared by the sync and async clients.

    Every argument is keyword-only. Each value is normalised into its
    model type (`URL`, `QueryParams`, `Headers`, `Cookies`, `Timeout`)
    before being stored on the instance.
    """
    hooks = event_hooks if event_hooks is not None else {}

    self._base_url = self._enforce_trailing_slash(URL(base_url))
    self._auth = self._build_auth(auth)
    self._params = QueryParams(params)
    # Goes through the `headers` property setter, which layers the given
    # headers on top of the default client headers.
    self.headers = Headers(headers)
    self._cookies = Cookies(cookies)
    self._timeout = Timeout(timeout)
    self.max_redirects = max_redirects
    # Only the "request" and "response" hooks are kept; each list is copied.
    self._event_hooks = {
        name: list(hooks.get(name, [])) for name in ("request", "response")
    }
    self._trust_env = trust_env
    self._netrc = NetRCInfo()
    # The client starts out flagged as closed.
    self._is_closed = True
@property
def is_closed(self) -> bool:
    """
    Whether the client is currently flagged as closed.
    """
    return self._is_closed
@property
def trust_env(self) -> bool:
    """
    The `trust_env` flag that was passed when the client was created.
    """
    return self._trust_env
def _enforce_trailing_slash(self, url: URL) -> URL:
if url.path.endswith("/"):
return url
return url.copy_with(path=url.path + "/")
def _get_proxy_map(
self, proxies: typing.Optional[ProxiesTypes], allow_env_proxies: bool
) -> typing.Dict[str, typing.Optional[Proxy]]:
if proxies is None:
if allow_env_proxies:
return {
key: None if url is None else Proxy(url=url)
for key, url in get_environment_proxies().items()
}
return {}
if isinstance(proxies, dict):
new_proxies = {}
for key, value in proxies.items():
proxy = Proxy(url=value) if isinstance(value, (str, URL)) else value
new_proxies[str(key)] = proxy
return new_proxies
else:
proxy = Proxy(url=proxies) if isinstance(proxies, (str, URL)) else proxies
return {"all://": proxy}
@property
def timeout(self) -> Timeout:
    """
    Timeout configuration stored on the client.
    """
    return self._timeout

@timeout.setter
def timeout(self, timeout: TimeoutTypes) -> None:
    # The assigned value is passed through `Timeout(...)` before storing.
    self._timeout = Timeout(timeout)
@property
def event_hooks(self) -> typing.Dict[str, typing.List[typing.Callable]]:
return self._event_hooks
@event_hooks.setter
def event_hooks(
self, event_hooks: typing.Dict[str, typing.List[typing.Callable]]
) -> None:
self._event_hooks = {
"request": list(event_hooks.get("request", [])),
"response": list(event_hooks.get("response", [])),
}
@property
def auth(self) -> typing.Optional[Auth]:
    """
    Authentication class used when none is passed at the request-level.

    See also [Authentication][0].

    [0]: /quickstart/#authentication
    """
    return self._auth

@auth.setter
def auth(self, auth: AuthTypes) -> None:
    # Normalised by `_build_auth()`, which raises `TypeError` for
    # unsupported values.
    self._auth = self._build_auth(auth)
@property
def base_url(self) -> URL:
    """
    Base URL to use when sending requests with relative URLs.
    """
    return self._base_url

@base_url.setter
def base_url(self, url: URLTypes) -> None:
    # Stored with a trailing "/" on the path, which `_merge_url()` relies on.
    self._base_url = self._enforce_trailing_slash(URL(url))
@property
def headers(self) -> Headers:
    """
    HTTP headers to include when sending requests.
    """
    return self._headers

@headers.setter
def headers(self, headers: HeaderTypes) -> None:
    # Start from the default client headers, then let the assigned
    # headers override or extend them.
    defaults = {
        b"Accept": b"*/*",
        b"Accept-Encoding": ACCEPT_ENCODING.encode("ascii"),
        b"Connection": b"keep-alive",
        b"User-Agent": USER_AGENT.encode("ascii"),
    }
    merged = Headers(defaults)
    merged.update(headers)
    self._headers = merged
@property
def cookies(self) -> Cookies:
    """
    Cookie values to include when sending requests.
    """
    return self._cookies

@cookies.setter
def cookies(self, cookies: CookieTypes) -> None:
    # The assigned value is passed through `Cookies(...)` before storing.
    self._cookies = Cookies(cookies)
@property
def params(self) -> QueryParams:
    """
    Query parameters to include in the URL when sending requests.
    """
    return self._params

@params.setter
def params(self, params: QueryParamTypes) -> None:
    # The assigned value is passed through `QueryParams(...)` before storing.
    self._params = QueryParams(params)
def stream(
    self,
    method: str,
    url: URLTypes,
    *,
    content: RequestContent = None,
    data: RequestData = None,
    files: RequestFiles = None,
    json: typing.Any = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> "StreamContextManager":
    """
    Build a request and return a context manager that streams its response.

    This is the alternative to `httpx.request()` for cases where the
    response body should not be loaded into memory at once.

    **Parameters**: See `httpx.request`.

    See also: [Streaming Responses][0]

    [0]: /quickstart#streaming-responses
    """
    # The request is built eagerly; `auth`, `allow_redirects` and `timeout`
    # are handed to the context manager unchanged.
    built_request = self.build_request(
        method=method,
        url=url,
        content=content,
        data=data,
        files=files,
        json=json,
        params=params,
        headers=headers,
        cookies=cookies,
    )
    return StreamContextManager(
        client=self,
        request=built_request,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
def build_request(
    self,
    method: str,
    url: URLTypes,
    *,
    content: RequestContent = None,
    data: RequestData = None,
    files: RequestFiles = None,
    json: typing.Any = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
) -> Request:
    """
    Build and return a request instance.

    * `params`, `headers` and `cookies` are merged with the values set
      on the client.
    * `url` is merged with any `base_url` set on the client.

    See also: [Request instances][0]

    [0]: /advanced/#request-instances
    """
    request_url = self._merge_url(url)
    request_headers = self._merge_headers(headers)
    request_cookies = self._merge_cookies(cookies)
    request_params = self._merge_queryparams(params)
    return Request(
        method,
        request_url,
        content=content,
        data=data,
        files=files,
        json=json,
        params=request_params,
        headers=request_headers,
        cookies=request_cookies,
    )
def _merge_url(self, url: URLTypes) -> URL:
    """
    Combine a URL argument with the client's `base_url`.

    A non-relative URL is returned as-is. A relative URL has any leading
    "/" stripped from its path and is then joined onto `base_url`.
    """
    candidate = URL(url)
    if not candidate.is_relative_url:
        return candidate

    # `base_url` always ends in "/", so dropping the leading "/" here makes
    # the join append to the base path.
    relative_path = candidate.path.lstrip("/")
    return self.base_url.join(candidate.copy_with(path=relative_path))
def _merge_cookies(
self, cookies: CookieTypes = None
) -> typing.Optional[CookieTypes]:
"""
Merge a cookies argument together with any cookies on the client,
to create the cookies used for the outgoing request.
"""
if cookies or self.cookies:
merged_cookies = Cookies(self.cookies)
merged_cookies.update(cookies)
return merged_cookies
return cookies
def _merge_headers(
    self, headers: HeaderTypes = None
) -> typing.Optional[HeaderTypes]:
    """
    Combine a headers argument with the headers stored on the client.

    Always returns a new `Headers` instance: a copy of the client headers
    with the argument applied on top.
    """
    combined = Headers(self.headers)
    combined.update(headers)
    return combined
def _merge_queryparams(
self, params: QueryParamTypes = None
) -> typing.Optional[QueryParamTypes]:
"""
Merge a queryparams argument together with any queryparams on the client,
to create the queryparams used for the outgoing request.
"""
if params or self.params:
merged_queryparams = QueryParams(self.params)
merged_queryparams.update(params)
return merged_queryparams
return params
def _build_auth(self, auth: AuthTypes) -> typing.Optional[Auth]:
if auth is None:
return None
elif isinstance(auth, tuple):
return BasicAuth(username=auth[0], password=auth[1])
elif isinstance(auth, Auth):
return auth
elif callable(auth):
return FunctionAuth(func=auth)
else:
raise TypeError('Invalid "auth" argument.')
def _build_request_auth(
    self, request: Request, auth: typing.Union[AuthTypes, UnsetType] = UNSET
) -> Auth:
    """
    Decide which `Auth` instance applies to an outgoing request.

    Precedence, first match wins:

    1. The request-level `auth` argument, or the client-level auth when
       the argument is unset.
    2. Credentials embedded in the request URL.
    3. netrc credentials for the request host, when `trust_env` is
       enabled and the request has no "Authorization" header.
    4. A plain `Auth()` instance.
    """
    if isinstance(auth, UnsetType):
        resolved = self._auth
    else:
        resolved = self._build_auth(auth)
    if resolved is not None:
        return resolved

    url = request.url
    username, password = url.username, url.password
    if username or password:
        return BasicAuth(username=username, password=password)

    if self.trust_env and "Authorization" not in request.headers:
        credentials = self._netrc.get_credentials(url.host)
        if credentials is not None:
            return BasicAuth(username=credentials[0], password=credentials[1])

    return Auth()
def _build_redirect_request(self, request: Request, response: Response) -> Request:
    """
    Build the follow-up request for a redirect response.

    The method, URL, headers and body are each derived from the original
    request and the redirect response; cookies are taken from the client.
    """
    next_method = self._redirect_method(request, response)
    next_url = self._redirect_url(request, response)
    next_headers = self._redirect_headers(request, next_url, next_method)
    next_stream = self._redirect_stream(request, next_method)
    client_cookies = Cookies(self.cookies)
    return Request(
        method=next_method,
        url=next_url,
        headers=next_headers,
        cookies=client_cookies,
        stream=next_stream,
    )
def _redirect_method(self, request: Request, response: Response) -> str:
    """
    Return the HTTP method to use for the request that follows a redirect.

    * 303 (https://tools.ietf.org/html/rfc7231#section-6.4.4) and 302
      (browser behaviour, despite the standards) turn everything except
      "HEAD" into "GET".
    * 301 turns "POST" into "GET" (see 'requests' issue 1704).
    * Otherwise the original method is kept.
    """
    original = request.method
    status = response.status_code

    if status in (codes.SEE_OTHER, codes.FOUND):
        return original if original == "HEAD" else "GET"
    if status == codes.MOVED_PERMANENTLY and original == "POST":
        return "GET"
    return original
def _redirect_url(self, request: Request, response: Response) -> URL:
    """
    Work out the URL that a redirect response points at.

    Raises `RemoteProtocolError` when the "Location" header cannot be
    parsed as a URL.
    """
    raw_location = response.headers["Location"]
    try:
        target = URL(raw_location)
    except InvalidURL as exc:
        raise RemoteProtocolError(
            f"Invalid URL in location header: {exc}.", request=request
        ) from None

    origin = request.url

    # Malformed "absolute" locations that carry a scheme but no host reuse
    # the host of the original request.
    # See: https://github.com/encode/httpx/issues/771
    if target.scheme and not target.host:
        target = target.copy_with(host=origin.host)

    # Relative locations (e.g. '/path/to/resource'), allowed by RFC 7231,
    # are resolved against the original request URL.
    if target.is_relative_url:
        target = origin.join(target)

    # Carry the previous fragment over when the new URL has none
    # (RFC 7231 7.1.2).
    if origin.fragment and not target.fragment:
        target = target.copy_with(fragment=origin.fragment)

    return target
def _redirect_headers(self, request: Request, url: URL, method: str) -> Headers:
    """
    Return a copy of the request headers, adjusted for the redirect request.
    """
    redirect_headers = Headers(request.headers)

    # "Host" is dropped so a new one is auto-populated on the request
    # instance; "Cookie" is dropped because the client cookie store, not the
    # original outgoing request, determines the cookie header.
    stale = ["Host", "Cookie"]

    if not same_origin(url, request.url):
        # Never forward credentials when redirected away from the origin.
        stale.append("Authorization")

    if method != request.method and method == "GET":
        # Having switched to a 'GET' request, headers that only describe
        # a request body no longer apply.
        stale.extend(["Content-Length", "Transfer-Encoding"])

    for name in stale:
        redirect_headers.pop(name, None)
    return redirect_headers
def _redirect_stream(
self, request: Request, method: str
) -> typing.Optional[ByteStream]:
"""
Return the body that should be used for the redirect request.
"""
if method != request.method and method == "GET":
return None
return request.stream
class Client(BaseClient):
"""
An HTTP client, with connection pooling, HTTP/2, redirects, cookie persistence, etc.
Usage:
```python
>>> client = httpx.Client()
>>> response = client.get('https://example.org')
```
**Parameters:**
* **auth** - *(optional)* An authentication class to use when sending
requests.
* **params** - *(optional)* Query parameters to include in request URLs, as
a string, dictionary, or list of two-tuples.
* **headers** - *(optional)* Dictionary of HTTP headers to include when
sending requests.
* **cookies** - *(optional)* Dictionary of Cookie items to include when
sending requests.
* **verify** - *(optional)* SSL certificates (a.k.a CA bundle) used to
verify the identity of requested hosts. Either `True` (default CA bundle),
a path to an SSL certificate file, or `False` (disable verification).
* **cert** - *(optional)* An SSL certificate used by the requested host
to authenticate the client. Either a path to an SSL certificate file, or
two-tuple of (certificate file, key file), or a three-tuple of (certificate
file, key file, password).
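* **http2** - *(optional)* A boolean indicating if HTTP/2 support should be
enabled. Defaults to `False`.
* **proxies** - *(optional)* A dictionary mapping proxy keys to proxy
URLs.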
* **timeout** - *(optional)* The timeout configuration to use when sending
requests.
* **limits** - *(optional)* The limits configuration to use.
* **max_redirects** - *(optional)* The maximum number of redirect responses
that should be followed.
* **base_url** - *(optional)* A URL to use as the base when building
request URLs.
* **transport** - *(optional)* A transport class to use for sending requests
over the network.
* **app** - *(optional)* A WSGI application to send requests to,
rather than sending actual network requests.
* **trust_env** - *(optional)* Enables or disables usage of environment
variables for configuration.
"""
def __init__(
    self,
    *,
    auth: AuthTypes = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    verify: VerifyTypes = True,
    cert: CertTypes = None,
    http2: bool = False,
    proxies: ProxiesTypes = None,
    timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
    limits: Limits = DEFAULT_LIMITS,
    pool_limits: Limits = None,
    max_redirects: int = DEFAULT_MAX_REDIRECTS,
    event_hooks: typing.Dict[str, typing.List[typing.Callable]] = None,
    base_url: URLTypes = "",
    transport: httpcore.SyncHTTPTransport = None,
    app: typing.Callable = None,
    trust_env: bool = True,
):
    """
    Configure the client and create its transport and proxy transports.

    Raises `ImportError` when `http2=True` is requested but the 'h2'
    package cannot be imported.
    """
    super().__init__(
        auth=auth,
        params=params,
        headers=headers,
        cookies=cookies,
        timeout=timeout,
        max_redirects=max_redirects,
        event_hooks=event_hooks,
        base_url=base_url,
        trust_env=trust_env,
    )

    if http2:
        try:
            import h2  # noqa
        except ImportError:  # pragma: nocover
            raise ImportError(
                "Using http2=True, but the 'h2' package is not installed. "
                "Make sure to install httpx using `pip install httpx[http2]`."
            ) from None

    if pool_limits is not None:
        warn_deprecated(
            "Client(..., pool_limits=...) is deprecated and will raise "
            "errors in the future. Use Client(..., limits=...) instead."
        )
        # The deprecated argument takes precedence when given.
        limits = pool_limits

    # Options shared by the main transport and every proxy transport.
    pool_options = {
        "verify": verify,
        "cert": cert,
        "http2": http2,
        "limits": limits,
        "trust_env": trust_env,
    }

    # Environment proxies only apply when neither an app nor an explicit
    # transport was supplied.
    use_env_proxies = trust_env and app is None and transport is None
    proxy_map = self._get_proxy_map(proxies, use_env_proxies)

    self._transport = self._init_transport(
        transport=transport, app=app, **pool_options
    )

    mounts = {}
    for key, proxy in proxy_map.items():
        pattern = URLPattern(key)
        mounts[pattern] = (
            None if proxy is None else self._init_proxy_transport(proxy, **pool_options)
        )

    # Stored in sorted pattern order, which is the order
    # `_transport_for_url()` tries them in.
    self._proxies: typing.Dict[
        URLPattern, typing.Optional[httpcore.SyncHTTPTransport]
    ] = dict(sorted(mounts.items()))
def _init_transport(
    self,
    verify: VerifyTypes = True,
    cert: CertTypes = None,
    http2: bool = False,
    limits: Limits = DEFAULT_LIMITS,
    transport: httpcore.SyncHTTPTransport = None,
    app: typing.Callable = None,
    trust_env: bool = True,
) -> httpcore.SyncHTTPTransport:
    """
    Return the transport used for requests that do not go through a proxy.

    An explicit `transport` wins, then a WSGI `app`; otherwise a new
    connection pool is created from the SSL and limits configuration.
    """
    if transport is not None:
        return transport
    if app is not None:
        return WSGITransport(app=app)

    return httpcore.SyncConnectionPool(
        ssl_context=create_ssl_context(
            verify=verify, cert=cert, trust_env=trust_env
        ),
        max_connections=limits.max_connections,
        max_keepalive_connections=limits.max_keepalive_connections,
        keepalive_expiry=KEEPALIVE_EXPIRY,
        http2=http2,
    )
def _init_proxy_transport(
    self,
    proxy: Proxy,
    verify: VerifyTypes = True,
    cert: CertTypes = None,
    http2: bool = False,
    limits: Limits = DEFAULT_LIMITS,
    trust_env: bool = True,
) -> httpcore.SyncHTTPTransport:
    """
    Create the proxy transport for a single `Proxy` configuration.
    """
    return httpcore.SyncHTTPProxy(
        proxy_url=proxy.url.raw,
        proxy_headers=proxy.headers.raw,
        proxy_mode=proxy.mode,
        ssl_context=create_ssl_context(
            verify=verify, cert=cert, trust_env=trust_env
        ),
        max_connections=limits.max_connections,
        max_keepalive_connections=limits.max_keepalive_connections,
        keepalive_expiry=KEEPALIVE_EXPIRY,
        http2=http2,
    )
def _transport_for_url(self, url: URL) -> httpcore.SyncHTTPTransport:
"""
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
for pattern, transport in self._proxies.items():
if pattern.matches(url):
return self._transport if transport is None else transport
return self._transport
def request(
    self,
    method: str,
    url: URLTypes,
    *,
    content: RequestContent = None,
    data: RequestData = None,
    files: RequestFiles = None,
    json: typing.Any = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Build a request and send it.

    Equivalent to:

    ```python
    request = client.build_request(...)
    response = client.send(request, ...)
    ```

    See `Client.build_request()`, `Client.send()` and
    [Merging of configuration][0] for how the various parameters
    are merged with client-level configuration.

    [0]: /advanced/#merging-of-configuration
    """
    built_request = self.build_request(
        method=method,
        url=url,
        content=content,
        data=data,
        files=files,
        json=json,
        params=params,
        headers=headers,
        cookies=cookies,
    )
    return self.send(
        built_request,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
def send(
    self,
    request: Request,
    *,
    stream: bool = False,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Send a request.

    The request is sent as-is, unmodified.

    Typically you'll want to build one with `Client.build_request()`
    so that any client-level configuration is merged into the request,
    but passing an explicit `httpx.Request()` is supported as well.

    Unless `stream=True`, the response body is read and the response is
    closed before the "response" event hooks run.

    See also: [Request instances][0]

    [0]: /advanced/#request-instances
    """
    # Sending a request marks the client as open.
    self._is_closed = False

    if isinstance(timeout, UnsetType):
        effective_timeout = self.timeout
    else:
        effective_timeout = Timeout(timeout)
    request_auth = self._build_request_auth(request, auth)

    response = self._send_handling_auth(
        request,
        auth=request_auth,
        timeout=effective_timeout,
        allow_redirects=allow_redirects,
        history=[],
    )

    if not stream:
        try:
            response.read()
        finally:
            response.close()

    try:
        for response_hook in self._event_hooks["response"]:
            response_hook(response)
    except Exception:
        # A failing hook must not leave the response open.
        response.close()
        raise

    return response
def _send_handling_auth(
self,
request: Request,
auth: Auth,
timeout: Timeout,
allow_redirects: bool,
history: typing.List[Response],
) -> Response:
auth_flow = auth.sync_auth_flow(request)
request = next(auth_flow)
for hook in self._event_hooks["request"]:
hook(request)
while True:
response = self._send_handling_redirects(
request,
timeout=timeout,
allow_redirects=allow_redirects,
history=history,
)
try:
next_request = auth_flow.send(response)
except StopIteration:
return response
except BaseException as exc:
response.close()
raise exc from None
else:
response.history = list(history)
response.read()
request = next_request
history.append(response)
def _send_handling_redirects(
    self,
    request: Request,
    timeout: Timeout,
    allow_redirects: bool,
    history: typing.List[Response],
) -> Response:
    """
    Send a request, following redirect responses when allowed.

    Returns the first non-redirect response. When `allow_redirects` is
    false, the first redirect response is returned instead, with
    `next_request` and `call_next` set so the caller can continue manually.

    Raises `TooManyRedirects` once `history` holds more than
    `max_redirects` responses.
    """
    while True:
        if len(history) > self.max_redirects:
            raise TooManyRedirects(
                "Exceeded maximum allowed redirects.", request=request
            )
        response = self._send_single_request(request, timeout)
        response.history = list(history)
        if not response.is_redirect:
            return response
        # The redirect body is only read when the redirect is followed here.
        if allow_redirects:
            response.read()
        request = self._build_redirect_request(request, response)
        # Builds a new list rather than mutating the caller's `history`.
        history = history + [response]
        if not allow_redirects:
            response.next_request = request
            response.call_next = functools.partial(
                self._send_handling_redirects,
                request=request,
                timeout=timeout,
                allow_redirects=False,
                history=history,
            )
            return response
def _send_single_request(self, request: Request, timeout: Timeout) -> Response:
    """
    Sends a single request, without handling any redirections.

    The request goes out on the transport selected for its URL. Cookies
    from the response are extracted into the client's cookie store, and
    the exchange is logged at debug level.
    """
    transport = self._transport_for_url(request.url)
    # Started before the request is issued; read again in `on_close`.
    timer = Timer()
    timer.sync_start()
    # Exceptions raised by the transport call are mapped via HTTPCORE_EXC_MAP.
    with map_exceptions(HTTPCORE_EXC_MAP, request=request):
        (status_code, headers, stream, ext) = transport.request(
            request.method.encode(),
            request.url.raw,
            headers=request.headers.raw,
            stream=request.stream,  # type: ignore
            ext={"timeout": timeout.as_dict()},
        )

    def on_close(response: Response) -> None:
        # Record the elapsed time and close the transport stream if it
        # exposes a `close` attribute.
        response.elapsed = datetime.timedelta(seconds=timer.sync_elapsed())
        if hasattr(stream, "close"):
            stream.close()

    response = Response(
        status_code,
        headers=headers,
        stream=stream,  # type: ignore
        ext=ext,
        request=request,
        on_close=on_close,
    )
    self.cookies.extract_cookies(response)
    status = f"{response.status_code} {response.reason_phrase}"
    response_line = f"{response.http_version} {status}"
    logger.debug(f'HTTP Request: {request.method} {request.url} "{response_line}"')
    return response
def get(
    self,
    url: URLTypes,
    *,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Send a `GET` request.

    Thin wrapper around `Client.request()` with the method fixed to "GET";
    no request body arguments are accepted.

    **Parameters**: See `httpx.request`.
    """
    return self.request(
        "GET",
        url,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
def options(
    self,
    url: URLTypes,
    *,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Send an `OPTIONS` request.

    Thin wrapper around `Client.request()` with the method fixed to
    "OPTIONS"; no request body arguments are accepted.

    **Parameters**: See `httpx.request`.
    """
    return self.request(
        "OPTIONS",
        url,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
def head(
    self,
    url: URLTypes,
    *,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Send a `HEAD` request.

    Thin wrapper around `Client.request()` with the method fixed to "HEAD";
    no request body arguments are accepted.

    **Parameters**: See `httpx.request`.
    """
    return self.request(
        "HEAD",
        url,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
def post(
    self,
    url: URLTypes,
    *,
    content: RequestContent = None,
    data: RequestData = None,
    files: RequestFiles = None,
    json: typing.Any = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Send a `POST` request.

    Thin wrapper around `Client.request()` with the method fixed to "POST";
    the body arguments (`content`, `data`, `files`, `json`) are forwarded
    unchanged.

    **Parameters**: See `httpx.request`.
    """
    return self.request(
        "POST",
        url,
        content=content,
        data=data,
        files=files,
        json=json,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
def put(
    self,
    url: URLTypes,
    *,
    content: RequestContent = None,
    data: RequestData = None,
    files: RequestFiles = None,
    json: typing.Any = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Send a `PUT` request.

    Thin wrapper around `Client.request()` with the method fixed to "PUT";
    the body arguments (`content`, `data`, `files`, `json`) are forwarded
    unchanged.

    **Parameters**: See `httpx.request`.
    """
    return self.request(
        "PUT",
        url,
        content=content,
        data=data,
        files=files,
        json=json,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
def patch(
    self,
    url: URLTypes,
    *,
    content: RequestContent = None,
    data: RequestData = None,
    files: RequestFiles = None,
    json: typing.Any = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Send a `PATCH` request.

    Thin wrapper around `Client.request()` with the method fixed to "PATCH";
    the body arguments (`content`, `data`, `files`, `json`) are forwarded
    unchanged.

    **Parameters**: See `httpx.request`.
    """
    return self.request(
        "PATCH",
        url,
        content=content,
        data=data,
        files=files,
        json=json,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
def delete(
    self,
    url: URLTypes,
    *,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Send a `DELETE` request.

    Thin wrapper around `Client.request()` with the method fixed to
    "DELETE"; no request body arguments are accepted.

    **Parameters**: See `httpx.request`.
    """
    return self.request(
        "DELETE",
        url,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
def close(self) -> None:
"""
Close transport and proxies.
"""
if not self.is_closed:
self._is_closed = True
self._transport.close()
for proxy in self._proxies.values():
if proxy is not None:
proxy.close()
def __enter__(self) -> "Client":
self._transport.__enter__()
for proxy in self._proxies.values():
if proxy is not None:
proxy.__enter__()
self._is_closed = False
return self
def __exit__(
self,
exc_type: typing.Type[BaseException] = None,
exc_value: BaseException = None,
traceback: TracebackType = None,
) -> None:
if not self.is_closed:
self._is_closed = True
self._transport.__exit__(exc_type, exc_value, traceback)
for proxy in self._proxies.values():
if proxy is not None:
proxy.__exit__(exc_type, exc_value, traceback)
def __del__(self) -> None:
self.close()
class AsyncClient(BaseClient):
"""
An asynchronous HTTP client, with connection pooling, HTTP/2, redirects,
cookie persistence, etc.
Usage:
```python
>>> async with httpx.AsyncClient() as client:
>>> response = await client.get('https://example.org')
```
**Parameters:**
* **auth** - *(optional)* An authentication class to use when sending
requests.
* **params** - *(optional)* Query parameters to include in request URLs, as
a string, dictionary, or list of two-tuples.
* **headers** - *(optional)* Dictionary of HTTP headers to include when
sending requests.
* **cookies** - *(optional)* Dictionary of Cookie items to include when
sending requests.
* **verify** - *(optional)* SSL certificates (a.k.a CA bundle) used to
verify the identity of requested hosts. Either `True` (default CA bundle),
a path to an SSL certificate file, or `False` (disable verification).
* **cert** - *(optional)* An SSL certificate used by the requested host
to authenticate the client. Either a path to an SSL certificate file, or
two-tuple of (certificate file, key file), or a three-tuple of (certificate
file, key file, password).
* **http2** - *(optional)* A boolean indicating if HTTP/2 support should be
enabled. Defaults to `False`.
* **proxies** - *(optional)* A dictionary mapping HTTP protocols to proxy
URLs.
* **timeout** - *(optional)* The timeout configuration to use when sending
requests.
* **limits** - *(optional)* The limits configuration to use.
* **max_redirects** - *(optional)* The maximum number of redirect responses
that should be followed.
* **base_url** - *(optional)* A URL to use as the base when building
request URLs.
* **transport** - *(optional)* A transport class to use for sending requests
over the network.
* **app** - *(optional)* An ASGI application to send requests to,
rather than sending actual network requests.
* **trust_env** - *(optional)* Enables or disables usage of environment
variables for configuration.
"""
def __init__(
    self,
    *,
    auth: AuthTypes = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    verify: VerifyTypes = True,
    cert: CertTypes = None,
    http2: bool = False,
    proxies: ProxiesTypes = None,
    timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
    limits: Limits = DEFAULT_LIMITS,
    pool_limits: Limits = None,
    max_redirects: int = DEFAULT_MAX_REDIRECTS,
    event_hooks: typing.Dict[str, typing.List[typing.Callable]] = None,
    base_url: URLTypes = "",
    transport: httpcore.AsyncHTTPTransport = None,
    app: typing.Callable = None,
    trust_env: bool = True,
):
    """
    Configure the client and create its transport and proxy transports.

    Raises `ImportError` when `http2=True` is requested but the 'h2'
    package cannot be imported.
    """
    super().__init__(
        auth=auth,
        params=params,
        headers=headers,
        cookies=cookies,
        timeout=timeout,
        max_redirects=max_redirects,
        event_hooks=event_hooks,
        base_url=base_url,
        trust_env=trust_env,
    )

    if http2:
        try:
            import h2  # noqa
        except ImportError:  # pragma: nocover
            raise ImportError(
                "Using http2=True, but the 'h2' package is not installed. "
                "Make sure to install httpx using `pip install httpx[http2]`."
            ) from None

    if pool_limits is not None:
        warn_deprecated(
            "AsyncClient(..., pool_limits=...) is deprecated and will raise "
            "errors in the future. Use AsyncClient(..., limits=...) instead."
        )
        # The deprecated argument takes precedence when given.
        limits = pool_limits

    # Options shared by the main transport and every proxy transport.
    pool_options = {
        "verify": verify,
        "cert": cert,
        "http2": http2,
        "limits": limits,
        "trust_env": trust_env,
    }

    # Environment proxies only apply when neither an app nor an explicit
    # transport was supplied.
    use_env_proxies = trust_env and app is None and transport is None
    proxy_map = self._get_proxy_map(proxies, use_env_proxies)

    self._transport = self._init_transport(
        transport=transport, app=app, **pool_options
    )

    mounts = {}
    for key, proxy in proxy_map.items():
        pattern = URLPattern(key)
        mounts[pattern] = (
            None if proxy is None else self._init_proxy_transport(proxy, **pool_options)
        )

    # Stored in sorted pattern order, which is the order
    # `_transport_for_url()` tries them in.
    self._proxies: typing.Dict[
        URLPattern, typing.Optional[httpcore.AsyncHTTPTransport]
    ] = dict(sorted(mounts.items()))
def _init_transport(
    self,
    verify: VerifyTypes = True,
    cert: CertTypes = None,
    http2: bool = False,
    limits: Limits = DEFAULT_LIMITS,
    transport: httpcore.AsyncHTTPTransport = None,
    app: typing.Callable = None,
    trust_env: bool = True,
) -> httpcore.AsyncHTTPTransport:
    """
    Return the transport used for requests that do not go through a proxy.

    An explicit `transport` wins, then an ASGI `app`; otherwise a new
    async connection pool is created from the SSL and limits configuration.
    """
    if transport is not None:
        return transport
    if app is not None:
        return ASGITransport(app=app)

    return httpcore.AsyncConnectionPool(
        ssl_context=create_ssl_context(
            verify=verify, cert=cert, trust_env=trust_env
        ),
        max_connections=limits.max_connections,
        max_keepalive_connections=limits.max_keepalive_connections,
        keepalive_expiry=KEEPALIVE_EXPIRY,
        http2=http2,
    )
def _init_proxy_transport(
    self,
    proxy: Proxy,
    verify: VerifyTypes = True,
    cert: CertTypes = None,
    http2: bool = False,
    limits: Limits = DEFAULT_LIMITS,
    trust_env: bool = True,
) -> httpcore.AsyncHTTPTransport:
    """
    Create the async proxy transport for a single `Proxy` configuration.
    """
    return httpcore.AsyncHTTPProxy(
        proxy_url=proxy.url.raw,
        proxy_headers=proxy.headers.raw,
        proxy_mode=proxy.mode,
        ssl_context=create_ssl_context(
            verify=verify, cert=cert, trust_env=trust_env
        ),
        max_connections=limits.max_connections,
        max_keepalive_connections=limits.max_keepalive_connections,
        keepalive_expiry=KEEPALIVE_EXPIRY,
        http2=http2,
    )
def _transport_for_url(self, url: URL) -> httpcore.AsyncHTTPTransport:
"""
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
for pattern, transport in self._proxies.items():
if pattern.matches(url):
return self._transport if transport is None else transport
return self._transport
async def request(
    self,
    method: str,
    url: URLTypes,
    *,
    content: RequestContent = None,
    data: RequestData = None,
    files: RequestFiles = None,
    json: typing.Any = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Build a request from the given arguments, send it, and return the
    response.

    This is shorthand for:

    ```python
    request = client.build_request(...)
    response = await client.send(request, ...)
    ```

    See `AsyncClient.build_request()`, `AsyncClient.send()`
    and [Merging of configuration][0] for how the various parameters
    are merged with client-level configuration.

    [0]: /advanced/#merging-of-configuration
    """
    # Everything describing the request itself goes to `build_request()`...
    prepared = self.build_request(
        method=method,
        url=url,
        content=content,
        data=data,
        files=files,
        json=json,
        params=params,
        headers=headers,
        cookies=cookies,
    )
    # ...while the options controlling delivery go to `send()`.
    return await self.send(
        prepared,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
async def send(
    self,
    request: Request,
    *,
    stream: bool = False,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Send a request.

    The request is sent as-is, unmodified.

    Typically you'll want to build one with `AsyncClient.build_request()`
    so that any client-level configuration is merged into the request,
    but passing an explicit `httpx.Request()` is supported as well.

    When `stream` is false the response body is read in full and the
    response is closed before it is returned. Afterwards every hook
    registered under the "response" event is awaited with the response.
    If a hook raises - or the task is cancelled while a hook is running -
    the response is closed before the exception propagates.

    See also: [Request instances][0]

    [0]: /advanced/#request-instances
    """
    # Sending a request flags the client as open.
    self._is_closed = False

    # A per-call timeout overrides the client default only when given.
    timeout = self.timeout if isinstance(timeout, UnsetType) else Timeout(timeout)

    auth = self._build_request_auth(request, auth)

    response = await self._send_handling_auth(
        request,
        auth=auth,
        timeout=timeout,
        allow_redirects=allow_redirects,
        history=[],
    )

    if not stream:
        # Always release the connection, even if reading the body fails.
        try:
            await response.aread()
        finally:
            await response.aclose()

    try:
        for hook in self._event_hooks["response"]:
            await hook(response)
    except BaseException:
        # BaseException rather than Exception: asyncio.CancelledError and
        # KeyboardInterrupt must not leave a streaming response open.
        await response.aclose()
        raise

    return response
async def _send_handling_auth(
    self,
    request: Request,
    auth: Auth,
    timeout: Timeout,
    allow_redirects: bool,
    history: typing.List[Response],
) -> Response:
    """
    Send a request, driving the authentication flow of `auth`.

    The flow is an async generator: it yields the request to send, is
    resumed with the response, and may yield a follow-up request. The
    "request" event hooks are awaited once, with the first request the
    flow produces. Each intermediate response is read in full and appended
    to `history` (which is mutated in place) before the next request is
    sent. The response that ends the flow is returned.

    The auth flow generator is always closed before returning or raising,
    and a response is closed if handling it fails.
    """
    auth_flow = auth.async_auth_flow(request)
    try:
        request = await auth_flow.__anext__()

        for hook in self._event_hooks["request"]:
            await hook(request)

        while True:
            response = await self._send_handling_redirects(
                request,
                timeout=timeout,
                allow_redirects=allow_redirects,
                history=history,
            )
            try:
                next_request = await auth_flow.asend(response)
            except StopAsyncIteration:
                # The flow has nothing more to send: this is the result.
                return response
            except BaseException as exc:
                await response.aclose()
                raise exc from None

            response.history = list(history)
            try:
                # The intermediate response must be consumed before the
                # follow-up request is sent.
                await response.aread()
            except BaseException:
                await response.aclose()
                raise
            request = next_request
            history.append(response)
    finally:
        # Finalise the generator deterministically. Without this, a failure
        # while it is suspended at a `yield` (a transport error or a raising
        # request hook) would leave its cleanup to garbage collection.
        await auth_flow.aclose()
async def _send_handling_redirects(
    self,
    request: Request,
    timeout: Timeout,
    allow_redirects: bool,
    history: typing.List[Response],
) -> Response:
    """
    Send a request, following redirect responses when allowed.

    A non-redirect response is returned directly. For a redirect response
    the follow-up request is built; with `allow_redirects` the response is
    read and the loop continues with the new request, otherwise the
    response is returned with `next_request` and `call_next` populated so
    the caller can continue manually.

    Raises `TooManyRedirects` once `history` holds more than
    `self.max_redirects` responses. If anything fails after a response has
    been received, that response is closed before the exception
    propagates.
    """
    while True:
        if len(history) > self.max_redirects:
            raise TooManyRedirects(
                "Exceeded maximum allowed redirects.", request=request
            )

        response = await self._send_single_request(request, timeout)
        try:
            response.history = list(history)

            if not response.is_redirect:
                return response

            if allow_redirects:
                await response.aread()
            request = self._build_redirect_request(request, response)
            # Build a new list rather than mutating the caller's one.
            history = history + [response]

            if not allow_redirects:
                response.next_request = request
                response.call_next = functools.partial(
                    self._send_handling_redirects,
                    request=request,
                    timeout=timeout,
                    allow_redirects=False,
                    history=history,
                )
                return response
        except BaseException:
            # For example an unusable redirect target: do not leak the
            # still-open response (and its connection).
            await response.aclose()
            raise
async def _send_single_request(
    self, request: Request, timeout: Timeout
) -> Response:
    """
    Dispatch exactly one request through the transport selected for its
    URL and wrap the result in a `Response`.

    Redirects are not handled at this level. Cookies from the response are
    extracted into the client's cookie store, and the request/response
    line is logged at debug level.
    """
    transport = self._transport_for_url(request.url)

    stopwatch = Timer()
    await stopwatch.async_start()

    # The transport call runs under `map_exceptions(HTTPCORE_EXC_MAP, ...)`.
    with map_exceptions(HTTPCORE_EXC_MAP, request=request):
        status_code, raw_headers, body_stream, ext = await transport.arequest(
            request.method.encode(),
            request.url.raw,
            headers=request.headers.raw,
            stream=request.stream,  # type: ignore
            ext={"timeout": timeout.as_dict()},
        )

    async def finalize(closed_response: Response) -> None:
        # Passed to the response as `on_close`: records the elapsed time
        # and closes the underlying stream when it supports `aclose()`.
        seconds = await stopwatch.async_elapsed()
        closed_response.elapsed = datetime.timedelta(seconds=seconds)
        if hasattr(body_stream, "aclose"):
            await body_stream.aclose()

    response = Response(
        status_code,
        headers=raw_headers,
        stream=body_stream,  # type: ignore
        ext=ext,
        request=request,
        on_close=finalize,
    )

    self.cookies.extract_cookies(response)

    status = f"{response.status_code} {response.reason_phrase}"
    response_line = f"{response.http_version} {status}"
    logger.debug(f'HTTP Request: {request.method} {request.url} "{response_line}"')

    return response
async def get(
    self,
    url: URLTypes,
    *,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Issue a `GET` request to `url` and return the response.

    **Parameters**: See `httpx.request`.
    """
    response = await self.request(
        "GET",
        url,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
    return response
async def options(
    self,
    url: URLTypes,
    *,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Issue an `OPTIONS` request to `url` and return the response.

    **Parameters**: See `httpx.request`.
    """
    response = await self.request(
        "OPTIONS",
        url,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
    return response
async def head(
    self,
    url: URLTypes,
    *,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Issue a `HEAD` request to `url` and return the response.

    **Parameters**: See `httpx.request`.
    """
    response = await self.request(
        "HEAD",
        url,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
    return response
async def post(
    self,
    url: URLTypes,
    *,
    content: RequestContent = None,
    data: RequestData = None,
    files: RequestFiles = None,
    json: typing.Any = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Issue a `POST` request to `url` and return the response.

    **Parameters**: See `httpx.request`.
    """
    response = await self.request(
        "POST",
        url,
        content=content,
        data=data,
        files=files,
        json=json,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
    return response
async def put(
    self,
    url: URLTypes,
    *,
    content: RequestContent = None,
    data: RequestData = None,
    files: RequestFiles = None,
    json: typing.Any = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Issue a `PUT` request to `url` and return the response.

    **Parameters**: See `httpx.request`.
    """
    response = await self.request(
        "PUT",
        url,
        content=content,
        data=data,
        files=files,
        json=json,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
    return response
async def patch(
    self,
    url: URLTypes,
    *,
    content: RequestContent = None,
    data: RequestData = None,
    files: RequestFiles = None,
    json: typing.Any = None,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Issue a `PATCH` request to `url` and return the response.

    **Parameters**: See `httpx.request`.
    """
    response = await self.request(
        "PATCH",
        url,
        content=content,
        data=data,
        files=files,
        json=json,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
    return response
async def delete(
    self,
    url: URLTypes,
    *,
    params: QueryParamTypes = None,
    headers: HeaderTypes = None,
    cookies: CookieTypes = None,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
) -> Response:
    """
    Issue a `DELETE` request to `url` and return the response.

    **Parameters**: See `httpx.request`.
    """
    response = await self.request(
        "DELETE",
        url,
        params=params,
        headers=headers,
        cookies=cookies,
        auth=auth,
        allow_redirects=allow_redirects,
        timeout=timeout,
    )
    return response
async def aclose(self) -> None:
    """
    Close the client's transport and every configured proxy transport.

    Does nothing when the client is already closed.
    """
    if self.is_closed:
        return

    # Flag the client as closed before awaiting anything.
    self._is_closed = True

    await self._transport.aclose()
    for proxy in self._proxies.values():
        # `None` entries have no proxy transport of their own.
        if proxy is None:
            continue
        await proxy.aclose()
async def __aenter__(self) -> "AsyncClient":
    """
    Enter the client's transport and each configured proxy transport,
    mark the client as open, and return the client itself.
    """
    await self._transport.__aenter__()
    for proxy in self._proxies.values():
        # `None` entries have no proxy transport of their own.
        if proxy is None:
            continue
        await proxy.__aenter__()

    self._is_closed = False
    return self
async def __aexit__(
    self,
    exc_type: typing.Type[BaseException] = None,
    exc_value: BaseException = None,
    traceback: TracebackType = None,
) -> None:
    """
    Exit the client's transport and each configured proxy transport,
    forwarding the exception details to them.

    Does nothing when the client is already closed.
    """
    if self.is_closed:
        return

    # Flag the client as closed before awaiting anything.
    self._is_closed = True

    exc_details = (exc_type, exc_value, traceback)
    await self._transport.__aexit__(*exc_details)
    for proxy in self._proxies.values():
        # `None` entries have no proxy transport of their own.
        if proxy is None:
            continue
        await proxy.__aexit__(*exc_details)
def __del__(self) -> None:
    """
    Emit a warning when the client is finalised without having been closed.
    """
    if self.is_closed:
        return

    message = (
        f"Unclosed {self!r}. "
        "See https://www.python-httpx.org/async/#opening-and-closing-clients "
        "for details."
    )
    warnings.warn(message)
class StreamContextManager:
def __init__(
    self,
    client: BaseClient,
    request: Request,
    *,
    auth: typing.Union[AuthTypes, UnsetType] = UNSET,
    allow_redirects: bool = True,
    timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
    close_client: bool = False,
) -> None:
    """
    Store the client, the request and the send options; nothing is sent
    until the context manager is entered.

    `close_client` controls whether the client is closed as well when the
    context is exited.
    """
    # What to send, and through which client.
    self.client = client
    self.request = request
    self.close_client = close_client

    # Options forwarded to the client's `send()` on entry.
    self.auth = auth
    self.allow_redirects = allow_redirects
    self.timeout = timeout
def __enter__(self) -> "Response":
    """
    Send the stored request through the synchronous client in streaming
    mode, keep the response on `self.response`, and return it.
    """
    client = self.client
    assert isinstance(client, Client)

    response = client.send(
        request=self.request,
        stream=True,
        auth=self.auth,
        allow_redirects=self.allow_redirects,
        timeout=self.timeout,
    )
    self.response = response
    return response
def __exit__(
    self,
    exc_type: typing.Type[BaseException] = None,
    exc_value: BaseException = None,
    traceback: TracebackType = None,
) -> None:
    """
    Close the streamed response, and the client too when `close_client`
    was requested.
    """
    client = self.client
    assert isinstance(client, Client)

    self.response.close()
    if not self.close_client:
        return
    client.close()
async def __aenter__(self) -> "Response":
    """
    Send the stored request through the asynchronous client in streaming
    mode, keep the response on `self.response`, and return it.
    """
    client = self.client
    assert isinstance(client, AsyncClient)

    response = await client.send(
        request=self.request,
        stream=True,
        auth=self.auth,
        allow_redirects=self.allow_redirects,
        timeout=self.timeout,
    )
    self.response = response
    return response
async def __aexit__(
self,
exc_type: typing.Type[BaseException] = None,
exc_value: BaseException = None,
traceback: TracebackType = None,
) -> None:
assert isinstance(self.client, AsyncClient)
await self.response.aclose()