LinkedinMonitor / chatbot_handler.py
GuglielmoTor's picture
Update chatbot_handler.py
24f43be verified
raw
history blame
10.6 kB
# chatbot_handler.py
import logging
import json
from google import genai
from google.genai import types as genai_types # Import types for GenerateContentConfig
import os
import asyncio
# --- Gemini configuration ---------------------------------------------------
# The API key is read from the environment; an empty string means "not set".
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '')

# Remains None unless the SDK client is created successfully further down.
client = None

# Model targeted by generation requests.
# Alternative considered: "gemini-2.0-flash" (confirm it is available for the
# API key type in use before switching).
model_name = "gemini-1.5-flash-latest"

# Sampling parameters; these are expanded into a
# genai_types.GenerateContentConfig when a request is built.
generation_config_params = dict(
    temperature=0.7,
    top_p=1,
    top_k=1,
    max_output_tokens=2048,
)

# One safety rule per harm category, all sharing the same blocking threshold.
common_safety_settings = [
    {"category": harm_category, "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
    for harm_category in (
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
    )
]

# Create the SDK client once at import time. Any failure is logged and leaves
# `client` as None so callers can detect the missing configuration.
if not GEMINI_API_KEY:
    logging.error("Gemini API Key is not set.")
else:
    try:
        client = genai.Client(api_key=GEMINI_API_KEY)
        logging.info(f"Gemini client (genai.Client) initialized. Target model for generation: '{model_name}'")
    except Exception as e:
        logging.error(f"Failed to initialize Gemini client (genai.Client): {e}", exc_info=True)
def format_history_for_gemini(gradio_chat_history: list) -> list:
    """Convert Gradio chat history into Gemini role/parts content dicts.

    Each history entry is expected to be a dict with ``role`` and ``content``
    keys. ``content`` may be either a plain string or a list of typed part
    dicts such as ``{"type": "text", "text": "..."}``; only ``"text"`` parts
    are kept.

    Args:
        gradio_chat_history: The conversation so far. ``None`` is treated as
            an empty history.

    Returns:
        A list of ``{"role": ..., "parts": [{"text": ...}, ...]}`` dicts, one
        per usable message, in the original order. ``role`` is ``"user"`` for
        user messages and ``"model"`` for every other role. This multi-turn
        shape is what the caller passes as ``contents`` to
        ``client.models.generate_content``.
    """
    gemini_contents = []
    for msg in gradio_chat_history or []:
        if not isinstance(msg, dict):
            # A malformed entry must not abort conversion of the whole history.
            logging.warning("Skipping non-dict message in chat history: %r", msg)
            continue

        role = "user" if msg.get("role") == "user" else "model"
        content = msg.get("content")

        if isinstance(content, str):
            gemini_contents.append({"role": role, "parts": [{"text": content}]})
        elif isinstance(content, list) and content:
            # Check every item, not just the first: a stray non-dict part
            # would otherwise raise AttributeError on `.get`.
            parts = [
                {"text": part_item.get("text") or ""}
                for part_item in content
                if isinstance(part_item, dict) and part_item.get("type") == "text"
            ]
            if parts:
                gemini_contents.append({"role": role, "parts": parts})
            else:
                logging.warning("Skipping complex but empty content part in chat history: %s", content)
        else:
            logging.warning("Skipping non-string/non-standard content in chat history: %s", content)
    return gemini_contents
def _extract_response_text(response) -> str:
    """Turn a Gemini response object into the string shown to the user."""
    feedback = getattr(response, "prompt_feedback", None)
    block_reason = getattr(feedback, "block_reason", None) if feedback else None
    if block_reason:
        # Enum-like values expose `.name`; fall back to str() otherwise.
        reason_name = getattr(block_reason, "name", str(block_reason))
        logging.warning(f"Blocked by prompt feedback: {reason_name}")
        return f"Blocked due to content policy: {reason_name}."

    direct_text = getattr(response, "text", None)
    if direct_text:
        logging.debug("Response has a direct .text attribute.")
        return direct_text

    logging.debug("Response does not have a direct .text attribute or it's empty, checking candidates.")
    candidates = getattr(response, "candidates", None) or []
    first_candidate = candidates[0] if candidates else None
    content = getattr(first_candidate, "content", None)
    parts = getattr(content, "parts", None) or []
    # Skip parts whose text is missing or None so the join cannot raise.
    joined_text = "".join(part.text for part in parts if getattr(part, "text", None))
    if joined_text:
        return joined_text

    finish_reason = "UNKNOWN"
    finish_reason_val = getattr(first_candidate, "finish_reason", None)
    if finish_reason_val:
        finish_reason = getattr(finish_reason_val, "name", str(finish_reason_val))

    logging.warning(f"No content parts in response and no direct .text. Finish reason: {finish_reason}")
    if finish_reason == "SAFETY":
        return f"Response generation stopped due to safety reasons. Finish reason: {finish_reason}."
    return f"The AI model returned an empty response. Finish reason: {finish_reason}."


def _describe_api_error(exc: Exception) -> str:
    """Map an exception raised by the Gemini call to a user-facing message."""
    detail = str(exc)
    lowered = detail.lower()
    if "API_KEY_INVALID" in detail or "API key not valid" in detail:
        return "AI model error: API key is not valid. Please check configuration."
    # Accept 404 as well as 400 so a not-found reply for an unknown model
    # is recognised.
    if ("400" in detail or "404" in detail) and "model" in lowered and "not found" in lowered:
        return f"AI model error: Model '{model_name}' not found or not accessible with your API key."
    return f"An unexpected error occurred while contacting the AI model: {type(exc).__name__}."


async def generate_llm_response(
    user_message: str,
    plot_id: str,
    plot_label: str,
    chat_history_for_plot: list,
    plot_data_summary: str = None,
) -> str:
    """Ask Gemini for the next reply in a plot-specific conversation.

    The whole conversation in ``chat_history_for_plot`` is converted with
    ``format_history_for_gemini`` and sent as a multi-turn ``contents`` list.
    The blocking SDK call runs in a worker thread via ``asyncio.to_thread``.

    Args:
        user_message: Currently unused by this function.
            NOTE(review): presumably the latest user turn is already the last
            entry of ``chat_history_for_plot`` -- confirm against the caller.
        plot_id: Currently unused by this function.
        plot_label: Plot name, used only in error log messages.
        chat_history_for_plot: Gradio-style chat history for this plot.
        plot_data_summary: Currently unused by this function.

    Returns:
        The model's reply text, or a human-readable error/diagnostic message.
        Failures are logged and reported through the return value rather
        than raised.
    """
    if not client:
        logging.error("Gemini client (genai.Client) not initialized.")
        return "The AI model is not available. Configuration error."

    gemini_formatted_history = format_history_for_gemini(chat_history_for_plot)
    if not gemini_formatted_history:
        logging.error("Formatted history for Gemini is empty.")
        return "There was an issue processing the conversation history (empty)."

    # Refuse to call the API when no message carries any non-blank text.
    has_text = any(
        (part.get("text") or "").strip()
        for message in gemini_formatted_history
        for part in message.get("parts", [])
    )
    if not has_text:
        logging.error("Formatted history for Gemini contains no text parts.")
        return "There was an issue processing the conversation history for the AI model (empty text)."

    try:
        generate_content = getattr(getattr(client, "models", None), "generate_content", None)
        if generate_content is None:
            logging.error(f"Gemini client (genai.Client) does not have 'models.generate_content' method. Type: {type(client)}")
            return "AI model interaction error (SDK method not found)."

        logging.debug(f"Using genai.Client.models.generate_content for model '{model_name}' (synchronous via asyncio.to_thread)")
        # Avoid producing "models/models/..." when the name is already prefixed.
        effective_model_name = model_name if model_name.startswith("models/") else f"models/{model_name}"

        # generate_content takes a single `config` argument; sampling
        # parameters and safety settings both belong inside it. Passing
        # `generation_config=` / `safety_settings=` as separate keyword
        # arguments raises TypeError.
        request_config = genai_types.GenerateContentConfig(
            **generation_config_params,
            safety_settings=common_safety_settings,
        )
        response = await asyncio.to_thread(
            generate_content,
            model=effective_model_name,
            contents=gemini_formatted_history,
            config=request_config,
        )
        return _extract_response_text(response)
    except AttributeError as ae:
        logging.error(f"AttributeError during Gemini call for plot '{plot_label}': {ae}", exc_info=True)
        return f"AI model error (Attribute): {type(ae).__name__} - {ae}."
    except Exception as e:
        logging.error(f"Error generating response for plot '{plot_label}': {e}", exc_info=True)
        return _describe_api_error(e)