"""
Advanced predictor with support for multiple models and ensemble predictions
"""
import os
import torch
import numpy as np
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
TextClassificationPipeline
)
import logging
from typing import Dict, List, Tuple, Optional
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AdvancedPredictor:
    """Advanced predictor with ensemble support and confidence calibration"""

    def __init__(self, model_paths: List[str], weights: Optional[List[float]] = None):
        """
        Initialize with multiple models for ensemble prediction

        Args:
            model_paths: List of paths to model directories
            weights: Optional weights for each model (must sum to 1.0)
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.models = []
        self.tokenizers = []
        self.pipelines = []

        # Load all models
        for path in model_paths:
            if os.path.exists(path):
                logger.info(f"Loading model from {path}")
                tokenizer = AutoTokenizer.from_pretrained(path)
                model = AutoModelForSequenceClassification.from_pretrained(path)
                model = model.to(self.device)
                model.eval()

                # Create pipeline
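                # top_k=None returns a score for every label rather than just the
                # argmax; function_to_apply="sigmoid" squashes each logit into
                # [0, 1] independently instead of softmaxing across labels.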
                pipeline = TextClassificationPipeline(
                    model=model,
                    tokenizer=tokenizer,
                    device=0 if torch.cuda.is_available() else -1,
                    top_k=None,
                    function_to_apply="sigmoid",
                )
                self.models.append(model)
                self.tokenizers.append(tokenizer)
                self.pipelines.append(pipeline)
            else:
                logger.warning(f"Model path not found: {path}")

        if not self.models:
            raise ValueError("No models loaded successfully")

        # Set weights
        if weights:
            assert len(weights) == len(self.models), "Number of weights must match number of models"
            assert abs(sum(weights) - 1.0) < 1e-6, "Weights must sum to 1.0"
            self.weights = weights
        else:
            # Equal weights by default
            self.weights = [1.0 / len(self.models)] * len(self.models)

        logger.info(f"Initialized with {len(self.models)} models")

    def predict(self, text: str, return_all_scores: bool = False) -> Dict:
        """
        Make ensemble prediction

        Args:
            text: Email text to classify
            return_all_scores: Whether to return individual model scores

        Returns:
            Dictionary with prediction results
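
        Example (illustrative; actual scores depend on the loaded models):
            predictor.predict("Subject: 50% OFF! ... Unsubscribe here.")
            -> {"label": "UNSUBSCRIBABLE", "score": 0.91, "confidence": "high", ...}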
"""
# Preprocess text
text = self._preprocess_email(text)
# Get predictions from all models
all_predictions = []
for pipeline in self.pipelines:
try:
result = pipeline(text)
all_predictions.append(result)
except Exception as e:
logger.error(f"Error in prediction: {e}")
continue
if not all_predictions:
return {
"label": "IMPORTANT",
"score": 0.5,
"confidence": "low",
"error": "Prediction failed"
}
# Aggregate predictions
ensemble_scores = self._aggregate_predictions(all_predictions)
# Determine final prediction
unsub_score = ensemble_scores.get("UNSUBSCRIBABLE", 0.5)
important_score = ensemble_scores.get("IMPORTANT", 0.5)
# Apply confidence calibration
calibrated_unsub = self._calibrate_confidence(unsub_score)
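
        # Default to IMPORTANT unless the calibrated unsubscribable score clears
        # a high-confidence threshold, favoring precision for the unsubscribe label.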
        # Determine label
        if calibrated_unsub > 0.75:  # High confidence threshold
            label = "UNSUBSCRIBABLE"
            score = calibrated_unsub
        else:
            label = "IMPORTANT"
            score = important_score

        # Confidence level
        if score > 0.9:
            confidence = "high"
        elif score > 0.7:
            confidence = "medium"
        else:
            confidence = "low"

        result = {
            "label": label,
            "score": float(score),
            "confidence": confidence,
            "raw_scores": {
                "UNSUBSCRIBABLE": float(unsub_score),
                "IMPORTANT": float(important_score)
            }
        }

        if return_all_scores:
            result["model_predictions"] = all_predictions

        return result

    def _preprocess_email(self, text: str) -> str:
        """Advanced email preprocessing"""
        # Handle subject extraction
        if "Subject:" in text:
            parts = text.split("Subject:", 1)
            # Take the first line after "Subject:" as the subject, the rest as body
            subject, _, body = parts[1].partition("\n")
            subject = subject.strip()
            body = body.strip()
            # Emphasize subject
            text = f"Email Subject: {subject}. Email Body: {body}"

        # Clean text (handle literal escape sequences left over from serialization)
        text = text.replace("\\n", " ").replace("\\t", " ")
        text = " ".join(text.split())

        # Truncate if too long
        if len(text) > 2000:
            text = text[:2000] + "..."

        return text

    def _aggregate_predictions(self, predictions: List) -> Dict[str, float]:
        """Aggregate predictions from multiple models using weighted voting"""
        aggregated = {"UNSUBSCRIBABLE": 0.0, "IMPORTANT": 0.0}

        for i, pred_list in enumerate(predictions):
            weight = self.weights[i]
            # Handle different prediction formats: some transformers versions
            # nest single-input results one level deeper
            if isinstance(pred_list, list) and pred_list and isinstance(pred_list[0], list):
                pred_list = pred_list[0]
            if isinstance(pred_list, list) and pred_list:
                for pred in pred_list:
                    label = pred.get("label", "").upper()
                    score = pred.get("score", 0.5)
                    if label in aggregated:
                        aggregated[label] += score * weight

        # Normalize
        total = sum(aggregated.values())
        if total > 0:
            for key in aggregated:
                aggregated[key] /= total

        return aggregated

    def _calibrate_confidence(self, score: float, temperature: float = 1.2) -> float:
        """Apply temperature scaling for confidence calibration"""
        # Convert to logit
        epsilon = 1e-7
        score = np.clip(score, epsilon, 1 - epsilon)
        logit = np.log(score / (1 - score))

        # Apply temperature scaling
        calibrated_logit = logit / temperature

        # Convert back to probability
        calibrated_score = 1 / (1 + np.exp(-calibrated_logit))

        return float(calibrated_score)

    def predict_batch(self, texts: List[str]) -> List[Dict]:
        """Predict multiple emails"""
        results = []

        # Predictions are still made one email at a time; chunking just keeps
        # progress granular. True pipeline batching would be a further optimization.
        batch_size = 8
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            batch_results = [self.predict(text) for text in batch]
            results.extend(batch_results)

        return results

    def get_feature_importance(self, text: str) -> Dict:
        """Get feature importance for explainability"""
        # This is a simplified version - in production, use SHAP or LIME
        important_keywords = [
            "unsubscribe", "opt out", "preferences", "newsletter",
            "promotional", "marketing", "deal", "offer", "sale"
        ]

        text_lower = text.lower()
        found_keywords = [kw for kw in important_keywords if kw in text_lower]

        return {
            "important_features": found_keywords,
            "feature_count": len(found_keywords)
        }


def create_advanced_predictor():
    """Factory function to create predictor with best available models"""
    model_paths = []

    # Check for advanced model first
    if os.path.exists("./advanced_unsubscriber_model"):
        model_paths.append("./advanced_unsubscriber_model")

    # Check for optimized model
    if os.path.exists("./optimized_model"):
        model_paths.append("./optimized_model")

    # Fallback to original model
    if os.path.exists("./ml_suite/models/fine_tuned_unsubscriber"):
        model_paths.append("./ml_suite/models/fine_tuned_unsubscriber")

    if not model_paths:
        raise ValueError("No trained models found")

    # Use ensemble if multiple models available
    if len(model_paths) > 1:
        logger.info(f"Creating ensemble predictor with {len(model_paths)} models")
        # Give higher weight to the first (most preferred) model found
        weights = [0.6, 0.4] if len(model_paths) == 2 else None
        return AdvancedPredictor(model_paths, weights)
    else:
        logger.info("Creating single model predictor")
        return AdvancedPredictor(model_paths)


# Example usage
if __name__ == "__main__":
    # Test the predictor
    predictor = create_advanced_predictor()

    test_emails = [
        "Subject: 50% OFF Everything! Limited time offer. Click here to shop now. Unsubscribe from promotional emails.",
        "Subject: Security Alert: New login detected. We noticed a login from a new device. If this wasn't you, secure your account.",
        "Subject: Your monthly newsletter is here! Check out our latest articles and tips. Manage your email preferences.",
    ]

    for email in test_emails:
        result = predictor.predict(email, return_all_scores=True)
        print(f"\nEmail: {email[:100]}...")
        print(f"Prediction: {result['label']}")
        print(f"Confidence: {result['confidence']} ({result['score']:.2%})")
        print(f"Raw scores: {result['raw_scores']}")