"""
UrlscanClient: A simple Python client for interacting with the urlscan.io API.

This client allows you to:
- Submit a URL to be scanned.
- Retrieve scan results by UUID.
- Search existing scans with a query.
- Perform a full scan workflow with error handling.

Environment Variable:
    URLSCAN_API_KEY (str): If not passed during initialization, the client
        will attempt to use this environment variable.

Dependencies:
- requests
- os
- time
"""

import os
import time
from typing import Any, Dict, List, Optional

import requests


class UrlscanClient:
    """
    A client to interact with the urlscan.io API for submitting URLs,
    retrieving scan results, and searching the scan database.
    """

    BASE_URL = "https://urlscan.io/api/v1"

    # Country sent with every submission unless the caller overrides it by
    # passing ``country=...`` through ``submit_url``'s ``**options``.
    DEFAULT_COUNTRY = "fr"

    # Per-request network timeout (seconds) used when none is given to
    # ``__init__``. Without a timeout, ``requests`` can block indefinitely.
    DEFAULT_REQUEST_TIMEOUT = 30

    def __init__(
        self,
        api_key: Optional[str] = None,
        request_timeout: Optional[float] = DEFAULT_REQUEST_TIMEOUT,
    ):
        """
        Initialize the UrlscanClient.

        Parameters:
            api_key (str, optional): Your urlscan.io API key. If not provided,
                it is read from the URLSCAN_API_KEY environment variable.
            request_timeout (float, optional): Timeout in seconds applied to
                every individual HTTP request. Defaults to 30. Pass None to
                disable the timeout.

        Raises:
            ValueError: If the API key is not provided or found in
                environment variables.
        """
        self.api_key = api_key or os.getenv("URLSCAN_API_KEY")
        if not self.api_key:
            raise ValueError(
                "API key is required. Set it via parameter or the URLSCAN_API_KEY environment variable."
            )
        self.request_timeout = request_timeout
        self.headers = {
            "API-Key": self.api_key,
            "Content-Type": "application/json",
        }

    def submit_url(
        self,
        url: str,
        visibility: str = "public",
        tags: Optional[List[str]] = None,
        **options: Any,
    ) -> Dict[str, Any]:
        """
        Submit a URL for scanning.

        Parameters:
            url (str): The URL to scan.
            visibility (str): Scan visibility ('public', 'unlisted', or
                'private'). Defaults to 'public'.
            tags (list, optional): Optional list of tags to associate with
                the scan.
            **options: Additional scan options like 'useragent', 'referer',
                'country', etc. They are merged into the request payload last,
                so they override the defaults (including the default
                country, ``DEFAULT_COUNTRY``).

        Returns:
            dict: JSON response from the submission API.

        Raises:
            requests.HTTPError: If the request fails.
        """
        payload = {
            "url": url,
            "visibility": visibility,
            "country": self.DEFAULT_COUNTRY,
            "tags": tags or [],
        }
        payload.update(options)
        response = requests.post(
            f"{self.BASE_URL}/scan/",
            headers=self.headers,
            json=payload,
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_result(
        self,
        uuid: str,
        wait: bool = True,
        timeout: float = 60,
        poll_interval: float = 5,
    ) -> Dict[str, Any]:
        """
        Retrieve the result of a scan by UUID.

        A 404 response is treated as "result not ready yet" and, when
        ``wait`` is True, the request is retried until ``timeout`` elapses.

        Parameters:
            uuid (str): The UUID of the scan result.
            wait (bool): Whether to wait for the scan to complete if it's
                not yet ready. Defaults to True.
            timeout (int): Maximum time (in seconds) to wait for the result
                if wait is True. Defaults to 60.
            poll_interval (float): Seconds to sleep between attempts while
                waiting. Defaults to 5.

        Returns:
            dict: The scan result data.

        Raises:
            TimeoutError: If the result is not ready within the timeout
                period (or immediately, when wait is False).
            requests.HTTPError: If another HTTP error occurs, or the server
                answers with an unexpected non-error status.
        """
        result_url = f"{self.BASE_URL}/result/{uuid}/"
        # monotonic() is immune to wall-clock adjustments while we wait.
        deadline = time.monotonic() + timeout

        while True:
            response = requests.get(
                result_url,
                headers=self.headers,
                timeout=self.request_timeout,
            )
            if response.status_code == 200:
                return response.json()

            if response.status_code != 404:
                response.raise_for_status()
                # raise_for_status() only raises for 4xx/5xx. Any other
                # status would otherwise spin this loop with no sleep and
                # no deadline check, so fail loudly instead.
                raise requests.HTTPError(
                    f"Unexpected status code {response.status_code} for {result_url}",
                    response=response,
                )

            remaining = deadline - time.monotonic()
            if not wait or remaining <= 0:
                raise TimeoutError("Scan result not available yet.")
            # Never sleep past the deadline.
            time.sleep(max(0.0, min(poll_interval, remaining)))

    def search(self, query: str, size: int = 10) -> Dict[str, Any]:
        """
        Search for past scans using a query string.

        Parameters:
            query (str): The search query, such as a domain name or IP
                address.
            size (int): Maximum number of results to return. Defaults to 10.

        Returns:
            dict: Search results from urlscan.io.

        Raises:
            requests.HTTPError: If the request fails.
        """
        params = {"q": query, "size": size}
        response = requests.get(
            f"{self.BASE_URL}/search/",
            headers=self.headers,
            params=params,
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return response.json()

    def scan(self, url: str) -> Dict[str, Any]:
        """
        Convenience method to submit a scan and retrieve the result.

        Progress is reported with ``print``. Network/HTTP failures and a
        result that does not become available in time are both converted
        into the fallback result instead of being raised.

        Parameters:
            url (str): The URL to scan.

        Returns:
            dict: The scan result, or a fallback result if the scan fails.
        """
        try:
            print(f"Submit url {url}")
            submission = self.submit_url(url=url, visibility="public")
            print(f"Submitted scan. UUID: {submission['uuid']}")
            result = self.get_result(submission["uuid"])
            print(f"Submission succeed. UUID: {submission['uuid']}")
            return result
        # get_result() signals "not ready in time" with the builtin
        # TimeoutError, which is not a RequestException, so it must be
        # listed explicitly for the documented fallback to apply.
        except (requests.exceptions.RequestException, TimeoutError) as e:
            print(f"Submission failed {e}")
            return {
                'page': {'url': url},
                'verdicts': {'overall': {'hasVerdicts': False, 'score': 0}},
            }