File size: 5,014 Bytes
8ed7829
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6d6572c
 
 
8ed7829
 
 
 
 
810c600
8ed7829
 
 
 
 
6d6572c
8ed7829
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# run_agentic_pipeline.py
import asyncio
import os
import json
import logging
from datetime import datetime
import pandas as pd
from typing import Dict, Any, Optional

# Assuming this script is at the same level as 'app.py' and 'insight_and_tasks/' is a subfolder
# If 'insight_and_tasks' is not in python path, you might need to adjust sys.path
# For example, if insight_and_tasks is a sibling of the dir containing this file:
# import sys
# script_dir = os.path.dirname(os.path.abspath(__file__))
# project_root = os.path.dirname(script_dir) # Or navigate to the correct root
# sys.path.insert(0, project_root)

# Environment setup executed at import time.
# NOTE(review): GOOGLE_GENAI_USE_VERTEXAI / GOOGLE_API_KEY are presumably read by the
# Google GenAI client used downstream - confirm against the orchestrator.
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
# os.environ only accepts str values: assigning None raises TypeError and would abort
# the import. Mirror the key only when it exists; a missing key leaves GOOGLE_API_KEY
# as None so callers can detect it and report the problem themselves.
if GOOGLE_API_KEY is not None:
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

# Imports from your project structure
from insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
# setup_logging might be called in app.py, if not, call it here or ensure it's called once.
# from insight_and_tasks.utils.logging_config import setup_logging 
from analytics_data_processing import prepare_filtered_analytics_data 
# Placeholder for UI generator import - to be created later
# from .insights_ui_generator import format_orchestration_results_for_ui 

# Module-level logger named after this module; handlers/levels are expected to be
# configured elsewhere (see the commented-out setup_logging import above).
logger = logging.getLogger(__name__)



async def run_full_analytics_orchestration(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
) -> Optional[Dict[str, Any]]:
    """
    Run the analytics pipeline end to end and return the raw orchestration results.

    The function prepares the filtered data with ``prepare_filtered_analytics_data``,
    builds an ``EnhancedLinkedInAnalyticsOrchestrator`` and awaits its
    ``generate_full_analysis_and_tasks`` coroutine. Errors deriving from
    ``Exception`` are logged and reported as ``None`` instead of being raised.

    Args:
        token_state: Gradio token_state containing raw data and config; passed
            through unchanged to ``prepare_filtered_analytics_data``.
        date_filter_selection: String for date filter type.
        custom_start_date: Optional custom start date.
        custom_end_date: Optional custom end date.

    Returns:
        The value returned by the orchestrator, or None when the API key is
        missing, data preparation fails, or the orchestration itself fails.
    """
    # Function-scope import: the module only imports `datetime` from the datetime package.
    from datetime import timezone

    if not GOOGLE_API_KEY:
        logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
        return None

    logger.info("Starting full analytics orchestration process...")

    # 1. Prepare and filter data
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df,  # Unused here; kept to document the tuple layout
            raw_follower_stats_df,  # Unfiltered follower history handed to the orchestrator
            _start_dt,  # Filtered start date (unused)
            _end_dt,  # Filtered end date (unused)
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        # Kept inside the try on purpose: len() on an unexpected value (e.g. None)
        # is then reported as a data-preparation failure.
        logger.info(
            "Data prepared: Posts(%s), Mentions(%s), FollowerStatsRaw(%s)",
            len(filtered_posts_df),
            len(filtered_mentions_df),
            len(raw_follower_stats_df),
        )
    except Exception as e:
        logger.error("Error during data preparation: %s", e, exc_info=True)
        return None

    # Empty inputs are not fatal: warn and let the orchestrator decide what to do.
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")

    # 2. Initialize and run the orchestrator
    try:
        # Model is pinned here; per the original note, passing None would let the
        # orchestrator fall back to its own default.
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"

        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,
            # datetime.utcnow() is deprecated (3.12+); this yields the same UTC calendar date.
            current_date_for_tasks=datetime.now(timezone.utc).date()
        )

        logger.info("Orchestrator initialized. Generating full analysis and tasks...")
        # The raw (not date-filtered) follower stats are passed so the orchestrator
        # receives the full follower history; posts and mentions are the filtered frames.
        orchestration_results = await orchestrator.generate_full_analysis_and_tasks(
            follower_stats_df=raw_follower_stats_df,
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        )
        logger.info("Orchestration process completed.")
        return orchestration_results

    except Exception as e:
        logger.critical("Critical error during analytics orchestration: %s", e, exc_info=True)
        return None