import inspect
from json import dumps as json_dumps
from typing import (
    Any,
    AsyncIterable,
    AsyncIterator,
    Dict,
    Iterable,
    Iterator,
    Tuple,
    Union,
)
from urllib.parse import urlencode

from ._exceptions import StreamConsumed
from ._multipart import MultipartStream
from ._types import (
    ByteStream,
    RequestContent,
    RequestData,
    RequestFiles,
    ResponseContent,
)


class PlainByteStream:
    """
    Request content encoded as plain bytes.
    """

    def __init__(self, body: bytes) -> None:
        self._body = body

    def __iter__(self) -> Iterator[bytes]:
        yield self._body

    async def __aiter__(self) -> AsyncIterator[bytes]:
        yield self._body
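

# Usage sketch (illustrative, not part of the original module): the same
# `PlainByteStream` instance works for both sync and async consumers, and
# each iteration yields the whole body as a single chunk.
#
#     stream = PlainByteStream(b"Hello, world!")
#     assert b"".join(stream) == b"Hello, world!"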


class GeneratorStream:
    """
    Request content encoded as plain bytes, using a byte generator.
    """

    def __init__(self, generator: Iterable[bytes]) -> None:
        self._generator = generator
        self._is_stream_consumed = False

    def __iter__(self) -> Iterator[bytes]:
        if self._is_stream_consumed:
            raise StreamConsumed()
        self._is_stream_consumed = True
        for part in self._generator:
            yield part


class AsyncGeneratorStream:
    """
    Request content encoded as plain bytes, using an async byte iterator.
    """

    def __init__(self, agenerator: AsyncIterable[bytes]) -> None:
        self._agenerator = agenerator
        self._is_stream_consumed = False

    async def __aiter__(self) -> AsyncIterator[bytes]:
        if self._is_stream_consumed:
            raise StreamConsumed()
        self._is_stream_consumed = True
        async for part in self._agenerator:
            yield part
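

# Example (a sketch, not from the original file): wrapping a plain generator
# and consuming it twice raises `StreamConsumed`, which is what protects
# non-replayable request bodies from being re-sent, e.g. on redirects.
#
#     chunks = (part for part in [b"Hello, ", b"world!"])
#     stream = GeneratorStream(chunks)
#     assert b"".join(stream) == b"Hello, world!"
#     try:
#         list(stream)  # a second pass over the same stream
#     except StreamConsumed:
#         pass  # single-use streams cannot be consumed twice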


def encode_content(
    content: Union[str, bytes, ByteStream]
) -> Tuple[Dict[str, str], ByteStream]:
    if isinstance(content, (str, bytes)):
        body = content.encode("utf-8") if isinstance(content, str) else content
        content_length = str(len(body))
        headers = {"Content-Length": content_length} if body else {}
        stream = PlainByteStream(body)
        return headers, stream

    elif isinstance(content, (Iterable, AsyncIterable)):
        headers = {"Transfer-Encoding": "chunked"}

        # Generators should be wrapped in GeneratorStream/AsyncGeneratorStream,
        # which raise `StreamConsumed` if the stream is accessed more than once
        # (e.g. when following HTTP 307 or HTTP 308 redirects).
        if inspect.isgenerator(content):
            generator_stream = GeneratorStream(content)  # type: ignore
            return headers, generator_stream
        if inspect.isasyncgen(content):
            agenerator_stream = AsyncGeneratorStream(content)  # type: ignore
            return headers, agenerator_stream

        # Other iterables may be passed through as-is.
        return headers, content  # type: ignore

    raise TypeError(f"Unexpected type for 'content', {type(content)!r}")
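

# Illustrative sketch of the dispatch above (not part of the original module):
# bytes and str get a Content-Length header and a replayable PlainByteStream,
# while generators get "Transfer-Encoding: chunked" and a single-use stream.
#
#     headers, stream = encode_content(b"abc")
#     assert headers == {"Content-Length": "3"}
#     assert isinstance(stream, PlainByteStream)
#
#     headers, stream = encode_content(part for part in [b"a", b"b"])
#     assert headers == {"Transfer-Encoding": "chunked"}
#     assert isinstance(stream, GeneratorStream)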


def encode_urlencoded_data(
    data: dict,
) -> Tuple[Dict[str, str], ByteStream]:
    body = urlencode(data, doseq=True).encode("utf-8")
    content_length = str(len(body))
    content_type = "application/x-www-form-urlencoded"
    headers = {"Content-Length": content_length, "Content-Type": content_type}
    return headers, PlainByteStream(body)


def encode_multipart_data(
    data: dict, files: RequestFiles, boundary: bytes = None
) -> Tuple[Dict[str, str], ByteStream]:
    stream = MultipartStream(data=data, files=files, boundary=boundary)
    headers = stream.get_headers()
    return headers, stream


def encode_text(text: str) -> Tuple[Dict[str, str], ByteStream]:
    body = text.encode("utf-8")
    content_length = str(len(body))
    content_type = "text/plain; charset=utf-8"
    headers = {"Content-Length": content_length, "Content-Type": content_type}
    return headers, PlainByteStream(body)


def encode_html(html: str) -> Tuple[Dict[str, str], ByteStream]:
    body = html.encode("utf-8")
    content_length = str(len(body))
    content_type = "text/html; charset=utf-8"
    headers = {"Content-Length": content_length, "Content-Type": content_type}
    return headers, PlainByteStream(body)


def encode_json(json: Any) -> Tuple[Dict[str, str], ByteStream]:
    body = json_dumps(json).encode("utf-8")
    content_length = str(len(body))
    content_type = "application/json"
    headers = {"Content-Length": content_length, "Content-Type": content_type}
    return headers, PlainByteStream(body)


def encode_request(
    content: RequestContent = None,
    data: RequestData = None,
    files: RequestFiles = None,
    json: Any = None,
    boundary: bytes = None,
) -> Tuple[Dict[str, str], ByteStream]:
    """
    Handles encoding the given `content`, `data`, `files`, and `json`,
    returning a two-tuple of (<headers>, <stream>).
    """
    if data is not None and not isinstance(data, dict):
        # We prefer to separate `content=<bytes|str|byte iterator|bytes aiterator>`
        # for raw request content, and `data=<form data>` for url encoded or
        # multipart form content.
        #
        # However, for compatibility with requests, we *do* still support
        # `data=<bytes...>` usages. We deal with that case here, treating it
        # as if `content=<...>` had been supplied instead.
        return encode_content(data)

    if content is not None:
        return encode_content(content)
    elif files:
        return encode_multipart_data(data or {}, files, boundary)
    elif data:
        return encode_urlencoded_data(data)
    elif json is not None:
        return encode_json(json)

    return {}, PlainByteStream(b"")
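

# Illustrative examples (assumed call sites, not part of the original module):
# `encode_request` picks an encoder based on which keyword argument is given.
#
#     headers, _ = encode_request(json={"key": "value"})
#     assert headers["Content-Type"] == "application/json"
#
#     headers, _ = encode_request(data={"key": "value"})
#     assert headers["Content-Type"] == "application/x-www-form-urlencoded"
#
#     # Raw bytes passed as `data=` are accepted for requests-compatibility
#     # and treated as if they had been passed as `content=`.
#     headers, _ = encode_request(data=b"raw body")
#     assert headers == {"Content-Length": "8"}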


def encode_response(
    content: ResponseContent = None,
    text: str = None,
    html: str = None,
    json: Any = None,
) -> Tuple[Dict[str, str], ByteStream]:
    """
    Handles encoding the given `content`, returning a two-tuple of
    (<headers>, <stream>).
    """
    if content is not None:
        return encode_content(content)
    elif text is not None:
        return encode_text(text)
    elif html is not None:
        return encode_html(html)
    elif json is not None:
        return encode_json(json)

    return {}, PlainByteStream(b"")
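

# Illustrative sketch (not part of the original module): `encode_response`
# mirrors `encode_request`, but offers `text=`, `html=` and `json=` shortcuts
# that also set a suitable Content-Type for the response body.
#
#     headers, _ = encode_response(text="Hello, world!")
#     assert headers["Content-Type"] == "text/plain; charset=utf-8"
#
#     headers, _ = encode_response(html="<html><body>Hello</body></html>")
#     assert headers["Content-Type"] == "text/html; charset=utf-8"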