File size: 4,783 Bytes
488dc3e
1bbca12
751d628
 
 
 
 
 
 
1bbca12
488dc3e
1bbca12
751d628
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
488dc3e
 
 
 
 
751d628
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
488dc3e
751d628
 
 
 
 
 
 
 
488dc3e
 
 
751d628
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
488dc3e
751d628
 
 
 
 
 
 
488dc3e
751d628
488dc3e
751d628
488dc3e
 
751d628
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import asyncio
import logging
import os
import re
from typing import Optional

import pandas as pd
import PyPDF2
import speech_recognition as sr
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

class FileParserInput(BaseModel):
    # Argument schema for the file parser: three required strings plus an
    # optional free-text query. Kept as `#` comments rather than a class
    # docstring so the generated schema is unchanged.

    # Identifier of the task the file belongs to.
    task_id: str = Field(
        description="Task identifier",
    )
    # Extension used to choose the parsing branch.
    file_type: str = Field(
        description="File extension (e.g., pdf, csv)",
    )
    # Location of the file on disk.
    file_path: str = Field(
        description="Path to the file",
    )
    # Optional question text; defaults to None when omitted.
    query: Optional[str] = Field(
        default=None,
        description="Query related to the file",
    )

# File types routed to SpeechRecognition's AudioFile reader.
# NOTE(review): AudioFile documents support for PCM WAV, AIFF/AIFF-C and native
# FLAC only, so "mp3" input presumably fails when the file is opened and is then
# reported through the generic "Error: ..." path — confirm, and convert MP3
# upstream if it must be supported.
_AUDIO_FILE_TYPES = ("mp3", "wav", "flac", "aiff", "aif")

# Standalone runs of digits, used for the "page number" queries.
_INTEGER_TOKEN = re.compile(r"\b\d+\b")


def _extract_numbers(text: str, not_found_message: str) -> str:
    """Return the standalone integers in *text*, ascending and comma-separated.

    Returns *not_found_message* when *text* contains no integer.
    """
    numbers = _INTEGER_TOKEN.findall(text)
    if not numbers:
        return not_found_message
    return ", ".join(sorted(numbers, key=int))


def _table_answer(df: pd.DataFrame, query_lc: str, filter_food: bool) -> str:
    """Answer a query against a table, or dump the table when no total is asked.

    Args:
        df: Parsed spreadsheet/CSV content.
        query_lc: Lower-cased query text ("" when no query was given).
        filter_food: When True and the query mentions "food", only rows that
            have a cell containing "food" are summed (if any such row exists).
    """
    if "sum" not in query_lc and "total" not in query_lc:
        return df.to_string(index=False)

    numeric_cols = df.select_dtypes(include=["float64", "int64"]).columns
    if numeric_cols.empty:
        return "No numerical data found"
    first_numeric = numeric_cols[0]

    if filter_food and "food" in query_lc and not df.empty:
        # Match on the real cell values; the textual repr of a row can be
        # truncated by pandas display options and would then miss matches.
        mentions_food = (
            df.astype(str)
            .apply(
                lambda col: col.str.lower().str.contains(
                    "food", regex=False, na=False
                )
            )
            .any(axis=1)
        )
        food_rows = df[mentions_food]
        if not food_rows.empty:
            return f"{food_rows[first_numeric].sum():.2f}"

    return f"{df[first_numeric].sum():.2f}"


def _parse_pdf(file_path: str, query_lc: str) -> str:
    """Extract the text of every page of a PDF, or its integers for page-number queries."""
    with open(file_path, "rb") as handle:
        reader = PyPDF2.PdfReader(handle)
        # Join pages with a newline so the last token of one page is not fused
        # with the first token of the next. Extraction must happen while the
        # file is still open.
        text = "\n".join(page.extract_text() or "" for page in reader.pages)
    if "page number" in query_lc:
        return _extract_numbers(text, "No page numbers found")
    return text.strip() or "No text extracted"


def _parse_text(file_path: str, query_lc: str) -> str:
    """Read a UTF-8 text file, or list its integers for page-number queries."""
    # "utf-8-sig" reads plain UTF-8 too, and drops a leading BOM if present.
    with open(file_path, "r", encoding="utf-8-sig") as handle:
        text = handle.read()
    if "page number" in query_lc:
        return _extract_numbers(text, "No page numbers found")
    return text.strip()


def _parse_audio(file_path: str, query_lc: str) -> str:
    """Transcribe an audio file with the Google recognizer of SpeechRecognition."""
    recognizer = sr.Recognizer()
    # Failures to open/decode the file propagate to the caller, which reports
    # them with the underlying error message.
    with sr.AudioFile(file_path) as source:
        audio = recognizer.record(source)
    try:
        text = recognizer.recognize_google(audio)
    except sr.UnknownValueError:
        logger.error("Could not understand audio")
        return "No text transcribed from audio"
    except Exception as exc:
        logger.error("Audio parsing failed: %s", exc)
        return "Error transcribing audio"
    logger.debug("Transcribed audio: %s", text)
    if "page number" in query_lc:
        return _extract_numbers(text, "No page numbers provided")
    return text


def _parse_file_sync(task_id: str, file_type: str, file_path: str, query: Optional[str]) -> str:
    """Blocking implementation behind ``file_parser_func``.

    Never raises: any exception is logged and returned as ``"Error: <message>"``.
    """
    try:
        if not os.path.exists(file_path):
            logger.warning("File not found: %s", file_path)
            return "File not found"

        logger.info("Parsing file: %s for task %s", file_path, task_id)

        # Accept "PDF", ".pdf", " csv " and similar spellings of the extension.
        kind = str(file_type or "").strip().lower().lstrip(".")
        query_lc = (query or "").lower()

        if kind in ("xlsx", "xls"):
            # openpyxl cannot read legacy .xls workbooks; let pandas choose the
            # engine for those instead of forcing openpyxl.
            engine = "openpyxl" if kind == "xlsx" else None
            return _table_answer(
                pd.read_excel(file_path, engine=engine), query_lc, filter_food=True
            )
        if kind == "csv":
            return _table_answer(pd.read_csv(file_path), query_lc, filter_food=False)
        if kind == "pdf":
            return _parse_pdf(file_path, query_lc)
        if kind == "txt":
            return _parse_text(file_path, query_lc)
        if kind in _AUDIO_FILE_TYPES:
            return _parse_audio(file_path, query_lc)

        logger.warning("Unsupported file type: %s", file_type)
        return f"Unsupported file type: {file_type}"

    except Exception as exc:
        logger.error("Error parsing file for task %s: %s", task_id, exc)
        return f"Error: {str(exc)}"


async def file_parser_func(task_id: str, file_type: str, file_path: str, query: Optional[str] = None) -> str:
    """
    Parse a file based on task_id, file_type, file_path, and query context.

    The parsing itself is blocking (disk reads, and a network call for audio
    transcription), so it is run in the event loop's default executor instead
    of directly inside the coroutine.

    Args:
        task_id (str): Task identifier; only used in log messages.
        file_type (str): File extension, case-insensitive, with or without a
            leading dot (e.g., 'xlsx', 'xls', 'csv', 'pdf', 'txt', 'mp3', 'wav').
        file_path (str): Path to the file.
        query (Optional[str]): Question context to guide parsing. A query
            containing "sum" or "total" returns the sum of the first numeric
            column of a spreadsheet/CSV (two decimals); a query containing
            "page number" returns the sorted integers found in the text.

    Returns:
        str: Parsed content, or a message such as "File not found",
        "Unsupported file type: ..." or "Error: ..."; parsing errors are
        returned as text rather than raised.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, _parse_file_sync, task_id, file_type, file_path, query
    )

# Structured tool wrapping the parser, with FileParserInput as its argument schema.
# NOTE(review): `func` receives the same coroutine function as `coroutine`, so a
# synchronous call through `func` would presumably return an un-awaited coroutine
# rather than a string — confirm whether `func` should be dropped or replaced by
# a synchronous wrapper.
file_parser_tool = StructuredTool.from_function(
    name="file_parser_tool",
    args_schema=FileParserInput,
    func=file_parser_func,
    coroutine=file_parser_func,
)