|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import json
import logging
import os
import shutil
import tempfile
from pathlib import Path

import requests
from flask import Blueprint, render_template, request, jsonify, send_from_directory

import config
import utils
from cache_store import cache
from cache_store import cache_directory
from llm_client import make_chat_completion_request, is_initialized as llm_is_initialized
|
|
|
# Module-level logger, named after this module so log records are attributable.
logger = logging.getLogger(__name__)


# Blueprint registering all routes in this module; mounted by the app factory.
main_bp = Blueprint('main', __name__)
|
|
|
|
|
|
|
|
|
@main_bp.route('/download_cache')
def download_cache_zip():
    """Zips the cache directory and serves it for download.

    Rebuilds the archive in the system temp directory on every request and
    streams it back as an attachment named ``radexplain-cache.zip``.

    Returns:
        A file download response on success, or a JSON ``{"error": ...}``
        payload with HTTP 500 if the cache directory is missing or the
        archive cannot be created/sent.
    """
    zip_filename = "radexplain-cache.zip"

    # Use the platform temp directory instead of a hard-coded "/tmp" so the
    # route also works where /tmp does not exist (e.g. Windows).
    temp_dir = tempfile.gettempdir()
    zip_base_path = os.path.join(temp_dir, "radexplain-cache")
    zip_filepath = zip_base_path + ".zip"

    if not os.path.isdir(cache_directory):
        logger.error(f"Cache directory not found at {cache_directory}")
        return jsonify({"error": f"Cache directory not found on server: {cache_directory}"}), 500

    try:
        logger.info(f"Creating zip archive of cache directory: {cache_directory} to {zip_filepath}")
        # make_archive appends the ".zip" suffix to the base path itself.
        shutil.make_archive(
            zip_base_path,
            "zip",
            cache_directory,
        )
        logger.info("Zip archive created successfully.")

        return send_from_directory(temp_dir, zip_filename, as_attachment=True)
    except Exception as e:
        # Best-effort endpoint: report the failure to the client rather than
        # letting an unhandled exception produce an opaque 500 page.
        logger.error(f"Error creating or sending zip archive of cache directory: {e}", exc_info=True)
        return jsonify({"error": f"Error creating or sending zip archive: {e}"}), 500
|
@main_bp.route('/')
def index():
    """Render the landing page listing the selectable reports.

    Logs a warning (but still renders) when no reports are configured, so
    the page degrades gracefully instead of failing.
    """
    reports = config.AVAILABLE_REPORTS
    if not reports:
        logger.warning("No reports found in config. AVAILABLE_REPORTS is empty.")
    return render_template('index.html', available_reports=reports)
|
|
|
@main_bp.route('/get_report_details/<report_name>')
def get_report_details(report_name):
    """Fetches the text content and image path for a given report name.

    Args:
        report_name: Name of a report entry in ``config.AVAILABLE_REPORTS``.

    Returns:
        JSON ``{"text", "image_file", "image_type"}`` on success;
        ``{"error": ...}`` with 404 for an unknown report or 500 when the
        report file cannot be read.
    """
    selected_report_info = next((item for item in config.AVAILABLE_REPORTS if item['name'] == report_name), None)

    if not selected_report_info:
        logger.error(f"Report '{report_name}' not found when fetching details.")
        return jsonify({"error": f"Report '{report_name}' not found."}), 404

    report_file = selected_report_info.get('report_file')
    image_file = selected_report_info.get('image_file')

    report_text_content = ""

    if report_file:
        actual_server_report_path = config.BASE_DIR / report_file

        try:
            report_text_content = actual_server_report_path.read_text(encoding='utf-8').strip()
        except FileNotFoundError:
            # Handle the missing-file case explicitly, consistent with
            # explain_sentence(), so logs distinguish it from other errors.
            logger.error(f"Report file not found at {actual_server_report_path}")
            return jsonify({"error": "Error reading report file."}), 500
        except Exception as e:
            logger.error(f"Error reading report file {actual_server_report_path} for report '{report_name}': {e}", exc_info=True)
            return jsonify({"error": "Error reading report file."}), 500

    # Map the configured modality code to a user-facing label; anything
    # unrecognized falls back to a generic description.
    image_type_from_config = selected_report_info.get('image_type')
    display_image_type = {'CXR': 'Chest X-Ray', 'CT': 'CT'}.get(image_type_from_config, 'Medical Image')

    return jsonify({"text": report_text_content, "image_file": image_file, "image_type": display_image_type})
|
|
|
|
|
|
|
def _collect_streamed_explanation(response):
    """Accumulate assistant text from an SSE chat-completion stream.

    Parses each ``data: {...}`` line, concatenates the delta contents, and
    stops at the ``[DONE]`` sentinel. Malformed JSON chunks are logged and
    skipped instead of aborting the whole response.

    Args:
        response: A streaming ``requests`` response supporting ``iter_lines``.

    Returns:
        The concatenated, stripped explanation text (may be empty).
    """
    explanation_parts = []
    for line in response.iter_lines():
        if not line:
            continue
        decoded_line = line.decode('utf-8')
        if decoded_line.startswith('data: '):
            json_data_str = decoded_line[len('data: '):].strip()
            if json_data_str == "[DONE]":
                break
            try:
                chunk = json.loads(json_data_str)
                if chunk.get("choices") and chunk["choices"][0].get("delta") and chunk["choices"][0]["delta"].get("content"):
                    explanation_parts.append(chunk["choices"][0]["delta"]["content"])
            except json.JSONDecodeError:
                logger.warning(f"Could not decode JSON from stream chunk: {json_data_str}")
        elif decoded_line.strip() == "[DONE]":
            break
    return "".join(explanation_parts).strip()


@main_bp.route('/explain', methods=['POST'])
def explain_sentence():
    """Handles the explanation request using LLM API with base64 encoded image.

    Expects a JSON payload ``{"sentence": ..., "report_name": ...}``.

    Returns:
        JSON ``{"explanation": ...}`` on success, or ``{"error": ...}`` with
        400 (bad payload), 404 (unknown report) or 500 (server-side failure).
    """
    if not llm_is_initialized():
        logger.error("LLM client (REST API) not initialized. Cannot process request.")
        return jsonify({"error": "LLM client (REST API) not initialized. Check API key and base URL."}), 500

    data = request.get_json()
    if not data or 'sentence' not in data or 'report_name' not in data:
        logger.warning("Missing 'sentence' or 'report_name' in request payload.")
        return jsonify({"error": "Missing 'sentence' or 'report_name' in request"}), 400

    selected_sentence = data['sentence']
    report_name = data['report_name']
    logger.info(f"Received request to explain: '{selected_sentence}' for report: '{report_name}'")

    # Check the cache FIRST: previously this lookup happened only after the
    # image was read and base64-encoded, wasting file I/O on every cache hit.
    cache_key = f"explain::{report_name}::{selected_sentence}"
    cached_result = cache.get(cache_key)
    if cached_result:
        logger.info("Returning cached explanation.")
        return jsonify({"explanation": cached_result})

    selected_report_info = next((item for item in config.AVAILABLE_REPORTS if item['name'] == report_name), None)

    if not selected_report_info:
        logger.error(f"Report '{report_name}' not found in available reports.")
        return jsonify({"error": f"Report '{report_name}' not found."}), 404

    image_file = selected_report_info.get('image_file')
    report_file = selected_report_info.get('report_file')
    image_type = selected_report_info.get('image_type')

    if not image_file:
        logger.error(f"Image or report file path (relative to static) missing in config for report '{report_name}'.")
        return jsonify({"error": f"File configuration missing for report '{report_name}'."}), 500

    server_image_path = config.BASE_DIR / image_file

    if not server_image_path.is_file():
        logger.error(f"Image file not found at {server_image_path}")
        return jsonify({"error": f"Image file for report '{report_name}' not found on server."}), 500

    base64_image_data_url = utils.image_to_base64_data_url(str(server_image_path))
    if not base64_image_data_url:
        logger.error("Failed to encode image to base64.")
        return jsonify({"error": "Could not encode image for API request"}), 500

    logger.info("Image successfully encoded to base64 data URL for API.")

    full_report_text = ""
    if report_file:
        server_report_path = config.BASE_DIR / report_file
        try:
            full_report_text = server_report_path.read_text(encoding='utf-8')
        except FileNotFoundError:
            logger.error(f"Report file not found at {server_report_path}")
            return jsonify({"error": f"Report file for '{report_name}' not found on server."}), 500
        except Exception as e:
            logger.error(f"Error reading report file {server_report_path}: {e}", exc_info=True)
            return jsonify({"error": "Error reading report file."}), 500
    else:
        logger.info(f"No report file configured for report '{report_name}'. Proceeding without full report text for system prompt.")

    system_prompt = (
        "You are a public-facing clinician. "
        f"A learning user has provided a sentence from a radiology report and is viewing the accompanying {image_type} image. "
        "Your task is to explain the meaning of ONLY the provided sentence in simple, clear terms. Explain terminology and abbreviations. Keep it concise. "
        "Directly address the meaning of the sentence. Do not use introductory phrases like 'Okay' or refer to the sentence itself or the report itself (e.g., 'This sentence means...'). "
        f"{f'Crucially, since the user is looking at their {image_type} image, provide guidance on where to look on the image to understand your explanation, if applicable. ' if image_type != 'CT' else ''}"
        "Do not discuss any other part of the report or any sentences not explicitly provided by the user. Stick to facts in the text. Do not infer anything. \n"
        "===\n"
        f"For context, the full REPORT is:\n{full_report_text}"
    )
    user_prompt_text = f"Explain this sentence from the radiology report: '{selected_sentence}'"

    messages_for_api = [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": [
                {"type": "text", "text": user_prompt_text},
                # Bug fix: the image was encoded above but never attached to
                # the message, despite the docstring and log claiming it is
                # sent. Attach it as an OpenAI-style image_url content part.
                {"type": "image_url", "image_url": {"url": base64_image_data_url}},
            ]
        }
    ]

    try:
        logger.info("Sending request to LLM API (REST) with base64 image...")
        response = make_chat_completion_request(
            model="tgi",
            messages=messages_for_api,
            top_p=None,
            temperature=0,
            max_tokens=250,
            stream=True,
            seed=None,
            stop=None,
            frequency_penalty=None,
            presence_penalty=None
        )
        logger.info("Received response stream from LLM API (REST).")

        explanation = _collect_streamed_explanation(response)
        if explanation:
            # expire=None keeps the entry indefinitely; explanations for a
            # fixed sentence/report pair do not go stale.
            cache.set(cache_key, explanation, expire=None)

        logger.info("Explanation generated successfully." if explanation else "Empty explanation from API.")
        return jsonify({"explanation": explanation or "No explanation content received from the API."})
    except requests.exceptions.RequestException as e:
        logger.error(f"Error during LLM API (REST) call: {e}", exc_info=True)
        user_error_message = ("Failed to generate explanation. The service might be temporarily unavailable "
                              "and is now likely starting up. Please try again in a few moments.")
        return jsonify({"error": user_error_message}), 500
|
|