# NOTE(review): removed stray paste artifact ("Spaces: / Sleeping / Sleeping")
# that preceded the imports and was not valid Python.
import json | |
import logging | |
import time | |
from urllib.parse import ( | |
quote, | |
urlparse, | |
) | |
import requests | |
from bs4 import BeautifulSoup | |
# Route INFO-and-above records to error.log with a timestamped format.
logging.basicConfig(
    level=logging.INFO,
    filename="error.log",
    datefmt="%m-%d-%Y / %I:%M:%S %p",
    format="%(asctime)s | [%(levelname)s]: %(message)s",
)
class SearchResults:
    """Holds scraped search hits and renders them as human-readable text."""

    def __init__(self, results):
        self.results = results

    def __str__(self):
        # Each hit becomes a "---"-delimited stanza; missing fields fall back
        # to placeholder text rather than raising.
        stanzas = []
        for item in self.results:
            title = item.get("title", "Title not found")
            link = item.get("link", "Link not found")
            stanzas.append(f"---\nTitle: {title}\nLink: {link}\n---\n")
        return "".join(stanzas)
class YandexReverseImageSearcher:
    """Scraping client for Yandex reverse image search.

    Builds a results URL from a text query plus an image URL, fetches result
    pages with retries, and extracts (title, link) pairs from the HTML.
    """

    def __init__(self):
        # Results endpoint for Yandex image search.
        self.base_url = "https://yandex.ru/images/search"
        # Desktop browser User-Agent; bot-looking agents tend to get blocked
        # or served different markup.
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",  # noqa: E501
        }
        self.retry_count = 3  # max HTTP attempts per page fetch
        self.retry_delay = 1  # seconds to sleep between retries
        self.request_timeout = 10  # seconds; keeps requests.get from hanging

    def response(
        self,
        query: str,
        image_url: str,
        max_results: int = 10,
        delay: int = 1,
    ) -> "SearchResults":
        """Collect up to ``max_results`` unique results for query + image URL.

        Args:
            query: Free-text search query. Must be non-empty.
            image_url: URL of the image to reverse-search. Must end in a
                supported image extension (.jpg/.jpeg/.png/.webp).
            max_results: Upper bound on results collected across pages.
            delay: Seconds to sleep between successive page fetches.

        Returns:
            A ``SearchResults`` instance, or — preserved for backward
            compatibility — a plain error string when nothing was found.

        Raises:
            ValueError: If the query or image URL is missing/invalid.
        """
        self._validate_input(query, image_url)
        encoded_query = quote(query)
        encoded_image_url = quote(image_url)
        url = f"{self.base_url}?q={encoded_query}&image_url={encoded_image_url}&sbisrc=cr_1_5_2"  # noqa: E501
        all_results = []
        start_index = 0
        while len(all_results) < max_results:
            if start_index != 0:
                time.sleep(delay)  # be polite between page fetches
            paginated_url = f"{url}&start={start_index}"
            response = self._make_request(paginated_url)
            if response is None:
                break
            search_results, valid_content = self._parse_search_results(
                response.text,
            )
            if not valid_content:
                logging.warning("Unexpected HTML structure encountered.")
                break
            previous_count = len(all_results)
            for result in search_results:
                if len(all_results) >= max_results:
                    break
                data = self._extract_result_data(result)
                if data and data not in all_results:
                    all_results.append(data)
            if len(all_results) == previous_count:
                # No new results this page: stop instead of looping forever
                # on duplicate or empty pages (the original could spin here
                # because start_index stopped advancing).
                break
            start_index = len(all_results)
        if len(all_results) == 0:
            logging.warning(
                f"No results were found for the given query: [{query}], and/or image URL: [{image_url}].",  # noqa: E501
            )
            return "No results found. Please try again with a different query and/or image URL."  # noqa: E501
        return SearchResults(all_results[:max_results])

    def _validate_input(self, query: str, image_url: str):
        """Raise ValueError unless query and image_url are present and valid."""
        if not query:
            raise ValueError(
                "Query not found. Enter a query and try again.",
            )
        if not image_url:
            raise ValueError(
                "Image URL not found. Enter an image URL and try again.",
            )
        if not self._validate_image_url(image_url):
            raise ValueError(
                "Invalid image URL. Enter a valid image URL and try again.",
            )

    def _validate_image_url(self, url: str) -> bool:
        """Return True when the URL path ends in a supported image extension."""
        parsed_url = urlparse(url)
        path = parsed_url.path.lower()
        valid_extensions = (".jpg", ".jpeg", ".png", ".webp")
        return path.endswith(valid_extensions)

    def _make_request(self, url: str):
        """GET ``url`` with retries; return the Response or None on failure.

        Retries only on HTTP errors; non-HTML responses and unexpected
        exceptions abort immediately.
        """
        attempts = 0
        while attempts < self.retry_count:
            try:
                response = requests.get(
                    url,
                    headers=self.headers,
                    timeout=self.request_timeout,  # original had no timeout and could hang
                )
                if response.headers.get("Content-Type", "").startswith(
                    "text/html",
                ):
                    response.raise_for_status()
                    return response
                else:
                    logging.warning("Non-HTML content received.")
                    return None
            except requests.exceptions.HTTPError as http_err:
                logging.error(f"HTTP error occurred: {http_err}")
                attempts += 1
                time.sleep(self.retry_delay)
            except Exception as err:
                logging.error(f"An error occurred: {err}")
                return None
        return None

    def _parse_search_results(self, html_content: str):
        """Parse HTML and return (result divs, True) or (None, False) on error.

        NOTE(review): selects div.g, which looks like Google's result markup —
        verify this actually matches Yandex result pages.
        """
        try:
            soup = BeautifulSoup(html_content, "html.parser")
            return soup.find_all("div", class_="g"), True
        except Exception as e:
            logging.error(f"Error parsing HTML content: {e}")
            return None, False

    def _extract_result_data(self, result):
        """Pull {"link", "title"} out of one result div; {} when incomplete."""
        # Call find() once per field instead of twice (original queried each
        # selector twice).
        anchor = result.find("a", href=True)
        heading = result.find("h3")
        link = anchor["href"] if anchor else None
        title = heading.get_text(strip=True) if heading else None
        return {"link": link, "title": title} if link and title else {}
def get_image_links(page):
    """Extract original-image URLs from a Yandex results HTML page.

    Args:
        page: The HTML content as a string.

    Returns:
        A list of image URLs; empty when the expected markup is absent or
        the embedded JSON payload is malformed (the original raised
        KeyError in those cases).
    """
    soup = BeautifulSoup(page, "html.parser")
    # The "sites" section holds per-site matches with original image URLs.
    gallery_data = soup.find(
        "div",
        {"class": "cbir-section cbir-section_name_sites"},
    )
    if gallery_data is None:
        return []
    # Find the container of image links.
    image_links_container = gallery_data.find("div", {"class": "Root"})
    if image_links_container is None:
        return []
    # The container stores its payload as JSON in the data-state attribute.
    raw_state = image_links_container.get("data-state")
    if raw_state is None:
        return []
    try:
        data_state = json.loads(raw_state)
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding data-state JSON: {e}")
        return []
    # Collect URLs, skipping entries missing the originalImage.url structure
    # instead of raising KeyError mid-list.
    image_urls = []
    for site in data_state.get("sites", []):
        original_image_url = site.get("originalImage", {}).get("url")
        if original_image_url:
            image_urls.append(original_image_url)
    return image_urls
def yandex_reverse_image_search(file_path):
    """Upload a local image to Yandex and return URLs of matching images.

    Args:
        file_path: Path to the local image file to search by.

    Returns:
        A list of matching image URLs; empty when the upload or the results
        fetch fails.
    """
    img_search_url = generate_images_search_links(file_path)
    if img_search_url is None:
        return []
    # Simulate a user agent to avoid being blocked.
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36",  # noqa: E501
        "Content-Type": "application/json",
    }
    try:
        # timeout keeps a stalled connection from hanging the caller
        # (original had none).
        response = requests.get(img_search_url, headers=headers, timeout=30)
        response.raise_for_status()  # Raise an exception for bad status codes
        # Pass the decoded HTML straight to the extractor; the original
        # parsed it with BeautifulSoup only to prettify() it back into a
        # string that get_image_links re-parses anyway.
        return get_image_links(response.text)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching image: {e}")
        return []
def generate_images_search_links(file_path):
    """Upload the image and build the Yandex search-results URL for it.

    Args:
        file_path: Path to the local image file to upload.

    Returns:
        The full search-results URL string, or None on any failure.
    """
    search_url = "https://yandex.ru/images/search"
    params = {
        "rpt": "imageview",
        "format": "json",
        "request": '{"blocks":[{"block":"b-page_type_search-by-image__link"}]}',  # noqa: E501
    }
    try:
        # NOTE(review): "image/jpeg/webp" is not a valid MIME type, but it is
        # the value this code has been sending -- confirm before changing.
        # `with` closes the handle; the original leaked the open file.
        with open(file_path, "rb") as image_file:
            files = {"upfile": ("blob", image_file, "image/jpeg/webp")}
            response = requests.post(
                search_url,
                params=params,
                files=files,
                timeout=30,  # original could hang with no timeout
            )
        query_string = json.loads(response.content)["blocks"][0]["params"][
            "url"
        ]
        return search_url + "?" + query_string
    except OSError as e:
        # Missing/unreadable file.
        print(f"Error generating search URL: {e}")
        return None
    except requests.exceptions.RequestException as e:
        # The original wrote `except requests.exceptions`, which is a module,
        # not an exception class -- any error here raised TypeError instead
        # of being handled.
        print(f"Error generating search URL: {e}")
        return None
    except (json.JSONDecodeError, KeyError, IndexError) as e:
        # Unexpected response payload shape.
        print(f"Error generating search URL: {e}")
        return None
if __name__ == "__main__": | |
file_path = "T:\\Projects\\prj-nict-ai-content-detection\\data\\test_data\\towels.jpg.webp" # noqa: E501 | |
image_urls = yandex_reverse_image_search(file_path) | |
for image_url in image_urls: | |
print(f"Image URL: {image_url}") | |