'''Helper functions for MCP tools.'''

import re
import logging
import urllib.request
from urllib.error import HTTPError, URLError

import feedparser
from boilerpy3 import extractors
from boilerpy3.exceptions import HTMLExtractionError
from findfeed import search as feed_search
from googlesearch import search as google_search

FEED_URIS = {}
RSS_EXTENSIONS = ['xml', 'rss', 'atom']
COMMON_EXTENSIONS = ['com', 'net', 'org', 'edu', 'gov', 'co', 'us']


def find_feed_uri(website: str) -> str:
    '''Attempts to find the URI of an RSS feed. First checks if the string
    provided in website is already a feed URI; if it's not, checks whether
    website is a URL and, if so, uses that to find the RSS feed URI. If the
    provided string is neither, falls back to a Google search to find the
    website URL and then uses that to look for the feed.

    Args:
        website: target resource to find an RSS feed URI for; can be a
          website URL or the name of a website

    Returns:
        RSS feed URI for the website
    '''
    logger = logging.getLogger(__name__ + '.find_feed_uri')
    logger.info('Finding feed URI for %s', website)

    feed_uri = None

    # If the website contains xml, rss or atom, assume it's already a feed URI
    if any(extension in website.lower() for extension in RSS_EXTENSIONS):
        feed_uri = website
        logger.info('%s looks like a feed URI already - using it directly', website)

    # Next, check the cache to see if we already have this feed's URI
    elif website in FEED_URIS:
        feed_uri = FEED_URIS[website]
        logger.info('%s feed URI in cache: %s', website, feed_uri)

    # If neither of those gets it, treat the string as a URL when it ends in a
    # common extension, otherwise google the site name to find its URL first
    else:
        if website.split('.')[-1] in COMMON_EXTENSIONS:
            website_url = website
            logger.info('%s looks like a website URL', website)
        else:
            website_url = _get_url(website)
            logger.info('Google result for %s: %s', website, website_url)

        feed_uri = _get_feed(website_url)
        logger.info('_get_feed() returned %s', feed_uri)

        FEED_URIS[website] = feed_uri

    return feed_uri
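
# Hypothetical usage sketch (illustrative values only, not live results):
#   find_feed_uri('https://hnrss.org/frontpage')  # contains 'rss' -> used as-is
#   find_feed_uri('hackernews')                   # bare name -> googled first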


def parse_feed(feed_uri: str) -> dict:
    '''Gets content from a remote RSS feed URI.

    Args:
        feed_uri: The RSS feed to get content from

    Returns:
        Dictionary keyed by entry index containing the title, link and
        content of each of the three most recent entries in the RSS feed.
    '''
    logger = logging.getLogger(__name__ + '.parse_feed')

    feed = feedparser.parse(feed_uri)
    logger.info('%s yielded %s entries', feed_uri, len(feed.entries))

    entries = {}

    for i, entry in enumerate(feed.entries):
        entry_content = {}

        if 'title' in entry and 'link' in entry:
            entry_content['title'] = entry.title
            entry_content['link'] = entry.link
            entry_content['content'] = None

            # feedparser exposes entry content as a list of content objects;
            # take the text of the first one if the feed provides it
            if 'content' in entry:
                entry_content['content'] = _get_text(entry.content[0].value)

            # Otherwise, fall back to scraping the linked page
            if entry_content['content'] is None:
                html = _get_html(entry_content['link'])
                entry_content['content'] = _get_text(html)

            entries[i] = entry_content

        # Only take the three most recent entries
        if i == 2:
            break

    logger.info('Entries contains %s elements', len(entries))

    return entries
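
# Returned shape (illustrative; values depend on the feed):
#   {0: {'title': ..., 'link': ..., 'content': ...}, 1: {...}, 2: {...}}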


def _get_url(company_name: str) -> str:
    '''Finds the website associated with the name of a company or
    publication.

    Args:
        company_name: the name of the company, publication or site to find
          the URL for

    Returns:
        The URL for the company, publication or website, or None if no
        suitable result is found.
    '''

    logger = logging.getLogger(__name__ + '._get_url')
    logger.info('Getting website URL for %s', company_name)

    query = f'{company_name} official website'

    # Take the first result that is not a social media profile
    for url in google_search(query, num_results=5):
        if 'facebook' not in url and 'linkedin' not in url:
            return url

    return None


def _get_feed(website_url: str) -> str:
    '''Finds the RSS feed URI for a website given the website's URL.

    Args:
        website_url: The URL of the website to find the RSS feed for

    Returns:
        The website's RSS feed URI as a string
    '''

    logger = logging.getLogger(__name__ + '._get_feed')
    logger.info('Getting feed URI for: %s', website_url)

    feeds = feed_search(website_url)

    if len(feeds) > 0:
        return str(feeds[0].url)

    return f'No feed found for {website_url}'


def _get_html(url: str) -> str:
    '''Gets HTML string content from url.

    Args:
        url: the webpage to extract content from

    Returns:
        Webpage HTML source as a string, or None if the request fails
    '''

    # Browser-like headers; some sites refuse requests that arrive with
    # urllib's default user agent
    header = {
        "Accept": ("text/html,application/xhtml+xml,application/xml;q=0.9,image/avif," +
            "image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"),
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
        "Sec-Fetch-Site": "cross-site",
        "Sec-Fetch-User": "?1",
        "Upgrade-Insecure-Requests": "1",
        "User-Agent": ("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " +
            "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36")
    }

    # Create the request with header
    request_params = urllib.request.Request(
        url=url,
        headers=header
    )

    # Get the html string
    try:
        with urllib.request.urlopen(request_params) as response:
            status_code = response.getcode()

            if status_code == 200:
                content = response.read()
                encoding = response.headers.get_content_charset()

                if encoding is None:
                    encoding = "utf-8"

                content = content.decode(encoding)

            else:
                content = None

    except (HTTPError, URLError):
        content = None

    return content


def _get_text(html: str) -> str:
    '''Uses the boilerpy3 extractor and regex cribbed from the old NLTK
    clean_html function to extract text from HTML as cleanly as possible.

    Args:
        html: the HTML string to be cleaned

    Returns:
        Cleaned text string
    '''

    if html is None:
        return None

    extractor = extractors.ArticleExtractor()

    # Fall back to plain regex cleaning if boilerpy3 can't handle the input
    try:
        html = extractor.get_content(html)

    except (HTMLExtractionError, AttributeError, TypeError):
        pass

    return _clean_html(html)


def _clean_html(html: str) -> str:
    '''
    Remove HTML markup from the given string.

    Args:
        html: the HTML string to be cleaned

    Returns:
        Cleaned string
    '''

    if html is None:
        return None

    # First we remove inline JavaScript/CSS:
    cleaned = re.sub(r"(?is)<(script|style).*?>.*?(</\1>)", "", html.strip())

    # Then we remove html comments. This has to be done before removing regular
    # tags since comments can contain '>' characters.
    cleaned = re.sub(r"(?s)<!--(.*?)-->[\n]?", "", cleaned)

    # Next we can remove the remaining tags:
    cleaned = re.sub(r"(?s)<.*?>", " ", cleaned)

    # Finally, we deal with whitespace: replace non-breaking spaces, then
    # collapse double spaces (applied twice, as in NLTK's clean_html)
    cleaned = re.sub(r"&nbsp;", " ", cleaned)
    cleaned = re.sub(r"  ", " ", cleaned)
    cleaned = re.sub(r"  ", " ", cleaned)

    return cleaned.strip()
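

if __name__ == '__main__':

    # Minimal smoke-test sketch, assuming network access is available;
    # 'hackernews' is an arbitrary example input, not a value the module
    # depends on
    logging.basicConfig(level=logging.INFO)

    uri = find_feed_uri('hackernews')
    print(f'Feed URI: {uri}')

    for index, item in parse_feed(uri).items():
        print(f"{index}: {item['title']} ({item['link']})")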