import json
import timeit
import warnings
from typing import Any, List, Optional

import box
import requests
import yaml
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.llms.ollama import Ollama
from pydantic import create_model
from rich import print
from rich.progress import Progress, SpinnerColumn, TextColumn

from rag.agents.interface import Pipeline

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# Import config vars
with open('config.yml', 'r', encoding='utf8') as ymlfile:
    cfg = box.Box(yaml.safe_load(ymlfile))


class VProcessorPipeline(Pipeline):
    def run_pipeline(self,
                     payload: str,
                     query_inputs: List[str],
                     query_types: List[str],
                     keywords: List[str],
                     query: str,
                     file_path: str,
                     index_name: str,
                     options: Optional[List[str]] = None,
                     group_by_rows: bool = True,
                     update_targets: bool = True,
                     debug: bool = False,
                     local: bool = True) -> Any:
        print(f"\nRunning pipeline with {payload}\n")

        start = timeit.default_timer()

        if file_path is None:
            raise ValueError("File path is required for vprocessor pipeline")

        # Send the image to the OCR endpoint as a multipart upload; the file
        # handle must stay open for the duration of the request
        with open(file_path, "rb") as file:
            files = {'file': (file_path, file, 'image/jpeg')}
            data = {'image_url': ''}

            response = self.invoke_pipeline_step(
                lambda: requests.post(cfg.OCR_ENDPOINT_VPROCESSOR,
                                      data=data,
                                      files=files,
                                      timeout=180),
                "Running OCR...",
                local)

        if response.status_code != 200:
            print('Request failed with status code:', response.status_code)
            print('Response:', response.text)
            return "Failed to process file. Please try again."

        end = timeit.default_timer()
        print(f"Time to run OCR: {end - start}")

        start = timeit.default_timer()

        data = response.json()

        # Build a pydantic model matching the requested field names and types
        ResponseModel = self.invoke_pipeline_step(
            lambda: self.build_response_class(query_inputs, query_types),
            "Building dynamic response class...",
            local)

        # The prompt combines the user query with the OCR output, injected
        # via the {document_data} placeholder
        prompt_template_str = (
            query
            + " using this structured data, coming from OCR {document_data}."
        )

        llm_ollama = self.invoke_pipeline_step(
            lambda: Ollama(model=cfg.LLM_VPROCESSOR,
                           base_url=cfg.OLLAMA_BASE_URL_VPROCESSOR,
                           temperature=0,
                           request_timeout=900),
            "Loading Ollama...",
            local)

        program = LLMTextCompletionProgram.from_defaults(
            output_cls=ResponseModel,
            prompt_template_str=prompt_template_str,
            llm=llm_ollama,
            verbose=True,
        )

        output = self.invoke_pipeline_step(
            lambda: program(document_data=data),
            "Running inference...",
            local)

        answer = self.beautify_json(output.model_dump_json())

        end = timeit.default_timer()

        print("\nJSON response:\n")
        print(answer + '\n')
        print('=' * 50)
        print(f"Time to retrieve answer: {end - start}")

        return answer

    def prepare_files(self, file_path, file):
        # Build the multipart payload from a local path or an uploaded file.
        # Read the bytes up front so the returned payload does not reference
        # a file handle that is already closed when the with block exits.
        if file_path is not None:
            with open(file_path, "rb") as f:
                files = {'file': (file_path, f.read(), 'image/jpeg')}
        else:
            files = {'file': (file.filename, file.file, file.content_type)}

        data = {'image_url': ''}

        return data, files

    # Function to safely evaluate type strings
    def safe_eval_type(self, type_str, context):
        try:
            return eval(type_str, {}, context)
        except NameError:
            raise ValueError(f"Type '{type_str}' is not recognized")

    def build_response_class(self, query_inputs, query_types_as_strings):
        # Controlled context for eval
        context = {
            'List': List,
            'str': str,
            'int': int,
            'float': float
            # Include other necessary types or typing constructs here
        }

        # Convert string representations to actual types
        query_types = [self.safe_eval_type(type_str, context) for type_str in query_types_as_strings]

        # Create fields dictionary; the Ellipsis marks every field as required
        fields = {name: (type_, ...)
                  for name, type_ in zip(query_inputs, query_types)}

        DynamicModel = create_model('DynamicModel', **fields)

        return DynamicModel

    def invoke_pipeline_step(self, task_call, task_description, local):
        # Show a spinner for local runs; fall back to a plain log line otherwise
        if local:
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                transient=False,
            ) as progress:
                progress.add_task(description=task_description, total=None)
                ret = task_call()
        else:
            print(task_description)
            ret = task_call()

        return ret

    def beautify_json(self, result):
        try:
            # Convert and pretty print
            data = json.loads(str(result))
            data = json.dumps(data, indent=4)
            return data
        except (json.decoder.JSONDecodeError, TypeError):
            print("The response is not in JSON format:\n")
            print(result)
            return {}
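

# Example usage -- a minimal sketch, not part of the pipeline itself. It
# assumes config.yml points OCR_ENDPOINT_VPROCESSOR and
# OLLAMA_BASE_URL_VPROCESSOR at running services, and that 'invoice.jpg'
# is a hypothetical local scan. The field names and type strings below are
# illustrative placeholders, restricted to the types allowed by the
# controlled eval context in build_response_class.
if __name__ == '__main__':
    pipeline = VProcessorPipeline()
    pipeline.run_pipeline(payload='vprocessor',
                          query_inputs=['invoice_number', 'total_amount'],
                          query_types=['str', 'float'],
                          keywords=[],
                          query='retrieve invoice_number and total_amount',
                          file_path='invoice.jpg',
                          index_name='',
                          local=True)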