Spaces:
Sleeping
Sleeping
import os | |
import random | |
import requests | |
import tempfile | |
import re | |
from typing import Dict | |
from pathlib import Path | |
from markitdown import MarkItDown | |
from urllib.parse import urlparse | |
from langchain_core.tools import tool | |
from langchain_core.messages import ToolMessage | |
from langchain_tavily import TavilySearch | |
from langchain_community.document_loaders import WikipediaLoader | |
from langchain_community.document_loaders import ArxivLoader | |
def web_search(query: str) -> ToolMessage:
    """Run a Tavily web search and hand back at most five results.

    Args:
        query: The search query.

    Returns:
        Whatever the Tavily tool's ``invoke`` call produces for the query;
        the tool is configured for up to 5 results and no images.
    """
    search_tool = TavilySearch(max_results=5, include_images=False)
    payload = {"query": query}
    return search_tool.invoke(payload)
def wikipedia_search(query: str) -> Dict[str, str]:
    """Search Wikipedia for a query and return up to 5 formatted results.

    Args:
        query: The search term or topic.

    Returns:
        A dictionary with a single key, ``"wiki_results"``, whose value is one
        string: every loaded document rendered as a ``<Document ...>`` entry
        (source, page, full page content), entries separated by ``---``.
    """
    search_docs = WikipediaLoader(query=query, load_max_docs=5).load()
    formatted_search_docs = "\n\n---\n\n".join(
        # .get() keeps one document with incomplete metadata from aborting
        # the whole search with a KeyError.
        f'<Document source="{doc.metadata.get("source", "")}" '
        f'page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
        for doc in search_docs
    )
    return {"wiki_results": formatted_search_docs}
# Mathematical tools
def multiply(a: float, b: float) -> float:
    """Return the product of two numbers.

    Args:
        a: first factor
        b: second factor

    Returns:
        The value of ``a * b``.
    """
    product = a * b
    return product
def add(a: float, b: float) -> float:
    """Return the sum of two numbers.

    Args:
        a: first addend
        b: second addend

    Returns:
        The value of ``a + b``.
    """
    total = a + b
    return total
def subtract(a: float, b: float) -> float:
    """Return the difference of two numbers.

    Args:
        a: the number to subtract from
        b: the number to subtract

    Returns:
        The value of ``a - b``.
    """
    difference = a - b
    return difference
def divide(a: float, b: float) -> float:
    """Return the quotient of two numbers.

    Args:
        a: the dividend
        b: the divisor

    Returns:
        The value of ``a / b``.

    Raises:
        ValueError: If ``b`` equals zero.
    """
    if b != 0:
        return a / b
    raise ValueError("Cannot divide by zero.")
def modulus(a: int, b: int) -> int:
    """Return the remainder of dividing one number by another.

    Args:
        a: the dividend
        b: the divisor

    Returns:
        The value of ``a % b``.
    """
    remainder = a % b
    return remainder
from langchain_core.tools import tool | |
def convert_units(value: float, from_unit: str, to_unit: str) -> float:
    """Convert a value from one unit to another.

    Unit names are matched case-insensitively, surrounding whitespace is
    ignored, and common abbreviations are accepted.

    Args:
        value: The numerical value to convert.
        from_unit: The original unit (e.g. 'miles', 'kg', 'celsius').
        to_unit: The target unit (e.g. 'kilometers', 'lb', 'fahrenheit').

    Supported conversions:
        - miles (mile, mi) <-> kilometers (kilometer, kilometre(s), km)
        - kilograms (kilogram, kg, kgs) <-> pounds (pound, lb, lbs)
        - celsius (c) <-> fahrenheit (f)

    Returns:
        The converted value.

    Raises:
        ValueError: If the (from_unit, to_unit) pair is not supported.
    """
    # Abbreviations and singular forms mapped onto the canonical unit names
    # used as keys in the conversion table below.
    aliases = {
        "mile": "miles",
        "mi": "miles",
        "kilometer": "kilometers",
        "kilometre": "kilometers",
        "kilometres": "kilometers",
        "km": "kilometers",
        "kilogram": "kilograms",
        "kg": "kilograms",
        "kgs": "kilograms",
        "pound": "pounds",
        "lb": "pounds",
        "lbs": "pounds",
        "c": "celsius",
        "f": "fahrenheit",
    }
    conversions = {
        ("miles", "kilometers"): lambda v: v * 1.60934,
        ("kilometers", "miles"): lambda v: v / 1.60934,
        ("kilograms", "pounds"): lambda v: v * 2.20462,
        ("pounds", "kilograms"): lambda v: v / 2.20462,
        ("celsius", "fahrenheit"): lambda v: (v * 9 / 5) + 32,
        ("fahrenheit", "celsius"): lambda v: (v - 32) * 5 / 9,
    }

    def _canonical(unit: str) -> str:
        name = unit.strip().lower()
        return aliases.get(name, name)

    key = (_canonical(from_unit), _canonical(to_unit))
    try:
        convert = conversions[key]
    except KeyError:
        # Report the units exactly as the caller spelled them.
        raise ValueError(
            f"Conversion from {from_unit} to {to_unit} not supported."
        ) from None
    return convert(value)
def convert_query_to_pandas_syntax(natural_query: str, column_names: list) -> str:
    """Convert a natural language filter to pandas ``query()`` syntax.

    This is a best-effort heuristic. Every clause of the form
    ``<column> [is] <comparison phrase> <value>`` is rewritten; clauses may be
    chained with ``and`` / ``or`` (any case, emitted in lowercase). Text that
    matches no rule is left untouched.

    Recognised comparison phrases: "greater than or equal to",
    "less than or equal to", "not equal to", "greater than", "more than",
    "less than", "equal to", "is not", "is".

    Values: plain numbers are emitted unquoted; anything else (including
    explicitly quoted values) is emitted as a quoted string with its original
    casing, because pandas string comparisons are case-sensitive.

    Args:
        natural_query: A question or filter expression in plain English.
        column_names: Column names from the DataFrame. A column word in the
            query that matches one of these case-insensitively is replaced by
            the real column name.

    Returns:
        A best-effort string in pandas query() format.
    """
    # Longest phrases first: otherwise "less than or equal to" would be
    # consumed by the shorter "equal to" / "less than" alternatives.
    operators = (
        ("greater than or equal to", ">="),
        ("less than or equal to", "<="),
        ("not equal to", "!="),
        ("greater than", ">"),
        ("more than", ">"),
        ("less than", "<"),
        ("equal to", "=="),
        ("is not", "!="),
        ("is", "=="),
    )
    symbols = dict(operators)
    phrase_alternation = "|".join(
        phrase.replace(" ", r"\s+") for phrase, _ in operators
    )
    columns = {str(name).lower(): str(name) for name in (column_names or [])}

    # A bare value token: a word or number, optionally signed, possibly with
    # inner dots/dashes (3.5, 2024-01-01).
    token = r"-?\w+(?:[.\-]\w+)*"
    clause = re.compile(
        r"\b(?!(?:and|or)\b)(?P<col>\w+)\s+(?:is\s+)?"
        r"(?P<op>" + phrase_alternation + r")\s+"
        r"(?:(?P<quote>['\"])(?P<quoted>.*?)(?P=quote)"
        # Multi-word bare values stop before an and/or connector.
        r"|(?P<bare>" + token + r"(?:\s+(?!(?:and|or)\b)" + token + r")*))"
        # Stand-alone connectors are matched so their case can be normalised.
        r"|\b(?P<conn>and|or)\b",
        re.IGNORECASE,
    )

    def _render(match):
        connector = match.group("conn")
        if connector is not None:
            return connector.lower()
        column = match.group("col")
        column = columns.get(column.lower(), column)
        symbol = symbols[" ".join(match.group("op").lower().split())]
        bare = match.group("bare")
        if bare is None:
            # Explicitly quoted by the user: always a string literal.
            value = repr(match.group("quoted"))
        elif re.fullmatch(r"-?\d+(?:\.\d+)?", bare):
            value = bare
        else:
            value = repr(bare)
        return f"{column} {symbol} {value}"

    return clause.sub(_render, natural_query.strip())
def query_table_data(file_path: str, query_pandas_syntax: str, sheet_name: str = None) -> str:
    """Load a table from CSV or Excel and filter it with a pandas query.

    Args:
        file_path: Path to the table file (.csv, .xlsx or .xls).
        query_pandas_syntax: A pandas-compatible query string,
            e.g. "Age > 30 and Country == 'USA'".
        sheet_name: Optional sheet name for Excel files. When omitted, the
            first sheet is read.

    Returns:
        A markdown rendering of the filtered table (at most 10 rows), or an
        error message string if a required package cannot be imported.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If the file extension is unsupported or the query fails.
    """
    try:
        import pandas as pd

        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        ext = path.suffix.lower()
        if ext == ".csv":
            df = pd.read_csv(path)
        elif ext in (".xlsx", ".xls"):
            # read_excel(sheet_name=None) returns a dict of every sheet, not
            # a DataFrame, so fall back to the first sheet (index 0).
            df = pd.read_excel(path, sheet_name=0 if sheet_name is None else sheet_name)
        else:
            raise ValueError(f"Unsupported file extension: {ext}")

        # Only the query itself is reported as an "Invalid query"; rendering
        # problems are not mislabelled as query errors.
        try:
            filtered_df = df.query(query_pandas_syntax)
        except Exception as e:
            raise ValueError(f"Invalid query: {query_pandas_syntax}. Error: {e}") from e
        return filtered_df.head(10).to_markdown(index=False)
    except ImportError:
        return "Error: pandas and openpyxl are not installed. Please install them with 'pip install pandas openpyxl'."
def arvix_search(query: str) -> Dict[str, str]:
    """Search arXiv for a query and return up to 5 formatted results.

    Args:
        query: The search query.

    Returns:
        A dictionary with a single key, ``"arvix_results"``, whose value is
        one string: every loaded document rendered as a ``<Document ...>``
        entry with the first 1000 characters of its content, entries
        separated by ``---``.
    """
    search_docs = ArxivLoader(query=query, load_max_docs=5).load()

    def _source(metadata: dict) -> str:
        # NOTE(review): ArxivLoader's documented default metadata lists
        # Published/Title/Authors/Summary and no "source" key, so a direct
        # metadata["source"] lookup raises KeyError - confirm against the
        # installed version. Fall back to other identifying fields.
        return (
            metadata.get("source")
            or metadata.get("entry_id")
            or metadata.get("Title", "")
        )

    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{_source(doc.metadata)}" '
        f'page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
        for doc in search_docs
    )
    return {"arvix_results": formatted_search_docs}
def read_python_file(file_path: str) -> str:
    """Read a Python file and convert it to markdown text with MarkItDown.

    Args:
        file_path: Path to the Python (.py) file.

    Returns:
        The text content produced by MarkItDown for the file.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If the file does not have a ``.py`` extension.
        Exception: Any other failure is re-raised as the same exception type
            (or ``RuntimeError`` when that type cannot be built from a single
            message), prefixed with "Could not parse python file > ".
    """
    try:
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        ext = path.suffix.lower()
        if ext != ".py":
            raise ValueError(f"Unsupported file extension: {ext}")
        md = MarkItDown(enable_plugins=True)
        return md.convert(file_path).text_content
    except Exception as err:
        message = f"Could not parse python file > {err}"
        # Some exception classes (e.g. UnicodeDecodeError) cannot be built
        # from a single message; rebuilding them blindly would raise an
        # unrelated TypeError and hide the real failure.
        try:
            wrapped = type(err)(message)
        except Exception:
            wrapped = RuntimeError(message)
        raise wrapped from err
def save_and_read_file(content: str, filename: str = None) -> str:
    """Save content to a file in the system temp directory.

    Useful for processing files from the GAIA API.

    Args:
        content: The text to write to the file (written as UTF-8).
        filename: Optional file name. Only its final path component is used,
            so the file always lands inside the temp directory. A unique name
            is generated when it is omitted or has no usable component.

    Returns:
        A message containing the path of the saved file.
    """
    # Strip any directory part: an absolute path or "../" in a
    # caller-supplied name must not escape the temp directory.
    safe_name = os.path.basename(filename) if filename else ""
    if safe_name in (".", ".."):
        safe_name = ""

    if safe_name:
        filepath = os.path.join(tempfile.gettempdir(), safe_name)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)
    else:
        # mkstemp hands back an open descriptor; wrapping it in a context
        # manager guarantees it is closed (the previous NamedTemporaryFile
        # handle was never closed).
        fd, filepath = tempfile.mkstemp()
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(content)

    return f"File saved to {filepath}. You can read this file to process its contents."
def download_file_from_url(url: str, filename: str = None) -> str:
    """Download a file from a URL and save it in the system temp directory.

    Args:
        url: The URL to download from.
        filename: Optional file name. Only its final path component is used.
            When omitted, the name is taken from the URL path, and a random
            name is generated if the URL provides none.

    Returns:
        A message containing the path of the downloaded file, or a message
        starting with "Error downloading file:" if anything fails.
    """
    try:
        # Keep only the last path component so a caller-supplied name
        # cannot point outside the temp directory.
        candidate = os.path.basename(filename) if filename else ""
        if candidate in ("", ".", ".."):
            candidate = os.path.basename(urlparse(url).path)
        if candidate in ("", ".", ".."):
            # Nothing usable in the URL either: generate a random name.
            import uuid

            candidate = f"downloaded_{uuid.uuid4().hex[:8]}"

        filepath = os.path.join(tempfile.gettempdir(), candidate)

        # A timeout keeps an unresponsive server from hanging the caller;
        # the context manager releases the streamed connection.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        return f"File downloaded to {filepath}. You can now process this file."
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error downloading file: {str(e)}"
def extract_text_from_image(image_path: str) -> str:
    """Extract text from an image using pytesseract OCR.

    Args:
        image_path: Path to the image file.

    Returns:
        A string with the extracted text, or an error message if the OCR
        packages are missing, the file is not found, or OCR fails.
    """
    try:
        from PIL import Image
        import pytesseract

        # The context manager closes the underlying file handle once OCR is
        # done instead of leaving it open until garbage collection.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)

        return f"Extracted text from image:\n\n{text.strip()}"
    except ImportError:
        return (
            "Error: pytesseract or PIL is not installed. "
            "Install them with 'pip install pytesseract pillow' and ensure Tesseract OCR is installed."
        )
    except FileNotFoundError:
        return f"Error: File not found at '{image_path}'."
    except Exception as e:
        return f"Unexpected error during OCR: {str(e)}"
# Tools exposed to the agent, in registration order.
level1_tools = [
    # Arithmetic
    multiply, add, subtract, divide, modulus,
    # Search
    wikipedia_search, web_search, arvix_search,
    # Units and tabular data
    convert_units, convert_query_to_pandas_syntax, query_table_data,
    # File handling and OCR
    download_file_from_url, save_and_read_file, read_python_file,
    extract_text_from_image,
]