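# NOTE: the lines below are page-header text captured together with the file;
# they are kept as comments because they are not Python code.
# mchinea
# Update agent and tools
# 86b31ff
# raw
# history blame
# 10.6 kB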
import os
import random
import requests
import tempfile
import re
from typing import Dict
from pathlib import Path
#from markitdown import MarkItDown
from urllib.parse import urlparse
from langchain_core.tools import tool
from langchain_core.messages import ToolMessage
from langchain_tavily import TavilySearch
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.document_loaders import ArxivLoader
@tool
def web_search(query: str) -> ToolMessage:
    """Run a Tavily web search for a query, limited to 5 results.

    Args:
        query: The search query.

    Returns:
        The Tavily output for the query (images excluded, at most 5 results).
    """
    # NOTE(review): the annotation says ToolMessage, but the value is passed
    # through untouched from TavilySearch.invoke -- confirm the actual type.
    searcher = TavilySearch(max_results=5, include_images=False)
    payload = {"query": query}
    return searcher.invoke(payload)
@tool
def wikipedia_search(query: str) -> Dict[str, str]:
    """Search Wikipedia for a query and return up to 5 formatted results.

    Args:
        query: The search term or topic.

    Returns:
        A dictionary with the single key "wiki_results". Its value is one
        string in which every loaded page is wrapped in a <Document ...> block
        and the blocks are separated by "---" lines.
    """
    search_docs = WikipediaLoader(query=query, load_max_docs=5).load()
    # .get() keeps one document with incomplete metadata from aborting the
    # whole search; when "source" is present the output is unchanged.
    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{doc.metadata.get("source", "")}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
        for doc in search_docs
    )
    return {"wiki_results": formatted_search_docs}
# Mathematical tools
@tool
def multiply(a: float, b: float) -> float:
    """Return the product of two numbers.

    Args:
        a: first factor
        b: second factor

    Returns:
        The value of a times b
    """
    product = a * b
    return product
@tool
def add(a: float, b: float) -> float:
    """Return the sum of two numbers.

    Args:
        a: first addend
        b: second addend

    Returns:
        The value of a plus b
    """
    total = a + b
    return total
@tool
def subtract(a: float, b: float) -> float:
    """Return the difference of two numbers.

    Args:
        a: number to subtract from
        b: number to subtract

    Returns:
        The value of a minus b
    """
    difference = a - b
    return difference
@tool
def divide(a: float, b: float) -> float:
    """Return the quotient of two numbers.

    Args:
        a: dividend
        b: divisor

    Returns:
        The value of a divided by b

    Raises:
        ValueError: if b equals zero.
    """
    if b != 0:
        return a / b
    # Only a zero divisor reaches this point.
    raise ValueError("Cannot divide by zero.")
@tool
def modulus(a: int, b: int) -> int:
    """Return the remainder of dividing one number by another.

    Args:
        a: dividend
        b: divisor

    Returns:
        The value of a % b
    """
    remainder = a % b
    return remainder
from langchain_core.tools import tool
@tool
def convert_units(value: float, from_unit: str, to_unit: str) -> float:
    """Convert a value from one unit to another.

    Args:
        value: The numerical value to convert.
        from_unit: The original unit (e.g. 'miles', 'kg', 'celsius').
        to_unit: The target unit (e.g. 'kilometers', 'lb', 'fahrenheit').

    Supported conversions (unit names are case-insensitive, surrounding
    whitespace is ignored, and the listed short forms are accepted):
        - miles (mile, mi) <-> kilometers (kilometer, kilometre, kilometres, km)
        - kilograms (kilogram, kg, kgs) <-> pounds (pound, lb, lbs)
        - celsius (c) <-> fahrenheit (f)

    Returns:
        The converted value.

    Raises:
        ValueError: if the unit pair is not one of the supported conversions.
    """
    # Short and singular spellings mapped to the canonical table keys, so the
    # abbreviations shown in the examples above actually work.
    aliases = {
        "mile": "miles",
        "mi": "miles",
        "kilometer": "kilometers",
        "kilometre": "kilometers",
        "kilometres": "kilometers",
        "km": "kilometers",
        "kilogram": "kilograms",
        "kg": "kilograms",
        "kgs": "kilograms",
        "pound": "pounds",
        "lb": "pounds",
        "lbs": "pounds",
        "c": "celsius",
        "f": "fahrenheit",
    }
    conversions = {
        ("miles", "kilometers"): lambda v: v * 1.60934,
        ("kilometers", "miles"): lambda v: v / 1.60934,
        ("kilograms", "pounds"): lambda v: v * 2.20462,
        ("pounds", "kilograms"): lambda v: v / 2.20462,
        ("celsius", "fahrenheit"): lambda v: (v * 9 / 5) + 32,
        ("fahrenheit", "celsius"): lambda v: (v - 32) * 5 / 9,
    }

    def canonical(unit: str) -> str:
        name = unit.strip().lower()
        return aliases.get(name, name)

    key = (canonical(from_unit), canonical(to_unit))
    converter = conversions.get(key)
    if converter is None:
        raise ValueError(f"Conversion from {from_unit} to {to_unit} not supported.")
    return converter(value)
# Plain decimal number, optionally negative.
_QTD_NUMBER = r"-?\d+(?:\.\d+)?"

# A single- or double-quoted string literal.
_QTD_QUOTED = r"'[^']*'|\"[^\"]*\""

# Right-hand side of an equality phrase: a quoted string, or a bare run of
# words that stops before a logical operator, a closing parenthesis or the end.
_QTD_VALUE = (
    r"(?:'([^']*)'|\"([^\"]*)\"|([\w .-]+?))"
    r"(?=\s+(?:and|or)\b|\s*\)|\s*$)"
)

# Phrases that compare against a number. Longer phrases come first so that
# "less than or equal to" is not consumed by the shorter "less than" rule.
_QTD_NUMERIC_PHRASES = (
    (r"(?:is\s+)?(?:greater|more)\s+than\s+or\s+equal\s+to", ">="),
    (r"(?:is\s+)?less\s+than\s+or\s+equal\s+to", "<="),
    (r"(?:is\s+)?(?:greater|more)\s+than", ">"),
    (r"(?:is\s+)?less\s+than", "<"),
)

# Equality phrases. Negated forms come first so that "x not equal to y" is not
# read as "equal to" applied to a column called "not".
_QTD_EQUALITY_PHRASES = (
    (r"(?:is\s+)?not\s+equal\s+to", "!="),
    (r"(?:is\s+)?equal\s+to", "=="),
    (r"is\s+not", "!="),
    (r"is", "=="),
)


def _qtd_equality_replacer(operator: str):
    """Build a re.sub callback that rewrites one equality phrase with *operator*."""

    def replace(match: "re.Match") -> str:
        column, single_quoted, double_quoted, bare = match.groups()
        if bare is not None:
            text = bare.strip()
            # An unquoted number is compared numerically, not as a string.
            if re.fullmatch(_QTD_NUMBER, text):
                return f"{column} {operator} {text}"
        else:
            text = single_quoted if single_quoted is not None else double_quoted
        return f"{column} {operator} {text!r}"

    return replace


def _qtd_to_pandas_query(query: str) -> str:
    """Translate simple natural-language comparisons into pandas query syntax."""
    expression = query.strip()

    for phrase, operator in _QTD_NUMERIC_PHRASES:
        expression = re.sub(
            rf"\b(\w+)\s+{phrase}\s+({_QTD_NUMBER})\b",
            rf"\1 {operator} \2",
            expression,
            flags=re.IGNORECASE,
        )

    for phrase, operator in _QTD_EQUALITY_PHRASES:
        expression = re.sub(
            rf"\b(\w+)\s+{phrase}\s+{_QTD_VALUE}",
            _qtd_equality_replacer(operator),
            expression,
            flags=re.IGNORECASE,
        )

    # Lower-case AND/OR so pandas accepts them. Quoted values sit at the odd
    # indices of the split and are left untouched.
    pieces = re.split(rf"({_QTD_QUOTED})", expression)
    pieces[::2] = [
        re.sub(
            r"\b(?:and|or)\b",
            lambda m: m.group(0).lower(),
            piece,
            flags=re.IGNORECASE,
        )
        for piece in pieces[::2]
    ]
    return "".join(pieces)


@tool
def query_table_data(file_path: str, query: str, sheet_name: str = None) -> str:
    """Load a table from CSV or Excel and filter it with a query.

    Args:
        file_path: Path to the table file (.csv, .xlsx, .xls).
        query: A pandas-compatible query string, e.g.
            "Age > 30 and Country == 'USA'". Simple phrases such as
            "Age greater than 30", "Age less than or equal to 5",
            "Country is USA" or "Country not equal to 'USA'" are translated
            to pandas syntax first; several may be combined with and/or.
        sheet_name: Optional sheet name if the file is Excel. The first sheet
            is used when omitted.

    Returns:
        A markdown table of the matching rows (max 10 rows), or a plain-text
        table when markdown rendering is unavailable. If pandas or the Excel
        engine cannot be imported, an error message string is returned.

    Raises:
        FileNotFoundError: if file_path does not exist.
        ValueError: if the file extension is unsupported or the query is
            rejected by pandas.
    """
    missing_dependency = (
        "Error: pandas and openpyxl are not installed. "
        "Please install them with 'pip install pandas openpyxl'."
    )
    try:
        import pandas as pd
    except ImportError:
        return missing_dependency

    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    ext = path.suffix.lower()
    try:
        if ext == ".csv":
            df = pd.read_csv(path)
        elif ext in (".xlsx", ".xls"):
            # sheet_name=None would make read_excel return a dict of every
            # sheet, which cannot be queried; default to the first sheet.
            df = pd.read_excel(path, sheet_name=0 if sheet_name is None else sheet_name)
        else:
            raise ValueError(f"Unsupported file extension: {ext}")
    except ImportError:
        # pandas raises ImportError when the Excel engine is not installed.
        return missing_dependency

    # Computed before the try so the error message can always reference it.
    query_pandas_syntax = _qtd_to_pandas_query(query)
    try:
        filtered_df = df.query(query_pandas_syntax)
    except Exception as e:
        raise ValueError(f"Invalid query: {query_pandas_syntax}. Error: {e}") from e

    preview = filtered_df.head(10)
    try:
        return preview.to_markdown(index=False)
    except ImportError:
        # to_markdown needs the optional "tabulate" package; a missing
        # formatter must not be reported as an invalid query.
        return preview.to_string(index=False)
@tool
def arvix_search(query: str) -> Dict[str, str]:
    """Search arXiv for a query and return at most 5 results.

    Args:
        query: The search query.

    Returns:
        A dictionary with the single key "arvix_results". Its value is one
        string in which every result is wrapped in a <Document ...> block
        holding the first 1000 characters of its content, with blocks
        separated by "---" lines.
    """
    search_docs = ArxivLoader(query=query, load_max_docs=5).load()

    def source_of(doc) -> str:
        # The loader's metadata may not contain a "source" key, so indexing it
        # directly can raise KeyError; fall back to other identifying fields.
        meta = doc.metadata
        return meta.get("source") or meta.get("entry_id") or meta.get("Title", "")

    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{source_of(doc)}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
        for doc in search_docs
    )
    return {"arvix_results": formatted_search_docs}
@tool
def read_python_file(file_path: str) -> str:
    """Read a Python source file and return its text.

    Args:
        file_path: Path to the Python file (must end in .py).

    Returns:
        The file content as text.

    Raises:
        FileNotFoundError: if file_path does not exist.
        ValueError: if the file does not have a .py extension.
        OSError: if the file cannot be read.
    """
    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(
            f"Could not parse python file > File not found: {file_path}"
        )

    ext = path.suffix.lower()
    if ext != ".py":
        raise ValueError(
            f"Could not parse python file > Unsupported file extension: {ext}"
        )

    # A .py file is plain text, so it is read directly; the MarkItDown
    # converter used previously is not imported in this module.
    try:
        return path.read_text(encoding="utf-8", errors="replace")
    except OSError as err:
        raise OSError(f"Could not parse python file > {err}") from err
@tool
def save_and_read_file(content: str, filename: str = None) -> str:
    """Save content to a file in the temporary directory and report its path.

    Useful for processing files from the GAIA API.

    Args:
        content: The content to save to the file
        filename: Optional filename; a random name is generated if it is not
            provided. Any directory part is ignored so the file always lands
            in the temporary directory.

    Returns:
        A message containing the path to the saved file
    """
    # basename() drops directory components, so an absolute path or "../"
    # cannot redirect the write outside the temporary directory.
    safe_name = os.path.basename(filename) if filename else ""

    if safe_name in ("", ".", ".."):
        # The context manager closes the handle; delete=False keeps the file.
        with tempfile.NamedTemporaryFile(
            mode="w", encoding="utf-8", delete=False
        ) as temp_file:
            temp_file.write(content)
            filepath = temp_file.name
    else:
        filepath = os.path.join(tempfile.gettempdir(), safe_name)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)

    return f"File saved to {filepath}. You can read this file to process its contents."
def download_file_from_url(url: str, filename: str) -> str:
    """Download a file from a URL and save it in the temporary directory.

    Args:
        url: The URL to download from
        filename: Name for the saved file. When empty, the last path segment
            of the URL is used, or a random name if the URL has none. Any
            directory part is ignored so the file always lands in the
            temporary directory.

    Returns:
        A message containing the path to the downloaded file, or a message
        starting with "Error downloading file:" if anything fails.
    """
    try:
        # basename() drops directory components, so an absolute path or "../"
        # cannot redirect the write outside the temporary directory.
        safe_name = os.path.basename(filename) if filename else ""
        if safe_name in ("", ".", ".."):
            safe_name = os.path.basename(urlparse(url).path)
        if safe_name in ("", ".", ".."):
            # Generate a random name if we couldn't extract one
            import uuid

            safe_name = f"downloaded_{uuid.uuid4().hex[:8]}"

        filepath = os.path.join(tempfile.gettempdir(), safe_name)

        # The timeout keeps a stalled server from blocking forever; the
        # context manager releases the streamed connection.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        return f"File downloaded to {filepath}. You can now process this file."
    except Exception as e:
        return f"Error downloading file: {str(e)}"
@tool
def extract_text_from_image(image_path: str) -> str:
    """Extract text from an image using pytesseract OCR.

    Args:
        image_path: Path to the image file.

    Returns:
        A string with the extracted text or an error message.
    """
    try:
        from PIL import Image
        import pytesseract

        # The context manager closes the image file once OCR is done.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)

        return f"Extracted text from image:\n\n{text.strip()}"
    except ImportError:
        return (
            "Error: pytesseract or PIL is not installed. "
            "Install them with 'pip install pytesseract pillow' and ensure Tesseract OCR is installed."
        )
    except FileNotFoundError:
        return f"Error: File not found at '{image_path}'."
    except Exception as e:
        return f"Unexpected error during OCR: {str(e)}"
# Tools made available to the agent; the order is kept as originally declared.
level1_tools = [
    # Arithmetic
    multiply, add, subtract, divide, modulus,
    # Search
    wikipedia_search, web_search, arvix_search,
    # Unit conversion and tabular data
    convert_units, query_table_data,
    # File handling and OCR
    download_file_from_url, save_and_read_file, read_python_file,
    extract_text_from_image,
]