Spaces:
Running
Running
text-generation-webui
/
installer_files
/conda
/lib
/python3.10
/site-packages
/cryptography
/x509
/ocsp.py
# This file is dual licensed under the terms of the Apache License, Version | |
# 2.0, and the BSD License. See the LICENSE file in the root of this repository | |
# for complete details. | |
import abc | |
import datetime | |
import typing | |
from cryptography import utils, x509 | |
from cryptography.hazmat.bindings._rust import ocsp | |
from cryptography.hazmat.primitives import hashes, serialization | |
from cryptography.hazmat.primitives.asymmetric.types import ( | |
CERTIFICATE_PRIVATE_KEY_TYPES, | |
) | |
from cryptography.x509.base import ( | |
_EARLIEST_UTC_TIME, | |
_convert_to_naive_utc_time, | |
_reject_duplicate_extension, | |
) | |
class OCSPResponderEncoding(utils.Enum): | |
HASH = "By Hash" | |
NAME = "By Name" | |
class OCSPResponseStatus(utils.Enum): | |
SUCCESSFUL = 0 | |
MALFORMED_REQUEST = 1 | |
INTERNAL_ERROR = 2 | |
TRY_LATER = 3 | |
SIG_REQUIRED = 5 | |
UNAUTHORIZED = 6 | |
_ALLOWED_HASHES = ( | |
hashes.SHA1, | |
hashes.SHA224, | |
hashes.SHA256, | |
hashes.SHA384, | |
hashes.SHA512, | |
) | |
def _verify_algorithm(algorithm: hashes.HashAlgorithm) -> None: | |
if not isinstance(algorithm, _ALLOWED_HASHES): | |
raise ValueError( | |
"Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512" | |
) | |
class OCSPCertStatus(utils.Enum): | |
GOOD = 0 | |
REVOKED = 1 | |
UNKNOWN = 2 | |
class _SingleResponse: | |
def __init__( | |
self, | |
cert: x509.Certificate, | |
issuer: x509.Certificate, | |
algorithm: hashes.HashAlgorithm, | |
cert_status: OCSPCertStatus, | |
this_update: datetime.datetime, | |
next_update: typing.Optional[datetime.datetime], | |
revocation_time: typing.Optional[datetime.datetime], | |
revocation_reason: typing.Optional[x509.ReasonFlags], | |
): | |
if not isinstance(cert, x509.Certificate) or not isinstance( | |
issuer, x509.Certificate | |
): | |
raise TypeError("cert and issuer must be a Certificate") | |
_verify_algorithm(algorithm) | |
if not isinstance(this_update, datetime.datetime): | |
raise TypeError("this_update must be a datetime object") | |
if next_update is not None and not isinstance( | |
next_update, datetime.datetime | |
): | |
raise TypeError("next_update must be a datetime object or None") | |
self._cert = cert | |
self._issuer = issuer | |
self._algorithm = algorithm | |
self._this_update = this_update | |
self._next_update = next_update | |
if not isinstance(cert_status, OCSPCertStatus): | |
raise TypeError( | |
"cert_status must be an item from the OCSPCertStatus enum" | |
) | |
if cert_status is not OCSPCertStatus.REVOKED: | |
if revocation_time is not None: | |
raise ValueError( | |
"revocation_time can only be provided if the certificate " | |
"is revoked" | |
) | |
if revocation_reason is not None: | |
raise ValueError( | |
"revocation_reason can only be provided if the certificate" | |
" is revoked" | |
) | |
else: | |
if not isinstance(revocation_time, datetime.datetime): | |
raise TypeError("revocation_time must be a datetime object") | |
revocation_time = _convert_to_naive_utc_time(revocation_time) | |
if revocation_time < _EARLIEST_UTC_TIME: | |
raise ValueError( | |
"The revocation_time must be on or after" | |
" 1950 January 1." | |
) | |
if revocation_reason is not None and not isinstance( | |
revocation_reason, x509.ReasonFlags | |
): | |
raise TypeError( | |
"revocation_reason must be an item from the ReasonFlags " | |
"enum or None" | |
) | |
self._cert_status = cert_status | |
self._revocation_time = revocation_time | |
self._revocation_reason = revocation_reason | |
class OCSPRequest(metaclass=abc.ABCMeta): | |
def issuer_key_hash(self) -> bytes: | |
""" | |
The hash of the issuer public key | |
""" | |
def issuer_name_hash(self) -> bytes: | |
""" | |
The hash of the issuer name | |
""" | |
def hash_algorithm(self) -> hashes.HashAlgorithm: | |
""" | |
The hash algorithm used in the issuer name and key hashes | |
""" | |
def serial_number(self) -> int: | |
""" | |
The serial number of the cert whose status is being checked | |
""" | |
def public_bytes(self, encoding: serialization.Encoding) -> bytes: | |
""" | |
Serializes the request to DER | |
""" | |
def extensions(self) -> x509.Extensions: | |
""" | |
The list of request extensions. Not single request extensions. | |
""" | |
class OCSPSingleResponse(metaclass=abc.ABCMeta): | |
def certificate_status(self) -> OCSPCertStatus: | |
""" | |
The status of the certificate (an element from the OCSPCertStatus enum) | |
""" | |
def revocation_time(self) -> typing.Optional[datetime.datetime]: | |
""" | |
The date of when the certificate was revoked or None if not | |
revoked. | |
""" | |
def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]: | |
""" | |
The reason the certificate was revoked or None if not specified or | |
not revoked. | |
""" | |
def this_update(self) -> datetime.datetime: | |
""" | |
The most recent time at which the status being indicated is known by | |
the responder to have been correct | |
""" | |
def next_update(self) -> typing.Optional[datetime.datetime]: | |
""" | |
The time when newer information will be available | |
""" | |
def issuer_key_hash(self) -> bytes: | |
""" | |
The hash of the issuer public key | |
""" | |
def issuer_name_hash(self) -> bytes: | |
""" | |
The hash of the issuer name | |
""" | |
def hash_algorithm(self) -> hashes.HashAlgorithm: | |
""" | |
The hash algorithm used in the issuer name and key hashes | |
""" | |
def serial_number(self) -> int: | |
""" | |
The serial number of the cert whose status is being checked | |
""" | |
class OCSPResponse(metaclass=abc.ABCMeta): | |
def responses(self) -> typing.Iterator[OCSPSingleResponse]: | |
""" | |
An iterator over the individual SINGLERESP structures in the | |
response | |
""" | |
def response_status(self) -> OCSPResponseStatus: | |
""" | |
The status of the response. This is a value from the OCSPResponseStatus | |
enumeration | |
""" | |
def signature_algorithm_oid(self) -> x509.ObjectIdentifier: | |
""" | |
The ObjectIdentifier of the signature algorithm | |
""" | |
def signature_hash_algorithm( | |
self, | |
) -> typing.Optional[hashes.HashAlgorithm]: | |
""" | |
Returns a HashAlgorithm corresponding to the type of the digest signed | |
""" | |
def signature(self) -> bytes: | |
""" | |
The signature bytes | |
""" | |
def tbs_response_bytes(self) -> bytes: | |
""" | |
The tbsResponseData bytes | |
""" | |
def certificates(self) -> typing.List[x509.Certificate]: | |
""" | |
A list of certificates used to help build a chain to verify the OCSP | |
response. This situation occurs when the OCSP responder uses a delegate | |
certificate. | |
""" | |
def responder_key_hash(self) -> typing.Optional[bytes]: | |
""" | |
The responder's key hash or None | |
""" | |
def responder_name(self) -> typing.Optional[x509.Name]: | |
""" | |
The responder's Name or None | |
""" | |
def produced_at(self) -> datetime.datetime: | |
""" | |
The time the response was produced | |
""" | |
def certificate_status(self) -> OCSPCertStatus: | |
""" | |
The status of the certificate (an element from the OCSPCertStatus enum) | |
""" | |
def revocation_time(self) -> typing.Optional[datetime.datetime]: | |
""" | |
The date of when the certificate was revoked or None if not | |
revoked. | |
""" | |
def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]: | |
""" | |
The reason the certificate was revoked or None if not specified or | |
not revoked. | |
""" | |
def this_update(self) -> datetime.datetime: | |
""" | |
The most recent time at which the status being indicated is known by | |
the responder to have been correct | |
""" | |
def next_update(self) -> typing.Optional[datetime.datetime]: | |
""" | |
The time when newer information will be available | |
""" | |
def issuer_key_hash(self) -> bytes: | |
""" | |
The hash of the issuer public key | |
""" | |
def issuer_name_hash(self) -> bytes: | |
""" | |
The hash of the issuer name | |
""" | |
def hash_algorithm(self) -> hashes.HashAlgorithm: | |
""" | |
The hash algorithm used in the issuer name and key hashes | |
""" | |
def serial_number(self) -> int: | |
""" | |
The serial number of the cert whose status is being checked | |
""" | |
def extensions(self) -> x509.Extensions: | |
""" | |
The list of response extensions. Not single response extensions. | |
""" | |
def single_extensions(self) -> x509.Extensions: | |
""" | |
The list of single response extensions. Not response extensions. | |
""" | |
def public_bytes(self, encoding: serialization.Encoding) -> bytes: | |
""" | |
Serializes the response to DER | |
""" | |
class OCSPRequestBuilder: | |
def __init__( | |
self, | |
request: typing.Optional[ | |
typing.Tuple[ | |
x509.Certificate, x509.Certificate, hashes.HashAlgorithm | |
] | |
] = None, | |
request_hash: typing.Optional[ | |
typing.Tuple[bytes, bytes, int, hashes.HashAlgorithm] | |
] = None, | |
extensions: typing.List[x509.Extension[x509.ExtensionType]] = [], | |
) -> None: | |
self._request = request | |
self._request_hash = request_hash | |
self._extensions = extensions | |
def add_certificate( | |
self, | |
cert: x509.Certificate, | |
issuer: x509.Certificate, | |
algorithm: hashes.HashAlgorithm, | |
) -> "OCSPRequestBuilder": | |
if self._request is not None or self._request_hash is not None: | |
raise ValueError("Only one certificate can be added to a request") | |
_verify_algorithm(algorithm) | |
if not isinstance(cert, x509.Certificate) or not isinstance( | |
issuer, x509.Certificate | |
): | |
raise TypeError("cert and issuer must be a Certificate") | |
return OCSPRequestBuilder( | |
(cert, issuer, algorithm), self._request_hash, self._extensions | |
) | |
def add_certificate_by_hash( | |
self, | |
issuer_name_hash: bytes, | |
issuer_key_hash: bytes, | |
serial_number: int, | |
algorithm: hashes.HashAlgorithm, | |
) -> "OCSPRequestBuilder": | |
if self._request is not None or self._request_hash is not None: | |
raise ValueError("Only one certificate can be added to a request") | |
if not isinstance(serial_number, int): | |
raise TypeError("serial_number must be an integer") | |
_verify_algorithm(algorithm) | |
utils._check_bytes("issuer_name_hash", issuer_name_hash) | |
utils._check_bytes("issuer_key_hash", issuer_key_hash) | |
if algorithm.digest_size != len( | |
issuer_name_hash | |
) or algorithm.digest_size != len(issuer_key_hash): | |
raise ValueError( | |
"issuer_name_hash and issuer_key_hash must be the same length " | |
"as the digest size of the algorithm" | |
) | |
return OCSPRequestBuilder( | |
self._request, | |
(issuer_name_hash, issuer_key_hash, serial_number, algorithm), | |
self._extensions, | |
) | |
def add_extension( | |
self, extval: x509.ExtensionType, critical: bool | |
) -> "OCSPRequestBuilder": | |
if not isinstance(extval, x509.ExtensionType): | |
raise TypeError("extension must be an ExtensionType") | |
extension = x509.Extension(extval.oid, critical, extval) | |
_reject_duplicate_extension(extension, self._extensions) | |
return OCSPRequestBuilder( | |
self._request, self._request_hash, self._extensions + [extension] | |
) | |
def build(self) -> OCSPRequest: | |
if self._request is None and self._request_hash is None: | |
raise ValueError("You must add a certificate before building") | |
return ocsp.create_ocsp_request(self) | |
class OCSPResponseBuilder: | |
def __init__( | |
self, | |
response: typing.Optional[_SingleResponse] = None, | |
responder_id: typing.Optional[ | |
typing.Tuple[x509.Certificate, OCSPResponderEncoding] | |
] = None, | |
certs: typing.Optional[typing.List[x509.Certificate]] = None, | |
extensions: typing.List[x509.Extension[x509.ExtensionType]] = [], | |
): | |
self._response = response | |
self._responder_id = responder_id | |
self._certs = certs | |
self._extensions = extensions | |
def add_response( | |
self, | |
cert: x509.Certificate, | |
issuer: x509.Certificate, | |
algorithm: hashes.HashAlgorithm, | |
cert_status: OCSPCertStatus, | |
this_update: datetime.datetime, | |
next_update: typing.Optional[datetime.datetime], | |
revocation_time: typing.Optional[datetime.datetime], | |
revocation_reason: typing.Optional[x509.ReasonFlags], | |
) -> "OCSPResponseBuilder": | |
if self._response is not None: | |
raise ValueError("Only one response per OCSPResponse.") | |
singleresp = _SingleResponse( | |
cert, | |
issuer, | |
algorithm, | |
cert_status, | |
this_update, | |
next_update, | |
revocation_time, | |
revocation_reason, | |
) | |
return OCSPResponseBuilder( | |
singleresp, | |
self._responder_id, | |
self._certs, | |
self._extensions, | |
) | |
def responder_id( | |
self, encoding: OCSPResponderEncoding, responder_cert: x509.Certificate | |
) -> "OCSPResponseBuilder": | |
if self._responder_id is not None: | |
raise ValueError("responder_id can only be set once") | |
if not isinstance(responder_cert, x509.Certificate): | |
raise TypeError("responder_cert must be a Certificate") | |
if not isinstance(encoding, OCSPResponderEncoding): | |
raise TypeError( | |
"encoding must be an element from OCSPResponderEncoding" | |
) | |
return OCSPResponseBuilder( | |
self._response, | |
(responder_cert, encoding), | |
self._certs, | |
self._extensions, | |
) | |
def certificates( | |
self, certs: typing.Iterable[x509.Certificate] | |
) -> "OCSPResponseBuilder": | |
if self._certs is not None: | |
raise ValueError("certificates may only be set once") | |
certs = list(certs) | |
if len(certs) == 0: | |
raise ValueError("certs must not be an empty list") | |
if not all(isinstance(x, x509.Certificate) for x in certs): | |
raise TypeError("certs must be a list of Certificates") | |
return OCSPResponseBuilder( | |
self._response, | |
self._responder_id, | |
certs, | |
self._extensions, | |
) | |
def add_extension( | |
self, extval: x509.ExtensionType, critical: bool | |
) -> "OCSPResponseBuilder": | |
if not isinstance(extval, x509.ExtensionType): | |
raise TypeError("extension must be an ExtensionType") | |
extension = x509.Extension(extval.oid, critical, extval) | |
_reject_duplicate_extension(extension, self._extensions) | |
return OCSPResponseBuilder( | |
self._response, | |
self._responder_id, | |
self._certs, | |
self._extensions + [extension], | |
) | |
def sign( | |
self, | |
private_key: CERTIFICATE_PRIVATE_KEY_TYPES, | |
algorithm: typing.Optional[hashes.HashAlgorithm], | |
) -> OCSPResponse: | |
if self._response is None: | |
raise ValueError("You must add a response before signing") | |
if self._responder_id is None: | |
raise ValueError("You must add a responder_id before signing") | |
return ocsp.create_ocsp_response( | |
OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm | |
) | |
def build_unsuccessful( | |
cls, response_status: OCSPResponseStatus | |
) -> OCSPResponse: | |
if not isinstance(response_status, OCSPResponseStatus): | |
raise TypeError( | |
"response_status must be an item from OCSPResponseStatus" | |
) | |
if response_status is OCSPResponseStatus.SUCCESSFUL: | |
raise ValueError("response_status cannot be SUCCESSFUL") | |
return ocsp.create_ocsp_response(response_status, None, None, None) | |
def load_der_ocsp_request(data: bytes) -> OCSPRequest: | |
return ocsp.load_der_ocsp_request(data) | |
def load_der_ocsp_response(data: bytes) -> OCSPResponse: | |
return ocsp.load_der_ocsp_response(data) | |