'''Helper functions for MCP tools.'''

import os
import re
import json
import logging
import urllib.request
from itertools import islice
from typing import Optional
from urllib.error import HTTPError, URLError

import feedparser
from boilerpy3 import extractors
from boilerpy3.exceptions import HTMLExtractionError
from findfeed import search as feed_search
from googlesearch import search as google_search
from upstash_redis import Redis

# In-process cache mapping the string passed to find_feed_uri() to its feed URI
FEED_URIS = {}

# Substrings that mark a string as "already a feed URI"
RSS_EXTENSIONS = ['xml', 'rss', 'atom']

# Trailing domain labels that mark a string as "already a website URL"
COMMON_EXTENSIONS = ['com', 'net', 'org', 'edu', 'gov', 'co', 'us']

# Prefix of the message _get_feed() returns when no feed is found. Shared so
# find_feed_uri() can recognise a failed lookup and avoid caching it.
_NO_FEED_PREFIX = 'No feed found for'

# Patterns used by _clean_html(), compiled once at import time
_SCRIPT_STYLE_RE = re.compile(r"(?is)<(script|style).*?>.*?(</\1>)")
_COMMENT_RE = re.compile(r"(?s)<!--(.*?)-->[\n]?")
_TAG_RE = re.compile(r"(?s)<.*?>")
_MULTI_SPACE_RE = re.compile(r" {2,}")

REDIS = Redis(
    url='https://sensible-midge-19304.upstash.io',
    token=os.environ['UPSTASH_REDIS_KEY']
)


def find_feed_uri(website: str) -> str:
    '''Attempts to find URI for RSS feed. First checks if string provided in
    website is a feed URI, if it's not, checks the local and Redis caches,
    then checks if website is a URL, if so, uses that to find the RSS feed
    URI. If the provided string is neither, defaults to Google search to find
    website URL and then uses that to try and find the feed.

    Only successfully discovered feed URIs are written to the caches, so a
    failed lookup is retried on the next call instead of being remembered.

    Args:
        website: target resource to find RSS feed URI for, can be website URL
            or name of website

    Returns:
        RSS feed URI for website, or a 'No feed found for ...' message when
        no feed could be located
    '''

    logger = logging.getLogger(__name__ + '.find_feed_uri')
    logger.info('Finding feed URI for %s', website)

    # If the website contains xml, rss or atom, assume it's an RSS URI.
    # NOTE(review): this is a substring test, so names that merely contain
    # one of these fragments are also treated as feed URIs.
    if any(extension in website.lower() for extension in RSS_EXTENSIONS):
        logger.info('%s looks like a feed URI already - using it directly', website)
        return website

    # Next, check the cache to see if we already have this feed's URI locally
    if website in FEED_URIS:
        feed_uri = FEED_URIS[website]
        logger.info('%s feed URI in local cache: %s', website, feed_uri)
        return feed_uri

    # Then, check to see if the URI is in the Redis cache. A cached failure
    # message is treated as a miss so the lookup gets retried.
    cache_key = f"{website.lower().replace(' ', '_')}-feed-uri"
    cached_uri = REDIS.get(cache_key)

    if cached_uri and not str(cached_uri).startswith(_NO_FEED_PREFIX):
        logger.info('%s feed URI in Redis cache: %s', website, cached_uri)

        # Remember it locally to skip the Redis round trip next time
        FEED_URIS[website] = cached_uri
        return cached_uri

    # If none of those get it - search the site directly if it looks like a
    # url, or else just google it
    if website.split('.')[-1] in COMMON_EXTENSIONS:
        website_url = website
        logger.info('%s looks like a website URL', website)

    else:
        website_url = _get_url(website)
        logger.info('Google result for %s: %s', website, website_url)

    # The search can come back empty - there is nothing to look a feed up on
    if website_url is None:
        return f'{_NO_FEED_PREFIX} {website}'

    feed_uri = _get_feed(website_url)
    logger.info('get_feed() returned %s', feed_uri)

    # Only cache real feed URIs, never the failure message
    if not feed_uri.startswith(_NO_FEED_PREFIX):
        FEED_URIS[website] = feed_uri
        REDIS.set(cache_key, feed_uri)

    return feed_uri


def parse_feed(feed_uri: str, max_entries: int = 3) -> dict:
    '''Gets content from a remote RSS feed URI.

    Args:
        feed_uri: The RSS feed to get content from
        max_entries: How many entries from the top of the feed to examine

    Returns:
        Dictionary keyed by the entry's position in the feed. Each value is a
        dictionary with 'title', 'link' and 'content' keys. Entries missing a
        title or link are skipped, so keys may not be contiguous.
    '''

    logger = logging.getLogger(__name__ + '.parse_feed')

    feed = feedparser.parse(feed_uri)
    logger.info('%s yielded %s entries', feed_uri, len(feed.entries))

    entries = {}

    for i, entry in enumerate(islice(feed.entries, max(max_entries, 0))):

        if 'title' not in entry or 'link' not in entry:
            continue

        title = entry.title

        # Check the Redis cache for this entry
        cache_key = title.lower().replace(' ', '_')
        entry_content = _load_cached_entry(cache_key)

        if entry_content is not None:
            logger.info('Entry in Redis cache: "%s"', title)

        # If it's not in the Redis cache, parse it from the feed data
        else:
            entry_content = {
                'title': entry.title,
                'link': entry.link,
                'content': None
            }

            if 'content' in entry:
                entry_content['content'] = entry.content

            # No content in the feed itself - fetch and clean the linked page
            if entry_content['content'] is None:
                html = _get_html(entry_content['link'])
                entry_content['content'] = _get_text(html)

            logger.info('Parsed entry: "%s"', title)

            # Add it to the Redis cache, but only when content was actually
            # retrieved, so a failed download is retried next time
            if entry_content['content'] is not None:
                _store_cached_entry(cache_key, entry_content)

        entries[i] = entry_content

    logger.info('Entries contains %s elements', len(entries))

    return entries


def _load_cached_entry(cache_key: str) -> Optional[dict]:
    '''Reads a parsed feed entry back from the Redis cache.

    Args:
        cache_key: Redis key the entry was stored under

    Returns:
        The cached entry dictionary, or None if the key is absent or the
        stored value cannot be decoded into a dictionary
    '''

    cached_entry = REDIS.get(cache_key)

    if not cached_entry:
        return None

    if isinstance(cached_entry, dict):
        return cached_entry

    try:
        entry_content = json.loads(cached_entry)

    except (TypeError, ValueError):
        # Undecodable value - treat it as a cache miss
        return None

    return entry_content if isinstance(entry_content, dict) else None


def _store_cached_entry(cache_key: str, entry_content: dict) -> None:
    '''Writes a parsed feed entry to the Redis cache as a JSON string, so
    that it round-trips through json.loads() when read back.

    Args:
        cache_key: Redis key to store the entry under
        entry_content: the entry dictionary to cache
    '''

    logger = logging.getLogger(__name__ + '.parse_feed')

    try:
        REDIS.set(cache_key, json.dumps(entry_content))

    except (TypeError, ValueError) as err:
        logger.warning('Could not cache entry "%s": %s', cache_key, err)


def _get_url(company_name: str) -> Optional[str]:
    '''Finds the website associated with the name of a company or
    publication.

    Args:
        company_name: the name of the company, publication or site to find
            the URL for

    Returns:
        The first Google result that is not a Facebook or LinkedIn page, or
        None if there is no such result.
    '''

    logger = logging.getLogger(__name__ + '.get_url')
    logger.info('Getting website URL for %s', company_name)

    query = f'{company_name} official website'

    for url in google_search(query, num_results=5):
        if 'facebook' not in url and 'linkedin' not in url:
            return url

    return None


def _get_feed(website_url: str) -> str:
    '''Finds the RSS feed URI for a website given the website's url.

    Args:
        website_url: The url for the website to find the RSS feed for

    Returns:
        The website's RSS feed URI as a string, or the message
        'No feed found for <website_url>' when the search finds nothing
    '''

    logger = logging.getLogger(__name__ + '.get_feed')
    logger.info('Getting feed URI for: %s', website_url)

    feeds = feed_search(website_url)

    if feeds:
        return str(feeds[0].url)

    return f'{_NO_FEED_PREFIX} {website_url}'


def _get_html(url: str, timeout: float = 10.0) -> Optional[str]:
    '''Gets HTML string content from url

    Args:
        url: the webpage to extract content from
        timeout: seconds to wait on the connection before giving up

    Returns:
        Webpage HTML source as string, or None if the page could not be
        retrieved
    '''

    logger = logging.getLogger(__name__ + '.get_html')

    header = {
        "Accept": ("text/html,application/xhtml+xml,application/xml;q=0.9,image/avif," +
                   "image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"),
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
        "Sec-Fetch-Site": "cross-site",
        "Sec-Fetch-User": "?1",
        "Upgrade-Insecure-Requests": "1",
        "User-Agent": ("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " +
                       "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36")
    }

    try:
        # Building the request can itself raise ValueError on a malformed
        # URL, so it lives inside the try block
        request_params = urllib.request.Request(url=url, headers=header)

        with urllib.request.urlopen(request_params, timeout=timeout) as response:

            if response.getcode() != 200:
                return None

            raw = response.read()
            encoding = response.headers.get_content_charset() or "utf-8"

    except HTTPError as err:
        logger.warning('HTTP error %s fetching %s', err.code, url)
        return None

    except (URLError, OSError, ValueError) as err:
        # URLError: unreachable host; OSError: socket timeout or reset;
        # ValueError: malformed URL
        logger.warning('Could not fetch %s: %s', url, err)
        return None

    # Decode leniently so one bad byte or an unknown charset label does not
    # throw the whole page away
    try:
        return raw.decode(encoding, errors='replace')

    except LookupError:
        return raw.decode('utf-8', errors='replace')


def _get_text(html: str) -> Optional[str]:
    '''Uses boilerpy3 extractor and regex cribbed from old NLTK clean_html
    function to try and extract text from HTML as cleanly as possible.

    Args:
        html: the HTML string to be cleaned

    Returns:
        Cleaned text string, or None if html is None
    '''

    if html is None:
        return None

    logger = logging.getLogger(__name__ + '.get_text')
    extractor = extractors.ArticleExtractor()

    try:
        html = extractor.get_content(html)

    except (HTMLExtractionError, AttributeError, TypeError) as err:
        # Best effort: if the extractor fails, fall back to regex cleaning
        # of the raw HTML
        logger.debug('Article extraction failed, using raw HTML: %s', err)

    return _clean_html(html)


def _clean_html(html: str) -> Optional[str]:
    '''Remove HTML markup from the given string.

    Args:
        html: the HTML string to be cleaned

    Returns:
        Cleaned string, or None if html is None
    '''

    if html is None:
        return None

    # First we remove inline JavaScript/CSS, including the element bodies:
    cleaned = _SCRIPT_STYLE_RE.sub("", html.strip())

    # Then we remove html comments. This has to be done before removing regular
    # tags since comments can contain '>' characters.
    cleaned = _COMMENT_RE.sub("", cleaned)

    # Next we can remove the remaining tags:
    cleaned = _TAG_RE.sub(" ", cleaned)

    # Finally, we deal with whitespace: non-breaking space entities become
    # plain spaces and runs of spaces collapse to one
    cleaned = cleaned.replace("&nbsp;", " ")
    cleaned = _MULTI_SPACE_RE.sub(" ", cleaned)

    return cleaned.strip()