GuglielmoTor committed on
Commit
67e5b47
·
verified ·
1 Parent(s): 2db7713

Update eb_agent_module.py

Browse files
Files changed (1) hide show
  1. eb_agent_module.py +110 -80
eb_agent_module.py CHANGED
@@ -11,6 +11,11 @@ import traceback
11
  import pandasai as pai
12
  from pandasai_litellm import LiteLLM
13
 
 
 
 
 
 
14
  # Configure logging
15
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(module)s - %(message)s')
16
 
@@ -575,54 +580,78 @@ class EmployerBrandingAgent:
575
  else:
576
  return "general"
577
 
578
- async def _generate_pandas_response(self, query: str) -> tuple[str, bool]:
579
- """Generate response using PandasAI for data queries"""
580
- if not self.pandas_agent or not hasattr(self, 'pandas_dfs'):
581
- return "Data analysis not available - PandasAI not initialized.", False
 
 
 
582
 
 
 
 
 
583
  try:
584
- logging.info(f"Processing data query with PandasAI: {query[:100]}...")
585
-
586
- # Use the first available dataframe for single-df queries
587
- # For multi-df queries, you'd use pai.chat(query, df1, df2, ...)
588
- if len(self.pandas_dfs) == 1:
589
- df = list(self.pandas_dfs.values())[0]
590
- logging.info(f"Using single DataFrame for query with shape: {df.df.shape}")
591
- pandas_response = df.chat(query)
592
- else:
593
- # For multiple dataframes, use pai.chat with all dfs
594
- dfs = list(self.pandas_dfs.values())
595
- pandas_response = pai.chat(query, *dfs)
596
-
597
- # Handle different response types (text, charts, etc.)
598
- response_text = ""
599
- chart_info = ""
600
-
601
- # Check if a chart was generated
602
- import os
603
- charts_dir = "./charts"
604
- if os.path.exists(charts_dir):
605
- chart_files = [f for f in os.listdir(charts_dir) if f.endswith(('.png', '.jpg', '.jpeg', '.svg'))]
606
- if chart_files:
607
- # Get the most recent chart file
608
- chart_files.sort(key=lambda x: os.path.getmtime(os.path.join(charts_dir, x)), reverse=True)
609
- latest_chart = chart_files[0]
610
- chart_path = os.path.join(charts_dir, latest_chart)
611
- chart_info = f"\n\n📊 **Chart Generated**: {latest_chart}\nChart saved at: {chart_path}"
612
- logging.info(f"Chart generated: {chart_path}")
613
-
614
- # Combine text response with chart info
615
- if pandas_response and str(pandas_response).strip():
616
- response_text = str(pandas_response).strip()
617
- else:
618
- response_text = "Analysis completed"
619
 
620
- final_response = response_text + chart_info
621
- return final_response, True
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
622
 
623
  except Exception as e:
624
- logging.error(f"Error in PandasAI processing: {e}", exc_info=True)
625
- return f"I encountered an error while analyzing the data: {str(e)}", False
 
 
 
 
626
 
627
  async def _generate_enhanced_response(self, query: str, pandas_result: str = "", query_type: str = "general") -> str:
628
  """Generate enhanced response combining PandasAI results with RAG context"""
@@ -753,64 +782,65 @@ class EmployerBrandingAgent:
753
  return False
754
  return True
755
 
756
- async def process_query(self, user_query: str) -> str:
757
  """
758
- Main method to process user queries with hybrid approach:
759
- 1. Classify query type (data/advice/hybrid)
760
- 2. Use PandasAI for data queries
761
- 3. Use enhanced LLM for interpretation and advice
762
- 4. Combine results for comprehensive responses
763
  """
764
  if not self._validate_query(user_query):
765
- return "Please provide a valid query (3 to 3000 characters)."
766
 
767
  if not self.is_ready:
768
  logging.warning("process_query called but agent is not ready. Attempting re-initialization.")
769
  init_success = await self.initialize()
770
  if not init_success:
771
- return "The agent is not properly initialized and could not be started. Please check configuration and logs."
772
 
773
  try:
774
- # Classify the query type
775
  query_type = self._classify_query_type(user_query)
776
  logging.info(f"Query classified as: {query_type}")
777
 
778
- pandas_result = ""
779
- pandas_success = False
 
780
 
781
  # For data-related queries, try PandasAI first
782
  if query_type in ["data", "hybrid"] and self.pandas_agent:
783
  logging.info("Attempting PandasAI analysis...")
784
- pandas_result, pandas_success = await self._generate_pandas_response(user_query)
785
 
786
  if pandas_success:
787
- logging.info("PandasAI analysis successful")
788
- # For pure data queries with successful analysis, we might return enhanced result
789
- if query_type == "data":
790
- enhanced_response = await self._generate_enhanced_response(
791
- user_query, pandas_result, query_type
792
- )
793
- return enhanced_response
794
  else:
795
- logging.warning("PandasAI analysis failed, falling back to general response")
796
-
797
- # For hybrid queries, advice queries, or when PandasAI fails
798
- if query_type == "hybrid" and pandas_success:
799
- # Combine PandasAI results with enhanced advice
800
- enhanced_response = await self._generate_enhanced_response(
801
- user_query, pandas_result, query_type
802
- )
803
- return enhanced_response
804
- else:
805
- # General advice or fallback response
806
- enhanced_response = await self._generate_enhanced_response(
807
- user_query, "", query_type
808
- )
809
- return enhanced_response
810
-
 
 
 
 
 
 
 
 
 
811
  except Exception as e:
812
- logging.error(f"Error in process_query: {e}", exc_info=True)
813
- return f"I encountered an error while processing your request: {str(e)}"
 
814
 
815
  def update_dataframes(self, new_dataframes: Dict[str, pd.DataFrame]):
816
  """Updates the agent's DataFrames and reinitializes PandasAI agent"""
 
11
  import pandasai as pai
12
  from pandasai_litellm import LiteLLM
13
 
14
+ # Add this early, before matplotlib.pyplot is imported directly or by pandasai
15
+ import matplotlib
16
+ matplotlib.use('Agg') # Use a non-interactive backend for Matplotlib
17
+ import matplotlib.pyplot as plt
18
+
19
  # Configure logging
20
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(module)s - %(message)s')
21
 
 
580
  else:
581
  return "general"
582
 
583
async def _generate_pandas_response(self, query: str) -> Tuple[Optional[str], Optional[str], bool]:
    """
    Generate a response using PandasAI for data queries.

    The blocking ``pai.chat`` call is executed in a worker thread through
    ``asyncio.to_thread``. A chart is reported only when a chart file in the
    configured charts directory was created or modified while this call was
    running, so an image left over from an earlier query is never attached
    to an unrelated answer.

    Args:
        query: Natural-language question forwarded to PandasAI.

    Returns:
        ``(text, chart_path, success)``. ``text`` is the textual result, or a
        user-facing message when the tool is unavailable or failed.
        ``chart_path`` is the path of the chart produced by this call, or
        ``None``. ``success`` is ``False`` when PandasAI was not run or
        raised an exception.
    """
    # getattr: the attribute may not exist yet if initialization never
    # reached the point where the DataFrames are registered.
    pandas_dfs = getattr(self, "pandas_dfs", None)
    if not self.pandas_agent or not pandas_dfs:
        return "Data analysis tool (PandasAI) is not initialized or no data is loaded.", None, False

    chart_extensions = ('.png', '.jpg', '.jpeg', '.svg')

    def _chart_mtimes(directory: Optional[str]) -> dict:
        """Map every chart file in *directory* to its modification time."""
        mtimes = {}
        if not directory or not os.path.isdir(directory):
            return mtimes
        for entry in os.listdir(directory):
            if not entry.lower().endswith(chart_extensions):
                continue
            path = os.path.join(directory, entry)
            try:
                mtimes[path] = os.path.getmtime(path)
            except OSError:
                # File vanished between listdir() and getmtime(); ignore it.
                continue
        return mtimes

    def _same_path(candidate: str, chart_path: str) -> bool:
        """Compare two paths after normalisation (relative vs. absolute)."""
        return os.path.abspath(candidate) == os.path.abspath(chart_path)

    latest_chart_path: Optional[str] = None
    textual_pandas_response: Optional[str] = None
    # Computed once so the except handler cannot fail on a non-string query.
    query_preview = str(query)[:100]

    try:
        logging.info(f"Processing data query with PandasAI: '{query_preview}...' using {len(pandas_dfs)} DataFrame(s).")

        # self.pandas_dfs holds the PandasAI-wrapped frames; pai.chat()
        # accepts any number of them as positional arguments.
        dfs_to_query = list(pandas_dfs.values())

        # NOTE(review): assumes the PandasAI config exposes save_charts_path;
        # if it does not, chart detection is skipped instead of failing the
        # whole analysis - confirm against the installed PandasAI version.
        charts_dir = getattr(pai.config, "save_charts_path", None)
        charts_before = _chart_mtimes(charts_dir)

        pandas_response_raw = await asyncio.to_thread(pai.chat, query, *dfs_to_query)

        # Only charts that are new, or were rewritten during the call, belong
        # to this query. Concurrent queries sharing the directory can still
        # interleave; that is not addressed here.
        charts_after = _chart_mtimes(charts_dir)
        fresh_charts = [
            path for path, mtime in charts_after.items()
            if charts_before.get(path) != mtime
        ]
        if fresh_charts:
            latest_chart_path = max(fresh_charts, key=charts_after.get)
            logging.info(f"Chart detected/generated by PandasAI: {latest_chart_path}")

        # Determine the textual part of the PandasAI response.
        if isinstance(pandas_response_raw, pd.DataFrame):
            # Summarise tables so an overly long string is not sent to the LLM.
            textual_pandas_response = f"PandasAI returned a data table with {len(pandas_response_raw)} rows. Here are the first few entries:\n{pandas_response_raw.head(3).to_string()}"
        elif pandas_response_raw is not None:
            textual_pandas_response = str(pandas_response_raw).strip()
            # When the whole response is just the chart's path, the text
            # carries no information of its own.
            if latest_chart_path and _same_path(textual_pandas_response, latest_chart_path):
                textual_pandas_response = "A chart was generated to answer your query."

        # Empty or missing text: fall back to a short explanatory note.
        if not textual_pandas_response:
            if latest_chart_path:
                textual_pandas_response = "A chart was generated as a result of the analysis."
            else:
                textual_pandas_response = "PandasAI processed the query but did not return a specific textual result or chart."

        return textual_pandas_response, latest_chart_path, True

    except Exception as e:
        logging.error(f"Error during PandasAI processing for query '{query_preview}...': {e}", exc_info=True)
        # Expose only the exception type to the user; details stay in the log.
        error_msg = f"I encountered an issue while analyzing the data with the data tool: {type(e).__name__}."
        error_text = str(e).lower()
        if "duckdb" in error_text and "binder error" in error_text:
            error_msg += " This might be due to a mismatch in data types or an unsupported operation on the data."
        return error_msg, None, False
655
 
656
  async def _generate_enhanced_response(self, query: str, pandas_result: str = "", query_type: str = "general") -> str:
657
  """Generate enhanced response combining PandasAI results with RAG context"""
 
782
  return False
783
  return True
784
 
785
async def process_query(self, user_query: str) -> Dict[str, Optional[str]]:
    """
    Answer a user query, optionally backed by a PandasAI analysis.

    Data and hybrid queries are first sent to PandasAI when the agent is
    available; whatever text it produced (including an error message) is
    then handed to the LLM, which always writes the final answer.

    Returns:
        A dictionary ``{"text": <LLM answer or error message>,
        "image_path": <chart path or None>}``. ``image_path`` is only set
        when the PandasAI step succeeded and reported a chart.
    """
    def _reply(text, image_path=None):
        # Single place that fixes the shape of the returned dictionary.
        return {"text": text, "image_path": image_path}

    if not self._validate_query(user_query):
        return _reply("Please provide a valid query (3 to 3000 characters).")

    if not self.is_ready:
        logging.warning("process_query called but agent is not ready. Attempting re-initialization.")
        if not await self.initialize():
            return _reply("The agent is not properly initialized and could not be started. Please check configuration and logs.")

    try:
        query_type = self._classify_query_type(user_query)
        logging.info(f"Query classified as: {query_type}")

        wants_data = query_type in ["data", "hybrid"]
        tool_text: Optional[str] = None
        chart_path: Optional[str] = None
        tool_ok = False  # True only when PandasAI ran without raising

        # Data-related queries go through PandasAI first.
        if wants_data and self.pandas_agent:
            logging.info("Attempting PandasAI analysis...")
            tool_text, chart_path, tool_ok = await self._generate_pandas_response(user_query)

            if not tool_ok:
                # On failure tool_text carries the tool's error message.
                logging.warning(f"PandasAI analysis failed or returned no specific result. Message from PandasAI: {tool_text}")
            else:
                logging.info(f"PandasAI analysis successful. Text: '{str(tool_text)[:100]}...', Chart: '{chart_path}'")

        # Assemble the context the LLM receives from the data tool.
        context_parts = []
        if tool_text:
            context_parts.append(f"Data Analysis Tool Output: {tool_text}\n")
        if chart_path and tool_ok:
            context_parts.append(f"[A chart has been generated by the data tool and saved at '{chart_path}'. You should refer to this chart in your explanation if it's relevant to the user's query.]\n")
        elif wants_data and not self.pandas_agent:
            context_parts.append("Note: The data analysis tool is currently unavailable.\n")

        # The LLM always formulates the final wording.
        answer = await self._generate_enhanced_response(
            query=user_query,
            pandas_result="".join(context_parts),
            query_type=query_type,
        )

        # A chart is surfaced only when the PandasAI step succeeded.
        return _reply(answer, chart_path if tool_ok else None)

    except Exception as e:
        logging.error(f"Critical error in process_query: {e}", exc_info=True)
        return _reply(f"I encountered a critical error while processing your request: {type(e).__name__}. Please check the logs.")
843
+
844
 
845
  def update_dataframes(self, new_dataframes: Dict[str, pd.DataFrame]):
846
  """Updates the agent's DataFrames and reinitializes PandasAI agent"""