"""LangChain tool definitions: search, arithmetic, unit conversion, tables and files."""

import os
import random
import re
import tempfile
import uuid
from pathlib import Path
from typing import Dict
from urllib.parse import urlparse

import requests
from langchain_community.document_loaders import ArxivLoader
from langchain_community.document_loaders import WikipediaLoader
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool
from langchain_tavily import TavilySearch
from markitdown import MarkItDown


# Seconds to wait for the remote server before giving up on a download.
_DOWNLOAD_TIMEOUT_SECONDS = 30


def _safe_basename(name) -> str:
    """Return the final path component of *name*, or "" when it is unusable.

    Dropping any directory part keeps caller-supplied names inside the
    temporary directory they are later joined with.
    """
    if not name:
        return ""
    base = os.path.basename(str(name).replace("\\", "/"))
    return "" if base in (".", "..") else base


# Search tools
@tool
def web_search(query: str) -> ToolMessage:
    """Search the web with Tavily for a query and return at most 5 results.

    Args:
        query: The search query.

    Returns:
        Tavily output, with a snippet for each of the top 5 results.
    """
    # NOTE(review): the annotation says ToolMessage, but this returns whatever
    # TavilySearch.invoke() yields for a plain dict input -- confirm and adjust.
    return TavilySearch(max_results=5, include_images=False).invoke({"query": query})


@tool
def wikipedia_search(query: str) -> Dict[str, str]:
    """Search Wikipedia for a given query and return the first 5 results.

    Args:
        query: The search term or topic.

    Returns:
        A dictionary whose "wiki_results" entry holds the formatted page contents.
    """
    search_docs = WikipediaLoader(query=query, load_max_docs=5).load()
    formatted_search_docs = "\n\n---\n\n".join(
        f"\n{doc.page_content}\n" for doc in search_docs
    )
    return {"wiki_results": formatted_search_docs}


# Mathematical tools
@tool
def multiply(a: float, b: float) -> float:
    """Multiply two numbers.

    Args:
        a: first number
        b: second number

    Returns:
        Multiplication result
    """
    return a * b


@tool
def add(a: float, b: float) -> float:
    """Add two numbers.

    Args:
        a: first number
        b: second number

    Returns:
        Addition result
    """
    return a + b


@tool
def subtract(a: float, b: float) -> float:
    """Subtract two numbers.

    Args:
        a: first number
        b: second number

    Returns:
        Subtraction result
    """
    return a - b


@tool
def divide(a: float, b: float) -> float:
    """Divide two numbers.

    Args:
        a: first number
        b: second number

    Returns:
        Division result

    Raises:
        ValueError: If b is zero.
    """
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b


@tool
def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers.

    Args:
        a: first number
        b: second number

    Returns:
        Modulus result

    Raises:
        ZeroDivisionError: If b is zero.
    """
    return a % b


# Short and singular spellings mapped to the unit names used in the table below.
_UNIT_ALIASES = {
    "mile": "miles",
    "mi": "miles",
    "kilometer": "kilometers",
    "kilometre": "kilometers",
    "kilometres": "kilometers",
    "km": "kilometers",
    "kilogram": "kilograms",
    "kg": "kilograms",
    "kgs": "kilograms",
    "pound": "pounds",
    "lb": "pounds",
    "lbs": "pounds",
    "c": "celsius",
    "f": "fahrenheit",
}

_UNIT_CONVERSIONS = {
    ("miles", "kilometers"): lambda v: v * 1.60934,
    ("kilometers", "miles"): lambda v: v / 1.60934,
    ("kilograms", "pounds"): lambda v: v * 2.20462,
    ("pounds", "kilograms"): lambda v: v / 2.20462,
    ("celsius", "fahrenheit"): lambda v: (v * 9 / 5) + 32,
    ("fahrenheit", "celsius"): lambda v: (v - 32) * 5 / 9,
}


def _canonical_unit(unit: str) -> str:
    """Lower-case and trim *unit*, then resolve a known alias to its table name."""
    cleaned = unit.strip().lower()
    return _UNIT_ALIASES.get(cleaned, cleaned)


@tool
def convert_units(value: float, from_unit: str, to_unit: str) -> float:
    """Converts a value from one unit to another.

    Args:
        value: The numerical value to convert.
        from_unit: The original unit (e.g. 'miles', 'kg', 'celsius').
        to_unit: The target unit (e.g. 'kilometers', 'lb', 'fahrenheit').

    Supported conversions (unit names are case-insensitive; common short
    forms such as 'km', 'mi', 'kg', 'lb', 'c' and 'f' are accepted):
        - miles <-> kilometers
        - kilograms <-> pounds
        - celsius <-> fahrenheit

    Returns:
        The converted value result.

    Raises:
        ValueError: If the requested pair of units is not supported.
    """
    key = (_canonical_unit(from_unit), _canonical_unit(to_unit))
    if key not in _UNIT_CONVERSIONS:
        raise ValueError(f"Conversion from {from_unit} to {to_unit} not supported.")
    return _UNIT_CONVERSIONS[key](value)


# Comparison phrases understood by convert_query_to_pandas_syntax.
_ORDERING_PHRASES = {
    "greater than or equal to": ">=",
    "less than or equal to": "<=",
    "greater than": ">",
    "more than": ">",
    "less than": "<",
}
_EQUALITY_PHRASES = {
    "not equal to": "!=",
    "equal to": "==",
    "is not": "!=",
    "is": "==",
}


def _phrase_alternation(phrases) -> str:
    """Build a regex alternation for *phrases*, longest first.

    Longest-first ordering makes "is not" win over "is"; words may be
    separated by any amount of whitespace.
    """
    ordered = sorted(phrases, key=len, reverse=True)
    return "|".join(r"\s+".join(phrase.split()) for phrase in ordered)


_ORDERING_PATTERN = _phrase_alternation(_ORDERING_PHRASES)
_EQUALITY_PATTERN = _phrase_alternation(_EQUALITY_PHRASES)
_NUMBER = r"-?\d+(?:\.\d+)?"
_WORD = r"-?\w+(?:\.\w+)*"
_QUOTED = r"'[^']*'|\"[^\"]*\""

# One left-to-right scan recognises, in priority order:
#   1. quoted literals (copied through untouched),
#   2. the connectors and/or (normalised to lower case),
#   3. "<column> [is] <ordering phrase> <number>",
#   4. "<column> [is] <equality phrase> <value>", where an unquoted value is a
#      run of words that stops before the next and/or connector.
_CLAUSE_RE = re.compile(
    rf"(?P<quoted>{_QUOTED})"
    rf"|\b(?P<connector>and|or)\b"
    rf"|\b(?P<column>\w+)\s+(?:is\s+)?(?:"
    rf"(?P<ordering>{_ORDERING_PATTERN})\s+(?P<number>{_NUMBER})"
    rf"|(?P<equality>{_EQUALITY_PATTERN})\s+"
    rf"(?P<value>{_QUOTED}|{_WORD}(?:\s+(?!(?:and|or)\b){_WORD})*)"
    rf")",
    re.IGNORECASE,
)


def convert_query_to_pandas_syntax(natural_query: str, column_names: list) -> str:
    """Converts a natural language query to pandas query syntax using basic heuristics.

    Recognised phrases (case-insensitive, optionally preceded by "is"):
    "greater than", "more than", "less than", "greater than or equal to",
    "less than or equal to" (followed by a number), and "equal to",
    "not equal to", "is", "is not" (followed by a quoted or unquoted value,
    which is always emitted as a string literal). Every clause in the query
    is converted, "and"/"or" connectors are lower-cased, and text that
    matches no rule is passed through unchanged.

    Args:
        natural_query: A string with a question or filter expression in plain English.
        column_names: List of column names from the DataFrame. A column token
            that matches one of these case-insensitively is replaced by the
            real column name; may be empty or None.

    Returns:
        A best-effort string in pandas query() format.
    """
    known_columns = {str(name).lower(): str(name) for name in (column_names or ())}

    def translate(match):
        if match.group("quoted") is not None:
            # Never rewrite the inside of a string literal.
            return match.group(0)
        if match.group("connector") is not None:
            # pandas.query() only understands lower-case and/or.
            return match.group("connector").lower()

        token = match.group("column")
        column = known_columns.get(token.lower(), token)

        if match.group("ordering") is not None:
            phrase = " ".join(match.group("ordering").lower().split())
            return f"{column} {_ORDERING_PHRASES[phrase]} {match.group('number')}"

        phrase = " ".join(match.group("equality").lower().split())
        value = match.group("value")
        if value[0] in "'\"":
            value = value[1:-1]
        # repr() yields a correctly escaped literal, single-quoted when possible.
        return f"{column} {_EQUALITY_PHRASES[phrase]} {value!r}"

    return _CLAUSE_RE.sub(translate, natural_query.strip())


@tool
def query_table_data(file_path: str, query_pandas_syntax: str, sheet_name: str = None) -> str:
    """Loads a table from CSV or Excel and filters it using a pandas query.

    Args:
        file_path: Path to the table file (.csv, .xlsx, .xls).
        query_pandas_syntax: A pandas-compatible query string,
            e.g., "Age > 30 and Country == 'USA'".
        sheet_name: Optional sheet name if the file is Excel. The first sheet
            is used when omitted.

    Returns:
        A string representation (markdown) of the filtered table (max 10 rows),
        or an error message if a required package is missing.

    Raises:
        FileNotFoundError: If file_path does not exist.
        ValueError: If the file extension is unsupported or the query is invalid.
    """
    try:
        import pandas as pd

        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        ext = path.suffix.lower()
        if ext == ".csv":
            df = pd.read_csv(path)
        elif ext in (".xlsx", ".xls"):
            # sheet_name=None would make read_excel return a dict of every
            # sheet, which cannot be queried; fall back to the first sheet.
            df = pd.read_excel(path, sheet_name=0 if sheet_name is None else sheet_name)
        else:
            raise ValueError(f"Unsupported file extension: {ext}")

        try:
            filtered_df = df.query(query_pandas_syntax)
        except Exception as e:
            raise ValueError(f"Invalid query: {query_pandas_syntax}. Error: {e}") from e

        # Rendered outside the query guard so that a missing markdown backend
        # is reported as a missing package, not as an invalid query.
        return filtered_df.head(10).to_markdown(index=False)
    except ImportError as err:
        return (
            f"Error: a required package is missing ({err}). "
            "Install the dependencies with 'pip install pandas openpyxl tabulate'."
        )


@tool
def arvix_search(query: str) -> Dict[str, str]:
    """Search Arxiv for a query and return at most 5 results.

    Args:
        query: The search query.

    Returns:
        A dictionary whose "arvix_results" entry holds the formatted Arxiv
        results: the first 1000 characters of each of the top 5 documents.
    """
    search_docs = ArxivLoader(query=query, load_max_docs=5).load()
    formatted_search_docs = "\n\n---\n\n".join(
        f"\n{doc.page_content[:1000]}\n" for doc in search_docs
    )
    return {"arvix_results": formatted_search_docs}


@tool
def read_python_file(file_path: str) -> str:
    """Reads and parses a Python file to markdown.

    Args:
        file_path: Path to the Python file

    Returns:
        Python file content.

    Raises:
        FileNotFoundError: If file_path does not exist.
        ValueError: If the file does not have a .py extension.
    """
    try:
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        ext = path.suffix.lower()
        if ext != ".py":
            raise ValueError(f"Unsupported file extension: {ext}")

        md = MarkItDown(enable_plugins=True)
        return md.convert(file_path).text_content
    except Exception as err:
        message = f"Could not parse python file > {err}"
        try:
            wrapped = type(err)(message)
        except Exception:
            # Some exception types (e.g. UnicodeDecodeError) cannot be built
            # from a single message; fall back to a generic error.
            wrapped = RuntimeError(message)
        raise wrapped from err


@tool
def save_and_read_file(content: str, filename: str = None) -> str:
    """Save content to a temporary file and return the path.

    Useful for processing files from the GAIA API.

    Args:
        content: The content to save to the file
        filename: Optional filename, will generate a random name if not
            provided. Only the final path component is used, so the file is
            always created inside the temporary directory.

    Returns:
        Path to the saved file
    """
    safe_name = _safe_basename(filename)
    if safe_name:
        filepath = os.path.join(tempfile.gettempdir(), safe_name)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)
    else:
        # delete=False keeps the file on disk after the handle is closed.
        with tempfile.NamedTemporaryFile("w", delete=False, encoding="utf-8") as f:
            f.write(content)
            filepath = f.name

    return f"File saved to {filepath}. You can read this file to process its contents."


def download_file_from_url(url: str, filename: str = None) -> str:
    """Download a file from a URL and save it to a temporary location.

    Args:
        url: The URL to download from
        filename: Optional filename. When omitted, the last component of the
            URL path is used, or a random name if the URL has none. Only the
            final path component is used, so the file is always created
            inside the temporary directory.

    Returns:
        Path to the downloaded file, or an error message if the download failed.
    """
    try:
        safe_name = _safe_basename(filename) or _safe_basename(urlparse(url).path)
        if not safe_name:
            safe_name = f"downloaded_{uuid.uuid4().hex[:8]}"
        filepath = os.path.join(tempfile.gettempdir(), safe_name)

        # The context manager releases the streamed connection; the timeout
        # stops an unresponsive server from hanging the caller forever.
        with requests.get(url, stream=True, timeout=_DOWNLOAD_TIMEOUT_SECONDS) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        return f"File downloaded to {filepath}. You can now process this file."
    except Exception as e:
        # Best-effort tool: report the failure as text instead of raising.
        return f"Error downloading file: {str(e)}"


@tool
def extract_text_from_image(image_path: str) -> str:
    """Extracts text from an image using pytesseract OCR.

    Args:
        image_path: Path to the image file.

    Returns:
        A string with the extracted text or an error message.
    """
    try:
        from PIL import Image
        import pytesseract

        # Close the image file as soon as OCR has finished with it.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)

        return f"Extracted text from image:\n\n{text.strip()}"
    except ImportError:
        return (
            "Error: pytesseract or PIL is not installed. "
            "Install them with 'pip install pytesseract pillow' and ensure Tesseract OCR is installed."
        )
    except FileNotFoundError:
        return f"Error: File not found at '{image_path}'."
    except Exception as e:
        return f"Unexpected error during OCR: {str(e)}"


level1_tools = [
    multiply,
    add,
    subtract,
    divide,
    modulus,
    wikipedia_search,
    web_search,
    arvix_search,
    convert_units,
    convert_query_to_pandas_syntax,
    query_table_data,
    download_file_from_url,
    save_and_read_file,
    read_python_file,
    extract_text_from_image,
]