LinkedinMonitor / insight_and_tasks /agents /task_extraction_agent.py
GuglielmoTor's picture
Update insight_and_tasks/agents/task_extraction_agent.py
c7fbc19 verified
raw
history blame
22 kB
# agents/task_extraction_agent.py
import logging
from typing import Optional
from datetime import datetime, date # Ensure date is imported if used for type hints
from google.adk.agents import LlmAgent
from google.adk.runners import InMemoryRunner # Assuming this is used for direct agent running
from google.genai import types as genai_types # For constructing ADK agent inputs
# Project-specific imports
from data_models.tasks import (
TaskExtractionOutput,
OKR,
KeyResult,
Task,
EffortLevel,
TimelineCategory,
PriorityLevel,
TaskType,
DataSubject # Ensure all are imported
)
from utils.retry_mechanism import RetryMechanism # If retries are needed for ADK calls
# Configure logger for this module
logger = logging.getLogger(__name__)
DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20" # Or your specific model
class TaskExtractionAgent:
"""
Agent specialized in extracting actionable tasks and OKRs from analysis insights,
with awareness of the current date and quarter.
"""
AGENT_NAME = "task_extractor"
AGENT_DESCRIPTION = "Specialist in converting strategic insights into specific, time-aware actionable tasks and OKRs."
def __init__(self, api_key: str, model_name: Optional[str] = None, current_date: Optional[date] = None):
"""
Initializes the TaskExtractionAgent.
Args:
api_key: API key (may be used by LlmAgent configuration or future needs).
model_name: Name of the language model to use.
current_date: The current date to use for quarter calculations. Defaults to today.
"""
self.api_key = api_key # Store if needed by LlmAgent or other components
self.model_name = model_name or DEFAULT_AGENT_MODEL
self.current_date = current_date or datetime.utcnow().date() # Use date object for consistency
# LlmAgent is initialized with dynamic instruction and output schema
self.agent = LlmAgent(
name=self.AGENT_NAME,
model=self.model_name,
description=self.AGENT_DESCRIPTION,
instruction=self._get_instruction_prompt(), # Instruction generated dynamically
output_schema=TaskExtractionOutput, # Pydantic model for structured output
output_key="extracted_tasks_okrs" # Key where LlmAgent stores structured output in state
)
self.retry_mechanism = RetryMechanism() # For retrying ADK runner if needed
logger.info(f"{self.AGENT_NAME} initialized for Q{self._get_quarter(self.current_date)}, "
f"{self._days_until_quarter_end(self.current_date)} days remaining in quarter. Model: {self.model_name}")
def _get_quarter(self, d: date) -> int:
"""Calculates the quarter for a given date."""
return (d.month - 1) // 3 + 1
def _days_until_quarter_end(self, d: date) -> int:
"""Calculates the number of days remaining in the current quarter from date d."""
current_q = self._get_quarter(d)
year = d.year
if current_q == 1:
quarter_end_date = date(year, 3, 31)
elif current_q == 2:
quarter_end_date = date(year, 6, 30)
elif current_q == 3:
quarter_end_date = date(year, 9, 30)
else: # Quarter 4
quarter_end_date = date(year, 12, 31)
days_remaining = (quarter_end_date - d).days
return max(0, days_remaining) # Ensure non-negative
def _get_instruction_prompt(self) -> str:
"""Generates the dynamic instruction string for the LlmAgent."""
quarter = self._get_quarter(self.current_date)
days_remaining = self._days_until_quarter_end(self.current_date)
# Dynamically include Pydantic model field descriptions for better LLM guidance
# This part can be complex if done fully automatically. For now, manually summarizing key fields.
task_fields_summary = (
"Each Task must include: task_category (e.g., Content Strategy), task_description, "
"objective_deliverable, effort (Small, Medium, Large), timeline (Immediate, Short-term, Medium-term, Long-term), "
"responsible_party, success_criteria_metrics, dependencies_prerequisites (optional), "
"priority (High, Medium, Low) with priority_justification, why_proposed (linking to analysis), "
"task_type (initiative or tracking), data_subject (for tracking tasks: follower_stats, posts, mentions, general)."
)
return f"""
You are a Time-Aware Task Extraction Specialist. Your primary function is to meticulously analyze strategic insights
derived from LinkedIn analytics and transform them into a structured set of actionable tasks. These tasks should be
organized within an Objectives and Key Results (OKRs) framework.
CURRENT CONTEXTUAL INFORMATION (DO NOT CHANGE THIS IN YOUR OUTPUT):
- Current Quarter: Q{quarter}
- Days remaining in current quarter: {days_remaining}
- Today's Date (for context): {self.current_date.isoformat()}
YOUR MANDATE:
1. Define clear, aspirational Objectives (qualitative goals).
2. For each Objective, formulate 2-3 specific, measurable Key Results.
3. Under each Key Result, list detailed, actionable Tasks required to achieve it.
4. CRITICAL: Each Task MUST strictly adhere to the 'Task' Pydantic model fields. This means providing values for ALL required fields: {task_fields_summary}
5. Task Timelines: Must be realistic given the {days_remaining} days left in Q{quarter}. Prioritize actions that can make significant progress or be completed within this timeframe. Use TimelineCategory enum values.
6. Task Types: Clearly distinguish between 'initiative' (new actions/projects) and 'tracking' (ongoing monitoring/measurement).
7. Data Subjects: For 'tracking' tasks, accurately specify the relevant 'data_subject'. For 'initiative' tasks, this can be 'general' or null if not specific to one data type.
8. Rationale ('why_proposed'): This is crucial. Each task's proposal must be explicitly justified by and linked back to specific findings, trends, or recommendations mentioned in the input 'comprehensive_analysis'.
9. Priority: Assign a priority (High, Medium, Low) to each task and provide a 'priority_justification'.
INPUT: You will receive a 'comprehensive_analysis' text.
OUTPUT FORMAT:
You MUST return a single JSON object that strictly conforms to the 'TaskExtractionOutput' Pydantic schema.
This JSON object will contain:
- 'current_quarter_info': A string exactly like "Q{quarter}, {days_remaining} days remaining". (This is fixed based on the context above).
- 'okrs': A list, where each item is an 'OKR' object.
- 'overall_strategic_focus': (Optional) A brief summary of the main strategic themes emerging from the OKRs.
- 'generation_timestamp': (This will be auto-filled if you conform to the schema, or you can provide an ISO timestamp).
Example of a Task (ensure all fields from the Pydantic model are covered):
{{
"task_category": "Content Creation",
"task_description": "Launch a 3-part blog series on AI in Marketing.",
"objective_deliverable": "Objective: Increase thought leadership in AI marketing. Deliverable: 3 published blog posts.",
"effort": "Medium",
"timeline": "Short-term",
"responsible_party": "Content Team Lead",
"success_criteria_metrics": "Average 500 views per post, 10+ shares per post.",
"dependencies_prerequisites": "Keyword research for AI marketing topics completed.",
"priority": "High",
"priority_justification": "Addresses key strategic goal of establishing AI expertise.",
"why_proposed": "Analysis highlighted a gap in content related to AI, a high-interest area for our target audience.",
"task_type": "initiative",
"data_subject": "general"
}}
Focus on quality, actionability, and strict adherence to the output schema.
"""
async def extract_tasks(self, comprehensive_analysis: str) -> TaskExtractionOutput:
"""
Extracts time-aware actionable tasks from the comprehensive analysis text.
Args:
comprehensive_analysis: The text analysis from which to extract tasks.
Returns:
A TaskExtractionOutput Pydantic model instance.
"""
if not comprehensive_analysis or not comprehensive_analysis.strip():
logger.warning("Comprehensive analysis text is empty. Cannot extract tasks.")
return TaskExtractionOutput(
current_quarter_info=f"Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining",
okrs=[],
overall_strategic_focus="No analysis provided to extract tasks."
)
# The LlmAgent's instruction already contains the dynamic date info and output format.
# The input to the agent's run method will be the comprehensive_analysis.
prompt_for_adk_agent = f"""
Comprehensive Analysis for Task Extraction:
---
{comprehensive_analysis}
---
Based on the analysis above, and adhering strictly to your primary instructions (especially regarding current quarter context, task field requirements, and JSON output schema 'TaskExtractionOutput'), generate the OKRs and tasks.
Ensure the 'current_quarter_info' field in your JSON output is exactly: "Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining".
"""
user_input_content = genai_types.Content(
role="user",
parts=[genai_types.Part(text=prompt_for_adk_agent)]
)
# Using InMemoryRunner as per original structure for LlmAgent with output_schema
runner = InMemoryRunner(agent=self.agent, app_name=f"{self.AGENT_NAME}Runner")
# Generate a unique user_id for each run to ensure fresh session state if needed.
user_id = f"system_user_task_extractor_{int(datetime.utcnow().timestamp())}"
session = await runner.session_service.create_session(
app_name=f"{self.AGENT_NAME}Runner",
user_id=user_id
)
extracted_data_dict = None
full_response_text_for_debug = "" # To capture raw text if parsing fails
try:
logger.info(f"Running TaskExtractionAgent for user_id: {user_id}, session_id: {session.id}")
async for event in runner.run(
user_id=user_id,
session_id=session.id,
new_message=user_input_content
):
# LlmAgent with output_schema stores the result in event.actions.state_delta[output_key]
if (hasattr(event, 'actions') and event.actions and
hasattr(event.actions, 'state_delta') and
isinstance(event.actions.state_delta, dict) and
self.agent.output_key in event.actions.state_delta):
extracted_data_dict = event.actions.state_delta[self.agent.output_key]
logger.info(f"Successfully extracted structured data via LlmAgent state_delta.")
break # Assuming full structured output comes in one event with state_delta
# Capture text parts for debugging if direct structured output isn't found first
if hasattr(event, 'content') and event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text'):
full_response_text_for_debug += part.text
if not extracted_data_dict and full_response_text_for_debug:
logger.warning("LlmAgent did not produce structured output in state_delta. Raw text response was: %s",
full_response_text_for_debug[:500] + "...") # Log snippet
# Attempt to parse the raw text if it looks like JSON (fallback, not ideal)
# This is a basic fallback; robust JSON cleaning might be needed if LLM doesn't adhere to schema.
# For now, we rely on LlmAgent's output_schema handling.
except Exception as e:
logger.error(f"Error during TaskExtractionAgent execution: {e}", exc_info=True)
# Fallback to returning an empty TaskExtractionOutput with error info
return TaskExtractionOutput(
current_quarter_info=f"Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining",
okrs=[],
overall_strategic_focus=f"Error during task extraction: {e}",
generation_timestamp=datetime.utcnow().isoformat()
)
finally:
try:
await runner.session_service.delete_session(
app_name=f"{self.AGENT_NAME}Runner", user_id=user_id, session_id=session.id
)
except Exception as session_del_e:
logger.error(f"Error deleting task extractor session: {session_del_e}")
if extracted_data_dict:
if isinstance(extracted_data_dict, TaskExtractionOutput): # Already a Pydantic model
return extracted_data_dict
elif isinstance(extracted_data_dict, dict): # If it's a dict, parse it
try:
return TaskExtractionOutput(**extracted_data_dict)
except Exception as pydantic_error:
logger.error(f"Error parsing extracted dictionary into TaskExtractionOutput: {pydantic_error}", exc_info=True)
logger.error(f"Problematic dictionary data: {extracted_data_dict}")
else:
logger.error(f"Extracted data is not a dictionary or TaskExtractionOutput model: {type(extracted_data_dict)}")
# Fallback if no valid data extracted
logger.warning("No valid structured data extracted by TaskExtractionAgent.")
return TaskExtractionOutput(
current_quarter_info=f"Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining",
okrs=[],
overall_strategic_focus="Failed to extract tasks or no tasks were identified.",
generation_timestamp=datetime.utcnow().isoformat()
)
def update_current_date(self, new_date: date):
"""
Updates the current date for the agent and re-initializes the LlmAgent
to reflect the new date context in its instructions.
"""
self.current_date = new_date
# Re-initialize the LlmAgent with the new instruction based on the new date
self.agent = LlmAgent(
name=self.AGENT_NAME,
model=self.model_name,
description=self.AGENT_DESCRIPTION,
instruction=self._get_instruction_prompt(), # Get updated instruction
output_schema=TaskExtractionOutput,
output_key="extracted_tasks_okrs"
)
logger.info(f"{self.AGENT_NAME} date updated. New context: Q{self._get_quarter(self.current_date)}, "
f"{self._days_until_quarter_end(self.current_date)} days remaining.")
if __name__ == '__main__':
import asyncio
# (Ensure logging_config.py is in the same directory or PYTHONPATH is set for this example to run standalone)
try:
from utils.logging_config import setup_logging
setup_logging()
logger.info("Logging setup for TaskExtractionAgent test.")
except ImportError:
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger.warning("logging_config.py not found, using basicConfig for logging.")
MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_task_extractor") # Use your actual key or env var
MODEL_NAME = DEFAULT_AGENT_MODEL
# Example comprehensive analysis text (replace with actual analysis output)
sample_analysis_text = """
Overall Summary: Follower growth is steady at 5% MoM. Post engagement is highest for video content
(avg 8% engagement rate) published on weekdays. However, mentions sentiment dipped in the last month
(-0.2 avg score) due to complaints about customer service response times.
Key opportunity: Improve customer service communication and leverage video content more effectively.
Strategic Recommendation: Launch a 'Customer First' initiative and create a video series showcasing customer success stories.
"""
# Test with a specific date
test_date = date(2025, 4, 15) # Example: Mid-Q2 2025
task_agent = TaskExtractionAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME, current_date=test_date)
logger.info(f"Task Agent Instruction for test_date {test_date}:\n{task_agent._get_instruction_prompt()[:500]}...")
async def run_extraction():
logger.info("Extracting tasks from sample analysis...")
# In a real scenario, ensure GOOGLE_API_KEY is set if the LlmAgent makes actual calls.
# For local tests without real API calls, the LlmAgent might behave as a mock or require specific test setup.
if MOCK_API_KEY == "test_api_key_task_extractor":
logger.warning("Using a mock API key. LlmAgent behavior might be limited or mocked for task extraction.")
# Mock the runner if no real API call should be made
class MockADKRunner:
def __init__(self, agent, app_name): self.agent = agent
async def session_service_create_session(self, app_name, user_id):
class MockSession: id = "mock_session_id"
return MockSession()
async def run(self, user_id, session_id, new_message):
# Simulate a response structure
mock_okr = OKR(
objective_description="Improve Customer Satisfaction",
key_results=[KeyResult(
key_result_description="Reduce negative mentions by 10%",
tasks=[Task(
task_category="Customer Service", task_description="Respond to all negative mentions within 2 hours.",
objective_deliverable="Improved response time.", effort=EffortLevel.MEDIUM, timeline=TimelineCategory.IMMEDIATE,
responsible_party="Support Team", success_criteria_metrics="Avg response time < 2hrs.",
priority=PriorityLevel.HIGH, priority_justification="Critical for reputation.",
why_proposed="Analysis showed dip in sentiment due to slow responses.", task_type=TaskType.INITIATIVE,
data_subject=DataSubject.MENTIONS
)]
)],
objective_timeline=TimelineCategory.SHORT_TERM
)
mock_output = TaskExtractionOutput(
current_quarter_info=f"Q{task_agent._get_quarter(task_agent.current_date)}, {task_agent._days_until_quarter_end(task_agent.current_date)} days remaining",
okrs=[mock_okr],
overall_strategic_focus="Focus on customer service improvement."
)
# Simulate the event structure LlmAgent with output_schema would produce
class MockEvent:
def __init__(self):
self.actions = type('Actions', (), {'state_delta': {task_agent.agent.output_key: mock_output.model_dump()}})() # .model_dump() for Pydantic v2
yield MockEvent()
async def session_service_delete_session(self, app_name, user_id, session_id): pass
# Monkey patch the InMemoryRunner for this test if using mock key
global InMemoryRunner
OriginalInMemoryRunner = InMemoryRunner
InMemoryRunner = MockADKRunner
extracted_okrs_output = await task_agent.extract_tasks(sample_analysis_text)
# Restore InMemoryRunner if it was patched
if MOCK_API_KEY == "test_api_key_task_extractor" and 'OriginalInMemoryRunner' in globals():
InMemoryRunner = OriginalInMemoryRunner
print("\n--- TaskExtractionAgent Results ---")
if extracted_okrs_output:
print(f"Current Quarter Info: {extracted_okrs_output.current_quarter_info}")
print(f"Overall Strategic Focus: {extracted_okrs_output.overall_strategic_focus}")
print(f"Generated Timestamp: {extracted_okrs_output.generation_timestamp}")
print("\nOKRs Extracted:")
# Use .model_dump_json() for Pydantic v2 for pretty printing
print(extracted_okrs_output.model_dump_json(indent=2))
else:
print("No OKRs extracted or an error occurred.")
if __name__ == '__main__': # This check is technically inside another if __name__ == '__main__'
asyncio.run(run_extraction())
# Example of updating date
logger.info("\n--- Updating date for Task Agent ---")
new_test_date = date(2025, 10, 5) # Q4
task_agent.update_current_date(new_test_date)
# The instruction within task_agent.agent is now updated.
# logger.info(f"Task Agent NEW Instruction for test_date {new_test_date}:\n{task_agent.agent.instruction[:500]}...")
# A new call to extract_tasks would use this updated context.