from utils.agent_logger import AgentLogger import torch import numpy as np from PIL import Image # For image processing context # import smolagents # Removed unused import agent_logger = AgentLogger() class ContextualIntelligenceAgent: def __init__(self): agent_logger.log("context_intelligence", "info", "Initializing ContextualIntelligenceAgent.") # This would be a more sophisticated model in a real scenario self.context_rules = { "high_resolution": {"min_width": 1920, "min_height": 1080, "tag": "high_resolution_image"}, "low_resolution": {"max_width": 640, "max_height": 480, "tag": "low_resolution_image"}, "grayscale": {"mode": "L", "tag": "grayscale_image"}, "potentially_natural_scene": {"keywords": ["Real"], "threshold": 0.7, "tag": "potentially_natural_scene"}, "potentially_ai_generated": {"keywords": ["AI", "Fake", "Deepfake"], "threshold": 0.7, "tag": "potentially_ai_generated"}, "outdoor": {"model_tags": ["sunny", "sky", "trees"], "tag": "outdoor"}, "indoor": {"model_tags": ["room", "furniture"], "tag": "indoor"}, "sunny": {"rgb_avg_min": [200, 200, 100], "tag": "sunny"}, "dark": {"rgb_avg_max": [50, 50, 50], "tag": "dark"}, } def infer_context_tags(self, image_metadata: dict, model_predictions: dict) -> list[str]: agent_logger.log("context_intelligence", "info", "Inferring context tags from image metadata and model predictions.") detected_tags = [] # Analyze image metadata width = image_metadata.get("width", 0) height = image_metadata.get("height", 0) mode = image_metadata.get("mode", "RGB") if width >= self.context_rules["high_resolution"]["min_width"] and \ height >= self.context_rules["high_resolution"]["min_height"]: detected_tags.append(self.context_rules["high_resolution"]["tag"]) agent_logger.log("context_intelligence", "debug", f"Detected tag: {self.context_rules['high_resolution']['tag']}") if width <= self.context_rules["low_resolution"]["max_width"] and \ height <= self.context_rules["low_resolution"]["max_height"]: detected_tags.append(self.context_rules["low_resolution"]["tag"]) agent_logger.log("context_intelligence", "debug", f"Detected tag: {self.context_rules['low_resolution']['tag']}") if mode == self.context_rules["grayscale"]["mode"]: detected_tags.append(self.context_rules["grayscale"]["tag"]) agent_logger.log("context_intelligence", "debug", f"Detected tag: {self.context_rules['grayscale']['tag']}") # Analyze model predictions for general context for model_id, prediction in model_predictions.items(): label = prediction.get("Label") ai_score = prediction.get("AI Score", 0.0) real_score = prediction.get("Real Score", 0.0) if label and "potentially_natural_scene" not in detected_tags: for keyword in self.context_rules["potentially_natural_scene"]["keywords"]: if keyword in label and real_score >= self.context_rules["potentially_natural_scene"]["threshold"]: detected_tags.append(self.context_rules["potentially_natural_scene"]["tag"]) agent_logger.log("context_intelligence", "debug", f"Detected tag: {self.context_rules['potentially_natural_scene']['tag']}") break # Only add once if label and "potentially_ai_generated" not in detected_tags: for keyword in self.context_rules["potentially_ai_generated"]["keywords"]: if keyword in label and ai_score >= self.context_rules["potentially_ai_generated"]["threshold"]: detected_tags.append(self.context_rules["potentially_ai_generated"]["tag"]) agent_logger.log("context_intelligence", "debug", f"Detected tag: {self.context_rules['potentially_ai_generated']['tag']}") break # Only add once # Simulate simple scene detection based on general consensus if available # This is a very basic simulation; a real system would use a separate scene classification model if "potentially_natural_scene" in detected_tags and "potentially_ai_generated" not in detected_tags: # Simulate outdoor/sunny detection based on presence of a real image tag # In a real scenario, this would involve analyzing image features if real_score > 0.8: # Placeholder for actual image feature analysis detected_tags.append(self.context_rules["outdoor"]["tag"]) detected_tags.append(self.context_rules["sunny"]["tag"]) agent_logger.log("context_intelligence", "debug", f"Simulated tags: {self.context_rules['outdoor']['tag']},{self.context_rules['sunny']['tag']}") agent_logger.log("context_intelligence", "info", f"Inferred context tags: {detected_tags}") return detected_tags class ForensicAnomalyDetectionAgent: def __init__(self): agent_logger.log("forensic_anomaly_detection", "info", "Initializing ForensicAnomalyDetectionAgent.") self.anomaly_thresholds = { "ELA": {"min_anomalies": 3, "max_error_std": 20}, # Example thresholds "gradient": {"min_sharp_edges": 500}, "minmax": {"min_local_deviation": 0.1} } def analyze_forensic_outputs(self, forensic_output_descriptions: list[str]) -> dict: agent_logger.log("forensic_anomaly_detection", "info", "Analyzing forensic outputs for anomalies.") anomalies_detected = [] summary_message = "No significant anomalies detected." # Example: Check for ELA anomalies (simplified) ela_anomalies = [desc for desc in forensic_output_descriptions if "ELA analysis" in desc and "enhanced contrast" in desc] if len(ela_anomalies) > self.anomaly_thresholds["ELA"]["min_anomalies"]: anomalies_detected.append("Multiple ELA passes indicate potential inconsistencies.") agent_logger.log("forensic_anomaly_detection", "warning", "Detected multiple ELA passes indicating potential inconsistencies.") # Example: Check for gradient anomalies (simplified) gradient_anomalies = [desc for desc in forensic_output_descriptions if "Gradient processing" in desc] if len(gradient_anomalies) > 1 and "Highlights edges and transitions" in gradient_anomalies[0]: # This is a placeholder for actual image analysis, e.g., checking standard deviation of gradients anomalies_detected.append("Gradient analysis shows unusual edge patterns.") agent_logger.log("forensic_anomaly_detection", "warning", "Detected unusual edge patterns from gradient analysis.") # Example: Check for MinMax anomalies (simplified) minmax_anomalies = [desc for desc in forensic_output_descriptions if "MinMax processing" in desc] if len(minmax_anomalies) > 1 and "Deviations in local pixel values" in minmax_anomalies[0]: # Placeholder for actual analysis of minmax output, e.g., deviation variance anomalies_detected.append("MinMax processing reveals subtle pixel deviations.") agent_logger.log("forensic_anomaly_detection", "warning", "Detected subtle pixel deviations from MinMax processing.") if "Bit Plane extractor" in str(forensic_output_descriptions): anomalies_detected.append("Bit Plane extraction performed.") agent_logger.log("forensic_anomaly_detection", "info", "Bit Plane extraction performed.") if anomalies_detected: summary_message = "Potential anomalies detected: " + "; ".join(anomalies_detected) agent_logger.log("forensic_anomaly_detection", "warning", f"Forensic anomaly detection summary: {summary_message}") else: agent_logger.log("forensic_anomaly_detection", "info", f"Forensic anomaly detection summary: {summary_message}") return {"anomalies": anomalies_detected, "summary": summary_message}