"""
Enhanced Google Maps Reviews Scraper for David's Bridal

Scrapes reviews from Google Maps with scroll-based lazy loading, duplicate
removal, and improved element detection.
"""

import time
import random
import re
import threading
import logging
import sys
from datetime import datetime
from queue import Queue

import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
from webdriver_manager.chrome import ChromeDriverManager

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class EnhancedGoogleMapsReviewsScraper:
    def __init__(self, headless=True, wait_time=10, max_workers=3):
        """Initialize the scraper with Chrome driver options."""
        self.wait_time = wait_time
        self.max_workers = max_workers
        # Shared state reserved for a threaded pipeline; the current extraction
        # path is sequential, so these are not exercised yet.
        self.reviews_queue = Queue()
        self.processed_reviews = []
        self.lock = threading.Lock()
        self.setup_driver(headless)

    def setup_driver(self, headless):
        """Set up Chrome driver with appropriate options."""
        try:
            chrome_options = Options()
            if headless:
                chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            # Reduce common automation fingerprints that can trigger bot detection.
            chrome_options.add_argument("--disable-blink-features=AutomationControlled")
            chrome_options.add_argument("--disable-extensions")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--remote-debugging-port=9222")
            chrome_options.add_argument("--window-size=1920,1080")
            chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
            chrome_options.add_experimental_option('useAutomationExtension', False)
            chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")

            logger.info("Setting up ChromeDriver...")
            # webdriver-manager downloads a chromedriver matching the installed Chrome.
            service = Service(ChromeDriverManager().install())

            self.driver = webdriver.Chrome(service=service, options=chrome_options)
            # Hide the navigator.webdriver flag that automated Chrome exposes.
            self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
            self.wait = WebDriverWait(self.driver, self.wait_time)
            logger.info("ChromeDriver setup successful")

        except WebDriverException as e:
            logger.error(f"Failed to setup ChromeDriver: {e}")
            sys.exit(1)

    def search_location(self, query):
        """Search for a David's Bridal location on Google Maps."""
        try:
            search_url = f"https://www.google.com/maps/search/{query.replace(' ', '+')}"
            logger.info(f"Navigating to: {search_url}")
            self.driver.get(search_url)

            # Give the results pane time to render before probing for elements.
            time.sleep(5)

            # Class names on Google Maps are obfuscated and change without notice;
            # keep this list easy to extend when a selector goes stale.
            result_selectors = [
                "button.hh2c6.G7m0Af",
            ]

            result_found = False
            for selector in result_selectors:
                try:
                    first_result = self.wait.until(
                        EC.element_to_be_clickable((By.CSS_SELECTOR, selector))
                    )
                    # JavaScript click avoids interception by overlays.
                    self.driver.execute_script("arguments[0].click();", first_result)
                    time.sleep(3)
                    result_found = True
                    break
                except TimeoutException:
                    continue

            return result_found

        except Exception as e:
            logger.error(f"Error in search_location: {e}")
            return False

    def click_reviews_tab(self):
        """Click the Reviews tab using its data-tab-index/aria-label attributes."""
        try:
            reviews_button = self.wait.until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, "button[data-tab-index='1'][aria-label*='Reviews']"))
            )

            # Center the tab in the viewport before clicking.
            self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", reviews_button)
            time.sleep(1)

            self.driver.execute_script("arguments[0].click();", reviews_button)
            logger.info("Successfully clicked reviews tab")

            # Wait for the reviews panel to populate.
            time.sleep(3)
            return True

        except Exception as e:
            logger.error(f"Could not click reviews tab: {e}")
            return False

    def expand_review_text(self, review_element):
        """Expand truncated review text by clicking its 'See more' button, if present."""
        try:
            more_button = review_element.find_element(
                By.CSS_SELECTOR,
                "button.w8nwRe.kyuRq[aria-label='See more']"
            )

            self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", more_button)
            time.sleep(0.5)
            self.driver.execute_script("arguments[0].click();", more_button)
            time.sleep(1)
            return True

        except NoSuchElementException:
            # No 'See more' button: the review text is already fully visible.
            return False
        except Exception as e:
            logger.warning(f"Error expanding review text: {e}")
            return False

    def scroll_and_load_reviews(self, target_count=5000):
        """Scroll the reviews pane until target_count reviews load or loading stalls."""
        try:
            scrollable_container = self.driver.find_element(By.CSS_SELECTOR, "div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde")
            current_reviews = 0
            last_review_count = 0
            stagnant_rounds = 0
            max_stagnant_rounds = 5
            scroll_attempts = 0
            max_scroll_attempts = 1000

            while scroll_attempts < max_scroll_attempts:
                # Jump to the bottom of the pane; Google Maps lazy-loads the next
                # batch of reviews as the scroll position nears the end.
                self.driver.execute_script(
                    "arguments[0].scrollTo(0, arguments[0].scrollHeight);",
                    scrollable_container
                )

                # Randomized delay to look less like a bot.
                time.sleep(random.uniform(2, 4))

                current_reviews = len(self.driver.find_elements(By.CSS_SELECTOR, "div[data-review-id]"))
                logger.info(f"Attempt {scroll_attempts + 1}: Loaded {current_reviews} reviews (target: {target_count})")

                if current_reviews >= target_count:
                    logger.info("Reached target review count.")
                    break

                # Stop early when several consecutive scrolls load nothing new.
                if current_reviews == last_review_count:
                    stagnant_rounds += 1
                    logger.info(f"No new reviews this round. Stagnant rounds: {stagnant_rounds}/{max_stagnant_rounds}")
                    if stagnant_rounds >= max_stagnant_rounds:
                        logger.info("No new reviews after several attempts. Stopping scroll.")
                        break
                else:
                    stagnant_rounds = 0

                last_review_count = current_reviews
                scroll_attempts += 1

                if scroll_attempts % 10 == 0:
                    logger.info("Taking a longer pause to mimic human browsing...")
                    time.sleep(random.uniform(5, 8))

            logger.info(f"Finished scrolling. Total reviews found: {current_reviews}")
            return current_reviews

        except Exception as e:
            logger.error(f"Error scrolling reviews: {e}")
            return 0

    def extract_single_review_data(self, review_element):
        """Extract reviewer name, rating, text, date, and owner response from one review element."""
        try:
            review_data = {}

            # Expand truncated text first so the full review body is captured.
            self.expand_review_text(review_element)

            try:
                name_element = review_element.find_element(By.CSS_SELECTOR, "div[class*='d4r55']")
                review_data['reviewer_name'] = name_element.text.strip()
            except NoSuchElementException:
                review_data['reviewer_name'] = "Anonymous"

            try:
                rating_element = review_element.find_element(By.CSS_SELECTOR, "span[role='img'][aria-label*='star']")
                rating_text = rating_element.get_attribute('aria-label')
                review_data['rating'] = self.extract_rating_from_text(rating_text)
            except NoSuchElementException:
                review_data['rating'] = None

            try:
                text_element = review_element.find_element(By.CSS_SELECTOR, "span.wiI7pd")
                review_data['review_text'] = text_element.text.strip()
            except NoSuchElementException:
                review_data['review_text'] = ""

            try:
                date_element = review_element.find_element(By.CSS_SELECTOR, "span.rsqaWe")
                review_data['date'] = date_element.text.strip()
            except NoSuchElementException:
                review_data['date'] = ""

            # Caveat: 'wiI7pd' is also the review-text class, so this selector may
            # miss the owner response or pick up the review body instead.
            try:
                response_element = review_element.find_element(By.CSS_SELECTOR, "div[class*='wiI7pd']")
                review_data['owner_response'] = response_element.text.strip()
            except NoSuchElementException:
                review_data['owner_response'] = ""

            review_data['scraped_at'] = datetime.now().isoformat()
            # Fall back to a timestamp-based ID when Google doesn't expose one;
            # note this weakens deduplication for such reviews.
            review_data['review_id'] = review_element.get_attribute('data-review-id') or f"review_{int(time.time() * 1000)}"

            return review_data

        except Exception as e:
            logger.error(f"Error extracting single review: {e}")
            return None

    def extract_rating_from_text(self, text):
        """Extract a numeric rating from aria-label text such as '4 stars'."""
        if not text:
            return None

        match = re.search(r'(\d+)\s*(?:out of \d+\s*)?stars?', text.lower())
        if match:
            return int(match.group(1))

        # Fall back to counting star glyphs if the label uses symbols.
        star_count = text.count('★') or text.count('⭐')
        if star_count > 0:
            return star_count

        return None
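
    # Example behavior (illustrative inputs, not exhaustive):
    #   extract_rating_from_text("5 stars")          -> 5
    #   extract_rating_from_text("4 out of 5 stars") -> 4
    #   extract_rating_from_text("★★★")              -> 3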

    def process_reviews_batch(self, review_elements, start_idx, end_idx):
        """Process a contiguous slice of review elements sequentially.

        Helper currently not called by scrape_reviews; kept for batch workflows.
        """
        batch_results = []

        for i in range(start_idx, min(end_idx, len(review_elements))):
            try:
                review_data = self.extract_single_review_data(review_elements[i])
                if review_data:
                    batch_results.append(review_data)
                    logger.info(f"Processed review {i+1}/{len(review_elements)}")
            except Exception as e:
                logger.warning(f"Error processing review {i+1}: {e}")
                continue

        return batch_results

    def extract_all_reviews_parallel(self):
        """Extract all loaded reviews with duplicate removal.

        Despite the name, extraction runs sequentially on the single WebDriver
        session, since Selenium drivers are not safe to share across threads.
        """
        try:
            review_elements = self.driver.find_elements(By.CSS_SELECTOR, "div[data-review-id]")
            total_reviews = len(review_elements)
            logger.info(f"Found {total_reviews} review elements to process")

            if total_reviews == 0:
                return []

            processed_review_ids = set()
            all_reviews = []

            for i, review_element in enumerate(review_elements):
                try:
                    # Skip elements whose ID has already been processed.
                    review_id = review_element.get_attribute('data-review-id')
                    if review_id and review_id in processed_review_ids:
                        logger.debug(f"Skipping duplicate review ID: {review_id}")
                        continue

                    review_data = self.extract_single_review_data(review_element)

                    if review_data and review_data.get('review_id'):
                        processed_review_ids.add(review_data['review_id'])
                        all_reviews.append(review_data)
                        logger.info(f"Processed review {len(all_reviews)}/{total_reviews}")

                except Exception as e:
                    logger.warning(f"Error processing review {i+1}: {e}")
                    continue

            logger.info(f"Successfully extracted {len(all_reviews)} unique reviews")
            return all_reviews

        except Exception as e:
            logger.error(f"Error in review extraction: {e}")
            return []

    def save_to_csv(self, reviews_data, filename="davids_bridal_reviews.csv"):
        """Save reviews to CSV with duplicate removal and a consistent column order."""
        if not reviews_data:
            logger.warning("No reviews data to save")
            return

        try:
            df = pd.DataFrame(reviews_data)

            # First pass: drop exact review-ID duplicates.
            initial_count = len(df)
            df = df.drop_duplicates(subset=['review_id'], keep='first')

            # Second pass: same reviewer, text, and date (catches rows whose
            # fallback IDs differ even though the review is the same).
            df = df.drop_duplicates(subset=['reviewer_name', 'review_text', 'date'], keep='first')

            final_count = len(df)
            if initial_count > final_count:
                logger.info(f"Removed {initial_count - final_count} duplicate reviews")

            column_order = ['reviewer_name', 'rating', 'date', 'review_text', 'owner_response', 'review_id', 'scraped_at']
            df = df.reindex(columns=column_order)

            df.to_csv(filename, index=False, encoding='utf-8')
            logger.info(f"Successfully saved {len(df)} unique reviews to {filename}")

            # Log simple rating statistics as a quick sanity check.
            if 'rating' in df.columns and len(df) > 0:
                avg_rating = df['rating'].mean()
                logger.info(f"Average rating: {avg_rating:.2f}")
                logger.info(f"Rating distribution:\n{df['rating'].value_counts().sort_index()}")

        except Exception as e:
            logger.error(f"Error saving to CSV: {e}")

    def scrape_reviews(self, location_query, output_file="davids_bridal_reviews.csv"):
        """Main method: search, open the reviews tab, scroll, extract, and save."""
        try:
            logger.info("Starting enhanced review scraping...")

            if not self.search_location(location_query):
                logger.error("Failed to find location")
                return None

            if not self.click_reviews_tab():
                logger.error("Failed to access reviews tab")
                return None

            # Target count for this location; adjust as needed per store.
            total_loaded = self.scroll_and_load_reviews(target_count=2394)

            if total_loaded == 0:
                logger.error("No reviews found after scrolling")
                return None

            reviews_data = self.extract_all_reviews_parallel()

            if reviews_data:
                self.save_to_csv(reviews_data, output_file)
                logger.info(f"Successfully scraped {len(reviews_data)} reviews")
                return reviews_data
            else:
                logger.warning("No reviews extracted")
                return None

        except Exception as e:
            logger.error(f"Error during scraping: {e}")
            return None
        finally:
            # Always release the browser, even on failure.
            self.close()

    def close(self):
        """Close the browser driver; safe to call more than once."""
        driver = getattr(self, 'driver', None)
        if driver is not None:
            try:
                driver.quit()
            finally:
                self.driver = None
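
    # Optional context-manager support (an addition beyond the original script)
    # so callers can rely on `with` to guarantee the driver is released:
    #     with EnhancedGoogleMapsReviewsScraper(headless=True) as scraper:
    #         scraper.scrape_reviews("David's Bridal Middletown NY")
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        # Do not suppress exceptions raised inside the with-block.
        return False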


def main():
    """Enhanced main function with better error handling."""
    try:
        # headless=False keeps the browser visible, which helps when debugging
        # selector changes on the Maps UI.
        scraper = EnhancedGoogleMapsReviewsScraper(
            headless=False,
            max_workers=3
        )

        search_query = "David's Bridal Middletown NY"

        logger.info(f"Starting scrape for: {search_query}")

        reviews = scraper.scrape_reviews(
            location_query=search_query,
            output_file="davids_bridal_middletown_reviews.csv"
        )

        if reviews:
            logger.info(f"Scraping completed successfully! Total reviews: {len(reviews)}")
        else:
            logger.error("Scraping failed - no reviews collected")

    except KeyboardInterrupt:
        logger.info("Scraping interrupted by user")
    except Exception as e:
        logger.error(f"Unexpected error in main: {e}")


if __name__ == "__main__":
    main()