'''Helper functions for MCP tools.'''

import re
import logging
import urllib.request
from urllib.error import HTTPError, URLError

import feedparser
from boilerpy3 import extractors
from boilerpy3.exceptions import HTMLExtractionError
from findfeed import search as feed_search
from googlesearch import search as google_search

FEED_URIS = {}
RSS_EXTENSIONS = ['xml', 'rss', 'atom']
COMMON_EXTENSIONS = ['com', 'net', 'org', 'edu', 'gov', 'co', 'us']

def find_feed_uri(website: str) -> str:
    '''Attempts to find the URI for an RSS feed. First checks if the string
    provided in website is already a feed URI; if it's not, checks if website
    is a URL and, if so, uses that to find the RSS feed URI. If the provided
    string is neither, falls back to a Google search to find the website URL
    and then uses that to try and find the feed.

    Args:
        website: target resource to find the RSS feed URI for; can be a
            website URL or the name of the website

    Returns:
        RSS feed URI for the website
    '''

    logger = logging.getLogger(__name__ + '.find_feed_uri')
    logger.info('Finding feed URI for %s', website)

    feed_uri = None

    # If the website contains xml, rss or atom, assume it's already an RSS URI
    if any(extension in website.lower() for extension in RSS_EXTENSIONS):
        feed_uri = website
        logger.info('%s looks like a feed URI already - using it directly', website)

    # Next, check the cache to see if we already have this feed's URI
    elif website in FEED_URIS:
        feed_uri = FEED_URIS[website]
        logger.info('%s feed URI in cache: %s', website, feed_uri)

    # If neither of those gets it, treat the string as a URL if it ends in a
    # common domain extension; otherwise Google it first, then look up the feed
    else:
        if website.split('.')[-1] in COMMON_EXTENSIONS:
            website_url = website
            logger.info('%s looks like a website URL', website)

        else:
            website_url = _get_url(website)
            logger.info('Google result for %s: %s', website, website_url)

        feed_uri = _get_feed(website_url)
        logger.info('_get_feed() returned %s', feed_uri)

        FEED_URIS[website] = feed_uri

    return feed_uri


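# Illustrative behavior of find_feed_uri() (hypothetical inputs; real results
# depend on the live sites):
#   find_feed_uri('example.com/feed.xml')  contains 'xml'  -> returned as-is
#   find_feed_uri('example.com')           ends in 'com'   -> feed lookup on that URL
#   find_feed_uri('Example News')          neither         -> Google search, then feed lookup

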
def parse_feed(feed_uri: str) -> dict:
    '''Gets content from a remote RSS feed URI.

    Args:
        feed_uri: the RSS feed to get content from

    Returns:
        Dictionary keyed by entry index, holding the title, link and content
        of the three most recent entries in the RSS feed.
    '''

    logger = logging.getLogger(__name__ + '.parse_feed')

    feed = feedparser.parse(feed_uri)
    logger.info('%s yielded %s entries', feed_uri, len(feed.entries))

    entries = {}

    for i, entry in enumerate(feed.entries):

        entry_content = {}

        if 'title' in entry and 'link' in entry:

            entry_content['title'] = entry.title
            entry_content['link'] = entry.link
            # entry_content['updated'] = None
            # entry_content['summary'] = None
            entry_content['content'] = None

            # if 'updated' in entry:
            #     entry_content['updated'] = entry.updated

            # if 'summary' in entry:
            #     summary = _get_text(entry.summary)
            #     entry_content['summary'] = summary

            if 'content' in entry:
                entry_content['content'] = entry.content

            # If the feed itself carries no content, fetch the linked page and
            # extract its text instead
            if entry_content['content'] is None:
                html = _get_html(entry_content['link'])
                content = _get_text(html)
                entry_content['content'] = content

            entries[i] = entry_content

        # Stop after the three most recent entries
        if i == 2:
            break

    logger.info('Entries contains %s elements', len(list(entries.keys())))

    return entries


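# Sketch of the structure parse_feed() returns (field values depend entirely
# on the feed being read):
#   {0: {'title': '...', 'link': '...', 'content': '...'},
#    1: {'title': '...', 'link': '...', 'content': '...'},
#    2: {'title': '...', 'link': '...', 'content': '...'}}

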
def _get_url(company_name: str) -> str:
    '''Finds the website associated with the name of a company or
    publication via Google search.

    Args:
        company_name: the name of the company, publication or site to find
            the URL for

    Returns:
        The URL for the company, publication or website, or None if no
        suitable result is found.
    '''

    logger = logging.getLogger(__name__ + '._get_url')
    logger.info('Getting website URL for %s', company_name)

    query = f'{company_name} official website'

    # Take the first result that is not a social media profile
    for url in google_search(query, num_results=5):
        if 'facebook' not in url and 'linkedin' not in url:
            return url

    return None


def _get_feed(website_url: str) -> str:
    '''Finds the RSS feed URI for a website given the website's URL.

    Args:
        website_url: the URL of the website to find the RSS feed for

    Returns:
        The website's RSS feed URI as a string, or a message noting that no
        feed was found.
    '''

    logger = logging.getLogger(__name__ + '._get_feed')
    logger.info('Getting feed URI for: %s', website_url)

    feeds = feed_search(website_url)

    if len(feeds) > 0:
        return str(feeds[0].url)

    return f'No feed found for {website_url}'


def _get_html(url: str) -> str:
    '''Gets HTML string content from a URL.

    Args:
        url: the webpage to extract content from

    Returns:
        Webpage HTML source as a string, or None if the page could not be
        retrieved.
    '''

    # Browser-like headers so the request is less likely to be blocked
    header = {
        "Accept": ("text/html,application/xhtml+xml,application/xml;q=0.9,image/avif," +
                   "image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"),
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
        "Sec-Fetch-Site": "cross-site",
        "Sec-Fetch-User": "?1",
        "Upgrade-Insecure-Requests": "1",
        "User-Agent": ("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " +
                       "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36")
    }

    # Create the request with headers
    request_params = urllib.request.Request(
        url=url,
        headers=header
    )

    # Get the HTML string, treating any HTTP/URL error as "no content"
    try:
        with urllib.request.urlopen(request_params) as response:
            status_code = response.getcode()

            if status_code == 200:
                content = response.read()
                encoding = response.headers.get_content_charset()

                if encoding is None:
                    encoding = "utf-8"

                content = content.decode(encoding)

            else:
                content = None

    except (HTTPError, URLError):
        content = None

    return content


def _get_text(html: str) -> str:
    '''Uses the boilerpy3 extractor and regex cribbed from the old NLTK
    clean_html function to try and extract text from HTML as cleanly as
    possible.

    Args:
        html: the HTML string to be cleaned

    Returns:
        Cleaned text string, or None if no HTML was provided.
    '''

    if html is None:
        return None

    extractor = extractors.ArticleExtractor()

    # Fall back to the raw HTML if boilerpy3 can't handle the input
    try:
        html = extractor.get_content(html)

    except (HTMLExtractionError, AttributeError, TypeError):
        pass

    return _clean_html(html)


def _clean_html(html: str) -> str:
    '''Removes HTML markup from the given string.

    Args:
        html: the HTML string to be cleaned

    Returns:
        Cleaned string
    '''

    if html is None:
        return None

    # First we remove inline JavaScript/CSS:
    cleaned = re.sub(r"(?is)<(script|style).*?>.*?(</\1>)", "", html.strip())

    # Then we remove html comments. This has to be done before removing regular
    # tags since comments can contain '>' characters.
    cleaned = re.sub(r"(?s)<!--(.*?)-->[\n]?", "", cleaned)

    # Next we can remove the remaining tags:
    cleaned = re.sub(r"(?s)<.*?>", " ", cleaned)

    # Finally, we deal with whitespace: replace non-breaking spaces and
    # collapse runs of spaces left behind by the tag removal
    cleaned = re.sub(r"&nbsp;", " ", cleaned)
    cleaned = re.sub(r"  ", " ", cleaned)
    cleaned = re.sub(r"  ", " ", cleaned)

    return cleaned.strip()
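

if __name__ == '__main__':

    # Minimal usage sketch (not part of the MCP tool surface): resolve a feed
    # for a site and print the newest entry titles. 'example.com' is a
    # placeholder - substitute any real site name, URL or feed URI.
    logging.basicConfig(level=logging.INFO)

    uri = find_feed_uri('example.com')
    print(f'Feed URI: {uri}')

    for index, item in parse_feed(uri).items():
        print(f"{index}: {item['title']}")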