# run_agentic_pipeline.py
import asyncio
import os
import json
import logging
from datetime import datetime, timezone
import pandas as pd
from typing import Dict, Any, Optional

# This script is assumed to live at the same level as 'app.py', with 'insight_and_tasks/' as a subfolder.
# If 'insight_and_tasks' is not on the Python path, adjust sys.path accordingly.
# For example, if insight_and_tasks is a sibling of the directory containing this file:
# import sys
# script_dir = os.path.dirname(os.path.abspath(__file__))
# project_root = os.path.dirname(script_dir)  # or navigate to the correct root
# sys.path.insert(0, project_root)


# Imports from your project structure
from insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
# setup_logging may already be invoked in app.py; if not, call it here (it should run exactly once).
# from insight_and_tasks.utils.logging_config import setup_logging
from .analytics_data_processing import prepare_filtered_analytics_data 
# Placeholder for UI generator import - to be created later
# from .insights_ui_generator import format_orchestration_results_for_ui 

logger = logging.getLogger(__name__)

# GEMINI_API_KEY should be set in the environment where app.py runs
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")

async def run_full_analytics_orchestration(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
) -> Optional[Dict[str, Any]]:
    """
    Runs the full analytics pipeline using data from token_state and date filters,
    and returns the raw orchestration results.

    Args:
        token_state: Gradio token_state containing raw data and config.
        date_filter_selection: String for date filter type.
        custom_start_date: Optional custom start date.
        custom_end_date: Optional custom end date.

    Returns:
        A dictionary with the orchestrator's results (including keys such as
        "comprehensive_analysis_report" and "actionable_okrs_and_tasks"),
        or None if a critical error occurs.
    """
    if not GOOGLE_API_KEY:
        logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
        return None

    logger.info("Starting full analytics orchestration process...")

    # 1. Prepare and filter data
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df, # Useful if FollowerAgent needs a pre-filtered time series
            raw_follower_stats_df, # FollowerAgent typically processes the raw historical data for some metrics
            _start_dt, # Filtered start date, for logging or context if needed
            _end_dt    # Filtered end date
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        logger.info(f"Data prepared: Posts({len(filtered_posts_df)}), Mentions({len(filtered_mentions_df)}), FollowerStatsRaw({len(raw_follower_stats_df)})")

    except Exception as e:
        logger.error(f"Error during data preparation: {e}", exc_info=True)
        return None

    # Check if essential dataframes are empty after filtering, which might make analysis trivial or erroneous
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")
        # Depending on requirements, you might return a specific message or empty results structure.
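        # A minimal sketch, assuming callers prefer an empty results skeleton over
        # None here (the keys mirror those consumed by the standalone test below):
        # return {
        #     "comprehensive_analysis_report": "No data available for the selected date range.",
        #     "actionable_okrs_and_tasks": {},
        #     "detailed_metrics": {},
        # }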

    # 2. Initialize and run the orchestrator
    try:
        # A specific model name can be passed here, or sourced from token_state
        # (e.g., token_state.get("config_llm_model_override")) if stored there;
        # pass None to fall back to the orchestrator's default.
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"
        
        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run, # Pass None to use orchestrator's default
            current_date_for_tasks=datetime.now(timezone.utc).date()  # datetime.utcnow() is deprecated since Python 3.12
        )

        logger.info("Orchestrator initialized. Generating full analysis and tasks...")
        # The orchestrator expects the primary follower stats DF to be the one it can process for
        # time-series ('follower_gains_monthly') and demographics.
        # The `raw_follower_stats_df` is usually better for this, as FollowerAgent does its own processing.
        orchestration_results = await orchestrator.generate_full_analysis_and_tasks(
            follower_stats_df=raw_follower_stats_df, # Pass the full history for followers
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        )
        logger.info("Orchestration process completed.")
        return orchestration_results

    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return None
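
# A minimal sketch of how app.py could invoke this coroutine from a synchronous
# Gradio callback; the handler name and signature are illustrative, not part of
# this module:
#
# def on_generate_insights_click(token_state, date_filter, start_date, end_date):
#     results = asyncio.run(run_full_analytics_orchestration(
#         token_state, date_filter, start_date, end_date
#     ))
#     # Once insights_ui_generator exists, format_orchestration_results_for_ui
#     # would shape these results for display.
#     return results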

# Standalone test harness for this module.
if __name__ == "__main__":
    # This block tests `run_agentic_pipeline.py` directly. Because of the relative
    # import above, run it as a module (e.g., `python -m <package>.run_agentic_pipeline`).
    # In a real scenario, `app.py` imports and calls `run_full_analytics_orchestration`.
    
    # Ensure logging is set up for standalone test
    from insight_and_tasks.utils.logging_config import setup_logging
    setup_logging()

    if not GOOGLE_API_KEY:
        print("Please set the GOOGLE_API_KEY environment variable to run this test.")
    else:
        # Create mock token_state and filter parameters
        mock_token_state = {
            "bubble_posts_df": pd.DataFrame({
                'id': [1, 2, 3], 'published_at': pd.to_datetime(['2023-01-01', '2023-07-15', '2023-12-31']),
                'li_eb_label': ['A', 'B', 'A'], 'media_type': ['X', 'Y', 'X'], 'is_ad': [False, True, False],
                'sentiment': ['pos', 'neg', 'neu'], 'engagement': [10,5,8], 'impressionCount': [100,50,80],
                'clickCount': [1,2,3], 'likeCount': [8,3,6], 'commentCount': [1,1,1], 'shareCount': [1,1,1]
            }),
            "bubble_post_stats_df": pd.DataFrame({'post_id': [1, 2, 3]}), # Simplified
            "bubble_mentions_df": pd.DataFrame({
                'date': pd.to_datetime(['2023-06-01', '2023-08-20']), 'sentiment_label': ['Positive πŸ‘', 'Negative πŸ‘Ž']
            }),
            "bubble_follower_stats_df": pd.DataFrame({
                'category_name': ['2023-01-01', 'Industry A', '2023-02-01'], 
                'follower_count_organic': [100, 500, 120],
                'follower_count_paid': [10, 50, 20],
                'follower_count_type': ['follower_gains_monthly', 'follower_industry', 'follower_gains_monthly']
            }),
            "config_date_col_posts": "published_at",
            "config_date_col_mentions": "date",
        }
        mock_date_filter = "Ultimi 365 Giorni" # Example, adjust prepare_filtered_analytics_data if new options
        # For "Ultimi 365 Giorni", prepare_filtered_analytics_data needs to handle it or be updated.
        # Let's use a known filter for testing:
        mock_date_filter = "Sempre"
        mock_custom_start = None
        mock_custom_end = None

        logger.info("Running standalone test of run_full_analytics_orchestration...")
        
        async def test_run():
            results = await run_full_analytics_orchestration(
                mock_token_state, mock_date_filter, mock_custom_start, mock_custom_end
            )

            if results:
                print("\n\n" + "="*30 + " MOCK ORCHESTRATION RESULTS " + "="*30)
                print("\n--- Comprehensive Analysis Report ---")
                print(results.get("comprehensive_analysis_report", "N/A"))
                
                print("\n--- Actionable OKRs and Tasks ---")
                okrs_data = results.get("actionable_okrs_and_tasks")
                if okrs_data:
                    print(json.dumps(okrs_data, indent=2))
                else:
                    print("N/A")
                
                # Optionally print detailed metrics if needed for debugging
                # print("\n--- Detailed Agent Metrics ---")
                # print(json.dumps(results.get("detailed_metrics"), indent=2, default=str))
            else:
                print("Standalone test: Orchestration returned no results or failed.")

        asyncio.run(test_run())