""" | |
Trajectory Data Management Module for Agent Tuning Optimization Framework | |
This module provides functionality for loading, processing, and managing agent interaction | |
trajectories for training and evaluation purposes. | |
""" | |
import os | |
import json | |
import pandas as pd | |
import numpy as np | |
from typing import List, Dict, Any, Union, Optional, Tuple | |
from tqdm import tqdm | |


class Trajectory:
    """Class representing a single agent interaction trajectory."""

    def __init__(
        self,
        task_description: str,
        interactions: List[Dict[str, str]],
        metadata: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize a trajectory.

        Args:
            task_description: Description of the task
            interactions: List of interaction turns (each with 'user' and 'agent' keys)
            metadata: Additional metadata about the trajectory
        """
        self.task_description = task_description
        self.interactions = interactions
        self.metadata = metadata or {}
        # Pull cached values out of metadata; quality_score stays None until computed
        self.quality_score = self.metadata.get('quality_score')
        self.is_positive = self.metadata.get('is_positive', True)

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert trajectory to dictionary.

        Returns:
            Dictionary representation of the trajectory
        """
        return {
            'task_description': self.task_description,
            'interactions': self.interactions,
            'metadata': self.metadata
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Trajectory':
        """
        Create trajectory from dictionary.

        Args:
            data: Dictionary representation of the trajectory

        Returns:
            Trajectory instance
        """
        return cls(
            task_description=data['task_description'],
            interactions=data['interactions'],
            metadata=data.get('metadata', {})
        )

    def to_training_format(
        self, format_type: str = 'interleaved'
    ) -> Union[str, Tuple[str, str]]:
        """
        Convert trajectory to training format.

        Args:
            format_type: Format type ('interleaved' or 'completion')

        Returns:
            For 'interleaved', the full conversation as a single string.
            For 'completion', an (input, target) tuple where the last agent
            response is the target.
        """
        if format_type == 'interleaved':
            # Format as an interleaved conversation
            result = f"Task: {self.task_description}\n\n"
            for interaction in self.interactions:
                result += f"User: {interaction['user']}\n"
                result += f"Agent: {interaction['agent']}\n\n"
            return result.strip()
        elif format_type == 'completion':
            # Format as a completion task (last agent response is the target)
            if not self.interactions:
                # Keep the return type consistent: an empty (input, target) pair
                return "", ""
            result = f"Task: {self.task_description}\n\n"
            for interaction in self.interactions[:-1]:
                result += f"User: {interaction['user']}\n"
                result += f"Agent: {interaction['agent']}\n\n"
            # Add the last user query without its agent response
            result += f"User: {self.interactions[-1]['user']}\n"
            result += "Agent:"
            return result.strip(), self.interactions[-1]['agent'].strip()
        else:
            raise ValueError(f"Unsupported format type: {format_type}")

    def get_quality_score(self) -> float:
        """
        Get quality score for the trajectory.

        Returns:
            Quality score (0.0 to 1.0)
        """
        if self.quality_score is not None:
            return self.quality_score

        score = 0.0
        if not self.interactions:
            return score

        # Average agent response length, normalized to a cap of 500 characters
        avg_length = np.mean([len(turn['agent']) for turn in self.interactions])
        length_score = min(avg_length / 500, 1.0)

        # Response complexity: a simple heuristic based on unique words,
        # normalized to a cap of 200 unique words
        all_responses = " ".join(turn['agent'] for turn in self.interactions)
        unique_words = len(set(all_responses.lower().split()))
        complexity_score = min(unique_words / 200, 1.0)

        # Combine scores and cache the result in the metadata
        score = 0.6 * length_score + 0.4 * complexity_score
        self.quality_score = score
        self.metadata['quality_score'] = score
        return score
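

# The helper below is an illustrative sketch added for readers of this module;
# it is not part of the original API, and all strings are made-up placeholders.
def _demo_trajectory_roundtrip() -> None:
    """Build a trajectory by hand, round-trip it through to_dict/from_dict,
    and render both training formats (added example, not original code)."""
    traj = Trajectory(
        task_description="Find a vegetarian restaurant near downtown",
        interactions=[
            {'user': "Any suggestions?", 'agent': "Here are three options nearby..."},
            {'user': "Which one is cheapest?", 'agent': "The first one, by a wide margin."},
        ],
    )
    # Serialization round-trip preserves the task description
    restored = Trajectory.from_dict(traj.to_dict())
    assert restored.task_description == traj.task_description
    # 'interleaved' yields one string; 'completion' yields (prompt, target)
    print(traj.to_training_format('interleaved'))
    prompt, target = traj.to_training_format('completion')
    print(prompt, "->", target)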


class TrajectoryDataset:
    """Dataset for managing collections of agent interaction trajectories."""

    def __init__(self, name: str):
        """
        Initialize the trajectory dataset.

        Args:
            name: Name of the dataset
        """
        self.name = name
        self.trajectories: List[Trajectory] = []
        self.positive_trajectories: List[Trajectory] = []
        self.negative_trajectories: List[Trajectory] = []

    def add_trajectory(self, trajectory: Trajectory) -> None:
        """
        Add a trajectory to the dataset.

        Args:
            trajectory: Trajectory to add
        """
        self.trajectories.append(trajectory)
        # Track positive and negative examples separately for fast filtering
        if trajectory.is_positive:
            self.positive_trajectories.append(trajectory)
        else:
            self.negative_trajectories.append(trajectory)

    def load_from_json(self, file_path: str) -> None:
        """
        Load trajectories from JSON file.

        Args:
            file_path: Path to JSON file
        """
        with open(file_path, 'r') as f:
            data = json.load(f)
        if isinstance(data, list):
            # Bare list of trajectories
            for item in data:
                self.add_trajectory(Trajectory.from_dict(item))
        elif isinstance(data, dict) and 'trajectories' in data:
            # Dictionary with a 'trajectories' key
            for item in data['trajectories']:
                self.add_trajectory(Trajectory.from_dict(item))
        else:
            raise ValueError(f"Unsupported JSON format in {file_path}")

    def save_to_json(self, file_path: str) -> None:
        """
        Save trajectories to JSON file.

        Args:
            file_path: Path to JSON file
        """
        data = {
            'name': self.name,
            'trajectories': [t.to_dict() for t in self.trajectories]
        }
        with open(file_path, 'w') as f:
            json.dump(data, f, indent=2)

    def get_trajectories(
        self,
        positive_only: bool = False,
        negative_only: bool = False,
        min_quality: Optional[float] = None,
        max_samples: Optional[int] = None
    ) -> List[Trajectory]:
        """
        Get trajectories based on filtering criteria.

        Args:
            positive_only: Whether to return only positive trajectories
            negative_only: Whether to return only negative trajectories
            min_quality: Minimum quality score threshold
            max_samples: Maximum number of samples to return

        Returns:
            Filtered list of trajectories
        """
        if positive_only and negative_only:
            raise ValueError("Cannot set both positive_only and negative_only to True")

        # Select the base list
        if positive_only:
            trajectories = self.positive_trajectories.copy()
        elif negative_only:
            trajectories = self.negative_trajectories.copy()
        else:
            trajectories = self.trajectories.copy()

        # Apply the quality filter
        if min_quality is not None:
            trajectories = [t for t in trajectories if t.get_quality_score() >= min_quality]

        # Apply the sample limit
        if max_samples is not None and max_samples < len(trajectories):
            trajectories = trajectories[:max_samples]
        return trajectories

    def get_training_examples(
        self,
        format_type: str = 'interleaved',
        positive_ratio: float = 0.8,
        min_quality: Optional[float] = 0.5,
        max_samples: Optional[int] = None
    ) -> Union[List[str], Tuple[List[str], List[str]]]:
        """
        Get formatted training examples from trajectories.

        Args:
            format_type: Format type ('interleaved' or 'completion')
            positive_ratio: Target fraction of positive examples in the sample
            min_quality: Minimum quality score threshold (applied to positives only)
            max_samples: Maximum number of samples to return

        Returns:
            For 'interleaved', a list of formatted strings.
            For 'completion', a tuple of (inputs, targets) lists.
        """
        # The quality filter is applied only to positives, matching their role
        # as preferred examples; negatives are kept regardless of quality
        positive = self.get_trajectories(positive_only=True, min_quality=min_quality)
        negative = self.get_trajectories(negative_only=True)

        # Calculate sample counts
        if max_samples is not None:
            pos_count = int(max_samples * positive_ratio)
            neg_count = max_samples - pos_count
        else:
            pos_count = len(positive)
            neg_count = len(negative)

        # Subsample without replacement when more are available than requested
        if pos_count < len(positive):
            positive = np.random.choice(positive, pos_count, replace=False).tolist()
        if neg_count < len(negative):
            negative = np.random.choice(negative, neg_count, replace=False).tolist()

        # Format trajectories (positives first, then negatives)
        if format_type == 'interleaved':
            pos_examples = [t.to_training_format(format_type) for t in positive]
            neg_examples = [t.to_training_format(format_type) for t in negative]
            return pos_examples + neg_examples
        elif format_type == 'completion':
            inputs, targets = [], []
            for t in positive + negative:
                inp, target = t.to_training_format(format_type)
                inputs.append(inp)
                targets.append(target)
            return inputs, targets
        else:
            raise ValueError(f"Unsupported format type: {format_type}")

    def analyze_dataset(self) -> Dict[str, Any]:
        """
        Analyze the dataset and return statistics.

        Returns:
            Dictionary of dataset statistics
        """
        if not self.trajectories:
            return {
                'total_trajectories': 0,
                'positive_count': 0,
                'negative_count': 0
            }

        # Basic counts
        total = len(self.trajectories)
        positive_count = len(self.positive_trajectories)
        negative_count = len(self.negative_trajectories)

        # Quality statistics (cast to plain Python numbers so the stats serialize cleanly)
        quality_scores = [t.get_quality_score() for t in self.trajectories]
        avg_quality = float(np.mean(quality_scores))
        min_quality = float(np.min(quality_scores))
        max_quality = float(np.max(quality_scores))

        # Interaction statistics
        interaction_counts = [len(t.interactions) for t in self.trajectories]
        avg_interactions = float(np.mean(interaction_counts))
        max_interactions = int(np.max(interaction_counts))

        # Task diversity: a simple heuristic based on unique task descriptions
        unique_tasks = len(set(t.task_description for t in self.trajectories))

        return {
            'total_trajectories': total,
            'positive_count': positive_count,
            'negative_count': negative_count,
            'positive_ratio': positive_count / total,
            'avg_quality': avg_quality,
            'min_quality': min_quality,
            'max_quality': max_quality,
            'avg_interactions': avg_interactions,
            'max_interactions': max_interactions,
            'unique_tasks': unique_tasks
        }
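

# The helper below is an illustrative sketch added for readers of this module;
# it is not part of the original API, and the trajectories are made-up placeholders.
def _demo_dataset_filtering() -> None:
    """Show how quality filtering and training-example retrieval fit
    together (added example, not original code)."""
    dataset = TrajectoryDataset("demo")
    dataset.add_trajectory(Trajectory(
        task_description="Summarize a research paper",
        interactions=[{'user': "Please summarize it.", 'agent': "The paper argues..."}],
        metadata={'is_positive': True, 'quality_score': 0.8},
    ))
    dataset.add_trajectory(Trajectory(
        task_description="Summarize a research paper",
        interactions=[{'user': "Please summarize it.", 'agent': "I cannot."}],
        metadata={'is_positive': False, 'quality_score': 0.2},
    ))
    # Only the positive trajectory clears the 0.5 quality threshold
    good = dataset.get_trajectories(positive_only=True, min_quality=0.5)
    # 'completion' returns parallel input and target lists
    inputs, targets = dataset.get_training_examples(format_type='completion')
    print(len(good), len(inputs), len(targets))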


def create_synthetic_dataset(num_trajectories: int = 10) -> TrajectoryDataset:
    """
    Create a synthetic dataset for testing purposes.

    Args:
        num_trajectories: Number of trajectories to create

    Returns:
        Synthetic trajectory dataset
    """
    dataset = TrajectoryDataset("synthetic_dataset")

    # Sample task descriptions
    task_descriptions = [
        "Book a flight from New York to London for next week",
        "Find a vegetarian restaurant near downtown",
        "Schedule a meeting with the marketing team for tomorrow",
        "Order a new laptop with at least 16GB RAM",
        "Write a congratulatory email to a colleague who got promoted",
        "Research the best electric cars available in the market",
        "Create a weekly meal plan with shopping list",
        "Find information about tourist attractions in Barcelona",
        "Help me debug a Python script that's giving an IndexError",
        "Summarize the main points from the attached research paper"
    ]

    # Create trajectories, cycling through the sample tasks
    for i in range(num_trajectories):
        task = task_descriptions[i % len(task_descriptions)]

        # Create 2-4 interaction turns
        num_turns = np.random.randint(2, 5)
        interactions = []
        for j in range(num_turns):
            if j == 0:
                user_msg = f"I need help with this task: {task}"
                agent_msg = (f"I'd be happy to help you {task.lower()}. "
                             "Could you provide more details about your preferences?")
            elif j == num_turns - 1:
                user_msg = "That sounds good. Please proceed with the final steps."
                agent_msg = f"I've completed the task to {task.lower()}. Here's a summary of what I did..."
            else:
                preference = ['affordable', 'convenient', 'high-quality'][j % 3]
                user_msg = f"I prefer options that are {preference}."
                agent_msg = f"Based on your preference for {preference} options, I recommend..."
            interactions.append({'user': user_msg, 'agent': agent_msg})

        # Mark 75% positive, 25% negative, with quality scores to match
        is_positive = (i % 4 != 0)
        metadata = {
            'is_positive': is_positive,
            'quality_score': np.random.uniform(0.7, 0.9) if is_positive else np.random.uniform(0.3, 0.5),
            'created_at': '2025-05-21'
        }

        # Create and add the trajectory
        dataset.add_trajectory(Trajectory(
            task_description=task,
            interactions=interactions,
            metadata=metadata
        ))
    return dataset
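

if __name__ == "__main__":
    # Minimal smoke-test sketch (added, not part of the original module):
    # build the synthetic dataset, print its statistics, and run the demo
    # helpers defined above. The trajectory count is an arbitrary choice.
    dataset = create_synthetic_dataset(num_trajectories=12)
    print(json.dumps(dataset.analyze_dataset(), indent=2))
    _demo_trajectory_roundtrip()
    _demo_dataset_filtering()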