pc-ai-data-analyst-dup / data_processing.py
dolphinium
dynamic core selection according to agentic api's output.
dae6a10
raw
history blame
8.02 kB
"""
Core data processing and analysis logic for the PharmaCircle AI Data Analyst.
This module orchestrates the main analysis workflow:
1. Takes a user's natural language query.
2. Uses the LLM to generate a structured analysis plan.
3. Executes parallel queries against Solr for quantitative and qualitative data.
4. Generates a data visualization using the LLM.
5. Synthesizes the findings into a comprehensive, user-facing report.
"""
import json
import re
import datetime
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import concurrent.futures
import copy
import google.generativeai as genai
import urllib
from llm_prompts import (
get_analysis_plan_prompt,
get_synthesis_report_prompt,
get_visualization_code_prompt
)
from extract_results import get_search_list_params
def parse_suggestions_from_report(report_text):
"""Extracts numbered suggestions from the report's markdown text."""
suggestions_match = re.search(r"### (?:Deeper Dive: Suggested Follow-up Analyses|Suggestions for Further Exploration)\s*\n(.*?)$", report_text, re.DOTALL | re.IGNORECASE)
if not suggestions_match: return []
suggestions_text = suggestions_match.group(1)
suggestions = re.findall(r"^\s*\d+\.\s*(.*)", suggestions_text, re.MULTILINE)
return [s.strip() for s in suggestions]
def llm_generate_analysis_plan_with_history(llm_model, natural_language_query, chat_history):
"""
Generates a complete analysis plan from a user query, considering chat history
and dynamic field suggestions from an external API.
"""
search_fields, search_name, field_mappings = [], "", {}
try:
# Call the external API to get dynamic fields, core name, and mappings
search_fields, search_name, field_mappings = get_search_list_params(natural_language_query)
print(f"API returned core: '{search_name}' with {len(search_fields)} fields and {len(field_mappings)} mappings.")
except Exception as e:
print(f"Warning: Could not retrieve dynamic search fields. Proceeding without them. Error: {e}")
# Determine the core name, default to 'news' if not provided by the API
core_name = search_name if search_name else 'news'
# Apply the field mappings to the suggestions before sending them to the LLM
mapped_search_fields = []
if search_fields and field_mappings:
for field in search_fields:
original_name = field.get('field_name')
# Create a new dict to avoid modifying the original
mapped_field = field.copy()
if original_name in field_mappings:
mapped_field['field_name'] = field_mappings[original_name]
print(f"Mapped field '{original_name}' to '{mapped_field['field_name']}'")
mapped_search_fields.append(mapped_field)
else:
mapped_search_fields = search_fields
# Generate the prompt, passing the mapped fields and the dynamic core name
prompt = get_analysis_plan_prompt(natural_language_query, chat_history, mapped_search_fields, core_name)
try:
response = llm_model.generate_content(prompt)
cleaned_text = re.sub(r'```json\s*|\s*```', '', response.text, flags=re.MULTILINE | re.DOTALL).strip()
plan = json.loads(cleaned_text)
# Return the plan, the mapped fields for UI display, and the core name
return plan, mapped_search_fields, core_name
except Exception as e:
raw_response_text = response.text if 'response' in locals() else 'N/A'
print(f"Error in llm_generate_analysis_plan_with_history: {e}\nRaw Response:\n{raw_response_text}")
# Return None for the plan but still return other data for debugging
return None, mapped_search_fields, core_name
def execute_quantitative_query(solr_client, plan):
"""Executes the facet query to get aggregate data."""
if not plan or 'quantitative_request' not in plan or 'json.facet' not in plan.get('quantitative_request', {}):
return None, None
try:
params = {
"q": plan.get('query_filter', '*_*'),
"rows": 0,
"json.facet": json.dumps(plan['quantitative_request']['json.facet'])
}
# Build the full Solr URL manually (for logging) from the client's current URL
base_url = f"{solr_client.url}/select"
query_string = urllib.parse.urlencode(params)
full_url = f"{base_url}?{query_string}"
print(f"[DEBUG] Solr QUANTITATIVE query URL: {full_url}")
results = solr_client.search(**params)
return results.raw_response.get("facets", {}), full_url
except Exception as e:
print(f"Error in quantitative query on core specified in client ({solr_client.url}): {e}")
return None, None
def execute_qualitative_query(solr_client, plan):
"""Executes the grouping query to get the best example docs."""
if not plan or 'qualitative_request' not in plan:
return None, None
try:
qual_request = copy.deepcopy(plan['qualitative_request'])
params = {
"q": plan.get('query_filter', '*_*'),
"rows": 5, # Get a few examples per group
"fl": "*,score",
**qual_request
}
# Build the full Solr URL manually (for logging) from the client's current URL
base_url = f"{solr_client.url}/select"
query_string = urllib.parse.urlencode(params)
full_url = f"{base_url}?{query_string}"
print(f"[DEBUG] Solr QUALITATIVE query URL: {full_url}")
results = solr_client.search(**params)
return results.grouped, full_url
except Exception as e:
print(f"Error in qualitative query on core specified in client ({solr_client.url}): {e}")
return None, None
def llm_synthesize_enriched_report_stream(llm_model, query, quantitative_data, qualitative_data, plan):
"""
Generates an enriched report by synthesizing quantitative aggregates
and qualitative examples, and streams the result.
"""
prompt = get_synthesis_report_prompt(query, quantitative_data, qualitative_data, plan)
try:
response_stream = llm_model.generate_content(prompt, stream=True)
for chunk in response_stream:
yield chunk.text
except Exception as e:
print(f"Error in llm_synthesize_enriched_report_stream: {e}")
yield "Sorry, I was unable to generate a report for this data."
def llm_generate_visualization_code(llm_model, query_context, facet_data):
"""Generates Python code for visualization based on query and data."""
prompt = get_visualization_code_prompt(query_context, facet_data)
try:
generation_config = genai.types.GenerationConfig(temperature=0)
response = llm_model.generate_content(prompt, generation_config=generation_config)
code = re.sub(r'^```python\s*|```$', '', response.text, flags=re.MULTILINE)
return code
except Exception as e:
print(f"Error in llm_generate_visualization_code: {e}\nRaw response: {response.text}")
return None
def execute_viz_code_and_get_path(viz_code, facet_data):
"""Executes visualization code and returns the path to the saved plot image."""
if not viz_code: return None
try:
if not os.path.exists('/tmp/plots'): os.makedirs('/tmp/plots')
plot_path = f"/tmp/plots/plot_{datetime.datetime.now().timestamp()}.png"
exec_globals = {'facet_data': facet_data, 'plt': plt, 'sns': sns, 'pd': pd}
exec(viz_code, exec_globals)
fig = exec_globals.get('fig')
if fig:
fig.savefig(plot_path, bbox_inches='tight')
plt.close(fig)
return plot_path
return None
except Exception as e:
print(f"ERROR executing visualization code: {e}\n---Code---\n{viz_code}")
return None