"""
Advanced predictor with support for multiple models and ensemble predictions
"""

import os
import torch
import numpy as np
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TextClassificationPipeline
)
import logging
from typing import Dict, List, Tuple, Optional

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AdvancedPredictor:
    """Advanced predictor with ensemble support and confidence calibration"""
    
    def __init__(self, model_paths: List[str], weights: Optional[List[float]] = None):
        """
        Initialize with multiple models for ensemble prediction
        
        Args:
            model_paths: List of paths to model directories
            weights: Optional weights for each model (must sum to 1.0)
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.models = []
        self.tokenizers = []
        self.pipelines = []
        
        # Load all models
        for path in model_paths:
            if os.path.exists(path):
                logger.info(f"Loading model from {path}")
                tokenizer = AutoTokenizer.from_pretrained(path)
                model = AutoModelForSequenceClassification.from_pretrained(path)
                model = model.to(self.device)
                model.eval()
                
                # Create pipeline
                pipeline = TextClassificationPipeline(
                    model=model,
                    tokenizer=tokenizer,
                    device=0 if torch.cuda.is_available() else -1,
                    top_k=None,
                    function_to_apply="sigmoid"
                )
                
                self.models.append(model)
                self.tokenizers.append(tokenizer)
                self.pipelines.append(pipeline)
            else:
                logger.warning(f"Model path not found: {path}")
        
        if not self.models:
            raise ValueError("No models loaded successfully")
        
        # Set weights, validating them explicitly (asserts can be stripped
        # when Python runs with -O)
        if weights:
            if len(weights) != len(self.models):
                raise ValueError("Number of weights must match number of models")
            if abs(sum(weights) - 1.0) > 1e-6:
                raise ValueError("Weights must sum to 1.0")
            self.weights = weights
        else:
            # Equal weights by default
            self.weights = [1.0 / len(self.models)] * len(self.models)
        
        logger.info(f"Initialized with {len(self.models)} models")
    
    def predict(self, text: str, return_all_scores: bool = False) -> Dict:
        """
        Make ensemble prediction
        
        Args:
            text: Email text to classify
            return_all_scores: Whether to return individual model scores
            
        Returns:
            Dictionary with prediction results
        """
        # Preprocess text
        text = self._preprocess_email(text)
        
        # Get predictions from all models
        all_predictions = []
        for pipeline in self.pipelines:
            try:
                result = pipeline(text)
                all_predictions.append(result)
            except Exception as e:
                logger.error(f"Error in prediction: {e}")
                continue
        
        if not all_predictions:
            return {
                "label": "IMPORTANT",
                "score": 0.5,
                "confidence": "low",
                "error": "Prediction failed"
            }
        
        # Aggregate predictions
        ensemble_scores = self._aggregate_predictions(all_predictions)
        
        # Determine final prediction
        unsub_score = ensemble_scores.get("UNSUBSCRIBABLE", 0.5)
        important_score = ensemble_scores.get("IMPORTANT", 0.5)
        
        # Apply confidence calibration
        calibrated_unsub = self._calibrate_confidence(unsub_score)
        
        # Determine label
        if calibrated_unsub > 0.75:  # High confidence threshold
            label = "UNSUBSCRIBABLE"
            score = calibrated_unsub
        else:
            label = "IMPORTANT"
            score = important_score
        
        # Confidence level
        if score > 0.9:
            confidence = "high"
        elif score > 0.7:
            confidence = "medium"
        else:
            confidence = "low"
        
        result = {
            "label": label,
            "score": float(score),
            "confidence": confidence,
            "raw_scores": {
                "UNSUBSCRIBABLE": float(unsub_score),
                "IMPORTANT": float(important_score)
            }
        }
        
        if return_all_scores:
            result["model_predictions"] = all_predictions
        
        return result
    
    def _preprocess_email(self, text: str) -> str:
        """Advanced email preprocessing"""
        # Extract and emphasize the subject line, if present
        if "Subject:" in text:
            _, rest = text.split("Subject:", 1)
            subject, _, body = rest.partition("\n")
            text = f"Email Subject: {subject.strip()}. Email Body: {body.strip()}"
        
        # Collapse whitespace, including literal "\n"/"\t" escape sequences
        # that sometimes survive in stored email text
        text = text.replace("\\n", " ").replace("\\t", " ")
        text = " ".join(text.split())
        
        # Truncate if too long
        if len(text) > 2000:
            text = text[:2000] + "..."
        
        return text
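
    # For example (illustrative input): "Subject: Big Sale\nShop now" becomes
    # "Email Subject: Big Sale. Email Body: Shop now" before whitespace
    # collapsing and truncation.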
    
    def _aggregate_predictions(self, predictions: List) -> Dict[str, float]:
        """Aggregate predictions from multiple models using weighted voting"""
        aggregated = {"UNSUBSCRIBABLE": 0.0, "IMPORTANT": 0.0}
        
        for i, pred_list in enumerate(predictions):
            weight = self.weights[i]
            
            # Handle different prediction formats
            if isinstance(pred_list, list) and pred_list:
                for pred in pred_list:
                    label = pred.get("label", "").upper()
                    score = pred.get("score", 0.5)
                    
                    if label in aggregated:
                        aggregated[label] += score * weight
        
        # Normalize
        total = sum(aggregated.values())
        if total > 0:
            for key in aggregated:
                aggregated[key] /= total
        
        return aggregated
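
    # Worked example (illustrative numbers): with weights [0.6, 0.4], if model A
    # scores UNSUBSCRIBABLE at 0.9 and model B at 0.7, the weighted sums are
    # 0.6*0.9 + 0.4*0.7 = 0.82 for UNSUBSCRIBABLE and 0.6*0.1 + 0.4*0.3 = 0.18
    # for IMPORTANT; these already sum to 1.0, so normalization is a no-op here.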
    
    def _calibrate_confidence(self, score: float, temperature: float = 1.2) -> float:
        """Apply temperature scaling for confidence calibration"""
        # Convert to logit
        epsilon = 1e-7
        score = np.clip(score, epsilon, 1 - epsilon)
        logit = np.log(score / (1 - score))
        
        # Apply temperature scaling
        calibrated_logit = logit / temperature
        
        # Convert back to probability
        calibrated_score = 1 / (1 + np.exp(-calibrated_logit))
        
        return float(calibrated_score)
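
    # Worked example (illustrative): with temperature 1.2, a raw score of 0.90
    # has logit ln(0.9/0.1) ~= 2.20; scaling gives 2.20/1.2 ~= 1.83, and
    # sigmoid(1.83) ~= 0.86, softening the overconfident prediction toward 0.5.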
    
    def predict_batch(self, texts: List[str]) -> List[Dict]:
        """Predict multiple emails, one at a time, in fixed-size chunks"""
        results = []
        
        # Chunk the input; each email is still predicted individually, so this
        # loop bounds per-iteration work rather than speeding up inference
        batch_size = 8
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            batch_results = [self.predict(text) for text in batch]
            results.extend(batch_results)
        
        return results
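
    # A possible true-batch variant (sketch, not part of the original design):
    # Hugging Face pipelines accept a list of inputs, so passing each chunk at
    # once, e.g. self.pipelines[0]([self._preprocess_email(t) for t in batch]),
    # would run one batched forward pass per chunk instead of one per email.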
    
    def get_feature_importance(self, text: str) -> Dict:
        """Get feature importance for explainability"""
        # Simplified keyword matching - in production, use SHAP or LIME.
        # These keywords signal promotional/unsubscribable emails.
        unsubscribe_keywords = [
            "unsubscribe", "opt out", "preferences", "newsletter",
            "promotional", "marketing", "deal", "offer", "sale"
        ]
        
        text_lower = text.lower()
        found_keywords = [kw for kw in unsubscribe_keywords if kw in text_lower]
        
        return {
            "important_features": found_keywords,
            "feature_count": len(found_keywords)
        }


def create_advanced_predictor():
    """Factory function to create predictor with best available models"""
    model_paths = []
    
    # Check for advanced model first
    if os.path.exists("./advanced_unsubscriber_model"):
        model_paths.append("./advanced_unsubscriber_model")
    
    # Check for optimized model
    if os.path.exists("./optimized_model"):
        model_paths.append("./optimized_model")
    
    # Fallback to original model
    if os.path.exists("./ml_suite/models/fine_tuned_unsubscriber"):
        model_paths.append("./ml_suite/models/fine_tuned_unsubscriber")
    
    if not model_paths:
        raise ValueError("No trained models found")
    
    # Use ensemble if multiple models available
    if len(model_paths) > 1:
        logger.info(f"Creating ensemble predictor with {len(model_paths)} models")
        # Weight the first (more advanced) model higher when exactly two
        # models are found; otherwise fall back to equal weights
        weights = [0.6, 0.4] if len(model_paths) == 2 else None
        return AdvancedPredictor(model_paths, weights)
    else:
        logger.info("Creating single-model predictor")
        return AdvancedPredictor(model_paths)


# Example usage
if __name__ == "__main__":
    # Test the predictor
    predictor = create_advanced_predictor()
    
    test_emails = [
        "Subject: 50% OFF Everything! Limited time offer. Click here to shop now. Unsubscribe from promotional emails.",
        "Subject: Security Alert: New login detected. We noticed a login from a new device. If this wasn't you, secure your account.",
        "Subject: Your monthly newsletter is here! Check out our latest articles and tips. Manage your email preferences.",
    ]
    
    for email in test_emails:
        result = predictor.predict(email, return_all_scores=True)
        print(f"\nEmail: {email[:100]}...")
        print(f"Prediction: {result['label']}")
        print(f"Confidence: {result['confidence']} ({result['score']:.2%})")
        print(f"Raw scores: {result['raw_scores']}")