Commit 8849868 (unverified), committed by gperdrizet
Parents (2): 40b2a94 00764df

Merge pull request #6 from gperdrizet/dev

functions/helper_functions.py CHANGED
@@ -1,8 +1,33 @@
 '''Helper functions for MCP tools.'''
 
 import logging
+from types import GeneratorType
+
 import feedparser
-from findfeed import search
+from findfeed import search as feed_search
+from googlesearch import search as google_search
+
+
+def get_url(company_name: str) -> str:
+    '''Finds the website associated with the name of a company or
+    publication.
+
+    Args:
+        company_name: the name of the company, publication or site to find
+            the URL for
+
+    Returns:
+        The URL for the company, publication or website.
+    '''
+
+    logger = logging.getLogger(__name__ + '.get_url')
+
+    query = f'{company_name} official website'
+
+    for url in google_search(query, num_results=5):
+        if 'facebook' not in url and 'linkedin' not in url:
+            return url
+
+    return None
 
 
 def get_feed(website_url: str) -> str:
@@ -16,8 +41,13 @@ def get_feed(website_url: str) -> str:
     '''
 
     logger = logging.getLogger(__name__ + '.get_content')
+    logger.info('Getting feed URI for: %s', website_url)
+
+    feeds = feed_search(website_url)
 
-    feeds = search(website_url)
+    logger.info('Feeds search result is: %s', type(feeds))
+    logger.info('Feeds search results: %s', len(feeds))
+    logger.info('Feeds results: %s', list(feeds))
 
     if len(feeds) > 0:
         return str(feeds[0].url)
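
Taken together, the two helpers resolve a publication name to a website URL and then to a feed URI. A minimal usage sketch, assuming the googlesearch-python and findfeed packages are installed and the module lives at functions/helper_functions.py as above; the example name and printed value are illustrative only:

    from functions import helper_functions as helper_funcs

    # Resolve a publication name to its website via Google search
    site_url = helper_funcs.get_url('Hacker News')

    if site_url is not None:
        # Look up the site's RSS/Atom feed URI with findfeed
        feed_uri = helper_funcs.get_feed(site_url)
        print(feed_uri)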
functions/tools.py CHANGED
@@ -1,24 +1,56 @@
 '''Tool functions for MCP server'''
 
 import logging
+from urllib.parse import urlparse
+import validators
 import functions.helper_functions as helper_funcs
 
+FEED_URIS = {}
+RSS_EXTENSIONS = ['xml', 'rss', 'atom']
 
-def get_content(website_url: str) -> list:
+
+def get_content(website: str) -> list:
     '''Gets RSS feed content from a given website.
 
     Args:
-        website_url: URL of website to extract RSS feed content from
+        website: URL or name of website to extract RSS feed content from
 
     Returns:
-        List of titles for the 10 most recent entries in the RSS feed.
+        List of titles for the 10 most recent entries in the RSS feed from the
+        requested website.
     '''
 
     logger = logging.getLogger(__name__ + '.get_content')
-    logger.info('Getting feed content for: %s', website_url)
+    logger.info('Getting feed content for: %s', website)
+
+    # Find the feed URI
+    feed_uri = None
+
+    # If the website contains xml, rss or atom, assume it's an RSS URI
+    if any(extension in website.lower() for extension in RSS_EXTENSIONS):
+        feed_uri = website
+        logger.info('%s looks like a feed URI already - using it directly', website)
+
+    # Next, check the cache to see if we already have this feed's URI
+    elif website in FEED_URIS.keys():
+        feed_uri = FEED_URIS[website]
+        logger.info('%s feed URI in cache: %s', website, feed_uri)
+
+    # If neither of those gets it, use findfeed if it looks like a URL,
+    # or else just google it
+    else:
+        if validators.url(website):
+            website_url = website
+            logger.info('%s looks like a website URL', website)
+
+        else:
+            website_url = helper_funcs.get_url(website)
+            logger.info('Google result for %s: %s', website, website_url)
+
+        feed_uri = helper_funcs.get_feed(website_url)
+        logger.info('get_feed() returned %s', feed_uri)
 
-    feed_uri = helper_funcs.get_feed(website_url)
-    logger.info('get_feed() returned %s', feed_uri)
+    FEED_URIS[website] = feed_uri
 
     content = helper_funcs.parse_feed(feed_uri)
     logger.info('parse_feed() returned %s', content)
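
The reworked get_content() now accepts a feed URI, a website URL, or a bare site name, and memoizes resolved feed URIs in the module-level FEED_URIS dict. A rough usage sketch, assuming parse_feed() (defined elsewhere in helper_functions.py) returns the entry titles; the cached value shown in the comment is illustrative:

    from functions.tools import get_content, FEED_URIS

    titles = get_content('hackernews.com')   # resolves the site, finds and parses its feed
    titles = get_content('hackernews.com')   # second call reuses the URI cached in FEED_URIS

    print(FEED_URIS)   # e.g. {'hackernews.com': '<resolved feed URI>'}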
requirements.txt CHANGED
@@ -1,4 +1,5 @@
 feedparser
 findfeed
 gradio
-mcp
+mcp
+validators
rss_server.py CHANGED
@@ -21,7 +21,7 @@ logging.basicConfig(
     backupCount=10,
     mode='w'
     )],
-    level=logging.DEBUG,
+    level=logging.INFO,
     format='%(levelname)s - %(name)s - %(message)s'
 )
 
@@ -34,7 +34,7 @@ with gr.Blocks() as demo:
     gr.HTML(html.TITLE)
 
     gr.Markdown(html.DESCRIPTION)
-    website_url = gr.Textbox('hackernews.com', label='Website URL')
+    website_url = gr.Textbox('hackernews.com', label='Website')
     output = gr.Textbox(label='RSS entry titles', lines=10)
     submit_button = gr.Button('Submit')
 
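
For context, the Textbox and Button in the hunk above are presumably wired to the get_content tool inside the same gr.Blocks() context; a hedged sketch of that wiring (the actual handler code is not part of this diff, and the tool_funcs alias is an assumption):

    submit_button.click(
        fn=tool_funcs.get_content,   # assumed import: functions.tools as tool_funcs
        inputs=website_url,
        outputs=output
    )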