# NOTE(review): the lines that were here ("Spaces:", runtime-error status,
# file size, commit hash, and a line-number gutter) were file-viewer scrape
# residue, not Python source; converted to this comment so the module parses.
import time
from typing import Dict, List
from bs4 import BeautifulSoup
from bs4.element import Tag
from loguru import logger
from selenium.webdriver.common.by import By
from llm_engineering.domain.documents import PostDocument
from llm_engineering.domain.exceptions import ImproperlyConfigured
from llm_engineering.settings import settings
from .base import BaseSeleniumCrawler
class LinkedInCrawler(BaseSeleniumCrawler):
    """Crawl a LinkedIn profile with Selenium and persist its posts.

    The crawler logs in, scrolls the profile's activity feed, pairs each
    post's text with its image (when present), and bulk-inserts the results
    as ``PostDocument`` records.

    NOTE: LinkedIn has hardened its anti-scraping measures, so the class
    ships deprecated (``is_deprecated=True`` by default): ``login()`` and
    ``extract()`` raise ``DeprecationWarning`` unless explicitly re-enabled.
    """

    # Document model used for both the duplicate check and the bulk insert.
    model = PostDocument

    def __init__(self, scroll_limit: int = 5, is_deprecated: bool = True) -> None:
        super().__init__(scroll_limit)
        self._is_deprecated = is_deprecated

    def set_extra_driver_options(self, options) -> None:
        # Keep the browser window alive after the driving script exits.
        options.add_experimental_option("detach", True)

    def login(self) -> None:
        """Log in to LinkedIn using the credentials from ``settings``.

        Raises:
            DeprecationWarning: always, while the crawler is deprecated.
            ImproperlyConfigured: if either credential setting is missing.
        """
        if self._is_deprecated:
            raise DeprecationWarning(
                "As LinkedIn has updated its security measures, the login() method is no longer supported."
            )

        self.driver.get("https://www.linkedin.com/login")
        if not settings.LINKEDIN_USERNAME or not settings.LINKEDIN_PASSWORD:
            raise ImproperlyConfigured(
                "LinkedIn scraper requires the {LINKEDIN_USERNAME} and {LINKEDIN_PASSWORD} settings."
            )

        self.driver.find_element(By.ID, "username").send_keys(settings.LINKEDIN_USERNAME)
        self.driver.find_element(By.ID, "password").send_keys(settings.LINKEDIN_PASSWORD)
        self.driver.find_element(By.CSS_SELECTOR, ".login__form_action_container button").click()

    def extract(self, link: str, **kwargs) -> None:
        """Scrape the profile at ``link`` and store its posts.

        Args:
            link: URL of the LinkedIn profile to scrape.
            **kwargs: must contain ``user`` — the author the stored
                posts are attributed to (needs ``id`` and ``full_name``).

        Raises:
            DeprecationWarning: always, while the crawler is deprecated.
        """
        if self._is_deprecated:
            raise DeprecationWarning(
                "As LinkedIn has updated its feed structure, the extract() method is no longer supported."
            )

        # BUG FIX: the previous guard read ``self.model.link`` — an attribute
        # lookup on the document *class*, which yields the field descriptor
        # (or raises AttributeError), never a stored value — so the dedup
        # check was broken. Always run the lookup instead.
        old_model = self.model.find(link=link)
        if old_model is not None:
            logger.info(f"Post already exists in the database: {link}")
            return

        logger.info(f"Starting scrapping data for profile: {link}")
        self.login()

        soup = self._get_page_content(link)

        # Evaluated for its side effects (each helper navigates the driver);
        # the dict itself is currently unused downstream — kept for parity.
        data = {  # noqa
            "Name": self._scrape_section(soup, "h1", class_="text-heading-xlarge"),
            "About": self._scrape_section(soup, "div", class_="display-flex ph5 pv3"),
            "Main Page": self._scrape_section(soup, "div", {"id": "main-content"}),
            "Experience": self._scrape_experience(link),
            "Education": self._scrape_education(link),
        }

        self.driver.get(link)
        time.sleep(5)  # crude wait for client-side rendering
        button = self.driver.find_element(
            By.CSS_SELECTOR, ".app-aware-link.profile-creator-shared-content-view__footer-action"
        )
        button.click()

        # Scroll the activity feed, then scrape post texts and images.
        self.scroll_page()
        soup = BeautifulSoup(self.driver.page_source, "html.parser")
        post_elements = soup.find_all(
            "div",
            class_="update-components-text relative update-components-update-v2__commentary",
        )
        buttons = soup.find_all("button", class_="update-components-image__image-link")
        post_images = self._extract_image_urls(buttons)

        posts = self._extract_posts(post_elements, post_images)
        logger.info(f"Found {len(posts)} posts for profile: {link}")

        # NOTE(review): close() only closes the current window; the driver
        # process itself is presumably quit elsewhere (base class) — confirm.
        self.driver.close()

        user = kwargs["user"]
        self.model.bulk_insert(
            [
                PostDocument(platform="linkedin", content=post, author_id=user.id, author_full_name=user.full_name)
                for post in posts
            ]
        )

        logger.info(f"Finished scrapping data for profile: {link}")

    def _scrape_section(self, soup: BeautifulSoup, *args, **kwargs) -> str:
        """Return the stripped text of the first matching tag, or "" if absent.

        ``*args``/``**kwargs`` are forwarded verbatim to ``soup.find``.
        """
        parent_div = soup.find(*args, **kwargs)
        return parent_div.get_text(strip=True) if parent_div else ""

    def _extract_image_urls(self, buttons: List[Tag]) -> Dict[str, str]:
        """Extract image URLs from post-image button elements.

        Args:
            buttons: BeautifulSoup ``button`` tags wrapping post images.

        Returns:
            Mapping of ``"Post_<i>"`` (button index) to the image ``src`` URL;
            buttons without an ``<img src=...>`` are skipped with a warning.
        """
        post_images = {}
        for i, button in enumerate(buttons):
            img_tag = button.find("img")
            if img_tag and "src" in img_tag.attrs:
                post_images[f"Post_{i}"] = img_tag["src"]
            else:
                logger.warning("No image found in this button")
        return post_images

    def _get_page_content(self, url: str) -> BeautifulSoup:
        """Navigate the driver to ``url`` and return the parsed page source."""
        self.driver.get(url)
        time.sleep(5)  # crude wait for client-side rendering
        return BeautifulSoup(self.driver.page_source, "html.parser")

    def _extract_posts(self, post_elements: List[Tag], post_images: Dict[str, str]) -> Dict[str, Dict[str, str]]:
        """Combine post texts with their matching images.

        Args:
            post_elements: tags holding each post's commentary text.
            post_images: image URLs keyed by ``"Post_<i>"``; indexes are
                assumed to line up with ``post_elements`` — TODO confirm.

        Returns:
            ``{"Post_<i>": {"text": ..., "image": ...}}`` where the
            ``"image"`` key is present only when a matching image exists.
        """
        posts_data = {}
        for i, post_element in enumerate(post_elements):
            post_text = post_element.get_text(strip=True, separator="\n")
            post_data = {"text": post_text}
            if f"Post_{i}" in post_images:
                post_data["image"] = post_images[f"Post_{i}"]
            posts_data[f"Post_{i}"] = post_data
        return posts_data

    def _scrape_experience(self, profile_url: str) -> str:
        """Scrape the Experience details page; "" when the section is absent."""
        self.driver.get(profile_url + "/details/experience/")
        time.sleep(5)  # crude wait for client-side rendering
        soup = BeautifulSoup(self.driver.page_source, "html.parser")
        experience_content = soup.find("section", {"id": "experience-section"})
        return experience_content.get_text(strip=True) if experience_content else ""

    def _scrape_education(self, profile_url: str) -> str:
        """Scrape the Education details page; "" when the section is absent."""
        self.driver.get(profile_url + "/details/education/")
        time.sleep(5)  # crude wait for client-side rendering
        soup = BeautifulSoup(self.driver.page_source, "html.parser")
        education_content = soup.find("section", {"id": "education-section"})
        return education_content.get_text(strip=True) if education_content else ""