"""
UrlscanClient: A simple Python client for interacting with the urlscan.io API.

This client allows you to:
- Submit a URL to be scanned.
- Retrieve scan results by UUID.
- Search existing scans with a query.
- Perform a full scan workflow with error handling.

Environment Variable:
    URLSCAN_API_KEY (str): If not passed during initialization, the client
        will attempt to use this environment variable.

Dependencies:
- requests
- os
- time
"""
import os | |
import time | |
import requests | |
class UrlscanClient:
    """
    A client to interact with the urlscan.io API for submitting URLs, retrieving scan results,
    and searching the scan database.
    """

    # Root of the urlscan.io v1 REST API; every endpoint path is appended to it.
    BASE_URL = "https://urlscan.io/api/v1"
    # Country code sent with every submission unless the caller passes ``country=...``.
    DEFAULT_COUNTRY = "fr"
    # Seconds allowed for each individual HTTP request, so a stalled connection
    # cannot block the caller indefinitely (requests has no timeout by default).
    REQUEST_TIMEOUT = 30

    def __init__(self, api_key=None):
        """
        Initialize the UrlscanClient.

        Parameters:
            api_key (str, optional): Your urlscan.io API key. If not provided, it is read from
                the URLSCAN_API_KEY environment variable.

        Raises:
            ValueError: If the API key is not provided or found in environment variables.
        """
        self.api_key = api_key or os.getenv("URLSCAN_API_KEY")
        if not self.api_key:
            raise ValueError("API key is required. Set it via parameter or the URLSCAN_API_KEY environment variable.")
        # Headers sent with every request made by this client.
        self.headers = {
            "API-Key": self.api_key,
            "Content-Type": "application/json",
        }

    def submit_url(self, url, visibility="public", tags=None, **options):
        """
        Submit a URL for scanning.

        Parameters:
            url (str): The URL to scan.
            visibility (str): Scan visibility ('public', 'unlisted', or 'private'). Defaults to 'public'.
            tags (list, optional): Optional list of tags to associate with the scan.
            **options: Additional scan options like 'useragent', 'referer', 'country', etc.
                Any key given here overrides the corresponding default payload entry
                (for example ``country`` replaces DEFAULT_COUNTRY).

        Returns:
            dict: JSON response from the submission API.

        Raises:
            requests.HTTPError: If the API answers with an error status.
            requests.RequestException: If the request fails or exceeds REQUEST_TIMEOUT.
        """
        payload = {
            "url": url,
            "visibility": visibility,
            "country": self.DEFAULT_COUNTRY,
            "tags": tags or [],
        }
        # Applied last so caller-supplied options win over the defaults above.
        payload.update(options)
        response = requests.post(
            f"{self.BASE_URL}/scan/",
            headers=self.headers,
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def get_result(self, uuid, wait=True, timeout=60, poll_interval=5):
        """
        Retrieve the result of a scan by UUID.

        Parameters:
            uuid (str): The UUID of the scan result.
            wait (bool): Whether to wait for the scan to complete if it's not yet ready. Defaults to True.
            timeout (int): Maximum time (in seconds) to wait for the result if wait is True. Defaults to 60.
            poll_interval (int): Seconds to sleep between polling attempts. Defaults to 5.

        Returns:
            dict: The scan result data.

        Raises:
            TimeoutError: If the result is not ready within the timeout period
                (or immediately, when wait is False and the result is not ready).
            requests.HTTPError: If the API answers with any status other than 200 or 404.
            requests.RequestException: If a request fails or exceeds REQUEST_TIMEOUT.
        """
        result_url = f"{self.BASE_URL}/result/{uuid}/"
        # A monotonic clock keeps the deadline immune to wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while True:
            response = requests.get(
                result_url,
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                return response.json()
            if response.status_code != 404:
                # Raises for 4xx/5xx responses.
                response.raise_for_status()
                # Any other status (e.g. 202 or an unfollowed redirect) would otherwise
                # fall through and re-poll immediately without sleeping.
                raise requests.HTTPError(
                    f"Unexpected status code {response.status_code} for {result_url}",
                    response=response,
                )
            # 404 is treated as "result not ready yet".
            remaining = deadline - time.monotonic()
            if not wait or remaining <= 0:
                raise TimeoutError("Scan result not available yet.")
            # Never sleep past the deadline, so the total wait stays bounded by `timeout`.
            time.sleep(min(poll_interval, remaining))

    def search(self, query, size=10):
        """
        Search for past scans using a query string.

        Parameters:
            query (str): The search query, such as a domain name or IP address.
            size (int): Maximum number of results to return. Defaults to 10.

        Returns:
            dict: Search results from urlscan.io.

        Raises:
            requests.HTTPError: If the API answers with an error status.
            requests.RequestException: If the request fails or exceeds REQUEST_TIMEOUT.
        """
        params = {"q": query, "size": size}
        response = requests.get(
            f"{self.BASE_URL}/search/",
            headers=self.headers,
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _fallback_result(url):
        """Build the neutral placeholder result that scan() returns when a scan fails."""
        return {
            'page': {'url': url},
            'verdicts': {'overall': {'hasVerdicts': False, 'score': 0}},
        }

    def scan(self, url: str):
        """
        Convenience method to submit a scan and retrieve the result.

        Progress and failures are reported with print().

        Parameters:
            url (str): The URL to scan.

        Returns:
            dict: The scan result, or a fallback result (no verdicts, score 0) if the
                submission or retrieval fails or the result is not ready in time.
        """
        try:
            print(f"Submit url {url}")
            submission = self.submit_url(url=url, visibility="public")
            print(f"Submitted scan. UUID: {submission['uuid']}")
            result = self.get_result(submission["uuid"])
        except TimeoutError as e:
            # get_result() signals "not ready in time" with the built-in TimeoutError,
            # which is not a requests exception, so it needs its own handler.
            print(f"Submission failed {e}")
            return self._fallback_result(url)
        except requests.exceptions.RequestException as e:
            print(f"Submission failed {e}")
            return self._fallback_result(url)
        print(f"Submission succeed. UUID: {submission['uuid']}")
        return result