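"""Simulation harness for Interactive Best-First Search (IBFS) experiments.

A simulated UserAgent (with an LLM-generated preferred answer and epsilon-greedy
choices) interacts with an IBFSAgent that proposes answer strategies. run_experiment
sweeps configurations in parallel, analyze_results aggregates similarity scores, and
compare_to_zero_shot contrasts the best IBFS results with zero-shot answers.
"""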
import os
import json
import re
import random
import litellm
import tqdm
import concurrent.futures
from typing import List, Dict, Any
from datetime import datetime

# Reuse components from the existing codebase
from prompts import PROMPTS, format_prompt, load_prompts
from utils import generate_user_id
from ibfs import answer_query
from zero_shot import zero_shot_answer

# Load the prompt templates
load_prompts()
class UserAgent:
    """Simulates a user with a preferred answer and decision-making behavior."""

    def __init__(self, llm_model: str = "gpt-4o", epsilon: float = 0.2):
        """
        Initialize the UserAgent.

        Args:
            llm_model: The LLM model to use for agent decisions
            epsilon: Probability of making a random choice instead of the optimal one
        """
        self.llm_model = llm_model
        self.epsilon = epsilon
        self.preferred_answer = None
        self.query = None
        self.id = generate_user_id()

    def set_preferences(self, query: str):
        """
        Set a query and generate the preferred answer for this user.

        Args:
            query: The question the user wants answered
        """
        self.query = query
        # Generate the user's preferred answer using the LLM
        messages = [
            {"role": "system",
             "content": "You are generating a preferred answer that a user has in mind for their query. This represents what the user is hoping to learn or the perspective they're hoping to see."},
            {"role": "user",
             "content": f"For the query: '{query}', generate a detailed, thoughtful answer that will serve as the user's preferred answer. This is the information or perspective they are hoping to find. Make it 20 words."}
        ]
        response = litellm.completion(
            model=self.llm_model,
            messages=messages,
            max_tokens=1000
        )
        self.preferred_answer = response.choices[0].message.content
        return self.preferred_answer

    def choose_strategy(self, strategies: List[str]) -> int:
        """
        Choose a strategy from the provided options.

        Args:
            strategies: List of strategy descriptions

        Returns:
            Index of the chosen strategy (0-based)
        """
        # With probability epsilon, make a random choice
        if random.random() < self.epsilon:
            return random.randint(0, len(strategies) - 1)

        # Otherwise, evaluate which strategy gets closest to the preferred answer
        if not self.preferred_answer or not strategies:
            return 0  # Default to the first option if there is no preference or no strategies

        # Prompt the LLM to rank the strategies by similarity to the preferred answer
        strategy_list = "\n".join(f"{i + 1}. {s}" for i, s in enumerate(strategies))
        messages = [
            {"role": "system",
             "content": "You are helping a user select the strategy that would most likely lead to their preferred answer."},
            {"role": "user", "content": f"""
Query: {self.query}

User's preferred answer: {self.preferred_answer}

Available strategies:
{strategy_list}

Which strategy (provide the number only) would most likely lead to an answer that matches the user's preferred answer? Respond with only a single number representing your choice.
"""}
        ]
        try:
            response = litellm.completion(
                model=self.llm_model,
                messages=messages,
                temperature=0.2,
                max_tokens=10
            )
            # Extract the chosen strategy number (first integer in the response)
            content = response.choices[0].message.content.strip()
            match = re.search(r'\d+', content)
            if match:
                choice = int(match.group()) - 1  # Convert to 0-based index
                # Ensure it's within bounds
                if 0 <= choice < len(strategies):
                    return choice
            # If the response couldn't be parsed or is out of bounds, make a random choice
            return random.randint(0, len(strategies) - 1)
        except Exception as e:
            print(f"Error in choosing strategy: {e}")
            # Fall back to a random choice
            return random.randint(0, len(strategies) - 1)
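# A minimal usage sketch for UserAgent (assumes litellm is configured with a valid
# API key for the chosen model; the strategy strings below are placeholders):
#
#     user = UserAgent(epsilon=0.2)
#     user.set_preferences("What are the arguments for and against universal basic income?")
#     idx = user.choose_strategy([
#         "I can answer by summarizing the main economic arguments on both sides.",
#         "I can answer by reviewing pilot studies of basic income programs.",
#     ])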
class IBFSAgent:
    """Implements the Interactive Best-First Search (IBFS) process."""

    def __init__(self,
                 llm_model: str = "gpt-4o",
                 diversity_level: str = "medium",
                 branching_factor: int = 4,
                 max_depth: int = 2):
        """
        Initialize the IBFSAgent.

        Args:
            llm_model: The LLM model to use for generating candidates
            diversity_level: How diverse the generated candidates should be (low, medium, high)
            branching_factor: Number of candidates to generate at each step
            max_depth: Maximum depth/iterations of the IBFS process
        """
        self.llm_model = llm_model
        self.diversity_level = diversity_level
        self.branching_factor = branching_factor
        self.max_depth = max_depth
        self.id = generate_user_id()
        # Set up diversity-specific prompts
        self._setup_prompts()

    def _setup_prompts(self):
        """Set up the candidate generation and refinement prompts based on the diversity level."""
        # Load the base prompts from the PROMPTS dictionary
        self.base_system_prompt = PROMPTS["ibfs"]["initial_strategies"]["system"]
        self.base_user_prompt = PROMPTS["ibfs"]["initial_strategies"]["user"]
        self.refinement_system_prompt = PROMPTS["ibfs"]["continuation_strategies"]["system"]
        self.refinement_user_prompt = PROMPTS["ibfs"]["continuation_strategies"]["user"]

        # Diversity-specific instructions appended to the system prompts
        diversity_instructions = {
            "low": """
The strategies you generate can be similar to each other and explore related approaches.
There's no need to make them very different from each other.
""",
            "medium": """
Each strategy should represent a somewhat different approach to answering the question.
Try to include some variety in the approaches.
""",
            "high": """
Each strategy should represent a substantially different approach to answering the question.
Make sure the strategies are maximally diverse from each other - consider entirely different angles,
methodologies, perspectives, and areas of knowledge.
"""
        }
        self.diversity_instructions = diversity_instructions[self.diversity_level]

    def generate_strategies(self, query: str, current_path: List[str] = None) -> List[str]:
        """
        Generate strategy options for the current step.

        Args:
            query: The user's query
            current_path: List of previously selected strategies

        Returns:
            List of strategy descriptions
        """
        if not current_path:
            # Initial generation
            system_prompt = self.base_system_prompt + "\n" + self.diversity_instructions
            user_prompt = self.base_user_prompt
            format_args = {
                "query": query,
                "k": self.branching_factor
            }
        else:
            # Refinement of the previously selected strategy
            system_prompt = self.refinement_system_prompt + "\n" + self.diversity_instructions
            user_prompt = self.refinement_user_prompt
            format_args = {
                "query": query,
                "selected_strategy": current_path[-1],
                "k": self.branching_factor
            }

        # Format the prompts
        system_message = format_prompt(system_prompt, **format_args)
        user_message = format_prompt(user_prompt, **format_args)
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ]
        try:
            response = litellm.completion(
                model=self.llm_model,
                messages=messages,
                temperature=0.7,
                max_tokens=1000
            )
            content = response.choices[0].message.content

            # Parse numbered "I can answer by ..." strategies (same format as ibfs.py)
            strategies = re.findall(r'\d+\.\s*(I can answer by[^\n\d]*(?:\n(?!\d+\.)[^\n]*)*)', content, re.IGNORECASE)

            # If we didn't find enough strategies with that format, try alternative parsing
            if len(strategies) < self.branching_factor:
                strategies = re.findall(r'(?:^|\n)(I can answer by[^\n]*(?:\n(?!I can answer by)[^\n]*)*)', content,
                                        re.IGNORECASE)

            # Clean up the strategies
            strategies = [s.strip() for s in strategies]

            # Ensure we have exactly branching_factor strategies
            if len(strategies) > self.branching_factor:
                strategies = strategies[:self.branching_factor]

            # If we still don't have enough strategies, create generic ones
            while len(strategies) < self.branching_factor:
                strategies.append(
                    f"I can answer by using approach #{len(strategies) + 1} (Note: Strategy generation incomplete)")

            return strategies
        except Exception as e:
            print(f"Error generating strategies: {e}")
            # Return fallback strategies
            return [f"I can answer by approach #{i + 1} (Error: Could not generate strategies)" for i in
                    range(self.branching_factor)]

    def generate_final_answer(self, query: str, strategy_path: List[str]) -> str:
        """
        Generate the final answer based on the selected strategy path.

        Args:
            query: The original user query
            strategy_path: List of selected strategies

        Returns:
            Final answer to the query
        """
        if not strategy_path:
            return "No strategy was selected to generate an answer."
        final_strategy = strategy_path[-1]
        # Use the answer_query function from ibfs.py
        return answer_query(query, final_strategy)
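# A minimal usage sketch for IBFSAgent (assumes the YAML prompt file loaded by
# load_prompts() defines the "ibfs" initial/continuation strategy templates):
#
#     agent = IBFSAgent(diversity_level="high", branching_factor=4, max_depth=2)
#     options = agent.generate_strategies("How has AI changed the job market?")
#     answer = agent.generate_final_answer("How has AI changed the job market?", [options[0]])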
def run_simulation(query: str,
                   user_agent: UserAgent,
                   ibfs_agent: IBFSAgent) -> Dict[str, Any]:
    """
    Run a full simulation of a user interacting with the IBFS system.

    Args:
        query: The question to be answered
        user_agent: The UserAgent instance
        ibfs_agent: The IBFSAgent instance

    Returns:
        Dictionary containing the simulation results
    """
    # Set up the user's preferred answer
    user_agent.set_preferences(query)

    # Initialize the strategy path and record all strategies presented and choices made
    strategy_path = []
    history = []

    # Run through the IBFS process up to max_depth
    for depth in range(ibfs_agent.max_depth):
        # Generate strategies at this step
        strategies = ibfs_agent.generate_strategies(query, strategy_path)

        # Have the user agent choose a strategy
        choice_idx = user_agent.choose_strategy(strategies)
        chosen_strategy = strategies[choice_idx]

        # Record this step
        history.append({
            "depth": depth,
            "strategies": strategies,
            "choice_idx": choice_idx,
            "chosen_strategy": chosen_strategy
        })

        # Update the strategy path
        strategy_path.append(chosen_strategy)

    # Generate the final answer
    final_answer = ibfs_agent.generate_final_answer(query, strategy_path)

    # Create the simulation result
    result = {
        "query": query,
        "user_id": user_agent.id,
        "ibfs_id": ibfs_agent.id,
        "user_preferred_answer": user_agent.preferred_answer,
        "final_answer": final_answer,
        "strategy_path": strategy_path,
        "history": history,
        "ibfs_config": {
            "diversity_level": ibfs_agent.diversity_level,
            "branching_factor": ibfs_agent.branching_factor,
            "max_depth": ibfs_agent.max_depth
        },
        "user_config": {
            "epsilon": user_agent.epsilon
        },
        "timestamp": datetime.now().isoformat()
    }
    return result
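# End-to-end sketch of a single simulated interaction, using only the classes and
# functions defined above (query text is illustrative):
#
#     user = UserAgent(epsilon=0.1)
#     agent = IBFSAgent(diversity_level="medium", branching_factor=4, max_depth=2)
#     outcome = run_simulation("What are the most effective strategies for reducing stress?", user, agent)
#     print(outcome["final_answer"])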
def evaluate_answer_similarity(answer1: str, answer2: str) -> float:
    """
    Evaluate the similarity between two answers using the LLM.

    Args:
        answer1: First answer
        answer2: Second answer

    Returns:
        Similarity score (0-1)
    """
    messages = [
        {"role": "system",
         "content": "You are evaluating the similarity between a user's preferred answer and a generated answer. Provide a similarity score from 0 to 1, where 1 means identical in content and perspective, and 0 means completely different."},
        {"role": "user", "content": f"""
Answer 1:
{answer1}

Answer 2:
{answer2}

On a scale from 0 to 1, how similar are these answers in terms of content, perspective, and key information?
Provide only a single number as your response.
"""}
    ]
    try:
        response = litellm.completion(
            model="gpt-4o",
            messages=messages,
            temperature=0.1,
            max_tokens=10
        )
        content = response.choices[0].message.content.strip()

        # Extract the score from the response
        match = re.search(r'(\d+(\.\d+)?)', content)
        if match:
            score = float(match.group(1))
            # Clamp to the range [0, 1]
            return max(0.0, min(score, 1.0))
        else:
            # Default score if parsing fails
            return 0.5
    except Exception as e:
        print(f"Error evaluating similarity: {e}")
        return 0.5
def process_simulation(args):
    """
    Process a single simulation for parallel execution.

    Args:
        args: Tuple containing (query, user_config, ibfs_config, experiment_id, sim_count)

    Returns:
        Simulation result
    """
    query, user_config, ibfs_config, experiment_id, sim_count = args
    try:
        # Create agents with the current configuration
        user_agent = UserAgent(epsilon=user_config["epsilon"])
        ibfs_agent = IBFSAgent(
            diversity_level=ibfs_config["diversity_level"],
            branching_factor=ibfs_config["branching_factor"],
            max_depth=ibfs_config["max_depth"]
        )

        # Run the simulation
        result = run_simulation(query, user_agent, ibfs_agent)

        # Evaluate the similarity between the preferred and final answers
        similarity = evaluate_answer_similarity(
            user_agent.preferred_answer,
            result["final_answer"]
        )
        result["similarity_score"] = similarity

        # Add metadata
        result["experiment_id"] = experiment_id
        result["simulation_id"] = sim_count

        # Save the individual result
        os.makedirs("experiment_results", exist_ok=True)
        simulation_id = f"{experiment_id}_sim_{sim_count}"
        with open(f"experiment_results/{simulation_id}.json", "w") as f:
            json.dump(result, f, indent=2)

        return result
    except Exception as e:
        print(f"Error in simulation {sim_count}: {e}")
        return {
            "error": str(e),
            "experiment_id": experiment_id,
            "simulation_id": sim_count,
            "query": query
        }
def run_experiment(queries: List[str],
                   diversity_levels: List[str],
                   branching_factors: List[int],
                   max_depths: List[int],
                   epsilon_values: List[float],
                   repetitions: int = 3,
                   max_workers: int = 4) -> List[Dict[str, Any]]:
    """
    Run a full experiment over all configurations using parallel processing.

    Args:
        queries: List of queries to test
        diversity_levels: List of diversity levels to test
        branching_factors: List of branching factors to test
        max_depths: List of max depths to test
        epsilon_values: List of epsilon values to test
        repetitions: Number of repetitions for each configuration
        max_workers: Maximum number of parallel workers

    Returns:
        List of results for all simulations
    """
    # Create the results directory if it doesn't exist
    os.makedirs("experiment_results", exist_ok=True)

    # Generate a unique experiment ID
    experiment_id = f"exp_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    # Create a config log for this experiment
    config = {
        "experiment_id": experiment_id,
        "queries": queries,
        "diversity_levels": diversity_levels,
        "branching_factors": branching_factors,
        "max_depths": max_depths,
        "epsilon_values": epsilon_values,
        "repetitions": repetitions,
        "timestamp": datetime.now().isoformat()
    }

    # Save the configuration
    with open(f"experiment_results/{experiment_id}_config.json", "w") as f:
        json.dump(config, f, indent=2)

    # Prepare all simulation configurations
    simulation_args = []
    sim_count = 0
    for query in queries:
        for diversity in diversity_levels:
            for branching in branching_factors:
                for depth in max_depths:
                    for epsilon in epsilon_values:
                        for rep in range(repetitions):
                            # Create the configuration for this simulation
                            user_config = {"epsilon": epsilon}
                            ibfs_config = {
                                "diversity_level": diversity,
                                "branching_factor": branching,
                                "max_depth": depth
                            }
                            simulation_args.append((query, user_config, ibfs_config, experiment_id, sim_count))
                            sim_count += 1

    total_simulations = len(simulation_args)

    # Run simulations in parallel with a progress bar
    results = []
    with tqdm.tqdm(total=total_simulations, desc="Running simulations") as pbar:
        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Submit all simulations and process them as they complete
            future_to_sim = {executor.submit(process_simulation, args): args for args in simulation_args}
            for future in concurrent.futures.as_completed(future_to_sim):
                result = future.result()
                results.append(result)
                pbar.update(1)

    # Save aggregated results
    with open(f"experiment_results/{experiment_id}_all_results.json", "w") as f:
        json.dump(results, f, indent=2)

    return results
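# Files written by run_experiment (as produced by the code above):
#   experiment_results/{experiment_id}_config.json       - the swept parameter grid
#   experiment_results/{experiment_id}_sim_{n}.json      - one file per simulation
#   experiment_results/{experiment_id}_all_results.json  - aggregated list of all results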
def analyze_results(experiment_id: str) -> Dict[str, Any]:
    """
    Analyze the results of an experiment.

    Args:
        experiment_id: ID of the experiment to analyze

    Returns:
        Dictionary with analysis results
    """
    # Load all individual simulation results for this experiment
    results = []
    for filename in os.listdir("experiment_results"):
        if filename.startswith(f"{experiment_id}_sim_"):
            with open(f"experiment_results/{filename}", "r") as f:
                try:
                    results.append(json.load(f))
                except json.JSONDecodeError:
                    print(f"Error loading {filename}")

    # Aggregate metrics by configuration
    aggregated = {}
    for result in results:
        if "error" in result:
            continue  # Skip failed simulations

        # Create a key for this configuration
        config_key = (
            result["ibfs_config"]["diversity_level"],
            result["ibfs_config"]["branching_factor"],
            result["ibfs_config"]["max_depth"],
            result["user_config"]["epsilon"]
        )

        # Initialize if this is the first result with this configuration
        if config_key not in aggregated:
            aggregated[config_key] = {
                "similarity_scores": [],
                "config": {
                    "diversity_level": result["ibfs_config"]["diversity_level"],
                    "branching_factor": result["ibfs_config"]["branching_factor"],
                    "max_depth": result["ibfs_config"]["max_depth"],
                    "epsilon": result["user_config"]["epsilon"]
                },
                "queries": []
            }

        # Add the similarity score
        aggregated[config_key]["similarity_scores"].append(result["similarity_score"])

        # Track queries for this configuration
        if result["query"] not in aggregated[config_key]["queries"]:
            aggregated[config_key]["queries"].append(result["query"])

    # Calculate summary statistics per configuration
    summary = []
    for config_key, data in aggregated.items():
        scores = data["similarity_scores"]
        mean = sum(scores) / len(scores) if scores else 0
        variance = sum((x - mean) ** 2 for x in scores) / len(scores) if scores else 0
        summary.append({
            "config": data["config"],
            "avg_similarity": mean,
            "min_similarity": min(scores) if scores else 0,
            "max_similarity": max(scores) if scores else 0,
            "std_deviation": variance ** 0.5,
            "num_samples": len(scores),
            "queries_tested": len(data["queries"])
        })

    # Sort by average similarity (descending)
    summary.sort(key=lambda x: x["avg_similarity"], reverse=True)

    # Add query-specific analysis
    query_analysis = {}
    for result in results:
        if "error" in result:
            continue
        query = result["query"]
        if query not in query_analysis:
            query_analysis[query] = {
                "best_config": None,
                "best_similarity": -1,
                "configs_tested": 0,
                "avg_similarity": 0,
                "all_scores": []
            }

        # Track all scores for this query
        query_analysis[query]["all_scores"].append(result["similarity_score"])

        # Update the best configuration for this query
        if result["similarity_score"] > query_analysis[query]["best_similarity"]:
            query_analysis[query]["best_similarity"] = result["similarity_score"]
            query_analysis[query]["best_config"] = {
                "diversity_level": result["ibfs_config"]["diversity_level"],
                "branching_factor": result["ibfs_config"]["branching_factor"],
                "max_depth": result["ibfs_config"]["max_depth"],
                "epsilon": result["user_config"]["epsilon"]
            }

    # Calculate per-query statistics
    for query, data in query_analysis.items():
        scores = data["all_scores"]
        data["avg_similarity"] = sum(scores) / len(scores) if scores else 0
        data["configs_tested"] = len(scores)
        # Remove the raw scores to keep the analysis file smaller
        del data["all_scores"]

    # Save the analysis
    analysis = {
        "experiment_id": experiment_id,
        "total_simulations": len(results),
        "summary": summary,
        "query_analysis": query_analysis,
        "timestamp": datetime.now().isoformat()
    }
    with open(f"experiment_results/{experiment_id}_analysis.json", "w") as f:
        json.dump(analysis, f, indent=2)

    return analysis
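# Usage sketch (the experiment ID below is a placeholder; use the ID generated by run_experiment):
#
#     analysis = analyze_results("exp_YYYYMMDD_HHMMSS")
#     print(analysis["summary"][0])  # best-performing configuration by average similarity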
def compare_to_zero_shot(experiment_id: str, queries: List[str]) -> Dict[str, Any]:
    """
    Compare IBFS results to zero-shot answers.

    Args:
        experiment_id: ID of the experiment to compare
        queries: List of queries to test with zero-shot

    Returns:
        Comparison results
    """
    # First, get zero-shot answers for all queries
    zero_shot_results = []
    print("Generating zero-shot answers...")
    for query in tqdm.tqdm(queries):
        try:
            # Generate the zero-shot answer using the function from zero_shot.py
            answer = zero_shot_answer(query)
            zero_shot_results.append({
                "query": query,
                "zero_shot_answer": answer
            })
        except Exception as e:
            print(f"Error generating zero-shot answer for '{query}': {e}")
            zero_shot_results.append({
                "query": query,
                "zero_shot_answer": f"Error: {str(e)}",
                "error": True
            })

    # Load the IBFS experiment results
    analysis = analyze_results(experiment_id)

    # Overall best configuration from the analysis (fallback when a query has no best config)
    best_config = analysis["summary"][0]["config"] if analysis["summary"] else None

    # For each query, compare the best IBFS result to the zero-shot answer
    comparison = []
    print("Comparing zero-shot to IBFS results...")
    for zero_shot_result in tqdm.tqdm(zero_shot_results):
        query = zero_shot_result["query"]
        # Use a distinct local name so the imported zero_shot_answer function is not shadowed
        zs_answer = zero_shot_result["zero_shot_answer"]

        # Find the best IBFS configuration for this query
        query_data = analysis.get("query_analysis", {}).get(query, {})
        best_config_for_query = query_data.get("best_config", best_config)

        if best_config_for_query:
            # Find the simulations with this configuration and query
            matching_results = []
            for filename in os.listdir("experiment_results"):
                if filename.startswith(f"{experiment_id}_sim_"):
                    try:
                        with open(f"experiment_results/{filename}", "r") as f:
                            result = json.load(f)
                        if (result.get("query") == query and
                                result.get("ibfs_config", {}).get("diversity_level") == best_config_for_query.get("diversity_level") and
                                result.get("ibfs_config", {}).get("branching_factor") == best_config_for_query.get("branching_factor") and
                                result.get("ibfs_config", {}).get("max_depth") == best_config_for_query.get("max_depth") and
                                result.get("user_config", {}).get("epsilon") == best_config_for_query.get("epsilon")):
                            matching_results.append(result)
                    except Exception:
                        continue

            # Use the best matching result (if any)
            if matching_results:
                # Sort by similarity score (descending)
                matching_results.sort(key=lambda x: x.get("similarity_score", 0), reverse=True)
                best_ibfs_result = matching_results[0]

                # Compare the zero-shot answer to the user's preferred answer
                preferred_answer = best_ibfs_result.get("user_preferred_answer", "")
                zero_shot_similarity = evaluate_answer_similarity(preferred_answer, zs_answer)

                # Get the IBFS similarity (already calculated)
                ibfs_similarity = best_ibfs_result.get("similarity_score", 0)

                comparison.append({
                    "query": query,
                    "zero_shot_similarity": zero_shot_similarity,
                    "ibfs_similarity": ibfs_similarity,
                    "difference": ibfs_similarity - zero_shot_similarity,
                    "ibfs_config": best_config_for_query
                })
        else:
            print(f"No valid configuration found for query: {query}")

    # Calculate overall metrics
    zero_shot_avg = sum(item["zero_shot_similarity"] for item in comparison) / len(comparison) if comparison else 0
    ibfs_avg = sum(item["ibfs_similarity"] for item in comparison) / len(comparison) if comparison else 0
    avg_difference = sum(item["difference"] for item in comparison) / len(comparison) if comparison else 0

    # Save the comparison results
    comparison_results = {
        "experiment_id": experiment_id,
        "zero_shot_avg_similarity": zero_shot_avg,
        "ibfs_avg_similarity": ibfs_avg,
        "avg_difference": avg_difference,
        "query_comparisons": comparison,
        "timestamp": datetime.now().isoformat()
    }
    with open(f"experiment_results/{experiment_id}_zero_shot_comparison.json", "w") as f:
        json.dump(comparison_results, f, indent=2)

    return comparison_results
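# Usage sketch (placeholder experiment ID; compares the experiment's best IBFS runs
# against zero-shot answers for the same queries):
#
#     comparison = compare_to_zero_shot("exp_YYYYMMDD_HHMMSS", queries)
#     print(comparison["avg_difference"])  # positive values favor IBFS over zero-shot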
if __name__ == "__main__":
    # Ensure the prompts from the YAML file are loaded
    if not PROMPTS:
        load_prompts()

    # Sample queries to test
    queries = [
        "What are the environmental impacts of electric vehicles compared to traditional gasoline vehicles?",
        "How has artificial intelligence changed the job market in the past decade?",
        "What are the most effective strategies for reducing stress and anxiety?",
        "What are the arguments for and against universal basic income?",
    ]

    # Full experiment parameter grid
    diversity_levels = ["low", "medium", "high"]
    branching_factors = [2, 4, 8]
    max_depths = [1, 2, 4]
    epsilon_values = [0.1, 0.3]

    # Run a smaller test experiment
    print("Running test experiment...")
    test_results = run_experiment(
        queries=queries[:1],                     # Just use the first query for testing
        diversity_levels=diversity_levels[:2],   # Test low and medium diversity
        branching_factors=[2, 4],                # Test b=2 and b=4
        max_depths=[1, 2],                       # Test m=1 and m=2
        epsilon_values=[0.2],                    # Test epsilon=0.2
        repetitions=10,                          # 10 repetitions per configuration
        max_workers=7                            # Run up to 7 simulations in parallel
    )
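    # Optional follow-up for the test run above (a sketch; uncomment to use):
    # if test_results:
    #     test_experiment_id = test_results[0]["experiment_id"]
    #     analyze_results(test_experiment_id)
    #     compare_to_zero_shot(test_experiment_id, queries[:1])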