Spaces:
Runtime error
Runtime error
# -*- coding: utf-8 -*- | |
# Copyright (c) 2015 Ian Stapleton Cordasco | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
# implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
"""Module containing the urlparse compatibility logic.""" | |
from collections import namedtuple | |
from . import compat | |
from . import exceptions | |
from . import misc | |
from . import normalizers | |
from . import uri | |
# Names exported by ``from <module> import *``.
__all__ = (
    "ParseResult",
    "ParseResultBytes",
)
# Field names, in positional order, of the namedtuples that back the parse
# result classes in this module.
PARSED_COMPONENTS = (
    "scheme", "userinfo", "host", "port", "path", "query", "fragment",
)
class ParseResultMixin(object):
    """Behaviour shared by the text and bytes parse-result classes.

    The methods here read ``userinfo``, ``host``, ``port``, ``query``,
    ``authority`` and ``encoding`` from ``self`` and call ``self.unsplit()``,
    so the class mixing this in has to provide all of them.
    """

    def _generate_authority(self, attributes):
        """Return the authority to use for a copy built from ``attributes``.

        :param dict attributes: Mapping holding at least the ``"userinfo"``,
            ``"host"`` and ``"port"`` values of the copy being created.
        :returns: A freshly normalized authority when any of the three
            components differs from this instance, otherwise this instance's
            existing authority as text.
        """
        # I swear I did not align the comparisons below. That's just how they
        # happened to align based on pep8 and attribute lengths.
        userinfo, host, port = (
            attributes[p] for p in ("userinfo", "host", "port")
        )
        if (
            self.userinfo != userinfo
            or self.host != host
            or self.port != port
        ):
            if port:
                # The port may be an int; it is handed on as text.
                port = "{0}".format(port)
            return normalizers.normalize_authority(
                (
                    compat.to_str(userinfo, self.encoding),
                    compat.to_str(host, self.encoding),
                    port,
                )
            )
        if isinstance(self.authority, bytes):
            # A bytes authority was produced with ``self.encoding``, so it
            # has to be decoded with that same codec rather than a fixed one.
            return self.authority.decode(self.encoding)
        return self.authority

    def geturl(self):
        """Shim to match the standard library method."""
        return self.unsplit()

    @property
    def hostname(self):
        """Shim to match the standard library."""
        return self.host

    @property
    def netloc(self):
        """Shim to match the standard library."""
        return self.authority

    @property
    def params(self):
        """Shim to match the standard library."""
        return self.query
class ParseResult(
    namedtuple("ParseResult", PARSED_COMPONENTS), ParseResultMixin
):
    """Implementation of urlparse compatibility class.

    This uses the URIReference logic to handle compatibility with the
    urlparse.ParseResult class.
    """

    # NOTE(review): this looks like it was meant to be ``__slots__``; it is
    # kept as a plain class attribute so existing behaviour does not change.
    slots = ()

    def __new__(
        cls,
        scheme,
        userinfo,
        host,
        port,
        path,
        query,
        fragment,
        uri_ref,
        encoding="utf-8",
    ):
        """Create a new ParseResult.

        Falsy ``scheme``, ``userinfo``, ``port`` and ``path`` values are
        stored as ``None``; ``host``, ``query`` and ``fragment`` are stored
        unchanged. ``uri_ref`` and ``encoding`` are kept as the ``reference``
        and ``encoding`` instance attributes.
        """
        parse_result = super(ParseResult, cls).__new__(
            cls,
            scheme or None,
            userinfo or None,
            host,
            port or None,
            path or None,
            query,
            fragment,
        )
        parse_result.encoding = encoding
        parse_result.reference = uri_ref
        return parse_result

    @classmethod
    def from_parts(
        cls,
        scheme=None,
        userinfo=None,
        host=None,
        port=None,
        path=None,
        query=None,
        fragment=None,
        encoding="utf-8",
    ):
        """Create a ParseResult instance from its parts.

        The authority is assembled as ``userinfo@host:port`` from whichever
        pieces are not ``None``. The resulting reference is normalized before
        its components are copied into the new instance.

        :returns: :class:`ParseResult` or subclass thereof
        """
        authority = ""
        if userinfo is not None:
            authority += userinfo + "@"
        if host is not None:
            authority += host
        if port is not None:
            authority += ":{0}".format(port)
        uri_ref = uri.URIReference(
            scheme=scheme,
            authority=authority,
            path=path,
            query=query,
            fragment=fragment,
            encoding=encoding,
        ).normalize()
        userinfo, host, port = authority_from(uri_ref, strict=True)
        return cls(
            scheme=uri_ref.scheme,
            userinfo=userinfo,
            host=host,
            port=port,
            path=uri_ref.path,
            query=uri_ref.query,
            fragment=uri_ref.fragment,
            uri_ref=uri_ref,
            encoding=encoding,
        )

    @classmethod
    def from_string(
        cls, uri_string, encoding="utf-8", strict=True, lazy_normalize=True
    ):
        """Parse a URI from the given unicode URI string.

        :param str uri_string: Unicode URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :param bool strict: Parse strictly according to :rfc:`3986` if True.
            If False, parse similarly to the standard library's urlparse
            function.
        :param bool lazy_normalize: When False, the parsed reference is
            normalized before its components are extracted.
        :returns: :class:`ParseResult` or subclass thereof
        """
        reference = uri.URIReference.from_string(uri_string, encoding)
        if not lazy_normalize:
            reference = reference.normalize()
        userinfo, host, port = authority_from(reference, strict)
        return cls(
            scheme=reference.scheme,
            userinfo=userinfo,
            host=host,
            port=port,
            path=reference.path,
            query=reference.query,
            fragment=reference.fragment,
            uri_ref=reference,
            encoding=encoding,
        )

    @property
    def authority(self):
        """Return the normalized authority."""
        return self.reference.authority

    def copy_with(
        self,
        scheme=misc.UseExisting,
        userinfo=misc.UseExisting,
        host=misc.UseExisting,
        port=misc.UseExisting,
        path=misc.UseExisting,
        query=misc.UseExisting,
        fragment=misc.UseExisting,
    ):
        """Create a copy of this instance replacing with specified parts.

        Any component left at ``misc.UseExisting`` keeps this instance's
        current value.

        :returns: A new :class:`ParseResult`.
        """
        attributes = zip(
            PARSED_COMPONENTS,
            (scheme, userinfo, host, port, path, query, fragment),
        )
        attrs_dict = {}
        for name, value in attributes:
            if value is misc.UseExisting:
                value = getattr(self, name)
            attrs_dict[name] = value
        authority = self._generate_authority(attrs_dict)
        ref = self.reference.copy_with(
            scheme=attrs_dict["scheme"],
            authority=authority,
            path=attrs_dict["path"],
            query=attrs_dict["query"],
            fragment=attrs_dict["fragment"],
        )
        return ParseResult(uri_ref=ref, encoding=self.encoding, **attrs_dict)

    def encode(self, encoding=None):
        """Convert to an instance of ParseResultBytes.

        :param str encoding: Codec to use; defaults to this instance's
            ``encoding`` when omitted or falsy.
        """
        encoding = encoding or self.encoding
        attrs = dict(
            zip(
                PARSED_COMPONENTS,
                (
                    # Values without an ``encode`` method (``None``, an int
                    # port) are passed through untouched.
                    attr.encode(encoding) if hasattr(attr, "encode") else attr
                    for attr in self
                ),
            )
        )
        return ParseResultBytes(
            uri_ref=self.reference, encoding=encoding, **attrs
        )

    def unsplit(self, use_idna=False):
        """Create a URI string from the components.

        :param bool use_idna: When True and a host is present, the host is
            run through the ``idna`` codec before the URI is rebuilt.
        :returns: The parsed URI reconstituted as a string.
        :rtype: str
        """
        parse_result = self
        if use_idna and self.host:
            hostbytes = self.host.encode("idna")
            host = hostbytes.decode(self.encoding)
            parse_result = self.copy_with(host=host)
        return parse_result.reference.unsplit()
class ParseResultBytes(
    namedtuple("ParseResultBytes", PARSED_COMPONENTS), ParseResultMixin
):
    """Compatibility shim for the urlparse.ParseResultBytes object."""

    def __new__(
        cls,
        scheme,
        userinfo,
        host,
        port,
        path,
        query,
        fragment,
        uri_ref,
        encoding="utf-8",
        lazy_normalize=True,
    ):
        """Create a new ParseResultBytes instance.

        Every component except ``host`` is stored as ``None`` when falsy.
        ``uri_ref``, ``encoding`` and ``lazy_normalize`` are kept as the
        ``reference``, ``encoding`` and ``lazy_normalize`` instance
        attributes.
        """
        parse_result = super(ParseResultBytes, cls).__new__(
            cls,
            scheme or None,
            userinfo or None,
            host,
            port or None,
            path or None,
            query or None,
            fragment or None,
        )
        parse_result.encoding = encoding
        parse_result.reference = uri_ref
        parse_result.lazy_normalize = lazy_normalize
        return parse_result

    @classmethod
    def from_parts(
        cls,
        scheme=None,
        userinfo=None,
        host=None,
        port=None,
        path=None,
        query=None,
        fragment=None,
        encoding="utf-8",
        lazy_normalize=True,
    ):
        """Create a ParseResultBytes instance from its parts.

        The authority is assembled as ``userinfo@host:port`` from whichever
        pieces are not ``None``; ``port`` is passed through ``int()`` first.
        The reference is normalized only when ``lazy_normalize`` is False.

        :returns: :class:`ParseResultBytes` or subclass thereof
        """
        authority = ""
        if userinfo is not None:
            authority += userinfo + "@"
        if host is not None:
            authority += host
        if port is not None:
            authority += ":{0}".format(int(port))
        uri_ref = uri.URIReference(
            scheme=scheme,
            authority=authority,
            path=path,
            query=query,
            fragment=fragment,
            encoding=encoding,
        )
        if not lazy_normalize:
            uri_ref = uri_ref.normalize()
        to_bytes = compat.to_bytes
        userinfo, host, port = authority_from(uri_ref, strict=True)
        return cls(
            # scheme/path/query/fragment come from the caller's arguments,
            # not from ``uri_ref``; only the authority parts are re-parsed.
            scheme=to_bytes(scheme, encoding),
            userinfo=to_bytes(userinfo, encoding),
            host=to_bytes(host, encoding),
            port=port,
            path=to_bytes(path, encoding),
            query=to_bytes(query, encoding),
            fragment=to_bytes(fragment, encoding),
            uri_ref=uri_ref,
            encoding=encoding,
            lazy_normalize=lazy_normalize,
        )

    @classmethod
    def from_string(
        cls, uri_string, encoding="utf-8", strict=True, lazy_normalize=True
    ):
        """Parse a URI from the given unicode URI string.

        :param str uri_string: Unicode URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :param bool strict: Parse strictly according to :rfc:`3986` if True.
            If False, parse similarly to the standard library's urlparse
            function.
        :param bool lazy_normalize: When False, the parsed reference is
            normalized before its components are extracted.
        :returns: :class:`ParseResultBytes` or subclass thereof
        """
        reference = uri.URIReference.from_string(uri_string, encoding)
        if not lazy_normalize:
            reference = reference.normalize()
        userinfo, host, port = authority_from(reference, strict)
        to_bytes = compat.to_bytes
        return cls(
            scheme=to_bytes(reference.scheme, encoding),
            userinfo=to_bytes(userinfo, encoding),
            host=to_bytes(host, encoding),
            port=port,
            path=to_bytes(reference.path, encoding),
            query=to_bytes(reference.query, encoding),
            fragment=to_bytes(reference.fragment, encoding),
            uri_ref=reference,
            encoding=encoding,
            lazy_normalize=lazy_normalize,
        )

    @property
    def authority(self):
        """Return the normalized authority."""
        return self.reference.authority.encode(self.encoding)

    def copy_with(
        self,
        scheme=misc.UseExisting,
        userinfo=misc.UseExisting,
        host=misc.UseExisting,
        port=misc.UseExisting,
        path=misc.UseExisting,
        query=misc.UseExisting,
        fragment=misc.UseExisting,
        lazy_normalize=True,
    ):
        """Create a copy of this instance replacing with specified parts.

        Any component left at ``misc.UseExisting`` keeps this instance's
        current value. Text values are encoded with this instance's
        ``encoding`` before being stored.

        :param bool lazy_normalize: When False, the copied reference is
            normalized before the new instance is built.
        :returns: A new :class:`ParseResultBytes`.
        """
        attributes = zip(
            PARSED_COMPONENTS,
            (scheme, userinfo, host, port, path, query, fragment),
        )
        attrs_dict = {}
        for name, value in attributes:
            if value is misc.UseExisting:
                value = getattr(self, name)
            if not isinstance(value, bytes) and hasattr(value, "encode"):
                value = value.encode(self.encoding)
            attrs_dict[name] = value
        authority = self._generate_authority(attrs_dict)
        to_str = compat.to_str
        # The underlying reference is handed text, so convert back from the
        # bytes stored on this class.
        ref = self.reference.copy_with(
            scheme=to_str(attrs_dict["scheme"], self.encoding),
            authority=to_str(authority, self.encoding),
            path=to_str(attrs_dict["path"], self.encoding),
            query=to_str(attrs_dict["query"], self.encoding),
            fragment=to_str(attrs_dict["fragment"], self.encoding),
        )
        if not lazy_normalize:
            ref = ref.normalize()
        return ParseResultBytes(
            uri_ref=ref,
            encoding=self.encoding,
            lazy_normalize=lazy_normalize,
            **attrs_dict
        )

    def unsplit(self, use_idna=False):
        """Create a URI bytes object from the components.

        :param bool use_idna: When True and a host is present, the host is
            run through the ``idna`` codec before the URI is rebuilt.
        :returns: The parsed URI reconstituted as a string.
        :rtype: bytes
        """
        parse_result = self
        if use_idna and self.host:
            # self.host is bytes, to encode to idna, we need to decode it
            # first
            host = self.host.decode(self.encoding)
            hostbytes = host.encode("idna")
            parse_result = self.copy_with(host=hostbytes)
        if self.lazy_normalize:
            # Normalization was deferred at construction time; do it now.
            parse_result = parse_result.copy_with(lazy_normalize=False)
        # Named ``uri_text`` so the imported ``uri`` module is not shadowed.
        uri_text = parse_result.reference.unsplit()
        return uri_text.encode(self.encoding)
def split_authority(authority):
    """Loosely split ``authority`` into ``(userinfo, host, port)``.

    Each element of the returned tuple is a string, or ``None`` when that
    piece was not found. The port is returned as text, not as an int.
    """
    userinfo = host = port = None

    # Everything after the last "@" (or the whole input) is host[:port].
    remainder = authority
    if "@" in authority:
        userinfo, remainder = authority.rsplit("@", 1)

    # A bracketed (IPv6-style) host keeps its brackets.
    if remainder.startswith("["):
        host, remainder = remainder.split("]", 1)
        host += "]"

    if ":" in remainder:
        candidate, port = remainder.split(":", 1)
        # The text before ":" is the host only when no bracketed host was
        # found above and it is not empty.
        if candidate and not host:
            host = candidate
    elif not host and remainder:
        host = remainder

    return userinfo, host, port
def authority_from(reference, strict):
    """Extract ``(userinfo, host, port)`` from ``reference``.

    ``reference.authority_info()`` is tried first. If it raises
    ``InvalidAuthority`` the error propagates when ``strict`` is true;
    otherwise ``split_authority`` is applied to ``reference.authority``.
    A truthy port is converted to ``int``; one that cannot be converted
    raises ``InvalidPort``.
    """
    try:
        parsed = reference.authority_info()
    except exceptions.InvalidAuthority:
        if strict:
            raise
        userinfo, host, port = split_authority(reference.authority)
    else:
        # Thanks to Richard Barrell for this idea:
        # https://twitter.com/0x2ba22e11/status/617338811975139328
        userinfo = parsed.get("userinfo")
        host = parsed.get("host")
        port = parsed.get("port")

    # Nothing to convert when the port is missing or empty.
    if not port:
        return userinfo, host, port

    try:
        return userinfo, host, int(port)
    except ValueError:
        raise exceptions.InvalidPort(port)