from rag.agents.interface import Pipeline
from rich.progress import Progress, SpinnerColumn, TextColumn
from typing import Any, List, Optional
from pydantic import create_model
import warnings
import box
import yaml
import timeit
from rich import print
from llama_index.core import SimpleDirectoryReader
from llama_index.multi_modal_llms.ollama import OllamaMultiModal
from llama_index.core.program import MultiModalLLMCompletionProgram
from llama_index.core.output_parsers import PydanticOutputParser

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# Import config vars
with open('config.yml', 'r', encoding='utf8') as ymlfile:
    cfg = box.Box(yaml.safe_load(ymlfile))
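
# Illustrative only: a minimal config.yml this module could read. The
# LLM_VLLAMAINDEX key is the one referenced below; the model name shown
# here is an assumption, not taken from the project's actual config.
#
#   LLM_VLLAMAINDEX: llava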


class VLlamaIndexPipeline(Pipeline):
    def run_pipeline(self,
                     payload: str,
                     query_inputs: List[str],
                     query_types: List[str],
                     keywords: List[str],
                     query: str,
                     file_path: str,
                     index_name: str,
                     options: Optional[List[str]] = None,
                     group_by_rows: bool = True,
                     update_targets: bool = True,
                     debug: bool = False,
                     local: bool = True) -> Any:
        print(f"\nRunning pipeline with {payload}\n")

        start = timeit.default_timer()

        if file_path is None:
            raise ValueError("File path is required for vllamaindex pipeline")

        # Load the multimodal LLM served by Ollama, as configured in config.yml
        mm_model = self.invoke_pipeline_step(lambda: OllamaMultiModal(model=cfg.LLM_VLLAMAINDEX),
                                             "Loading Ollama MultiModal...",
                                             local)

        # Load the input file as image documents
        image_documents = self.invoke_pipeline_step(lambda: SimpleDirectoryReader(input_files=[file_path],
                                                                                  required_exts=[".jpg", ".JPG",
                                                                                                 ".JPEG"]).load_data(),
                                                    "Loading image documents...",
                                                    local)

        # Build a Pydantic model on the fly from the requested fields and types
        ResponseModel = self.invoke_pipeline_step(lambda: self.build_response_class(query_inputs, query_types),
                                                  "Building dynamic response class...",
                                                  local)

        prompt_template_str = """\
{query_str}
Return the answer as a Pydantic object. The Pydantic schema is given below:
"""
        mm_program = MultiModalLLMCompletionProgram.from_defaults(
            output_parser=PydanticOutputParser(ResponseModel),
            image_documents=image_documents,
            prompt_template_str=prompt_template_str,
            multi_modal_llm=mm_model,
            verbose=True,
        )

        try:
            response = self.invoke_pipeline_step(lambda: mm_program(query_str=query),
                                                 "Running inference...",
                                                 local)
        except ValueError as e:
            print(f"Error: {e}")
            msg = 'Inference failed'
            return '{"answer": "' + msg + '"}'

        end = timeit.default_timer()

        print("\nJSON response:\n")
        for res in response:
            print(res)
        print('=' * 50)

        print(f"Time to retrieve answer: {end - start}")

        return response

    # Safely evaluate a type string against a restricted namespace, so only
    # whitelisted names (see build_response_class) can be resolved
    def safe_eval_type(self, type_str, context):
        try:
            return eval(type_str, {}, context)
        except NameError:
            raise ValueError(f"Type '{type_str}' is not recognized")

    def build_response_class(self, query_inputs, query_types_as_strings):
        # Controlled context for eval
        context = {
            'List': List,
            'str': str,
            'int': int,
            'float': float
            # Include other necessary types or typing constructs here
        }

        # Convert string representations to actual types
        query_types = [self.safe_eval_type(type_str, context) for type_str in query_types_as_strings]

        # Create fields dictionary
        fields = {name: (type_, ...) for name, type_ in zip(query_inputs, query_types)}

        DynamicModel = create_model('DynamicModel', **fields)

        return DynamicModel
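
    # Illustrative only: for query_inputs=['invoice_number', 'total'] and
    # query_types_as_strings=['str', 'float'], build_response_class returns a
    # Pydantic model equivalent to:
    #
    #   class DynamicModel(BaseModel):
    #       invoice_number: str
    #       total: float
    #
    # The field names above are hypothetical examples, not fields the
    # pipeline requires.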

    def invoke_pipeline_step(self, task_call, task_description, local):
        if local:
            # Show a spinner while the step runs in a local terminal
            with Progress(
                    SpinnerColumn(),
                    TextColumn("[progress.description]{task.description}"),
                    transient=False,
            ) as progress:
                progress.add_task(description=task_description, total=None)
                ret = task_call()
        else:
            # Non-interactive environments just log the step description
            print(task_description)
            ret = task_call()

        return ret
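

# Minimal usage sketch, assuming an Ollama server is running locally and that
# config.yml provides LLM_VLLAMAINDEX. The image path, field names, and query
# below are hypothetical placeholders, not values the project ships with.
if __name__ == "__main__":
    pipeline = VLlamaIndexPipeline()
    result = pipeline.run_pipeline(payload="vllamaindex",
                                   query_inputs=["invoice_number", "total"],
                                   query_types=["str", "float"],
                                   keywords=[],
                                   query="Retrieve invoice_number and total.",
                                   file_path="data/invoice.jpg",
                                   index_name="",
                                   local=True)
    print(result)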