|
"""Utility functions for aiohappyeyeballs.""" |
|
|
|
import ipaddress |
|
import socket |
|
from typing import Dict, List, Optional, Tuple, Union |
|
|
|
from .types import AddrInfoType |
|
|
|
|
|
def addr_to_addr_infos( |
|
addr: Optional[ |
|
Union[Tuple[str, int, int, int], Tuple[str, int, int], Tuple[str, int]] |
|
], |
|
) -> Optional[List[AddrInfoType]]: |
|
"""Convert an address tuple to a list of addr_info tuples.""" |
|
if addr is None: |
|
return None |
|
host = addr[0] |
|
port = addr[1] |
|
is_ipv6 = ":" in host |
|
if is_ipv6: |
|
flowinfo = 0 |
|
scopeid = 0 |
|
addr_len = len(addr) |
|
if addr_len >= 4: |
|
scopeid = addr[3] |
|
if addr_len >= 3: |
|
flowinfo = addr[2] |
|
addr = (host, port, flowinfo, scopeid) |
|
family = socket.AF_INET6 |
|
else: |
|
addr = (host, port) |
|
family = socket.AF_INET |
|
return [(family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)] |
|
|
|
|
|
def pop_addr_infos_interleave( |
|
addr_infos: List[AddrInfoType], interleave: Optional[int] = None |
|
) -> None: |
|
""" |
|
Pop addr_info from the list of addr_infos by family up to interleave times. |
|
|
|
The interleave parameter is used to know how many addr_infos for |
|
each family should be popped of the top of the list. |
|
""" |
|
seen: Dict[int, int] = {} |
|
if interleave is None: |
|
interleave = 1 |
|
to_remove: List[AddrInfoType] = [] |
|
for addr_info in addr_infos: |
|
family = addr_info[0] |
|
if family not in seen: |
|
seen[family] = 0 |
|
if seen[family] < interleave: |
|
to_remove.append(addr_info) |
|
seen[family] += 1 |
|
for addr_info in to_remove: |
|
addr_infos.remove(addr_info) |
|
|
|
|
|
def _addr_tuple_to_ip_address( |
|
addr: Union[Tuple[str, int], Tuple[str, int, int, int]], |
|
) -> Union[ |
|
Tuple[ipaddress.IPv4Address, int], Tuple[ipaddress.IPv6Address, int, int, int] |
|
]: |
|
"""Convert an address tuple to an IPv4Address.""" |
|
return (ipaddress.ip_address(addr[0]), *addr[1:]) |
|
|
|
|
|
def remove_addr_infos( |
|
addr_infos: List[AddrInfoType], |
|
addr: Union[Tuple[str, int], Tuple[str, int, int, int]], |
|
) -> None: |
|
""" |
|
Remove an address from the list of addr_infos. |
|
|
|
The addr value is typically the return value of |
|
sock.getpeername(). |
|
""" |
|
bad_addrs_infos: List[AddrInfoType] = [] |
|
for addr_info in addr_infos: |
|
if addr_info[-1] == addr: |
|
bad_addrs_infos.append(addr_info) |
|
if bad_addrs_infos: |
|
for bad_addr_info in bad_addrs_infos: |
|
addr_infos.remove(bad_addr_info) |
|
return |
|
|
|
match_addr = _addr_tuple_to_ip_address(addr) |
|
for addr_info in addr_infos: |
|
if match_addr == _addr_tuple_to_ip_address(addr_info[-1]): |
|
bad_addrs_infos.append(addr_info) |
|
if bad_addrs_infos: |
|
for bad_addr_info in bad_addrs_infos: |
|
addr_infos.remove(bad_addr_info) |
|
return |
|
raise ValueError(f"Address {addr} not found in addr_infos") |
|
|