#!/usr/bin/env python3
"""
Enhanced Google Maps Reviews Scraper for David's Bridal

Scrapes reviews from Google Maps with parallel processing and improved
element detection.
"""

import logging
import random
import re
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from queue import Queue

import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import (
    TimeoutException,
    NoSuchElementException,
    WebDriverException,
)
from webdriver_manager.chrome import ChromeDriverManager

# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


class EnhancedGoogleMapsReviewsScraper:
    def __init__(self, headless=True, wait_time=10, max_workers=3):
        """Initialize the scraper with Chrome driver options."""
        self.wait_time = wait_time
        self.max_workers = max_workers
        self.reviews_queue = Queue()
        self.processed_reviews = []
        self.lock = threading.Lock()
        self.setup_driver(headless)

    def setup_driver(self, headless):
        """Set up the Chrome driver with appropriate options."""
        try:
            chrome_options = Options()
            if headless:
                chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--disable-blink-features=AutomationControlled")
            chrome_options.add_argument("--disable-extensions")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--remote-debugging-port=9222")
            chrome_options.add_argument("--window-size=1920,1080")
            # Reduce obvious automation fingerprints
            chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
            chrome_options.add_experimental_option("useAutomationExtension", False)
            chrome_options.add_argument(
                "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            )

            logger.info("Setting up ChromeDriver...")
            service = Service(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=chrome_options)
            # Hide the webdriver flag that many bot detectors check
            self.driver.execute_script(
                "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
            )
            self.wait = WebDriverWait(self.driver, self.wait_time)
            logger.info("ChromeDriver setup successful")
        except WebDriverException as e:
            logger.error(f"Failed to set up ChromeDriver: {e}")
            sys.exit(1)

    def search_location(self, query):
        """Search for a David's Bridal location on Google Maps."""
        try:
            search_url = f"https://www.google.com/maps/search/{query.replace(' ', '+')}"
            logger.info(f"Navigating to: {search_url}")
            self.driver.get(search_url)

            # Wait for the page to load
            time.sleep(5)

            # Look for search results; kept as a list so more selectors can be added
            result_selectors = [
                "button.hh2c6.G7m0Af",  # result button for a location entry
            ]

            result_found = False
            for selector in result_selectors:
                try:
                    first_result = self.wait.until(
                        EC.element_to_be_clickable((By.CSS_SELECTOR, selector))
                    )
                    self.driver.execute_script("arguments[0].click();", first_result)
                    time.sleep(3)
                    result_found = True
                    break
                except TimeoutException:
                    continue

            return result_found
        except Exception as e:
            logger.error(f"Error in search_location: {e}")
            return False
    def click_reviews_tab(self):
        """Click the reviews tab using its specific element structure."""
        try:
            # Wait for the reviews tab to be clickable
            reviews_button = self.wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "button[data-tab-index='1'][aria-label*='Reviews']")
                )
            )

            # Scroll the button into view, then click via JavaScript to avoid
            # interception by overlays
            self.driver.execute_script(
                "arguments[0].scrollIntoView({block: 'center'});", reviews_button
            )
            time.sleep(1)
            self.driver.execute_script("arguments[0].click();", reviews_button)
            logger.info("Successfully clicked reviews tab")

            # Wait for reviews to load
            time.sleep(3)
            return True
        except Exception as e:
            logger.error(f"Could not click reviews tab: {e}")
            return False

    def expand_review_text(self, review_element):
        """Expand truncated review text by clicking its 'More' button, if present."""
        try:
            more_button = review_element.find_element(
                By.CSS_SELECTOR, "button.w8nwRe.kyuRq[aria-label='See more']"
            )
            # Scroll the button into view and click
            self.driver.execute_script(
                "arguments[0].scrollIntoView({block: 'center'});", more_button
            )
            time.sleep(0.5)
            self.driver.execute_script("arguments[0].click();", more_button)
            time.sleep(1)  # wait for the text to expand
            return True
        except NoSuchElementException:
            # No 'More' button found -- the review is already fully visible
            return False
        except Exception as e:
            logger.warning(f"Error expanding review text: {e}")
            return False

    def scroll_and_load_reviews(self, target_count=5000):
        """Scroll through the reviews panel until the target count is reached
        or no new reviews load."""
        try:
            scrollable_container = self.driver.find_element(
                By.CSS_SELECTOR, "div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde"
            )

            current_reviews = 0
            last_review_count = 0
            stagnant_rounds = 0
            max_stagnant_rounds = 5
            scroll_attempts = 0
            max_scroll_attempts = 1000  # increased max

            while scroll_attempts < max_scroll_attempts:
                # Scroll to the bottom of the reviews container
                self.driver.execute_script(
                    "arguments[0].scrollTo(0, arguments[0].scrollHeight);",
                    scrollable_container,
                )

                # Wait a randomized interval for content to load
                time.sleep(random.uniform(2, 4))

                # Count currently loaded reviews
                current_reviews = len(
                    self.driver.find_elements(By.CSS_SELECTOR, "div[data-review-id]")
                )
                logger.info(
                    f"Attempt {scroll_attempts + 1}: Loaded {current_reviews} reviews "
                    f"(target: {target_count})"
                )

                # Stop once the target is reached
                if current_reviews >= target_count:
                    logger.info("Reached target review count.")
                    break

                # Track rounds where no new reviews loaded
                if current_reviews == last_review_count:
                    stagnant_rounds += 1
                    logger.info(
                        f"No new reviews this round. Stagnant rounds: "
                        f"{stagnant_rounds}/{max_stagnant_rounds}"
                    )
                    if stagnant_rounds >= max_stagnant_rounds:
                        logger.info("No new reviews after several attempts. Stopping scroll.")
                        break
                else:
                    stagnant_rounds = 0  # reset when progress is made

                last_review_count = current_reviews
                scroll_attempts += 1

                # Occasionally pause longer to mimic human browsing
                if scroll_attempts % 10 == 0:
                    logger.info("Taking a longer pause to mimic human browsing...")
                    time.sleep(random.uniform(5, 8))

            logger.info(f"Finished scrolling. Total reviews found: {current_reviews}")
            return current_reviews
        except Exception as e:
            logger.error(f"Error scrolling reviews: {e}")
            return 0
    def extract_single_review_data(self, review_element):
        """Extract all fields from a single review element."""
        try:
            review_data = {}

            # Expand the review text first if a 'More' button is present
            self.expand_review_text(review_element)

            # Reviewer name
            try:
                name_element = review_element.find_element(
                    By.CSS_SELECTOR, "div[class*='d4r55']"
                )
                review_data['reviewer_name'] = name_element.text.strip()
            except NoSuchElementException:
                review_data['reviewer_name'] = "Anonymous"

            # Star rating, parsed from the aria-label
            try:
                rating_element = review_element.find_element(
                    By.CSS_SELECTOR, "span[role='img'][aria-label*='star']"
                )
                rating_text = rating_element.get_attribute('aria-label')
                review_data['rating'] = self.extract_rating_from_text(rating_text)
            except NoSuchElementException:
                review_data['rating'] = None

            # Review text
            try:
                text_element = review_element.find_element(By.CSS_SELECTOR, "span.wiI7pd")
                review_data['review_text'] = text_element.text.strip()
            except NoSuchElementException:
                review_data['review_text'] = ""

            # Relative date (e.g., "2 months ago")
            try:
                date_element = review_element.find_element(By.CSS_SELECTOR, "span.rsqaWe")
                review_data['date'] = date_element.text.strip()
            except NoSuchElementException:
                review_data['date'] = ""

            # Owner response, if any. NOTE: this selector overlaps with the
            # review-text class and may need refining against the live markup.
            try:
                response_element = review_element.find_element(
                    By.CSS_SELECTOR, "div[class*='wiI7pd']"
                )
                review_data['owner_response'] = response_element.text.strip()
            except NoSuchElementException:
                review_data['owner_response'] = ""

            # Metadata
            review_data['scraped_at'] = datetime.now().isoformat()
            review_data['review_id'] = (
                review_element.get_attribute('data-review-id')
                or f"review_{int(time.time() * 1000)}"
            )

            return review_data
        except Exception as e:
            logger.error(f"Error extracting single review: {e}")
            return None

    def extract_rating_from_text(self, text):
        """Extract a numeric rating from aria-label text."""
        if not text:
            return None

        # Look for patterns like "5 stars" or "Rated 4 out of 5 stars"
        match = re.search(r'(\d+)\s*(?:out of \d+\s*)?stars?', text.lower())
        if match:
            return int(match.group(1))

        # Fallback: count star characters
        star_count = text.count('★') or text.count('⭐')
        if star_count > 0:
            return star_count

        return None

    def process_reviews_batch(self, review_elements, start_idx, end_idx):
        """Process a contiguous batch of reviews (retained for batched use)."""
        batch_results = []
        for i in range(start_idx, min(end_idx, len(review_elements))):
            try:
                review_data = self.extract_single_review_data(review_elements[i])
                if review_data:
                    batch_results.append(review_data)
                    logger.info(f"Processed review {i + 1}/{len(review_elements)}")
            except Exception as e:
                logger.warning(f"Error processing review {i + 1}: {e}")
                continue
        return batch_results
    def extract_all_reviews_parallel(self):
        """Extract all loaded reviews, skipping duplicates by review ID."""
        try:
            # Collect all review elements with a single, specific selector
            review_elements = self.driver.find_elements(
                By.CSS_SELECTOR, "div[data-review-id]"
            )
            total_reviews = len(review_elements)
            logger.info(f"Found {total_reviews} review elements to process")

            if total_reviews == 0:
                return []

            # Track processed review IDs to avoid duplicates
            processed_review_ids = set()
            all_reviews = []

            # Process reviews sequentially for reliable duplicate control
            # (a single WebDriver session is not safe to share across threads)
            for i, review_element in enumerate(review_elements):
                try:
                    # Check the review ID first to skip duplicates cheaply
                    review_id = review_element.get_attribute('data-review-id')
                    if review_id and review_id in processed_review_ids:
                        logger.debug(f"Skipping duplicate review ID: {review_id}")
                        continue

                    review_data = self.extract_single_review_data(review_element)
                    if review_data and review_data.get('review_id'):
                        processed_review_ids.add(review_data['review_id'])
                        all_reviews.append(review_data)
                        logger.info(f"Processed review {len(all_reviews)}/{total_reviews}")
                except Exception as e:
                    logger.warning(f"Error processing review {i + 1}: {e}")
                    continue

            logger.info(f"Successfully extracted {len(all_reviews)} unique reviews")
            return all_reviews
        except Exception as e:
            logger.error(f"Error in review extraction: {e}")
            return []

    def save_to_csv(self, reviews_data, filename="davids_bridal_reviews.csv"):
        """Save reviews to CSV with duplicate removal and consistent formatting."""
        if not reviews_data:
            logger.warning("No reviews data to save")
            return

        try:
            df = pd.DataFrame(reviews_data)

            # Remove duplicates by review_id, then by (name, text, date) as a backup
            initial_count = len(df)
            df = df.drop_duplicates(subset=['review_id'], keep='first')
            df = df.drop_duplicates(
                subset=['reviewer_name', 'review_text', 'date'], keep='first'
            )
            final_count = len(df)

            if initial_count > final_count:
                logger.info(f"Removed {initial_count - final_count} duplicate reviews")

            # Reorder columns for readability
            column_order = [
                'reviewer_name', 'rating', 'date', 'review_text',
                'owner_response', 'review_id', 'scraped_at',
            ]
            df = df.reindex(columns=column_order)

            # Save with UTF-8 encoding to preserve non-ASCII characters
            df.to_csv(filename, index=False, encoding='utf-8')
            logger.info(f"Successfully saved {len(df)} unique reviews to {filename}")

            # Print summary statistics
            if 'rating' in df.columns and len(df) > 0:
                avg_rating = df['rating'].mean()
                logger.info(f"Average rating: {avg_rating:.2f}")
                logger.info(
                    f"Rating distribution:\n{df['rating'].value_counts().sort_index()}"
                )
        except Exception as e:
            logger.error(f"Error saving to CSV: {e}")

    def scrape_reviews(self, location_query, output_file="davids_bridal_reviews.csv"):
        """Run the full scrape: search, open reviews, scroll, extract, and save."""
        try:
            logger.info("Starting enhanced review scraping...")

            # Search for the location
            if not self.search_location(location_query):
                logger.error("Failed to find location")
                return None

            # Open the reviews tab
            if not self.click_reviews_tab():
                logger.error("Failed to access reviews tab")
                return None

            # Scroll to load reviews (target tuned to this location's review count)
            total_loaded = self.scroll_and_load_reviews(target_count=2394)
            if total_loaded == 0:
                logger.error("No reviews found after scrolling")
                return None

            # Extract all loaded reviews
            reviews_data = self.extract_all_reviews_parallel()

            # Save to CSV
            if reviews_data:
                self.save_to_csv(reviews_data, output_file)
                logger.info(f"Successfully scraped {len(reviews_data)} reviews")
                return reviews_data

            logger.warning("No reviews extracted")
            return None
        except Exception as e:
            logger.error(f"Error during scraping: {e}")
            return None
        finally:
            self.close()

    def close(self):
        """Close the browser driver."""
        if hasattr(self, 'driver'):
            self.driver.quit()
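
# The module imports ThreadPoolExecutor/as_completed and exposes
# process_reviews_batch, but extraction above is deliberately sequential
# because a single WebDriver session is not thread-safe. Below is a minimal
# sketch (not part of the original workflow) of how parallelism could be
# applied safely: one scraper, and thus one browser, per worker, spread
# across several locations. The function name and output-file naming are
# illustrative assumptions.
def scrape_locations_parallel(queries, max_workers=2):
    """Illustrative sketch: scrape several locations concurrently, one driver each."""

    def scrape_one(query):
        # Each worker gets its own browser, so no WebDriver calls are shared
        scraper = EnhancedGoogleMapsReviewsScraper(headless=True)
        # Hypothetical naming scheme: derive the CSV name from the query
        output_file = query.lower().replace(' ', '_').replace("'", "") + "_reviews.csv"
        # scrape_reviews() closes its driver in a finally block
        return scraper.scrape_reviews(query, output_file=output_file)

    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(scrape_one, q): q for q in queries}
        for future in as_completed(futures):
            query = futures[future]
            try:
                results[query] = future.result() or []
                logger.info(f"{query}: {len(results[query])} reviews scraped")
            except Exception as e:
                logger.error(f"{query}: worker failed: {e}")
                results[query] = []
    return results
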
def main():
    """Enhanced main function with better error handling."""
    try:
        # Initialize the scraper
        scraper = EnhancedGoogleMapsReviewsScraper(
            headless=False,  # set to True for background operation
            max_workers=3,   # adjust based on your system
        )

        # Define the search query
        search_query = "David's Bridal Middletown NY"
        logger.info(f"Starting scrape for: {search_query}")

        # Scrape reviews
        reviews = scraper.scrape_reviews(
            location_query=search_query,
            output_file="davids_bridal_middletown_reviews.csv",
        )

        if reviews:
            logger.info(f"Scraping completed successfully! Total reviews: {len(reviews)}")
        else:
            logger.error("Scraping failed - no reviews collected")
    except KeyboardInterrupt:
        logger.info("Scraping interrupted by user")
    except Exception as e:
        logger.error(f"Unexpected error in main: {e}")


if __name__ == "__main__":
    main()
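
# A quick way to sanity-check the output after a run (illustrative snippet;
# the filename matches the default used in main() above):
#
#   import pandas as pd
#   df = pd.read_csv("davids_bridal_middletown_reviews.csv")
#   print(df.shape)                     # (rows, columns)
#   print(df["rating"].value_counts())  # distribution of star ratings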