phishing-detector-api / urlscan_client.py
kokluch's picture
Add urlscan.io to classify malicious URLs as junk.
d9f1916
"""
UrlscanClient: A simple Python client for interacting with the urlscan.io API.

This client allows you to:
    - Submit a URL to be scanned.
    - Retrieve scan results by UUID.
    - Search existing scans with a query.
    - Perform a full scan workflow with error handling.

Environment Variable:
    URLSCAN_API_KEY (str): If no API key is passed during initialization, the client
        falls back to this environment variable.

Dependencies:
    - requests
    - os
    - time
"""
import os
import time
import requests
class UrlscanClient:
    """
    A client to interact with the urlscan.io API for submitting URLs, retrieving scan results,
    and searching the scan database.

    Every HTTP call is bounded by ``request_timeout`` so a stalled connection cannot
    block the caller indefinitely.
    """

    BASE_URL = "https://urlscan.io/api/v1"

    def __init__(self, api_key=None, request_timeout=30):
        """
        Initialize the UrlscanClient.

        Parameters:
            api_key (str, optional): Your urlscan.io API key. If not provided, it is read from
                the URLSCAN_API_KEY environment variable.
            request_timeout (float or tuple, optional): Timeout in seconds applied to every
                individual HTTP request. Defaults to 30. Pass None to disable the timeout.

        Raises:
            ValueError: If the API key is not provided or found in environment variables.
        """
        self.api_key = api_key or os.getenv("URLSCAN_API_KEY")
        if not self.api_key:
            raise ValueError("API key is required. Set it via parameter or the URLSCAN_API_KEY environment variable.")
        self.request_timeout = request_timeout
        self.headers = {
            "API-Key": self.api_key,
            "Content-Type": "application/json",
        }

    def submit_url(self, url, visibility="public", tags=None, **options):
        """
        Submit a URL for scanning.

        Parameters:
            url (str): The URL to scan.
            visibility (str): Scan visibility ('public', 'unlisted', or 'private'). Defaults to 'public'.
            tags (list, optional): Optional list of tags to associate with the scan.
            **options: Additional scan options like 'useragent', 'referer', 'country', etc.
                They are merged into the payload last, so they override the defaults
                (including the default 'country' of 'fr').

        Returns:
            dict: JSON response from the submission API.

        Raises:
            requests.HTTPError: If the API answers with an error status.
            requests.RequestException: If the request cannot be completed (connection
                failure, request timeout, ...).
        """
        payload = {
            "url": url,
            "visibility": visibility,
            "country": "fr",  # default scan country; callers may override via **options
            "tags": tags or [],
        }
        payload.update(options)
        response = requests.post(
            f"{self.BASE_URL}/scan/",
            headers=self.headers,
            json=payload,
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_result(self, uuid, wait=True, timeout=60, poll_interval=5):
        """
        Retrieve the result of a scan by UUID.

        Parameters:
            uuid (str): The UUID of the scan result.
            wait (bool): Whether to wait for the scan to complete if it's not yet ready. Defaults to True.
            timeout (int): Maximum time (in seconds) to wait for the result if wait is True. Defaults to 60.
            poll_interval (float): Seconds to sleep between two polling attempts. Defaults to 5.

        Returns:
            dict: The scan result data.

        Raises:
            TimeoutError: If the result is not ready within the timeout period
                (or immediately, when wait is False and the result is not ready).
            requests.HTTPError: If another HTTP error occurs.
            requests.RequestException: If a request cannot be completed.
        """
        result_url = f"{self.BASE_URL}/result/{uuid}/"
        # Monotonic clock: the elapsed time must not be skewed by wall-clock adjustments.
        started = time.monotonic()
        while True:
            response = requests.get(result_url, headers=self.headers, timeout=self.request_timeout)
            if response.status_code == 200:
                return response.json()
            if response.status_code != 404:
                # Genuine HTTP errors (4xx/5xx other than 404) are raised to the caller.
                response.raise_for_status()
            # Reached for a 404 (treated as "result not ready yet") and for any other
            # non-error status: both go through the same timeout check and sleep, so the
            # loop can never spin without a delay or run past the deadline.
            if not wait or (time.monotonic() - started) > timeout:
                raise TimeoutError("Scan result not available yet.")
            time.sleep(poll_interval)

    def search(self, query, size=10):
        """
        Search for past scans using a query string.

        Parameters:
            query (str): The search query, such as a domain name or IP address.
            size (int): Maximum number of results to return. Defaults to 10.

        Returns:
            dict: Search results from urlscan.io.

        Raises:
            requests.HTTPError: If the API answers with an error status.
            requests.RequestException: If the request cannot be completed.
        """
        params = {"q": query, "size": size}
        response = requests.get(
            f"{self.BASE_URL}/search/",
            headers=self.headers,
            params=params,
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return response.json()

    def scan(self, url: str):
        """
        Convenience method to submit a scan and retrieve the result.

        Parameters:
            url (str): The URL to scan.

        Returns:
            dict: The scan result, or a fallback result (no verdicts, score 0) if the
                submission fails, an HTTP/network error occurs, or the result is not
                ready before get_result() times out.
        """
        try:
            print(f"Submit url {url}")
            submission = self.submit_url(url=url, visibility="public")
            print(f"Submitted scan. UUID: {submission['uuid']}")
            result = self.get_result(submission["uuid"])
            print(f"Submission succeed. UUID: {submission['uuid']}")
            return result
        except TimeoutError as e:
            # get_result() raises the builtin TimeoutError, which is NOT a
            # requests exception; it must be handled explicitly to honour the
            # documented fallback behaviour.
            print(f"Submission failed {e}")
        except requests.exceptions.RequestException as e:
            print(f"Submission failed {e}")
        # Neutral fallback so callers always receive a result-shaped dict.
        return {
            'page': {'url': url},
            'verdicts': {'overall': {'hasVerdicts': False, 'score': 0}}
        }