""" | |
Agent Tuning Module for Agent Tuning Optimization Framework | |
This module provides functionality for efficiently tuning large language models | |
into specialized agents using a combination of positive examples, negative examples, | |
and synthetically generated interaction trajectories. | |
""" | |
import os | |
import torch | |
import numpy as np | |
from typing import List, Dict, Any, Union, Optional, Tuple | |
from tqdm import tqdm | |
from transformers import ( | |
Trainer, TrainingArguments, | |
DataCollatorForLanguageModeling, | |
AutoModelForCausalLM, AutoTokenizer | |
) | |
from datasets import Dataset | |
from data.trajectory_data import Trajectory, TrajectoryDataset | |
from models.llm_interface import LLMInterface | |
class AgentTuner:
    """Base class for agent tuning methods."""

    def __init__(self, name: str):
        """
        Initialize the agent tuner.

        Args:
            name: Name of the tuning method
        """
        self.name = name

    def tune(
        self,
        model_name: str,
        trajectories: List[Trajectory],
        **kwargs
    ) -> Tuple[Any, Dict[str, Any]]:
        """
        Tune a model into a specialized agent.

        Args:
            model_name: Name of the base model
            trajectories: List of training trajectories
            **kwargs: Additional tuning parameters

        Returns:
            Tuple of (tuned_model, training_metrics)
        """
        raise NotImplementedError("Subclasses must implement this method")

    def save_model(self, model: Any, path: str) -> None:
        """
        Save the tuned model.

        Args:
            model: Tuned model
            path: Path to save the model
        """
        raise NotImplementedError("Subclasses must implement this method")

    def load_model(self, path: str) -> Any:
        """
        Load a tuned model.

        Args:
            path: Path to the model

        Returns:
            Loaded model
        """
        raise NotImplementedError("Subclasses must implement this method")
class SupervisedFineTuner(AgentTuner):
    """Tune agents using supervised fine-tuning."""

    def __init__(self):
        """Initialize the supervised fine-tuner."""
        super().__init__("supervised_fine_tuning")

    def tune(
        self,
        model_name: str,
        trajectories: List[Trajectory],
        output_dir: str = "./tuned_model",
        num_train_epochs: int = 3,
        learning_rate: float = 5e-5,
        batch_size: int = 4,
        gradient_accumulation_steps: int = 4,
        max_seq_length: int = 512,
        format_type: str = "interleaved",
        positive_weight: float = 0.8,
        device: str = "cuda" if torch.cuda.is_available() else "cpu",
        **kwargs
    ) -> Tuple[Any, Dict[str, Any]]:
        """
        Tune a model using supervised fine-tuning.

        Args:
            model_name: Name of the base model
            trajectories: List of training trajectories
            output_dir: Directory to save the model
            num_train_epochs: Number of training epochs
            learning_rate: Learning rate
            batch_size: Per-device training batch size
            gradient_accumulation_steps: Number of gradient accumulation steps
            max_seq_length: Maximum sequence length in tokens
            format_type: Serialization format passed to Trajectory.to_training_format
            positive_weight: Target fraction of positive trajectories in the training mix
            device: Device to use for training
            **kwargs: Additional tuning parameters

        Returns:
            Tuple of (tuned_model, training_metrics)
        """
print(f"Starting supervised fine-tuning of {model_name}") | |
# Create output directory | |
os.makedirs(output_dir, exist_ok=True) | |
# Load model and tokenizer | |
tokenizer = AutoTokenizer.from_pretrained(model_name) | |
model = AutoModelForCausalLM.from_pretrained(model_name) | |
# Ensure the tokenizer has a pad token | |
if tokenizer.pad_token is None: | |
tokenizer.pad_token = tokenizer.eos_token | |
# Prepare training data | |
print("Preparing training data...") | |
# Separate positive and negative trajectories | |
positive_trajectories = [t for t in trajectories if t.is_positive] | |
negative_trajectories = [t for t in trajectories if not t.is_positive] | |
print(f"Found {len(positive_trajectories)} positive and {len(negative_trajectories)} negative trajectories") | |
# Calculate sample counts based on positive weight | |
total_samples = len(trajectories) | |
target_positive = int(total_samples * positive_weight) | |
target_negative = total_samples - target_positive | |
# Sample trajectories to achieve desired ratio | |
if len(positive_trajectories) > target_positive: | |
positive_trajectories = np.random.choice(positive_trajectories, target_positive, replace=False).tolist() | |
if len(negative_trajectories) > target_negative: | |
negative_trajectories = np.random.choice(negative_trajectories, target_negative, replace=False).tolist() | |
# Combine trajectories | |
sampled_trajectories = positive_trajectories + negative_trajectories | |
np.random.shuffle(sampled_trajectories) | |
print(f"Using {len(positive_trajectories)} positive and {len(negative_trajectories)} negative trajectories for training") | |
# Format trajectories for training | |
training_texts = [] | |
for trajectory in tqdm(sampled_trajectories, desc="Formatting trajectories"): | |
formatted = trajectory.to_training_format(format_type) | |
training_texts.append(formatted) | |
# Tokenize training data | |
def tokenize_function(examples): | |
return tokenizer( | |
examples["text"], | |
padding="max_length", | |
truncation=True, | |
max_length=max_seq_length | |
) | |
# Create dataset | |
dataset = Dataset.from_dict({"text": training_texts}) | |
tokenized_dataset = dataset.map( | |
tokenize_function, | |
batched=True, | |
remove_columns=["text"] | |
) | |
# Set up training arguments | |
training_args = TrainingArguments( | |
output_dir=output_dir, | |
num_train_epochs=num_train_epochs, | |
per_device_train_batch_size=batch_size, | |
gradient_accumulation_steps=gradient_accumulation_steps, | |
learning_rate=learning_rate, | |
weight_decay=0.01, | |
save_strategy="epoch", | |
save_total_limit=2, | |
logging_dir=f"{output_dir}/logs", | |
logging_steps=10, | |
report_to="none" | |
) | |
# Create data collator | |
data_collator = DataCollatorForLanguageModeling( | |
tokenizer=tokenizer, | |
mlm=False | |
) | |
# Create trainer | |
trainer = Trainer( | |
model=model, | |
args=training_args, | |
train_dataset=tokenized_dataset, | |
data_collator=data_collator | |
) | |
# Train the model | |
print("Starting training...") | |
train_result = trainer.train() | |
# Save the model | |
print(f"Saving model to {output_dir}") | |
trainer.save_model(output_dir) | |
tokenizer.save_pretrained(output_dir) | |
# Return the model and metrics | |
metrics = { | |
"train_loss": train_result.training_loss, | |
"train_runtime": train_result.metrics["train_runtime"], | |
"samples_per_second": train_result.metrics["train_samples_per_second"], | |
"num_train_samples": len(tokenized_dataset) | |
} | |
return model, metrics | |
    def save_model(self, model: Any, path: str) -> None:
        """
        Save the tuned model.

        Args:
            model: Tuned model
            path: Path to save the model
        """
        model.save_pretrained(path)

    def load_model(self, path: str) -> Any:
        """
        Load a tuned model.

        Args:
            path: Path to the model

        Returns:
            Loaded model
        """
        return AutoModelForCausalLM.from_pretrained(path)
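# Example usage (a minimal sketch, kept as a comment so the module stays importable).
# It assumes `my_trajectories` is a List[Trajectory] loaded elsewhere; the model name,
# paths, and hyperparameters below are illustrative, not fixed by this module.
#
#     sft_tuner = SupervisedFineTuner()
#     tuned_model, sft_metrics = sft_tuner.tune(
#         model_name="gpt2",                 # illustrative small base model
#         trajectories=my_trajectories,      # hypothetical trajectory list
#         output_dir="./tuned_model_sft",
#         num_train_epochs=1,
#         batch_size=2,
#     )
#     print(sft_metrics["train_loss"])
#     sft_tuner.save_model(tuned_model, "./tuned_model_sft")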
class ParameterEfficientFineTuner(AgentTuner):
    """Tune agents using parameter-efficient fine-tuning (PEFT) methods."""

    def __init__(self):
        """Initialize the parameter-efficient fine-tuner."""
        super().__init__("parameter_efficient_fine_tuning")

    def tune(
        self,
        model_name: str,
        trajectories: List[Trajectory],
        output_dir: str = "./tuned_model",
        method: str = "lora",  # 'lora', 'prefix', 'prompt_tuning'
        num_train_epochs: int = 3,
        learning_rate: float = 1e-4,
        batch_size: int = 4,
        gradient_accumulation_steps: int = 4,
        max_seq_length: int = 512,
        format_type: str = "interleaved",
        positive_weight: float = 0.8,
        device: str = "cuda" if torch.cuda.is_available() else "cpu",
        **kwargs
    ) -> Tuple[Any, Dict[str, Any]]:
        """
        Tune a model using parameter-efficient methods.

        Args:
            model_name: Name of the base model
            trajectories: List of training trajectories
            output_dir: Directory to save the model
            method: PEFT method to use ('lora', 'prefix', or 'prompt_tuning')
            num_train_epochs: Number of training epochs
            learning_rate: Learning rate
            batch_size: Per-device training batch size
            gradient_accumulation_steps: Number of gradient accumulation steps
            max_seq_length: Maximum sequence length in tokens
            format_type: Serialization format passed to Trajectory.to_training_format
            positive_weight: Target fraction of positive trajectories in the training mix
            device: Device to use for training
            **kwargs: Additional tuning parameters

        Returns:
            Tuple of (tuned_model, training_metrics)
        """
        try:
            from peft import (
                get_peft_model, LoraConfig, PrefixTuningConfig,
                PromptTuningConfig, TaskType, PeftModel
            )
        except ImportError:
            raise ImportError(
                "PEFT library is required for parameter-efficient fine-tuning. "
                "Install it with 'pip install peft'."
            )

        print(f"Starting parameter-efficient fine-tuning of {model_name} using {method}")

        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

        # Load model and tokenizer
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(model_name)

        # Ensure the tokenizer has a pad token
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        # Configure PEFT method
        if method == "lora":
            peft_config = LoraConfig(
                task_type=TaskType.CAUSAL_LM,
                r=16,
                lora_alpha=32,
                lora_dropout=0.1,
                target_modules=["q_proj", "v_proj"]
            )
        elif method == "prefix":
            peft_config = PrefixTuningConfig(
                task_type=TaskType.CAUSAL_LM,
                num_virtual_tokens=20,
                prefix_projection=True
            )
        elif method == "prompt_tuning":
            peft_config = PromptTuningConfig(
                task_type=TaskType.CAUSAL_LM,
                num_virtual_tokens=20,
                tokenizer_name_or_path=model_name
            )
        else:
            raise ValueError(f"Unsupported PEFT method: {method}")

        # Create PEFT model
        model = get_peft_model(model, peft_config)
        model.print_trainable_parameters()
        # Prepare training data (same as SupervisedFineTuner)
        print("Preparing training data...")

        # Separate positive and negative trajectories
        positive_trajectories = [t for t in trajectories if t.is_positive]
        negative_trajectories = [t for t in trajectories if not t.is_positive]
        print(f"Found {len(positive_trajectories)} positive and {len(negative_trajectories)} negative trajectories")

        # Calculate sample counts based on positive weight
        total_samples = len(trajectories)
        target_positive = int(total_samples * positive_weight)
        target_negative = total_samples - target_positive

        # Sample trajectories to achieve desired ratio
        if len(positive_trajectories) > target_positive:
            positive_trajectories = np.random.choice(positive_trajectories, target_positive, replace=False).tolist()
        if len(negative_trajectories) > target_negative:
            negative_trajectories = np.random.choice(negative_trajectories, target_negative, replace=False).tolist()

        # Combine trajectories
        sampled_trajectories = positive_trajectories + negative_trajectories
        np.random.shuffle(sampled_trajectories)
        print(f"Using {len(positive_trajectories)} positive and {len(negative_trajectories)} negative trajectories for training")

        # Format trajectories for training
        training_texts = []
        for trajectory in tqdm(sampled_trajectories, desc="Formatting trajectories"):
            formatted = trajectory.to_training_format(format_type)
            training_texts.append(formatted)

        # Tokenize training data
        def tokenize_function(examples):
            return tokenizer(
                examples["text"],
                padding="max_length",
                truncation=True,
                max_length=max_seq_length
            )

        # Create dataset
        dataset = Dataset.from_dict({"text": training_texts})
        tokenized_dataset = dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=["text"]
        )

        # Set up training arguments
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=num_train_epochs,
            per_device_train_batch_size=batch_size,
            gradient_accumulation_steps=gradient_accumulation_steps,
            learning_rate=learning_rate,
            weight_decay=0.01,
            save_strategy="epoch",
            save_total_limit=2,
            logging_dir=f"{output_dir}/logs",
            logging_steps=10,
            report_to="none"
        )

        # Create data collator
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=tokenizer,
            mlm=False
        )

        # Create trainer
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized_dataset,
            data_collator=data_collator
        )

        # Train the model
        print("Starting training...")
        train_result = trainer.train()

        # Save the model
        print(f"Saving model to {output_dir}")
        trainer.save_model(output_dir)
        tokenizer.save_pretrained(output_dir)

        # Return the model and metrics
        metrics = {
            "train_loss": train_result.training_loss,
            "train_runtime": train_result.metrics["train_runtime"],
            "samples_per_second": train_result.metrics["train_samples_per_second"],
            "num_train_samples": len(tokenized_dataset),
            "peft_method": method
        }
        return model, metrics
    def save_model(self, model: Any, path: str) -> None:
        """
        Save the tuned model.

        Args:
            model: Tuned model
            path: Path to save the model
        """
        model.save_pretrained(path)
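# Example usage (a minimal sketch, kept as a comment so the module stays importable).
# It assumes the `peft` package is installed and that the chosen base model exposes
# `q_proj`/`v_proj` attention modules (as LLaMA-style architectures do), so the default
# LoRA target_modules apply; the model name and trajectory list are illustrative.
#
#     peft_tuner = ParameterEfficientFineTuner()
#     lora_model, lora_metrics = peft_tuner.tune(
#         model_name="meta-llama/Llama-2-7b-hf",  # illustrative causal LM with q_proj/v_proj
#         trajectories=my_trajectories,           # hypothetical trajectory list
#         method="lora",
#         output_dir="./tuned_model_lora",
#         num_train_epochs=1,
#     )
#     print(lora_metrics["peft_method"], lora_metrics["train_loss"])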