from rag.agents.interface import Pipeline
from rich.progress import Progress, SpinnerColumn, TextColumn
from typing import Any, List, Optional
from pydantic import create_model
import warnings
import box
import yaml
import timeit
from rich import print
from llama_index.core import SimpleDirectoryReader
from llama_index.multi_modal_llms.ollama import OllamaMultiModal
from llama_index.core.program import MultiModalLLMCompletionProgram
from llama_index.core.output_parsers import PydanticOutputParser

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# Import config vars
with open('config.yml', 'r', encoding='utf8') as ymlfile:
    cfg = box.Box(yaml.safe_load(ymlfile))


class VLlamaIndexPipeline(Pipeline):
    def run_pipeline(self,
                     payload: str,
                     query_inputs: List[str],
                     query_types: List[str],
                     keywords: List[str],
                     query: str,
                     file_path: str,
                     index_name: str,
                     options: Optional[List[str]] = None,
                     group_by_rows: bool = True,
                     update_targets: bool = True,
                     debug: bool = False,
                     local: bool = True) -> Any:
        print(f"\nRunning pipeline with {payload}\n")

        start = timeit.default_timer()

        if file_path is None:
            raise ValueError("File path is required for vllamaindex pipeline")

        # Load the multimodal LLM served by Ollama
        mm_model = self.invoke_pipeline_step(lambda: OllamaMultiModal(model=cfg.LLM_VLLAMAINDEX),
                                             "Loading Ollama MultiModal...",
                                             local)

        # Load the input file as image documents
        image_documents = self.invoke_pipeline_step(
            lambda: SimpleDirectoryReader(input_files=[file_path],
                                          required_exts=[".jpg", ".JPG", ".JPEG"]).load_data(),
            "Loading image documents...",
            local)

        # Build a Pydantic model on the fly from the requested field names and types
        ResponseModel = self.invoke_pipeline_step(lambda: self.build_response_class(query_inputs, query_types),
                                                  "Building dynamic response class...",
                                                  local)

        prompt_template_str = """\
{query_str}

Return the answer as a Pydantic object. The Pydantic schema is given below:

"""

        mm_program = MultiModalLLMCompletionProgram.from_defaults(
            output_parser=PydanticOutputParser(ResponseModel),
            image_documents=image_documents,
            prompt_template_str=prompt_template_str,
            multi_modal_llm=mm_model,
            verbose=True,
        )

        try:
            response = self.invoke_pipeline_step(lambda: mm_program(query_str=query),
                                                 "Running inference...",
                                                 local)
        except ValueError as e:
            print(f"Error: {e}")
            msg = 'Inference failed'
            return '{"answer": "' + msg + '"}'

        end = timeit.default_timer()

        print("\nJSON response:\n")
        for res in response:
            print(res)
        print('=' * 50)

        print(f"Time to retrieve answer: {end - start}")

        return response

    # Safely evaluate a type string against a restricted context
    def safe_eval_type(self, type_str, context):
        try:
            return eval(type_str, {}, context)
        except NameError:
            raise ValueError(f"Type '{type_str}' is not recognized")

    def build_response_class(self, query_inputs, query_types_as_strings):
        # Controlled context for eval
        context = {
            'List': List,
            'str': str,
            'int': int,
            'float': float
            # Include other necessary types or typing constructs here
        }

        # Convert string representations to actual types
        query_types = [self.safe_eval_type(type_str, context) for type_str in query_types_as_strings]

        # Create fields dictionary: each field is required (Ellipsis = no default)
        fields = {name: (type_, ...) for name, type_ in zip(query_inputs, query_types)}

        DynamicModel = create_model('DynamicModel', **fields)

        return DynamicModel

    # Run a pipeline step, showing a spinner locally or a plain message otherwise
    def invoke_pipeline_step(self, task_call, task_description, local):
        if local:
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                transient=False,
            ) as progress:
                progress.add_task(description=task_description, total=None)
                ret = task_call()
        else:
            print(task_description)
            ret = task_call()

        return ret
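

# --- Usage sketch (illustrative only, not part of the pipeline) ---
# A minimal sketch of driving this pipeline directly. It assumes config.yml
# defines LLM_VLLAMAINDEX, an Ollama server is running locally, and the sample
# image path below exists. The field names, type strings, and query here are
# hypothetical placeholders, not values taken from the project.
if __name__ == "__main__":
    pipeline = VLlamaIndexPipeline()
    answer = pipeline.run_pipeline(
        payload="vllamaindex",
        query_inputs=["invoice_number", "total_amount"],  # fields to extract (hypothetical)
        query_types=["str", "float"],                     # matching type strings for build_response_class
        keywords=[],
        query="Retrieve the invoice number and total amount.",
        file_path="data/invoice.jpg",                     # hypothetical sample image
        index_name="",
        local=True,
    )
    print(answer)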