File size: 22,594 Bytes
c7fbc19
 
 
 
 
 
 
 
 
 
01a86dd
c7fbc19
 
 
 
 
 
 
 
 
 
01a86dd
c7fbc19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
415fd0b
 
 
c7fbc19
 
 
415fd0b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c7fbc19
415fd0b
 
 
 
 
c7fbc19
 
 
415fd0b
c7fbc19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
# agents/task_extraction_agent.py
import logging
from typing import Optional
from datetime import datetime, date # Ensure date is imported if used for type hints

from google.adk.agents import LlmAgent
from google.adk.runners import InMemoryRunner # Assuming this is used for direct agent running
from google.genai import types as genai_types # For constructing ADK agent inputs

# Project-specific imports
from insight_and_tasks.data_models.tasks import (
    TaskExtractionOutput, 
    OKR, 
    KeyResult, 
    Task, 
    EffortLevel, 
    TimelineCategory, 
    PriorityLevel, 
    TaskType, 
    DataSubject # Ensure all are imported
)
from insight_and_tasks.utils.retry_mechanism import RetryMechanism # If retries are needed for ADK calls

# Configure logger for this module
logger = logging.getLogger(__name__)

DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20" # Or your specific model

class TaskExtractionAgent:
    """
    Agent specialized in extracting actionable tasks and OKRs from analysis insights,
    with awareness of the current date and quarter.
    """
    AGENT_NAME = "task_extractor"
    AGENT_DESCRIPTION = "Specialist in converting strategic insights into specific, time-aware actionable tasks and OKRs."

    def __init__(self, api_key: str, model_name: Optional[str] = None, current_date: Optional[date] = None):
        """
        Initializes the TaskExtractionAgent.

        Args:
            api_key: API key (may be used by LlmAgent configuration or future needs).
            model_name: Name of the language model to use.
            current_date: The current date to use for quarter calculations. Defaults to today.
        """
        self.api_key = api_key # Store if needed by LlmAgent or other components
        self.model_name = model_name or DEFAULT_AGENT_MODEL
        self.current_date = current_date or datetime.utcnow().date() # Use date object for consistency

        # LlmAgent is initialized with dynamic instruction and output schema
        self.agent = LlmAgent(
            name=self.AGENT_NAME,
            model=self.model_name,
            description=self.AGENT_DESCRIPTION,
            instruction=self._get_instruction_prompt(), # Instruction generated dynamically
            output_schema=TaskExtractionOutput, # Pydantic model for structured output
            output_key="extracted_tasks_okrs"   # Key where LlmAgent stores structured output in state
        )
        self.retry_mechanism = RetryMechanism() # For retrying ADK runner if needed
        logger.info(f"{self.AGENT_NAME} initialized for Q{self._get_quarter(self.current_date)}, "
                    f"{self._days_until_quarter_end(self.current_date)} days remaining in quarter. Model: {self.model_name}")

    def _get_quarter(self, d: date) -> int:
        """Calculates the quarter for a given date."""
        return (d.month - 1) // 3 + 1

    def _days_until_quarter_end(self, d: date) -> int:
        """Calculates the number of days remaining in the current quarter from date d."""
        current_q = self._get_quarter(d)
        year = d.year
        if current_q == 1:
            quarter_end_date = date(year, 3, 31)
        elif current_q == 2:
            quarter_end_date = date(year, 6, 30)
        elif current_q == 3:
            quarter_end_date = date(year, 9, 30)
        else:  # Quarter 4
            quarter_end_date = date(year, 12, 31)
        
        days_remaining = (quarter_end_date - d).days
        return max(0, days_remaining) # Ensure non-negative

    def _get_instruction_prompt(self) -> str:
        """Generates the dynamic instruction string for the LlmAgent."""
        quarter = self._get_quarter(self.current_date)
        days_remaining = self._days_until_quarter_end(self.current_date)
        
        # Dynamically include Pydantic model field descriptions for better LLM guidance
        # This part can be complex if done fully automatically. For now, manually summarizing key fields.
        task_fields_summary = (
            "Each Task must include: task_category (e.g., Content Strategy), task_description, "
            "objective_deliverable, effort (Small, Medium, Large), timeline (Immediate, Short-term, Medium-term, Long-term), "
            "responsible_party, success_criteria_metrics, dependencies_prerequisites (optional), "
            "priority (High, Medium, Low) with priority_justification, why_proposed (linking to analysis), "
            "task_type (initiative or tracking), data_subject (for tracking tasks: follower_stats, posts, mentions, general)."
        )

        return f"""
        You are a Time-Aware Task Extraction Specialist. Your primary function is to meticulously analyze strategic insights 
        derived from LinkedIn analytics and transform them into a structured set of actionable tasks. These tasks should be 
        organized within an Objectives and Key Results (OKRs) framework.

        CURRENT CONTEXTUAL INFORMATION (DO NOT CHANGE THIS IN YOUR OUTPUT):
        - Current Quarter: Q{quarter}
        - Days remaining in current quarter: {days_remaining}
        - Today's Date (for context): {self.current_date.isoformat()}

        YOUR MANDATE:
        1.  Define clear, aspirational Objectives (qualitative goals).
        2.  For each Objective, formulate 2-3 specific, measurable Key Results.
        3.  Under each Key Result, list detailed, actionable Tasks required to achieve it.
        4.  CRITICAL: Each Task MUST strictly adhere to the 'Task' Pydantic model fields. This means providing values for ALL required fields: {task_fields_summary}
        5.  Task Timelines: Must be realistic given the {days_remaining} days left in Q{quarter}. Prioritize actions that can make significant progress or be completed within this timeframe. Use TimelineCategory enum values.
        6.  Task Types: Clearly distinguish between 'initiative' (new actions/projects) and 'tracking' (ongoing monitoring/measurement).
        7.  Data Subjects: For 'tracking' tasks, accurately specify the relevant 'data_subject'. For 'initiative' tasks, this can be 'general' or null if not specific to one data type.
        8.  Rationale ('why_proposed'): This is crucial. Each task's proposal must be explicitly justified by and linked back to specific findings, trends, or recommendations mentioned in the input 'comprehensive_analysis'.
        9.  Priority: Assign a priority (High, Medium, Low) to each task and provide a 'priority_justification'.

        INPUT: You will receive a 'comprehensive_analysis' text.

        OUTPUT FORMAT:
        You MUST return a single JSON object that strictly conforms to the 'TaskExtractionOutput' Pydantic schema.
        This JSON object will contain:
        - 'current_quarter_info': A string exactly like "Q{quarter}, {days_remaining} days remaining". (This is fixed based on the context above).
        - 'okrs': A list, where each item is an 'OKR' object.
        - 'overall_strategic_focus': (Optional) A brief summary of the main strategic themes emerging from the OKRs.
        - 'generation_timestamp': (This will be auto-filled if you conform to the schema, or you can provide an ISO timestamp).

        Example of a Task (ensure all fields from the Pydantic model are covered):
        {{
            "task_category": "Content Creation",
            "task_description": "Launch a 3-part blog series on AI in Marketing.",
            "objective_deliverable": "Objective: Increase thought leadership in AI marketing. Deliverable: 3 published blog posts.",
            "effort": "Medium",
            "timeline": "Short-term",
            "responsible_party": "Content Team Lead",
            "success_criteria_metrics": "Average 500 views per post, 10+ shares per post.",
            "dependencies_prerequisites": "Keyword research for AI marketing topics completed.",
            "priority": "High",
            "priority_justification": "Addresses key strategic goal of establishing AI expertise.",
            "why_proposed": "Analysis highlighted a gap in content related to AI, a high-interest area for our target audience.",
            "task_type": "initiative",
            "data_subject": "general"
        }}

        Focus on quality, actionability, and strict adherence to the output schema.
        """

    async def extract_tasks(self, comprehensive_analysis: str) -> TaskExtractionOutput:
        """
        Extracts time-aware actionable tasks from the comprehensive analysis text.

        Args:
            comprehensive_analysis: The text analysis from which to extract tasks.

        Returns:
            A TaskExtractionOutput Pydantic model instance.
        """
        if not comprehensive_analysis or not comprehensive_analysis.strip():
            logger.warning("Comprehensive analysis text is empty. Cannot extract tasks.")
            return TaskExtractionOutput(
                current_quarter_info=f"Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining",
                okrs=[],
                overall_strategic_focus="No analysis provided to extract tasks."
            )

        # The LlmAgent's instruction already contains the dynamic date info and output format.
        # The input to the agent's run method will be the comprehensive_analysis.
        prompt_for_adk_agent = f"""
        Comprehensive Analysis for Task Extraction:
        ---
        {comprehensive_analysis}
        ---
        Based on the analysis above, and adhering strictly to your primary instructions (especially regarding current quarter context, task field requirements, and JSON output schema 'TaskExtractionOutput'), generate the OKRs and tasks.
        Ensure the 'current_quarter_info' field in your JSON output is exactly: "Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining".
        """

        user_input_content = genai_types.Content(
            role="user",
            parts=[genai_types.Part(text=prompt_for_adk_agent)]
        )

        # Using InMemoryRunner as per original structure for LlmAgent with output_schema
        runner = InMemoryRunner(agent=self.agent, app_name=f"{self.AGENT_NAME}Runner")
        # Generate a unique user_id for each run to ensure fresh session state if needed.
        user_id = f"system_user_task_extractor_{int(datetime.utcnow().timestamp())}"
        
        session = await runner.session_service.create_session(
            app_name=f"{self.AGENT_NAME}Runner",
            user_id=user_id
        )

        extracted_data_dict = None
        full_response_text_for_debug = "" # To capture raw text if parsing fails

        try:
            logger.info(f"Running TaskExtractionAgent for user_id: {user_id}, session_id: {session.id}")
            
            # Fix: Use regular for loop instead of async for, since runner.run() returns a generator
            run_result = runner.run(
                user_id=user_id,
                session_id=session.id,
                new_message=user_input_content
            )
            
            # Check if it's an async iterator or regular generator
            if hasattr(run_result, '__aiter__'):
                # It's an async iterator, use async for
                async for event in run_result:
                    if (hasattr(event, 'actions') and event.actions and
                        hasattr(event.actions, 'state_delta') and
                        isinstance(event.actions.state_delta, dict) and
                        self.agent.output_key in event.actions.state_delta):
                        
                        extracted_data_dict = event.actions.state_delta[self.agent.output_key]
                        logger.info(f"Successfully extracted structured data via LlmAgent state_delta.")
                        break
                    
                    # Capture text parts for debugging if direct structured output isn't found first
                    if hasattr(event, 'content') and event.content and event.content.parts:
                        for part in event.content.parts:
                            if hasattr(part, 'text'):
                                 full_response_text_for_debug += part.text
            else:
                # It's a regular generator, use regular for loop
                for event in run_result:
                    if (hasattr(event, 'actions') and event.actions and
                        hasattr(event.actions, 'state_delta') and
                        isinstance(event.actions.state_delta, dict) and
                        self.agent.output_key in event.actions.state_delta):
                        
                        extracted_data_dict = event.actions.state_delta[self.agent.output_key]
                        logger.info(f"Successfully extracted structured data via LlmAgent state_delta.")
                        break
                    
                    # Capture text parts for debugging if direct structured output isn't found first
                    if hasattr(event, 'content') and event.content and event.content.parts:
                        for part in event.content.parts:
                            if hasattr(part, 'text'):
                                 full_response_text_for_debug += part.text

            if not extracted_data_dict and full_response_text_for_debug:
                logger.warning("LlmAgent did not produce structured output in state_delta. Raw text response was: %s", 
                               full_response_text_for_debug[:500] + "...") 

        except Exception as e:
            logger.error(f"Error during TaskExtractionAgent execution: {e}", exc_info=True)
        finally:
            try:
                await runner.session_service.delete_session(
                    app_name=f"{self.AGENT_NAME}Runner", user_id=user_id, session_id=session.id
                )
            except Exception as session_del_e:
                logger.error(f"Error deleting task extractor session: {session_del_e}")

        if extracted_data_dict:
            if isinstance(extracted_data_dict, TaskExtractionOutput): # Already a Pydantic model
                return extracted_data_dict
            elif isinstance(extracted_data_dict, dict): # If it's a dict, parse it
                try:
                    return TaskExtractionOutput(**extracted_data_dict)
                except Exception as pydantic_error:
                    logger.error(f"Error parsing extracted dictionary into TaskExtractionOutput: {pydantic_error}", exc_info=True)
                    logger.error(f"Problematic dictionary data: {extracted_data_dict}")
            else:
                logger.error(f"Extracted data is not a dictionary or TaskExtractionOutput model: {type(extracted_data_dict)}")
        
        # Fallback if no valid data extracted
        logger.warning("No valid structured data extracted by TaskExtractionAgent.")
        return TaskExtractionOutput(
            current_quarter_info=f"Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining",
            okrs=[],
            overall_strategic_focus="Failed to extract tasks or no tasks were identified.",
            generation_timestamp=datetime.utcnow().isoformat()
        )

    def update_current_date(self, new_date: date):
        """
        Updates the current date for the agent and re-initializes the LlmAgent
        to reflect the new date context in its instructions.
        """
        self.current_date = new_date
        # Re-initialize the LlmAgent with the new instruction based on the new date
        self.agent = LlmAgent(
            name=self.AGENT_NAME,
            model=self.model_name,
            description=self.AGENT_DESCRIPTION,
            instruction=self._get_instruction_prompt(), # Get updated instruction
            output_schema=TaskExtractionOutput,
            output_key="extracted_tasks_okrs"
        )
        logger.info(f"{self.AGENT_NAME} date updated. New context: Q{self._get_quarter(self.current_date)}, "
                    f"{self._days_until_quarter_end(self.current_date)} days remaining.")


if __name__ == '__main__':
    import asyncio
    # (Ensure logging_config.py is in the same directory or PYTHONPATH is set for this example to run standalone)
    try:
        from utils.logging_config import setup_logging
        setup_logging()
        logger.info("Logging setup for TaskExtractionAgent test.")
    except ImportError:
        logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        logger.warning("logging_config.py not found, using basicConfig for logging.")

    MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_task_extractor") # Use your actual key or env var
    MODEL_NAME = DEFAULT_AGENT_MODEL

    # Example comprehensive analysis text (replace with actual analysis output)
    sample_analysis_text = """
    Overall Summary: Follower growth is steady at 5% MoM. Post engagement is highest for video content 
    (avg 8% engagement rate) published on weekdays. However, mentions sentiment dipped in the last month 
    (-0.2 avg score) due to complaints about customer service response times. 
    Key opportunity: Improve customer service communication and leverage video content more effectively.
    Strategic Recommendation: Launch a 'Customer First' initiative and create a video series showcasing customer success stories.
    """

    # Test with a specific date
    test_date = date(2025, 4, 15) # Example: Mid-Q2 2025
    task_agent = TaskExtractionAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME, current_date=test_date)

    logger.info(f"Task Agent Instruction for test_date {test_date}:\n{task_agent._get_instruction_prompt()[:500]}...")

    async def run_extraction():
        logger.info("Extracting tasks from sample analysis...")
        # In a real scenario, ensure GOOGLE_API_KEY is set if the LlmAgent makes actual calls.
        # For local tests without real API calls, the LlmAgent might behave as a mock or require specific test setup.
        if MOCK_API_KEY == "test_api_key_task_extractor":
             logger.warning("Using a mock API key. LlmAgent behavior might be limited or mocked for task extraction.")
             # Mock the runner if no real API call should be made
             class MockADKRunner:
                def __init__(self, agent, app_name): self.agent = agent
                async def session_service_create_session(self, app_name, user_id): 
                    class MockSession: id = "mock_session_id"
                    return MockSession()
                async def run(self, user_id, session_id, new_message):
                    # Simulate a response structure
                    mock_okr = OKR(
                        objective_description="Improve Customer Satisfaction",
                        key_results=[KeyResult(
                            key_result_description="Reduce negative mentions by 10%",
                            tasks=[Task(
                                task_category="Customer Service", task_description="Respond to all negative mentions within 2 hours.",
                                objective_deliverable="Improved response time.", effort=EffortLevel.MEDIUM, timeline=TimelineCategory.IMMEDIATE,
                                responsible_party="Support Team", success_criteria_metrics="Avg response time < 2hrs.",
                                priority=PriorityLevel.HIGH, priority_justification="Critical for reputation.",
                                why_proposed="Analysis showed dip in sentiment due to slow responses.", task_type=TaskType.INITIATIVE,
                                data_subject=DataSubject.MENTIONS
                            )]
                        )],
                        objective_timeline=TimelineCategory.SHORT_TERM
                    )
                    mock_output = TaskExtractionOutput(
                        current_quarter_info=f"Q{task_agent._get_quarter(task_agent.current_date)}, {task_agent._days_until_quarter_end(task_agent.current_date)} days remaining",
                        okrs=[mock_okr],
                        overall_strategic_focus="Focus on customer service improvement."
                    )
                    # Simulate the event structure LlmAgent with output_schema would produce
                    class MockEvent:
                        def __init__(self):
                            self.actions = type('Actions', (), {'state_delta': {task_agent.agent.output_key: mock_output.model_dump()}})() # .model_dump() for Pydantic v2
                    yield MockEvent()

                async def session_service_delete_session(self, app_name, user_id, session_id): pass
             
             # Monkey patch the InMemoryRunner for this test if using mock key
             global InMemoryRunner 
             OriginalInMemoryRunner = InMemoryRunner
             InMemoryRunner = MockADKRunner


        extracted_okrs_output = await task_agent.extract_tasks(sample_analysis_text)
        
        # Restore InMemoryRunner if it was patched
        if MOCK_API_KEY == "test_api_key_task_extractor" and 'OriginalInMemoryRunner' in globals():
            InMemoryRunner = OriginalInMemoryRunner


        print("\n--- TaskExtractionAgent Results ---")
        if extracted_okrs_output:
            print(f"Current Quarter Info: {extracted_okrs_output.current_quarter_info}")
            print(f"Overall Strategic Focus: {extracted_okrs_output.overall_strategic_focus}")
            print(f"Generated Timestamp: {extracted_okrs_output.generation_timestamp}")
            print("\nOKRs Extracted:")
            # Use .model_dump_json() for Pydantic v2 for pretty printing
            print(extracted_okrs_output.model_dump_json(indent=2))
        else:
            print("No OKRs extracted or an error occurred.")

    if __name__ == '__main__': # This check is technically inside another if __name__ == '__main__'
        asyncio.run(run_extraction())

        # Example of updating date
        logger.info("\n--- Updating date for Task Agent ---")
        new_test_date = date(2025, 10, 5) # Q4
        task_agent.update_current_date(new_test_date)
        # The instruction within task_agent.agent is now updated.
        # logger.info(f"Task Agent NEW Instruction for test_date {new_test_date}:\n{task_agent.agent.instruction[:500]}...")
        # A new call to extract_tasks would use this updated context.