ibfs_demo / agent_studies.py
MarginallyEffective's picture
Upload folder using huggingface_hub
55b0a04 verified
import os
import json
import uuid
import random
import yaml
import litellm
import tqdm
import concurrent.futures
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from pathlib import Path
# Reuse components from the existing codebase
from prompts import PROMPTS, format_prompt, load_prompts
from utils import save_results, generate_user_id
from ibfs import generate_strategies, answer_query
from zero_shot import zero_shot_answer
# Load the prompt templates once at import time. IBFSAgent._setup_prompts reads
# PROMPTS["ibfs"][...], so this presumably has to run before any IBFSAgent is
# constructed.
# NOTE(review): the previous comment also mentioned environment variables; nothing
# in this file shows load_prompts() doing that - confirm in prompts.load_prompts.
load_prompts()
class UserAgent:
    """Simulates a user with a preferred answer and decision-making behavior.

    The agent asks an LLM (through ``litellm.completion``) to invent the answer
    the user is hoping for, then uses that answer to pick among candidate
    strategies. With probability ``epsilon`` it picks uniformly at random
    instead of asking the LLM.
    """

    def __init__(self, llm_model: str = "gpt-4o", epsilon: float = 0.2):
        """
        Initialize the UserAgent with properties.

        Args:
            llm_model: The LLM model to use for agent decisions
            epsilon: Probability of making a random choice instead of optimal
        """
        self.llm_model = llm_model
        self.epsilon = epsilon
        self.preferred_answer = None
        self.query = None
        self.id = generate_user_id()

    def set_preferences(self, query: str) -> Optional[str]:
        """
        Set a query and generate the preferred answer for this user.

        Errors from the LLM call are not caught here; they propagate to the
        caller.

        Args:
            query: The question the user wants answered

        Returns:
            The generated preferred answer (also stored on ``self.preferred_answer``)
        """
        self.query = query
        # Generate the user's preferred answer using the LLM
        messages = [
            {"role": "system",
             "content": "You are generating a preferred answer that a user has in mind for their query. This represents what the user is hoping to learn or the perspective they're hoping to see."},
            {"role": "user",
             "content": f"For the query: '{query}', generate a detailed, thoughtful answer that will serve as the user's preferred answer. This is the information or perspective they are hoping to find. Make it 20 words."}
        ]
        response = litellm.completion(
            model=self.llm_model,
            messages=messages,
            max_tokens=1000
        )
        self.preferred_answer = response.choices[0].message.content
        return self.preferred_answer

    def choose_strategy(self, strategies: List[str]) -> int:
        """
        Choose a strategy from the provided options.

        Args:
            strategies: List of strategy descriptions

        Returns:
            Index of the chosen strategy (0-based). Returns 0 when ``strategies``
            is empty or when no preferred answer has been set.
        """
        import re

        # Guard first: random.randint(0, -1) raises ValueError, so an empty list
        # must never reach the random-choice branches below.
        if not strategies:
            return 0

        last_index = len(strategies) - 1

        # With probability epsilon, make a random choice
        if random.random() < self.epsilon:
            return random.randint(0, last_index)

        # Without a preferred answer there is nothing to compare against
        if not self.preferred_answer:
            return 0

        # Prompt the LLM to pick the strategy closest to the preferred answer
        strategy_list = "\n".join(f"{i + 1}. {s}" for i, s in enumerate(strategies))
        user_prompt = (
            "\n"
            f"Query: {self.query}\n"
            f"User's preferred answer: {self.preferred_answer}\n"
            "Available strategies:\n"
            f"{strategy_list}\n"
            "Which strategy (provide the number only) would most likely lead to an answer that matches the user's preferred answer? Respond with only a single number representing your choice.\n"
        )
        messages = [
            {"role": "system",
             "content": "You are helping a user select the strategy that would most likely lead to their preferred answer."},
            {"role": "user", "content": user_prompt}
        ]
        try:
            response = litellm.completion(
                model=self.llm_model,
                messages=messages,
                temperature=0.2,
                max_tokens=10
            )
            content = response.choices[0].message.content.strip()
            # The first integer in the reply is taken as the 1-based choice
            match = re.search(r'\d+', content)
            if match:
                choice = int(match.group()) - 1  # Convert to 0-based index
                if 0 <= choice <= last_index:
                    return choice
            # Unparseable or out-of-range reply: fall back to a random choice
            return random.randint(0, last_index)
        except Exception as e:
            print(f"Error in choosing strategy: {e}")
            # Fall back to random choice
            return random.randint(0, last_index)
class IBFSAgent:
    """Implements the Interactive Best-First Search process.

    At each step the agent asks an LLM for ``branching_factor`` candidate
    strategies, either from scratch or as refinements of the most recently
    selected strategy, and finally turns the last selected strategy into an
    answer via ``answer_query``.
    """

    def __init__(self,
                 llm_model: str = "gpt-4o",
                 diversity_level: str = "medium",
                 branching_factor: int = 4,
                 max_depth: int = 2):
        """
        Initialize the IBFSAgent with properties.

        Args:
            llm_model: The LLM model to use for generating candidates
            diversity_level: How diverse the generated candidates should be (low, medium, high)
            branching_factor: Number of candidates to generate at each step
            max_depth: Maximum depth/iterations of the IBFS process

        Raises:
            KeyError: If ``diversity_level`` is not one of low, medium, high.
        """
        self.llm_model = llm_model
        self.diversity_level = diversity_level
        self.branching_factor = branching_factor
        self.max_depth = max_depth
        self.id = generate_user_id()
        # Set up diversity-specific prompts
        self._setup_prompts()

    def _setup_prompts(self):
        """Set up the candidate generation and refinement prompts based on diversity level."""
        # Load the base prompts from PROMPTS dictionary
        self.base_system_prompt = PROMPTS["ibfs"]["initial_strategies"]["system"]
        self.base_user_prompt = PROMPTS["ibfs"]["initial_strategies"]["user"]
        self.refinement_system_prompt = PROMPTS["ibfs"]["continuation_strategies"]["system"]
        self.refinement_user_prompt = PROMPTS["ibfs"]["continuation_strategies"]["user"]
        # Extra instructions appended to the system prompt, keyed by diversity level
        diversity_instructions = {
            "low": (
                "\n"
                "The strategies you generate can be similar to each other and explore related approaches.\n"
                "There's no need to make them very different from each other.\n"
            ),
            "medium": (
                "\n"
                "Each strategy should represent a somewhat different approach to answering the question.\n"
                "Try to include some variety in the approaches.\n"
            ),
            "high": (
                "\n"
                "Each strategy should represent a substantially different approach to answering the question.\n"
                "Make sure the strategies are maximally diverse from each other - consider entirely different angles,\n"
                "methodologies, perspectives, and areas of knowledge.\n"
            ),
        }
        self.diversity_instructions = diversity_instructions[self.diversity_level]

    @staticmethod
    def _parse_strategies(content: str, k: int) -> List[str]:
        """
        Extract exactly ``k`` strategies from raw LLM output.

        Args:
            content: Text returned by the LLM
            k: Number of strategies to return

        Returns:
            List of ``k`` strategy strings; padded with generic placeholders
            when fewer than ``k`` could be parsed.
        """
        import re

        # Numbered items ("1. I can answer by ..."), with continuation lines up
        # to the next numbered item. The first line is matched with [^\n]* so a
        # digit inside the text ("CO2", "2020") no longer cuts the strategy short.
        strategies = re.findall(
            r'\d+\.\s*(I can answer by[^\n]*(?:\n(?!\d+\.)[^\n]*)*)',
            content, re.IGNORECASE)
        # If we didn't find enough strategies with that format, try un-numbered items
        if len(strategies) < k:
            strategies = re.findall(
                r'(?:^|\n)(I can answer by[^\n]*(?:\n(?!I can answer by)[^\n]*)*)',
                content, re.IGNORECASE)
        # Clean up and keep at most k
        strategies = [s.strip() for s in strategies][:k]
        # If we still don't have enough strategies, create generic ones
        while len(strategies) < k:
            strategies.append(
                f"I can answer by using approach #{len(strategies) + 1} (Note: Strategy generation incomplete)")
        return strategies

    def generate_strategies(self, query: str, current_path: Optional[List[str]] = None) -> List[str]:
        """
        Generate strategy options for the current step.

        Args:
            query: The user's query
            current_path: List of previously selected strategies; empty or None
                requests initial strategies, otherwise the last entry is refined

        Returns:
            List of ``branching_factor`` strategy descriptions. If the LLM call
            or parsing fails, placeholder strategies are returned instead.
        """
        format_args = {
            "query": query,
            "k": self.branching_factor
        }
        if not current_path:
            # Initial generation
            system_prompt = self.base_system_prompt + "\n" + self.diversity_instructions
            user_prompt = self.base_user_prompt
        else:
            # Refinement of previously selected strategy
            system_prompt = self.refinement_system_prompt + "\n" + self.diversity_instructions
            user_prompt = self.refinement_user_prompt
            format_args["selected_strategy"] = current_path[-1]

        # Format the prompts
        messages = [
            {"role": "system", "content": format_prompt(system_prompt, **format_args)},
            {"role": "user", "content": format_prompt(user_prompt, **format_args)}
        ]
        try:
            response = litellm.completion(
                model=self.llm_model,
                messages=messages,
                temperature=0.7,
                max_tokens=1000
            )
            content = response.choices[0].message.content
            return self._parse_strategies(content, self.branching_factor)
        except Exception as e:
            print(f"Error generating strategies: {e}")
            # Return fallback strategies
            return [f"I can answer by approach #{i + 1} (Error: Could not generate strategies)"
                    for i in range(self.branching_factor)]

    def generate_final_answer(self, query: str, strategy_path: List[str]) -> str:
        """
        Generate the final answer based on the selected strategy path.

        Args:
            query: The original user query
            strategy_path: List of selected strategies

        Returns:
            Final answer to the query, produced from the last strategy in the
            path, or a fixed message when the path is empty
        """
        if not strategy_path:
            return "No strategy was selected to generate an answer."
        # Only the most recently selected strategy is used for the answer
        return answer_query(query, strategy_path[-1])
def run_simulation(query: str,
                   user_agent: UserAgent,
                   ibfs_agent: IBFSAgent) -> Dict[str, Any]:
    """
    Simulate one complete user/IBFS interaction for a single query.

    The user agent first fixes its preferred answer, then for each of
    ``ibfs_agent.max_depth`` rounds the IBFS agent proposes strategies and the
    user agent selects one. The final answer is generated from the resulting
    path of selections.

    Args:
        query: The question to be answered
        user_agent: The UserAgent instance
        ibfs_agent: The IBFSAgent instance

    Returns:
        Dictionary containing the simulation results
    """
    # The simulated user decides what answer it is hoping for
    user_agent.set_preferences(query)

    selected_path = []  # strategies chosen so far, in order
    steps = []  # every set of options shown together with the pick

    for level in range(ibfs_agent.max_depth):
        options = ibfs_agent.generate_strategies(query, selected_path)
        picked = user_agent.choose_strategy(options)
        step = {
            "depth": level,
            "strategies": options,
            "choice_idx": picked,
            "chosen_strategy": options[picked]
        }
        steps.append(step)
        selected_path.append(step["chosen_strategy"])

    # Turn the chosen path into the answer
    answer = ibfs_agent.generate_final_answer(query, selected_path)

    ibfs_settings = {
        "diversity_level": ibfs_agent.diversity_level,
        "branching_factor": ibfs_agent.branching_factor,
        "max_depth": ibfs_agent.max_depth
    }
    user_settings = {"epsilon": user_agent.epsilon}

    return {
        "query": query,
        "user_id": user_agent.id,
        "ibfs_id": ibfs_agent.id,
        "user_preferred_answer": user_agent.preferred_answer,
        "final_answer": answer,
        "strategy_path": selected_path,
        "history": steps,
        "ibfs_config": ibfs_settings,
        "user_config": user_settings,
        "timestamp": datetime.now().isoformat()
    }
def _parse_similarity_score(content: str) -> Optional[float]:
    """Return the first number in ``content`` clamped to [0.0, 1.0], or None if there is none."""
    import re

    # Try the decimal form first so that ".8" is read as 0.8 rather than as
    # the bare integer 8 (which would clamp to 1.0).
    match = re.search(r'\d*\.\d+|\d+', content)
    if match is None:
        return None
    return max(0.0, min(float(match.group()), 1.0))


def evaluate_answer_similarity(answer1: str, answer2: str, model: str = "gpt-4o") -> float:
    """
    Evaluate the similarity between two answers using the LLM.

    Args:
        answer1: First answer
        answer2: Second answer
        model: LLM used as the judge (defaults to the previously hard-coded "gpt-4o")

    Returns:
        Similarity score (0-1). Falls back to 0.5 when the LLM call fails or
        its reply contains no number.
    """
    user_prompt = (
        "\n"
        "Answer 1:\n"
        f"{answer1}\n"
        "Answer 2:\n"
        f"{answer2}\n"
        "On a scale from 0 to 1, how similar are these answers in terms of content, perspective, and key information?\n"
        "Provide only a single number as your response.\n"
    )
    messages = [
        {"role": "system",
         "content": "You are evaluating the similarity between a user's preferred answer and a generated answer. Provide a similarity score from 0 to 1, where 1 means identical in content and perspective, and 0 means completely different."},
        {"role": "user", "content": user_prompt}
    ]
    try:
        response = litellm.completion(
            model=model,
            messages=messages,
            temperature=0.1,
            max_tokens=10
        )
        content = response.choices[0].message.content.strip()
        score = _parse_similarity_score(content)
        # Default score if parsing fails
        return 0.5 if score is None else score
    except Exception as e:
        print(f"Error evaluating similarity: {e}")
        return 0.5
def process_simulation(args):
    """
    Run one simulation end to end; written as a single-argument worker for parallel execution.

    Builds fresh agents from the given configuration, runs the simulation,
    scores the final answer against the user's preferred answer, and writes the
    result to ``experiment_results/<experiment_id>_sim_<sim_count>.json``.

    Args:
        args: Tuple containing (query, user_config, ibfs_config, experiment_id, sim_count)

    Returns:
        Simulation result, or a small error record (error, experiment_id,
        simulation_id, query) if anything in the run raised
    """
    query, user_config, ibfs_config, experiment_id, sim_count = args
    try:
        # Fresh agents for every run, configured from the argument tuple
        simulated_user = UserAgent(epsilon=user_config["epsilon"])
        search_agent = IBFSAgent(
            diversity_level=ibfs_config["diversity_level"],
            branching_factor=ibfs_config["branching_factor"],
            max_depth=ibfs_config["max_depth"]
        )

        outcome = run_simulation(query, simulated_user, search_agent)

        # Attach the similarity score followed by the identifying metadata
        outcome.update(
            similarity_score=evaluate_answer_similarity(
                simulated_user.preferred_answer,
                outcome["final_answer"]
            ),
            experiment_id=experiment_id,
            simulation_id=sim_count
        )

        # Persist this run on its own so partial experiments stay on disk
        out_dir = "experiment_results"
        os.makedirs(out_dir, exist_ok=True)
        out_path = f"{out_dir}/{experiment_id}_sim_{sim_count}.json"
        with open(out_path, "w") as handle:
            json.dump(outcome, handle, indent=2)
        return outcome
    except Exception as e:
        print(f"Error in simulation {sim_count}: {e}")
        return dict(
            error=str(e),
            experiment_id=experiment_id,
            simulation_id=sim_count,
            query=query
        )
def run_experiment(queries: List[str],
                   diversity_levels: List[str],
                   branching_factors: List[int],
                   max_depths: List[int],
                   epsilon_values: List[float],
                   repetitions: int = 3,
                   max_workers: int = 4) -> List[Dict[str, Any]]:
    """
    Run a full experiment with different configurations using parallel processing.

    Every combination of query, diversity level, branching factor, max depth
    and epsilon is simulated ``repetitions`` times. The experiment
    configuration and the aggregated results are written as JSON files under
    ``experiment_results/``.

    Args:
        queries: List of queries to test
        diversity_levels: List of diversity levels to test
        branching_factors: List of branching factors to test
        max_depths: List of max depths to test
        epsilon_values: List of epsilon values to test
        repetitions: Number of repetitions for each configuration
        max_workers: Maximum number of parallel workers

    Returns:
        List of results for all simulations, ordered by ``simulation_id``.
        Failed simulations appear as error records rather than being dropped.
    """
    import itertools

    # Create the results directory if it doesn't exist
    os.makedirs("experiment_results", exist_ok=True)
    # Generate a unique experiment ID (one-second resolution)
    experiment_id = f"exp_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    # Save the configuration so the run can be identified later
    config = {
        "experiment_id": experiment_id,
        "queries": queries,
        "diversity_levels": diversity_levels,
        "branching_factors": branching_factors,
        "max_depths": max_depths,
        "epsilon_values": epsilon_values,
        "repetitions": repetitions,
        "timestamp": datetime.now().isoformat()
    }
    with open(f"experiment_results/{experiment_id}_config.json", "w") as f:
        json.dump(config, f, indent=2)

    # Full parameter grid. product() varies the last factor fastest, so the
    # numbering matches nested loops in the same order.
    grid = itertools.product(queries, diversity_levels, branching_factors,
                             max_depths, epsilon_values, range(repetitions))
    simulation_args = [
        (
            query,
            {"epsilon": epsilon},
            {
                "diversity_level": diversity,
                "branching_factor": branching,
                "max_depth": depth
            },
            experiment_id,
            sim_count
        )
        for sim_count, (query, diversity, branching, depth, epsilon, _rep) in enumerate(grid)
    ]

    # Run simulations in parallel with progress bar
    results = []
    with tqdm.tqdm(total=len(simulation_args), desc="Running simulations") as pbar:
        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            future_to_sim = {executor.submit(process_simulation, args): args for args in simulation_args}
            for future in concurrent.futures.as_completed(future_to_sim):
                try:
                    result = future.result()
                except Exception as e:
                    # process_simulation catches its own errors, so this only
                    # fires when the worker itself failed (e.g. a crashed
                    # process). Record it instead of losing the whole run.
                    query, _, _, _, sim_count = future_to_sim[future]
                    print(f"Error in simulation {sim_count}: {e}")
                    result = {
                        "error": str(e),
                        "experiment_id": experiment_id,
                        "simulation_id": sim_count,
                        "query": query
                    }
                results.append(result)
                pbar.update(1)

    # Completion order is arbitrary; sort so the saved file is reproducible
    results.sort(key=lambda r: r["simulation_id"])

    # Save aggregated results
    with open(f"experiment_results/{experiment_id}_all_results.json", "w") as f:
        json.dump(results, f, indent=2)
    return results
def analyze_results(experiment_id: str) -> Dict[str, Any]:
    """
    Analyze the results of an experiment.

    Reads the per-simulation files ``experiment_results/<experiment_id>_sim_*.json``
    (the files written by ``process_simulation``), aggregates similarity scores
    per configuration and per query, and writes the analysis to
    ``experiment_results/<experiment_id>_analysis.json``.

    Args:
        experiment_id: ID of the experiment to analyze

    Returns:
        Dictionary with analysis results

    Raises:
        FileNotFoundError: If the ``experiment_results`` directory does not exist.
    """
    results_dir = "experiment_results"
    # Match only this experiment's simulation files. A bare prefix match would
    # also pick up other outputs sharing the prefix (for example the
    # zero-shot comparison file), which lack the simulation keys.
    sim_prefix = f"{experiment_id}_sim_"

    results = []
    # Sorted so that tie-breaking below does not depend on directory order
    for filename in sorted(os.listdir(results_dir)):
        if not (filename.startswith(sim_prefix) and filename.endswith(".json")):
            continue
        with open(f"{results_dir}/{filename}", "r") as f:
            try:
                results.append(json.load(f))
            except json.JSONDecodeError:
                print(f"Error loading {filename}")

    # Failed simulations carry an "error" key and no scores
    successful = [r for r in results if "error" not in r]

    # Aggregate metrics by configuration
    aggregated = {}
    for result in successful:
        config = {
            "diversity_level": result["ibfs_config"]["diversity_level"],
            "branching_factor": result["ibfs_config"]["branching_factor"],
            "max_depth": result["ibfs_config"]["max_depth"],
            "epsilon": result["user_config"]["epsilon"]
        }
        config_key = tuple(config.values())
        entry = aggregated.setdefault(config_key, {
            "similarity_scores": [],
            "config": config,
            "queries": []
        })
        entry["similarity_scores"].append(result["similarity_score"])
        # Track the distinct queries seen for this configuration
        if result["query"] not in entry["queries"]:
            entry["queries"].append(result["query"])

    # Calculate summary statistics (each entry holds at least one score)
    summary = []
    for data in aggregated.values():
        scores = data["similarity_scores"]
        mean = sum(scores) / len(scores)
        # Population standard deviation, with the mean computed once
        variance = sum((x - mean) ** 2 for x in scores) / len(scores)
        summary.append({
            "config": data["config"],
            "avg_similarity": mean,
            "min_similarity": min(scores),
            "max_similarity": max(scores),
            "std_deviation": variance ** 0.5,
            "num_samples": len(scores),
            "queries_tested": len(data["queries"])
        })
    # Sort by average similarity (descending)
    summary.sort(key=lambda x: x["avg_similarity"], reverse=True)

    # Add query-specific analysis
    query_analysis = {}
    for result in successful:
        stats = query_analysis.setdefault(result["query"], {
            "best_config": None,
            "best_similarity": -1,
            "configs_tested": 0,
            "avg_similarity": 0,
            "all_scores": []
        })
        score = result["similarity_score"]
        stats["all_scores"].append(score)
        # Strict comparison: the first simulation reaching the top score wins
        if score > stats["best_similarity"]:
            stats["best_similarity"] = score
            stats["best_config"] = {
                "diversity_level": result["ibfs_config"]["diversity_level"],
                "branching_factor": result["ibfs_config"]["branching_factor"],
                "max_depth": result["ibfs_config"]["max_depth"],
                "epsilon": result["user_config"]["epsilon"]
            }

    # Calculate query statistics
    for stats in query_analysis.values():
        # Remove the raw scores to keep the analysis file smaller
        scores = stats.pop("all_scores")
        stats["avg_similarity"] = sum(scores) / len(scores) if scores else 0
        # Counts scored simulations for the query (one per result file)
        stats["configs_tested"] = len(scores)

    # Save the analysis
    analysis = {
        "experiment_id": experiment_id,
        "total_simulations": len(results),
        "summary": summary,
        "query_analysis": query_analysis,
        "timestamp": datetime.now().isoformat()
    }
    with open(f"{results_dir}/{experiment_id}_analysis.json", "w") as f:
        json.dump(analysis, f, indent=2)
    return analysis
def _load_simulation_records(experiment_id: str) -> List[Dict[str, Any]]:
    """Load every readable ``<experiment_id>_sim_*.json`` record from experiment_results/."""
    prefix = f"{experiment_id}_sim_"
    records = []
    for filename in sorted(os.listdir("experiment_results")):
        if not (filename.startswith(prefix) and filename.endswith(".json")):
            continue
        try:
            with open(f"experiment_results/{filename}", "r") as f:
                records.append(json.load(f))
        except (OSError, json.JSONDecodeError) as e:
            # Best effort: an unreadable file must not abort the comparison
            print(f"Error loading {filename}: {e}")
    return records


def compare_to_zero_shot(experiment_id: str, queries: List[str]) -> Dict[str, Any]:
    """
    Compare IBFS results to zero-shot answers.

    For each query a zero-shot answer is generated and scored against the
    preferred answer of the best-scoring IBFS simulation that used the query's
    best configuration. Queries whose zero-shot generation failed, or that have
    no matching simulation, are left out of the comparison. The result is also
    written to ``experiment_results/<experiment_id>_zero_shot_comparison.json``.

    Args:
        experiment_id: ID of the experiment to compare
        queries: List of queries to test with zero-shot

    Returns:
        Comparison results
    """
    # First, get zero-shot answers for all queries
    zero_shot_results = []
    print("Generating zero-shot answers...")
    for query in tqdm.tqdm(queries):
        try:
            answer = zero_shot_answer(query)
            zero_shot_results.append({
                "query": query,
                "zero_shot_answer": answer
            })
        except Exception as e:
            print(f"Error generating zero-shot answer for '{query}': {e}")
            zero_shot_results.append({
                "query": query,
                "zero_shot_answer": f"Error: {str(e)}",
                "error": True
            })

    # Load the IBFS experiment results
    analysis = analyze_results(experiment_id)
    # Overall best configuration, used when a query has no best config of its own
    best_config = analysis["summary"][0]["config"] if analysis["summary"] else None

    # Read the per-simulation files once instead of once per query
    simulations = _load_simulation_records(experiment_id)
    config_fields = (
        ("ibfs_config", "diversity_level"),
        ("ibfs_config", "branching_factor"),
        ("ibfs_config", "max_depth"),
        ("user_config", "epsilon"),
    )

    # For each query, compare the best IBFS result to the zero-shot answer
    comparison = []
    print("Comparing zero-shot to IBFS results...")
    for zero_shot_result in tqdm.tqdm(zero_shot_results):
        query = zero_shot_result["query"]
        if zero_shot_result.get("error"):
            # The stored "answer" is only an error message; scoring it would
            # distort the averages.
            print(f"Skipping comparison for '{query}': zero-shot generation failed")
            continue
        # NOTE: this local must not be called `zero_shot_answer`. Assigning to
        # that name anywhere in the function makes it local everywhere in it,
        # so the call to the imported zero_shot_answer() above would raise
        # UnboundLocalError.
        baseline_answer = zero_shot_result["zero_shot_answer"]

        # Find the best IBFS configuration for this query
        query_data = analysis.get("query_analysis", {}).get(query, {})
        best_config_for_query = query_data.get("best_config", best_config)
        if not best_config_for_query:
            print(f"No valid configuration found for query: {query}")
            continue

        # Simulations with this query and exactly this configuration
        matching_results = [
            record for record in simulations
            if record.get("query") == query and all(
                record.get(section, {}).get(field) == best_config_for_query.get(field)
                for section, field in config_fields
            )
        ]
        if not matching_results:
            continue

        # max() keeps the first record among equal top scores
        best_ibfs_result = max(matching_results, key=lambda x: x.get("similarity_score", 0))

        # Compare zero-shot to user's preferred answer
        preferred_answer = best_ibfs_result.get("user_preferred_answer", "")
        zero_shot_similarity = evaluate_answer_similarity(preferred_answer, baseline_answer)
        # Get the IBFS similarity (already calculated)
        ibfs_similarity = best_ibfs_result.get("similarity_score", 0)
        comparison.append({
            "query": query,
            "zero_shot_similarity": zero_shot_similarity,
            "ibfs_similarity": ibfs_similarity,
            "difference": ibfs_similarity - zero_shot_similarity,
            "ibfs_config": best_config_for_query
        })

    # Calculate overall metrics
    count = len(comparison)
    zero_shot_avg = sum(item["zero_shot_similarity"] for item in comparison) / count if count else 0
    ibfs_avg = sum(item["ibfs_similarity"] for item in comparison) / count if count else 0
    avg_difference = sum(item["difference"] for item in comparison) / count if count else 0

    # Save the comparison results
    comparison_results = {
        "experiment_id": experiment_id,
        "zero_shot_avg_similarity": zero_shot_avg,
        "ibfs_avg_similarity": ibfs_avg,
        "avg_difference": avg_difference,
        "query_comparisons": comparison,
        "timestamp": datetime.now().isoformat()
    }
    with open(f"experiment_results/{experiment_id}_zero_shot_comparison.json", "w") as f:
        json.dump(comparison_results, f, indent=2)
    return comparison_results
if __name__ == "__main__":
    # Script entry point: runs a reduced "test" experiment over part of the
    # parameter grid defined below.
    # Reload the prompts only if PROMPTS is still empty after the module-level
    # load_prompts() call.
    if not PROMPTS:
        load_prompts()
    # Sample queries to test
    queries = [
        "What are the environmental impacts of electric vehicles compared to traditional gasoline vehicles?",
        "How has artificial intelligence changed the job market in the past decade?",
        "What are the most effective strategies for reducing stress and anxiety?",
        "What are the arguments for and against universal basic income?",
    ]
    # Experiment parameters.
    # NOTE(review): branching_factors, max_depths and epsilon_values are not used
    # by the test run below, which passes its own literals - presumably they are
    # meant for a full experiment; confirm against any code following this block.
    diversity_levels = ["low", "medium", "high"]
    branching_factors = [2, 4, 8]
    max_depths = [1, 2, 4]
    epsilon_values = [0.1, 0.3]
    # Run a smaller test experiment
    print("Running test experiment...")
    test_results = run_experiment(
        queries=queries[:1],  # Just use the first query for testing
        diversity_levels=diversity_levels[:2],  # Test low and medium diversity
        branching_factors=[2, 4],  # Test b=2 and b=4
        max_depths=[1, 2],  # Test m=1 and m=2
        epsilon_values=[0.2],  # Test epsilon=0.2
        repetitions=10,  # 10 repetitions per configuration
        max_workers=7  # Up to 7 parallel workers
    )