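# Streamlit app for deepfake image analysis: an Xception-based detector with
# GradCAM visualisation, BLIP captioning of the image and the GradCAM overlay,
# and a fine-tuned vision LLM for conversational explanations.
# Run with `streamlit run <this_file>.py` (use whatever name this script is saved under).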
import streamlit as st
import warnings
import os
import tempfile

from unsloth import FastVisionModel

import torch

torch._dynamo.config.disable = True
torch._dynamo.config.suppress_errors = True

from transformers import BlipProcessor, BlipForConditionalGeneration
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import transforms
from PIL import Image
import numpy as np
import io
import base64
import cv2
import matplotlib.pyplot as plt
from peft import PeftModel
from gradcam_xception import generate_smoothgrad_visualizations_xception

warnings.filterwarnings("ignore", category=UserWarning)

st.set_page_config(
    page_title="Deepfake Analyzer",
    layout="wide",
    page_icon="🔍"
)

debug_mode = False
if "debug" not in st.session_state:
    st.session_state.debug = debug_mode

with st.sidebar:
    st.session_state.debug = st.toggle("Enable Debug Mode", value=debug_mode, key="debug_toggle_sidebar")

with st.sidebar:
    if st.session_state.debug:
        st.write("### Connection Diagnostics")
        if st.button("Test File Upload Connection"):
            try:
                test_file = io.BytesIO(b"test content")
                test_file.name = "test.txt"

                st.write("Checking file upload capability...")
                st.write("Status: Testing... If this freezes, there may be connectivity issues.")

                test_path = "test_upload_capability.txt"
                try:
                    with open(test_path, "w") as f:
                        f.write("test")
                    st.write("✅ File write test: Success")
                    os.remove(test_path)
                    st.write("✅ File delete test: Success")
                except Exception as e:
                    st.write(f"❌ File operation test: Failed - {str(e)}")

                try:
                    st.session_state.test_value = "test"
                    if st.session_state.test_value == "test":
                        st.write("✅ Session state test: Success")
                except Exception as e:
                    st.write(f"❌ Session state test: Failed - {str(e)}")

                st.write("### Environment Variables")
                for key in ["STREAMLIT_SERVER_ENABLE_CORS", "STREAMLIT_SERVER_ENABLE_XSRF_PROTECTION",
                            "TEMP", "TMP", "TMPDIR"]:
                    st.write(f"{key}: {os.environ.get(key, 'Not set')}")

                hf_vars = [k for k in os.environ if k.startswith("HF_")]
                if hf_vars:
                    st.write("### Hugging Face Environment Variables")
                    for key in hf_vars:
                        st.write(f"{key}: {os.environ.get(key, 'Not set')}")

                st.success("Diagnostics completed!")
            except Exception as e:
                st.error(f"Diagnostics error: {str(e)}")
                import traceback
                st.error(traceback.format_exc())


def log_debug(message):
    """Helper function to log debug messages only when debug mode is enabled"""
    if st.session_state.debug:
        st.sidebar.write(f"DEBUG: {message}")


def check_environment():
    import sys
    import platform

    if st.session_state.debug:
        st.sidebar.write("### Environment Info")
        st.sidebar.write(f"Python version: {sys.version}")
        st.sidebar.write(f"Platform: {platform.platform()}")
        try:
            st.sidebar.write(f"Torch version: {torch.__version__}")
            st.sidebar.write(f"CUDA available: {torch.cuda.is_available()}")
            if torch.cuda.is_available():
                st.sidebar.write(f"CUDA version: {torch.version.cuda}")
                st.sidebar.write(f"GPU: {torch.cuda.get_device_name(0)}")
        except Exception:
            st.sidebar.write("Torch not available or error checking")


def test_huggingface_hub_access():
    """Test connectivity to the Hugging Face Hub"""
    try:
        from huggingface_hub import HfApi
        api = HfApi()

        # Any small, public model works as a connectivity probe
        api.model_info("openai/clip-vit-base-patch32")

        st.sidebar.success("✅ Hugging Face Hub connectivity: Good")
        return True
    except Exception as e:
        st.sidebar.error(f"⚠️ Hugging Face Hub connectivity issue: {str(e)}")
        if st.session_state.debug:
            import traceback
            st.sidebar.error(traceback.format_exc())
        return False


check_environment()

if st.session_state.debug:
    try:
        test_huggingface_hub_access()
    except Exception as e:
        st.sidebar.error(f"Error testing HuggingFace Hub: {str(e)}")
        log_debug(f"HF Hub test error: {str(e)}")

st.title("Deepfake Image Analyser")
st.markdown("Analyse images for deepfake manipulation")


def check_gpu():
    if torch.cuda.is_available():
        gpu_info = torch.cuda.get_device_properties(0)
        st.sidebar.success(f"✅ GPU available: {gpu_info.name} ({gpu_info.total_memory / (1024**3):.2f} GB)")
        return True
    else:
        st.sidebar.warning("⚠️ No GPU detected. Analysis will be slower.")
        return False


st.sidebar.title("Model Controls")

with st.sidebar:
    st.write("### Load Models")

    # Xception deepfake detector
    if 'xception_model_loaded' not in st.session_state:
        st.session_state.xception_model_loaded = False
        st.session_state.xception_model = None

    if not st.session_state.xception_model_loaded:
        if st.button("📥 Load Xception Model", type="primary"):
            try:
                from gradcam_xception import load_xception_model
                model = load_xception_model()
                device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

                model = model.to(device)

                if model is not None:
                    st.session_state.xception_model = model
                    st.session_state.device = device
                    st.session_state.xception_model_loaded = True
                    st.success("✅ Xception model loaded!")
                else:
                    st.error("❌ Failed to load Xception model.")
            except Exception as e:
                st.error(f"Error loading model: {str(e)}")
    else:
        st.success("✅ Xception model loaded")

    # BLIP captioning models: the original model for image descriptions and a
    # fine-tuned variant for describing GradCAM overlays
    if 'blip_model_loaded' not in st.session_state:
        st.session_state.blip_model_loaded = False
        st.session_state.original_processor = None
        st.session_state.original_model = None
        st.session_state.finetuned_processor = None
        st.session_state.finetuned_model = None

    if not st.session_state.blip_model_loaded:
        if st.button("📥 Load BLIP Models", type="primary"):
            try:
                with st.spinner("Loading BLIP captioning models..."):
                    original_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large")
                    original_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large")

                    finetuned_processor = BlipProcessor.from_pretrained("saakshigupta/gradcam-xception-finetuned")
                    finetuned_model = BlipForConditionalGeneration.from_pretrained("saakshigupta/gradcam-xception-finetuned")

                    if all([original_processor, original_model, finetuned_processor, finetuned_model]):
                        st.session_state.original_processor = original_processor
                        st.session_state.original_model = original_model
                        st.session_state.finetuned_processor = finetuned_processor
                        st.session_state.finetuned_model = finetuned_model
                        st.session_state.blip_model_loaded = True
                        st.success("✅ BLIP models loaded!")
                    else:
                        st.error("❌ Failed to load BLIP models.")
            except Exception as e:
                st.error(f"Error loading BLIP models: {str(e)}")
    else:
        st.success("✅ BLIP models loaded")
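
    # NOTE: the vision LLM below is loaded in 4-bit via Unsloth and then wrapped
    # with a PEFT adapter; expect a large download on the first load and slow,
    # memory-hungry loading on machines without a GPU.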
    if 'llm_model_loaded' not in st.session_state:
        st.session_state.llm_model_loaded = False
        st.session_state.llm_model = None
        st.session_state.tokenizer = None

    if not st.session_state.llm_model_loaded:
        if st.button("📥 Load Vision LLM", type="primary"):
            try:
                with st.spinner("Loading LLM vision model... This may take a few minutes. Please be patient..."):
                    has_gpu = check_gpu()

                    base_model_id = "unsloth/llama-3.2-11b-vision-instruct"
                    model, tokenizer = FastVisionModel.from_pretrained(
                        base_model_id,
                        load_in_4bit=True,
                    )

                    adapter_id = "saakshigupta/deepfake-explainer-new"
                    model = PeftModel.from_pretrained(model, adapter_id)

                    FastVisionModel.for_inference(model)

                    if model is not None and tokenizer is not None:
                        st.session_state.llm_model = model
                        st.session_state.tokenizer = tokenizer
                        st.session_state.llm_model_loaded = True
                        st.success("✅ Vision LLM loaded!")
                    else:
                        st.error("❌ Failed to load Vision LLM.")
            except Exception as e:
                st.error(f"Error loading LLM model: {str(e)}")
    else:
        st.success("✅ Vision LLM loaded")


# Default generation settings for the vision LLM
temperature = 0.7
max_tokens = 500

custom_instruction = ""


class ImageDataset(torch.utils.data.Dataset):
    def __init__(self, image, transform=None, face_only=True, dataset_name=None):
        self.image = image
        self.transform = transform
        self.face_only = face_only
        self.dataset_name = dataset_name
        self.face_detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    def __len__(self):
        return 1

    def detect_face(self, image_np):
        """Detect face in image and return the face region"""
        gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)
        faces = self.face_detector.detectMultiScale(gray, 1.1, 5)

        # Fall back to the whole image when no face is found
        if len(faces) == 0:
            st.info("No face detected, using whole image for analysis")
            h, w = image_np.shape[:2]
            return (0, 0, w, h), image_np

        # If several faces are detected, keep the largest one
        if len(faces) > 1:
            areas = [w * h for (x, y, w, h) in faces]
            largest_idx = np.argmax(areas)
            x, y, w, h = faces[largest_idx]
        else:
            x, y, w, h = faces[0]

        # Add 5% padding around the detected face
        padding_x = int(w * 0.05)
        padding_y = int(h * 0.05)

        x1 = max(0, x - padding_x)
        y1 = max(0, y - padding_y)
        x2 = min(image_np.shape[1], x + w + padding_x)
        y2 = min(image_np.shape[0], y + h + padding_y)

        face_img = image_np[y1:y2, x1:x2]

        return (x1, y1, x2 - x1, y2 - y1), face_img

    def __getitem__(self, idx):
        image_np = np.array(self.image)
        label = 0

        original_image = self.image.copy()

        if self.face_only:
            face_box, face_img_np = self.detect_face(image_np)
            face_img = Image.fromarray(face_img_np)

            IMAGE_SIZE = 299
            if self.transform:
                face_tensor = self.transform(face_img)
            else:
                transform = transforms.Compose([
                    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
                    transforms.ToTensor(),
                    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
                ])
                face_tensor = transform(face_img)

            return face_tensor, label, "uploaded_image", original_image, face_box, self.dataset_name
        else:
            IMAGE_SIZE = 299
            if self.transform:
                image_tensor = self.transform(self.image)
            else:
                transform = transforms.Compose([
                    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
                    transforms.ToTensor(),
                    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
                ])
                image_tensor = transform(self.image)

            return image_tensor, label, "uploaded_image", original_image, None, self.dataset_name


def process_image_with_xception_gradcam(image, model, device, pred_class):
    """Process an image with Xception GradCAM"""
    cam_results = generate_smoothgrad_visualizations_xception(
        model=model,
        image=image,
        target_class=pred_class,
        face_only=True,
        num_samples=5
    )

    if cam_results and len(cam_results) == 4:
        raw_cam, cam_img, overlay, comparison = cam_results

        # Re-run face detection so the face box can be returned alongside the CAM outputs
        IMAGE_SIZE = 299
        transform = transforms.Compose([
            transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
        ])
        dataset = ImageDataset(image, transform=transform, face_only=True)
        _, _, _, _, face_box, _ = dataset[0]

        return raw_cam, overlay, comparison, face_box
    else:
        st.error("Failed to generate GradCAM visualization")
        return None, None, None, None


@st.cache_resource
def load_detection_model_xception():
    """Loads the Xception model from HF Hub."""
    with st.spinner("Loading Xception model for deepfake detection..."):
        try:
            log_debug("Beginning Xception model loading")
            from gradcam_xception import load_xception_model
            log_debug("Loading Xception model (this may take a moment)...")
            model = load_xception_model()

            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            log_debug(f"Using device: {device}")

            model.to(device)
            model.eval()
            log_debug(f"Xception model loaded to {device}.")
            return model, device
        except ImportError as e:
            st.error(f"Import Error: {str(e)}. Make sure gradcam_xception.py is present.")
            log_debug("Import error with gradcam_xception.py module")
            return None, None
        except Exception as e:
            st.error(f"Error loading Xception model: {str(e)}")
            import traceback
            error_details = traceback.format_exc()
            if st.session_state.debug:
                st.error(error_details)
            log_debug(f"Error details: {error_details}")
            return None, None


def generate_gradcam_caption(image, processor, model, max_length=60):
    """
    Generate a detailed analysis of GradCAM visualization using the fine-tuned BLIP model
    """
    try:
        inputs = processor(image, return_tensors="pt")

        device = "cuda" if torch.cuda.is_available() else "cpu"
        model = model.to(device)
        inputs = {k: v.to(device) if hasattr(v, 'to') else v for k, v in inputs.items()}

        with torch.no_grad():
            output = model.generate(**inputs, max_length=max_length, num_beams=5)

        caption = processor.decode(output[0], skip_special_tokens=True)

        # Split the raw caption into high/moderate/low activation sections when present
        try:
            formatted_text = ""
            if "high activation :" in caption:
                high_match = caption.split("high activation :")[1].split("moderate")[0]
                formatted_text += f"**High activation**:\n{high_match.strip()}\n\n"

            if "moderate activation :" in caption:
                moderate_match = caption.split("moderate activation :")[1].split("low")[0]
                formatted_text += f"**Moderate activation**:\n{moderate_match.strip()}\n\n"

            if "low activation :" in caption:
                low_match = caption.split("low activation :")[1]
                formatted_text += f"**Low activation**:\n{low_match.strip()}"

            # If none of the section markers were found, fall back to a generic header
            if not formatted_text.strip():
                if ":" in caption:
                    parts = caption.split(":")
                    if len(parts) > 1:
                        formatted_text = f"**GradCAM Analysis**:\n{parts[1].strip()}"
                else:
                    formatted_text = f"**GradCAM Analysis**:\n{caption.strip()}"
        except Exception:
            # Parsing failed; return the unparsed caption instead
            formatted_text = f"**GradCAM Analysis**:\n{caption.strip()}"

        return formatted_text.strip()

    except Exception as e:
        st.error(f"Error analyzing GradCAM: {str(e)}")
        import traceback
        st.error(traceback.format_exc())
        return "Error analyzing GradCAM visualization"


def generate_image_caption(image, processor, model, max_length=75, num_beams=5):
    """Generate a caption for the original image using the original BLIP model"""
    try:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model = model.to(device)

        inputs = processor(image, return_tensors="pt").to(device)

        with torch.no_grad():
            output = model.generate(**inputs, max_length=max_length, num_beams=num_beams)

        caption = processor.decode(output[0], skip_special_tokens=True)

        # Wrap the raw BLIP caption in a fixed structured template
        structured_caption = f"""
**Subject**: The image shows a person in a photograph.

**Appearance**: {caption}

**Background**: The background appears to be a controlled environment.

**Lighting**: The lighting appears to be professional with even illumination.

**Colors**: The image contains natural skin tones and colors typical of photography.

**Notable Elements**: The facial features and expression are the central focus of the image.
"""
        return structured_caption.strip()

    except Exception as e:
        st.error(f"Error generating caption: {str(e)}")
        return "Error generating caption"


def fix_cross_attention_mask(inputs):
    """Replace a cross-attention mask that has a zero-sized dimension with an all-ones mask."""
    if 'cross_attention_mask' in inputs and 0 in inputs['cross_attention_mask'].shape:
        batch_size, seq_len, _, num_tiles = inputs['cross_attention_mask'].shape
        visual_features = 6404  # expected number of visual feature positions
        new_mask = torch.ones((batch_size, seq_len, visual_features, num_tiles),
                              device=inputs['cross_attention_mask'].device)
        inputs['cross_attention_mask'] = new_mask
    return inputs


def analyze_image_with_llm(image, gradcam_overlay, face_box, pred_label, confidence, question, model, tokenizer, temperature=0.7, max_tokens=500, custom_instruction=""):
    # Build the prompt, appending any custom instruction from the user
    if custom_instruction.strip():
        full_prompt = f"{question}\n\nThe image has been processed with GradCAM and classified as {pred_label} with confidence {confidence:.2f}. Focus on the highlighted regions in red/yellow which show the areas the detection model found suspicious.\n\n{custom_instruction}"
    else:
        full_prompt = f"{question}\n\nThe image has been processed with GradCAM and classified as {pred_label} with confidence {confidence:.2f}. Focus on the highlighted regions in red/yellow which show the areas the detection model found suspicious."

    try:
        # Assemble a multi-image chat message: original image, GradCAM overlay and,
        # if available, the side-by-side comparison
        message_content = [{"type": "text", "text": full_prompt}]
        message_content.insert(0, {"type": "image", "image": image})
        message_content.insert(1, {"type": "image", "image": gradcam_overlay})

        if hasattr(st.session_state, 'comparison_image'):
            message_content.insert(2, {"type": "image", "image": st.session_state.comparison_image})

        messages = [{"role": "user", "content": message_content}]

        input_text = tokenizer.apply_chat_template(messages, add_generation_prompt=True)

        image_list = [image, gradcam_overlay]
        if hasattr(st.session_state, 'comparison_image'):
            image_list.append(st.session_state.comparison_image)

        try:
            inputs = tokenizer(
                image_list,
                input_text,
                add_special_tokens=False,
                return_tensors="pt",
            ).to(model.device)
        except Exception as e:
            st.warning(f"Multiple image analysis encountered an issue: {str(e)}")
            st.info("Falling back to single image analysis")
            inputs = tokenizer(
                image,
                input_text,
                add_special_tokens=False,
                return_tensors="pt",
            ).to(model.device)

        inputs = fix_cross_attention_mask(inputs)

        with st.spinner("Generating detailed analysis... (this may take 15-30 seconds)"):
            with torch.no_grad():
                output_ids = model.generate(
                    **inputs,
                    max_new_tokens=max_tokens,
                    use_cache=True,
                    temperature=temperature,
                    top_p=0.9
                )

        response = tokenizer.decode(output_ids[0], skip_special_tokens=True)

        # Strip the echoed prompt from the decoded response when present
        if full_prompt in response:
            result = response.split(full_prompt)[-1].strip()
        else:
            result = response

        return result

    except Exception as e:
        st.error(f"Error during LLM analysis: {str(e)}")

        # Fallback: retry with a minimal single-image prompt
        try:
            st.info("Attempting fallback with simplified input...")

            simple_message = [{"role": "user", "content": [
                {"type": "text", "text": "Analyze this image and tell if it's a deepfake."},
                {"type": "image", "image": image}
            ]}]

            simple_text = tokenizer.apply_chat_template(simple_message, add_generation_prompt=True)

            with torch.no_grad():
                simple_inputs = tokenizer(
                    image,
                    simple_text,
                    add_special_tokens=False,
                    return_tensors="pt",
                ).to(model.device)

                simple_inputs = fix_cross_attention_mask(simple_inputs)

                output_ids = model.generate(
                    **simple_inputs,
                    max_new_tokens=200,
                    use_cache=True,
                    temperature=0.5,
                    top_p=0.9
                )

            fallback_response = tokenizer.decode(output_ids[0], skip_special_tokens=True)
            return "Error with primary analysis. Fallback result: " + fallback_response.split("Analyze this image and tell if it's a deepfake.")[-1].strip()
        except Exception as fallback_error:
            return f"Error analyzing image: {str(fallback_error)}"


def preprocess_image_xception(image):
    """Preprocesses image for Xception model input and face detection."""
    try:
        log_debug("Starting image preprocessing for Xception model")
        face_detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

        if image is None:
            log_debug("Image is None - this should never happen!")
            return None, None, None

        image_np = np.array(image.convert('RGB'))

        gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)
        faces = face_detector.detectMultiScale(gray, 1.1, 5)

        face_img_for_transform = image
        face_box_display = None

        if len(faces) == 0:
            log_debug("No face detected in the image, using whole image")
            st.warning("No face detected, using whole image for prediction/CAM.")
        else:
            log_debug(f"Detected {len(faces)} faces in the image")
            areas = [w * h for (x, y, w, h) in faces]
            largest_idx = np.argmax(areas)
            x, y, w, h = faces[largest_idx]

            padding_x = int(w * 0.05)
            padding_y = int(h * 0.05)
            x1, y1 = max(0, x - padding_x), max(0, y - padding_y)
            x2, y2 = min(image_np.shape[1], x + w + padding_x), min(image_np.shape[0], y + h + padding_y)

            face_img_for_transform = Image.fromarray(image_np[y1:y2, x1:x2])

            face_box_display = (x, y, w, h)
            log_debug(f"Face detected: Box {face_box_display}")

        IMAGE_SIZE = 299
        transform = transforms.Compose([
            transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
        ])

        input_tensor = transform(face_img_for_transform).unsqueeze(0)

        return input_tensor, image, face_box_display

    except Exception as e:
        st.error(f"Error in preprocessing image: {str(e)}")
        import traceback
        log_debug(f"Preprocessing error details: {traceback.format_exc()}")

        return None, None, None


def main():
    # Make sure every model-related session-state key exists before the UI renders
    if 'xception_model_loaded' not in st.session_state:
        st.session_state.xception_model_loaded = False
        st.session_state.xception_model = None

    if 'llm_model_loaded' not in st.session_state:
        st.session_state.llm_model_loaded = False
        st.session_state.llm_model = None
        st.session_state.tokenizer = None

    if 'blip_model_loaded' not in st.session_state:
        st.session_state.blip_model_loaded = False
        st.session_state.original_processor = None
        st.session_state.original_model = None
        st.session_state.finetuned_processor = None
        st.session_state.finetuned_model = None

    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []

    tab1, tab2, tab3 = st.tabs(["Deepfake Detection", "Image Captions", "LLM Analysis"])

    with tab1:
        st.header("Deepfake Detection")

        st.subheader("Upload an Image")

        upload_tab1, upload_tab2 = st.tabs(["File Upload", "URL Input"])

        uploaded_image = None

        with upload_tab1:
            uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"])
            if uploaded_file is not None:
                try:
                    image = Image.open(uploaded_file).convert("RGB")
                    uploaded_image = image
                    st.session_state.upload_method = "file"
                except Exception as e:
                    st.error(f"Error loading image: {str(e)}")
                    import traceback
                    st.error(traceback.format_exc())

        with upload_tab2:
            url = st.text_input("Enter image URL:")
            if url and url.strip():
                try:
                    import requests

                    headers = {
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                        'Accept': 'image/jpeg, image/png, image/*, */*',
                        'Referer': 'https://huggingface.co/'
                    }

                    # Try several download strategies in turn until one succeeds
                    try_methods = True

                    # Method 1: direct request with browser-like headers
                    if try_methods:
                        try:
                            response = requests.get(url, stream=True, headers=headers, timeout=10)
                            if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''):
                                try:
                                    image = Image.open(io.BytesIO(response.content)).convert("RGB")
                                    uploaded_image = image
                                    st.session_state.upload_method = "url_direct"
                                    try_methods = False
                                    st.success("✅ Image loaded via direct request")
                                except Exception as e:
                                    st.warning(f"Direct method received data but couldn't process as image: {str(e)}")
                            else:
                                st.info(f"Direct method failed: Status {response.status_code}, trying alternative method...")
                        except Exception as e:
                            st.info(f"Direct method error: {str(e)}, trying alternative method...")

                    # Method 2: urllib with a browser-like User-Agent
                    if try_methods:
                        try:
                            import urllib.request
                            from urllib.error import HTTPError

                            opener = urllib.request.build_opener()
                            opener.addheaders = [('User-agent', headers['User-Agent'])]
                            urllib.request.install_opener(opener)

                            with urllib.request.urlopen(url, timeout=10) as response:
                                image_data = response.read()
                                image = Image.open(io.BytesIO(image_data)).convert("RGB")
                                uploaded_image = image
                                st.session_state.upload_method = "url_urllib"
                                try_methods = False
                                st.success("✅ Image loaded via urllib")
                        except HTTPError as e:
                            st.info(f"urllib method failed: HTTP error {e.code}, trying next method...")
                        except Exception as e:
                            st.info(f"urllib method error: {str(e)}, trying next method...")

                    # Method 3: fetch through an image proxy service
                    if try_methods:
                        try:
                            proxy_url = f"https://images.weserv.nl/?url={url}"
                            response = requests.get(proxy_url, stream=True, timeout=10)
                            if response.status_code == 200:
                                image = Image.open(io.BytesIO(response.content)).convert("RGB")
                                uploaded_image = image
                                st.session_state.upload_method = "url_proxy"
                                try_methods = False
                                st.success("✅ Image loaded via proxy service")
                            else:
                                st.error(f"All methods failed to load the image from URL. Last status: {response.status_code}")
                        except Exception as e:
                            st.error(f"All methods failed. Final error: {str(e)}")

                    if uploaded_image is None:
                        st.error("Failed to load image using all available methods.")

                except Exception as e:
                    st.error(f"Error processing URL: {str(e)}")
                    if st.session_state.debug:
                        import traceback
                        st.error(traceback.format_exc())

        if uploaded_image is not None:
            image = uploaded_image
            col1, col2 = st.columns([1, 2])
            with col1:
                st.image(image, caption="Uploaded Image", width=300)

            if st.session_state.xception_model_loaded:
                try:
                    with st.spinner("Analyzing image with Xception model..."):
                        input_tensor, original_image, face_box = preprocess_image_xception(image)

                        if input_tensor is None:
                            st.error("Failed to preprocess image. Please try another image.")
                            st.stop()

                        device = st.session_state.device
                        model = st.session_state.xception_model

                        model = model.to(device)
                        model.eval()

                        input_tensor = input_tensor.to(device)

                        try:
                            with torch.no_grad():
                                logits = model(input_tensor)
                                probabilities = torch.softmax(logits, dim=1)[0]
                                pred_class = torch.argmax(probabilities).item()
                                confidence = probabilities[pred_class].item()

                                # Class 0 is treated as Real, class 1 as Fake
                                pred_label = "Real" if pred_class == 0 else "Fake"
                        except Exception as e:
                            st.error(f"Error in model inference: {str(e)}")
                            import traceback
                            st.error(traceback.format_exc())

                            pred_class = 0
                            confidence = 0.5
                            pred_label = "Error in prediction"

                        with col2:
                            st.markdown("### Detection Result")
                            st.markdown(f"**Classification:** {pred_label} (Confidence: {confidence:.2%})")

                            st.subheader("GradCAM Visualization")
                            try:
                                cam, overlay, comparison, detected_face_box = process_image_with_xception_gradcam(
                                    image, model.to(device), device, pred_class
                                )

                                if comparison is not None:
                                    st.image(comparison, caption="Original | CAM | Overlay", width=700)

                                    st.session_state.comparison_image = comparison
                                else:
                                    st.error("GradCAM visualization failed - comparison image not generated")

                                # Caption the GradCAM overlay with the fine-tuned BLIP model if it is loaded
                                if st.session_state.blip_model_loaded and overlay is not None:
                                    with st.spinner("Analyzing GradCAM visualization..."):
                                        gradcam_caption = generate_gradcam_caption(
                                            overlay,
                                            st.session_state.finetuned_processor,
                                            st.session_state.finetuned_model
                                        )
                                        st.session_state.gradcam_caption = gradcam_caption

                            except Exception as e:
                                st.error(f"Error generating GradCAM: {str(e)}")
                                import traceback
                                st.error(traceback.format_exc())

                        # Persist results for the captioning and LLM tabs
                        st.session_state.current_image = image
                        st.session_state.current_overlay = overlay if 'overlay' in locals() else None
                        st.session_state.current_face_box = detected_face_box if 'detected_face_box' in locals() else None
                        st.session_state.current_pred_label = pred_label
                        st.session_state.current_confidence = confidence

                        st.success("✅ Initial detection and GradCAM visualization complete!")
                except Exception as e:
                    st.error(f"Overall error in Xception processing: {str(e)}")
                    import traceback
                    st.error(traceback.format_exc())
            else:
                st.warning("⚠️ Please load the Xception model from the sidebar first.")

    with tab2:
        st.header("Image Captions")

        if hasattr(st.session_state, 'current_image'):
            col1, col2 = st.columns([1, 2])

            with col1:
                st.image(st.session_state.current_image, caption="Original Image", width=300)

                if hasattr(st.session_state, 'current_overlay'):
                    st.image(st.session_state.current_overlay, caption="GradCAM Visualization", width=300)

            with col2:
                if not st.session_state.blip_model_loaded:
                    st.warning("⚠️ Please load the BLIP models from the sidebar first.")
                else:
                    if not hasattr(st.session_state, 'image_caption') or st.button("Regenerate Image Caption"):
                        with st.spinner("Generating image description..."):
                            caption = generate_image_caption(
                                st.session_state.current_image,
                                st.session_state.original_processor,
                                st.session_state.original_model
                            )
                            st.session_state.image_caption = caption

                    if hasattr(st.session_state, 'image_caption'):
                        st.markdown("### Image Description")
                        st.markdown(st.session_state.image_caption)
                        st.markdown("---")

                    if hasattr(st.session_state, 'gradcam_caption'):
                        st.markdown("### GradCAM Analysis")
                        st.markdown(st.session_state.gradcam_caption)

                        if hasattr(st.session_state, 'current_overlay') and st.button("Regenerate GradCAM Caption"):
                            with st.spinner("Reanalyzing GradCAM visualization..."):
                                gradcam_caption = generate_gradcam_caption(
                                    st.session_state.current_overlay,
                                    st.session_state.finetuned_processor,
                                    st.session_state.finetuned_model
                                )
                                st.session_state.gradcam_caption = gradcam_caption
                                st.rerun()
                    else:
                        if hasattr(st.session_state, 'current_overlay'):
                            if st.button("Generate GradCAM Caption"):
                                with st.spinner("Analyzing GradCAM visualization..."):
                                    gradcam_caption = generate_gradcam_caption(
                                        st.session_state.current_overlay,
                                        st.session_state.finetuned_processor,
                                        st.session_state.finetuned_model
                                    )
                                    st.session_state.gradcam_caption = gradcam_caption
                                    st.rerun()
                        else:
                            st.info("GradCAM visualization not available. Visit the Detection tab to generate it.")
        else:
            st.info("Please upload and analyze an image in the Detection tab first.")

    with tab3:
        st.header("LLM Analysis")

        if hasattr(st.session_state, 'current_image') and st.session_state.llm_model_loaded:
            st.subheader("Deepfake Analysis Chat")

            col_images, col_chat = st.columns([1, 3])

            with col_images:
                st.write("#### Reference Images")
                st.image(st.session_state.current_image, caption="Original", use_container_width=True)

                if hasattr(st.session_state, 'current_overlay'):
                    st.image(st.session_state.current_overlay, caption="GradCAM", use_container_width=True)

                if hasattr(st.session_state, 'comparison_image'):
                    st.image(st.session_state.comparison_image, caption="Comparison", use_container_width=True)

                if hasattr(st.session_state, 'current_pred_label'):
                    st.info(f"**Classification:** {st.session_state.current_pred_label} (Confidence: {st.session_state.current_confidence:.2%})")

            with col_chat:
                # Replay the chat history
                for i, (question, answer) in enumerate(st.session_state.chat_history):
                    st.markdown(f"**Question {i+1}:** {question}")
                    st.markdown(f"**Answer:** {answer}")
                    st.markdown("---")

                use_custom_instructions = st.toggle("Enable Custom Instructions", key="llm_custom_instructions", value=False)
                if use_custom_instructions:
                    custom_instruction = st.text_area(
                        "Custom Instructions (Advanced)",
                        placeholder="Specify your preferred style of explanation (e.g., 'Provide technical, detailed explanations' or 'Use simple, non-technical language'). You can also specify what aspects of the image to focus on.",
                        help="Add specific instructions for the analysis"
                    )
                else:
                    custom_instruction = ""

                # Collect the generated captions so they can be attached to the first question
                caption_text = ""
                if hasattr(st.session_state, 'image_caption'):
                    caption_text += f"\n\nImage Description:\n{st.session_state.image_caption}"

                if hasattr(st.session_state, 'gradcam_caption'):
                    caption_text += f"\n\nGradCAM Analysis:\n{st.session_state.gradcam_caption}"

                new_question = st.text_area("Ask a question about the image:", placeholder="Ask your question about this image...", height=100)

                col1, col2 = st.columns([3, 1])
                with col1:
                    analyze_button = st.button("🔍 Send Question", type="primary")
                with col2:
                    clear_button = st.button("🗑️ Clear Chat History")

                if clear_button:
                    st.session_state.chat_history = []
                    st.rerun()

                if analyze_button and new_question:
                    try:
                        # Only the first question carries the captions as extra context
                        if not st.session_state.chat_history:
                            full_question = new_question + caption_text
                        else:
                            full_question = new_question

                        result = analyze_image_with_llm(
                            st.session_state.current_image,
                            st.session_state.current_overlay,
                            st.session_state.current_face_box,
                            st.session_state.current_pred_label,
                            st.session_state.current_confidence,
                            full_question,
                            st.session_state.llm_model,
                            st.session_state.tokenizer,
                            temperature=temperature,
                            max_tokens=max_tokens,
                            custom_instruction=custom_instruction
                        )

                        st.session_state.chat_history.append((new_question, result))

                        st.success("✅ Analysis complete!")

                        # Show Technical / Non-Technical sections side by side when both are present
                        if "Technical" in result and "Non-Technical" in result:
                            try:
                                parts = result.split("Non-Technical")
                                technical = parts[0]
                                non_technical = "Non-Technical" + parts[1]

                                tech_col, simple_col = st.columns(2)
                                with tech_col:
                                    st.subheader("Technical Analysis")
                                    st.markdown(technical)

                                with simple_col:
                                    st.subheader("Simple Explanation")
                                    st.markdown(non_technical)
                            except Exception as e:
                                st.subheader("Analysis Result")
                                st.markdown(result)
                        else:
                            st.subheader("Analysis Result")
                            st.markdown(result)

                        st.rerun()

                    except Exception as e:
                        st.error(f"Error during LLM analysis: {str(e)}")
        else:
            if not hasattr(st.session_state, 'current_image'):
                st.warning("⚠️ Please upload an image in the Detection tab first.")
            else:
                st.warning("⚠️ Please load the Vision LLM from the sidebar to perform detailed analysis.")

    st.markdown("---")


if __name__ == "__main__":
    main()