File size: 5,101 Bytes
d9f1916
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""
UrlscanClient: A simple Python client for interacting with the urlscan.io API.

This client allows you to:
- Submit a URL to be scanned.
- Retrieve scan results by UUID.
- Search existing scans with a query.
- Perform a full scan workflow with error handling.

Environment Variable:
    URLSCAN_API_KEY (str): If not passed during initialization, the client will attempt to use this environment variable.

Dependencies:
    - requests
    - os
    - time
"""

import os
import time
import requests

class UrlscanClient:
    """
    A client to interact with the urlscan.io API for submitting URLs, retrieving scan results,
    and searching the scan database.

    Class attributes:
        BASE_URL (str): Root of the urlscan.io v1 API.
        DEFAULT_COUNTRY (str): Value sent as the 'country' scan option when the caller
                               does not pass one to submit_url().
        POLL_INTERVAL (int): Maximum number of seconds to sleep between two polls in get_result().
    """
    BASE_URL = "https://urlscan.io/api/v1"
    DEFAULT_COUNTRY = "fr"
    POLL_INTERVAL = 5

    def __init__(self, api_key=None, request_timeout=30):
        """
        Initialize the UrlscanClient.

        Parameters:
            api_key (str, optional): Your urlscan.io API key. If not provided, it is read from
                                     the URLSCAN_API_KEY environment variable.
            request_timeout (float, optional): Timeout in seconds applied to every individual
                                     HTTP request so a stalled connection cannot block forever.
                                     Defaults to 30.

        Raises:
            ValueError: If the API key is not provided or found in environment variables.
        """
        self.api_key = api_key or os.getenv("URLSCAN_API_KEY")
        if not self.api_key:
            raise ValueError("API key is required. Set it via parameter or the URLSCAN_API_KEY environment variable.")
        self.request_timeout = request_timeout
        self.headers = {
            "API-Key": self.api_key,
            "Content-Type": "application/json"
        }

    def submit_url(self, url, visibility="public", tags=None, **options):
        """
        Submit a URL for scanning.

        Parameters:
            url (str): The URL to scan.
            visibility (str): Scan visibility ('public', 'unlisted', or 'private'). Defaults to 'public'.
            tags (list, optional): Optional list of tags to associate with the scan.
            **options: Additional scan options like 'useragent', 'referer', 'country', etc.
                       They are merged last, so they override the defaults built here
                       (including the DEFAULT_COUNTRY value used for 'country').

        Returns:
            dict: JSON response from the submission API.

        Raises:
            requests.HTTPError: If the API answers with an error status.
            requests.exceptions.RequestException: If the request itself fails (connection
                                                  error, request timeout, ...).
        """
        payload = {
            "url": url,
            "visibility": visibility,
            "country": self.DEFAULT_COUNTRY,
            "tags": tags or []
        }
        # Caller-supplied options win over the defaults above.
        payload.update(options)
        response = requests.post(
            f"{self.BASE_URL}/scan/",
            headers=self.headers,
            json=payload,
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_result(self, uuid, wait=True, timeout=60):
        """
        Retrieve the result of a scan by UUID.

        A 404 answer is treated as "result not ready yet" and triggers another poll
        (when wait is True) until the timeout is exhausted.

        Parameters:
            uuid (str): The UUID of the scan result.
            wait (bool): Whether to wait for the scan to complete if it's not yet ready. Defaults to True.
            timeout (int): Maximum time (in seconds) to wait for the result if wait is True. Defaults to 60.

        Returns:
            dict: The scan result data.

        Raises:
            TimeoutError: If the result is not ready within the timeout period
                          (or immediately on a 404 when wait is False).
            requests.HTTPError: If another HTTP error occurs, or the API answers with a
                                status that is neither 200 nor 404.
            requests.exceptions.RequestException: If a request itself fails.
        """
        result_url = f"{self.BASE_URL}/result/{uuid}/"
        # A monotonic clock keeps the deadline immune to wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while True:
            response = requests.get(
                result_url,
                headers=self.headers,
                timeout=self.request_timeout,
            )
            status = response.status_code
            if status == 200:
                return response.json()
            if status != 404:
                # Raises for 4xx/5xx answers.
                response.raise_for_status()
                # Any remaining status (e.g. 202/204) does not raise above; fail
                # explicitly instead of re-polling in a tight loop without sleeping.
                raise requests.HTTPError(
                    f"Unexpected status code {status} for {result_url}",
                    response=response,
                )
            remaining = deadline - time.monotonic()
            if not wait or remaining <= 0:
                raise TimeoutError("Scan result not available yet.")
            # Never sleep past the deadline.
            time.sleep(min(self.POLL_INTERVAL, remaining))

    def search(self, query, size=10):
        """
        Search for past scans using a query string.

        Parameters:
            query (str): The search query, such as a domain name or IP address.
            size (int): Maximum number of results to return. Defaults to 10.

        Returns:
            dict: Search results from urlscan.io.

        Raises:
            requests.HTTPError: If the API answers with an error status.
            requests.exceptions.RequestException: If the request itself fails.
        """
        params = {"q": query, "size": size}
        response = requests.get(
            f"{self.BASE_URL}/search/",
            headers=self.headers,
            params=params,
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return response.json()

    def scan(self, url: str):
        """
        Convenience method to submit a scan and retrieve the result.

        Progress messages are printed to standard output. Request failures and a
        result that does not become available in time are both reported as a
        neutral fallback result instead of being raised.

        Parameters:
            url (str): The URL to scan.

        Returns:
            dict: The scan result, or a fallback result if the scan fails.
        """
        try:
            print(f"Submit url {url}")
            submission = self.submit_url(url=url, visibility="public")
            print(f"Submitted scan. UUID: {submission['uuid']}")
            result = self.get_result(submission["uuid"])
            print(f"Submission succeed. UUID: {submission['uuid']}")
            return result
        # get_result() signals "not ready in time" with the builtin TimeoutError,
        # which is not a RequestException, so it must be listed explicitly.
        except (requests.exceptions.RequestException, TimeoutError) as e:
            print(f"Submission failed {e}")
            return {
                'page': {'url': url},
                'verdicts': {'overall': {'hasVerdicts': False, 'score': 0}}
            }