"""
Core data processing and analysis logic for the PharmaCircle AI Data Analyst.

This module orchestrates the main analysis workflow:

1. Takes a user's natural language query.
2. Uses the LLM to generate a structured analysis plan.
3. Executes parallel queries against Solr for quantitative and qualitative data.
4. Generates a data visualization using the LLM.
5. Synthesizes the findings into a comprehensive, user-facing report.
"""

import concurrent.futures
import copy
import datetime
import json
import os
import re

import google.generativeai as genai
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

from llm_prompts import (
    get_analysis_plan_prompt,
    get_synthesis_report_prompt,
    get_visualization_code_prompt,
)
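
# Example orchestration (illustrative sketch only; nothing in this module calls
# it). It assumes `solr_client` is a pysolr-style client and `llm_model` is a
# google.generativeai model, both created by the calling application. The
# quantitative and qualitative queries are independent, which is why
# concurrent.futures is imported: they can run in parallel, e.g.
#
#     plan = llm_generate_analysis_plan_with_history(llm_model, query, history)
#     with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
#         quant = pool.submit(execute_quantitative_query, solr_client, plan)
#         qual = pool.submit(execute_qualitative_query, solr_client, plan)
#     for chunk in llm_synthesize_enriched_report_stream(
#             llm_model, query, quant.result(), qual.result(), plan):
#         ...  # stream each chunk to the UI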


def parse_suggestions_from_report(report_text):
    """Extracts numbered suggestions from the report's markdown text."""
    suggestions_match = re.search(
        r"### (?:Deeper Dive: Suggested Follow-up Analyses|Suggestions for Further Exploration)\s*\n(.*?)$",
        report_text,
        re.DOTALL | re.IGNORECASE,
    )
    if not suggestions_match:
        return []
    suggestions_text = suggestions_match.group(1)
    suggestions = re.findall(r"^\s*\d+\.\s*(.*)", suggestions_text, re.MULTILINE)
    return [s.strip() for s in suggestions]


def llm_generate_analysis_plan_with_history(llm_model, natural_language_query, chat_history):
    """Generates a complete analysis plan from a user query, considering chat history."""
    prompt = get_analysis_plan_prompt(natural_language_query, chat_history)
    response = None
    try:
        response = llm_model.generate_content(prompt)
        # Strip any ```json ... ``` fence the LLM wraps around its output.
        cleaned_text = re.sub(r'```json\s*|\s*```', '', response.text, flags=re.MULTILINE | re.DOTALL).strip()
        return json.loads(cleaned_text)
    except Exception as e:
        raw_response_text = response.text if response is not None else 'N/A'
        print(f"Error in llm_generate_analysis_plan_with_history: {e}\nRaw Response:\n{raw_response_text}")
        return None
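
# Shape of the plan consumed downstream (inferred from the executor functions
# below; the authoritative schema lives in the prompt in llm_prompts):
#
#     {
#         "query_filter": "<Solr q string>",
#         "quantitative_request": {"json.facet": {...}},
#         "qualitative_request": {<Solr grouping params, e.g. group.field>}
#     }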


def execute_quantitative_query(solr_client, plan):
    """Executes the facet query to get aggregate data."""
    if not plan or 'json.facet' not in (plan.get('quantitative_request') or {}):
        return None
    try:
        params = {
            "q": plan.get('query_filter', '*:*'),  # fall back to Solr's match-all query
            "rows": 0,  # aggregates only; no documents needed
            "json.facet": json.dumps(plan['quantitative_request']['json.facet']),
        }
        results = solr_client.search(**params)
        return results.raw_response.get("facets", {})
    except Exception as e:
        print(f"Error in quantitative query: {e}")
        return None


def execute_qualitative_query(solr_client, plan):
    """Executes the grouping query to get the best example docs."""
    if not plan or 'qualitative_request' not in plan:
        return None
    try:
        # Deep-copy so merging into params below cannot mutate the shared plan.
        qual_request = copy.deepcopy(plan['qualitative_request'])
        params = {
            "q": plan.get('query_filter', '*:*'),  # fall back to Solr's match-all query
            "rows": 3,  # a few representative docs per group
            "fl": "*,score",
            **qual_request,
        }
        results = solr_client.search(**params)
        return results.grouped
    except Exception as e:
        print(f"Error in qualitative query: {e}")
        return None


def llm_synthesize_enriched_report_stream(llm_model, query, quantitative_data, qualitative_data, plan):
    """
    Generates an enriched report by synthesizing quantitative aggregates
    and qualitative examples, and streams the result.
    """
    prompt = get_synthesis_report_prompt(query, quantitative_data, qualitative_data, plan)
    try:
        response_stream = llm_model.generate_content(prompt, stream=True)
        for chunk in response_stream:
            yield chunk.text
    except Exception as e:
        print(f"Error in llm_synthesize_enriched_report_stream: {e}")
        yield "Sorry, I was unable to generate a report for this data."


def llm_generate_visualization_code(llm_model, query_context, facet_data):
    """Generates Python code for visualization based on query and data."""
    prompt = get_visualization_code_prompt(query_context, facet_data)
    response = None
    try:
        # Deterministic settings keep the generated plotting code stable.
        generation_config = genai.types.GenerationConfig(temperature=0, max_output_tokens=2048)
        response = llm_model.generate_content(prompt, generation_config=generation_config)
        # Strip the ```python ... ``` fence around the generated code.
        code = re.sub(r'^```python\s*|```$', '', response.text, flags=re.MULTILINE)
        return code
    except Exception as e:
        raw_response_text = response.text if response is not None else 'N/A'
        print(f"Error in llm_generate_visualization_code: {e}\nRaw response: {raw_response_text}")
        return None


def execute_viz_code_and_get_path(viz_code, facet_data):
    """Executes visualization code and returns the path to the saved plot image."""
    if not viz_code:
        return None
    try:
        os.makedirs('/tmp/plots', exist_ok=True)
        plot_path = f"/tmp/plots/plot_{datetime.datetime.now().timestamp()}.png"
        # The generated code runs with these names in scope and is expected to
        # leave a matplotlib figure bound to a top-level variable `fig`.
        exec_globals = {'facet_data': facet_data, 'plt': plt, 'sns': sns, 'pd': pd}
        exec(viz_code, exec_globals)
        fig = exec_globals.get('fig')
        if fig:
            fig.savefig(plot_path, bbox_inches='tight')
            plt.close(fig)
            return plot_path
        return None
    except Exception as e:
        print(f"ERROR executing visualization code: {e}\n---Code---\n{viz_code}")
        return None
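
# For reference, generated viz code is expected to bind a matplotlib figure to
# a top-level name `fig`, using only the injected names above. A typical
# illustrative snippet ("years" is a hypothetical facet key in facet_data;
# "val" and "count" are the standard Solr JSON-facet bucket fields):
#
#     df = pd.DataFrame(facet_data["years"]["buckets"])
#     fig, ax = plt.subplots()
#     sns.barplot(data=df, x="val", y="count", ax=ax)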