import json
import logging
import mimetypes
import time
from urllib.parse import (
    quote,
    urlparse,
)

import requests
from bs4 import BeautifulSoup

logging.basicConfig(
filename="error.log",
level=logging.INFO,
format="%(asctime)s | [%(levelname)s]: %(message)s",
datefmt="%m-%d-%Y / %I:%M:%S %p",
)


class SearchResults:
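    """Container for reverse-image search results.

    Each result is a dict with ``title`` and ``link`` keys.
    """
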
def __init__(self, results):
self.results = results

    def __str__(self):
        if not self.results:
            # Printing an empty SearchResults still gives a useful message.
            return "No results found. Please try again with a different query and/or image URL."  # noqa: E501
        output = ""
        for result in self.results:
            output += "---\n"
            output += f"Title: {result.get('title', 'Title not found')}\n"
            output += f"Link: {result.get('link', 'Link not found')}\n"
            output += "---\n"
        return output


class YandexReverseImageSearcher:
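    """Reverse-image searcher that scrapes Yandex result pages.

    NOTE: the URL parameters and CSS selectors used below follow a
    Google-style results page; Yandex's live markup may differ, so treat
    them as a starting point rather than a guaranteed contract.
    """
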
def __init__(self):
self.base_url = "https://yandex.ru/images/search"
self.headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", # noqa: E501
}
self.retry_count = 3
self.retry_delay = 1

    def response(
self,
query: str,
image_url: str,
max_results: int = 10,
delay: int = 1,
) -> SearchResults:
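        """Search Yandex for ``query`` plus ``image_url`` and collect results.

        Paginates with ``delay`` seconds between requests until
        ``max_results`` results are gathered or no new results appear.
        Always returns a SearchResults instance (empty when nothing matched).
        """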
self._validate_input(query, image_url)
encoded_query = quote(query)
encoded_image_url = quote(image_url)
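        # NOTE: this query-string layout (q=, image_url=, start=) mirrors
        # Google-style search URLs; Yandex commonly uses rpt=imageview with
        # a url= parameter instead (as in generate_images_search_links
        # below), so adjust here if no results come back.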
url = f"{self.base_url}?q={encoded_query}&image_url={encoded_image_url}&sbisrc=cr_1_5_2" # noqa: E501
all_results = []
start_index = 0
while len(all_results) < max_results:
if start_index != 0:
time.sleep(delay)
paginated_url = f"{url}&start={start_index}"
response = self._make_request(paginated_url)
if response is None:
break
search_results, valid_content = self._parse_search_results(
response.text,
)
if not valid_content:
logging.warning("Unexpected HTML structure encountered.")
break
            results_before = len(all_results)
            for result in search_results:
                if len(all_results) >= max_results:
                    break
                data = self._extract_result_data(result)
                if data and data not in all_results:
                    all_results.append(data)
            if len(all_results) == results_before:
                # Nothing new on this page; stop instead of refetching it.
                break
            start_index = len(all_results)
        if len(all_results) == 0:
            logging.warning(
                f"No results were found for the given query: [{query}], and/or image URL: [{image_url}].",  # noqa: E501
            )
        # Always return a SearchResults object, as the annotation promises;
        # an empty one stringifies to a helpful "no results" message.
        return SearchResults(all_results[:max_results])

def _validate_input(self, query: str, image_url: str):
if not query:
raise ValueError(
"Query not found. Enter a query and try again.",
)
if not image_url:
raise ValueError(
"Image URL not found. Enter an image URL and try again.",
)
if not self._validate_image_url(image_url):
raise ValueError(
"Invalid image URL. Enter a valid image URL and try again.",
)

    def _validate_image_url(self, url: str) -> bool:
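        """Check that the URL path ends in a common image extension.

        NOTE: this heuristic rejects valid image URLs without a file
        extension (e.g. CDN links that rely on query parameters).
        """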
parsed_url = urlparse(url)
path = parsed_url.path.lower()
valid_extensions = (".jpg", ".jpeg", ".png", ".webp")
return any(path.endswith(ext) for ext in valid_extensions)

    def _make_request(self, url: str):
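        """GET ``url``, retrying HTTP errors up to ``self.retry_count`` times.

        Returns the response only for HTML content; otherwise None.
        """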
attempts = 0
while attempts < self.retry_count:
            try:
                # A timeout keeps a stalled connection from hanging the
                # retry loop.
                response = requests.get(url, headers=self.headers, timeout=10)
                if response.headers.get("Content-Type", "").startswith(
                    "text/html",
                ):
                    response.raise_for_status()
                    return response
                logging.warning("Non-HTML content received.")
                return None
except requests.exceptions.HTTPError as http_err:
logging.error(f"HTTP error occurred: {http_err}")
attempts += 1
time.sleep(self.retry_delay)
except Exception as err:
logging.error(f"An error occurred: {err}")
return None
return None

    def _parse_search_results(self, html_content: str):
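        """Extract per-result containers from a results page.

        NOTE: ``div.g`` is a Google-style result wrapper; Yandex's markup
        likely differs, so this selector may need updating against a live
        results page.
        """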
try:
soup = BeautifulSoup(html_content, "html.parser")
return soup.find_all("div", class_="g"), True
except Exception as e:
logging.error(f"Error parsing HTML content: {e}")
return None, False

    def _extract_result_data(self, result):
        # Look up the anchor and heading once instead of calling find twice.
        anchor = result.find("a", href=True)
        heading = result.find("h3")
        link = anchor["href"] if anchor else None
        title = heading.get_text(strip=True) if heading else None
        return {"link": link, "title": title} if link and title else {}


def get_image_links(page):
    """
    Extract image URLs from the "sites" section of a Yandex results page.

    Args:
        page: The HTML content as a string.

    Returns:
        A list of image URLs.
    """
soup = BeautifulSoup(page, "html.parser")
# Find the specific section containing image links
gallery_data = soup.find(
"div",
{"class": "cbir-section cbir-section_name_sites"},
)
if gallery_data is None:
return []
# Find the container of image links
image_links_container = gallery_data.find("div", {"class": "Root"})
if image_links_container is None:
return []
    # The "data-state" attribute holds a JSON blob describing matching sites.
    data_state = json.loads(image_links_container.get("data-state", "{}"))
    # Extract the original image URL from each matched site.
    image_urls = []
    for site in data_state.get("sites", []):
        image_urls.append(site["originalImage"]["url"])
    return image_urls


def yandex_reverse_image_search(file_path):
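    """Reverse-search the image at ``file_path`` on Yandex.

    Returns the matched image URLs, or an empty list on failure.
    """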
img_search_url = generate_images_search_links(file_path)
if img_search_url is None:
return []
    # Simulate a real browser user agent to avoid being blocked.
    # (No Content-Type header: this is a plain GET with no request body.)
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36",  # noqa: E501
    }
    try:
        response = requests.get(img_search_url, headers=headers, timeout=30)
        response.raise_for_status()  # Raise an exception for bad status codes
        # get_image_links parses the HTML itself, so the raw text can be
        # passed straight through without an extra BeautifulSoup round trip.
        return get_image_links(response.text)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching search results: {e}")
        return []


def generate_images_search_links(file_path):
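    """Upload the image to Yandex and build the results-page URL.

    Yandex's JSON reply embeds the query string for the results page.
    Returns None when the upload fails or the reply is malformed.
    """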
search_url = "https://yandex.ru/images/search"
params = {
"rpt": "imageview",
"format": "json",
"request": '{"blocks":[{"block":"b-page_type_search-by-image__link"}]}', # noqa: E501
}
    try:
        # Guess the MIME type from the file name rather than hard-coding one.
        content_type = (
            mimetypes.guess_type(file_path)[0] or "application/octet-stream"
        )
        # A context manager ensures the uploaded file handle is closed.
        with open(file_path, "rb") as f:
            files = {"upfile": ("blob", f, content_type)}
            response = requests.post(
                search_url,
                params=params,
                files=files,
                timeout=30,
            )
        query_string = json.loads(response.content)["blocks"][0]["params"][
            "url"
        ]
        return search_url + "?" + query_string
    except (
        OSError,
        requests.exceptions.RequestException,
        KeyError,
        IndexError,
        ValueError,
    ) as e:
        print(f"Error generating search URL: {e}")
        return None


if __name__ == "__main__":
file_path = "T:\\Projects\\prj-nict-ai-content-detection\\data\\test_data\\towels.jpg.webp" # noqa: E501
image_urls = yandex_reverse_image_search(file_path)
for image_url in image_urls:
print(f"Image URL: {image_url}")