# agents/task_extraction_agent.py
import logging
from typing import Optional
from datetime import datetime, date  # Ensure date is imported if used for type hints

from google.adk.agents import LlmAgent
from google.adk.runners import InMemoryRunner  # Assuming this is used for direct agent running
from google.genai import types as genai_types  # For constructing ADK agent inputs

# Project-specific imports
from data_models.tasks import (
    TaskExtractionOutput,
    OKR,
    KeyResult,
    Task,
    EffortLevel,
    TimelineCategory,
    PriorityLevel,
    TaskType,
    DataSubject  # Ensure all are imported
)
from utils.retry_mechanism import RetryMechanism  # If retries are needed for ADK calls

# Configure logger for this module
logger = logging.getLogger(__name__)

DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20"  # Or your specific model
class TaskExtractionAgent:
    """
    Agent specialized in extracting actionable tasks and OKRs from analysis insights,
    with awareness of the current date and quarter.
    """
    AGENT_NAME = "task_extractor"
    AGENT_DESCRIPTION = "Specialist in converting strategic insights into specific, time-aware actionable tasks and OKRs."

    def __init__(self, api_key: str, model_name: Optional[str] = None, current_date: Optional[date] = None):
        """
        Initializes the TaskExtractionAgent.

        Args:
            api_key: API key (may be used by LlmAgent configuration or future needs).
            model_name: Name of the language model to use.
            current_date: The current date to use for quarter calculations. Defaults to today.
        """
        self.api_key = api_key  # Store if needed by LlmAgent or other components
        self.model_name = model_name or DEFAULT_AGENT_MODEL
        self.current_date = current_date or datetime.utcnow().date()  # Use a date object for consistency

        # The LlmAgent is initialized with a dynamic instruction and a structured output schema.
        self.agent = LlmAgent(
            name=self.AGENT_NAME,
            model=self.model_name,
            description=self.AGENT_DESCRIPTION,
            instruction=self._get_instruction_prompt(),  # Instruction generated dynamically
            output_schema=TaskExtractionOutput,  # Pydantic model for structured output
            output_key="extracted_tasks_okrs"  # Key where LlmAgent stores structured output in state
        )
        self.retry_mechanism = RetryMechanism()  # For retrying ADK runner calls if needed

        logger.info(f"{self.AGENT_NAME} initialized for Q{self._get_quarter(self.current_date)}, "
                    f"{self._days_until_quarter_end(self.current_date)} days remaining in quarter. Model: {self.model_name}")
    def _get_quarter(self, d: date) -> int:
        """Calculates the calendar quarter (1-4) for a given date."""
        return (d.month - 1) // 3 + 1

    def _days_until_quarter_end(self, d: date) -> int:
        """Calculates the number of days remaining in the current quarter from date d."""
        current_q = self._get_quarter(d)
        year = d.year
        if current_q == 1:
            quarter_end_date = date(year, 3, 31)
        elif current_q == 2:
            quarter_end_date = date(year, 6, 30)
        elif current_q == 3:
            quarter_end_date = date(year, 9, 30)
        else:  # Quarter 4
            quarter_end_date = date(year, 12, 31)
        days_remaining = (quarter_end_date - d).days
        return max(0, days_remaining)  # Ensure non-negative
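    # Worked example (for reference): for date(2025, 4, 15), (4 - 1) // 3 + 1 == 2, so the
    # quarter end is date(2025, 6, 30) and (quarter_end_date - d).days == 76, i.e. the
    # instruction prompt below would read "Q2, 76 days remaining".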
    def _get_instruction_prompt(self) -> str:
        """Generates the dynamic instruction string for the LlmAgent."""
        quarter = self._get_quarter(self.current_date)
        days_remaining = self._days_until_quarter_end(self.current_date)

        # Dynamically include Pydantic model field descriptions for better LLM guidance.
        # This part can be complex if done fully automatically. For now, manually summarizing key fields.
        task_fields_summary = (
            "Each Task must include: task_category (e.g., Content Strategy), task_description, "
            "objective_deliverable, effort (Small, Medium, Large), timeline (Immediate, Short-term, Medium-term, Long-term), "
            "responsible_party, success_criteria_metrics, dependencies_prerequisites (optional), "
            "priority (High, Medium, Low) with priority_justification, why_proposed (linking to analysis), "
            "task_type (initiative or tracking), data_subject (for tracking tasks: follower_stats, posts, mentions, general)."
        )

        return f"""
        You are a Time-Aware Task Extraction Specialist. Your primary function is to meticulously analyze strategic insights
        derived from LinkedIn analytics and transform them into a structured set of actionable tasks. These tasks should be
        organized within an Objectives and Key Results (OKRs) framework.

        CURRENT CONTEXTUAL INFORMATION (DO NOT CHANGE THIS IN YOUR OUTPUT):
        - Current Quarter: Q{quarter}
        - Days remaining in current quarter: {days_remaining}
        - Today's Date (for context): {self.current_date.isoformat()}

        YOUR MANDATE:
        1. Define clear, aspirational Objectives (qualitative goals).
        2. For each Objective, formulate 2-3 specific, measurable Key Results.
        3. Under each Key Result, list detailed, actionable Tasks required to achieve it.
        4. CRITICAL: Each Task MUST strictly adhere to the 'Task' Pydantic model fields. This means providing values for ALL required fields: {task_fields_summary}
        5. Task Timelines: Must be realistic given the {days_remaining} days left in Q{quarter}. Prioritize actions that can make significant progress or be completed within this timeframe. Use TimelineCategory enum values.
        6. Task Types: Clearly distinguish between 'initiative' (new actions/projects) and 'tracking' (ongoing monitoring/measurement).
        7. Data Subjects: For 'tracking' tasks, accurately specify the relevant 'data_subject'. For 'initiative' tasks, this can be 'general' or null if not specific to one data type.
        8. Rationale ('why_proposed'): This is crucial. Each task's proposal must be explicitly justified by and linked back to specific findings, trends, or recommendations mentioned in the input 'comprehensive_analysis'.
        9. Priority: Assign a priority (High, Medium, Low) to each task and provide a 'priority_justification'.

        INPUT: You will receive a 'comprehensive_analysis' text.

        OUTPUT FORMAT:
        You MUST return a single JSON object that strictly conforms to the 'TaskExtractionOutput' Pydantic schema.
        This JSON object will contain:
        - 'current_quarter_info': A string exactly like "Q{quarter}, {days_remaining} days remaining". (This is fixed based on the context above).
        - 'okrs': A list, where each item is an 'OKR' object.
        - 'overall_strategic_focus': (Optional) A brief summary of the main strategic themes emerging from the OKRs.
        - 'generation_timestamp': (This will be auto-filled if you conform to the schema, or you can provide an ISO timestamp).

        Example of a Task (ensure all fields from the Pydantic model are covered):
        {{
            "task_category": "Content Creation",
            "task_description": "Launch a 3-part blog series on AI in Marketing.",
            "objective_deliverable": "Objective: Increase thought leadership in AI marketing. Deliverable: 3 published blog posts.",
            "effort": "Medium",
            "timeline": "Short-term",
            "responsible_party": "Content Team Lead",
            "success_criteria_metrics": "Average 500 views per post, 10+ shares per post.",
            "dependencies_prerequisites": "Keyword research for AI marketing topics completed.",
            "priority": "High",
            "priority_justification": "Addresses key strategic goal of establishing AI expertise.",
            "why_proposed": "Analysis highlighted a gap in content related to AI, a high-interest area for our target audience.",
            "task_type": "initiative",
            "data_subject": "general"
        }}

        Focus on quality, actionability, and strict adherence to the output schema.
        """
    async def extract_tasks(self, comprehensive_analysis: str) -> TaskExtractionOutput:
        """
        Extracts time-aware actionable tasks from the comprehensive analysis text.

        Args:
            comprehensive_analysis: The text analysis from which to extract tasks.

        Returns:
            A TaskExtractionOutput Pydantic model instance.
        """
        if not comprehensive_analysis or not comprehensive_analysis.strip():
            logger.warning("Comprehensive analysis text is empty. Cannot extract tasks.")
            return TaskExtractionOutput(
                current_quarter_info=f"Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining",
                okrs=[],
                overall_strategic_focus="No analysis provided to extract tasks."
            )

        # The LlmAgent's instruction already contains the dynamic date info and output format.
        # The input to the agent's run method is the comprehensive_analysis itself.
        prompt_for_adk_agent = f"""
        Comprehensive Analysis for Task Extraction:
        ---
        {comprehensive_analysis}
        ---
        Based on the analysis above, and adhering strictly to your primary instructions (especially regarding current quarter context, task field requirements, and JSON output schema 'TaskExtractionOutput'), generate the OKRs and tasks.
        Ensure the 'current_quarter_info' field in your JSON output is exactly: "Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining".
        """
        user_input_content = genai_types.Content(
            role="user",
            parts=[genai_types.Part(text=prompt_for_adk_agent)]
        )

        # Using InMemoryRunner, as per the original structure, for an LlmAgent with output_schema.
        runner = InMemoryRunner(agent=self.agent, app_name=f"{self.AGENT_NAME}Runner")

        # Generate a unique user_id for each run to ensure fresh session state if needed.
        user_id = f"system_user_task_extractor_{int(datetime.utcnow().timestamp())}"
        session = await runner.session_service.create_session(
            app_name=f"{self.AGENT_NAME}Runner",
            user_id=user_id
        )

        extracted_data_dict = None
        full_response_text_for_debug = ""  # To capture raw text if parsing fails
        try:
            logger.info(f"Running TaskExtractionAgent for user_id: {user_id}, session_id: {session.id}")
            async for event in runner.run(
                user_id=user_id,
                session_id=session.id,
                new_message=user_input_content
            ):
                # An LlmAgent with output_schema stores the result in event.actions.state_delta[output_key].
                if (hasattr(event, 'actions') and event.actions and
                        hasattr(event.actions, 'state_delta') and
                        isinstance(event.actions.state_delta, dict) and
                        self.agent.output_key in event.actions.state_delta):
                    extracted_data_dict = event.actions.state_delta[self.agent.output_key]
                    logger.info("Successfully extracted structured data via LlmAgent state_delta.")
                    break  # Assuming the full structured output arrives in one event with state_delta

                # Capture text parts for debugging if structured output isn't found first.
                if hasattr(event, 'content') and event.content and event.content.parts:
                    for part in event.content.parts:
                        if hasattr(part, 'text'):
                            full_response_text_for_debug += part.text

            if not extracted_data_dict and full_response_text_for_debug:
                logger.warning("LlmAgent did not produce structured output in state_delta. Raw text response was: %s",
                               full_response_text_for_debug[:500] + "...")  # Log a snippet
                # Parsing the raw text as JSON would be a basic fallback (not ideal); robust JSON
                # cleaning might be needed if the LLM does not adhere to the schema.
                # For now, we rely on LlmAgent's output_schema handling.
        except Exception as e:
            logger.error(f"Error during TaskExtractionAgent execution: {e}", exc_info=True)
            # Fall back to returning an empty TaskExtractionOutput with error info.
            return TaskExtractionOutput(
                current_quarter_info=f"Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining",
                okrs=[],
                overall_strategic_focus=f"Error during task extraction: {e}",
                generation_timestamp=datetime.utcnow().isoformat()
            )
        finally:
            try:
                await runner.session_service.delete_session(
                    app_name=f"{self.AGENT_NAME}Runner", user_id=user_id, session_id=session.id
                )
            except Exception as session_del_e:
                logger.error(f"Error deleting task extractor session: {session_del_e}")
        if extracted_data_dict:
            if isinstance(extracted_data_dict, TaskExtractionOutput):  # Already a Pydantic model
                return extracted_data_dict
            elif isinstance(extracted_data_dict, dict):  # If it's a dict, parse it
                try:
                    return TaskExtractionOutput(**extracted_data_dict)
                except Exception as pydantic_error:
                    logger.error(f"Error parsing extracted dictionary into TaskExtractionOutput: {pydantic_error}", exc_info=True)
                    logger.error(f"Problematic dictionary data: {extracted_data_dict}")
            else:
                logger.error(f"Extracted data is not a dictionary or TaskExtractionOutput model: {type(extracted_data_dict)}")

        # Fallback if no valid data was extracted
        logger.warning("No valid structured data extracted by TaskExtractionAgent.")
        return TaskExtractionOutput(
            current_quarter_info=f"Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining",
            okrs=[],
            overall_strategic_focus="Failed to extract tasks or no tasks were identified.",
            generation_timestamp=datetime.utcnow().isoformat()
        )
    def update_current_date(self, new_date: date):
        """
        Updates the current date for the agent and re-initializes the LlmAgent
        to reflect the new date context in its instructions.
        """
        self.current_date = new_date
        # Re-initialize the LlmAgent with the new instruction based on the new date.
        self.agent = LlmAgent(
            name=self.AGENT_NAME,
            model=self.model_name,
            description=self.AGENT_DESCRIPTION,
            instruction=self._get_instruction_prompt(),  # Get updated instruction
            output_schema=TaskExtractionOutput,
            output_key="extracted_tasks_okrs"
        )
        logger.info(f"{self.AGENT_NAME} date updated. New context: Q{self._get_quarter(self.current_date)}, "
                    f"{self._days_until_quarter_end(self.current_date)} days remaining.")
if __name__ == '__main__':
    import asyncio
    import os  # Needed for os.environ below

    # (Ensure logging_config.py is in the same directory or PYTHONPATH is set for this example to run standalone.)
    try:
        from utils.logging_config import setup_logging
        setup_logging()
        logger.info("Logging setup for TaskExtractionAgent test.")
    except ImportError:
        logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        logger.warning("logging_config.py not found, using basicConfig for logging.")

    MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_task_extractor")  # Use your actual key or env var
    MODEL_NAME = DEFAULT_AGENT_MODEL
    # Example comprehensive analysis text (replace with actual analysis output)
    sample_analysis_text = """
    Overall Summary: Follower growth is steady at 5% MoM. Post engagement is highest for video content
    (avg 8% engagement rate) published on weekdays. However, mentions sentiment dipped in the last month
    (-0.2 avg score) due to complaints about customer service response times.
    Key opportunity: Improve customer service communication and leverage video content more effectively.
    Strategic Recommendation: Launch a 'Customer First' initiative and create a video series showcasing customer success stories.
    """

    # Test with a specific date
    test_date = date(2025, 4, 15)  # Example: mid-Q2 2025
    task_agent = TaskExtractionAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME, current_date=test_date)
    logger.info(f"Task Agent Instruction for test_date {test_date}:\n{task_agent._get_instruction_prompt()[:500]}...")
    async def run_extraction():
        logger.info("Extracting tasks from sample analysis...")
        # In a real scenario, ensure GOOGLE_API_KEY is set so the LlmAgent can make actual calls.
        # For local tests without real API calls, the runner is replaced with a mock below.
        original_runner = None
        if MOCK_API_KEY == "test_api_key_task_extractor":
            logger.warning("Using a mock API key. LlmAgent behavior might be limited or mocked for task extraction.")

            # Mock the runner so no real API call is made. It mirrors the pieces of InMemoryRunner
            # that extract_tasks() relies on: a session_service with create_session/delete_session,
            # and an async-iterable run() method.
            class MockADKRunner:
                class _MockSessionService:
                    async def create_session(self, app_name, user_id):
                        class MockSession:
                            id = "mock_session_id"
                        return MockSession()

                    async def delete_session(self, app_name, user_id, session_id):
                        pass

                def __init__(self, agent, app_name):
                    self.agent = agent
                    self.session_service = self._MockSessionService()

                async def run(self, user_id, session_id, new_message):
                    # Simulate the structured response an LlmAgent with output_schema would produce.
                    mock_okr = OKR(
                        objective_description="Improve Customer Satisfaction",
                        key_results=[KeyResult(
                            key_result_description="Reduce negative mentions by 10%",
                            tasks=[Task(
                                task_category="Customer Service",
                                task_description="Respond to all negative mentions within 2 hours.",
                                objective_deliverable="Improved response time.",
                                effort=EffortLevel.MEDIUM,
                                timeline=TimelineCategory.IMMEDIATE,
                                responsible_party="Support Team",
                                success_criteria_metrics="Avg response time < 2hrs.",
                                priority=PriorityLevel.HIGH,
                                priority_justification="Critical for reputation.",
                                why_proposed="Analysis showed dip in sentiment due to slow responses.",
                                task_type=TaskType.INITIATIVE,
                                data_subject=DataSubject.MENTIONS
                            )]
                        )],
                        objective_timeline=TimelineCategory.SHORT_TERM
                    )
                    mock_output = TaskExtractionOutput(
                        current_quarter_info=f"Q{task_agent._get_quarter(task_agent.current_date)}, "
                                             f"{task_agent._days_until_quarter_end(task_agent.current_date)} days remaining",
                        okrs=[mock_okr],
                        overall_strategic_focus="Focus on customer service improvement."
                    )

                    # Simulate the event structure an LlmAgent with output_schema would emit.
                    class MockEvent:
                        def __init__(self):
                            self.actions = type('Actions', (), {
                                'state_delta': {task_agent.agent.output_key: mock_output.model_dump()}  # .model_dump() for Pydantic v2
                            })()

                    yield MockEvent()

            # Monkey-patch the module-level InMemoryRunner for this test run.
            global InMemoryRunner
            original_runner = InMemoryRunner
            InMemoryRunner = MockADKRunner

        try:
            extracted_okrs_output = await task_agent.extract_tasks(sample_analysis_text)
        finally:
            # Restore InMemoryRunner if it was patched.
            if original_runner is not None:
                InMemoryRunner = original_runner

        print("\n--- TaskExtractionAgent Results ---")
        if extracted_okrs_output:
            print(f"Current Quarter Info: {extracted_okrs_output.current_quarter_info}")
            print(f"Overall Strategic Focus: {extracted_okrs_output.overall_strategic_focus}")
            print(f"Generated Timestamp: {extracted_okrs_output.generation_timestamp}")
            print("\nOKRs Extracted:")
            # Use .model_dump_json() (Pydantic v2) for pretty printing.
            print(extracted_okrs_output.model_dump_json(indent=2))
        else:
            print("No OKRs extracted or an error occurred.")

    asyncio.run(run_extraction())
    # Example of updating the date
    logger.info("\n--- Updating date for Task Agent ---")
    new_test_date = date(2025, 10, 5)  # Q4
    task_agent.update_current_date(new_test_date)
    # The instruction within task_agent.agent is now updated.
    # logger.info(f"Task Agent NEW Instruction for test_date {new_test_date}:\n{task_agent.agent.instruction[:500]}...")
    # A new call to extract_tasks would use this updated context.