'''Helper functions for MCP tools.'''

import os
import re
import logging
import urllib.request
from urllib.error import HTTPError, URLError

import feedparser
from boilerpy3 import extractors
from boilerpy3.exceptions import HTMLExtractionError
from findfeed import search as feed_search
from googlesearch import search as google_search
from upstash_redis import Redis

FEED_URIS = {}
RSS_EXTENSIONS = ['xml', 'rss', 'atom']
COMMON_EXTENSIONS = ['com', 'net', 'org', 'edu', 'gov', 'co', 'us']

REDIS = Redis(
    url='https://sensible-midge-19304.upstash.io',
    token=os.environ['UPSTASH_REDIS_KEY']
)


def find_feed_uri(website: str) -> str:
    '''Attempts to find URI for RSS feed. First checks if the string provided
    in website is a feed URI; if it's not, checks if website is a URL and, if
    so, uses that to find the RSS feed URI. If the provided string is neither,
    defaults to a Google search to find the website URL and then uses that to
    try and find the feed.

    Args:
        website: target resource to find RSS feed URI for, can be website URL
            or name of website

    Returns:
        RSS feed URI for website
    '''

    logger = logging.getLogger(__name__ + '.find_feed_uri')
    logger.info('Finding feed URI for %s', website)

    feed_uri = None

    # If the website contains xml, rss or atom, assume it's already a feed URI
    if any(extension in website.lower() for extension in RSS_EXTENSIONS):
        feed_uri = website
        logger.info('%s looks like a feed URI already - using it directly', website)

    # Next, check the local cache to see if we already have this feed's URI
    elif website in FEED_URIS:
        feed_uri = FEED_URIS[website]
        logger.info('%s feed URI in local cache: %s', website, feed_uri)

    # If we still haven't found it, check the Redis cache
    cache_key = f'{website} feed uri'
    cache_hit = False

    if feed_uri is None:
        cached_uri = REDIS.get(cache_key)

        if cached_uri:
            cache_hit = True
            feed_uri = cached_uri
            logger.info('%s feed URI in Redis cache: %s', website, feed_uri)

    # If none of those methods found it, use the string directly if it looks
    # like a URL, otherwise Google the website name to get a URL
    if feed_uri is None:
        if website.split('.')[-1] in COMMON_EXTENSIONS:
            website_url = website
            logger.info('%s looks like a website URL', website)

        else:
            website_url = _get_url(website)
            logger.info('Google result for %s: %s', website, website_url)

        feed_uri = _get_feed(website_url)
        logger.info('_get_feed() returned %s', feed_uri)

        # Add to local cache
        FEED_URIS[website] = feed_uri

    # Add the feed URI to the Redis cache if it wasn't already there
    if cache_hit is False:
        REDIS.set(cache_key, feed_uri)

    return feed_uri
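
# Illustrative calls for find_feed_uri (the inputs below are examples, not
# values taken from this project):
#   find_feed_uri('https://news.ycombinator.com/rss')  # contains 'rss', returned directly
#   find_feed_uri('example.com')                       # ends in a common TLD, searched with findfeed
#   find_feed_uri('Example News')                      # resolved to a URL via Google search first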


def parse_feed(feed_uri: str, n: int) -> list:
    '''Gets content from a remote RSS feed URI.

    Args:
        feed_uri: The RSS feed to get content from
        n: the number of feed entries to parse

    Returns:
        List of dictionaries for the n most recent entries in the RSS feed.
        Each dictionary contains 'title', 'link' and 'content' keys.
    '''

    logger = logging.getLogger(__name__ + '.parse_feed')

    feed = feedparser.parse(feed_uri)
    logger.info('%s yielded %s entries', feed_uri, len(feed.entries))

    entries = []

    for i, entry in enumerate(feed.entries):
        entry_content = {}

        if 'title' in entry and 'link' in entry:
            title = entry.title
            entry_content['title'] = title

            # Check the Redis cache
            cached_link = REDIS.get(f'{title} link')

            if cached_link:
                logger.info('Entry in Redis cache: "%s"', title)
                entry_content['link'] = cached_link
                entry_content['content'] = REDIS.get(f'{title} content')

            # If it's not in the Redis cache, parse it from the feed data
            else:
                entry_content['link'] = entry.link
                entry_content['content'] = None

                # Grab the article content from the feed, if provided;
                # feedparser exposes content as a list of dicts with a 'value' key
                if 'content' in entry:
                    entry_content['content'] = entry.content[0].value

                # If not, try to get the article content from the link
                elif entry_content['content'] is None:
                    html = _get_html(entry_content['link'])
                    content = _get_text(html)
                    entry_content['content'] = content

                # Add everything to the cache
                REDIS.set(f'{title} link', entry_content['link'])
                REDIS.set(f'{title} content', entry_content['content'])

            logger.info('Parsed entry: "%s"', title)
            entries.append(entry_content)

        if i == n - 1:
            break

    logger.info('Entries contains %s elements', len(entries))

    return entries
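
# Sketch of what parse_feed returns for n=1 (the field values are invented
# for illustration only):
#   [{'title': 'Some headline',
#     'link': 'https://example.com/post',
#     'content': 'Extracted article text...'}]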


def _get_url(company_name: str) -> str:
    '''Finds the website associated with the name of a company or
    publication.

    Args:
        company_name: the name of the company, publication or site to find
            the URL for

    Returns:
        The URL for the company, publication or website.
    '''

    logger = logging.getLogger(__name__ + '.get_url')
    logger.info('Getting website URL for %s', company_name)

    query = f'{company_name} official website'

    for url in google_search(query, num_results=5):
        if 'facebook' not in url and 'linkedin' not in url:
            return url

    return None


def _get_feed(website_url: str) -> str:
    '''Finds the RSS feed URI for a website given the website's URL.

    Args:
        website_url: The URL for the website to find the RSS feed for

    Returns:
        The website's RSS feed URI as a string
    '''

    logger = logging.getLogger(__name__ + '.get_feed')
    logger.info('Getting feed URI for: %s', website_url)

    feeds = feed_search(website_url)

    if len(feeds) > 0:
        return str(feeds[0].url)

    return f'No feed found for {website_url}'


def _get_html(url: str) -> str:
    '''Gets HTML string content from url

    Args:
        url: the webpage to extract content from

    Returns:
        Webpage HTML source as string
    '''

    header = {
        "Accept": ("text/html,application/xhtml+xml,application/xml;q=0.9,image/avif," +
            "image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"),
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
        "Sec-Fetch-Site": "cross-site",
        "Sec-Fetch-User": "?1",
        "Upgrade-Insecure-Requests": "1",
        "User-Agent": ("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " +
            "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36")
    }

    # Create the request with header
    request_params = urllib.request.Request(
        url=url,
        headers=header
    )

    # Get the html string
    try:
        with urllib.request.urlopen(request_params) as response:
            status_code = response.getcode()

            if status_code == 200:
                content = response.read()
                encoding = response.headers.get_content_charset()

                if encoding is None:
                    encoding = "utf-8"

                content = content.decode(encoding)

            else:
                content = None

    except (HTTPError, URLError):
        content = None

    return content


def _get_text(html: str) -> str:
    '''Uses boilerpy3 extractor and regex cribbed from old NLTK clean_html
    function to try and extract text from HTML as cleanly as possible.

    Args:
        html: the HTML string to be cleaned

    Returns:
        Cleaned text string
    '''

    if html is None:
        return None

    extractor = extractors.ArticleExtractor()

    try:
        html = extractor.get_content(html)

    except (HTMLExtractionError, AttributeError, TypeError):
        pass

    return _clean_html(html)


def _clean_html(html: str) -> str:
    '''
    Remove HTML markup from the given string.

    Args:
        html: the HTML string to be cleaned

    Returns:
        Cleaned string
    '''

    if html is None:
        return None

    # First we remove inline JavaScript/CSS:
    cleaned = re.sub(r"(?is)<(script|style).*?>.*?(</\1>)", "", html.strip())

    # Then we remove html comments. This has to be done before removing regular
    # tags since comments can contain '>' characters.
    cleaned = re.sub(r"(?s)<!--(.*?)-->[\n]?", "", cleaned)

    # Next we can remove the remaining tags:
    cleaned = re.sub(r"(?s)<.*?>", " ", cleaned)

    # Finally, we deal with whitespace: replace non-breaking space entities,
    # then collapse runs of spaces
    cleaned = re.sub(r"&nbsp;", " ", cleaned)
    cleaned = re.sub(r"  ", " ", cleaned)
    cleaned = re.sub(r"  ", " ", cleaned)

    return cleaned.strip()
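

if __name__ == '__main__':

    # Minimal smoke-test sketch (not part of the original module): resolves a
    # feed URI and prints the titles of the three most recent entries. The
    # feed URL below is just an example; running this requires the
    # UPSTASH_REDIS_KEY environment variable, since the Redis client is
    # created at import time.
    logging.basicConfig(level=logging.INFO)

    uri = find_feed_uri('https://news.ycombinator.com/rss')
    print(f'Feed URI: {uri}')

    for item in parse_feed(uri, 3):
        print(item['title'])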