Spaces:
Sleeping
Sleeping
import os | |
import random | |
import requests | |
import tempfile | |
import re | |
from typing import Dict | |
from pathlib import Path | |
#from markitdown import MarkItDown | |
from urllib.parse import urlparse | |
from langchain_core.tools import tool | |
from langchain_core.messages import ToolMessage | |
from langchain_tavily import TavilySearch | |
from langchain_community.utilities import GoogleSerperAPIWrapper | |
from langchain_community.document_loaders import WikipediaLoader | |
from langchain_community.document_loaders import ArxivLoader | |
def web_search(query: str) -> ToolMessage:
    """Run a Tavily web search and return at most 5 results.

    Args:
        query: The search query.

    Returns:
        Whatever the Tavily tool returns for the query (images excluded).
    """
    tavily = TavilySearch(max_results=5, include_images=False)
    payload = {"query": query}
    return tavily.invoke(payload)
def search_tool(query: str) -> str:
    """Search Google (via Serper) and format the top 5 organic results.

    The search is attempted up to 3 times. If every attempt fails, a
    human-readable error string is returned instead of raising, so the
    calling agent always receives text.

    Args:
        query: The search query.

    Returns:
        One "Title / Snippet / URL" paragraph per organic result, or an
        error message describing the last failure.
    """
    searcher = GoogleSerperAPIWrapper(k=5)
    # The name bound by `except ... as` is deleted when the handler ends,
    # so the last failure is kept in a separate variable for the final message.
    last_error = None
    for _ in range(3):
        try:
            rows = searcher.results(query)["organic"]
            # Build the text per attempt so a failure half-way through one
            # attempt cannot leak partial rows into the next one.
            return "".join(
                f"Title: {row['title']}\nSnippet: {row['snippet']}\nURL: {row['link']}\n\n"
                for row in rows
            )
        except Exception as err:  # deliberate best-effort: report, don't raise
            last_error = err
    return f"There was an error with Google search: {last_error}"
def wikipedia_search(query: str) -> Dict[str, str]:
    """Search Wikipedia for a query and return up to 10 documents.

    Args:
        query: The search term or topic.

    Returns:
        A dictionary with a single key, "wiki_results", whose value is one
        string containing every loaded document wrapped in a <Document>
        tag and separated by "---" lines.
    """
    search_docs = WikipediaLoader(query=query, load_max_docs=10).load()
    # The value is a single joined string, not a list (hence Dict[str, str]).
    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
        for doc in search_docs
    )
    return {"wiki_results": formatted_search_docs}
# Mathematical tools
def multiply(a: float, b: float) -> float:
    """Return the product of two numbers.

    Args:
        a: first factor
        b: second factor

    Returns:
        The value of a times b.
    """
    product = a * b
    return product
def add(a: float, b: float) -> float:
    """Return the sum of two numbers.

    Args:
        a: first addend
        b: second addend

    Returns:
        The value of a plus b.
    """
    total = a + b
    return total
def subtract(a: float, b: float) -> float:
    """Return the difference of two numbers.

    Args:
        a: the number to subtract from
        b: the number to subtract

    Returns:
        The value of a minus b.
    """
    difference = a - b
    return difference
def divide(a: float, b: float) -> float:
    """Return the quotient of two numbers.

    Args:
        a: the dividend
        b: the divisor

    Returns:
        The value of a divided by b.

    Raises:
        ValueError: if b is zero.
    """
    if b != 0:
        return a / b
    raise ValueError("Cannot divide by zero.")
def modulus(a: int, b: int) -> int:
    """Return the remainder of dividing one number by another.

    Args:
        a: the dividend
        b: the divisor

    Returns:
        The value of a modulo b.
    """
    remainder = a % b
    return remainder
from langchain_core.tools import tool | |
def convert_units(value: float, from_unit: str, to_unit: str) -> float:
    """Convert a value from one unit to another.

    Unit names are case-insensitive and surrounding whitespace is ignored.
    Besides the canonical plural names, common singular forms and
    abbreviations are accepted (e.g. 'kg', 'lb', 'km', 'mi', 'c', 'f').

    Args:
        value: The numerical value to convert.
        from_unit: The original unit (e.g. 'miles', 'kg', 'celsius').
        to_unit: The target unit (e.g. 'kilometers', 'lb', 'fahrenheit').

    Supported conversions:
        - miles <-> kilometers
        - kilograms <-> pounds
        - celsius <-> fahrenheit

    Returns:
        The converted value.

    Raises:
        ValueError: if the pair of units is not one of the supported conversions.
    """
    # Map every accepted spelling onto the canonical name used as a key in
    # the conversion table below; unknown spellings pass through unchanged
    # and are rejected by the table lookup.
    aliases = {
        "mile": "miles", "mi": "miles",
        "kilometer": "kilometers", "kilometre": "kilometers",
        "kilometres": "kilometers", "km": "kilometers",
        "kilogram": "kilograms", "kg": "kilograms", "kgs": "kilograms",
        "pound": "pounds", "lb": "pounds", "lbs": "pounds",
        "c": "celsius", "°c": "celsius",
        "f": "fahrenheit", "°f": "fahrenheit",
    }
    conversions = {
        ("miles", "kilometers"): lambda v: v * 1.60934,
        ("kilometers", "miles"): lambda v: v / 1.60934,
        ("kilograms", "pounds"): lambda v: v * 2.20462,
        ("pounds", "kilograms"): lambda v: v / 2.20462,
        ("celsius", "fahrenheit"): lambda v: (v * 9 / 5) + 32,
        ("fahrenheit", "celsius"): lambda v: (v - 32) * 5 / 9,
    }

    source = from_unit.strip().lower()
    target = to_unit.strip().lower()
    key = (aliases.get(source, source), aliases.get(target, target))
    if key not in conversions:
        raise ValueError(f"Conversion from {from_unit} to {to_unit} not supported.")
    return conversions[key](value)
# Natural-language comparison phrases and the pandas operator each maps to.
# Longer phrases come first so that, for example, "less than or equal to"
# is never partially consumed by "less than" or "equal to".
_PHRASE_OPERATORS = (
    ("greater than or equal to", ">="),
    ("less than or equal to", "<="),
    ("not equal to", "!="),
    ("greater than", ">"),
    ("more than", ">"),
    ("less than", "<"),
    ("equal to", "=="),
    ("is", "=="),
)
_OPERATOR_BY_PHRASE = dict(_PHRASE_OPERATORS)

# <column> [is] <phrase> <value>, where the value is a quoted string or bare
# text that stops before the next " and " / " or " connector (or the end).
_COMPARISON_RE = re.compile(
    r"\b(?P<column>\w+)\s+(?:is\s+)?(?P<phrase>"
    + "|".join(re.escape(phrase) for phrase, _ in _PHRASE_OPERATORS)
    + r")\s+(?P<value>'[^']*'|\"[^\"]*\"|.+?)(?=\s+(?:and|or)\s+|\s*$)",
    re.IGNORECASE,
)
_NUMBER_RE = re.compile(r"[-+]?\d+(?:\.\d+)?")


def _format_query_value(raw: str) -> str:
    """Render a comparison value as a pandas-query literal."""
    text = raw.strip()
    if len(text) >= 2 and text[0] == text[-1] and text[0] in "'\"":
        # Explicitly quoted by the caller: always treat as a string.
        return repr(text[1:-1])
    if _NUMBER_RE.fullmatch(text):
        # Bare numbers stay numeric so they compare against numeric columns.
        return text
    return repr(text)


def _to_pandas_query(query: str) -> str:
    """Rewrite natural-language comparisons in *query* into pandas syntax."""

    def _rewrite(match):
        operator = _OPERATOR_BY_PHRASE[match.group("phrase").lower()]
        value = _format_query_value(match.group("value"))
        return f"{match.group('column')} {operator} {value}"

    return _COMPARISON_RE.sub(_rewrite, query.strip())


def query_table_data(file_path: str, query: str, sheet_name: str = None) -> str:
    """Load a table from CSV or Excel and filter it with a query.

    The query may already be in pandas syntax, or it may use simple English
    comparisons ("greater than", "more than", "less than", "greater than or
    equal to", "less than or equal to", "equal to", "not equal to", "is"),
    optionally combined with "and" / "or". English comparisons are rewritten
    heuristically; anything else is passed to pandas unchanged.

    Args:
        file_path: Path to the table file (.csv, .xlsx or .xls).
        query: The filter, e.g. "Age > 30 and Country == 'USA'" or
            "age greater than 30 and country is usa". Bare numeric values are
            compared as numbers; quote a value to force a string comparison.
        sheet_name: Optional sheet name if the file is Excel. When omitted,
            the first sheet is used.

    Returns:
        A string representation of the filtered table (max 10 rows):
        markdown when pandas can render it, plain text otherwise. If pandas
        (or the Excel reader it needs) is missing, an installation hint is
        returned instead.

    Raises:
        FileNotFoundError: if file_path does not exist.
        ValueError: if the file extension is unsupported or the query
            cannot be evaluated.
    """
    try:
        import pandas as pd

        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        ext = path.suffix.lower()
        if ext == ".csv":
            df = pd.read_csv(path)
        elif ext in (".xlsx", ".xls"):
            # sheet_name=None would make pandas return a dict of every sheet,
            # which cannot be queried; fall back to the first sheet instead.
            df = pd.read_excel(path, sheet_name=0 if sheet_name is None else sheet_name)
        else:
            raise ValueError(f"Unsupported file extension: {ext}")
    except ImportError:
        return "Error: pandas and openpyxl are not installed. Please install them with 'pip install pandas openpyxl'."

    query_pandas_syntax = _to_pandas_query(query)
    try:
        filtered_df = df.query(query_pandas_syntax)
    except Exception as e:
        raise ValueError(f"Invalid query: {query_pandas_syntax}. Error: {e}") from e

    preview = filtered_df.head(10)
    try:
        return preview.to_markdown(index=False)
    except ImportError:
        # Markdown rendering needs an optional dependency; a missing renderer
        # is not a query error, so fall back to plain text.
        return preview.to_string(index=False)
def arvix_search(query: str) -> Dict[str, str]:
    """Search arXiv for a query and return at most 5 results.

    Args:
        query: The search query.

    Returns:
        A dictionary with a single key, "arvix_results", whose value is one
        string containing every loaded document wrapped in a <Document>
        tag (content truncated to its first 1000 characters) and separated
        by "---" lines.
    """
    search_docs = ArxivLoader(query=query, load_max_docs=5).load()
    # Only the first 1000 characters of each paper are kept to bound the
    # size of the tool output.
    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
        for doc in search_docs
    )
    return {"arvix_results": formatted_search_docs}
def read_python_file(file_path: str) -> str:
    """Read a Python source file and return its content as text.

    The file is read directly as UTF-8 text; no third-party converter is
    required.

    Args:
        file_path: Path to the Python file (must end in ".py").

    Returns:
        The Python file content.

    Raises:
        FileNotFoundError: if file_path does not exist.
        ValueError: if the extension is not ".py" or the file is not valid
            UTF-8 text.
        OSError: if the file cannot be read for another reason.
    """
    path = Path(file_path)
    try:
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        ext = path.suffix.lower()
        if ext != ".py":
            raise ValueError(f"Unsupported file extension: {ext}")
        return path.read_text(encoding="utf-8")
    except UnicodeDecodeError as err:
        # UnicodeDecodeError cannot be rebuilt from a single message, so it is
        # reported as its base class, ValueError. Must precede the generic
        # handler below because it is itself a ValueError subclass.
        raise ValueError(f"Could not parse python file > {err}") from err
    except (OSError, ValueError) as err:
        # Keep the original exception type so existing callers' except
        # clauses still match, while adding context to the message.
        raise type(err)(f"Could not parse python file > {err}") from err
# Tools exposed to the level-1 agent, in registration order.
level1_tools = [
    # arithmetic
    multiply, add, subtract, divide, modulus,
    # search (search_tool is intentionally left out)
    wikipedia_search, web_search, arvix_search,
    # data helpers
    convert_units, query_table_data, read_python_file,
]