Added some fancy fallback options to handle requests for feeds that are not URLs, and added caching so we don't have to look up the same feed URI more than once in a session.
Files changed:
- functions/helper_functions.py +32 -2
- functions/tools.py +38 -6
- rss_server.py +2 -2
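
Roughly, get_content() now handles three kinds of input. A quick sketch of the intended behavior (these example inputs are mine, not from the commit):

    get_content('https://hnrss.org/frontpage')   # contains 'rss', so it is used as the feed URI directly
    get_content('https://news.ycombinator.com')  # a URL: findfeed searches the site for a feed
    get_content('Hacker News')                   # a bare name: googled first, then searched for a feed

The details are in functions/tools.py below.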
functions/helper_functions.py CHANGED

@@ -1,8 +1,33 @@
 '''Helper functions for MCP tools.'''
 
 import logging
+from types import GeneratorType
+
 import feedparser
-from findfeed import search
+from findfeed import search as feed_search
+from googlesearch import search as google_search
+
+
+def get_url(company_name: str) -> str:
+    '''Finds the website associated with the name of a company or
+    publication.
+
+    Args:
+        company_name: the name of the company, publication or site to find
+            the URL for
+
+    Returns:
+        The URL for the company, publication or website.
+    '''
+
+    logger = logging.getLogger(__name__ + '.get_url')
+
+    query = f'{company_name} official website'
+
+    for url in google_search(query, num_results=5):
+        if 'facebook' not in url and 'linkedin' not in url:
+            return url
+
+    return None
 
 
 def get_feed(website_url: str) -> str:
@@ -16,8 +41,13 @@ def get_feed(website_url: str) -> str:
     '''
 
     logger = logging.getLogger(__name__ + '.get_content')
+    logger.info('Getting feed URI for: %s', website_url)
+
+    feeds = feed_search(website_url)
 
-    feeds = search(website_url)
+    logger.info('Feeds search result is: %s', type(feeds))
+    logger.info('Feeds search results: %s', len(feeds))
+    logger.info('Feeds results: %s', list(feeds))
 
     if len(feeds) > 0:
         return str(feeds[0].url)
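
The new helper can be sanity-checked on its own. A minimal sketch, assuming googlesearch and findfeed are installed; the input and the printed URLs are illustrative, and live Google results will vary:

    from functions import helper_functions as helper_funcs

    url = helper_funcs.get_url('Hacker News')
    print(url)   # e.g. 'https://news.ycombinator.com/'

    feed_uri = helper_funcs.get_feed(url)
    print(feed_uri)   # e.g. 'https://news.ycombinator.com/rss'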
functions/tools.py CHANGED

@@ -1,24 +1,56 @@
 '''Tool functions for MCP server'''
 
 import logging
+from urllib.parse import urlparse
+import validators
 import functions.helper_functions as helper_funcs
 
+FEED_URIS = {}
+RSS_EXTENSIONS = ['xml', 'rss', 'atom']
 
-def get_content(website_url: str) -> list:
+
+def get_content(website: str) -> list:
     '''Gets RSS feed content from a given website.
 
     Args:
-        website_url: URL of website to extract RSS feed content from
+        website: URL or name of website to extract RSS feed content from
 
     Returns:
-        List of titles for the 10 most recent entries in the RSS feed
+        List of titles for the 10 most recent entries in the RSS feed from the
+        requested website.
     '''
 
     logger = logging.getLogger(__name__ + '.get_content')
-    logger.info('Getting feed content for: %s', website_url)
+    logger.info('Getting feed content for: %s', website)
+
+    # Find the feed URI
+    feed_uri = None
+
+    # If the website contains xml, rss or atom, assume it's an RSS URI
+    if any(extension in website.lower() for extension in RSS_EXTENSIONS):
+        feed_uri = website
+        logger.info('%s looks like a feed URI already - using it directly', website)
+
+    # Next, check the cache to see if we already have this feed's URI
+    elif website in FEED_URIS.keys():
+        feed_uri = FEED_URIS[website]
+        logger.info('%s feed URI in cache: %s', website, feed_uri)
+
+    # If neither of those gets it, search for the feed directly if it
+    # looks like a URL, or else just google it first
+    else:
+        if validators.url(website):
+            website_url = website
+            logger.info('%s looks like a website URL', website)
+
+        else:
+            website_url = helper_funcs.get_url(website)
+            logger.info('Google result for %s: %s', website, website_url)
+
+        feed_uri = helper_funcs.get_feed(website_url)
+        logger.info('get_feed() returned %s', feed_uri)
 
-    feed_uri = helper_funcs.get_feed(website_url)
-    logger.info('get_feed() returned %s', feed_uri)
+    FEED_URIS[website] = feed_uri
 
     content = helper_funcs.parse_feed(feed_uri)
     logger.info('parse_feed() returned %s', content)
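
Because FEED_URIS is a module-level dict, the cache lasts for the life of the process. A hedged illustration of the intended effect (the site name is just an example):

    import functions.tools as tools

    tools.get_content('Hacker News')   # slow path: Google lookup plus feed search, result cached
    tools.get_content('Hacker News')   # fast path: feed URI served from FEED_URIS, no searches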
rss_server.py CHANGED

@@ -21,7 +21,7 @@ logging.basicConfig(
         backupCount=10,
         mode='w'
     )],
-    level=logging.
+    level=logging.INFO,
     format='%(levelname)s - %(name)s - %(message)s'
 )
 
@@ -34,7 +34,7 @@ with gr.Blocks() as demo:
     gr.HTML(html.TITLE)
 
     gr.Markdown(html.DESCRIPTION)
-    website_url = gr.Textbox('hackernews.com', label='Website
+    website_url = gr.Textbox('hackernews.com', label='Website')
     output = gr.Textbox(label='RSS entry titles', lines=10)
     submit_button = gr.Button('Submit')
 