import streamlit as st import torch import torch.nn as nn from torch.utils.data import DataLoader from torchvision import transforms from transformers import BlipProcessor, BlipForConditionalGeneration from PIL import Image import numpy as np import io import base64 import cv2 import matplotlib.pyplot as plt from peft import PeftModel from unsloth import FastVisionModel import os import tempfile import warnings from gradcam_xception import load_xception_model, generate_smoothgrad_visualizations_xception, get_xception_transform warnings.filterwarnings("ignore", category=UserWarning) # App title and description st.set_page_config( page_title="Deepfake Analyzer", layout="wide", page_icon="🔍" ) # Main title and description st.title("Deepfake Image Analyser") st.markdown("Analyse images for deepfake manipulation") # Check for GPU availability def check_gpu(): if torch.cuda.is_available(): gpu_info = torch.cuda.get_device_properties(0) st.sidebar.success(f"✅ GPU available: {gpu_info.name} ({gpu_info.total_memory / (1024**3):.2f} GB)") return True else: st.sidebar.warning("⚠️ No GPU detected. Analysis will be slower.") return False # Sidebar components st.sidebar.title("About") st.sidebar.markdown(""" This tool detects deepfakes using three AI models: - **Xception**: Initial Real/Fake classification - **BLIP**: Describes image content - **Llama 3.2**: Explains potential manipulations ### Quick Start 1. **Load Models** - Start with Xception, add others as needed 2. **Upload Image** - View classification and heat map 3. **Analyze** - Get explanations and ask questions *GPU recommended for better performance* """) # Fixed values for temperature and max tokens temperature = 0.7 max_tokens = 500 # Custom instruction text area in sidebar use_custom_instructions = st.sidebar.toggle("Enable Custom Instructions", value=False, help="Toggle to enable/disable custom instructions") if use_custom_instructions: custom_instruction = st.sidebar.text_area( "Custom Instructions (Advanced)", value="Specify your preferred style of explanation (e.g., 'Provide technical, detailed explanations' or 'Use simple, non-technical language'). You can also specify what aspects of the image to focus on.", help="Add specific instructions for the analysis" ) else: custom_instruction = "" # ----- GradCAM Implementation for Xception ----- class ImageDataset(torch.utils.data.Dataset): def __init__(self, image, transform=None, face_only=True, dataset_name=None): self.image = image self.transform = transform self.face_only = face_only self.dataset_name = dataset_name # Load face detector self.face_detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') def __len__(self): return 1 # Only one image def detect_face(self, image_np): """Detect face in image and return the face region""" gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY) faces = self.face_detector.detectMultiScale(gray, 1.1, 5) # If no face is detected, use the whole image if len(faces) == 0: st.info("No face detected, using whole image for analysis") h, w = image_np.shape[:2] return (0, 0, w, h), image_np # Get the largest face if len(faces) > 1: # Choose the largest face by area areas = [w*h for (x, y, w, h) in faces] largest_idx = np.argmax(areas) x, y, w, h = faces[largest_idx] else: x, y, w, h = faces[0] # Add padding around the face (5% on each side) padding_x = int(w * 0.05) padding_y = int(h * 0.05) # Ensure padding doesn't go outside image bounds x1 = max(0, x - padding_x) y1 = max(0, y - padding_y) x2 = min(image_np.shape[1], x + w + padding_x) y2 = min(image_np.shape[0], y + h + padding_y) # Extract the face region face_img = image_np[y1:y2, x1:x2] return (x1, y1, x2-x1, y2-y1), face_img def __getitem__(self, idx): image_np = np.array(self.image) label = 0 # Default label; will be overridden by prediction # Store original image for visualization original_image = self.image.copy() # Detect face if required if self.face_only: face_box, face_img_np = self.detect_face(image_np) face_img = Image.fromarray(face_img_np) # Apply transform to face image if self.transform: face_tensor = self.transform(face_img) else: face_tensor = transforms.ToTensor()(face_img) return face_tensor, label, "uploaded_image", original_image, face_box, self.dataset_name else: # Process the whole image if self.transform: image_tensor = self.transform(self.image) else: image_tensor = transforms.ToTensor()(self.image) return image_tensor, label, "uploaded_image", original_image, None, self.dataset_name # Function to process image with Xception GradCAM def process_image_with_xception_gradcam(image, model, device, pred_class): """Process an image with Xception GradCAM""" cam_results = generate_smoothgrad_visualizations_xception( model=model, image=image, target_class=pred_class, face_only=True, num_samples=5 # Can be adjusted ) if cam_results and len(cam_results) == 4: raw_cam, cam_img, overlay, comparison = cam_results # Extract the face box from the dataset if needed transform = get_xception_transform() dataset = ImageDataset(image, transform=transform, face_only=True) _, _, _, _, face_box, _ = dataset[0] return raw_cam, overlay, comparison, face_box else: st.error("Failed to generate GradCAM visualization") return None, None, None, None # ----- Xception Model Loading ----- @st.cache_resource def load_detection_model_xception(): """Loads the Xception model from our module""" with st.spinner("Loading Xception model for deepfake detection..."): try: model = load_xception_model() # Get the device device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model.to(device) model.eval() return model, device except Exception as e: st.error(f"Error loading Xception model: {str(e)}") return None, None # ----- BLIP Image Captioning ----- # Function to load BLIP captioning models @st.cache_resource def load_blip_models(): with st.spinner("Loading BLIP captioning models..."): try: # Load original BLIP model for general image captioning original_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large") original_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large") # Load fine-tuned BLIP model for GradCAM analysis finetuned_processor = BlipProcessor.from_pretrained("saakshigupta/deepfake-blip-large") finetuned_model = BlipForConditionalGeneration.from_pretrained("saakshigupta/deepfake-blip-large") return original_processor, original_model, finetuned_processor, finetuned_model except Exception as e: st.error(f"Error loading BLIP models: {str(e)}") return None, None, None, None # Function to generate image caption using BLIP's VQA approach for GradCAM def generate_gradcam_caption(image, processor, model, max_length=60): """ Generate a detailed analysis of GradCAM visualization using the fine-tuned BLIP model """ try: # Process image first inputs = processor(image, return_tensors="pt") # Check for available GPU and move model and inputs device = "cuda" if torch.cuda.is_available() else "cpu" model = model.to(device) inputs = {k: v.to(device) if hasattr(v, 'to') else v for k, v in inputs.items()} # Generate caption with torch.no_grad(): output = model.generate(**inputs, max_length=max_length, num_beams=5) # Decode the output caption = processor.decode(output[0], skip_special_tokens=True) # Extract descriptions using the full text high_match = caption.split("high activation :")[1].split("moderate")[0] if "high activation :" in caption else "" moderate_match = caption.split("moderate activation :")[1].split("low")[0] if "moderate activation :" in caption else "" low_match = caption.split("low activation :")[1] if "low activation :" in caption else "" # Format the output formatted_text = "" if high_match: formatted_text += f"**High activation**:\n{high_match.strip()}\n\n" if moderate_match: formatted_text += f"**Moderate activation**:\n{moderate_match.strip()}\n\n" if low_match: formatted_text += f"**Low activation**:\n{low_match.strip()}" return formatted_text.strip() except Exception as e: st.error(f"Error analyzing GradCAM: {str(e)}") return "Error analyzing GradCAM visualization" # Function to generate caption for original image def generate_image_caption(image, processor, model, max_length=75, num_beams=5): """Generate a caption for the original image using the original BLIP model""" try: # Check for available GPU device = "cuda" if torch.cuda.is_available() else "cpu" model = model.to(device) # For original image, use unconditional captioning inputs = processor(image, return_tensors="pt").to(device) # Generate caption with torch.no_grad(): output = model.generate(**inputs, max_length=max_length, num_beams=num_beams) # Decode the output caption = processor.decode(output[0], skip_special_tokens=True) # Format into structured description structured_caption = f""" **Subject**: The image shows a person in a photograph. **Appearance**: {caption} **Background**: The background appears to be a controlled environment. **Lighting**: The lighting appears to be professional with even illumination. **Colors**: The image contains natural skin tones and colors typical of photography. **Notable Elements**: The facial features and expression are the central focus of the image. """ return structured_caption.strip() except Exception as e: st.error(f"Error generating caption: {str(e)}") return "Error generating caption" # ----- Fine-tuned Vision LLM ----- # Function to fix cross-attention masks def fix_cross_attention_mask(inputs): if 'cross_attention_mask' in inputs and 0 in inputs['cross_attention_mask'].shape: batch_size, seq_len, _, num_tiles = inputs['cross_attention_mask'].shape visual_features = 6404 # Critical dimension new_mask = torch.ones((batch_size, seq_len, visual_features, num_tiles), device=inputs['cross_attention_mask'].device) inputs['cross_attention_mask'] = new_mask return inputs # Load model function @st.cache_resource def load_llm_model(): with st.spinner("Loading LLM vision model... This may take a few minutes. Please be patient..."): try: # Check for GPU has_gpu = check_gpu() # Load base model and tokenizer using Unsloth base_model_id = "unsloth/llama-3.2-11b-vision-instruct" model, tokenizer = FastVisionModel.from_pretrained( base_model_id, load_in_4bit=True, ) # Load the adapter adapter_id = "saakshigupta/deepfake-explainer-2" model = PeftModel.from_pretrained(model, adapter_id) # Set to inference mode FastVisionModel.for_inference(model) return model, tokenizer except Exception as e: st.error(f"Error loading model: {str(e)}") return None, None # Analyze image function def analyze_image_with_llm(image, gradcam_overlay, face_box, pred_label, confidence, question, model, tokenizer, temperature=0.7, max_tokens=500, custom_instruction=""): # Create a prompt that includes GradCAM information if custom_instruction.strip(): full_prompt = f"{question}\n\nThe image has been processed with GradCAM and classified as {pred_label} with confidence {confidence:.2f}. Focus on the highlighted regions in red/yellow which show the areas the detection model found suspicious.\n\n{custom_instruction}" else: full_prompt = f"{question}\n\nThe image has been processed with GradCAM and classified as {pred_label} with confidence {confidence:.2f}. Focus on the highlighted regions in red/yellow which show the areas the detection model found suspicious." try: # Format the message to include all available images message_content = [{"type": "text", "text": full_prompt}] # Add original image message_content.insert(0, {"type": "image", "image": image}) # Add GradCAM overlay message_content.insert(1, {"type": "image", "image": gradcam_overlay}) # Add comparison image if available if hasattr(st.session_state, 'comparison_image'): message_content.insert(2, {"type": "image", "image": st.session_state.comparison_image}) messages = [{"role": "user", "content": message_content}] # Apply chat template input_text = tokenizer.apply_chat_template(messages, add_generation_prompt=True) # Create list of images to process image_list = [image, gradcam_overlay] if hasattr(st.session_state, 'comparison_image'): image_list.append(st.session_state.comparison_image) try: # Try with multiple images first inputs = tokenizer( image_list, input_text, add_special_tokens=False, return_tensors="pt", ).to(model.device) except Exception as e: st.warning(f"Multiple image analysis encountered an issue: {str(e)}") st.info("Falling back to single image analysis") # Fallback to single image inputs = tokenizer( image, input_text, add_special_tokens=False, return_tensors="pt", ).to(model.device) # Fix cross-attention mask if needed inputs = fix_cross_attention_mask(inputs) # Generate response with st.spinner("Generating detailed analysis... (this may take 15-30 seconds)"): with torch.no_grad(): output_ids = model.generate( **inputs, max_new_tokens=max_tokens, use_cache=True, temperature=temperature, top_p=0.9 ) # Decode the output response = tokenizer.decode(output_ids[0], skip_special_tokens=True) # Try to extract just the model's response (after the prompt) if full_prompt in response: result = response.split(full_prompt)[-1].strip() else: result = response return result except Exception as e: st.error(f"Error during LLM analysis: {str(e)}") return f"Error analyzing image: {str(e)}" # Preprocess image for Xception def preprocess_image_xception(image): """Preprocesses image for Xception model input and face detection.""" face_detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') image_np = np.array(image.convert('RGB')) # Ensure RGB gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY) faces = face_detector.detectMultiScale(gray, 1.1, 5) face_img_for_transform = image # Default to whole image face_box_display = None # For drawing on original image if len(faces) == 0: st.warning("No face detected, using whole image for prediction/CAM.") else: areas = [w * h for (x, y, w, h) in faces] largest_idx = np.argmax(areas) x, y, w, h = faces[largest_idx] padding_x = int(w * 0.05) # Use percentages as in gradcam_xception padding_y = int(h * 0.05) x1, y1 = max(0, x - padding_x), max(0, y - padding_y) x2, y2 = min(image_np.shape[1], x + w + padding_x), min(image_np.shape[0], y + h + padding_y) # Use the padded face region for the model transform face_img_for_transform = Image.fromarray(image_np[y1:y2, x1:x2]) # Use the original detected box (without padding) for display rectangle face_box_display = (x, y, w, h) # Xception specific transform transform = get_xception_transform() # Apply transform to the selected region (face or whole image) input_tensor = transform(face_img_for_transform).unsqueeze(0) # Return tensor, original full image, and the display face box return input_tensor, image, face_box_display # Main app def main(): # Initialize session state variables if 'xception_model_loaded' not in st.session_state: st.session_state.xception_model_loaded = False st.session_state.xception_model = None if 'llm_model_loaded' not in st.session_state: st.session_state.llm_model_loaded = False st.session_state.llm_model = None st.session_state.tokenizer = None if 'blip_model_loaded' not in st.session_state: st.session_state.blip_model_loaded = False st.session_state.original_processor = None st.session_state.original_model = None st.session_state.finetuned_processor = None st.session_state.finetuned_model = None # Initialize chat history if 'chat_history' not in st.session_state: st.session_state.chat_history = [] # Create expanders for each stage with st.expander("Stage 1: Model Loading", expanded=True): st.write("Please load the models using the buttons below:") # Button for loading models xception_col, blip_col, llm_col = st.columns(3) with xception_col: if not st.session_state.xception_model_loaded: if st.button("📥 Load Xception Model for Detection", type="primary"): # Load Xception model model, device = load_detection_model_xception() if model is not None: st.session_state.xception_model = model st.session_state.device = device st.session_state.xception_model_loaded = True st.success("✅ Xception model loaded successfully!") else: st.error("❌ Failed to load Xception model.") else: st.success("✅ Xception model loaded and ready!") with blip_col: if not st.session_state.blip_model_loaded: if st.button("📥 Load BLIP for Captioning", type="primary"): # Load BLIP models original_processor, original_model, finetuned_processor, finetuned_model = load_blip_models() if all([original_processor, original_model, finetuned_processor, finetuned_model]): st.session_state.original_processor = original_processor st.session_state.original_model = original_model st.session_state.finetuned_processor = finetuned_processor st.session_state.finetuned_model = finetuned_model st.session_state.blip_model_loaded = True st.success("✅ BLIP captioning models loaded successfully!") else: st.error("❌ Failed to load BLIP models.") else: st.success("✅ BLIP captioning models loaded and ready!") with llm_col: if not st.session_state.llm_model_loaded: if st.button("📥 Load Vision LLM for Analysis", type="primary"): # Load LLM model model, tokenizer = load_llm_model() if model is not None and tokenizer is not None: st.session_state.llm_model = model st.session_state.tokenizer = tokenizer st.session_state.llm_model_loaded = True st.success("✅ Vision LLM loaded successfully!") else: st.error("❌ Failed to load Vision LLM.") else: st.success("✅ Vision LLM loaded and ready!") # Image upload section with st.expander("Stage 2: Image Upload & Initial Detection", expanded=True): st.subheader("Upload an Image") uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"]) if uploaded_file is not None: try: # Load and display the image (with controlled size) image = Image.open(uploaded_file).convert("RGB") # Display the image with a controlled width col1, col2 = st.columns([1, 2]) with col1: st.image(image, caption="Uploaded Image", width=300) # Generate detailed caption for original image if BLIP model is loaded if st.session_state.blip_model_loaded: with st.spinner("Generating image description..."): caption = generate_image_caption( image, st.session_state.original_processor, st.session_state.original_model ) st.session_state.image_caption = caption # Detect with Xception model if loaded if st.session_state.xception_model_loaded: with st.spinner("Analyzing image with Xception model..."): # Preprocess image for Xception input_tensor, original_image, face_box = preprocess_image_xception(image) # Get device and model device = st.session_state.device model = st.session_state.xception_model # Move tensor to device input_tensor = input_tensor.to(device) # Forward pass with torch.no_grad(): logits = model(input_tensor) probabilities = torch.softmax(logits, dim=1)[0] pred_class = torch.argmax(probabilities).item() confidence = probabilities[pred_class].item() pred_label = "Fake" if pred_class == 0 else "Real" # Check class mapping # Display results with col2: st.markdown("### Detection Result") st.markdown(f"**Classification:** {pred_label} (Confidence: {confidence:.2%})") # Display face box on image if detected if face_box: img_to_show = original_image.copy() img_draw = np.array(img_to_show) x, y, w, h = face_box cv2.rectangle(img_draw, (x, y), (x + w, y + h), (0, 255, 0), 2) st.image(Image.fromarray(img_draw), caption="Detected Face", width=300) # GradCAM visualization st.subheader("GradCAM Visualization") cam, overlay, comparison, detected_face_box = process_image_with_xception_gradcam( image, model, device, pred_class ) if comparison: # Display GradCAM results (controlled size) st.image(comparison, caption="Original | CAM | Overlay", width=700) # Save for later use st.session_state.comparison_image = comparison # Generate caption for GradCAM overlay image if BLIP model is loaded if st.session_state.blip_model_loaded and overlay: with st.spinner("Analyzing GradCAM visualization..."): gradcam_caption = generate_gradcam_caption( overlay, st.session_state.finetuned_processor, st.session_state.finetuned_model ) st.session_state.gradcam_caption = gradcam_caption # Save results in session state for LLM analysis st.session_state.current_image = image st.session_state.current_overlay = overlay st.session_state.current_face_box = detected_face_box st.session_state.current_pred_label = pred_label st.session_state.current_confidence = confidence st.success("✅ Initial detection and GradCAM visualization complete!") else: st.warning("⚠️ Please load the Xception model first to perform initial detection.") except Exception as e: st.error(f"Error processing image: {str(e)}") import traceback st.error(traceback.format_exc()) # This will show the full error traceback # Image Analysis Summary section - AFTER Stage 2 if hasattr(st.session_state, 'current_image') and (hasattr(st.session_state, 'image_caption') or hasattr(st.session_state, 'gradcam_caption')): with st.expander("Image Analysis Summary", expanded=True): # Display images and analysis in organized layout col1, col2 = st.columns([1, 2]) with col1: # Display original image st.image(st.session_state.current_image, caption="Original Image", width=300) # Display GradCAM overlay if hasattr(st.session_state, 'current_overlay'): st.image(st.session_state.current_overlay, caption="GradCAM Visualization", width=300) with col2: # Image description if hasattr(st.session_state, 'image_caption'): st.markdown("### Image Description") st.markdown(st.session_state.image_caption) st.markdown("---") # GradCAM analysis if hasattr(st.session_state, 'gradcam_caption'): st.markdown("### GradCAM Analysis") st.markdown(st.session_state.gradcam_caption) st.markdown("---") # LLM Analysis section - AFTER Image Analysis Summary with st.expander("Stage 3: Detailed Analysis with Vision LLM", expanded=False): if hasattr(st.session_state, 'current_image') and st.session_state.llm_model_loaded: st.subheader("Detailed Deepfake Analysis") # Display chat history for i, (question, answer) in enumerate(st.session_state.chat_history): st.markdown(f"**Question {i+1}:** {question}") st.markdown(f"**Answer:** {answer}") st.markdown("---") # Include both captions in the prompt if available caption_text = "" if hasattr(st.session_state, 'image_caption'): caption_text += f"\n\nImage Description:\n{st.session_state.image_caption}" if hasattr(st.session_state, 'gradcam_caption'): caption_text += f"\n\nGradCAM Analysis:\n{st.session_state.gradcam_caption}" # Default question with option to customize default_question = f"This image has been classified as {{pred_label}}. Analyze all the provided images (original, GradCAM visualization, and comparison) to determine if this is a deepfake. Focus on highlighted areas in the GradCAM visualization. Provide both a technical explanation for experts and a simple explanation for non-technical users." # User input for new question new_question = st.text_area("Ask a question about the image:", value=default_question if not st.session_state.chat_history else "", height=100) # Analyze button and Clear Chat button in the same row col1, col2 = st.columns([3, 1]) with col1: analyze_button = st.button("🔍 Send Question", type="primary") with col2: clear_button = st.button("🗑️ Clear Chat History") if clear_button: st.session_state.chat_history = [] st.experimental_rerun() if analyze_button and new_question: try: # Add caption info if it's the first question if not st.session_state.chat_history: full_question = new_question + caption_text else: full_question = new_question result = analyze_image_with_llm( st.session_state.current_image, st.session_state.current_overlay, st.session_state.current_face_box, st.session_state.current_pred_label, st.session_state.current_confidence, full_question, st.session_state.llm_model, st.session_state.tokenizer, temperature=temperature, max_tokens=max_tokens, custom_instruction=custom_instruction ) # Add to chat history st.session_state.chat_history.append((new_question, result)) # Display the latest result too st.success("✅ Analysis complete!") # Check if the result contains both technical and non-technical explanations if "Technical" in result and "Non-Technical" in result: try: # Split the result into technical and non-technical sections parts = result.split("Non-Technical") technical = parts[0] non_technical = "Non-Technical" + parts[1] # Display in two columns tech_col, simple_col = st.columns(2) with tech_col: st.subheader("Technical Analysis") st.markdown(technical) with simple_col: st.subheader("Simple Explanation") st.markdown(non_technical) except Exception as e: # Fallback if splitting fails st.subheader("Analysis Result") st.markdown(result) else: # Just display the whole result st.subheader("Analysis Result") st.markdown(result) # Rerun to update the chat history display st.experimental_rerun() except Exception as e: st.error(f"Error during LLM analysis: {str(e)}") elif not hasattr(st.session_state, 'current_image'): st.warning("⚠️ Please upload an image and complete the initial detection first.") else: st.warning("⚠️ Please load the Vision LLM to perform detailed analysis.") # Footer st.markdown("---") # Add model version indicator in sidebar st.sidebar.info("Using Xception + deepfake-explainer-2 models") if __name__ == "__main__": main()